Redis Stream - fiabilité et évolutivité de vos systèmes de messagerie

Redis Stream - fiabilité et évolutivité de vos systèmes de messagerie

Redis Stream est un nouveau type de données abstrait introduit dans Redis avec la version 5.0
Conceptuellement, Redis Stream est une liste à laquelle vous pouvez ajouter des entrées. Chaque entrée possède un identifiant unique. Par défaut, l'ID est généré automatiquement et inclut un horodatage. Par conséquent, vous pouvez interroger des plages d'enregistrements au fil du temps ou recevoir de nouvelles données dès qu'elles arrivent dans le flux, un peu comme la commande Unix "tail -f" lit un fichier journal et se fige en attendant de nouvelles données. Notez que plusieurs clients peuvent écouter un thread en même temps, tout comme de nombreux processus « tail -f » peuvent lire un fichier simultanément sans entrer en conflit les uns avec les autres.

Pour comprendre tous les avantages du nouveau type de données, examinons rapidement les structures Redis existantes de longue date qui reproduisent partiellement les fonctionnalités de Redis Stream.

Redis PUB/SUB

Redis Pub/Sub est un système de messagerie simple déjà intégré à votre magasin clé-valeur. Cependant, la simplicité a un prix :

  • Si l'éditeur échoue pour une raison quelconque, il perd tous ses abonnés
  • L'éditeur a besoin de connaître l'adresse exacte de tous ses abonnés
  • Un éditeur peut surcharger ses abonnés de travail si les données sont publiées plus rapidement qu'elles ne sont traitées.
  • Le message est supprimé du tampon de l'éditeur immédiatement après sa publication, quel que soit le nombre d'abonnés auxquels il a été envoyé et la rapidité avec laquelle ils ont pu traiter ce message.
  • Tous les abonnés recevront le message en même temps. Les abonnés eux-mêmes doivent d'une manière ou d'une autre se mettre d'accord sur l'ordre de traitement du même message.
  • Il n'existe aucun mécanisme intégré pour confirmer qu'un abonné a traité avec succès un message. Si un abonné reçoit un message et plante pendant le traitement, l'éditeur n'en sera pas informé.

Liste Redis

Redis List est une structure de données qui prend en charge le blocage des commandes de lecture. Vous pouvez ajouter et lire des messages depuis le début ou la fin de la liste. Sur la base de cette structure, vous pouvez créer une bonne pile ou file d'attente pour votre système distribué, et dans la plupart des cas, cela suffira. Principales différences par rapport à Redis Pub/Sub :

  • Le message est remis à un client. Le premier client bloqué en lecture recevra les données en premier.
  • Clint doit lancer lui-même l'opération de lecture pour chaque message. List ne sait rien des clients.
  • Les messages sont stockés jusqu'à ce que quelqu'un les lise ou les supprime explicitement. Si vous configurez le serveur Redis pour vider les données sur le disque, la fiabilité du système augmente considérablement.

Introduction au flux

Ajouter une entrée à un flux

Équipe XAJOUTER ajoute une nouvelle entrée au flux. Un enregistrement n’est pas simplement une chaîne, il se compose d’une ou plusieurs paires clé-valeur. Ainsi, chaque entrée est déjà structurée et ressemble à la structure d'un fichier CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Dans l'exemple ci-dessus, nous ajoutons deux champs au flux avec le nom (clé) « mystream » : « sensor-id » et « température » avec les valeurs « 1234 » et « 19.8 », respectivement. Comme deuxième argument, la commande prend un identifiant qui sera attribué à l'entrée - cet identifiant identifie de manière unique chaque entrée du flux. Cependant, dans ce cas, nous avons transmis * car nous voulons que Redis génère un nouvel identifiant pour nous. Chaque nouvel identifiant augmentera. Ainsi, chaque nouvelle entrée aura un identifiant plus élevé par rapport aux entrées précédentes.

Format de l'identifiant

L'ID d'entrée renvoyé par la commande XAJOUTER, se compose de deux parties :

{millisecondsTime}-{sequenceNumber}

