Redis Stream - betrouwbaarheid en schaalbaarheid van uw berichtensystemen

Redis Stream - betrouwbaarheid en schaalbaarheid van uw berichtensystemen

Redis Stream is een nieuw abstract gegevenstype dat in Redis is geïntroduceerd met versie 5.0
Conceptueel gezien is Redis Stream een ​​lijst waaraan u items kunt toevoegen. Elke inzending heeft een unieke identificatie. De ID wordt standaard automatisch gegenereerd en bevat een tijdstempel. Daarom kunt u in de loop van de tijd reeksen records opvragen, of nieuwe gegevens ontvangen zodra deze in de stroom binnenkomen, net zoals het Unix "tail -f" commando een logbestand leest en vastloopt tijdens het wachten op nieuwe gegevens. Houd er rekening mee dat meerdere clients tegelijkertijd naar een thread kunnen luisteren, net zoals veel "tail -f"-processen tegelijkertijd een bestand kunnen lezen zonder met elkaar in conflict te komen.

Laten we, om alle voordelen van het nieuwe gegevenstype te begrijpen, eens snel kijken naar de al lang bestaande Redis-structuren die de functionaliteit van Redis Stream gedeeltelijk repliceren.

Opnieuw PUB/SUB

Redis Pub/Sub is een eenvoudig berichtensysteem dat al in uw sleutelwaardewinkel is ingebouwd. Eenvoud heeft echter een prijs:

  • Als de uitgever om wat voor reden dan ook faalt, verliest hij al zijn abonnees
  • De uitgever moet het exacte adres van al zijn abonnees weten
  • Een uitgever kan zijn abonnees overbelasten met werk als de gegevens sneller worden gepubliceerd dan verwerkt
  • Het bericht wordt direct na publicatie uit de buffer van de uitgever verwijderd, ongeacht bij hoeveel abonnees het is afgeleverd en hoe snel zij dit bericht hebben kunnen verwerken.
  • Alle abonnees ontvangen het bericht tegelijkertijd. Abonnees moeten zelf op de een of andere manier overeenstemming bereiken over de volgorde waarin hetzelfde bericht wordt verwerkt.
  • Er is geen ingebouwd mechanisme om te bevestigen dat een abonnee een bericht met succes heeft verwerkt. Als een abonnee een bericht ontvangt en tijdens de verwerking crasht, weet de uitgever daar niets van.

Redis-lijst

Redis List is een datastructuur die het blokkeren van leesopdrachten ondersteunt. U kunt berichten aan het begin of einde van de lijst toevoegen en lezen. Op basis van deze structuur kunt u een goede stapel of wachtrij voor uw gedistribueerde systeem maken, en in de meeste gevallen zal dit voldoende zijn. Belangrijkste verschillen met Redis Pub/Sub:

  • Het bericht wordt afgeleverd bij één klant. De eerste tegen lezen geblokkeerde client ontvangt de gegevens als eerste.
  • Clint moet voor elk bericht zelf de leesoperatie starten. List weet niets over klanten.
  • Berichten worden bewaard totdat iemand ze leest of expliciet verwijdert. Als u de Redis-server configureert om gegevens naar schijf te spoelen, neemt de betrouwbaarheid van het systeem dramatisch toe.

Inleiding tot streamen

Een item aan een stream toevoegen

Team XADD voegt een nieuw item aan de stream toe. Een record is niet zomaar een string, het bestaat uit een of meer sleutel-waardeparen. Elke invoer is dus al gestructureerd en lijkt op de structuur van een CSV-bestand.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

In bovenstaand voorbeeld voegen we twee velden toe aan de stream met de naam (sleutel) ‘mystream’: ‘sensor-id’ en ‘temperature’ met respectievelijk de waarden ‘1234’ en ‘19.8’. Als tweede argument neemt de opdracht een identificatie die aan de invoer wordt toegewezen. Deze identificatie identificeert op unieke wijze elke invoer in de stroom. In dit geval zijn we echter geslaagd voor * omdat we willen dat Redis een nieuwe ID voor ons genereert. Elke nieuwe ID zal toenemen. Daarom zal elke nieuwe inzending een hogere identificatiecode hebben in vergelijking met eerdere inzendingen.

Identificatieformaat

De invoer-ID die door de opdracht wordt geretourneerd XADD, bestaat uit twee delen:

{millisecondsTime}-{sequenceNumber}

