Apache NIFI: una breu visió general de les oportunitats a la pràctica

Introducció

Va passar que al meu lloc de treball actual m'havia de familiaritzar amb aquesta tecnologia. Començaré amb una mica de fons. A la següent reunió, es va dir al nostre equip que havíem de crear una integració sistema conegut. Per integració es volia dir que aquest sistema conegut ens enviava sol·licituds via HTTP a un punt final específic i nosaltres, curiosament, enviaríem respostes en forma de missatge SOAP. Tot sembla senzill i trivial. D'això es dedueix que necessiteu...

Tasca

Crea 3 serveis. El primer d'ells és el servei d'actualització de bases de dades. Aquest servei, quan arriben noves dades d'un sistema de tercers, actualitza les dades de la base de dades i genera un fitxer en format CSV per transferir-lo al següent sistema. El punt final del segon servei s'anomena: el servei de transport FTP, que rep el fitxer transferit, el valida i el col·loca en un emmagatzematge de fitxers mitjançant FTP. El tercer servei, el servei de transferència de dades del consumidor, funciona de manera asíncrona amb els dos primers. Rep una sol·licitud d'un sistema extern de tercers per rebre el fitxer comentat anteriorment, agafa el fitxer de resposta llest, el modifica (actualitza l'identificador, la descripció, els camps linkToFile) i envia la resposta en forma de missatge SOAP. És a dir, el panorama general és el següent: els dos primers serveis només comencen a treballar quan han arribat les dades per a l'actualització. El tercer servei funciona constantment perquè hi ha molts consumidors d'informació, unes 1000 peticions de dades per minut. Els serveis estan disponibles constantment i les seves instàncies es troben en diferents entorns, com ara proves, demostracions, preproducció i producció. A continuació es mostra un diagrama de com funcionen aquests serveis. Permeteu-me aclarir de seguida que alguns detalls s'han simplificat per evitar una complexitat innecessària.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Aprofundiment tècnic

En planificar una solució al problema, primer vam decidir fer aplicacions en Java utilitzant el marc Spring, l'equilibrador Nginx, la base de dades Postgres i altres coses tècniques i no tan tècniques. Atès que el moment de desenvolupar una solució tècnica ens va permetre plantejar altres enfocaments per resoldre aquest problema, la nostra mirada va caure en la tecnologia Apache NIFI, que està de moda en certs cercles. De seguida diré que aquesta tecnologia ens va permetre notar aquests 3 serveis. Aquest article descriurà el desenvolupament d'un servei de transport d'arxius i un servei de transferència de dades al consumidor, però si l'article és útil, escriuré sobre el servei d'actualització de dades a la base de dades.

Què és?

NIFI és una arquitectura distribuïda per a la càrrega i processament paral·lel ràpid de dades, un gran nombre de connectors per a fonts i transformacions, versions de configuracions i molt més. Un bon avantatge és que és molt fàcil d'utilitzar. Els processos trivials com getFile, sendHttpRequest i altres es poden representar com a quadrats. Cada quadrat representa un procés, la interacció del qual es pot veure a la figura següent. S'ha escrit una documentació més detallada sobre les interaccions de configuració del procés aquí , per als que parlen rus - aquí. La documentació descriu perfectament com desempaquetar i executar NIFI, així com com crear processos, també coneguts com a quadrats.
La idea d'escriure un article va néixer després d'una llarga recerca i estructuració de la informació rebuda en quelcom conscient, així com el desig de facilitar una mica la vida als futurs desenvolupadors.

Exemple

Es considera un exemple de com interactuen els quadrats entre si. L'esquema general és bastant simple: rebem una sol·licitud HTTP (En teoria, amb un fitxer al cos de la sol·licitud. Per demostrar les capacitats de NIFI, en aquest exemple la sol·licitud inicia el procés de recepció d'un fitxer des de l'emmagatzematge de fitxers local). ), després enviem una resposta que s'ha rebut la sol·licitud, paral·lelament el procés de recepció d'un fitxer de FH i després el procés de traslladar-lo per FTP a FH. Val la pena aclarir que els processos interactuen entre ells mitjançant l'anomenat flowFile. Aquesta és l'entitat base a NIFI que emmagatzema atributs i contingut. El contingut són les dades que es representen pel fitxer de flux. És a dir, a grans trets, si rebeu un fitxer d'un quadrat i el transferiu a un altre, el contingut serà el vostre fitxer.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Com podeu veure, aquesta imatge mostra el procés general. HandleHttpRequest: accepta peticions, ReplaceText: genera un cos de resposta, HandleHttpResponse: envia una resposta. FetchFile - rep un fitxer d'un emmagatzematge de fitxers, el transfereix al quadrat PutSftp - posa aquest fitxer a FTP, a l'adreça especificada. Ara més sobre aquest procés.

