Apache NIFI: una breve descripción general de las oportunidades en la práctica

introducción

Sucedió que en mi lugar de trabajo actual tuve que familiarizarme con esta tecnología. Empezaré con un poco de historia. En la siguiente reunión, le dijeron a nuestro equipo que necesitábamos crear integración con sistema conocido. Por integración se entendía que este conocido sistema nos enviaría solicitudes a través de HTTP a un punto final específico y nosotros, curiosamente, enviaríamos respuestas en forma de mensaje SOAP. Todo parece simple y trivial. De esto se deduce que necesitas...

Tarea

Crea 3 servicios. El primero de ellos es el Servicio de Actualización de Bases de Datos. Este servicio, cuando llegan nuevos datos desde un sistema de terceros, actualiza los datos de la base de datos y genera un archivo en formato CSV para transferirlo al siguiente sistema. El punto final del segundo servicio se llama: Servicio de transporte FTP, que recibe el archivo transferido, lo valida y lo almacena a través de FTP. El tercer servicio, el servicio de transferencia de datos del consumidor, funciona de forma asincrónica con los dos primeros. Recibe una solicitud de un sistema externo de terceros para recibir el archivo discutido anteriormente, toma el archivo de respuesta listo, lo modifica (actualiza los campos id, descripción, linkToFile) y envía la respuesta en forma de mensaje SOAP. Es decir, el panorama general es el siguiente: los dos primeros servicios comienzan a funcionar solo cuando llegan los datos para la actualización. El tercer servicio funciona constantemente porque hay muchos consumidores de información, alrededor de 1000 solicitudes de datos por minuto. Los servicios están disponibles constantemente y sus instancias están ubicadas en diferentes entornos, como prueba, demostración, preproducción y producción. A continuación se muestra un diagrama de cómo funcionan estos servicios. Permítanme aclarar de inmediato que se han simplificado algunos detalles para evitar complejidades innecesarias.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Profundización Técnica

Al planificar una solución al problema, primero decidimos crear aplicaciones en Java utilizando el marco Spring, el equilibrador Nginx, la base de datos Postgres y otras cosas técnicas y no tan técnicas. Dado que el momento de desarrollar una solución técnica nos permitió considerar otros enfoques para resolver este problema, nuestra mirada se posó en la tecnología Apache NIFI, que está de moda en ciertos círculos. Diré de inmediato que esta tecnología nos permitió notar estos 3 servicios. Este artículo describirá el desarrollo de un servicio de transporte de archivos y un servicio de transferencia de datos al consumidor, pero si el artículo es útil, escribiré sobre el servicio de actualización de datos en la base de datos.

¿Qué es esto

NIFI es una arquitectura distribuida para carga y procesamiento de datos en paralelo rápido, una gran cantidad de complementos para fuentes y transformaciones, control de versiones de configuraciones y mucho más. Una buena ventaja es que es muy fácil de usar. Los procesos triviales como getFile, sendHttpRequest y otros se pueden representar como cuadrados. Cada cuadrado representa un proceso, cuya interacción se puede ver en la siguiente figura. Se ha escrito documentación más detallada sobre las interacciones de configuración de procesos. aquí , para los que hablan ruso - aquí. La documentación describe perfectamente cómo descomprimir y ejecutar NIFI, así como también cómo crear procesos, también conocidos como cuadrados.
La idea de escribir un artículo nació después de una larga búsqueda y estructuración de la información recibida en algo consciente, así como del deseo de hacer la vida un poco más fácil a los futuros desarrolladores.

ejemplo

Se considera un ejemplo de cómo interactúan los cuadrados entre sí. El esquema general es bastante simple: recibimos una solicitud HTTP (en teoría, con un archivo en el cuerpo de la solicitud. Para demostrar las capacidades de NIFI, en este ejemplo la solicitud inicia el proceso de recibir un archivo del almacenamiento de archivos local). ), luego enviamos una respuesta de que la solicitud ha sido recibida, en paralelo el proceso de recibir un archivo de FH y luego el proceso de moverlo vía FTP a FH. Vale aclarar que los procesos interactúan entre sí a través del llamado flowFile. Esta es la entidad base en NIFI que almacena atributos y contenido. El contenido son los datos representados por el archivo continuo. Es decir, a grandes rasgos, si recibes un archivo de un cuadrado y lo transfieres a otro, el contenido será tu archivo.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Como puede ver, esta imagen muestra el proceso general. HandleHttpRequest: acepta solicitudes, ReemplazoTexto: genera un cuerpo de respuesta, HandleHttpResponse: envía una respuesta. FetchFile: recibe un archivo de un almacenamiento de archivos, lo transfiere al cuadrado PutSftp: coloca este archivo en FTP, en la dirección especificada. Ahora más sobre este proceso.

