Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Paraqitje

Kështu ndodhi që në vendin tim aktual të punës më duhej të njihesha me këtë teknologji. Do të filloj me një sfond të vogël. Në takimin e radhës, ekipit tonë iu tha se duhej të krijonim integrim me sistemi i njohur. Me integrim nënkuptohej që ky sistem i mirënjohur do të na dërgonte kërkesa nëpërmjet HTTP në një pikë përfundimtare specifike, dhe ne, çuditërisht, do të dërgonim përgjigjet në formën e një mesazhi SOAP. Gjithçka duket e thjeshtë dhe e parëndësishme. Nga kjo rezulton se ju duhet...

Detyrë

Krijo 3 shërbime. I pari prej tyre është Shërbimi i përditësimit të bazës së të dhënave. Ky shërbim, kur të dhëna të reja mbërrijnë nga një sistem i palës së tretë, përditëson të dhënat në bazën e të dhënave dhe gjeneron një skedar në formatin CSV për ta transferuar atë në sistemin tjetër. Pika përfundimtare e shërbimit të dytë quhet - Shërbimi i Transportit FTP, i cili merr skedarin e transferuar, e vërteton atë dhe e vendos atë në ruajtjen e skedarëve përmes FTP. Shërbimi i tretë, shërbimi i transferimit të të dhënave të konsumatorit, funksionon në mënyrë asinkrone me dy të parët. Ai merr një kërkesë nga një sistem i jashtëm i palës së tretë për të marrë skedarin e diskutuar më sipër, merr skedarin e gatshëm të përgjigjes, e modifikon atë (përditëson id-në, përshkrimin, fushat linkToFile) dhe dërgon përgjigjen në formën e një mesazhi SOAP. Kjo do të thotë, pamja e përgjithshme është si më poshtë: dy shërbimet e para fillojnë punën e tyre vetëm kur të kenë mbërritur të dhënat për përditësimin. Shërbimi i tretë funksionon vazhdimisht sepse ka shumë konsumatorë informacioni, rreth 1000 kërkesa për të dhëna në minutë. Shërbimet janë të disponueshme vazhdimisht dhe instancat e tyre janë të vendosura në mjedise të ndryshme, si test, demonstrim, para-prodhim dhe prod. Më poshtë është një diagram se si funksionojnë këto shërbime. Më lejoni të sqaroj menjëherë se disa detaje janë thjeshtuar për të shmangur kompleksitetin e panevojshëm.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Thellimi Teknik

Kur planifikonim një zgjidhje për problemin, fillimisht vendosëm të bënim aplikacione në Java duke përdorur kornizën Spring, balancuesin Nginx, bazën e të dhënave Postgres dhe gjëra të tjera teknike dhe jo aq teknike. Meqenëse koha për të zhvilluar një zgjidhje teknike na lejoi të konsideronim qasje të tjera për zgjidhjen e këtij problemi, vështrimi ynë ra në teknologjinë Apache NIFI, e cila është në modë në qarqe të caktuara. Unë do të them menjëherë se kjo teknologji na lejoi të vëmë re këto 3 shërbime. Ky artikull do të përshkruajë zhvillimin e një shërbimi të transportit të skedarëve dhe një shërbimi të transferimit të të dhënave për konsumatorin, por nëse artikulli është i dobishëm, unë do të shkruaj për shërbimin për përditësimin e të dhënave në bazën e të dhënave.

Çfarë është kjo

NIFI është një arkitekturë e shpërndarë për ngarkim dhe përpunim të shpejtë paralel të të dhënave, një numër i madh shtojcash për burime dhe transformime, versionim të konfigurimeve dhe shumë më tepër. Një bonus i mirë është se është shumë i lehtë për t'u përdorur. Proceset e parëndësishme si getFile, sendHttpRequest dhe të tjera mund të përfaqësohen si katrorë. Çdo katror përfaqëson një proces, ndërveprimi i të cilit mund të shihet në figurën më poshtë. Është shkruar dokumentacion më i detajuar mbi ndërveprimet e konfigurimit të procesit këtu , për ata që flasin rusisht - këtu. Dokumentacioni përshkruan në mënyrë të përsosur se si të shpaketoni dhe ekzekutoni NIFI, si dhe si të krijoni procese, të njohura gjithashtu si katrore
Ideja për të shkruar një artikull lindi pas një kërkimi të gjatë dhe strukturimit të informacionit të marrë në diçka të ndërgjegjshme, si dhe dëshirës për ta bërë jetën pak më të lehtë për zhvilluesit e ardhshëm.

