Automatisering av flytlevering i Apache NiFi

Hei!

Automatisering av flytlevering i Apache NiFi

Oppgaven er som følger - det er en flyt, presentert på bildet ovenfor, som må rulles ut til N servere med Apache NiFi. Flyttest - en fil blir generert og sendt til en annen NiFi-instans. Dataoverføring skjer ved hjelp av NiFi Site to Site-protokollen.

NiFi Site to Site (S2S) er en sikker, lett konfigurerbar måte å overføre data mellom NiFi-instanser. Hvordan S2S fungerer, se dokumentasjon og det er viktig å ikke glemme å konfigurere NiFi-forekomsten til å tillate S2S, se her.

I tilfeller hvor vi snakker om dataoverføring ved hjelp av S2S, kalles en instans klient, den andre serveren. Klienten sender data, serveren mottar. To måter å konfigurere dataoverføring mellom dem:

  1. Skyv. Fra klientforekomsten sendes data ved hjelp av en Remote Process Group (RPG). På serverforekomsten mottas data ved hjelp av inngangsporten
  2. Trekk. Serveren mottar data ved hjelp av RPG, klienten sender ved hjelp av utgangsport.


Flyt for utrulling lagres i Apache Registry.

Apache NiFi Registry er et underprosjekt av Apache NiFi som gir et verktøy for flytlagring og versjonskontroll. En slags GIT. Informasjon om installasjon, konfigurering og arbeid med registret finner du i offisiell dokumentasjon. Flyt for lagring kombineres til en prosessgruppe og lagres i dette skjemaet i registeret. Dette kommer vi tilbake til senere i artikkelen.

Ved starten, når N er et lite tall, leveres flyten og oppdateres manuelt på en akseptabel tid.

Men etter hvert som N vokser, blir problemene flere:

  1. det tar mer tid å oppdatere flyten. Du må logge på alle servere
  2. Maloppdateringsfeil oppstår. Her oppdaterte de det, men her glemte de det
  3. menneskelige feil når du utfører et stort antall lignende operasjoner

Alt dette bringer oss til det faktum at vi trenger å automatisere prosessen. Jeg prøvde følgende måter å løse dette problemet på:

  1. Bruk MiNiFi i stedet for NiFi
  2. NiFi CLI
  3. NiPyAPI

Bruker MiNiFi

Apache MiNiFy - delprosjekt av Apache NiFi. MiNiFy er en kompakt agent som bruker de samme prosessorene som NiFi, slik at du kan lage de samme flytene som i NiFi. Agentens lette natur oppnås blant annet ved at MiNiFy ikke har et grafisk grensesnitt for flytkonfigurasjon. Mangelen på et grafisk grensesnitt i MiNiFy gjør at det er nødvendig å løse problemet med å levere flyt til minifi. Siden MiNiFy brukes aktivt i IOT, er det mange komponenter og prosessen med å levere flyt til de endelige minifi-forekomstene må automatiseres. En kjent oppgave, ikke sant?

Et annet delprosjekt vil bidra til å løse dette problemet - MiNiFi C2 Server. Dette produktet er ment å være det sentrale punktet i konfigurasjonsutrullingsarkitekturen. Hvordan konfigurere miljøet - beskrevet i denne artikkelen Det er nok informasjon om Habré til å løse problemet. MiNiFi, sammen med C2-serveren, oppdaterer automatisk konfigurasjonen. Den eneste ulempen med denne tilnærmingen er at du må lage maler på C2 Server; en enkel forpliktelse til registret er ikke nok.

Alternativet beskrevet i artikkelen ovenfor fungerer og ikke vanskelig å implementere, men vi må ikke glemme følgende:

  1. Minifi har ikke alle prosessorer fra nifi
  2. Minifi-prosessorversjoner ligger bak NiFi-prosessorversjoner.

I skrivende stund er siste versjon av NiFi 1.9.2. Den siste MiNiFi-prosessorversjonen er 1.7.0. Prosessorer kan legges til MiNiFi, men på grunn av versjonsavvik mellom NiFi- og MiNiFi-prosessorer kan det hende at dette ikke fungerer.

NiFi CLI

Dommer etter beskrivelse verktøy på den offisielle nettsiden, dette er et verktøy for å automatisere samspillet mellom NiFI og NiFi Registry innen flytlevering eller prosessstyring. For å komme i gang må du laste ned dette verktøyet. derav.

Start verktøyet

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

For at vi skal kunne laste inn den nødvendige flyten fra registeret, må vi kjenne identifikatorene til bøtten (bøtteidentifikator) og selve flyten (flytidentifikator). Disse dataene kan hentes enten gjennom cli eller i NiFi-registerets nettgrensesnitt. I webgrensesnittet ser det slik ut:

Automatisering av flytlevering i Apache NiFi

Ved å bruke CLI gjøres dette:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Vi begynner å importere prosessgruppe fra registeret:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Et viktig poeng er at enhver nifi-instans kan spesifiseres som verten som vi ruller prosessgruppen til.

Prosessgruppe lagt til med stoppede prosessorer, de må startes

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Flott, prosessorene har startet. I henhold til vilkårene for oppgaven trenger vi imidlertid NiFi-instanser for å sende data til andre instanser. La oss anta at du har valgt Push-metoden for å overføre data til serveren. For å organisere dataoverføring, må du aktivere dataoverføring på den ekstra Remote Process Group (RPG), som allerede er inkludert i flyten vår.

Automatisering av flytlevering i Apache NiFi

I dokumentasjonen i CLI og andre kilder fant jeg ikke en måte å aktivere dataoverføring på. Hvis du vet hvordan du gjør dette, vennligst skriv i kommentarfeltet.

