Hej alla!
Uppgiften är som följer - det finns ett flöde, presenterat på bilden ovan, som behöver rullas ut till N servrar med
NiFi Site to Site (S2S) är ett säkert, enkelt konfigurerbart sätt att överföra data mellan NiFi-instanser. Hur S2S fungerar, se
I de fall vi talar om dataöverföring med S2S, kallas en instans klient, den andra servern. Klienten skickar data, servern tar emot. Två sätt att konfigurera dataöverföring mellan dem:
- Tryck. Från klientinstansen skickas data med hjälp av en Remote Process Group (RPG). På serverinstansen tas data emot med hjälp av ingångsporten
- Dra. Servern tar emot data med hjälp av RPG, klienten skickar med utgångsport.
Flödet för utrullning lagras i Apache Registry.
Apache NiFi Registry är ett delprojekt av Apache NiFi som tillhandahåller ett verktyg för flödeslagring och versionskontroll. En sorts GIT. Information om att installera, konfigurera och arbeta med registret finns i
I början, när N är ett litet tal, levereras flödet och uppdateras manuellt inom en acceptabel tid.
Men när N växer blir problemen fler:
- det tar längre tid att uppdatera flödet. Du måste logga in på alla servrar
- Malluppdateringsfel uppstår. Här uppdaterade de det, men här glömde de
- mänskliga fel när man utför ett stort antal liknande operationer
Allt detta leder oss till det faktum att vi måste automatisera processen. Jag försökte lösa det här problemet på följande sätt:
- Använd MiNiFi istället för NiFi
- NiFi CLI
- NiPyAPI
Använder MiNiFi
Ett annat delprojekt kommer att hjälpa till att lösa detta problem - MiNiFi C2 Server. Den här produkten är avsedd att vara den centrala punkten i arkitekturen för utbyggnad av konfigurationer. Hur man konfigurerar miljön - beskrivs i
Alternativet som beskrivs i artikeln ovan fungerar och inte svårt att implementera, men vi får inte glömma följande:
- Minifi har inte alla processorer från nifi
- Minifi-processorversioner släpar efter NiFi-processorversioner.
I skrivande stund är den senaste versionen av NiFi 1.9.2. Den senaste versionen av MiNiFi-processorn är 1.7.0. Processorer kan läggas till i MiNiFi, men på grund av versionsskillnader mellan NiFi- och MiNiFi-processorer kanske detta inte fungerar.
NiFi CLI
Att döma efter
Starta verktyget
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
För att vi ska kunna ladda det erforderliga flödet från registret måste vi känna till identifierarna för hinken (hinkidentifierare) och själva flödet (flödesidentifierare). Dessa data kan erhållas antingen via cli eller i NiFi-registrets webbgränssnitt. I webbgränssnittet ser det ut så här:
Med hjälp av CLI görs detta:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Vi börjar importera processgrupp från registret:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
En viktig punkt är att vilken nifi-instans som helst kan anges som den värd som vi rullar processgruppen till.
Processgrupp läggs till med stoppade processorer, de måste startas
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Bra, processorerna har startat. Men enligt villkoren för uppgiften behöver vi NiFi-instanser för att skicka data till andra instanser. Låt oss anta att du har valt Push-metoden för att överföra data till servern. För att organisera dataöverföring måste du aktivera dataöverföring på den tillagda Remote Process Group (RPG), som redan ingår i vårt flöde.
I dokumentationen i CLI och andra källor hittade jag inget sätt att möjliggöra dataöverföring. Om du vet hur man gör detta, skriv gärna i kommentarerna.
Eftersom vi har bash och vi är redo att gå till slutet, kommer vi att hitta en väg ut! Du kan använda NiFi API för att lösa detta problem. Låt oss använda följande metod, ta ID:t från exemplen ovan (i vårt fall är det 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrivning av NiFi API-metoder
I kroppen måste du passera JSON, så här:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parametrar som måste fyllas i för att det ska fungera:
tillstånd — Status för dataöverföring. Tillgänglig: TRANSMITTING för att möjliggöra dataöverföring, STOPPAD för att inaktivera
version - processorversion
version kommer som standard till 0 när den skapas, men dessa parametrar kan erhållas med metoden
För fans av bash-skript kan den här metoden tyckas vara lämplig, men det är lite svårt för mig - bash-skript är inte min favorit. Nästa metod är mer intressant och bekväm enligt min mening.
NiPyAPI
NiPyAPI är ett Python-bibliotek för att interagera med NiFi-instanser.
Vårt skript för att rulla ut konfigurationen är ett program i Python. Låt oss gå vidare till kodning.
Vi ställer in konfigurationer för vidare arbete. Vi kommer att behöva följande parametrar:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Därefter kommer jag att infoga namnen på metoderna i detta bibliotek, som beskrivs
Anslut registret till nifi-instansen med hjälp av
nipyapi.versioning.create_registry_client
I det här steget kan du också lägga till en kontroll att registret redan har lagts till i instansen; för detta kan du använda metoden
nipyapi.versioning.list_registry_clients
Vi hittar hinken för vidare sökning efter flöde i korgen
nipyapi.versioning.get_registry_bucket
Med hjälp av den hittade skopan letar vi efter flöde
nipyapi.versioning.get_flow_in_bucket
Därefter är det viktigt att förstå om denna processgrupp redan har lagts till. Processgruppen är placerad enligt koordinater och en situation kan uppstå när en andra komponent läggs ovanpå en. Jag kollade, detta kan hända :) För att få alla tillagda processgrupper använder vi metoden
nipyapi.canvas.list_all_process_groups
Vi kan vidare söka, till exempel på namn.
Jag kommer inte att beskriva processen för att uppdatera mallen, jag kommer bara att säga att om processorer läggs till i den nya versionen av mallen, så finns det inga problem med närvaron av meddelanden i köerna. Men om processorer tas bort kan problem uppstå (nifi tillåter inte att du tar bort en processor om en meddelandekö har samlats framför den). Om du är intresserad av hur jag löste det här problemet, skriv till mig så diskuterar vi det här problemet. Kontakter i slutet av artikeln. Låt oss gå vidare till steget att lägga till en processgrupp.
När jag felsökte skriptet stötte jag på en egenhet att den senaste versionen av flow inte alltid dras upp, så jag rekommenderar att du kontrollerar den här versionen först:
nipyapi.versioning.get_latest_flow_ver
Distribuera processgrupp:
nipyapi.versioning.deploy_flow_version
Vi startar processorerna:
nipyapi.canvas.schedule_process_group
I blocket om CLI skrevs det att dataöverföring inte automatiskt aktiveras i fjärrprocessgruppen? När jag implementerade skriptet stötte jag också på detta problem. Vid den tiden kunde jag inte starta dataöverföring med API:t och jag bestämde mig för att skriva till utvecklaren av NiPyAPI-biblioteket och be om råd/hjälp. Utvecklaren svarade mig, vi diskuterade problemet och han skrev att han behövde tid för att "kolla något". Och så, ett par dagar senare, kommer ett brev där det skrivs en funktion i Python som löser mitt startproblem!!! Vid den tiden var NiPyAPI-versionen 0.13.3 och, naturligtvis, fanns det inget sådant. Men i version 0.14.0, som släpptes ganska nyligen, fanns den här funktionen redan i biblioteket. Träffa,
nipyapi.canvas.set_remote_process_group_transmission
Så med hjälp av NiPyAPI-biblioteket kopplade vi registret, rullade ut flödet och startade till och med processorer och dataöverföring. Sedan kan du kamma koden, lägga till alla typer av kontroller, loggning och det är allt. Men det är en helt annan historia.
Av de automatiseringsalternativ jag övervägde verkade det sista som var det mest effektiva. För det första är detta fortfarande python-kod, i vilken du kan bädda in extra programkod och dra nytta av alla fördelar med programmeringsspråket. För det andra utvecklas NiPyAPI-projektet aktivt och vid problem kan du skriva till utvecklaren. För det tredje är NiPyAPI fortfarande ett mer flexibelt verktyg för att interagera med NiFi för att lösa komplexa problem. Till exempel för att avgöra om meddelandeköerna nu är tomma i flödet och om processgruppen kan uppdateras.
Det är allt. Jag beskrev 3 tillvägagångssätt för att automatisera flödesleverans i NiFi, fallgropar som en utvecklare kan stöta på, och angav arbetskod för att automatisera leverans. Om du är lika intresserad av detta ämne som jag är -
Källa: will.com