Automatizazione di a consegna di flussu in Apache NiFi

Bonghjornu ognunu!

Automatizazione di a consegna di flussu in Apache NiFi

U compitu hè a siguenti - ci hè un flussu, prisentatu in a stampa sopra, chì deve esse sbulicatu à N servitori cù Apache NiFi. Test di flussu - un schedariu hè generatu è mandatu à un altru esempiu NiFi. U trasferimentu di dati si faci cù u protocolu NiFi Site à Site.

NiFi Site to Site (S2S) hè un modu sicuru, facilmente configurabile per trasfiriri dati trà istanze NiFi. Cumu S2S travaglia, vede ducumentazione è hè impurtante micca di scurdà di cunfigurà l'istanza NiFi per permette S2S, vede ccà.

In i casi induve parlemu di trasferimentu di dati cù S2S, un esempiu hè chjamatu cliente, u sicondu servitore. U cliente manda dati, u servitore riceve. Dui manere di cunfigurà u trasferimentu di dati trà elli:

  1. Push. Da l'istanza di u cliente, i dati sò mandati cù un Gruppu di Processu Remote (RPG). In l'istanza di u servitore, i dati sò ricevuti utilizendu u Portu di Input
  2. Canapé. U servitore riceve dati cù RPG, u cliente manda cù u portu di Output.


U flussu per u rollu hè almacenatu in u Registru Apache.

Apache NiFi Registry hè un sottuprogettu di Apache NiFi chì furnisce un strumentu per u almacenamentu di flussu è u cuntrollu di versione. Una sorta di GIT. L'infurmazione nantu à a stallazione, a cunfigurazione è u travagliu cù u registru pò esse truvata in documentazione ufficiale. U flussu per u almacenamentu hè cumminatu in un gruppu di prucessu è guardatu in questa forma in u registru. Avemu da vultà à questu più tardi in l'articulu.

À u principiu, quandu N hè un picculu numeru, u flussu hè furnitu è ​​aghjurnatu manualmente in un tempu accettabile.

Ma cum'è N cresce, i prublemi diventanu più numerosi:

  1. ci vole più tempu per aghjurnà u flussu. Avete bisognu di log in tutti i servitori
  2. L'errore di l'aghjurnamentu di u mudellu si verificanu. Eccu l'aghjurnà, ma quì si scurdanu
  3. errori umani quandu facenu un gran numaru di operazioni simili

Tuttu chistu ci porta à u fattu chì avemu bisognu di automatizà u prucessu. Aghju pruvatu i seguenti modi per risolve stu prublema:

  1. Aduprate MiNiFi invece di NiFi
  2. NiFi CLI
  3. NiPyAPI

Utilizendu MiNiFi

Apache MiNiFy - sottuprogettu di Apache NiFi. MiNiFy hè un agentu compactu chì usa i stessi processori cum'è NiFi, chì permette di creà i stessi flussi cum'è in NiFi. A natura ligera di l'agente hè ottenuta, frà altri cose, da u fattu chì MiNiFy ùn hà micca una interfaccia gràfica per a cunfigurazione di u flussu. A mancanza di una interfaccia grafica in MiNiFy significa chì hè necessariu di risolve u prublema di furnisce u flussu à minifi. Siccomu MiNiFy hè attivamente utilizatu in IOT, ci sò parechji cumpunenti è u prucessu di furnisce u flussu à l'istanze finali minifi deve esse automatizatu. Un compitu famusu, nò?

Un altru sottuprogettu aiutarà à risolve stu prublema - MiNiFi C2 Server. Stu pruduttu hè destinatu à esse u puntu centrale in l'architettura di rollout di cunfigurazione. Cumu cunfigurà l'ambiente - descrittu in stu articulu Ci hè abbastanza infurmazione nantu à Habré per risolve u prublema. MiNiFi, in cunghjunzione cù u servitore C2, aghjurnà automaticamente a so cunfigurazione. L'unicu inconveniente di questu approcciu hè chì avete da creà mudelli nantu à C2 Server; un impegnu simplice à u registru ùn hè micca abbastanza.

