Outomatisering van vloeilewering in Apache NiFi

Hallo almal!

Outomatisering van vloeilewering in Apache NiFi

Die taak is soos volg - daar is 'n vloei, aangebied in die prentjie hierbo, wat na N bedieners uitgerol moet word met Apache NiFi. Vloeitoets - 'n lêer word gegenereer en na 'n ander NiFi-instansie gestuur. Data-oordrag vind plaas deur gebruik te maak van die NiFi Site to Site-protokol.

NiFi Site to Site (S2S) is 'n veilige, maklik konfigureerbare manier om data tussen NiFi-instansies oor te dra. Hoe S2S werk, sien dokumentasie en dit is belangrik om nie te vergeet om die NiFi-instansie op te stel om S2S toe te laat nie, sien hier.

In gevalle waar ons praat oor data-oordrag met behulp van S2S, word een geval kliënt genoem, die tweede bediener. Die kliënt stuur data, die bediener ontvang. Twee maniere om data-oordrag tussen hulle op te stel:

  1. Stoot. Vanaf die kliëntinstansie word data gestuur met behulp van 'n Remote Process Group (RPG). Op die bedienergeval word data ontvang deur die invoerpoort te gebruik
  2. Trek. Die bediener ontvang data met behulp van RPG, die kliënt stuur met die uitsetpoort.


Vloei vir uitrol word in Apache-register gestoor.

Apache NiFi Registry is 'n subprojek van Apache NiFi wat 'n instrument bied vir vloeiberging en weergawebeheer. 'n Soort GIT. Inligting oor die installering, konfigurasie en werk met register kan gevind word in amptelike dokumentasie. Vloei vir berging word in 'n prosesgroep gekombineer en in hierdie vorm in die register gestoor. Ons sal later in die artikel hierna terugkom.

Aan die begin, wanneer N 'n klein getal is, word vloei gelewer en met die hand bygewerk binne 'n aanvaarbare tyd.

Maar soos N groei, word die probleme meer:

  1. dit neem meer tyd om die vloei op te dateer. Jy moet by alle bedieners aanmeld
  2. Sjabloonopdateringsfoute kom voor. Hier het hulle dit bygewerk, maar hier het hulle vergeet
  3. menslike foute wanneer 'n groot aantal soortgelyke operasies uitgevoer word

Dit alles bring ons by die feit dat ons die proses moet outomatiseer. Ek het die volgende maniere probeer om hierdie probleem op te los:

  1. Gebruik MiNiFi in plaas van NiFi
  2. NiFi CLI
  3. NiPyAPI

Gebruik MiNiFi

Apache MiNiFy - subprojek van Apache NiFi. MiNiFy is 'n kompakte agent wat dieselfde verwerkers as NiFi gebruik, wat jou toelaat om dieselfde vloei as in NiFi te skep. Die liggewig aard van die agent word onder meer bereik deur die feit dat MiNiFy nie 'n grafiese koppelvlak vir vloeikonfigurasie het nie. Die gebrek aan 'n grafiese koppelvlak in MiNiFy beteken dat dit nodig is om die probleem op te los om vloei na minifi te lewer. Aangesien MiNiFy aktief in IOT gebruik word, is daar baie komponente en die proses om vloei na die finale minifi-instansies te lewer, moet geoutomatiseer word. 'n Bekende taak, reg?

Nog 'n subprojek sal help om hierdie probleem op te los - MiNiFi C2 Server. Hierdie produk is bedoel om die sentrale punt in die konfigurasie-ontplooiingsargitektuur te wees. Hoe om die omgewing op te stel - beskryf in Hierdie artikel Daar is genoeg inligting oor Habré om die probleem op te los. MiNiFi, in samewerking met die C2-bediener, dateer outomaties sy konfigurasie op. Die enigste nadeel van hierdie benadering is dat u sjablone op C2 Server moet skep; 'n eenvoudige verbintenis tot die register is nie genoeg nie.

Die opsie wat in die artikel hierbo beskryf word, werk en nie moeilik om te implementeer nie, maar ons moet nie die volgende vergeet nie:

  1. Minifi het nie alle verwerkers van Nifi nie
  2. Minifi-verwerkerweergawes lê agter NiFi-verwerkerweergawes.

