Hello!

งานมีดังนี้ - มีขั้นตอนที่แสดงในภาพด้านบน ซึ่งจำเป็นต้องขยายไปยังเซิร์ฟเวอร์ N ด้วย . การทดสอบโฟลว์ - ไฟล์กำลังถูกสร้างขึ้นและส่งไปยังอินสแตนซ์ NiFi อื่น การถ่ายโอนข้อมูลเกิดขึ้นโดยใช้โปรโตคอล NiFi Site to Site
NiFi Site to Site (S2S) เป็นวิธีการถ่ายโอนข้อมูลระหว่างอินสแตนซ์ NiFi ที่ปลอดภัยและปรับแต่งได้สูง ดูว่า S2S ทำงานอย่างไร และสิ่งสำคัญคือต้องอย่าลืมตั้งค่าอินสแตนซ์ NiFi ของคุณเพื่อให้ S2S เห็นได้ .
เมื่อพูดถึงการถ่ายโอนข้อมูลโดยใช้ S2S อินสแตนซ์หนึ่งเรียกว่าไคลเอนต์ ส่วนที่สองคือเซิร์ฟเวอร์ ลูกค้าส่งข้อมูลเซิร์ฟเวอร์ได้รับข้อมูล สองวิธีในการตั้งค่าการถ่ายโอนข้อมูลระหว่างกัน:
- ผลัก. ข้อมูลถูกส่งจากอินสแตนซ์ไคลเอนต์โดยใช้ Remote Process Group (RPG) บนอินสแตนซ์เซิร์ฟเวอร์ ข้อมูลจะได้รับโดยใช้พอร์ตอินพุต
- ดึง. เซิร์ฟเวอร์รับข้อมูลโดยใช้ RPG ไคลเอนต์ส่งโดยใช้พอร์ตเอาท์พุต
โฟลว์สำหรับการกลิ้งจะถูกจัดเก็บไว้ใน Apache Registry
Apache NiFi Registry เป็นโปรเจ็กต์ย่อยของ Apache NiFi ที่ให้บริการพื้นที่เก็บข้อมูลโฟลว์และเครื่องมือกำหนดเวอร์ชัน GIT ประเภทหนึ่ง ข้อมูลเกี่ยวกับการติดตั้ง การกำหนดค่า และการทำงานกับรีจิสทรีสามารถพบได้ใน . ขั้นตอนการจัดเก็บจะรวมกันเป็นกลุ่มกระบวนการและจัดเก็บไว้ในรีจิสทรีในรูปแบบนี้ เราจะกลับมาที่บทความนี้อีกครั้ง
ในช่วงเริ่มต้น เมื่อ N เป็นจำนวนน้อย โฟลว์จะถูกส่งและอัปเดตด้วยตนเองในเวลาที่เหมาะสม
แต่เมื่อ N เติบโตขึ้น ก็มีปัญหามากขึ้น:
- ต้องใช้เวลามากขึ้นในการอัปเดตโฟลว์ คุณต้องไปที่เซิร์ฟเวอร์ทั้งหมด
- มีข้อผิดพลาดในการอัปเดตเทมเพลต ที่นี่พวกเขาอัปเดต แต่ที่นี่พวกเขาลืม
- ข้อผิดพลาดของมนุษย์เมื่อดำเนินการที่คล้ายกันจำนวนมาก
ทั้งหมดนี้นำเราไปสู่ความจริงที่ว่าจำเป็นต้องทำให้กระบวนการเป็นแบบอัตโนมัติ ฉันได้ลองวิธีต่อไปนี้เพื่อแก้ไขปัญหานี้:
- ใช้ MiNiFi แทน NiFi
- NiFi CLI
- NiPyAPI
การใช้งาน MiNiFi
เป็นโครงการย่อยของ Apache NiFi MiNiFy เป็นตัวแทนขนาดกะทัดรัดที่ใช้โปรเซสเซอร์เดียวกันกับ NiFi ซึ่งช่วยให้คุณสามารถสร้างโฟลว์แบบเดียวกับใน NiFi เหนือสิ่งอื่นใดบรรลุถึงความเบาของเอเจนต์ได้ เนื่องจาก MiNiFy ไม่มีอินเทอร์เฟซแบบกราฟิกสำหรับการกำหนดค่าโฟลว์ การขาดอินเทอร์เฟซแบบกราฟิกของ MiNiFy หมายความว่าจำเป็นต้องแก้ไขปัญหาการส่งโฟลว์ใน minifi เนื่องจาก MiNiFy มีการใช้งานอย่างแข็งขันใน IOT จึงมีองค์ประกอบมากมาย และกระบวนการส่งโฟลว์ไปยังอินสแตนซ์ minifi สุดท้ายต้องเป็นไปโดยอัตโนมัติ งานที่คุ้นเคยใช่ไหม?
โปรเจ็กต์ย่อยอื่น MiNiFi C2 Server จะช่วยแก้ไขปัญหานี้ได้ ผลิตภัณฑ์นี้มีวัตถุประสงค์เพื่อให้เป็นจุดศูนย์กลางในสถาปัตยกรรมการปรับใช้ วิธีกำหนดค่าสภาพแวดล้อม - อธิบายไว้ใน เกี่ยวกับHabréและข้อมูลก็เพียงพอที่จะแก้ไขปัญหาได้ MiNiFi ร่วมกับเซิร์ฟเวอร์ C2 จะอัปเดตการกำหนดค่าโดยอัตโนมัติ ข้อเสียเปรียบเพียงอย่างเดียวของวิธีนี้คือคุณต้องสร้างเทมเพลตบนเซิร์ฟเวอร์ C2 การคอมมิตกับรีจิสทรีแบบธรรมดานั้นไม่เพียงพอ
ตัวเลือกที่อธิบายไว้ในบทความข้างต้นใช้งานได้และใช้งานได้ไม่ยาก แต่เราต้องไม่ลืมสิ่งต่อไปนี้:
- minifi ไม่มีโปรเซสเซอร์ทั้งหมดจาก nifi
- เวอร์ชัน CPU ใน Minifi ช้ากว่าเวอร์ชัน CPU ใน NiFi
ในขณะที่เขียน NiFi เวอร์ชันล่าสุดคือ 1.9.2 เวอร์ชันโปรเซสเซอร์ของ MiNiFi เวอร์ชันล่าสุดคือ 1.7.0 สามารถเพิ่มโปรเซสเซอร์ลงใน MiNiFi ได้ แต่เนื่องจากความแตกต่างของเวอร์ชันระหว่างโปรเซสเซอร์ NiFi และ MiNiFi สิ่งนี้อาจไม่ทำงาน
NiFi CLI
ตัดสินโดย เครื่องมือบนเว็บไซต์อย่างเป็นทางการ นี่คือเครื่องมือสำหรับการโต้ตอบระหว่าง NiFI และ NiFi Registry ในด้านการส่งมอบโฟลว์หรือการจัดการกระบวนการโดยอัตโนมัติ ดาวน์โหลดเครื่องมือนี้เพื่อเริ่มต้น .
เรียกใช้ยูทิลิตี้
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
เพื่อให้เราสามารถโหลดโฟลว์ที่จำเป็นจากรีจิสทรี เราจำเป็นต้องทราบตัวระบุของตะกร้า (ตัวระบุที่เก็บข้อมูล) และโฟลว์เอง (ตัวระบุโฟลว์) ข้อมูลนี้สามารถรับได้ผ่านทาง cli หรือในเว็บอินเทอร์เฟซของรีจิสทรี NiFi เว็บอินเตอร์เฟสมีลักษณะดังนี้:

