Flow Delivery Automation ใน Apache NiFi

Hello!

Flow Delivery Automation ใน Apache NiFi

งานมีดังนี้ - มีขั้นตอนที่แสดงในภาพด้านบน ซึ่งจำเป็นต้องขยายไปยังเซิร์ฟเวอร์ N ด้วย อาปาเช่ นิไฟ. การทดสอบโฟลว์ - ไฟล์กำลังถูกสร้างขึ้นและส่งไปยังอินสแตนซ์ NiFi อื่น การถ่ายโอนข้อมูลเกิดขึ้นโดยใช้โปรโตคอล NiFi Site to Site

NiFi Site to Site (S2S) เป็นวิธีการถ่ายโอนข้อมูลระหว่างอินสแตนซ์ NiFi ที่ปลอดภัยและปรับแต่งได้สูง ดูว่า S2S ทำงานอย่างไร เอกสาร และสิ่งสำคัญคือต้องอย่าลืมตั้งค่าอินสแตนซ์ NiFi ของคุณเพื่อให้ S2S เห็นได้ ที่นี่.

เมื่อพูดถึงการถ่ายโอนข้อมูลโดยใช้ S2S อินสแตนซ์หนึ่งเรียกว่าไคลเอนต์ ส่วนที่สองคือเซิร์ฟเวอร์ ลูกค้าส่งข้อมูลเซิร์ฟเวอร์ได้รับข้อมูล สองวิธีในการตั้งค่าการถ่ายโอนข้อมูลระหว่างกัน:

  1. ผลัก. ข้อมูลถูกส่งจากอินสแตนซ์ไคลเอนต์โดยใช้ Remote Process Group (RPG) บนอินสแตนซ์เซิร์ฟเวอร์ ข้อมูลจะได้รับโดยใช้พอร์ตอินพุต
  2. ดึง. เซิร์ฟเวอร์รับข้อมูลโดยใช้ RPG ไคลเอนต์ส่งโดยใช้พอร์ตเอาท์พุต


โฟลว์สำหรับการกลิ้งจะถูกจัดเก็บไว้ใน Apache Registry

Apache NiFi Registry เป็นโปรเจ็กต์ย่อยของ Apache NiFi ที่ให้บริการพื้นที่เก็บข้อมูลโฟลว์และเครื่องมือกำหนดเวอร์ชัน GIT ประเภทหนึ่ง ข้อมูลเกี่ยวกับการติดตั้ง การกำหนดค่า และการทำงานกับรีจิสทรีสามารถพบได้ใน เอกสารราชการ. ขั้นตอนการจัดเก็บจะรวมกันเป็นกลุ่มกระบวนการและจัดเก็บไว้ในรีจิสทรีในรูปแบบนี้ เราจะกลับมาที่บทความนี้อีกครั้ง

ในช่วงเริ่มต้น เมื่อ N เป็นจำนวนน้อย โฟลว์จะถูกส่งและอัปเดตด้วยตนเองในเวลาที่เหมาะสม

แต่เมื่อ N เติบโตขึ้น ก็มีปัญหามากขึ้น:

  1. ต้องใช้เวลามากขึ้นในการอัปเดตโฟลว์ คุณต้องไปที่เซิร์ฟเวอร์ทั้งหมด
  2. มีข้อผิดพลาดในการอัปเดตเทมเพลต ที่นี่พวกเขาอัปเดต แต่ที่นี่พวกเขาลืม
  3. ข้อผิดพลาดของมนุษย์เมื่อดำเนินการที่คล้ายกันจำนวนมาก

ทั้งหมดนี้นำเราไปสู่ความจริงที่ว่าจำเป็นต้องทำให้กระบวนการเป็นแบบอัตโนมัติ ฉันได้ลองวิธีต่อไปนี้เพื่อแก้ไขปัญหานี้:

  1. ใช้ MiNiFi แทน NiFi
  2. NiFi CLI
  3. NiPyAPI

การใช้งาน MiNiFi

ApacheMiNify เป็นโครงการย่อยของ Apache NiFi MiNiFy เป็นตัวแทนขนาดกะทัดรัดที่ใช้โปรเซสเซอร์เดียวกันกับ NiFi ซึ่งช่วยให้คุณสามารถสร้างโฟลว์แบบเดียวกับใน NiFi เหนือสิ่งอื่นใดบรรลุถึงความเบาของเอเจนต์ได้ เนื่องจาก MiNiFy ไม่มีอินเทอร์เฟซแบบกราฟิกสำหรับการกำหนดค่าโฟลว์ การขาดอินเทอร์เฟซแบบกราฟิกของ MiNiFy หมายความว่าจำเป็นต้องแก้ไขปัญหาการส่งโฟลว์ใน minifi เนื่องจาก MiNiFy มีการใช้งานอย่างแข็งขันใน IOT จึงมีองค์ประกอบมากมาย และกระบวนการส่งโฟลว์ไปยังอินสแตนซ์ minifi สุดท้ายต้องเป็นไปโดยอัตโนมัติ งานที่คุ้นเคยใช่ไหม?

