Ola a todos!
A tarefa é a seguinte: hai un fluxo que se mostra na imaxe de arriba, que debe ser implementado a N servidores con
NiFi Site to Site (S2S) é unha forma segura e altamente personalizable de transferir datos entre instancias NiFi. Vexa como funciona S2S
Cando se trata de transferencia de datos mediante S2S, unha instancia chámase cliente, a segunda é un servidor. O cliente envía datos, o servidor os recibe. Dúas formas de configurar a transferencia de datos entre eles:
- Empuxe. Os datos envíanse desde a instancia do cliente mediante un grupo de procesos remotos (RPG). Na instancia do servidor, os datos recíbense mediante o porto de entrada
- Puxe. O servidor recibe datos usando o RPG, o cliente envía usando o porto de saída.
O fluxo para rodar almacénase no Rexistro de Apache.
Apache NiFi Registry é un subproxecto de Apache NiFi que proporciona unha ferramenta de almacenamento de fluxos e versións. Unha especie de GIT. Pódese atopar información sobre a instalación, configuración e traballo co rexistro en
Ao principio, cando N é un número pequeno, o fluxo entrégase e actualízase a man nun tempo razoable.
Pero a medida que crece N, hai máis problemas:
- leva máis tempo actualizar o fluxo. Debes ir a todos os servidores
- hai erros ao actualizar os modelos. Aquí actualizaron, pero aquí esqueceron
- erro humano ao realizar un gran número de operacións similares
Todo isto lévanos ao feito de que é necesario automatizar o proceso. Probei as seguintes formas de resolver este problema:
- Use MiNiFi en lugar de NiFi
- NiFi CLI
- NiPyAPI
Usando MiNiFi
Outro subproxecto, MiNiFi C2 Server, axudará a resolver este problema. Este produto pretende ser o punto central na arquitectura de implantación. Como configurar o ambiente - descrito en
A opción descrita no artigo anterior funciona e non é difícil de implementar, pero non debemos esquecer o seguinte:
- minifi non ten todos os procesadores de nifi
- As versións de CPU en Minifi quedan por detrás das versións de CPU en NiFi.
No momento de escribir este artigo, a última versión de NiFi é 1.9.2. A versión do procesador da última versión de MiNiFi é 1.7.0. Pódense engadir procesadores a MiNiFi, pero debido a discrepancias de versión entre os procesadores NiFi e MiNiFi, é posible que isto non funcione.
NiFi CLI
A xulgar por
Executar a utilidade
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Para cargar o fluxo necesario desde o rexistro, necesitamos coñecer os identificadores da cesta (identificador de balde) e o propio fluxo (identificador de fluxo). Estes datos pódense obter a través do cli ou na interface web do rexistro NiFi. A interface web ten o seguinte aspecto:
Usando a CLI, fai isto:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Executar o grupo de procesos de importación desde o rexistro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Un punto importante é que calquera instancia de nifi pode especificarse como o host no que rodamos o grupo de procesos.
Engadiuse un grupo de procesos cos procesadores parados, hai que inicialos
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Xenial, os procesadores comezaron. Non obstante, segundo as condicións do problema, necesitamos instancias NiFi para enviar datos a outras instancias. Supoñamos que se escolleu o método Push para transferir datos ao servidor. Para organizar a transferencia de datos, é necesario activar a transferencia de datos (Activar a transmisión) no Grupo de procesos remotos (RPG) engadido, que xa está incluído no noso fluxo.
Na documentación da CLI e outras fontes, non atopei unha forma de habilitar a transferencia de datos. Se sabes como facelo, escribe nos comentarios.
Xa que temos bash e estamos preparados para ir ata o final, atoparemos unha saída! Podes usar a API NiFi para resolver este problema. Usemos o seguinte método, tomamos o ID dos exemplos anteriores (no noso caso é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrición dos métodos de API NiFi
No corpo, debes pasar JSON, da seguinte forma:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parámetros que se deben cubrir para "traballar":
foron - Estado da transferencia de datos. Dispoñible TRANSMITING para activar a transferencia de datos, STOPED para desactivala
versión - Versión do procesador
A versión predeterminada será 0 cando se crea, pero estes parámetros pódense obter mediante o método
Para os amantes dos scripts bash, este método pode parecer axeitado, pero é difícil para min: os scripts bash non son os meus favoritos. O seguinte xeito é máis interesante e máis cómodo na miña opinión.
NiPyAPI
NiPyAPI é unha biblioteca de Python para interactuar con instancias NiFi.
O noso script para implementar a configuración é un programa Python. Pasemos á codificación.
Configura as configuracións para seguir traballando. Necesitaremos os seguintes parámetros:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Ademais inserirei os nomes dos métodos desta biblioteca, que se describen
Conectamos o rexistro á instancia nifi usando
nipyapi.versioning.create_registry_client
Neste paso, tamén pode engadir unha comprobación de que o rexistro xa se engadiu á instancia, para iso pode usar o método
nipyapi.versioning.list_registry_clients
Atopamos o balde para seguir buscando o caudal na cesta
nipyapi.versioning.get_registry_bucket
Segundo o cubo atopado, buscamos caudal
nipyapi.versioning.get_flow_in_bucket
A continuación, é importante comprender se xa se engadiu este grupo de procesos. O grupo de procesos sitúase por coordenadas e pode xurdir unha situación cando se superpoña un segundo sobre un. Comprobei, pode ser 🙂 Para obter todo o grupo de procesos engadido, utiliza o método
nipyapi.canvas.list_all_process_groups
e despois podemos buscar, por exemplo, polo nome.
Non vou describir o proceso de actualización do modelo, só direi que se se engaden procesadores na nova versión do modelo, non hai problemas coa presenza de mensaxes nas filas. Pero se se eliminan os procesadores, poden xurdir problemas (nifi non permite a eliminación do procesador se se acumula unha fila de mensaxes diante del). Se estás interesado en como resolvín este problema, escríbeme, por favor, discutiremos este punto. Contactos ao final do artigo. Pasemos ao paso de engadir un grupo de procesos.
Ao depurar o script, atopeime cunha característica que indica que a última versión do fluxo non sempre está activada, polo que recomendo que primeiro aclare esta versión:
nipyapi.versioning.get_latest_flow_ver
Implementar grupo de procesos:
nipyapi.versioning.deploy_flow_version
Iniciamos os procesadores:
nipyapi.canvas.schedule_process_group
No bloque sobre CLI, escribiuse que a transferencia de datos non se activa automaticamente no grupo de procesos remotos? Ao implementar o script, tamén atopei este problema. Nese momento, non podía iniciar a transferencia de datos usando a API e decidín escribirlle ao programador da biblioteca NiPyAPI e pedirlle consello/axuda. O programador contestoume, discutimos o problema e escribiu que necesitaba tempo para "comprobar algo". E agora, un par de días despois, chega un correo electrónico no que se escribe unha función de Python que resolve o meu problema de inicio!!! Nese momento, a versión de NiPyAPI era 0.13.3 e, por suposto, non había nada dese tipo. Pero na versión 0.14.0, que se publicou hai pouco, esta función xa se incluíu na biblioteca. Coñece
nipyapi.canvas.set_remote_process_group_transmission
Entón, coa axuda da biblioteca NiPyAPI, conectamos o rexistro, aumentamos o fluxo e ata iniciamos os procesadores e a transferencia de datos. Despois podes peitear o código, engadir todo tipo de comprobacións, rexistrar e xa está. Pero esa é unha historia completamente diferente.
Das opcións de automatización que considerei, esta última pareceume a máis eficiente. En primeiro lugar, este aínda é código Python, no que pode incorporar código de programa auxiliar e gozar de todos os beneficios dunha linguaxe de programación. En segundo lugar, o proxecto NiPyAPI está a desenvolverse activamente e, en caso de problemas, pode escribirlle ao programador. En terceiro lugar, NiPyAPI aínda é unha ferramenta máis flexible para interactuar con NiFi na resolución de problemas complexos. Por exemplo, para determinar se as colas de mensaxes están baleiras no fluxo e se é posible actualizar o grupo de procesos.
Iso é todo. Describín 3 enfoques para automatizar a entrega de fluxo en NiFi, as trampas que pode atopar un desenvolvedor e proporcionei un código de traballo para automatizar a entrega. Se estás tan interesado neste tema coma min -
Fonte: www.habr.com