Automatisering av flödesleverans i Apache NiFi

Hej alla!

Automatisering av flödesleverans i Apache NiFi

Uppgiften är som följer - det finns ett flöde, presenterat på bilden ovan, som behöver rullas ut till N servrar med Apache NiFi. Flödestest - en fil genereras och skickas till en annan NiFi-instans. Dataöverföring sker med hjälp av NiFi Site to Site-protokollet.

NiFi Site to Site (S2S) är ett säkert, enkelt konfigurerbart sätt att överföra data mellan NiFi-instanser. Hur S2S fungerar, se dokumentation och det är viktigt att inte glömma att konfigurera NiFi-instansen för att tillåta S2S, se här.

I de fall vi talar om dataöverföring med S2S, kallas en instans klient, den andra servern. Klienten skickar data, servern tar emot. Två sätt att konfigurera dataöverföring mellan dem:

  1. Tryck. Från klientinstansen skickas data med hjälp av en Remote Process Group (RPG). På serverinstansen tas data emot med hjälp av ingångsporten
  2. Dra. Servern tar emot data med hjälp av RPG, klienten skickar med utgångsport.


Flödet för utrullning lagras i Apache Registry.

Apache NiFi Registry är ett delprojekt av Apache NiFi som tillhandahåller ett verktyg för flödeslagring och versionskontroll. En sorts GIT. Information om att installera, konfigurera och arbeta med registret finns i officiell dokumentation. Flöde för lagring kombineras till en processgrupp och lagras i denna form i registret. Vi återkommer till detta senare i artikeln.

I början, när N är ett litet tal, levereras flödet och uppdateras manuellt inom en acceptabel tid.

Men när N växer blir problemen fler:

  1. det tar längre tid att uppdatera flödet. Du måste logga in på alla servrar
  2. Malluppdateringsfel uppstår. Här uppdaterade de det, men här glömde de
  3. mänskliga fel när man utför ett stort antal liknande operationer

Allt detta leder oss till det faktum att vi måste automatisera processen. Jag försökte lösa det här problemet på följande sätt:

  1. Använd MiNiFi istället för NiFi
  2. NiFi CLI
  3. NiPyAPI

Använder MiNiFi

Apache MiniFy - delprojekt av Apache NiFi. MiNiFy är en kompakt agent som använder samma processorer som NiFi, vilket gör att du kan skapa samma flöden som i NiFi. Agentens lätta karaktär uppnås bland annat genom att MiNiFy inte har något grafiskt gränssnitt för flödeskonfiguration. Avsaknaden av ett grafiskt gränssnitt i MiNiFy gör att det är nödvändigt att lösa problemet med att leverera flöde till minifi. Eftersom MiNiFy används aktivt i IOT finns det många komponenter och processen för att leverera flöde till de slutliga minifi-instanserna måste automatiseras. En välbekant uppgift, eller hur?

Ett annat delprojekt kommer att hjälpa till att lösa detta problem - MiNiFi C2 Server. Den här produkten är avsedd att vara den centrala punkten i arkitekturen för utbyggnad av konfigurationer. Hur man konfigurerar miljön - beskrivs i den här artikeln Det finns tillräckligt med information om Habré för att lösa problemet. MiNiFi, tillsammans med C2-servern, uppdaterar automatiskt sin konfiguration. Den enda nackdelen med detta tillvägagångssätt är att du måste skapa mallar på C2 Server, en enkel commit till registret räcker inte.

Alternativet som beskrivs i artikeln ovan fungerar och inte svårt att implementera, men vi får inte glömma följande:

  1. Minifi har inte alla processorer från nifi
  2. Minifi-processorversioner släpar efter NiFi-processorversioner.

I skrivande stund är den senaste versionen av NiFi 1.9.2. Den senaste versionen av MiNiFi-processorn är 1.7.0. Processorer kan läggas till i MiNiFi, men på grund av versionsskillnader mellan NiFi- och MiNiFi-processorer kanske detta inte fungerar.

NiFi CLI

Att döma efter beskrivning verktyg på den officiella webbplatsen, detta är ett verktyg för att automatisera interaktionen mellan NiFI och NiFi Registry inom området flödesleverans eller processhantering. För att komma igång måste du ladda ner det här verktyget. hence.

Starta verktyget

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

För att vi ska kunna ladda det erforderliga flödet från registret måste vi känna till identifierarna för hinken (hinkidentifierare) och själva flödet (flödesidentifierare). Dessa data kan erhållas antingen via cli eller i NiFi-registrets webbgränssnitt. I webbgränssnittet ser det ut så här:

Automatisering av flödesleverans i Apache NiFi

Med hjälp av CLI görs detta:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Vi börjar importera processgrupp från registret:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

En viktig punkt är att vilken nifi-instans som helst kan anges som den värd som vi rullar processgruppen till.

Processgrupp läggs till med stoppade processorer, de måste startas

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Bra, processorerna har startat. Men enligt villkoren för uppgiften behöver vi NiFi-instanser för att skicka data till andra instanser. Låt oss anta att du har valt Push-metoden för att överföra data till servern. För att organisera dataöverföring måste du aktivera dataöverföring på den tillagda Remote Process Group (RPG), som redan ingår i vårt flöde.

Automatisering av flödesleverans i Apache NiFi

