Apache NiFi 中的流传输自动化

您好!

Apache NiFi 中的流传输自动化

任务如下 - 有一个流程,如上图所示,需要将其推广到 N 个服务器 阿帕奇尼菲。 流测试 - 正在生成文件并将其发送到另一个 NiFi 实例。 数据传输使用 NiFi 站点到站点协议进行。

NiFi 站点到站点 (S2S) 是一种在 NiFi 实例之间传输数据的安全且易于配置的方式。 S2S 的工作原理,请参阅 文件资料 重要的是不要忘记配置 NiFi 实例以允许 S2S,请参阅 这里.

在我们谈论使用 S2S 进行数据传输的情况下,一个实例称为客户端,第二个实例称为服务器。 客户端发送数据,服务器接收。 配置它们之间的数据传输的两种方法:

  1. 。 使用远程进程组 (RPG) 从客户端实例发送数据。 在服务器实例上,使用输入端口接收数据
  2. 。 服务器使用RPG接收数据,客户端使用Output端口发送数据。


推出流程存储在 Apache 注册表中。

Apache NiFiRegistry是ApacheNiFi的一个子项目,提供流存储和版本控制的工具。 一种 GIT。 有关安装、配置和使用注册表的信息可以在 官方文档。 用于存储的流程被组合成进程组并以这种形式存储在注册表中。 我们将在本文后面讨论这一点。

开始时,当N较小时,在可接受的时间内手动交付和更新流量。

但随着 N 的增长,问题也变得越来越多:

  1. 更新流程需要更多时间。 您需要登录所有服务器
  2. 出现模板更新错误。 在这里他们更新了,但在这里他们忘记了
  3. 执行大量类似操作时的人为错误

所有这些让我们意识到我们需要自动化该过程。 我尝试了以下方法来解决这个问题:

  1. 使用 MiNiFi 代替 NiFi
  2. NiFi CLI
  3. NiPy API

使用 MiNiFi

Apache MiniFy - Apache NiFi 的子项目。 MiNiFy 是一个紧凑的代理,它使用与 NiFi 相同的处理器,允许您创建与 NiFi 中相同的流。 除其他外,MiNiFy 没有用于流配置的图形界面,从而实现了代理的轻量级特性。 MiNiFy中缺乏图形界面意味着需要解决向minifi传递流量的问题。 由于 MiNiFy 在物联网中被广泛使用,因此有很多组件,并且将流传输到最终 minifi 实例的过程需要自动化。 一个熟悉的任务,对吧?

另一个子项目将帮助解决这个问题——MiNiFi C2 Server。 该产品旨在成为配置推出架构的中心点。 如何配置环境 - 中描述 本文 关于哈布雷的信息足以解决这个问题。 MiNiFi 与 C2 服务器一起自动更新其配置。 这种方法的唯一缺点是您必须在 C2 服务器上创建模板;简单地提交到注册表是不够的。

上面文章中描述的选项是可行的,并且实施起来并不困难,但我们不能忘记以下几点:

  1. Minifi 不具备 nifi 的所有处理器
  2. Minifi 处理器版本落后于 NiFi 处理器版本。

在撰写本文时,NiFi 的最新版本是 1.9.2。 最新的 MiNiFi 处理器版本是 1.7.0。 处理器可以添加到 MiNiFi 中,但由于 NiFi 和 MiNiFi 处理器之间的版本差异,这可能不起作用。

NiFi CLI

描述 官网上的工具,这是一个在流量交付或流程管理领域自动化NiFI和NiFiRegistry交互的工具。 首先,您需要下载此工具。 .

启动实用程序

./bin/cli.sh
           _     ___  _
 Apache   (_)  .' ..](_)   ,
 _ .--.   __  _| |_  __    )
