Avtomatizacija dostave toka v Apache NiFi

Pozdravljeni vsi!

Avtomatizacija dostave toka v Apache NiFi

Naloga je naslednja - na zgornji sliki je prikazan tok, ki ga je treba uvesti na N strežnikov z Apache NiFi. Test pretoka - datoteka se generira in pošlje drugi instanci NiFi. Prenos podatkov poteka po protokolu NiFi Site to Site.

NiFi Site to Site (S2S) je varen, zelo prilagodljiv način za prenos podatkov med primerki NiFi. Oglejte si, kako deluje S2S dokumentacijo in pomembno je, da ne pozabite nastaviti svojega primerka NiFi, da omogočite S2S see tukaj.

Ko gre za prenos podatkov s pomočjo S2S, se ena instanca imenuje odjemalec, druga pa strežnik. Odjemalec pošlje podatke, strežnik jih prejme. Dva načina za nastavitev prenosa podatkov med njimi:

  1. Push. Podatki se pošljejo iz instance odjemalca z uporabo skupine oddaljenih procesov (RPG). Na primerku strežnika so podatki prejeti z uporabo vhodnih vrat
  2. Potegnite. Strežnik prejme podatke z uporabo RPG, odjemalec pošlje z uporabo izhodnih vrat.


Tok za vrtenje je shranjen v registru Apache.

Apache NiFi Registry je podprojekt Apache NiFi, ki zagotavlja orodje za shranjevanje in različico pretoka. Nekakšen GIT. Informacije o namestitvi, konfiguraciji in delu z registrom najdete v uradna dokumentacija. Tok za shranjevanje je združen v skupino procesov in v tej obliki shranjen v registru. K temu se bomo vrnili kasneje v članku.

Na začetku, ko je N majhno število, se tok dostavi in ​​posodobi ročno v razumnem času.

Toda ko N raste, je več težav:

  1. posodobitev toka traja več časa. Morate iti na vse strežnike
  2. pri posodabljanju predlog prihaja do napak. Tukaj so posodobili, tukaj pa pozabili
  3. človeška napaka pri izvajanju velikega števila podobnih operacij

Vse to nas pripelje do dejstva, da je potrebno avtomatizirati proces. Za rešitev te težave sem poskusil naslednje načine:

  1. Uporabite MiNiFi namesto NiFi
  2. NiFi CLI
  3. NiPyAPI

Uporaba MiNiFi

ApacheMiNify je podprojekt Apache NiFi. MiNiFy je kompakten agent, ki uporablja iste procesorje kot NiFi, kar vam omogoča, da ustvarite enak pretok kot v NiFi. Lahkotnost agenta je med drugim dosežena zaradi dejstva, da MiNiFy nima grafičnega vmesnika za konfiguracijo toka. Pomanjkanje grafičnega vmesnika MiNiFy pomeni, da je treba rešiti problem dostave pretoka v minifi. Ker se MiNiFy aktivno uporablja v IOT, obstaja veliko komponent in postopek zagotavljanja pretoka do končnih instanc minifi mora biti avtomatiziran. Znana naloga, kajne?

Drug podprojekt, MiNiFi C2 Server, bo pomagal rešiti to težavo. Ta izdelek naj bi bil osrednja točka v arhitekturi uvajanja. Kako konfigurirati okolje - opisano v ta članek na Habréju in informacije so dovolj za rešitev problema. MiNiFi v povezavi s strežnikom C2 samodejno posodobi svojo konfiguracijo. Edina pomanjkljivost tega pristopa je, da morate ustvariti predloge na strežniku C2, preprosta objava v register ni dovolj.

Možnost, opisana v zgornjem članku, deluje in ni težko izvedljiva, vendar ne smemo pozabiti naslednjega:

  1. minifi nima vseh procesorjev od nifi
  2. Različice CPE v Minifi zaostajajo za različicami CPE v NiFi.

V času pisanja je najnovejša različica NiFi 1.9.2. Različica procesorja najnovejše različice MiNiFi je 1.7.0. Procesorje je mogoče dodati v MiNiFi, vendar zaradi razlik v različicah med procesorji NiFi in MiNiFi to morda ne bo delovalo.

NiFi CLI

Sodeč po opis orodje na uradni spletni strani, je to orodje za avtomatizacijo interakcije med NiFI in NiFi registrom na področju dostave pretoka ali upravljanja procesov. Za začetek prenesite to orodje. zato.

Zaženite pripomoček

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Da lahko naložimo potreben tok iz registra, moramo poznati identifikatorje košarice (identifikator vedra) in sam tok (identifikator toka). Te podatke je mogoče pridobiti prek cli ali v spletnem vmesniku registra NiFi. Spletni vmesnik izgleda takole:

Avtomatizacija dostave toka v Apache NiFi

S pomočjo CLI se to naredi:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Zaženi skupino uvoznih procesov iz registra:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Pomembna točka je, da je mogoče kateri koli primerek nifi določiti kot gostitelja, na katerem izvajamo skupino procesov.

Dodana skupina procesov z ustavljenimi procesorji, ki jih je treba zagnati

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Super, procesorji so začeli. Vendar pa glede na pogoje problema potrebujemo primerke NiFi za pošiljanje podatkov drugim primerkom. Predpostavimo, da je bila za prenos podatkov na strežnik izbrana metoda Push. Za organizacijo prenosa podatkov je potrebno omogočiti prenos podatkov (Enable transmitting) na dodani Remote Process Group (RPG), ki je že vključena v naš tok.

Avtomatizacija dostave toka v Apache NiFi