millisecondesTime — Heure Unix en millisecondes (heure du serveur Redis). Cependant, si l'heure actuelle est identique ou inférieure à l'heure de l'enregistrement précédent, l'horodatage de l'enregistrement précédent est utilisé. Par conséquent, si l'heure du serveur remonte dans le temps, le nouvel identifiant conservera toujours la propriété d'incrémentation.

numéro de séquence utilisé pour les enregistrements créés dans la même milliseconde. numéro de séquence sera augmenté de 1 par rapport à l’entrée précédente. Parce que le numéro de séquence a une taille de 64 bits, alors en pratique, vous ne devriez pas rencontrer de limite sur le nombre d'enregistrements pouvant être générés en une milliseconde.

Le format de ces identifiants peut paraître étrange à première vue. Un lecteur méfiant pourrait se demander pourquoi le temps fait partie de l’identifiant. La raison en est que les flux Redis prennent en charge les requêtes de plage par ID. L’identifiant étant associé à l’heure de création de l’enregistrement, cela permet d’interroger des plages horaires. Nous examinerons un exemple spécifique lorsque nous examinerons la commande XRANGE.

Si, pour une raison quelconque, l'utilisateur doit spécifier son propre identifiant, qui, par exemple, est associé à un système externe, nous pouvons alors le transmettre à la commande XAJOUTER au lieu de * comme indiqué ci-dessous :

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Veuillez noter que dans ce cas, vous devez surveiller vous-même l'incrément d'identification. Dans notre exemple, l'identifiant minimum est "0-1", donc la commande n'acceptera pas un autre identifiant égal ou inférieur à "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Nombre d'enregistrements par flux

Il est possible d'obtenir le nombre d'enregistrements dans un flux simplement en utilisant la commande XLEN. Pour notre exemple, cette commande renverra la valeur suivante :

> XLEN somestream
(integer) 2

Requêtes de plage - XRANGE et XREVRANGE

Pour demander des données par plage, nous devons spécifier deux identifiants : le début et la fin de la plage. La plage renvoyée inclura tous les éléments, y compris les limites. Il existe également deux identifiants spéciaux « - » et « + », signifiant respectivement le plus petit (premier enregistrement) et le plus grand (dernier enregistrement) du flux. L'exemple ci-dessous répertorie toutes les entrées de flux.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Chaque enregistrement renvoyé est un tableau de deux éléments : un identifiant et une liste de paires clé-valeur. Nous avons déjà dit que les identifiants d'enregistrement sont liés au temps. Nous pouvons donc demander une plage d’une période de temps spécifique. Cependant, nous pouvons spécifier dans la requête non pas l'identifiant complet, mais seulement l'heure Unix, en omettant la partie liée à numéro de séquence. La partie omise de l'identifiant sera automatiquement mise à zéro en début de plage et à la valeur maximale possible en fin de plage. Vous trouverez ci-dessous un exemple de la manière dont vous pouvez demander une plage de deux millisecondes.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Nous n'avons qu'une seule entrée dans cette plage, mais dans des ensembles de données réels, le résultat renvoyé peut être énorme. Pour cette raison XRANGE prend en charge l'option COUNT. En précisant la quantité, on peut simplement obtenir les N premiers enregistrements. Si nous avons besoin d'obtenir les N enregistrements suivants (pagination), nous pouvons utiliser le dernier identifiant reçu, l'augmenter numéro de séquence par un et demandez à nouveau. Regardons cela dans l'exemple suivant. Nous commençons à ajouter 10 éléments avec XAJOUTER (en supposant que mystream était déjà rempli de 10 éléments). Pour démarrer l'itération en obtenant 2 éléments par commande, nous commençons par la plage complète mais avec COUNT égal à 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Pour continuer à itérer avec les deux éléments suivants, nous devons sélectionner le dernier identifiant reçu, c'est-à-dire 1519073279157-0, et ajouter 1 à numéro de séquence.
L'ID résultant, dans ce cas 1519073279157-1, peut désormais être utilisé comme nouvel argument de début de plage pour le prochain appel. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Et ainsi de suite. Parce que la complexité XRANGE est O(log(N)) pour rechercher puis O(M) pour renvoyer M éléments, alors chaque étape d'itération est rapide. Ainsi, en utilisant XRANGE les flux peuvent être itérés efficacement.

