Aŭtomatigo de flua livero en Apache NiFi

Saluton ĉiuj!

Aŭtomatigo de flua livero en Apache NiFi

La tasko estas kiel sekvas - estas fluo, prezentita en la supra bildo, kiu devas esti disvastigita al N serviloj kun Apache NiFi. Fluotesto - dosiero estas generita kaj sendita al alia NiFi-instanco. Transdono de datumoj okazas per la protokolo NiFi Site al Site.

NiFi Site to Site (S2S) estas sekura, facile agordebla maniero transdoni datumojn inter NiFi-instancoj. Kiel S2S funkcias, vidu dokumentado kaj gravas ne forgesi agordi la NiFi-instancon por permesi S2S, vidu tie.

En kazoj, kie ni parolas pri transdono de datumoj per S2S, unu kazo nomiĝas kliento, la dua servilo. La kliento sendas datumojn, la servilo ricevas. Du manieroj agordi datumtranslokigon inter ili:

  1. puŝo. De la klienta kazo, datumoj estas senditaj uzante Remote Process Group (RPG). Sur la servila petskribo, datumoj estas ricevitaj per la Eniga Haveno
  2. tiri. La servilo ricevas datumojn per RPG, la kliento sendas per Eliga haveno.


Fluo por ruliĝo estas konservita en Apache Registry.

Apache NiFi Registry estas subprojekto de Apache NiFi kiu disponigas ilon por fluostokado kaj versiokontrolo. Ia GIT. Informoj pri instali, agordi kaj labori kun registro troviĝas en oficiala dokumentaro. Fluo por stokado estas kombinita en procezgrupon kaj stokita en ĉi tiu formo en la registro. Ni revenos al ĉi tio poste en la artikolo.

Komence, kiam N estas malgranda nombro, fluo estas liverita kaj ĝisdatigita permane en akceptebla tempo.

Sed kiam N kreskas, la problemoj fariĝas pli multaj:

  1. necesas pli da tempo por ĝisdatigi la fluon. Vi devas ensaluti en ĉiujn servilojn
  2. Okazas eraroj pri ŝablona ĝisdatigo. Ĉi tie ili ĝisdatigis ĝin, sed ĉi tie ili forgesis
  3. homaj eraroj farante grandan nombron da similaj operacioj

Ĉio ĉi alportas nin al la fakto, ke ni devas aŭtomatigi la procezon. Mi provis la jenajn manierojn solvi ĉi tiun problemon:

  1. Uzu MiNiFi anstataŭ NiFi
  2. NiFi CLI
  3. NiPyAPI

Uzante MiNiFi

Apache MiNiFy - subprojekto de Apache NiFi. MiNiFy estas kompakta agento, kiu uzas la samajn procesorojn kiel NiFi, permesante al vi krei la samajn fluojn kiel en NiFi. La malpeza naturo de la agento estas atingita, interalie, per la fakto, ke MiNiFy ne havas grafikan interfacon por flua agordo. La manko de grafika interfaco en MiNiFy signifas, ke necesas solvi la problemon de liverado de fluo al minifi. Ĉar MiNiFy estas aktive uzata en IOT, ekzistas multaj komponentoj kaj la procezo de liverado de fluo al la finaj minifi-kazoj devas esti aŭtomatigita. Konata tasko, ĉu ne?

Alia subprojekto helpos solvi ĉi tiun problemon - MiNiFi C2 Server. Ĉi tiu produkto intencas esti la centra punkto en la agorda lanĉa arkitekturo. Kiel agordi la medion - priskribite en ĉi tiu artikolo Estas sufiĉe da informoj pri Habré por solvi la problemon. MiNiFi, kune kun la C2-servilo, aŭtomate ĝisdatigas ĝian agordon. La sola malavantaĝo de ĉi tiu aliro estas, ke vi devas krei ŝablonojn sur C2-Servilo; simpla engaĝiĝo al la registro ne sufiĉas.

La opcio priskribita en la supra artikolo funkcias kaj ne malfacile efektivigebla, sed ni ne devas forgesi la jenon:

  1. Minifi ne havas ĉiujn procesorojn de nifi
  2. Minifi-procesorversioj postrestas malantaŭ NiFi-procesorversioj.

