Automatització del lliurament de flux a Apache NiFi

Hola a tots!

Automatització del lliurament de flux a Apache NiFi

La tasca és la següent: hi ha un flux, que es presenta a la imatge de dalt, que s'ha de desplegar a N servidors amb Apatxe NiFi. Prova de flux: s'està generant un fitxer i enviant-lo a una altra instància de NiFi. La transferència de dades es fa mitjançant el protocol NiFi Site to Site.

NiFi Site to Site (S2S) és una manera segura i fàcilment configurable de transferir dades entre instàncies NiFi. Com funciona S2S, vegeu documentació i és important no oblidar configurar la instància NiFi per permetre S2S, vegeu aquí.

En els casos en què estem parlant de transferència de dades mitjançant S2S, una instància s'anomena client, el segon servidor. El client envia dades, el servidor rep. Dues maneres de configurar la transferència de dades entre ells:

  1. Empenta. Des de la instància del client, les dades s'envien mitjançant un grup de processos remots (RPG). A la instància del servidor, les dades es reben mitjançant el port d'entrada
  2. Jale. El servidor rep dades mitjançant RPG, el client envia mitjançant el port de sortida.


El flux per al desplegament s'emmagatzema al registre Apache.

Apache NiFi Registry és un subprojecte d'Apache NiFi que proporciona una eina per a l'emmagatzematge de flux i el control de versions. Una mena de GIT. Podeu trobar informació sobre la instal·lació, la configuració i el treball amb el registre a documentació oficial. El flux d'emmagatzematge es combina en un grup de processos i s'emmagatzema en aquest formulari al registre. Tornarem a això més endavant en l'article.

Al principi, quan N és un nombre petit, el flux es lliura i s'actualitza manualment en un temps acceptable.

Però a mesura que N creix, els problemes es fan més nombrosos:

  1. es necessita més temps per actualitzar el flux. Heu d'iniciar sessió a tots els servidors
  2. Es produeixen errors d'actualització de plantilles. Aquí l'han actualitzat, però aquí s'han oblidat
  3. errors humans en realitzar un gran nombre d'operacions similars

Tot això ens porta al fet que hem d'automatitzar el procés. Vaig provar les maneres següents per resoldre aquest problema:

  1. Utilitzeu MiNiFi en comptes de NiFi
  2. NiFi CLI
  3. NiPyAPI

Ús de MiniFi

Apache MiNiFy - subprojecte d'Apache NiFi. MiNiFy és un agent compacte que utilitza els mateixos processadors que NiFi, la qual cosa us permet crear els mateixos fluxos que en NiFi. La naturalesa lleugera de l'agent s'aconsegueix, entre altres coses, pel fet que MiNiFy no té una interfície gràfica per a la configuració del flux. La manca d'una interfície gràfica a MiNiFy fa que cal resoldre el problema de lliurar el flux a la minifi. Com que MiNiFy s'utilitza activament a IOT, hi ha molts components i s'ha d'automatitzar el procés de lliurament del flux a les instàncies finals de minifi. Una tasca familiar, oi?

Un altre subprojecte ajudarà a resoldre aquest problema: el servidor MiNiFi C2. Aquest producte està pensat per ser el punt central de l'arquitectura de desplegament de configuració. Com configurar l'entorn - descrit a aquest article Hi ha prou informació sobre Habré per resoldre el problema. MiNiFi, juntament amb el servidor C2, actualitza automàticament la seva configuració. L'únic inconvenient d'aquest enfocament és que heu de crear plantilles al servidor C2; una simple confirmació al registre no és suficient.

L'opció descrita a l'article anterior funciona i no és difícil d'implementar, però no hem d'oblidar el següent:

  1. Minifi no té tots els processadors de nifi
  2. Les versions del processador Minifi es queden per darrere de les versions del processador NiFi.

En el moment d'escriure, l'última versió de NiFi és la 1.9.2. L'última versió del processador MiNiFi és la 1.7.0. Es poden afegir processadors a MiNiFi, però a causa de les discrepàncies de versió entre els processadors NiFi i MiNiFi, és possible que això no funcioni.

NiFi CLI

A jutjar per descripció al lloc web oficial, aquesta és una eina per automatitzar la interacció entre NiFI i NiFi Registry en l'àmbit del lliurament de flux o gestió de processos. Per començar, heu de descarregar aquesta eina. per tant.

Inicieu la utilitat

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Perquè puguem carregar el flux necessari des del registre, hem de conèixer els identificadors del cub (identificador del cub) i el propi flux (identificador del flux). Aquestes dades es poden obtenir a través del cli o a la interfície web del registre NiFi. A la interfície web es veu així:

Automatització del lliurament de flux a Apache NiFi

Amb la CLI es fa:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Comencem a importar el grup de processos des del registre:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un punt important és que qualsevol instància nifi es pot especificar com l'amfitrió al qual enrotllem el grup de processos.

Grup de processos afegit amb processadors aturats, cal iniciar-los

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Genial, els processadors han començat. Tanmateix, segons els termes de la tasca, necessitem instàncies NiFi per enviar dades a altres instàncies. Suposem que heu triat el mètode Push per transferir dades al servidor. Per organitzar la transferència de dades, cal que habiliteu la transferència de dades al grup de processos remots (RPG) afegit, que ja està inclòs al nostre flux.

Automatització del lliurament de flux a Apache NiFi

A la documentació de la CLI i altres fonts, no he trobat cap manera d'habilitar la transferència de dades. Si sabeu com fer-ho, escriviu als comentaris.