Shembull

Është konsideruar një shembull se si katrorët ndërveprojnë me njëri-tjetrin. Skema e përgjithshme është mjaft e thjeshtë: Ne marrim një kërkesë HTTP (Teorikisht, me një skedar në trupin e kërkesës. Për të demonstruar aftësitë e NIFI, në këtë shembull kërkesa fillon procesin e marrjes së një skedari nga ruajtja e skedarit lokal ), më pas dërgojmë një përgjigje se kërkesa është marrë, paralelisht procesi i marrjes së një skedari nga FH dhe më pas procesi i zhvendosjes së tij përmes FTP në FH. Vlen të sqarohet se proceset ndërveprojnë me njëri-tjetrin përmes të ashtuquajturit flowFile. Ky është entiteti bazë në NIFI që ruan atributet dhe përmbajtjen. Përmbajtja është të dhënat që përfaqësohen nga skedari i transmetimit. Kjo do të thotë, përafërsisht, nëse merrni një skedar nga një shesh dhe e transferoni atë në një tjetër, përmbajtja do të jetë skedari juaj.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Siç mund ta shihni, kjo foto tregon procesin e përgjithshëm. HandleHttpRequest - pranon kërkesat, ReplaceText - gjeneron një trup përgjigjeje, HandleHttpResponse - dërgon një përgjigje. FetchFile - merr një skedar nga ruajtja e skedarit, e transferon atë në katrorin PutSftp - e vendos këtë skedar në FTP, në adresën e specifikuar. Tani më shumë rreth këtij procesi.

Në këtë rast, kërkesa është fillimi i gjithçkaje. Le të shohim parametrat e tij të konfigurimit.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Gjithçka këtu është mjaft e parëndësishme me përjashtim të StandardHttpContextMap - ky është një lloj shërbimi që ju lejon të dërgoni dhe merrni kërkesa. Më në detaje dhe madje edhe me shembuj, mund të shihni - këtu

Më pas, le të shohim parametrat e konfigurimit të ReplaceText të katrorit. Vlen t'i kushtohet vëmendje ReplacementValue - kjo është ajo që do t'i kthehet përdoruesit në formën e një përgjigje. Në cilësimet mund të rregulloni nivelin e regjistrimit, mund të shihni regjistrat {where you unpacked nifi}/nifi-1.9.2/logs, ka edhe parametra dështimi/sukses - bazuar në këto parametra mund ta rregulloni procesin në tërësi . Kjo do të thotë, në rastin e përpunimit të suksesshëm të tekstit, procesi i dërgimit të një përgjigje te përdoruesi do të thirret, dhe në një rast tjetër ne thjesht do të regjistrojmë procesin e pasuksesshëm.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Nuk ka asgjë veçanërisht interesante në vetitë HandleHttpResponse përveç statusit kur një përgjigje krijohet me sukses.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Kemi renditur kërkesën dhe përgjigjen - le të kalojmë në marrjen e skedarit dhe vendosjen e tij në serverin FTP. FetchFile - merr një skedar në shtegun e specifikuar në cilësimet dhe e kalon atë në procesin tjetër.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Dhe pastaj katrori PutSftp - vendos skedarin në ruajtjen e skedarëve. Ne mund të shohim parametrat e konfigurimit më poshtë.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Vlen t'i kushtohet vëmendje faktit që çdo shesh është një proces i veçantë që duhet të nisë. Ne shikuam shembullin më të thjeshtë që nuk kërkon ndonjë personalizim kompleks. Tjetra, ne do ta shohim procesin pak më të komplikuar, ku do të shkruajmë pak në groove.

Shembull më kompleks

