Automatisering af flowlevering i Apache NiFi

Hej alle!

Automatisering af flowlevering i Apache NiFi

Opgaven er som følger - der er et flow, vist på billedet ovenfor, som skal rulles ud til N servere med Apache NiFi. Flowtest - en fil bliver genereret og sendt til en anden NiFi-instans. Dataoverførsel sker ved hjælp af NiFi Site to Site-protokollen.

NiFi Site to Site (S2S) er en sikker, let konfigurerbar måde at overføre data mellem NiFi-instanser. Hvordan S2S virker, se dokumentation og det er vigtigt ikke at glemme at konfigurere NiFi-instansen til at tillade S2S, se her.

I tilfælde, hvor vi taler om dataoverførsel ved hjælp af S2S, kaldes den ene instans klient, den anden server. Klienten sender data, serveren modtager. To måder at konfigurere dataoverførsel mellem dem:

  1. Skub ud. Fra klientinstansen sendes data ved hjælp af en Remote Process Group (RPG). På serverforekomsten modtages data ved hjælp af inputporten
  2. Træk. Serveren modtager data ved hjælp af RPG, klienten sender ved hjælp af outputport.


Flow til udrulning er gemt i Apache Registry.

Apache NiFi Registry er et underprojekt af Apache NiFi, der giver et værktøj til flowlagring og versionskontrol. En slags GIT. Information om installation, konfiguration og arbejde med registreringsdatabasen kan findes i officiel dokumentation. Flow til opbevaring kombineres i en procesgruppe og gemmes i denne form i registreringsdatabasen. Det vender vi tilbage til senere i artiklen.

Ved starten, når N er et lille tal, leveres flowet og opdateres manuelt på en acceptabel tid.

Men efterhånden som N vokser, bliver problemerne flere:

  1. det tager længere tid at opdatere flowet. Du skal logge ind på alle servere
  2. Skabelonopdateringsfejl opstår. Her har de opdateret det, men her har de glemt det
  3. menneskelige fejl ved udførelse af et stort antal lignende operationer

Alt dette bringer os til det faktum, at vi er nødt til at automatisere processen. Jeg prøvede følgende måder at løse dette problem på:

  1. Brug MiNiFi i stedet for NiFi
  2. NiFi CLI
  3. NiPyAPI

Bruger MiNiFi

Apache MiNiFy - delprojekt af Apache NiFi. MiNiFy er en kompakt agent, der bruger de samme processorer som NiFi, så du kan skabe de samme flows som i NiFi. Agentens lette natur opnås blandt andet ved, at MiNiFy ikke har en grafisk grænseflade til flowkonfiguration. Manglen på en grafisk grænseflade til MiNiFy betyder, at det er nødvendigt at løse problemet med at levere flow til minifi. Da MiNiFy bruges aktivt i IOT, er der mange komponenter, og processen med at levere flow til de sidste minifi-instanser skal automatiseres. En velkendt opgave, ikke?

Решить такую задачу поможет еще один подпроект — MiNiFi C2 Server. Этот продукт предназначен для того, чтобы быть центральной точкой в архитектуре раскатки конфигураций. Как сконфигурировать окружение — описано в denne artikel на Хабре и информации достаточно для решения поставленной задачи. MiNiFi в связке с C2 server автоматическом режиме обновляет конфигурацию у себя. Единственный недостаток такого подхода — приходится создавать шаблоны на C2 Server, простого коммита в registry не достаточно.

Muligheden beskrevet i artiklen ovenfor fungerer og ikke vanskelig at implementere, men vi må ikke glemme følgende:

  1. Minifi har ikke alle processorer fra nifi
  2. Minifi-processorversioner halter bagefter NiFi-processorversioner.

I skrivende stund er den seneste version af NiFi 1.9.2. Den seneste version af MiNiFi-processor er 1.7.0. Processorer kan tilføjes til MiNiFi, men på grund af versionsforskelle mellem NiFi- og MiNiFi-processorer virker dette muligvis ikke.

NiFi CLI

At dømme efter beskrivelse værktøj på den officielle hjemmeside, dette er et værktøj til at automatisere interaktionen mellem NiFI og NiFi Registry inden for flowlevering eller processtyring. For at komme i gang skal du downloade dette værktøj. dermed.

Start hjælpeprogrammet

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

For at vi kan indlæse det nødvendige flow fra registreringsdatabasen, skal vi kende identifikatorerne for spanden (bucket identifier) ​​og selve flowet (flow identifier). Disse data kan fås enten gennem cli eller i NiFi-registrets webgrænseflade. I webgrænsefladen ser det sådan ud:

Automatisering af flowlevering i Apache NiFi

Ved hjælp af CLI gøres dette:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Vi begynder at importere procesgruppe fra registreringsdatabasen:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

En vigtig pointe er, at enhver nifi-instans kan angives som den vært, som vi ruller procesgruppen til.

Procesgruppe tilføjet med stoppede processorer, de skal startes

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Отлично, процессоры стартанули. Однако, нам по условиям задачи надо, чтобы инстансы NiFi отправляли данные на другие инстансы. Предположим, что для передачи данных на сервер выбрали способ Push. Для того, чтобы организовать передачу данных, надо на добавленном Remote Process Group (RPG), который уже включен в наш flow включить передачу данных (Enable transmitting).

Automatisering af flowlevering i Apache NiFi

I dokumentationen i CLI og andre kilder fandt jeg ikke en måde at muliggøre dataoverførsel. Hvis du ved, hvordan du gør dette, så skriv venligst i kommentarerne.

