Hallo an alle!

Die Aufgabe ist wie folgt: Es gibt einen Ablauf, wie im Bild oben dargestellt, der auf N Servern ausgerollt werden muss . Flow-Test – eine Datei wird generiert und an eine andere NiFi-Instanz gesendet. Die Datenübertragung erfolgt über das NiFi Site-to-Site-Protokoll.
NiFi Site to Site (S2S) ist eine sichere, einfach konfigurierbare Möglichkeit, Daten zwischen NiFi-Instanzen zu übertragen. Wie S2S funktioniert, siehe und es ist wichtig, nicht zu vergessen, die NiFi-Instanz so zu konfigurieren, dass S2S zugelassen wird, siehe .
In Fällen, in denen es um die Datenübertragung mit S2S geht, wird eine Instanz als Client und die zweite als Server bezeichnet. Der Client sendet Daten, der Server empfängt. Zwei Möglichkeiten, die Datenübertragung zwischen ihnen zu konfigurieren:
- Push. Von der Client-Instanz werden Daten mithilfe einer Remote Process Group (RPG) gesendet. Auf der Serverinstanz werden Daten über den Eingabeport empfangen
- Pull. Der Server empfängt Daten über RPG, der Client sendet sie über den Ausgabeport.
Der Ablauf für die Einführung wird in der Apache-Registrierung gespeichert.
Apache NiFi Registry ist ein Unterprojekt von Apache NiFi, das ein Tool zur Flow-Speicherung und Versionskontrolle bereitstellt. Eine Art GIT. Informationen zum Installieren, Konfigurieren und Arbeiten mit der Registrierung finden Sie in . Der Flow zur Speicherung wird zu einer Prozessgruppe zusammengefasst und in dieser Form in der Registry gespeichert. Wir werden später im Artikel darauf zurückkommen.
Zu Beginn, wenn N eine kleine Zahl ist, wird der Fluss innerhalb einer akzeptablen Zeit manuell bereitgestellt und aktualisiert.
Aber je mehr N wächst, desto zahlreicher werden die Probleme:
- Es dauert länger, den Flow zu aktualisieren. Sie müssen sich bei allen Servern anmelden
- Es treten Fehler bei der Aktualisierung der Vorlage auf. Hier haben sie es aktualisiert, aber hier haben sie es vergessen
- menschliche Fehler bei der Durchführung einer großen Anzahl ähnlicher Vorgänge
All dies führt uns zu der Tatsache, dass wir den Prozess automatisieren müssen. Ich habe folgende Möglichkeiten ausprobiert, um dieses Problem zu lösen:
- Verwenden Sie MiNiFi anstelle von NiFi
- NiFi-CLI
- NiPyAPI
Verwenden von MiNiFi
- Teilprojekt von Apache NiFi. MiNiFy ist ein kompakter Agent, der dieselben Prozessoren wie NiFi verwendet und es Ihnen ermöglicht, dieselben Flows wie in NiFi zu erstellen. Die Leichtgewichtigkeit des Agenten wird unter anderem dadurch erreicht, dass MiNiFy über keine grafische Oberfläche zur Flow-Konfiguration verfügt. Das Fehlen einer grafischen Oberfläche in MiNiFy bedeutet, dass das Problem der Bereitstellung des Flusses an Minifi gelöst werden muss. Da MiNiFy aktiv im IOT eingesetzt wird, gibt es viele Komponenten und der Prozess der Bereitstellung des Flusses an die endgültigen Minifi-Instanzen muss automatisiert werden. Eine vertraute Aufgabe, oder?
Ein weiteres Teilprojekt wird zur Lösung dieses Problems beitragen – MiNiFi C2 Server. Dieses Produkt soll der zentrale Punkt in der Konfigurations-Rollout-Architektur sein. So konfigurieren Sie die Umgebung – beschrieben in Es gibt genügend Informationen über Habré, um das Problem zu lösen. MiNiFi aktualisiert in Verbindung mit dem C2-Server automatisch seine Konfiguration. Der einzige Nachteil dieses Ansatzes besteht darin, dass Sie Vorlagen auf dem C2-Server erstellen müssen; ein einfaches Commit in die Registrierung reicht nicht aus.
Die im obigen Artikel beschriebene Option funktioniert und ist nicht schwer umzusetzen, wir dürfen jedoch Folgendes nicht vergessen:
- Minifi verfügt nicht über alle Prozessoren von Nifi
- Minifi-Prozessorversionen hinken den NiFi-Prozessorversionen hinterher.
Zum Zeitpunkt des Schreibens ist die neueste Version von NiFi 1.9.2. Die neueste MiNiFi-Prozessorversion ist 1.7.0. Prozessoren können zu MiNiFi hinzugefügt werden, aber aufgrund von Versionsunterschieden zwischen NiFi und MiNiFi-Prozessoren funktioniert dies möglicherweise nicht.
NiFi-CLI
Gemessen an Tool auf der offiziellen Website. Hierbei handelt es sich um ein Tool zur Automatisierung der Interaktion zwischen NiFI und NiFi Registry im Bereich der Flussbereitstellung oder des Prozessmanagements. Um zu beginnen, müssen Sie dieses Tool herunterladen. .
Starten Sie das Dienstprogramm
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Damit wir den erforderlichen Flow aus der Registry laden können, müssen wir die Identifikatoren des Buckets (Bucket-Identifier) und den Flow selbst (Flow-Identifier) kennen. Diese Daten können entweder über die CLI oder über die Weboberfläche der NiFi-Registrierung abgerufen werden. Im Webinterface sieht es so aus:

