Otomasi Pangiriman Aliran ing Apache NiFi

Всем Привет!

Otomasi Pangiriman Aliran ing Apache NiFi

Tugas kasebut kaya ing ngisor iki - ana aliran sing ditampilake ing gambar ing ndhuwur, sing kudu diluncurake menyang server N kanthi Apache NiFi. Tes aliran - file lagi digawe lan dikirim menyang conto NiFi liyane. Transfer data dumadi nggunakake protokol Situs menyang Situs NiFi.

NiFi Site to Site (S2S) minangka cara sing aman lan bisa disesuaikan kanggo mindhah data ing antarane kedadeyan NiFi. Waca cara kerjane S2S dokumentasi lan iku penting kanggo elinga nyetel NiFi Kayata kanggo ngidini S2S ndeleng kene.

Nalika nerangake transfer data nggunakake S2S, siji conto diarani klien, sing kapindho yaiku server. Klien ngirim data, server nampa. Rong cara kanggo nyiyapake transfer data ing antarane:

  1. push. Data dikirim saka conto klien nggunakake Remote Process Group (RPG). Ing conto server, data ditampa nggunakake Port Input
  2. narik. Server nampa data nggunakake RPG, klien ngirim nggunakake port Output.


Aliran kanggo rolling disimpen ing Registry Apache.

Apache NiFi Registry minangka subproyek Apache NiFi sing nyedhiyakake panyimpenan aliran lan alat versi. A jinis GIT. Informasi babagan nginstal, ngatur lan nggarap pendaptaran bisa ditemokake ing dokumentasi resmi. Aliran kanggo panyimpenan digabungake menyang grup proses lan disimpen ing registri ing formulir iki. Kita bakal bali menyang iki mengko ing artikel.

Ing wiwitan, nalika N minangka nomer cilik, aliran kasebut dikirim lan dianyari kanthi tangan ing wektu sing cukup.

Nanging nalika N tuwuh, ana masalah liyane:

  1. mbutuhake wektu liyane kanggo nganyari aliran. Sampeyan kudu pindhah menyang kabeh server
  2. ana kesalahan nganyari template. Kene padha nganyari, nanging kene padha lali
  3. kesalahan manungsa nalika nindakake akeh operasi sing padha

Kabeh iki ndadekke kita kasunyatan sing perlu kanggo ngotomatisasi proses. Aku wis nyoba cara ing ngisor iki kanggo ngatasi masalah iki:

  1. Gunakake MiNiFi tinimbang NiFi
  2. NiFi CLI
  3. NiPyAPI

Nggunakake MiNiFi

ApacheMiNify minangka subproyek Apache NiFi. MiNiFy minangka agen kompak sing nggunakake prosesor sing padha karo NiFi, ngidini sampeyan nggawe aliran sing padha karo NiFi. Lightness saka agen wis ngrambah, antarane liyane, amarga kasunyatan sing MiNiFy ora duwe antarmuka grafis kanggo konfigurasi aliran. Kekurangan antarmuka grafis MiNiFy tegese perlu kanggo ngatasi masalah pangiriman aliran ing minifi. Wiwit MiNiFy aktif digunakake ing IOT, ana akeh komponen lan proses ngirim aliran kanggo kedadean minifi final kudu otomatis. Tugas sing akrab, ta?

Subproyek liyane, MiNiFi C2 Server, bakal mbantu ngatasi masalah iki. Produk iki dimaksudake kanggo dadi titik tengah ing arsitektur penyebaran. Cara ngatur lingkungan - diterangake ing artikel iki ing Habré lan informasi cukup kanggo ngatasi masalah. MiNiFi bebarengan karo server C2 kanthi otomatis nganyari konfigurasi. Mung drawback saka pendekatan iki sing kudu nggawe Cithakan ing C2 Server, tundhuk prasaja kanggo pendaptaran ora cukup.

Opsi sing diterangake ing artikel ing ndhuwur bisa digunakake lan ora angel dileksanakake, nanging kita kudu ora lali ing ngisor iki:

  1. minifi ora kabeh prosesor saka nifi
  2. Versi CPU ing Minifi ketinggalan versi CPU ing NiFi.

