Automatizácia dodávky toku v Apache NiFi

Ahoj všetci!

Automatizácia dodávky toku v Apache NiFi

Úloha je nasledovná - existuje tok, znázornený na obrázku vyššie, ktorý je potrebné zaviesť na N servery s Apache NiFi. Test toku – generuje sa súbor a odosiela sa do inej inštancie NiFi. Prenos údajov prebieha pomocou protokolu NiFi Site to Site.

NiFi Site to Site (S2S) je bezpečný, ľahko konfigurovateľný spôsob prenosu údajov medzi inštanciami NiFi. Ako funguje S2S, pozri dokumentáciu a dôležité je nezabudnúť nakonfigurovať inštanciu NiFi na povolenie S2S, viď tu.

V prípadoch, keď hovoríme o prenose dát pomocou S2S, jedna inštancia sa nazýva klient, druhá server. Klient odosiela dáta, server ich prijíma. Dva spôsoby konfigurácie prenosu údajov medzi nimi:

  1. Tlačiť. Z inštancie klienta sa údaje odosielajú pomocou skupiny vzdialených procesov (RPG). Na inštancii servera sa údaje prijímajú pomocou vstupného portu
  2. Ťahať. Server prijíma dáta pomocou RPG, klient odosiela cez Output port.


Tok na zavedenie je uložený v registri Apache.

Apache NiFi Registry je podprojekt Apache NiFi, ktorý poskytuje nástroj na ukladanie toku a kontrolu verzií. Akýsi GIT. Informácie o inštalácii, konfigurácii a práci s registrom nájdete v oficiálna dokumentácia. Tok pre ukladanie je spojený do skupiny procesov a uložený v tejto forme v registri. K tomu sa ešte v článku vrátime.

Na začiatku, keď je N malé číslo, sa prietok dodáva a aktualizuje manuálne v prijateľnom čase.

Ale ako N rastie, problémy sú čoraz početnejšie:

  1. aktualizácia toku si vyžaduje viac času. Musíte sa prihlásiť na všetky servery
  2. Vyskytujú sa chyby pri aktualizácii šablóny. Tu to aktualizovali, ale tu zabudli
  3. ľudské chyby pri vykonávaní veľkého množstva podobných operácií

To všetko nás privádza k tomu, že musíme proces automatizovať. Skúsil som tento problém vyriešiť nasledujúcimi spôsobmi:

  1. Namiesto NiFi použite MiNiFi
  2. NiFi CLI
  3. NiPyAPI

Pomocou MiNiFi

Apache MiNiFy - podprojekt Apache NiFi. MiNiFy je kompaktný agent, ktorý používa rovnaké procesory ako NiFi, čo vám umožňuje vytvárať rovnaké toky ako v NiFi. Odľahčený charakter agenta je dosiahnutý okrem iného aj tým, že MiNiFy nemá grafické rozhranie na konfiguráciu toku. Chýbajúce grafické rozhranie v MiNiFy znamená, že je potrebné vyriešiť problém s doručovaním toku do minifi. Keďže MiNiFy sa aktívne používa v IOT, existuje veľa komponentov a proces dodávania toku do finálnych minifi inštancií musí byť automatizovaný. Známa úloha, však?

Ďalší podprojekt pomôže vyriešiť tento problém - MiNiFi C2 Server. Tento produkt má byť ústredným bodom architektúry zavádzania konfigurácie. Ako nakonfigurovať prostredie - popísané v v tomto článku Na vyriešenie problému je o Habrém dostatok informácií. MiNiFi v spojení so serverom C2 automaticky aktualizuje svoju konfiguráciu. Jedinou nevýhodou tohto prístupu je, že musíte vytvárať šablóny na serveri C2; jednoduché odovzdanie do registra nestačí.