Shërbimi i transferimit të të dhënave te konsumatori doli të ishte pak më i komplikuar për shkak të procesit të modifikimit të mesazhit SOAP. Procesi i përgjithshëm është paraqitur në figurën më poshtë.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Këtu ideja gjithashtu nuk është veçanërisht e ndërlikuar: ne morëm një kërkesë nga konsumatori që ai kishte nevojë për të dhëna, i dërguam një përgjigje se ai kishte marrë një mesazh, filluam procesin e marrjes së skedarit të përgjigjes, më pas e redaktuam atë me një logjikë të caktuar dhe më pas transferoi skedarin te konsumatori në formën e një mesazhi SOAP në server.

Unë mendoj se nuk ka nevojë të përshkruajmë përsëri ato sheshe që pamë më lart - le të kalojmë direkt në ato të reja. Nëse keni nevojë të redaktoni ndonjë skedar dhe katrorët e zakonshëm të tipit ReplaceText nuk janë të përshtatshëm, do t'ju duhet të shkruani skriptin tuaj. Kjo mund të bëhet duke përdorur katrorin ExecuteGroogyScript. Cilësimet e tij janë paraqitur më poshtë.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Ekzistojnë dy opsione për të ngarkuar skriptin në këtë katror. E para është duke shkarkuar një skedar me një skenar. E dyta është duke futur një skript në scriptBody. Me sa di unë, katrori executeScript mbështet disa gjuhë - njëra prej tyre është groovy. Unë do t'i zhgënjej zhvilluesit e Java - nuk mund të shkruani skripta në java në katrorë të tillë. Për ata që duan vërtet, ju duhet të krijoni katrorin tuaj personal dhe ta shtoni atë në sistemin NIFI. I gjithë ky operacion shoqërohet me një kërcim mjaft të gjatë me një dajre, me të cilën nuk do të trajtojmë në këtë artikull. Unë zgjodha gjuhën groovy. Më poshtë është një skript provë që thjesht përditëson gradualisht ID-në në një mesazh SOAP. Është e rëndësishme të theksohet. Ju e merrni skedarin nga flowFile dhe e përditësoni, mos harroni që duhet ta vendosni përsëri atje, të përditësuar. Vlen gjithashtu të theksohet se jo të gjitha bibliotekat janë të përfshira. Mund të ndodhë që ju ende duhet të importoni një nga libs. Një tjetër dobësi është se skripti në këtë katror është mjaft i vështirë për t'u korrigjuar. Ekziston një mënyrë për t'u lidhur me NIFI JVM dhe për të filluar procesin e korrigjimit. Personalisht, kam nisur një aplikacion lokal dhe kam simuluar marrjen e një skedari nga sesioni. Kam bërë gjithashtu korrigjimin lokal. Gabimet që shfaqen gjatë ngarkimit të një skripti janë mjaft të lehta për Google dhe shkruhen nga vetë NIFI në regjistër.

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

Në fakt, këtu përfundon personalizimi i katrorit. Më pas, skedari i përditësuar transferohet në shesh, i cili është përgjegjës për dërgimin e skedarit në server. Më poshtë janë cilësimet për këtë katror.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Ne përshkruajmë metodën me të cilën do të transmetohet një mesazh SOAP. Ne shkruajmë ku. Tjetra duhet të tregoni se ky është SOAP.

Apache NIFI - Një përmbledhje e shkurtër e mundësive në praktikë

Shto disa veti të tilla si host dhe veprim (soapAction). Ne ruajmë dhe kontrollojmë. Mund të shihni më shumë detaje se si të dërgoni kërkesat SOAP këtu

Ne shikuam disa opsione për përdorimin e proceseve NIFI. Si ndërveprojnë dhe cili është përfitimi i tyre real? Shembujt e konsideruar janë ato provë dhe janë paksa të ndryshëm nga ajo që ndodh në të vërtetë në luftime. Shpresoj se ky artikull do të jetë pak i dobishëm për zhvilluesit. Faleminderit per vemendjen. Nëse keni ndonjë pyetje, shkruani. Do të përpiqem të përgjigjem.

Burimi: www.habr.com

Shto një koment