Ing wektu nulis, versi paling anyar saka NiFi punika 1.9.2. Versi prosesor versi MiNiFi paling anyar yaiku 1.7.0. Prosesor bisa ditambahake menyang MiNiFi, nanging amarga beda versi antarane prosesor NiFi lan MiNiFi, iki bisa uga ora bisa digunakake.

NiFi CLI

Kang menehi kritik dening katrangan alat ing situs web resmi, iki minangka alat kanggo ngotomatisasi interaksi antarane NiFI lan Registry NiFi ing bidang pangiriman aliran utawa manajemen proses. Download alat iki kanggo miwiti. saka kene.

Mbukak sarana

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Supaya bisa mbukak aliran sing dibutuhake saka registri, kita kudu ngerti pengenal basket (pengenal ember) lan aliran kasebut dhewe (pengenal aliran). Data iki bisa dipikolehi liwat cli utawa ing antarmuka web pendaptaran NiFi. Antarmuka web katon kaya iki:

Otomasi Pangiriman Aliran ing Apache NiFi

Nggunakake CLI, sampeyan nindakake iki:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Jalanake grup proses impor saka registri:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Titik penting iku sembarang Kayata nifi bisa kasebut minangka inang sing kita muter grup proses.

Klompok proses ditambahake karo prosesor mandheg, kudu diwiwiti

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Apik banget, prosesor wis diwiwiti. Nanging, miturut kondisi masalah, kita kudu NiFi kedadean kanggo ngirim data kanggo kedadean liyane. Ayo nganggep yen metode Push dipilih kanggo nransfer data menyang server. Kanggo ngatur transfer data, perlu ngaktifake transfer data (Aktifake transmisi) ing Grup Proses Jauh (RPG) sing ditambahake, sing wis kalebu ing aliran kita.

Otomasi Pangiriman Aliran ing Apache NiFi

Ing dokumentasi ing CLI lan sumber liyane, aku ora nemokake cara kanggo ngaktifake transfer data. Yen sampeyan ngerti carane nindakake iki, mangga nulis ing komentar.

Amarga kita duwe bash lan kita siyap nganti pungkasan, kita bakal nemokake dalan metu! Sampeyan bisa nggunakake API NiFi kanggo ngatasi masalah iki. Ayo nggunakake cara ing ngisor iki, kita njupuk ID saka conto ing ndhuwur (ing kasus kita 7f522a13-016e-1000-e504-d5b15587f2f3). Katrangan saka Metode API NiFi kene.

Otomasi Pangiriman Aliran ing Apache NiFi
Ing awak, sampeyan kudu ngliwati JSON, saka wangun ing ngisor iki:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameter sing kudu diisi supaya bisa "bisa":
negara - status transfer data. TRANSMITTING kasedhiya kanggo ngaktifake transfer data, STOPPED kanggo mateni
versi - versi prosesor

versi bakal gawan kanggo 0 nalika digawe, nanging paramèter iki bisa dijupuk nggunakake cara

Otomasi Pangiriman Aliran ing Apache NiFi

Kanggo penyayang skrip bash, cara iki bisa uga cocog, nanging angel kanggoku - skrip bash ora dadi favoritku. Cara sabanjure luwih menarik lan luwih trep miturut pendapatku.

NiPyAPI

NiPyAPI minangka perpustakaan Python kanggo sesambungan karo conto NiFi. Halaman dokumentasi ngemot informasi sing dibutuhake kanggo nggarap perpustakaan. Wiwitan cepet diterangake ing proyek ing github.

Skrip kita kanggo nggulung konfigurasi yaiku program Python. Ayo pindhah menyang coding.
Nyiyapake configs kanggo karya luwih. Kita butuh paramèter ing ngisor iki:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Salajengipun aku bakal masang jeneng saka cara perpustakaan iki, kang diterangake kene.

Kita nyambungake pendaptaran menyang conto nifi nggunakake

nipyapi.versioning.create_registry_client

Ing langkah iki, sampeyan uga bisa nambah priksa manawa registri wis ditambahake menyang conto, kanggo iki sampeyan bisa nggunakake metode kasebut.