L'opzione descritta in l'articulu sopra hè travagliatu è ùn hè micca difficiule di implementà, ma ùn deve micca scurdate di ciò chì segue:

  1. Minifi ùn hà micca tutti i processori da nifi
  2. E versioni di u processore Minifi sò in daretu à e versioni di u processore NiFi.

À u mumentu di a scrittura, l'ultima versione di NiFi hè 1.9.2. L'ultima versione di processore MiNiFi hè 1.7.0. I prucessori ponu esse aghjuntu à MiNiFi, ma per via di discrepanze di versione trà i prucessori NiFi è MiNiFi, questu ùn pò micca travaglià.

NiFi CLI

A ghjudicà per descrizzione strumentu nantu à u situ web ufficiale, questu hè un strumentu per automatizà l'interazzione trà NiFI è NiFi Registru in u campu di a spedizione di flussu o a gestione di prucessu. Per principià, avete bisognu di scaricà stu strumentu. da quì.

Lanciate l'utilità

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Per noi per carricà u flussu necessariu da u registru, avemu bisognu di cunnosce l'identificatori di u bucket (identificatore di bucket) è u flussu stessu (identificatore di flussu). Queste dati ponu esse ottenuti sia attraversu u cli sia in l'interfaccia web di u registru NiFi. In l'interfaccia web si vede cusì:

Automatizazione di a consegna di flussu in Apache NiFi

Utilizendu a CLI questu hè fattu:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Cuminciamu à impurtà u gruppu di prucessu da u registru:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un puntu impurtante hè chì ogni istanza di nifi pò esse specificatu cum'è l'ospitu à quale avemu rollu u gruppu di prucessu.

Gruppu di prucessu aghjuntu cù i prucessori fermati, anu da esse cuminciatu

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Grande, i processori sò cuminciati. Tuttavia, sicondu i termini di u compitu, avemu bisognu di istanze NiFi per mandà dati à altri casi. Assumimu chì avete sceltu u metudu Push per trasfiriri dati à u servitore. Per urganizà u trasferimentu di dati, avete bisognu di attivà u trasferimentu di dati nantu à u Gruppu di Process Remote (RPG) aghjuntu, chì hè digià inclusu in u nostru flussu.

Automatizazione di a consegna di flussu in Apache NiFi

In a documentazione in u CLI è altre fonti, ùn aghju micca truvatu un modu per attivà u trasferimentu di dati. Se sapete cumu fà questu, scrivite in i cumenti.

Siccomu avemu bash è simu pronti per andà finu à a fine, truveremu una manera di esce! Pudete aduprà l'API NiFi per risolve stu prublema. Utilizemu u metudu seguente, pigliate l'ID da l'esempii sopra (in u nostru casu hè 7f522a13-016e-1000-e504-d5b15587f2f3). Descrizzione di i metudi NiFi API ccà.

Automatizazione di a consegna di flussu in Apache NiFi
In u corpu avete bisognu di passà JSON, cusì:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri chì deve esse cumpletu per u travagliu:
statu - u statu di trasferimentu di dati. Disponibile: TRASMISSIONE per attivà u trasferimentu di dati, STOPPED per disattivà
versione - versione di processore

A versione predeterminata à 0 quandu hè creata, ma sti paràmetri ponu esse ottenuti cù u metudu

Automatizazione di a consegna di flussu in Apache NiFi

Per i fan di bash scripts, stu metudu pò esse adattatu, ma hè un pocu difficiule per mè - bash scripts ùn sò micca u mo preferitu. U metudu prossimu hè più interessante è cunvene in u mo parè.

NiPyAPI

NiPyAPI hè una libreria Python per interagisce cù istanze NiFi. Pagina di documentazione cuntene l'infurmazioni necessarii per travaglià cù a biblioteca. A partenza rapida hè descritta in prughjettu nantu à github.

U nostru script per rializà a cunfigurazione hè un prugramma in Python. Passemu à a codificazione.
Avemu stallatu cunfigurazioni per u travagliu più. Avemu bisognu di i seguenti parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

In seguitu aghju inseritu i nomi di i metudi di sta biblioteca, chì sò descritti ccà.

Cunnette u registru à l'istanza nifi usendu

nipyapi.versioning.create_registry_client

À questu passu, pudete ancu aghjunghje un verificatu chì u registru hè digià aghjuntu à l'istanza; per questu pudete aduprà u metudu