Met die skryf hiervan is die nuutste weergawe van NiFi 1.9.2. Die nuutste MiNiFi-verwerkerweergawe is 1.7.0. Verwerkers kan by MiNiFi gevoeg word, maar as gevolg van weergawe verskille tussen NiFi en MiNiFi verwerkers, sal dit dalk nie werk nie.

NiFi CLI

Te oordeel aan beskrywing instrument op die amptelike webwerf, dit is 'n hulpmiddel vir die outomatisering van die interaksie tussen NiFI en NiFi Registry op die gebied van vloeilewering of prosesbestuur. Om te begin, moet jy hierdie hulpmiddel aflaai. vandaar.

Begin die hulpprogram

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Om die vereiste vloei vanaf die register te laai, moet ons die identifiseerders van die emmer (emmeridentifiseerder) en die vloei self (vloeiidentifiseerder) ken. Hierdie data kan verkry word deur die cli of in die NiFi-register-webkoppelvlak. In die webkoppelvlak lyk dit soos volg:

Outomatisering van vloeilewering in Apache NiFi

Deur die CLI te gebruik word dit gedoen:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Ons begin die invoer van prosesgroep vanaf die register:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

'n Belangrike punt is dat enige nifi-instansie gespesifiseer kan word as die gasheer waarheen ons die prosesgroep rol.

Prosesgroep bygevoeg met gestopte verwerkers, hulle moet begin word

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Groot, die verwerkers het begin. Volgens die bepalings van die taak het ons egter NiFi-gevalle nodig om data na ander instansies te stuur. Kom ons neem aan dat jy die Push-metode gekies het om data na die bediener oor te dra. Om data-oordrag te organiseer, moet jy data-oordrag aktiveer op die bygevoegde Remote Process Group (RPG), wat reeds by ons vloei ingesluit is.

Outomatisering van vloeilewering in Apache NiFi

In die dokumentasie in die CLI en ander bronne het ek nie 'n manier gevind om data-oordrag moontlik te maak nie. As jy weet hoe om dit te doen, skryf asseblief in die kommentaar.

Aangesien ons bash het en ons gereed is om na die einde te gaan, sal ons 'n uitweg vind! Jy kan die NiFi API gebruik om hierdie probleem op te los. Kom ons gebruik die volgende metode, neem die ID van die voorbeelde hierbo (in ons geval is dit 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrywing van NiFi API-metodes hier.

Outomatisering van vloeilewering in Apache NiFi
In die liggaam moet jy JSON slaag, soos volg:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameters wat ingevul moet word vir dit om te werk:
was - data-oordragstatus. Beskikbaar: TRANSMITTING om data-oordrag te aktiveer, GESTOP om te deaktiveer
weergawe - verwerker weergawe

weergawe sal verstek na 0 wanneer geskep, maar hierdie parameters kan verkry word met behulp van die metode

Outomatisering van vloeilewering in Apache NiFi

Vir aanhangers van bash-skrifte kan hierdie metode geskik lyk, maar dit is vir my 'n bietjie moeilik - bash-skrifte is nie my gunsteling nie. Die volgende metode is na my mening interessanter en geriefliker.

NiPyAPI

NiPyAPI is 'n Python-biblioteek vir interaksie met NiFi-gevalle. Dokumentasie bladsy bevat die nodige inligting om met die biblioteek te werk. Vinnige begin word beskryf in die konsep op github.

Ons skrif vir die uitrol van die konfigurasie is 'n program in Python. Kom ons gaan aan na kodering.
Ons stel konfigurasies op vir verdere werk. Ons sal die volgende parameters benodig:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Volgende sal ek die name van die metodes van hierdie biblioteek, wat beskryf word, invoeg hier.

Koppel die register aan die nifi-instansie met behulp van

nipyapi.versioning.create_registry_client

By hierdie stap kan u ook 'n kontrole byvoeg dat die register reeds by die instansie gevoeg is; hiervoor kan u die metode gebruik

nipyapi.versioning.list_registry_clients

Ons kry die emmer vir verdere soektog na vloei in die mandjie

