Redis Stream - 消息系统的可靠性和可扩展性

Redis Stream - 消息系统的可靠性和可扩展性

Redis Stream是Redis 5.0版本中引入的一种新的抽象数据类型
从概念上讲,Redis Stream 是一个可以添加条目的列表。 每个条目都有一个唯一的标识符。 默认情况下,ID 是自动生成的,并包含时间戳。 因此,您可以随时间查询记录范围,或者在新数据到达流时接收新数据,就像 Unix“tail -f”命令读取日志文件并在等待新数据时冻结一样。 请注意,多个客户端可以同时侦听一个线程,就像许多“tail -f”进程可以同时读取文件而不会相互冲突一样。

为了了解新数据类型的所有优势,让我们快速浏览一下长期存在的 Redis 结构,这些结构部分复制了 Redis Stream 的功能。

Redis 发布/订阅

Redis Pub/Sub 是一个简单的消息传递系统,已内置到您的键值存储中。 然而,简单性是有代价的:

  • 如果发布者由于某种原因失败,那么他就会失去所有订阅者
  • 发布者需要知道其所有订阅者的确切地址
  • 如果数据发布速度快于处理速度,发布者可能会导致订阅者工作负担过重
  • 消息在发布后立即从发布者的缓冲区中删除,无论消息被传递给多少订阅者以及他们处理该消息的速度如何。
  • 所有订阅者将同时收到该消息。 订阅者本身必须以某种方式就处理同一消息的顺序达成一致。
  • 没有内置机制来确认订阅者已成功处理消息。 如果订阅者收到消息并在处理过程中崩溃,发布者将不会知道这一情况。

Redis列表

Redis List 是一种支持阻塞读命令的数据结构。 您可以从列表的开头或结尾添加和阅读消息。 基于这个结构,你可以为你的分布式系统制作一个好的堆栈或队列,在大多数情况下这就足够了。 与 Redis Pub/Sub 的主要区别:

  • 该消息被传递给一个客户端。 第一个读阻塞的客户端将首先接收数据。
  • Clint 必须亲自启动每条消息的读取操作。 List 对客户一无所知。
  • 消息将被存储,直到有人阅读或明确删除它们为止。 如果将Redis服务器配置为将数据刷新到磁盘,那么系统的可靠性将显着提高。

流简介

向流中添加条目

团队 XADD 向流中添加一个新条目。 一条记录不仅仅是一个字符串,它由一个或多个键值对组成。 因此,每个条目都已经结构化并且类似于 CSV 文件的结构。

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

在上面的示例中,我们向名称(键)“mystream”的流添加两个字段:“sensor-id”和“Temperature”,其值分别为“1234”和“19.8”。 作为第二个参数,该命令采用将分配给条目的标识符 - 该标识符唯一标识流中的每个条目。 然而,在本例中我们传递了 * ,因为我们希望 Redis 为我们生成一个新的 ID。 每个新的ID都会增加。 因此,每个新条目将具有相对于先前条目更高的标识符。

标识符格式

命令返回的条目ID XADD,由两部分组成:

{millisecondsTime}-{sequenceNumber}

毫秒时间 — Unix 时间(以毫秒为单位)(Redis 服务器时间)。 但是,如果当前时间等于或小于先前记录的时间,则使用先前记录的时间戳。 因此,如果服务器时间回到过去,新的标识符仍将保留增量属性。

序列号 用于在同一毫秒内创建的记录。 序列号 相对于前一个条目将增加 1。 因为 序列号 大小为 64 位,那么实际上您不应该遇到一毫秒内可以生成的记录数量的限制。

乍一看,此类标识符的格式可能看起来很奇怪。 不信任的读者可能想知道为什么时间是标识符的一部分。 原因是Redis流支持按ID进行范围查询。 由于标识符与创建记录的时间相关联,因此可以查询时间范围。 我们在看命令的时候会看一个具体的例子 异界.

如果由于某种原因用户需要指定自己的标识符,例如与某个外部系统关联,那么我们可以将其传递给命令 XADD 而不是*,如下所示:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

请注意,在这种情况下,您必须自己监控 ID 增量。 在我们的示例中,最小标识符是“0-1”,因此该命令不会接受等于或小于“0-1”的另一个标识符。

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

每个流的记录数

只需使用命令即可获取流中的记录数 XLEN。 对于我们的示例,此命令将返回以下值:

> XLEN somestream
(integer) 2

范围查询 - XRANGE 和 XREVRANGE

要按范围请求数据,我们需要指定两个标识符 - 范围的开始和结束。 返回的范围将包括所有元素,包括边界。 还有两个特殊标识符“-”和“+”,分别表示流中最小(第一条记录)和最大(最后一条记录)的标识符。 下面的示例将列出所有流条目。

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