โปรเจ็กต์ย่อยอื่น MiNiFi C2 Server จะช่วยแก้ไขปัญหานี้ได้ ผลิตภัณฑ์นี้มีวัตถุประสงค์เพื่อให้เป็นจุดศูนย์กลางในสถาปัตยกรรมการปรับใช้ วิธีกำหนดค่าสภาพแวดล้อม - อธิบายไว้ใน บทความนี้ เกี่ยวกับHabréและข้อมูลก็เพียงพอที่จะแก้ไขปัญหาได้ MiNiFi ร่วมกับเซิร์ฟเวอร์ C2 จะอัปเดตการกำหนดค่าโดยอัตโนมัติ ข้อเสียเปรียบเพียงอย่างเดียวของวิธีนี้คือคุณต้องสร้างเทมเพลตบนเซิร์ฟเวอร์ C2 การคอมมิตกับรีจิสทรีแบบธรรมดานั้นไม่เพียงพอ

ตัวเลือกที่อธิบายไว้ในบทความข้างต้นใช้งานได้และใช้งานได้ไม่ยาก แต่เราต้องไม่ลืมสิ่งต่อไปนี้:

  1. minifi ไม่มีโปรเซสเซอร์ทั้งหมดจาก nifi
  2. เวอร์ชัน CPU ใน Minifi ช้ากว่าเวอร์ชัน CPU ใน NiFi

ในขณะที่เขียน NiFi เวอร์ชันล่าสุดคือ 1.9.2 เวอร์ชันโปรเซสเซอร์ของ MiNiFi เวอร์ชันล่าสุดคือ 1.7.0 สามารถเพิ่มโปรเซสเซอร์ลงใน MiNiFi ได้ แต่เนื่องจากความแตกต่างของเวอร์ชันระหว่างโปรเซสเซอร์ NiFi และ MiNiFi สิ่งนี้อาจไม่ทำงาน

NiFi CLI

ตัดสินโดย คำอธิบาย เครื่องมือบนเว็บไซต์อย่างเป็นทางการ นี่คือเครื่องมือสำหรับการโต้ตอบระหว่าง NiFI และ NiFi Registry ในด้านการส่งมอบโฟลว์หรือการจัดการกระบวนการโดยอัตโนมัติ ดาวน์โหลดเครื่องมือนี้เพื่อเริ่มต้น ด้วยเหตุนี้.

เรียกใช้ยูทิลิตี้

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

เพื่อให้เราสามารถโหลดโฟลว์ที่จำเป็นจากรีจิสทรี เราจำเป็นต้องทราบตัวระบุของตะกร้า (ตัวระบุที่เก็บข้อมูล) ​​และโฟลว์เอง (ตัวระบุโฟลว์) ข้อมูลนี้สามารถรับได้ผ่านทาง cli หรือในเว็บอินเทอร์เฟซของรีจิสทรี NiFi เว็บอินเตอร์เฟสมีลักษณะดังนี้:

Flow Delivery Automation ใน Apache NiFi

เมื่อใช้ CLI คุณทำสิ่งนี้:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

เรียกใช้กลุ่มกระบวนการนำเข้าจากรีจิสทรี:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

จุดสำคัญคือสามารถระบุอินสแตนซ์ nifi ใดๆ ให้เป็นโฮสต์ที่เรารวบรวมกลุ่มกระบวนการได้

กลุ่มกระบวนการที่ถูกเพิ่มพร้อมกับตัวประมวลผลที่หยุดทำงาน จะต้องเริ่มต้นใหม่

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

เยี่ยมมาก โปรเซสเซอร์เริ่มทำงานแล้ว อย่างไรก็ตาม ตามเงื่อนไขของปัญหา เราจำเป็นต้องมีอินสแตนซ์ NiFi เพื่อส่งข้อมูลไปยังอินสแตนซ์อื่น สมมติว่าเลือกวิธี Push เพื่อถ่ายโอนข้อมูลไปยังเซิร์ฟเวอร์ เพื่อจัดระเบียบการถ่ายโอนข้อมูลจำเป็นต้องเปิดใช้งานการถ่ายโอนข้อมูล (เปิดใช้งานการส่งสัญญาณ) บน Remote Process Group (RPG) ที่เพิ่มเข้ามาซึ่งรวมอยู่ในโฟลว์ของเราแล้ว

