Ahoj všetci!
Úloha je nasledovná - existuje tok, znázornený na obrázku vyššie, ktorý je potrebné zaviesť na N servery s
NiFi Site to Site (S2S) je bezpečný, ľahko konfigurovateľný spôsob prenosu údajov medzi inštanciami NiFi. Ako funguje S2S, pozri
V prípadoch, keď hovoríme o prenose dát pomocou S2S, jedna inštancia sa nazýva klient, druhá server. Klient odosiela dáta, server ich prijíma. Dva spôsoby konfigurácie prenosu údajov medzi nimi:
- Tlačiť. Z inštancie klienta sa údaje odosielajú pomocou skupiny vzdialených procesov (RPG). Na inštancii servera sa údaje prijímajú pomocou vstupného portu
- Ťahať. Server prijíma dáta pomocou RPG, klient odosiela cez Output port.
Tok na zavedenie je uložený v registri Apache.
Apache NiFi Registry je podprojekt Apache NiFi, ktorý poskytuje nástroj na ukladanie toku a kontrolu verzií. Akýsi GIT. Informácie o inštalácii, konfigurácii a práci s registrom nájdete v
Na začiatku, keď je N malé číslo, sa prietok dodáva a aktualizuje manuálne v prijateľnom čase.
Ale ako N rastie, problémy sú čoraz početnejšie:
- aktualizácia toku si vyžaduje viac času. Musíte sa prihlásiť na všetky servery
- Vyskytujú sa chyby pri aktualizácii šablóny. Tu to aktualizovali, ale tu zabudli
- ľudské chyby pri vykonávaní veľkého množstva podobných operácií
To všetko nás privádza k tomu, že musíme proces automatizovať. Skúsil som tento problém vyriešiť nasledujúcimi spôsobmi:
- Namiesto NiFi použite MiNiFi
- NiFi CLI
- NiPyAPI
Pomocou MiNiFi
Ďalší podprojekt pomôže vyriešiť tento problém - MiNiFi C2 Server. Tento produkt má byť ústredným bodom architektúry zavádzania konfigurácie. Ako nakonfigurovať prostredie - popísané v
Možnosť opísaná v článku vyššie je funkčná a nie je náročná na implementáciu, ale nesmieme zabúdať na nasledovné:
- Minifi nema vsetky procesory od nifi
- Verzie procesorov Minifi zaostávajú za verziami procesorov NiFi.
V čase písania tohto článku je najnovšia verzia NiFi 1.9.2. Najnovšia verzia procesora MiNiFi je 1.7.0. K MiNiFi je možné pridať procesory, ale kvôli rozdielom vo verziách medzi procesormi NiFi a MiNiFi to nemusí fungovať.
NiFi CLI
Súdiac podľa
Spustite pomôcku
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Aby sme mohli načítať požadovaný tok z registra, potrebujeme poznať identifikátory segmentu (identifikátor toku) a samotného toku (identifikátor toku). Tieto údaje je možné získať buď cez cli alebo vo webovom rozhraní registra NiFi. Vo webovom rozhraní to vyzerá takto:
Pomocou CLI sa to robí:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Začneme importovať skupinu procesov z registra:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Dôležitým bodom je, že ľubovoľná inštancia nifi môže byť špecifikovaná ako hostiteľ, ktorému rolujeme skupinu procesov.
Pridaná skupina procesov so zastavenými procesormi, je potrebné ich spustiť
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Výborne, procesory sa rozbehli. Podľa podmienok úlohy však potrebujeme inštancie NiFi na odosielanie údajov do iných inštancií. Predpokladajme, že ste na prenos údajov na server zvolili metódu Push. Ak chcete organizovať prenos údajov, musíte povoliť prenos údajov na pridanej skupine vzdialených procesov (RPG), ktorá je už zahrnutá v našom toku.
V dokumentácii v CLI a iných zdrojoch som nenašiel spôsob, ako povoliť prenos dát. Ak viete, ako na to, napíšte do komentárov.
Keďže máme bash a sme pripravení ísť až do konca, nájdeme cestu von! Na vyriešenie tohto problému môžete použiť rozhranie NiFi API. Použime nasledujúcu metódu, vezmite ID z príkladov vyššie (v našom prípade je to 7f522a13-016e-1000-e504-d5b15587f2f3). Popis metód NiFi API
V tele musíte odovzdať JSON takto:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametre, ktoré je potrebné vyplniť, aby to fungovalo:
stať — stav prenosu údajov. Dostupné: TRANSMITTING na povolenie prenosu dát, ZASTAVENÉ na deaktiváciu
verzia - verzia procesora
verzia bude pri vytvorení predvolene nastavená na 0, ale tieto parametre je možné získať pomocou tejto metódy
Pre fanúšikov bash skriptov sa táto metóda môže zdať vhodná, ale pre mňa je to trochu ťažké - bash skripty nie sú moje obľúbené. Ďalšia metóda je podľa môjho názoru zaujímavejšia a pohodlnejšia.
NiPyAPI
NiPyAPI je knižnica Pythonu na interakciu s inštanciami NiFi.
Náš skript na zavedenie konfigurácie je program v Pythone. Prejdime ku kódovaniu.
Nastavili sme konfigurácie pre ďalšiu prácu. Budeme potrebovať nasledujúce parametre:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Ďalej vložím názvy metód tejto knižnice, ktoré sú popísané
Pripojte register k inštancii nifi pomocou
nipyapi.versioning.create_registry_client
V tomto kroku môžete tiež pridať kontrolu, že register už bol pridaný do inštancie; na tento účel môžete použiť metódu
nipyapi.versioning.list_registry_clients
Vedro na ďalšie hľadanie prietoku nájdeme v košíku
nipyapi.versioning.get_registry_bucket
Pomocou nájdeného vedra hľadáme prietok
nipyapi.versioning.get_flow_in_bucket
Ďalej je dôležité pochopiť, či táto skupina procesov už bola pridaná. Skupina procesov je umiestnená podľa súradníc a môže nastať situácia, keď sa druhá zložka prekrýva s jednou zložkou. Skontroloval som, toto sa môže stať :) Na získanie všetkých pridaných skupín procesov používame metódu
nipyapi.canvas.list_all_process_groups
Ďalej môžeme hľadať napríklad podľa mena.
Nebudem popisovať proces aktualizácie šablóny, poviem len, že ak sa v novej verzii šablóny pridajú procesory, potom nie sú problémy s prítomnosťou správ vo frontoch. Ak sa však procesory odstránia, môžu nastať problémy (nifi vám neumožňuje odstrániť procesor, ak sa pred ním nahromadil front správ). Ak vás zaujíma, ako som tento problém vyriešil, napíšte mi a prediskutujeme tento problém. Kontakty na konci článku. Prejdime ku kroku pridania skupiny procesov.
Pri ladení skriptu som narazil na zvláštnosť, že nie vždy sa vytiahne najnovšia verzia toku, preto odporúčam najskôr skontrolovať túto verziu:
nipyapi.versioning.get_latest_flow_ver
Nasadiť skupinu procesov:
nipyapi.versioning.deploy_flow_version
Spustíme procesory:
nipyapi.canvas.schedule_process_group
V bloku o CLI bolo napísané, že v skupine vzdialených procesov nie je automaticky povolený prenos dát? Pri implementácii skriptu som narazil aj na tento problém. Vtedy sa mi nepodarilo spustiť prenos dát pomocou API a rozhodol som sa napísať vývojárovi knižnice NiPyAPI a požiadať o radu/pomoc. Developer mi odpovedal, diskutovali sme o probléme a napísal, že potrebuje čas „niečo skontrolovať“. A potom, o pár dní neskôr, príde list, v ktorom je napísaná funkcia v Pythone, ktorá rieši môj problém so spustením!!! Vtedy bola verzia NiPyAPI 0.13.3 a nič také samozrejme neexistovalo. Ale vo verzii 0.14.0, ktorá bola vydaná pomerne nedávno, bola táto funkcia už zahrnutá v knižnici. zoznámte sa,
nipyapi.canvas.set_remote_process_group_transmission
Takže pomocou knižnice NiPyAPI sme pripojili register, spustili tok a dokonca sme spustili procesory a prenos údajov. Potom môžete kód prečesať, pridať všetky druhy kontrol, protokolovanie a to je všetko. Ale to je úplne iný príbeh.
Z možností automatizácie, ktoré som zvažoval, sa mi posledná z nich zdala najefektívnejšia. Po prvé, stále ide o python kód, do ktorého môžete vložiť pomocný programový kód a využiť všetky výhody programovacieho jazyka. Po druhé, projekt NiPyAPI sa aktívne vyvíja a v prípade problémov môžete napísať vývojárovi. Po tretie, NiPyAPI je stále flexibilnejší nástroj na interakciu s NiFi pri riešení zložitých problémov. Napríklad pri určovaní, či sú teraz fronty správ v toku prázdne a či je možné aktualizovať skupinu procesov.
To je všetko. Opísal som 3 prístupy k automatizácii doručovania toku v NiFi, úskalia, s ktorými sa môže vývojár stretnúť, a poskytol som pracovný kód na automatizáciu doručovania. Ak vás táto téma zaujíma tak ako mňa,
Zdroj: hab.com