Áramlás-szállítás automatizálása Apache NiFi-ben

Hello!

Áramlás-szállítás automatizálása Apache NiFi-ben

A feladat a következő - van egy folyamat, a fenti képen látható, amelyet N szerverre kell kihelyezni Apache NiFi. Folyamatteszt – folyamatban van egy fájl létrehozása és elküldése egy másik NiFi-példánynak. Az adatátvitel a NiFi Site to Site protokoll használatával történik.

A NiFi Site to Site (S2S) egy biztonságos, könnyen konfigurálható módja a NiFi-példányok közötti adatátvitelnek. Hogyan működik az S2S, lásd dokumentáció és fontos, hogy ne felejtsük el beállítani a NiFi-példányt az S2S engedélyezéséhez, lásd itt.

Azokban az esetekben, amikor S2S-t használó adatátvitelről beszélünk, az egyik példányt kliensnek, a másodikat szervernek nevezzük. A kliens küld adatokat, a szerver fogad. Kétféleképpen konfigurálhatja a köztük lévő adatátvitelt:

  1. Nyomja. Az ügyfélpéldányból az adatok egy távoli folyamatcsoport (RPG) használatával kerülnek elküldésre. A szerverpéldányon az adatok fogadása a bemeneti porton keresztül történik
  2. Húz. A szerver RPG-vel fogad adatokat, a kliens pedig Output porton küld.


A bevezetéshez szükséges folyamatot az Apache Registry tárolja.

Az Apache NiFi Registry az Apache NiFi egyik alprojektje, amely eszközt biztosít az adatfolyam tárolására és a verziókezelésre. Egyfajta GIT. A rendszerleíró adatbázis telepítésével, konfigurálásával és kezelésével kapcsolatos információk itt találhatók hivatalos dokumentáció. A tárolási folyamatot egy folyamatcsoportba egyesítik, és ebben a formában tárolják a rendszerleíró adatbázisban. A cikk későbbi részében erre még visszatérünk.

Kezdetben, amikor N kis szám, az áramlás kézbesítése és frissítése elfogadható időn belül történik.

De ahogy N növekszik, a problémák egyre többek lesznek:

  1. több időbe telik az áramlás frissítése. Minden szerverre be kell jelentkeznie
  2. Sablonfrissítési hibák lépnek fel. Itt frissítették, de itt elfelejtették
  3. emberi hibák nagyszámú hasonló művelet végrehajtása során

Mindez arra a tényre vezet, hogy automatizálnunk kell a folyamatot. A probléma megoldására a következő módszerekkel próbálkoztam:

  1. Használjon MiNFi-t NiFi helyett
  2. NiFi CLI
  3. NiPyAPI

MiNiFi használata

Apache MiNiFy - az Apache NiFi alprojektje. A MiNiFy egy kompakt ügynök, amely ugyanazokat a processzorokat használja, mint a NiFi, lehetővé téve, hogy ugyanazokat az áramlásokat hozza létre, mint a NiFi-ben. Az ügynök könnyű jellegét többek között az a tény éri el, hogy a MiNiFy nem rendelkezik grafikus felülettel az áramlás konfigurálásához. A grafikus felület hiánya a MiNiFy-ben azt jelenti, hogy meg kell oldani az áramlás minifi-be való eljuttatásának problémáját. Mivel a MiNiFy-t aktívan használják az IOT-ban, számos összetevőből áll, és automatizálni kell a végső minifi-példányokhoz való áramlást. Ismerős feladat, igaz?

Egy másik alprojekt segít megoldani ezt a problémát - a MiNiFi C2 Server. Ez a termék a konfigurációs kihelyezési architektúra központi pontja. A környezet konfigurálása - le van írva ezt a cikket Habréról elegendő információ áll rendelkezésre a probléma megoldásához. A MiNiFi a C2 szerverrel együtt automatikusan frissíti a konfigurációját. Ennek a megközelítésnek az egyetlen hátránya, hogy sablonokat kell létrehoznia a C2 kiszolgálón; a rendszerleíró adatbázishoz való egyszerű kötelezettségvállalás nem elegendő.

