Srauto pristatymo automatizavimas „Apache NiFi“.

Sveiki visi!

Srauto pristatymo automatizavimas „Apache NiFi“.

Užduotis yra tokia - yra srautas, pateiktas aukščiau esančiame paveikslėlyje, kurį reikia įdiegti į N serverius su Apache NiFi. Srauto testas – failas generuojamas ir siunčiamas į kitą NiFi egzempliorių. Duomenų perdavimas vyksta naudojant NiFi Site to Site protokolą.

„NiFi Site to Site“ (S2S) yra saugus, lengvai konfigūruojamas būdas perkelti duomenis tarp NiFi egzempliorių. Kaip veikia S2S, žr dokumentacija ir svarbu nepamiršti sukonfigūruoti NiFi egzemplioriaus, kad būtų leista S2S, žr čia.

Tais atvejais, kai kalbame apie duomenų perdavimą naudojant S2S, vienas egzempliorius vadinamas klientu, antrasis serveris. Klientas siunčia duomenis, serveris gauna. Du būdai, kaip konfigūruoti duomenų perdavimą tarp jų:

  1. Stumti. Iš kliento egzemplioriaus duomenys siunčiami naudojant nuotolinio proceso grupę (RPG). Serverio egzemplioriuje duomenys gaunami naudojant įvesties prievadą
  2. Traukti. Serveris gauna duomenis naudodamas RPG, klientas siunčia naudodamas išvesties prievadą.


Išleidimo srautas saugomas „Apache“ registre.

„Apache NiFi Registry“ yra „Apache NiFi“ subprojektas, teikiantis srauto saugojimo ir versijų valdymo įrankį. Savotiškas GIT. Informaciją apie registro diegimą, konfigūravimą ir darbą su juo galite rasti oficialius dokumentus. Saugojimas srautas sujungiamas į procesų grupę ir tokia forma saugomas registre. Prie to grįšime vėliau straipsnyje.

Pradžioje, kai N yra mažas skaičius, srautas pristatomas ir atnaujinamas rankiniu būdu per priimtiną laiką.

Tačiau augant N, problemų daugėja:

  1. srautui atnaujinti reikia daugiau laiko. Turite prisijungti prie visų serverių
  2. Atsiranda šablono atnaujinimo klaidų. Čia jie jį atnaujino, bet čia pamiršo
  3. žmogiškosios klaidos, kai atliekama daug panašių operacijų

Visa tai veda prie to, kad turime automatizuoti procesą. Bandžiau išspręsti šią problemą šiais būdais:

  1. Vietoj NiFi naudokite „MiNiFi“.
  2. NiFi CLI
  3. NiPyAPI

Naudojant „MiNiFi“.

Apache MiNiFy - Apache NiFi subprojektas. „MiNiFy“ yra kompaktiška priemonė, kuri naudoja tuos pačius procesorius kaip „NiFi“, leidžianti sukurti tokius pat srautus kaip ir „NiFi“. Lengvas agento pobūdis pasiekiamas, be kita ko, dėl to, kad „MiNiFy“ neturi srauto konfigūravimo grafinės sąsajos. Grafinės sąsajos trūkumas „MiNiFy“ reiškia, kad būtina išspręsti srauto perdavimo į minifi problemą. Kadangi „MiNiFy“ aktyviai naudojamas IOT, yra daug komponentų, o srauto pateikimo galutiniams minifigūbriams procesas turi būti automatizuotas. Pažįstama užduotis, tiesa?

Šią problemą padės išspręsti kitas subprojektas – „MiNiFi C2 Server“. Šis produktas turi būti pagrindinis konfigūracijos išleidimo architektūros taškas. Kaip sukonfigūruoti aplinką - aprašyta Šis straipsnis Yra pakankamai informacijos apie Habré, kad išspręstumėte problemą. „MiNiFi“ kartu su serveriu C2 automatiškai atnaujina savo konfigūraciją. Vienintelis šio metodo trūkumas yra tas, kad jūs turite sukurti šablonus C2 serveryje, neužtenka paprasto įsipareigojimo registrui.

Aukščiau esančiame straipsnyje aprašyta parinktis veikia ir ją įgyvendinti nėra sunku, tačiau nereikia pamiršti šių dalykų:

  1. „Minifi“ neturi visų „nifi“ procesorių
  2. Minifi procesoriaus versijos atsilieka nuo NiFi procesoriaus versijų.

