Apache NIFI - 實踐機會的簡要概述

介紹

碰巧在我現在的工作地點我必須熟悉這項技術。 我將從一些背景開始。 在下次會議上,我們的團隊被告知我們需要與 已知系統。 整合意味著這個眾所周知的系統將透過 HTTP 向特定端點發送請求,而奇怪的是,我們將以 SOAP 訊息的形式發迴回應。 一切都顯得簡單而瑣碎。 由此可見,您需要...

任務

創建3個服務。 第一個是資料庫更新服務。 當來自第三方系統的新資料到達時,該服務會更新資料庫中的資料並產生 CSV 格式的檔案以將其傳輸到下一個系統。 第二個服務的端點稱為 FTP 傳輸服務,它接收傳輸的文件,對其進行驗證,然後透過 FTP 將其放入文件儲存中。 第三個服務是消費者資料傳輸服務,與前兩個服務非同步工作。 它接收來自第三方外部系統的請求以接收上述文件,以獲取準備好的回應文件,對其進行修改(更新 id、description、linkToFile 欄位)並以 SOAP 訊息的形式發送回應。 也就是說,整體情況是這樣的:只有當更新的資料到達時,前兩個服務才開始運作。 第三個服務持續運作,因為資訊消費者很多,每分鐘大約有 1000 個資料請求。 服務持續可用,其實例位於不同的環境中,例如測試、演示、預生產和生產。 下圖展示了這些服務的工作原理。 讓我立即澄清,一些細節已被簡化,以避免不必要的複雜性。

Apache NIFI - 實踐機會的簡要概述

科技深化

在規劃問題的解決方案時,我們首先決定使用 Spring 框架、Nginx 平衡器、Postgres 資料庫和其他技術性和非技術性的東西在 Java 中製作應用程式。 由於開發技術解決方案的時間讓我們可以考慮其他方法來解決這個問題,因此我們的目光落在了在某些圈子裡很流行的 Apache NIFI 技術。 我馬上就會說,這項技術讓我們注意到這 3 項服務。 本文將描述向消費者提供文件傳輸服務和資料傳輸服務的開發,但如果本文有用,我將寫有關更新資料庫中資料的服務。

這是什麼

NIFI 是一種分散式架構,用於快速並行載入和處理資料、大量用於來源和轉換的插件、配置的版本控制等等。 一個很好的好處是它非常容易使用。 getFile、sendHttpRequest 等簡單流程可以用方形表示。 每個方塊代表一個過程,其交互作用如下圖所示。 有關流程設定互動的更詳細文件已編寫 這裡 ,對於那些說俄語的人 - 這裡。 該文件完美地描述如何解壓縮和運行NIFI,以及如何建立進程,也稱為方塊
經過長時間的搜尋和將收到的資訊結構化為有意識的東西,以及讓未來開發人員的生活變得更輕鬆的願望,寫一篇文章的想法誕生了。

例子

考慮正方形如何相互作用的範例。 整體方案非常簡單:我們接收一個 HTTP 請求(理論上,請求正文中包含一個文件。為了演示 NIFI 的功能,在本例中,請求啟動從本地文件存儲接收文件的過程),然後我們發回請求已收到的回應,同時執行從FH 接收檔案的過程,然後透過FTP 將檔案移到FH 的過程。 值得澄清的是,進程之間透過所謂的flowFile進行互動。 這是 NIFI 中儲存屬性和內容的基本實體。 內容是流文件表示的資料。 也就是說,粗略地說,如果您從一個方塊收到一個檔案並將其傳輸到另一個方塊,則內容將是您的檔案。

Apache NIFI - 實踐機會的簡要概述

正如你所看到的,這張圖展示了大致的流程。 HandleHttpRequest - 接受請求,ReplaceText - 產生回應正文,HandleHttpResponse - 發送回應。 FetchFile - 從檔案儲存接收文件,將其傳輸到方形 PutSftp - 將此文件放在 FTP 上的指定位址。 現在更多地了解這個過程。

在這種情況下,請求就是一切的開始。 我們來看看它的配置參數。

Apache NIFI - 實踐機會的簡要概述

除了 StandardHttpContextMap 之外,這裡的一切都很瑣碎 - 這是一種允許您發送和接收請求的服務。 更詳細地甚至通過示例,您可以看到 - 這裡