每个返回的记录都是一个由两个元素组成的数组:标识符和键值对列表。 我们已经说过记录标识符与时间相关。 因此,我们可以请求特定时间段的范围。 但是,我们可以在请求中指定不完整​​的标识符,而只指定 Unix 时间,省略与 序列号。 标识符的省略部分将在范围开始处自动设置为零,并在范围结束处设置为最大可能值。 下面的示例说明了如何请求两毫秒的范围。

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

我们在这个范围内只有一个条目,但在实际数据集中返回的结果可能会很大。 为此原因 异界 支持 COUNT 选项。 通过指定数量,我们可以简单地获取前N条记录。 如果我们需要获取接下来的N条记录(分页),我们可以使用最后收到的ID,增加它 序列号 按一再问。 让我们在下面的示例中看看这一点。 我们开始添加 10 个元素 XADD (假设 mystream 已填充 10 个元素)。 为了开始迭代,每个命令获取 2 个元素,我们从完整范围开始,但 COUNT 等于 2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

要继续迭代接下来的两个元素,我们需要选择最后收到的 ID,即 1519073279157-0,并加 1 序列号.
生成的 ID(在本例中为 1519073279157-1)现在可以用作下一次调用的新范围参数开始 异界:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

等等。 因为复杂性 异界 搜索的时间复杂度为 O(log(N)),然后返回 M 个元素的时间复杂度为 O(M),那么每个迭代步骤都很快。 因此,使用 异界 流可以有效地迭代。

团队 XREVRANGE 是等价的 异界,但以相反的顺序返回元素:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

请注意该命令 XREVRANGE 以相反的顺序接受范围参数开始和停止。

使用 XREAD 读取新条目

通常,任务是订阅流并仅接收新消息。 这个概念看起来可能类似于 Redis Pub/Sub 或阻塞 Redis List,但在如何使用 Redis Stream 方面有根本的区别:

  1. 默认情况下,每条新消息都会传递给每个订阅者。 此行为与阻塞 Redis 列表不同,在阻塞 Redis 列表中,新消息只能由一个订阅者读取。
  2. 在 Redis Pub/Sub 中,所有消息都会被遗忘并且永远不会持久,而在 Stream 中,所有消息都会无限期保留(除非客户端明确导致删除)。
  3. Redis Stream 允许您区分对一个流中的消息的访问。 特定订阅者只能看到他们的个人消息历史记录。

您可以使用以下命令订阅线程并接收新消息 XREAD。 这比稍微复杂一点 异界,所以我们首先从更简单的示例开始。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

上面的例子展示了一个非阻塞的形式 XREAD。 请注意,COUNT 选项是可选的。 事实上,唯一需要的命令选项是 STREAMS 选项,它指定流列表以及相应的最大标识符。 我们编写了“STREAMS mystream 0” - 我们希望接收标识符大于“0-0”的 mystream 流的所有记录。 从示例中可以看到,该命令返回线程的名称,因为我们可以同时订阅多个线程。 例如,我们可以编写“STREAMS mystream otherstream 0 0”。 请注意,在 STREAMS 选项之后,我们需要首先提供所有所需流的名称,然后才是标识符列表。

在这个简单的形式中,与以下命令相比,该命令没有做任何特别的事情 异界。 然而,有趣的是,我们可以轻松地将 XREAD 对于阻塞命令,指定 BLOCK 参数:

> XREAD BLOCK 0 STREAMS mystream $

在上面的示例中,指定了一个新的 BLOCK 选项,超时时间为 0 毫秒(这意味着无限期等待)。 此外,没有传递流 mystream 的常用标识符,而是传递了一个特殊标识符 $。 这个特殊的标识符意味着 XREAD 必须使用 mystream 中的最大标识符作为标识符。 因此,从我们开始收听的那一刻起,我们只会收到新消息。 在某些方面,这类似于 Unix 的“tail -f”命令。

请注意,当使用 BLOCK 选项时,我们不一定需要使用特殊标识符 $。 我们可以使用流中存在的任何标识符。 如果团队可以立即满足我们的请求而不会阻塞,那么它就会这样做,否则就会阻塞。

阻塞 XREAD 也可以同时监听多个线程,你只需要指定它们的名称。 在这种情况下,该命令将返回接收数据的第一个流的记录。 为给定线程阻塞的第一个订阅者将首先接收数据。

消费群体

在某些任务中,我们希望限制订阅者对一个线程内的消息的访问。 这可能有用的一个示例是消息队列,其中工作人员将从线程接收不同的消息,从而允许扩展消息处理。

