Hei!
Oppgaven er som følger - det er en flyt, presentert på bildet ovenfor, som må rulles ut til N servere med
NiFi Site to Site (S2S) er en sikker, lett konfigurerbar måte å overføre data mellom NiFi-instanser. Hvordan S2S fungerer, se
I tilfeller hvor vi snakker om dataoverføring ved hjelp av S2S, kalles en instans klient, den andre serveren. Klienten sender data, serveren mottar. To måter å konfigurere dataoverføring mellom dem:
- Skyv. Fra klientforekomsten sendes data ved hjelp av en Remote Process Group (RPG). På serverforekomsten mottas data ved hjelp av inngangsporten
- Trekk. Serveren mottar data ved hjelp av RPG, klienten sender ved hjelp av utgangsport.
Flyt for utrulling lagres i Apache Registry.
Apache NiFi Registry er et underprosjekt av Apache NiFi som gir et verktøy for flytlagring og versjonskontroll. En slags GIT. Informasjon om installasjon, konfigurering og arbeid med registret finner du i
Ved starten, når N er et lite tall, leveres flyten og oppdateres manuelt på en akseptabel tid.
Men etter hvert som N vokser, blir problemene flere:
- det tar mer tid å oppdatere flyten. Du må logge på alle servere
- Maloppdateringsfeil oppstår. Her oppdaterte de det, men her glemte de det
- menneskelige feil når du utfører et stort antall lignende operasjoner
Alt dette bringer oss til det faktum at vi trenger å automatisere prosessen. Jeg prøvde følgende måter å løse dette problemet på:
- Bruk MiNiFi i stedet for NiFi
- NiFi CLI
- NiPyAPI
Bruker MiNiFi
Et annet delprosjekt vil bidra til å løse dette problemet - MiNiFi C2 Server. Dette produktet er ment å være det sentrale punktet i konfigurasjonsutrullingsarkitekturen. Hvordan konfigurere miljøet - beskrevet i
Alternativet beskrevet i artikkelen ovenfor fungerer og ikke vanskelig å implementere, men vi må ikke glemme følgende:
- Minifi har ikke alle prosessorer fra nifi
- Minifi-prosessorversjoner ligger bak NiFi-prosessorversjoner.
I skrivende stund er siste versjon av NiFi 1.9.2. Den siste MiNiFi-prosessorversjonen er 1.7.0. Prosessorer kan legges til MiNiFi, men på grunn av versjonsavvik mellom NiFi- og MiNiFi-prosessorer kan det hende at dette ikke fungerer.
NiFi CLI
Dommer etter
Start verktøyet
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
For at vi skal kunne laste inn den nødvendige flyten fra registeret, må vi kjenne identifikatorene til bøtten (bøtteidentifikator) og selve flyten (flytidentifikator). Disse dataene kan hentes enten gjennom cli eller i NiFi-registerets nettgrensesnitt. I webgrensesnittet ser det slik ut:
Ved å bruke CLI gjøres dette:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Vi begynner å importere prosessgruppe fra registeret:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Et viktig poeng er at enhver nifi-instans kan spesifiseres som verten som vi ruller prosessgruppen til.
Prosessgruppe lagt til med stoppede prosessorer, de må startes
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Flott, prosessorene har startet. I henhold til vilkårene for oppgaven trenger vi imidlertid NiFi-instanser for å sende data til andre instanser. La oss anta at du har valgt Push-metoden for å overføre data til serveren. For å organisere dataoverføring, må du aktivere dataoverføring på den ekstra Remote Process Group (RPG), som allerede er inkludert i flyten vår.
I dokumentasjonen i CLI og andre kilder fant jeg ikke en måte å aktivere dataoverføring på. Hvis du vet hvordan du gjør dette, vennligst skriv i kommentarfeltet.
Siden vi har bash og vi er klare til å gå til slutten, vil vi finne en vei ut! Du kan bruke NiFi API for å løse dette problemet. La oss bruke følgende metode, ta ID-en fra eksemplene ovenfor (i vårt tilfelle er det 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrivelse av NiFi API-metoder
I kroppen må du passere JSON, slik:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametere som må fylles ut for at det skal fungere:
stat — dataoverføringsstatus. Tilgjengelig: SENDER for å aktivere dataoverføring, STOPPET for å deaktivere
versjon - prosessorversjon
versjon vil som standard være 0 når den opprettes, men disse parameterne kan fås ved hjelp av metoden
For fans av bash-skript kan denne metoden virke passende, men det er litt vanskelig for meg - bash-skript er ikke min favoritt. Den neste metoden er mer interessant og praktisk etter min mening.
NiPyAPI
NiPyAPI er et Python-bibliotek for samhandling med NiFi-forekomster.
Skriptet vårt for å rulle ut konfigurasjonen er et program i Python. La oss gå videre til koding.
Vi setter opp konfigurasjoner for videre arbeid. Vi trenger følgende parametere:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Deretter vil jeg sette inn navnene på metodene til dette biblioteket, som er beskrevet
Koble registeret til nifi-forekomsten ved hjelp av
nipyapi.versioning.create_registry_client
På dette trinnet kan du også legge til en sjekk på at registret allerede er lagt til forekomsten; for dette kan du bruke metoden
nipyapi.versioning.list_registry_clients
Vi finner bøtta for videre søk etter flyt i kurven
nipyapi.versioning.get_registry_bucket
Ved hjelp av den funnet bøtta ser vi etter flyt
nipyapi.versioning.get_flow_in_bucket
Deretter er det viktig å forstå om denne prosessgruppen allerede er lagt til. Prosessgruppen er plassert i henhold til koordinater og det kan oppstå en situasjon når en andre komponent legges over en. Jeg sjekket, dette kan skje :) For å få alle lagt til prosessgrupper bruker vi metoden
nipyapi.canvas.list_all_process_groups
Vi kan søke videre, for eksempel etter navn.
Jeg vil ikke beskrive prosessen med å oppdatere malen, jeg vil bare si at hvis prosessorer legges til i den nye versjonen av malen, er det ingen problemer med tilstedeværelsen av meldinger i køene. Men hvis prosessorer fjernes, kan det oppstå problemer (nifi lar deg ikke fjerne en prosessor hvis en meldingskø har samlet seg foran den). Hvis du er interessert i hvordan jeg løste dette problemet, vennligst skriv til meg, så vil vi diskutere dette problemet. Kontakter på slutten av artikkelen. La oss gå videre til trinnet med å legge til en prosessgruppe.
Når jeg feilsøkte skriptet, kom jeg over en særegenhet at den nyeste versjonen av flow ikke alltid blir trukket opp, så jeg anbefaler å sjekke denne versjonen først:
nipyapi.versioning.get_latest_flow_ver
Distribuer prosessgruppe:
nipyapi.versioning.deploy_flow_version
Vi starter prosessorene:
nipyapi.canvas.schedule_process_group
I blokken om CLI ble det skrevet at dataoverføring ikke automatisk er aktivert i den eksterne prosessgruppen? Da jeg implementerte skriptet, møtte jeg også dette problemet. På det tidspunktet klarte jeg ikke å starte dataoverføring ved hjelp av API, og jeg bestemte meg for å skrive til utvikleren av NiPyAPI-biblioteket og be om råd/hjelp. Utvikleren svarte meg, vi diskuterte problemet og han skrev at han trengte tid til å "sjekke noe". Og så, et par dager senere, kommer det et brev der det er skrevet en funksjon i Python som løser oppstartsproblemet mitt!!! På den tiden var NiPyAPI-versjonen 0.13.3, og det var selvfølgelig ikke noe sånt. Men i versjon 0.14.0, som ble utgitt ganske nylig, var denne funksjonen allerede inkludert i biblioteket. Møte,
nipyapi.canvas.set_remote_process_group_transmission
Så ved å bruke NiPyAPI-biblioteket koblet vi til registeret, rullet ut flyt og startet til og med prosessorer og dataoverføring. Deretter kan du finkjemme koden, legge til alle slags sjekker, logging, og det er alt. Men det er en helt annen historie.
Av automatiseringsalternativene jeg vurderte, virket det siste for meg som det mest effektive. For det første er dette fortsatt python-kode, der du kan legge inn hjelpeprogramkode og dra nytte av alle fordelene med programmeringsspråket. For det andre er NiPyAPI-prosjektet aktivt i utvikling, og i tilfelle problemer kan du skrive til utvikleren. For det tredje er NiPyAPI fortsatt et mer fleksibelt verktøy for å samhandle med NiFi for å løse komplekse problemer. For eksempel ved å avgjøre om meldingskøene nå er tomme i flyten og om prosessgruppen kan oppdateres.
Det er alt. Jeg beskrev 3 tilnærminger for å automatisere flytlevering i NiFi, fallgruver som en utvikler kan støte på, og ga arbeidskode for å automatisere levering. Hvis du er like interessert i dette emnet som meg -
Kilde: www.habr.com