Redis Stream是Redis 5.0版本中引入的一种新的抽象数据类型
从概念上讲,Redis Stream 是一个可以添加条目的列表。 每个条目都有一个唯一的标识符。 默认情况下,ID 是自动生成的,并包含时间戳。 因此,您可以随时间查询记录范围,或者在新数据到达流时接收新数据,就像 Unix“tail -f”命令读取日志文件并在等待新数据时冻结一样。 请注意,多个客户端可以同时侦听一个线程,就像许多“tail -f”进程可以同时读取文件而不会相互冲突一样。
为了了解新数据类型的所有优势,让我们快速浏览一下长期存在的 Redis 结构,这些结构部分复制了 Redis Stream 的功能。
Redis 发布/订阅
Redis Pub/Sub 是一个简单的消息传递系统,已内置到您的键值存储中。 然而,简单性是有代价的:
- 如果发布者由于某种原因失败,那么他就会失去所有订阅者
- 发布者需要知道其所有订阅者的确切地址
- 如果数据发布速度快于处理速度,发布者可能会导致订阅者工作负担过重
- 消息在发布后立即从发布者的缓冲区中删除,无论消息被传递给多少订阅者以及他们处理该消息的速度如何。
- 所有订阅者将同时收到该消息。 订阅者本身必须以某种方式就处理同一消息的顺序达成一致。
- 没有内置机制来确认订阅者已成功处理消息。 如果订阅者收到消息并在处理过程中崩溃,发布者将不会知道这一情况。
Redis列表
Redis List 是一种支持阻塞读命令的数据结构。 您可以从列表的开头或结尾添加和阅读消息。 基于这个结构,你可以为你的分布式系统制作一个好的堆栈或队列,在大多数情况下这就足够了。 与 Redis Pub/Sub 的主要区别:
- 该消息被传递给一个客户端。 第一个读阻塞的客户端将首先接收数据。
- Clint 必须亲自启动每条消息的读取操作。 List 对客户一无所知。
- 消息将被存储,直到有人阅读或明确删除它们为止。 如果将Redis服务器配置为将数据刷新到磁盘,那么系统的可靠性将显着提高。
流简介
向流中添加条目
团队 XADD 向流中添加一个新条目。 一条记录不仅仅是一个字符串,它由一个或多个键值对组成。 因此,每个条目都已经结构化并且类似于 CSV 文件的结构。
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
在上面的示例中,我们向名称(键)“mystream”的流添加两个字段:“sensor-id”和“Temperature”,其值分别为“1234”和“19.8”。 作为第二个参数,该命令采用将分配给条目的标识符 - 该标识符唯一标识流中的每个条目。 然而,在本例中我们传递了 * ,因为我们希望 Redis 为我们生成一个新的 ID。 每个新的ID都会增加。 因此,每个新条目将具有相对于先前条目更高的标识符。
标识符格式
命令返回的条目ID XADD,由两部分组成:
{millisecondsTime}-{sequenceNumber}
毫秒时间 — Unix 时间(以毫秒为单位)(Redis 服务器时间)。 但是,如果当前时间等于或小于先前记录的时间,则使用先前记录的时间戳。 因此,如果服务器时间回到过去,新的标识符仍将保留增量属性。
序列号 用于在同一毫秒内创建的记录。 序列号 相对于前一个条目将增加 1。 因为 序列号 大小为 64 位,那么实际上您不应该遇到一毫秒内可以生成的记录数量的限制。
乍一看,此类标识符的格式可能看起来很奇怪。 不信任的读者可能想知道为什么时间是标识符的一部分。 原因是Redis流支持按ID进行范围查询。 由于标识符与创建记录的时间相关联,因此可以查询时间范围。 我们在看命令的时候会看一个具体的例子 异界.
如果由于某种原因用户需要指定自己的标识符,例如与某个外部系统关联,那么我们可以将其传递给命令 XADD 而不是*,如下所示:
> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2
请注意,在这种情况下,您必须自己监控 ID 增量。 在我们的示例中,最小标识符是“0-1”,因此该命令不会接受等于或小于“0-1”的另一个标识符。
> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
每个流的记录数
只需使用命令即可获取流中的记录数 XLEN。 对于我们的示例,此命令将返回以下值:
> XLEN somestream
(integer) 2
范围查询 - XRANGE 和 XREVRANGE
要按范围请求数据,我们需要指定两个标识符 - 范围的开始和结束。 返回的范围将包括所有元素,包括边界。 还有两个特殊标识符“-”和“+”,分别表示流中最小(第一条记录)和最大(最后一条记录)的标识符。 下面的示例将列出所有流条目。
> XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
每个返回的记录都是一个由两个元素组成的数组:标识符和键值对列表。 我们已经说过记录标识符与时间相关。 因此,我们可以请求特定时间段的范围。 但是,我们可以在请求中指定不完整的标识符,而只指定 Unix 时间,省略与 序列号。 标识符的省略部分将在范围开始处自动设置为零,并在范围结束处设置为最大可能值。 下面的示例说明了如何请求两毫秒的范围。
> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
我们在这个范围内只有一个条目,但在实际数据集中返回的结果可能会很大。 为此原因 异界 支持 COUNT 选项。 通过指定数量,我们可以简单地获取前N条记录。 如果我们需要获取接下来的N条记录(分页),我们可以使用最后收到的ID,增加它 序列号 按一再问。 让我们在下面的示例中看看这一点。 我们开始添加 10 个元素 XADD (假设 mystream 已填充 10 个元素)。 为了开始迭代,每个命令获取 2 个元素,我们从完整范围开始,但 COUNT 等于 2。
> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
要继续迭代接下来的两个元素,我们需要选择最后收到的 ID,即 1519073279157-0,并加 1 序列号.
生成的 ID(在本例中为 1519073279157-1)现在可以用作下一次调用的新范围参数开始 异界:
> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
2) 1) "foo"
2) "value_3"
2) 1) 1519073281432-0
2) 1) "foo"
2) "value_4"
等等。 因为复杂性 异界 搜索的时间复杂度为 O(log(N)),然后返回 M 个元素的时间复杂度为 O(M),那么每个迭代步骤都很快。 因此,使用 异界 流可以有效地迭代。
团队 XREVRANGE 是等价的 异界,但以相反的顺序返回元素:
> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
2) 1) "foo"
2) "value_10"
请注意该命令 XREVRANGE 以相反的顺序接受范围参数开始和停止。
使用 XREAD 读取新条目
通常,任务是订阅流并仅接收新消息。 这个概念看起来可能类似于 Redis Pub/Sub 或阻塞 Redis List,但在如何使用 Redis Stream 方面有根本的区别:
- 默认情况下,每条新消息都会传递给每个订阅者。 此行为与阻塞 Redis 列表不同,在阻塞 Redis 列表中,新消息只能由一个订阅者读取。
- 在 Redis Pub/Sub 中,所有消息都会被遗忘并且永远不会持久,而在 Stream 中,所有消息都会无限期保留(除非客户端明确导致删除)。
- Redis Stream 允许您区分对一个流中的消息的访问。 特定订阅者只能看到他们的个人消息历史记录。
您可以使用以下命令订阅线程并接收新消息 XREAD。 这比稍微复杂一点 异界,所以我们首先从更简单的示例开始。
> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1519073278252-0
2) 1) "foo"
2) "value_1"
2) 1) 1519073279157-0
2) 1) "foo"
2) "value_2"
上面的例子展示了一个非阻塞的形式 XREAD。 请注意,COUNT 选项是可选的。 事实上,唯一需要的命令选项是 STREAMS 选项,它指定流列表以及相应的最大标识符。 我们编写了“STREAMS mystream 0” - 我们希望接收标识符大于“0-0”的 mystream 流的所有记录。 从示例中可以看到,该命令返回线程的名称,因为我们可以同时订阅多个线程。 例如,我们可以编写“STREAMS mystream otherstream 0 0”。 请注意,在 STREAMS 选项之后,我们需要首先提供所有所需流的名称,然后才是标识符列表。
在这个简单的形式中,与以下命令相比,该命令没有做任何特别的事情 异界。 然而,有趣的是,我们可以轻松地将 XREAD 对于阻塞命令,指定 BLOCK 参数:
> XREAD BLOCK 0 STREAMS mystream $
在上面的示例中,指定了一个新的 BLOCK 选项,超时时间为 0 毫秒(这意味着无限期等待)。 此外,没有传递流 mystream 的常用标识符,而是传递了一个特殊标识符 $。 这个特殊的标识符意味着 XREAD 必须使用 mystream 中的最大标识符作为标识符。 因此,从我们开始收听的那一刻起,我们只会收到新消息。 在某些方面,这类似于 Unix 的“tail -f”命令。
请注意,当使用 BLOCK 选项时,我们不一定需要使用特殊标识符 $。 我们可以使用流中存在的任何标识符。 如果团队可以立即满足我们的请求而不会阻塞,那么它就会这样做,否则就会阻塞。
阻塞 XREAD 也可以同时监听多个线程,你只需要指定它们的名称。 在这种情况下,该命令将返回接收数据的第一个流的记录。 为给定线程阻塞的第一个订阅者将首先接收数据。
消费群体
在某些任务中,我们希望限制订阅者对一个线程内的消息的访问。 这可能有用的一个示例是消息队列,其中工作人员将从线程接收不同的消息,从而允许扩展消息处理。
如果我们假设有三个订阅者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的线程,那么消息的服务将如下图所示:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
为了达到这种效果,Redis Stream 使用了一个称为 Consumer Group 的概念。 这个概念类似于伪订阅者,它从流中接收数据,但实际上由一个组内的多个订阅者提供服务,提供一定的保证:
- 每条消息都会传递给组内的不同订阅者。
- 在组内,订阅者通过其名称进行标识,该名称是区分大小写的字符串。 如果订阅者暂时退出群组,他可以使用自己的唯一名称恢复到群组。
- 每个消费者群体都遵循“第一条未读消息”的概念。 当订阅者请求新消息时,它只能接收以前从未传递给组内任何订阅者的消息。
- 有一个命令可以明确确认消息已被订阅者成功处理。 在调用该命令之前,请求的消息将保持“待处理”状态。
- 在消费者组中,每个订阅者都可以请求发送给他但尚未处理的消息的历史记录(处于“待处理”状态)
从某种意义上来说,群体的状态可以表示为:
+----------------------------------------+
| consumer_group_name: mygroup
| consumer_group_stream: somekey
| last_delivered_id: 1292309234234-92
|
| consumers:
| "consumer-1" with pending messages
| 1292309234234-4
| 1292309234232-8
| "consumer-42" with pending messages
| ... (and so forth)
+----------------------------------------+
现在是时候熟悉 Consumer Group 的主要命令了,即:
- X集团 用于创建、销毁和管理组
- 瑞德集团 用于通过组读取流
- XACK - 此命令允许订阅者将消息标记为已成功处理
创建消费者组
假设 mystream 已经存在。 然后组创建命令将如下所示:
> XGROUP CREATE mystream mygroup $
OK
创建组时,我们必须传递一个标识符,组将从该标识符开始接收消息。 如果我们只想接收所有新消息,那么我们可以使用特殊标识符 $ (如上面的示例所示)。 如果指定 0 而不是特殊标识符,则线程中的所有消息都可供该组使用。
现在组已创建,我们可以立即开始使用命令读取消息 瑞德集团。 这个命令非常类似于 XREAD 并支持可选的 BLOCK 选项。 但是,有一个必需的 GROUP 选项必须始终使用两个参数指定:组名称和订户名称。 还支持 COUNT 选项。
在阅读该主题之前,让我们先放一些消息:
> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0
现在让我们尝试通过组读取此流:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
上述命令逐字读取如下:
“我,订阅者 Alice,mygroup 的成员,想要从 mystream 中读取一条以前从未发送给任何人的消息。”
每次订阅者对组执行操作时,它都必须提供其名称,以在组中唯一标识自己。 上述命令中还有一个非常重要的细节——特殊标识符“">”。 这个特殊标识符会过滤消息,只留下那些以前从未发送过的消息。
此外,在特殊情况下,您可以指定真实标识符,例如 0 或任何其他有效标识符。 在这种情况下命令 瑞德集团 将返回状态为“待处理”的消息历史记录,这些消息已发送给指定订阅者 (Alice),但尚未使用命令确认 XACK.
我们可以通过立即指定 ID 0 来测试此行为,无需选项 COUNT个。 我们只会看到一条待处理的消息,即苹果消息:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1526569495631-0
2) 1) "message"
2) "apple"
但是,如果我们确认消息已成功处理,则它将不再显示:
> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
现在轮到鲍勃读一些东西了:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
2) 1) 1526569506935-0
2) 1) "message"
2) "strawberry"
鲍勃是我的群组的成员,他请求的消息不超过两条。 该命令仅报告由于特殊标识符“">”而未发送的消息。 正如您所看到的,消息“apple”不会显示,因为它已经传递给Alice,因此Bob 收到“orange”和“strawberry”。
这样,Alice、Bob 和该组的任何其他订阅者都可以从同一流中读取不同的消息。 他们还可以读取未处理消息的历史记录或将消息标记为已处理。
有几点需要记住:
- 一旦订阅者认为该消息是命令 瑞德集团,该消息进入“待处理”状态并分配给该特定订阅者。 其他群组订阅者将无法阅读此消息。
- 订阅者会在第一次提及时自动创建,无需显式创建它们。
- 同 瑞德集团 您可以同时读取来自多个不同线程的消息,但是要使其工作,您需要首先使用以下命令为每个线程创建具有相同名称的组 X集团
故障后恢复
订户可以从故障中恢复并重新读取其处于“待处理”状态的消息列表。 然而,在现实世界中,订阅者最终可能会失败。 如果订阅者无法从故障中恢复,订阅者的卡住消息会发生什么情况?
Consumer Group 提供的功能正是用于此类情况 - 当您需要更改消息的所有者时。
您需要做的第一件事是调用命令 小鹏,显示组中状态为“待处理”的所有消息。 在最简单的形式中,仅使用两个参数调用该命令:线程名称和组名称:
> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
2) "2"
该团队显示了整个组和每个订阅者的未处理消息数。 我们只有鲍勃有两条未完成的消息,因为爱丽丝请求的唯一消息已被确认 XACK.
我们可以使用更多参数请求更多信息:
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - 标识符范围(可以使用“-”和“+”)
{count} — 投递尝试次数
{consumer-name} - 组名称
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
2) 1) 1526569506935-0
2) "Bob"
3) (integer) 74170458
4) (integer) 1
现在我们有了每条消息的详细信息:ID、订阅者姓名、空闲时间(以毫秒为单位)以及最后的传递尝试次数。 我们有两条来自 Bob 的消息,它们已经空闲了 74170458 毫秒,大约 20 小时。
请注意,没有人可以阻止我们简单地使用以下命令来检查消息的内容: 异界.
> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
我们只需在参数中重复相同的标识符两次即可。 现在我们有了一些想法,Alice 可能会认为,在 20 小时的停机时间之后,Bob 可能无法恢复,是时候查询这些消息并恢复为 Bob 处理它们了。 为此,我们使用命令 索赔:
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
使用此命令,我们可以通过将所有者更改为{consumer}来接收尚未处理的“外部”消息。 不过,我们也可以提供一个最小空闲时间{min-idle-time}。 这有助于避免两个客户端尝试同时更改同一消息的所有者的情况:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
第一个客户将重置停机时间并增加交货柜台。 因此第二个客户端将无法请求它。
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
Alice 已成功认领该消息,她现在可以处理该消息并确认该消息。
从上面的示例中,您可以看到成功的请求返回消息本身的内容。 然而,这不是必要的。 JUSTID 选项可用于仅返回消息 ID。 如果您对消息的详细信息不感兴趣并且想要提高系统性能,这非常有用。
送货柜台
您在输出中看到的计数器 小鹏 是每条消息的传递次数。 这样的计数器以两种方式递增:当通过以下方式成功请求消息时: 索赔 或者当使用呼叫时 瑞德集团.
有些消息被多次传递是正常的。 最主要的是所有消息最终都会被处理。 有时,处理消息时会出现问题,因为消息本身已损坏,或者消息处理导致处理程序代码中出现错误。 在这种情况下,可能没有人能够处理该消息。 由于我们有一个交付尝试计数器,因此我们可以使用该计数器来检测此类情况。 因此,一旦传递计数达到您指定的高数字,将此类消息放在另一个线程并向系统管理员发送通知可能会更明智。
线程状态
团队 信佛 用于请求有关线程及其组的各种信息。 例如,基本命令如下所示:
> XINFO STREAM mystream
1) length
2) (integer) 13
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1524494395530-0
2) 1) "a"
2) "1"
3) "b"
4) "2"
11) last-entry
12) 1) 1526569544280-0
2) 1) "message"
2) "banana"
上面的命令显示有关指定流的一般信息。 现在是一个稍微复杂一点的例子:
> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 2
5) pending
6) (integer) 2
2) 1) name
2) "some-other-group"
3) consumers
4) (integer) 1
5) pending
6) (integer) 0
上面的命令显示指定线程的所有组的一般信息
> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 1
5) idle
6) (integer) 9104628
2) 1) name
2) "Bob"
3) pending
4) (integer) 1
5) idle
6) (integer) 83841983
上面的命令显示指定流和组的所有订阅者的信息。
如果您忘记了命令语法,只需向命令本身寻求帮助:
> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}.
3) GROUPS {key} -- Show the stream consumer groups.
4) STREAM {key} -- Show information about the stream.
5) HELP -- Print this help.
流大小限制
许多应用程序不希望永远将数据收集到流中。 每个线程允许的最大消息数通常很有用。 在其他情况下,当达到指定的线程大小时,将所有消息从一个线程移动到另一个持久存储是很有用的。 您可以使用命令中的 MAXLEN 参数限制流的大小 XADD:
> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
当使用MAXLEN时,旧记录在达到指定长度时会自动删除,因此流具有恒定的大小。 然而,这种情况下的修剪在 Redis 内存中并不是以最有效的方式进行的。 您可以通过以下方式改善这种情况:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
上面示例中的 ~ 参数意味着我们不一定需要将流长度限制为特定值。 在我们的示例中,这可以是大于或等于 1000 的任何数字(例如 1000、1010 或 1030)。 我们只是明确指定我们希望流存储至少 1000 条记录。 这使得 Redis 内部的内存管理更加高效。
还有一个单独的团队 XTRIM,它做同样的事情:
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
持久存储和复制
Redis Stream 异步复制到从节点并保存到 AOF(所有数据的快照)和 RDB(所有写入操作的日志)等文件中。 还支持消费者组状态的复制。 因此,如果一条消息在主节点上处于“待处理”状态,那么在从节点上该消息将具有相同的状态。
从流中删除单个元素
有一个特殊的命令可以删除消息 西德尔。 该命令获取线程的名称,后跟要删除的消息 ID:
> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
2) 1) "value"
2) "2"
2) 1) 1526655000369-0
2) 1) "value"
2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
2) 1) "value"
2) "3"
使用该命令时,需要考虑到实际内存不会立即释放。
零长度流
流和其他 Redis 数据结构之间的区别在于,当其他数据结构中不再包含元素时,作为副作用,数据结构本身将从内存中删除。 因此,例如,当 ZREM 调用删除最后一个元素时,排序集将被完全删除。 相反,即使内部没有任何元素,线程也可以保留在内存中。
结论
Redis Stream 非常适合创建消息代理、消息队列、统一日志记录和历史记录聊天系统。