millisecondenTijd — Unix-tijd in milliseconden (Redis-servertijd). Als de huidige tijd echter gelijk is aan of korter is dan de tijd van de vorige opname, wordt de tijdstempel van de vorige opname gebruikt. Als de servertijd teruggaat in de tijd, behoudt de nieuwe identificatie dus nog steeds de eigenschap increment.

volgnummer gebruikt voor records die in dezelfde milliseconde zijn gemaakt. volgnummer wordt met 1 verhoogd ten opzichte van de vorige invoer. Omdat de volgnummer 64 bits groot is, dan zul je in de praktijk niet tegen een limiet aanlopen op het aantal records dat binnen één milliseconde kan worden gegenereerd.

Het formaat van dergelijke identificatiegegevens lijkt op het eerste gezicht misschien vreemd. Een wantrouwige lezer zou zich kunnen afvragen waarom tijd deel uitmaakt van de identificatie. De reden is dat Redis bereikquery's op ID streamt. Omdat de identifier gekoppeld is aan het tijdstip waarop het record is aangemaakt, is het mogelijk om tijdsbereiken op te vragen. We zullen naar een specifiek voorbeeld kijken als we naar de opdracht kijken XRANGE.

Als de gebruiker om wat voor reden dan ook zijn eigen identificatie moet opgeven, die bijvoorbeeld aan een extern systeem is gekoppeld, dan kunnen we deze doorgeven aan de opdracht XADD in plaats van * zoals hieronder weergegeven:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Houd er rekening mee dat u in dit geval zelf de ID-verhoging moet controleren. In ons voorbeeld is de minimale identificatiecode "0-1", dus de opdracht accepteert geen andere identificatiecode die gelijk is aan of kleiner is dan "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Aantal records per stream

Het is mogelijk om het aantal records in een stream eenvoudig op te vragen met behulp van de opdracht XLEN. Voor ons voorbeeld retourneert deze opdracht de volgende waarde:

> XLEN somestream
(integer) 2

Bereikquery's - XRANGE en XREVRANGE

Om gegevens per bereik op te vragen, moeten we twee identificatiegegevens opgeven: het begin en het einde van het bereik. Het geretourneerde bereik omvat alle elementen, inclusief de grenzen. Er zijn ook twee speciale identificatiegegevens “-” en “+”, die respectievelijk de kleinste (eerste record) en grootste (laatste record) identificatie in de stream betekenen. In het onderstaande voorbeeld worden alle streamvermeldingen weergegeven.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Elke geretourneerde record is een array van twee elementen: een ID en een lijst met sleutelwaardeparen. We hebben al gezegd dat record-ID's gerelateerd zijn aan tijd. Daarom kunnen wij een bereik voor een bepaalde periode opvragen. We kunnen in het verzoek echter niet de volledige identificatie opgeven, maar alleen de Unix-tijd, waarbij we het gedeelte dat verband houdt met volgnummer. Het weggelaten deel van de identificatie wordt aan het begin van het bereik automatisch op nul gezet en aan het einde van het bereik op de maximaal mogelijke waarde. Hieronder ziet u een voorbeeld hoe u een bereik van twee milliseconden kunt opvragen.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

We hebben slechts één vermelding in dit bereik, maar in echte datasets kan het geretourneerde resultaat enorm zijn. Om deze reden XRANGE ondersteunt de COUNT-optie. Door het aantal op te geven, kunnen we eenvoudig de eerste N-records verkrijgen. Als we de volgende N records (paginering) nodig hebben, kunnen we de laatst ontvangen ID gebruiken en deze verhogen volgnummer door één en vraag het opnieuw. Laten we dit in het volgende voorbeeld bekijken. We beginnen met het toevoegen van 10 elementen XADD (ervan uitgaande dat mijnstream al gevuld was met 10 elementen). Om de iteratie te starten met het verkrijgen van 2 elementen per commando, beginnen we met het volledige bereik, maar met COUNT gelijk aan 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Om door te gaan met de volgende twee elementen, moeten we de laatst ontvangen ID selecteren, d.w.z. 1519073279157-0, en 1 toevoegen aan volgnummer.
De resulterende ID, in dit geval 1519073279157-1, kan nu worden gebruikt als het nieuwe begin van bereikargument voor de volgende aanroep XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Enzovoort. Omdat complexiteit XRANGE is O(log(N)) om te zoeken en dan O(M) om M elementen terug te geven, dan is elke iteratiestap snel. Gebruiken dus XRANGE stromen kunnen efficiënt worden herhaald.