En este caso, la petición es el comienzo de todo. Veamos sus parámetros de configuración.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Todo aquí es bastante trivial, con la excepción de StandardHttpContextMap: este es un tipo de servicio que le permite enviar y recibir solicitudes. Con más detalle e incluso con ejemplos, puedes ver: aquí

A continuación, veamos los parámetros de configuración de ReemplazarTexto del cuadrado. Vale la pena prestar atención a ReemplazoValue: esto es lo que se devolverá al usuario en forma de respuesta. En la configuración, puede ajustar el nivel de registro, puede ver los registros {donde descomprimió nifi}/nifi-1.9.2/logs, también hay parámetros de falla/éxito; en función de estos parámetros, puede regular el proceso en su conjunto. . Es decir, en el caso de un procesamiento de texto exitoso, se llamará al proceso de enviar una respuesta al usuario y, en otro caso, simplemente registraremos el proceso fallido.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

No hay nada particularmente interesante en las propiedades de HandleHttpResponse excepto el estado cuando una respuesta se crea exitosamente.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Hemos resuelto la solicitud y la respuesta; pasemos a recibir el archivo y colocarlo en el servidor FTP. FetchFile: recibe un archivo en la ruta especificada en la configuración y lo pasa al siguiente proceso.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Y luego el cuadrado PutSftp: coloca el archivo en el almacenamiento de archivos. Podemos ver los parámetros de configuración a continuación.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Vale la pena prestar atención al hecho de que cada casilla es un proceso separado que debe iniciarse. Analizamos el ejemplo más simple que no requiere ninguna personalización compleja. A continuación, veremos el proceso un poco más complicado, donde escribiremos un poco en los surcos.

Ejemplo más complejo

El servicio de transferencia de datos al consumidor resultó un poco más complicado debido al proceso de modificación del mensaje SOAP. El proceso general se muestra en la siguiente figura.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Aquí la idea tampoco es particularmente complicada: recibimos una solicitud del consumidor de que necesitaba datos, enviamos una respuesta diciendo que había recibido un mensaje, iniciamos el proceso de recepción del archivo de respuesta, luego lo editamos con cierta lógica y luego transfirió el archivo al consumidor en forma de mensaje SOAP al servidor.

Creo que no es necesario volver a describir los cuadrados que vimos arriba; pasemos directamente a los nuevos. Si necesita editar algún archivo y los cuadrados normales del tipo ReemplazarTexto no son adecuados, tendrá que escribir su propio script. Esto se puede hacer usando el cuadrado ExecuteGroogyScript. Sus configuraciones se presentan a continuación.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Hay dos opciones para cargar el script en este cuadrado. La primera es descargando un archivo con un script. El segundo es insertando un script en scriptBody. Hasta donde yo sé, el cuadrado de ejecución de script admite varios idiomas, uno de ellos es maravilloso. Decepcionaré a los desarrolladores de Java: no se pueden escribir scripts en Java en esos cuadrados. Para aquellos que realmente quieran, deben crear su propio cuadrado personalizado y agregarlo al sistema NIFI. Toda esta operación va acompañada de un baile bastante largo con pandero, del que no nos ocuparemos en este artículo. Elegí el lenguaje maravilloso. A continuación se muestra un script de prueba que simplemente actualiza incrementalmente la identificación en un mensaje SOAP. Es importante tener en cuenta. Tomas el archivo de flowFile y lo actualizas, no olvides que debes volver a colocarlo allí, actualizado. También vale la pena señalar que no todas las bibliotecas están incluidas. Puede suceder que aún tengas que importar una de las bibliotecas. Otra desventaja es que el script en este cuadrado es bastante difícil de depurar. Hay una manera de conectarse a NIFI JVM e iniciar el proceso de depuración. Personalmente, inicié una aplicación local y simulé recibir un archivo de la sesión. También hice la depuración local. Los errores que aparecen al cargar un script son bastante fáciles de buscar en Google y el propio NIFI los escribe en el registro.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

En realidad, aquí es donde termina la personalización del cuadrado. A continuación, el archivo actualizado se transfiere al cuadrado, que se encarga de enviar el archivo al servidor. A continuación se muestran los ajustes para este cuadrado.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Describimos el método mediante el cual se transmitirá un mensaje SOAP. Escribimos dónde. A continuación debes indicar que se trata de SOAP.

Apache NIFI: una breve descripción general de las oportunidades en la práctica

Agregue varias propiedades como host y acción (soapAction). Guardamos y comprobamos. Puedes ver más detalles sobre cómo enviar solicitudes SOAP aquí

Analizamos varias opciones para utilizar procesos NIFI. ¿Cómo interactúan y cuál es su beneficio real? Los ejemplos considerados son de prueba y son ligeramente diferentes de lo que realmente sucede en combate. Espero que este artículo sea un poco útil para los desarrolladores. Gracias por su atención. Si tienes alguna pregunta, escribe. Intentaré responder.

Fuente: habr.com

Añadir un comentario