V dokumentaciji v CLI in drugih virih nisem našel načina za omogočanje prenosa podatkov. Če veste, kako to storiti, napišite v komentarje.

Ker imamo udarec in smo pripravljeni iti do konca, bomo našli izhod! Za rešitev te težave lahko uporabite NiFi API. Uporabimo naslednjo metodo, vzamemo ID iz zgornjih primerov (v našem primeru je to 7f522a13-016e-1000-e504-d5b15587f2f3). Opis metod NiFi API tukaj.

Avtomatizacija dostave toka v Apache NiFi
V telesu morate posredovati JSON v naslednji obliki:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri, ki jih je treba izpolniti, da "delujejo":
so bili — stanje prenosa podatkov. Na voljo TRANSMITTING za omogočanje prenosa podatkov, STOPPED za onemogočanje
različica - različica procesorja

različica bo ob ustvarjanju privzeto nastavljena na 0, vendar je te parametre mogoče pridobiti z metodo

Avtomatizacija dostave toka v Apache NiFi

Za ljubitelje skriptov bash se ta metoda morda zdi primerna, vendar je zame težko - skripti bash niso moji najljubši. Naslednji način je po mojem mnenju bolj zanimiv in bolj priročen.

NiPyAPI

NiPyAPI je knjižnica Python za interakcijo z instancami NiFi. Stran z dokumentacijo vsebuje potrebne informacije za delo s knjižnico. Hitri začetek je opisan v projekt na githubu.

Naš skript za uvedbo konfiguracije je program Python. Preidimo na kodiranje.
Nastavite konfiguracije za nadaljnje delo. Potrebovali bomo naslednje parametre:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Nadalje bom vstavil imena metod te knjižnice, ki so opisane tukaj.

Register povežemo z instanco nifi z uporabo

nipyapi.versioning.create_registry_client

Na tem koraku lahko dodate tudi preverjanje, ali je register že dodan primerku, za to lahko uporabite metodo

nipyapi.versioning.list_registry_clients

Poiščemo vedro za nadaljnje iskanje toka v košari

nipyapi.versioning.get_registry_bucket

Glede na najdeno vedro iščemo pretok

nipyapi.versioning.get_flow_in_bucket

Nato je pomembno razumeti, ali je bila ta procesna skupina že dodana. Skupina procesov je postavljena po koordinatah in lahko pride do situacije, ko je na eno nadgrajena druga. Preveril sem, lahko je 🙂 Če želite dobiti vso dodano skupino procesov, uporabite metodo

nipyapi.canvas.list_all_process_groups

in potem lahko iščemo na primer po imenu.

Ne bom opisal postopka posodabljanja predloge, rekel bom le, da če so v novi različici predloge dodani procesorji, potem ni težav s prisotnostjo sporočil v čakalnih vrstah. Če pa so procesorji odstranjeni, lahko pride do težav (nifi ne dovoli odstranitve procesorja, če se je pred njim nabrala čakalna vrsta sporočil). Če vas zanima, kako sem rešil to težavo - pišite mi, prosim, razpravljali bomo o tej točki. Kontakti na koncu članka. Preidimo na korak dodajanja skupine procesov.

Pri odpravljanju napak v skriptu sem naletel na funkcijo, da najnovejša različica toka ni vedno prikazana, zato priporočam, da najprej razjasnite to različico:

nipyapi.versioning.get_latest_flow_ver

Razmesti skupino procesov:

nipyapi.versioning.deploy_flow_version

Začetni procesorji:

nipyapi.canvas.schedule_process_group

V bloku o CLI je pisalo, da prenos podatkov ni samodejno omogočen v skupini oddaljenih procesov? Pri implementaciji skripta sem naletel tudi na to težavo. Takrat nisem mogel zagnati prenosa podatkov z API-jem in odločil sem se, da pišem razvijalcu knjižnice NiPyAPI in ga prosim za nasvet/pomoč. Razvijalec mi je odgovoril, pogovorila sva se o problemu in napisal je, da potrebuje čas, da "nekaj preveri". In zdaj, nekaj dni kasneje, pride e-poštno sporočilo, v katerem je napisana funkcija Python, ki reši mojo težavo pri zagonu !!! Takrat je bila različica NiPyAPI 0.13.3 in v njej seveda ni bilo nič takega. Toda v različici 0.14.0, ki je bila izdana pred kratkim, je ta funkcija že vključena v knjižnico. Srečati

nipyapi.canvas.set_remote_process_group_transmission

Tako smo s pomočjo knjižnice NiPyAPI povezali register, navili tok in celo zagnali procesorje in prenos podatkov. Nato lahko kodo prečešete, dodate vse vrste preverjanj, beleženje in to je to. A to je povsem druga zgodba.

Od možnosti avtomatizacije, ki sem jih obravnaval, se mi je slednja zdela najučinkovitejša. Prvič, to je še vedno koda python, v katero lahko vdelate pomožno programsko kodo in uživate v vseh prednostih programskega jezika. Drugič, projekt NiPyAPI se aktivno razvija in v primeru težav lahko pišete razvijalcu. Tretjič, NiPyAPI je še vedno bolj prilagodljivo orodje za interakcijo z NiFi pri reševanju kompleksnih problemov. Na primer pri ugotavljanju, ali so čakalne vrste sporočil trenutno prazne v toku in ali je mogoče posodobiti skupino procesov.

To je vse. Opisal sem 3 pristope k avtomatizaciji dostave pretoka v NiFi, pasti, na katere lahko naleti razvijalec, in zagotovil delujočo kodo za avtomatizacijo dostave. Če vas ta tema zanima tako kot mene - napiši!

Vir: www.habr.com

Dodaj komentar