Flow Delivery Automation v Apache NiFi

Ahoj všichni!

Flow Delivery Automation v Apache NiFi

Úkol je následující - existuje tok, znázorněný na obrázku výše, který je třeba zavést na N servery s Apache NiFi. Test toku – generuje se soubor a odesílá se do jiné instance NiFi. Přenos dat probíhá pomocí protokolu NiFi Site to Site.

NiFi Site to Site (S2S) je bezpečný, vysoce přizpůsobitelný způsob přenosu dat mezi instancemi NiFi. Podívejte se, jak S2S funguje dokumentace a je důležité mít na paměti, že je třeba nastavit instanci NiFi tak, aby S2S viděla zde.

Pokud jde o přenos dat pomocí S2S, jedna instance se nazývá klient, druhá je server. Klient odesílá data, server je přijímá. Přenos dat mezi nimi lze nastavit dvěma způsoby:

  1. Tlačit. Data jsou odesílána z klientské instance pomocí vzdálené skupiny procesů (RPG). Na instanci serveru jsou data přijímána pomocí vstupního portu
  2. Táhnout. Server přijímá data pomocí RPG, klient odesílá pomocí výstupního portu.


Tok pro rolování je uložen v registru Apache.

Apache NiFi Registry je dílčí projekt Apache NiFi, který poskytuje nástroj pro ukládání toku a správu verzí. Něco jako GIT. Informace o instalaci, konfiguraci a práci s registrem naleznete v oficiální dokumentace. Tok pro ukládání je sloučen do skupiny procesů a uložen v této podobě do registru. K tomu se ještě v článku vrátíme.

Na začátku, když N je malé číslo, je průtok dodáván a aktualizován ručně v rozumném čase.

Ale jak N roste, objevují se další problémy:

  1. aktualizace toku zabere více času. Musíte jít na všechny servery
  2. došlo k chybám při aktualizaci šablon. Tady aktualizovali, ale tady zapomněli
  3. lidské chyby při provádění velkého množství podobných operací

To vše nás přivádí k tomu, že je nutné proces automatizovat. Vyzkoušel jsem následující způsoby, jak tento problém vyřešit:

  1. Místo NiFi použijte MiNiFi
  2. NiFi CLI
  3. NiPyAPI

Pomocí MiNiFi

ApacheMiNify je dílčím projektem Apache NiFi. MiNiFy je kompaktní agent, který používá stejné procesory jako NiFi, což vám umožňuje vytvářet stejné toky jako v NiFi. Odlehčenosti agenta je dosaženo mimo jiné tím, že MiNiFy nemá grafické rozhraní pro konfiguraci toku. Absence grafického rozhraní v MiNiFy znamená, že je nutné vyřešit problém dodání toku do minifi. Vzhledem k tomu, že MiNiFy se aktivně používá v IOT, existuje mnoho komponent a proces dodávání toku do finálních minifi instancí musí být automatizován. Známý úkol, že?

Další dílčí projekt, MiNiFi C2 Server, pomůže tento problém vyřešit. Tento produkt má být ústředním bodem architektury zavádění konfigurace. Jak nakonfigurovat prostředí - popsáno v tento článek na Habré a informace stačí k vyřešení problému. MiNiFi ve spojení se serverem C2 automaticky aktualizuje svou konfiguraci. Jedinou nevýhodou tohoto přístupu je, že musíte vytvářet šablony na serveru C2, pouhé potvrzení registru nestačí.

Možnost popsaná v článku výše je funkční a není náročná na implementaci, ale nesmíme zapomenout na následující:

  1. minifi nemá všechny procesory od nifi
  2. Verze CPU v Minifi zaostávají za verzemi CPU v NiFi.

V době psaní tohoto článku je nejnovější verze NiFi 1.9.2. Nejnovější verze procesoru MiNiFi je 1.7.0. K MiNiFi lze přidat procesory, ale kvůli rozdílům ve verzích mezi procesory NiFi a MiNiFi to nemusí fungovat.

NiFi CLI

Soudě podle popis nástroj na oficiálních stránkách, jedná se o nástroj pro automatizaci interakce mezi NiFI a NiFi Registry v oblasti doručování toků nebo řízení procesů. Chcete-li začít, stáhněte si tento nástroj. proto.

Spusťte nástroj

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Abychom mohli načíst potřebný tok z registru, potřebujeme znát identifikátory koše (identifikátor bucketu) a samotný tok (identifikátor toku). Tato data lze získat buď přes cli nebo ve webovém rozhraní registru NiFi. Ve webovém rozhraní to vypadá takto:

Flow Delivery Automation v Apache NiFi

Pomocí CLI provedete toto:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Spusťte skupinu procesů importu z registru:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Důležitým bodem je, že libovolnou instanci nifi lze zadat jako hostitele, na kterém natočíme skupinu procesů.

Skupina procesů přidána se zastavenými procesory, je třeba je spustit

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Výborně, procesory začaly. Podle podmínek úlohy však potřebujeme instance NiFi k odesílání dat do jiných instancí. Předpokládejme, že jste pro přenos dat na server zvolili metodu Push. Abyste mohli organizovat přenos dat, musíte povolit přenos dat na přidané skupině Remote Process Group (RPG), která je již součástí našeho toku.

Flow Delivery Automation v Apache NiFi

V dokumentaci v CLI a dalších zdrojích jsem nenašel způsob, jak povolit přenos dat. Pokud víte, jak na to, napište do komentářů.