Mit der CLI geschieht dies:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Wir beginnen mit dem Importieren der Prozessgruppe aus der Registrierung:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Ein wichtiger Punkt ist, dass jede Nifi-Instanz als Host angegeben werden kann, auf den wir die Prozessgruppe übertragen.
Prozessgruppe mit gestoppten Prozessoren hinzugefügt, diese müssen gestartet werden
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, die Prozessoren sind gestartet. Gemäß den Bedingungen der Aufgabe benötigen wir jedoch NiFi-Instanzen, um Daten an andere Instanzen zu senden. Nehmen wir an, Sie haben die Push-Methode gewählt, um Daten an den Server zu übertragen. Um die Datenübertragung zu organisieren, müssen Sie die Datenübertragung für die hinzugefügte Remote Process Group (RPG) aktivieren, die bereits in unserem Ablauf enthalten ist.

In der Dokumentation der CLI und anderen Quellen habe ich keine Möglichkeit gefunden, die Datenübertragung zu aktivieren. Wenn Sie wissen, wie das geht, schreiben Sie es bitte in die Kommentare.
Da wir Bash haben und bereit sind, bis zum Ende zu gehen, werden wir einen Ausweg finden! Sie können die NiFi-API verwenden, um dieses Problem zu lösen. Verwenden wir die folgende Methode und übernehmen die ID aus den obigen Beispielen (in unserem Fall ist sie 7f522a13-016e-1000-e504-d5b15587f2f3). Beschreibung der NiFi-API-Methoden .

Im Text müssen Sie JSON wie folgt übergeben:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parameter, die ausgefüllt werden müssen, damit es funktioniert:
Zustand — Datenübertragungsstatus. Verfügbar: TRANSMITTING zum Aktivieren der Datenübertragung, STOPPED zum Deaktivieren
Version - Prozessorversion
Die Version wird beim Erstellen standardmäßig auf 0 gesetzt, diese Parameter können jedoch mit der Methode abgerufen werden

