Apache NIFI - 实践机会的简要概述

介绍

碰巧在我现在的工作地点我必须熟悉这项技术。 我将从一些背景开始。 在下次会议上,我们的团队被告知我们需要与 已知系统。 集成意味着这个众所周知的系统将通过 HTTP 向特定端点发送请求,而奇怪的是,我们将以 SOAP 消息的形式发回响应。 一切都显得简单而琐碎。 由此可见,您需要...

任务

创建3个服务。 第一个是数据库更新服务。 当来自第三方系统的新数据到达时,该服务会更新数据库中的数据并生成 CSV 格式的文件以将其传输到下一个系统。 第二个服务的端点称为 FTP 传输服务,它接收传输的文件,对其进行验证,然后通过 FTP 将其放入文件存储中。 第三个服务是消费者数据传输服务,与前两个服务异步工作。 它接收来自第三方外部系统的请求以接收上述文件,获取准备好的响应文件,对其进行修改(更新 id、description、linkToFile 字段)并以 SOAP 消息的形式发送响应。 也就是说,整体情况是这样的:只有当更新的数据到达时,前两个服务才开始工作。 第三个服务持续工作,因为信息消费者很多,每分钟大约有 1000 个数据请求。 服务持续可用,其实例位于不同的环境中,例如测试、演示、预生产和生产。 下图展示了这些服务的工作原理。 让我立即澄清,一些细节已被简化,以避免不必要的复杂性。

Apache NIFI - 实践机会的简要概述

技术深化

在规划问题的解决方案时,我们首先决定使用 Spring 框架、Nginx 平衡器、Postgres 数据库和其他技术性和非技术性的东西在 Java 中制作应用程序。 由于开发技术解决方案的时间允许我们考虑其他方法来解决这个问题,因此我们的目光落在了在某些圈子里很流行的 Apache NIFI 技术。 我马上就会说,这项技术让我们能够注意到这 3 项服务。 本文将描述向消费者提供文件传输服务和数据传输服务的开发,但如果本文有用,我将写有关更新数据库中数据的服务。

这是什么

NIFI 是一种分布式架构,用于快速并行加载和处理数据、大量用于源和转换的插件、配置的版本控制等等。 一个很好的好处是它非常容易使用。 getFile、sendHttpRequest 等简单流程可以用正方形表示。 每个方块代表一个过程,其交互作用如下图所示。 有关流程设置交互的更详细文档已编写 这里 ,对于那些说俄语的人 - 这里。 该文档完美地描述了如何解压和运行NIFI,以及如何创建进程,也称为方块
经过长时间的搜索和将收到的信息结构化为有意识的东西,以及让未来开发人员的生活变得更轻松的愿望,写一篇文章的想法诞生了。

例子

考虑正方形如何相互作用的示例。 总体方案非常简单:我们接收一个 HTTP 请求(理论上,请求正文中包含一个文件。为了演示 NIFI 的功能,在本例中,请求启动从本地文件存储接收文件的过程),然后我们发回请求已收到的响应,同时执行从 FH 接收文件的过程,然后通过 FTP 将文件移动到 FH 的过程。 值得澄清的是,进程之间通过所谓的flowFile进行交互。 这是 NIFI 中存储属性和内容的基本实体。 内容是流文件表示的数据。 也就是说,粗略地说,如果您从一个方块收到一个文件并将其传输到另一个方块,则内容将是您的文件。

Apache NIFI - 实践机会的简要概述

正如你所看到的,这张图展示了大致的流程。 HandleHttpRequest - 接受请求,ReplaceText - 生成响应正文,HandleHttpResponse - 发送响应。 FetchFile - 从文件存储接收文件,将其传输到方形 PutSftp - 将此文件放在 FTP 上的指定地址。 现在更多地了解这个过程。

在这种情况下,请求就是一切的开始。 我们来看看它的配置参数。

Apache NIFI - 实践机会的简要概述

除了 StandardHttpContextMap 之外,这里的一切都很琐碎 - 这是一种允许您发送和接收请求的服务。 更详细地甚至通过示例,您可以看到 - 这里

