Apache NiFi-da oqimni etkazib berishni avtomatlashtirish

Hammaga salom!

Apache NiFi-da oqimni etkazib berishni avtomatlashtirish

Vazifa quyidagicha - yuqoridagi rasmda ko'rsatilgan oqim mavjud bo'lib, uni N serverga tarqatish kerak. Apache NiFi. Oqim testi - fayl yaratilmoqda va boshqa NiFi nusxasiga yuborilmoqda. Ma'lumotlarni uzatish NiFi Saytdan Saytga protokoli yordamida amalga oshiriladi.

NiFi Saytdan Saytga (S2S) NiFi nusxalari o'rtasida ma'lumotlarni uzatishning xavfsiz, osongina sozlanishi mumkin. S2S qanday ishlaydi, qarang hujjatlar va S2S ga ruxsat berish uchun NiFi nusxasini sozlashni unutmaslik kerak, qarang shu yerda.

S2S yordamida ma'lumotlarni uzatish haqida gapiradigan hollarda, bitta misol mijoz, ikkinchi server deb ataladi. Mijoz ma'lumotlarni yuboradi, server qabul qiladi. Ular o'rtasida ma'lumotlar uzatishni sozlashning ikkita usuli:

  1. Durang. Mijoz misolidan ma'lumotlar Remote Process Group (RPG) yordamida yuboriladi. Server misolida ma'lumotlar kirish porti yordamida qabul qilinadi
  2. Torting. Server RPG yordamida ma'lumotlarni oladi, mijoz esa Chiqish porti yordamida yuboradi.


Chiqarish uchun oqim Apache registrida saqlanadi.

Apache NiFi Registry - bu oqimni saqlash va versiyani boshqarish uchun vositani ta'minlovchi Apache NiFi kichik loyihasi. GITning bir turi. Ro'yxatga olish kitobini o'rnatish, sozlash va u bilan ishlash haqida ma'lumotni topishingiz mumkin rasmiy hujjatlar. Saqlash uchun oqim jarayon guruhiga birlashtiriladi va ushbu shaklda registrda saqlanadi. Bu haqda keyinroq maqolada qaytamiz.

Boshida, N kichik raqam bo'lsa, oqim qabul qilinadigan vaqt ichida qo'lda etkazib beriladi va yangilanadi.

Ammo N o'sishi bilan muammolar ko'payadi:

  1. oqimni yangilash uchun ko'proq vaqt kerak bo'ladi. Siz barcha serverlarga kirishingiz kerak
  2. Shablonni yangilashda xatolar yuzaga keladi. Bu erda ular uni yangilashdi, lekin bu erda ular unutishdi
  3. ko'p sonli shunga o'xshash operatsiyalarni bajarishda insoniy xatolar

Bularning barchasi bizni jarayonni avtomatlashtirishimiz kerakligiga olib keladi. Men ushbu muammoni hal qilish uchun quyidagi usullarni sinab ko'rdim:

  1. NiFi o'rniga MiNiFi dan foydalaning
  2. NiFi CLI
  3. NiPyAPI

MiNiFi-dan foydalanish

Apache MiNiFy - Apache NiFi kichik loyihasi. MiNiFy NiFi bilan bir xil protsessorlardan foydalanadigan ixcham agent bo'lib, NiFi bilan bir xil oqimlarni yaratishga imkon beradi. Agentning engil tabiatiga, boshqa narsalar qatori, MiNiFy-ning oqim konfiguratsiyasi uchun grafik interfeysi yo'qligi bilan erishiladi. MiNiFy-da grafik interfeysning yo'qligi oqimni minifi-ga etkazish muammosini hal qilish kerakligini anglatadi. MiNiFy IOT-da faol qo'llanilganligi sababli, ko'plab komponentlar mavjud va oqimni yakuniy minifi nusxalariga etkazish jarayoni avtomatlashtirilishi kerak. Tanish vazifa, to'g'rimi?

Yana bir kichik loyiha ushbu muammoni hal qilishga yordam beradi - MiNiFi C2 Server. Ushbu mahsulot konfiguratsiyani ishlab chiqish arxitekturasining markaziy nuqtasi bo'lishi uchun mo'ljallangan. Atrof-muhitni qanday sozlash kerak - maqolada tasvirlangan Ushbu maqola Muammoni hal qilish uchun Habré haqida etarli ma'lumot mavjud. MiNiFi, C2 serveri bilan birgalikda uning konfiguratsiyasini avtomatik ravishda yangilaydi. Ushbu yondashuvning yagona kamchiliklari shundaki, siz C2 Serverda shablonlarni yaratishingiz kerak; ro'yxatga olish kitobiga oddiy majburiyat etarli emas.

Yuqoridagi maqolada tasvirlangan variant ishlaydi va amalga oshirish qiyin emas, lekin biz quyidagilarni unutmasligimiz kerak:

  1. Minifi-da nifi-dan barcha protsessorlar mavjud emas
  2. Minifi protsessor versiyalari NiFi protsessor versiyalaridan orqada qolmoqda.

