Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Ynlieding

It barde sa dat ik op myn hjoeddeistige wurkplak yn 'e kunde komme moast mei dizze technology. Ik sil begjinne mei in bytsje eftergrûn. By de folgjende gearkomste waard ús team ferteld dat wy yntegraasje moatte meitsje mei bekend systeem. Mei yntegraasje waard bedoeld dat dit bekende systeem ús fersiken fia HTTP nei in spesifyk einpunt stjoere soe, en wy, frjemd genôch, antwurden weromstjoere yn 'e foarm fan in SOAP-berjocht. Alles liket ienfâldich en triviaal. Dêrút folget dat jo nedich hawwe ...

Objective

Meitsje 3 tsjinsten. De earste fan harren is de Database Update Service. Dizze tsjinst, as nije gegevens oankomme fan in systeem fan tredden, fernijt de gegevens yn 'e databank en genereart in bestân yn CSV-formaat om it oer te setten nei it folgjende systeem. It einpunt fan 'e twadde tsjinst wurdt neamd - de FTP-ferfiertsjinst, dy't it oerdroegen bestân ûntfangt, validearret it en set it yn bestân opslach fia FTP. De tredde tsjinst, de tsjinst foar oerdracht fan konsumintgegevens, wurket asynchronysk mei de earste twa. It ûntfangt in fersyk fan in ekstern systeem fan tredden om it hjirboppe besprutsen bestân te ûntfangen, nimt it kleare antwurdbestân, feroaret it (bywurket de id, beskriuwing, linkToFile-fjilden) en stjoert it antwurd yn 'e foarm fan in SOAP-berjocht. Dat is, it algemiene byld is as folget: de earste twa tsjinsten begjinne har wurk pas as de gegevens foar it bywurkjen binne oankommen. De tredde tsjinst wurket konstant om't d'r in protte konsuminten fan ynformaasje binne, sawat 1000 oanfragen foar gegevens per minuut. Tsjinsten binne konstant beskikber en har eksimplaren lizze yn ferskate omjouwings, lykas test, demo, pre-produksje en prod. Hjirûnder is in diagram fan hoe't dizze tsjinsten wurkje. Lit my fuortdaliks dúdlik meitsje dat guon details binne ferienfâldige om ûnnedige kompleksiteit te foarkommen.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Technyske ferdjipjen

By it plannen fan in oplossing foar it probleem, hawwe wy earst besletten om applikaasjes yn Java te meitsjen mei it Spring framework, Nginx balancer, Postgres database en oare technyske en net sa technyske dingen. Sûnt de tiid om in technyske oplossing te ûntwikkeljen koe ús oare oanpakken beskôgje foar it oplossen fan dit probleem, foel ús blik op 'e Apache NIFI-technology, dy't yn bepaalde rûnten modieus is. Ik sil daliks sizze dat dizze technology ús dizze 3 tsjinsten koe opmerke. Dit artikel sil de ûntwikkeling beskriuwe fan in tsjinst foar bestânferfier en in tsjinst foar gegevensferfier foar de konsumint, mar as it artikel nuttich is, sil ik skriuwe oer de tsjinst foar it bywurkjen fan gegevens yn 'e databank.

Wat is it?

NIFI is in ferspraat arsjitektuer foar fluch parallel laden en ferwurkjen fan gegevens, in grut oantal plugins foar boarnen en transformaasjes, ferzje fan konfiguraasjes en folle mear. In moaie bonus is dat it is hiel maklik te brûken. Triviale prosessen lykas getFile, sendHttpRequest en oaren kinne wurde fertsjintwurdige as fjouwerkanten. Elk plein stiet foar in proses, wêrfan de ynteraksje kin sjoen wurde yn 'e figuer hjirûnder. Mear detaillearre dokumintaasje oer proses opset ynteraksjes is skreaun hjir , foar dyjingen dy't Russysk prate - hjir. De dokumintaasje beskriuwt perfekt hoe't jo NIFI útpakke en útfiere, lykas hoe't jo prosessen kinne oanmeitsje, ek wol kwadraten neamd
It idee om in artikel te skriuwen waard berne nei in lange syktocht en strukturearjen fan de ûntfongen ynformaasje yn wat bewust, lykas de winsk om it libben in bytsje makliker te meitsjen foar takomstige ûntwikkelders.