如果我们假设有三个订阅者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的线程,那么消息的服务将如下图所示:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了达到这种效果,Redis Stream 使用了一个称为 Consumer Group 的概念。 这个概念类似于伪订阅者,它从流中接收数据,但实际上由一个组内的多个订阅者提供服务,提供一定的保证:

  1. 每条消息都会传递给组内的不同订阅者。
  2. 在组内,订阅者通过其名称进行标识,该名称是区分大小写的字符串。 如果订阅者暂时退出群组,他可以使用自己的唯一名称恢复到群组。
  3. 每个消费者群体都遵循“第一条未读消息”的概念。 当订阅者请求新消息时,它只能接收以前从未传递给组内任何订阅者的消息。
  4. 有一个命令可以明确确认消息已被订阅者成功处理。 在调用该命令之前,请求的消息将保持“待处理”状态。
  5. 在消费者组中,每个订阅者都可以请求发送给他但尚未处理的消息的历史记录(处于“待处理”状态)

从某种意义上来说,群体的状态可以表示为:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

现在是时候熟悉 Consumer Group 的主要命令了,即:

  • X集团 用于创建、销毁和管理组
  • 瑞德集团 用于通过组读取流
  • XACK - 此命令允许订阅者将消息标记为已成功处理

创建消费者组

假设 mystream 已经存在。 然后组创建命令将如下所示:

> XGROUP CREATE mystream mygroup $
OK

创建组时,我们必须传递一个标识符,组将从该标识符开始接收消息。 如果我们只想接收所有新消息,那么我们可以使用特殊标识符 $ (如上面的示例所示)。 如果指定 0 而不是特殊标识符,则线程中的所有消息都可供该组使用。

现在组已创建,我们可以立即开始使用命令读取消息 瑞德集团。 这个命令非常类似于 XREAD 并支持可选的 BLOCK 选项。 但是,有一个必需的 GROUP 选项必须始终使用两个参数指定:组名称和订户名称。 还支持 COUNT 选项。

在阅读该主题之前,让我们先放一些消息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

现在让我们尝试通过组读取此流:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

上述命令逐字读取如下:

“我,订阅者 Alice,mygroup 的成员,想要从 mystream 中读取一条以前从未发送给任何人的消息。”

每次订阅者对组执行操作时,它都必须提供其名称,以在组中唯一标识自己。 上述命令中还有一个非常重要的细节——特殊标识符“">”。 这个特殊标识符会过滤消息,只留下那些以前从未发送过的消息。

此外,在特殊情况下,您可以指定真实标识符,例如 0 或任何其他有效标识符。 在这种情况下命令 瑞德集团 将返回状态为“待处理”的消息历史记录,这些消息已发送给指定订阅者 (Alice),但尚未使用命令确认 XACK.

我们可以通过立即指定 ID 0 来测试此行为,无需选项 COUNT个。 我们只会看到一条待处理的消息,即苹果消息:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

但是,如果我们确认消息已成功处理,则它将不再显示:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

现在轮到鲍勃读一些东西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

鲍勃是我的群组的成员,他请求的消息不超过两条。 该命令仅报告由于特殊标识符“">”而未发送的消息。 正如您所看到的,消息“apple”不会显示,因为它已经传递给Alice,因此Bob 收到“orange”和“strawberry”。

这样,Alice、Bob 和该组的任何其他订阅者都可以从同一流中读取不同的消息。 他们还可以读取未处理消息的历史记录或将消息标记为已处理。

有几点需要记住:

  • 一旦订阅者认为该消息是命令 瑞德集团,该消息进入“待处理”状态并分配给该特定订阅者。 其他群组订阅者将无法阅读此消息。
  • 订阅者会在第一次提及时自动创建,无需显式创建它们。
  • 瑞德集团 您可以同时读取来自多个不同线程的消息,但是要使其工作,您需要首先使用以下命令为每个线程创建具有相同名称的组 X集团

故障后恢复

订户可以从故障中恢复并重新读取其处于“待处理”状态的消息列表。 然而,在现实世界中,订阅者最终可能会失败。 如果订阅者无法从故障中恢复,订阅者的卡住消息会发生什么情况?
Consumer Group 提供的功能正是用于此类情况 - 当您需要更改消息的所有者时。

您需要做的第一件事是调用命令 小鹏,显示组中状态为“待处理”的所有消息。 在最简单的形式中,仅使用两个参数调用该命令:线程名称和组名称:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

该团队显示了整个组和每个订阅者的未处理消息数。 我们只有鲍勃有两条未完成的消息,因为爱丽丝请求的唯一消息已被确认 XACK.

