Saluton ĉiuj!
La tasko estas kiel sekvas - estas fluo, prezentita en la supra bildo, kiu devas esti disvastigita al N serviloj kun
NiFi Site to Site (S2S) estas sekura, facile agordebla maniero transdoni datumojn inter NiFi-instancoj. Kiel S2S funkcias, vidu
En kazoj, kie ni parolas pri transdono de datumoj per S2S, unu kazo nomiĝas kliento, la dua servilo. La kliento sendas datumojn, la servilo ricevas. Du manieroj agordi datumtranslokigon inter ili:
- puŝo. De la klienta kazo, datumoj estas senditaj uzante Remote Process Group (RPG). Sur la servila petskribo, datumoj estas ricevitaj per la Eniga Haveno
- tiri. La servilo ricevas datumojn per RPG, la kliento sendas per Eliga haveno.
Fluo por ruliĝo estas konservita en Apache Registry.
Apache NiFi Registry estas subprojekto de Apache NiFi kiu disponigas ilon por fluostokado kaj versiokontrolo. Ia GIT. Informoj pri instali, agordi kaj labori kun registro troviĝas en
Komence, kiam N estas malgranda nombro, fluo estas liverita kaj ĝisdatigita permane en akceptebla tempo.
Sed kiam N kreskas, la problemoj fariĝas pli multaj:
- necesas pli da tempo por ĝisdatigi la fluon. Vi devas ensaluti en ĉiujn servilojn
- Okazas eraroj pri ŝablona ĝisdatigo. Ĉi tie ili ĝisdatigis ĝin, sed ĉi tie ili forgesis
- homaj eraroj farante grandan nombron da similaj operacioj
Ĉio ĉi alportas nin al la fakto, ke ni devas aŭtomatigi la procezon. Mi provis la jenajn manierojn solvi ĉi tiun problemon:
- Uzu MiNiFi anstataŭ NiFi
- NiFi CLI
- NiPyAPI
Uzante MiNiFi
Alia subprojekto helpos solvi ĉi tiun problemon - MiNiFi C2 Server. Ĉi tiu produkto intencas esti la centra punkto en la agorda lanĉa arkitekturo. Kiel agordi la medion - priskribite en
La opcio priskribita en la supra artikolo funkcias kaj ne malfacile efektivigebla, sed ni ne devas forgesi la jenon:
- Minifi ne havas ĉiujn procesorojn de nifi
- Minifi-procesorversioj postrestas malantaŭ NiFi-procesorversioj.
Al la horo de skribado, la plej nova versio de NiFi estas 1.9.2. La plej nova versio de procesoro MiNiFi estas 1.7.0. Procesoroj povas esti aldonitaj al MiNiFi, sed pro versiaj diferencoj inter NiFi kaj MiNiFi-procesoroj, tio eble ne funkcias.
NiFi CLI
Juĝante per
Lanĉu la utilecon
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Por ke ni ŝarĝu la bezonatan fluon de la registro, ni devas scii la identigilojn de la sitelo (sitelo-identigilo) kaj la fluon mem (flua identigilo). Ĉi tiuj datumoj povas esti akiritaj aŭ per la cli aŭ en la interfaco de la registro NiFi. En la retinterfaco ĝi aspektas jene:
Uzante la CLI ĉi tio estas farita:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Ni komencas importi procezgrupon el registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Grava punkto estas, ke iu ajn nifi-okazaĵo povas esti specifita kiel la gastiganto al kiu ni ruliĝas la procezgrupon.
Proceza grupo aldonita kun haltigitaj procesoroj, ili devas esti komencitaj
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Bonege, la procesoroj komenciĝis. Tamen, laŭ la kondiĉoj de la tasko, ni bezonas NiFi-instancojn por sendi datumojn al aliaj instancoj. Ni supozu, ke vi elektis la Push-metodon por transdoni datumojn al la servilo. Por organizi transdonon de datumoj, vi devas ebligi transdonon de datumoj sur la aldonita Remote Process Group (RPG), kiu jam estas inkluzivita en nia fluo.
En la dokumentado en la CLI kaj aliaj fontoj, mi ne trovis manieron ebligi datumojn. Se vi scias kiel fari tion, bonvolu skribi en la komentoj.
Ĉar ni havas baton kaj ni pretas iri ĝis la fino, ni trovos eliron! Vi povas uzi la NiFi API por solvi ĉi tiun problemon. Ni uzu la sekvan metodon, prenu la ID el la supraj ekzemploj (en nia kazo ĝi estas 7f522a13-016e-1000-e504-d5b15587f2f3). Priskribo de NiFi API-metodoj
En la korpo vi devas pasi JSON, jene:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametroj kiuj devas esti plenigitaj por ke ĝi funkciu:
ŝtato - stato de transdono de datumoj. Disponebla: TRANSMITANTA por ebligi transdonon de datumoj, STOPPED por malŝalti
versio - versio de procesoro
versio defaŭlte al 0 kiam kreata, sed ĉi tiuj parametroj povas esti akiritaj per la metodo
Por ŝatantoj de bash-skriptoj, ĉi tiu metodo eble ŝajnas taŭga, sed ĝi estas iom malfacila por mi - bash-skriptoj ne estas miaj plej ŝatataj. La sekva metodo estas pli interesa kaj oportuna laŭ mi.
NiPyAPI
NiPyAPI estas Python-biblioteko por interagado kun NiFi-kazoj.
Nia skripto por lanĉi la agordon estas programo en Python. Ni transiru al kodigo.
Ni starigis agordojn por plua laboro. Ni bezonos la jenajn parametrojn:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Poste mi enmetos la nomojn de la metodoj de ĉi tiu biblioteko, kiuj estas priskribitaj
Konekti la registron al la nifi-instanco uzante
nipyapi.versioning.create_registry_client
Je ĉi tiu paŝo, vi ankaŭ povas aldoni kontrolon, ke la registro jam aldoniĝis al la petskribo; por tio vi povas uzi la metodon
nipyapi.versioning.list_registry_clients
Ni trovas la sitelon por plua serĉo de fluo en la korbo
nipyapi.versioning.get_registry_bucket
Uzante la trovitan sitelon, ni serĉas fluon
nipyapi.versioning.get_flow_in_bucket
Poste, estas grave kompreni ĉu ĉi tiu proceza grupo jam estis aldonita. La Proceza grupo estas metita laŭ koordinatoj kaj situacio povas ekesti kiam dua komponanto estas supermetita super unu. Mi kontrolis, tio povas okazi :) Por akiri ĉiujn aldonitajn procezgrupojn ni uzas la metodon
nipyapi.canvas.list_all_process_groups
Ni povas plu serĉi, ekzemple, laŭnome.
Mi ne priskribos la procezon de ĝisdatigo de la ŝablono, mi nur diros, ke se oni aldonas procesorojn en la nova versio de la ŝablono, tiam ne estas problemoj pri la ĉeesto de mesaĝoj en la vostoj. Sed se procesoroj estas forigitaj, tiam povas aperi problemoj (nifi ne permesas al vi forigi procesoron, se antaŭ ĝi akumuliĝis mesaĝvico). Se vi interesiĝas pri kiel mi solvis ĉi tiun problemon, bonvolu skribi al mi kaj ni diskutos ĉi tiun aferon. Kontaktoj ĉe la fino de la artikolo. Ni transiru al la paŝo aldoni procezgrupon.
Sencimigante la skripton, mi trovis proprecon, ke la plej nova versio de fluo ne ĉiam estas eltirita, do mi rekomendas unue kontroli ĉi tiun version:
nipyapi.versioning.get_latest_flow_ver
Deploji procezgrupon:
nipyapi.versioning.deploy_flow_version
Ni startas la procesorojn:
nipyapi.canvas.schedule_process_group
En la bloko pri CLI estis skribite, ke datumtransigo ne estas aŭtomate ebligita en la fora proceza grupo? Dum la efektivigo de la skripto, mi ankaŭ renkontis ĉi tiun problemon. Tiutempe, mi ne povis komenci datumojn per la API kaj mi decidis skribi al la programisto de la biblioteko NiPyAPI kaj peti konsilon/helpon. La programisto respondis al mi, ni diskutis la problemon kaj li skribis, ke li bezonas tempon por "kontroli ion". Kaj tiam, kelkajn tagojn poste, alvenas letero en kiu estas skribita en Python funkcio kiu solvas mian lanĉan problemon!!! Tiutempe, la versio de NiPyAPI estis 0.13.3 kaj, kompreneble, estis nenio tia. Sed en la versio 0.14.0, kiu estis publikigita sufiĉe lastatempe, ĉi tiu funkcio jam estis inkluzivita en la biblioteko. Renkontu,
nipyapi.canvas.set_remote_process_group_transmission
Do, uzante la NiPyAPI-bibliotekon, ni konektis la registron, eligis fluon, kaj eĉ komencis procesorojn kaj transdonon de datumoj. Tiam vi povas kombi la kodon, aldoni ĉiajn kontrolojn, ensaluti, kaj jen ĉio. Sed tio estas tute alia historio.
El la aŭtomatigaj opcioj, kiujn mi konsideris, la lasta ŝajnis al mi la plej efika. Unue, ĉi tio ankoraŭ estas python-kodo, en kiu vi povas enmeti helpan programkodon kaj profiti ĉiujn avantaĝojn de la programlingvo. Due, la projekto NiPyAPI aktive disvolviĝas kaj en kazo de problemoj vi povas skribi al la programisto. Trie, NiPyAPI estas ankoraŭ pli fleksebla ilo por interagi kun NiFi por solvi kompleksajn problemojn. Ekzemple, en determini ĉu la mesaĝvostoj nun estas malplenaj en la fluo kaj ĉu la procezgrupo povas esti ĝisdatigita.
Tio estas ĉio. Mi priskribis 3 alirojn por aŭtomatigi fluan liveron en NiFi, malfacilaĵojn, kiujn programisto povas renkonti, kaj disponigis laborkodon por aŭtomatigi liveron. Se vi interesiĝas pri ĉi tiu temo kiel mi -
fonto: www.habr.com