Foarbyld:

In foarbyld fan hoe't kwadraten mei-inoar omgeane wurdt beskôge. It algemiene skema is frij simpel: wy ûntfange in HTTP-fersyk (Yn teory, mei in bestân yn it lichem fan 'e oanfraach. Om de mooglikheden fan NIFI te demonstrearjen, yn dit foarbyld begjint it fersyk it proses fan it ûntfangen fan in bestân fan 'e lokale bestân opslach ), dan stjoere wy in antwurd werom dat it fersyk ûntfongen is, parallel it proses fan it ûntfangen fan in bestân fan FH en dan it proses fan it ferpleatsen fia FTP nei FH. It is de muoite wurdich om te ferdúdlikjen dat prosessen mei-inoar ynteraksje fia de saneamde flowFile. Dit is de basis-entiteit yn NIFI dy't attributen en ynhâld opslacht. Ynhâld is de gegevens dy't wurdt fertsjintwurdige troch de stream triem. Dat is, rûchwei sprutsen, as jo in bestân ûntfange fan ien plein en it oerdrage nei in oar, sil de ynhâld jo bestân wêze.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Sa't jo sjen kinne, toant dizze foto it algemiene proses. HandleHttpRequest - akseptearret fersiken, ReplaceText - genereart in antwurd lichem, HandleHttpResponse - stjoert in antwurd. FetchFile - ûntfangt in bestân fan in bestân opslach, bringt it oer nei it fjouwerkante PutSftp - set dizze bestân op FTP, op it oantsjutte adres. No mear oer dit proses.

Yn dit gefal is fersyk it begjin fan alles. Litte wy nei har konfiguraasjeparameters sjen.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Alles hjir is frij triviaal mei útsûndering fan StandardHttpContextMap - dit is in soarte fan tsjinst wêrmei jo te ferstjoeren en ûntfange fersiken. Yn mear detail en sels mei foarbylden kinne jo sjen - hjir

Litte wy dan nei de ReplaceText-konfiguraasjeparameters fan it plein sjen. It is de muoite wurdich omtinken te jaan oan ReplacementValue - dit is wat sil wurde weromjûn oan de brûker yn 'e foarm fan in antwurd. Yn ynstellings kinne jo it nivo fan logging oanpasse, jo kinne de logs sjen {wêr't jo nifi útpakt hawwe}/nifi-1.9.2/logs, d'r binne ek parameters foar mislearring / sukses - basearre op dizze parameters kinne jo it proses as gehiel regelje . Dat is, yn it gefal fan suksesfolle tekstferwurking sil it proses fan it ferstjoeren fan in antwurd oan 'e brûker wurde neamd, en yn in oar gefal sille wy gewoan it mislearre proses oanmelde.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

D'r is neat spesjaal ynteressant yn 'e HandleHttpResponse-eigenskippen, útsein de status as in antwurd mei súkses makke is.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Wy hawwe it fersyk en antwurd sorteare - litte wy trochgean nei it ûntfangen fan it bestân en it pleatsen op 'e FTP-tsjinner. FetchFile - ûntfangt in bestân op it paad spesifisearre yn 'e ynstellings en jout it troch nei it folgjende proses.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

En dan it PutSftp-plein - pleatst it bestân yn 'e triem opslach. Wy kinne de konfiguraasjeparameters hjirûnder sjen.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

It is it wurdich omtinken te jaan oan it feit dat elk plein in apart proses is dat moat wurde lansearre. Wy seagen nei it ienfâldichste foarbyld dat gjin komplekse oanpassing fereasket. Folgjende, wy sille sjen nei it proses in bytsje mear komplisearre, dêr't wy sille skriuwe in bytsje op 'e grooves.

Mear komplekse foarbyld