Al la horo de skribado, la plej nova versio de NiFi estas 1.9.2. La plej nova versio de procesoro MiNiFi estas 1.7.0. Procesoroj povas esti aldonitaj al MiNiFi, sed pro versiaj diferencoj inter NiFi kaj MiNiFi-procesoroj, tio eble ne funkcias.

NiFi CLI

Juĝante per Priskribo ilo en la oficiala retejo, ĉi tio estas ilo por aŭtomatigi la interagon inter NiFI kaj NiFi Registry en la kampo de fluo-liverado aŭ proceza administrado. Por komenci, vi devas elŝuti ĉi tiun ilon. de ĉi tie.

Lanĉu la utilecon

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Por ke ni ŝarĝu la bezonatan fluon de la registro, ni devas scii la identigilojn de la sitelo (sitelo-identigilo) kaj la fluon mem (flua identigilo). Ĉi tiuj datumoj povas esti akiritaj aŭ per la cli aŭ en la interfaco de la registro NiFi. En la retinterfaco ĝi aspektas jene:

Aŭtomatigo de flua livero en Apache NiFi

Uzante la CLI ĉi tio estas farita:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Ni komencas importi procezgrupon el registro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Grava punkto estas, ke iu ajn nifi-okazaĵo povas esti specifita kiel la gastiganto al kiu ni ruliĝas la procezgrupon.

Proceza grupo aldonita kun haltigitaj procesoroj, ili devas esti komencitaj

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Bonege, la procesoroj komenciĝis. Tamen, laŭ la kondiĉoj de la tasko, ni bezonas NiFi-instancojn por sendi datumojn al aliaj instancoj. Ni supozu, ke vi elektis la Push-metodon por transdoni datumojn al la servilo. Por organizi transdonon de datumoj, vi devas ebligi transdonon de datumoj sur la aldonita Remote Process Group (RPG), kiu jam estas inkluzivita en nia fluo.

Aŭtomatigo de flua livero en Apache NiFi

En la dokumentado en la CLI kaj aliaj fontoj, mi ne trovis manieron ebligi datumojn. Se vi scias kiel fari tion, bonvolu skribi en la komentoj.

Ĉar ni havas baton kaj ni pretas iri ĝis la fino, ni trovos eliron! Vi povas uzi la NiFi API por solvi ĉi tiun problemon. Ni uzu la sekvan metodon, prenu la ID el la supraj ekzemploj (en nia kazo ĝi estas 7f522a13-016e-1000-e504-d5b15587f2f3). Priskribo de NiFi API-metodoj tie.

Aŭtomatigo de flua livero en Apache NiFi
En la korpo vi devas pasi JSON, jene:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametroj kiuj devas esti plenigitaj por ke ĝi funkciu:
ŝtato - stato de transdono de datumoj. Disponebla: TRANSMITANTA por ebligi transdonon de datumoj, STOPPED por malŝalti
versio - versio de procesoro

versio defaŭlte al 0 kiam kreata, sed ĉi tiuj parametroj povas esti akiritaj per la metodo

Aŭtomatigo de flua livero en Apache NiFi

Por ŝatantoj de bash-skriptoj, ĉi tiu metodo eble ŝajnas taŭga, sed ĝi estas iom malfacila por mi - bash-skriptoj ne estas miaj plej ŝatataj. La sekva metodo estas pli interesa kaj oportuna laŭ mi.

NiPyAPI

NiPyAPI estas Python-biblioteko por interagado kun NiFi-kazoj. Dokumenta paĝo enhavas la necesajn informojn por labori kun la biblioteko. Rapida komenco estas priskribita en projekto sur github.

Nia skripto por lanĉi la agordon estas programo en Python. Ni transiru al kodigo.
Ni starigis agordojn por plua laboro. Ni bezonos la jenajn parametrojn:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Poste mi enmetos la nomojn de la metodoj de ĉi tiu biblioteko, kiuj estas priskribitaj tie.

Konekti la registron al la nifi-instanco uzante

nipyapi.versioning.create_registry_client

Je ĉi tiu paŝo, vi ankaŭ povas aldoni kontrolon, ke la registro jam aldoniĝis al la petskribo; por tio vi povas uzi la metodon

nipyapi.versioning.list_registry_clients

