Fluxuen entregaren automatizazioa Apache NiFi-n

Hola a todos!

Fluxuen entregaren automatizazioa Apache NiFi-n

Zeregin hau da: fluxu bat dago, goiko irudian aurkezten dena, N zerbitzarietara zabaldu behar dena. Apache NiFi. Fluxuaren proba - fitxategi bat sortzen ari da eta beste NiFi instantzia batera bidaltzen ari da. Datuen transferentzia NiFi Site to Site protokoloa erabiliz egiten da.

NiFi Site to Site (S2S) NiFi instantzien artean datuak transferitzeko modu seguru eta erraz konfiguragarria da. S2S nola funtzionatzen duen, ikusi dokumentazioa eta garrantzitsua da ez ahaztea NiFi instantzia konfiguratzea S2S baimentzeko, ikus Hemen.

S2S erabiliz datuen transferentziari buruz ari garen kasuetan, instantzia bati bezeroa deitzen zaio, bigarren zerbitzariari. Bezeroak datuak bidaltzen ditu, zerbitzariak jasotzen ditu. Bien arteko datu-transferentzia konfiguratzeko bi modu:

  1. Bultza. Bezeroaren instantziatik, datuak Urruneko Prozesu Taldea (RPG) erabiliz bidaltzen dira. Zerbitzariaren instantzian, datuak sarrerako ataka erabiliz jasotzen dira
  2. Pull. Zerbitzariak datuak RPG erabiliz jasotzen ditu, bezeroak Irteera ataka erabiliz bidaltzen ditu.


Abian jartzeko fluxua Apache Erregistroan gordetzen da.

Apache NiFi Registry Apache NiFiren azpiproiektua da, fluxua biltegiratzeko eta bertsioen kontrolarako tresna eskaintzen duena. GIT moduko bat. Erregistroarekin instalatzeari, konfiguratzeari eta lan egiteari buruzko informazioa hemen aurki daiteke dokumentazio ofiziala. Biltegiratzeko fluxua prozesu talde batean konbinatzen da eta inprimaki honetan gordetzen da erregistroan. Honetara itzuliko gara geroago artikuluan.

Hasieran, N zenbaki txikia denean, fluxua eskuz entregatu eta eguneratzen da denbora onargarrian.

Baina N hazi ahala, arazoak ugariagoak dira:

  1. denbora gehiago behar da fluxua eguneratzeko. Zerbitzari guztietan saioa hasi behar duzu
  2. Txantiloiak eguneratzean erroreak gertatzen dira. Hemen eguneratu dute, baina hemen ahaztu egin dira
  3. giza akatsak antzeko eragiketa ugari egitean

Horrek guztiak prozesua automatizatu behar dugula ezagutzera garamatza. Arazo hau konpontzeko modu hauek probatu ditut:

  1. Erabili MiNiFi NiFiren ordez
  2. NiFi CLI
  3. NiPyAPI

MiNiFi erabiliz

Apache MiNiFy - Apache NiFi-ren azpiproiektua. MiNiFy agente trinkoa da, NiFiren prozesadore berdinak erabiltzen dituena, NiFiren fluxu berdinak sortzeko aukera ematen duena. Agentearen izaera arina lortzen da, besteak beste, MiNiFy-k fluxuaren konfiguraziorako interfaze grafikorik ez izateak. MiNiFy-n interfaze grafikorik ezak esan nahi du beharrezkoa dela minifi-ra fluxua emateko arazoa konpondu behar dela. MiNiFy IOT-n aktiboki erabiltzen denez, osagai asko daude eta azken minifi instantzietara fluxua emateko prozesua automatizatu behar da. Zeregin ezaguna, ezta?

Beste azpiproiektu batek arazo hau konpontzen lagunduko du - MiNiFi C2 Server. Produktu hau konfigurazio-inplementazio-arkitekturako puntu nagusia izan nahi du. Nola konfiguratu ingurunea - atalean deskribatua Artikulu honetan HabrΓ©ri buruzko informazio nahikoa dago arazoa konpontzeko. MiNiFik, C2 zerbitzariarekin batera, automatikoki eguneratzen du bere konfigurazioa. Ikuspegi honen eragozpen bakarra C2 Server-en txantiloiak sortu behar dituzula da; Erregistrorako konpromiso soila ez da nahikoa.

Goiko artikuluan deskribatutako aukera funtzionatzen du eta ez da zaila ezartzeko, baina ez dugu ahaztu behar honako hau:

  1. Minifi-k ez ditu nifi-ren prozesadore guztiak
  2. Minifi prozesadorearen bertsioak NiFi prozesadoreen bertsioen atzean geratzen dira.

