Pozdravljeni vsi!
Naloga je naslednja - na zgornji sliki je prikazan tok, ki ga je treba uvesti na N strežnikov z
NiFi Site to Site (S2S) je varen, zelo prilagodljiv način za prenos podatkov med primerki NiFi. Oglejte si, kako deluje S2S
Ko gre za prenos podatkov s pomočjo S2S, se ena instanca imenuje odjemalec, druga pa strežnik. Odjemalec pošlje podatke, strežnik jih prejme. Dva načina za nastavitev prenosa podatkov med njimi:
- Push. Podatki se pošljejo iz instance odjemalca z uporabo skupine oddaljenih procesov (RPG). Na primerku strežnika so podatki prejeti z uporabo vhodnih vrat
- Potegnite. Strežnik prejme podatke z uporabo RPG, odjemalec pošlje z uporabo izhodnih vrat.
Tok za vrtenje je shranjen v registru Apache.
Apache NiFi Registry je podprojekt Apache NiFi, ki zagotavlja orodje za shranjevanje in različico pretoka. Nekakšen GIT. Informacije o namestitvi, konfiguraciji in delu z registrom najdete v
Na začetku, ko je N majhno število, se tok dostavi in posodobi ročno v razumnem času.
Toda ko N raste, je več težav:
- posodobitev toka traja več časa. Morate iti na vse strežnike
- pri posodabljanju predlog prihaja do napak. Tukaj so posodobili, tukaj pa pozabili
- človeška napaka pri izvajanju velikega števila podobnih operacij
Vse to nas pripelje do dejstva, da je potrebno avtomatizirati proces. Za rešitev te težave sem poskusil naslednje načine:
- Uporabite MiNiFi namesto NiFi
- NiFi CLI
- NiPyAPI
Uporaba MiNiFi
Drug podprojekt, MiNiFi C2 Server, bo pomagal rešiti to težavo. Ta izdelek naj bi bil osrednja točka v arhitekturi uvajanja. Kako konfigurirati okolje - opisano v
Možnost, opisana v zgornjem članku, deluje in ni težko izvedljiva, vendar ne smemo pozabiti naslednjega:
- minifi nima vseh procesorjev od nifi
- Različice CPE v Minifi zaostajajo za različicami CPE v NiFi.
V času pisanja je najnovejša različica NiFi 1.9.2. Različica procesorja najnovejše različice MiNiFi je 1.7.0. Procesorje je mogoče dodati v MiNiFi, vendar zaradi razlik v različicah med procesorji NiFi in MiNiFi to morda ne bo delovalo.
NiFi CLI
Sodeč po
Zaženite pripomoček
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Da lahko naložimo potreben tok iz registra, moramo poznati identifikatorje košarice (identifikator vedra) in sam tok (identifikator toka). Te podatke je mogoče pridobiti prek cli ali v spletnem vmesniku registra NiFi. Spletni vmesnik izgleda takole:
S pomočjo CLI se to naredi:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Zaženi skupino uvoznih procesov iz registra:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Pomembna točka je, da je mogoče kateri koli primerek nifi določiti kot gostitelja, na katerem izvajamo skupino procesov.
Dodana skupina procesov z ustavljenimi procesorji, ki jih je treba zagnati
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, procesorji so začeli. Vendar pa glede na pogoje problema potrebujemo primerke NiFi za pošiljanje podatkov drugim primerkom. Predpostavimo, da je bila za prenos podatkov na strežnik izbrana metoda Push. Za organizacijo prenosa podatkov je potrebno omogočiti prenos podatkov (Enable transmitting) na dodani Remote Process Group (RPG), ki je že vključena v naš tok.
V dokumentaciji v CLI in drugih virih nisem našel načina za omogočanje prenosa podatkov. Če veste, kako to storiti, napišite v komentarje.
Ker imamo udarec in smo pripravljeni iti do konca, bomo našli izhod! Za rešitev te težave lahko uporabite NiFi API. Uporabimo naslednjo metodo, vzamemo ID iz zgornjih primerov (v našem primeru je to 7f522a13-016e-1000-e504-d5b15587f2f3). Opis metod NiFi API
V telesu morate posredovati JSON v naslednji obliki:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri, ki jih je treba izpolniti, da "delujejo":
so bili — stanje prenosa podatkov. Na voljo TRANSMITTING za omogočanje prenosa podatkov, STOPPED za onemogočanje
različica - različica procesorja
različica bo ob ustvarjanju privzeto nastavljena na 0, vendar je te parametre mogoče pridobiti z metodo
Za ljubitelje skriptov bash se ta metoda morda zdi primerna, vendar je zame težko - skripti bash niso moji najljubši. Naslednji način je po mojem mnenju bolj zanimiv in bolj priročen.
NiPyAPI
NiPyAPI je knjižnica Python za interakcijo z instancami NiFi.
Naš skript za uvedbo konfiguracije je program Python. Preidimo na kodiranje.
Nastavite konfiguracije za nadaljnje delo. Potrebovali bomo naslednje parametre:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Nadalje bom vstavil imena metod te knjižnice, ki so opisane
Register povežemo z instanco nifi z uporabo
nipyapi.versioning.create_registry_client
Na tem koraku lahko dodate tudi preverjanje, ali je register že dodan primerku, za to lahko uporabite metodo
nipyapi.versioning.list_registry_clients
Poiščemo vedro za nadaljnje iskanje toka v košari
nipyapi.versioning.get_registry_bucket
Glede na najdeno vedro iščemo pretok
nipyapi.versioning.get_flow_in_bucket
Nato je pomembno razumeti, ali je bila ta procesna skupina že dodana. Skupina procesov je postavljena po koordinatah in lahko pride do situacije, ko je na eno nadgrajena druga. Preveril sem, lahko je 🙂 Če želite dobiti vso dodano skupino procesov, uporabite metodo
nipyapi.canvas.list_all_process_groups
in potem lahko iščemo na primer po imenu.
Ne bom opisal postopka posodabljanja predloge, rekel bom le, da če so v novi različici predloge dodani procesorji, potem ni težav s prisotnostjo sporočil v čakalnih vrstah. Če pa so procesorji odstranjeni, lahko pride do težav (nifi ne dovoli odstranitve procesorja, če se je pred njim nabrala čakalna vrsta sporočil). Če vas zanima, kako sem rešil to težavo - pišite mi, prosim, razpravljali bomo o tej točki. Kontakti na koncu članka. Preidimo na korak dodajanja skupine procesov.
Pri odpravljanju napak v skriptu sem naletel na funkcijo, da najnovejša različica toka ni vedno prikazana, zato priporočam, da najprej razjasnite to različico:
nipyapi.versioning.get_latest_flow_ver
Razmesti skupino procesov:
nipyapi.versioning.deploy_flow_version
Začetni procesorji:
nipyapi.canvas.schedule_process_group
V bloku o CLI je pisalo, da prenos podatkov ni samodejno omogočen v skupini oddaljenih procesov? Pri implementaciji skripta sem naletel tudi na to težavo. Takrat nisem mogel zagnati prenosa podatkov z API-jem in odločil sem se, da pišem razvijalcu knjižnice NiPyAPI in ga prosim za nasvet/pomoč. Razvijalec mi je odgovoril, pogovorila sva se o problemu in napisal je, da potrebuje čas, da "nekaj preveri". In zdaj, nekaj dni kasneje, pride e-poštno sporočilo, v katerem je napisana funkcija Python, ki reši mojo težavo pri zagonu !!! Takrat je bila različica NiPyAPI 0.13.3 in v njej seveda ni bilo nič takega. Toda v različici 0.14.0, ki je bila izdana pred kratkim, je ta funkcija že vključena v knjižnico. Srečati
nipyapi.canvas.set_remote_process_group_transmission
Tako smo s pomočjo knjižnice NiPyAPI povezali register, navili tok in celo zagnali procesorje in prenos podatkov. Nato lahko kodo prečešete, dodate vse vrste preverjanj, beleženje in to je to. A to je povsem druga zgodba.
Od možnosti avtomatizacije, ki sem jih obravnaval, se mi je slednja zdela najučinkovitejša. Prvič, to je še vedno koda python, v katero lahko vdelate pomožno programsko kodo in uživate v vseh prednostih programskega jezika. Drugič, projekt NiPyAPI se aktivno razvija in v primeru težav lahko pišete razvijalcu. Tretjič, NiPyAPI je še vedno bolj prilagodljivo orodje za interakcijo z NiFi pri reševanju kompleksnih problemov. Na primer pri ugotavljanju, ali so čakalne vrste sporočil trenutno prazne v toku in ali je mogoče posodobiti skupino procesov.
To je vse. Opisal sem 3 pristope k avtomatizaciji dostave pretoka v NiFi, pasti, na katere lahko naleti razvijalec, in zagotovil delujočo kodo za avtomatizacijo dostave. Če vas ta tema zanima tako kot mene -
Vir: www.habr.com