Možnosť opísaná v článku vyššie je funkčná a nie je náročná na implementáciu, ale nesmieme zabúdať na nasledovné:

  1. Minifi nema vsetky procesory od nifi
  2. Verzie procesorov Minifi zaostávajú za verziami procesorov NiFi.

V čase písania tohto článku je najnovšia verzia NiFi 1.9.2. Najnovšia verzia procesora MiNiFi je 1.7.0. K MiNiFi je možné pridať procesory, ale kvôli rozdielom vo verziách medzi procesormi NiFi a MiNiFi to nemusí fungovať.

NiFi CLI

Súdiac podľa popis nástroj na oficiálnej stránke, ide o nástroj na automatizáciu interakcie medzi NiFI a NiFi Registry v oblasti doručovania toku alebo riadenia procesov. Ak chcete začať, musíte si stiahnuť tento nástroj. preto.

Spustite pomôcku

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Aby sme mohli načítať požadovaný tok z registra, potrebujeme poznať identifikátory segmentu (identifikátor toku) a samotného toku (identifikátor toku). Tieto údaje je možné získať buď cez cli alebo vo webovom rozhraní registra NiFi. Vo webovom rozhraní to vyzerá takto:

Automatizácia dodávky toku v Apache NiFi

Pomocou CLI sa to robí:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Začneme importovať skupinu procesov z registra:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Dôležitým bodom je, že ľubovoľná inštancia nifi môže byť špecifikovaná ako hostiteľ, ktorému rolujeme skupinu procesov.

Pridaná skupina procesov so zastavenými procesormi, je potrebné ich spustiť

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Výborne, procesory sa rozbehli. Podľa podmienok úlohy však potrebujeme inštancie NiFi na odosielanie údajov do iných inštancií. Predpokladajme, že ste na prenos údajov na server zvolili metódu Push. Ak chcete organizovať prenos údajov, musíte povoliť prenos údajov na pridanej skupine vzdialených procesov (RPG), ktorá je už zahrnutá v našom toku.

Automatizácia dodávky toku v Apache NiFi

V dokumentácii v CLI a iných zdrojoch som nenašiel spôsob, ako povoliť prenos dát. Ak viete, ako na to, napíšte do komentárov.

Keďže máme bash a sme pripravení ísť až do konca, nájdeme cestu von! Na vyriešenie tohto problému môžete použiť rozhranie NiFi API. Použime nasledujúcu metódu, vezmite ID z príkladov vyššie (v našom prípade je to 7f522a13-016e-1000-e504-d5b15587f2f3). Popis metód NiFi API tu.

Automatizácia dodávky toku v Apache NiFi
V tele musíte odovzdať JSON takto:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametre, ktoré je potrebné vyplniť, aby to fungovalo:
stať — stav prenosu údajov. Dostupné: TRANSMITTING na povolenie prenosu dát, ZASTAVENÉ na deaktiváciu
verzia - verzia procesora

verzia bude pri vytvorení predvolene nastavená na 0, ale tieto parametre je možné získať pomocou tejto metódy

Automatizácia dodávky toku v Apache NiFi

Pre fanúšikov bash skriptov sa táto metóda môže zdať vhodná, ale pre mňa je to trochu ťažké - bash skripty nie sú moje obľúbené. Ďalšia metóda je podľa môjho názoru zaujímavejšia a pohodlnejšia.

NiPyAPI

NiPyAPI je knižnica Pythonu na interakciu s inštanciami NiFi. Stránka dokumentácie obsahuje potrebné informácie pre prácu s knižnicou. Rýchly štart je popísaný v projekt na githube.

Náš skript na zavedenie konfigurácie je program v Pythone. Prejdime ku kódovaniu.
Nastavili sme konfigurácie pre ďalšiu prácu. Budeme potrebovať nasledujúce parametre:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Ďalej vložím názvy metód tejto knižnice, ktoré sú popísané tu.

Pripojte register k inštancii nifi pomocou

nipyapi.versioning.create_registry_client

