Redis Stream - 訊息系統的可靠性和可擴展性

Redis Stream - 訊息系統的可靠性和可擴展性

Redis Stream是Redis 5.0版本中引入的一種新的抽象資料類型
從概念上講,Redis Stream 是一個可以添加條目的清單。 每個條目都有一個唯一的識別碼。 預設情況下,ID 是自動產生的,並包含時間戳記。 因此,您可以隨時間查詢記錄範圍,或者在新數據到達流時接收新數據,就像 Unix“tail -f”命令讀取日誌檔案並在等待新數據時凍結一樣。 請注意,多個客戶端可以同時偵聽一個線程,就像許多“tail -f”進程可以同時讀取檔案而不會相互衝突一樣。

為了了解新資料類型的所有優勢,讓我們快速瀏覽一下長期存在的 Redis 結構,這些結構部分複製了 Redis Stream 的功能。

Redis 發佈/訂閱

Redis Pub/Sub 是一個簡單的訊息傳遞系統,已內建在您的鍵值儲存中。 然而,簡單性是有代價的:

  • 如果發布者因為某種原因失敗,那麼他就會失去所有訂閱者
  • 發布者需要知道其所有訂閱者的確切地址
  • 如果資料發布速度快於處理速度,發布者可能會導致訂閱者工作負擔過重
  • 訊息在發布後立即從發布者的緩衝區中刪除,無論訊息傳遞給多少訂閱者以及他們處理該訊息的速度如何。
  • 所有訂閱者將同時收到該訊息。 訂閱者本身必須以某種方式就處理相同訊息的順序達成一致。
  • 沒有內建機制來確認訂閱者已成功處理訊息。 如果訂閱者收到訊息並在處理過程中崩潰,發布者將不會知道這一點。

Redis列表

Redis List 是一種支援阻塞讀取指令的資料結構。 您可以從清單的開頭或結尾新增和閱讀訊息。 基於這個結構,你可以為你的分散式系統製作一個好的堆疊或佇列,在大多數情況下這就足夠了。 與 Redis Pub/Sub 的主要差異:

  • 該訊息被傳遞給一個客戶端。 第一個讀取阻塞的客戶端將首先接收資料。
  • Clint 必須親自啟動每個訊息的讀取操作。 List 對客戶一無所知。
  • 訊息將被存儲,直到有人閱讀或明確刪除它們為止。 如果將Redis伺服器配置為將資料刷新到磁碟,那麼系統的可靠性將顯著提高。

串流簡介

在流中新增條目

團隊 XADD 在流中新增一個條目。 一筆記錄不僅僅是一個字串,它由一個或多個鍵值對組成。 因此,每個條目都已經結構化並且類似於 CSV 檔案的結構。

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

在上面的範例中,我們在名稱(鍵)“mystream”的流中添加兩個欄位:“sensor-id”和“Temperature”,其值分別為“1234”和“19.8”。 作為第二個參數,此命令採用將分配給條目的識別碼 - 該識別碼唯一標識流中的每個條目。 然而,在本例中我們傳遞了 * ,因為我們希望 Redis 為我們產生一個新的 ID。 每個新的ID都會增加。 因此,每個新條目將具有相對於先前條目更高的識別碼。

識別符格式

命令返回的條目ID XADD,由兩部分組成:

{millisecondsTime}-{sequenceNumber}

毫秒時間 — Unix 時間(以毫秒為單位)(Redis 伺服器時間)。 但是,如果當前時間等於或小於先前記錄的時間,則使用先前記錄的時間戳記。 因此,如果伺服器時間回到過去,新的識別碼仍將保留增量屬性。

序列號 用於在同一毫秒內建立的記錄。 序列號 相對於前一個條目將增加 1。 因為 序列號 大小為 64 位,那麼實際上您不應該遇到一毫秒內可以產生的記錄數量的限制。