Yozish vaqtida NiFi-ning so'nggi versiyasi 1.9.2. MiNiFi protsessorining oxirgi versiyasi 1.7.0. Protsessorlarni MiNiFi-ga qo'shish mumkin, ammo NiFi va MiNiFi protsessorlari o'rtasidagi versiyalar nomuvofiqligi tufayli bu ishlamasligi mumkin.

NiFi CLI

Bunga qaraganda tavsifi rasmiy veb-saytdagi vosita, bu oqimni etkazib berish yoki jarayonni boshqarish sohasida NiFI va NiFi Registry o'rtasidagi o'zaro ta'sirni avtomatlashtirish uchun vositadir. Boshlash uchun ushbu vositani yuklab olishingiz kerak. shu yerda.

Yordamchi dasturni ishga tushiring

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Ro'yxatga olish kitobidan kerakli oqimni yuklashimiz uchun biz chelak identifikatorlarini (paqir identifikatori) va oqimning o'zini (oqim identifikatori) bilishimiz kerak. Ushbu ma'lumotlarni cli orqali yoki NiFi registrining veb-interfeysida olish mumkin. Veb-interfeysda u quyidagicha ko'rinadi:

Apache NiFi-da oqimni etkazib berishni avtomatlashtirish

CLI yordamida bu amalga oshiriladi:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Biz registrdan jarayon guruhini import qilishni boshlaymiz:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Muhim nuqta shundaki, har qanday nifi namunasi biz jarayon guruhini o'tkazadigan xost sifatida belgilanishi mumkin.

To'xtatilgan protsessorlar bilan qo'shilgan jarayon guruhi, ularni ishga tushirish kerak

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Ajoyib, protsessorlar ishga tushdi. Biroq, vazifa shartlariga ko'ra, boshqa instansiyalarga ma'lumotlarni yuborish uchun bizga NiFi nusxalari kerak. Aytaylik, siz serverga ma'lumotlarni uzatish uchun Push usulini tanladingiz. Ma'lumotlar uzatishni tashkil qilish uchun siz allaqachon bizning oqimimizga kiritilgan qo'shilgan masofaviy jarayonlar guruhida (RPG) ma'lumotlarni uzatishni yoqishingiz kerak.

Apache NiFi-da oqimni etkazib berishni avtomatlashtirish

CLI va boshqa manbalardagi hujjatlarda men ma'lumotlarni uzatishni yoqish usulini topa olmadim. Agar buni qanday qilishni bilsangiz, iltimos, sharhlarda yozing.

Bizda bash bor va biz oxirigacha borishga tayyormiz, biz chiqish yo'lini topamiz! Ushbu muammoni hal qilish uchun NiFi API dan foydalanishingiz mumkin. Keling, quyidagi usuldan foydalanamiz, yuqoridagi misollardan identifikatorni oling (bizning holatda bu 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API usullarining tavsifi shu yerda.

Apache NiFi-da oqimni etkazib berishni avtomatlashtirish
Tanada siz JSON-ni o'tkazishingiz kerak, masalan:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Uning ishlashi uchun to'ldirilishi kerak bo'lgan parametrlar:
Davlat - ma'lumotlarni uzatish holati. Mavjud: maʼlumotlarni uzatishni yoqish uchun UZATILISH, oʻchirish uchun TOʻXTDI
versiya - protsessor versiyasi

versiya yaratilganda sukut bo'yicha 0 bo'ladi, lekin bu parametrlarni usul yordamida olish mumkin

Apache NiFi-da oqimni etkazib berishni avtomatlashtirish

Bash skriptlari muxlislari uchun bu usul mos bo'lib tuyulishi mumkin, ammo men uchun bu biroz qiyin - bash skriptlari mening sevimli emas. Keyingi usul menimcha qiziqarli va qulayroq.

NiPyAPI

NiPyAPI - bu NiFi misollari bilan ishlash uchun Python kutubxonasi. Hujjatlar sahifasi kutubxona bilan ishlash uchun zarur ma'lumotlarni o'z ichiga oladi. Tez boshlash bo'limida tasvirlangan loyiha github-da.

Konfiguratsiyani yoyish uchun bizning skriptimiz Python dasturidir. Keling, kodlashga o'tamiz.
Biz keyingi ish uchun konfiguratsiyalarni o'rnatdik. Bizga quyidagi parametrlar kerak bo'ladi:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Keyinchalik men ushbu kutubxonaning tavsiflangan usullarining nomlarini kiritaman shu yerda.

Ro'yxatga olish kitobini nifi misoliga ulang

nipyapi.versioning.create_registry_client

Ushbu bosqichda siz ro'yxatga olish kitobi allaqachon namunaga qo'shilganligini tekshirishni qo'shishingiz mumkin; buning uchun siz usuldan foydalanishingiz mumkin.