V tomto kroku môžete tiež pridať kontrolu, že register už bol pridaný do inštancie; na tento účel môžete použiť metódu

nipyapi.versioning.list_registry_clients

Vedro na ďalšie hľadanie prietoku nájdeme v košíku

nipyapi.versioning.get_registry_bucket

Pomocou nájdeného vedra hľadáme prietok

nipyapi.versioning.get_flow_in_bucket

Ďalej je dôležité pochopiť, či táto skupina procesov už bola pridaná. Skupina procesov je umiestnená podľa súradníc a môže nastať situácia, keď sa druhá zložka prekrýva s jednou zložkou. Skontroloval som, toto sa môže stať :) Na získanie všetkých pridaných skupín procesov používame metódu

nipyapi.canvas.list_all_process_groups

Ďalej môžeme hľadať napríklad podľa mena.

Nebudem popisovať proces aktualizácie šablóny, poviem len, že ak sa v novej verzii šablóny pridajú procesory, potom nie sú problémy s prítomnosťou správ vo frontoch. Ak sa však procesory odstránia, môžu nastať problémy (nifi vám neumožňuje odstrániť procesor, ak sa pred ním nahromadil front správ). Ak vás zaujíma, ako som tento problém vyriešil, napíšte mi a prediskutujeme tento problém. Kontakty na konci článku. Prejdime ku kroku pridania skupiny procesov.

Pri ladení skriptu som narazil na zvláštnosť, že nie vždy sa vytiahne najnovšia verzia toku, preto odporúčam najskôr skontrolovať túto verziu:

nipyapi.versioning.get_latest_flow_ver

Nasadiť skupinu procesov:

nipyapi.versioning.deploy_flow_version

Spustíme procesory:

nipyapi.canvas.schedule_process_group

V bloku o CLI bolo napísané, že v skupine vzdialených procesov nie je automaticky povolený prenos dát? Pri implementácii skriptu som narazil aj na tento problém. Vtedy sa mi nepodarilo spustiť prenos dát pomocou API a rozhodol som sa napísať vývojárovi knižnice NiPyAPI a požiadať o radu/pomoc. Developer mi odpovedal, diskutovali sme o probléme a napísal, že potrebuje čas „niečo skontrolovať“. A potom, o pár dní neskôr, príde list, v ktorom je napísaná funkcia v Pythone, ktorá rieši môj problém so spustením!!! Vtedy bola verzia NiPyAPI 0.13.3 a nič také samozrejme neexistovalo. Ale vo verzii 0.14.0, ktorá bola vydaná pomerne nedávno, bola táto funkcia už zahrnutá v knižnici. zoznámte sa,

nipyapi.canvas.set_remote_process_group_transmission

Takže pomocou knižnice NiPyAPI sme pripojili register, spustili tok a dokonca sme spustili procesory a prenos údajov. Potom môžete kód prečesať, pridať všetky druhy kontrol, protokolovanie a to je všetko. Ale to je úplne iný príbeh.

Z možností automatizácie, ktoré som zvažoval, sa mi posledná z nich zdala najefektívnejšia. Po prvé, stále ide o python kód, do ktorého môžete vložiť pomocný programový kód a využiť všetky výhody programovacieho jazyka. Po druhé, projekt NiPyAPI sa aktívne vyvíja a v prípade problémov môžete napísať vývojárovi. Po tretie, NiPyAPI je stále flexibilnejší nástroj na interakciu s NiFi pri riešení zložitých problémov. Napríklad pri určovaní, či sú teraz fronty správ v toku prázdne a či je možné aktualizovať skupinu procesov.

To je všetko. Opísal som 3 prístupy k automatizácii doručovania toku v NiFi, úskalia, s ktorými sa môže vývojár stretnúť, a poskytol som pracovný kód na automatizáciu doručovania. Ak vás táto téma zaujíma tak ako mňa, píšte!

Zdroj: hab.com

Pridať komentár