Flow Delivery Automation ใน Apache NiFi

ในเอกสารประกอบของ CLI และแหล่งข้อมูลอื่นๆ ฉันไม่พบวิธีเปิดใช้งานการถ่ายโอนข้อมูล หากคุณรู้วิธีการทำเช่นนี้โปรดเขียนความคิดเห็น

ในเมื่อเราทุบตีและพร้อมที่จะไปให้สุดเราก็จะหาทางออก! คุณสามารถใช้ NiFi API เพื่อแก้ไขปัญหานี้ได้ ลองใช้วิธีต่อไปนี้ โดยนำ ID จากตัวอย่างด้านบน (ในกรณีของเราคือ 7f522a13-016e-1000-e504-d5b15587f2f3) คำอธิบายของวิธี NiFi API ที่นี่.

Flow Delivery Automation ใน Apache NiFi
ในส่วนเนื้อหา คุณจะต้องผ่าน JSON ในรูปแบบต่อไปนี้:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

พารามิเตอร์ที่ต้องกรอกเพื่อที่จะ "ทำงาน":
รัฐ — สถานะการถ่ายโอนข้อมูล มี TRANSMITTING เพื่อเปิดใช้งานการถ่ายโอนข้อมูล STOPPED เพื่อปิดใช้งาน
รุ่น - เวอร์ชันโปรเซสเซอร์

version จะใช้ค่าเริ่มต้นเป็น 0 เมื่อสร้างขึ้น แต่สามารถรับพารามิเตอร์เหล่านี้ได้โดยใช้วิธีการ

Flow Delivery Automation ใน Apache NiFi

สำหรับผู้ชื่นชอบสคริปต์ทุบตี วิธีนี้อาจดูเหมือนเหมาะสม แต่มันยากสำหรับฉัน - สคริปต์ทุบตีไม่ใช่สิ่งที่ฉันโปรดปราน วิธีต่อไปน่าสนใจและสะดวกกว่าในความคิดของฉัน

NiPyAPI

NiPyAPI เป็นไลบรารี Python สำหรับการโต้ตอบกับอินสแตนซ์ NiFi หน้าเอกสาร มีข้อมูลที่จำเป็นในการทำงานกับห้องสมุด การเริ่มต้นอย่างรวดเร็วมีอธิบายไว้ใน โครงการ บน GitHub

สคริปต์ของเราในการเปิดตัวการกำหนดค่าคือโปรแกรม Python มาดูการเขียนโค้ดกันดีกว่า
ตั้งค่าคอนฟิกเพื่อการทำงานต่อไป เราจะต้องมีพารามิเตอร์ต่อไปนี้:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

นอกจากนี้ ผมจะแทรกชื่อของวิธีการต่างๆ ของไลบรารีนี้ ซึ่งมีการอธิบายไว้ ที่นี่.

เราเชื่อมต่อรีจิสทรีกับอินสแตนซ์ nifi โดยใช้

nipyapi.versioning.create_registry_client

ในขั้นตอนนี้ คุณสามารถเพิ่มการตรวจสอบได้ว่ามีการเพิ่มรีจิสทรีลงในอินสแตนซ์แล้ว ซึ่งคุณสามารถใช้วิธีนี้ได้

nipyapi.versioning.list_registry_clients

เราพบที่เก็บข้อมูลเพื่อค้นหาโฟลว์ในตะกร้าเพิ่มเติม

nipyapi.versioning.get_registry_bucket

ตามถังที่พบ เรากำลังมองหาการไหล

nipyapi.versioning.get_flow_in_bucket

ถัดไป สิ่งสำคัญคือต้องเข้าใจว่ามีการเพิ่มกลุ่มกระบวนการนี้แล้วหรือไม่ กลุ่มกระบวนการถูกวางตามพิกัด และสถานการณ์อาจเกิดขึ้นเมื่อกลุ่มกระบวนการที่สองถูกวางทับบนกลุ่มกระบวนการหนึ่ง ฉันตรวจสอบแล้ว เป็นไปได้ 🙂 หากต้องการรับกลุ่มกระบวนการที่เพิ่มทั้งหมด ให้ใช้วิธีการ

nipyapi.canvas.list_all_process_groups

จากนั้นเราก็สามารถค้นหาตามชื่อได้

ฉันจะไม่อธิบายกระบวนการอัปเดตเทมเพลตฉันจะบอกว่าหากมีการเพิ่มโปรเซสเซอร์ในเทมเพลตเวอร์ชันใหม่ก็ไม่มีปัญหากับการมีข้อความอยู่ในคิว แต่ถ้าถอดโปรเซสเซอร์ออกก็อาจเกิดปัญหาได้ (nifi ไม่อนุญาตให้ถอดโปรเซสเซอร์ออกหากมีคิวข้อความสะสมอยู่ด้านหน้า) หากคุณสนใจว่าฉันแก้ไขปัญหานี้อย่างไร - โปรดเขียนถึงฉัน เราจะหารือในประเด็นนี้ ผู้ติดต่อในตอนท้ายของบทความ มาดูขั้นตอนการเพิ่มกลุ่มกระบวนการกันดีกว่า

