Flow Liwwerung Automatioun an Apache NiFi

Hallo jiddereen!

Flow Liwwerung Automatioun an Apache NiFi

D'Aufgab ass wéi follegt - et gëtt e Flow am Bild hei uewen gewisen, dee muss op N Server gerullt ginn Apache NiFi. Flowtest - eng Datei gëtt generéiert an an eng aner NiFi Instanz geschéckt. Datentransfer geschitt mam NiFi Site zu Site Protokoll.

NiFi Site to Site (S2S) ass e sécheren, héich personaliséierbare Wee fir Daten tëscht NiFi Instanzen ze transferéieren. Gesinn wéi S2S Wierker Dokumentatioun an et ass wichteg ze erënneren fir Är NiFi Instanz opzestellen fir S2S ze gesinn hei.

Wann et ëm Datenübertragung mat S2S kënnt, gëtt eng Instanz e Client genannt, déi zweet ass e Server. De Client schéckt Daten, de Server kritt se. Zwee Weeër fir Datentransfer tëscht hinnen opzestellen:

  1. Push. D'Donnéeë gi vun der Clientinstanz mat enger Remote Process Group (RPG) geschéckt. Op der Serverinstanz ginn Daten mam Input Port kritt
  2. Pull. De Server kritt Daten mam RPG, de Client schéckt mam Output Hafen.


Flow fir Rolling gëtt am Apache Registry gespäichert.

Apache NiFi Registry ass en Ënnerprojet vun Apache NiFi deen e Flowspeicher a Versiounsinstrument ubitt. Eng Zort GIT. Informatioun iwwer d'Installatioun, d'Konfiguratioun an d'Aarbecht mat der Registry fannt Dir an offiziell Dokumentatioun. Flow fir d'Späichere gëtt an eng Prozessgrupp kombinéiert an am Registry an dëser Form gespäichert. Mir kommen op dëst méi spéit am Artikel zréck.

Am Ufank, wann N eng kleng Zuel ass, gëtt de Flux geliwwert an aktualiséiert mat der Hand an enger raisonnabel Zäit.

Awer wéi N wiisst, ginn et méi Probleemer:

  1. et brauch méi Zäit fir de Flux ze aktualiséieren. Dir musst op all Server goen
  2. et gi Feeler beim Update vun Templates. Hei hu se aktualiséiert, awer hei hu se vergiess
  3. mënschleche Feeler wann Dir eng grouss Zuel vun ähnlechen Operatiounen ausféiert

All dëst bréngt eis op d'Tatsaach datt et néideg ass de Prozess ze automatiséieren. Ech hunn déi folgend Weeër probéiert fir dëse Problem ze léisen:

  1. Benotzt MiNiFi amplaz NiFi
  2. NiFi CLI
  3. NiPyAPI

MiNiFi benotzen

ApacheMiNify ass en Ënnerprojet vun Apache NiFi. MiNiFy ass e kompakten Agent deen déiselwecht Prozessoren wéi NiFi benotzt, wat Iech erlaabt dee selwechte Flow ze kreéieren wéi an NiFi. D'Liichtegkeet vum Agent gëtt erreecht, ënner anerem, wéinst der Tatsaach, datt MiNiFy keng grafesch Interface fir d'Flowkonfiguratioun huet. Dem MiNiFy säi Mangel un enger graphescher Interface bedeit datt et néideg ass de Problem vun der Flow Liwwerung am Minifi ze léisen. Zënter MiNiFy aktiv am IOT benotzt gëtt, ginn et vill Komponenten an de Prozess fir de Flux op endgülteg Minifi Instanzen ze liwweren muss automatiséiert ginn. Eng vertraute Aufgab, oder?

En aneren Ënnerprojet, MiNiFi C2 Server, hëlleft dëse Problem ze léisen. Dëst Produkt soll den zentrale Punkt an der Deploymentarchitektur sinn. Wéi konfiguréiert d'Ëmwelt - beschriwwen an dësen Artikel op Habré an d'Informatioun geet duer fir de Problem ze léisen. MiNiFi a Verbindung mam C2 Server aktualiséiert automatesch seng Konfiguratioun. Deen eenzegen Nodeel vun dëser Approche ass datt Dir Templates um C2 Server erstellen musst, en einfachen Engagement fir d'Registry ass net genuch.

D'Optioun, déi am Artikel hei uewen beschriwwe gëtt, funktionnéiert an net schwéier ze implementéieren, awer mir däerfen déi folgend net vergiessen:

  1. minifi huet net all Prozessoren vun nifi
  2. CPU Versiounen am Minifi lag hannert CPU Versiounen an NiFi.

