Automatización de entrega de fluxo en Apache NiFi

Ola a todos!

Automatización de entrega de fluxo en Apache NiFi

A tarefa é a seguinte: hai un fluxo que se mostra na imaxe de arriba, que debe ser implementado a N servidores con Apache NiFi. Proba de fluxo: un ficheiro está a ser xerado e enviado a outra instancia de NiFi. A transferencia de datos realízase mediante o protocolo NiFi Site to Site.

NiFi Site to Site (S2S) é unha forma segura e altamente personalizable de transferir datos entre instancias NiFi. Vexa como funciona S2S documentación e é importante lembrar de configurar a súa instancia NiFi para permitir que S2S vexa aquí.

Cando se trata de transferencia de datos mediante S2S, unha instancia chámase cliente, a segunda é un servidor. O cliente envía datos, o servidor os recibe. Dúas formas de configurar a transferencia de datos entre eles:

  1. Empuxe. Os datos envíanse desde a instancia do cliente mediante un grupo de procesos remotos (RPG). Na instancia do servidor, os datos recíbense mediante o porto de entrada
  2. Puxe. O servidor recibe datos usando o RPG, o cliente envía usando o porto de saída.


O fluxo para rodar almacénase no Rexistro de Apache.

Apache NiFi Registry é un subproxecto de Apache NiFi que proporciona unha ferramenta de almacenamento de fluxos e versións. Unha especie de GIT. Pódese atopar información sobre a instalación, configuración e traballo co rexistro en documentación oficial. O fluxo de almacenamento combínase nun grupo de procesos e gárdase no rexistro neste formulario. Volveremos sobre isto máis adiante no artigo.

Ao principio, cando N é un número pequeno, o fluxo entrégase e actualízase a man nun tempo razoable.

Pero a medida que crece N, hai máis problemas:

  1. leva máis tempo actualizar o fluxo. Debes ir a todos os servidores
  2. hai erros ao actualizar os modelos. Aquí actualizaron, pero aquí esqueceron
  3. erro humano ao realizar un gran número de operacións similares

Todo isto lévanos ao feito de que é necesario automatizar o proceso. Probei as seguintes formas de resolver este problema:

  1. Use MiNiFi en lugar de NiFi
  2. NiFi CLI
  3. NiPyAPI

Usando MiNiFi

ApacheMiNify é un subproxecto de Apache NiFi. MiNiFy é un axente compacto que usa os mesmos procesadores que NiFi, o que lle permite crear o mesmo fluxo que en NiFi. A lixeireza do axente conséguese, entre outras cousas, debido a que MiNiFy non dispón dunha interface gráfica para a configuración do fluxo. A falta de interface gráfica de MiNiFy significa que é necesario resolver o problema da entrega de fluxo en minifi. Dado que MiNiFy se usa activamente en IOT, hai moitos compoñentes e o proceso de entrega de fluxo ás instancias de minifi finais debe ser automatizado. Unha tarefa coñecida, non?

Outro subproxecto, MiNiFi C2 Server, axudará a resolver este problema. Este produto pretende ser o punto central na arquitectura de implantación. Como configurar o ambiente - descrito en Este artigo sobre Habré e a información é suficiente para resolver o problema. MiNiFi xunto co servidor C2 actualiza automaticamente a súa configuración. O único inconveniente deste enfoque é que tes que crear modelos no servidor C2, un simple compromiso co rexistro non é suficiente.

A opción descrita no artigo anterior funciona e non é difícil de implementar, pero non debemos esquecer o seguinte:

  1. minifi non ten todos os procesadores de nifi
  2. As versións de CPU en Minifi quedan por detrás das versións de CPU en NiFi.

No momento de escribir este artigo, a última versión de NiFi é 1.9.2. A versión do procesador da última versión de MiNiFi é 1.7.0. Pódense engadir procesadores a MiNiFi, pero debido a discrepancias de versión entre os procesadores NiFi e MiNiFi, é posible que isto non funcione.

NiFi CLI

A xulgar por descrición ferramenta no sitio web oficial, esta é unha ferramenta para automatizar a interacción entre NiFI e NiFi Registry no campo da entrega de fluxos ou xestión de procesos. Descarga esta ferramenta para comezar. por iso.

Executar a utilidade

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Para cargar o fluxo necesario desde o rexistro, necesitamos coñecer os identificadores da cesta (identificador de balde) e o propio fluxo (identificador de fluxo). Estes datos pódense obter a través do cli ou na interface web do rexistro NiFi. A interface web ten o seguinte aspecto:

Automatización de entrega de fluxo en Apache NiFi

Usando a CLI, fai isto:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Executar o grupo de procesos de importación desde o rexistro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un punto importante é que calquera instancia de nifi pode especificarse como o host no que rodamos o grupo de procesos.

Engadiuse un grupo de procesos cos procesadores parados, hai que inicialos

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Xenial, os procesadores comezaron. Non obstante, segundo as condicións do problema, necesitamos instancias NiFi para enviar datos a outras instancias. Supoñamos que se escolleu o método Push para transferir datos ao servidor. Para organizar a transferencia de datos, é necesario activar a transferencia de datos (Activar a transmisión) no Grupo de procesos remotos (RPG) engadido, que xa está incluído no noso fluxo.

Automatización de entrega de fluxo en Apache NiFi

