Apache NIFI - O scurtă prezentare a oportunităților în practică

Introducere

S-a întâmplat că la locul meu actual de muncă a trebuit să mă familiarizez cu această tehnologie. Voi începe cu un mic fundal. La următoarea întâlnire, echipei noastre i sa spus că trebuie să creăm integrare cu sistem cunoscut. Prin integrare s-a înțeles că acest sistem bine-cunoscut ne va trimite solicitări prin HTTP către un anumit punct final, iar noi, destul de ciudat, vom trimite înapoi răspunsuri sub forma unui mesaj SOAP. Totul pare simplu și banal. De aici rezultă că aveți nevoie de...

Sarcină

Creați 3 servicii. Primul dintre ele este Serviciul de actualizare a bazei de date. Acest serviciu, când sosesc date noi de la un sistem terț, actualizează datele din baza de date și generează un fișier în format CSV pentru a-l transfera pe următorul sistem. Punctul final al celui de-al doilea serviciu este numit - Serviciul de transport FTP, care primește fișierul transferat, îl validează și îl pune în stocarea fișierelor prin FTP. Al treilea serviciu, serviciul de transfer de date pentru consumatori, funcționează asincron cu primele două. Primește o solicitare de la un sistem extern terț pentru a primi fișierul discutat mai sus, ia fișierul de răspuns gata, îl modifică (actualizează id-ul, descrierea, câmpurile linkToFile) și trimite răspunsul sub forma unui mesaj SOAP. Adică, imaginea de ansamblu este următoarea: primele două servicii își încep activitatea numai când au sosit datele pentru actualizare. Al treilea serviciu funcționează constant pentru că sunt mulți consumatori de informații, aproximativ 1000 de solicitări de date pe minut. Serviciile sunt disponibile în mod constant și instanțele lor sunt situate în medii diferite, cum ar fi testare, demonstrație, pre-producție și producție. Mai jos este o diagramă a modului în care funcționează aceste servicii. Permiteți-mi să clarific imediat că unele detalii au fost simplificate pentru a evita complexitatea inutilă.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Aprofundare tehnică

Când planificam o soluție la problemă, am decis mai întâi să facem aplicații în Java folosind framework-ul Spring, Nginx balancer, baza de date Postgres și alte lucruri tehnice și nu atât de tehnice. Întrucât timpul dezvoltării unei soluții tehnice ne-a permis să luăm în considerare și alte abordări pentru rezolvarea acestei probleme, privirea ne-a căzut asupra tehnologiei Apache NIFI, care este la modă în anumite cercuri. Voi spune imediat că această tehnologie ne-a permis să observăm aceste 3 servicii. Acest articol va descrie dezvoltarea unui serviciu de transport de fișiere și a unui serviciu de transfer de date către consumator, dar dacă articolul este util, voi scrie despre serviciul de actualizare a datelor din baza de date.

Ce este acesta

NIFI este o arhitectură distribuită pentru încărcarea și procesarea rapidă în paralel a datelor, un număr mare de plugin-uri pentru surse și transformări, versiunea configurațiilor și multe altele. Un bonus frumos este că este foarte ușor de utilizat. Procesele banale precum getFile, sendHttpRequest și altele pot fi reprezentate ca pătrate. Fiecare pătrat reprezintă un proces, a cărui interacțiune poate fi văzută în figura de mai jos. S-a scris o documentație mai detaliată despre interacțiunile de configurare a procesului aici , pentru cei care vorbesc rusa - aici. Documentația descrie perfect cum să despachetați și să rulați NIFI, precum și cum să creați procese, cunoscute și sub numele de pătrate
Ideea de a scrie un articol s-a născut după o lungă căutare și structurare a informațiilor primite în ceva conștient, precum și dorința de a face viața un pic mai ușoară viitorilor dezvoltatori.

Exemplu

Este luat în considerare un exemplu de modul în care pătratele interacționează între ele. Schema generală este destul de simplă: Primim o solicitare HTTP (În teorie, cu un fișier în corpul cererii. Pentru a demonstra capabilitățile NIFI, în acest exemplu solicitarea începe procesul de primire a unui fișier din stocarea locală a fișierelor ), apoi trimitem înapoi un răspuns că cererea a fost primită, în paralel procesul de primire a unui fișier de la FH și apoi procesul de mutare prin FTP la FH. Merită să clarificăm faptul că procesele interacționează între ele prin așa-numitul flowFile. Aceasta este entitatea de bază în NIFI care stochează atribute și conținut. Conținutul este datele care sunt reprezentate de fișierul flux. Adică, aproximativ vorbind, dacă primești un fișier dintr-un pătrat și îl transferi în altul, conținutul va fi fișierul tău.

Apache NIFI - O scurtă prezentare a oportunităților în practică

După cum puteți vedea, această imagine arată procesul general. HandleHttpRequest - acceptă cereri, ReplaceText - generează un corp de răspuns, HandleHttpResponse - trimite un răspuns. FetchFile - primește un fișier dintr-un fișier de stocare, îl transferă în pătratul PutSftp - pune acest fișier pe FTP, la adresa specificată. Acum mai multe despre acest proces.