Zu der Zäit vum Schreiwen ass déi lescht Versioun vum NiFi 1.9.2. D'Prozessor Versioun vun der leschter MiNiFi Versioun ass 1.7.0. Prozessoren kënnen op MiNiFi bäigefüügt ginn, awer wéinst Versiounsdiskrepanzen tëscht NiFi a MiNiFi Prozessoren kann dëst net funktionnéieren.

NiFi CLI

Gemengt Beschreiwung Tool op der offizieller Websäit, dëst ass en Tool fir d'Interaktioun tëscht NiFI an NiFi Registry am Beräich vun der Flow Liwwerung oder Prozessmanagement ze automatiséieren. Luet dëst Tool erof fir unzefänken. vun hei.

Run den Utility

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Fir datt mir den néidege Flux aus der Registry lueden, musse mir d'Identifiséierer vum Kuerf (Eemer Identifizéierer) an de Flux selwer (Flow Identifizéierer) kennen. Dës Donnéeë kënnen entweder duerch d'cli oder an der NiFi Registry Web Interface kritt ginn. De Webinterface gesäit esou aus:

Flow Liwwerung Automatioun an Apache NiFi

Mat der CLI maacht Dir dëst:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Run Import Prozess Grupp aus Registry:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

E wichtege Punkt ass datt all Nifi Instanz kann als Host spezifizéiert ginn op deem mir de Prozessgrupp rullen.

Prozess Grupp dobäi mat gestoppt Prozessoren, si mussen ugefaangen ginn

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Super, d'Prozessoren hunn ugefaang. Wéi och ëmmer, laut de Konditioune vum Problem, brauche mir NiFi Instanzen fir Daten an aner Instanzen ze schécken. Loosst eis unhuelen datt d'Push Method gewielt gouf fir Daten op de Server ze transferéieren. Fir den Datenübertragung z'organiséieren, ass et néideg den Datenübertragung z'aktivéieren (Sendung aktivéieren) op der zousätzlech Remote Process Group (RPG), déi schonn an eisem Flow abegraff ass.

Flow Liwwerung Automatioun an Apache NiFi

An der Dokumentatioun am CLI an aner Quellen hunn ech kee Wee fonnt fir den Datenübertragung z'erméiglechen. Wann Dir wësst wéi Dir dëst maacht, schreift w.e.g. an de Kommentarer.

Well mir Bash hunn a mir prett sinn bis zum Schluss ze goen, fanne mir e Wee eraus! Dir kënnt den NiFi API benotzen fir dëse Problem ze léisen. Loosst eis déi folgend Method benotzen, mir huelen d'ID aus de Beispiller hei uewen (an eisem Fall ass et 7f522a13-016e-1000-e504-d5b15587f2f3). Beschreiwung vun NiFi API Methoden hei.

Flow Liwwerung Automatioun an Apache NiFi
Am Kierper musst Dir JSON passéieren, vun der folgender Form:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameteren déi musse gefëllt ginn fir ze "schaffen":
Staat - Daten Transfert Status. Verfügbar TRANSMITTING fir den Datenübertragung z'aktivéieren, STOPPED fir auszeschalten
Versioun - Prozessor Versioun

Versioun gëtt Standard op 0 wann geschaf, mä dës Parameteren kann mat der Method kritt ginn

Flow Liwwerung Automatioun an Apache NiFi

Fir Liebhaber vu Bash Scripten, kann dës Method gëeegent schéngen, awer et ass schwéier fir mech - Bash Scripten sinn net meng Liiblings. Deen nächste Wee ass menger Meenung no méi interessant a méi praktesch.

NiPyAPI

NiPyAPI ass eng Python Bibliothéik fir mat NiFi Instanzen ze interagéieren. Dokumentatioun Säit enthält déi néideg Informatioune fir mat der Bibliothéik ze schaffen. Quick Start ass beschriwwen an Projet op github.

Eis Skript fir d'Konfiguratioun auszerollen ass e Python Programm. Loosst eis op d'Kodéierung goen.
Konfiguréieren fir weider Aarbecht. Mir brauchen déi folgend Parameteren:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Weider wäert ech d'Nimm vun de Methoden vun dëser Bibliothéik setzen, déi beschriwwe sinn hei.

Mir verbannen de Registry un d'nifi Instanz benotzt

nipyapi.versioning.create_registry_client

Op dësem Schrëtt kënnt Dir och e Scheck addéieren datt d'Registry schonn an d'Instanz bäigefüügt gouf, dofir kënnt Dir d'Methode benotzen

nipyapi.versioning.list_registry_clients

Mir fannen den Eemer fir weider no de Flux am Kuerf ze sichen

