Всем Привет!
Tugas kasebut kaya ing ngisor iki - ana aliran sing ditampilake ing gambar ing ndhuwur, sing kudu diluncurake menyang server N kanthi
NiFi Site to Site (S2S) minangka cara sing aman lan bisa disesuaikan kanggo mindhah data ing antarane kedadeyan NiFi. Waca cara kerjane S2S
Nalika nerangake transfer data nggunakake S2S, siji conto diarani klien, sing kapindho yaiku server. Klien ngirim data, server nampa. Rong cara kanggo nyiyapake transfer data ing antarane:
- push. Data dikirim saka conto klien nggunakake Remote Process Group (RPG). Ing conto server, data ditampa nggunakake Port Input
- narik. Server nampa data nggunakake RPG, klien ngirim nggunakake port Output.
Aliran kanggo rolling disimpen ing Registry Apache.
Apache NiFi Registry minangka subproyek Apache NiFi sing nyedhiyakake panyimpenan aliran lan alat versi. A jinis GIT. Informasi babagan nginstal, ngatur lan nggarap pendaptaran bisa ditemokake ing
Ing wiwitan, nalika N minangka nomer cilik, aliran kasebut dikirim lan dianyari kanthi tangan ing wektu sing cukup.
Nanging nalika N tuwuh, ana masalah liyane:
- mbutuhake wektu liyane kanggo nganyari aliran. Sampeyan kudu pindhah menyang kabeh server
- ana kesalahan nganyari template. Kene padha nganyari, nanging kene padha lali
- kesalahan manungsa nalika nindakake akeh operasi sing padha
Kabeh iki ndadekke kita kasunyatan sing perlu kanggo ngotomatisasi proses. Aku wis nyoba cara ing ngisor iki kanggo ngatasi masalah iki:
- Gunakake MiNiFi tinimbang NiFi
- NiFi CLI
- NiPyAPI
Nggunakake MiNiFi
Subproyek liyane, MiNiFi C2 Server, bakal mbantu ngatasi masalah iki. Produk iki dimaksudake kanggo dadi titik tengah ing arsitektur penyebaran. Cara ngatur lingkungan - diterangake ing
Opsi sing diterangake ing artikel ing ndhuwur bisa digunakake lan ora angel dileksanakake, nanging kita kudu ora lali ing ngisor iki:
- minifi ora kabeh prosesor saka nifi
- Versi CPU ing Minifi ketinggalan versi CPU ing NiFi.
Ing wektu nulis, versi paling anyar saka NiFi punika 1.9.2. Versi prosesor versi MiNiFi paling anyar yaiku 1.7.0. Prosesor bisa ditambahake menyang MiNiFi, nanging amarga beda versi antarane prosesor NiFi lan MiNiFi, iki bisa uga ora bisa digunakake.
NiFi CLI
Kang menehi kritik dening
Mbukak sarana
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Supaya bisa mbukak aliran sing dibutuhake saka registri, kita kudu ngerti pengenal basket (pengenal ember) lan aliran kasebut dhewe (pengenal aliran). Data iki bisa dipikolehi liwat cli utawa ing antarmuka web pendaptaran NiFi. Antarmuka web katon kaya iki:
Nggunakake CLI, sampeyan nindakake iki:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Jalanake grup proses impor saka registri:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Titik penting iku sembarang Kayata nifi bisa kasebut minangka inang sing kita muter grup proses.
Klompok proses ditambahake karo prosesor mandheg, kudu diwiwiti
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Apik banget, prosesor wis diwiwiti. Nanging, miturut kondisi masalah, kita kudu NiFi kedadean kanggo ngirim data kanggo kedadean liyane. Ayo nganggep yen metode Push dipilih kanggo nransfer data menyang server. Kanggo ngatur transfer data, perlu ngaktifake transfer data (Aktifake transmisi) ing Grup Proses Jauh (RPG) sing ditambahake, sing wis kalebu ing aliran kita.
Ing dokumentasi ing CLI lan sumber liyane, aku ora nemokake cara kanggo ngaktifake transfer data. Yen sampeyan ngerti carane nindakake iki, mangga nulis ing komentar.
Amarga kita duwe bash lan kita siyap nganti pungkasan, kita bakal nemokake dalan metu! Sampeyan bisa nggunakake API NiFi kanggo ngatasi masalah iki. Ayo nggunakake cara ing ngisor iki, kita njupuk ID saka conto ing ndhuwur (ing kasus kita 7f522a13-016e-1000-e504-d5b15587f2f3). Katrangan saka Metode API NiFi
Ing awak, sampeyan kudu ngliwati JSON, saka wangun ing ngisor iki:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameter sing kudu diisi supaya bisa "bisa":
negara - status transfer data. TRANSMITTING kasedhiya kanggo ngaktifake transfer data, STOPPED kanggo mateni
versi - versi prosesor
versi bakal gawan kanggo 0 nalika digawe, nanging paramèter iki bisa dijupuk nggunakake cara
Kanggo penyayang skrip bash, cara iki bisa uga cocog, nanging angel kanggoku - skrip bash ora dadi favoritku. Cara sabanjure luwih menarik lan luwih trep miturut pendapatku.
NiPyAPI
NiPyAPI minangka perpustakaan Python kanggo sesambungan karo conto NiFi.
Skrip kita kanggo nggulung konfigurasi yaiku program Python. Ayo pindhah menyang coding.
Nyiyapake configs kanggo karya luwih. Kita butuh paramèter ing ngisor iki:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Salajengipun aku bakal masang jeneng saka cara perpustakaan iki, kang diterangake
Kita nyambungake pendaptaran menyang conto nifi nggunakake
nipyapi.versioning.create_registry_client
Ing langkah iki, sampeyan uga bisa nambah priksa manawa registri wis ditambahake menyang conto, kanggo iki sampeyan bisa nggunakake metode kasebut.
nipyapi.versioning.list_registry_clients
We golek ember kanggo luwih nelusuri aliran ing basket
nipyapi.versioning.get_registry_bucket
Miturut ember sing ditemokake, kita nggoleki aliran
nipyapi.versioning.get_flow_in_bucket
Sabanjure, penting kanggo mangerteni yen klompok proses iki wis ditambahake. Klompok proses diselehake kanthi koordinat lan bisa uga ana kahanan nalika sing liyane ditumpangake ing ndhuwur siji. Aku dicenthang, bisa 🙂 Kanggo njaluk kabeh klompok proses ditambahaké, nggunakake cara
nipyapi.canvas.list_all_process_groups
banjur kita bisa nelusuri, contone, jeneng.
Aku ora bakal njlèntrèhaké proses nganyari cithakan, Aku mung bakal ngomong yen prosesor ditambahake ing versi anyar saka cithakan, banjur ora ana masalah karo ngarsane pesen ing antrian. Nanging yen pemroses dibusak, banjur ana masalah (nifi ora ngidini mbusak prosesor yen antrian pesen wis nambah ing ngarepe). Yen sampeyan kasengsem ing carane aku ditanggulangi masalah iki - nulis kanggo kula, please, kita bakal ngrembug bab iki. Kontak ing pungkasan artikel. Ayo pindhah menyang langkah nambah grup proses.
Nalika debugging skrip, aku nemokake fitur sing versi paling anyar saka aliran ora tansah ditarik munggah, supaya aku nyaranake sing pisanan njlentrehake versi iki:
nipyapi.versioning.get_latest_flow_ver
Grup proses nyebarake:
nipyapi.versioning.deploy_flow_version
Kita miwiti prosesor:
nipyapi.canvas.schedule_process_group
Ing blok babagan CLI, ditulis manawa transfer data ora diaktifake kanthi otomatis ing grup proses remot? Nalika ngetrapake skrip, aku uga nemoni masalah iki. Ing wektu iku, aku ora bisa miwiti transfer data nggunakake API lan aku mutusaké kanggo nulis kanggo pangembang perpustakaan NiPyAPI lan njaluk saran / bantuan. Pangembang mangsuli kula, kita rembugan masalah lan wrote sing perlu wektu kanggo "mriksa soko". Lan saiki, sawetara dina mengko, email teka ing ngendi fungsi Python ditulis sing ngrampungake masalah wiwitan !!! Ing wektu kasebut, versi NiPyAPI yaiku 0.13.3 lan, mesthi, ora ana sing kaya ngono. Nanging ing versi 0.14.0, sing dirilis bubar, fungsi iki wis kalebu ing perpustakaan. ketemu
nipyapi.canvas.set_remote_process_group_transmission
Dadi, kanthi bantuan perpustakaan NiPyAPI, kita nyambungake pendaptaran, nggulung aliran, lan malah miwiti prosesor lan transfer data. Banjur sampeyan bisa nyisir kode, nambah kabeh jinis mriksa, logging, lan iku. Nanging kuwi crita sing beda.
Saka opsi otomatisasi sing dakanggep, sing terakhir katon paling efisien. Kaping pisanan, iki isih kode python, ing ngendi sampeyan bisa nglebokake kode program tambahan lan nikmati kabeh mupangat saka basa pamrograman. Kapindho, proyek NiPyAPI aktif berkembang lan yen ana masalah, sampeyan bisa nulis menyang pangembang. Katelu, NiPyAPI isih dadi alat sing luwih fleksibel kanggo sesambungan karo NiFi kanggo ngrampungake masalah sing rumit. Contone, ing nentokake apa queues pesen saiki kosong ing aliran lan apa iku bisa kanggo nganyari grup proses.
Mekaten. Aku njlèntrèhaké 3 pendekatan kanggo ngotomatisasi pangiriman aliran ing NiFi, pitfalls sing bisa ditemoni pangembang lan nyedhiyakake kode sing bisa digunakake kanggo pangiriman otomatis. Yen sampeyan kasengsem ing topik iki kaya aku -
Source: www.habr.com