Automatización de entrega de flujo en Apache NiFi

Hola a todos!

Automatización de entrega de flujo en Apache NiFi

La tarea es la siguiente: hay un flujo que se muestra en la imagen de arriba, que debe implementarse en N servidores con apache nifi. Prueba de flujo: se genera un archivo y se envía a otra instancia de NiFi. La transferencia de datos se produce mediante el protocolo NiFi de sitio a sitio.

NiFi Site to Site (S2S) es una forma segura y altamente personalizable de transferir datos entre instancias de NiFi. Vea cómo funciona S2S documentación y es importante recordar configurar su instancia NiFi para permitir que S2S vea aquí.

Cuando se trata de transferencia de datos mediante S2S, una instancia se llama cliente y la segunda es servidor. El cliente envía datos, el servidor los recibe. Dos formas de configurar la transferencia de datos entre ellos:

  1. Push. Los datos se envían desde la instancia del cliente mediante un grupo de procesos remotos (RPG). En la instancia del servidor, los datos se reciben mediante el puerto de entrada
  2. Jale. El servidor recibe datos mediante el RPG, el cliente los envía mediante el puerto de salida.


El flujo para rodar se almacena en el Registro de Apache.

Apache NiFi Registry es un subproyecto de Apache NiFi que proporciona una herramienta de control de versiones y almacenamiento de flujo. Una especie de GIT. Puede encontrar información sobre la instalación, configuración y trabajo con el registro en documentación oficial. El flujo de almacenamiento se combina en un grupo de procesos y se almacena en el registro de esta forma. Volveremos a esto más adelante en el artículo.

Al principio, cuando N es un número pequeño, el flujo se entrega y actualiza manualmente en un tiempo razonable.

Pero a medida que N crece, surgen más problemas:

  1. se necesita más tiempo para actualizar el flujo. Necesitas ir a todos los servidores.
  2. hay errores al actualizar las plantillas. Aquí actualizaron, pero aquí se olvidaron.
  3. error humano al realizar una gran cantidad de operaciones similares

Todo esto nos lleva al hecho de que necesitamos automatizar el proceso. Intenté las siguientes formas de resolver este problema:

  1. Utilice MiNiFi en lugar de NiFi
  2. CLI de Ni-Fi
  3. NiPyAPI

Usando MiniFi

ApacheMiNify es un subproyecto de Apache NiFi. MiNiFy es un agente compacto que utiliza los mismos procesadores que NiFi, lo que le permite crear el mismo flujo que en NiFi. La ligereza del agente se consigue, entre otras cosas, gracias a que MiNiFy no dispone de interfaz gráfica para la configuración del flujo. La falta de interfaz gráfica de MiNiFy significa que es necesario resolver el problema de la entrega de flujo en minifi. Dado que MiNiFy se utiliza activamente en IoT, existen muchos componentes y el proceso de entrega de flujo a las instancias minifi finales debe automatizarse. Una tarea familiar, ¿verdad?

Otro subproyecto, MiNiFi C2 Server, ayudará a resolver este problema. Este producto está destinado a ser el punto central de la arquitectura de implementación. Cómo configurar el entorno: descrito en este artículo sobre Habré y la información es suficiente para resolver el problema. MiNiFi junto con el servidor C2 actualiza automáticamente su configuración. El único inconveniente de este enfoque es que hay que crear plantillas en el servidor C2; una simple confirmación en el registro no es suficiente.

La opción descrita en el artículo anterior funciona y no es difícil de implementar, pero no debemos olvidar lo siguiente:

  1. minifi no tiene todos los procesadores de nifi
  2. Las versiones de CPU en Minifi van por detrás de las versiones de CPU en NiFi.

Al momento de escribir este artículo, la última versión de NiFi es 1.9.2. La versión del procesador de la última versión de MiNiFi es 1.7.0. Se pueden agregar procesadores a MiNiFi, pero debido a discrepancias de versión entre los procesadores NiFi y MiNiFi, es posible que esto no funcione.

CLI de Ni-Fi

A juzgar por los descripción herramienta en el sitio web oficial, esta es una herramienta para automatizar la interacción entre NiFI y NiFi Registry en el campo de la entrega de flujo o la gestión de procesos. Para comenzar, debe descargar esta herramienta. por lo tanto.

Ejecute la utilidad

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

Para que podamos cargar el flujo requerido desde el registro, necesitamos conocer los identificadores del depósito (identificador de depósito) y el flujo en sí (identificador de flujo). Estos datos se pueden obtener a través del cli o en la interfaz web del registro NiFi. La interfaz web se ve así:

Automatización de entrega de flujo en Apache NiFi

Usando la CLI esto se hace:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

Ejecute el grupo de procesos de importación desde el registro:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

Un punto importante es que cualquier instancia de nifi se puede especificar como el host en el que acumulamos el grupo de procesos.

Se agregó un grupo de procesos con procesadores detenidos, es necesario iniciarlos

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

Genial, los procesadores han comenzado. Sin embargo, según las condiciones del problema, necesitamos instancias NiFi para enviar datos a otras instancias. Supongamos que se eligió el método Push para transferir datos al servidor. Para organizar la transferencia de datos, es necesario habilitar la transferencia de datos (Habilitar transmisión) en el Grupo de procesos remotos (RPG) agregado, que ya está incluido en nuestro flujo.

Automatización de entrega de flujo en Apache NiFi

