Hallo jiddereen!
D'Aufgab ass wéi follegt - et gëtt e Flow am Bild hei uewen gewisen, dee muss op N Server gerullt ginn
NiFi Site to Site (S2S) ass e sécheren, héich personaliséierbare Wee fir Daten tëscht NiFi Instanzen ze transferéieren. Gesinn wéi S2S Wierker
Wann et ëm Datenübertragung mat S2S kënnt, gëtt eng Instanz e Client genannt, déi zweet ass e Server. De Client schéckt Daten, de Server kritt se. Zwee Weeër fir Datentransfer tëscht hinnen opzestellen:
- Push. D'Donnéeë gi vun der Clientinstanz mat enger Remote Process Group (RPG) geschéckt. Op der Serverinstanz ginn Daten mam Input Port kritt
- Pull. De Server kritt Daten mam RPG, de Client schéckt mam Output Hafen.
Flow fir Rolling gëtt am Apache Registry gespäichert.
Apache NiFi Registry ass en Ënnerprojet vun Apache NiFi deen e Flowspeicher a Versiounsinstrument ubitt. Eng Zort GIT. Informatioun iwwer d'Installatioun, d'Konfiguratioun an d'Aarbecht mat der Registry fannt Dir an
Am Ufank, wann N eng kleng Zuel ass, gëtt de Flux geliwwert an aktualiséiert mat der Hand an enger raisonnabel Zäit.
Awer wéi N wiisst, ginn et méi Probleemer:
- et brauch méi Zäit fir de Flux ze aktualiséieren. Dir musst op all Server goen
- et gi Feeler beim Update vun Templates. Hei hu se aktualiséiert, awer hei hu se vergiess
- mënschleche Feeler wann Dir eng grouss Zuel vun ähnlechen Operatiounen ausféiert
All dëst bréngt eis op d'Tatsaach datt et néideg ass de Prozess ze automatiséieren. Ech hunn déi folgend Weeër probéiert fir dëse Problem ze léisen:
- Benotzt MiNiFi amplaz NiFi
- NiFi CLI
- NiPyAPI
MiNiFi benotzen
En aneren Ënnerprojet, MiNiFi C2 Server, hëlleft dëse Problem ze léisen. Dëst Produkt soll den zentrale Punkt an der Deploymentarchitektur sinn. Wéi konfiguréiert d'Ëmwelt - beschriwwen an
D'Optioun, déi am Artikel hei uewen beschriwwe gëtt, funktionnéiert an net schwéier ze implementéieren, awer mir däerfen déi folgend net vergiessen:
- minifi huet net all Prozessoren vun nifi
- CPU Versiounen am Minifi lag hannert CPU Versiounen an NiFi.
Zu der Zäit vum Schreiwen ass déi lescht Versioun vum NiFi 1.9.2. D'Prozessor Versioun vun der leschter MiNiFi Versioun ass 1.7.0. Prozessoren kënnen op MiNiFi bäigefüügt ginn, awer wéinst Versiounsdiskrepanzen tëscht NiFi a MiNiFi Prozessoren kann dëst net funktionnéieren.
NiFi CLI
Gemengt
Run den Utility
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Fir datt mir den néidege Flux aus der Registry lueden, musse mir d'Identifiséierer vum Kuerf (Eemer Identifizéierer) an de Flux selwer (Flow Identifizéierer) kennen. Dës Donnéeë kënnen entweder duerch d'cli oder an der NiFi Registry Web Interface kritt ginn. De Webinterface gesäit esou aus:
Mat der CLI maacht Dir dëst:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Run Import Prozess Grupp aus Registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
E wichtege Punkt ass datt all Nifi Instanz kann als Host spezifizéiert ginn op deem mir de Prozessgrupp rullen.
Prozess Grupp dobäi mat gestoppt Prozessoren, si mussen ugefaangen ginn
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, d'Prozessoren hunn ugefaang. Wéi och ëmmer, laut de Konditioune vum Problem, brauche mir NiFi Instanzen fir Daten an aner Instanzen ze schécken. Loosst eis unhuelen datt d'Push Method gewielt gouf fir Daten op de Server ze transferéieren. Fir den Datenübertragung z'organiséieren, ass et néideg den Datenübertragung z'aktivéieren (Sendung aktivéieren) op der zousätzlech Remote Process Group (RPG), déi schonn an eisem Flow abegraff ass.
An der Dokumentatioun am CLI an aner Quellen hunn ech kee Wee fonnt fir den Datenübertragung z'erméiglechen. Wann Dir wësst wéi Dir dëst maacht, schreift w.e.g. an de Kommentarer.
Well mir Bash hunn a mir prett sinn bis zum Schluss ze goen, fanne mir e Wee eraus! Dir kënnt den NiFi API benotzen fir dëse Problem ze léisen. Loosst eis déi folgend Method benotzen, mir huelen d'ID aus de Beispiller hei uewen (an eisem Fall ass et 7f522a13-016e-1000-e504-d5b15587f2f3). Beschreiwung vun NiFi API Methoden
Am Kierper musst Dir JSON passéieren, vun der folgender Form:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameteren déi musse gefëllt ginn fir ze "schaffen":
Staat - Daten Transfert Status. Verfügbar TRANSMITTING fir den Datenübertragung z'aktivéieren, STOPPED fir auszeschalten
Versioun - Prozessor Versioun
Versioun gëtt Standard op 0 wann geschaf, mä dës Parameteren kann mat der Method kritt ginn
Fir Liebhaber vu Bash Scripten, kann dës Method gëeegent schéngen, awer et ass schwéier fir mech - Bash Scripten sinn net meng Liiblings. Deen nächste Wee ass menger Meenung no méi interessant a méi praktesch.
NiPyAPI
NiPyAPI ass eng Python Bibliothéik fir mat NiFi Instanzen ze interagéieren.
Eis Skript fir d'Konfiguratioun auszerollen ass e Python Programm. Loosst eis op d'Kodéierung goen.
Konfiguréieren fir weider Aarbecht. Mir brauchen déi folgend Parameteren:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Weider wäert ech d'Nimm vun de Methoden vun dëser Bibliothéik setzen, déi beschriwwe sinn
Mir verbannen de Registry un d'nifi Instanz benotzt
nipyapi.versioning.create_registry_client
Op dësem Schrëtt kënnt Dir och e Scheck addéieren datt d'Registry schonn an d'Instanz bäigefüügt gouf, dofir kënnt Dir d'Methode benotzen
nipyapi.versioning.list_registry_clients
Mir fannen den Eemer fir weider no de Flux am Kuerf ze sichen
nipyapi.versioning.get_registry_bucket
No der fonnt Eemer, mir sichen fir Flux
nipyapi.versioning.get_flow_in_bucket
Als nächst ass et wichteg ze verstoen ob dës Prozessgrupp scho bäigefüügt gouf. D'Prozessgrupp gëtt duerch Koordinate plazéiert an eng Situatioun kann entstoen wann en zweeten op engem iwwerlagert gëtt. Ech iwwerpréift, et kann 🙂 Ze kréien all dobäi Prozess Grupp, benotzen d'Method
nipyapi.canvas.list_all_process_groups
an da kënne mir zum Beispill mam Numm sichen.
Ech wäert de Prozess vun der Aktualiséierung vun der Schabloun net beschreiwen, ech wäert nëmme soen datt wann Prozessoren an der neier Versioun vun der Schabloun bäigefüügt ginn, da gi keng Probleemer mat der Präsenz vu Messagen an de Schlaangen. Awer wann d'Prozessoren ofgeschaaft ginn, da kënne Probleemer entstoen (nifi erlaabt d'Entfernung vum Prozessor net wann eng Messageschlaang virun him accumuléiert ass). Wann Dir interesséiert sidd wéi ech dëse Problem geléist hunn - schreiwt mir w.e.g., mir wäerten dëse Punkt diskutéieren. Kontakter um Enn vum Artikel. Loosst eis op de Schrëtt weidergoen fir eng Prozessgrupp derbäi ze ginn.
Beim Debugging vum Skript sinn ech op eng Feature begéint datt déi lescht Versioun vum Flow net ëmmer opgezunn ass, also ech recommandéieren Iech fir d'éischt dës Versioun ze klären:
nipyapi.versioning.get_latest_flow_ver
Deploy Prozess Grupp:
nipyapi.versioning.deploy_flow_version
Mir starten d'Prozessoren:
nipyapi.canvas.schedule_process_group
Am Block iwwer CLI gouf geschriwwen datt den Datenübertragung net automatesch an der Remote Prozessgrupp aktivéiert ass? Beim Ëmsetzung vum Skript hunn ech och dëse Problem begéint. Zu där Zäit konnt ech d'Datentransfer net mat der API starten an ech hu beschloss un den Entwéckler vun der NiPyAPI Bibliothéik ze schreiwen an ëm Rot / Hëllef ze froen. Den Entwéckler huet mir geäntwert, mir hunn de Problem diskutéiert an hien huet geschriwwen datt hien Zäit brauch fir "eppes ze kontrolléieren". An elo, e puer Deeg méi spéit, kënnt eng E-Mail an deem eng Python Funktioun geschriwwe gëtt, déi mäi Startupproblem léist !!! Zu där Zäit war d'NiPyAPI Versioun 0.13.3 an natierlech war et näischt vun der Aart dran. Awer an der Versioun 0.14.0, déi zimlech viru kuerzem verëffentlecht gouf, ass dës Funktioun schonn an der Bibliothéik abegraff. Treffen
nipyapi.canvas.set_remote_process_group_transmission
Also, mat der Hëllef vun der NiPyAPI Bibliothéik hu mir d'Registry verbonnen, de Flow opgerullt, a souguer d'Prozessoren an d'Datentransfer ugefaang. Da kënnt Dir de Code kämmen, all Zorte vu Schecken derbäisetzen, aloggen, an dat ass et. Mä dat ass eng ganz aner Geschicht.
Vun den Automatisatiounsoptiounen, déi ech betruecht hunn, war déi lescht fir mech am effizientesten. Als éischt ass dëst nach ëmmer Python-Code, an deem Dir Hëllefsprogrammcode kënnt abedecken an all d'Virdeeler vun enger Programméierungssprooch genéissen. Zweetens ass den NiPyAPI Projet aktiv entwéckelt an am Fall vu Probleemer kënnt Dir un den Entwéckler schreiwen. Drëttens ass NiPyAPI nach ëmmer e méi flexibelt Tool fir mat NiFi ze interagéieren fir komplex Probleemer ze léisen. Zum Beispill, fir ze bestëmmen ob d'Messageschlaangen am Moment eidel am Flux sinn an ob et méiglech ass de Prozessgrupp ze aktualiséieren.
Dat ass alles. Ech beschriwwen 3 Approche fir d'Automatiséierung vun der Flow-Liwwerung an NiFi, d'Fallen déi en Entwéckler begéine kann an en Aarbechtscode fir d'Liwwerung automatiséieren. Wann Dir grad esou un dësem Thema interesséiert sidd wéi ech -
Source: will.com