接下來我們來看一下方區塊的ReplaceText配置參數。 值得關注 ReplacementValue - 這將以響應的形式返回給用戶。 在設定中您可以調整日誌記錄的級別,您可以看到日誌{您解壓縮nifi的位置}/nifi-1.9.2/logs,還有失敗/成功參數 - 基於這些參數您可以整體調節流程。 也就是說,在文字處理成功的情況下,將呼叫向使用者發送回應的過程,而在另一種情況下,我們將簡單地記錄不成功的過程。

Apache NIFI - 實踐機會的簡要概述

除了成功建立回應時的狀態之外,HandleHttpResponse 屬性中沒有什麼特別有趣的內容。

Apache NIFI - 實踐機會的簡要概述

我們已經整理好了請求和回應 - 讓我們繼續接收檔案並將其放置在 FTP 伺服器上。 FetchFile - 在設定中指定的路徑接收檔案並將其傳遞到下一個進程。

Apache NIFI - 實踐機會的簡要概述

然後 PutSftp 方塊 - 將檔案放入檔案儲存中。 我們可以看到下面的配置參數。

Apache NIFI - 實踐機會的簡要概述

值得注意的是,每個方塊都是必須啟動的單獨進程。 我們查看了最簡單的範例,不需要任何複雜的自訂。 接下來,我們將看看稍微複雜一點的過程,我們將在凹槽上寫一些內容。

更複雜的例子

由於修改 SOAP 訊息的過程,提供給消費者的資料傳輸服務變得稍微複雜一些。 大致流程如下圖所示。

Apache NIFI - 實踐機會的簡要概述

這裡的想法也不是特別複雜:我們收到了消費者的一個請求,他需要數據,發送了一個他收到訊息的回應,開始了接收回應文件的過程,然後按照一定的邏輯進行編輯,然後將文件以SOAP 訊息的形式傳送給伺服器的使用者。

我認為沒有必要再次描述我們上面看到的那些方塊 - 讓我們直接轉向新的方塊。 如果您需要編輯任何文件並且普通的 ReplaceText 類型的方塊不適合,您將必須編寫自己的腳本。 這可以使用 ExecuteGroogyScript 方塊來完成。 其設定如下所示。

Apache NIFI - 實踐機會的簡要概述

有兩個選項可將腳本載入到該方塊中。 第一種方法是下載帶有腳本的檔案。 第二種是將腳本插入 scriptBody 中。 據我所知,executeScript square 支援多種語言 - 其中之一是 groovy。 我會讓java開發人員失望的——你不能在這樣的方塊中用java編寫腳本。 對於那些真正想要的人,您需要建立自己的自訂方塊並將其添加到 NIFI 系統中。 整個操作伴隨著相當長的手鼓舞蹈,我們不會在本文中討論。 我選擇了groovy語言。 下面是一個測試腳本,它只是增量更新 SOAP 訊息中的 id。 值得注意的是。 您從 flowFile 獲取文件並更新它,不要忘記您需要將其放回原處並進行更新。 另外值得注意的是,並非所有函式庫都包含在內。 您可能仍然需要匯入其中一個庫。 另一個缺點是這個方塊中的腳本很難調試。 有一種方法可以連接到 NIFI JVM 並啟動調試過程。 就我個人而言,我啟動了一個本機應用程式並模擬從會話接收檔案。 我也在本地進行了調試。 載入腳本時出現的錯誤很容易被 Google 搜尋到,並且由 NIFI 本身寫入日誌。

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

實際上,正方形的定制就到此結束了。 接下來,更新後的檔案被傳輸到square,它負責將檔案傳送到伺服器。 以下是該方塊的設定。

Apache NIFI - 實踐機會的簡要概述

我們描述傳輸 SOAP 訊息的方法。 我們寫在哪裡。 接下來您需要表明這是 SOAP。

Apache NIFI - 實踐機會的簡要概述

新增多個屬性,例如主機和操作 (soapAction)。 我們保存並檢查。 您可以查看有關如何發送 SOAP 請求的更多詳細信息 這裡

我們研究了使用 NIFI 進程的幾種選項。 他們如何互動以及他們的真正好處是什麼? 所考慮的例子是測試範例,與戰鬥中實際發生的情況略有不同。 我希望這篇文章對開發者來說有點用處。 感謝您的關注。 如果您有任何疑問,請寫信。 我會嘗試回答。

來源: www.habr.com

添加評論