Automation Pangiriman Aliran dina Apache NiFi

Hello dulur!

Automation Pangiriman Aliran dina Apache NiFi

Tugasna nyaéta kieu - aya aliran anu dipidangkeun dina gambar di luhur, anu kedah digulung ka server N kalayan Apache NiFi. Test aliran - file keur dihasilkeun sarta dikirim ka conto NiFi sejen. Mindahkeun data lumangsung ngagunakeun protokol Loka NiFi ka Loka.

NiFi Site to Site (S2S) mangrupikeun cara anu aman sareng tiasa disaluyukeun pikeun mindahkeun data antara instansi NiFi. Tempo kumaha S2S jalan dokuméntasi jeung hal anu penting pikeun nginget nyetél conto NiFi anjeun pikeun ngidinan S2S tingali di dieu.

Lamun datang ka mindahkeun data ngagunakeun S2S, hiji conto disebut klien, kadua - server. Klién ngirim data, server nampi éta. Dua cara pikeun nyetél transfer data antara aranjeunna:

  1. Teken. Data dikirimkeun tina conto klien nganggo Grup Prosés Jauh (RPG). Dina conto server, data ditampi nganggo Port Input
  2. Betot. Server nampi data nganggo RPG, klien ngirim nganggo port Kaluaran.


Aliran pikeun rolling disimpen dina Apache Registry.

Apache NiFi Registry mangrupikeun subproyék tina Apache NiFi anu nyayogikeun panyimpen aliran sareng alat versi. Hiji jenis GIT. Inpormasi ngeunaan masang, ngonpigurasikeun sareng damel sareng pendaptaran tiasa dipendakan dina dokuméntasi resmi. Aliran pikeun neundeun digabungkeun kana grup prosés sareng disimpen dina pendaptaran dina bentuk ieu. Urang bakal balik deui ka ieu engké dina artikel.

Dina mimiti, nalika N nyaéta sajumlah leutik, aliran ieu dikirimkeun sarta diropéa ku leungeun dina waktu nu lumrah.

Tapi sakumaha N tumuwuh, aya deui masalah:

  1. peryogi langkung waktos pikeun ngapdet aliran. Anjeun kedah angkat ka sadaya server
  2. aya kasalahan ngamutahirkeun témplat. Di dieu aranjeunna diropéa, tapi di dieu maranéhna poho
  3. kasalahan manusa nalika ngalakukeun sajumlah ageung operasi anu sami

Sadaya ieu nyababkeun urang kanyataan yén prosésna kedah ngajadikeun otomatis. Kuring geus diusahakeun cara di handap ieu pikeun ngajawab masalah ieu:

  1. Paké MiNiFi tinimbang NiFi
  2. NiFi CLI
  3. NiPyAPI

Ngagunakeun MiNiFi

ApacheMiNify mangrupa subproyék tina Apache NiFi. MiNiFy mangrupikeun agén kompak anu ngagunakeun prosesor anu sami sareng NiFi, ngamungkinkeun anjeun nyiptakeun aliran anu sami sareng di NiFi. The lightness tina agén kahontal, antara séjén, alatan kanyataan yén MiNiFy teu boga panganteur grafis pikeun konfigurasi aliran. Kurangna antarbeungeut grafis MiNiFy hartosna peryogi pikeun ngabéréskeun masalah pangiriman aliran dina minifi. Kusabab MiNiFy aktip dipaké dina IOT, aya loba komponén jeung prosés delivering aliran ka instansi minifi final kudu otomatis. Hiji tugas akrab, katuhu?

subproyék sejen, MiNiFi C2 Server, bakal mantuan ngajawab masalah ieu. Produk ieu dimaksudkeun pikeun jadi titik sentral dina arsitéktur deployment. Kumaha ngonpigurasikeun lingkungan - dijelaskeun dina artikel ieu on Habré sarta informasi cukup pikeun ngajawab masalah. MiNiFi ditéang jeung server C2 otomatis ngamutahirkeun konfigurasi na. Hijina aral tina pendekatan ieu nu kudu nyieun témplat dina C2 Server, a commit basajan pikeun pendaptaran teu cukup.

Pilihan anu dijelaskeun dina tulisan di luhur tiasa dianggo sareng henteu sesah dilaksanakeun, tapi urang henteu kedah hilap ieu:

  1. minifi teu boga kabeh prosesor ti nifi
  2. Vérsi CPU di Minifi lag balik versi CPU di NiFi.

