Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Inleiding

Dit het so gebeur dat ek by my huidige werkplek met hierdie tegnologie kennis moes maak. Ek begin met 'n bietjie agtergrond. By die volgende saamtrek is ons span meegedeel dat ons 'n integrasie met bekende stelsel. Integrasie het beteken dat hierdie bekende stelsel vir ons versoeke via HTTP na 'n spesifieke eindpunt sou stuur, en, vreemd genoeg, sou ons antwoorde in die vorm van 'n SOAP-boodskap terugstuur. Alles blyk eenvoudig en onbenullig te wees. Wat hieruit volg is...

Taak

Skep 3 dienste. Die eerste een is die Database Update Service. Hierdie diens, by ontvangs van nuwe data van 'n derdeparty-stelsel, dateer die data in die databasis op en genereer 'n lêer in CSV-formaat om dit na die volgende stelsel oor te dra. Die eindpunt van die tweede diens word genoem - die FTP-vervoerdiens, wat die oorgedrade lêer ontvang, dit bekragtig en via FTP in die lêerberging plaas. Die derde diens - Data-oordragdiens aan die verbruiker, werk asynchronies met die eerste twee. Dit ontvang 'n versoek van 'n derdeparty eksterne stelsel om 'n lêer te ontvang, wat hierbo bespreek is, neem 'n klaargemaakte antwoordlêer, wysig dit (dateer die id, beskrywing, skakelToFile-velde op) en stuur 'n antwoord in die vorm van 'n SEEP boodskap. Dit wil sê, oor die algemeen is die prentjie soos volg: die eerste twee dienste begin hul werk eers wanneer die data vir opdatering aangekom het. Die derde diens werk voortdurend omdat daar baie verbruikers van inligting is, ongeveer 1000 versoeke vir data per minuut. Dienste is altyd beskikbaar en hul instansies is op verskillende omgewings geleë, soos toets, demo, preprod en prod. Hieronder is 'n diagram van hoe hierdie dienste werk. Laat ek dadelik verduidelik dat sommige besonderhede vereenvoudig word om onnodige kompleksiteit te vermy.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Tegniese verdieping

Toe ons 'n oplossing vir die probleem beplan het, het ons eers besluit om toepassings in java te maak deur die Spring-raamwerk, die Nginx-balanseerder, die Postgres-databasis en ander tegniese en nie baie tegniese dinge te gebruik nie. Aangesien die tyd om 'n tegniese oplossing te ontwikkel dit moontlik gemaak het om ander benaderings te oorweeg om hierdie probleem op te los, het die voorkoms geval op Apache NIFI-tegnologie, modieus in sekere kringe. Ek moet dadelik sê dat hierdie tegnologie ons toegelaat het om hierdie 3 dienste raak te sien. Hierdie artikel sal die ontwikkeling van 'n lêervervoerdiens en 'n data-oordragdiens aan die verbruiker beskryf, maar as die artikel inkom, sal ek oor die data-opdateringsdiens in die databasis skryf.

Wat is dit

NIFI is 'n verspreide argitektuur vir vinnige parallelle laai en verwerking van data, 'n groot aantal inproppe vir bronne en transformasies, konfigurasieweergawe en nog baie meer. 'n Goeie bonus is dat dit baie maklik is om te gebruik. Triviale prosesse soos getFile, sendHttpRequest en ander kan as vierkante voorgestel word. Elke vierkant verteenwoordig 'n sekere proses, waarvan die interaksie in die figuur hieronder gesien kan word. Meer gedetailleerde dokumentasie oor die proses-opstelling-interaksie word geskryf hier , vir diegene wat in Russies - hier. Die dokumentasie beskryf perfek hoe om NIFI uit te pak en uit te voer, asook hoe om prosesse te skep, dit is ook vierkante
Die idee om 'n artikel te skryf is gebore na 'n lang soektog en die strukturering van die inligting wat ontvang word in iets bewus, sowel as die begeerte om die lewe 'n bietjie makliker te maak vir toekomstige ontwikkelaars.

Voorbeeld

'n Voorbeeld van hoe vierkante met mekaar in wisselwerking is, word beskou. Die algemene skema is redelik eenvoudig: Ons ontvang 'n HTTP-versoek (In teorie, met 'n lêer in die versoekliggaam. Om die vermoëns van NIFI te demonstreer, in hierdie voorbeeld, begin die versoek die proses om 'n lêer van die plaaslike FH te verkry), dan stuur ons 'n antwoord terug dat die versoek ontvang is, parallel met die proses om 'n lêer van FH te bekom en dan die proses om dit via FTP na FH te skuif. Dit is die moeite werd om te verduidelik dat die prosesse met mekaar interaksie het deur die sogenaamde flowFile. Dit is die basiese entiteit in NIFI wat eienskappe en inhoud stoor. Inhoud - die data wat deur die stroomlêer verteenwoordig word. Dit wil sê, rofweg gesproke, as jy 'n lêer van een vierkant ontvang het en dit na 'n ander oordra, sal jou lêer die inhoud wees.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Soos u kan sien, toon hierdie prentjie die algehele proses. HandleHttpRequest - aanvaar versoeke, ReplaceText - genereer 'n antwoordliggaam, HandleHttpResponse - gee 'n antwoord. FetchFile - ontvang 'n lêer vanaf die lêerberging en dra dit oor na die PutSftp-vierkant - plaas hierdie lêer op FTP, by die gespesifiseerde adres. Nou meer oor hierdie proses.

In hierdie geval is versoek die begin van alles. Kom ons kyk na die konfigurasieopsies daarvan.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Alles hier is redelik triviaal, met die uitsondering van StandartHttpContextMap - dit is 'n soort diens waarmee u versoeke kan stuur en ontvang. Vir meer besonderhede en selfs met voorbeelde, kan jy sien - hier

