Flow Delivery Automation yn Apache NiFi

Hello everyone!

Flow Delivery Automation yn Apache NiFi

De taak is as folget - d'r is in stream, presintearre yn 'e ôfbylding hjirboppe, dy't moat wurde útrôle nei N servers mei Apache NiFi. Flowtest - in bestân wurdt generearre en stjoerd nei in oare NiFi-eksimplaar. Gegevensferfier bart mei it NiFi Site to Site protokol.

NiFi Site to Site (S2S) is in feilige, maklik ynstelbere manier om gegevens oer te dragen tusken NiFi-eksimplaren. Hoe S2S wurket, sjoch dokumintaasje en it is wichtich om net te ferjitten om de NiFi-eksimplaar te konfigurearjen om S2S te tastean, sjoch hjir.

Yn gefallen dêr't wy prate oer gegevens oerdracht mei help fan S2S, ien eksimplaar hjit client, de twadde tsjinner. De kliïnt stjoert gegevens, de tsjinner ûntfangt. Twa manieren om gegevensferfier tusken har te konfigurearjen:

  1. Triuwe. Fanút de kliïnteksimplaar wurde gegevens ferstjoerd mei in Remote Process Group (RPG). Op de servereksimplaar wurde gegevens ûntfongen mei de ynfierpoarte
  2. Lûke. De tsjinner ûntfangt gegevens mei help fan RPG, de kliïnt stjoert mei help fan Output haven.


Flow foar útrol wurdt opslein yn Apache Registry.

Apache NiFi Registry is in subprojekt fan Apache NiFi dat in ark leveret foar streamopslach en ferzjekontrôle. In soarte fan GIT. Ynformaasje oer it ynstallearjen, konfigurearjen en wurkjen mei it register is te finen yn offisjele dokumintaasje. Flow foar opslach wurdt kombinearre yn in proses groep en opslein yn dit formulier yn it register. Dêr komme wy letter yn it artikel op werom.

By it begjin, as N in lyts oantal is, wurdt stream levere en manuell bywurke yn in akseptabele tiid.

Mar as N groeit, wurde de problemen mear:

  1. it nimt mear tiid om de stream te aktualisearjen. Jo moatte oanmelde by alle servers
  2. Flaters by it bywurkjen fan sjabloanen komme foar. Hjir hawwe se it bywurke, mar hjir binne se fergetten
  3. minsklike flaters by it útfieren fan in grut oantal ferlykbere operaasjes

Dit alles bringt ús ta it feit dat wy it proses moatte automatisearje. Ik besocht de folgjende manieren om dit probleem op te lossen:

  1. Brûk MiNiFi ynstee fan NiFi
  2. NiFi CLI
  3. NiPyAPI

Mei help fan MiNiFi

Apache MiNiFy - subprojekt fan Apache NiFi. MiNiFy is in kompakte agint dy't deselde processors brûkt as NiFi, wêrtroch jo deselde streamen kinne meitsje as yn NiFi. It lichtgewicht karakter fan 'e agint wurdt berikt ûnder oare troch it feit dat MiNiFy gjin grafyske ynterface hat foar streamkonfiguraasje. It ûntbrekken fan in grafyske ynterface yn MiNiFy betsjut dat it nedich is om it probleem op te lossen fan it leverjen fan stream nei minifi. Sûnt MiNiFy aktyf wurdt brûkt yn IOT, binne d'r in protte komponinten en it proses fan it leverjen fan stream nei de lêste minifi-eksimplaren moat automatisearre wurde. In bekende taak, krekt?

In oar subprojekt sil helpe om dit probleem op te lossen - MiNiFi C2 Server. Dit produkt is bedoeld om it sintrale punt te wêzen yn 'e konfiguraasje-útrol-arsjitektuer. Hoe de omjouwing te konfigurearjen - beskreaun yn dit artikel Der is genôch ynformaasje oer Habré om it probleem op te lossen. MiNiFi, yn gearhing mei de C2-tsjinner, fernijt automatysk syn konfiguraasje. It ienige nadeel fan dizze oanpak is dat jo sjabloanen op C2-tsjinner moatte oanmeitsje, in ienfâldige commit foar it register is net genôch.

De opsje beskreaun yn it boppesteande artikel wurket en is net dreech om te fieren, mar wy moatte it folgjende net ferjitte:

  1. Minifi hat net alle processors fan nifi
  2. Ferzjes fan Minifi-prosessor bliuwe efter NiFi-prosessorferzjes.