În acest caz, cererea este începutul tuturor. Să ne uităm la parametrii săi de configurare.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Totul aici este destul de banal, cu excepția StandardHttpContextMap - acesta este un fel de serviciu care vă permite să trimiteți și să primiți cereri. Mai detaliat și chiar cu exemple, puteți vedea - aici

În continuare, să ne uităm la parametrii de configurare ReplaceText ai pătratului. Merită să acordați atenție ReplacementValue - acesta este ceea ce va fi returnat utilizatorului sub forma unui răspuns. În setări puteți regla nivelul de înregistrare, puteți vedea jurnalele {unde ați despachetat nifi}/nifi-1.9.2/logs, există și parametri de eșec/reușit - pe baza acestor parametri puteți regla procesul în ansamblu . Adică, în cazul procesării cu succes a textului, procesul de trimitere a unui răspuns către utilizator va fi apelat, iar într-un alt caz vom înregistra pur și simplu procesul nereușit.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Nu există nimic deosebit de interesant în proprietățile HandleHttpResponse, cu excepția stării când un răspuns este creat cu succes.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Am rezolvat cererea și răspunsul - să trecem la primirea fișierului și plasarea lui pe serverul FTP. FetchFile - primește un fișier pe calea specificată în setări și îl transmite procesului următor.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Și apoi pătratul PutSftp - plasează fișierul în stocarea fișierelor. Putem vedea mai jos parametrii de configurare.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Merită să acordați atenție faptului că fiecare pătrat este un proces separat care trebuie lansat. Ne-am uitat la cel mai simplu exemplu care nu necesită personalizare complexă. În continuare, ne vom uita la procesul puțin mai complicat, unde vom scrie puțin pe caneluri.

Exemplu mai complex

Serviciul de transfer de date către consumator s-a dovedit a fi puțin mai complicat din cauza procesului de modificare a mesajului SOAP. Procesul general este prezentat în figura de mai jos.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Nici aici ideea nu este deosebit de complicată: am primit o solicitare de la consumator că are nevoie de date, am trimis un răspuns că a primit un mesaj, am început procesul de primire a fișierului de răspuns, apoi l-am editat cu o anumită logică și apoi a transferat fișierul către consumator sub forma unui mesaj SOAP către server.

Cred că nu este nevoie să descriem din nou acele pătrate pe care le-am văzut mai sus - să trecem direct la cele noi. Dacă trebuie să editați orice fișier și pătratele obișnuite de tip ReplaceText nu sunt potrivite, va trebui să vă scrieți propriul script. Acest lucru se poate face folosind pătratul ExecuteGroogyScript. Setările sale sunt prezentate mai jos.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Există două opțiuni pentru încărcarea scriptului în acest pătrat. Primul este prin descărcarea unui fișier cu un script. Al doilea este prin inserarea unui script în scriptBody. Din câte știu, pătratul executeScript acceptă mai multe limbaje - unul dintre ele este groovy. Voi dezamăgi dezvoltatorii java - nu puteți scrie scripturi în java în astfel de pătrate. Pentru cei care doresc cu adevărat, trebuie să vă creați propriul pătrat personalizat și să îl adăugați în sistemul NIFI. Toată această operațiune este însoțită de un dans destul de lung cu tamburina, de care nu ne vom ocupa în acest articol. Am ales limbajul groovy. Mai jos este un script de testare care pur și simplu actualizează în mod incremental id-ul într-un mesaj SOAP. Este important de remarcat. Luați fișierul din flowFile și îl actualizați, nu uitați că trebuie să îl puneți înapoi acolo, actualizat. De asemenea, este de remarcat faptul că nu toate bibliotecile sunt incluse. Se poate întâmpla să fie încă necesar să importați una dintre biblioteci. Un alt dezavantaj este că scriptul din acest pătrat este destul de greu de depanat. Există o modalitate de a vă conecta la NIFI JVM și de a începe procesul de depanare. Personal, am lansat o aplicație locală și am simulat primirea unui fișier din sesiune. Am făcut și depanare local. Erorile care apar la încărcarea unui script sunt destul de ușoare pentru Google și sunt scrise chiar de NIFI în jurnal.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

De fapt, aici se termină personalizarea pătratului. Apoi, fișierul actualizat este transferat în pătrat, care este responsabil pentru trimiterea fișierului către server. Mai jos sunt setările pentru acest pătrat.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Descriem metoda prin care va fi transmis un mesaj SOAP. Scriem unde. Apoi trebuie să indicați că acesta este SOAP.

Apache NIFI - O scurtă prezentare a oportunităților în practică

Adăugați mai multe proprietăți, cum ar fi gazdă și acțiune (soapAction). Salvăm și verificăm. Puteți vedea mai multe detalii despre cum să trimiteți solicitări SOAP aici

Am analizat mai multe opțiuni pentru utilizarea proceselor NIFI. Cum interacționează ei și care este beneficiul lor real? Exemplele luate în considerare sunt cele de testare și sunt ușor diferite de ceea ce se întâmplă de fapt în luptă. Sper că acest articol va fi puțin util pentru dezvoltatori. Vă mulțumim pentru atenție. Dacă aveți întrebări, scrieți. Voi încerca să răspund.

Sursa: www.habr.com

Adauga un comentariu