nipyapi.versioning.list_registry_clients

Savatdagi oqimni keyingi izlash uchun chelakni topamiz

nipyapi.versioning.get_registry_bucket

Topilgan chelakdan foydalanib, biz oqimni qidiramiz

nipyapi.versioning.get_flow_in_bucket

Keyinchalik, ushbu jarayon guruhi allaqachon qo'shilganligini tushunish muhimdir. Jarayon guruhi koordinatalar bo'yicha joylashtirilgan va ikkinchi komponent birining ustiga qo'yilganda vaziyat yuzaga kelishi mumkin. Men tekshirdim, bu sodir bo'lishi mumkin :) Barcha qo'shilgan jarayon guruhlarini olish uchun biz usuldan foydalanamiz

nipyapi.canvas.list_all_process_groups

Biz, masalan, ism bo'yicha qo'shimcha qidirishimiz mumkin.

Men shablonni yangilash jarayonini tasvirlamayman, faqat aytamanki, agar shablonning yangi versiyasida protsessorlar qo'shilsa, navbatlarda xabarlar mavjudligi bilan bog'liq muammolar bo'lmaydi. Ammo protsessorlar olib tashlansa, muammolar paydo bo'lishi mumkin (agar uning oldida xabarlar navbati to'plangan bo'lsa, nifi protsessorni olib tashlashga ruxsat bermaydi). Agar siz bu muammoni qanday hal qilganim bilan qiziqsangiz, iltimos, menga yozing va biz bu masalani muhokama qilamiz. Maqolaning oxiridagi kontaktlar. Keling, jarayon guruhini qo'shish bosqichiga o'tamiz.

Skriptni disk raskadrovka qilishda men o'ziga xos xususiyatga duch keldimki, oqimning so'nggi versiyasi har doim ham tortib olinmaydi, shuning uchun avval ushbu versiyani tekshirishni maslahat beraman:

nipyapi.versioning.get_latest_flow_ver

Jarayon guruhini joylashtirish:

nipyapi.versioning.deploy_flow_version

Biz protsessorlarni ishga tushiramiz:

nipyapi.canvas.schedule_process_group

CLI haqidagi blokda ma'lumotlarni uzatish masofaviy jarayon guruhida avtomatik ravishda yoqilmaganligi yozilgan edi? Skriptni amalga oshirishda men ham bu muammoga duch keldim. O'sha paytda men API yordamida ma'lumotlar uzatishni boshlay olmadim va NiPyAPI kutubxonasini ishlab chiqaruvchisiga xat yozishga va maslahat/yordam so'rashga qaror qildim. Ishlab chiquvchi menga javob berdi, biz muammoni muhokama qildik va u "biror narsani tekshirish" uchun vaqt kerakligini yozdi. Va keyin, bir necha kundan keyin, Pythonda mening ishga tushirish muammomni hal qiladigan funktsiya yozilgan xat keladi!!! O'sha paytda NiPyAPI versiyasi 0.13.3 edi va, albatta, bunday narsa yo'q edi. Ammo yaqinda chiqarilgan 0.14.0 versiyasida bu funksiya allaqachon kutubxonaga kiritilgan. Tanishish,

nipyapi.canvas.set_remote_process_group_transmission

Shunday qilib, NiPyAPI kutubxonasidan foydalanib, biz ro'yxatga olish kitobini uladik, oqimni tarqatdik va hatto protsessorlar va ma'lumotlarni uzatishni boshladik. Keyin siz kodni tarashingiz, barcha turdagi cheklarni qo'shishingiz, jurnalga yozishingiz mumkin va bu hammasi. Ammo bu butunlay boshqacha hikoya.

Men ko'rib chiqqan avtomatlashtirish variantlaridan oxirgisi menga eng samarali bo'lib tuyuldi. Birinchidan, bu hali ham python kodi bo'lib, unga yordamchi dastur kodini joylashtirishingiz va dasturlash tilining barcha afzalliklaridan foydalanishingiz mumkin. Ikkinchidan, NiPyAPI loyihasi faol rivojlanmoqda va muammolar yuzaga kelganda siz dasturchiga yozishingiz mumkin. Uchinchidan, NiPyAPI hali ham murakkab muammolarni hal qilishda NiFi bilan o'zaro ishlash uchun yanada moslashuvchan vositadir. Masalan, oqimda xabarlar navbatlari bo'sh yoki yo'qligini va jarayon guruhini yangilash mumkinligini aniqlashda.

Ana xolos. Men NiFi-da oqim etkazib berishni avtomatlashtirishning 3 ta yondashuvini, ishlab chiquvchi duch kelishi mumkin bo'lgan tuzoqlarni va etkazib berishni avtomatlashtirish uchun ishchi kodni taqdim etdim. Agar siz ham men kabi bu mavzuga qiziqsangiz - yozing!

Manba: www.habr.com

a Izoh qo'shish