เมื่อใช้ CLI คุณทำสิ่งนี้:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
เรียกใช้กลุ่มกระบวนการนำเข้าจากรีจิสทรี:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
จุดสำคัญคือสามารถระบุอินสแตนซ์ nifi ใดๆ ให้เป็นโฮสต์ที่เรารวบรวมกลุ่มกระบวนการได้
กลุ่มกระบวนการที่ถูกเพิ่มพร้อมกับตัวประมวลผลที่หยุดทำงาน จะต้องเริ่มต้นใหม่
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
เยี่ยมมาก โปรเซสเซอร์เริ่มทำงานแล้ว อย่างไรก็ตาม ตามเงื่อนไขของปัญหา เราจำเป็นต้องมีอินสแตนซ์ NiFi เพื่อส่งข้อมูลไปยังอินสแตนซ์อื่น สมมติว่าเลือกวิธี Push เพื่อถ่ายโอนข้อมูลไปยังเซิร์ฟเวอร์ เพื่อจัดระเบียบการถ่ายโอนข้อมูลจำเป็นต้องเปิดใช้งานการถ่ายโอนข้อมูล (เปิดใช้งานการส่งสัญญาณ) บน Remote Process Group (RPG) ที่เพิ่มเข้ามาซึ่งรวมอยู่ในโฟลว์ของเราแล้ว

