Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт

хүн бүрт Сайн байна уу!

Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт

Даалгавар нь дараах байдалтай байна - дээрх зурган дээр харуулсан урсгал байгаа бөгөөд үүнийг N серверт шилжүүлэх шаардлагатай. Apache NiFi. Урсгалын тест - файлыг үүсгэж, өөр NiFi инстанс руу илгээж байна. Мэдээлэл дамжуулах нь NiFi сайтаас сайт руу протоколыг ашиглан хийгддэг.

NiFi Site to Site (S2S) нь NiFi instance-ийн хооронд өгөгдөл дамжуулах найдвартай, өндөр тохируулгатай арга юм. S2S хэрхэн ажилладагийг харна уу баримт бичиг S2S-ийг харах боломжийг олгохын тулд NiFi инстанцаа тохируулахаа санах нь чухал юм энд.

S2S ашиглан өгөгдөл дамжуулах тухай ярихад нэг жишээг клиент, хоёр дахь нь сервер гэж нэрлэдэг. Үйлчлүүлэгч өгөгдөл илгээдэг, сервер нь хүлээн авдаг. Тэдний хооронд өгөгдөл дамжуулах хоёр арга зам:

  1. түлхэх. Мэдээллийг Remote Process Group (RPG) ашиглан үйлчлүүлэгчийн жишээнээс илгээдэг. Серверийн жишээн дээр оролтын порт ашиглан өгөгдлийг хүлээн авдаг
  2. Татах. Сервер нь RPG ашиглан өгөгдлийг хүлээн авдаг бол үйлчлүүлэгч нь Гаралтын портыг ашиглан илгээдэг.


Гулсуулах урсгал нь Apache бүртгэлд хадгалагддаг.

Apache NiFi Registry нь урсгалыг хадгалах, хувилбар гаргах хэрэгслээр хангадаг Apache NiFi-ийн дэд төсөл юм. Нэг төрлийн GIT. Бүртгэлийг суулгах, тохируулах, түүнтэй ажиллах талаархи мэдээллийг эндээс авах боломжтой албан ёсны баримт бичиг. Хадгалах урсгалыг процессын бүлэгт нэгтгэж, бүртгэлд энэ хэлбэрээр хадгална. Энэ талаар бид дараа нь нийтлэлд эргэн харах болно.

Эхэндээ N нь бага тоо байх үед урсгалыг боломжийн хугацаанд гараар хүргэж, шинэчилдэг.

Гэхдээ N өсөх тусам илүү олон асуудал гарч ирнэ:

  1. урсгалыг шинэчлэхэд илүү их цаг зарцуулдаг. Та бүх серверүүд рүү очих хэрэгтэй
  2. загваруудыг шинэчлэхэд алдаа гарлаа. Энд тэд шинэчлэгдсэн боловч энд мартжээ
  3. олон тооны ижил төстэй үйлдлүүдийг гүйцэтгэх үед хүний ​​алдаа

Энэ бүхэн нь үйл явцыг автоматжуулах шаардлагатай гэдгийг бидэнд харуулж байна. Энэ асуудлыг шийдэхийн тулд би дараах аргуудыг туршиж үзсэн.

  1. NiFi-ийн оронд MiNiFi ашигла
  2. NiFi CLI
  3. NiPyAPI

MiNiFi ашиглах

ApacheMiNify нь Apache NiFi-ийн дэд төсөл юм. MiNiFy нь NiFi-тэй ижил процессоруудыг ашигладаг авсаархан агент бөгөөд NiFi-тэй ижил урсгалыг үүсгэх боломжийг танд олгоно. MiNiFy нь урсгалын тохиргоонд зориулсан график интерфэйсгүй тул агентын хөнгөн байдал нь бусад зүйлсийн дунд хүрдэг. MiNiFy-ийн график интерфэйс байхгүй байгаа нь minifi дахь урсгалыг дамжуулах асуудлыг шийдэх шаардлагатай гэсэн үг юм. MiNiFy-ийг IOT-д идэвхтэй ашигладаг тул олон бүрэлдэхүүн хэсгүүд байдаг бөгөөд эцсийн minifi инстанцуудад урсгалыг дамжуулах үйл явцыг автоматжуулах шаардлагатай. Танил даалгавар, тийм үү?