Dina waktos nyerat, versi panganyarna tina NiFi nyaéta 1.9.2. Versi processor tina versi MiNiFi panganyarna nyaéta 1.7.0. Prosesor bisa ditambahkeun kana MiNiFi, tapi alatan discrepancies versi antara NiFi jeung prosesor MiNiFi, ieu bisa jadi teu jalan.

NiFi CLI

Ditilik ku katerangan alat dina ramatloka resmi, ieu alat pikeun automating interaksi antara NiFI na pendaptaran NiFi dina widang pangiriman aliran atawa manajemén prosés. Unduh alat ieu pikeun ngamimitian. di dieu.

Ngajalankeun utiliti

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Pikeun ngamuat aliran anu dipikabutuh tina pendaptaran, urang kedah terang identifiers tina karinjang (ember identifier) ​​sareng aliran sorangan (flow identifier). Data ieu tiasa didapet boh ngaliwatan cli atanapi dina panganteur wéb pendaptaran NiFi. Antarbeungeut wéb sapertos kieu:

Automation Pangiriman Aliran dina Apache NiFi

Ngagunakeun CLI, anjeun ngalakukeun ieu:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Jalankeun grup prosés impor tina pendaptaran:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Hiji titik penting nyaéta yén sagala conto nifi bisa dieusian salaku host nu urang gulung grup prosés.

Grup prosés ditambahkeun jeung prosesor dieureunkeun, maranéhanana kudu dimimitian

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Hébat, prosesor parantos ngamimitian. Nanging, dumasar kana kaayaan masalah, urang peryogi instansi NiFi pikeun ngirim data ka instansi anu sanés. Hayu urang nganggap yén métode Push dipilih pikeun mindahkeun data ka server. Dina raraga ngatur mindahkeun data, perlu pikeun ngaktipkeun mindahkeun data (Aktipkeun transmisi) dina ditambahkeun Jauh Prosés Grup (RPG), nu geus kaasup dina aliran urang.

Automation Pangiriman Aliran dina Apache NiFi

Dina dokuméntasi dina CLI sareng sumber anu sanés, kuring henteu mendakan cara pikeun ngaktipkeun transfer data. Upami anjeun terang kumaha ngalakukeun ieu, punten nyerat dina koméntar.

Kusabab urang gaduh bash sareng kami siap dugi ka tungtungna, kami bakal mendakan jalan kaluar! Anjeun tiasa make API NiFi pikeun ngajawab masalah ieu. Hayu urang nganggo metodeu di handap ieu, urang nyandak ID tina conto di luhur (dina hal urang éta 7f522a13-016e-1000-e504-d5b15587f2f3). Katerangan ngeunaan Métode API NiFi di dieu.

Automation Pangiriman Aliran dina Apache NiFi
Dina awak, anjeun kedah ngalangkungan JSON, tina bentuk ieu:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameter anu kedah dieusi pikeun "garap":
kaayaan - status mindahkeun data. Sadia TRANSMITTING pikeun ngaktifkeun transfer data, STOPPED pikeun nganonaktipkeun
Vérsi - Vérsi processor

Vérsi bakal dituna pikeun 0 nalika dijieun, tapi parameter ieu tiasa didapet ngagunakeun métode

Automation Pangiriman Aliran dina Apache NiFi

Pikeun anu mikaresep skrip bash, metode ieu sigana cocog, tapi sesah pikeun kuring - skrip bash henteu karesep kuring. Cara salajengna langkung narik sareng langkung merenah dina pamanggih kuring.

NiPyAPI

NiPyAPI mangrupikeun perpustakaan Python pikeun berinteraksi sareng instansi NiFi. Kaca dokuméntasi ngandung émbaran diperlukeun pikeun gawé bareng perpustakaan. Mimiti gancang dijelaskeun dina dijujut dina github.

Skrip kami pikeun ngagulung konfigurasi nyaéta program Python. Hayu urang ngaléngkah ka coding.
Nyetél configs pikeun digawé salajengna. Urang bakal peryogi parameter di handap ieu:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Salajengna kuring bakal nyelapkeun nami metode perpustakaan ieu, anu dijelaskeun di dieu.

Urang sambungkeun pendaptaran ka conto nifi ngagunakeun

nipyapi.versioning.create_registry_client

Dina léngkah ieu, anjeun ogé tiasa nambihan cek yén pendaptaran parantos parantos ditambah kana conto, pikeun ieu anjeun tiasa nganggo metodeu.

