Ahoj všichni!
Úkol je následující - existuje tok, znázorněný na obrázku výše, který je třeba zavést na N servery s
NiFi Site to Site (S2S) je bezpečný, vysoce přizpůsobitelný způsob přenosu dat mezi instancemi NiFi. Podívejte se, jak S2S funguje
Pokud jde o přenos dat pomocí S2S, jedna instance se nazývá klient, druhá je server. Klient odesílá data, server je přijímá. Přenos dat mezi nimi lze nastavit dvěma způsoby:
- Tlačit. Data jsou odesílána z klientské instance pomocí vzdálené skupiny procesů (RPG). Na instanci serveru jsou data přijímána pomocí vstupního portu
- Táhnout. Server přijímá data pomocí RPG, klient odesílá pomocí výstupního portu.
Tok pro rolování je uložen v registru Apache.
Apache NiFi Registry je dílčí projekt Apache NiFi, který poskytuje nástroj pro ukládání toku a správu verzí. Něco jako GIT. Informace o instalaci, konfiguraci a práci s registrem naleznete v
Na začátku, když N je malé číslo, je průtok dodáván a aktualizován ručně v rozumném čase.
Ale jak N roste, objevují se další problémy:
- aktualizace toku zabere více času. Musíte jít na všechny servery
- došlo k chybám při aktualizaci šablon. Tady aktualizovali, ale tady zapomněli
- lidské chyby při provádění velkého množství podobných operací
To vše nás přivádí k tomu, že je nutné proces automatizovat. Vyzkoušel jsem následující způsoby, jak tento problém vyřešit:
- Místo NiFi použijte MiNiFi
- NiFi CLI
- NiPyAPI
Pomocí MiNiFi
Další dílčí projekt, MiNiFi C2 Server, pomůže tento problém vyřešit. Tento produkt má být ústředním bodem architektury zavádění konfigurace. Jak nakonfigurovat prostředí - popsáno v
Možnost popsaná v článku výše je funkční a není náročná na implementaci, ale nesmíme zapomenout na následující:
- minifi nemá všechny procesory od nifi
- Verze CPU v Minifi zaostávají za verzemi CPU v NiFi.
V době psaní tohoto článku je nejnovější verze NiFi 1.9.2. Nejnovější verze procesoru MiNiFi je 1.7.0. K MiNiFi lze přidat procesory, ale kvůli rozdílům ve verzích mezi procesory NiFi a MiNiFi to nemusí fungovat.
NiFi CLI
Soudě podle
Spusťte nástroj
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Abychom mohli načíst potřebný tok z registru, potřebujeme znát identifikátory koše (identifikátor bucketu) a samotný tok (identifikátor toku). Tato data lze získat buď přes cli nebo ve webovém rozhraní registru NiFi. Ve webovém rozhraní to vypadá takto:
Pomocí CLI provedete toto:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Spusťte skupinu procesů importu z registru:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Důležitým bodem je, že libovolnou instanci nifi lze zadat jako hostitele, na kterém natočíme skupinu procesů.
Skupina procesů přidána se zastavenými procesory, je třeba je spustit
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Výborně, procesory začaly. Podle podmínek úlohy však potřebujeme instance NiFi k odesílání dat do jiných instancí. Předpokládejme, že jste pro přenos dat na server zvolili metodu Push. Abyste mohli organizovat přenos dat, musíte povolit přenos dat na přidané skupině Remote Process Group (RPG), která je již součástí našeho toku.
V dokumentaci v CLI a dalších zdrojích jsem nenašel způsob, jak povolit přenos dat. Pokud víte, jak na to, napište do komentářů.
Protože máme bash a jsme připraveni jít až do konce, najdeme cestu ven! K vyřešení tohoto problému můžete použít NiFi API. Použijme následující metodu, ID vezmeme z příkladů výše (v našem případě je to 7f522a13-016e-1000-e504-d5b15587f2f3). Popis metod NiFi API
V těle musíte předat JSON v následujícím tvaru:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametry, které musí být vyplněny, aby „fungovaly“:
stát — stav přenosu dat. K dispozici TRANSMITTING pro povolení přenosu dat, STOPPED pro deaktivaci
verze - verze procesoru
verze bude při vytvoření výchozí 0, ale tyto parametry lze získat pomocí této metody
Pro fanoušky bash skriptů se tato metoda může zdát vhodná, ale pro mě je to trochu obtížné - bash skripty nejsou moje oblíbené. Další způsob je podle mého názoru zajímavější a pohodlnější.
NiPyAPI
NiPyAPI je knihovna Pythonu pro interakci s instancemi NiFi.
Náš skript pro zavedení konfigurace je program v Pythonu. Pojďme ke kódování.
Nastavte konfigurace pro další práci. Budeme potřebovat následující parametry:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Dále vložím názvy metod této knihovny, které jsou popsány
Registr připojíme k instanci nifi pomocí
nipyapi.versioning.create_registry_client
V tomto kroku můžete také přidat kontrolu, že registr již byl do instance přidán, k tomu můžete použít metodu
nipyapi.versioning.list_registry_clients
Najdeme kbelík k dalšímu hledání toku v koši
nipyapi.versioning.get_registry_bucket
Podle nalezeného kbelíku hledáme průtok
nipyapi.versioning.get_flow_in_bucket
Dále je důležité pochopit, zda tato skupina procesů již byla přidána. Procesní skupina je umístěna podle souřadnic a může nastat situace, kdy je druhá navrstvena na jednu. Zkontroloval jsem, může to být 🙂 Chcete-li získat všechny přidané skupiny procesů, použijte metodu
nipyapi.canvas.list_all_process_groups
Dále můžeme hledat např. podle jména.
Nebudu popisovat proces aktualizace šablony, pouze řeknu, že pokud jsou v nové verzi šablony přidány procesory, pak nejsou problémy s přítomností zpráv ve frontách. Ale pokud jsou procesory odstraněny, pak mohou nastat problémy (nifi neumožňuje odebrání procesoru, pokud se před ním nahromadila fronta zpráv). Pokud vás zajímá, jak jsem tento problém vyřešil - napište mi, prosím, probereme tento bod. Kontakty na konci článku. Přejděme ke kroku přidání skupiny procesů.
Při ladění skriptu jsem narazil na zvláštnost, že ne vždy se stáhne nejnovější verze flow, proto doporučuji nejprve zkontrolovat tuto verzi:
nipyapi.versioning.get_latest_flow_ver
Nasadit skupinu procesů:
nipyapi.versioning.deploy_flow_version
Spustíme procesory:
nipyapi.canvas.schedule_process_group
V bloku o CLI bylo napsáno, že ve skupině vzdálených procesů není automaticky povolen přenos dat? Při implementaci skriptu jsem narazil také na tento problém. V té době jsem nemohl spustit přenos dat pomocí API a rozhodl jsem se napsat vývojáři knihovny NiPyAPI a požádat o radu / pomoc. Vývojář mi odpověděl, probrali jsme problém a on napsal, že potřebuje čas, aby si něco „zkontroloval“. A teď, o pár dní později, přijde e-mail, ve kterém je napsána funkce Pythonu, která řeší můj problém se spuštěním!!! V té době byla verze NiPyAPI 0.13.3 a nic takového v ní samozřejmě nebylo. Ale ve verzi 0.14.0, která byla vydána poměrně nedávno, je tato funkce již součástí knihovny. Setkat
nipyapi.canvas.set_remote_process_group_transmission
Takže pomocí knihovny NiPyAPI jsme propojili registr, spustili tok a dokonce jsme spustili procesory a přenos dat. Poté můžete kód pročesat, přidat všechny druhy kontrol, protokolování a to je vše. Ale to je úplně jiný příběh.
Z možností automatizace, které jsem zvažoval, se mi poslední jmenovaná zdála nejúčinnější. Za prvé je to stále kód pythonu, do kterého můžete vložit pomocný programový kód a plně využít výhod programovacího jazyka. Za druhé, projekt NiPyAPI se aktivně vyvíjí a v případě problémů můžete napsat vývojáři. Za třetí, NiPyAPI je stále flexibilnějším nástrojem pro interakci s NiFi při řešení složitých problémů. Například při určování, zda jsou fronty zpráv v toku aktuálně prázdné a zda je možné aktualizovat skupinu procesů.
To je vše. Popsal jsem 3 přístupy k automatizaci doručování toku v NiFi, úskalí, se kterými se může vývojář setkat, a poskytl jsem funkční kód pro automatizaci doručování. Pokud vás toto téma zajímá stejně jako mě,
Zdroj: www.habr.com