Team XREVRANGE is het equivalent XRANGE, maar retourneert de elementen in omgekeerde volgorde:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Houd er rekening mee dat de opdracht XREVRANGE neemt bereikargumenten start en stop in omgekeerde volgorde.

Nieuwe vermeldingen lezen met XREAD

Vaak ontstaat de taak om je te abonneren op een stream en alleen nieuwe berichten te ontvangen. Dit concept lijkt misschien op Redis Pub/Sub of het blokkeren van Redis List, maar er zijn fundamentele verschillen in het gebruik van Redis Stream:

  1. Elk nieuw bericht wordt standaard bij elke abonnee afgeleverd. Dit gedrag verschilt van een blokkerende Redis-lijst, waarbij een nieuw bericht slechts door één abonnee wordt gelezen.
  2. Terwijl in Redis Pub/Sub alle berichten worden vergeten en nooit bewaard blijven, worden in Stream alle berichten voor onbepaalde tijd bewaard (tenzij de client expliciet verwijdering veroorzaakt).
  3. Met Redis Stream kunt u de toegang tot berichten binnen één stream differentiëren. Een specifieke abonnee kan alleen zijn persoonlijke berichtengeschiedenis zien.

U kunt zich abonneren op een thread en nieuwe berichten ontvangen met behulp van de opdracht XLEES. Het is iets ingewikkelder dan XRANGE, dus we beginnen eerst met de eenvoudigere voorbeelden.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Het bovenstaande voorbeeld toont een niet-blokkerend formulier XLEES. Houd er rekening mee dat de optie AANTAL optioneel is. In feite is de enige vereiste opdrachtoptie de STREAMS-optie, die een lijst met streams specificeert samen met de bijbehorende maximale identificatie. We hebben “STREAMS mystream 0” geschreven - we willen alle records van de mystream-stream ontvangen met een ID groter dan “0-0”. Zoals je in het voorbeeld kunt zien, retourneert de opdracht de naam van de thread omdat we ons tegelijkertijd op meerdere threads kunnen abonneren. We zouden bijvoorbeeld kunnen schrijven: "STREAMS mystream otherstream 0 0". Houd er rekening mee dat we na de STREAMS-optie eerst de namen van alle vereiste streams moeten opgeven en pas daarna een lijst met identificatiegegevens.

In deze eenvoudige vorm doet het commando niets bijzonders vergeleken met XRANGE. Het interessante is echter dat we gemakkelijk kunnen keren XLEES naar een blokkeeropdracht, waarbij het BLOCK-argument wordt opgegeven:

> XREAD BLOCK 0 STREAMS mystream $

In het bovenstaande voorbeeld is een nieuwe BLOCK-optie gespecificeerd met een time-out van 0 milliseconden (dit betekent voor onbepaalde tijd wachten). Bovendien werd in plaats van de gebruikelijke identificatie voor de stream mystream een ​​speciale identificatie $ doorgegeven. Deze speciale identificatie betekent dat XLEES moet de maximale identificatie in mystream als identificatie gebruiken. We ontvangen dus pas nieuwe berichten vanaf het moment dat we zijn begonnen met luisteren. In sommige opzichten is dit vergelijkbaar met het Unix-commando "tail -f".

Houd er rekening mee dat we bij het gebruik van de BLOCK-optie niet noodzakelijkerwijs de speciale identificatie $ hoeven te gebruiken. We kunnen elke bestaande ID in de stream gebruiken. Als het team ons verzoek onmiddellijk kan inwilligen zonder te blokkeren, zal het dat doen, anders wordt het geblokkeerd.

Blokkeren XLEES Je kunt ook naar meerdere threads tegelijk luisteren, je hoeft alleen maar hun namen op te geven. In dit geval retourneert de opdracht een record van de eerste stream die gegevens heeft ontvangen. De eerste abonnee die voor een bepaalde thread is geblokkeerd, ontvangt als eerste gegevens.

Consumentengroepen

Bij bepaalde taken willen we de toegang van abonnees tot berichten binnen één thread beperken. Een voorbeeld waarbij dit nuttig zou kunnen zijn, is een berichtenwachtrij met werkers die verschillende berichten uit een thread ontvangen, waardoor de berichtverwerking kan worden geschaald.