Équipe XREVRANGE est l'équivalent XRANGE, mais renvoie les éléments dans l'ordre inverse :

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Veuillez noter que la commande XREVRANGE prend les arguments de plage de début et de fin dans l'ordre inverse.

Lire de nouvelles entrées à l'aide de XREAD

Souvent, la tâche consiste à s'abonner à un flux et à recevoir uniquement de nouveaux messages. Ce concept peut sembler similaire à Redis Pub/Sub ou au blocage de Redis List, mais il existe des différences fondamentales dans la façon d'utiliser Redis Stream :

  1. Chaque nouveau message est envoyé par défaut à chaque abonné. Ce comportement est différent d'une liste Redis bloquante, où un nouveau message ne sera lu que par un seul abonné.
  2. Alors que dans Redis Pub/Sub, tous les messages sont oubliés et ne sont jamais conservés, dans Stream, tous les messages sont conservés indéfiniment (sauf si le client provoque explicitement la suppression).
  3. Redis Stream vous permet de différencier l'accès aux messages au sein d'un même flux. Un abonné spécifique ne peut voir que son historique de messages personnel.

Vous pouvez vous abonner à un fil de discussion et recevoir de nouveaux messages en utilisant la commande LECTURE X. C'est un peu plus compliqué que XRANGE, nous allons donc commencer par les exemples les plus simples.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

L'exemple ci-dessus montre un formulaire non bloquant LECTURE X. Notez que l'option COUNT est facultative. En fait, la seule option de commande requise est l'option STREAMS, qui spécifie une liste de flux ainsi que l'identifiant maximum correspondant. Nous avons écrit « STREAMS mystream 0 » - nous voulons recevoir tous les enregistrements du flux mystream avec un identifiant supérieur à « 0-0 ». Comme vous pouvez le voir dans l'exemple, la commande renvoie le nom du thread car nous pouvons nous abonner à plusieurs threads en même temps. On pourrait écrire par exemple « STREAMS mystream otherstream 0 0 ». Veuillez noter qu'après l'option STREAMS, nous devons d'abord fournir les noms de tous les flux requis et ensuite seulement une liste d'identifiants.

Sous cette forme simple, la commande ne fait rien de spécial par rapport à XRANGE. Cependant, ce qui est intéressant, c'est que l'on peut facilement tourner LECTURE X à une commande de blocage, en spécifiant l'argument BLOCK :

> XREAD BLOCK 0 STREAMS mystream $

Dans l'exemple ci-dessus, une nouvelle option BLOCK est spécifiée avec un délai d'attente de 0 milliseconde (cela signifie attendre indéfiniment). De plus, au lieu de transmettre l'identifiant habituel du flux mystream, un identifiant spécial $ a été transmis. Cet identifiant spécial signifie que LECTURE X doit utiliser l'identifiant maximum dans mystream comme identifiant. Nous ne recevrons donc de nouveaux messages qu’à partir du moment où nous commencerons à écouter. D'une certaine manière, cela ressemble à la commande Unix "tail -f".

Notez que lors de l'utilisation de l'option BLOCK, nous n'avons pas nécessairement besoin d'utiliser l'identifiant spécial $. Nous pouvons utiliser n'importe quel identifiant existant dans le flux. Si l'équipe peut répondre immédiatement à notre demande sans bloquer, elle le fera, sinon elle bloquera.

Blocage LECTURE X peut également écouter plusieurs fils de discussion à la fois, il vous suffit de préciser leurs noms. Dans ce cas, la commande renverra un enregistrement du premier flux ayant reçu des données. Le premier abonné bloqué pour un fil de discussion donné recevra les données en premier.

Groupes de consommateurs

Dans certaines tâches, nous souhaitons limiter l'accès des abonnés aux messages d'un même fil de discussion. Un exemple où cela pourrait être utile est une file d'attente de messages avec des travailleurs qui recevront différents messages d'un fil, permettant ainsi d'évoluer le traitement des messages.

