Kumusta sa lahat!
Ang gawain ay ang mga sumusunod - mayroong isang daloy na ipinapakita sa larawan sa itaas, na kailangang ilunsad sa N server na may
Ang NiFi Site to Site (S2S) ay isang secure, lubos na nako-customize na paraan upang maglipat ng data sa pagitan ng mga instance ng NiFi. Tingnan kung paano gumagana ang S2S
Pagdating sa paglipat ng data gamit ang S2S, ang isang pagkakataon ay tinatawag na isang kliyente, ang pangalawa ay isang server. Nagpapadala ang kliyente ng data, natatanggap ito ng server. Dalawang paraan upang i-set up ang paglipat ng data sa pagitan nila:
- Itulak. Ipinapadala ang data mula sa instance ng kliyente gamit ang Remote Process Group (RPG). Sa halimbawa ng server, natatanggap ang data gamit ang Input Port
- Hilahin. Ang server ay tumatanggap ng data gamit ang RPG, ang kliyente ay nagpapadala gamit ang Output port.
Ang daloy para sa pag-roll ay naka-imbak sa Apache Registry.
Ang Apache NiFi Registry ay isang subproject ng Apache NiFi na nagbibigay ng flow storage at tool sa pag-bersyon. Isang uri ng GIT. Ang impormasyon tungkol sa pag-install, pag-configure at pagtatrabaho sa registry ay matatagpuan sa
Sa simula, kapag ang N ay isang maliit na numero, ang daloy ay inihahatid at ina-update sa pamamagitan ng kamay sa isang makatwirang oras.
Ngunit habang lumalaki ang N, mas maraming problema:
- nangangailangan ng mas maraming oras upang i-update ang daloy. Kailangan mong pumunta sa lahat ng mga server
- may mga error sa pag-update ng mga template. Dito sila nag-update, ngunit dito nakalimutan nila
- pagkakamali ng tao kapag nagsasagawa ng malaking bilang ng mga katulad na operasyon
Ang lahat ng ito ay nagdadala sa amin sa katotohanan na ito ay kinakailangan upang i-automate ang proseso. Sinubukan ko ang mga sumusunod na paraan upang malutas ang problemang ito:
- Gamitin ang MiNiFi sa halip na NiFi
- NiFi CLI
- NiPyAPI
Gamit ang MiNiFi
Ang isa pang subproject, ang MiNiFi C2 Server, ay makakatulong sa paglutas ng problemang ito. Ang produktong ito ay nilayon na maging sentrong punto sa arkitektura ng deployment. Paano i-configure ang kapaligiran - inilarawan sa
Ang opsyon na inilarawan sa artikulo sa itaas ay gumagana at hindi mahirap ipatupad, ngunit hindi natin dapat kalimutan ang sumusunod:
- Ang minifi ay wala ang lahat ng mga processor mula sa nifi
- Ang mga bersyon ng CPU sa Minifi ay nahuhuli sa mga bersyon ng CPU sa NiFi.
Sa oras ng pagsulat, ang pinakabagong bersyon ng NiFi ay 1.9.2. Ang bersyon ng processor ng pinakabagong bersyon ng MiNiFi ay 1.7.0. Maaaring idagdag ang mga processor sa MiNiFi, ngunit dahil sa mga pagkakaiba sa bersyon sa pagitan ng mga processor ng NiFi at MiNiFi, maaaring hindi ito gumana.
NiFi CLI
Sa paghusga ni
Patakbuhin ang utility
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Upang mai-load natin ang kinakailangang daloy mula sa pagpapatala, kailangan nating malaman ang mga identifier ng basket (bucket identifier) ββββat ang daloy mismo (flow identifier). Ang data na ito ay maaaring makuha alinman sa pamamagitan ng cli o sa NiFi registry web interface. Ang web interface ay ganito ang hitsura:
Gamit ang CLI, gagawin mo ito:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Patakbuhin ang pangkat ng proseso ng pag-import mula sa pagpapatala:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Ang isang mahalagang punto ay ang anumang instance ng nifi ay maaaring tukuyin bilang host kung saan namin i-roll ang pangkat ng proseso.
Idinagdag ang pangkat ng proseso na may mga tumigil na processor, kailangan nilang simulan
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Mahusay, nagsimula na ang mga processor. Gayunpaman, ayon sa mga kondisyon ng problema, kailangan namin ng mga Instance ng NiFi upang magpadala ng data sa ibang mga pagkakataon. Ipagpalagay natin na ang paraan ng Push ay pinili upang maglipat ng data sa server. Upang maisaayos ang paglilipat ng data, kinakailangan na paganahin ang paglipat ng data (Paganahin ang pagpapadala) sa idinagdag na Remote Process Group (RPG), na kasama na sa aming daloy.
Sa dokumentasyon sa CLI at iba pang mga mapagkukunan, hindi ako nakahanap ng paraan upang paganahin ang paglipat ng data. Kung alam mo kung paano gawin ito, mangyaring sumulat sa mga komento.
Dahil may bash tayo at handa na tayong pumunta sa dulo, hahanap tayo ng paraan! Maaari mong gamitin ang NiFi API upang malutas ang problemang ito. Gamitin natin ang sumusunod na paraan, kinukuha natin ang ID mula sa mga halimbawa sa itaas (sa aming kaso ito ay 7f522a13-016e-1000-e504-d5b15587f2f3). Paglalarawan ng Mga Paraan ng NiFi API
Sa katawan, kailangan mong ipasa ang JSON, sa sumusunod na anyo:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Mga parameter na dapat punan upang "gumana":
ay β katayuan ng paglilipat ng data. Available ang TRANSMITTING upang paganahin ang paglipat ng data, STOPPED upang hindi paganahin
bersyon - bersyon ng processor
Ang bersyon ay magiging default sa 0 kapag ginawa, ngunit ang mga parameter na ito ay maaaring makuha gamit ang pamamaraan
Para sa mga mahilig sa mga script ng bash, maaaring mukhang angkop ang pamamaraang ito, ngunit mahirap para sa akin - hindi ko paborito ang mga script ng bash. Ang susunod na paraan ay mas kawili-wili at mas maginhawa sa aking opinyon.
NiPyAPI
Ang NiPyAPI ay isang Python library para sa pakikipag-ugnayan sa mga pagkakataon ng NiFi.
Ang aming script para sa paglulunsad ng configuration ay isang Python program. Lumipat tayo sa coding.
Mag-set up ng mga config para sa karagdagang trabaho. Kakailanganin namin ang mga sumusunod na parameter:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-api ΠΈΠ½ΡΡΠ°Π½ΡΠ°, Π½Π° ΠΊΠΎΡΠΎΡΠΎΠΌ ΡΠ°Π·Π²ΠΎΡΠ°ΡΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡΡΡ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡΠ΄Π΅Ρ Π½Π°Π·ΡΠ²Π°ΡΡΡΡ Π² ΠΈΠ½ΡΡΠ°Π½ΡΠ΅ nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΠΏΠΎΠ΄ΡΡΠ³ΠΈΠ²Π°Π΅ΠΌ
Karagdagang ilalagay ko ang mga pangalan ng mga pamamaraan ng aklatang ito, na inilarawan
Ikinonekta namin ang registry sa nifi instance gamit
nipyapi.versioning.create_registry_client
Sa hakbang na ito, maaari ka ring magdagdag ng isang tseke na ang pagpapatala ay naidagdag na sa halimbawa, para dito maaari mong gamitin ang pamamaraan
nipyapi.versioning.list_registry_clients
Hinahanap namin ang balde upang higit pang hanapin ang daloy sa basket
nipyapi.versioning.get_registry_bucket
Ayon sa nahanap na balde, naghahanap kami ng daloy
nipyapi.versioning.get_flow_in_bucket
Susunod, mahalagang maunawaan kung naidagdag na ang pangkat ng prosesong ito. Ang pangkat ng proseso ay inilalagay sa pamamagitan ng mga coordinate at maaaring lumitaw ang isang sitwasyon kapag ang pangalawa ay nakapatong sa ibabaw ng isa. Sinuri ko, maaari itong maging π Upang makuha ang lahat ng idinagdag na pangkat ng proseso, gamitin ang pamamaraan
nipyapi.canvas.list_all_process_groups
at pagkatapos ay maaari tayong maghanap, halimbawa, ayon sa pangalan.
Hindi ko ilalarawan ang proseso ng pag-update ng template, sasabihin ko lang na kung ang mga processor ay idinagdag sa bagong bersyon ng template, kung gayon walang mga problema sa pagkakaroon ng mga mensahe sa mga pila. Ngunit kung ang mga processor ay tinanggal, pagkatapos ay maaaring lumitaw ang mga problema (hindi pinapayagan ng nifi ang pag-alis ng processor kung ang isang queue ng mensahe ay naipon sa harap nito). Kung interesado ka sa kung paano ko nalutas ang problemang ito - sumulat sa akin, mangyaring, tatalakayin natin ang puntong ito. Mga contact sa dulo ng artikulo. Lumipat tayo sa hakbang ng pagdaragdag ng isang pangkat ng proseso.
Kapag nagde-debug sa script, nakatagpo ako ng feature na ang pinakabagong bersyon ng daloy ay hindi palaging nakuha, kaya inirerekomenda ko na linawin mo muna ang bersyong ito:
nipyapi.versioning.get_latest_flow_ver
I-deploy ang pangkat ng proseso:
nipyapi.versioning.deploy_flow_version
Mga nagsisimulang processor:
nipyapi.canvas.schedule_process_group
Sa block tungkol sa CLI, isinulat na ang paglipat ng data ay hindi awtomatikong pinagana sa remote na grupo ng proseso? Kapag ipinatupad ang script, naranasan ko rin ang problemang ito. Sa oras na iyon, hindi ako makapagsimula ng paglilipat ng data gamit ang API at nagpasya akong sumulat sa developer ng library ng NiPyAPI at humingi ng payo / tulong. Sinagot ako ng developer, napag-usapan namin ang problema at isinulat niya na kailangan niya ng oras upang "magsuri ng isang bagay". At ngayon, makalipas ang ilang araw, dumating ang isang email kung saan nakasulat ang isang Python function na lumulutas sa problema ko sa startup !!! Noong panahong iyon, ang bersyon ng NiPyAPI ay 0.13.3 at, siyempre, walang ganoong uri dito. Ngunit sa bersyon 0.14.0, na pinakawalan kamakailan, ang function na ito ay naisama na sa library. Magkita
nipyapi.canvas.set_remote_process_group_transmission
Kaya, sa tulong ng NiPyAPI library, ikinonekta namin ang registry, pinagsama ang daloy, at sinimulan pa ang mga processor at paglilipat ng data. Pagkatapos ay maaari mong suklayin ang code, magdagdag ng lahat ng uri ng mga tseke, pag-log, at iyon na. Ngunit iyon ay isang ganap na naiibang kuwento.
Sa mga opsyon sa pag-automate na isinasaalang-alang ko, ang huli ay tila sa akin ang pinaka mahusay. Una, ito ay python code pa rin, kung saan maaari mong i-embed ang auxiliary program code at tamasahin ang lahat ng mga benepisyo ng isang programming language. Pangalawa, ang proyekto ng NiPyAPI ay aktibong umuunlad at sa kaso ng mga problema maaari kang sumulat sa developer. Pangatlo, ang NiPyAPI ay isa pa ring mas nababaluktot na tool para sa pakikipag-ugnayan sa NiFi sa paglutas ng mga kumplikadong problema. Halimbawa, sa pagtukoy kung ang mga queue ng mensahe ay kasalukuyang walang laman sa daloy at kung posible bang i-update ang pangkat ng proseso.
Iyon lang. Inilarawan ko ang 3 diskarte sa pag-automate ng paghahatid ng daloy sa NiFi, ang mga pitfall na maaaring maranasan ng isang developer at nagbigay ng gumaganang code para sa pag-automate ng paghahatid. Kung interesado ka sa paksang ito tulad ko -
Pinagmulan: www.habr.com