Redis Stream – Zuverlässigkeit und Skalierbarkeit Ihrer Messaging-Systeme

Redis Stream – Zuverlässigkeit und Skalierbarkeit Ihrer Messaging-Systeme

Redis Stream ist ein neuer abstrakter Datentyp, der in Redis mit Version 5.0 eingeführt wurde
Konzeptionell ist Redis Stream eine Liste, zu der Sie Einträge hinzufügen können. Jeder Eintrag hat eine eindeutige Kennung. Standardmäßig wird die ID automatisch generiert und enthält einen Zeitstempel. Daher können Sie Datensätze im Laufe der Zeit abfragen oder neue Daten empfangen, sobald diese im Stream ankommen, ähnlich wie der Unix-Befehl „tail -f“ eine Protokolldatei liest und einfriert, während er auf neue Daten wartet. Beachten Sie, dass mehrere Clients gleichzeitig auf einen Thread hören können, ebenso wie viele „tail -f“-Prozesse eine Datei gleichzeitig lesen können, ohne dass es zu Konflikten untereinander kommt.

Um alle Vorteile des neuen Datentyps zu verstehen, werfen wir einen kurzen Blick auf die seit langem bestehenden Redis-Strukturen, die die Funktionalität von Redis Stream teilweise nachbilden.

Redis PUB/SUB

Redis Pub/Sub ist ein einfaches Nachrichtensystem, das bereits in Ihren Schlüsselwertspeicher integriert ist. Allerdings hat die Einfachheit ihren Preis:

  • Wenn der Herausgeber aus irgendeinem Grund scheitert, verliert er alle seine Abonnenten
  • Der Verlag muss die genaue Adresse aller seiner Abonnenten kennen
  • Ein Herausgeber kann seine Abonnenten mit Arbeit überlasten, wenn Daten schneller veröffentlicht als verarbeitet werden
  • Die Nachricht wird unmittelbar nach der Veröffentlichung aus dem Puffer des Herausgebers gelöscht, unabhängig davon, an wie viele Abonnenten sie zugestellt wurde und wie schnell diese diese Nachricht verarbeiten konnten.
  • Alle Abonnenten erhalten die Nachricht gleichzeitig. Die Abonnenten selbst müssen sich irgendwie auf die Reihenfolge der Verarbeitung derselben Nachricht einigen.
  • Es gibt keinen integrierten Mechanismus zur Bestätigung, dass ein Abonnent eine Nachricht erfolgreich verarbeitet hat. Wenn ein Abonnent eine Nachricht erhält und während der Verarbeitung abstürzt, erfährt der Herausgeber nichts davon.

Redis-Liste

Redis List ist eine Datenstruktur, die das Blockieren von Lesebefehlen unterstützt. Sie können Nachrichten am Anfang oder Ende der Liste hinzufügen und lesen. Basierend auf dieser Struktur können Sie einen guten Stack oder eine gute Warteschlange für Ihr verteiltes System erstellen, was in den meisten Fällen ausreicht. Hauptunterschiede zu Redis Pub/Sub:

  • Die Nachricht wird an einen Client übermittelt. Der erste lesegesperrte Client empfängt die Daten zuerst.
  • Clint muss den Lesevorgang für jede Nachricht selbst initiieren. List weiß nichts über Kunden.
  • Nachrichten werden gespeichert, bis jemand sie liest oder explizit löscht. Wenn Sie den Redis-Server so konfigurieren, dass Daten auf die Festplatte geleert werden, erhöht sich die Zuverlässigkeit des Systems erheblich.

Einführung in Stream

Einen Eintrag zu einem Stream hinzufügen

Team XADD Fügt dem Stream einen neuen Eintrag hinzu. Ein Datensatz ist nicht nur eine Zeichenfolge, er besteht aus einem oder mehreren Schlüssel-Wert-Paaren. Somit ist jeder Eintrag bereits strukturiert und ähnelt dem Aufbau einer CSV-Datei.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Im obigen Beispiel fügen wir dem Stream zwei Felder mit dem Namen (Schlüssel) „mystream“ hinzu: „sensor-id“ und „temperature“ mit den Werten „1234“ bzw. „19.8“. Als zweites Argument benötigt der Befehl einen Bezeichner, der dem Eintrag zugewiesen wird – dieser Bezeichner identifiziert jeden Eintrag im Stream eindeutig. In diesem Fall haben wir jedoch * übergeben, weil wir möchten, dass Redis eine neue ID für uns generiert. Jede neue ID erhöht sich. Daher hat jeder neue Eintrag im Vergleich zu früheren Einträgen eine höhere Kennung.

Bezeichnerformat

Die vom Befehl zurückgegebene Eintrags-ID XADD, besteht aus zwei Teilen:

{millisecondsTime}-{sequenceNumber}

MillisekundenZeit — Unix-Zeit in Millisekunden (Redis-Serverzeit). Wenn die aktuelle Zeit jedoch gleich oder kleiner als die Zeit der vorherigen Aufnahme ist, wird der Zeitstempel der vorherigen Aufnahme verwendet. Wenn die Serverzeit also zeitlich zurückgeht, behält der neue Bezeichner weiterhin die Inkrementierungseigenschaft.

Sequenznummer Wird für Datensätze verwendet, die in derselben Millisekunde erstellt wurden. Sequenznummer wird gegenüber dem vorherigen Eintrag um 1 erhöht. Weil das Sequenznummer 64 Bit groß ist, sollten Sie in der Praxis nicht auf eine Begrenzung der Anzahl der Datensätze stoßen, die innerhalb einer Millisekunde generiert werden können.

Das Format solcher Bezeichner mag auf den ersten Blick seltsam erscheinen. Ein misstrauischer Leser könnte sich fragen, warum die Zeit Teil der Kennung ist. Der Grund dafür ist, dass Redis-Streams Bereichsabfragen nach ID unterstützen. Da die Kennung mit dem Erstellungszeitpunkt des Datensatzes verknüpft ist, ist die Abfrage von Zeitbereichen möglich. Wir werden uns ein konkretes Beispiel ansehen, wenn wir uns den Befehl ansehen XRANGE.

Wenn der Benutzer aus irgendeinem Grund seine eigene Kennung angeben muss, die beispielsweise mit einem externen System verknüpft ist, können wir diese an den Befehl übergeben XADD anstelle von * wie unten gezeigt:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Bitte beachten Sie, dass Sie in diesem Fall die ID-Erhöhung selbst überwachen müssen. In unserem Beispiel ist der Mindestbezeichner „0-1“, daher akzeptiert der Befehl keinen anderen Bezeichner, der gleich oder kleiner als „0-1“ ist.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Anzahl der Datensätze pro Stream

Es ist möglich, die Anzahl der Datensätze in einem Stream einfach mit dem Befehl zu ermitteln XLEN. In unserem Beispiel gibt dieser Befehl den folgenden Wert zurück:

> XLEN somestream
(integer) 2

Bereichsabfragen – XRANGE und XREVRANGE

Um Daten nach Bereich anzufordern, müssen wir zwei Kennungen angeben – den Anfang und das Ende des Bereichs. Der zurückgegebene Bereich umfasst alle Elemente, einschließlich der Grenzen. Es gibt auch zwei spezielle Bezeichner „-“ und „+“, die jeweils den kleinsten (erster Datensatz) und den größten (letzten Datensatz) Bezeichner im Stream bedeuten. Im folgenden Beispiel werden alle Stream-Einträge aufgelistet.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Jeder zurückgegebene Datensatz ist ein Array aus zwei Elementen: einem Bezeichner und einer Liste von Schlüssel-Wert-Paaren. Wir haben bereits gesagt, dass Datensatzkennungen mit der Zeit zusammenhängen. Daher können wir einen Bereich für einen bestimmten Zeitraum anfordern. Allerdings können wir in der Anfrage nicht die vollständige Kennung, sondern nur die Unix-Zeit angeben und den entsprechenden Teil weglassen Sequenznummer. Der ausgelassene Teil des Bezeichners wird am Anfang des Bereichs automatisch auf Null und am Ende des Bereichs auf den maximal möglichen Wert gesetzt. Nachfolgend finden Sie ein Beispiel dafür, wie Sie einen Bereich von zwei Millisekunden anfordern können.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Wir haben nur einen Eintrag in diesem Bereich, aber in echten Datensätzen kann das zurückgegebene Ergebnis riesig sein. Aus diesem Grund XRANGE unterstützt die COUNT-Option. Durch Angabe der Menge können wir einfach die ersten N Datensätze erhalten. Wenn wir die nächsten N Datensätze abrufen müssen (Paginierung), können wir die zuletzt empfangene ID verwenden und diese erhöhen Sequenznummer um eins und fragen Sie noch einmal. Schauen wir uns das im folgenden Beispiel an. Wir beginnen mit dem Hinzufügen von 10 Elementen XADD (vorausgesetzt, mystream war bereits mit 10 Elementen gefüllt). Um die Iteration zu starten und 2 Elemente pro Befehl zu erhalten, beginnen wir mit dem gesamten Bereich, aber mit COUNT gleich 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Um die Iteration mit den nächsten beiden Elementen fortzusetzen, müssen wir die zuletzt empfangene ID auswählen, d. h. 1519073279157-0, und 1 hinzufügen Sequenznummer.
Die resultierende ID, in diesem Fall 1519073279157-1, kann nun als neues Argument für den Bereichsanfang für den nächsten Aufruf verwendet werden XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Usw. Weil Komplexität XRANGE ist O(log(N)) zum Suchen und dann O(M) zum Zurückgeben von M Elementen, dann ist jeder Iterationsschritt schnell. Also verwenden XRANGE Streams können effizient iteriert werden.

Team XREVRANGE ist das Äquivalent XRANGE, gibt aber die Elemente in umgekehrter Reihenfolge zurück:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Bitte beachten Sie, dass der Befehl XREVRANGE übernimmt die Bereichsargumente start und stop in umgekehrter Reihenfolge.

Neue Einträge mit XREAD lesen

Oftmals stellt sich die Aufgabe, einen Stream zu abonnieren und nur neue Nachrichten zu erhalten. Dieses Konzept scheint Redis Pub/Sub oder der blockierenden Redis-Liste ähnlich zu sein, es gibt jedoch grundlegende Unterschiede in der Verwendung von Redis Stream:

  1. Jede neue Nachricht wird standardmäßig an jeden Abonnenten zugestellt. Dieses Verhalten unterscheidet sich von einer blockierenden Redis-Liste, bei der eine neue Nachricht nur von einem Abonnenten gelesen wird.
  2. Während in Redis Pub/Sub alle Nachrichten vergessen und nie gespeichert werden, werden in Stream alle Nachrichten auf unbestimmte Zeit aufbewahrt (es sei denn, der Client veranlasst explizit das Löschen).
  3. Mit Redis Stream können Sie den Zugriff auf Nachrichten innerhalb eines Streams differenzieren. Ein bestimmter Abonnent kann nur seinen persönlichen Nachrichtenverlauf sehen.

Mit dem Befehl können Sie einen Thread abonnieren und neue Nachrichten empfangen XREAD. Es ist etwas komplizierter als XRANGE, also beginnen wir zunächst mit den einfacheren Beispielen.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Das obige Beispiel zeigt ein nicht blockierendes Formular XREAD. Beachten Sie, dass die COUNT-Option optional ist. Tatsächlich ist die einzige erforderliche Befehlsoption die Option STREAMS, die eine Liste von Streams zusammen mit der entsprechenden maximalen Kennung angibt. Wir haben „STREAMS mystream 0“ geschrieben – wir möchten alle Datensätze des Mystream-Streams mit einer Kennung größer als „0-0“ erhalten. Wie Sie dem Beispiel entnehmen können, gibt der Befehl den Namen des Threads zurück, da wir mehrere Threads gleichzeitig abonnieren können. Wir könnten zum Beispiel schreiben: „STREAMS mystream otherstream 0 0“. Bitte beachten Sie, dass wir nach der Option STREAMS zunächst die Namen aller erforderlichen Streams und erst dann eine Liste der Kennungen angeben müssen.

In dieser einfachen Form führt der Befehl im Vergleich zu nichts Besonderes aus XRANGE. Das Interessante ist jedoch, dass wir uns leicht umdrehen können XREAD zu einem Blockierungsbefehl unter Angabe des BLOCK-Arguments:

> XREAD BLOCK 0 STREAMS mystream $

Im obigen Beispiel wird eine neue BLOCK-Option mit einem Timeout von 0 Millisekunden angegeben (das bedeutet, dass auf unbestimmte Zeit gewartet wird). Darüber hinaus wurde anstelle der üblichen Kennung für den Stream mystream eine spezielle Kennung $ übergeben. Diese spezielle Kennung bedeutet das XREAD Als Bezeichner muss der maximale Bezeichner in mystream verwendet werden. Daher erhalten wir neue Nachrichten erst ab dem Moment, in dem wir mit dem Abhören begonnen haben. In mancher Hinsicht ähnelt dies dem Unix-Befehl „tail -f“.

Beachten Sie, dass wir bei Verwendung der BLOCK-Option nicht unbedingt den speziellen Bezeichner $ verwenden müssen. Wir können jede im Stream vorhandene Kennung verwenden. Wenn das Team unsere Anfrage ohne Blockierung sofort bearbeiten kann, wird es dies tun, andernfalls wird es blockiert.

Blockierung XREAD Sie können auch mehrere Threads gleichzeitig abhören. Sie müssen lediglich deren Namen angeben. In diesem Fall gibt der Befehl einen Datensatz des ersten Streams zurück, der Daten empfangen hat. Der erste für einen bestimmten Thread blockierte Abonnent erhält zuerst Daten.

Verbrauchergruppen

Bei bestimmten Aufgaben möchten wir den Abonnentenzugriff auf Nachrichten innerhalb eines Threads beschränken. Ein Beispiel, bei dem dies nützlich sein könnte, ist eine Nachrichtenwarteschlange mit Workern, die verschiedene Nachrichten von einem Thread empfangen, wodurch die Nachrichtenverarbeitung skaliert werden kann.

Wenn wir uns vorstellen, dass wir drei Abonnenten C1, C2, C3 und einen Thread haben, der die Nachrichten 1, 2, 3, 4, 5, 6, 7 enthält, dann werden die Nachrichten wie im folgenden Diagramm zugestellt:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Um diesen Effekt zu erzielen, verwendet Redis Stream ein Konzept namens Consumer Group. Dieses Konzept ähnelt einem Pseudo-Abonnenten, der Daten aus einem Stream empfängt, tatsächlich jedoch von mehreren Abonnenten innerhalb einer Gruppe bedient wird, was bestimmte Garantien bietet:

  1. Jede Nachricht wird an einen anderen Abonnenten innerhalb der Gruppe übermittelt.
  2. Innerhalb einer Gruppe werden Abonnenten anhand ihres Namens identifiziert, bei dem es sich um eine Zeichenfolge handelt, bei der die Groß-/Kleinschreibung beachtet wird. Wenn ein Abonnent vorübergehend aus der Gruppe ausscheidet, kann er unter seinem eigenen, eindeutigen Namen wieder in die Gruppe aufgenommen werden.
  3. Jede Verbrauchergruppe folgt dem Konzept der „ersten ungelesenen Nachricht“. Wenn ein Abonnent neue Nachrichten anfordert, kann er nur Nachrichten empfangen, die noch nie zuvor an einen Abonnenten innerhalb der Gruppe zugestellt wurden.
  4. Es gibt einen Befehl, um explizit zu bestätigen, dass die Nachricht vom Abonnenten erfolgreich verarbeitet wurde. Bis zum Aufruf dieses Befehls verbleibt die angeforderte Nachricht im Status „pending“.
  5. Innerhalb der Consumer-Gruppe kann jeder Abonnent eine Historie der Nachrichten anfordern, die ihm zugestellt, aber noch nicht verarbeitet wurden (im Status „Ausstehend“).

In gewissem Sinne kann der Zustand der Gruppe wie folgt ausgedrückt werden:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Jetzt ist es an der Zeit, sich mit den wichtigsten Befehlen für die Verbrauchergruppe vertraut zu machen, nämlich:

  • XGROUP Wird zum Erstellen, Zerstören und Verwalten von Gruppen verwendet
  • XREADGROUP Wird verwendet, um den Stream durch die Gruppe zu lesen
  • XACK – Mit diesem Befehl kann der Abonnent die Nachricht als erfolgreich verarbeitet markieren

Gründung einer Verbrauchergruppe

Nehmen wir an, dass mystream bereits existiert. Dann sieht der Befehl zur Gruppenerstellung so aus:

> XGROUP CREATE mystream mygroup $
OK

Beim Erstellen einer Gruppe müssen wir eine Kennung übergeben, ab der die Gruppe Nachrichten empfängt. Wenn wir nur alle neuen Nachrichten empfangen möchten, können wir den speziellen Bezeichner $ verwenden (wie in unserem Beispiel oben). Wenn Sie 0 anstelle einer speziellen Kennung angeben, stehen alle Nachrichten im Thread der Gruppe zur Verfügung.

Nachdem die Gruppe nun erstellt ist, können wir mit dem Befehl sofort mit dem Lesen von Nachrichten beginnen XREADGROUP. Dieser Befehl ist sehr ähnlich XREAD und unterstützt die optionale BLOCK-Option. Es gibt jedoch eine erforderliche GROUP-Option, die immer mit zwei Argumenten angegeben werden muss: dem Gruppennamen und dem Abonnentennamen. Die COUNT-Option wird ebenfalls unterstützt.

Bevor wir den Thread lesen, wollen wir dort einige Nachrichten platzieren:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Versuchen wir nun, diesen Stream durch die Gruppe zu lesen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Der obige Befehl lautet wörtlich wie folgt:

„Ich, Abonnentin Alice, ein Mitglied der mygroup, möchte eine Nachricht von mystream lesen, die noch nie zuvor an jemanden zugestellt wurde.“

Jedes Mal, wenn ein Abonnent eine Operation an einer Gruppe ausführt, muss er seinen Namen angeben und sich innerhalb der Gruppe eindeutig identifizieren. Im obigen Befehl gibt es noch ein weiteres sehr wichtiges Detail – den speziellen Bezeichner „>“. Diese spezielle Kennung filtert Nachrichten und hinterlässt nur diejenigen, die noch nie zugestellt wurden.

In besonderen Fällen können Sie auch einen echten Bezeichner wie 0 oder einen anderen gültigen Bezeichner angeben. In diesem Fall der Befehl XREADGROUP gibt Ihnen einen Verlauf der Nachrichten mit dem Status „Ausstehend“ zurück, die an den angegebenen Abonnenten (Alice) zugestellt wurden, aber noch nicht mit dem Befehl bestätigt wurden XACK.

Wir können dieses Verhalten testen, indem wir sofort die ID 0 angeben, ohne die Option ANZAHL. Wir sehen lediglich eine einzelne ausstehende Nachricht, nämlich die Apple-Nachricht:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Wenn wir die Nachricht jedoch als erfolgreich verarbeitet bestätigen, wird sie nicht mehr angezeigt:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Jetzt ist Bob an der Reihe, etwas vorzulesen:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, ein Mitglied meiner Gruppe, bat um nicht mehr als zwei Nachrichten. Der Befehl meldet aufgrund der speziellen Kennung „>“ nur nicht zugestellte Nachrichten. Wie Sie sehen, wird die Nachricht „Apfel“ nicht angezeigt, da sie bereits an Alice übermittelt wurde, sodass Bob „Orange“ und „Erdbeere“ erhält.

Auf diese Weise können Alice, Bob und alle anderen Abonnenten der Gruppe verschiedene Nachrichten aus demselben Stream lesen. Sie können auch den Verlauf unverarbeiteter Nachrichten lesen oder Nachrichten als verarbeitet markieren.

Es gibt ein paar Dinge, die Sie beachten sollten:

  • Sobald der Abonnent die Nachricht als Befehl betrachtet XREADGROUP, geht diese Nachricht in den Status „ausstehend“ und wird diesem bestimmten Abonnenten zugewiesen. Andere Gruppenabonnenten können diese Nachricht nicht lesen.
  • Abonnenten werden automatisch bei der ersten Erwähnung erstellt, eine explizite Erstellung ist nicht erforderlich.
  • Mit XREADGROUP Sie können Nachrichten von mehreren verschiedenen Threads gleichzeitig lesen. Damit dies funktioniert, müssen Sie jedoch zunächst Gruppen mit demselben Namen für jeden Thread erstellen, den Sie verwenden XGROUP

Wiederherstellung nach einem Ausfall

Der Abonnent kann den Fehler beheben und seine Liste der Nachrichten mit dem Status „Ausstehend“ erneut lesen. In der realen Welt können Abonnenten jedoch letztendlich scheitern. Was passiert mit den blockierten Nachrichten eines Abonnenten, wenn der Abonnent nach einem Fehler nicht wiederhergestellt werden kann?
Consumer Group bietet eine Funktion, die genau für solche Fälle verwendet wird – wenn Sie den Eigentümer von Nachrichten ändern müssen.

Als erstes müssen Sie den Befehl aufrufen XPENDING, das alle Nachrichten der Gruppe mit dem Status „ausstehend“ anzeigt. In seiner einfachsten Form wird der Befehl mit nur zwei Argumenten aufgerufen: dem Thread-Namen und dem Gruppennamen:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Das Team zeigte die Anzahl der unverarbeiteten Nachrichten für die gesamte Gruppe und für jeden Abonnenten an. Wir haben nur Bob mit zwei ausstehenden Nachrichten, da die einzige von Alice angeforderte Nachricht bestätigt wurde XACK.

Wir können weitere Informationen mit mehr Argumenten anfordern:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} – Bereich von Kennungen (Sie können „-“ und „+“ verwenden)
{count} – Anzahl der Zustellversuche
{consumer-name} – Gruppenname

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Jetzt haben wir Details zu jeder Nachricht: ID, Abonnentenname, Leerlaufzeit in Millisekunden und schließlich die Anzahl der Zustellversuche. Wir haben zwei Nachrichten von Bob erhalten und sie waren 74170458 Millisekunden lang inaktiv, also etwa 20 Stunden.

Bitte beachten Sie, dass uns niemand daran hindert, den Inhalt der Nachricht allein durch die Verwendung zu überprüfen XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Wir müssen denselben Bezeichner nur zweimal in den Argumenten wiederholen. Nachdem wir nun eine gewisse Vorstellung davon haben, könnte Alice entscheiden, dass sich Bob nach 20 Stunden Ausfallzeit wahrscheinlich nicht erholen wird, und dass es an der Zeit ist, diese Nachrichten abzufragen und die Verarbeitung für Bob fortzusetzen. Hierzu verwenden wir den Befehl XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Mit diesem Befehl können wir eine „fremde“ Nachricht empfangen, die noch nicht verarbeitet wurde, indem wir den Besitzer in {consumer} ändern. Wir können jedoch auch eine Mindestleerlaufzeit {min-idle-time} angeben. Dies hilft, eine Situation zu vermeiden, in der zwei Clients gleichzeitig versuchen, den Besitzer derselben Nachrichten zu ändern:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Der erste Kunde setzt die Ausfallzeit zurück und erhöht den Lieferzähler. Der zweite Kunde kann es also nicht anfordern.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Die Nachricht wurde erfolgreich von Alice beansprucht, die sie nun verarbeiten und bestätigen kann.

Aus dem obigen Beispiel können Sie ersehen, dass eine erfolgreiche Anfrage den Inhalt der Nachricht selbst zurückgibt. Dies ist jedoch nicht notwendig. Die Option JUSTID kann nur zum Zurückgeben von Nachrichten-IDs verwendet werden. Dies ist nützlich, wenn Sie nicht an den Details der Nachricht interessiert sind und die Systemleistung steigern möchten.

Lieferschalter

Der Zähler, den Sie in der Ausgabe sehen XPENDING ist die Anzahl der Zustellungen jeder Nachricht. Ein solcher Zähler wird auf zwei Arten erhöht: wenn eine Nachricht erfolgreich über angefordert wurde XCLAIM oder wenn ein Anruf verwendet wird XREADGROUP.

Es ist normal, dass einige Nachrichten mehrmals zugestellt werden. Die Hauptsache ist, dass alle Nachrichten irgendwann verarbeitet werden. Manchmal treten bei der Verarbeitung einer Nachricht Probleme auf, weil die Nachricht selbst beschädigt ist oder die Nachrichtenverarbeitung einen Fehler im Handlercode verursacht. In diesem Fall kann es sein, dass niemand diese Nachricht verarbeiten kann. Da wir über einen Zustellversuchszähler verfügen, können wir mit diesem Zähler solche Situationen erkennen. Sobald die Anzahl der Zustellungen die von Ihnen angegebene hohe Zahl erreicht, wäre es daher wahrscheinlich klüger, eine solche Nachricht in einen anderen Thread zu stellen und eine Benachrichtigung an den Systemadministrator zu senden.

Thread-Status

Team XINFO Wird verwendet, um verschiedene Informationen über einen Thread und seine Gruppen anzufordern. Ein einfacher Befehl sieht beispielsweise so aus:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Der obige Befehl zeigt allgemeine Informationen zum angegebenen Stream an. Nun ein etwas komplexeres Beispiel:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Der obige Befehl zeigt allgemeine Informationen für alle Gruppen des angegebenen Threads an

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Der obige Befehl zeigt Informationen für alle Abonnenten des angegebenen Streams und der angegebenen Gruppe an.
Wenn Sie die Befehlssyntax vergessen haben, fragen Sie einfach den Befehl selbst um Hilfe:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stream-Größenbeschränkung

Viele Anwendungen möchten Daten nicht für immer in einem Stream sammeln. Oft ist es sinnvoll, eine maximale Anzahl an Nachrichten pro Thread festzulegen. In anderen Fällen ist es sinnvoll, alle Nachrichten von einem Thread in einen anderen persistenten Speicher zu verschieben, wenn die angegebene Thread-Größe erreicht ist. Sie können die Größe eines Streams mithilfe des Parameters MAXLEN im Befehl begrenzen XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Bei Verwendung von MAXLEN werden alte Datensätze automatisch gelöscht, wenn sie eine bestimmte Länge erreichen, sodass der Stream eine konstante Größe hat. Allerdings erfolgt das Bereinigen in diesem Fall nicht auf die effizienteste Art und Weise im Redis-Speicher. Sie können die Situation wie folgt verbessern:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Das ~-Argument im obigen Beispiel bedeutet, dass wir die Stream-Länge nicht unbedingt auf einen bestimmten Wert begrenzen müssen. In unserem Beispiel könnte dies eine beliebige Zahl sein, die größer oder gleich 1000 ist (z. B. 1000, 1010 oder 1030). Wir haben gerade explizit angegeben, dass unser Stream mindestens 1000 Datensätze speichern soll. Dadurch wird die Speicherverwaltung in Redis wesentlich effizienter.

Es gibt auch ein separates Team XTRIM, was das Gleiche bewirkt:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Persistente Speicherung und Replikation

Redis Stream wird asynchron auf Slave-Knoten repliziert und in Dateien wie AOF (Snapshot aller Daten) und RDB (Protokoll aller Schreibvorgänge) gespeichert. Die Replikation des Verbrauchergruppenstatus wird ebenfalls unterstützt. Wenn sich also eine Nachricht auf dem Master-Knoten im Status „ausstehend“ befindet, hat diese Nachricht auf den Slave-Knoten den gleichen Status.

Einzelne Elemente aus einem Stream entfernen

Es gibt einen speziellen Befehl zum Löschen von Nachrichten XDEL. Der Befehl ruft den Namen des Threads ab, gefolgt von den zu löschenden Nachrichten-IDs:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Bei der Verwendung dieses Befehls müssen Sie berücksichtigen, dass der tatsächliche Speicher nicht sofort freigegeben wird.

Streams mit der Länge Null

Der Unterschied zwischen Streams und anderen Redis-Datenstrukturen besteht darin, dass als Nebeneffekt die Datenstruktur selbst aus dem Speicher entfernt wird, wenn andere Datenstrukturen keine Elemente mehr enthalten. So wird beispielsweise die sortierte Menge vollständig entfernt, wenn der ZREM-Aufruf das letzte Element entfernt. Stattdessen dürfen Threads auch dann im Speicher verbleiben, wenn sich darin keine Elemente befinden.

Abschluss

Redis Stream eignet sich ideal zum Erstellen von Nachrichtenbrokern, Nachrichtenwarteschlangen, einheitlicher Protokollierung und Chat-Systemen zur Protokollierung des Verlaufs.

Wie ich einmal sagte Niklaus Wirth, Programme sind Algorithmen plus Datenstrukturen, und Redis bietet Ihnen bereits beides.

Source: habr.com

Kommentar hinzufügen