Op it stuit fan dit skriuwen is de lêste ferzje fan NiFi 1.9.2. De lêste ferzje fan MiNiFi-prosessor is 1.7.0. Prozessoren kinne wurde tafoege oan MiNiFi, mar fanwege ferzje ferskillen tusken NiFi en MiNiFi processors, dit kin net wurkje.

NiFi CLI

Oardielje op beskriuwing ark op 'e offisjele webside, dit is in ark foar it automatisearjen fan de ynteraksje tusken NiFI en NiFi Registry op it mêd fan streamferliening of prosesbehear. Om te begjinnen, moatte jo dit ark downloade. fan hjir.

Zapuskaem nut

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Om ús de fereaske stream út it register te laden, moatte wy de identifiers fan 'e emmer (bucket identifier) ​​​​en de stream sels (flow identifier) ​​kenne. Dizze gegevens kinne wurde krigen fia de cli of yn 'e NiFi-registraasje-webynterface. Yn 'e webynterface sjocht it der sa út:

Flow Delivery Automation yn Apache NiFi

Mei de CLI wurdt dit dien:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Wy begjinne it ymportearjen fan prosesgroep út it register:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

In wichtich punt is dat elke nifi-eksimplaar kin wurde oantsjutte as de host wêryn wy de prosesgroep rôlje.

Prosesgroep tafoege mei stoppe processors, se moatte wurde begon

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Geweldich, de processors binne begon. Neffens de betingsten fan 'e taak hawwe wy lykwols NiFi-eksimplaren nedich om gegevens nei oare eksimplaren te stjoeren. Litte wy oannimme dat jo de Push-metoade hawwe keazen om gegevens oer te bringen nei de tsjinner. Om gegevensoerdracht te organisearjen, moatte jo gegevensoerdracht ynskeakelje op 'e tafoege Remote Process Group (RPG), dy't al is opnommen yn ús stream.

Flow Delivery Automation yn Apache NiFi

Yn 'e dokumintaasje yn' e CLI en oare boarnen haw ik gjin manier fûn om gegevensferfier yn te skeakeljen. As jo ​​​​witte hoe't jo dit dwaan, skriuw dan yn 'e opmerkings.

Om't wy bash hawwe en wy binne ree om nei it ein te gean, sille wy in útwei fine! Jo kinne de NiFi API brûke om dit probleem op te lossen. Litte wy de folgjende metoade brûke, nim de ID fan 'e foarbylden hjirboppe (yn ús gefal is it 7f522a13-016e-1000-e504-d5b15587f2f3). Beskriuwing fan NiFi API metoaden hjir.

Flow Delivery Automation yn Apache NiFi
Yn it lichem moatte jo JSON trochjaan, lykas dit:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameters dy't moatte wurde ynfolle foar it te wurkjen:
steat - gegevens oerdracht status. Beskikber: TRANSMITTING om gegevensoerdracht yn te skeakeljen, STOPPE om út te skeakeljen
ferzje - prosessor ferzje

ferzje sil standert oan 0 as oanmakke, mar dizze parameters kinne wurde krigen mei help fan de metoade

Flow Delivery Automation yn Apache NiFi

Foar fans fan bash-skripts kin dizze metoade geskikt lykje, mar it is in bytsje lestich foar my - bash-skripts binne net myn favoryt. De folgjende metoade is yn myn miening ynteressanter en handiger.

NiPyAPI

NiPyAPI is in Python-bibleteek foar ynteraksje mei NiFi-eksimplaren. Dokumintaasje side befettet de nedige ynformaasje foar it wurkjen mei de bibleteek. Fluch start wurdt beskreaun yn projekt op github.

Us skript foar it útroljen fan de konfiguraasje is in programma yn Python. Litte wy trochgean nei kodearring.
Wy sette konfiguraasjes foar fierder wurk. Wy sille de folgjende parameters nedich wêze:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Folgjende sil ik de nammen fan 'e metoaden fan dizze bibleteek ynfoegje, dy't beskreaun binne hjir.

Ferbine it register mei de nifi-eksimplaar mei

nipyapi.versioning.create_registry_client

Op dizze stap kinne jo ek in kontrôle tafoegje dat it register al oan 'e eksimplaar is tafoege. Hjirfoar kinne jo de metoade brûke