ในเอกสารประกอบของ CLI และแหล่งข้อมูลอื่นๆ ฉันไม่พบวิธีเปิดใช้งานการถ่ายโอนข้อมูล หากคุณรู้วิธีการทำเช่นนี้โปรดเขียนความคิดเห็น
ในเมื่อเราทุบตีและพร้อมที่จะไปให้สุดเราก็จะหาทางออก! คุณสามารถใช้ NiFi API เพื่อแก้ไขปัญหานี้ได้ ลองใช้วิธีต่อไปนี้ โดยนำ ID จากตัวอย่างด้านบน (ในกรณีของเราคือ 7f522a13-016e-1000-e504-d5b15587f2f3) คำอธิบายของวิธี NiFi API .

ในส่วนเนื้อหา คุณจะต้องผ่าน JSON ในรูปแบบต่อไปนี้:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
พารามิเตอร์ที่ต้องกรอกเพื่อที่จะ "ทำงาน":
รัฐ — สถานะการถ่ายโอนข้อมูล มี TRANSMITTING เพื่อเปิดใช้งานการถ่ายโอนข้อมูล STOPPED เพื่อปิดใช้งาน
รุ่น - เวอร์ชันโปรเซสเซอร์
version จะใช้ค่าเริ่มต้นเป็น 0 เมื่อสร้างขึ้น แต่สามารถรับพารามิเตอร์เหล่านี้ได้โดยใช้วิธีการ

