Automation ng Paghahatid ng Daloy sa Apache NiFi

Kumusta sa lahat!

Automation ng Paghahatid ng Daloy sa Apache NiFi

Ang gawain ay ang mga sumusunod - mayroong isang daloy na ipinapakita sa larawan sa itaas, na kailangang ilunsad sa N server na may Apache NiFi. Flow test - isang file ay binubuo at ipinapadala sa isa pang NiFi instance. Nagaganap ang paglilipat ng data gamit ang NiFi Site to Site protocol.

Ang NiFi Site to Site (S2S) ay isang secure, lubos na nako-customize na paraan upang maglipat ng data sa pagitan ng mga instance ng NiFi. Tingnan kung paano gumagana ang S2S dokumentasyon at mahalagang tandaan na i-set up ang iyong NiFi instance upang payagan ang S2S na makita dito.

Pagdating sa paglipat ng data gamit ang S2S, ang isang pagkakataon ay tinatawag na isang kliyente, ang pangalawa ay isang server. Nagpapadala ang kliyente ng data, natatanggap ito ng server. Dalawang paraan upang i-set up ang paglipat ng data sa pagitan nila:

  1. Itulak. Ipinapadala ang data mula sa instance ng kliyente gamit ang Remote Process Group (RPG). Sa halimbawa ng server, natatanggap ang data gamit ang Input Port
  2. Hilahin. Ang server ay tumatanggap ng data gamit ang RPG, ang kliyente ay nagpapadala gamit ang Output port.


Ang daloy para sa pag-roll ay naka-imbak sa Apache Registry.

Ang Apache NiFi Registry ay isang subproject ng Apache NiFi na nagbibigay ng flow storage at tool sa pag-bersyon. Isang uri ng GIT. Ang impormasyon tungkol sa pag-install, pag-configure at pagtatrabaho sa registry ay matatagpuan sa opisyal na dokumentasyon. Ang daloy para sa imbakan ay pinagsama sa isang pangkat ng proseso at iniimbak sa registry sa form na ito. Babalik tayo dito mamaya sa artikulo.

Sa simula, kapag ang N ay isang maliit na numero, ang daloy ay inihahatid at ina-update sa pamamagitan ng kamay sa isang makatwirang oras.

Ngunit habang lumalaki ang N, mas maraming problema:

  1. nangangailangan ng mas maraming oras upang i-update ang daloy. Kailangan mong pumunta sa lahat ng mga server
  2. may mga error sa pag-update ng mga template. Dito sila nag-update, ngunit dito nakalimutan nila
  3. pagkakamali ng tao kapag nagsasagawa ng malaking bilang ng mga katulad na operasyon

Ang lahat ng ito ay nagdadala sa amin sa katotohanan na ito ay kinakailangan upang i-automate ang proseso. Sinubukan ko ang mga sumusunod na paraan upang malutas ang problemang ito:

  1. Gamitin ang MiNiFi sa halip na NiFi
  2. NiFi CLI
  3. NiPyAPI

Gamit ang MiNiFi

ApacheMiNify ay isang subproject ng Apache NiFi. Ang MiNiFy ay isang compact agent na gumagamit ng parehong mga processor gaya ng NiFi, na nagbibigay-daan sa iyong lumikha ng parehong daloy tulad ng sa NiFi. Ang liwanag ng ahente ay nakakamit, bukod sa iba pang mga bagay, dahil sa ang katunayan na ang MiNiFy ay walang graphical na interface para sa configuration ng daloy. Ang kakulangan ng MiNiFy ng isang graphical na interface ay nangangahulugan na ito ay kinakailangan upang malutas ang problema ng daloy ng paghahatid sa minifi. Dahil ang MiNiFy ay aktibong ginagamit sa IOT, maraming bahagi at ang proseso ng paghahatid ng daloy sa mga huling minifi na pagkakataon ay dapat na awtomatiko. Isang pamilyar na gawain, tama ba?

Ang isa pang subproject, ang MiNiFi C2 Server, ay makakatulong sa paglutas ng problemang ito. Ang produktong ito ay nilayon na maging sentrong punto sa arkitektura ng deployment. Paano i-configure ang kapaligiran - inilarawan sa artikulong ito sa HabrΓ© at sapat na ang impormasyon upang malutas ang problema. Awtomatikong ina-update ng MiNiFi kasabay ng C2 server ang configuration nito. Ang tanging disbentaha ng diskarteng ito ay kailangan mong lumikha ng mga template sa C2 Server, hindi sapat ang isang simpleng commit sa registry.

Ang opsyon na inilarawan sa artikulo sa itaas ay gumagana at hindi mahirap ipatupad, ngunit hindi natin dapat kalimutan ang sumusunod:

  1. Ang minifi ay wala ang lahat ng mga processor mula sa nifi
  2. Ang mga bersyon ng CPU sa Minifi ay nahuhuli sa mga bersyon ng CPU sa NiFi.

