Introduction
It so happened that I had to get acquainted with this technology at my current job. Let me start with a little background. At one of our meetings, the team was told that we needed to build an integration with a well-known system. Integration meant that this well-known system would send requests over HTTP to a specific endpoint and that we, oddly enough, would send responses back in the form of SOAP messages. It all seemed simple and trivial. From this it followed that we needed to...
Task
Create three services. The first is the database update service. When new data arrives from a third-party system, it updates the data in the database, generates a file in CSV format, and passes it on to the next system. The second service, called the FTP Transport Service, receives the transferred file, validates it, and places it in file storage over FTP. The third service, the consumer data transfer service, works asynchronously with the first two. It receives a request from a third-party external system for the file described above, takes the prepared response file, modifies it (updating the ID, description, and linkToFile fields), and sends the response in the form of a SOAP message.
So the overall picture is as follows. The first two services start working only when data for an update arrives. The third service runs constantly, because the information has many consumers and data requests arrive every minute. The services are always available, and their instances are deployed in several environments, such as test, demo, pre-production, and production. The diagram below shows how these services work. I should say right away that some details have been simplified to avoid unnecessary complexity.
Diving into the technology
When we planned a solution to this problem, we first decided to write the applications in Java using the Spring framework, an Nginx balancer, a Postgres database, and other things, some technical and some less so. Since we had time to work out the technical solution, we were able to consider other approaches as well, and our attention turned to Apache NIFI, a technology that is fashionable in certain circles. I will say right away that it allowed us to implement all 3 services. This article describes the development of the file transfer service and the consumer data transfer service. If the article proves useful, I will also write about the service that updates data in the database.
What is it?
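NIFI is a distributed architecture for fast parallel loading and processing of data, with a large number of plugins for sources and transformations, versioning of configurations, and much more. A nice bonus is that it is very easy to use. Simple processes such as getFile or sendHttpRequest can be represented as squares. Each square represents a process, and the interaction between them is shown in the diagram below. More detailed documentation on configuring processes and their interaction is available separately.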
The idea of writing this article came after a long search, from a wish to organize what I had found into something coherent and to make the work of future developers a little easier.
Example
Let's look at an example of how the squares interact. The general scheme is quite simple. We receive an HTTP request. In theory the file would arrive in the body of the request, but to demonstrate what NIFI can do, in this example the request starts a process that fetches a file from local file storage. We then return a response saying that the request was received, while in parallel one process fetches the file from file storage and another moves it to file storage over FTP.
It is worth clarifying that processes communicate with each other through a so-called flowFile. This is the basic NIFI entity, and it stores attributes and content. The content is the data represented by the stream file. Roughly speaking, if one square receives a file and passes it to another square, the content is your file.
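To make the flowFile idea a little more concrete, here is a toy model in plain Groovy. It is only a sketch: `ToyFlowFile`, `fetchFile`, and `describe` are hypothetical names invented for illustration, the file name and CSV text are made up, and none of this is NIFI's real FlowFile API. It shows only that attributes and content travel together from one square to the next.

```groovy
// Toy stand-in for a flowFile: attributes plus content.
// This is NOT NiFi's real org.apache.nifi.flowfile.FlowFile interface.
class ToyFlowFile {
    Map<String, String> attributes = [:]
    byte[] content = new byte[0]
}

// Each "square" is modelled as a closure: flowFile in, flowFile out.
def fetchFile = { ToyFlowFile ff ->
    // Pretend the file was read from storage: its bytes become the content,
    // while the attributes set by earlier squares are carried along.
    return new ToyFlowFile(
        attributes: ff.attributes + [filename: 'report.csv'],
        content: 'id;description\n1;first row\n'.getBytes('UTF-8'))
}

// A second square that only inspects what it was given.
def describe = { ToyFlowFile ff ->
    return "${ff.attributes.filename}: ${ff.content.length} bytes".toString()
}

def incoming = new ToyFlowFile(attributes: ['http.method': 'GET'])
def fetched = fetchFile(incoming)
println describe(fetched)
```

The point of the sketch is that the second square never asks where the file came from. Everything it needs arrives inside the flowFile.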
As you can see, the diagram shows the overall process. HandleHttpRequest accepts requests, ReplaceText generates the response body, and HandleHttpResponse sends the response. FetchFile fetches a file from file storage and passes it to the PutSftp square, which places the file at the specified address on the FTP server. Now let's look at this process in more detail.
In this case, the request is where everything starts. Let's look at its configuration parameters.
Everything here is fairly straightforward except for StandardHttpContextMap, which is a kind of service that makes it possible to receive requests and send responses. More detail, with examples, is available separately.
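As a rough mental model of what such a context map is for, the sketch below parks each incoming exchange under an identifier so that a later step can find it and answer it. This is an assumption-laden illustration in plain Groovy, not NIFI's implementation: `pending`, `register`, and `respond` are hypothetical names, and the real service has settings (such as limits and expiry) that are not modelled here.

```groovy
import java.util.concurrent.ConcurrentHashMap

// Toy context map: pending HTTP exchanges keyed by an identifier.
// This only illustrates the idea; it is NOT NiFi's StandardHttpContextMap.
def pending = new ConcurrentHashMap<String, Closure>()

// Request side: park a "how to reply" callback and return its identifier,
// which would then travel with the flowFile as an attribute.
def register = { Closure reply ->
    def id = UUID.randomUUID().toString()
    pending.put(id, reply)
    return id
}

// Response side: look the exchange up by identifier and answer it once.
def respond = { String id, int status, String body ->
    def reply = pending.remove(id)
    if (reply == null) return false   // unknown or already answered
    reply(status, body)
    return true
}

def sent = []
def id = register({ status, body -> sent << [status, body] })
println respond(id, 200, 'Request accepted')
println respond(id, 200, 'second attempt')
println sent
```

Because the entry is removed when it is answered, a second response to the same identifier is rejected. This is the behaviour you would want from anything that pairs requests with responses.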
Next, let's look at the configuration parameters of the ReplaceText square. ReplacementValue deserves attention: this is what will be returned to the user as the response. In the settings you can adjust the logging level, and the logs themselves can be found in {where you unpacked nifi}/nifi-1.9.2/logs. There are also failure and success parameters, which you can use to control the process as a whole. That is, if the text is processed successfully, the process that sends the response to the user is called, and otherwise we simply log the failed process.
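The success/failure split can be pictured with a small plain-Groovy sketch. It assumes nothing about NIFI internals: `route` is a hypothetical helper, and the relationship names are just strings here. It only shows the pattern of running a transformation and choosing the outgoing path based on whether it threw.

```groovy
// Hypothetical helper: run a text transformation and report which relationship
// the result should follow, mirroring the success/failure outputs of a square.
def route = { String text, Closure transform ->
    try {
        return [relationship: 'success', content: transform(text)]
    } catch (Exception e) {
        // On failure keep the original content and record the reason for the log.
        return [relationship: 'failure', content: text, error: e.message]
    }
}

// A replacement that always succeeds, like a fixed response body.
def ok = route('incoming request body', { 'Request accepted' })

// A replacement that fails, for example because a template is missing.
def bad = route('incoming request body', { throw new IllegalStateException('template missing') })

println ok
println bad
```

In the flow described above, the 'success' branch would lead to the square that sends the response, and the 'failure' branch would lead to logging.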
The HandleHttpResponse properties contain nothing of particular interest apart from the status returned when the response has been created successfully.
We have dealt with the request and the response, so let's move on to fetching the file and placing it on the FTP server. FetchFile fetches the file at the path specified in its settings and passes it to the next process.
Next comes the PutSftp square, which places the file in file storage. Its configuration parameters are shown below.
Note that each square is a separate process that has to be started. We have looked at the simplest example, which requires no complex customization. Next we will look at a slightly more complicated process, where we will write a little Groovy.
A more complex example
The service that transfers data to the consumer turned out to be a little more complicated because of the step that modifies the SOAP message. The overall process is shown in the diagram below.
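Here, too, the idea is not especially complicated. We receive a request from a consumer saying that data is needed, send a response confirming that the message was received, start the process that fetches the response file, edit it according to certain logic, and then transfer the file to the consumer's server in the form of a SOAP message.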
I don't think there is any need to describe again the squares we saw above, so let's go straight to the new ones. If you need to edit a file and the usual ReplaceText-type squares are not suitable, you will have to write your own script. This can be done with the ExecuteGroovyScript square. Its settings are shown below.
There are two options for loading a script into this square. The first is to load a file containing the script. The second is to paste the script into scriptBody. As far as I know, the ExecuteScript square supports several languages, and one of them is Groovy. Java developers will be disappointed: you cannot write scripts in Java in such squares. If you really want to, you need to create your own custom square and add it to the NIFI system. That whole operation involves a fair amount of fiddling, which this article does not cover. I chose Groovy.
Below is a test script that simply increments the id in a SOAP message. One important point: you take the file from the flowFile and update it, and you must not forget to put it back after the update. Note also that not all libraries are included, so you may still need to import some of them yourself.
Another drawback is that scripts in this square are quite difficult to debug. There is a way to connect to the NIFI JVM and start a debugging session. Personally, I ran a local application, simulated receiving the file from the session, and debugged locally. Errors that appear when the script is loaded are easy to look up on Google, and NIFI itself writes them to the log.
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import groovy.xml.StreamingMarkupBuilder

// Take the next flowFile from the queue; there is nothing to do if it is empty.
def flowFile = session.get()
if (!flowFile) return

try {
    // Rewrite the content: read the old content and write the updated one back.
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8")

        // Parse the SOAP message and read the value of the first <id> element.
        def record = new XmlSlurper().parseText(result)
        def element = record.depthFirst().find { it.name() == 'id' }
        def newId = Integer.parseInt(element.toString()) + 1

        // Set the incremented id and serialize the message back to XML.
        record.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId
        def res = new StreamingMarkupBuilder().bind { mkp.yield record }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    // The updated flowFile has to be handed back to the session.
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}
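
One way to approach the local debugging mentioned earlier is to give the transformation the same shape as the closure passed to session.write (an input stream in, an output stream out) and call it with in-memory streams. The sketch below is an assumption, not the author's actual harness. To stay independent of the Groovy version it uses the JDK's DOM parser rather than XmlSlurper, the name `incrementId` is hypothetical, and the sample message is a heavily shortened, made-up stand-in for the real SOAP envelope.

```groovy
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.transform.TransformerFactory
import javax.xml.transform.dom.DOMSource
import javax.xml.transform.stream.StreamResult

// Same shape as the closure handed to session.write():
// read the old content from one stream, write the new content to the other.
def incrementId = { InputStream input, OutputStream output ->
    def doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(input)
    // Take the first <id> element and replace its text with the incremented value.
    def idNode = doc.getElementsByTagName('id').item(0)
    idNode.textContent = String.valueOf(Integer.parseInt(idNode.textContent.trim()) + 1)
    def transformer = TransformerFactory.newInstance().newTransformer()
    transformer.transform(new DOMSource(doc), new StreamResult(output))
}

// Hypothetical, shortened message; the real envelope is nested much deeper.
def sample = '<Envelope><Body><ResponseBody><id>41</id></ResponseBody></Body></Envelope>'

// Simulate "receiving the file from the session" with in-memory streams.
def out = new ByteArrayOutputStream()
incrementId(new ByteArrayInputStream(sample.getBytes('UTF-8')), out)
def result = out.toString('UTF-8')
println result
```

Once the closure behaves as expected locally, the same body can be pasted into the session.write call, which leaves only the NIFI-specific wiring to be checked inside the square.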
That is essentially all there is to configuring this square. The updated file is then passed to the square that sends it to the server. The settings of that square are shown below.
We specify the method by which the SOAP message will be sent and where it will be sent. Next, we need to indicate that this is SOAP.
We add a few properties, such as the host and the action (soapAction), then save and check. More details on how to send SOAP requests are available separately.
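Outside NIFI, the same ingredients (a method, a destination, a content type that marks the body as SOAP, and a soapAction) can be seen in a plain HTTP request. The sketch below uses the JDK's HttpURLConnection from Groovy and assumes SOAP 1.1 conventions. The endpoint URL, the action URN, and the envelope are placeholders invented for illustration, and nothing is actually sent because the lines that would open the connection are commented out.

```groovy
// Hypothetical endpoint and action; substitute the consumer's real values.
def endpoint = 'http://consumer.example.com/ws/data'
def soapAction = 'urn:example:sendData'
def envelope = '''<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Body><id>42</id></soapenv:Body>
</soapenv:Envelope>'''

// openConnection() only prepares the request; no network traffic happens yet.
HttpURLConnection conn = (HttpURLConnection) URI.create(endpoint).toURL().openConnection()
conn.setRequestMethod('POST')                                       // how the message is sent
conn.setDoOutput(true)                                              // the request carries a body
conn.setRequestProperty('Content-Type', 'text/xml; charset=utf-8')  // marks the body as SOAP 1.1
conn.setRequestProperty('SOAPAction', '"' + soapAction + '"')       // the action, quoted

// To really send it, write the body and read the status (needs a live endpoint):
// conn.outputStream.withCloseable { it.write(envelope.getBytes('UTF-8')) }
// println conn.responseCode
println conn.getRequestProperty('SOAPAction')
```

Reproducing the request this way can help when checking what the square is expected to send, since each header corresponds to one of the properties added in its settings.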
We have looked at a few ways of using NIFI processes, how they interact with each other, and what real benefit they bring. The examples considered here are test ones and differ somewhat from what actually runs in production. I hope this article will be at least a little useful to developers. Thank you for your attention. If you have any questions, write to me and I will try to answer.
Source: habr.com