Automatisering van flowlevering in Apache NiFi

Hallo iedereen!

Automatisering van flowlevering in Apache NiFi

De taak is als volgt: er is een stroom, weergegeven in de afbeelding hierboven, die naar N-servers moet worden uitgerold Apache NiFi. Flowtest - er wordt een bestand gegenereerd en naar een andere NiFi-instantie verzonden. Gegevensoverdracht vindt plaats met behulp van het NiFi Site to Site-protocol.

NiFi Site to Site (S2S) is een veilige, eenvoudig configureerbare manier om gegevens tussen NiFi-instanties over te dragen. Hoe S2S werkt, zie documentatie en het is belangrijk om niet te vergeten de NiFi-instantie te configureren om S2S toe te staan, zie hier.

In gevallen waarin we het hebben over gegevensoverdracht met behulp van S2S, wordt één instantie client genoemd, de tweede server. De client verzendt gegevens, de server ontvangt. Twee manieren om de gegevensoverdracht daartussen te configureren:

  1. Duwen. Vanaf de clientinstantie worden gegevens verzonden met behulp van een Remote Process Group (RPG). Op de serverinstantie worden gegevens ontvangen via de invoerpoort
  2. Trekken. De server ontvangt gegevens via RPG, de client verzendt via de uitvoerpoort.


De stroom voor het uitrollen wordt opgeslagen in Apache Registry.

Apache NiFi Registry is een subproject van Apache NiFi dat een tool biedt voor flowopslag en versiebeheer. Een soort GIT. Informatie over het installeren, configureren en werken met register vindt u in officiële documentatie. De stroom voor opslag wordt samengevoegd tot een procesgroep en in deze vorm opgeslagen in het register. We komen hier later in het artikel op terug.

In het begin, wanneer N een klein getal is, wordt de stroom binnen een acceptabele tijd handmatig geleverd en bijgewerkt.

Maar naarmate N groeit, worden de problemen talrijker:

  1. het kost meer tijd om de stroom bij te werken. U moet op alle servers inloggen
  2. Er treden fouten op bij het bijwerken van sjablonen. Hier hebben ze het bijgewerkt, maar hier zijn ze het vergeten
  3. menselijke fouten bij het uitvoeren van een groot aantal soortgelijke handelingen

Dit alles brengt ons tot het feit dat we het proces moeten automatiseren. Ik heb de volgende manieren geprobeerd om dit probleem op te lossen:

  1. Gebruik MiNiFi in plaats van NiFi
  2. NiFi CLI
  3. NiPyAPI

Het gebruik van MiNiFi

Apache MiNiFy - deelproject van Apache NiFi. MiNiFy is een compacte agent die dezelfde processors gebruikt als NiFi, waardoor je dezelfde stromen kunt creëren als in NiFi. Het lichtgewicht karakter van de agent wordt onder meer bereikt door het feit dat MiNiFy geen grafische interface heeft voor stroomconfiguratie. Het ontbreken van een grafische interface voor MiNiFy betekent dat het nodig is om het probleem van het leveren van flow aan minifi op te lossen. Omdat MiNiFy actief wordt gebruikt in IoT, zijn er veel componenten en moet het proces van het leveren van stroom naar de uiteindelijke minifi-instanties worden geautomatiseerd. Een bekende taak, toch?

Een ander deelproject zal dit probleem helpen oplossen: MiNiFi C2 Server. Dit product is bedoeld als centraal punt in de configuratie-uitrolarchitectuur. Hoe de omgeving te configureren - beschreven in dit artikel Er is voldoende informatie over Habré om het probleem op te lossen. MiNiFi werkt, in combinatie met de C2-server, de configuratie automatisch bij. Het enige nadeel van deze aanpak is dat je sjablonen moet maken op C2 Server; een simpele commit naar het register is niet genoeg.

De in het bovenstaande artikel beschreven optie werkt en is niet moeilijk te implementeren, maar we mogen het volgende niet vergeten:

  1. Minifi beschikt niet over alle processors van nifi
  2. Minifi-processorversies blijven achter bij NiFi-processorversies.