Com que tenim una festa i estem preparats per anar al final, trobarem una sortida! Podeu utilitzar l'API NiFi per resoldre aquest problema. Utilitzem el mètode següent, prenem l'identificador dels exemples anteriors (en el nostre cas és 7f522a13-016e-1000-e504-d5b15587f2f3). Descripció dels mètodes de l'API NiFi aquí.

Automatització del lliurament de flux a Apache NiFi
Al cos heu de passar JSON, així:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Paràmetres que s'han d'emplenar perquè funcioni:
van ser - estat de transferència de dades. Disponible: TRANSMITENT per habilitar la transferència de dades, STOPPED per desactivar
versió - versió del processador

La versió serà 0 per defecte quan es creï, però aquests paràmetres es poden obtenir mitjançant el mètode

Automatització del lliurament de flux a Apache NiFi

Per als amants dels scripts bash, aquest mètode pot semblar adequat, però és una mica difícil per a mi: els scripts bash no són els meus preferits. El següent mètode és més interessant i convenient al meu entendre.

NiPyAPI

NiPyAPI és una biblioteca de Python per interactuar amb instàncies NiFi. Pàgina de documentació conté la informació necessària per treballar amb la biblioteca. L'inici ràpid es descriu a projecte a github.

El nostre script per desplegar la configuració és un programa en Python. Passem a la codificació.
Configurem les configuracions per treballar més. Necessitarem els següents paràmetres:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

A continuació inseriré els noms dels mètodes d'aquesta biblioteca, que es descriuen aquí.

Connecteu el registre a la instància nifi utilitzant

nipyapi.versioning.create_registry_client

En aquest pas, també podeu afegir una comprovació que el registre ja s'ha afegit a la instància; per a això podeu utilitzar el mètode

nipyapi.versioning.list_registry_clients

Trobem la galleda per a una recerca posterior de flux a la cistella

nipyapi.versioning.get_registry_bucket

Amb la galleda trobada, busquem el flux

nipyapi.versioning.get_flow_in_bucket

A continuació, és important entendre si aquest grup de processos ja s'ha afegit. El grup de Procés es col·loca segons les coordenades i es pot produir una situació quan se superposa un segon component a sobre d'un. He comprovat, això pot passar :) Per obtenir tots els grups de processos afegits, utilitzem el mètode

nipyapi.canvas.list_all_process_groups

Podem cercar més, per exemple, pel nom.

No descriuré el procés d'actualització de la plantilla, només diré que si s'afegeixen processadors a la nova versió de la plantilla, no hi ha problemes amb la presència de missatges a les cues. Però si s'eliminen processadors, poden sorgir problemes (nifi no us permet eliminar un processador si s'ha acumulat una cua de missatges al davant). Si esteu interessats en com vaig resoldre aquest problema, si us plau, escriu-me i parlarem d'aquest problema. Contactes al final de l'article. Passem al pas d'afegir un grup de processos.

En depurar l'script, em vaig trobar amb una peculiaritat que la darrera versió de flow no sempre s'aixeca, així que recomano comprovar aquesta versió primer:

nipyapi.versioning.get_latest_flow_ver

Desplega el grup de processos:

nipyapi.versioning.deploy_flow_version

Iniciem els processadors:

nipyapi.canvas.schedule_process_group

Al bloc sobre CLI es va escriure que la transferència de dades no s'habilita automàticament al grup de processos remots? Quan vaig implementar l'script, també vaig trobar aquest problema. En aquell moment, no vaig poder iniciar la transferència de dades mitjançant l'API i vaig decidir escriure al desenvolupador de la biblioteca NiPyAPI i demanar consell/ajuda. El desenvolupador em va respondre, vam parlar del problema i va escriure que necessitava temps per "comprovar alguna cosa". I aleshores, un parell de dies després, arriba una carta on s'escriu una funció en Python que resol el meu problema de llançament!!! En aquell moment, la versió de NiPyAPI era la 0.13.3 i, per descomptat, no hi havia res semblant. Però a la versió 0.14.0, que es va publicar fa poc, aquesta funció ja estava inclosa a la biblioteca. Trobar-se,

nipyapi.canvas.set_remote_process_group_transmission

Per tant, utilitzant la biblioteca NiPyAPI, vam connectar el registre, vam implementar el flux i fins i tot vam iniciar processadors i transferència de dades. A continuació, podeu pentinar el codi, afegir tot tipus de comprovacions, registrar i això és tot. Però aquesta és una història completament diferent.

De les opcions d'automatització que vaig considerar, l'última em va semblar la més eficient. En primer lloc, encara és codi Python, en el qual podeu incrustar codi de programa auxiliar i aprofitar tots els avantatges del llenguatge de programació. En segon lloc, el projecte NiPyAPI s'està desenvolupant activament i en cas de problemes podeu escriure al desenvolupador. En tercer lloc, NiPyAPI segueix sent una eina més flexible per interactuar amb NiFi per resoldre problemes complexos. Per exemple, per determinar si les cues de missatges estan ara buides al flux i si el grup de processos es pot actualitzar.

Això és tot. Vaig descriure 3 enfocaments per automatitzar el lliurament de flux a NiFi, inconvenients que pot trobar un desenvolupador i vaig proporcionar un codi de treball per automatitzar el lliurament. Si estàs tan interessat en aquest tema com a mi... escriure!

Font: www.habr.com

Afegeix comentari