De tsjinst foar gegevensoerdracht oan 'e konsumint die bliken in bytsje yngewikkelder te wêzen fanwege it proses fan it feroarjen fan it SOAP-berjocht. It algemiene proses wurdt werjûn yn 'e figuer hjirûnder.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Hjir is it idee ek net bysûnder yngewikkeld: wy krigen in fersyk fan 'e konsumint dat hy gegevens nedich wie, stjoerde in antwurd dat hy in berjocht krige, begon it proses fan it ûntfangen fan it antwurdbestân, bewurke it dan mei in bepaalde logika, en dan it bestân oerbrocht nei de konsumint yn 'e foarm fan in SOAP-berjocht nei de tsjinner.

Ik tink dat it net nedich is om dizze pleinen dy't wy hjirboppe seagen nochris te beskriuwen - litte wy direkt nei de nije gean. As jo ​​in bestân bewurkje moatte en gewoane fjilden fan it type ReplaceText binne net geskikt, dan moatte jo jo eigen skript skriuwe. Dit kin dien wurde mei it ExecuteGroogyScript-plein. Syn ynstellingen wurde hjirûnder presintearre.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

D'r binne twa opsjes foar it laden fan it skript yn dit plein. De earste is troch it downloaden fan in bestân mei in skript. De twadde is troch it ynfoegjen fan in skript yn scriptBody. Foar safier't ik wit, stipet it executeScript-plein ferskate talen - ien fan har is groovy. Ik sil java-ûntwikkelders teloarstelle - jo kinne gjin skripts yn java skriuwe yn sokke fjilden. Foar dyjingen dy't echt wolle, moatte jo jo eigen oanpaste plein meitsje en it tafoegje oan it NIFI-systeem. Dizze hiele operaasje wurdt begelaat troch in frij lange dûns mei in tamboerine, dy't wy net sille omgean yn dit artikel. Ik keas foar de groovy taal. Hjirûnder is in testskript dat de id gewoan inkrementeel bywurket yn in SOAP-berjocht. It is wichtich om te notearjen. Jo nimme it bestân fan flowFile en aktualisearje it, ferjit net dat jo it dêr werom moatte pleatse, bywurke. It is ek de muoite wurdich op te merken dat net alle bibleteken opnommen binne. It kin barre dat jo noch ien fan 'e libs moatte ymportearje. In oar nadeel is dat it skript op dit plein frijwat lestich is om te debuggen. D'r is in manier om te ferbinen mei de NIFI JVM en it debuggenproses te begjinnen. Persoanlik lansearre ik in lokale applikaasje en simulearre it ûntfangen fan in bestân fan 'e sesje. Ik die ek debuggen lokaal. Flaters dy't ferskine by it laden fan in skript binne frij maklik foar Google en wurde skreaun troch NIFI sels nei it log.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Eins, dit is wêr't de oanpassing fan it plein einiget. Dêrnei wurdt it bywurke bestân oerbrocht nei it plein, dat ferantwurdlik is foar it ferstjoeren fan it bestân nei de tsjinner. Hjirûnder binne de ynstellingen foar dit plein.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Wy beskriuwe de metoade wêrmei't in SOAP-berjocht sil wurde ferstjoerd. Wy skriuwe wêr. Folgjende moatte jo oanjaan dat dit SOAP is.

Apache NIFI - In koart oersjoch fan funksjes yn 'e praktyk

Foegje ferskate eigenskippen ta lykas host en aksje (soapAction). Wy bewarje en kontrolearje. Jo kinne mear details sjen oer hoe't jo SOAP-oanfragen ferstjoere hjir

Wy seagen ferskate opsjes foar it brûken fan NIFI-prosessen. Hoe omgean se en wat is har echte foardiel? De beskôge foarbylden binne test en binne wat oars as wat der eins bart yn 'e striid. Ik hoopje dat dit artikel in bytsje nuttich sil wêze foar ûntwikkelders. Tank foar jo oandacht. As jo ​​​​fragen hawwe, skriuw dan. Ik sil besykje te antwurdzjen.

Boarne: www.habr.com

Add a comment