A fenti cikkben leírt lehetőség működik, és nem nehéz megvalósítani, de nem szabad megfeledkeznünk a következőkről:

  1. A Minifiben nincs minden nifi processzor
  2. A Minifi processzoros verziók lemaradnak a NiFi processzoros verziók mögött.

A cikk írásakor a NiFi legújabb verziója 1.9.2. A MiNiFi processzor legújabb verziója 1.7.0. Processzorok hozzáadhatók a MiNiFi-hez, de a NiFi és a MiNiFi processzorok közötti verzióeltérések miatt ez nem biztos, hogy működik.

NiFi CLI

Ítélve leírás eszköz a hivatalos webhelyen, ez egy olyan eszköz, amely automatizálja a NiFI és a NiFi Registry közötti interakciót az áramlástovábbítás vagy a folyamatkezelés területén. A kezdéshez le kell töltenie ezt az eszközt. ezért.

Indítsa el a segédprogramot

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Ahhoz, hogy a rendszerleíró adatbázisból betölthessük a szükséges áramlást, ismernünk kell a vödör azonosítóit (vörös azonosító) és magát a folyamatot (folyamatazonosító). Ezeket az adatokat a cli-n keresztül vagy a NiFi registry webes felületén lehet megszerezni. A webes felületen így néz ki:

Áramlás-szállítás automatizálása Apache NiFi-ben

A CLI használatával ez történik:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Elkezdjük importálni a folyamatcsoportot a rendszerleíró adatbázisból:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Fontos szempont, hogy bármely nifi-példány megadható gazdagépként, amelyre a folyamatcsoportot görgetjük.

Folyamatcsoport hozzáadva leállított processzorokkal, ezeket el kell indítani

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Remek, beindultak a processzorok. A feladat feltételei szerint azonban NiFi-példányokra van szükségünk ahhoz, hogy adatokat küldjünk más példányoknak. Tegyük fel, hogy a Push módszert választotta az adatok kiszolgálóra való átviteléhez. Az adatátvitel megszervezéséhez engedélyeznie kell az adatátvitelt a hozzáadott Remote Process Group-on (RPG), amely már benne van a folyamatunkban.

Áramlás-szállítás automatizálása Apache NiFi-ben

A CLI-ben és más forrásokban található dokumentációban nem találtam módot az adatátvitel engedélyezésére. Ha tudja, hogyan kell ezt megtenni, kérjük, írja meg a megjegyzésekben.

Mivel van bashunk, és készen állunk a végére, megtaláljuk a kiutat! A probléma megoldásához használhatja a NiFi API-t. Használjuk a következő módszert, vegyük az azonosítót a fenti példákból (esetünkben ez 7f522a13-016e-1000-e504-d5b15587f2f3). A NiFi API metódusok leírása itt.

Áramlás-szállítás automatizálása Apache NiFi-ben
A törzsben át kell adnia a JSON-t, így:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Paraméterek, amelyeket ki kell tölteni, hogy működjön:
voltak — adatátvitel állapota. Elérhető: TRANSMITTING az adatátvitel engedélyezéséhez, LEÁLLÍTOTT a letiltáshoz
változat - processzoros verzió

verzió alapértelmezés szerint 0 lesz a létrehozáskor, de ezek a paraméterek a módszerrel beszerezhetők

Áramlás-szállítás automatizálása Apache NiFi-ben

A bash szkriptek rajongói számára ez a módszer megfelelőnek tűnhet, de számomra egy kicsit nehéz - a bash szkriptek nem a kedvenceim. A következő módszer szerintem érdekesebb és kényelmesebb.

NiPyAPI

A NiPyAPI egy Python-könyvtár a NiFi-példányokkal való interakcióhoz. Dokumentációs oldal tartalmazza a könyvtárral való munkához szükséges információkat. A gyorsindítás leírása a program a githubon.

A konfigurációt kidolgozó szkriptünk egy Python program. Térjünk át a kódolásra.
Konfigokat állítottunk be a további munkához. A következő paraméterekre lesz szükségünk:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Következő lépésként beszúrom a könyvtár metódusainak neveit, amelyeket leírtunk itt.

Csatlakoztassa a beállításjegyzéket a nifi-példányhoz a használatával

nipyapi.versioning.create_registry_client

Ebben a lépésben ellenőrizheti, hogy a rendszerleíró adatbázis már hozzá lett-e adva a példányhoz; ehhez használhatja a metódust