nipyapi.versioning.list_registry_clients

Truvemu u bucket per più ricerca di flussu in a cesta

nipyapi.versioning.get_registry_bucket

Utilizendu u bucket trovu, circhemu u flussu

nipyapi.versioning.get_flow_in_bucket

In seguitu, hè impurtante per capisce chì stu gruppu di prucessu hè digià aghjuntu. U gruppu di Prucessu hè piazzatu secondu e coordenate è una situazione pò esce quandu un secondu cumpunente hè superpostu nantu à unu. Aghju verificatu, questu pò succede :) Per uttene tutti i gruppi di prucessu aghjuntu, avemu aduprà u metudu

nipyapi.canvas.list_all_process_groups

Pudemu più ricerca, per esempiu, per nome.

Ùn aghju micca descrizzione di u prucessu di aghjurnà u mudellu, solu diceraghju chì se i prucessori sò aghjuntu in a nova versione di u mudellu, ùn ci sò micca prublemi cù a presenza di messagi in fila. Ma se i prucessori sò sguassati, ponu esse prublemi (nifi ùn permettenu micca di sguassà un processore se una fila di messagi s'hè accumulata davanti). Sè vo site interessatu in quantu aghju risoltu stu prublema, per piacè scrivitemi è discuteremu di stu prublema. Cuntatti à a fine di l'articulu. Andemu à u passu di aghjunghje un gruppu di prucessu.

Quandu debugging u script, aghju scontru una peculiarità chì l'ultima versione di u flussu ùn hè micca sempre tirata, cusì vi cunsigliu di verificà prima sta versione:

nipyapi.versioning.get_latest_flow_ver

Impulsà u gruppu di prucessu:

nipyapi.versioning.deploy_flow_version

Cuminciamu i processori:

nipyapi.canvas.schedule_process_group

In u bloccu circa CLI hè statu scrittu chì u trasferimentu di dati ùn hè micca automaticamente attivatu in u gruppu di prucessu remota? Quandu implementa u script, aghju ancu scontru stu prublema. À quellu tempu, ùn aghju micca pussutu inizià a trasferimentu di dati cù l'API è decisu di scrive à u sviluppatore di a biblioteca NiPyAPI è dumandà cunsiglii / aiutu. U sviluppatore hà rispostu à mè, avemu discututu u prublema è hà scrittu chì avia bisognu di tempu per "verificà qualcosa". È dopu, un paru di ghjorni dopu, ghjunghje una lettera in quale hè scritta una funzione in Python chì risolve u mo prublema di lanciamentu !!! À quellu tempu, a versione NiPyAPI era 0.13.3 è, sicuru, ùn ci era nunda cusì. Ma in a versione 0.14.0, chì hè stata liberata pocu pocu tempu, sta funzione era digià inclusa in a biblioteca. scuntrà,

nipyapi.canvas.set_remote_process_group_transmission

Allora, usendu a libreria NiPyAPI, avemu cunnessu u registru, stendu u flussu, è ancu cuminciatu i processori è u trasferimentu di dati. Allora pudete pettine u codice, aghjunghje tutti i tipi di cuntrolli, logging, è questu hè tuttu. Ma hè una storia completamente diversa.

Di l'opzioni d'automatizazione chì aghju cunsideratu, l'ultima mi pareva u più efficace. Prima, questu hè sempre codice python, in quale pudete incrustà u codice di u prugramma ausiliariu è prufittà di tutti i benefici di a lingua di prugrammazione. Siconda, u prughjettu NiPyAPI hè attivamente sviluppatu è in casu di prublemi pudete scrive à u sviluppatore. In terzu, NiPyAPI hè sempre un strumentu più flexible per interagisce cù NiFi per risolve i prublemi cumplessi. Per esempiu, in determinà se e file di messagi sò avà vacanti in u flussu è se u gruppu di prucessu pò esse aghjurnatu.

Eccu tuttu. Aghju descrittu 3 approcci per l'automatizazione di a spedizione di flussu in NiFi, trappule chì un sviluppatore pò scontru, è furnitu codice di travagliu per automatizà a consegna. Sè site interessatu in questu tema cum'è mè - scrivi!

Source: www.habr.com

Add a comment