Hallo almal!
Die taak is soos volg - daar is 'n vloei, aangebied in die prentjie hierbo, wat na N bedieners uitgerol moet word met
NiFi Site to Site (S2S) is 'n veilige, maklik konfigureerbare manier om data tussen NiFi-instansies oor te dra. Hoe S2S werk, sien
In gevalle waar ons praat oor data-oordrag met behulp van S2S, word een geval kliënt genoem, die tweede bediener. Die kliënt stuur data, die bediener ontvang. Twee maniere om data-oordrag tussen hulle op te stel:
- Stoot. Vanaf die kliëntinstansie word data gestuur met behulp van 'n Remote Process Group (RPG). Op die bedienergeval word data ontvang deur die invoerpoort te gebruik
- Trek. Die bediener ontvang data met behulp van RPG, die kliënt stuur met die uitsetpoort.
Vloei vir uitrol word in Apache-register gestoor.
Apache NiFi Registry is 'n subprojek van Apache NiFi wat 'n instrument bied vir vloeiberging en weergawebeheer. 'n Soort GIT. Inligting oor die installering, konfigurasie en werk met register kan gevind word in
Aan die begin, wanneer N 'n klein getal is, word vloei gelewer en met die hand bygewerk binne 'n aanvaarbare tyd.
Maar soos N groei, word die probleme meer:
- dit neem meer tyd om die vloei op te dateer. Jy moet by alle bedieners aanmeld
- Sjabloonopdateringsfoute kom voor. Hier het hulle dit bygewerk, maar hier het hulle vergeet
- menslike foute wanneer 'n groot aantal soortgelyke operasies uitgevoer word
Dit alles bring ons by die feit dat ons die proses moet outomatiseer. Ek het die volgende maniere probeer om hierdie probleem op te los:
- Gebruik MiNiFi in plaas van NiFi
- NiFi CLI
- NiPyAPI
Gebruik MiNiFi
Nog 'n subprojek sal help om hierdie probleem op te los - MiNiFi C2 Server. Hierdie produk is bedoel om die sentrale punt in die konfigurasie-ontplooiingsargitektuur te wees. Hoe om die omgewing op te stel - beskryf in
Die opsie wat in die artikel hierbo beskryf word, werk en nie moeilik om te implementeer nie, maar ons moet nie die volgende vergeet nie:
- Minifi het nie alle verwerkers van Nifi nie
- Minifi-verwerkerweergawes lê agter NiFi-verwerkerweergawes.
Met die skryf hiervan is die nuutste weergawe van NiFi 1.9.2. Die nuutste MiNiFi-verwerkerweergawe is 1.7.0. Verwerkers kan by MiNiFi gevoeg word, maar as gevolg van weergawe verskille tussen NiFi en MiNiFi verwerkers, sal dit dalk nie werk nie.
NiFi CLI
Te oordeel aan
Begin die hulpprogram
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Om die vereiste vloei vanaf die register te laai, moet ons die identifiseerders van die emmer (emmeridentifiseerder) en die vloei self (vloeiidentifiseerder) ken. Hierdie data kan verkry word deur die cli of in die NiFi-register-webkoppelvlak. In die webkoppelvlak lyk dit soos volg:
Deur die CLI te gebruik word dit gedoen:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Ons begin die invoer van prosesgroep vanaf die register:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
'n Belangrike punt is dat enige nifi-instansie gespesifiseer kan word as die gasheer waarheen ons die prosesgroep rol.
Prosesgroep bygevoeg met gestopte verwerkers, hulle moet begin word
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Groot, die verwerkers het begin. Volgens die bepalings van die taak het ons egter NiFi-gevalle nodig om data na ander instansies te stuur. Kom ons neem aan dat jy die Push-metode gekies het om data na die bediener oor te dra. Om data-oordrag te organiseer, moet jy data-oordrag aktiveer op die bygevoegde Remote Process Group (RPG), wat reeds by ons vloei ingesluit is.
In die dokumentasie in die CLI en ander bronne het ek nie 'n manier gevind om data-oordrag moontlik te maak nie. As jy weet hoe om dit te doen, skryf asseblief in die kommentaar.
Aangesien ons bash het en ons gereed is om na die einde te gaan, sal ons 'n uitweg vind! Jy kan die NiFi API gebruik om hierdie probleem op te los. Kom ons gebruik die volgende metode, neem die ID van die voorbeelde hierbo (in ons geval is dit 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrywing van NiFi API-metodes
In die liggaam moet jy JSON slaag, soos volg:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameters wat ingevul moet word vir dit om te werk:
was - data-oordragstatus. Beskikbaar: TRANSMITTING om data-oordrag te aktiveer, GESTOP om te deaktiveer
weergawe - verwerker weergawe
weergawe sal verstek na 0 wanneer geskep, maar hierdie parameters kan verkry word met behulp van die metode
Vir aanhangers van bash-skrifte kan hierdie metode geskik lyk, maar dit is vir my 'n bietjie moeilik - bash-skrifte is nie my gunsteling nie. Die volgende metode is na my mening interessanter en geriefliker.
NiPyAPI
NiPyAPI is 'n Python-biblioteek vir interaksie met NiFi-gevalle.
Ons skrif vir die uitrol van die konfigurasie is 'n program in Python. Kom ons gaan aan na kodering.
Ons stel konfigurasies op vir verdere werk. Ons sal die volgende parameters benodig:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Volgende sal ek die name van die metodes van hierdie biblioteek, wat beskryf word, invoeg
Koppel die register aan die nifi-instansie met behulp van
nipyapi.versioning.create_registry_client
By hierdie stap kan u ook 'n kontrole byvoeg dat die register reeds by die instansie gevoeg is; hiervoor kan u die metode gebruik
nipyapi.versioning.list_registry_clients
Ons kry die emmer vir verdere soektog na vloei in die mandjie
nipyapi.versioning.get_registry_bucket
Deur die gevind emmer te gebruik, soek ons vloei
nipyapi.versioning.get_flow_in_bucket
Vervolgens is dit belangrik om te verstaan of hierdie prosesgroep reeds bygevoeg is. Die Prosesgroep word volgens koördinate geplaas en 'n situasie kan ontstaan wanneer 'n tweede komponent bo-op een geplaas word. Ek het nagegaan, dit kan gebeur :) Om alle bygevoegde prosesgroepe te kry, gebruik ons die metode
nipyapi.canvas.list_all_process_groups
Ons kan verder soek, byvoorbeeld op naam.
Ek sal nie die proses van die opdatering van die sjabloon beskryf nie, ek sal net sê dat as verwerkers in die nuwe weergawe van die sjabloon bygevoeg word, daar geen probleme is met die teenwoordigheid van boodskappe in die toue nie. Maar as verwerkers verwyder word, kan daar probleme ontstaan (nifi laat jou nie toe om 'n verwerker te verwyder as 'n boodskapwag voor dit opgehoop het nie). As jy belangstel in hoe ek hierdie probleem opgelos het, skryf asseblief vir my en ons sal hierdie kwessie bespreek. Kontakte aan die einde van die artikel. Kom ons gaan aan na die stap om 'n prosesgroep by te voeg.
Toe ek die skrip ontfout, het ek 'n eienaardigheid teëgekom dat die nuutste weergawe van vloei nie altyd opgetrek word nie, so ek beveel aan om eers hierdie weergawe na te gaan:
nipyapi.versioning.get_latest_flow_ver
Ontplooi prosesgroep:
nipyapi.versioning.deploy_flow_version
Ons begin die verwerkers:
nipyapi.canvas.schedule_process_group
In die blok oor CLI is geskryf dat data-oordrag nie outomaties in die afgeleë prosesgroep geaktiveer word nie? Toe ek die skrip geïmplementeer het, het ek ook hierdie probleem ondervind. Op daardie tydstip kon ek nie data-oordrag met die API begin nie en ek het besluit om aan die ontwikkelaar van die NiPyAPI-biblioteek te skryf en vir raad/hulp te vra. Die ontwikkelaar het op my gereageer, ons het die probleem bespreek en hy het geskryf dat hy tyd nodig het om "iets na te gaan". En dan, 'n paar dae later, kom 'n brief waarin 'n funksie in Python geskryf is wat my bekendstellingsprobleem oplos!!! Op daardie tydstip was die NiPyAPI-weergawe 0.13.3 en natuurlik was daar niks so nie. Maar in weergawe 0.14.0, wat redelik onlangs vrygestel is, was hierdie funksie reeds by die biblioteek ingesluit. Ontmoet,
nipyapi.canvas.set_remote_process_group_transmission
Dus, deur die NiPyAPI-biblioteek te gebruik, het ons die register gekoppel, vloei uitgerol en selfs verwerkers en data-oordrag begin. Dan kan jy die kode fynkam, allerhande tjeks byvoeg, aanteken, en dit is al. Maar dit is 'n heeltemal ander storie.
Van die outomatiseringsopsies wat ek oorweeg het, het die laaste een vir my die doeltreffendste gelyk. Eerstens is dit steeds python-kode, waarin u hulpprogramkode kan insluit en voordeel kan trek uit al die voordele van die programmeertaal. Tweedens is die NiPyAPI-projek aktief besig om te ontwikkel en in geval van probleme kan jy aan die ontwikkelaar skryf. Derdens is NiPyAPI steeds 'n meer buigsame hulpmiddel om met NiFi te kommunikeer om komplekse probleme op te los. Byvoorbeeld, om te bepaal of die boodskaprye nou leeg is in die vloei en of die prosesgroep opgedateer kan word.
Dis al. Ek het 3 benaderings beskryf om vloeilewering in NiFi te outomatiseer, slaggate wat 'n ontwikkelaar kan teëkom, en werkskode verskaf vir die outomatisering van aflewering. As jy net so in hierdie onderwerp belangstel soos ek -
Bron: will.com