Si nous imaginons que nous avons trois abonnés C1, C2, C3 et un fil qui contient les messages 1, 2, 3, 4, 5, 6, 7, alors les messages seront servis comme dans le schéma ci-dessous :

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Pour obtenir cet effet, Redis Stream utilise un concept appelé Consumer Group. Ce concept s'apparente à un pseudo-abonné, qui reçoit les données d'un flux, mais est en réalité servi par plusieurs abonnés au sein d'un groupe, offrant certaines garanties :

  1. Chaque message est transmis à un abonné différent au sein du groupe.
  2. Au sein d'un groupe, les abonnés sont identifiés par leur nom, qui est une chaîne sensible à la casse. Si un abonné quitte temporairement le groupe, il peut être réintégré au groupe en utilisant son propre nom unique.
  3. Chaque groupe de consommateurs suit le concept du « premier message non lu ». Lorsqu'un abonné demande de nouveaux messages, il ne peut recevoir que des messages qui n'ont jamais été remis auparavant à un abonné du groupe.
  4. Il existe une commande pour confirmer explicitement que le message a été traité avec succès par l'abonné. Jusqu'à ce que cette commande soit appelée, le message demandé restera dans l'état "en attente".
  5. Au sein du Groupe Consommateurs, chaque abonné peut demander un historique des messages qui lui ont été livrés, mais qui n'ont pas encore été traités (en statut « en attente »)

En un sens, l’état du groupe peut s’exprimer comme suit :

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Il est maintenant temps de se familiariser avec les principales commandes du Groupe Consommateur, à savoir :

  • GROUPE X utilisé pour créer, détruire et gérer des groupes
  • GROUPEXREAD utilisé pour lire le flux à travers le groupe
  • XACK - cette commande permet à l'abonné de marquer le message comme traité avec succès

Création d'un groupe de consommateurs

Supposons que mystream existe déjà. Ensuite, la commande de création de groupe ressemblera à :

> XGROUP CREATE mystream mygroup $
OK

Lors de la création d'un groupe, il faut transmettre un identifiant, à partir duquel le groupe recevra des messages. Si nous voulons simplement recevoir tous les nouveaux messages, alors nous pouvons utiliser l'identifiant spécial $ (comme dans notre exemple ci-dessus). Si vous spécifiez 0 au lieu d'un identifiant spécial, alors tous les messages du fil de discussion seront disponibles pour le groupe.

Maintenant que le groupe est créé, nous pouvons immédiatement commencer à lire les messages à l'aide de la commande GROUPEXREAD. Cette commande est très similaire à LECTURE X et prend en charge l'option optionnelle BLOCK. Cependant, il existe une option GROUP obligatoire qui doit toujours être spécifiée avec deux arguments : le nom du groupe et le nom de l'abonné. L'option COUNT est également prise en charge.

Avant de lire le fil de discussion, mettons-y quelques messages :

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Essayons maintenant de lire ce flux à travers le groupe :

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

La commande ci-dessus se lit textuellement comme suit :

"Moi, abonnée Alice, membre de mon groupe, je souhaite lire un message de mon flux qui n'a jamais été envoyé à personne auparavant."

Chaque fois qu'un abonné effectue une opération sur un groupe, il doit fournir son nom, s'identifiant de manière unique au sein du groupe. Il y a un autre détail très important dans la commande ci-dessus : l'identifiant spécial ">". Cet identifiant spécial filtre les messages, ne laissant que ceux qui n'ont jamais été livrés auparavant.

Aussi, dans des cas particuliers, vous pouvez spécifier un identifiant réel tel que 0 ou tout autre identifiant valide. Dans ce cas la commande GROUPEXREAD vous renverra un historique des messages avec un statut « en attente » qui ont été remis à l'abonné spécifié (Alice) mais qui n'ont pas encore été accusés de réception à l'aide de la commande XACK.

Nous pouvons tester ce comportement en spécifiant immédiatement l'ID 0, sans l'option COUNT. Nous verrons simplement un seul message en attente, c'est-à-dire le message Apple :

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Cependant, si nous confirmons que le message a été traité avec succès, il ne sera plus affiché :

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

C'est maintenant au tour de Bob de lire quelque chose :

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, membre de mon groupe, n'a demandé que deux messages. La commande signale uniquement les messages non remis en raison de l'identifiant spécial ">". Comme vous pouvez le constater, le message « pomme » ne s'affichera pas puisqu'il a déjà été remis à Alice, donc Bob reçoit « orange » et « fraise ».

