Hello everyone!
De taak is as folget - d'r is in stream, presintearre yn 'e ôfbylding hjirboppe, dy't moat wurde útrôle nei N servers mei
NiFi Site to Site (S2S) is in feilige, maklik ynstelbere manier om gegevens oer te dragen tusken NiFi-eksimplaren. Hoe S2S wurket, sjoch
Yn gefallen dêr't wy prate oer gegevens oerdracht mei help fan S2S, ien eksimplaar hjit client, de twadde tsjinner. De kliïnt stjoert gegevens, de tsjinner ûntfangt. Twa manieren om gegevensferfier tusken har te konfigurearjen:
- Triuwe. Fanút de kliïnteksimplaar wurde gegevens ferstjoerd mei in Remote Process Group (RPG). Op de servereksimplaar wurde gegevens ûntfongen mei de ynfierpoarte
- Lûke. De tsjinner ûntfangt gegevens mei help fan RPG, de kliïnt stjoert mei help fan Output haven.
Flow foar útrol wurdt opslein yn Apache Registry.
Apache NiFi Registry is in subprojekt fan Apache NiFi dat in ark leveret foar streamopslach en ferzjekontrôle. In soarte fan GIT. Ynformaasje oer it ynstallearjen, konfigurearjen en wurkjen mei it register is te finen yn
By it begjin, as N in lyts oantal is, wurdt stream levere en manuell bywurke yn in akseptabele tiid.
Mar as N groeit, wurde de problemen mear:
- it nimt mear tiid om de stream te aktualisearjen. Jo moatte oanmelde by alle servers
- Flaters by it bywurkjen fan sjabloanen komme foar. Hjir hawwe se it bywurke, mar hjir binne se fergetten
- minsklike flaters by it útfieren fan in grut oantal ferlykbere operaasjes
Dit alles bringt ús ta it feit dat wy it proses moatte automatisearje. Ik besocht de folgjende manieren om dit probleem op te lossen:
- Brûk MiNiFi ynstee fan NiFi
- NiFi CLI
- NiPyAPI
Mei help fan MiNiFi
In oar subprojekt sil helpe om dit probleem op te lossen - MiNiFi C2 Server. Dit produkt is bedoeld om it sintrale punt te wêzen yn 'e konfiguraasje-útrol-arsjitektuer. Hoe de omjouwing te konfigurearjen - beskreaun yn
De opsje beskreaun yn it boppesteande artikel wurket en is net dreech om te fieren, mar wy moatte it folgjende net ferjitte:
- Minifi hat net alle processors fan nifi
- Ferzjes fan Minifi-prosessor bliuwe efter NiFi-prosessorferzjes.
Op it stuit fan dit skriuwen is de lêste ferzje fan NiFi 1.9.2. De lêste ferzje fan MiNiFi-prosessor is 1.7.0. Prozessoren kinne wurde tafoege oan MiNiFi, mar fanwege ferzje ferskillen tusken NiFi en MiNiFi processors, dit kin net wurkje.
NiFi CLI
Oardielje op
Zapuskaem nut
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Om ús de fereaske stream út it register te laden, moatte wy de identifiers fan 'e emmer (bucket identifier) en de stream sels (flow identifier) kenne. Dizze gegevens kinne wurde krigen fia de cli of yn 'e NiFi-registraasje-webynterface. Yn 'e webynterface sjocht it der sa út:
Mei de CLI wurdt dit dien:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Wy begjinne it ymportearjen fan prosesgroep út it register:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
In wichtich punt is dat elke nifi-eksimplaar kin wurde oantsjutte as de host wêryn wy de prosesgroep rôlje.
Prosesgroep tafoege mei stoppe processors, se moatte wurde begon
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Geweldich, de processors binne begon. Neffens de betingsten fan 'e taak hawwe wy lykwols NiFi-eksimplaren nedich om gegevens nei oare eksimplaren te stjoeren. Litte wy oannimme dat jo de Push-metoade hawwe keazen om gegevens oer te bringen nei de tsjinner. Om gegevensoerdracht te organisearjen, moatte jo gegevensoerdracht ynskeakelje op 'e tafoege Remote Process Group (RPG), dy't al is opnommen yn ús stream.
Yn 'e dokumintaasje yn' e CLI en oare boarnen haw ik gjin manier fûn om gegevensferfier yn te skeakeljen. As jo witte hoe't jo dit dwaan, skriuw dan yn 'e opmerkings.
Om't wy bash hawwe en wy binne ree om nei it ein te gean, sille wy in útwei fine! Jo kinne de NiFi API brûke om dit probleem op te lossen. Litte wy de folgjende metoade brûke, nim de ID fan 'e foarbylden hjirboppe (yn ús gefal is it 7f522a13-016e-1000-e504-d5b15587f2f3). Beskriuwing fan NiFi API metoaden
Yn it lichem moatte jo JSON trochjaan, lykas dit:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameters dy't moatte wurde ynfolle foar it te wurkjen:
steat - gegevens oerdracht status. Beskikber: TRANSMITTING om gegevensoerdracht yn te skeakeljen, STOPPE om út te skeakeljen
ferzje - prosessor ferzje
ferzje sil standert oan 0 as oanmakke, mar dizze parameters kinne wurde krigen mei help fan de metoade
Foar fans fan bash-skripts kin dizze metoade geskikt lykje, mar it is in bytsje lestich foar my - bash-skripts binne net myn favoryt. De folgjende metoade is yn myn miening ynteressanter en handiger.
NiPyAPI
NiPyAPI is in Python-bibleteek foar ynteraksje mei NiFi-eksimplaren.
Us skript foar it útroljen fan de konfiguraasje is in programma yn Python. Litte wy trochgean nei kodearring.
Wy sette konfiguraasjes foar fierder wurk. Wy sille de folgjende parameters nedich wêze:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Folgjende sil ik de nammen fan 'e metoaden fan dizze bibleteek ynfoegje, dy't beskreaun binne
Ferbine it register mei de nifi-eksimplaar mei
nipyapi.versioning.create_registry_client
Op dizze stap kinne jo ek in kontrôle tafoegje dat it register al oan 'e eksimplaar is tafoege. Hjirfoar kinne jo de metoade brûke
nipyapi.versioning.list_registry_clients
Wy fine de bak foar fierder sykjen nei stream yn 'e koer
nipyapi.versioning.get_registry_bucket
Mei de fûnemmer sykje wy nei stream
nipyapi.versioning.get_flow_in_bucket
Dêrnei is it wichtich om te begripen oft dizze prosesgroep al tafoege is. De groep Proses wurdt pleatst neffens koördinaten en der kin in situaasje ûntstean as in twadde komponint boppe op ien wurdt lein. Ik kontrolearre, dit kin barre :) Om alle tafoege proses groepen te krijen brûke wy de metoade
nipyapi.canvas.list_all_process_groups
Wy kinne fierder sykje, bygelyks op namme.
Ik sil it proses fan it bywurkjen fan it sjabloan net beskriuwe, ik sil allinich sizze dat as processors wurde tafoege yn 'e nije ferzje fan' e sjabloan, dan binne d'r gjin problemen mei de oanwêzigens fan berjochten yn 'e wachtrijen. Mar as processors wurde fuortsmiten, dan kinne problemen ûntstean (nifi lit jo net fuortsmite in prosessor as in berjocht wachtrige hat sammele foar it). As jo ynteressearre binne yn hoe't ik dit probleem oplost, skriuw dan nei my en wy sille dit probleem beprate. Kontakten oan 'e ein fan it artikel. Litte wy trochgean nei de stap fan it tafoegjen fan in prosesgroep.
By it debuggen fan it skript kaam ik in eigenaardichheid tsjin dat de lêste ferzje fan stream net altyd ophelle wurdt, dus ik advisearje dizze ferzje earst te kontrolearjen:
nipyapi.versioning.get_latest_flow_ver
Deploy proses groep:
nipyapi.versioning.deploy_flow_version
Wy begjinne de processors:
nipyapi.canvas.schedule_process_group
Yn it blok oer CLI waard skreaun dat gegevensferfier net automatysk ynskeakele is yn 'e prosesgroep op ôfstân? By it útfieren fan it skript kaam ik dit probleem ek tsjin. Op dat stuit koe ik de gegevensferfier net begjinne mei de API en ik besleat om te skriuwen oan de ûntwikkelder fan 'e NiPyAPI-bibleteek en freegje om advys / help. De ûntwikkelder reagearre op my, wy bepraten it probleem en hy skreau dat hy tiid nedich hie om "wat te kontrolearjen". En dan, in pear dagen letter, komt der in brief wêryn in funksje yn Python skreaun is dy't myn lansearprobleem oplost!!! Op dat stuit wie de NiPyAPI-ferzje 0.13.3 en, fansels, wie d'r neat as dat. Mar yn ferzje 0.14.0, dy't frij koartlyn útbrocht waard, wie dizze funksje al opnommen yn 'e bibleteek. Moetsje,
nipyapi.canvas.set_remote_process_group_transmission
Dat, mei help fan de NiPyAPI-bibleteek, ferbûnen wy it register, rôle stream út, en begon sels prosessoren en gegevensferfier. Dan kinne jo de koade kammen, alle soarten kontrôles tafoegje, loggje, en dat is alles. Mar dat is in folslein oar ferhaal.
Fan 'e automatisearringsopsjes dy't ik beskôge, like de lêste my it effisjinter. As earste is dit noch python-koade, wêryn jo helpprogramma-koade kinne ynbêde en profitearje fan alle foardielen fan 'e programmeartaal. Twadder is it NiPyAPI-projekt aktyf ûntwikkele en yn gefal fan problemen kinne jo skriuwe nei de ûntwikkelder. Tredde is NiPyAPI noch in fleksibeler ark foar ynteraksje mei NiFi by it oplossen fan komplekse problemen. Bygelyks by it bepalen oft de berjochtenwachtrige no leech binne yn 'e stream en oft de prosesgroep bywurke wurde kin.
Da's alles. Ik beskreau 3 oanpakken foar it automatisearjen fan streamferliening yn NiFi, falkûlen dy't in ûntwikkelder kin tsjinkomme, en levere wurkkoade foar it automatisearjen fan levering. As jo sa ynteressearre binne yn dit ûnderwerp as ik bin -
Boarne: www.habr.com