Idazteko unean, NiFiren azken bertsioa 1.9.2 da. MiNiFi prozesadorearen azken bertsioa 1.7.0 da. MiNiFi-ra prozesadoreak gehi daitezke, baina NiFi eta MiNiFi prozesadoreen arteko bertsio-desberdintasunen ondorioz, baliteke honek ez funtzionatzea.

NiFi CLI

arabera deskribapena webgune ofizialeko tresna, NiFI eta NiFi Registry-ren arteko elkarrekintza automatizatzeko tresna bat da fluxuen entrega edo prozesuen kudeaketaren arloan. Hasteko, tresna hau deskargatu behar duzu. beraz,.

Abiarazi utilitatea

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Erregistrotik beharrezkoa den fluxua kargatzeko, kuboaren identifikatzaileak (ontziaren identifikatzailea) eta fluxua bera (fluxuaren identifikatzailea) ezagutu behar ditugu. Datu hauek cli bidez edo NiFi erregistroko web interfazean lor daitezke. Web interfazean honela ikusten da:

Fluxuen entregaren automatizazioa Apache NiFi-n

CLI erabiliz hau egiten da:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Prozesu taldea erregistrotik inportatzen hasten gara:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Puntu garrantzitsu bat da edozein nifi instantzia prozesu-taldea eramango dugun ostalari gisa zehaztu daitekeela.

Prozesu taldea gelditutako prozesadoreekin gehitu da, hasi behar dira

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Bikaina, prozesadoreak hasi dira. Hala ere, zereginaren baldintzen arabera, NiFi instantziak behar ditugu datuak beste instantzia batzuetara bidaltzeko. Demagun Push metodoa aukeratu duzula zerbitzariari datuak transferitzeko. Datu-transferentzia antolatzeko, datu-transferentzia gaitu behar duzu gehitutako Remote Process Group (RPG), dagoeneko gure fluxuan sartuta dagoena.

Fluxuen entregaren automatizazioa Apache NiFi-n

CLI-ko eta beste iturri batzuetako dokumentazioan, ez nuen datuen transferentzia gaitzeko modurik aurkitu. Hau nola egin badakizu, mesedez idatzi iruzkinetan.

Bash daukagunez eta amaierara joateko prest gaudenez, aterabide bat aurkituko dugu! NiFi APIa erabil dezakezu arazo hau konpontzeko. Erabili dezagun metodo hau, har dezagun goiko adibideetako IDa (gure kasuan 7f522a13-016e-1000-e504-d5b15587f2f3 da). NiFi API metodoen deskribapena Hemen.

Fluxuen entregaren automatizazioa Apache NiFi-n
Gorputzean JSON pasatu behar duzu, honela:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Funtzionatzeko bete beharreko parametroak:
egoera β€” Datu transferentziaren egoera. Eskuragarri: TRANSMITTING datuen transferentzia gaitzeko, STOPED desgaitzeko
bertsioa - prozesadorearen bertsioa

bertsioa 0 izango da lehenetsita sortzen denean, baina parametro hauek metodoa erabiliz lor daitezke

Fluxuen entregaren automatizazioa Apache NiFi-n

Bash scripten zaleentzat, metodo hau egokia iruditu daiteke, baina pixka bat zaila da niretzat - bash scriptak ez dira nire gogokoenak. Hurrengo metodoa interesgarriagoa eta erosoagoa da nire ustez.

NiPyAPI

NiPyAPI NiFi instantziekin elkarreragiteko Python liburutegi bat da. Dokumentazio orria liburutegiarekin lan egiteko beharrezko informazioa dauka. Hasiera azkarra atalean deskribatzen da proiektua github-en.

Konfigurazioa zabaltzeko gure scripta Python-eko programa bat da. Goazen kodeketara.
Lan gehiago egiteko konfigurazioak konfiguratzen ditugu. Parametro hauek beharko ditugu:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-api инстанса, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ Ρ€Π°Π·Π²ΠΎΡ€Π°Ρ‡ΠΈΠ²Π°Π΅ΠΌ process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ registry, ΠΊΠ°ΠΊ Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Π·Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π² инстансС nifi
nipyapi.config.bucket_name = 'BucketName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ bucket, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ подтягиваСм flow
nipyapi.config.flow_name = 'FlowName' #Π½Π°Π·Π²Π°Π½ΠΈΠ΅ flow, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ подтягиваСм

Jarraian, liburutegi honen metodoen izenak txertatuko ditut, deskribatzen direnak Hemen.

Konektatu erregistroa nifi instantziara erabiliz

nipyapi.versioning.create_registry_client

Urrats honetan, erregistroa instantzian dagoeneko gehitu dela egiaztatzea ere gehi dezakezu; horretarako metodoa erabil dezakezu

nipyapi.versioning.list_registry_clients

Saskian fluxua gehiago bilatzeko kuboa aurkituko dugu

nipyapi.versioning.get_registry_bucket

Aurkitutako ontzia erabiliz, fluxua bilatzen dugu

nipyapi.versioning.get_flow_in_bucket

Ondoren, garrantzitsua da prozesu-talde hori dagoeneko gehitu den ulertzea. Prozesu taldea koordenatuen arabera kokatzen da eta egoera bat sor daiteke bigarren osagai bat baten gainean gainjartzean. Egiaztatu dut, hau gerta daiteke :) Gehitutako prozesu-talde guztiak lortzeko metodoa erabiltzen dugu

nipyapi.canvas.list_all_process_groups

Gehiago bilatu dezakegu, adibidez, izenaren arabera.

Ez dut txantiloia eguneratzeko prozesua deskribatuko, soilik esango dut txantiloiaren bertsio berrian prozesadoreak gehitzen badira, ez dagoela arazorik ilaretan mezuak egotearekin. Baina prozesadoreak kentzen badira, arazoak sor daitezke (nifi-k ez dizu prozesadore bat kentzen uzten, horren aurrean mezu-ilara bat pilatu bada). Arazo hau nola konpondu dudan interesatzen bazaizu, idatzi iezadazu eta arazo hau eztabaidatuko dugu. Kontaktuak artikuluaren amaieran. Goazen prozesu-talde bat gehitzeko urratsera.

Script-a araztean, fluxuaren azken bertsioa beti ez dela ateratzen berezitasun batekin egin nuen topo, beraz bertsio hau lehenbailehen egiaztatzea gomendatzen dut:

nipyapi.versioning.get_latest_flow_ver

Inplementatu prozesu taldea:

nipyapi.versioning.deploy_flow_version

Prozesadoreak abiarazten ditugu:

nipyapi.canvas.schedule_process_group

CLIri buruzko blokean idatzi zen datuen transferentzia ez dela automatikoki gaituta urruneko prozesu taldean? Scripta ezartzerakoan, arazo hau ere aurkitu dut. Garai hartan, ezin izan nuen APIa erabiliz datuen transferentzia hasi eta NiPyAPI liburutegiaren garatzaileari idaztea erabaki nuen eta aholku/laguntza eskatzea. Garatzaileak erantzun zidan, arazoaz eztabaidatu genuen eta "zerbait egiaztatzeko" denbora behar zuela idatzi zuen. Eta gero, pare bat egun geroago, gutun bat iristen da eta bertan Python-en idatzitako funtzio bat nire abiarazte arazoa konpontzen da!!! Garai hartan, NiPyAPI bertsioa 0.13.3 zen eta, noski, ez zegoen horrelakorik. Baina duela gutxi kaleratu zen 0.14.0 bertsioan, funtzio hau liburutegian sartuta zegoen jada. Ezagutu,

nipyapi.canvas.set_remote_process_group_transmission

Beraz, NiPyAPI liburutegia erabiliz, erregistroa konektatu genuen, fluxua zabaldu genuen eta prozesadoreak eta datu-transferentziak ere hasi genituen. Ondoren, kodea orraztu, era guztietako egiaztapenak gehitu, saioa erregistratu eta kito. Baina hori guztiz bestelako istorio bat da.

Kontuan izan ditudan automatizazio aukeren artean, azkena iruditu zait eraginkorrena. Lehenik eta behin, hau python kodea da oraindik, eta programa-kode laguntzailea txertatu dezakezu eta programazio-lengoaiaren abantaila guztiak aprobetxatu. Bigarrenik, NiPyAPI proiektua aktiboki garatzen ari da eta arazoak izanez gero garatzaileari idatzi dezakezu. Hirugarrenik, NiPyAPI tresna malguagoa da oraindik NiFirekin elkarreraginean arazo konplexuak konpontzeko. Adibidez, mezu-ilarak fluxuan hutsik dauden eta prozesu-taldea eguneratu daitekeen zehaztean.

Hori da dena. NiFi-n fluxu-entrega automatizatzeko 3 ikuspegi deskribatu nituen, garatzaile batek aurki ditzakeen akatsak, eta entrega automatizatzeko lan-kodea eman nuen. Gai hau ni bezain interesatzen bazaizu - idatzi!

Iturria: www.habr.com

Gehitu iruzkin berria