Hammaga salom!
Vazifa quyidagicha - yuqoridagi rasmda ko'rsatilgan oqim mavjud bo'lib, uni N serverga tarqatish kerak.
NiFi Saytdan Saytga (S2S) NiFi nusxalari o'rtasida ma'lumotlarni uzatishning xavfsiz, osongina sozlanishi mumkin. S2S qanday ishlaydi, qarang
S2S yordamida ma'lumotlarni uzatish haqida gapiradigan hollarda, bitta misol mijoz, ikkinchi server deb ataladi. Mijoz ma'lumotlarni yuboradi, server qabul qiladi. Ular o'rtasida ma'lumotlar uzatishni sozlashning ikkita usuli:
- Durang. Mijoz misolidan ma'lumotlar Remote Process Group (RPG) yordamida yuboriladi. Server misolida ma'lumotlar kirish porti yordamida qabul qilinadi
- Torting. Server RPG yordamida ma'lumotlarni oladi, mijoz esa Chiqish porti yordamida yuboradi.
Chiqarish uchun oqim Apache registrida saqlanadi.
Apache NiFi Registry - bu oqimni saqlash va versiyani boshqarish uchun vositani ta'minlovchi Apache NiFi kichik loyihasi. GITning bir turi. Ro'yxatga olish kitobini o'rnatish, sozlash va u bilan ishlash haqida ma'lumotni topishingiz mumkin
Boshida, N kichik raqam bo'lsa, oqim qabul qilinadigan vaqt ichida qo'lda etkazib beriladi va yangilanadi.
Ammo N o'sishi bilan muammolar ko'payadi:
- oqimni yangilash uchun ko'proq vaqt kerak bo'ladi. Siz barcha serverlarga kirishingiz kerak
- Shablonni yangilashda xatolar yuzaga keladi. Bu erda ular uni yangilashdi, lekin bu erda ular unutishdi
- ko'p sonli shunga o'xshash operatsiyalarni bajarishda insoniy xatolar
Bularning barchasi bizni jarayonni avtomatlashtirishimiz kerakligiga olib keladi. Men ushbu muammoni hal qilish uchun quyidagi usullarni sinab ko'rdim:
- NiFi o'rniga MiNiFi dan foydalaning
- NiFi CLI
- NiPyAPI
MiNiFi-dan foydalanish
Yana bir kichik loyiha ushbu muammoni hal qilishga yordam beradi - MiNiFi C2 Server. Ushbu mahsulot konfiguratsiyani ishlab chiqish arxitekturasining markaziy nuqtasi bo'lishi uchun mo'ljallangan. Atrof-muhitni qanday sozlash kerak - maqolada tasvirlangan
Yuqoridagi maqolada tasvirlangan variant ishlaydi va amalga oshirish qiyin emas, lekin biz quyidagilarni unutmasligimiz kerak:
- Minifi-da nifi-dan barcha protsessorlar mavjud emas
- Minifi protsessor versiyalari NiFi protsessor versiyalaridan orqada qolmoqda.
Yozish vaqtida NiFi-ning so'nggi versiyasi 1.9.2. MiNiFi protsessorining oxirgi versiyasi 1.7.0. Protsessorlarni MiNiFi-ga qo'shish mumkin, ammo NiFi va MiNiFi protsessorlari o'rtasidagi versiyalar nomuvofiqligi tufayli bu ishlamasligi mumkin.
NiFi CLI
Bunga qaraganda
Yordamchi dasturni ishga tushiring
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Ro'yxatga olish kitobidan kerakli oqimni yuklashimiz uchun biz chelak identifikatorlarini (paqir identifikatori) va oqimning o'zini (oqim identifikatori) bilishimiz kerak. Ushbu ma'lumotlarni cli orqali yoki NiFi registrining veb-interfeysida olish mumkin. Veb-interfeysda u quyidagicha ko'rinadi:
CLI yordamida bu amalga oshiriladi:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Biz registrdan jarayon guruhini import qilishni boshlaymiz:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Muhim nuqta shundaki, har qanday nifi namunasi biz jarayon guruhini o'tkazadigan xost sifatida belgilanishi mumkin.
To'xtatilgan protsessorlar bilan qo'shilgan jarayon guruhi, ularni ishga tushirish kerak
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Ajoyib, protsessorlar ishga tushdi. Biroq, vazifa shartlariga ko'ra, boshqa instansiyalarga ma'lumotlarni yuborish uchun bizga NiFi nusxalari kerak. Aytaylik, siz serverga ma'lumotlarni uzatish uchun Push usulini tanladingiz. Ma'lumotlar uzatishni tashkil qilish uchun siz allaqachon bizning oqimimizga kiritilgan qo'shilgan masofaviy jarayonlar guruhida (RPG) ma'lumotlarni uzatishni yoqishingiz kerak.
CLI va boshqa manbalardagi hujjatlarda men ma'lumotlarni uzatishni yoqish usulini topa olmadim. Agar buni qanday qilishni bilsangiz, iltimos, sharhlarda yozing.
Bizda bash bor va biz oxirigacha borishga tayyormiz, biz chiqish yo'lini topamiz! Ushbu muammoni hal qilish uchun NiFi API dan foydalanishingiz mumkin. Keling, quyidagi usuldan foydalanamiz, yuqoridagi misollardan identifikatorni oling (bizning holatda bu 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API usullarining tavsifi
Tanada siz JSON-ni o'tkazishingiz kerak, masalan:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Uning ishlashi uchun to'ldirilishi kerak bo'lgan parametrlar:
Davlat - ma'lumotlarni uzatish holati. Mavjud: maʼlumotlarni uzatishni yoqish uchun UZATILISH, oʻchirish uchun TOʻXTDI
versiya - protsessor versiyasi
versiya yaratilganda sukut bo'yicha 0 bo'ladi, lekin bu parametrlarni usul yordamida olish mumkin
Bash skriptlari muxlislari uchun bu usul mos bo'lib tuyulishi mumkin, ammo men uchun bu biroz qiyin - bash skriptlari mening sevimli emas. Keyingi usul menimcha qiziqarli va qulayroq.
NiPyAPI
NiPyAPI - bu NiFi misollari bilan ishlash uchun Python kutubxonasi.
Konfiguratsiyani yoyish uchun bizning skriptimiz Python dasturidir. Keling, kodlashga o'tamiz.
Biz keyingi ish uchun konfiguratsiyalarni o'rnatdik. Bizga quyidagi parametrlar kerak bo'ladi:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Keyinchalik men ushbu kutubxonaning tavsiflangan usullarining nomlarini kiritaman
Ro'yxatga olish kitobini nifi misoliga ulang
nipyapi.versioning.create_registry_client
Ushbu bosqichda siz ro'yxatga olish kitobi allaqachon namunaga qo'shilganligini tekshirishni qo'shishingiz mumkin; buning uchun siz usuldan foydalanishingiz mumkin.
nipyapi.versioning.list_registry_clients
Savatdagi oqimni keyingi izlash uchun chelakni topamiz
nipyapi.versioning.get_registry_bucket
Topilgan chelakdan foydalanib, biz oqimni qidiramiz
nipyapi.versioning.get_flow_in_bucket
Keyinchalik, ushbu jarayon guruhi allaqachon qo'shilganligini tushunish muhimdir. Jarayon guruhi koordinatalar bo'yicha joylashtirilgan va ikkinchi komponent birining ustiga qo'yilganda vaziyat yuzaga kelishi mumkin. Men tekshirdim, bu sodir bo'lishi mumkin :) Barcha qo'shilgan jarayon guruhlarini olish uchun biz usuldan foydalanamiz
nipyapi.canvas.list_all_process_groups
Biz, masalan, ism bo'yicha qo'shimcha qidirishimiz mumkin.
Men shablonni yangilash jarayonini tasvirlamayman, faqat aytamanki, agar shablonning yangi versiyasida protsessorlar qo'shilsa, navbatlarda xabarlar mavjudligi bilan bog'liq muammolar bo'lmaydi. Ammo protsessorlar olib tashlansa, muammolar paydo bo'lishi mumkin (agar uning oldida xabarlar navbati to'plangan bo'lsa, nifi protsessorni olib tashlashga ruxsat bermaydi). Agar siz bu muammoni qanday hal qilganim bilan qiziqsangiz, iltimos, menga yozing va biz bu masalani muhokama qilamiz. Maqolaning oxiridagi kontaktlar. Keling, jarayon guruhini qo'shish bosqichiga o'tamiz.
Skriptni disk raskadrovka qilishda men o'ziga xos xususiyatga duch keldimki, oqimning so'nggi versiyasi har doim ham tortib olinmaydi, shuning uchun avval ushbu versiyani tekshirishni maslahat beraman:
nipyapi.versioning.get_latest_flow_ver
Jarayon guruhini joylashtirish:
nipyapi.versioning.deploy_flow_version
Biz protsessorlarni ishga tushiramiz:
nipyapi.canvas.schedule_process_group
CLI haqidagi blokda ma'lumotlarni uzatish masofaviy jarayon guruhida avtomatik ravishda yoqilmaganligi yozilgan edi? Skriptni amalga oshirishda men ham bu muammoga duch keldim. O'sha paytda men API yordamida ma'lumotlar uzatishni boshlay olmadim va NiPyAPI kutubxonasini ishlab chiqaruvchisiga xat yozishga va maslahat/yordam so'rashga qaror qildim. Ishlab chiquvchi menga javob berdi, biz muammoni muhokama qildik va u "biror narsani tekshirish" uchun vaqt kerakligini yozdi. Va keyin, bir necha kundan keyin, Pythonda mening ishga tushirish muammomni hal qiladigan funktsiya yozilgan xat keladi!!! O'sha paytda NiPyAPI versiyasi 0.13.3 edi va, albatta, bunday narsa yo'q edi. Ammo yaqinda chiqarilgan 0.14.0 versiyasida bu funksiya allaqachon kutubxonaga kiritilgan. Tanishish,
nipyapi.canvas.set_remote_process_group_transmission
Shunday qilib, NiPyAPI kutubxonasidan foydalanib, biz ro'yxatga olish kitobini uladik, oqimni tarqatdik va hatto protsessorlar va ma'lumotlarni uzatishni boshladik. Keyin siz kodni tarashingiz, barcha turdagi cheklarni qo'shishingiz, jurnalga yozishingiz mumkin va bu hammasi. Ammo bu butunlay boshqacha hikoya.
Men ko'rib chiqqan avtomatlashtirish variantlaridan oxirgisi menga eng samarali bo'lib tuyuldi. Birinchidan, bu hali ham python kodi bo'lib, unga yordamchi dastur kodini joylashtirishingiz va dasturlash tilining barcha afzalliklaridan foydalanishingiz mumkin. Ikkinchidan, NiPyAPI loyihasi faol rivojlanmoqda va muammolar yuzaga kelganda siz dasturchiga yozishingiz mumkin. Uchinchidan, NiPyAPI hali ham murakkab muammolarni hal qilishda NiFi bilan o'zaro ishlash uchun yanada moslashuvchan vositadir. Masalan, oqimda xabarlar navbatlari bo'sh yoki yo'qligini va jarayon guruhini yangilash mumkinligini aniqlashda.
Ana xolos. Men NiFi-da oqim etkazib berishni avtomatlashtirishning 3 ta yondashuvini, ishlab chiquvchi duch kelishi mumkin bo'lgan tuzoqlarni va etkazib berishni avtomatlashtirish uchun ishchi kodni taqdim etdim. Agar siz ham men kabi bu mavzuga qiziqsangiz -
Manba: www.habr.com