Protože máme bash a jsme připraveni jít až do konce, najdeme cestu ven! K vyřešení tohoto problému můžete použít NiFi API. Použijme následující metodu, ID vezmeme z příkladů výše (v našem případě je to 7f522a13-016e-1000-e504-d5b15587f2f3). Popis metod NiFi API zde.

Flow Delivery Automation v Apache NiFi
V těle musíte předat JSON v následujícím tvaru:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametry, které musí být vyplněny, aby „fungovaly“:
stát — stav přenosu dat. K dispozici TRANSMITTING pro povolení přenosu dat, STOPPED pro deaktivaci
verze - verze procesoru

verze bude při vytvoření výchozí 0, ale tyto parametry lze získat pomocí této metody

Flow Delivery Automation v Apache NiFi

Pro fanoušky bash skriptů se tato metoda může zdát vhodná, ale pro mě je to trochu obtížné - bash skripty nejsou moje oblíbené. Další způsob je podle mého názoru zajímavější a pohodlnější.

NiPyAPI

NiPyAPI je knihovna Pythonu pro interakci s instancemi NiFi. Stránka dokumentace obsahuje potřebné informace pro práci s knihovnou. Rychlý start je popsán v projekt na githubu.

Náš skript pro zavedení konfigurace je program v Pythonu. Pojďme ke kódování.
Nastavte konfigurace pro další práci. Budeme potřebovat následující parametry:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Dále vložím názvy metod této knihovny, které jsou popsány zde.

Registr připojíme k instanci nifi pomocí

nipyapi.versioning.create_registry_client

V tomto kroku můžete také přidat kontrolu, že registr již byl do instance přidán, k tomu můžete použít metodu

nipyapi.versioning.list_registry_clients

Najdeme kbelík k dalšímu hledání toku v koši

nipyapi.versioning.get_registry_bucket

Podle nalezeného kbelíku hledáme průtok

nipyapi.versioning.get_flow_in_bucket

Dále je důležité pochopit, zda tato skupina procesů již byla přidána. Procesní skupina je umístěna podle souřadnic a může nastat situace, kdy je druhá navrstvena na jednu. Zkontroloval jsem, může to být 🙂 Chcete-li získat všechny přidané skupiny procesů, použijte metodu

nipyapi.canvas.list_all_process_groups

Dále můžeme hledat např. podle jména.

Nebudu popisovat proces aktualizace šablony, pouze řeknu, že pokud jsou v nové verzi šablony přidány procesory, pak nejsou problémy s přítomností zpráv ve frontách. Ale pokud jsou procesory odstraněny, pak mohou nastat problémy (nifi neumožňuje odebrání procesoru, pokud se před ním nahromadila fronta zpráv). Pokud vás zajímá, jak jsem tento problém vyřešil - napište mi, prosím, probereme tento bod. Kontakty na konci článku. Přejděme ke kroku přidání skupiny procesů.

Při ladění skriptu jsem narazil na zvláštnost, že ne vždy se stáhne nejnovější verze flow, proto doporučuji nejprve zkontrolovat tuto verzi:

nipyapi.versioning.get_latest_flow_ver

Nasadit skupinu procesů:

nipyapi.versioning.deploy_flow_version

Spustíme procesory:

nipyapi.canvas.schedule_process_group

V bloku o CLI bylo napsáno, že ve skupině vzdálených procesů není automaticky povolen přenos dat? Při implementaci skriptu jsem narazil také na tento problém. V té době jsem nemohl spustit přenos dat pomocí API a rozhodl jsem se napsat vývojáři knihovny NiPyAPI a požádat o radu / pomoc. Vývojář mi odpověděl, probrali jsme problém a on napsal, že potřebuje čas, aby si něco „zkontroloval“. A teď, o pár dní později, přijde e-mail, ve kterém je napsána funkce Pythonu, která řeší můj problém se spuštěním!!! V té době byla verze NiPyAPI 0.13.3 a nic takového v ní samozřejmě nebylo. Ale ve verzi 0.14.0, která byla vydána poměrně nedávno, je tato funkce již součástí knihovny. Setkat

nipyapi.canvas.set_remote_process_group_transmission

Takže pomocí knihovny NiPyAPI jsme propojili registr, spustili tok a dokonce jsme spustili procesory a přenos dat. Poté můžete kód pročesat, přidat všechny druhy kontrol, protokolování a to je vše. Ale to je úplně jiný příběh.

Z možností automatizace, které jsem zvažoval, se mi poslední jmenovaná zdála nejúčinnější. Za prvé je to stále kód pythonu, do kterého můžete vložit pomocný programový kód a plně využít výhod programovacího jazyka. Za druhé, projekt NiPyAPI se aktivně vyvíjí a v případě problémů můžete napsat vývojáři. Za třetí, NiPyAPI je stále flexibilnějším nástrojem pro interakci s NiFi při řešení složitých problémů. Například při určování, zda jsou fronty zpráv v toku aktuálně prázdné a zda je možné aktualizovat skupinu procesů.

To je vše. Popsal jsem 3 přístupy k automatizaci doručování toku v NiFi, úskalí, se kterými se může vývojář setkat, a poskytl jsem funkční kód pro automatizaci doručování. Pokud vás toto téma zajímá stejně jako mě, napsat!

Zdroj: www.habr.com

Přidat komentář