เมื่อแก้ไขข้อบกพร่องของสคริปต์ ฉันพบคุณลักษณะที่โฟลว์เวอร์ชันล่าสุดไม่ได้ดึงขึ้นมาเสมอไป ดังนั้น ฉันขอแนะนำให้คุณชี้แจงเวอร์ชันนี้ก่อน:

nipyapi.versioning.get_latest_flow_ver

กลุ่มกระบวนการปรับใช้:

nipyapi.versioning.deploy_flow_version

เราเริ่มโปรเซสเซอร์:

nipyapi.canvas.schedule_process_group

ในบล็อกเกี่ยวกับ CLI มีเขียนว่าการถ่ายโอนข้อมูลไม่ได้เปิดใช้งานโดยอัตโนมัติในกลุ่มกระบวนการระยะไกล เมื่อนำสคริปต์ไปใช้ฉันก็ประสบปัญหานี้เช่นกัน ในเวลานั้น ฉันไม่สามารถเริ่มการถ่ายโอนข้อมูลโดยใช้ API ได้ และฉันตัดสินใจเขียนถึงผู้พัฒนาไลบรารี NiPyAPI และขอคำแนะนำ/ความช่วยเหลือ นักพัฒนาซอฟต์แวร์ตอบฉัน เราได้หารือเกี่ยวกับปัญหาแล้ว และเขาเขียนว่าเขาต้องการเวลาในการ "ตรวจสอบบางอย่าง" และสองสามวันต่อมาก็มีอีเมลมาถึงซึ่งมีการเขียนฟังก์ชัน Python ไว้เพื่อแก้ปัญหาการเริ่มต้นของฉัน !!! ในเวลานั้น เวอร์ชัน NiPyAPI คือ 0.13.3 และแน่นอนว่าไม่มีสิ่งใดในนั้นเลย แต่ในเวอร์ชัน 0.14.0 ซึ่งเปิดตัวเมื่อไม่นานมานี้ ฟังก์ชันนี้ได้รวมอยู่ในไลบรารีแล้ว พบปะ

nipyapi.canvas.set_remote_process_group_transmission

ดังนั้น ด้วยความช่วยเหลือของไลบรารี NiPyAPI เราจึงเชื่อมต่อรีจิสทรี รวบรวมโฟลว์ และแม้แต่เริ่มต้นโปรเซสเซอร์และการถ่ายโอนข้อมูล จากนั้นคุณสามารถรวมโค้ด เพิ่มการตรวจสอบทุกประเภท การบันทึก ก็แค่นั้นแหละ แต่นั่นเป็นเรื่องราวที่แตกต่างไปจากเดิมอย่างสิ้นเชิง

จากตัวเลือกระบบอัตโนมัติที่ฉันพิจารณา ตัวเลือกหลังดูเหมือนมีประสิทธิภาพมากที่สุดสำหรับฉัน ประการแรก นี่ยังคงเป็นโค้ด Python ซึ่งคุณสามารถฝังโค้ดโปรแกรมเสริมและใช้ประโยชน์จากภาษาการเขียนโปรแกรมได้อย่างเต็มที่ ประการที่สอง โครงการ NiPyAPI กำลังพัฒนาอย่างแข็งขัน และในกรณีที่เกิดปัญหา คุณสามารถเขียนถึงนักพัฒนาได้ ประการที่สาม NiPyAPI ยังคงเป็นเครื่องมือที่มีความยืดหยุ่นมากขึ้นสำหรับการโต้ตอบกับ NiFi ในการแก้ปัญหาที่ซับซ้อน ตัวอย่างเช่น ในการพิจารณาว่าขณะนี้คิวข้อความว่างเปล่าในโฟลว์หรือไม่ และจะสามารถอัพเดตกลุ่มกระบวนการได้หรือไม่

นั่นคือทั้งหมดที่ ฉันอธิบาย 3 วิธีในการทำให้การส่งโฟลว์เป็นอัตโนมัติใน NiFi ซึ่งเป็นข้อผิดพลาดที่นักพัฒนาอาจพบและจัดเตรียมโค้ดที่ใช้งานได้สำหรับการส่งแบบอัตโนมัติ หากคุณสนใจหัวข้อนี้เหมือนกับฉัน - เขียน!

ที่มา: will.com

เพิ่มความคิดเห็น