Apache NIFI - Una breve panoramica delle opportunità nella pratica

Introduzione

È successo così che nel mio attuale posto di lavoro ho dovuto familiarizzare con questa tecnologia. Inizierò con un po' di background. Nella riunione successiva, al nostro team è stato detto che dovevamo creare integrazione sistema conosciuto. Per integrazione si intendeva che questo noto sistema ci avrebbe inviato richieste tramite HTTP a un endpoint specifico e noi, stranamente, avremmo rispedito le risposte sotto forma di messaggio SOAP. Tutto sembra semplice e banale. Da ciò ne consegue che è necessario...

Compito

Crea 3 servizi. Il primo di questi è il servizio di aggiornamento del database. Questo servizio, quando arrivano nuovi dati da un sistema di terze parti, aggiorna i dati nel database e genera un file in formato CSV per trasferirlo al sistema successivo. L'endpoint del secondo servizio è chiamato Servizio di trasporto FTP, che riceve il file trasferito, lo convalida e lo inserisce nell'archivio file tramite FTP. Il terzo servizio, il servizio di trasferimento dati del consumatore, funziona in modo asincrono con i primi due. Riceve una richiesta da un sistema esterno di terze parti per ricevere il file discusso sopra, prende il file di risposta pronto, lo modifica (aggiorna i campi id, descrizione, linkToFile) e invia la risposta sotto forma di messaggio SOAP. Cioè, il quadro generale è il seguente: i primi due servizi iniziano a funzionare solo quando arrivano i dati per l'aggiornamento. Il terzo servizio funziona costantemente perché i consumatori di informazioni sono molti, circa 1000 richieste di dati al minuto. I servizi sono disponibili costantemente e le loro istanze si trovano in ambienti diversi, come test, demo, pre-produzione e prod. Di seguito è riportato uno schema di come funzionano questi servizi. Chiarisco subito che alcuni dettagli sono stati semplificati per evitare inutili complessità.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Approfondimento tecnico

Quando pianificavamo una soluzione al problema, abbiamo innanzitutto deciso di realizzare applicazioni in Java utilizzando il framework Spring, il bilanciatore Nginx, il database Postgres e altre cose tecniche e non così tecniche. Poiché il momento in cui abbiamo sviluppato una soluzione tecnica ci ha permesso di considerare altri approcci per risolvere questo problema, il nostro sguardo è caduto sulla tecnologia Apache NIFI, che è di moda in alcuni ambienti. Dirò subito che questa tecnologia ci ha permesso di notare questi 3 servizi. Questo articolo descriverà lo sviluppo di un servizio di trasporto file e di un servizio di trasferimento dati al consumatore, ma se l'articolo sarà utile scriverò del servizio di aggiornamento dei dati nel database.

Che cos'è questo

NIFI è un'architettura distribuita per il caricamento e l'elaborazione parallela rapida dei dati, un gran numero di plug-in per origini e trasformazioni, controllo delle versioni delle configurazioni e molto altro. Un bel vantaggio è che è molto facile da usare. Processi banali come getFile, sendHttpRequest e altri possono essere rappresentati come quadrati. Ogni quadrato rappresenta un processo, la cui interazione può essere vista nella figura seguente. È stata scritta una documentazione più dettagliata sulle interazioni di impostazione del processo qui , per chi parla russo - qui. La documentazione descrive perfettamente come decomprimere ed eseguire NIFI, nonché come creare processi, noti anche come quadrati
L'idea di scrivere un articolo è nata dopo una lunga ricerca e strutturazione delle informazioni ricevute in qualcosa di consapevole, nonché dal desiderio di rendere la vita un po' più facile ai futuri sviluppatori..

esempio