Өөр нэг дэд төсөл болох MiNiFi C2 Server нь энэ асуудлыг шийдвэрлэхэд тусална. Энэ бүтээгдэхүүн нь байршуулалтын архитектурын гол цэг байх зорилготой юм. Хүрээлэн буй орчныг хэрхэн тохируулах талаар тайлбарласан болно энэ нийтлэл Habré болон мэдээлэл нь асуудлыг шийдвэрлэхэд хангалттай юм. MiNiFi нь C2 сервертэй хамт тохиргоогоо автоматаар шинэчилдэг. Энэ аргын цорын ганц сул тал бол та C2 сервер дээр загвар үүсгэх хэрэгтэй бөгөөд бүртгэлд энгийн үүрэг өгөх нь хангалтгүй юм.

Дээрх нийтлэлд тайлбарласан сонголт нь ажиллаж байгаа бөгөөд хэрэгжүүлэхэд хэцүү биш боловч бид дараахь зүйлийг мартаж болохгүй.

  1. minifi-д nifi-ийн бүх процессор байдаггүй
  2. Minifi дахь CPU хувилбарууд нь NiFi дахь CPU хувилбаруудаас хоцорч байна.

Үүнийг бичиж байх үед NiFi-ийн хамгийн сүүлийн хувилбар нь 1.9.2. Хамгийн сүүлийн үеийн MiNiFi хувилбарын процессорын хувилбар нь 1.7.0. Процессоруудыг MiNiFi-д нэмж болох боловч NiFi болон MiNiFi процессоруудын хоорондох хувилбарын зөрүүгээс шалтгаалан энэ нь ажиллахгүй байж магадгүй юм.

NiFi CLI

Шүүгч тайлбар Албан ёсны вэбсайт дээрх хэрэгсэл, энэ нь урсгалыг дамжуулах эсвэл процессын удирдлагын чиглэлээр NiFI болон NiFi бүртгэлийн хоорондын харилцан үйлчлэлийг автоматжуулах хэрэгсэл юм. Эхлэхийн тулд энэ хэрэгслийг татаж аваарай. Эндээс.

Хэрэгслийг ажиллуул

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Бүртгэлээс шаардлагатай урсгалыг ачаалахын тулд бид сагс (хувин танигч) болон урсгалын өөрөө (урсгал танигч) танигчийг мэдэх хэрэгтэй. Энэ өгөгдлийг cli эсвэл NiFi бүртгэлийн вэб интерфэйсээр дамжуулан авах боломжтой. Вэб интерфэйс нь дараах байдлаар харагдаж байна.

Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт

CLI ашиглан та дараах зүйлийг хийнэ үү.

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Бүртгэлээс импортын процессын бүлгийг ажиллуулах:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Хамгийн чухал зүйл бол ямар ч nifi жишээг процессын бүлгийг эргэлдүүлдэг хостоор зааж өгч болно.

Процессын бүлэг зогссон процессоруудтай нэмэгдсэн тул тэдгээрийг эхлүүлэх шаардлагатай

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Гайхалтай, процессорууд эхэлсэн. Гэсэн хэдий ч асуудлын нөхцлийн дагуу бидэнд бусад инстанцууд руу өгөгдөл илгээхийн тулд NiFi instance хэрэгтэй. Сервер рүү өгөгдөл дамжуулахын тулд Push аргыг сонгосон гэж үзье. Өгөгдөл дамжуулалтыг зохион байгуулахын тулд бидний урсгалд аль хэдийн орсон байгаа Remote Process Group (RPG) дээр өгөгдөл дамжуулахыг идэвхжүүлэх шаардлагатай.

Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт

CLI болон бусад эх сурвалж дахь баримт бичигт би өгөгдөл дамжуулахыг идэвхжүүлэх аргыг олсонгүй. Хэрэв та үүнийг яаж хийхийг мэддэг бол сэтгэгдэл дээр бичнэ үү.

Бидэнд bash байгаа бөгөөд бид эцсээ хүртэл явахад бэлэн байгаа тул бид гарах арга замыг олох болно! Та энэ асуудлыг шийдэхийн тулд NiFi API ашиглаж болно. Дараах аргыг ашиглацгаая, бид дээрх жишээнүүдээс ID-г авна (бидний тохиолдолд энэ нь 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API аргуудын тодорхойлолт энд.

Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт
Биедээ та дараах хэлбэрийн JSON-г нэвтрүүлэх шаардлагатай.

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

"Ажиллах" тулд бөглөх ёстой параметрүүд:
төлөв - өгөгдөл дамжуулах төлөв. Өгөгдөл дамжуулалтыг идэвхжүүлэхийн тулд ДАМЖУУЛАХ боломжтой, идэвхгүй болгохын тулд ЗОГСООД
хувилбар - процессорын хувилбар

хувилбарыг үүсгэх үед анхдагч нь 0 байх боловч эдгээр параметрүүдийг аргыг ашиглан авч болно

Apache NiFi дахь урсгалын хүргэлтийн автоматжуулалт

Bash скриптийг хайрлагчдын хувьд энэ арга тохиромжтой мэт санагдаж болох ч энэ нь надад хэцүү байдаг - bash скрипт нь миний дуртай зүйл биш юм. Дараагийн арга нь миний бодлоор илүү сонирхолтой, илүү тохиромжтой юм.

NiPyAPI

NiPyAPI нь NiFi инстанцуудтай харилцахад зориулагдсан Python номын сан юм. Баримт бичгийн хуудас номын сантай ажиллахад шаардлагатай мэдээллийг агуулсан. Хурдан эхлүүлэх хэсэгт тайлбарласан болно төсөл github дээр.

Манай тохиргоог хийх скрипт нь Python програм юм. Кодчилол руу шилжье.
Цаашдын ажилд зориулж тохиргоог тохируулна уу. Бидэнд дараах параметрүүд хэрэгтэй болно.

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Цаашид би энэ номын сангийн аргуудын нэрсийг оруулах болно энд.

Бид бүртгэлийг nifi инстанцтай холбодог

nipyapi.versioning.create_registry_client

Энэ алхамд та бүртгэлийг инстант дээр аль хэдийн нэмсэн эсэхийг шалгах боломжтой бөгөөд үүний тулд та аргыг ашиглаж болно.

nipyapi.versioning.list_registry_clients

Бид сагсан дахь урсгалыг цаашид хайхын тулд хувиныг олдог

nipyapi.versioning.get_registry_bucket

Олдсон хувингийн дагуу бид урсгалыг хайж байна

nipyapi.versioning.get_flow_in_bucket

Дараа нь энэ процессын бүлгийг аль хэдийн нэмсэн эсэхийг ойлгох нь чухал юм. Процессын бүлгийг координатаар байрлуулсан бөгөөд хоёр дахь нь нэг дээр тавигдах үед нөхцөл байдал үүсч болно. Би шалгасан, энэ нь байж болно 🙂 Бүх нэмсэн процессын бүлгийг авахын тулд аргыг ашиглана уу

nipyapi.canvas.list_all_process_groups

тэгээд бид жишээ нь нэрээр нь хайж болно.

Загварыг шинэчлэх үйл явцыг би тайлбарлахгүй, хэрэв загварын шинэ хувилбарт процессорууд нэмэгдсэн бол дараалалд мессеж байх асуудал гарахгүй гэдгийг л хэлэх болно. Гэхдээ хэрэв процессоруудыг устгавал асуудал гарч болзошгүй (хэрэв өмнө нь мессежийн дараалал хуримтлагдсан бол nifi нь процессорыг арилгахыг зөвшөөрдөггүй). Хэрэв та миний энэ асуудлыг хэрхэн шийдсэнийг сонирхож байвал над руу бичээрэй, бид энэ асуудлыг хэлэлцэх болно. Өгүүллийн төгсгөлд байгаа холбоо барих хаягууд. Процессын бүлгийг нэмэх алхам руу шилжье.

Скриптийг дибаг хийхдээ урсгалын хамгийн сүүлийн хувилбар үргэлж гарч ирдэггүй гэсэн онцлогтой тулгарсан тул эхлээд энэ хувилбарыг тодруулахыг зөвлөж байна.

nipyapi.versioning.get_latest_flow_ver

Процессын бүлгийг байрлуулах:

nipyapi.versioning.deploy_flow_version

Бид процессоруудыг эхлүүлнэ:

nipyapi.canvas.schedule_process_group

CLI-ийн тухай блок дээр алсын процессын бүлэгт өгөгдөл дамжуулах автоматаар идэвхждэггүй гэж бичсэн байсан уу? Скриптийг хэрэгжүүлэх үед би бас ийм асуудалтай тулгарсан. Тэр үед би API ашиглан өгөгдөл дамжуулалтыг эхлүүлж чадахгүй байсан тул NiPyAPI номын сангийн хөгжүүлэгч рүү бичиж зөвлөгөө / тусламж хүсэхээр шийдсэн. Хөгжүүлэгч надад хариулж, бид асуудлын талаар ярилцаж, түүнд "ямар нэгэн зүйлийг шалгах" цаг хэрэгтэй гэж бичжээ. Одоо, хэд хоногийн дараа миний эхлүүлэх асуудлыг шийддэг Python функц бичигдсэн имэйл ирлээ !!! Тэр үед NiPyAPI хувилбар нь 0.13.3 байсан бөгөөд мэдээжийн хэрэг ийм төрлийн зүйл байгаагүй. Гэхдээ саяхан гарсан 0.14.0 хувилбарт энэ функц аль хэдийн номын санд орсон байна. Уулзана

nipyapi.canvas.set_remote_process_group_transmission

Тиймээс, NiPyAPI номын сангийн тусламжтайгаар бид бүртгэлийг холбож, урсгалыг эргүүлж, процессор болон өгөгдөл дамжуулах ажлыг эхлүүлсэн. Дараа нь та кодыг самнаж, бүх төрлийн чек нэмж, бүртгэл хөтлөх, тэгээд л болоо. Гэхдээ энэ бол огт өөр түүх юм.

Миний авч үзсэн автоматжуулалтын сонголтуудаас сүүлийнх нь хамгийн үр дүнтэй нь надад санагдсан. Нэгдүгээрт, энэ нь туслах програмын кодыг суулгаж, програмчлалын хэлний бүх давуу талыг ашиглах боломжтой python код хэвээр байна. Хоёрдугаарт, NiPyAPI төсөл идэвхтэй хөгжиж байгаа бөгөөд асуудал гарсан тохиолдолд та хөгжүүлэгч рүү бичиж болно. Гуравдугаарт, NiPyAPI нь нарийн төвөгтэй асуудлыг шийдвэрлэхэд NiFi-тай харилцах илүү уян хатан хэрэгсэл хэвээр байна. Жишээлбэл, мессежийн дараалал нь урсгалд одоогоор хоосон байгаа эсэх, процессын бүлгийг шинэчлэх боломжтой эсэхийг тодорхойлоход.

Тэгээд л болоо. Би NiFi дахь урсгалын дамжуулалтыг автоматжуулах 3 хандлагыг тайлбарлаж, хөгжүүлэгчид тулгарч болох бэрхшээлүүдийг тайлбарлаж, хүргэлтийг автоматжуулах ажлын кодыг өгсөн. Хэрэв та над шиг энэ сэдвийг сонирхож байгаа бол - бичих!

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх