Voolu edastamise automatiseerimine Apache NiFi-s

Tere kõigile!

Voolu edastamise automatiseerimine Apache NiFi-s

Ülesanne on järgmine - ülaltoodud pildil on voog, mis tuleb N serverisse välja veeretada Apache NiFi. Voolu test – faili genereeritakse ja saadetakse teisele NiFi eksemplarile. Andmeedastus toimub NiFi Site to Site protokolli abil.

NiFi Site to Site (S2S) on turvaline ja hõlpsasti konfigureeritav viis andmete edastamiseks NiFi eksemplaride vahel. Kuidas S2S töötab, vaata dokumentatsioon ja oluline on mitte unustada konfigureerida NiFi eksemplari S2S-i lubamiseks, vt siin.

Juhtudel, kui me räägime andmeedastusest S2S-i abil, nimetatakse ühte eksemplari kliendiks, teist serveriks. Klient saadab andmeid, server võtab vastu. Nendevahelise andmeedastuse konfigureerimiseks on kaks võimalust:

  1. Lükkama. Kliendieksemplarist saadetakse andmed kaugprotsessirühma (RPG) abil. Serveri eksemplaris võetakse andmeid vastu sisendpordi kaudu
  2. Tõmmata. Server võtab andmeid vastu RPG abil, klient saadab väljundpordi kaudu.


Käivitamiseks mõeldud voog salvestatakse Apache registrisse.

Apache NiFi register on Apache NiFi alamprojekt, mis pakub voosalvestuse ja versioonikontrolli tööriista. Omamoodi GIT. Teavet registri installimise, konfigureerimise ja sellega töötamise kohta leiate aadressilt ametlik dokumentatsioon. Salvestusvoog ühendatakse protsessirühmaks ja salvestatakse sellel kujul registris. Selle juurde pöördume hiljem artiklis tagasi.

Alguses, kui N on väike arv, edastatakse voog ja värskendatakse seda käsitsi vastuvõetava aja jooksul.

Kuid kui N kasvab, muutuvad probleemid üha arvukamaks:

  1. voo värskendamine võtab rohkem aega. Peate kõikidesse serveritesse sisse logima
  2. Malli värskendamisel ilmnevad vead. Siin nad värskendasid seda, kuid siin nad unustasid
  3. inimlikud vead suure hulga sarnaste toimingute tegemisel

Kõik see viib meid tõsiasjani, et peame protsessi automatiseerima. Proovisin selle probleemi lahendamiseks järgmisi viise:

  1. Kasutage NiFi asemel MiNiFi
  2. NiFi CLI
  3. NiPyAPI

MiNiFi kasutamine

Apache MiNiFy - Apache NiFi alamprojekt. MiNiFy on kompaktne agent, mis kasutab samu protsessoreid nagu NiFi, võimaldades teil luua samu vooge nagu NiFi puhul. Agendi kerge olemus saavutatakse muuhulgas sellega, et MiNiFyl puudub voolu konfigureerimiseks graafiline liides. Graafilise liidese puudumine MiNiFy-s tähendab, et on vaja lahendada voo minifisse edastamise probleem. Kuna MiNiFy-d kasutatakse IOT-s aktiivselt, on seal palju komponente ja voo edastamise protsess lõplikele minifi eksemplaridele tuleb automatiseerida. Tuttav ülesanne, eks?

Teine alamprojekt aitab seda probleemi lahendada - MiNiFi C2 Server. See toode on mõeldud konfiguratsiooni levitamise arhitektuuri keskseks punktiks. Kuidas keskkonda konfigureerida - kirjeldatud artiklis see artikkel Probleemi lahendamiseks on Habré kohta piisavalt teavet. MiNiFi värskendab koos serveriga C2 automaatselt selle konfiguratsiooni. Selle lähenemisviisi ainsaks puuduseks on see, et peate C2 serveris malle looma; lihtsast registrile pühendumisest ei piisa.

Ülaltoodud artiklis kirjeldatud valik töötab ja seda pole keeruline rakendada, kuid me ei tohi unustada järgmist:

  1. Minifil pole kõiki nifi protsessoreid
  2. Minifi protsessori versioonid jäävad NiFi protsessori versioonidest maha.

Selle artikli kirjutamise ajal oli NiFi uusim versioon 1.9.2. Uusim MiNiFi protsessori versioon on 1.7.0. Protsessoreid saab MiNiFi-le lisada, kuid NiFi ja MiNiFi protsessorite versioonierinevuste tõttu ei pruugi see toimida.

NiFi CLI

Otsustades kirjeldus tööriist ametlikul veebisaidil, see on tööriist NiFI ja NiFi registri vahelise suhtluse automatiseerimiseks voo edastamise või protsesside haldamise valdkonnas. Alustamiseks peate selle tööriista alla laadima. siit.

Käivitage utiliit

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Selleks, et saaksime registrist vajaliku voo laadida, peame teadma ämbri (ämbri identifikaator) ja voo enda (voo identifikaator) identifikaatoreid. Neid andmeid saab hankida kas klii kaudu või NiFi registri veebiliidese kaudu. Veebiliideses näeb see välja järgmine:

Voolu edastamise automatiseerimine Apache NiFi-s

CLI abil tehakse seda:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Alustame protsessirühma importimist registrist:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Oluline punkt on see, et iga nifi eksemplari saab määrata hostiks, kuhu protsessirühma veeretame.

Peatatud protsessoritega protsessigrupp on lisatud, need tuleb käivitada

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Suurepärane, protsessorid on käivitunud. Kuid vastavalt ülesande tingimustele vajame NiFi eksemplare andmete saatmiseks teistele eksemplaridele. Oletame, et olete andmete serverisse edastamiseks valinud Push-meetodi. Andmeedastuse korraldamiseks peate lubama andmeedastuse lisatud Remote Process Groupis (RPG), mis on meie voos juba kaasatud.

Voolu edastamise automatiseerimine Apache NiFi-s

CLI-s ja muudes allikates olevas dokumentatsioonis ei leidnud ma võimalust andmeedastuse võimaldamiseks. Kui teate, kuidas seda teha, kirjutage kommentaaridesse.

Kuna meil on bash ja oleme valmis lõpuni minema, siis leiame väljapääsu! Selle probleemi lahendamiseks saate kasutada NiFi API-d. Kasutame järgmist meetodit, võtke ID ülaltoodud näidetest (meie puhul on see 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API meetodite kirjeldus siin.

Voolu edastamise automatiseerimine Apache NiFi-s
Kehas peate JSON-i edastama järgmiselt:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameetrid, mis tuleb selle toimimiseks täita:
riik — andmeedastuse olek. Saadaval: EDASTAMINE andmeedastuse lubamiseks, STOPPED keelamiseks
versioon - protsessori versioon

versioon on loomisel vaikimisi 0, kuid need parameetrid saab selle meetodi abil hankida

Voolu edastamise automatiseerimine Apache NiFi-s

Bash-skriptide austajatele võib see meetod tunduda sobiv, kuid minu jaoks on see pisut keeruline - bash-skriptid pole minu lemmikud. Järgmine meetod on minu arvates huvitavam ja mugavam.

NiPyAPI

NiPyAPI on Pythoni teek NiFi eksemplaridega suhtlemiseks. Dokumentatsiooni leht sisaldab raamatukoguga töötamiseks vajalikku teavet. Kiirkäivitus on kirjeldatud artiklis projekti githubis.

Meie konfiguratsiooni käivitamise skript on Pythoni programm. Liigume edasi kodeerimise juurde.
Seadistasime konfiguratsioonid edasiseks tööks. Vajame järgmisi parameetreid:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Järgmisena lisan selle teegi meetodite nimed, mida on kirjeldatud siin.

Ühendage register nifi eksemplariga kasutades

nipyapi.versioning.create_registry_client

Selles etapis saate lisada ka kontrolli, kas register on eksemplarile juba lisatud; selleks saate kasutada meetodit

nipyapi.versioning.list_registry_clients

Leiame ämbri korvis oleva voolu edasiseks otsimiseks

nipyapi.versioning.get_registry_bucket

Kasutades leitud ämbrit, otsime voolu

nipyapi.versioning.get_flow_in_bucket

Järgmiseks on oluline mõista, kas see protsessirühm on juba lisatud. Protsessi grupp paigutatakse koordinaatide järgi ja võib tekkida olukord, kui ühe peale asetatakse teine ​​komponent. Kontrollisin, see võib juhtuda :) Kõigi lisatud protsessirühmade hankimiseks kasutame meetodit