Sa oras ng pagsulat, ang pinakabagong bersyon ng NiFi ay 1.9.2. Ang bersyon ng processor ng pinakabagong bersyon ng MiNiFi ay 1.7.0. Maaaring idagdag ang mga processor sa MiNiFi, ngunit dahil sa mga pagkakaiba sa bersyon sa pagitan ng mga processor ng NiFi at MiNiFi, maaaring hindi ito gumana.

NiFi CLI

Sa paghusga ni paglalarawan tool sa opisyal na website, ito ay isang tool para sa pag-automate ng pakikipag-ugnayan sa pagitan ng NiFI at NiFi Registry sa larangan ng paghahatid ng daloy o pamamahala ng proseso. I-download ang tool na ito para makapagsimula. kaya.

Patakbuhin ang utility

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Upang mai-load natin ang kinakailangang daloy mula sa pagpapatala, kailangan nating malaman ang mga identifier ng basket (bucket identifier) ​​​​at ang daloy mismo (flow identifier). Ang data na ito ay maaaring makuha alinman sa pamamagitan ng cli o sa NiFi registry web interface. Ang web interface ay ganito ang hitsura:

Automation ng Paghahatid ng Daloy sa Apache NiFi

Gamit ang CLI, gagawin mo ito:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Patakbuhin ang pangkat ng proseso ng pag-import mula sa pagpapatala:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Ang isang mahalagang punto ay ang anumang instance ng nifi ay maaaring tukuyin bilang host kung saan namin i-roll ang pangkat ng proseso.

Idinagdag ang pangkat ng proseso na may mga tumigil na processor, kailangan nilang simulan

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Mahusay, nagsimula na ang mga processor. Gayunpaman, ayon sa mga kondisyon ng problema, kailangan namin ng mga Instance ng NiFi upang magpadala ng data sa ibang mga pagkakataon. Ipagpalagay natin na ang paraan ng Push ay pinili upang maglipat ng data sa server. Upang maisaayos ang paglilipat ng data, kinakailangan na paganahin ang paglipat ng data (Paganahin ang pagpapadala) sa idinagdag na Remote Process Group (RPG), na kasama na sa aming daloy.

Automation ng Paghahatid ng Daloy sa Apache NiFi

Sa dokumentasyon sa CLI at iba pang mga mapagkukunan, hindi ako nakahanap ng paraan upang paganahin ang paglipat ng data. Kung alam mo kung paano gawin ito, mangyaring sumulat sa mga komento.

Dahil may bash tayo at handa na tayong pumunta sa dulo, hahanap tayo ng paraan! Maaari mong gamitin ang NiFi API upang malutas ang problemang ito. Gamitin natin ang sumusunod na paraan, kinukuha natin ang ID mula sa mga halimbawa sa itaas (sa aming kaso ito ay 7f522a13-016e-1000-e504-d5b15587f2f3). Paglalarawan ng Mga Paraan ng NiFi API dito.

Automation ng Paghahatid ng Daloy sa Apache NiFi
Sa katawan, kailangan mong ipasa ang JSON, sa sumusunod na anyo:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Mga parameter na dapat punan upang "gumana":
ay β€” katayuan ng paglilipat ng data. Available ang TRANSMITTING upang paganahin ang paglipat ng data, STOPPED upang hindi paganahin
bersyon - bersyon ng processor

Ang bersyon ay magiging default sa 0 kapag ginawa, ngunit ang mga parameter na ito ay maaaring makuha gamit ang pamamaraan

Automation ng Paghahatid ng Daloy sa Apache NiFi

Para sa mga mahilig sa mga script ng bash, maaaring mukhang angkop ang pamamaraang ito, ngunit mahirap para sa akin - hindi ko paborito ang mga script ng bash. Ang susunod na paraan ay mas kawili-wili at mas maginhawa sa aking opinyon.

NiPyAPI

Ang NiPyAPI ay isang Python library para sa pakikipag-ugnayan sa mga pagkakataon ng NiFi. Pahina ng dokumentasyon naglalaman ng kinakailangang impormasyon para magtrabaho kasama ang aklatan. Ang mabilisang pagsisimula ay inilalarawan sa proyekto sa github.

Ang aming script para sa paglulunsad ng configuration ay isang Python program. Lumipat tayo sa coding.
Mag-set up ng mga config para sa karagdagang trabaho. Kakailanganin namin ang mga sumusunod na parameter:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-api инстанса, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ Ρ€Π°Π·Π²ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Π·Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π² инстансС nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ подтягиваСм flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ подтягиваСм

Karagdagang ilalagay ko ang mga pangalan ng mga pamamaraan ng aklatang ito, na inilarawan dito.

Ikinonekta namin ang registry sa nifi instance gamit

nipyapi.versioning.create_registry_client