nipyapi.versioning.list_registry_clients

Megtaláljuk a vödröt az áramlás további kereséséhez a kosárban

nipyapi.versioning.get_registry_bucket

A talált vödör segítségével áramlást keresünk

nipyapi.versioning.get_flow_in_bucket

Ezt követően fontos megérteni, hogy ez a folyamatcsoport hozzáadásra került-e már. A Folyamat csoport koordináták szerint kerül elhelyezésre, és olyan helyzet adódhat, amikor egy második komponens fölé kerül. Megnéztem, ez megtörténhet :) Az összes hozzáadott folyamatcsoport eléréséhez a metódust használjuk

nipyapi.canvas.list_all_process_groups

Kereshetünk tovább, például név alapján.

Nem írom le a sablon frissítésének folyamatát, csak azt mondom, hogy ha processzorokat adnak hozzá a sablon új verziójához, akkor nincs probléma az üzenetek jelenlétével a sorokban. De ha a processzorokat eltávolítják, akkor problémák merülhetnek fel (a nifi nem teszi lehetővé a processzor eltávolítását, ha üzenetsor halmozódott fel előtte). Ha érdekel, hogyan oldottam meg ezt a problémát, írjon nekem és megbeszéljük ezt a problémát. Elérhetőségek a cikk végén. Folytassuk a folyamatcsoport hozzáadásának lépésével.

A szkript hibakeresése során arra a sajátosságra bukkantam, hogy a folyam legfrissebb verziója nincs mindig felhúzva, ezért javaslom először ezt a verziót ellenőrizni:

nipyapi.versioning.get_latest_flow_ver

Telepítési folyamatcsoport:

nipyapi.versioning.deploy_flow_version

Elindítjuk a processzorokat:

nipyapi.canvas.schedule_process_group

A CLI-vel kapcsolatos blokkban azt írták, hogy az adatátvitel nincs automatikusan engedélyezve a távoli folyamatcsoportban? A szkript implementálásakor én is találkoztam ezzel a problémával. Ekkor nem tudtam elindítani az adatátvitelt az API segítségével, és úgy döntöttem, hogy írok a NiPyAPI könyvtár fejlesztőjének, és tanácsot/segítséget kérek. A fejlesztő válaszolt nekem, megbeszéltük a problémát, és azt írta, hogy időre van szüksége, hogy "ellenőrizzen valamit". És akkor pár nap múlva érkezik egy levél, amiben Pythonban van írva egy függvény, ami megoldja az indítási problémámat!!! Akkoriban a NiPyAPI verzió 0.13.3 volt, és persze semmi ilyesmi nem volt. De a 0.14.0-s verzióban, amely nemrég jelent meg, ez a funkció már benne volt a könyvtárban. Találkozik,

nipyapi.canvas.set_remote_process_group_transmission

Így a NiPyAPI könyvtár segítségével összekapcsoltuk a rendszerleíró adatbázist, elindítottuk a folyamatot, és még a processzorokat és az adatátvitelt is elindítottuk. Ezután átfésülheti a kódot, hozzáadhat mindenféle ellenőrzést, naplózást, és ennyi. De ez egy teljesen más történet.

Az általam mérlegelt automatizálási lehetőségek közül az utolsó tűnt a leghatékonyabbnak. Először is, ez még mindig python kód, amelybe beágyazhat segédprogramkódot, és kihasználhatja a programozási nyelv minden előnyét. Másodszor, a NiPyAPI projekt aktívan fejlődik, és probléma esetén írhat a fejlesztőnek. Harmadszor, a NiPyAPI még mindig rugalmasabb eszköz a NiFi-vel való interakcióhoz az összetett problémák megoldásában. Például annak meghatározásakor, hogy az üzenetsorok üresek-e a folyamatban, és hogy a folyamatcsoport frissíthető-e.

Ez minden. Leírtam 3 megközelítést a NiFi-ben történő áramlás-továbbítás automatizálására, a buktatókat, amelyekkel a fejlesztő találkozhat, és működő kódot adtam a kézbesítés automatizálásához. Ha annyira érdekel ez a téma, mint engem, ír!

Forrás: will.com

Hozzászólás