Redis Stream - Reliability and scalability of your messaging systems

Redis Stream is a new abstract data type introduced in Redis 5.0.
Conceptually, a Redis Stream is a list that you can append entries to. Each entry has a unique ID. By default, the ID is generated automatically and includes a timestamp, so you can query ranges of entries over time, or receive new data as it arrives in the stream. This is much like the Unix "tail -f" command, which reads a log file and blocks waiting for new data. Multiple clients can listen to a stream at the same time, just as many "tail -f" processes can read the same file without conflicting with each other.

To understand the benefits of the new data type, let's briefly recall the long-standing Redis structures whose functionality partially overlaps with Redis Stream.

Redis PUB/SUB

Redis Pub/Sub is a simple messaging system already built into your key-value store. However, there is a price to pay for simplicity:

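  • If a subscriber disconnects for any reason, it loses every message published while it is offline.
  • The publisher does not know who its subscribers are. It publishes to a channel, and PUBLISH only reports how many clients the message was sent to.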
  • A publisher can overload its subscribers with work if data is published faster than it can be processed.
  • A message is discarded immediately after publication, regardless of how many subscribers it was delivered to and how quickly they were able to process it.
  • All subscribers receive the message at the same time. If a message should be processed only once, the subscribers themselves must somehow agree on which of them handles it.
  • There is no built-in mechanism for confirming successful processing of a message by a subscriber. If the subscriber received the message and crashed during processing, the publisher will not know about it.

Redis List

Redis List is a data structure that supports blocking read commands. You can push and pop messages at either end of the list. On top of this structure you can build a good stack or queue for a distributed system, and in most cases this is enough. The main differences from Redis Pub/Sub are:

  • Each message is delivered to exactly one client. The client that has been blocked on a read the longest receives it first.
  • The client must initiate the read of each message itself. The list knows nothing about its clients.
  • Messages are stored until someone reads them or explicitly deletes them. If you configure Redis to persist data to disk, the reliability of the system increases dramatically.

Introduction to Stream

Adding an entry to a stream

The XADD command adds a new entry to the stream. An entry is not just a string. It consists of one or more field-value pairs, so each entry is already structured, much like a row in a CSV file.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

In the example above, we add an entry with two fields to the stream whose name (key) is "mystream". The fields are "sensor-id" and "temperature", with the values "1234" and "19.8" respectively. The second argument of the command is the ID to assign to the entry, and this ID uniquely identifies each entry in the stream. In this case we passed * because we want Redis to generate a new ID for us. Auto-generated IDs always increase, so each new entry has a higher ID than all previous entries.

ID format

The entry ID returned by XADD consists of two parts:

{millisecondsTime}-{sequenceNumber}

millisecondsTime is the Unix time in milliseconds (Redis server time). However, if the current time is equal to or less than the time of the previous entry, the timestamp of the previous entry is used instead. So even if the server clock goes backwards, new IDs still keep increasing.

sequenceNumber distinguishes entries created within the same millisecond. It is incremented by 1 relative to the previous entry. Because sequenceNumber is 64 bits wide, in practice you will not run into a limit on the number of entries that can be generated within one millisecond.
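The two rules above can be illustrated with a toy generator. This is a minimal Python sketch, not Redis source code. The class name StreamIdGenerator is hypothetical, and the clock is passed in explicitly so that the clock-goes-backwards case can be shown.

```python
import time


class StreamIdGenerator:
    """Toy model of how XADD picks auto-generated IDs (not Redis code)."""

    def __init__(self):
        self.last_ms = 0   # millisecondsTime of the last ID handed out
        self.last_seq = 0  # sequenceNumber of the last ID handed out

    def next_id(self, now_ms=None):
        if now_ms is None:
            now_ms = int(time.time() * 1000)  # server clock in milliseconds
        if now_ms > self.last_ms:
            # A later millisecond: use it and restart the sequence at 0.
            self.last_ms, self.last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock moved backwards: keep the previous
            # timestamp and bump the sequence so IDs keep increasing.
            # (64-bit sequence overflow is ignored in this sketch.)
            self.last_seq += 1
        return f"{self.last_ms}-{self.last_seq}"


gen = StreamIdGenerator()
print(gen.next_id(1518951480106))  # 1518951480106-0
print(gen.next_id(1518951480106))  # 1518951480106-1
print(gen.next_id(1518951480100))  # 1518951480106-2 (clock went backwards)
print(gen.next_id(1518951480107))  # 1518951480107-0
```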

The format of these IDs may seem strange at first glance. A skeptical reader may wonder why the time is part of the ID. The reason is that Redis streams support range queries by ID. Because the ID is tied to the entry's creation time, you can query ranges of time. We will look at a concrete example when we get to the XRANGE command.

If you need to specify your own ID for some reason, for example one tied to an external system, you can pass it to XADD instead of the * sign, as shown below:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Note that in this case you are responsible for keeping the IDs increasing. The smallest ID a stream accepts is "0-1". XADD also rejects any ID that is equal to or smaller than the stream's current top ID (here "0-2"):

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
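IDs are compared numerically, part by part, so comparing them as plain strings would give wrong answers. Here is a small Python sketch of the check XADD performs on explicit IDs. parse_id and is_acceptable are hypothetical helper names used only for illustration.

```python
def parse_id(entry_id):
    """Turn 'ms-seq' into (ms, seq); a bare 'ms' is read as 'ms-0'."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def is_acceptable(new_id, top_id=None):
    """Would XADD accept new_id, given the stream's current top ID?"""
    # An empty stream behaves as if its top ID were 0-0, so 0-1 is the minimum.
    top = parse_id(top_id) if top_id is not None else (0, 0)
    # Tuples compare numerically: milliseconds first, then sequence.
    return parse_id(new_id) > top


print(is_acceptable("0-1"))          # True
print(is_acceptable("0-2", "0-1"))   # True
print(is_acceptable("0-1", "0-2"))   # False: equal or smaller than the top item
print(is_acceptable("10-0", "9-0"))  # True, although "10-0" < "9-0" as strings
```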

Number of entries in the stream

The XLEN command returns the number of entries in a stream. For our example, it returns:

> XLEN somestream
(integer) 2

Range queries - XRANGE and XREVRANGE

To query a range, we specify two IDs: the start and the end of the range. The range is inclusive, so entries whose IDs equal the start or the end are returned too. There are also two special IDs, "-" and "+", which mean the smallest (first entry) and the largest (last entry) ID in the stream. The example below outputs all entries of the stream.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Each returned entry is an array of two elements: the ID and a list of field-value pairs. Entry IDs are related to time, so we can query a specific period of time. Moreover, instead of the full ID we can pass just the Unix time and omit the sequenceNumber part. The omitted part is automatically set to zero at the start of the range and to the maximum possible value at the end of the range. Below is an example of querying a range of two milliseconds.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
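The way partial IDs are expanded can be sketched in a few lines of Python. This is an illustrative in-memory filter over a list of ID strings and assumes the expansion rules described above. expand_bound and xrange_ids are hypothetical names, not part of any Redis client.

```python
MAX_SEQ = 2**64 - 1  # largest unsigned 64-bit value


def expand_bound(bound, is_start):
    """Expand an XRANGE bound into a full (ms, seq) pair."""
    if bound == "-":
        return (0, 0)
    if bound == "+":
        return (MAX_SEQ, MAX_SEQ)
    ms, sep, seq = bound.partition("-")
    if sep:
        return int(ms), int(seq)
    # Sequence omitted: 0 at the start of the range, the maximum at the end.
    return int(ms), (0 if is_start else MAX_SEQ)


def xrange_ids(ids, start, end):
    """Return the IDs in [start, end], both bounds inclusive, like XRANGE."""
    lo, hi = expand_bound(start, True), expand_bound(end, False)
    result = []
    for entry_id in ids:
        ms, seq = entry_id.split("-")
        if lo <= (int(ms), int(seq)) <= hi:
            result.append(entry_id)
    return result


ids = ["1518951480106-0", "1518951482479-0"]
print(xrange_ids(ids, "-", "+"))                          # both IDs
print(xrange_ids(ids, "1518951480106", "1518951480107"))  # ['1518951480106-0']
```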

We only have one entry in this range, but in real datasets the result can be huge. For this reason XRANGE supports the COUNT option, which returns just the first N entries. To get the next N entries (pagination), we take the last ID received, increment its sequenceNumber by one, and query again. Let's look at an example. Suppose mystream has been filled with 10 entries using XADD. To iterate 2 entries per command, we start with the full range, but with COUNT equal to 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

To continue with the next two entries, we take the last ID received, 1519073279157-0, and add 1 to its sequenceNumber.
The resulting ID, 1519073279157-1, can now be used as the start argument of the next XRANGE call:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

And so on. XRANGE takes O(log(N)) to seek to the start of the range and then O(M) to return M elements, so each iteration step is fast. This means XRANGE can iterate over a stream efficiently.
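The pagination loop above can be sketched in Python against an in-memory, ID-sorted list that stands in for the stream. The helper names are hypothetical. The sketch also handles the corner case where sequenceNumber is already at its 64-bit maximum by moving on to the next millisecond.

```python
MAX_SEQ = 2**64 - 1


def next_start_id(last_id):
    """ID just after last_id: bump the sequence, or roll over to the next ms."""
    ms, seq = (int(part) for part in last_id.split("-"))
    if seq < MAX_SEQ:
        return f"{ms}-{seq + 1}"
    return f"{ms + 1}-0"


def xrange_count(ids, start, count):
    """Stand-in for 'XRANGE key start + COUNT count' over an ID-sorted list."""
    lo = tuple(int(p) for p in start.split("-"))
    return [i for i in ids if tuple(int(p) for p in i.split("-")) >= lo][:count]


def paginate(ids, page_size):
    """Yield successive pages the way repeated XRANGE ... COUNT calls would."""
    start = "0-0"  # plays the role of "-" in this sketch
    while True:
        page = xrange_count(ids, start, page_size)
        if not page:
            return
        yield page
        start = next_start_id(page[-1])


ids = ["1519073278252-0", "1519073279157-0", "1519073280281-0", "1519073281432-0"]
for page in paginate(ids, 2):
    print(page)
```

On Redis 6.2 and later, XRANGE also accepts an exclusive start such as (1519073279157-0, which makes the manual increment unnecessary. On Redis 5 the increment is required.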

The XREVRANGE command is equivalent to XRANGE, but returns the elements in reverse order:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Note that XREVRANGE takes its range arguments in reverse order: the end ID first, then the start ID.

Reading New Records with XREAD

A common task is to subscribe to a stream and receive only new messages. This may look similar to Redis Pub/Sub or a blocking Redis List, but Redis Stream differs from both in fundamental ways:

  1. By default, every new message is delivered to every subscriber waiting on the stream. With a blocking Redis List, by contrast, a new message is read by only a single subscriber.
  2. Redis Pub/Sub forgets messages as soon as they are sent and never persists them. A stream keeps every message indefinitely, unless a client explicitly deletes it or trims the stream.
  3. Redis Stream can control which messages each subscriber receives within a single stream (through consumer groups, described below). A particular subscriber then sees only its own message history.

You subscribe to a stream and receive new messages with the XREAD command. It is a little more involved than XRANGE, so we'll start with simpler examples.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

The example above is the non-blocking form of XREAD. The COUNT option is optional. In fact, the only mandatory option is STREAMS, which specifies a list of streams, each paired with an ID. The command returns entries whose IDs are greater than that ID. We wrote "STREAMS mystream 0", which means we want all entries of mystream with an ID greater than "0-0". As the example shows, the reply includes the stream name, because we can read from several streams at the same time. For example, we could write "STREAMS mystream otherstream 0 0". Note that after the STREAMS option we list the names of all the desired streams first, and only then the list of IDs.
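The names-then-IDs layout is easy to get wrong. Here is a small Python sketch that builds the argument list from a mapping of stream name to last-seen ID. build_xread_args is a hypothetical helper. With the redis-py client you would normally call Redis.xread(streams=..., count=..., block=...) instead, which builds the same layout for you.

```python
def build_xread_args(last_ids, count=None, block_ms=None):
    """Build XREAD arguments from {stream_name: last_seen_id}.

    All stream names come right after STREAMS, followed by the IDs
    in the same order (dicts keep insertion order in Python 3.7+).
    """
    args = ["XREAD"]
    if count is not None:
        args += ["COUNT", str(count)]
    if block_ms is not None:
        args += ["BLOCK", str(block_ms)]
    names = list(last_ids)
    args += ["STREAMS", *names, *(last_ids[name] for name in names)]
    return args


print(build_xread_args({"mystream": "0", "otherstream": "0"}))
# ['XREAD', 'STREAMS', 'mystream', 'otherstream', '0', '0']
```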

In this simple form, the command does nothing special compared to XRANGE. What is interesting is that we can easily turn XREAD into a blocking command by adding the BLOCK argument:

> XREAD BLOCK 0 STREAMS mystream $

In the example above, the BLOCK option is given a timeout of 0 milliseconds, which means waiting forever. Moreover, instead of a regular ID for mystream, we passed the special ID $. It tells XREAD to use the maximum ID already stored in mystream, so we receive only messages added after we started listening. In a way, this is similar to the Unix "tail -f" command.

Note that with the BLOCK option we don't have to use the special ID $. Any valid ID works. If the command can serve the request immediately, without blocking, it does so. Otherwise it blocks.

A blocking XREAD can also listen to several streams at once: just list their names. In this case the command returns as soon as any of the streams receives data, and the reply contains that stream's entries. When several clients are blocked on the same stream, all of them receive the new entries, and the client that blocked first is served first.

Consumer Groups

For some tasks we want to split the messages of one stream among subscribers instead of giving every message to everyone. A typical example is a message queue with several workers that each receive different messages from the stream, which lets you scale message processing.

If we imagine that we have three subscribers C1, C2, C3 and a stream that contains messages 1, 2, 3, 4, 5, 6, 7, then the messages will be served as in the diagram below:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

To get this effect, Redis Stream uses a concept called a Consumer Group. A consumer group acts like a single pseudo-subscriber that receives data from the stream, but it is actually served by several subscribers within the group. It provides the following guarantees:

  1. Each message is delivered to a single subscriber within the group, so different subscribers receive different messages.
  2. Within a group, subscribers are identified by name, which is a case-sensitive string. If a subscriber temporarily drops out of the group, it can resume under the same unique name.
  3. Every Consumer Group follows the "first unread message" concept. When a subscriber requests new messages, it can only receive messages that have never been delivered to any subscriber of the group before.
  4. There is a command to explicitly confirm that a message was successfully processed by the subscriber. Until this command is called, the message remains in the "pending" status.
  5. Inside the Consumer Group, each subscriber can request the history of messages that were delivered to it but have not yet been acknowledged (the messages in the "pending" status).

In a sense, the state of a group can be represented like this:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+
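To make this state concrete, here is a toy in-memory model of a single group in Python. It is a sketch built on assumptions, not how Redis actually stores groups. The class ConsumerGroupModel and its methods are hypothetical: read_new mimics XREADGROUP with ">", read_pending mimics XREADGROUP with 0, and ack mimics XACK. Both commands are covered below.

```python
def to_tuple(entry_id):
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


class ConsumerGroupModel:
    """Toy in-memory model of one consumer group's state (not Redis code)."""

    def __init__(self, entries, last_delivered_id="0-0"):
        self.entries = entries                      # [(id, fields)], sorted by ID
        self.last_delivered_id = last_delivered_id  # group-wide delivery cursor
        self.pending = {}                           # consumer -> [pending IDs]

    def read_new(self, consumer, count=None):
        """Like XREADGROUP GROUP g consumer [COUNT n] STREAMS key >."""
        pel = self.pending.setdefault(consumer, [])  # consumers appear on first use
        cursor = to_tuple(self.last_delivered_id)
        fresh = [e for e in self.entries if to_tuple(e[0]) > cursor]
        if count is not None:
            fresh = fresh[:count]
        pel.extend(entry_id for entry_id, _ in fresh)
        if fresh:
            self.last_delivered_id = fresh[-1][0]
        return fresh

    def read_pending(self, consumer):
        """Like XREADGROUP ... STREAMS key 0: this consumer's unacknowledged entries."""
        mine = set(self.pending.get(consumer, []))
        return [e for e in self.entries if e[0] in mine]

    def ack(self, entry_id):
        """Like XACK for one ID: 1 if it was pending, otherwise 0."""
        for pel in self.pending.values():
            if entry_id in pel:
                pel.remove(entry_id)
                return 1
        return 0


entries = [
    ("1526569495631-0", {"message": "apple"}),
    ("1526569498055-0", {"message": "orange"}),
    ("1526569506935-0", {"message": "strawberry"}),
]
group = ConsumerGroupModel(entries)
print(group.read_new("Alice", count=1))  # the apple entry
print(group.ack("1526569495631-0"))     # 1
print(group.read_pending("Alice"))       # []
print(group.read_new("Bob", count=2))    # the orange and strawberry entries
```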

Now let's look at the main commands for working with a Consumer Group:

  • XGROUP: creates, destroys and manages groups
  • XREADGROUP: reads the stream through a group
  • XACK: lets a subscriber mark a message as successfully processed

Creation of the Consumer Group

Let's assume that the stream mystream already exists. Then the group creation command will look like:

> XGROUP CREATE mystream mygroup $
OK

When creating a group, we pass the ID from which the group starts receiving messages. If we only want new messages, we use the special ID $ (as in the example above). If we specify 0 instead, all messages already in the stream are available to the group.

Once the group exists, we can immediately start reading messages with the XREADGROUP command. It is very similar to XREAD and supports the optional BLOCK and COUNT options. It also has a mandatory GROUP option, which always takes two arguments: the group name and the subscriber name.

Before reading the stream, let's put some messages there:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

And now let's try to read this stream through the group:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Read literally, the command above says:

"I, subscriber Alice, a member of mygroup, want to read from mystream a single message that has never been delivered to anyone before."

Every time a subscriber performs an operation on a group, it must give its name, which uniquely identifies it within the group. There is one more very important detail in the command above: the special ID ">". It filters messages, leaving only those that have never been delivered to any subscriber of the group.

Alternatively, you can specify a real ID, such as 0 or any other valid ID. In this case XREADGROUP returns the history of "pending" messages that were delivered to the specified subscriber (Alice) but have not yet been acknowledged with XACK.

We can test this behavior right away by specifying ID 0 without the COUNT option. We see only the single pending message, the apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

However, if we acknowledge the message as successfully processed, then it will no longer be displayed:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Now it's Bob's turn to read something:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, a member of mygroup, asked for at most two messages. Because of the special ID ">", the command returns only messages that have not been delivered to anyone yet. The "apple" message is not returned because it was already delivered to Alice, so Bob gets "orange" and "strawberry".

Thus Alice, Bob and any other subscriber of the group can read different messages from the same stream. They can also read their history of unacknowledged messages or mark messages as processed.

There are a few things to keep in mind:

  • As soon as a subscriber reads a message with XREADGROUP, the message enters the "pending" state and is assigned to that particular subscriber. Other subscribers of the group will not receive it.
  • Subscribers are created automatically the first time they are mentioned, so there is no need to create them explicitly.
  • With XREADGROUP you can read from several different streams at the same time. For this to work, you must first use XGROUP to create groups with the same name on each stream.

Failover recovery

A subscriber that crashes can restart and re-read its list of pending messages. In the real world, however, subscribers can also fail permanently. What happens to a subscriber's stuck messages if it never recovers?
The Consumer Group offers a feature for exactly such cases: changing the owner of pending messages.

The first step is to call the XPENDING command, which shows all of the group's messages in the "pending" state. In its simplest form it takes just two arguments, the stream name and the group name:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

The command outputs the total number of pending messages in the group, the smallest and largest pending IDs, and the number of pending messages for each subscriber. Only Bob has pending messages (two of them), because the single message Alice read was acknowledged with XACK.
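This summary can be derived from per-consumer lists of pending IDs. Here is a minimal Python sketch. xpending_summary is a hypothetical name. It returns the count as an integer and sorts consumers by name, so its output is a model of the reply, not the exact wire format.

```python
def id_key(entry_id):
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def xpending_summary(pending):
    """Summarize {consumer: [pending IDs]} like the short form of XPENDING."""
    all_ids = [i for ids in pending.values() for i in ids]
    if not all_ids:
        return 0, None, None, []
    per_consumer = sorted((name, len(ids)) for name, ids in pending.items() if ids)
    return len(all_ids), min(all_ids, key=id_key), max(all_ids, key=id_key), per_consumer


print(xpending_summary({"Alice": [], "Bob": ["1526569498055-0", "1526569506935-0"]}))
# (2, '1526569498055-0', '1526569506935-0', [('Bob', 2)])
```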

We can request more information using more arguments:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id}: range of IDs (you can use "-" and "+")
{count}: maximum number of pending entries to return
{consumer-name}: optional; show only the entries pending for this subscriber

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

We now have details for each message: the ID, the subscriber name, the idle time in milliseconds (time since the message was last delivered), and the number of deliveries. Both of Bob's messages have been idle for 74170458 milliseconds, about 20 hours.

Note that nothing stops us from checking the content of a message with a plain XRANGE:

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

We simply repeat the same ID as both the start and the end of the range. With this information, Alice may decide that after 20 hours of inactivity Bob will probably not recover, and that it is time to claim these messages and process them in Bob's place. For this we use the XCLAIM command:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

With this command, a subscriber takes over a "foreign" message that has not yet been processed by making {consumer} its new owner. The command also takes a minimum idle time, {min-idle-time}, and claims a message only if it has been idle for at least that long. This prevents two clients from both taking over the same message at the same time:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Whichever command runs first resets the message's idle time and increments its delivery counter. The second command then fails the idle-time check and claims nothing.
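The race above comes down to one check against the idle time. Here is a toy Python model of a single-ID XCLAIM, assuming each pending entry records when it was last delivered. PendingEntry and xclaim are hypothetical names. The real XCLAIM accepts several IDs and additional options.

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    owner: str
    delivered_at_ms: int  # when the entry was last delivered or claimed
    deliveries: int = 1


def xclaim(pel, entry_id, new_owner, min_idle_ms, now_ms):
    """Toy single-ID XCLAIM: reassign only if idle for at least min_idle_ms."""
    entry = pel.get(entry_id)
    if entry is None or now_ms - entry.delivered_at_ms < min_idle_ms:
        return False
    entry.owner = new_owner
    entry.delivered_at_ms = now_ms  # idle time drops back to zero
    entry.deliveries += 1
    return True


pel = {"1526569498055-0": PendingEntry(owner="Bob", delivered_at_ms=0)}
now = 74170458  # Bob's message has been idle this long
print(xclaim(pel, "1526569498055-0", "Alice", 3600000, now))  # True
print(xclaim(pel, "1526569498055-0", "Lora", 3600000, now))   # False: idle is now 0
print(pel["1526569498055-0"])
# PendingEntry(owner='Alice', delivered_at_ms=74170458, deliveries=2)
```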

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

The message has been successfully claimed by Alice, who can now process the message and acknowledge it.

As the example above shows, a successful XCLAIM returns the content of the claimed messages. This is not required, however. The JUSTID option returns only the message IDs, which is useful if you are not interested in the message details and want to reduce the amount of data transferred.

Delivery counter

The counter you see in the XPENDING output is the number of times each message has been delivered. It is incremented in two ways: when a message is successfully claimed with XCLAIM, and when it is delivered by an XREADGROUP call.

It is normal for some messages to be delivered multiple times. What matters is that all messages are eventually processed. Sometimes, however, a message cannot be processed, either because the message itself is corrupted or because processing it triggers a bug in the handler code. In that case nobody may ever be able to process it. The delivery counter lets us detect such situations. Once a message's delivery count reaches a threshold you choose, it probably makes more sense to move the message to another stream and notify the system administrator.
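A dead-letter policy based on the delivery counter might be sketched like this in Python. The function name, the threshold of 5 and the sample entries are all hypothetical. In a real system the input would come from the extended XPENDING output. One reasonable approach is to copy each dead-lettered message to another stream with XADD and then XACK it in the original group.

```python
def split_for_retry(pending_details, max_deliveries):
    """pending_details: (id, consumer, idle_ms, deliveries) tuples, shaped like
    the extended XPENDING output. Returns (ids to retry, ids to dead-letter)."""
    retry, dead_letter = [], []
    for entry_id, _consumer, _idle_ms, deliveries in pending_details:
        if deliveries >= max_deliveries:
            dead_letter.append(entry_id)
        else:
            retry.append(entry_id)
    return retry, dead_letter


details = [
    ("1-0", "worker-1", 120000, 1),  # hypothetical entries
    ("2-0", "worker-1", 120000, 5),
]
print(split_for_retry(details, max_deliveries=5))  # (['1-0'], ['2-0'])
```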

Stream state

The XINFO command retrieves various information about a stream and its groups. For example, its basic form looks like this:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

The command above displays general information on the specified stream. Now a slightly more complex example:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

The command above displays general information about all groups of the specified stream.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

The command above displays information on all subscribers of the specified stream and group.
If you forget the command syntax, just ask the command itself for help:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stream Size Limit

Many applications don't want to keep collecting data in a stream forever. It is often useful to cap the number of messages in a stream. In other cases, it is useful to move all messages from the stream to another persistent store once the stream reaches a given size. You can limit the stream size with the MAXLEN parameter of XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

With MAXLEN, old entries are removed automatically once the specified length is reached, so the stream stays at a constant size. However, exact trimming is not the most efficient operation for Redis memory. Stream entries are stored in radix-tree nodes that each hold many entries, and trimming to an exact length often means modifying a node partially. You can improve the situation like this:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

The ~ argument in the example above means that the stream does not have to be trimmed to exactly 1000 entries. It can hold any number of entries greater than or equal to 1000 (for example, 1000, 1010 or 1030), because we only state that we want to keep at least 1000. This lets Redis trim only whole nodes, which makes memory management much more efficient.
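The difference between exact and approximate trimming can be modeled with a list of node sizes, oldest first. This Python sketch assumes nodes of up to 100 entries, which is the default of Redis's stream-node-max-entries setting, and is illustrative only. trim_exact and trim_approx are hypothetical names.

```python
def trim_exact(node_sizes, maxlen):
    """Plain MAXLEN: remove exactly enough entries, possibly cutting into a node."""
    nodes = list(node_sizes)  # entries per node, oldest first
    excess = sum(nodes) - maxlen
    while excess > 0 and nodes:
        if nodes[0] <= excess:
            excess -= nodes.pop(0)  # the whole oldest node goes away
        else:
            nodes[0] -= excess      # partially trimmed node: the costly case
            excess = 0
    return nodes


def trim_approx(node_sizes, maxlen):
    """MAXLEN ~: drop whole oldest nodes only while at least maxlen entries remain."""
    nodes = list(node_sizes)
    total = sum(nodes)
    while nodes and total - nodes[0] >= maxlen:
        total -= nodes.pop(0)
    return nodes


sizes = [100] * 11 + [30]  # hypothetical stream: 1130 entries, newest node partly full
print(sum(trim_exact(sizes, 1000)))   # 1000
print(sum(trim_approx(sizes, 1000)))  # 1030
```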

There is also a separate XTRIM command, which trims a stream in the same way without adding an entry:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Persistent storage and replication

Redis Stream is asynchronously replicated to replica nodes and persisted in RDB (snapshot of all data) and AOF (log of all write operations) files. The state of Consumer Groups is replicated as well, so a message that is "pending" on the master has the same status on the replicas.

Removing Individual Elements from a Stream

There is a dedicated command for deleting messages, XDEL. It takes the stream name followed by the IDs of the messages to delete:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Keep in mind that XDEL does not free memory immediately. Deleted entries are only marked as deleted, and a node's memory is reclaimed once all of its entries are gone.

Zero length streams

Streams differ from other Redis data structures in one respect. When other data structures no longer contain any elements, the key itself is removed as a side effect. For example, a sorted set is removed entirely when a ZREM call deletes its last element. Streams, by contrast, are allowed to stay in memory even when they contain no entries.

Conclusion

Redis Stream is ideal for building message brokers, message queues, unified logging, and history-keeping chat systems.

As Niklaus Wirth once put it, algorithms plus data structures equal programs, and Redis already gives you both.

Source: habr.com