Rašymo metu naujausia NiFi versija yra 1.9.2. Naujausia „MiNiFi“ procesoriaus versija yra 1.7.0. Procesorius galima pridėti prie MiNiFi, tačiau dėl versijų skirtumų tarp NiFi ir MiNiFi procesorių tai gali neveikti.

NiFi CLI

Sprendžiant iš apibūdinimas įrankis oficialioje svetainėje, tai yra įrankis, skirtas automatizuoti NiFI ir NiFi registro sąveiką srauto tiekimo ar procesų valdymo srityje. Norėdami pradėti, turite atsisiųsti šį įrankį. taigi.

Paleiskite programą

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Kad galėtume įkelti reikiamą srautą iš registro, turime žinoti segmento (sąrašo identifikatorius) ir paties srauto (srauto identifikatorius) identifikatorius. Šiuos duomenis galima gauti naudojant cli arba NiFi registro žiniatinklio sąsają. Žiniatinklio sąsajoje tai atrodo taip:

Srauto pristatymo automatizavimas „Apache NiFi“.

Naudojant CLI tai daroma:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Pradedame importuoti procesų grupę iš registro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Svarbus dalykas yra tai, kad bet kurį nifi egzempliorių galima nurodyti kaip pagrindinį kompiuterį, į kurį nukreipiame proceso grupę.

Pridėta procesų grupė su sustabdytais procesoriais, juos reikia paleisti

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Puiku, procesoriai paleisti. Tačiau pagal užduoties sąlygas mums reikia NiFi egzempliorių, kad galėtume siųsti duomenis į kitus atvejus. Tarkime, kad duomenų perdavimui į serverį pasirinkote Push metodą. Norėdami organizuoti duomenų perdavimą, turite įjungti duomenų perdavimą pridėtoje nuotolinio proceso grupėje (RPG), kuri jau įtraukta į mūsų srautą.

Srauto pristatymo automatizavimas „Apache NiFi“.

CLI dokumentacijoje ir kituose šaltiniuose neradau būdo, kaip įjungti duomenų perdavimą. Jei žinote, kaip tai padaryti, parašykite komentaruose.

Kadangi turime bash ir esame pasiruošę eiti iki galo, rasime išeitį! Norėdami išspręsti šią problemą, galite naudoti NiFi API. Naudokime šį metodą, paimkime ID iš anksčiau pateiktų pavyzdžių (mūsų atveju jis yra 7f522a13-016e-1000-e504-d5b15587f2f3). NiFi API metodų aprašymas čia.

Srauto pristatymo automatizavimas „Apache NiFi“.
Turinyje turite perduoti JSON, pavyzdžiui:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametrai, kuriuos reikia užpildyti, kad jis veiktų:
valstybės — duomenų perdavimo būsena. Galima: PERDAVIMAS, kad įgalintumėte duomenų perdavimą, STOPPED, kad išjungtumėte
versija - procesoriaus versija

versija bus sukurta pagal nutylėjimą 0, tačiau šiuos parametrus galima gauti naudojant šį metodą

Srauto pristatymo automatizavimas „Apache NiFi“.

Bash scenarijų gerbėjams šis metodas gali atrodyti tinkamas, bet man tai šiek tiek sunku - bash scenarijai nėra mano mėgstamiausi. Kitas būdas, mano nuomone, yra įdomesnis ir patogesnis.

NiPyAPI

NiPyAPI yra Python biblioteka, skirta bendrauti su NiFi egzemplioriais. Dokumentacijos puslapis yra darbui su biblioteka reikalinga informacija. Greita pradžia aprašyta juodraštį github'e.

Mūsų scenarijus, skirtas konfigūracijos išleidimui, yra programa Python. Pereikime prie kodavimo.
Sukūrėme konfigūracijas tolimesniam darbui. Mums reikės šių parametrų:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Toliau įterpsiu šios bibliotekos metodų pavadinimus, kurie aprašyti čia.

Prijunkite registrą prie nifi egzemplioriaus naudodami

nipyapi.versioning.create_registry_client

Šiame veiksme taip pat galite patikrinti, ar registras jau buvo įtrauktas į egzempliorių, todėl galite naudoti šį metodą

