Apache NiFi 中的流傳輸自動化

大家好!

Apache NiFi 中的流傳輸自動化

任務如下 - 有一個如上圖所示的流程,需要將其推廣到 N 個服務器 阿帕奇NiFi。 流測試 - 正在生成文件並將其發送到另一個 NiFi 實例。 數據傳輸使用 NiFi 站點到站點協議進行。

NiFi 站點到站點 (S2S) 是一種在 NiFi 實例之間傳輸數據的安全、高度可定制的方式。 了解 S2S 的工作原理 文件 重要的是要記住設置你的 NiFi 實例以允許 S2S 看到 這裡.

當涉及使用 S2S 進行數據傳輸時,一個實例稱為客戶端,第二個實例稱為服務器。 客戶端發送數據,服務器接收數據。 設置它們之間的數據傳輸的兩種方法:

  1. 。 數據是使用遠程進程組 (RPG) 從客戶端實例發送的。 在服務器實例上,使用輸入端口接收數據
  2. 。 服務器使用 RPG 接收數據,客戶端使用輸出端口發送數據。


滾動流存儲在 Apache 註冊表中。

Apache NiFiRegistry是ApacheNiFi的一個子項目,提供流存儲和版本控制工具。 一種 GIT。 有關安裝、配置和使用註冊表的信息可以在 官方文檔。 用於存儲的流程被組合成進程組並以這種形式存儲在註冊表中。 我們將在本文後面討論這一點。

一開始,當N是一個很小的數字時,流程會在合理的時間內手工交付和更新。

但隨著N的增長,問題也越來越多:

  1. 更新流程需要更多時間。 你需要去所有服務器
  2. 更新模板時出錯。 他們在這裡更新了,但在這裡他們忘記了
  3. 執行大量類似操作時的人為錯誤

所有這些讓我們意識到有必要實現流程自動化。 我嘗試了以下方法來解決這個問題:

  1. 使用 MiNiFi 代替 NiFi
  2. NiFi CLI
  3. NiPy API

使用 MiNiFi

ApacheMiNify 是 Apache NiFi 的子項目。 MiNiFy 是一個緊湊的代理,它使用與 NiFi 相同的處理器,允許您創建與 NiFi 中相同的流程。 除其他外,由於 MiNiFy 沒有用於流配置的圖形界面,因此實現了代理的輕便性。 MiNiFy缺乏圖形界面意味著需要解決minifi中的流傳遞問題。 由於 MiNiFy 在物聯網中被積極使用,因此有許多組件,並且將流傳輸到最終 minifi 實例的過程必須自動化。 一個熟悉的任務,對吧?

另一個子項目 MiNiFi C2 Server 將幫助解決這個問題。 該產品旨在成為部署架構中的中心點。 如何配置環境 - 中描述 這篇文章 關於哈布雷的信息足以解決問題。 MiNiFi 與 C2 服務器一起自動更新其配置。 這種方法的唯一缺點是您必須在 C2 服務器上創建模板;簡單地提交到註冊表是不夠的。

上面文章中描述的選項是可行的,並且實施起來並不困難,但我們不能忘記以下幾點:

  1. minifi 不具備 nifi 的所有處理器
  2. Minifi 處理器版本落後於 NiFi 處理器版本。

在撰寫本文時,NiFi 的最新版本是 1.9.2。 最新MiNiFi版本的處理器版本是1.7.0。 處理器可以添加到 MiNiFi 中,但由於 NiFi 和 MiNiFi 處理器之間的版本差異,這可能不起作用。

NiFi CLI

判斷依據 描述 官網上的工具,這是一個在流量交付或流程管理領域自動化NiFI和NiFiRegistry交互的工具。 下載此工具即可開始使用。 .

啟動實用程序

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

為了讓我們從註冊表加載必要的流,我們需要知道籃子的標識符(桶標識符)和流本身(流標識符)。 該數據可以通過 cli 或 NiFi 註冊表 Web 界面獲取。 網絡界面如下所示:

Apache NiFi 中的流傳輸自動化

使用 CLI,您可以執行以下操作:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

從註冊表運行導入進程組:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

重要的一點是,任何 nifi 實例都可以指定為我們滾動進程組的主機。

添加了已停止處理器的進程組,需要啟動它們

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

太好了,處理器已經啟動了。 但是,根據問題的情況,我們需要NiFi實例向其他實例發送數據。 我們假設選擇 Push 方法將數據傳輸到服務器。 為了組織數據傳輸,需要在添加的遠程進程組(RPG)上啟用數據傳輸(Enable Transmission),該遠程進程組已包含在我們的流程中。

Apache NiFi 中的流傳輸自動化