nipyapi.versioning.get_registry_bucket

No der fonnt Eemer, mir sichen fir Flux

nipyapi.versioning.get_flow_in_bucket

Als nächst ass et wichteg ze verstoen ob dës Prozessgrupp scho bäigefüügt gouf. D'Prozessgrupp gëtt duerch Koordinate plazéiert an eng Situatioun kann entstoen wann en zweeten op engem iwwerlagert gëtt. Ech iwwerpréift, et kann 🙂 Ze kréien all dobäi Prozess Grupp, benotzen d'Method

nipyapi.canvas.list_all_process_groups

an da kënne mir zum Beispill mam Numm sichen.

Ech wäert de Prozess vun der Aktualiséierung vun der Schabloun net beschreiwen, ech wäert nëmme soen datt wann Prozessoren an der neier Versioun vun der Schabloun bäigefüügt ginn, da gi keng Probleemer mat der Präsenz vu Messagen an de Schlaangen. Awer wann d'Prozessoren ofgeschaaft ginn, da kënne Probleemer entstoen (nifi erlaabt d'Entfernung vum Prozessor net wann eng Messageschlaang virun him accumuléiert ass). Wann Dir interesséiert sidd wéi ech dëse Problem geléist hunn - schreiwt mir w.e.g., mir wäerten dëse Punkt diskutéieren. Kontakter um Enn vum Artikel. Loosst eis op de Schrëtt weidergoen fir eng Prozessgrupp derbäi ze ginn.

Beim Debugging vum Skript sinn ech op eng Feature begéint datt déi lescht Versioun vum Flow net ëmmer opgezunn ass, also ech recommandéieren Iech fir d'éischt dës Versioun ze klären:

nipyapi.versioning.get_latest_flow_ver

Deploy Prozess Grupp:

nipyapi.versioning.deploy_flow_version

Mir starten d'Prozessoren:

nipyapi.canvas.schedule_process_group

Am Block iwwer CLI gouf geschriwwen datt den Datenübertragung net automatesch an der Remote Prozessgrupp aktivéiert ass? Beim Ëmsetzung vum Skript hunn ech och dëse Problem begéint. Zu där Zäit konnt ech d'Datentransfer net mat der API starten an ech hu beschloss un den Entwéckler vun der NiPyAPI Bibliothéik ze schreiwen an ëm Rot / Hëllef ze froen. Den Entwéckler huet mir geäntwert, mir hunn de Problem diskutéiert an hien huet geschriwwen datt hien Zäit brauch fir "eppes ze kontrolléieren". An elo, e puer Deeg méi spéit, kënnt eng E-Mail an deem eng Python Funktioun geschriwwe gëtt, déi mäi Startupproblem léist !!! Zu där Zäit war d'NiPyAPI Versioun 0.13.3 an natierlech war et näischt vun der Aart dran. Awer an der Versioun 0.14.0, déi zimlech viru kuerzem verëffentlecht gouf, ass dës Funktioun schonn an der Bibliothéik abegraff. Treffen

nipyapi.canvas.set_remote_process_group_transmission

Also, mat der Hëllef vun der NiPyAPI Bibliothéik hu mir d'Registry verbonnen, de Flow opgerullt, a souguer d'Prozessoren an d'Datentransfer ugefaang. Da kënnt Dir de Code kämmen, all Zorte vu Schecken derbäisetzen, aloggen, an dat ass et. Mä dat ass eng ganz aner Geschicht.

Vun den Automatisatiounsoptiounen, déi ech betruecht hunn, war déi lescht fir mech am effizientesten. Als éischt ass dëst nach ëmmer Python-Code, an deem Dir Hëllefsprogrammcode kënnt abedecken an all d'Virdeeler vun enger Programméierungssprooch genéissen. Zweetens ass den NiPyAPI Projet aktiv entwéckelt an am Fall vu Probleemer kënnt Dir un den Entwéckler schreiwen. Drëttens ass NiPyAPI nach ëmmer e méi flexibelt Tool fir mat NiFi ze interagéieren fir komplex Probleemer ze léisen. Zum Beispill, fir ze bestëmmen ob d'Messageschlaangen am Moment eidel am Flux sinn an ob et méiglech ass de Prozessgrupp ze aktualiséieren.

Dat ass alles. Ech beschriwwen 3 Approche fir d'Automatiséierung vun der Flow-Liwwerung an NiFi, d'Fallen déi en Entwéckler begéine kann an en Aarbechtscode fir d'Liwwerung automatiséieren. Wann Dir grad esou un dësem Thema interesséiert sidd wéi ech - schreiwen!

Source: will.com

Setzt e Commentaire