De cette façon, Alice, Bob et tout autre abonné du groupe peuvent lire différents messages du même flux. Ils peuvent également lire leur historique de messages non traités ou marquer les messages comme traités.

Il y a quelques points à garder à l'esprit :

  • Dès que l'abonné considère le message comme une commande GROUPEXREAD, ce message passe à l'état « en attente » et est attribué à cet abonné spécifique. Les autres abonnés du groupe ne pourront pas lire ce message.
  • Les abonnés sont automatiquement créés dès la première mention, il n'est pas nécessaire de les créer explicitement.
  • Avec GROUPEXREAD vous pouvez lire les messages de plusieurs fils de discussion différents en même temps, mais pour que cela fonctionne, vous devez d'abord créer des groupes portant le même nom pour chaque fil de discussion en utilisant GROUPE X

Récupération après un échec

L'abonné peut se remettre de la panne et relire sa liste de messages avec le statut « en attente ». Cependant, dans le monde réel, les abonnés peuvent finir par échouer. Qu'arrive-t-il aux messages bloqués d'un abonné si celui-ci ne parvient pas à se remettre d'un échec ?
Consumer Group propose une fonctionnalité qui est utilisée uniquement dans de tels cas : lorsque vous devez changer le propriétaire des messages.

La première chose à faire est d'appeler la commande EN DÉPENDANT, qui affiche tous les messages du groupe avec le statut « en attente ». Dans sa forme la plus simple, la commande est appelée avec seulement deux arguments : le nom du thread et le nom du groupe :

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

L'équipe a affiché le nombre de messages non traités pour l'ensemble du groupe et pour chaque abonné. Nous n'avons que Bob avec deux messages en attente car le seul message demandé par Alice a été confirmé par XACK.

Nous pouvons demander plus d’informations en utilisant plus d’arguments :

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - plage d'identifiants (vous pouvez utiliser "-" et "+")
{count} – nombre de tentatives de livraison
{nom-consommateur} : nom du groupe

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nous avons désormais des détails pour chaque message : identifiant, nom de l'abonné, temps d'inactivité en millisecondes et enfin le nombre de tentatives de livraison. Nous avons deux messages de Bob et ils sont restés inactifs pendant 74170458 millisecondes, soit environ 20 heures.

Veuillez noter que personne ne nous empêche de vérifier le contenu du message simplement en utilisant XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Il suffit de répéter deux fois le même identifiant dans les arguments. Maintenant que nous avons une idée, Alice pourrait décider qu'après 20 heures d'inactivité, Bob ne récupérera probablement pas, et il est temps d'interroger ces messages et de reprendre leur traitement pour Bob. Pour cela nous utilisons la commande XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

En utilisant cette commande, nous pouvons recevoir un message « étranger » qui n'a pas encore été traité en changeant le propriétaire en {consumer}. Cependant, nous pouvons également fournir un temps d'inactivité minimum {min-idle-time}. Cela permet d'éviter une situation dans laquelle deux clients tentent simultanément de changer le propriétaire des mêmes messages :

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Le premier client réinitialisera le temps d'arrêt et augmentera le compteur de livraison. Le deuxième client ne pourra donc pas le demander.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Le message a été réclamé avec succès par Alice, qui peut désormais traiter le message et en accuser réception.

À partir de l'exemple ci-dessus, vous pouvez voir qu'une requête réussie renvoie le contenu du message lui-même. Toutefois, ce n'est pas nécessaire. L'option JUSTID peut être utilisée pour renvoyer uniquement les ID de message. Ceci est utile si vous n'êtes pas intéressé par les détails du message et souhaitez augmenter les performances du système.

Comptoir de livraison

Le compteur que vous voyez dans la sortie EN DÉPENDANT est le nombre de livraisons de chaque message. Un tel compteur est incrémenté de deux manières : lorsqu'un message est demandé avec succès via XCLAIM ou lorsqu'un appel est utilisé GROUPEXREAD.

