摘抄。 5.3. 聚合和窗口操作
在本节中,我们将继续探索 Kafka Streams 最有前途的部分。 到目前为止,我们已经介绍了 Kafka Streams 的以下几个方面:
- 创建处理拓扑;
- 在流应用程序中使用状态;
- 执行数据流连接;
- 事件流 (KStream) 和更新流 (KTable) 之间的差异。
在下面的示例中,我们将把所有这些元素结合在一起。 您还将了解窗口化,这是流媒体应用程序的另一个重要功能。 我们的第一个例子是一个简单的聚合。
5.3.1. 按行业分类的股票销售汇总
聚合和分组是处理流数据时的重要工具。 对收到的个人记录进行检查往往是不够的。 为了从数据中提取附加信息,有必要对它们进行分组和组合。
在此示例中,您将扮演一名日间交易员,他需要跟踪多个行业公司股票的销量。 具体来说,您对每个行业中份额销售额最大的五家公司感兴趣。
这种聚合将需要以下几个步骤将数据转换为所需的形式(笼统地说)。
- 创建基于主题的源来发布原始股票交易信息。 我们必须将 StockTransaction 类型的对象映射到 ShareVolume 类型的对象。 关键是 StockTransaction 对象包含销售元数据,但我们只需要有关正在出售的股票数量的数据。
- 按股票代码对 ShareVolume 数据进行分组。 按代码分组后,您可以将此数据折叠为股票销量小计。 值得注意的是,KStream.groupBy 方法返回一个 KGroupedStream 类型的实例。 进一步调用KGroupedStream.reduce方法即可获取KTable实例。
什么是 KGroupedStream 接口
KStream.groupBy 和 KStream.groupByKey 方法返回 KGroupedStream 的实例。 KGroupedStream 是按键分组后事件流的中间表示。 它根本不是为了直接使用它而设计的。 相反,KGroupedStream 用于聚合操作,这始终会生成 KTable。 由于聚合操作的结果是一个 KTable 并且它们使用状态存储,因此可能并非所有更新结果都会进一步沿着管道发送。
KTable.groupBy 方法返回一个类似的 KGroupedTable - 更新流的中间表示,按键重新分组。
让我们稍微休息一下,看一下图。 5.9,这显示了我们所取得的成就。 您应该已经非常熟悉这种拓扑结构。
现在让我们看看这个拓扑的代码(可以在文件 src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java 中找到)(清单 5.2)。
给定的代码以其简洁性和在多行中执行的大量操作而著称。 您可能会注意到 builder.stream 方法的第一个参数中有一些新内容:枚举类型 AutoOffsetReset.EARLIEST(还有一个 LATEST)的值,使用 Consumed.withOffsetResetPolicy 方法设置。 此枚举类型可用于为每个 KStream 或 KTable 指定偏移重置策略,并优先于配置中的偏移重置选项。
GroupByKey 和 GroupBy
KStream 接口有两种对记录进行分组的方法:GroupByKey 和 GroupBy。 两者都返回一个 KGroupedTable,因此您可能想知道它们之间有什么区别以及何时使用哪一个?
当 KStream 中的键已经非空时,使用 GroupByKey 方法。 最重要的是,“需要重新分区”标志从未设置。
GroupBy 方法假定您已更改分组键,因此重新分区标志设置为 true。 在 GroupBy 方法之后执行联接、聚合等将导致自动重新分区。
摘要: 只要有可能,您应该使用 GroupByKey 而不是 GroupBy。
很清楚mapValues和groupBy方法的作用,所以让我们看一下sum()方法(位于src/main/java/bbejeck/model/ShareVolume.java)(清单5.3)。
ShareVolume.sum方法返回股票销量的运行总计,整个计算链的结果是一个KTable对象。 现在您了解了 KTable 所扮演的角色。 当ShareVolume对象到达时,对应的KTable对象存储最新的当前更新。 重要的是要记住,所有更新都会反映在之前的 shareVolumeKTable 中,但并非所有更新都会进一步发送。
然后,我们使用此 KTable 进行汇总(按交易股票数量),得出每个行业中股票交易量最高的五家公司。 在这种情况下,我们的操作将与第一次聚合的操作类似。
- 执行另一个 groupBy 操作以按行业对各个 ShareVolume 对象进行分组。
- 开始总结 ShareVolume 对象。 这次聚合对象是一个固定大小的优先级队列。 在这个固定大小的队列中,仅保留出售股票数量最多的五家公司。
- 将上一段中的队列映射到字符串值,并按行业数量返回交易量排名前五的股票。
- 将结果以字符串形式写入主题。
在图中。 数据流拓扑图如图5.10所示。 正如您所看到的,第二轮处理非常简单。
现在我们已经清楚地了解了第二轮处理的结构,我们可以转向它的源代码(您可以在文件 src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java 中找到它)(清单 5.4) 。
该初始化程序包含一个fixedQueue 变量。 这是一个自定义对象,它是 java.util.TreeSet 的适配器,用于跟踪按交易股票降序排列的前 N 个结果。
您已经看到了 groupBy 和 mapValues 调用,因此我们不会深入讨论它们(我们调用 KTable.toStream 方法是因为 KTable.print 方法已被弃用)。 但是您还没有看到aggregate() 的KTable 版本,所以我们将花一些时间讨论它。
正如您所记得的,KTable 的不同之处在于具有相同键的记录被视为更新。 KTable 用新条目替换旧条目。 聚合以类似的方式发生:聚合具有相同键的最新记录。 当一条记录到达时,会使用加法器(聚合方法调用中的第二个参数)将其添加到 FixSizePriorityQueue 类实例中,但如果已存在具有相同键的另一条记录,则使用减法器(聚合方法调用中的第三个参数)删除旧记录。聚合方法调用)。
这都意味着我们的聚合器FixedSizePriorityQueue并不是用一个键聚合所有值,而是存储N种交易最多的股票类型的数量的移动总和。 每个传入条目包含迄今为止已售出的股票总数。 KTable 将为您提供有关哪些公司的股票当前交易量最大的信息,而不需要每次更新的滚动聚合。
我们学会了做两件重要的事情:
- 通过公共键对 KTable 中的值进行分组;
- 对这些分组值执行有用的操作,例如汇总和聚合。
了解如何执行这些操作对于理解通过 Kafka Streams 应用程序移动的数据的含义以及它携带的信息非常重要。
我们还汇集了本书前面讨论的一些关键概念。 在第 4 章中,我们讨论了容错、本地状态对于流应用程序的重要性。 本章的第一个示例演示了为什么本地状态如此重要——它使您能够跟踪您已经看到的信息。 本地访问避免了网络延迟,使应用程序性能更高且更不易出错。
执行任何汇总或聚合操作时,必须指定状态存储的名称。 汇总和聚合操作返回一个 KTable 实例,KTable 使用状态存储将旧结果替换为新结果。 正如您所看到的,并非所有更新都会沿着管道发送,这一点很重要,因为聚合操作旨在生成摘要信息。 如果不应用本地状态,KTable 将转发所有聚合和汇总结果。
接下来,我们将研究在特定时间段内执行聚合等操作 - 所谓的窗口操作。
5.3.2. 窗口操作
在上一节中,我们介绍了滑动卷积和聚合。 该应用程序连续汇总股票销售情况,然后汇总交易所交易量最大的五只股票。
有时,这种连续聚合和汇总结果是必要的。 有时您只需要在给定的时间段内执行操作。 例如,计算过去 10 分钟内特定公司股票的交易量。 或者有多少用户在过去 15 分钟内点击了新的广告横幅。 应用程序可以多次执行此类操作,但结果仅适用于指定的时间段(时间窗口)。
按买家计算外汇交易
在下一个示例中,我们将跟踪多个交易者(大型组织或聪明的个人金融家)之间的股票交易。
这种跟踪有两个可能的原因。 其中之一是需要了解市场领导者正在购买/出售什么。 如果这些大公司和成熟的投资者看到了机会,那么遵循他们的策略就有意义。 第二个原因是希望发现任何可能的非法内幕交易迹象。 为此,您需要分析大幅销售高峰与重要新闻稿的相关性。
此类跟踪包括以下步骤:
- 创建一个用于读取股票交易主题的流;
- 按买家 ID 和股票代码对传入记录进行分组。 调用groupBy方法返回KGroupedStream类的实例;
- KGroupedStream.windowedBy 方法返回仅限于时间窗口的数据流,这允许窗口聚合。 根据窗口类型,返回 TimeWindowedKStream 或 SessionWindowedKStream;
- 聚合操作的事务计数。 窗口数据流确定是否在此计数中考虑特定记录;
- 在开发过程中将结果写入主题或将结果输出到控制台。
该应用程序的拓扑结构很简单,但清晰的描述会很有帮助。 我们来看一下图。 5.11.
接下来我们看看窗口操作的功能以及相应的代码。
窗口类型
Kafka Streams 中有三种类型的窗口:
- 会期;
- “翻滚”(翻滚);
- 滑动/跳跃。
选择哪一种取决于您的业务需求。 翻滚和跳跃窗口是有时间限制的,而会话窗口则受到用户活动的限制——会话的持续时间仅取决于用户的活跃程度。 要记住的主要事情是所有窗口类型都基于条目的日期/时间戳,而不是系统时间。
接下来,我们使用每种窗口类型实现拓扑。 仅在第一个示例中给出完整的代码;对于其他类型的窗口,除了窗口操作的类型之外,不会发生任何变化。
会话窗口
会话窗口与所有其他类型的窗口有很大不同。 它们不仅受时间限制,还受用户活动(或您想要跟踪的实体的活动)的限制。 会话窗口由不活动时间段界定。
图 5.12 说明了会话窗口的概念。 较小的会话将与其左侧的会话合并。 右侧的会话将是单独的,因为它是在长时间不活动之后进行的。 会话窗口基于用户活动,但使用条目中的日期/时间戳来确定条目属于哪个会话。
使用会话窗口跟踪股票交易
让我们使用会话窗口来捕获有关交换交易的信息。 会话窗口的实现如清单 5.5 所示(可以在 src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java 中找到)。
您已经了解了此拓扑中的大部分操作,因此无需在此处再次查看它们。 但这里还有一些新元素,我们现在将讨论这些新元素。
任何 groupBy 操作通常都会执行某种聚合操作(聚合、汇总或计数)。 您可以使用运行总计执行累积聚合,也可以执行窗口聚合,后者考虑指定时间窗口内的记录。
清单 5.5 中的代码计算会话窗口内的事务数量。 在图中。 5.13 这些行动是逐步分析的。
通过调用 windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) 我们创建一个会话窗口,其不活动间隔为 20 秒,持久间隔为 15 分钟。 20 秒的空闲间隔意味着应用程序会将当前会话结束或开始后 20 秒内到达的任何条目包含到当前(活动)会话中。
接下来,我们指定需要在会话窗口中执行哪个聚合操作 - 在本例中为 count。 如果传入条目超出非活动窗口(日期/时间戳的任一侧),应用程序将创建一个新会话。 保留间隔意味着将会话维持一定的时间,并允许超出会话非活动期但仍可以附加的最新数据。 此外,合并产生的新会话的开始和结束对应于最早和最晚的日期/时间戳。
让我们看一下 count 方法中的一些条目,看看会话是如何工作的(表 5.1)。
当记录到达时,我们会查找具有相同密钥、结束时间小于当前日期/时间戳 - 不活动间隔、开始时间大于当前日期/时间戳 + 不活动间隔的现有会话。 考虑到这一点,表中有四个条目。 5.1 合并为单个会话,如下所示。
1. 记录1先到达,因此开始时间等于结束时间,均为00:00:00。
2. 接下来,条目 2 到达,我们查找结束时间不早于 23:59:55 且开始时间不晚于 00:00:35 的会话。 我们找到记录 1 并合并会话 1 和 2。我们获取会话 1 的开始时间(较早)和会话 2 的结束时间(较晚),以便我们的新会话于 00:00:00 开始并于 00 结束: 00:15。
3. 记录 3 到达,我们查找 00:00:30 到 00:01:10 之间的会话,但没有找到任何会话。 为密钥 123-345-654,FFBE 添加第二个会话,开始和结束时间为 00:00:50。
4. 记录 4 到达,我们正在查找 23:59:45 到 00:00:25 之间的会话。 这次同时找到了会话 1 和会话 2。这三个会话合二为一,开始时间为 00:00:00,结束时间为 00:00:15。
根据本节的描述,值得记住以下重要的细微差别:
- 会话不是固定大小的窗口。 会话的持续时间由给定时间段内的活动决定;
- 数据中的日期/时间戳确定事件是在现有会话中还是在空闲期间。
接下来我们将讨论下一种类型的窗户——“翻滚”窗户。
“翻滚”的窗户
翻滚窗口捕获特定时间段内发生的事件。 想象一下,您需要每 20 秒捕获某个公司的所有股票交易,因此您收集了该时间段内的所有事件。 在 20 秒间隔结束时,窗口会翻转并移至新的 20 秒观察间隔。 图 5.14 说明了这种情况。
如您所见,过去 20 秒内收到的所有事件都包含在窗口中。 这段时间结束后,会创建一个新窗口。
清单 5.6 显示的代码演示了如何使用滚动窗口每 20 秒捕获一次股票交易(位于 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中)。
通过对 TimeWindows.of 方法调用的这个小更改,您可以使用滚动窗口。 本示例没有调用until()方法,因此将使用默认的保留间隔24小时。
最后,是时候转到最后一个窗口选项 - “跳跃”窗口。
滑动(“跳跃”)窗户
滑动/跳跃窗口与翻滚窗口类似,但略有不同。 滑动窗口不会等到时间间隔结束才创建新窗口来处理最近的事件。 他们在小于窗口持续时间的等待间隔后开始新的计算。
为了说明滚动窗口和跳跃窗口之间的差异,让我们回到计算股票交易交易的示例。 我们的目标仍然是计算交易数量,但我们不想在更新计数器之前等待整个时间。 相反,我们将以更短的时间间隔更新计数器。 例如,我们仍然每 20 秒计算一次交易数量,但每 5 秒更新一次计数器,如图 5.15 所示。 XNUMX。 在这种情况下,我们最终得到三个具有重叠数据的结果窗口。
清单 5.7 显示了定义滑动窗口的代码(位于 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中)。
通过添加对 advanceBy() 方法的调用,可以将翻滚窗口转换为跳跃窗口。 在所示示例中,保存间隔为 15 分钟。
您在本节中了解了如何将聚合结果限制为时间窗口。 我特别希望您记住本节中的以下三件事:
- 会话窗口的大小不受时间段限制,而是受用户活动限制;
- “翻滚”窗口提供给定时间段内事件的概览;
- 跳跃窗口的持续时间是固定的,但它们更新频繁,并且可能在所有窗口中包含重叠的条目。
接下来,我们将学习如何将 KTable 转换回 KStream 以进行连接。
5.3.3. 连接 KStream 和 KTable 对象
在第 4 章中,我们讨论了连接两个 KStream 对象。 现在我们要学习如何连接KTable和KStream。 出于以下简单原因可能需要这样做。 KStream 是记录流,KTable 是记录更新流,但有时您可能希望使用 KTable 中的更新向记录流添加其他上下文。
让我们获取证券交易所交易数量的数据,并将其与相关行业的证券交易所新闻结合起来。 鉴于您已有的代码,您需要执行以下操作才能实现此目的。
- 将包含股票交易数量数据的 KTable 对象转换为 KStream,然后将 key 替换为该股票代码对应的行业板块的 key。
- 创建一个 KTable 对象,该对象从包含证券交易所新闻的主题中读取数据。 这个新的KTable将按行业部门进行分类。
- 将新闻更新与按行业划分的证券交易所交易数量信息联系起来。
现在让我们看看如何实施这个行动计划。
将 KTable 转换为 KStream
要将 KTable 转换为 KStream,您需要执行以下操作。
- 调用 KTable.toStream() 方法。
- 通过调用 KStream.map 方法,将 key 替换为行业名称,然后从 Windowed 实例中检索 TransactionSummary 对象。
我们将这些操作链接在一起,如下所示(代码可以在文件 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清单 5.8)。
因为我们正在执行 KStream.map 操作,所以返回的 KStream 实例在连接中使用时会自动重新分区。
我们已经完成了转换过程,接下来我们需要创建一个KTable对象来读取股票新闻。
创建股票新闻KTable
幸运的是,创建一个 KTable 对象只需要一行代码(代码可以在 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清单 5.9)。
值得注意的是,不需要指定 Serde 对象,因为设置中使用了字符串 Serdes。 此外,通过使用 EARLIEST 枚举,表会在最开始处填充记录。
现在我们可以继续最后一步 - 连接。
将新闻更新与交易计数数据连接起来
创建连接并不困难。 如果没有相关行业的股票新闻,我们将使用左连接(必要的代码可以在文件 src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java 中找到)(清单 5.10)。
这个 leftJoin 运算符非常简单。 与第 4 章中的联接不同,不使用 JoinWindow 方法,因为在执行 KStream-KTable 联接时,每个键在 KTable 中只有一个条目。 这样的连接没有时间限制:记录要么在KTable中,要么不存在。 主要结论:使用 KTable 对象,您可以通过更新频率较低的参考数据来丰富 KStream。
现在我们将研究一种更有效的方法来丰富 KStream 中的事件。
5.3.4. GlobalKTable 对象
正如您所看到的,需要丰富事件流或为其添加上下文。 在第 4 章中,您看到了两个 KStream 对象之间的连接,在上一节中,您看到了 KStream 和 KTable 之间的连接。 在所有这些情况下,在将键映射到新类型或值时,有必要重新分区数据流。 有时重新分区是显式完成的,有时 Kafka Streams 自动完成。 重新分区是必要的,因为键已经改变,记录必须以新的部分结束,否则连接将是不可能的(这在第 4 章 4.2.4 小节的“重新分区数据”一节中讨论过)。
重新分区是有代价的
重新分区需要成本——创建中间主题、在另一个主题中存储重复数据的额外资源成本; 它还意味着由于从该主题写入和读取而导致延迟增加。 此外,如果您需要跨多个方面或维度进行联接,则必须链接联接、使用新键映射记录,然后再次运行重新分区过程。
连接到较小的数据集
在某些情况下,要连接的参考数据量相对较小,因此可以轻松地将其完整副本安装在每个节点上。 对于这种情况,Kafka Streams 提供了 GlobalKTable 类。
GlobalKTable 实例是唯一的,因为应用程序将所有数据复制到每个节点。 由于所有数据都存在于每个节点上,因此无需通过引用数据键对事件流进行分区,以便它可供所有分区使用。 您还可以使用 GlobalKTable 对象进行无键联接。 让我们回到前面的一个示例来演示此功能。
将 KStream 对象连接到 GlobalKTable 对象
在第 5.3.2 小节中,我们对买家的交易所交易进行了窗口聚合。 聚合的结果如下所示:
{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16
虽然这些结果达到了目的,但如果还显示客户的姓名和完整的公司名称,那就更有用了。 要添加客户名称和公司名称,您可以执行普通连接,但需要执行两个键映射和重新分区。 使用 GlobalKTable,您可以避免此类操作的成本。
为此,我们将使用清单 5.11 中的 countStream 对象(相应的代码可以在 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)并将其连接到两个 GlobalKTable 对象。
我们之前已经讨论过这个问题,所以我不再重复。 但我注意到,为了可读性,toStream().map 函数中的代码被抽象为函数对象,而不是内联 lambda 表达式。
下一步是声明 GlobalKTable 的两个实例(显示的代码可以在文件 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)(清单 5.12)。
请注意,主题名称是使用枚举类型描述的。
现在我们已经准备好了所有组件,剩下的就是编写连接代码(可以在文件 src/main/java/bbejeck/chapter_5/GlobalKTableExample.java 中找到)(清单 5.13)。
尽管此代码中有两个联接,但它们是链接的,因为它们的结果都没有单独使用。 结果显示在整个操作结束时。
当您运行上述连接操作时,您将得到如下结果:
{customer='Barney, Smith' company="Exxon", transactions= 17}
本质没有改变,但这些结果看起来更加清晰。
如果倒计时到第 4 章,您已经看到了几种实际的连接类型。 它们列在表中。 5.2. 该表反映了 Kafka Streams 1.0.0 版本以来的连接能力; 未来版本中可能会发生一些变化。
总而言之,让我们回顾一下基础知识:您可以使用本地状态连接事件流 (KStream) 和更新流 (KTable)。 或者,如果参考数据的大小不太大,则可以使用 GlobalKTable 对象。 GlobalKTables 将所有分区复制到每个 Kafka Streams 应用程序节点,确保无论键对应哪个分区,所有数据都可用。
接下来我们将看到 Kafka Streams 功能,借助该功能,我们可以观察状态变化,而无需使用 Kafka 主题中的数据。
5.3.5。 可查询状态
我们已经执行了一些涉及状态的操作,并且总是将结果输出到控制台(用于开发目的)或将它们写入主题(用于生产目的)。 将结果写入主题时,您必须使用 Kafka 消费者来查看它们。
从这些主题中读取数据可以被视为一种物化视图。 出于我们的目的,我们可以使用维基百科中物化视图的定义:“......包含查询结果的物理数据库对象。 例如,它可以是远程数据的本地副本,或者表或联接结果的行和/或列的子集,或者通过聚合获得的汇总表”(https://en.wikipedia.org/wiki /物化视图)。
Kafka Streams 还允许您在状态存储上运行交互式查询,从而允许您直接读取这些物化视图。 需要注意的是,对状态存储的查询是只读操作。 这确保您不必担心应用程序处理数据时意外地导致状态不一致。
直接查询状态存储的能力很重要。 这意味着您可以创建仪表板应用程序,而无需首先从 Kafka 使用者获取数据。 由于不需要再次写入数据,它还提高了应用程序的效率:
- 由于数据的本地化,可以快速访问它们;
- 由于数据不会写入外部存储,因此消除了数据重复。
我希望您记住的主要事情是您可以直接从应用程序内查询状态。 这给您带来的机会怎么强调都不为过。 您可以查询状态存储以获得相同的结果,而不是使用来自 Kafka 的数据并将记录存储在应用程序的数据库中。 对状态存储的直接查询意味着更少的代码(没有消费者)和更少的软件(不需要数据库表来存储结果)。
我们在本章中介绍了相当多的内容,因此我们暂时停止对状态存储的交互式查询的讨论。 但不用担心:在第 9 章中,我们将创建一个带有交互式查询的简单仪表板应用程序。 它将使用本章和前面章节中的一些示例来演示交互式查询以及如何将它们添加到 Kafka Streams 应用程序中。
总结
- KStream 对象表示事件流,类似于数据库中的插入。 KTable 对象代表更新流,更像是对数据库的更新。 KTable 对象的大小不会增长,旧记录会被新记录替换。
- 聚合操作需要 KTable 对象。
- 使用窗口操作,您可以将聚合数据拆分为时间段。
- 借助 GlobalKTable 对象,您可以在应用程序中的任何位置访问参考数据,而不管分区如何。
- KStream、KTable 和 GlobalKTable 对象之间的连接是可能的。
到目前为止,我们专注于使用高级 KStream DSL 构建 Kafka Streams 应用程序。 尽管高级方法允许您创建简洁的程序,但使用它需要权衡。 使用 DSL KStream 意味着通过减少控制程度来提高代码的简洁性。 在下一章中,我们将研究低级处理程序节点 API 并尝试其他权衡。 这些程序将比以前更长,但我们将能够创建几乎任何我们可能需要的处理程序节点。
→ 有关本书的更多详细信息,请访问
→ 对于 Habrozhiteli 使用优惠券可享受 25% 折扣 - 卡夫卡流
→ 支付纸质版书籍的费用后,将通过电子邮件发送电子书。
来源: habr.com