Automatizarea livrării fluxului în Apache NiFi

Bună ziua tuturor!

Automatizarea livrării fluxului în Apache NiFi

Sarcina este următoarea - există un flux afișat în imaginea de mai sus, care trebuie să fie implementat pe N servere cu Apache NiFi. Test de flux - un fișier este generat și trimis către o altă instanță NiFi. Transferul de date are loc folosind protocolul NiFi Site to Site.

NiFi Site to Site (S2S) este o modalitate sigură, foarte personalizabilă de a transfera date între instanțe NiFi. Vezi cum funcționează S2S documentație și este important să vă amintiți să vă configurați instanța NiFi pentru a permite S2S să vadă aici.

Când vine vorba de transferul de date folosind S2S, o instanță este numită client, a doua este un server. Clientul trimite date, serverul le primește. Două moduri de a configura transferul de date între ele:

  1. Împinge. Datele sunt trimise de la instanța client folosind un Remote Process Group (RPG). Pe instanța serverului, datele sunt primite utilizând portul de intrare
  2. Trage. Serverul primește date folosind RPG-ul, clientul trimite folosind portul de ieșire.


Fluxul pentru rulare este stocat în Registrul Apache.

Apache NiFi Registry este un subproiect al Apache NiFi care oferă un instrument de stocare în flux și de versiuni. Un fel de GIT. Informații despre instalarea, configurarea și lucrul cu registry pot fi găsite în documentație oficială. Fluxul pentru stocare este combinat într-un grup de procese și stocat în registru în acest formular. Vom reveni la asta mai târziu în articol.

La început, când N este un număr mic, fluxul este livrat și actualizat manual într-un timp rezonabil.

Dar pe măsură ce N crește, există mai multe probleme:

  1. este nevoie de mai mult timp pentru a actualiza fluxul. Trebuie să accesați toate serverele
  2. există erori la actualizarea șabloanelor. Aici au actualizat, dar aici au uitat
  3. eroare umană la efectuarea unui număr mare de operații similare

Toate acestea ne aduc la faptul că este necesară automatizarea procesului. Am încercat următoarele moduri de a rezolva această problemă:

  1. Utilizați MiNiFi în loc de NiFi
  2. NiFi CLI
  3. NiPyAPI

Folosind MiNiFi

ApacheMiNify este un subproiect al Apache NiFi. MiNiFy este un agent compact care folosește aceleași procesoare ca NiFi, permițându-vă să creați același flux ca în NiFi. Lejeritatea agentului se realizează, printre altele, datorită faptului că MiNiFy nu are o interfață grafică pentru configurarea fluxului. Lipsa unei interfețe grafice a lui MiNiFy înseamnă că este necesar să se rezolve problema livrării fluxului în minifi. Deoarece MiNiFy este utilizat în mod activ în IOT, există multe componente și procesul de livrare a fluxului către instanțele finale minifi trebuie automatizat. O sarcină familiară, nu?

Un alt subproiect, MiNiFi C2 Server, va ajuta la rezolvarea acestei probleme. Acest produs este destinat să fie punctul central în arhitectura de implementare. Cum se configurează mediul - descris în acest articol pe Habré și informațiile sunt suficiente pentru a rezolva problema. MiNiFi împreună cu serverul C2 își actualizează automat configurația. Singurul dezavantaj al acestei abordări este că trebuie să creați șabloane pe serverul C2, un simplu commit în registru nu este suficient.

Opțiunea descrisă în articolul de mai sus funcționează și nu este dificil de implementat, dar nu trebuie să uităm următoarele:

  1. minifi nu are toate procesoarele de la nifi
  2. Versiunile CPU din Minifi sunt în urmă față de versiunile CPU din NiFi.

La momentul scrierii, cea mai recentă versiune de NiFi este 1.9.2. Versiunea de procesor a celei mai recente versiuni MiNiFi este 1.7.0. Procesoarele pot fi adăugate la MiNiFi, dar din cauza discrepanțelor de versiune dintre procesoarele NiFi și MiNiFi, este posibil ca acest lucru să nu funcționeze.

NiFi CLI

Judecând după Descriere instrument de pe site-ul oficial, acesta este un instrument pentru automatizarea interacțiunii dintre NiFI și NiFi Registry în domeniul livrării fluxului sau al managementului procesului. Descărcați acest instrument pentru a începe. prin urmare.

Rulați utilitarul

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Pentru a încărca fluxul necesar din registru, trebuie să cunoaștem identificatorii coșului (identificatorul găleții) ​​și fluxul în sine (identificatorul fluxului). Aceste date pot fi obținute fie prin cli, fie în interfața web a registrului NiFi. Interfața web arată astfel:

Automatizarea livrării fluxului în Apache NiFi

Folosind CLI, faceți acest lucru:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Rulați grupul de procese de import din registru:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un punct important este că orice instanță nifi poate fi specificată ca gazdă pe care rulăm grupul de procese.

Grup de procese adăugat cu procesoare oprite, acestea trebuie pornite

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Super, procesoarele au pornit. Cu toate acestea, în funcție de condițiile problemei, avem nevoie de instanțe NiFi pentru a trimite date către alte instanțe. Să presupunem că metoda Push a fost aleasă pentru a transfera date pe server. Pentru a organiza transferul de date, este necesar să activați transferul de date (Activați transmiterea) pe grupul de procese la distanță (RPG) adăugat, care este deja inclus în fluxul nostru.

Automatizarea livrării fluxului în Apache NiFi

În documentația din CLI și din alte surse, nu am găsit o modalitate de a activa transferul de date. Dacă știți cum să faceți acest lucru, vă rugăm să scrieți în comentarii.

