Ievads
SagadÄ«jÄs tÄ, ka manÄ paÅ”reizÄjÄ darba vietÄ nÄcÄs iepazÄ«ties ar Å”o tehnoloÄ£iju. SÄkÅ”u ar nelielu priekÅ”vÄsturi. NÄkamajÄ sanÄksmÄ mÅ«su komandai teica, ka mums ir jÄizveido integrÄcija ar zinÄma sistÄma. Ar integrÄciju tika domÄts, ka Ŕī labi zinÄmÄ sistÄma mums nosÅ«tÄ«s pieprasÄ«jumus, izmantojot HTTP uz noteiktu galapunktu, un mÄs, dÄ«vainÄ kÄrtÄ, nosÅ«tÄm atbildes SOAP ziÅojuma veidÄ. Viss Ŕķiet vienkÄrÅ”s un triviÄls. No tÄ izriet, ka jums ir nepiecieÅ”ams...
Uzdevums
Izveidojiet 3 pakalpojumus. Pirmais no tiem ir datu bÄzes atjauninÄÅ”anas pakalpojums. Å is pakalpojums, kad no treÅ”Äs puses sistÄmas pienÄk jauni dati, atjaunina datus datu bÄzÄ un Ä£enerÄ failu CSV formÄtÄ, lai tos pÄrsÅ«tÄ«tu uz nÄkamo sistÄmu. OtrÄ pakalpojuma galapunkts tiek saukts par FTP transporta pakalpojumu, kas saÅem pÄrsÅ«tÄ«to failu, apstiprina to un ievieto failu krÄtuvÄ, izmantojot FTP. TreÅ”ais pakalpojums, patÄrÄtÄju datu pÄrsÅ«tÄ«Å”anas pakalpojums, darbojas asinhroni ar pirmajiem diviem. TÄ saÅem pieprasÄ«jumu no treÅ”Äs puses ÄrÄjÄs sistÄmas saÅemt iepriekÅ” apspriesto failu, paÅem gatavÄs atbildes failu, modificÄ to (atjaunina id, aprakstu, linkToFile laukus) un nosÅ«ta atbildi SOAP ziÅojuma veidÄ. Proti, kopaina ir Å”Äda: pirmie divi pakalpojumi sÄk darbu tikai tad, kad ir saÅemti atjauninÄÅ”anas dati. TreÅ”ais dienests strÄdÄ pastÄvÄ«gi, jo ir daudz informÄcijas patÄrÄtÄju, aptuveni 1000 datu pieprasÄ«jumu minÅ«tÄ. Pakalpojumi ir pieejami pastÄvÄ«gi, un to gadÄ«jumi atrodas dažÄdÄs vidÄs, piemÄram, testÄÅ”anas, demonstrÄcijas, pirmsražoÅ”anas un prod. ZemÄk ir diagramma, kÄ Å”ie pakalpojumi darbojas. Ä»aujiet man uzreiz paskaidrot, ka dažas detaļas ir vienkÄrÅ”otas, lai izvairÄ«tos no nevajadzÄ«gas sarežģītÄ«bas.
TehniskÄ padziļinÄÅ”ana
PlÄnojot problÄmas risinÄjumu, vispirms nolÄmÄm aplikÄcijas taisÄ«t Java, izmantojot Spring framework, Nginx balancer, Postgres datubÄzi un citas tehniskas un ne tik tehniskas lietas. TÄ kÄ tehniskÄ risinÄjuma izstrÄdes laiks ļÄva mums apsvÄrt citas pieejas Ŕīs problÄmas risinÄÅ”anai, mÅ«su skatiens krita uz Apache NIFI tehnoloÄ£iju, kas ir modÄ noteiktÄs aprindÄs. Uzreiz teikÅ”u, ka Ŕī tehnoloÄ£ija ļÄva mums pamanÄ«t Å”os 3 pakalpojumus. Å ajÄ rakstÄ tiks aprakstÄ«ta failu transportÄÅ”anas pakalpojuma un datu pÄrsÅ«tÄ«Å”anas pakalpojuma izstrÄde patÄrÄtÄjam, bet, ja raksts bÅ«s noderÄ«gs, rakstÄ«Å”u par datu atjauninÄÅ”anas pakalpojumu datu bÄzÄ.
Kas ir Ŕis
NIFI ir izkliedÄta arhitektÅ«ra Ätrai paralÄlai datu ielÄdei un apstrÄdei, liels skaits spraudÅu avotiem un transformÄcijÄm, konfigurÄciju versijas veidoÅ”ana un daudz kas cits. Jauks bonuss ir tas, ka to ir ļoti viegli lietot. TriviÄlus procesus, piemÄram, getFile, sendHttpRequest un citus, var attÄlot kÄ kvadrÄtus. Katrs kvadrÄts apzÄ«mÄ procesu, kura mijiedarbÄ«bu var redzÄt zemÄk esoÅ”ajÄ attÄlÄ. Ir uzrakstÄ«ta detalizÄtÄka dokumentÄcija par procesa iestatÄ«Å”anas mijiedarbÄ«bu
Ideja rakstÄ«t rakstu radÄs pÄc ilgiem meklÄjumiem un saÅemtÄs informÄcijas strukturÄÅ”anas apzinÄtÄ veidÄ, kÄ arÄ« vÄlme nedaudz atvieglot dzÄ«vi topoÅ”ajiem izstrÄdÄtÄjiem..
PiemÄrs
Tiek aplÅ«kots piemÄrs tam, kÄ kvadrÄti mijiedarbojas viens ar otru. VispÄrÄjÄ shÄma ir pavisam vienkÄrÅ”a: MÄs saÅemam HTTP pieprasÄ«jumu (TeorÄtiski ar failu pieprasÄ«juma pamattekstÄ. Lai demonstrÄtu NIFI iespÄjas, Å”ajÄ piemÄrÄ pieprasÄ«jums sÄk faila saÅemÅ”anas procesu no lokÄlÄs failu krÄtuves ), tad mÄs nosÅ«tÄm atpakaļ atbildi, ka pieprasÄ«jums ir saÅemts, paralÄli notiek faila saÅemÅ”ana no FH un pÄc tam tÄ pÄrvietoÅ”ana caur FTP uz FH. Ir vÄrts precizÄt, ka procesi mijiedarbojas viens ar otru, izmantojot tÄ saukto flowFile. Å Ä« ir NIFI pamata entÄ«tija, kas glabÄ atribÅ«tus un saturu. Saturs ir dati, kas tiek attÄloti straumes failÄ. Tas ir, rupji runÄjot, ja saÅemat failu no viena kvadrÄta un pÄrsÅ«tÄt to uz citu, saturs bÅ«s jÅ«su fails.
KÄ redzat, Å”is attÄls parÄda vispÄrÄjo procesu. HandleHttpRequest ā pieÅem pieprasÄ«jumus, ReplaceText ā Ä£enerÄ atbildes pamattekstu, HandleHttpResponse ā nosÅ«ta atbildi. FetchFile - saÅem failu no failu krÄtuves, pÄrsÅ«ta to uz kvadrÄtu PutSftp - ievieto Å”o failu FTP, norÄdÄ«tajÄ adresÄ. Tagad vairÄk par Å”o procesu.
Å ajÄ gadÄ«jumÄ pieprasÄ«jums ir visa sÄkums. ApskatÄ«sim tÄ konfigurÄcijas parametrus.
Å eit viss ir diezgan triviÄls, izÅemot StandardHttpContextMap - tas ir sava veida pakalpojums, kas ļauj nosÅ«tÄ«t un saÅemt pieprasÄ«jumus. SÄ«kÄk un pat ar piemÄriem jÅ«s varat redzÄt -
TÄlÄk apskatÄ«sim kvadrÄta ReplaceText konfigurÄcijas parametrus. Ir vÄrts pievÄrst uzmanÄ«bu ReplacementValue - tas ir tas, kas tiks atgriezts lietotÄjam atbildes veidÄ. IestatÄ«jumos var regulÄt reÄ£istrÄÅ”anas lÄ«meni, var redzÄt žurnÄlus {kur izpakojÄt nifi}/nifi-1.9.2/logs, ir arÄ« neveiksmes/veiksmes parametri - pamatojoties uz Å”iem parametriem, varat regulÄt procesu kopumÄ . Tas ir, veiksmÄ«gas teksta apstrÄdes gadÄ«jumÄ tiks izsaukts atbildes nosÅ«tÄ«Å”anas process lietotÄjam, un citÄ gadÄ«jumÄ mÄs vienkÄrÅ”i reÄ£istrÄsim neveiksmÄ«go procesu.
HandleHttpResponse rekvizÄ«tos nav nekÄ Ä«paÅ”i interesanta, izÅemot statusu, kad atbilde ir veiksmÄ«gi izveidota.
Esam nokÄrtojuÅ”i pieprasÄ«jumu un atbildi ā pÄriesim pie faila saÅemÅ”anas un ievietoÅ”anas FTP serverÄ«. FetchFile - saÅem failu iestatÄ«jumos norÄdÄ«tajÄ ceÄ¼Ä un nodod to nÄkamajam procesam.
Un tad PutSftp kvadrÄts - ievieto failu failu krÄtuvÄ. TÄlÄk mÄs varam redzÄt konfigurÄcijas parametrus.
Ir vÄrts pievÄrst uzmanÄ«bu tam, ka katrs laukums ir atseviŔķs process, kas jÄsÄk. MÄs apskatÄ«jÄm vienkÄrÅ”Äko piemÄru, kam nav nepiecieÅ”ama sarežģīta pielÄgoÅ”ana. TÄlÄk mÄs aplÅ«kosim procesu nedaudz sarežģītÄk, kur mÄs nedaudz uzrakstÄ«sim uz rievÄm.
SarežģītÄks piemÄrs
Datu pÄrsÅ«tÄ«Å”anas pakalpojums patÄrÄtÄjam izrÄdÄ«jÄs nedaudz sarežģītÄks SOAP ziÅojuma modificÄÅ”anas procesa dÄļ. VispÄrÄjais process ir parÄdÄ«ts zemÄk esoÅ”ajÄ attÄlÄ.
Å eit arÄ« ideja nav Ä«paÅ”i sarežģīta: saÅÄmÄm pieprasÄ«jumu no patÄrÄtÄja, ka viÅam nepiecieÅ”ami dati, nosÅ«tÄ«jÄm atbildi, ka ir saÅÄmis ziÅu, sÄkÄm atbildes faila saÅemÅ”anas procesu, pÄc tam ar noteiktu loÄ£iku to rediÄ£ÄjÄm un tad pÄrsÅ«tÄ«ja failu patÄrÄtÄjam SOAP ziÅojuma veidÄ uz serveri.
Es domÄju, ka nav nepiecieÅ”ams vÄlreiz aprakstÄ«t tos kvadrÄtus, kurus redzÄjÄm iepriekÅ” - pÄriesim tieÅ”i uz jaunajiem. Ja jums ir nepiecieÅ”ams rediÄ£Ät kÄdu failu un parastie ReplaceText tipa kvadrÄti nav piemÄroti, jums bÅ«s jÄraksta savs skripts. To var izdarÄ«t, izmantojot ExecuteGroogyScript kvadrÄtu. TÄs iestatÄ«jumi ir parÄdÄ«ti zemÄk.
Ir divas iespÄjas skripta ielÄdei Å”ajÄ laukumÄ. Pirmais ir, lejupielÄdÄjot failu ar skriptu. Otrais ir skripta ievietoÅ”ana scriptBody. Cik es zinu, executeScript kvadrÄts atbalsta vairÄkas valodas - viena no tÄm ir groovy. PievilÅ”u java izstrÄdÄtÄjus - tÄdos kvadrÄtos nevar rakstÄ«t skriptus java. Tiem, kas patieÅ”Äm vÄlas, jums ir jÄizveido savs pielÄgotais laukums un jÄpievieno tas NIFI sistÄmai. Visu Å”o operÄciju pavada diezgan ilga deja ar tamburÄ«nu, par ko Å”ajÄ rakstÄ mÄs nerunÄsim. Es izvÄlÄjos groovy valodu. ZemÄk ir testa skripts, kas vienkÄrÅ”i pakÄpeniski atjaunina ID SOAP ziÅojumÄ. Ir svarÄ«gi atzÄ«mÄt. JÅ«s paÅemat failu no flowFile un atjauniniet to, neaizmirstiet, ka tas ir jÄievieto tur atpakaļ, atjauninÄts. Ir arÄ« vÄrts atzÄ«mÄt, ka ne visas bibliotÄkas ir iekļautas. Var gadÄ«ties, ka tomÄr ir jÄimportÄ kÄds no libiem. VÄl viens mÄ«nuss ir tas, ka skriptu Å”ajÄ laukumÄ ir diezgan grÅ«ti atkļūdot. Ir veids, kÄ izveidot savienojumu ar NIFI JVM un sÄkt atkļūdoÅ”anas procesu. PersonÄ«gi es palaidu vietÄjo lietojumprogrammu un simulÄju faila saÅemÅ”anu no sesijas. Es arÄ« veicu atkļūdoÅ”anu lokÄli. Kļūdas, kas parÄdÄs, ielÄdÄjot skriptu, ir diezgan viegli Google, un tÄs ieraksta žurnÄlÄ pats NIFI.
import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder
def flowFile = session.get()
if (!flowFile) return
try {
flowFile = session.write(flowFile, { inputStream, outputStream ->
String result = IOUtils.toString(inputStream, "UTF-8");
def recordIn = new XmlSlurper().parseText(result)
def element = recordIn.depthFirst().find {
it.name() == 'id'
}
def newId = Integer.parseInt(element.toString()) + 1
def recordOut = new XmlSlurper().parseText(result)
recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId
def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
log.error("Error during processing of validate.groovy", e)
session.transfer(flowFile, REL_FAILURE)
}
Faktiski Å”eit beidzas laukuma pielÄgoÅ”ana. TÄlÄk atjauninÄtais fails tiek pÄrsÅ«tÄ«ts uz laukumu, kas ir atbildÄ«gs par faila nosÅ«tÄ«Å”anu uz serveri. TÄlÄk ir norÄdÄ«ti Ŕī laukuma iestatÄ«jumi.
MÄs aprakstÄm metodi, ar kuru tiks pÄrsÅ«tÄ«ts SOAP ziÅojums. MÄs rakstÄm, kur. TÄlÄk jums jÄnorÄda, ka tas ir SOAP.
Pievienojiet vairÄkus rekvizÄ«tus, piemÄram, saimniekdatoru un darbÄ«bu (soapAction). MÄs saglabÄjam un pÄrbaudÄm. Varat skatÄ«t sÄ«kÄku informÄciju par to, kÄ nosÅ«tÄ«t SOAP pieprasÄ«jumus
MÄs apskatÄ«jÄm vairÄkas NIFI procesu izmantoÅ”anas iespÄjas. KÄ viÅi mijiedarbojas un kÄds ir viÅu patiesais labums? ApskatÄ«tie piemÄri ir pÄrbaudes piemÄri un nedaudz atŔķiras no tÄ, kas patiesÄ«bÄ notiek kaujÄ. Es ceru, ka Å”is raksts bÅ«s nedaudz noderÄ«gs izstrÄdÄtÄjiem. Paldies par jÅ«su uzmanÄ«bu. Ja ir kÄdi jautÄjumi, rakstiet. Es mÄÄ£inÄÅ”u atbildÄt.
Avots: www.habr.com