nipyapi.versioning.list_registry_clients

We golek ember kanggo luwih nelusuri aliran ing basket

nipyapi.versioning.get_registry_bucket

Miturut ember sing ditemokake, kita nggoleki aliran

nipyapi.versioning.get_flow_in_bucket

Sabanjure, penting kanggo mangerteni yen klompok proses iki wis ditambahake. Klompok proses diselehake kanthi koordinat lan bisa uga ana kahanan nalika sing liyane ditumpangake ing ndhuwur siji. Aku dicenthang, bisa 🙂 Kanggo njaluk kabeh klompok proses ditambahaké, nggunakake cara

nipyapi.canvas.list_all_process_groups

banjur kita bisa nelusuri, contone, jeneng.

Aku ora bakal njlèntrèhaké proses nganyari cithakan, Aku mung bakal ngomong yen prosesor ditambahake ing versi anyar saka cithakan, banjur ora ana masalah karo ngarsane pesen ing antrian. Nanging yen pemroses dibusak, banjur ana masalah (nifi ora ngidini mbusak prosesor yen antrian pesen wis nambah ing ngarepe). Yen sampeyan kasengsem ing carane aku ditanggulangi masalah iki - nulis kanggo kula, please, kita bakal ngrembug bab iki. Kontak ing pungkasan artikel. Ayo pindhah menyang langkah nambah grup proses.

Nalika debugging skrip, aku nemokake fitur sing versi paling anyar saka aliran ora tansah ditarik munggah, supaya aku nyaranake sing pisanan njlentrehake versi iki:

nipyapi.versioning.get_latest_flow_ver

Grup proses nyebarake:

nipyapi.versioning.deploy_flow_version

Kita miwiti prosesor:

nipyapi.canvas.schedule_process_group

Ing blok babagan CLI, ditulis manawa transfer data ora diaktifake kanthi otomatis ing grup proses remot? Nalika ngetrapake skrip, aku uga nemoni masalah iki. Ing wektu iku, aku ora bisa miwiti transfer data nggunakake API lan aku mutusaké kanggo nulis kanggo pangembang perpustakaan NiPyAPI lan njaluk saran / bantuan. Pangembang mangsuli kula, kita rembugan masalah lan wrote sing perlu wektu kanggo "mriksa soko". Lan saiki, sawetara dina mengko, email teka ing ngendi fungsi Python ditulis sing ngrampungake masalah wiwitan !!! Ing wektu kasebut, versi NiPyAPI yaiku 0.13.3 lan, mesthi, ora ana sing kaya ngono. Nanging ing versi 0.14.0, sing dirilis bubar, fungsi iki wis kalebu ing perpustakaan. ketemu

nipyapi.canvas.set_remote_process_group_transmission

Dadi, kanthi bantuan perpustakaan NiPyAPI, kita nyambungake pendaptaran, nggulung aliran, lan malah miwiti prosesor lan transfer data. Banjur sampeyan bisa nyisir kode, nambah kabeh jinis mriksa, logging, lan iku. Nanging kuwi crita sing beda.

Saka opsi otomatisasi sing dakanggep, sing terakhir katon paling efisien. Kaping pisanan, iki isih kode python, ing ngendi sampeyan bisa nglebokake kode program tambahan lan nikmati kabeh mupangat saka basa pamrograman. Kapindho, proyek NiPyAPI aktif berkembang lan yen ana masalah, sampeyan bisa nulis menyang pangembang. Katelu, NiPyAPI isih dadi alat sing luwih fleksibel kanggo sesambungan karo NiFi kanggo ngrampungake masalah sing rumit. Contone, ing nentokake apa queues pesen saiki kosong ing aliran lan apa iku bisa kanggo nganyari grup proses.

Mekaten. Aku njlèntrèhaké 3 pendekatan kanggo ngotomatisasi pangiriman aliran ing NiFi, pitfalls sing bisa ditemoni pangembang lan nyedhiyakake kode sing bisa digunakake kanggo pangiriman otomatis. Yen sampeyan kasengsem ing topik iki kaya aku - nulis!

Source: www.habr.com

Add a comment