Bună ziua tuturor!
Sarcina este următoarea - există un flux afișat în imaginea de mai sus, care trebuie să fie implementat pe N servere cu
NiFi Site to Site (S2S) este o modalitate sigură, foarte personalizabilă de a transfera date între instanțe NiFi. Vezi cum funcționează S2S
Când vine vorba de transferul de date folosind S2S, o instanță este numită client, a doua este un server. Clientul trimite date, serverul le primește. Două moduri de a configura transferul de date între ele:
- Împinge. Datele sunt trimise de la instanța client folosind un Remote Process Group (RPG). Pe instanța serverului, datele sunt primite utilizând portul de intrare
- Trage. Serverul primește date folosind RPG-ul, clientul trimite folosind portul de ieșire.
Fluxul pentru rulare este stocat în Registrul Apache.
Apache NiFi Registry este un subproiect al Apache NiFi care oferă un instrument de stocare în flux și de versiuni. Un fel de GIT. Informații despre instalarea, configurarea și lucrul cu registry pot fi găsite în
La început, când N este un număr mic, fluxul este livrat și actualizat manual într-un timp rezonabil.
Dar pe măsură ce N crește, există mai multe probleme:
- este nevoie de mai mult timp pentru a actualiza fluxul. Trebuie să accesați toate serverele
- există erori la actualizarea șabloanelor. Aici au actualizat, dar aici au uitat
- eroare umană la efectuarea unui număr mare de operații similare
Toate acestea ne aduc la faptul că este necesară automatizarea procesului. Am încercat următoarele moduri de a rezolva această problemă:
- Utilizați MiNiFi în loc de NiFi
- NiFi CLI
- NiPyAPI
Folosind MiNiFi
Un alt subproiect, MiNiFi C2 Server, va ajuta la rezolvarea acestei probleme. Acest produs este destinat să fie punctul central în arhitectura de implementare. Cum se configurează mediul - descris în
Opțiunea descrisă în articolul de mai sus funcționează și nu este dificil de implementat, dar nu trebuie să uităm următoarele:
- minifi nu are toate procesoarele de la nifi
- Versiunile CPU din Minifi sunt în urmă față de versiunile CPU din NiFi.
La momentul scrierii, cea mai recentă versiune de NiFi este 1.9.2. Versiunea de procesor a celei mai recente versiuni MiNiFi este 1.7.0. Procesoarele pot fi adăugate la MiNiFi, dar din cauza discrepanțelor de versiune dintre procesoarele NiFi și MiNiFi, este posibil ca acest lucru să nu funcționeze.
NiFi CLI
Judecând după
Rulați utilitarul
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Pentru a încărca fluxul necesar din registru, trebuie să cunoaștem identificatorii coșului (identificatorul găleții) și fluxul în sine (identificatorul fluxului). Aceste date pot fi obținute fie prin cli, fie în interfața web a registrului NiFi. Interfața web arată astfel:
Folosind CLI, faceți acest lucru:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Rulați grupul de procese de import din registru:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Un punct important este că orice instanță nifi poate fi specificată ca gazdă pe care rulăm grupul de procese.
Grup de procese adăugat cu procesoare oprite, acestea trebuie pornite
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, procesoarele au pornit. Cu toate acestea, în funcție de condițiile problemei, avem nevoie de instanțe NiFi pentru a trimite date către alte instanțe. Să presupunem că metoda Push a fost aleasă pentru a transfera date pe server. Pentru a organiza transferul de date, este necesar să activați transferul de date (Activați transmiterea) pe grupul de procese la distanță (RPG) adăugat, care este deja inclus în fluxul nostru.
În documentația din CLI și din alte surse, nu am găsit o modalitate de a activa transferul de date. Dacă știți cum să faceți acest lucru, vă rugăm să scrieți în comentarii.
Din moment ce avem bash și suntem gata să mergem până la capăt, vom găsi o cale de ieșire! Puteți utiliza API-ul NiFi pentru a rezolva această problemă. Să folosim următoarea metodă, luăm ID-ul din exemplele de mai sus (în cazul nostru este 7f522a13-016e-1000-e504-d5b15587f2f3). Descrierea metodelor NiFi API
În body, trebuie să treceți JSON, de următoarea formă:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametri care trebuie completați pentru a „funcționa”:
de stat — starea transferului de date. Disponibil TRANSMITING pentru a activa transferul de date, STOPED pentru a dezactiva
versiune - versiunea procesorului
versiunea va fi implicit 0 când este creată, dar acești parametri pot fi obținuți folosind metoda
Pentru iubitorii de scripturi bash, această metodă poate părea potrivită, dar pentru mine este greu - scripturile bash nu sunt preferatele mele. Următorul mod este mai interesant și mai convenabil în opinia mea.
NiPyAPI
NiPyAPI este o bibliotecă Python pentru interacțiunea cu instanțe NiFi.
Scriptul nostru pentru lansarea configurației este un program Python. Să trecem la codificare.
Configurați configurațiile pentru lucrări ulterioare. Vom avea nevoie de următorii parametri:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Mai departe voi introduce numele metodelor acestei biblioteci, care sunt descrise
Conectam registry la instanța nifi folosind
nipyapi.versioning.create_registry_client
La acest pas, puteți adăuga și o verificare că registrul a fost deja adăugat la instanță, pentru aceasta puteți utiliza metoda
nipyapi.versioning.list_registry_clients
Găsim găleata pentru a căuta în continuare debitul din coș
nipyapi.versioning.get_registry_bucket
Conform găleții găsite, căutăm debit
nipyapi.versioning.get_flow_in_bucket
În continuare, este important să înțelegeți dacă acest grup de procese a fost deja adăugat. Grupul de proces este plasat după coordonate și poate apărea o situație când un al doilea este suprapus peste unul. Am verificat, se poate 🙂 Pentru a obține tot grupul de procese adăugat, utilizați metoda
nipyapi.canvas.list_all_process_groups
și apoi putem căuta, de exemplu, după nume.
Nu voi descrie procesul de actualizare a șablonului, voi spune doar că dacă procesoarele sunt adăugate în noua versiune a șablonului, atunci nu există probleme cu prezența mesajelor în cozi. Dar dacă procesoarele sunt îndepărtate, atunci pot apărea probleme (nifi nu permite eliminarea procesorului dacă s-a acumulat o coadă de mesaje în fața lui). Dacă sunteți interesat de modul în care am rezolvat această problemă - scrieți-mi, vă rog, vom discuta acest punct. Contacte la finalul articolului. Să trecem la pasul de adăugare a unui grup de procese.
La depanarea scriptului, am dat peste o caracteristică că cea mai recentă versiune a fluxului nu este întotdeauna extrasă, așa că vă recomand să clarificați mai întâi această versiune:
nipyapi.versioning.get_latest_flow_ver
Implementează grupul de procese:
nipyapi.versioning.deploy_flow_version
Pornim procesoarele:
nipyapi.canvas.schedule_process_group
În blocul despre CLI, a fost scris că transferul de date nu este activat automat în grupul de procese la distanță? La implementarea scriptului, am întâlnit și această problemă. La acel moment, nu puteam începe transferul de date folosind API-ul și am decis să scriu dezvoltatorului bibliotecii NiPyAPI și să cer sfat/ajutor. Dezvoltatorul mi-a răspuns, am discutat problema și a scris că are nevoie de timp pentru a „verifica ceva”. Și acum, câteva zile mai târziu, sosește un e-mail în care este scrisă o funcție Python care îmi rezolvă problema la pornire !!! La acel moment, versiunea NiPyAPI era 0.13.3 și, bineînțeles, nu exista nimic de acest fel în ea. Dar în versiunea 0.14.0, care a fost lansată destul de recent, această funcție a fost deja inclusă în bibliotecă. Întâlni
nipyapi.canvas.set_remote_process_group_transmission
Deci, cu ajutorul bibliotecii NiPyAPI, am conectat registrul, am rulat fluxul și chiar am început procesoarele și transferul de date. Apoi puteți pieptăna codul, adăugați tot felul de verificări, logare și atât. Dar asta este o cu totul altă poveste.
Dintre opțiunile de automatizare pe care le-am luat în considerare, cea din urmă mi s-a părut cea mai eficientă. În primul rând, acesta este încă cod Python, în care puteți încorpora codul de program auxiliar și vă puteți bucura de toate beneficiile unui limbaj de programare. În al doilea rând, proiectul NiPyAPI se dezvoltă activ și, în caz de probleme, puteți scrie dezvoltatorului. În al treilea rând, NiPyAPI este încă un instrument mai flexibil pentru interacțiunea cu NiFi în rezolvarea problemelor complexe. De exemplu, pentru a determina dacă cozile de mesaje sunt în prezent goale în flux și dacă este posibil să se actualizeze grupul de procese.
Asta e tot. Am descris 3 abordări pentru automatizarea livrării fluxului în NiFi, capcanele pe care le poate întâlni un dezvoltator și am furnizat un cod de lucru pentru automatizarea livrării. Dacă ești la fel de interesat de acest subiect ca și mine -
Sursa: www.habr.com