乍一看,此類標識符的格式可能看起來很奇怪。 不信任的讀者可能想知道為什麼時間是標識符的一部分。 原因是Redis流支援按ID進行範圍查詢。 由於標識符與建立記錄的時間相關聯,因此可以查詢時間範圍。 我們在看命令的時候會看一個具體的例子 X範圍.

如果由於某種原因用戶需要指定自己的標識符,例如與某個外部系統關聯,那麼我們可以將其傳遞給命令 XADD 而不是*,如下圖所示:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

請注意,在這種情況下,您必須自行監控 ID 增量。 在我們的範例中,最小標識符是“0-1”,因此該命令不會接受等於或小於“0-1”的另一個標識符。

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

每個流的記錄數

只需使用命令即可取得流中的記錄數 XLEN。 對於我們的範例,此命令將傳回以下值:

> XLEN somestream
(integer) 2

範圍查詢 - XRANGE 和 XREVRANGE

要按範圍請求數據,我們需要指定兩個標識符 - 範圍的開始和結束。 傳回的範圍將包括所有元素,包括邊界。 還有兩個特殊標識符“-”和“+”,分別表示流中最小(第一筆記錄)和最大(最後一筆記錄)的標識符。 下面的範例將列出所有流條目。

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

每個傳回的記錄都是一個由兩個元素組成的陣列:標識符和鍵值對列表。 我們已經說過記錄標識符與時間相關。 因此,我們可以請求特定時間段的範圍。 但是,我們可以在請求中指定不完整​​的標識符,而只指定 Unix 時間,省​​略與 序列號。 標識符的省略部分將在範圍開始處自動設定為零,並在範圍結束處設定為最大可能值。 下面的範例說明如何請求兩毫秒的範圍。

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

我們在這個範圍內只有一個條目,但在實際資料集中傳回的結果可能會很大。 為此原因 X範圍 支援 COUNT 選項。 透過指定數量,我們可以簡單地取得前N筆記錄。 如果我們需要取得接下來的N筆記錄(分頁),我們可以使用最後收到的ID,增加它 序列號 按一再問。 讓我們在下面的範例中看看這一點。 我們開始加入 10 個元素 XADD (假設 mystream 已填滿 10 個元素)。 為了開始迭代,每個指令取得 2 個元素,我們從完整範圍開始,但 COUNT 等於 2。

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

要繼續迭代接下來的兩個元素,我們需要選擇最後收到的 ID,即 1519073279157-0,並加上 1 序列號.
產生的 ID(在本例中為 1519073279157-1)現在可以用作下一次呼叫的新範圍參數開始 X範圍:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

等等。 因為複雜性 X範圍 搜尋的時間複雜度為 O(log(N)),然後傳回 M 個元素的時間複雜度為 O(M),那麼每個迭代步驟都很快。 因此,使用 X範圍 流可以有效地迭代。

團隊 XREVRANGE 是等價的 X範圍,但以相反的順序返回元素:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

請注意該命令 XREVRANGE 以相反的順序接受範圍參數開始和停止。

使用 XREAD 讀取新條目

通常,任務是訂閱流並僅接收新訊息。 這個概念看起來可能類似於 Redis Pub/Sub 或阻塞 Redis List,但在如何使用 Redis Stream 方面有根本的區別:

  1. 預設情況下,每個新訊息都會傳遞給每個訂閱者。 此行為與阻塞 Redis 清單不同,在阻塞 Redis 清單中,新訊息只能由一個訂閱者讀取。
  2. 在 Redis Pub/Sub 中,所有訊息都會被遺忘並且永遠不會持久,而在 Stream 中,所有訊息都會無限期保留(除非客戶端明確導致刪除)。
  3. Redis Stream 讓您區分對一個流中的消息的存取。 特定訂閱者只能看到他們的個人訊息歷史記錄。

您可以使用以下命令訂閱線程並接收新訊息 XREAD。 這比稍微複雜一點 X範圍,所以我們首先從更簡單的範例開始。

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

上面的例子展示了一個非阻塞的形式 XREAD。 請注意,COUNT 選項是可選的。 事實上,唯一需要的命令選項是 STREAMS 選項,它指定流列表以及對應的最大識別碼。 我們編寫了「STREAMS mystream 0」 - 我們希望接收識別碼大於「0-0」的 mystream 流的所有記錄。 從範例中可以看到,該命令傳回線程的名稱,因為我們可以同時訂閱多個線程。 例如,我們可以編寫「STREAMS mystream otherstream 0 0」。 請注意,在 STREAMS 選項之後,我們需要先提供所有所需流的名稱,然後才是識別碼清單。

在這個簡單的形式中,與以下命令相比,該命令沒有做任何特別的事情 X範圍。 然而,有趣的是,我們可以輕鬆地將 XREAD 對於阻塞指令,指定 BLOCK 參數:

> XREAD BLOCK 0 STREAMS mystream $

在上面的範例中,指定了一個新的 BLOCK 選項,超時時間為 0 毫秒(這意味著無限期等待)。 此外,沒有傳遞流 mystream 的常用標識符,而是傳遞了一個特殊標識符 $。 這個特殊的識別碼意味著 XREAD 必須使用 mystream 中的最大識別碼作為識別碼。 因此,從我們開始收聽的那一刻起,我們只會收到新消息。 在某些方面,這類似於 Unix 的“tail -f”命令。

請注意,當使用 BLOCK 選項時,我們不一定需要使用特殊標識符 $。 我們可以使用流中存在的任何標識符。 如果團隊可以立即滿足我們的請求而不會阻塞,那麼它就會這樣做,否則就會阻塞。

阻塞 XREAD 也可以同時監聽多個線程,你只需要指定它們的名稱。 在這種情況下,該命令將傳回接收資料的第一個流的記錄。 為給定線程阻塞的第一個訂閱者將首先接收資料。

消費者群體

在某些任務中,我們希望限制訂閱者對一個執行緒內的消息的存取。 這可能有用的一個範例是帶有工作人員的訊息佇列,這些工作人員將從執行緒接收不同的訊息,從而允許擴展訊息處理。

如果我們假設有三個訂閱者 C1、C2、C3 和一個包含訊息 1、2、3、4、5、6、7 的線程,那麼訊息的服務將如下圖所示:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

為了達到這種效果,Redis Stream 使用了一個稱為 Consumer Group 的概念。 這個概念類似於偽訂閱者,它從流中接收數據,但實際上由一個群組內的多個訂閱者提供服務,提供一定的保證:

  1. 每個訊息都會傳遞給群組內的不同訂閱者。
  2. 在群組內,訂閱者透過其名稱進行標識,該名稱是區分大小寫的字串。 如果訂閱者暫時退出群組,他可以使用自己的唯一名稱恢復到群組。
  3. 每個消費者群體都遵循「第一個未讀訊息」的概念。 當訂閱者要求新訊息時,它只能接收以前從未傳遞給群組內任何訂閱者的訊息。
  4. 有一個命令可以明確確認訊息已被訂閱者成功處理。 在呼叫該命令之前,請求的訊息將保持「待處理」狀態。
  5. 在消費者群組中,每個訂閱者都可以要求發送給他但尚未處理的訊息的歷史記錄(處於「待處理」狀態)

從某種意義上來說,群體的狀態可以表示為:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

現在是時候熟悉 Consumer Group 的主要命令了,即:

  • X集團 用於建立、銷毀和管理群組
  • 瑞德集團 用於透過群組讀取流
  • XACK - 此命令允許訂閱者將訊息標記為已成功處理

創建消費者組

假設 mystream 已經存在。 然後群組創建命令將如下所示:

> XGROUP CREATE mystream mygroup $
OK

建立群組時,我們必須傳遞一個標識符,群組將從該標識符開始接收訊息。 如果我們只想接收所有新訊息,那麼我們可以使用特殊標識符 $ (如上面的範例所示)。 如果指定 0 而不是特殊標識符,則執行緒中的所有訊息都可供該群組使用。

現在群組已創建,我們可以立即開始使用命令讀取訊息 瑞德集團。 這個命令非常類似於 XREAD 並支援可選的 BLOCK 選項。 但是,有一個必要的 GROUP 選項必須始終使用兩個參數來指定:群組名稱和訂戶名稱。 也支援 COUNT 選項。

在閱讀主題之前,讓我們先放一些訊息:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

現在讓我們嘗試透過群組讀取此流:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

上述命令逐字讀取如下:

“我,訂閱者 Alice,mygroup 的成員,想要從 mystream 中讀取一條以前從未發送給任何人的消息。”

每次訂閱者對群組執行操作時,它必須提供其名稱,以在群組中唯一標識自己。 上述指令中還有一個非常重要的細節-特殊識別碼「">」。 這個特殊標識符會過濾訊息,只留下那些以前從未發送過的訊息。

此外,在特殊情況下,您可以指定真實標識符,例如 0 或任何其他有效標識符。 在這種情況下命令 瑞德集團 將傳回狀態為「待處理」的訊息歷史記錄,這些訊息已傳送給指定訂閱者 (Alice),但尚未使用命令確認 XACK.

我們可以透過立即指定 ID 0 來測試此行為,無需選項 COUNT。 我們只會看到一條待處理的訊息,即蘋果訊息:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

但是,如果我們確認訊息已成功處理,則它將不再顯示:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

現在輪到鮑伯讀一些東西了:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

鮑伯是我的群組的成員,他要求的訊息不超過兩條。 此命令僅報告由於特殊識別碼“">”而未發送的訊息。 正如您所看到的,訊息“apple”不會顯示,因為它已經傳遞給Alice,因此Bob 收到“orange”和“strawberry”。

這樣,Alice、Bob 和該組的任何其他訂閱者都可以從同一流中讀取不同的訊息。 他們還可以讀取未處理訊息的歷史記錄或將訊息標記為已處理。

有幾點要記住:

  • 一旦訂閱者認為該訊息是命令 瑞德集團,該訊息進入「待處理」狀態並指派給該特定訂閱者。 其他群組訂閱者將無法閱讀此訊息。
  • 訂閱者會在第一次提及時自動創建,而無需明確創建它們。
  • 瑞德集團 您可以同時讀取來自多個不同線程的訊息,但是要使其工作,您需要先使用以下命令為每個線程建立具有相同名稱的群組 X集團

故障後恢復

訂戶可以從故障中恢復並重新讀取其處於「待處理」狀態的訊息清單。 然而,在現實世界中,訂閱者最終可能會失敗。 如果訂閱者無法從故障中恢復,訂閱者的卡住訊息會發生什麼情況?
Consumer Group 提供的功能正是用於此類情況 - 當您需要更改訊息的擁有者時。

您需要做的第一件事是呼叫命令 小鵬,顯示群組中狀態為「待處理」的所有訊息。 在最簡單的形式中,僅使用兩個參數來呼叫該命令:線程名稱和群組名稱:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

該團隊顯示了整個群組和每個訂閱者的未處理訊息數。 我們只有鮑伯有兩條未完成的訊息,因為愛麗絲要求的唯一訊息已被確認 XACK.

我們可以使用更多參數請求更多資訊:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - 標識符範圍(可使用「-」和「+」)
{count} — 投遞嘗試次數
{consumer-name} - 群組名稱

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

現在我們有了每個訊息的詳細資訊:ID、訂閱者姓名、空閒時間(以毫秒為單位)以及最後的傳遞嘗試次數。 我們有兩個來自 Bob 的訊息,它們已經空閒了 74170458 毫秒,大約 20 小時。

請注意,沒有人可以阻止我們簡單地使用以下命令來檢查訊息的內容: X範圍.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

我們只需在參數中重複相同的識別符兩次即可。 現在我們有了一些想法,Alice 可能會認為,在 20 小時的停機時間之後,Bob 可能無法恢復,是時候查詢這些訊息並恢復為 Bob 處理它們了。 為此,我們使用命令 索賠:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

使用此命令,我們可以透過將所有者更改為{consumer}來接收尚未處理的「外部」訊息。 不過,我們也可以提供一個最小空閒時間{min-idle-time}。 這有助於避免兩個客戶端嘗試同時更改相同訊息的擁有者的情況:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

第一個客戶將重置停機時間並增加交貨櫃檯。 因此第二個客戶端將無法請求它。

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice 已成功認領該訊息,她現在可以處理該訊息並確認該訊息。

從上面的範例中,您可以看到成功的請求返回訊息本身的內容。 然而,這不是必要的。 JUSTID 選項可用於僅傳回訊息 ID。 如果您對訊息的詳細資訊不感興趣並且想要提高系統效能,這非常有用。

送貨櫃檯

您在輸出中看到的計數器 小鵬 是每條訊息的傳遞次數。 這樣的計數器以兩種方式遞增:當透過以下方式成功請求訊息時: 索賠 或使用通話時 瑞德集團.

有些訊息被多次傳遞是正常的。 最主要的是所有訊息最終都會被處理。 有時,處理訊息時會出現問題,因為訊息本身已損壞,或訊息處理導致處理程序程式碼中出現錯誤。 在這種情況下,可能沒有人能夠處理該訊息。 由於我們有一個交付嘗試計數器,因此我們可以使用該計數器來檢測此類情況。 因此,一旦傳遞計數達到您指定的高數字,將此類訊息放在另一個線程並向系統管理員發送通知可能會更明智。

線程狀態

團隊 信佛 用於請求有關線程及其群組的各種資訊。 例如,基本指令如下所示:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

上面的命令顯示有關指定流的一般資訊。 現在是一個稍微複雜一點的例子:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

上面的命令顯示指定線程的所有群組的一般信息

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

上面的命令顯示指定流和群組的所有訂閱者的資訊。
如果您忘記了命令語法,只需向命令本身尋求協助:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

流大小限制

許多應用程式不希望永遠將資料收集到流中。 每個執行緒允許的最大訊息數通常很有用。 在其他情況下,當達到指定的執行緒大小時,將所有訊息從一個執行緒移動到另一個持久性儲存是很有用的。 您可以使用指令中的 MAXLEN 參數限制流的大小 XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

當使用MAXLEN時,舊記錄在達到指定長度時會自動刪除,因此串流具有恆定的大小。 然而,這種情況下的修剪在 Redis 記憶體中並不是以最有效的方式進行的。 您可以透過以下方式改善這種情況:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

上面範例中的 ~ 參數意味著我們不一定需要將流長度限制為特定值。 在我們的範例中,這可以是大於或等於 1000 的任何數字(例如 1000、1010 或 1030)。 我們只是明確指定我們希望流儲存至少 1000 筆記錄。 這使得 Redis 內部的記憶體管理更加有效率。

還有一個單獨的團隊 XTRIM,它做同樣的事情:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

持久性儲存和複製

Redis Stream 非同步複製到從節點並保存到 AOF(所有資料的快照)和 RDB(所有寫入操作的日誌)等檔案中。 也支援消費者組狀態的複製。 因此,如果一條訊息在主節點上處於「待處理」狀態,那麼在從節點上該訊息將具有相同的狀態。

從串流中刪除單一元素

有一個特殊的命令可以刪除訊息 西德爾。 該指令取得線程的名稱,後面跟著要刪除的訊息 ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

使用該命令時,需要考慮到實際記憶體不會立即釋放。

零長度流

流和其他 Redis 資料結構之間的區別在於,當其他資料結構中不再包含元素時,作為副作用,資料結構本身將從記憶體中刪除。 因此,例如,當 ZREM 呼叫刪除最後一個元素時,排序集將會完全刪除。 相反,即使內部沒有任何元素,線程也可以保留在記憶體中。

結論

Redis Stream 非常適合建立訊息代理程式、訊息佇列、統一日誌記錄和歷史記錄聊天系統。

正如我曾經說過的 尼克勞斯·沃斯,程式是演算法加資料結構,Redis 已經為您提供了兩者。

來源: www.habr.com

添加評論