您好!
任务如下 - 有一个流程,如上图所示,需要将其推广到 N 个服务器
NiFi 站点到站点 (S2S) 是一种在 NiFi 实例之间传输数据的安全且易于配置的方式。 S2S 的工作原理,请参阅
在我们谈论使用 S2S 进行数据传输的情况下,一个实例称为客户端,第二个实例称为服务器。 客户端发送数据,服务器接收。 配置它们之间的数据传输的两种方法:
- 推。 使用远程进程组 (RPG) 从客户端实例发送数据。 在服务器实例上,使用输入端口接收数据
- 拉。 服务器使用RPG接收数据,客户端使用Output端口发送数据。
推出流程存储在 Apache 注册表中。
Apache NiFiRegistry是ApacheNiFi的一个子项目,提供流存储和版本控制的工具。 一种 GIT。 有关安装、配置和使用注册表的信息可以在
开始时,当N较小时,在可接受的时间内手动交付和更新流量。
但随着 N 的增长,问题也变得越来越多:
- 更新流程需要更多时间。 您需要登录所有服务器
- 出现模板更新错误。 在这里他们更新了,但在这里他们忘记了
- 执行大量类似操作时的人为错误
所有这些让我们意识到我们需要自动化该过程。 我尝试了以下方法来解决这个问题:
- 使用 MiNiFi 代替 NiFi
- NiFi CLI
- NiPy API
使用 MiNiFi
另一个子项目将帮助解决这个问题——MiNiFi C2 Server。 该产品旨在成为配置推出架构的中心点。 如何配置环境 - 中描述
上面文章中描述的选项是可行的,并且实施起来并不困难,但我们不能忘记以下几点:
- Minifi 不具备 nifi 的所有处理器
- Minifi 处理器版本落后于 NiFi 处理器版本。
在撰写本文时,NiFi 的最新版本是 1.9.2。 最新的 MiNiFi 处理器版本是 1.7.0。 处理器可以添加到 MiNiFi 中,但由于 NiFi 和 MiNiFi 处理器之间的版本差异,这可能不起作用。
NiFi CLI
依
启动实用程序
./bin/cli.sh
_ ___ _
Apache (_) .' ..](_) ,
_ .--. __ _| |_ __ )
[ `.-. | [ |'-| |-'[ | /
| | | | | | | | | | ' '
[___||__][___][___] [___]', ,'
`'
CLI v1.9.2
Type 'help' to see a list of available commands, use tab to auto-complete.
为了让我们从注册中心加载所需的流,我们需要知道桶的标识符(桶标识符)和流本身(流标识符)。 该数据可以通过 cli 或 NiFi 注册表 Web 界面获取。 在网络界面中,它看起来像这样:
使用 CLI 可以完成以下操作:
#> registry list-buckets -u http://nifi-registry:18080
# Name Id Description
- -------------- ------------------------------------ -----------
1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty)
#> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080
# Name Id Description
- ------------ ------------------------------------ -----------
1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
我们开始从注册表导入进程组:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080
7f522a13-016e-1000-e504-d5b15587f2f3
重要的一点是,任何 nifi 实例都可以指定为我们将进程组滚动到的主机。
添加了已停止处理器的进程组,需要启动它们
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
太好了,处理器已经启动了。 然而,根据任务条款,我们需要 NiFi 实例向其他实例发送数据。 假设您选择了 Push 方法将数据传输到服务器。 为了组织数据传输,您需要在添加的远程进程组 (RPG) 上启用数据传输,该组已包含在我们的流程中。
在 CLI 和其他来源的文档中,我没有找到启用数据传输的方法。 如果您知道如何执行此操作,请写在评论中。
既然我们有bash,准备走到最后,我们就会找到出路! 您可以使用NiFi API来解决这个问题。 让我们使用以下方法,从上面的示例中获取 ID(在我们的示例中为 7f522a13-016e-1000-e504-d5b15587f2f3)。 NiFi API方法说明
在正文中,您需要传递 JSON,如下所示:
{
"revision": {
"clientId": "value",
"version": 0,
"lastModifier": "value"
},
"state": "value",
"disconnectedNodeAcknowledged": true
}
需要填写的参数才能工作:
州 — 数据传输状态。 可用:TRANSMITTING 启用数据传输,STOPPED 禁用
版本 - 处理器版本
version创建时会默认为0,但是这些参数可以使用方法获取
对于 bash 脚本的爱好者来说,这种方法可能看起来很合适,但对我来说有点困难 - bash 脚本不是我的最爱。 我认为下一个方法更有趣、更方便。
NiPy API
NiPyAPI 是一个用于与 NiFi 实例交互的 Python 库。
我们用于推出配置的脚本是一个 Python 程序。 让我们继续编码。
我们为进一步的工作设置了配置。 我们需要以下参数:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #путь до nifi-api инстанса, на котором разворачиваем process group
nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #путь до nifi-registry-api registry
nipyapi.config.registry_name = 'MyBeutifulRegistry' #название registry, как будет называться в инстансе nifi
nipyapi.config.bucket_name = 'BucketName' #название bucket, из которого подтягиваем flow
nipyapi.config.flow_name = 'FlowName' #название flow, которое подтягиваем
接下来我将插入这个库的方法的名称,这些方法的描述如下
使用以下命令将注册表连接到 nifi 实例
nipyapi.versioning.create_registry_client
在此步骤中,您还可以添加一个检查,以确保注册表已添加到实例中;为此,您可以使用以下方法
nipyapi.versioning.list_registry_clients
我们找到桶,进一步搜索篮子里的流量
nipyapi.versioning.get_registry_bucket
使用找到的桶,我们寻找流量
nipyapi.versioning.get_flow_in_bucket
接下来,重要的是要了解该进程组是否已经添加。 Process 组根据坐标放置,当第二个组件叠加在一个组件之上时可能会出现这种情况。 我检查过,这可能会发生:)为了获取所有添加的进程组,我们使用该方法
nipyapi.canvas.list_all_process_groups
我们可以进一步搜索,例如按名称。
我不会描述更新模板的过程,我只会说如果在新版本的模板中添加了处理器,那么队列中消息的存在就没有问题。 但是如果处理器被移除,那么可能会出现问题(如果消息队列前面积累了一个处理器,nifi 不允许你移除该处理器)。 如果您对我如何解决这个问题感兴趣,请写信给我,我们将讨论这个问题。 联系方式见文章末尾。 让我们继续添加进程组的步骤。
在调试脚本时,我发现了一个特点,即最新版本的 flow 并不总是被拉起,所以我建议首先检查这个版本:
nipyapi.versioning.get_latest_flow_ver
部署进程组:
nipyapi.versioning.deploy_flow_version
我们启动处理器:
nipyapi.canvas.schedule_process_group
在关于 CLI 的块中写道,远程进程组中不会自动启用数据传输? 我在实现脚本的时候也遇到了这个问题。 当时,我无法使用 API 开始数据传输,因此我决定写信给 NiPyAPI 库的开发人员寻求建议/帮助。 开发人员回复了我,我们讨论了这个问题,他写道他需要时间“检查一些东西”。 然后,几天后,收到一封信,其中用 Python 编写了一个函数,解决了我的启动问题! 当时 NiPyAPI 版本是 0.13.3,当然还没有这样的东西。 但在最近发布的0.14.0版本中,这个函数已经包含在库中了。 见面,
nipyapi.canvas.set_remote_process_group_transmission
因此,使用 NiPyAPI 库,我们连接了注册表,推出了流程,甚至启动了处理器和数据传输。 然后你可以梳理代码,添加各种检查、日志记录,仅此而已。 但这是一个完全不同的故事。
在我考虑的自动化选项中,最后一种对我来说似乎是最有效的。 首先,这仍然是Python代码,您可以在其中嵌入辅助程序代码并利用该编程语言的所有优势。 其次,NiPyAPI 项目正在积极开发中,如果出现问题,您可以写信给开发人员。 第三,NiPyAPI仍然是与NiFi交互解决复杂问题的更灵活的工具。 例如,确定流中的消息队列现在是否为空以及进程组是否可以更新。
就这样。 我描述了 NiFi 中自动化流交付的 3 种方法、开发人员可能遇到的陷阱,并提供了自动化交付的工作代码。 如果你和我一样对这个话题感兴趣——
来源: habr.com