Hola a todos!
La tarea es la siguiente: hay un flujo que se muestra en la imagen de arriba, que debe implementarse en N servidores con
NiFi Site to Site (S2S) es una forma segura y altamente personalizable de transferir datos entre instancias de NiFi. Vea cómo funciona S2S
Cuando se trata de transferencia de datos mediante S2S, una instancia se llama cliente y la segunda es servidor. El cliente envía datos, el servidor los recibe. Dos formas de configurar la transferencia de datos entre ellos:
- Push. Los datos se envían desde la instancia del cliente mediante un grupo de procesos remotos (RPG). En la instancia del servidor, los datos se reciben mediante el puerto de entrada
- Jale. El servidor recibe datos mediante el RPG, el cliente los envía mediante el puerto de salida.
El flujo para rodar se almacena en el Registro de Apache.
Apache NiFi Registry es un subproyecto de Apache NiFi que proporciona una herramienta de control de versiones y almacenamiento de flujo. Una especie de GIT. Puede encontrar información sobre la instalación, configuración y trabajo con el registro en
Al principio, cuando N es un número pequeño, el flujo se entrega y actualiza manualmente en un tiempo razonable.
Pero a medida que N crece, surgen más problemas:
- se necesita más tiempo para actualizar el flujo. Necesitas ir a todos los servidores.
- hay errores al actualizar las plantillas. Aquí actualizaron, pero aquí se olvidaron.
- error humano al realizar una gran cantidad de operaciones similares
Todo esto nos lleva al hecho de que necesitamos automatizar el proceso. Intenté las siguientes formas de resolver este problema:
- Utilice MiNiFi en lugar de NiFi
- CLI de Ni-Fi
- NiPyAPI
Usando MiniFi
Otro subproyecto, MiNiFi C2 Server, ayudará a resolver este problema. Este producto está destinado a ser el punto central de la arquitectura de implementación. Cómo configurar el entorno: descrito en
La opción descrita en el artículo anterior funciona y no es difícil de implementar, pero no debemos olvidar lo siguiente:
- minifi no tiene todos los procesadores de nifi
- Las versiones de CPU en Minifi van por detrás de las versiones de CPU en NiFi.
Al momento de escribir este artículo, la última versión de NiFi es 1.9.2. La versión del procesador de la última versión de MiNiFi es 1.7.0. Se pueden agregar procesadores a MiNiFi, pero debido a discrepancias de versión entre los procesadores NiFi y MiNiFi, es posible que esto no funcione.
CLI de Ni-Fi
A juzgar por los
Ejecute la utilidad
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
Para que podamos cargar el flujo requerido desde el registro, necesitamos conocer los identificadores del depósito (identificador de depósito) y el flujo en sí (identificador de flujo). Estos datos se pueden obtener a través del cli o en la interfaz web del registro NiFi. La interfaz web se ve así:
Usando la CLI esto se hace:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Ejecute el grupo de procesos de importación desde el registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
Un punto importante es que cualquier instancia de nifi se puede especificar como el host en el que acumulamos el grupo de procesos.
Se agregó un grupo de procesos con procesadores detenidos, es necesario iniciarlos
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Genial, los procesadores han comenzado. Sin embargo, según las condiciones del problema, necesitamos instancias NiFi para enviar datos a otras instancias. Supongamos que se eligió el método Push para transferir datos al servidor. Para organizar la transferencia de datos, es necesario habilitar la transferencia de datos (Habilitar transmisión) en el Grupo de procesos remotos (RPG) agregado, que ya está incluido en nuestro flujo.
En la documentación de la CLI y otras fuentes, no encontré una manera de habilitar la transferencia de datos. Si sabes cómo hacer esto, por favor escribe en los comentarios.
Ya que tenemos bash y estamos listos para llegar al final, ¡encontraremos una salida! Puede utilizar la API NiFi para resolver este problema. Usemos el siguiente método, tomamos el ID de los ejemplos anteriores (en nuestro caso es 7f522a13-016e-1000-e504-d5b15587f2f3). Descripción de los métodos API de NiFi
En el cuerpo, debes pasar JSON, de la siguiente forma:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
Parámetros que deben completarse para “funcionar”:
estado — estado de la transferencia de datos. Disponible TRANSMITIENDO para habilitar la transferencia de datos, DETENIDO para deshabilitar
versión - versión del procesador
La versión predeterminada será 0 cuando se cree, pero estos parámetros se pueden obtener usando el método
Para los amantes de los scripts bash, este método puede parecer adecuado, pero para mí es difícil: los scripts bash no son mis favoritos. La siguiente forma es, en mi opinión, más interesante y más cómoda.
NiPyAPI
NiPyAPI es una biblioteca de Python para interactuar con instancias de NiFi.
Nuestro script para implementar la configuración es un programa Python. Pasemos a la codificación.
Configuramos configuraciones para seguir trabajando. Necesitaremos los siguientes parámetros:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
Además insertaré los nombres de los métodos de esta biblioteca, que se describen
Conectamos el registro a la instancia nifi usando
nipyapi.versioning.create_registry_client
En este paso, también puedes agregar una verificación de que el registro ya se haya agregado a la instancia, para esto puedes usar el método
nipyapi.versioning.list_registry_clients
Encontramos el balde para buscar más el flujo en la canasta.
nipyapi.versioning.get_registry_bucket
Según el cubo encontrado, buscamos flujo.
nipyapi.versioning.get_flow_in_bucket
A continuación, es importante comprender si este grupo de procesos ya se ha agregado. El grupo de procesos se coloca por coordenadas y puede surgir una situación en la que se superponga un segundo sobre uno. Lo verifiqué, puede ser 🙂 Para obtener todo el grupo de procesos agregado, use el método
nipyapi.canvas.list_all_process_groups
y luego podemos buscar, por ejemplo, por nombre.
No describiré el proceso de actualización de la plantilla, solo diré que si se agregan procesadores en la nueva versión de la plantilla, entonces no hay problemas con la presencia de mensajes en las colas. Pero si se eliminan los procesadores, pueden surgir problemas (nifi no permite la eliminación del procesador si se ha acumulado una cola de mensajes delante de él). Si está interesado en saber cómo resolví este problema, escríbame, por favor, discutiremos este punto. Contactos al final del artículo. Pasemos al paso de agregar un grupo de procesos.
Al depurar el script, encontré una característica que indica que la última versión del flujo no siempre se activa, por lo que recomiendo que primero aclares esta versión:
nipyapi.versioning.get_latest_flow_ver
Implementar grupo de procesos:
nipyapi.versioning.deploy_flow_version
Ponemos en marcha los procesadores:
nipyapi.canvas.schedule_process_group
¿En el bloque sobre CLI se escribió que la transferencia de datos no se habilita automáticamente en el grupo de procesos remotos? Al implementar el script, también encontré este problema. En ese momento, no pude iniciar la transferencia de datos usando la API y decidí escribirle al desarrollador de la biblioteca NiPyAPI y pedirle consejo/ayuda. El desarrollador me respondió, discutimos el problema y me escribió que necesitaba tiempo para "verificar algo". ¡¡¡Y luego, un par de días después, llega una carta en la que está escrita una función en Python que resuelve mi problema de lanzamiento!!! En ese momento, la versión de NiPyAPI era 0.13.3 y, por supuesto, no contenía nada de eso. Pero en la versión 0.14.0, que se lanzó recientemente, esta función ya estaba incluida en la biblioteca. Encontrarse,
nipyapi.canvas.set_remote_process_group_transmission
Entonces, utilizando la biblioteca NiPyAPI, conectamos el registro, implementamos el flujo e incluso iniciamos procesadores y transferencia de datos. Luego puedes peinar el código, agregar todo tipo de comprobaciones, registros y eso es todo. Pero esa es una historia completamente diferente.
De las opciones de automatización que consideré, la última me pareció la más eficiente. En primer lugar, esto sigue siendo código Python, en el que puedes incrustar código de programa auxiliar y disfrutar de todos los beneficios de un lenguaje de programación. En segundo lugar, el proyecto NiPyAPI se está desarrollando activamente y, en caso de problemas, puede escribir al desarrollador. En tercer lugar, NiPyAPI sigue siendo una herramienta más flexible para interactuar con NiFi para resolver problemas complejos. Por ejemplo, para determinar si las colas de mensajes están actualmente vacías en el flujo y si es posible actualizar el grupo de procesos.
Eso es todo. Describí tres enfoques para automatizar la entrega de flujo en NiFi, los obstáculos que puede encontrar un desarrollador y proporcioné un código de trabajo para automatizar la entrega. Si estás tan interesado en este tema como yo...
Fuente: habr.com