Hello!
งานมีดังนี้ - มีขั้นตอนที่แสดงในภาพด้านบน ซึ่งจำเป็นต้องขยายไปยังเซิร์ฟเวอร์ N ด้วย
NiFi Site to Site (S2S) เป็นวิธีการถ่ายโอนข้อมูลระหว่างอินสแตนซ์ NiFi ที่ปลอดภัยและปรับแต่งได้สูง ดูว่า S2S ทำงานอย่างไร
เมื่อพูดถึงการถ่ายโอนข้อมูลโดยใช้ S2S อินสแตนซ์หนึ่งเรียกว่าไคลเอนต์ ส่วนที่สองคือเซิร์ฟเวอร์ ลูกค้าส่งข้อมูลเซิร์ฟเวอร์ได้รับข้อมูล สองวิธีในการตั้งค่าการถ่ายโอนข้อมูลระหว่างกัน:
- ผลัก. ข้อมูลถูกส่งจากอินสแตนซ์ไคลเอนต์โดยใช้ Remote Process Group (RPG) บนอินสแตนซ์เซิร์ฟเวอร์ ข้อมูลจะได้รับโดยใช้พอร์ตอินพุต
- ดึง. เซิร์ฟเวอร์รับข้อมูลโดยใช้ RPG ไคลเอนต์ส่งโดยใช้พอร์ตเอาท์พุต
โฟลว์สำหรับการกลิ้งจะถูกจัดเก็บไว้ใน Apache Registry
Apache NiFi Registry เป็นโปรเจ็กต์ย่อยของ Apache NiFi ที่ให้บริการพื้นที่เก็บข้อมูลโฟลว์และเครื่องมือกำหนดเวอร์ชัน GIT ประเภทหนึ่ง ข้อมูลเกี่ยวกับการติดตั้ง การกำหนดค่า และการทำงานกับรีจิสทรีสามารถพบได้ใน
ในช่วงเริ่มต้น เมื่อ N เป็นจำนวนน้อย โฟลว์จะถูกส่งและอัปเดตด้วยตนเองในเวลาที่เหมาะสม
แต่เมื่อ N เติบโตขึ้น ก็มีปัญหามากขึ้น:
- ต้องใช้เวลามากขึ้นในการอัปเดตโฟลว์ คุณต้องไปที่เซิร์ฟเวอร์ทั้งหมด
- มีข้อผิดพลาดในการอัปเดตเทมเพลต ที่นี่พวกเขาอัปเดต แต่ที่นี่พวกเขาลืม
- ข้อผิดพลาดของมนุษย์เมื่อดำเนินการที่คล้ายกันจำนวนมาก
ทั้งหมดนี้นำเราไปสู่ความจริงที่ว่าจำเป็นต้องทำให้กระบวนการเป็นแบบอัตโนมัติ ฉันได้ลองวิธีต่อไปนี้เพื่อแก้ไขปัญหานี้:
- ใช้ MiNiFi แทน NiFi
- NiFi CLI
- NiPyAPI
การใช้งาน MiNiFi
โปรเจ็กต์ย่อยอื่น MiNiFi C2 Server จะช่วยแก้ไขปัญหานี้ได้ ผลิตภัณฑ์นี้มีวัตถุประสงค์เพื่อให้เป็นจุดศูนย์กลางในสถาปัตยกรรมการปรับใช้ วิธีกำหนดค่าสภาพแวดล้อม - อธิบายไว้ใน
ตัวเลือกที่อธิบายไว้ในบทความข้างต้นใช้งานได้และใช้งานได้ไม่ยาก แต่เราต้องไม่ลืมสิ่งต่อไปนี้:
- minifi ไม่มีโปรเซสเซอร์ทั้งหมดจาก nifi
- เวอร์ชัน CPU ใน Minifi ช้ากว่าเวอร์ชัน CPU ใน NiFi
ในขณะที่เขียน NiFi เวอร์ชันล่าสุดคือ 1.9.2 เวอร์ชันโปรเซสเซอร์ของ MiNiFi เวอร์ชันล่าสุดคือ 1.7.0 สามารถเพิ่มโปรเซสเซอร์ลงใน MiNiFi ได้ แต่เนื่องจากความแตกต่างของเวอร์ชันระหว่างโปรเซสเซอร์ NiFi และ MiNiFi สิ่งนี้อาจไม่ทำงาน
NiFi CLI
ตัดสินโดย
เรียกใช้ยูทิลิตี้
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
เพื่อให้เราสามารถโหลดโฟลว์ที่จำเป็นจากรีจิสทรี เราจำเป็นต้องทราบตัวระบุของตะกร้า (ตัวระบุที่เก็บข้อมูล) และโฟลว์เอง (ตัวระบุโฟลว์) ข้อมูลนี้สามารถรับได้ผ่านทาง cli หรือในเว็บอินเทอร์เฟซของรีจิสทรี NiFi เว็บอินเตอร์เฟสมีลักษณะดังนี้:
เมื่อใช้ CLI คุณทำสิ่งนี้:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
เรียกใช้กลุ่มกระบวนการนำเข้าจากรีจิสทรี:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
จุดสำคัญคือสามารถระบุอินสแตนซ์ nifi ใดๆ ให้เป็นโฮสต์ที่เรารวบรวมกลุ่มกระบวนการได้
กลุ่มกระบวนการที่ถูกเพิ่มพร้อมกับตัวประมวลผลที่หยุดทำงาน จะต้องเริ่มต้นใหม่
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
เยี่ยมมาก โปรเซสเซอร์เริ่มทำงานแล้ว อย่างไรก็ตาม ตามเงื่อนไขของปัญหา เราจำเป็นต้องมีอินสแตนซ์ NiFi เพื่อส่งข้อมูลไปยังอินสแตนซ์อื่น สมมติว่าเลือกวิธี Push เพื่อถ่ายโอนข้อมูลไปยังเซิร์ฟเวอร์ เพื่อจัดระเบียบการถ่ายโอนข้อมูลจำเป็นต้องเปิดใช้งานการถ่ายโอนข้อมูล (เปิดใช้งานการส่งสัญญาณ) บน Remote Process Group (RPG) ที่เพิ่มเข้ามาซึ่งรวมอยู่ในโฟลว์ของเราแล้ว
ในเอกสารประกอบของ CLI และแหล่งข้อมูลอื่นๆ ฉันไม่พบวิธีเปิดใช้งานการถ่ายโอนข้อมูล หากคุณรู้วิธีการทำเช่นนี้โปรดเขียนความคิดเห็น
ในเมื่อเราทุบตีและพร้อมที่จะไปให้สุดเราก็จะหาทางออก! คุณสามารถใช้ NiFi API เพื่อแก้ไขปัญหานี้ได้ ลองใช้วิธีต่อไปนี้ โดยนำ ID จากตัวอย่างด้านบน (ในกรณีของเราคือ 7f522a13-016e-1000-e504-d5b15587f2f3) คำอธิบายของวิธี NiFi API
ในส่วนเนื้อหา คุณจะต้องผ่าน JSON ในรูปแบบต่อไปนี้:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
พารามิเตอร์ที่ต้องกรอกเพื่อที่จะ "ทำงาน":
รัฐ — สถานะการถ่ายโอนข้อมูล มี TRANSMITTING เพื่อเปิดใช้งานการถ่ายโอนข้อมูล STOPPED เพื่อปิดใช้งาน
รุ่น - เวอร์ชันโปรเซสเซอร์
version จะใช้ค่าเริ่มต้นเป็น 0 เมื่อสร้างขึ้น แต่สามารถรับพารามิเตอร์เหล่านี้ได้โดยใช้วิธีการ
สำหรับผู้ชื่นชอบสคริปต์ทุบตี วิธีนี้อาจดูเหมือนเหมาะสม แต่มันยากสำหรับฉัน - สคริปต์ทุบตีไม่ใช่สิ่งที่ฉันโปรดปราน วิธีต่อไปน่าสนใจและสะดวกกว่าในความคิดของฉัน
NiPyAPI
NiPyAPI เป็นไลบรารี Python สำหรับการโต้ตอบกับอินสแตนซ์ NiFi
สคริปต์ของเราในการเปิดตัวการกำหนดค่าคือโปรแกรม Python มาดูการเขียนโค้ดกันดีกว่า
ตั้งค่าคอนฟิกเพื่อการทำงานต่อไป เราจะต้องมีพารามิเตอร์ต่อไปนี้:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
นอกจากนี้ ผมจะแทรกชื่อของวิธีการต่างๆ ของไลบรารีนี้ ซึ่งมีการอธิบายไว้
เราเชื่อมต่อรีจิสทรีกับอินสแตนซ์ nifi โดยใช้
nipyapi.versioning.create_registry_client
ในขั้นตอนนี้ คุณสามารถเพิ่มการตรวจสอบได้ว่ามีการเพิ่มรีจิสทรีลงในอินสแตนซ์แล้ว ซึ่งคุณสามารถใช้วิธีนี้ได้
nipyapi.versioning.list_registry_clients
เราพบที่เก็บข้อมูลเพื่อค้นหาโฟลว์ในตะกร้าเพิ่มเติม
nipyapi.versioning.get_registry_bucket
ตามถังที่พบ เรากำลังมองหาการไหล
nipyapi.versioning.get_flow_in_bucket
ถัดไป สิ่งสำคัญคือต้องเข้าใจว่ามีการเพิ่มกลุ่มกระบวนการนี้แล้วหรือไม่ กลุ่มกระบวนการถูกวางตามพิกัด และสถานการณ์อาจเกิดขึ้นเมื่อกลุ่มกระบวนการที่สองถูกวางทับบนกลุ่มกระบวนการหนึ่ง ฉันตรวจสอบแล้ว เป็นไปได้ 🙂 หากต้องการรับกลุ่มกระบวนการที่เพิ่มทั้งหมด ให้ใช้วิธีการ
nipyapi.canvas.list_all_process_groups
จากนั้นเราก็สามารถค้นหาตามชื่อได้
ฉันจะไม่อธิบายกระบวนการอัปเดตเทมเพลตฉันจะบอกว่าหากมีการเพิ่มโปรเซสเซอร์ในเทมเพลตเวอร์ชันใหม่ก็ไม่มีปัญหากับการมีข้อความอยู่ในคิว แต่ถ้าถอดโปรเซสเซอร์ออกก็อาจเกิดปัญหาได้ (nifi ไม่อนุญาตให้ถอดโปรเซสเซอร์ออกหากมีคิวข้อความสะสมอยู่ด้านหน้า) หากคุณสนใจว่าฉันแก้ไขปัญหานี้อย่างไร - โปรดเขียนถึงฉัน เราจะหารือในประเด็นนี้ ผู้ติดต่อในตอนท้ายของบทความ มาดูขั้นตอนการเพิ่มกลุ่มกระบวนการกันดีกว่า
เมื่อแก้ไขข้อบกพร่องของสคริปต์ ฉันพบคุณลักษณะที่โฟลว์เวอร์ชันล่าสุดไม่ได้ดึงขึ้นมาเสมอไป ดังนั้น ฉันขอแนะนำให้คุณชี้แจงเวอร์ชันนี้ก่อน:
nipyapi.versioning.get_latest_flow_ver
กลุ่มกระบวนการปรับใช้:
nipyapi.versioning.deploy_flow_version
เราเริ่มโปรเซสเซอร์:
nipyapi.canvas.schedule_process_group
ในบล็อกเกี่ยวกับ CLI มีเขียนว่าการถ่ายโอนข้อมูลไม่ได้เปิดใช้งานโดยอัตโนมัติในกลุ่มกระบวนการระยะไกล เมื่อนำสคริปต์ไปใช้ฉันก็ประสบปัญหานี้เช่นกัน ในเวลานั้น ฉันไม่สามารถเริ่มการถ่ายโอนข้อมูลโดยใช้ API ได้ และฉันตัดสินใจเขียนถึงผู้พัฒนาไลบรารี NiPyAPI และขอคำแนะนำ/ความช่วยเหลือ นักพัฒนาซอฟต์แวร์ตอบฉัน เราได้หารือเกี่ยวกับปัญหาแล้ว และเขาเขียนว่าเขาต้องการเวลาในการ "ตรวจสอบบางอย่าง" และสองสามวันต่อมาก็มีอีเมลมาถึงซึ่งมีการเขียนฟังก์ชัน Python ไว้เพื่อแก้ปัญหาการเริ่มต้นของฉัน !!! ในเวลานั้น เวอร์ชัน NiPyAPI คือ 0.13.3 และแน่นอนว่าไม่มีสิ่งใดในนั้นเลย แต่ในเวอร์ชัน 0.14.0 ซึ่งเปิดตัวเมื่อไม่นานมานี้ ฟังก์ชันนี้ได้รวมอยู่ในไลบรารีแล้ว พบปะ
nipyapi.canvas.set_remote_process_group_transmission
ดังนั้น ด้วยความช่วยเหลือของไลบรารี NiPyAPI เราจึงเชื่อมต่อรีจิสทรี รวบรวมโฟลว์ และแม้แต่เริ่มต้นโปรเซสเซอร์และการถ่ายโอนข้อมูล จากนั้นคุณสามารถรวมโค้ด เพิ่มการตรวจสอบทุกประเภท การบันทึก ก็แค่นั้นแหละ แต่นั่นเป็นเรื่องราวที่แตกต่างไปจากเดิมอย่างสิ้นเชิง
จากตัวเลือกระบบอัตโนมัติที่ฉันพิจารณา ตัวเลือกหลังดูเหมือนมีประสิทธิภาพมากที่สุดสำหรับฉัน ประการแรก นี่ยังคงเป็นโค้ด Python ซึ่งคุณสามารถฝังโค้ดโปรแกรมเสริมและใช้ประโยชน์จากภาษาการเขียนโปรแกรมได้อย่างเต็มที่ ประการที่สอง โครงการ NiPyAPI กำลังพัฒนาอย่างแข็งขัน และในกรณีที่เกิดปัญหา คุณสามารถเขียนถึงนักพัฒนาได้ ประการที่สาม NiPyAPI ยังคงเป็นเครื่องมือที่มีความยืดหยุ่นมากขึ้นสำหรับการโต้ตอบกับ NiFi ในการแก้ปัญหาที่ซับซ้อน ตัวอย่างเช่น ในการพิจารณาว่าขณะนี้คิวข้อความว่างเปล่าในโฟลว์หรือไม่ และจะสามารถอัพเดตกลุ่มกระบวนการได้หรือไม่
นั่นคือทั้งหมดที่ ฉันอธิบาย 3 วิธีในการทำให้การส่งโฟลว์เป็นอัตโนมัติใน NiFi ซึ่งเป็นข้อผิดพลาดที่นักพัฒนาอาจพบและจัดเตรียมโค้ดที่ใช้งานได้สำหรับการส่งแบบอัตโนมัติ หากคุณสนใจหัวข้อนี้เหมือนกับฉัน -
ที่มา: will.com