Ni trovas la sitelon por plua serĉo de fluo en la korbo

nipyapi.versioning.get_registry_bucket

Uzante la trovitan sitelon, ni serĉas fluon

nipyapi.versioning.get_flow_in_bucket

Poste, estas grave kompreni ĉu ĉi tiu proceza grupo jam estis aldonita. La Proceza grupo estas metita laŭ koordinatoj kaj situacio povas ekesti kiam dua komponanto estas supermetita super unu. Mi kontrolis, tio povas okazi :) Por akiri ĉiujn aldonitajn procezgrupojn ni uzas la metodon

nipyapi.canvas.list_all_process_groups

Ni povas plu serĉi, ekzemple, laŭnome.

Mi ne priskribos la procezon de ĝisdatigo de la ŝablono, mi nur diros, ke se oni aldonas procesorojn en la nova versio de la ŝablono, tiam ne estas problemoj pri la ĉeesto de mesaĝoj en la vostoj. Sed se procesoroj estas forigitaj, tiam povas aperi problemoj (nifi ne permesas al vi forigi procesoron, se antaŭ ĝi akumuliĝis mesaĝvico). Se vi interesiĝas pri kiel mi solvis ĉi tiun problemon, bonvolu skribi al mi kaj ni diskutos ĉi tiun aferon. Kontaktoj ĉe la fino de la artikolo. Ni transiru al la paŝo aldoni procezgrupon.

Sencimigante la skripton, mi trovis proprecon, ke la plej nova versio de fluo ne ĉiam estas eltirita, do mi rekomendas unue kontroli ĉi tiun version:

nipyapi.versioning.get_latest_flow_ver

Deploji procezgrupon:

nipyapi.versioning.deploy_flow_version

Ni startas la procesorojn:

nipyapi.canvas.schedule_process_group

En la bloko pri CLI estis skribite, ke datumtransigo ne estas aŭtomate ebligita en la fora proceza grupo? Dum la efektivigo de la skripto, mi ankaŭ renkontis ĉi tiun problemon. Tiutempe, mi ne povis komenci datumojn per la API kaj mi decidis skribi al la programisto de la biblioteko NiPyAPI kaj peti konsilon/helpon. La programisto respondis al mi, ni diskutis la problemon kaj li skribis, ke li bezonas tempon por "kontroli ion". Kaj tiam, kelkajn tagojn poste, alvenas letero en kiu estas skribita en Python funkcio kiu solvas mian lanĉan problemon!!! Tiutempe, la versio de NiPyAPI estis 0.13.3 kaj, kompreneble, estis nenio tia. Sed en la versio 0.14.0, kiu estis publikigita sufiĉe lastatempe, ĉi tiu funkcio jam estis inkluzivita en la biblioteko. Renkontu,

nipyapi.canvas.set_remote_process_group_transmission

Do, uzante la NiPyAPI-bibliotekon, ni konektis la registron, eligis fluon, kaj eĉ komencis procesorojn kaj transdonon de datumoj. Tiam vi povas kombi la kodon, aldoni ĉiajn kontrolojn, ensaluti, kaj jen ĉio. Sed tio estas tute alia historio.

El la aŭtomatigaj opcioj, kiujn mi konsideris, la lasta ŝajnis al mi la plej efika. Unue, ĉi tio ankoraŭ estas python-kodo, en kiu vi povas enmeti helpan programkodon kaj profiti ĉiujn avantaĝojn de la programlingvo. Due, la projekto NiPyAPI aktive disvolviĝas kaj en kazo de problemoj vi povas skribi al la programisto. Trie, NiPyAPI estas ankoraŭ pli fleksebla ilo por interagi kun NiFi por solvi kompleksajn problemojn. Ekzemple, en determini ĉu la mesaĝvostoj nun estas malplenaj en la fluo kaj ĉu la procezgrupo povas esti ĝisdatigita.

Tio estas ĉio. Mi priskribis 3 alirojn por aŭtomatigi fluan liveron en NiFi, malfacilaĵojn, kiujn programisto povas renkonti, kaj disponigis laborkodon por aŭtomatigi liveron. Se vi interesiĝas pri ĉi tiu temo kiel mi - skribu!

fonto: www.habr.com

Aldoni komenton