nipyapi.versioning.list_registry_clients

Wy fine de bak foar fierder sykjen nei stream yn 'e koer

nipyapi.versioning.get_registry_bucket

Mei de fûnemmer sykje wy nei stream

nipyapi.versioning.get_flow_in_bucket

Dêrnei is it wichtich om te begripen oft dizze prosesgroep al tafoege is. De groep Proses wurdt pleatst neffens koördinaten en der kin in situaasje ûntstean as in twadde komponint boppe op ien wurdt lein. Ik kontrolearre, dit kin barre :) Om alle tafoege proses groepen te krijen brûke wy de metoade

nipyapi.canvas.list_all_process_groups

Wy kinne fierder sykje, bygelyks op namme.

Ik sil it proses fan it bywurkjen fan it sjabloan net beskriuwe, ik sil allinich sizze dat as processors wurde tafoege yn 'e nije ferzje fan' e sjabloan, dan binne d'r gjin problemen mei de oanwêzigens fan berjochten yn 'e wachtrijen. Mar as processors wurde fuortsmiten, dan kinne problemen ûntstean (nifi lit jo net fuortsmite in prosessor as in berjocht wachtrige hat sammele foar it). As jo ​​​​ynteressearre binne yn hoe't ik dit probleem oplost, skriuw dan nei my en wy sille dit probleem beprate. Kontakten oan 'e ein fan it artikel. Litte wy trochgean nei de stap fan it tafoegjen fan in prosesgroep.

By it debuggen fan it skript kaam ik in eigenaardichheid tsjin dat de lêste ferzje fan stream net altyd ophelle wurdt, dus ik advisearje dizze ferzje earst te kontrolearjen:

nipyapi.versioning.get_latest_flow_ver

Deploy proses groep:

nipyapi.versioning.deploy_flow_version

Wy begjinne de processors:

nipyapi.canvas.schedule_process_group

Yn it blok oer CLI waard skreaun dat gegevensferfier net automatysk ynskeakele is yn 'e prosesgroep op ôfstân? By it útfieren fan it skript kaam ik dit probleem ek tsjin. Op dat stuit koe ik de gegevensferfier net begjinne mei de API en ik besleat om te skriuwen oan de ûntwikkelder fan 'e NiPyAPI-bibleteek en freegje om advys / help. De ûntwikkelder reagearre op my, wy bepraten it probleem en hy skreau dat hy tiid nedich hie om "wat te kontrolearjen". En dan, in pear dagen letter, komt der in brief wêryn in funksje yn Python skreaun is dy't myn lansearprobleem oplost!!! Op dat stuit wie de NiPyAPI-ferzje 0.13.3 en, fansels, wie d'r neat as dat. Mar yn ferzje 0.14.0, dy't frij koartlyn útbrocht waard, wie dizze funksje al opnommen yn 'e bibleteek. Moetsje,

nipyapi.canvas.set_remote_process_group_transmission

Dat, mei help fan de NiPyAPI-bibleteek, ferbûnen wy it register, rôle stream út, en begon sels prosessoren en gegevensferfier. Dan kinne jo de koade kammen, alle soarten kontrôles tafoegje, loggje, en dat is alles. Mar dat is in folslein oar ferhaal.

Fan 'e automatisearringsopsjes dy't ik beskôge, like de lêste my it effisjinter. As earste is dit noch python-koade, wêryn jo helpprogramma-koade kinne ynbêde en profitearje fan alle foardielen fan 'e programmeartaal. Twadder is it NiPyAPI-projekt aktyf ûntwikkele en yn gefal fan problemen kinne jo skriuwe nei de ûntwikkelder. Tredde is NiPyAPI noch in fleksibeler ark foar ynteraksje mei NiFi by it oplossen fan komplekse problemen. Bygelyks by it bepalen oft de berjochtenwachtrige no leech binne yn 'e stream en oft de prosesgroep bywurke wurde kin.

Da's alles. Ik beskreau 3 oanpakken foar it automatisearjen fan streamferliening yn NiFi, falkûlen dy't in ûntwikkelder kin tsjinkomme, en levere wurkkoade foar it automatisearjen fan levering. As jo ​​​​sa ynteressearre binne yn dit ûnderwerp as ik bin - skriuwe!

Boarne: www.habr.com

Add a comment