[ `.-. | [  |'-| |-'[  |  /  
|  | | |  | |  | |   | | '    '
[___||__][___][___] [___]',  ,'
                           `'
          CLI v1.9.2

Type 'help' to see a list of available commands, use tab to auto-complete.

为了让我们从注册中心加载所需的流,我们需要知道桶的标识符(桶标识符)和流本身(流标识符)。 该数据可以通过 cli 或 NiFi 注册表 Web 界面获取。 在网络界面中,它看起来像这样:

Apache NiFi 中的流传输自动化

使用 CLI 可以完成以下操作:

#> registry list-buckets -u http://nifi-registry:18080

#   Name             Id                                     Description
-   --------------   ------------------------------------   -----------
1   test_bucket   709d387a-9ce9-4535-8546-3621efe38e96   (empty)

#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080

#   Name           Id                                     Description
-   ------------   ------------------------------------   -----------
1   test_flow   d27af00a-5b47-4910-89cd-9c664cd91e85

我们开始从注册表导入进程组:

#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080

7f522a13-016e-1000-e504-d5b15587f2f3

重要的一点是,任何 nifi 实例都可以指定为我们将进程组滚动到的主机。

添加了已停止处理器的进程组,需要启动它们

#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080

太好了,处理器已经启动了。 然而,根据任务条款,我们需要 NiFi 实例向其他实例发送数据。 假设您选择了 Push 方法将数据传输到服务器。 为了组织数据传输,您需要在添加的远程进程组 (RPG) 上启用数据传输,该组已包含在我们的流程中。

Apache NiFi 中的流传输自动化

在 CLI 和其他来源的文档中,我没有找到启用数据传输的方法。 如果您知道如何执行此操作,请写在评论中。

既然我们有bash,准备走到最后,我们就会找到出路! 您可以使用NiFi API来解决这个问题。 让我们使用以下方法,从上面的示例中获取 ID(在我们的示例中为 7f522a13-016e-1000-e504-d5b15587f2f3)。 NiFi API方法说明 这里.

Apache NiFi 中的流传输自动化
在正文中,您需要传递 JSON,如下所示:

{
    "revision": {
	    "clientId": "value",
	    "version": 0,
	    "lastModifier": "value"
	},
    "state": "value",
    "disconnectedNodeAcknowledged": true
}

需要填写的参数才能工作:
— 数据传输状态。 可用:TRANSMITTING 启用数据传输,STOPPED 禁用
版本 - 处理器版本

version创建时会默认为0,但是这些参数可以使用方法获取

Apache NiFi 中的流传输自动化

对于 bash 脚本的爱好者来说,这种方法可能看起来很合适,但对我来说有点困难 - bash 脚本不是我的最爱。 我认为下一个方法更有趣、更方便。

NiPy API

NiPyAPI 是一个用于与 NiFi 实例交互的 Python 库。 文档页面 包含与图书馆合作的必要信息。 快速启动描述于 项目 在 github 上。

我们用于推出配置的脚本是一个 Python 程序。 让我们继续编码。
我们为进一步的工作设置了配置。 我们需要以下参数:

nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем

接下来我将插入这个库的方法的名称,这些方法的描述如下 这里.

使用以下命令将注册表连接到 nifi 实例

nipyapi.versioning.create_registry_client

在此步骤中,您还可以添加一个检查,以确保注册表已添加到实例中;为此,您可以使用以下方法

nipyapi.versioning.list_registry_clients

我们找到桶,进一步搜索篮子里的流量

nipyapi.versioning.get_registry_bucket

使用找到的桶,我们寻找流量

nipyapi.versioning.get_flow_in_bucket

接下来,重要的是要了解该进程组是否已经添加。 Process 组根据坐标放置,当第二个组件叠加在一个组件之上时可能会出现这种情况。 我检查过,这可能会发生:)为了获取所有添加的进程组,我们使用该方法

nipyapi.canvas.list_all_process_groups

我们可以进一步搜索,例如按名称。

我不会描述更新模板的过程,我只会说如果在新版本的模板中添加了处理器,那么队列中消息的存在就没有问题。 但是如果处理器被移除,那么可能会出现问题(如果消息队列前面积累了一个处理器,nifi 不允许你移除该处理器​​)。 如果您对我如何解决这个问题感兴趣,请写信给我,我们将讨论这个问题。 联系方式见文章末尾。 让我们继续添加进程组的步骤。

在调试脚本时,我发现了一个特点,即最新版本的 flow 并不总是被拉起,所以我建议首先检查这个版本:

nipyapi.versioning.get_latest_flow_ver

部署进程组:

nipyapi.versioning.deploy_flow_version

我们启动处理器:

nipyapi.canvas.schedule_process_group

在关于 CLI 的块中写道,远程进程组中不会自动启用数据传输? 我在实现脚本的时候也遇到了这个问题。 当时,我无法使用 API 开始数据传输,因此我决定写信给 NiPyAPI 库的开发人员寻求建议/帮助。 开发人员回复了我,我们讨论了这个问题,他写道他需要时间“检查一些东西”。 然后,几天后,收到一封信,其中用 Python 编写了一个函数,解决了我的启动问题! 当时 NiPyAPI 版本是 0.13.3,当然还没有这样的东西。 但在最近发布的0.14.0版本中,这个函数已经包含在库中了。 见面,

nipyapi.canvas.set_remote_process_group_transmission

因此,使用 NiPyAPI 库,我们连接了注册表,推出了流程,甚至启动了处理器和数据传输。 然后你可以梳理代码,添加各种检查、日志记录,仅此而已。 但这是一个完全不同的故事。

在我考虑的自动化选项中,最后一种对我来说似乎是最有效的。 首先,这仍然是Python代码,您可以在其中嵌入辅助程序代码并利用该编程语言的所有优势。 其次,NiPyAPI 项目正在积极开发中,如果出现问题,您可以写信给开发人员。 第三,NiPyAPI仍然是与NiFi交互解决复杂问题的更灵活的工具。 例如,确定流中的消息队列现在是否为空以及进程组是否可以更新。

就这样。 我描述了 NiFi 中自动化流交付的 3 种方法、开发人员可能遇到的陷阱,并提供了自动化交付的工作代码。 如果你和我一样对这个话题感兴趣——

来源: habr.com

添加评论