Hello dulur!
Tugasna nyaéta kieu - aya aliran anu dipidangkeun dina gambar di luhur, anu kedah digulung ka server N kalayan
NiFi Site to Site (S2S) mangrupikeun cara anu aman sareng tiasa disaluyukeun pikeun mindahkeun data antara instansi NiFi. Tempo kumaha S2S jalan
Lamun datang ka mindahkeun data ngagunakeun S2S, hiji conto disebut klien, kadua - server. Klién ngirim data, server nampi éta. Dua cara pikeun nyetél transfer data antara aranjeunna:
- Teken. Data dikirimkeun tina conto klien nganggo Grup Prosés Jauh (RPG). Dina conto server, data ditampi nganggo Port Input
- Betot. Server nampi data nganggo RPG, klien ngirim nganggo port Kaluaran.
Aliran pikeun rolling disimpen dina Apache Registry.
Apache NiFi Registry mangrupikeun subproyék tina Apache NiFi anu nyayogikeun panyimpen aliran sareng alat versi. Hiji jenis GIT. Inpormasi ngeunaan masang, ngonpigurasikeun sareng damel sareng pendaptaran tiasa dipendakan dina
Dina mimiti, nalika N nyaéta sajumlah leutik, aliran ieu dikirimkeun sarta diropéa ku leungeun dina waktu nu lumrah.
Tapi sakumaha N tumuwuh, aya deui masalah:
- peryogi langkung waktos pikeun ngapdet aliran. Anjeun kedah angkat ka sadaya server
- aya kasalahan ngamutahirkeun témplat. Di dieu aranjeunna diropéa, tapi di dieu maranéhna poho
- kasalahan manusa nalika ngalakukeun sajumlah ageung operasi anu sami
Sadaya ieu nyababkeun urang kanyataan yén prosésna kedah ngajadikeun otomatis. Kuring geus diusahakeun cara di handap ieu pikeun ngajawab masalah ieu:
- Paké MiNiFi tinimbang NiFi
- NiFi CLI
- NiPyAPI
Ngagunakeun MiNiFi
subproyék sejen, MiNiFi C2 Server, bakal mantuan ngajawab masalah ieu. Produk ieu dimaksudkeun pikeun jadi titik sentral dina arsitéktur deployment. Kumaha ngonpigurasikeun lingkungan - dijelaskeun dina
Pilihan anu dijelaskeun dina tulisan di luhur tiasa dianggo sareng henteu sesah dilaksanakeun, tapi urang henteu kedah hilap ieu:
- minifi teu boga kabeh prosesor ti nifi
- Vérsi CPU di Minifi lag balik versi CPU di NiFi.
Dina waktos nyerat, versi panganyarna tina NiFi nyaéta 1.9.2. Versi processor tina versi MiNiFi panganyarna nyaéta 1.7.0. Prosesor bisa ditambahkeun kana MiNiFi, tapi alatan discrepancies versi antara NiFi jeung prosesor MiNiFi, ieu bisa jadi teu jalan.
NiFi CLI
Ditilik ku
Ngajalankeun utiliti
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Pikeun ngamuat aliran anu dipikabutuh tina pendaptaran, urang kedah terang identifiers tina karinjang (ember identifier) sareng aliran sorangan (flow identifier). Data ieu tiasa didapet boh ngaliwatan cli atanapi dina panganteur wéb pendaptaran NiFi. Antarbeungeut wéb sapertos kieu:
Ngagunakeun CLI, anjeun ngalakukeun ieu:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Jalankeun grup prosés impor tina pendaptaran:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Hiji titik penting nyaéta yén sagala conto nifi bisa dieusian salaku host nu urang gulung grup prosés.
Grup prosés ditambahkeun jeung prosesor dieureunkeun, maranéhanana kudu dimimitian
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Hébat, prosesor parantos ngamimitian. Nanging, dumasar kana kaayaan masalah, urang peryogi instansi NiFi pikeun ngirim data ka instansi anu sanés. Hayu urang nganggap yén métode Push dipilih pikeun mindahkeun data ka server. Dina raraga ngatur mindahkeun data, perlu pikeun ngaktipkeun mindahkeun data (Aktipkeun transmisi) dina ditambahkeun Jauh Prosés Grup (RPG), nu geus kaasup dina aliran urang.
Dina dokuméntasi dina CLI sareng sumber anu sanés, kuring henteu mendakan cara pikeun ngaktipkeun transfer data. Upami anjeun terang kumaha ngalakukeun ieu, punten nyerat dina koméntar.
Kusabab urang gaduh bash sareng kami siap dugi ka tungtungna, kami bakal mendakan jalan kaluar! Anjeun tiasa make API NiFi pikeun ngajawab masalah ieu. Hayu urang nganggo metodeu di handap ieu, urang nyandak ID tina conto di luhur (dina hal urang éta 7f522a13-016e-1000-e504-d5b15587f2f3). Katerangan ngeunaan Métode API NiFi
Dina awak, anjeun kedah ngalangkungan JSON, tina bentuk ieu:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameter anu kedah dieusi pikeun "garap":
kaayaan - status mindahkeun data. Sadia TRANSMITTING pikeun ngaktifkeun transfer data, STOPPED pikeun nganonaktipkeun
Vérsi - Vérsi processor
Vérsi bakal dituna pikeun 0 nalika dijieun, tapi parameter ieu tiasa didapet ngagunakeun métode
Pikeun anu mikaresep skrip bash, metode ieu sigana cocog, tapi sesah pikeun kuring - skrip bash henteu karesep kuring. Cara salajengna langkung narik sareng langkung merenah dina pamanggih kuring.
NiPyAPI
NiPyAPI mangrupikeun perpustakaan Python pikeun berinteraksi sareng instansi NiFi.
Skrip kami pikeun ngagulung konfigurasi nyaéta program Python. Hayu urang ngaléngkah ka coding.
Nyetél configs pikeun digawé salajengna. Urang bakal peryogi parameter di handap ieu:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Salajengna kuring bakal nyelapkeun nami metode perpustakaan ieu, anu dijelaskeun
Urang sambungkeun pendaptaran ka conto nifi ngagunakeun
nipyapi.versioning.create_registry_client
Dina léngkah ieu, anjeun ogé tiasa nambihan cek yén pendaptaran parantos parantos ditambah kana conto, pikeun ieu anjeun tiasa nganggo metodeu.
nipyapi.versioning.list_registry_clients
Urang manggihan LIPI pikeun salajengna neangan aliran dina karinjang nu
nipyapi.versioning.get_registry_bucket
Numutkeun ember kapanggih, urang keur pilari aliran
nipyapi.versioning.get_flow_in_bucket
Salajengna, hal anu penting pikeun ngarti lamun grup prosés ieu geus ditambahkeun. Grup prosés disimpen ku koordinat sareng kaayaan tiasa timbul nalika anu kadua ditumpangkeun dina luhureun hiji. Kuring dipariksa, bisa 🙂 Pikeun meunang sagala grup prosés ditambahkeun, ngagunakeun métode
nipyapi.canvas.list_all_process_groups
lajeng urang tiasa milarian, contona, ku ngaran.
Kuring moal ngajelaskeun prosés ngamutahirkeun témplat, abdi ngan bakal disebutkeun yen lamun prosesor ditambahkeun dina versi anyar tina citakan, teras aya euweuh masalah ayana pesen dina antrian. Tapi lamun prosesor dihapus, masalah bisa timbul (nifi teu ngidinan ngaleupaskeun processor lamun antrian pesen geus akumulasi di hareup eta). Upami anjeun kabetot dina cara kuring ngarengsekeun masalah ieu - nyerat ka kuring, punten, urang bakal ngabahas titik ieu. Kontak dina tungtung artikel. Hayu urang ngaléngkah ka léngkah nambahkeun grup prosés.
Nalika nga-debug naskah, kuring mendakan fitur anu versi aliran panganyarna henteu salawasna ditarik, janten kuring nyarankeun yén anjeun netelakeun heula versi ieu:
nipyapi.versioning.get_latest_flow_ver
Grup prosés nyebarkeun:
nipyapi.versioning.deploy_flow_version
Urang ngamimitian prosésor:
nipyapi.canvas.schedule_process_group
Dina blok ngeunaan CLI, ieu ditulis yén mindahkeun data teu otomatis diaktipkeun dina grup prosés jauh? Nalika ngalaksanakeun naskah, kuring ogé mendakan masalah ieu. Dina waktos éta, kuring henteu tiasa ngamimitian mindahkeun data nganggo API sareng kuring mutuskeun nyerat ka pamekar perpustakaan NiPyAPI sareng naroskeun naséhat / bantosan. pamekar nu ngajawab kuring, urang bahas masalah jeung manéhna nulis yén manéhna diperlukeun waktu pikeun "pariksa hal". Sareng ayeuna, sababaraha dinten saatos, email sumping dimana fungsi Python ditulis anu ngarengsekeun masalah ngamimitian kuring !!! Dina waktos éta, versi NiPyAPI éta 0.13.3 sareng, tangtosna, teu aya nanaon di dinya. Tapi dina vérsi 0.14.0, anu dileupaskeun énggal-énggal, fungsi ieu parantos dilebetkeun kana perpustakaan. Papanggih
nipyapi.canvas.set_remote_process_group_transmission
Janten, kalayan bantosan perpustakaan NiPyAPI, kami nyambungkeun pendaptaran, ngagulung aliran, bahkan ngamimitian prosesor sareng transfer data. Teras anjeun tiasa nyisiran kode, tambahkeun sagala jinis cek, logging, sareng éta. Tapi éta carita lengkep béda.
Tina pilihan automation anu kuring anggap, anu terakhir sigana anu paling éfisién. Anu mimiti, ieu masih kode python, dimana anjeun tiasa ngalebetkeun kode program bantu sareng nikmati sagala kauntungan tina basa pamrograman. Bréh, proyék NiPyAPI aktip ngembang sareng upami aya masalah anjeun tiasa nyerat ka pamekar. Katilu, NiPyAPI masih mangrupikeun alat anu langkung fleksibel pikeun berinteraksi sareng NiFi dina ngarengsekeun masalah anu kompleks. Contona, dina nangtukeun naha antrian pesen ayeuna kosong dina aliran jeung naha kasebut nyaéta dimungkinkeun pikeun ngapdet grup prosés.
Éta hungkul. Kuring ngajelaskeun 3 pendekatan kana pangiriman aliran otomatis di NiFi, pitfalls anu tiasa dipendakan ku pamekar sareng nyayogikeun kode kerja pikeun pangiriman otomatis. Upami anjeun resep kana topik ieu sareng kuring -
sumber: www.habr.com