在 CLI 和其他來源的文檔中,我沒有找到啟用數據傳輸的方法。 如果您知道如何執行此操作,請寫在評論中。

既然我們有bash,準備走到最後,我們就會找到出路! 您可以使用NiFi API來解決這個問題。 讓我們使用以下方法,從上面的示例中獲取 ID(在我們的示例中為 7f522a13-016e-1000-e504-d5b15587f2f3)。 NiFi API方法說明 這裡.

Apache NiFi 中的流傳輸自動化
在正文中,您需要傳遞以下形式的 JSON:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

必須填寫才能“工作”的參數:
— 數據傳輸狀態。 可用 TRANSMITTING 啟用數據傳輸,STOPPED 禁用
版本 - 處理器版本

version創建時會默認為0,但是這些參數可以使用方法獲取

Apache NiFi 中的流傳輸自動化

對於 bash 腳本愛好者來說,這種方法似乎很合適,但對我來說很難 - bash 腳本不是我的最愛。 我認為接下來的方式更有趣,也更方便。

NiPy API

NiPyAPI 是一個用於與 NiFi 實例交互的 Python 庫。 文檔頁面 包含與圖書館合作所需的信息。 快速啟動描述於 項目 在 github 上。

我們用於推出配置的腳本是一個 Python 程序。 讓我們繼續編碼。
設置配置以進行進一步的工作。 我們需要以下參數:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

此外,我將插入該庫的方法名稱,這些方法的描述如下 這裡.

我們使用以下命令將註冊表連接到 nifi 實例

nipyapi.versioning.create_registry_client

在此步驟中,您還可以添加檢查註冊表是否已添加到實例中,為此您可以使用以下方法

nipyapi.versioning.list_registry_clients

我們找到桶進一步尋找籃子裡的流量

nipyapi.versioning.get_registry_bucket

使用找到的桶,我們尋找流量

nipyapi.versioning.get_flow_in_bucket

接下來,了解該進程組是否已添加非常重要。 進程組是按坐標放置的,當第二個進程組疊加在一個進程組之上時可能會出現這種情況。 我查了一下,可以的 🙂 要獲取所有添加的進程組,請使用該方法

nipyapi.canvas.list_all_process_groups

然後我們可以例如按名稱進行搜索。

我不會描述更新模板的過程,我只會說如果在新版本的模板中添加處理器,那麼隊列中消息的存在就沒有問題。 但是如果處理器被移除,那麼可能會出現問題(如果消息隊列前面積累了一個處理器,nifi 不允許你移除該處理器​​)。 如果您對我如何解決這個問題感興趣,請寫信給我,我們將討論這個問題。 聯繫方式見文章末尾。 讓我們繼續添加進程組的步驟。

在調試腳本的時候,我遇到了一個特點,最新版本的flow總是拉不起來,所以我建議你先澄清一下這個版本:

nipyapi.versioning.get_latest_flow_ver

部署進程組:

nipyapi.versioning.deploy_flow_version

我們啟動處理器:

nipyapi.canvas.schedule_process_group

在關於 CLI 的塊中寫道,遠程進程組中不會自動啟用數據傳輸? 我在實現腳本的時候也遇到了這個問題。 當時,我無法使用 API 開始數據傳輸,因此我決定寫信給 NiPyAPI 庫的開發人員尋求建議/幫助。 開發人員回復了我,我們討論了這個問題,他寫道他需要時間“檢查一些東西”。 然後,幾天后,收到一封信,其中用 Python 編寫了一個函數,解決了我的啟動問題! 當時 NiPyAPI 版本是 0.13.3,當然還沒有這樣的東西。 但在最近發布的0.14.0版本中,這個函數已經包含在庫中了。 見面,

nipyapi.canvas.set_remote_process_group_transmission

因此,在 NiPyAPI 庫的幫助下,我們連接了註冊表、匯總了流程,甚至啟動了處理器和數據傳輸。 然後你可以梳理代碼,添加各種檢查、日誌記錄,僅此而已。 但這是一個完全不同的故事。

在我考慮的自動化選項中,後者對我來說似乎是最有效的。 首先,這仍然是Python代碼,您可以在其中嵌入輔助程序代碼並充分利用編程語言。 其次,NiPyAPI 項目正在積極開發中,如果出現問題,您可以寫信給開發人員。 第三,NiPyAPI仍然是與NiFi交互解決複雜問題的更靈活的工具。 例如,確定流中的消息隊列當前是否為空以及是否可以更新進程組。

就這樣。 我描述了 NiFi 中自動化流交付的 3 種方法、開發人員可能遇到的陷阱,並提供了用於自動化交付的工作代碼。 如果你和我一樣對這個話題感興趣 - 寫!

來源: www.habr.com

添加評論