我们可以使用更多参数请求更多信息:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - 标识符范围(可以使用“-”和“+”)
{count} — 投递尝试次数
{consumer-name} - 组名称

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

现在我们有了每条消息的详细信息:ID、订阅者姓名、空闲时间(以毫秒为单位)以及最后的传递尝试次数。 我们有两条来自 Bob 的消息,它们已经空闲了 74170458 毫秒,大约 20 小时。

请注意,没有人可以阻止我们简单地使用以下命令来检查消息的内容: 异界.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我们只需在参数中重复相同的标识符两次即可。 现在我们有了一些想法,Alice 可能会认为,在 20 小时的停机时间之后,Bob 可能无法恢复,是时候查询这些消息并恢复为 Bob 处理它们了。 为此,我们使用命令 索赔:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

使用此命令,我们可以通过将所有者更改为{consumer}来接收尚未处理的“外部”消息。 不过,我们也可以提供一个最小空闲时间{min-idle-time}。 这有助于避免两个客户端尝试同时更改同一消息的所有者的情况:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

第一个客户将重置停机时间并增加交货柜台。 因此第二个客户端将无法请求它。

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice 已成功认领该消息,她现在可以处理该消息并确认该消息。

从上面的示例中,您可以看到成功的请求返回消息本身的内容。 然而,这不是必要的。 JUSTID 选项可用于仅返回消息 ID。 如果您对消息的详细信息不感兴趣并且想要提高系统性能,这非常有用。

送货柜台

您在输出中看到的计数器 小鹏 是每条消息的传递次数。 这样的计数器以两种方式递增:当通过以下方式成功请求消息时: 索赔 或者当使用呼叫时 瑞德集团.

有些消息被多次传递是正常的。 最主要的是所有消息最终都会被处理。 有时,处理消息时会出现问题,因为消息本身已损坏,或者消息处理导致处理程序代码中出现错误。 在这种情况下,可能没有人能够处理该消息。 由于我们有一个交付尝试计数器,因此我们可以使用该计数器来检测此类情况。 因此,一旦传递计数达到您指定的高数字,将此类消息放在另一个线程并向系统管理员发送通知可能会更明智。

线程状态

团队 信佛 用于请求有关线程及其组的各种信息。 例如,基本命令如下所示:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

上面的命令显示有关指定流的一般信息。 现在是一个稍微复杂一点的例子:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

上面的命令显示指定线程的所有组的一般信息

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

上面的命令显示指定流和组的所有订阅者的信息。
如果您忘记了命令语法,只需向命令本身寻求帮助:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

流大小限制

许多应用程序不希望永远将数据收集到流中。 每个线程允许的最大消息数通常很有用。 在其他情况下,当达到指定的线程大小时,将所有消息从一个线程移动到另一个持久存储是很有用的。 您可以使用命令中的 MAXLEN 参数限制流的大小 XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

当使用MAXLEN时,旧记录在达到指定长度时会自动删除,因此流具有恒定的大小。 然而,这种情况下的修剪在 Redis 内存中并不是以最有效的方式进行的。 您可以通过以下方式改善这种情况:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

上面示例中的 ~ 参数意味着我们不一定需要将流长度限制为特定值。 在我们的示例中,这可以是大于或等于 1000 的任何数字(例如 1000、1010 或 1030)。 我们只是明确指定我们希望流存储至少 1000 条记录。 这使得 Redis 内部的内存管理更加高效。

还有一个单独的团队 XTRIM,它做同样的事情:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

持久存储和复制

Redis Stream 异步复制到从节点并保存到 AOF(所有数据的快照)和 RDB(所有写入操作的日志)等文件中。 还支持消费者组状态的复制。 因此,如果一条消息在主节点上处于“待处理”状态,那么在从节点上该消息将具有相同的状态。

从流中删除单个元素

有一个特殊的命令可以删除消息 西德尔。 该命令获取线程的名称,后跟要删除的消息 ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用该命令时,需要考虑到实际内存不会立即释放。

零长度流

流和其他 Redis 数据结构之间的区别在于,当其他数据结构中不再包含元素时,作为副作用,数据结构本身将从内存中删除。 因此,例如,当 ZREM 调用删除最后一个元素时,排序集将被完全删除。 相反,即使内部没有任何元素,线程也可以保留在内存中。

结论

Redis Stream 非常适合创建消息代理、消息队列、统一日志记录和历史记录聊天系统。

正如我曾经说过的 尼克劳斯·沃斯,程序是算法加数据结构,Redis 已经为您提供了两者。

来源: habr.com

添加评论