Viene considerato un esempio di come i quadrati interagiscono tra loro. Lo schema generale è abbastanza semplice: riceviamo una richiesta HTTP (in teoria, con un file nel corpo della richiesta. Per dimostrare le capacità di NIFI, in questo esempio la richiesta avvia il processo di ricezione di un file dall'archivio file locale ), quindi inviamo una risposta che la richiesta è stata ricevuta, parallelamente al processo di ricezione di un file da FH e quindi al processo di spostamento tramite FTP a FH. È bene chiarire che i processi interagiscono tra loro attraverso il cosiddetto flowFile. Questa è l'entità di base in NIFI che memorizza attributi e contenuti. Il contenuto è i dati rappresentati dal file di flusso. Cioè, grosso modo, se ricevi un file da un quadrato e lo trasferisci in un altro, il contenuto sarà il tuo file.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Come puoi vedere, questa immagine mostra il processo generale. HandleHttpRequest: accetta richieste, replaceText: genera un corpo della risposta, HandleHttpResponse: invia una risposta. FetchFile - riceve un file da un archivio file, lo trasferisce sul quadrato PutSftp - mette questo file su FTP, all'indirizzo specificato. Ora di più su questo processo.

In questo caso la richiesta è l'inizio di tutto. Diamo un'occhiata ai suoi parametri di configurazione.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Tutto qui è abbastanza banale ad eccezione di StandardHttpContextMap: questo è un tipo di servizio che ti consente di inviare e ricevere richieste. Più in dettaglio e anche con esempi, puoi vedere: qui

Successivamente, diamo un'occhiata ai parametri di configurazione replaceText del quadrato. Vale la pena prestare attenzione al valore di sostituzione: questo è ciò che verrà restituito all'utente sotto forma di risposta. Nelle impostazioni puoi regolare il livello di registrazione, puoi vedere i log {dove hai decompresso nifi}/nifi-1.9.2/logs, ci sono anche parametri di fallimento/successo - in base a questi parametri puoi regolare il processo nel suo complesso . Cioè, in caso di elaborazione del testo riuscita, verrà chiamato il processo di invio di una risposta all'utente e in un altro caso registreremo semplicemente il processo non riuscito.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Non c'è nulla di particolarmente interessante nelle proprietà HandleHttpResponse tranne lo stato quando una risposta viene creata con successo.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Abbiamo risolto la richiesta e la risposta: passiamo alla ricezione del file e al posizionamento sul server FTP. FetchFile: riceve un file nel percorso specificato nelle impostazioni e lo passa al processo successivo.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

E poi il quadrato PutSftp: inserisce il file nell'archivio file. Di seguito possiamo vedere i parametri di configurazione.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Vale la pena prestare attenzione al fatto che ogni quadrato è un processo separato che deve essere avviato. Abbiamo esaminato l'esempio più semplice che non richiede alcuna personalizzazione complessa. Successivamente, esamineremo il processo un po’ più complicato, dove scriveremo un po’ sui solchi.

Esempio più complesso

Il servizio di trasferimento dati al consumatore si è rivelato un po' più complicato a causa del processo di modifica del messaggio SOAP. Il processo generale è mostrato nella figura seguente.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Anche qui l'idea non è particolarmente complicata: abbiamo ricevuto una richiesta dal consumatore che aveva bisogno di dati, abbiamo inviato una risposta che aveva ricevuto un messaggio, abbiamo avviato il processo di ricezione del file di risposta, quindi lo abbiamo modificato con una certa logica, quindi ha trasferito il file al consumatore sotto forma di messaggio SOAP al server.

Penso che non sia necessario descrivere nuovamente i quadrati che abbiamo visto sopra: passiamo direttamente a quelli nuovi. Se devi modificare un file e i normali quadrati di tipo Sostituisci testo non sono adatti, dovrai scrivere il tuo script. Questo può essere fatto utilizzando il quadrato ExecuteGroogyScript. Le sue impostazioni sono presentate di seguito.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Ci sono due opzioni per caricare lo script in questo quadrato. Il primo è scaricare un file con uno script. Il secondo consiste nell'inserire uno script in scriptBody. Per quanto ne so, l'executiveScript Square supporta diverse lingue, una delle quali è fantastica. Deluderò gli sviluppatori Java: non puoi scrivere script in Java in tali quadrati. Per chi lo desidera davvero, è necessario creare il proprio quadrato personalizzato e aggiungerlo al sistema NIFI. Tutta questa operazione è accompagnata da una danza piuttosto lunga con il tamburello, di cui non parleremo in questo articolo. Ho scelto il linguaggio groovy. Di seguito è riportato uno script di test che aggiorna semplicemente in modo incrementale l'ID in un messaggio SOAP. È importante notare. Prendi il file da flowFile e aggiornalo, non dimenticare che devi rimetterlo lì, aggiornato. Vale anche la pena notare che non tutte le librerie sono incluse. Può succedere che tu debba ancora importare una delle librerie. Un altro svantaggio è che lo script in questo quadrato è piuttosto difficile da eseguire il debug. Esiste un modo per connettersi alla JVM NIFI e avviare il processo di debug. Personalmente, ho lanciato un'applicazione locale e ho simulato la ricezione di un file dalla sessione. Ho anche eseguito il debug localmente. Gli errori che compaiono durante il caricamento di uno script sono abbastanza facili per Google e vengono scritti da NIFI stessa nel registro.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

In realtà è qui che finisce la personalizzazione del quadrato. Successivamente, il file aggiornato viene trasferito a Square, che è responsabile dell'invio del file al server. Di seguito sono riportate le impostazioni per questo quadrato.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Descriviamo il metodo con cui verrà trasmesso un messaggio SOAP. Scriviamo dove. Successivamente è necessario indicare che si tratta di SOAP.

Apache NIFI - Una breve panoramica delle opportunità nella pratica

Aggiungi diverse proprietà come host e azione (soapAction). Salviamo e controlliamo. Puoi vedere maggiori dettagli su come inviare richieste SOAP qui

Abbiamo esaminato diverse opzioni per l'utilizzo dei processi NIFI. Come interagiscono e qual è il loro reale vantaggio? Gli esempi considerati sono di prova e sono leggermente diversi da ciò che realmente accade in combattimento. Spero che questo articolo sia un po' utile per gli sviluppatori. Grazie per l'attenzione. Se hai domande, scrivi. Proverò a rispondere.

Fonte: habr.com

Aggiungi un commento