I dokumentationen i CLI och andra källor hittade jag inget sätt att möjliggöra dataöverföring. Om du vet hur man gör detta, skriv gärna i kommentarerna.

Eftersom vi har bash och vi är redo att gå till slutet, kommer vi att hitta en väg ut! Du kan använda NiFi API för att lösa detta problem. Låt oss använda följande metod, ta ID:t från exemplen ovan (i vårt fall är det 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrivning av NiFi API-metoder här.

Automatisering av flödesleverans i Apache NiFi
I kroppen måste du passera JSON, så här:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametrar som måste fyllas i för att det ska fungera:
tillstånd — Status för dataöverföring. Tillgänglig: TRANSMITTING för att möjliggöra dataöverföring, STOPPAD för att inaktivera
version - processorversion

version kommer som standard till 0 när den skapas, men dessa parametrar kan erhållas med metoden

Automatisering av flödesleverans i Apache NiFi

För fans av bash-skript kan den här metoden tyckas vara lämplig, men det är lite svårt för mig - bash-skript är inte min favorit. Nästa metod är mer intressant och bekväm enligt min mening.

NiPyAPI

NiPyAPI är ett Python-bibliotek för att interagera med NiFi-instanser. Dokumentationssida innehåller nödvändig information för att arbeta med biblioteket. Snabbstart beskrivs i projektet på github.

Vårt skript för att rulla ut konfigurationen är ett program i Python. Låt oss gå vidare till kodning.
Vi ställer in konfigurationer för vidare arbete. Vi kommer att behöva följande parametrar:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Därefter kommer jag att infoga namnen på metoderna i detta bibliotek, som beskrivs här.

Anslut registret till nifi-instansen med hjälp av

nipyapi.versioning.create_registry_client

I det här steget kan du också lägga till en kontroll att registret redan har lagts till i instansen; för detta kan du använda metoden

nipyapi.versioning.list_registry_clients

Vi hittar hinken för vidare sökning efter flöde i korgen

nipyapi.versioning.get_registry_bucket

Med hjälp av den hittade skopan letar vi efter flöde

nipyapi.versioning.get_flow_in_bucket

Därefter är det viktigt att förstå om denna processgrupp redan har lagts till. Processgruppen är placerad enligt koordinater och en situation kan uppstå när en andra komponent läggs ovanpå en. Jag kollade, detta kan hända :) För att få alla tillagda processgrupper använder vi metoden

nipyapi.canvas.list_all_process_groups

Vi kan vidare söka, till exempel på namn.

Jag kommer inte att beskriva processen för att uppdatera mallen, jag kommer bara att säga att om processorer läggs till i den nya versionen av mallen, så finns det inga problem med närvaron av meddelanden i köerna. Men om processorer tas bort kan problem uppstå (nifi tillåter inte att du tar bort en processor om en meddelandekö har samlats framför den). Om du är intresserad av hur jag löste det här problemet, skriv till mig så diskuterar vi det här problemet. Kontakter i slutet av artikeln. Låt oss gå vidare till steget att lägga till en processgrupp.

När jag felsökte skriptet stötte jag på en egenhet att den senaste versionen av flow inte alltid dras upp, så jag rekommenderar att du kontrollerar den här versionen först:

nipyapi.versioning.get_latest_flow_ver

Distribuera processgrupp:

nipyapi.versioning.deploy_flow_version

Vi startar processorerna:

nipyapi.canvas.schedule_process_group

I blocket om CLI skrevs det att dataöverföring inte automatiskt aktiveras i fjärrprocessgruppen? När jag implementerade skriptet stötte jag också på detta problem. Vid den tiden kunde jag inte starta dataöverföring med API:t och jag bestämde mig för att skriva till utvecklaren av NiPyAPI-biblioteket och be om råd/hjälp. Utvecklaren svarade mig, vi diskuterade problemet och han skrev att han behövde tid för att "kolla något". Och så, ett par dagar senare, kommer ett brev där det skrivs en funktion i Python som löser mitt startproblem!!! Vid den tiden var NiPyAPI-versionen 0.13.3 och, naturligtvis, fanns det inget sådant. Men i version 0.14.0, som släpptes ganska nyligen, fanns den här funktionen redan i biblioteket. Träffa,

nipyapi.canvas.set_remote_process_group_transmission

Så med hjälp av NiPyAPI-biblioteket kopplade vi registret, rullade ut flödet och startade till och med processorer och dataöverföring. Sedan kan du kamma koden, lägga till alla typer av kontroller, loggning och det är allt. Men det är en helt annan historia.

Av de automatiseringsalternativ jag övervägde verkade det sista som var det mest effektiva. För det första är detta fortfarande python-kod, i vilken du kan bädda in extra programkod och dra nytta av alla fördelar med programmeringsspråket. För det andra utvecklas NiPyAPI-projektet aktivt och vid problem kan du skriva till utvecklaren. För det tredje är NiPyAPI fortfarande ett mer flexibelt verktyg för att interagera med NiFi för att lösa komplexa problem. Till exempel för att avgöra om meddelandeköerna nu är tomma i flödet och om processgruppen kan uppdateras.

Det är allt. Jag beskrev 3 tillvägagångssätt för att automatisera flödesleverans i NiFi, fallgropar som en utvecklare kan stöta på, och angav arbetskod för att automatisera leverans. Om du är lika intresserad av detta ämne som jag är - skriva!

Källa: will.com

Lägg en kommentar