nipyapi.versioning.get_registry_bucket

Deur die gevind emmer te gebruik, soek ons ​​vloei

nipyapi.versioning.get_flow_in_bucket

Vervolgens is dit belangrik om te verstaan ​​of hierdie prosesgroep reeds bygevoeg is. Die Prosesgroep word volgens koördinate geplaas en 'n situasie kan ontstaan ​​wanneer 'n tweede komponent bo-op een geplaas word. Ek het nagegaan, dit kan gebeur :) Om alle bygevoegde prosesgroepe te kry, gebruik ons ​​die metode

nipyapi.canvas.list_all_process_groups

Ons kan verder soek, byvoorbeeld op naam.

Ek sal nie die proses van die opdatering van die sjabloon beskryf nie, ek sal net sê dat as verwerkers in die nuwe weergawe van die sjabloon bygevoeg word, daar geen probleme is met die teenwoordigheid van boodskappe in die toue nie. Maar as verwerkers verwyder word, kan daar probleme ontstaan ​​(nifi laat jou nie toe om 'n verwerker te verwyder as 'n boodskapwag voor dit opgehoop het nie). As jy belangstel in hoe ek hierdie probleem opgelos het, skryf asseblief vir my en ons sal hierdie kwessie bespreek. Kontakte aan die einde van die artikel. Kom ons gaan aan na die stap om 'n prosesgroep by te voeg.

Toe ek die skrip ontfout, het ek 'n eienaardigheid teëgekom dat die nuutste weergawe van vloei nie altyd opgetrek word nie, so ek beveel aan om eers hierdie weergawe na te gaan:

nipyapi.versioning.get_latest_flow_ver

Ontplooi prosesgroep:

nipyapi.versioning.deploy_flow_version

Ons begin die verwerkers:

nipyapi.canvas.schedule_process_group

In die blok oor CLI is geskryf dat data-oordrag nie outomaties in die afgeleë prosesgroep geaktiveer word nie? Toe ek die skrip geïmplementeer het, het ek ook hierdie probleem ondervind. Op daardie tydstip kon ek nie data-oordrag met die API begin nie en ek het besluit om aan die ontwikkelaar van die NiPyAPI-biblioteek te skryf en vir raad/hulp te vra. Die ontwikkelaar het op my gereageer, ons het die probleem bespreek en hy het geskryf dat hy tyd nodig het om "iets na te gaan". En dan, 'n paar dae later, kom 'n brief waarin 'n funksie in Python geskryf is wat my bekendstellingsprobleem oplos!!! Op daardie tydstip was die NiPyAPI-weergawe 0.13.3 en natuurlik was daar niks so nie. Maar in weergawe 0.14.0, wat redelik onlangs vrygestel is, was hierdie funksie reeds by die biblioteek ingesluit. Ontmoet,

nipyapi.canvas.set_remote_process_group_transmission

Dus, deur die NiPyAPI-biblioteek te gebruik, het ons die register gekoppel, vloei uitgerol en selfs verwerkers en data-oordrag begin. Dan kan jy die kode fynkam, allerhande tjeks byvoeg, aanteken, en dit is al. Maar dit is 'n heeltemal ander storie.

Van die outomatiseringsopsies wat ek oorweeg het, het die laaste een vir my die doeltreffendste gelyk. Eerstens is dit steeds python-kode, waarin u hulpprogramkode kan insluit en voordeel kan trek uit al die voordele van die programmeertaal. Tweedens is die NiPyAPI-projek aktief besig om te ontwikkel en in geval van probleme kan jy aan die ontwikkelaar skryf. Derdens is NiPyAPI steeds 'n meer buigsame hulpmiddel om met NiFi te kommunikeer om komplekse probleme op te los. Byvoorbeeld, om te bepaal of die boodskaprye nou leeg is in die vloei en of die prosesgroep opgedateer kan word.

Dis al. Ek het 3 benaderings beskryf om vloeilewering in NiFi te outomatiseer, slaggate wat 'n ontwikkelaar kan teëkom, en werkskode verskaf vir die outomatisering van aflewering. As jy net so in hierdie onderwerp belangstel soos ek - skryf!

Bron: will.com

Voeg 'n opmerking