Da vi har bash, og vi er klar til at gå til slutningen, vil vi finde en vej ud! Du kan bruge NiFi API til at løse dette problem. Lad os bruge følgende metode, tag ID'et fra eksemplerne ovenfor (i vores tilfælde er det 7f522a13-016e-1000-e504-d5b15587f2f3). Beskrivelse af NiFi API metoder her.

Automatisering af flowlevering i Apache NiFi
I kroppen skal du videregive JSON, sådan her:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parametre, der skal udfyldes for at det virker:
tilstand — dataoverførselsstatus. Tilgængelig: TRANSMITTING for at aktivere dataoverførsel, STOPPT for at deaktivere
udgave - processor version

version vil som standard være 0, når den oprettes, men disse parametre kan opnås ved hjælp af metoden

Automatisering af flowlevering i Apache NiFi

For fans af bash-scripts kan denne metode virke passende, men det er lidt svært for mig - bash-scripts er ikke min favorit. Den næste metode er efter min mening mere interessant og bekvem.

NiPyAPI

NiPyAPI er et Python-bibliotek til at interagere med NiFi-instanser. Dokumentationsside indeholder den nødvendige information til arbejdet med biblioteket. Hurtig start er beskrevet i projekt på github.

Vores script til udrulning af konfigurationen er et program i Python. Lad os gå videre til kodning.
Настраиваем конфиги для дальнейшей работы. Нам понадобятся следующие параметры:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Dernæst vil jeg indsætte navnene på metoderne i dette bibliotek, som er beskrevet her.

Tilslut registreringsdatabasen til nifi-instansen ved hjælp af

nipyapi.versioning.create_registry_client

På dette trin kan du også tilføje en kontrol af, at registreringsdatabasen allerede er blevet tilføjet til instansen; til dette kan du bruge metoden

nipyapi.versioning.list_registry_clients

Vi finder spanden til videre søgning efter flow i kurven

nipyapi.versioning.get_registry_bucket

По найденному bucket ищем flow

nipyapi.versioning.get_flow_in_bucket

Dernæst er det vigtigt at forstå, om denne procesgruppe allerede er tilføjet. Procesgruppen placeres efter koordinater, og der kan opstå en situation, hvor en anden komponent er overlejret oven på en. Jeg tjekkede, dette kan ske :) For at få alle tilføjede procesgrupper bruger vi metoden

nipyapi.canvas.list_all_process_groups

Vi kan yderligere søge, for eksempel på navn.

Я не буду описывать процесс обновления шаблона, скажу лишь, что если в новой версии шаблона процессоры добавляются, то проблем с наличием сообщений в очередях нет. А вот если процессоры удаляются, то проблемы могут возникнуть (nifi не дает удалять процессор, если перед ним скопилась очередь сообщений). Если вам интересно как я решил эту проблему — напишите мне, пожалуйста, обсудим этот момент. Контакты в конце статьи. Перейдем к шагу добавления process group.

Da jeg fejlede scriptet, stødte jeg på en ejendommelighed, at den seneste version af flow ikke altid trækkes op, så jeg anbefaler at tjekke denne version først:

nipyapi.versioning.get_latest_flow_ver

Implementer procesgruppe:

nipyapi.versioning.deploy_flow_version

Vi starter processorerne:

nipyapi.canvas.schedule_process_group

I blokken om CLI stod der, at dataoverførsel ikke automatisk er aktiveret i fjernprocesgruppen? Da jeg implementerede scriptet, stødte jeg også på dette problem. På det tidspunkt var jeg ikke i stand til at starte dataoverførsel ved hjælp af API'et, og jeg besluttede at skrive til udvikleren af ​​NiPyAPI-biblioteket og bede om råd/hjælp. Udvikleren svarede mig, vi diskuterede problemet, og han skrev, at han havde brug for tid til at "tjekke noget". Og så, et par dage senere, kommer der et brev, hvori der er skrevet en funktion i Python, der løser mit startproblem!!! På det tidspunkt var NiPyAPI-versionen 0.13.3, og der var selvfølgelig ikke noget lignende. Men i version 0.14.0, som blev udgivet for ganske nylig, var denne funktion allerede inkluderet i biblioteket. Møde,

nipyapi.canvas.set_remote_process_group_transmission

Så ved at bruge NiPyAPI-biblioteket tilsluttede vi registreringsdatabasen, rullede flow ud og startede endda processorer og dataoverførsel. Så kan du finkæmme koden, tilføje alle former for checks, logning, og det er alt. Men det er en helt anden historie.

Af de automatiseringsmuligheder, jeg overvejede, forekom den sidste for mig at være den mest effektive. For det første er dette stadig python-kode, hvor du kan indlejre hjælpeprogramkode og drage fordel af alle fordelene ved programmeringssproget. For det andet er NiPyAPI-projektet aktivt under udvikling, og i tilfælde af problemer kan du skrive til udvikleren. For det tredje er NiPyAPI stadig et mere fleksibelt værktøj til at interagere med NiFi i løsning af komplekse problemer. For eksempel ved at afgøre, om beskedkøerne nu er tomme i flowet, og om procesgruppen kan opdateres.

Det er alt. Jeg beskrev 3 tilgange til automatisering af flowlevering i NiFi, faldgruber, som en udvikler kan støde på, og leverede arbejdskode til automatisering af levering. Hvis du er lige så interesseret i dette emne som jeg er - skrive!

Kilde: www.habr.com

Tilføj en kommentar