En aquest cas, la petició és el principi de tot. Vegem els seus paràmetres de configuració.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Tot aquí és bastant trivial amb l'excepció de StandardHttpContextMap: aquest és un tipus de servei que us permet enviar i rebre sol·licituds. Amb més detall i fins i tot amb exemples, podeu veure - aquí

A continuació, mirem els paràmetres de configuració de ReplaceText del quadrat. Val la pena parar atenció a ReplacementValue: això és el que es retornarà a l'usuari en forma de resposta. A la configuració podeu ajustar el nivell de registre, podeu veure els registres {on heu desempaquetat nifi}/nifi-1.9.2/logs, també hi ha paràmetres d'error/èxit; en funció d'aquests paràmetres podeu regular el procés en conjunt. . És a dir, en el cas d'un processament de text amb èxit, s'anomenarà el procés d'enviament d'una resposta a l'usuari i, en un altre cas, simplement registrarem el procés infructuós.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

No hi ha res especialment interessant a les propietats de HandleHttpResponse excepte l'estat quan es crea correctament una resposta.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Hem resolt la sol·licitud i la resposta; passem a rebre el fitxer i col·locar-lo al servidor FTP. FetchFile: rep un fitxer a la ruta especificada a la configuració i el passa al següent procés.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

I després el quadrat PutSftp: col·loca el fitxer a l'emmagatzematge de fitxers. Podem veure els paràmetres de configuració a continuació.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Val la pena parar atenció al fet que cada quadrat és un procés independent que s'ha de posar en marxa. Hem observat l'exemple més senzill que no requereix cap personalització complexa. A continuació, veurem el procés una mica més complicat, on escriurem una mica sobre els solcs.

Exemple més complex

El servei de transferència de dades al consumidor va resultar una mica més complicat a causa del procés de modificació del missatge SOAP. El procés general es mostra a la figura següent.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Aquí la idea tampoc és especialment complicada: vam rebre una sol·licitud del consumidor que necessitava dades, vam enviar una resposta que havia rebut un missatge, vam iniciar el procés de recepció del fitxer de resposta, després l'hem editat amb una certa lògica i després transferit el fitxer al consumidor en forma de missatge SOAP al servidor.

Crec que no cal tornar a descriure els quadrats que hem vist més amunt; passem directament als nous. Si necessiteu editar qualsevol fitxer i els quadrats de tipus ReplaceText normals no són adequats, haureu d'escriure el vostre propi script. Això es pot fer mitjançant el quadrat ExecuteGroogyScript. La seva configuració es presenta a continuació.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Hi ha dues opcions per carregar l'script en aquest quadrat. El primer és baixant un fitxer amb un script. El segon és inserint un script a scriptBody. Pel que jo sé, el quadrat executeScript admet diversos idiomes, un d'ells és groovy. Deceberé els desenvolupadors de Java: no podeu escriure scripts en Java en aquests quadrats. Per a aquells que realment ho vulguin, heu de crear el vostre propi quadrat personalitzat i afegir-lo al sistema NIFI. Tota aquesta operació va acompanyada d'un ball força llarg amb un tamborí, que no tractarem en aquest article. Vaig triar el llenguatge groovy. A continuació es mostra un script de prova que simplement actualitza gradualment l'identificador en un missatge SOAP. És important tenir en compte. Agafeu el fitxer de flowFile i l'actualitzeu, no oblideu que l'heu de tornar a posar allà, actualitzat. També val la pena assenyalar que no s'inclouen totes les biblioteques. Pot passar que encara hàgiu d'importar una de les biblioteques. Un altre inconvenient és que l'script d'aquest quadrat és bastant difícil de depurar. Hi ha una manera de connectar-se a la JVM NIFI i iniciar el procés de depuració. Personalment, vaig llançar una aplicació local i vaig simular rebre un fitxer de la sessió. També vaig fer depuració local. Els errors que apareixen en carregar un script són bastant fàcils per a Google i són escrits pel mateix NIFI al registre.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

De fet, aquí acaba la personalització del quadrat. A continuació, el fitxer actualitzat es transfereix al quadrat, que s'encarrega d'enviar el fitxer al servidor. A continuació es mostren la configuració d'aquest quadrat.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Descrivim el mètode pel qual es transmetrà un missatge SOAP. Escrivim on. A continuació, heu d'indicar que això és SOAP.

Apache NIFI: una breu visió general de les oportunitats a la pràctica

Afegiu diverses propietats com ara host i action (soapAction). Guardem i comprovem. Podeu veure més detalls sobre com enviar sol·licituds SOAP aquí

Hem analitzat diverses opcions per utilitzar processos NIFI. Com interactuen i quin és el seu benefici real? Els exemples considerats són de prova i són lleugerament diferents del que passa realment en combat. Espero que aquest article sigui una mica útil per als desenvolupadors. Gràcies per la vostra atenció. Si tens alguna pregunta, escriu. Intentaré respondre.

Font: www.habr.com

Afegeix comentari