สำหรับผู้ชื่นชอบสคริปต์ทุบตี วิธีนี้อาจดูเหมือนเหมาะสม แต่มันยากสำหรับฉัน - สคริปต์ทุบตีไม่ใช่สิ่งที่ฉันโปรดปราน วิธีต่อไปน่าสนใจและสะดวกกว่าในความคิดของฉัน
NiPyAPI
NiPyAPI เป็นไลบรารี Python สำหรับการโต้ตอบกับอินสแตนซ์ NiFi มีข้อมูลที่จำเป็นในการทำงานกับห้องสมุด การเริ่มต้นอย่างรวดเร็วมีอธิบายไว้ใน บน GitHub
สคริปต์ของเราในการเปิดตัวการกำหนดค่าคือโปรแกรม Python มาดูการเขียนโค้ดกันดีกว่า
ตั้งค่าคอนฟิกเพื่อการทำงานต่อไป เราจะต้องมีพารามิเตอร์ต่อไปนี้:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
นอกจากนี้ ผมจะแทรกชื่อของวิธีการต่างๆ ของไลบรารีนี้ ซึ่งมีการอธิบายไว้ .
เราเชื่อมต่อรีจิสทรีกับอินสแตนซ์ nifi โดยใช้
nipyapi.versioning.create_registry_clientในขั้นตอนนี้ คุณสามารถเพิ่มการตรวจสอบได้ว่ามีการเพิ่มรีจิสทรีลงในอินสแตนซ์แล้ว ซึ่งคุณสามารถใช้วิธีนี้ได้
nipyapi.versioning.list_registry_clientsเราพบที่เก็บข้อมูลเพื่อค้นหาโฟลว์ในตะกร้าเพิ่มเติม
nipyapi.versioning.get_registry_bucketตามถังที่พบ เรากำลังมองหาการไหล
nipyapi.versioning.get_flow_in_bucketถัดไป สิ่งสำคัญคือต้องเข้าใจว่ามีการเพิ่มกลุ่มกระบวนการนี้แล้วหรือไม่ กลุ่มกระบวนการถูกวางตามพิกัด และสถานการณ์อาจเกิดขึ้นเมื่อกลุ่มกระบวนการที่สองถูกวางทับบนกลุ่มกระบวนการหนึ่ง ฉันตรวจสอบแล้ว เป็นไปได้ 🙂 หากต้องการรับกลุ่มกระบวนการที่เพิ่มทั้งหมด ให้ใช้วิธีการ
nipyapi.canvas.list_all_process_groupsจากนั้นเราก็สามารถค้นหาตามชื่อได้
ฉันจะไม่อธิบายกระบวนการอัปเดตเทมเพลตฉันจะบอกว่าหากมีการเพิ่มโปรเซสเซอร์ในเทมเพลตเวอร์ชันใหม่ก็ไม่มีปัญหากับการมีข้อความอยู่ในคิว แต่ถ้าถอดโปรเซสเซอร์ออกก็อาจเกิดปัญหาได้ (nifi ไม่อนุญาตให้ถอดโปรเซสเซอร์ออกหากมีคิวข้อความสะสมอยู่ด้านหน้า) หากคุณสนใจว่าฉันแก้ไขปัญหานี้อย่างไร - โปรดเขียนถึงฉัน เราจะหารือในประเด็นนี้ ผู้ติดต่อในตอนท้ายของบทความ มาดูขั้นตอนการเพิ่มกลุ่มกระบวนการกันดีกว่า
เมื่อแก้ไขข้อบกพร่องของสคริปต์ ฉันพบคุณลักษณะที่โฟลว์เวอร์ชันล่าสุดไม่ได้ดึงขึ้นมาเสมอไป ดังนั้น ฉันขอแนะนำให้คุณชี้แจงเวอร์ชันนี้ก่อน:
nipyapi.versioning.get_latest_flow_verกลุ่มกระบวนการปรับใช้:
nipyapi.versioning.deploy_flow_versionเราเริ่มโปรเซสเซอร์:
nipyapi.canvas.schedule_process_groupในบล็อกเกี่ยวกับ CLI มีเขียนว่าการถ่ายโอนข้อมูลไม่ได้เปิดใช้งานโดยอัตโนมัติในกลุ่มกระบวนการระยะไกล เมื่อนำสคริปต์ไปใช้ฉันก็ประสบปัญหานี้เช่นกัน ในเวลานั้น ฉันไม่สามารถเริ่มการถ่ายโอนข้อมูลโดยใช้ API ได้ และฉันตัดสินใจเขียนถึงผู้พัฒนาไลบรารี NiPyAPI และขอคำแนะนำ/ความช่วยเหลือ นักพัฒนาซอฟต์แวร์ตอบฉัน เราได้หารือเกี่ยวกับปัญหาแล้ว และเขาเขียนว่าเขาต้องการเวลาในการ "ตรวจสอบบางอย่าง" และสองสามวันต่อมาก็มีอีเมลมาถึงซึ่งมีการเขียนฟังก์ชัน Python ไว้เพื่อแก้ปัญหาการเริ่มต้นของฉัน !!! ในเวลานั้น เวอร์ชัน NiPyAPI คือ 0.13.3 และแน่นอนว่าไม่มีสิ่งใดในนั้นเลย แต่ในเวอร์ชัน 0.14.0 ซึ่งเปิดตัวเมื่อไม่นานมานี้ ฟังก์ชันนี้ได้รวมอยู่ในไลบรารีแล้ว พบปะ
nipyapi.canvas.set_remote_process_group_transmissionดังนั้น ด้วยความช่วยเหลือของไลบรารี NiPyAPI เราจึงเชื่อมต่อรีจิสทรี รวบรวมโฟลว์ และแม้แต่เริ่มต้นโปรเซสเซอร์และการถ่ายโอนข้อมูล จากนั้นคุณสามารถรวมโค้ด เพิ่มการตรวจสอบทุกประเภท การบันทึก ก็แค่นั้นแหละ แต่นั่นเป็นเรื่องราวที่แตกต่างไปจากเดิมอย่างสิ้นเชิง
จากตัวเลือกระบบอัตโนมัติที่ฉันพิจารณา ตัวเลือกหลังดูเหมือนมีประสิทธิภาพมากที่สุดสำหรับฉัน ประการแรก นี่ยังคงเป็นโค้ด Python ซึ่งคุณสามารถฝังโค้ดโปรแกรมเสริมและใช้ประโยชน์จากภาษาการเขียนโปรแกรมได้อย่างเต็มที่ ประการที่สอง โครงการ NiPyAPI กำลังพัฒนาอย่างแข็งขัน และในกรณีที่เกิดปัญหา คุณสามารถเขียนถึงนักพัฒนาได้ ประการที่สาม NiPyAPI ยังคงเป็นเครื่องมือที่มีความยืดหยุ่นมากขึ้นสำหรับการโต้ตอบกับ NiFi ในการแก้ปัญหาที่ซับซ้อน ตัวอย่างเช่น ในการพิจารณาว่าขณะนี้คิวข้อความว่างเปล่าในโฟลว์หรือไม่ และจะสามารถอัพเดตกลุ่มกระบวนการได้หรือไม่
นั่นคือทั้งหมดที่ ฉันอธิบาย 3 วิธีในการทำให้การส่งโฟลว์เป็นอัตโนมัติใน NiFi ซึ่งเป็นข้อผิดพลาดที่นักพัฒนาอาจพบและจัดเตรียมโค้ดที่ใช้งานได้สำหรับการส่งแบบอัตโนมัติ หากคุณสนใจหัวข้อนี้เหมือนกับฉัน -
ที่มา: will.com