Op het moment van schrijven is de nieuwste versie van NiFi 1.9.2. De nieuwste MiNiFi-processorversie is 1.7.0. Processoren kunnen aan MiNiFi worden toegevoegd, maar vanwege versieverschillen tussen NiFi en MiNiFi-processors werkt dit mogelijk niet.

NiFi CLI

Oordelend door Omschrijving tool op de officiële website, dit is een tool voor het automatiseren van de interactie tussen NiFI en NiFi Registry op het gebied van flow delivery of procesmanagement. Om aan de slag te gaan, moet u deze tool downloaden. vandaar.

Start het hulpprogramma

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Om de vereiste stroom uit het register te kunnen laden, moeten we de identificatiegegevens van de bucket (bucket-ID) en de stroom zelf (flow-ID) kennen. Deze gegevens kunnen worden verkregen via de cli of in de NiFi-registerwebinterface. In de webinterface ziet het er als volgt uit:

Automatisering van flowlevering in Apache NiFi

Met behulp van de CLI wordt dit gedaan:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

We beginnen met het importeren van procesgroepen uit het register:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Een belangrijk punt is dat elk nifi-exemplaar kan worden opgegeven als de host waarnaar we de procesgroep rollen.

Procesgroep toegevoegd met gestopte processors, deze moeten worden gestart

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Geweldig, de processors zijn gestart. Volgens de voorwaarden van de taak hebben we echter NiFi-instanties nodig om gegevens naar andere instanties te verzenden. Laten we aannemen dat u de Push-methode hebt gekozen om gegevens naar de server over te dragen. Om gegevensoverdracht te organiseren, moet u gegevensoverdracht inschakelen op de toegevoegde Remote Process Group (RPG), die al in onze stroom is opgenomen.

Automatisering van flowlevering in Apache NiFi

In de documentatie in de CLI en andere bronnen heb ik geen manier gevonden om gegevensoverdracht mogelijk te maken. Als je weet hoe je dit moet doen, schrijf dan in de reacties.

Omdat we bash hebben en klaar zijn om tot het einde te gaan, zullen we een uitweg vinden! U kunt de NiFi API gebruiken om dit probleem op te lossen. Laten we de volgende methode gebruiken, waarbij we de ID uit de bovenstaande voorbeelden nemen (in ons geval is dit 7f522a13-016e-1000-e504-d5b15587f2f3). Beschrijving van NiFi API-methoden hier.

Automatisering van flowlevering in Apache NiFi
In de body moet je JSON als volgt doorgeven:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parameters die moeten worden ingevuld om te laten werken:
staat — status van gegevensoverdracht. Beschikbaar: VERZENDEN om gegevensoverdracht in te schakelen, GESTOPT om uit te schakelen
versie - processorversie

versie wordt standaard ingesteld op 0 wanneer deze wordt gemaakt, maar deze parameters kunnen worden verkregen met behulp van de methode

Automatisering van flowlevering in Apache NiFi

Voor fans van bash-scripts lijkt deze methode misschien geschikt, maar voor mij is het een beetje moeilijk - bash-scripts zijn niet mijn favoriet. De volgende methode is naar mijn mening interessanter en handiger.

NiPyAPI

NiPyAPI is een Python-bibliotheek voor interactie met NiFi-instanties. Documentatie pagina bevat de nodige informatie voor het werken met de bibliotheek. Snelle start wordt beschreven in project op github.

Ons script voor het uitrollen van de configuratie is een programma in Python. Laten we verder gaan met coderen.
We hebben configuraties opgezet voor verder werk. We hebben de volgende parameters nodig:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Vervolgens zal ik de namen invoegen van de methoden van deze bibliotheek, die worden beschreven hier.

Verbind het register met de nifi-instantie met behulp van

nipyapi.versioning.create_registry_client

Bij deze stap kunt u ook een controle toevoegen dat het register al aan de instance is toegevoegd; hiervoor kunt u de methode gebruiken

nipyapi.versioning.list_registry_clients

De emmer voor verder zoeken naar stroming vinden we in de mand