nipyapi.versioning.list_registry_clients

Mes randame kibirą tolesnei srauto paieškai krepšelyje

nipyapi.versioning.get_registry_bucket

Naudodami rastą kibirą ieškome srauto

nipyapi.versioning.get_flow_in_bucket

Be to, svarbu suprasti, ar ši proceso grupė jau buvo įtraukta. Procesų grupė dedama pagal koordinates ir gali susidaryti situacija, kai ant vieno komponento yra uždėtas antrasis komponentas. Patikrinau, taip gali atsitikti :) Norėdami gauti visas pridėtas procesų grupes, naudojame metodą

nipyapi.canvas.list_all_process_groups

Galime toliau ieškoti, pavyzdžiui, pagal pavadinimą.

Šablono atnaujinimo proceso neaprašysiu, pasakysiu tik tiek, kad jei naujoje šablono versijoje bus pridėti procesoriai, tai nekyla problemų dėl pranešimų buvimo eilėse. Bet jei procesoriai pašalinami, gali kilti problemų (nifi neleidžia pašalinti procesoriaus, jei prieš jį susikaupė pranešimų eilė). Jei jus domina kaip aš išsprendžiau šią problemą, parašykite man ir mes aptarsime šį klausimą. Kontaktai straipsnio pabaigoje. Pereikime prie proceso grupės pridėjimo veiksmo.

Derindamas scenarijų, susidūriau su ypatumu, kad naujausia srauto versija ne visada ištraukiama, todėl rekomenduoju pirmiausia patikrinti šią versiją:

nipyapi.versioning.get_latest_flow_ver

Diegti proceso grupę:

nipyapi.versioning.deploy_flow_version

Paleidžiame procesorius:

nipyapi.canvas.schedule_process_group

Bloke apie CLI buvo parašyta, kad nuotolinio proceso grupėje duomenų perdavimas nėra automatiškai įjungtas? Diegiant scenarijų aš taip pat susidūriau su šia problema. Tuo metu man nepavyko pradėti duomenų perdavimo naudojant API ir nusprendžiau parašyti NiPyAPI bibliotekos kūrėjui ir paprašyti patarimo/pagalbos. Kūrėjas man atsakė, aptarėme problemą ir jis parašė, kad jam reikia laiko "kažkam patikrinti". Ir tada po poros dienų ateina laiškas, kuriame Python parašyta funkcija, kuri išsprendžia mano paleidimo problemą!!! Tuo metu NiPyAPI versija buvo 0.13.3 ir, žinoma, nieko panašaus nebuvo. Tačiau 0.14.0 versijoje, kuri buvo išleista visai neseniai, ši funkcija jau buvo įtraukta į biblioteką. Susitikti,

nipyapi.canvas.set_remote_process_group_transmission

Taigi, naudodami NiPyAPI biblioteką, sujungėme registrą, išleidome srautą ir net pradėjome procesorius bei duomenų perdavimą. Tada galite sušukuoti kodą, pridėti visokių patikrinimų, registravimo ir viskas. Bet tai visiškai kita istorija.

Iš mano svarstytų automatizavimo variantų paskutinis man pasirodė efektyviausias. Pirma, tai vis dar yra python kodas, į kurį galite įterpti pagalbinės programos kodą ir pasinaudoti visais programavimo kalbos pranašumais. Antra, NiPyAPI projektas aktyviai vystosi ir iškilus problemoms galite rašyti kūrėjui. Trečia, NiPyAPI vis dar yra lankstesnis įrankis bendrauti su NiFi sprendžiant sudėtingas problemas. Pavyzdžiui, nustatant, ar pranešimų eilės dabar yra tuščios sraute ir ar galima atnaujinti procesų grupę.

Tai viskas. Aprašiau 3 metodus, kaip automatizuoti srauto pristatymą naudojant NiFi, spąstus, su kuriais gali susidurti kūrėjas, ir pateikiau veikiantį pristatymo automatizavimo kodą. Jei jus taip pat domina ši tema kaip aš - parašyk!

Šaltinis: www.habr.com

Pirkite patikimą prieglobą svetainėms su DDoS apsauga, VPS VDS serveriais 🔥 Įsigykite patikimą svetainių talpinimą su DDoS apsauga, VPS VDS serveriais | ProHoster