Tere kõigile!
Ülesanne on järgmine - ülaltoodud pildil on voog, mis tuleb N serverisse välja veeretada
NiFi Site to Site (S2S) on turvaline ja hõlpsasti konfigureeritav viis andmete edastamiseks NiFi eksemplaride vahel. Kuidas S2S töötab, vaata
Juhtudel, kui me räägime andmeedastusest S2S-i abil, nimetatakse ühte eksemplari kliendiks, teist serveriks. Klient saadab andmeid, server võtab vastu. Nendevahelise andmeedastuse konfigureerimiseks on kaks võimalust:
- Lükkama. Kliendieksemplarist saadetakse andmed kaugprotsessirühma (RPG) abil. Serveri eksemplaris võetakse andmeid vastu sisendpordi kaudu
- Tõmmata. Server võtab andmeid vastu RPG abil, klient saadab väljundpordi kaudu.
Käivitamiseks mõeldud voog salvestatakse Apache registrisse.
Apache NiFi register on Apache NiFi alamprojekt, mis pakub voosalvestuse ja versioonikontrolli tööriista. Omamoodi GIT. Teavet registri installimise, konfigureerimise ja sellega töötamise kohta leiate aadressilt
Alguses, kui N on väike arv, edastatakse voog ja värskendatakse seda käsitsi vastuvõetava aja jooksul.
Kuid kui N kasvab, muutuvad probleemid üha arvukamaks:
- voo värskendamine võtab rohkem aega. Peate kõikidesse serveritesse sisse logima
- Malli värskendamisel ilmnevad vead. Siin nad värskendasid seda, kuid siin nad unustasid
- inimlikud vead suure hulga sarnaste toimingute tegemisel
Kõik see viib meid tõsiasjani, et peame protsessi automatiseerima. Proovisin selle probleemi lahendamiseks järgmisi viise:
- Kasutage NiFi asemel MiNiFi
- NiFi CLI
- NiPyAPI
MiNiFi kasutamine
Teine alamprojekt aitab seda probleemi lahendada - MiNiFi C2 Server. See toode on mõeldud konfiguratsiooni levitamise arhitektuuri keskseks punktiks. Kuidas keskkonda konfigureerida - kirjeldatud artiklis
Ülaltoodud artiklis kirjeldatud valik töötab ja seda pole keeruline rakendada, kuid me ei tohi unustada järgmist:
- Minifil pole kõiki nifi protsessoreid
- Minifi protsessori versioonid jäävad NiFi protsessori versioonidest maha.
Selle artikli kirjutamise ajal oli NiFi uusim versioon 1.9.2. Uusim MiNiFi protsessori versioon on 1.7.0. Protsessoreid saab MiNiFi-le lisada, kuid NiFi ja MiNiFi protsessorite versioonierinevuste tõttu ei pruugi see toimida.
NiFi CLI
Otsustades
Käivitage utiliit
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Selleks, et saaksime registrist vajaliku voo laadida, peame teadma ämbri (ämbri identifikaator) ja voo enda (voo identifikaator) identifikaatoreid. Neid andmeid saab hankida kas klii kaudu või NiFi registri veebiliidese kaudu. Veebiliideses näeb see välja järgmine:
CLI abil tehakse seda:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Alustame protsessirühma importimist registrist:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Oluline punkt on see, et iga nifi eksemplari saab määrata hostiks, kuhu protsessirühma veeretame.
Peatatud protsessoritega protsessigrupp on lisatud, need tuleb käivitada
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Suurepärane, protsessorid on käivitunud. Kuid vastavalt ülesande tingimustele vajame NiFi eksemplare andmete saatmiseks teistele eksemplaridele. Oletame, et olete andmete serverisse edastamiseks valinud Push-meetodi. Andmeedastuse korraldamiseks peate lubama andmeedastuse lisatud Remote Process Groupis (RPG), mis on meie voos juba kaasatud.
CLI-s ja muudes allikates olevas dokumentatsioonis ei leidnud ma võimalust andmeedastuse võimaldamiseks. Kui teate, kuidas seda teha, kirjutage kommentaaridesse.
Kuna meil on bash ja oleme valmis lõpuni minema, siis leiame väljapääsu! Selle probleemi lahendamiseks saate kasutada NiFi API-d. Kasutame järgmist meetodit, võtke ID ülaltoodud näidetest (meie puhul on see 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API meetodite kirjeldus
Kehas peate JSON-i edastama järgmiselt:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameetrid, mis tuleb selle toimimiseks täita:
riik — andmeedastuse olek. Saadaval: EDASTAMINE andmeedastuse lubamiseks, STOPPED keelamiseks
versioon - protsessori versioon
versioon on loomisel vaikimisi 0, kuid need parameetrid saab selle meetodi abil hankida
Bash-skriptide austajatele võib see meetod tunduda sobiv, kuid minu jaoks on see pisut keeruline - bash-skriptid pole minu lemmikud. Järgmine meetod on minu arvates huvitavam ja mugavam.
NiPyAPI
NiPyAPI on Pythoni teek NiFi eksemplaridega suhtlemiseks.
Meie konfiguratsiooni käivitamise skript on Pythoni programm. Liigume edasi kodeerimise juurde.
Seadistasime konfiguratsioonid edasiseks tööks. Vajame järgmisi parameetreid:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Järgmisena lisan selle teegi meetodite nimed, mida on kirjeldatud
Ühendage register nifi eksemplariga kasutades
nipyapi.versioning.create_registry_client
Selles etapis saate lisada ka kontrolli, kas register on eksemplarile juba lisatud; selleks saate kasutada meetodit
nipyapi.versioning.list_registry_clients
Leiame ämbri korvis oleva voolu edasiseks otsimiseks
nipyapi.versioning.get_registry_bucket
Kasutades leitud ämbrit, otsime voolu
nipyapi.versioning.get_flow_in_bucket
Järgmiseks on oluline mõista, kas see protsessirühm on juba lisatud. Protsessi grupp paigutatakse koordinaatide järgi ja võib tekkida olukord, kui ühe peale asetatakse teine komponent. Kontrollisin, see võib juhtuda :) Kõigi lisatud protsessirühmade hankimiseks kasutame meetodit
nipyapi.canvas.list_all_process_groups
Saame edasi otsida näiteks nime järgi.
Ma ei kirjelda malli värskendamise protsessi, ütlen ainult, et kui malli uude versiooni lisatakse protsessorid, siis pole probleeme sõnumite olemasoluga järjekordades. Kui aga protsessorid eemaldada, siis võivad tekkida probleemid (nifi ei luba protsessorit eemaldada, kui selle ette on kogunenud sõnumijärjekord). Kui olete huvitatud sellest, kuidas ma selle probleemi lahendasin, kirjutage mulle ja me arutame seda küsimust. Kontaktid artikli lõpus. Liigume edasi protsessirühma lisamise sammu juurde.
Skripti silumisel puutusin kokku eripäraga, et voo uusimat versiooni ei tõmmata alati üles, seega soovitan esmalt seda versiooni kontrollida:
nipyapi.versioning.get_latest_flow_ver
Protsessirühma juurutamine:
nipyapi.versioning.deploy_flow_version
Käivitame protsessorid:
nipyapi.canvas.schedule_process_group
CLI-teemalises plokis oli kirjas, et andmeedastus pole kaugprotsessigrupis automaatselt lubatud? Skripti rakendamisel puutusin ka selle probleemiga kokku. Sel ajal ei saanud ma API abil andmeedastust alustada ja otsustasin kirjutada NiPyAPI teegi arendajale ja küsida nõu/abi. Arendaja vastas mulle, arutasime probleemi ja ta kirjutas, et vajab aega “millegi kontrollimiseks”. Ja siis paari päeva pärast tuleb kiri, milles on Pythonis kirjutatud funktsioon, mis lahendab minu käivitamisprobleemi!!! Tol ajal oli NiPyAPI versioon 0.13.3 ja loomulikult polnud midagi sellist. Kuid versioonis 0.14.0, mis ilmus üsna hiljuti, oli see funktsioon juba teegis. kohtuda,
nipyapi.canvas.set_remote_process_group_transmission
Nii ühendasime NiPyAPI teeki kasutades registri, käivitasime voo ja isegi alustasime protsessoreid ja andmeedastust. Seejärel saate koodi kammida, lisada igasuguseid kontrolle, logida ja see on kõik. Aga see on hoopis teine lugu.
Kaalutud automatiseerimisvõimalustest tundus viimane mulle kõige tõhusam. Esiteks on see ikkagi pythoni kood, millesse saab põimida abiprogrammi koodi ja kasutada kõiki programmeerimiskeele eeliseid. Teiseks NiPyAPI projekt areneb aktiivselt ja probleemide korral saab arendajale kirjutada. Kolmandaks on NiPyAPI endiselt paindlikum tööriist NiFi-ga suhtlemiseks keerukate probleemide lahendamisel. Näiteks selleks, et teha kindlaks, kas sõnumijärjekorrad on voos nüüd tühjad ja kas protsessirühma saab värskendada.
See on kõik. Kirjeldasin 3 lähenemisviisi voo edastamise automatiseerimiseks NiFi-s, lõkse, millega arendaja võib kokku puutuda, ja andsin töökoodi kohaletoimetamise automatiseerimiseks. Kui olete sellest teemast sama huvitatud kui mina -
Allikas: www.habr.com