nipyapi.versioning.get_registry_bucket

Met behulp van de gevonden emmer zoeken we naar flow

nipyapi.versioning.get_flow_in_bucket

Vervolgens is het belangrijk om te begrijpen of deze procesgroep al is toegevoegd. De Procesgroep wordt op basis van coördinaten geplaatst en er kan een situatie ontstaan ​​wanneer een tweede component er bovenop wordt gelegd. Ik heb het gecontroleerd, dit kan gebeuren :) Om alle toegevoegde procesgroepen te krijgen, gebruiken we de methode

nipyapi.canvas.list_all_process_groups

We kunnen verder zoeken, bijvoorbeeld op naam.

Ik zal het proces van het bijwerken van de sjabloon niet beschrijven, ik zal alleen zeggen dat als processors worden toegevoegd aan de nieuwe versie van de sjabloon, er geen problemen zijn met de aanwezigheid van berichten in de wachtrijen. Maar als processors worden verwijderd, kunnen er problemen optreden (nifi staat u niet toe een processor te verwijderen als er zich een berichtenwachtrij voor heeft verzameld). Als u geïnteresseerd bent in hoe ik dit probleem heb opgelost, schrijf mij dan, dan zullen we dit punt bespreken. Contacten aan het einde van het artikel. Laten we verder gaan met de stap van het toevoegen van een procesgroep.

Bij het debuggen van het script kwam ik een eigenaardigheid tegen dat de nieuwste versie van flow niet altijd wordt opgehaald, dus ik raad aan om eerst deze versie te controleren:

nipyapi.versioning.get_latest_flow_ver

Procesgroep implementeren:

nipyapi.versioning.deploy_flow_version

We starten de verwerkers:

nipyapi.canvas.schedule_process_group

In het blok over CLI stond dat gegevensoverdracht niet automatisch wordt ingeschakeld in de externe procesgroep? Bij het implementeren van het script kwam ik dit probleem ook tegen. Op dat moment kon ik de gegevensoverdracht niet starten met behulp van de API en besloot ik naar de ontwikkelaar van de NiPyAPI-bibliotheek te schrijven en om advies/hulp te vragen. De ontwikkelaar reageerde op mij, we bespraken het probleem en hij schreef dat hij tijd nodig had om “iets te controleren”. En dan, een paar dagen later, komt er een brief waarin een functie in Python is geschreven die mijn opstartprobleem oplost!!! Destijds was de NiPyAPI-versie 0.13.3 en zoiets bestond natuurlijk niet. Maar in versie 0.14.0, die vrij recentelijk werd uitgebracht, was deze functie al in de bibliotheek opgenomen. Ontmoeten,

nipyapi.canvas.set_remote_process_group_transmission

Dus met behulp van de NiPyAPI-bibliotheek hebben we het register aangesloten, flow uitgerold en zelfs processors en gegevensoverdracht gestart. Vervolgens kun je de code kammen, allerlei controles toevoegen, loggen, en dat is alles. Maar dat is een heel ander verhaal.

Van de automatiseringsopties die ik heb overwogen, leek de laatste mij het meest efficiënt. Ten eerste is dit nog steeds Python-code, waarin u hulpprogrammacode kunt insluiten en kunt profiteren van alle voordelen van de programmeertaal. Ten tweede is het NiPyAPI-project actief in ontwikkeling en in geval van problemen kun je naar de ontwikkelaar schrijven. Ten derde is NiPyAPI nog steeds een flexibeler hulpmiddel voor interactie met NiFi bij het oplossen van complexe problemen. Bijvoorbeeld bij het bepalen of de berichtenwachtrijen nu leeg zijn in de stroom en of de procesgroep bijgewerkt kan worden.

Dat is alles. Ik beschreef drie benaderingen voor het automatiseren van de levering van stromen in NiFi, de valkuilen die een ontwikkelaar kan tegenkomen, en leverde werkende code voor het automatiseren van de levering. Als je net zo geïnteresseerd bent in dit onderwerp als ik - ишите!

Bron: www.habr.com

Voeg een reactie