Din moment ce avem bash și suntem gata să mergem până la capăt, vom găsi o cale de ieșire! Puteți utiliza API-ul NiFi pentru a rezolva această problemă. Să folosim următoarea metodă, luăm ID-ul din exemplele de mai sus (în cazul nostru este 7f522a13-016e-1000-e504-d5b15587f2f3). Descrierea metodelor NiFi API aici.

Automatizarea livrării fluxului în Apache NiFi
În body, trebuie să treceți JSON, de următoarea formă:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametri care trebuie completați pentru a „funcționa”:
de stat — starea transferului de date. Disponibil TRANSMITING pentru a activa transferul de date, STOPED pentru a dezactiva
versiune - versiunea procesorului

versiunea va fi implicit 0 când este creată, dar acești parametri pot fi obținuți folosind metoda

Automatizarea livrării fluxului în Apache NiFi

Pentru iubitorii de scripturi bash, această metodă poate părea potrivită, dar pentru mine este greu - scripturile bash nu sunt preferatele mele. Următorul mod este mai interesant și mai convenabil în opinia mea.

NiPyAPI

NiPyAPI este o bibliotecă Python pentru interacțiunea cu instanțe NiFi. Pagina de documentație conține informațiile necesare pentru a lucra cu biblioteca. Pornirea rapidă este descrisă în proiect pe github.

Scriptul nostru pentru lansarea configurației este un program Python. Să trecem la codificare.
Configurați configurațiile pentru lucrări ulterioare. Vom avea nevoie de următorii parametri:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Mai departe voi introduce numele metodelor acestei biblioteci, care sunt descrise aici.

Conectam registry la instanța nifi folosind

nipyapi.versioning.create_registry_client

La acest pas, puteți adăuga și o verificare că registrul a fost deja adăugat la instanță, pentru aceasta puteți utiliza metoda

nipyapi.versioning.list_registry_clients

Găsim găleata pentru a căuta în continuare debitul din coș

nipyapi.versioning.get_registry_bucket

Conform găleții găsite, căutăm debit

nipyapi.versioning.get_flow_in_bucket

În continuare, este important să înțelegeți dacă acest grup de procese a fost deja adăugat. Grupul de proces este plasat după coordonate și poate apărea o situație când un al doilea este suprapus peste unul. Am verificat, se poate 🙂 Pentru a obține tot grupul de procese adăugat, utilizați metoda

nipyapi.canvas.list_all_process_groups

și apoi putem căuta, de exemplu, după nume.

Nu voi descrie procesul de actualizare a șablonului, voi spune doar că dacă procesoarele sunt adăugate în noua versiune a șablonului, atunci nu există probleme cu prezența mesajelor în cozi. Dar dacă procesoarele sunt îndepărtate, atunci pot apărea probleme (nifi nu permite eliminarea procesorului dacă s-a acumulat o coadă de mesaje în fața lui). Dacă sunteți interesat de modul în care am rezolvat această problemă - scrieți-mi, vă rog, vom discuta acest punct. Contacte la finalul articolului. Să trecem la pasul de adăugare a unui grup de procese.

La depanarea scriptului, am dat peste o caracteristică că cea mai recentă versiune a fluxului nu este întotdeauna extrasă, așa că vă recomand să clarificați mai întâi această versiune:

nipyapi.versioning.get_latest_flow_ver

Implementează grupul de procese:

nipyapi.versioning.deploy_flow_version

Pornim procesoarele:

nipyapi.canvas.schedule_process_group

În blocul despre CLI, a fost scris că transferul de date nu este activat automat în grupul de procese la distanță? La implementarea scriptului, am întâlnit și această problemă. La acel moment, nu puteam începe transferul de date folosind API-ul și am decis să scriu dezvoltatorului bibliotecii NiPyAPI și să cer sfat/ajutor. Dezvoltatorul mi-a răspuns, am discutat problema și a scris că are nevoie de timp pentru a „verifica ceva”. Și acum, câteva zile mai târziu, sosește un e-mail în care este scrisă o funcție Python care îmi rezolvă problema la pornire !!! La acel moment, versiunea NiPyAPI era 0.13.3 și, bineînțeles, nu exista nimic de acest fel în ea. Dar în versiunea 0.14.0, care a fost lansată destul de recent, această funcție a fost deja inclusă în bibliotecă. Întâlni

nipyapi.canvas.set_remote_process_group_transmission

Deci, cu ajutorul bibliotecii NiPyAPI, am conectat registrul, am rulat fluxul și chiar am început procesoarele și transferul de date. Apoi puteți pieptăna codul, adăugați tot felul de verificări, logare și atât. Dar asta este o cu totul altă poveste.

Dintre opțiunile de automatizare pe care le-am luat în considerare, cea din urmă mi s-a părut cea mai eficientă. În primul rând, acesta este încă cod Python, în care puteți încorpora codul de program auxiliar și vă puteți bucura de toate beneficiile unui limbaj de programare. În al doilea rând, proiectul NiPyAPI se dezvoltă activ și, în caz de probleme, puteți scrie dezvoltatorului. În al treilea rând, NiPyAPI este încă un instrument mai flexibil pentru interacțiunea cu NiFi în rezolvarea problemelor complexe. De exemplu, pentru a determina dacă cozile de mesaje sunt în prezent goale în flux și dacă este posibil să se actualizeze grupul de procese.

Asta e tot. Am descris 3 abordări pentru automatizarea livrării fluxului în NiFi, capcanele pe care le poate întâlni un dezvoltator și am furnizat un cod de lucru pentru automatizarea livrării. Dacă ești la fel de interesat de acest subiect ca și mine - scrie!

Sursa: www.habr.com

Adauga un comentariu