Als we ons voorstellen dat we drie abonnees C1, C2, C3 hebben en een thread die de berichten 1, 2, 3, 4, 5, 6, 7 bevat, dan worden de berichten weergegeven zoals in het onderstaande diagram:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Om dit effect te bereiken maakt Redis Stream gebruik van een concept genaamd Consumer Group. Dit concept is vergelijkbaar met een pseudo-abonnee, die gegevens ontvangt van een stream, maar feitelijk wordt bediend door meerdere abonnees binnen een groep, wat bepaalde garanties biedt:

  1. Elk bericht wordt afgeleverd bij een andere abonnee binnen de groep.
  2. Binnen een groep worden abonnees geïdentificeerd aan de hand van hun naam, een hoofdlettergevoelige reeks. Als een abonnee tijdelijk uit de groep valt, kan hij met zijn eigen unieke naam weer in de groep worden opgenomen.
  3. Elke consumentengroep volgt het concept van het ‘eerste ongelezen bericht’. Wanneer een abonnee nieuwe berichten aanvraagt, kan deze alleen berichten ontvangen die nog nooit eerder zijn afgeleverd bij een abonnee binnen de groep.
  4. Er is een opdracht om expliciet te bevestigen dat het bericht met succes door de abonnee is verwerkt. Totdat dit commando wordt aangeroepen, blijft het opgevraagde bericht in de status "in behandeling".
  5. Binnen de Consumentengroep kan elke abonnee een historie opvragen van berichten die bij hem zijn afgeleverd, maar nog niet zijn verwerkt (in de status ‘in behandeling’)

In zekere zin kan de toestand van de groep als volgt worden uitgedrukt:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nu is het tijd om kennis te maken met de belangrijkste commando's voor de Consumer Group, namelijk:

  • XGROEP gebruikt om groepen te creëren, vernietigen en beheren
  • XREADGROEP gebruikt om stream door groep te lezen
  • XACK - met dit commando kan de abonnee het bericht als succesvol verwerkt markeren

Oprichting van de consumentengroep

Laten we aannemen dat mystream al bestaat. Dan ziet het commando voor het maken van een groep er als volgt uit:

> XGROUP CREATE mystream mygroup $
OK

Bij het aanmaken van een groep moeten we een identificatie doorgeven, vanaf waar de groep berichten zal ontvangen. Als we alleen alle nieuwe berichten willen ontvangen, kunnen we de speciale identificatie $ gebruiken (zoals in ons voorbeeld hierboven). Als u 0 opgeeft in plaats van een speciale ID, zijn alle berichten in de thread beschikbaar voor de groep.

Nu de groep is aangemaakt, kunnen we onmiddellijk beginnen met het lezen van berichten met behulp van de opdracht XREADGROEP. Deze opdracht lijkt erg op XLEES en ondersteunt de optionele BLOCK-optie. Er is echter een vereiste GROUP-optie die altijd moet worden opgegeven met twee argumenten: de groepsnaam en de abonneenaam. De optie AANTAL wordt ook ondersteund.

Laten we, voordat we de draad lezen, enkele berichten daar plaatsen:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Laten we nu proberen deze stream door de groep te lezen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Het bovenstaande commando luidt letterlijk als volgt:

"Ik, abonnee Alice, lid van de mijngroep, wil een bericht uit mijnstream lezen dat nog nooit eerder bij iemand is afgeleverd."

Elke keer dat een abonnee een bewerking op een groep uitvoert, moet deze zijn naam opgeven, waarmee hij zichzelf op unieke wijze binnen de groep identificeert. Er is nog een heel belangrijk detail in de bovenstaande opdracht: de speciale identificatie ">". Deze speciale identificatie filtert berichten, waardoor alleen berichten overblijven die nog nooit eerder zijn afgeleverd.

In speciale gevallen kunt u ook een echte ID opgeven, zoals 0 of een andere geldige ID. In dit geval de opdracht XREADGROEP geeft u een geschiedenis van berichten terug met de status "in behandeling" die zijn afgeleverd bij de opgegeven abonnee (Alice) maar nog niet zijn bevestigd met behulp van de opdracht XACK.

We kunnen dit gedrag testen door onmiddellijk de ID 0 op te geven, zonder de optie COUNT. We zullen eenvoudigweg één enkel bericht in behandeling zien, dat wil zeggen het Apple-bericht:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Als we echter bevestigen dat het bericht succesvol is verwerkt, wordt het niet langer weergegeven:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nu is het de beurt aan Bob om iets te lezen:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, een lid van mijngroep, vroeg om niet meer dan twee berichten. De opdracht rapporteert alleen niet-bezorgde berichten vanwege de speciale identificatie ">". Zoals je kunt zien, wordt het bericht "appel" niet weergegeven omdat het al bij Alice is afgeleverd, dus ontvangt Bob "sinaasappel" en "aardbei".