Il est normal que certains messages soient transmis plusieurs fois. L'essentiel est que tous les messages soient finalement traités. Parfois, des problèmes surviennent lors du traitement d'un message, car le message lui-même est corrompu ou le traitement du message provoque une erreur dans le code du gestionnaire. Dans ce cas, il se peut que personne ne puisse traiter ce message. Puisque nous disposons d’un compteur de tentatives de livraison, nous pouvons utiliser ce compteur pour détecter de telles situations. Par conséquent, une fois que le nombre de livraisons atteint le nombre élevé que vous avez spécifié, il serait probablement plus sage de placer un tel message sur un autre fil de discussion et d'envoyer une notification à l'administrateur système.

État du fil

Équipe XINFO utilisé pour demander diverses informations sur un fil de discussion et ses groupes. Par exemple, une commande de base ressemble à ceci :

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

La commande ci-dessus affiche des informations générales sur le flux spécifié. Maintenant un exemple un peu plus complexe :

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

La commande ci-dessus affiche des informations générales pour tous les groupes du thread spécifié

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

La commande ci-dessus affiche des informations sur tous les abonnés du flux et du groupe spécifiés.
Si vous oubliez la syntaxe de la commande, demandez simplement de l'aide à la commande elle-même :

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limite de taille du flux

De nombreuses applications ne souhaitent pas collecter indéfiniment des données dans un flux. Il est souvent utile d'avoir un nombre maximum de messages autorisés par fil de discussion. Dans d'autres cas, il est utile de déplacer tous les messages d'un thread vers un autre magasin persistant lorsque la taille du thread spécifiée est atteinte. Vous pouvez limiter la taille d'un flux à l'aide du paramètre MAXLEN dans la commande XAJOUTER:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Lors de l'utilisation de MAXLEN, les anciens enregistrements sont automatiquement supprimés lorsqu'ils atteignent une longueur spécifiée, le flux a donc une taille constante. Cependant, dans ce cas, l'élagage ne s'effectue pas de la manière la plus efficace dans la mémoire Redis. Vous pouvez améliorer la situation comme suit :

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

L'argument ~ dans l'exemple ci-dessus signifie que nous n'avons pas nécessairement besoin de limiter la longueur du flux à une valeur spécifique. Dans notre exemple, il peut s'agir de n'importe quel nombre supérieur ou égal à 1000 1000 (par exemple, 1010 1030, 1000 XNUMX ou XNUMX XNUMX). Nous venons de préciser explicitement que nous souhaitons que notre flux stocke au moins XNUMX XNUMX enregistrements. Cela rend la gestion de la mémoire beaucoup plus efficace dans Redis.

Il y a aussi une équipe distincte XTRIM, ce qui fait la même chose :

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Stockage persistant et réplication

Redis Stream est répliqué de manière asynchrone sur les nœuds esclaves et enregistré dans des fichiers tels que AOF (instantané de toutes les données) et RDB (journal de toutes les opérations d'écriture). La réplication de l’état des groupes de consommateurs est également prise en charge. Par conséquent, si un message est dans l’état « en attente » sur le nœud maître, alors sur les nœuds esclaves, ce message aura le même statut.

Suppression d'éléments individuels d'un flux

Il existe une commande spéciale pour supprimer les messages XDEL. La commande récupère le nom du fil de discussion suivi des ID de message à supprimer :

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Lorsque vous utilisez cette commande, vous devez tenir compte du fait que la mémoire réelle ne sera pas libérée immédiatement.

Flux de longueur nulle

La différence entre les flux et les autres structures de données Redis est que lorsque d'autres structures de données ne contiennent plus d'éléments, la structure de données elle-même sera supprimée de la mémoire. Ainsi, par exemple, l'ensemble trié sera complètement supprimé lorsque l'appel ZREM supprimera le dernier élément. Au lieu de cela, les threads sont autorisés à rester en mémoire même sans aucun élément à l’intérieur.

Conclusion

Redis Stream est idéal pour créer des courtiers de messages, des files d'attente de messages, une journalisation unifiée et des systèmes de discussion de conservation de l'historique.

Comme je l'ai dit une fois Nicolas Wirth, les programmes sont des algorithmes plus des structures de données, et Redis vous offre déjà les deux.

Source: habr.com

Ajouter un commentaire