Na documentación da CLI e outras fontes, non atopei unha forma de habilitar a transferencia de datos. Se sabes como facelo, escribe nos comentarios.

Xa que temos bash e estamos preparados para ir ata o final, atoparemos unha saída! Podes usar a API NiFi para resolver este problema. Usemos o seguinte método, tomamos o ID dos exemplos anteriores (no noso caso é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrición dos métodos de API NiFi aquí.

Automatización de entrega de fluxo en Apache NiFi
No corpo, debes pasar JSON, da seguinte forma:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parámetros que se deben cubrir para "traballar":
foron - Estado da transferencia de datos. Dispoñible TRANSMITING para activar a transferencia de datos, STOPED para desactivala
versión - Versión do procesador

A versión predeterminada será 0 cando se crea, pero estes parámetros pódense obter mediante o método

Automatización de entrega de fluxo en Apache NiFi

Para os amantes dos scripts bash, este método pode parecer axeitado, pero é difícil para min: os scripts bash non son os meus favoritos. O seguinte xeito é máis interesante e máis cómodo na miña opinión.

NiPyAPI

NiPyAPI é unha biblioteca de Python para interactuar con instancias NiFi. Páxina de documentación contén a información necesaria para traballar coa biblioteca. O inicio rápido descríbese en proxecto en github.

O noso script para implementar a configuración é un programa Python. Pasemos á codificación.
Configura as configuracións para seguir traballando. Necesitaremos os seguintes parámetros:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Ademais inserirei os nomes dos métodos desta biblioteca, que se describen aquí.

Conectamos o rexistro á instancia nifi usando

nipyapi.versioning.create_registry_client

Neste paso, tamén pode engadir unha comprobación de que o rexistro xa se engadiu á instancia, para iso pode usar o método

nipyapi.versioning.list_registry_clients

Atopamos o balde para seguir buscando o caudal na cesta

nipyapi.versioning.get_registry_bucket

Segundo o cubo atopado, buscamos caudal

nipyapi.versioning.get_flow_in_bucket

A continuación, é importante comprender se xa se engadiu este grupo de procesos. O grupo de procesos sitúase por coordenadas e pode xurdir unha situación cando se superpoña un segundo sobre un. Comprobei, pode ser 🙂 Para obter todo o grupo de procesos engadido, utiliza o método

nipyapi.canvas.list_all_process_groups

e despois podemos buscar, por exemplo, polo nome.

Non vou describir o proceso de actualización do modelo, só direi que se se engaden procesadores na nova versión do modelo, non hai problemas coa presenza de mensaxes nas filas. Pero se se eliminan os procesadores, poden xurdir problemas (nifi non permite a eliminación do procesador se se acumula unha fila de mensaxes diante del). Se estás interesado en como resolvín este problema, escríbeme, por favor, discutiremos este punto. Contactos ao final do artigo. Pasemos ao paso de engadir un grupo de procesos.

Ao depurar o script, atopeime cunha característica que indica que a última versión do fluxo non sempre está activada, polo que recomendo que primeiro aclare esta versión:

nipyapi.versioning.get_latest_flow_ver

Implementar grupo de procesos:

nipyapi.versioning.deploy_flow_version

Iniciamos os procesadores:

nipyapi.canvas.schedule_process_group

No bloque sobre CLI, escribiuse que a transferencia de datos non se activa automaticamente no grupo de procesos remotos? Ao implementar o script, tamén atopei este problema. Nese momento, non podía iniciar a transferencia de datos usando a API e decidín escribirlle ao programador da biblioteca NiPyAPI e pedirlle consello/axuda. O programador contestoume, discutimos o problema e escribiu que necesitaba tempo para "comprobar algo". E agora, un par de días despois, chega un correo electrónico no que se escribe unha función de Python que resolve o meu problema de inicio!!! Nese momento, a versión de NiPyAPI era 0.13.3 e, por suposto, non había nada dese tipo. Pero na versión 0.14.0, que se publicou hai pouco, esta función xa se incluíu na biblioteca. Coñece

nipyapi.canvas.set_remote_process_group_transmission

Entón, coa axuda da biblioteca NiPyAPI, conectamos o rexistro, aumentamos o fluxo e ata iniciamos os procesadores e a transferencia de datos. Despois podes peitear o código, engadir todo tipo de comprobacións, rexistrar e xa está. Pero esa é unha historia completamente diferente.

Das opcións de automatización que considerei, esta última pareceume a máis eficiente. En primeiro lugar, este aínda é código Python, no que pode incorporar código de programa auxiliar e gozar de todos os beneficios dunha linguaxe de programación. En segundo lugar, o proxecto NiPyAPI está a desenvolverse activamente e, en caso de problemas, pode escribirlle ao programador. En terceiro lugar, NiPyAPI aínda é unha ferramenta máis flexible para interactuar con NiFi na resolución de problemas complexos. Por exemplo, para determinar se as colas de mensaxes están baleiras no fluxo e se é posible actualizar o grupo de procesos.

Iso é todo. Describín 3 enfoques para automatizar a entrega de fluxo en NiFi, as trampas que pode atopar un desenvolvedor e proporcionei un código de traballo para automatizar a entrega. Se estás tan interesado neste tema coma min - escribe!

Fonte: www.habr.com

Engadir un comentario