Siden vi har bash og vi er klare til å gå til slutten, vil vi finne en vei ut! Du kan bruke NiFi API for å løse dette problemet. La oss bruke følgende metode, ta ID-en fra eksemplene ovenfor (i vårt tilfelle er det 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrivelse av NiFi API-metoder her.

Automatisering av flytlevering i Apache NiFi
I kroppen må du passere JSON, slik:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametere som må fylles ut for at det skal fungere:
stat — dataoverføringsstatus. Tilgjengelig: SENDER for å aktivere dataoverføring, STOPPET for å deaktivere
versjon - prosessorversjon

versjon vil som standard være 0 når den opprettes, men disse parameterne kan fås ved hjelp av metoden

Automatisering av flytlevering i Apache NiFi

For fans av bash-skript kan denne metoden virke passende, men det er litt vanskelig for meg - bash-skript er ikke min favoritt. Den neste metoden er mer interessant og praktisk etter min mening.

NiPyAPI

NiPyAPI er et Python-bibliotek for samhandling med NiFi-forekomster. Dokumentasjonsside inneholder nødvendig informasjon for å jobbe med biblioteket. Hurtigstart er beskrevet i prosjekt på github.

Skriptet vårt for å rulle ut konfigurasjonen er et program i Python. La oss gå videre til koding.
Vi setter opp konfigurasjoner for videre arbeid. Vi trenger følgende parametere:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Deretter vil jeg sette inn navnene på metodene til dette biblioteket, som er beskrevet her.

Koble registeret til nifi-forekomsten ved hjelp av

nipyapi.versioning.create_registry_client

På dette trinnet kan du også legge til en sjekk på at registret allerede er lagt til forekomsten; for dette kan du bruke metoden

nipyapi.versioning.list_registry_clients

Vi finner bøtta for videre søk etter flyt i kurven

nipyapi.versioning.get_registry_bucket

Ved hjelp av den funnet bøtta ser vi etter flyt

nipyapi.versioning.get_flow_in_bucket

Deretter er det viktig å forstå om denne prosessgruppen allerede er lagt til. Prosessgruppen er plassert i henhold til koordinater og det kan oppstå en situasjon når en andre komponent legges over en. Jeg sjekket, dette kan skje :) For å få alle lagt til prosessgrupper bruker vi metoden

nipyapi.canvas.list_all_process_groups

Vi kan søke videre, for eksempel etter navn.

Jeg vil ikke beskrive prosessen med å oppdatere malen, jeg vil bare si at hvis prosessorer legges til i den nye versjonen av malen, er det ingen problemer med tilstedeværelsen av meldinger i køene. Men hvis prosessorer fjernes, kan det oppstå problemer (nifi lar deg ikke fjerne en prosessor hvis en meldingskø har samlet seg foran den). Hvis du er interessert i hvordan jeg løste dette problemet, vennligst skriv til meg, så vil vi diskutere dette problemet. Kontakter på slutten av artikkelen. La oss gå videre til trinnet med å legge til en prosessgruppe.

Når jeg feilsøkte skriptet, kom jeg over en særegenhet at den nyeste versjonen av flow ikke alltid blir trukket opp, så jeg anbefaler å sjekke denne versjonen først:

nipyapi.versioning.get_latest_flow_ver

Distribuer prosessgruppe:

nipyapi.versioning.deploy_flow_version

Vi starter prosessorene:

nipyapi.canvas.schedule_process_group

I blokken om CLI ble det skrevet at dataoverføring ikke automatisk er aktivert i den eksterne prosessgruppen? Da jeg implementerte skriptet, møtte jeg også dette problemet. På det tidspunktet klarte jeg ikke å starte dataoverføring ved hjelp av API, og jeg bestemte meg for å skrive til utvikleren av NiPyAPI-biblioteket og be om råd/hjelp. Utvikleren svarte meg, vi diskuterte problemet og han skrev at han trengte tid til å "sjekke noe". Og så, et par dager senere, kommer det et brev der det er skrevet en funksjon i Python som løser oppstartsproblemet mitt!!! På den tiden var NiPyAPI-versjonen 0.13.3, og det var selvfølgelig ikke noe sånt. Men i versjon 0.14.0, som ble utgitt ganske nylig, var denne funksjonen allerede inkludert i biblioteket. Møte,

nipyapi.canvas.set_remote_process_group_transmission

Så ved å bruke NiPyAPI-biblioteket koblet vi til registeret, rullet ut flyt og startet til og med prosessorer og dataoverføring. Deretter kan du finkjemme koden, legge til alle slags sjekker, logging, og det er alt. Men det er en helt annen historie.

Av automatiseringsalternativene jeg vurderte, virket det siste for meg som det mest effektive. For det første er dette fortsatt python-kode, der du kan legge inn hjelpeprogramkode og dra nytte av alle fordelene med programmeringsspråket. For det andre er NiPyAPI-prosjektet aktivt i utvikling, og i tilfelle problemer kan du skrive til utvikleren. For det tredje er NiPyAPI fortsatt et mer fleksibelt verktøy for å samhandle med NiFi for å løse komplekse problemer. For eksempel ved å avgjøre om meldingskøene nå er tomme i flyten og om prosessgruppen kan oppdateres.

Det er alt. Jeg beskrev 3 tilnærminger for å automatisere flytlevering i NiFi, fallgruver som en utvikler kan støte på, og ga arbeidskode for å automatisere levering. Hvis du er like interessert i dette emnet som meg - skrive!

Kilde: www.habr.com

Legg til en kommentar