接下来我们看一下方块的ReplaceText配置参数。 值得关注 ReplacementValue - 这将以响应的形式返回给用户。 在设置中您可以调整日志记录的级别,您可以看到日志{您解压nifi的位置}/nifi-1.9.2/logs,还有失败/成功参数 - 基于这些参数您可以整体调节流程。 也就是说,在文本处理成功的情况下,将调用向用户发送响应的过程,而在另一种情况下,我们将简单地记录不成功的过程。

Apache NIFI - 实践机会的简要概述

除了成功创建响应时的状态之外,HandleHttpResponse 属性中没有什么特别有趣的内容。

Apache NIFI - 实践机会的简要概述

我们已经整理好了请求和响应 - 让我们继续接收文件并将其放置在 FTP 服务器上。 FetchFile - 在设置中指定的路径接收文件并将其传递到下一个进程。

Apache NIFI - 实践机会的简要概述

然后 PutSftp 方块 - 将文件放入文件存储中。 我们可以看到下面的配置参数。

Apache NIFI - 实践机会的简要概述

值得注意的是,每个方块都是一个必须启动的单独进程。 我们查看了最简单的示例,不需要任何复杂的自定义。 接下来,我们将看看稍微复杂一点的过程,我们将在凹槽上写一些内容。

更复杂的例子

由于修改 SOAP 消息的过程,向消费者提供的数据传输服务变得稍微复杂一些。 大致流程如下图所示。

Apache NIFI - 实践机会的简要概述

这里的思路也不是特别复杂:我们收到了消费者的一个请求,他需要数据,发送了一个他收到消息的响应,开始了接收响应文件的过程,然后按照一定的逻辑进行编辑,然后将文件以 SOAP 消息的形式传输给服务器的使用者。

我认为没有必要再次描述我们上面看到的那些方块 - 让我们直接转向新的方块。 如果您需要编辑任何文件并且普通的 ReplaceText 类型的方块不适合,您将必须编写自己的脚本。 这可以使用 ExecuteGroogyScript 方块来完成。 其设置如下所示。

Apache NIFI - 实践机会的简要概述

有两个选项可将脚本加载到该方块中。 第一种方法是下载带有脚本的文件。 第二种是将脚本插入到 scriptBody 中。 据我所知,executeScript square 支持多种语言 - 其中之一是 groovy。 我会让java开发人员失望的——你不能在这样的方块中用java编写脚本。 对于那些真正想要的人,您需要创建自己的自定义方块并将其添加到 NIFI 系统中。 整个操作伴随着相当长的手鼓舞蹈,我们在本文中不会讨论。 我选择了groovy语言。 下面是一个测试脚本,它只是增量更新 SOAP 消息中的 id。 值得注意的是。 您从 flowFile 中获取文件并更新它,不要忘记您需要将其放回原处并进行更新。 还值得注意的是,并非所有库都包含在内。 您可能仍然需要导入其中一个库。 另一个缺点是这个方块中的脚本很难调试。 有一种方法可以连接到 NIFI JVM 并启动调试过程。 就我个人而言,我启动了一个本地应用程序并模拟从会话接收文件。 我也在本地进行了调试。 加载脚本时出现的错误很容易被 Google 搜索到,并且由 NIFI 本身写入日志。

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

实际上,正方形的定制就到此结束了。 接下来,更新后的文件被传输到square,它负责将文件发送到服务器。 以下是该方块的设置。

Apache NIFI - 实践机会的简要概述

我们描述传输 SOAP 消息的方法。 我们写在哪里。 接下来您需要表明这是 SOAP。

Apache NIFI - 实践机会的简要概述

添加多个属性,例如主机和操作 (soapAction)。 我们保存并检查。 您可以查看有关如何发送 SOAP 请求的更多详细信息 这里

我们研究了使用 NIFI 进程的几种选项。 他们如何互动以及他们的真正好处是什么? 所考虑的示例是测试示例,与战斗中实际发生的情况略有不同。 我希望这篇文章对开发者来说有点用处。 感谢您的关注。 如果您有任何疑问,请写信。 我会尝试回答。

来源: habr.com

添加评论