Op deze manier kunnen Alice, Bob en elke andere abonnee van de groep verschillende berichten uit dezelfde stream lezen. Ze kunnen ook hun geschiedenis van onverwerkte berichten lezen of berichten als verwerkt markeren.

Er zijn een paar dingen waarmee u rekening moet houden:

  • Zodra de abonnee het bericht als een commando beschouwt XREADGROEP, krijgt dit bericht de status 'in behandeling' en wordt het toegewezen aan die specifieke abonnee. Andere groepsabonnees kunnen dit bericht niet lezen.
  • Abonnees worden automatisch aangemaakt bij de eerste vermelding, het is niet nodig om ze expliciet aan te maken.
  • Met XREADGROEP je kunt berichten uit meerdere verschillende threads tegelijkertijd lezen, maar om dit te laten werken moet je eerst groepen maken met dezelfde naam voor elke thread met behulp van XGROEP

Herstel na een mislukking

De abonnee kan herstellen van de storing en zijn lijst met berichten met de status 'in behandeling' opnieuw lezen. In de echte wereld kunnen abonnees uiteindelijk echter mislukken. Wat gebeurt er met de vastgelopen berichten van een abonnee als de abonnee niet kan herstellen van een storing?
Consumer Group biedt een functie die juist voor dergelijke gevallen wordt gebruikt: wanneer u de eigenaar van berichten moet wijzigen.

Het eerste dat u hoeft te doen, is het commando aanroepen XPENDING, waarmee alle berichten in de groep worden weergegeven met de status 'in behandeling'. In de eenvoudigste vorm wordt de opdracht aangeroepen met slechts twee argumenten: de threadnaam en de groepsnaam:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Het team toonde het aantal onverwerkte berichten voor de hele groep en voor elke abonnee. We hebben alleen Bob met twee openstaande berichten omdat het enige bericht waar Alice om vroeg, werd bevestigd XACK.

We kunnen meer informatie opvragen met meer argumenten:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - bereik van ID's (u kunt “-” en “+” gebruiken)
{count} — aantal bezorgpogingen
{consumer-name} - groepsnaam

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nu hebben we details voor elk bericht: ID, abonneenaam, inactieve tijd in milliseconden en tenslotte het aantal bezorgpogingen. We hebben twee berichten van Bob en deze zijn 74170458 milliseconden inactief geweest, ongeveer 20 uur.

Houd er rekening mee dat niemand ons ervan weerhoudt om eenvoudigweg te controleren wat de inhoud van het bericht is XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

We hoeven alleen maar dezelfde identificatie tweemaal in de argumenten te herhalen. Nu we enig idee hebben, zou Alice kunnen besluiten dat Bob na 20 uur downtime waarschijnlijk niet zal herstellen, en dat het tijd is om die berichten op te vragen en de verwerking ervan voor Bob te hervatten. Hiervoor gebruiken we het commando XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Met dit commando kunnen we een “buitenlands” bericht ontvangen dat nog niet is verwerkt door de eigenaar te wijzigen in {consumer}. We kunnen echter ook een minimale inactiviteitstijd {min-idle-time} opgeven. Dit helpt een situatie te voorkomen waarin twee clients tegelijkertijd de eigenaar van dezelfde berichten proberen te wijzigen:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

De eerste klant zal de downtime resetten en de bezorgteller verhogen. De tweede klant kan er dus niet om vragen.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Het bericht is succesvol geclaimd door Alice, die het bericht nu kan verwerken en bevestigen.

Uit het bovenstaande voorbeeld kunt u zien dat een succesvol verzoek de inhoud van het bericht zelf retourneert. Dit is echter niet nodig. De JUSTID-optie kan alleen worden gebruikt om bericht-ID's te retourneren. Dit is handig als u niet geïnteresseerd bent in de details van het bericht en de systeemprestaties wilt verbeteren.

Bezorgbalie

De teller die u in de uitvoer ziet XPENDING is het aantal bezorgingen van elk bericht. Zo'n teller wordt op twee manieren verhoogd: wanneer een bericht succesvol wordt opgevraagd via XCLAIM of wanneer een oproep wordt gebruikt XREADGROEP.