Für Fans von Bash-Skripten scheint diese Methode geeignet zu sein, für mich ist sie jedoch etwas schwierig – Bash-Skripte sind nicht mein Favorit. Die nächste Methode ist meiner Meinung nach interessanter und bequemer.
NiPyAPI
NiPyAPI ist eine Python-Bibliothek zur Interaktion mit NiFi-Instanzen. enthält die notwendigen Informationen für die Arbeit mit der Bibliothek. Der Schnellstart ist in beschrieben auf Github.
Unser Skript zum Ausrollen der Konfiguration ist ein Programm in Python. Kommen wir zum Codieren.
Wir richten Konfigurationen für die weitere Arbeit ein. Wir benötigen die folgenden Parameter:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Als nächstes werde ich die Namen der Methoden dieser Bibliothek einfügen, die beschrieben werden .
Verbinden Sie die Registrierung über mit der Nifi-Instanz
nipyapi.versioning.create_registry_clientIn diesem Schritt können Sie auch eine Überprüfung hinzufügen, ob die Registrierung bereits zur Instanz hinzugefügt wurde; hierfür können Sie die Methode verwenden
nipyapi.versioning.list_registry_clientsWir finden den Eimer für die weitere Suche nach Flow im Korb
nipyapi.versioning.get_registry_bucketMit dem gefundenen Eimer suchen wir nach Fluss
nipyapi.versioning.get_flow_in_bucketAls nächstes ist es wichtig zu verstehen, ob diese Prozessgruppe bereits hinzugefügt wurde. Die Prozessgruppe wird entsprechend der Koordinaten platziert und es kann vorkommen, dass eine zweite Komponente über einer anderen liegt. Ich habe nachgesehen, dass das passieren kann :) Um alle hinzugefügten Prozessgruppen zu erhalten, verwenden wir die Methode
nipyapi.canvas.list_all_process_groupsWir können beispielsweise nach Namen weitersuchen.
Ich werde den Prozess der Aktualisierung der Vorlage nicht beschreiben, sondern nur sagen, dass es keine Probleme mit dem Vorhandensein von Nachrichten in den Warteschlangen gibt, wenn Prozessoren in der neuen Version der Vorlage hinzugefügt werden. Wenn jedoch Prozessoren entfernt werden, können Probleme auftreten (nifi erlaubt nicht, einen Prozessor zu entfernen, wenn sich davor eine Nachrichtenwarteschlange angesammelt hat). Wenn Sie daran interessiert sind, wie ich dieses Problem gelöst habe, schreiben Sie mir bitte und wir werden dieses Problem besprechen. Kontakte am Ende des Artikels. Fahren wir mit dem Schritt des Hinzufügens einer Prozessgruppe fort.
Beim Debuggen des Skripts bin ich auf die Besonderheit gestoßen, dass nicht immer die neueste Version von Flow aufgerufen wird, daher empfehle ich, diese Version zuerst zu überprüfen:
nipyapi.versioning.get_latest_flow_verProzessgruppe bereitstellen:
nipyapi.versioning.deploy_flow_versionWir starten die Prozessoren:
nipyapi.canvas.schedule_process_groupIm Block über CLI wurde geschrieben, dass die Datenübertragung in der Remote-Prozessgruppe nicht automatisch aktiviert wird? Bei der Implementierung des Skripts bin ich auch auf dieses Problem gestoßen. Zu diesem Zeitpunkt konnte ich die Datenübertragung über die API nicht starten und beschloss, an den Entwickler der NiPyAPI-Bibliothek zu schreiben und ihn um Rat/Hilfe zu bitten. Der Entwickler antwortete mir, wir besprachen das Problem und er schrieb, dass er Zeit brauche, um „etwas zu überprüfen“. Und dann, ein paar Tage später, kommt ein Brief, in dem eine Funktion in Python geschrieben ist, die mein Startproblem löst!!! Damals war die NiPyAPI-Version 0.13.3 und natürlich gab es nichts Vergleichbares. Doch in der erst kürzlich veröffentlichten Version 0.14.0 war diese Funktion bereits in der Bibliothek enthalten. Treffen,
nipyapi.canvas.set_remote_process_group_transmissionMithilfe der NiPyAPI-Bibliothek haben wir also die Registrierung verbunden, den Flow eingeführt und sogar mit den Prozessoren und der Datenübertragung begonnen. Dann können Sie den Code durchkämmen, alle Arten von Prüfungen und Protokollierungen hinzufügen und das ist alles. Aber das ist eine ganz andere Geschichte.
Von den Automatisierungsoptionen, die ich in Betracht gezogen habe, schien mir die letzte die effizienteste zu sein. Erstens handelt es sich immer noch um Python-Code, in den Sie Hilfsprogrammcode einbetten und alle Vorteile der Programmiersprache nutzen können. Zweitens entwickelt sich das NiPyAPI-Projekt aktiv weiter und bei Problemen können Sie sich an den Entwickler wenden. Drittens ist NiPyAPI immer noch ein flexibleres Werkzeug für die Interaktion mit NiFi bei der Lösung komplexer Probleme. Beispielsweise bei der Feststellung, ob die Nachrichtenwarteschlangen im Fluss jetzt leer sind und ob die Prozessgruppe aktualisiert werden kann.
Das ist alles. Ich habe drei Ansätze zur Automatisierung der Flow-Bereitstellung in NiFi beschrieben, Fallstricke beschrieben, auf die ein Entwickler stoßen kann, und Arbeitscode für die Automatisierung der Bereitstellung bereitgestellt. Wenn Sie sich für dieses Thema genauso interessieren wie ich –
Source: habr.com