Sa hakbang na ito, maaari ka ring magdagdag ng isang tseke na ang pagpapatala ay naidagdag na sa halimbawa, para dito maaari mong gamitin ang pamamaraan

nipyapi.versioning.list_registry_clients

Hinahanap namin ang balde upang higit pang hanapin ang daloy sa basket

nipyapi.versioning.get_registry_bucket

Ayon sa nahanap na balde, naghahanap kami ng daloy

nipyapi.versioning.get_flow_in_bucket

Susunod, mahalagang maunawaan kung naidagdag na ang pangkat ng prosesong ito. Ang pangkat ng proseso ay inilalagay sa pamamagitan ng mga coordinate at maaaring lumitaw ang isang sitwasyon kapag ang pangalawa ay nakapatong sa ibabaw ng isa. Sinuri ko, maaari itong maging πŸ™‚ Upang makuha ang lahat ng idinagdag na pangkat ng proseso, gamitin ang pamamaraan

nipyapi.canvas.list_all_process_groups

at pagkatapos ay maaari tayong maghanap, halimbawa, ayon sa pangalan.

Hindi ko ilalarawan ang proseso ng pag-update ng template, sasabihin ko lang na kung ang mga processor ay idinagdag sa bagong bersyon ng template, kung gayon walang mga problema sa pagkakaroon ng mga mensahe sa mga pila. Ngunit kung ang mga processor ay tinanggal, pagkatapos ay maaaring lumitaw ang mga problema (hindi pinapayagan ng nifi ang pag-alis ng processor kung ang isang queue ng mensahe ay naipon sa harap nito). Kung interesado ka sa kung paano ko nalutas ang problemang ito - sumulat sa akin, mangyaring, tatalakayin natin ang puntong ito. Mga contact sa dulo ng artikulo. Lumipat tayo sa hakbang ng pagdaragdag ng isang pangkat ng proseso.

Kapag nagde-debug sa script, nakatagpo ako ng feature na ang pinakabagong bersyon ng daloy ay hindi palaging nakuha, kaya inirerekomenda ko na linawin mo muna ang bersyong ito:

nipyapi.versioning.get_latest_flow_ver

I-deploy ang pangkat ng proseso:

nipyapi.versioning.deploy_flow_version

Mga nagsisimulang processor:

nipyapi.canvas.schedule_process_group

Sa block tungkol sa CLI, isinulat na ang paglipat ng data ay hindi awtomatikong pinagana sa remote na grupo ng proseso? Kapag ipinatupad ang script, naranasan ko rin ang problemang ito. Sa oras na iyon, hindi ako makapagsimula ng paglilipat ng data gamit ang API at nagpasya akong sumulat sa developer ng library ng NiPyAPI at humingi ng payo / tulong. Sinagot ako ng developer, napag-usapan namin ang problema at isinulat niya na kailangan niya ng oras upang "magsuri ng isang bagay". At ngayon, makalipas ang ilang araw, dumating ang isang email kung saan nakasulat ang isang Python function na lumulutas sa problema ko sa startup !!! Noong panahong iyon, ang bersyon ng NiPyAPI ay 0.13.3 at, siyempre, walang ganoong uri dito. Ngunit sa bersyon 0.14.0, na pinakawalan kamakailan, ang function na ito ay naisama na sa library. Magkita

nipyapi.canvas.set_remote_process_group_transmission

Kaya, sa tulong ng NiPyAPI library, ikinonekta namin ang registry, pinagsama ang daloy, at sinimulan pa ang mga processor at paglilipat ng data. Pagkatapos ay maaari mong suklayin ang code, magdagdag ng lahat ng uri ng mga tseke, pag-log, at iyon na. Ngunit iyon ay isang ganap na naiibang kuwento.

Sa mga opsyon sa pag-automate na isinasaalang-alang ko, ang huli ay tila sa akin ang pinaka mahusay. Una, ito ay python code pa rin, kung saan maaari mong i-embed ang auxiliary program code at tamasahin ang lahat ng mga benepisyo ng isang programming language. Pangalawa, ang proyekto ng NiPyAPI ay aktibong umuunlad at sa kaso ng mga problema maaari kang sumulat sa developer. Pangatlo, ang NiPyAPI ay isa pa ring mas nababaluktot na tool para sa pakikipag-ugnayan sa NiFi sa paglutas ng mga kumplikadong problema. Halimbawa, sa pagtukoy kung ang mga queue ng mensahe ay kasalukuyang walang laman sa daloy at kung posible bang i-update ang pangkat ng proseso.

Iyon lang. Inilarawan ko ang 3 diskarte sa pag-automate ng paghahatid ng daloy sa NiFi, ang mga pitfall na maaaring maranasan ng isang developer at nagbigay ng gumaganang code para sa pag-automate ng paghahatid. Kung interesado ka sa paksang ito tulad ko - magsulat!

Pinagmulan: www.habr.com

Magdagdag ng komento