Het is normaal dat sommige berichten meerdere keren worden afgeleverd. Het belangrijkste is dat alle berichten uiteindelijk worden verwerkt. Soms treden er problemen op bij het verwerken van een bericht omdat het bericht zelf beschadigd is, of omdat de berichtverwerking een fout in de afhandelingscode veroorzaakt. In dit geval kan het blijken dat niemand dit bericht kan verwerken. Omdat wij over een bezorgpogingenteller beschikken, kunnen wij deze teller gebruiken om dergelijke situaties te detecteren. Daarom is het waarschijnlijk verstandiger om een ​​dergelijk bericht in een andere thread te plaatsen en een melding naar de systeembeheerder te sturen zodra het aantal bezorgingen het door u opgegeven hoge aantal bereikt.

Draadstatus

Team XINFO gebruikt om verschillende informatie over een thread en zijn groepen op te vragen. Een basisopdracht ziet er bijvoorbeeld als volgt uit:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Met de bovenstaande opdracht wordt algemene informatie over de opgegeven stream weergegeven. Nu een iets complexer voorbeeld:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

De bovenstaande opdracht geeft algemene informatie weer voor alle groepen van de opgegeven thread

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

De bovenstaande opdracht geeft informatie weer voor alle abonnees van de opgegeven stream en groep.
Als u de syntaxis van de opdracht vergeet, vraagt ​​u de opdracht zelf om hulp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limiet voor streamgrootte

Veel applicaties willen niet voor altijd gegevens in een stream verzamelen. Het is vaak handig om een ​​maximaal aantal berichten per thread toe te staan. In andere gevallen is het handig om alle berichten van een thread naar een andere permanente opslag te verplaatsen wanneer de opgegeven threadgrootte is bereikt. U kunt de grootte van een stream beperken met behulp van de MAXLEN-parameter in de opdracht XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Bij gebruik van MAXLEN worden oude records automatisch verwijderd wanneer ze een bepaalde lengte bereiken, zodat de stream een ​​constante grootte heeft. Het snoeien gebeurt in dit geval echter niet op de meest efficiënte manier in het Redis-geheugen. U kunt de situatie als volgt verbeteren:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Het argument ~ in het bovenstaande voorbeeld betekent dat we de streamlengte niet noodzakelijkerwijs tot een specifieke waarde hoeven te beperken. In ons voorbeeld kan dit elk getal zijn dat groter is dan of gelijk is aan 1000 (bijvoorbeeld 1000, 1010 of 1030). We hebben zojuist expliciet aangegeven dat we willen dat onze stream minimaal 1000 records opslaat. Dit maakt het geheugenbeheer binnen Redis veel efficiënter.

Er is ook een apart team XTRIM, die hetzelfde doet:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Permanente opslag en replicatie

Redis Stream wordt asynchroon gerepliceerd naar slave-knooppunten en opgeslagen in bestanden zoals AOF (momentopname van alle gegevens) en RDB (logboek van alle schrijfbewerkingen). Replicatie van de status Consumentengroepen wordt ook ondersteund. Als een bericht dus de status 'in behandeling' heeft op het masterknooppunt, zal dit bericht op de slaveknooppunten dezelfde status hebben.

Afzonderlijke elementen uit een stream verwijderen

Er is een speciale opdracht om berichten te verwijderen XDEL. De opdracht haalt de naam van de thread op, gevolgd door de bericht-ID's die moeten worden verwijderd:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Wanneer u dit commando gebruikt, moet u er rekening mee houden dat het daadwerkelijke geheugen niet onmiddellijk wordt vrijgegeven.

Stromen van nul lengte

Het verschil tussen streams en andere Redis-datastructuren is dat wanneer andere datastructuren geen elementen meer bevatten, als neveneffect de datastructuur zelf uit het geheugen wordt verwijderd. De gesorteerde set wordt dus bijvoorbeeld volledig verwijderd wanneer de ZREM-aanroep het laatste element verwijdert. In plaats daarvan mogen threads in het geheugen blijven, zelfs zonder dat er elementen in zitten.

Conclusie

Redis Stream is ideaal voor het maken van message brokers, berichtenwachtrijen, uniforme logboekregistratie en chatsystemen voor het bijhouden van de geschiedenis.

Zoals ik ooit zei Niklaus Wirth, programma's zijn algoritmen plus datastructuren, en Redis biedt je beide al.

Bron: www.habr.com

Voeg een reactie