En la documentación de la CLI y otras fuentes, no encontré una manera de habilitar la transferencia de datos. Si sabes cómo hacer esto, por favor escribe en los comentarios.

Ya que tenemos bash y estamos listos para llegar al final, ¡encontraremos una salida! Puede utilizar la API NiFi para resolver este problema. Usemos el siguiente método, tomamos el ID de los ejemplos anteriores (en nuestro caso es 7f522a13-016e-1000-e504-d5b15587f2f3). Descripción de los métodos API de NiFi aquí.

Automatización de entrega de flujo en Apache NiFi
En el cuerpo, debes pasar JSON, de la siguiente forma:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

Parámetros que deben completarse para “funcionar”:
estado — estado de la transferencia de datos. Disponible TRANSMITIENDO para habilitar la transferencia de datos, DETENIDO para deshabilitar
versión - versión del procesador

La versión predeterminada será 0 cuando se cree, pero estos parámetros se pueden obtener usando el método

Automatización de entrega de flujo en Apache NiFi

Para los amantes de los scripts bash, este método puede parecer adecuado, pero para mí es difícil: los scripts bash no son mis favoritos. La siguiente forma es, en mi opinión, más interesante y más cómoda.

NiPyAPI

NiPyAPI es una biblioteca de Python para interactuar con instancias de NiFi. Página de documentación contiene la información necesaria para trabajar con la biblioteca. El inicio rápido se describe en proyecto en github.

Nuestro script para implementar la configuración es un programa Python. Pasemos a la codificación.
Configuramos configuraciones para seguir trabajando. Necesitaremos los siguientes parámetros:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

Además insertaré los nombres de los métodos de esta biblioteca, que se describen aquí.

Conectamos el registro a la instancia nifi usando

nipyapi.versioning.create_registry_client

En este paso, también puedes agregar una verificación de que el registro ya se haya agregado a la instancia, para esto puedes usar el método

nipyapi.versioning.list_registry_clients

Encontramos el balde para buscar más el flujo en la canasta.

nipyapi.versioning.get_registry_bucket

Según el cubo encontrado, buscamos flujo.

nipyapi.versioning.get_flow_in_bucket

A continuación, es importante comprender si este grupo de procesos ya se ha agregado. El grupo de procesos se coloca por coordenadas y puede surgir una situación en la que se superponga un segundo sobre uno. Lo verifiqué, puede ser 🙂 Para obtener todo el grupo de procesos agregado, use el método

nipyapi.canvas.list_all_process_groups

y luego podemos buscar, por ejemplo, por nombre.

No describiré el proceso de actualización de la plantilla, solo diré que si se agregan procesadores en la nueva versión de la plantilla, entonces no hay problemas con la presencia de mensajes en las colas. Pero si se eliminan los procesadores, pueden surgir problemas (nifi no permite la eliminación del procesador si se ha acumulado una cola de mensajes delante de él). Si está interesado en saber cómo resolví este problema, escríbame, por favor, discutiremos este punto. Contactos al final del artículo. Pasemos al paso de agregar un grupo de procesos.

Al depurar el script, encontré una característica que indica que la última versión del flujo no siempre se activa, por lo que recomiendo que primero aclares esta versión:

nipyapi.versioning.get_latest_flow_ver

Implementar grupo de procesos:

nipyapi.versioning.deploy_flow_version

Ponemos en marcha los procesadores:

nipyapi.canvas.schedule_process_group

¿En el bloque sobre CLI se escribió que la transferencia de datos no se habilita automáticamente en el grupo de procesos remotos? Al implementar el script, también encontré este problema. En ese momento, no pude iniciar la transferencia de datos usando la API y decidí escribirle al desarrollador de la biblioteca NiPyAPI y pedirle consejo/ayuda. El desarrollador me respondió, discutimos el problema y me escribió que necesitaba tiempo para "verificar algo". ¡¡¡Y luego, un par de días después, llega una carta en la que está escrita una función en Python que resuelve mi problema de lanzamiento!!! En ese momento, la versión de NiPyAPI era 0.13.3 y, por supuesto, no contenía nada de eso. Pero en la versión 0.14.0, que se lanzó recientemente, esta función ya estaba incluida en la biblioteca. Encontrarse,

nipyapi.canvas.set_remote_process_group_transmission

Entonces, utilizando la biblioteca NiPyAPI, conectamos el registro, implementamos el flujo e incluso iniciamos procesadores y transferencia de datos. Luego puedes peinar el código, agregar todo tipo de comprobaciones, registros y eso es todo. Pero esa es una historia completamente diferente.

De las opciones de automatización que consideré, la última me pareció la más eficiente. En primer lugar, esto sigue siendo código Python, en el que puedes incrustar código de programa auxiliar y disfrutar de todos los beneficios de un lenguaje de programación. En segundo lugar, el proyecto NiPyAPI se está desarrollando activamente y, en caso de problemas, puede escribir al desarrollador. En tercer lugar, NiPyAPI sigue siendo una herramienta más flexible para interactuar con NiFi para resolver problemas complejos. Por ejemplo, para determinar si las colas de mensajes están actualmente vacías en el flujo y si es posible actualizar el grupo de procesos.

Eso es todo. Describí tres enfoques para automatizar la entrega de flujo en NiFi, los obstáculos que puede encontrar un desarrollador y proporcioné un código de trabajo para automatizar la entrega. Si estás tan interesado en este tema como yo... ¡escribir!

Fuente: habr.com

Añadir un comentario