nipyapi.versioning.list_registry_clients

Urang manggihan LIPI pikeun salajengna neangan aliran dina karinjang nu

nipyapi.versioning.get_registry_bucket

Numutkeun ember kapanggih, urang keur pilari aliran

nipyapi.versioning.get_flow_in_bucket

Salajengna, hal anu penting pikeun ngarti lamun grup prosés ieu geus ditambahkeun. Grup prosés disimpen ku koordinat sareng kaayaan tiasa timbul nalika anu kadua ditumpangkeun dina luhureun hiji. Kuring dipariksa, bisa 🙂 Pikeun meunang sagala grup prosés ditambahkeun, ngagunakeun métode

nipyapi.canvas.list_all_process_groups

lajeng urang tiasa milarian, contona, ku ngaran.

Kuring moal ngajelaskeun prosés ngamutahirkeun témplat, abdi ngan bakal disebutkeun yen lamun prosesor ditambahkeun dina versi anyar tina citakan, teras aya euweuh masalah ayana pesen dina antrian. Tapi lamun prosesor dihapus, masalah bisa timbul (nifi teu ngidinan ngaleupaskeun processor lamun antrian pesen geus akumulasi di hareup eta). Upami anjeun kabetot dina cara kuring ngarengsekeun masalah ieu - nyerat ka kuring, punten, urang bakal ngabahas titik ieu. Kontak dina tungtung artikel. Hayu urang ngaléngkah ka léngkah nambahkeun grup prosés.

Nalika nga-debug naskah, kuring mendakan fitur anu versi aliran panganyarna henteu salawasna ditarik, janten kuring nyarankeun yén anjeun netelakeun heula versi ieu:

nipyapi.versioning.get_latest_flow_ver

Grup prosés nyebarkeun:

nipyapi.versioning.deploy_flow_version

Urang ngamimitian prosésor:

nipyapi.canvas.schedule_process_group

Dina blok ngeunaan CLI, ieu ditulis yén mindahkeun data teu otomatis diaktipkeun dina grup prosés jauh? Nalika ngalaksanakeun naskah, kuring ogé mendakan masalah ieu. Dina waktos éta, kuring henteu tiasa ngamimitian mindahkeun data nganggo API sareng kuring mutuskeun nyerat ka pamekar perpustakaan NiPyAPI sareng naroskeun naséhat / bantosan. pamekar nu ngajawab kuring, urang bahas masalah jeung manéhna nulis yén manéhna diperlukeun waktu pikeun "pariksa hal". Sareng ayeuna, sababaraha dinten saatos, email sumping dimana fungsi Python ditulis anu ngarengsekeun masalah ngamimitian kuring !!! Dina waktos éta, versi NiPyAPI éta 0.13.3 sareng, tangtosna, teu aya nanaon di dinya. Tapi dina vérsi 0.14.0, anu dileupaskeun énggal-énggal, fungsi ieu parantos dilebetkeun kana perpustakaan. Papanggih

nipyapi.canvas.set_remote_process_group_transmission

Janten, kalayan bantosan perpustakaan NiPyAPI, kami nyambungkeun pendaptaran, ngagulung aliran, bahkan ngamimitian prosesor sareng transfer data. Teras anjeun tiasa nyisiran kode, tambahkeun sagala jinis cek, logging, sareng éta. Tapi éta carita lengkep béda.

Tina pilihan automation anu kuring anggap, anu terakhir sigana anu paling éfisién. Anu mimiti, ieu masih kode python, dimana anjeun tiasa ngalebetkeun kode program bantu sareng nikmati sagala kauntungan tina basa pamrograman. Bréh, proyék NiPyAPI aktip ngembang sareng upami aya masalah anjeun tiasa nyerat ka pamekar. Katilu, NiPyAPI masih mangrupikeun alat anu langkung fleksibel pikeun berinteraksi sareng NiFi dina ngarengsekeun masalah anu kompleks. Contona, dina nangtukeun naha antrian pesen ayeuna kosong dina aliran jeung naha kasebut nyaéta dimungkinkeun pikeun ngapdet grup prosés.

Éta hungkul. Kuring ngajelaskeun 3 pendekatan kana pangiriman aliran otomatis di NiFi, pitfalls anu tiasa dipendakan ku pamekar sareng nyayogikeun kode kerja pikeun pangiriman otomatis. Upami anjeun resep kana topik ieu sareng kuring - nulis!

sumber: www.habr.com

Tambahkeun komentar