Kom ons kyk dan na die ReplaceText-vierkant-konfigurasie-opsies. Dit is die moeite werd om aandag te skenk aan Vervangingswaarde - dit is wat as 'n antwoord aan die gebruiker teruggestuur sal word. In instellings kan jy die aantekenvlak aanpas, jy kan die logs sien {waar nifi uitgepak is}/nifi-1.9.2/logs, daar is ook mislukking / sukses parameters - gebaseer op hierdie parameters, kan jy die proses beheer as 'n hele. Dit wil sê, in die geval van suksesvolle teksverwerking, sal die proses om 'n antwoord aan die gebruiker te stuur, opgeroep word, en in die ander geval belowe ons bloot die onsuksesvolle proses.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Daar is niks besonder interessant in die HandleHttpResponse-eienskappe nie, behalwe vir die status toe die antwoord suksesvol geskep is.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Ons het die versoekreaksie uitgepluis - kom ons gaan voort om die lêer te ontvang en dit op die FTP-bediener te plaas. FetchFile - ontvang 'n lêer vanaf die pad wat in die instellings gespesifiseer is en dra dit oor na die volgende proses.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

En dan die vierkantige PutSftp - plaas die lêer in die lêerberging. Die konfigurasie-opsies kan hieronder gesien word.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Dit is die moeite werd om aandag te skenk aan die feit dat elke vierkant 'n aparte proses is wat van stapel gestuur moet word. Ons het die eenvoudigste voorbeeld oorweeg wat geen ingewikkelde aanpassing vereis nie. Vervolgens sal ons die proses 'n bietjie meer ingewikkeld beskou, waar ons 'n bietjie oor die groewe sal skryf.

Meer komplekse voorbeeld

Die data-oordragdiens aan die verbruiker word 'n bietjie meer ingewikkeld gemaak deur die proses om die SOAP-boodskap te wysig. Die algehele proses word in die figuur hieronder getoon.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Hier is die idee ook nie baie ingewikkeld nie: ons het 'n versoek van die verbruiker ontvang dat hy data benodig, 'n antwoord gestuur dat hy 'n boodskap ontvang het, die proses begin om die antwoordlêer te ontvang, dit dan met sekere logika geredigeer en toe oorgedra die lêer aan die verbruiker in die vorm van 'n SOAP-boodskap aan die bediener.

Ek dink dit is nie die moeite werd om weer daardie blokkies wat ons hierbo gesien het te beskryf nie - kom ons gaan dadelik aan na die nuwes. As jy enige lêer moet redigeer en gewone blokkies soos ReplaceText is nie geskik nie, sal jy jou eie skrif moet skryf. Dit kan gedoen word deur die ExecuteGroogyScript-vierkant te gebruik. Die instellings daarvan word hieronder getoon.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Daar is twee opsies om die skrif in hierdie blokkie te laai. Die eerste is deur 'n lêer met 'n skrif op te laai. Die tweede is deur die skrif in die scriptBody in te voeg. Sover ek weet, ondersteun die executeScript-vierkant verskeie PL's - een van hulle is groovy. Ek sal java-ontwikkelaars teleurstel – jy kan nie skrifte in java in sulke blokkies skryf nie. Vir diegene wat regtig wil, moet jy jou eie pasgemaakte vierkant skep en dit in die NIFI-stelsel gooi. Hierdie hele operasie word vergesel deur nogal lang danse met 'n tamboeryn, wat ons nie in hierdie artikel sal behandel nie. Ek het die groovy taal gekies. Hieronder is 'n toetsskrif wat eenvoudig die ID in die SOAP-boodskap inkrementeel opdateer. Dit is belangrik om daarop te let. Jy neem 'n lêer van flowFile en werk dit op, moenie vergeet dat jy dit nodig het nie, opgedateer, sit dit daar terug. Dit is ook opmerklik dat nie alle biblioteke gekoppel is nie. Dit kan blyk dat jy nog steeds een van die libs moet invoer. Die nadeel is dat die skrif in hierdie vierkant redelik moeilik is om te ontfout. Daar is 'n manier om aan die NIFI JVM te koppel en die ontfoutingsproses te begin. Persoonlik het ek 'n plaaslike toepassing bestuur en gesimuleer om 'n lêer van die sessie af te kry. Ontfouting is ook plaaslik gedoen. Foute wat opduik wanneer die skrip gelaai word, is redelik maklik om te google en word deur NIFI self na die log geskryf.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Eintlik is dit waar die aanpassing van die vierkant eindig. Vervolgens word die opgedateerde lêer oorgedra na die vierkant wat die lêer na die bediener stuur. Hieronder is die instellings vir hierdie vierkant.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Ons beskryf die metode waardeur die SOAP-boodskap versend sal word. Ons skryf waar. Vervolgens moet jy spesifiseer dat dit SOAP is.

Apache NIFI - 'n Kort oorsig van geleenthede in die praktyk

Ons voeg 'n paar eienskappe by soos gasheer en aksie (soapAction). Stoor, kyk. Vir meer besonderhede oor hoe om SOAP-versoeke te stuur, sien hier

Ons het verskeie opsies oorweeg vir die gebruik van NIFI-prosesse. Hoe werk hulle in wisselwerking en wat is die werklike voordele daarvan. Die oorwoë voorbeelde is toetse en verskil effens van wat werklik in gevegte is. Ek hoop dat hierdie artikel vir ontwikkelaars nuttig sal wees. Dankie vir jou aandag. As jy enige vrae het - skryf. Ek sal probeer antwoord.

Bron: will.com

Voeg 'n opmerking