Hola a tots!
La tasca és la següent: hi ha un flux, que es presenta a la imatge de dalt, que s'ha de desplegar a N servidors amb
NiFi Site to Site (S2S) és una manera segura i fàcilment configurable de transferir dades entre instàncies NiFi. Com funciona S2S, vegeu
En els casos en què estem parlant de transferència de dades mitjançant S2S, una instància s'anomena client, el segon servidor. El client envia dades, el servidor rep. Dues maneres de configurar la transferència de dades entre ells:
- Empenta. Des de la instància del client, les dades s'envien mitjançant un grup de processos remots (RPG). A la instància del servidor, les dades es reben mitjançant el port d'entrada
- Jale. El servidor rep dades mitjançant RPG, el client envia mitjançant el port de sortida.
El flux per al desplegament s'emmagatzema al registre Apache.
Apache NiFi Registry és un subprojecte d'Apache NiFi que proporciona una eina per a l'emmagatzematge de flux i el control de versions. Una mena de GIT. Podeu trobar informació sobre la instal·lació, la configuració i el treball amb el registre a
Al principi, quan N és un nombre petit, el flux es lliura i s'actualitza manualment en un temps acceptable.
Però a mesura que N creix, els problemes es fan més nombrosos:
- es necessita més temps per actualitzar el flux. Heu d'iniciar sessió a tots els servidors
- Es produeixen errors d'actualització de plantilles. Aquí l'han actualitzat, però aquí s'han oblidat
- errors humans en realitzar un gran nombre d'operacions similars
Tot això ens porta al fet que hem d'automatitzar el procés. Vaig provar les maneres següents per resoldre aquest problema:
- Utilitzeu MiNiFi en comptes de NiFi
- NiFi CLI
- NiPyAPI
Ús de MiniFi
Un altre subprojecte ajudarà a resoldre aquest problema: el servidor MiNiFi C2. Aquest producte està pensat per ser el punt central de l'arquitectura de desplegament de configuració. Com configurar l'entorn - descrit a
L'opció descrita a l'article anterior funciona i no és difícil d'implementar, però no hem d'oblidar el següent:
- Minifi no té tots els processadors de nifi
- Les versions del processador Minifi es queden per darrere de les versions del processador NiFi.
En el moment d'escriure, l'última versió de NiFi és la 1.9.2. L'última versió del processador MiNiFi és la 1.7.0. Es poden afegir processadors a MiNiFi, però a causa de les discrepàncies de versió entre els processadors NiFi i MiNiFi, és possible que això no funcioni.
NiFi CLI
A jutjar per
Inicieu la utilitat
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Perquè puguem carregar el flux necessari des del registre, hem de conèixer els identificadors del cub (identificador del cub) i el propi flux (identificador del flux). Aquestes dades es poden obtenir a través del cli o a la interfície web del registre NiFi. A la interfície web es veu així:
Amb la CLI es fa:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Comencem a importar el grup de processos des del registre:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Un punt important és que qualsevol instància nifi es pot especificar com l'amfitrió al qual enrotllem el grup de processos.
Grup de processos afegit amb processadors aturats, cal iniciar-los
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Genial, els processadors han començat. Tanmateix, segons els termes de la tasca, necessitem instàncies NiFi per enviar dades a altres instàncies. Suposem que heu triat el mètode Push per transferir dades al servidor. Per organitzar la transferència de dades, cal que habiliteu la transferència de dades al grup de processos remots (RPG) afegit, que ja està inclòs al nostre flux.
A la documentació de la CLI i altres fonts, no he trobat cap manera d'habilitar la transferència de dades. Si sabeu com fer-ho, escriviu als comentaris.
Com que tenim una festa i estem preparats per anar al final, trobarem una sortida! Podeu utilitzar l'API NiFi per resoldre aquest problema. Utilitzem el mètode següent, prenem l'identificador dels exemples anteriors (en el nostre cas és 7f522a13-016e-1000-e504-d5b15587f2f3). Descripció dels mètodes de l'API NiFi
Al cos heu de passar JSON, així:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Paràmetres que s'han d'emplenar perquè funcioni:
van ser - estat de transferència de dades. Disponible: TRANSMITENT per habilitar la transferència de dades, STOPPED per desactivar
versió - versió del processador
La versió serà 0 per defecte quan es creï, però aquests paràmetres es poden obtenir mitjançant el mètode
Per als amants dels scripts bash, aquest mètode pot semblar adequat, però és una mica difícil per a mi: els scripts bash no són els meus preferits. El següent mètode és més interessant i convenient al meu entendre.
NiPyAPI
NiPyAPI és una biblioteca de Python per interactuar amb instàncies NiFi.
El nostre script per desplegar la configuració és un programa en Python. Passem a la codificació.
Configurem les configuracions per treballar més. Necessitarem els següents paràmetres:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
A continuació inseriré els noms dels mètodes d'aquesta biblioteca, que es descriuen
Connecteu el registre a la instància nifi utilitzant
nipyapi.versioning.create_registry_client
En aquest pas, també podeu afegir una comprovació que el registre ja s'ha afegit a la instància; per a això podeu utilitzar el mètode
nipyapi.versioning.list_registry_clients
Trobem la galleda per a una recerca posterior de flux a la cistella
nipyapi.versioning.get_registry_bucket
Amb la galleda trobada, busquem el flux
nipyapi.versioning.get_flow_in_bucket
A continuació, és important entendre si aquest grup de processos ja s'ha afegit. El grup de Procés es col·loca segons les coordenades i es pot produir una situació quan se superposa un segon component a sobre d'un. He comprovat, això pot passar :) Per obtenir tots els grups de processos afegits, utilitzem el mètode
nipyapi.canvas.list_all_process_groups
Podem cercar més, per exemple, pel nom.
No descriuré el procés d'actualització de la plantilla, només diré que si s'afegeixen processadors a la nova versió de la plantilla, no hi ha problemes amb la presència de missatges a les cues. Però si s'eliminen processadors, poden sorgir problemes (nifi no us permet eliminar un processador si s'ha acumulat una cua de missatges al davant). Si esteu interessats en com vaig resoldre aquest problema, si us plau, escriu-me i parlarem d'aquest problema. Contactes al final de l'article. Passem al pas d'afegir un grup de processos.
En depurar l'script, em vaig trobar amb una peculiaritat que la darrera versió de flow no sempre s'aixeca, així que recomano comprovar aquesta versió primer:
nipyapi.versioning.get_latest_flow_ver
Desplega el grup de processos:
nipyapi.versioning.deploy_flow_version
Iniciem els processadors:
nipyapi.canvas.schedule_process_group
Al bloc sobre CLI es va escriure que la transferència de dades no s'habilita automàticament al grup de processos remots? Quan vaig implementar l'script, també vaig trobar aquest problema. En aquell moment, no vaig poder iniciar la transferència de dades mitjançant l'API i vaig decidir escriure al desenvolupador de la biblioteca NiPyAPI i demanar consell/ajuda. El desenvolupador em va respondre, vam parlar del problema i va escriure que necessitava temps per "comprovar alguna cosa". I aleshores, un parell de dies després, arriba una carta on s'escriu una funció en Python que resol el meu problema de llançament!!! En aquell moment, la versió de NiPyAPI era la 0.13.3 i, per descomptat, no hi havia res semblant. Però a la versió 0.14.0, que es va publicar fa poc, aquesta funció ja estava inclosa a la biblioteca. Trobar-se,
nipyapi.canvas.set_remote_process_group_transmission
Per tant, utilitzant la biblioteca NiPyAPI, vam connectar el registre, vam implementar el flux i fins i tot vam iniciar processadors i transferència de dades. A continuació, podeu pentinar el codi, afegir tot tipus de comprovacions, registrar i això és tot. Però aquesta és una història completament diferent.
De les opcions d'automatització que vaig considerar, l'última em va semblar la més eficient. En primer lloc, encara és codi Python, en el qual podeu incrustar codi de programa auxiliar i aprofitar tots els avantatges del llenguatge de programació. En segon lloc, el projecte NiPyAPI s'està desenvolupant activament i en cas de problemes podeu escriure al desenvolupador. En tercer lloc, NiPyAPI segueix sent una eina més flexible per interactuar amb NiFi per resoldre problemes complexos. Per exemple, per determinar si les cues de missatges estan ara buides al flux i si el grup de processos es pot actualitzar.
Això és tot. Vaig descriure 3 enfocaments per automatitzar el lliurament de flux a NiFi, inconvenients que pot trobar un desenvolupador i vaig proporcionar un codi de treball per automatitzar el lliurament. Si estàs tan interessat en aquest tema com a mi...
Font: www.habr.com