Hello!
A feladat a következő - van egy folyamat, a fenti képen látható, amelyet N szerverre kell kihelyezni
A NiFi Site to Site (S2S) egy biztonságos, könnyen konfigurálható módja a NiFi-példányok közötti adatátvitelnek. Hogyan működik az S2S, lásd
Azokban az esetekben, amikor S2S-t használó adatátvitelről beszélünk, az egyik példányt kliensnek, a másodikat szervernek nevezzük. A kliens küld adatokat, a szerver fogad. Kétféleképpen konfigurálhatja a köztük lévő adatátvitelt:
- Nyomja. Az ügyfélpéldányból az adatok egy távoli folyamatcsoport (RPG) használatával kerülnek elküldésre. A szerverpéldányon az adatok fogadása a bemeneti porton keresztül történik
- Húz. A szerver RPG-vel fogad adatokat, a kliens pedig Output porton küld.
A bevezetéshez szükséges folyamatot az Apache Registry tárolja.
Az Apache NiFi Registry az Apache NiFi egyik alprojektje, amely eszközt biztosít az adatfolyam tárolására és a verziókezelésre. Egyfajta GIT. A rendszerleíró adatbázis telepítésével, konfigurálásával és kezelésével kapcsolatos információk itt találhatók
Kezdetben, amikor N kis szám, az áramlás kézbesítése és frissítése elfogadható időn belül történik.
De ahogy N növekszik, a problémák egyre többek lesznek:
- több időbe telik az áramlás frissítése. Minden szerverre be kell jelentkeznie
- Sablonfrissítési hibák lépnek fel. Itt frissítették, de itt elfelejtették
- emberi hibák nagyszámú hasonló művelet végrehajtása során
Mindez arra a tényre vezet, hogy automatizálnunk kell a folyamatot. A probléma megoldására a következő módszerekkel próbálkoztam:
- Használjon MiNFi-t NiFi helyett
- NiFi CLI
- NiPyAPI
MiNiFi használata
Egy másik alprojekt segít megoldani ezt a problémát - a MiNiFi C2 Server. Ez a termék a konfigurációs kihelyezési architektúra központi pontja. A környezet konfigurálása - le van írva
A fenti cikkben leírt lehetőség működik, és nem nehéz megvalósítani, de nem szabad megfeledkeznünk a következőkről:
- A Minifiben nincs minden nifi processzor
- A Minifi processzoros verziók lemaradnak a NiFi processzoros verziók mögött.
A cikk írásakor a NiFi legújabb verziója 1.9.2. A MiNiFi processzor legújabb verziója 1.7.0. Processzorok hozzáadhatók a MiNiFi-hez, de a NiFi és a MiNiFi processzorok közötti verzióeltérések miatt ez nem biztos, hogy működik.
NiFi CLI
Ítélve
Indítsa el a segédprogramot
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Ahhoz, hogy a rendszerleíró adatbázisból betölthessük a szükséges áramlást, ismernünk kell a vödör azonosítóit (vörös azonosító) és magát a folyamatot (folyamatazonosító). Ezeket az adatokat a cli-n keresztül vagy a NiFi registry webes felületén lehet megszerezni. A webes felületen így néz ki:
A CLI használatával ez történik:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Elkezdjük importálni a folyamatcsoportot a rendszerleíró adatbázisból:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Fontos szempont, hogy bármely nifi-példány megadható gazdagépként, amelyre a folyamatcsoportot görgetjük.
Folyamatcsoport hozzáadva leállított processzorokkal, ezeket el kell indítani
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Remek, beindultak a processzorok. A feladat feltételei szerint azonban NiFi-példányokra van szükségünk ahhoz, hogy adatokat küldjünk más példányoknak. Tegyük fel, hogy a Push módszert választotta az adatok kiszolgálóra való átviteléhez. Az adatátvitel megszervezéséhez engedélyeznie kell az adatátvitelt a hozzáadott Remote Process Group-on (RPG), amely már benne van a folyamatunkban.
A CLI-ben és más forrásokban található dokumentációban nem találtam módot az adatátvitel engedélyezésére. Ha tudja, hogyan kell ezt megtenni, kérjük, írja meg a megjegyzésekben.
Mivel van bashunk, és készen állunk a végére, megtaláljuk a kiutat! A probléma megoldásához használhatja a NiFi API-t. Használjuk a következő módszert, vegyük az azonosítót a fenti példákból (esetünkben ez 7f522a13-016e-1000-e504-d5b15587f2f3). A NiFi API metódusok leírása
A törzsben át kell adnia a JSON-t, így:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Paraméterek, amelyeket ki kell tölteni, hogy működjön:
voltak — adatátvitel állapota. Elérhető: TRANSMITTING az adatátvitel engedélyezéséhez, LEÁLLÍTOTT a letiltáshoz
változat - processzoros verzió
verzió alapértelmezés szerint 0 lesz a létrehozáskor, de ezek a paraméterek a módszerrel beszerezhetők
A bash szkriptek rajongói számára ez a módszer megfelelőnek tűnhet, de számomra egy kicsit nehéz - a bash szkriptek nem a kedvenceim. A következő módszer szerintem érdekesebb és kényelmesebb.
NiPyAPI
A NiPyAPI egy Python-könyvtár a NiFi-példányokkal való interakcióhoz.
A konfigurációt kidolgozó szkriptünk egy Python program. Térjünk át a kódolásra.
Konfigokat állítottunk be a további munkához. A következő paraméterekre lesz szükségünk:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Következő lépésként beszúrom a könyvtár metódusainak neveit, amelyeket leírtunk
Csatlakoztassa a beállításjegyzéket a nifi-példányhoz a használatával
nipyapi.versioning.create_registry_client
Ebben a lépésben ellenőrizheti, hogy a rendszerleíró adatbázis már hozzá lett-e adva a példányhoz; ehhez használhatja a metódust
nipyapi.versioning.list_registry_clients
Megtaláljuk a vödröt az áramlás további kereséséhez a kosárban
nipyapi.versioning.get_registry_bucket
A talált vödör segítségével áramlást keresünk
nipyapi.versioning.get_flow_in_bucket
Ezt követően fontos megérteni, hogy ez a folyamatcsoport hozzáadásra került-e már. A Folyamat csoport koordináták szerint kerül elhelyezésre, és olyan helyzet adódhat, amikor egy második komponens fölé kerül. Megnéztem, ez megtörténhet :) Az összes hozzáadott folyamatcsoport eléréséhez a metódust használjuk
nipyapi.canvas.list_all_process_groups
Kereshetünk tovább, például név alapján.
Nem írom le a sablon frissítésének folyamatát, csak azt mondom, hogy ha processzorokat adnak hozzá a sablon új verziójához, akkor nincs probléma az üzenetek jelenlétével a sorokban. De ha a processzorokat eltávolítják, akkor problémák merülhetnek fel (a nifi nem teszi lehetővé a processzor eltávolítását, ha üzenetsor halmozódott fel előtte). Ha érdekel, hogyan oldottam meg ezt a problémát, írjon nekem és megbeszéljük ezt a problémát. Elérhetőségek a cikk végén. Folytassuk a folyamatcsoport hozzáadásának lépésével.
A szkript hibakeresése során arra a sajátosságra bukkantam, hogy a folyam legfrissebb verziója nincs mindig felhúzva, ezért javaslom először ezt a verziót ellenőrizni:
nipyapi.versioning.get_latest_flow_ver
Telepítési folyamatcsoport:
nipyapi.versioning.deploy_flow_version
Elindítjuk a processzorokat:
nipyapi.canvas.schedule_process_group
A CLI-vel kapcsolatos blokkban azt írták, hogy az adatátvitel nincs automatikusan engedélyezve a távoli folyamatcsoportban? A szkript implementálásakor én is találkoztam ezzel a problémával. Ekkor nem tudtam elindítani az adatátvitelt az API segítségével, és úgy döntöttem, hogy írok a NiPyAPI könyvtár fejlesztőjének, és tanácsot/segítséget kérek. A fejlesztő válaszolt nekem, megbeszéltük a problémát, és azt írta, hogy időre van szüksége, hogy "ellenőrizzen valamit". És akkor pár nap múlva érkezik egy levél, amiben Pythonban van írva egy függvény, ami megoldja az indítási problémámat!!! Akkoriban a NiPyAPI verzió 0.13.3 volt, és persze semmi ilyesmi nem volt. De a 0.14.0-s verzióban, amely nemrég jelent meg, ez a funkció már benne volt a könyvtárban. Találkozik,
nipyapi.canvas.set_remote_process_group_transmission
Így a NiPyAPI könyvtár segítségével összekapcsoltuk a rendszerleíró adatbázist, elindítottuk a folyamatot, és még a processzorokat és az adatátvitelt is elindítottuk. Ezután átfésülheti a kódot, hozzáadhat mindenféle ellenőrzést, naplózást, és ennyi. De ez egy teljesen más történet.
Az általam mérlegelt automatizálási lehetőségek közül az utolsó tűnt a leghatékonyabbnak. Először is, ez még mindig python kód, amelybe beágyazhat segédprogramkódot, és kihasználhatja a programozási nyelv minden előnyét. Másodszor, a NiPyAPI projekt aktívan fejlődik, és probléma esetén írhat a fejlesztőnek. Harmadszor, a NiPyAPI még mindig rugalmasabb eszköz a NiFi-vel való interakcióhoz az összetett problémák megoldásában. Például annak meghatározásakor, hogy az üzenetsorok üresek-e a folyamatban, és hogy a folyamatcsoport frissíthető-e.
Ez minden. Leírtam 3 megközelítést a NiFi-ben történő áramlás-továbbítás automatizálására, a buktatókat, amelyekkel a fejlesztő találkozhat, és működő kódot adtam a kézbesítés automatizálásához. Ha annyira érdekel ez a téma, mint engem,
Forrás: will.com