nipyapi.canvas.list_all_process_groups

Saame edasi otsida näiteks nime järgi.

Ma ei kirjelda malli värskendamise protsessi, ütlen ainult, et kui malli uude versiooni lisatakse protsessorid, siis pole probleeme sõnumite olemasoluga järjekordades. Kui aga protsessorid eemaldada, siis võivad tekkida probleemid (nifi ei luba protsessorit eemaldada, kui selle ette on kogunenud sõnumijärjekord). Kui olete huvitatud sellest, kuidas ma selle probleemi lahendasin, kirjutage mulle ja me arutame seda küsimust. Kontaktid artikli lõpus. Liigume edasi protsessirühma lisamise sammu juurde.

Skripti silumisel puutusin kokku eripäraga, et voo uusimat versiooni ei tõmmata alati üles, seega soovitan esmalt seda versiooni kontrollida:

nipyapi.versioning.get_latest_flow_ver

Protsessirühma juurutamine:

nipyapi.versioning.deploy_flow_version

Käivitame protsessorid:

nipyapi.canvas.schedule_process_group

CLI-teemalises plokis oli kirjas, et andmeedastus pole kaugprotsessigrupis automaatselt lubatud? Skripti rakendamisel puutusin ka selle probleemiga kokku. Sel ajal ei saanud ma API abil andmeedastust alustada ja otsustasin kirjutada NiPyAPI teegi arendajale ja küsida nõu/abi. Arendaja vastas mulle, arutasime probleemi ja ta kirjutas, et vajab aega “millegi kontrollimiseks”. Ja siis paari päeva pärast tuleb kiri, milles on Pythonis kirjutatud funktsioon, mis lahendab minu käivitamisprobleemi!!! Tol ajal oli NiPyAPI versioon 0.13.3 ja loomulikult polnud midagi sellist. Kuid versioonis 0.14.0, mis ilmus üsna hiljuti, oli see funktsioon juba teegis. kohtuda,

nipyapi.canvas.set_remote_process_group_transmission

Nii ühendasime NiPyAPI teeki kasutades registri, käivitasime voo ja isegi alustasime protsessoreid ja andmeedastust. Seejärel saate koodi kammida, lisada igasuguseid kontrolle, logida ja see on kõik. Aga see on hoopis teine ​​lugu.

Kaalutud automatiseerimisvõimalustest tundus viimane mulle kõige tõhusam. Esiteks on see ikkagi pythoni kood, millesse saab põimida abiprogrammi koodi ja kasutada kõiki programmeerimiskeele eeliseid. Teiseks NiPyAPI projekt areneb aktiivselt ja probleemide korral saab arendajale kirjutada. Kolmandaks on NiPyAPI endiselt paindlikum tööriist NiFi-ga suhtlemiseks keerukate probleemide lahendamisel. Näiteks selleks, et teha kindlaks, kas sõnumijärjekorrad on voos nüüd tühjad ja kas protsessirühma saab värskendada.

See on kõik. Kirjeldasin 3 lähenemisviisi voo edastamise automatiseerimiseks NiFi-s, lõkse, millega arendaja võib kokku puutuda, ja andsin töökoodi kohaletoimetamise automatiseerimiseks. Kui olete sellest teemast sama huvitatud kui mina - kirjuta!

Allikas: www.habr.com

Lisa kommentaar