Redis Stream - tillförlitlighet och skalbarhet för dina meddelandesystem

Redis Stream - tillförlitlighet och skalbarhet för dina meddelandesystem

Redis Stream är en ny abstrakt datatyp introducerad i Redis med version 5.0
Konceptuellt är Redis Stream en lista där du kan lägga till poster. Varje post har en unik identifierare. Som standard genereras ID:t automatiskt och inkluderar en tidsstämpel. Därför kan du fråga efter intervall av poster över tid, eller ta emot ny data när den kommer i strömmen, ungefär som Unix "tail -f"-kommandot läser en loggfil och fryser medan du väntar på ny data. Observera att flera klienter kan lyssna på en tråd samtidigt, precis som många "tail -f"-processer kan läsa en fil samtidigt utan att komma i konflikt med varandra.

För att förstå alla fördelar med den nya datatypen, låt oss ta en snabb titt på de sedan länge existerande Redis-strukturerna som delvis replikerar Redis Streams funktionalitet.

Redis PUB/SUB

Redis Pub/Sub är ett enkelt meddelandesystem som redan är inbyggt i din nyckel-värde-butik. Men enkelhet kommer till en kostnad:

  • Om förlaget av någon anledning misslyckas, förlorar han alla sina prenumeranter
  • Utgivaren behöver veta den exakta adressen till alla sina prenumeranter
  • En utgivare kan överbelasta sina prenumeranter med arbete om data publiceras snabbare än den bearbetas
  • Meddelandet raderas från förlagets buffert direkt efter publicering, oavsett hur många prenumeranter det levererades till och hur snabbt de kunde behandla detta meddelande.
  • Alla prenumeranter kommer att få meddelandet samtidigt. Prenumeranter själva måste på något sätt komma överens sinsemellan om ordningen för behandling av samma meddelande.
  • Det finns ingen inbyggd mekanism för att bekräfta att en abonnent har bearbetat ett meddelande. Om en prenumerant får ett meddelande och kraschar under bearbetningen kommer utgivaren inte att veta om det.

Redis lista

Redis List är en datastruktur som stöder blockerande läskommandon. Du kan lägga till och läsa meddelanden från början eller slutet av listan. Baserat på denna struktur kan du skapa en bra stack eller kö för ditt distribuerade system, och i de flesta fall räcker detta. Huvudskillnaderna från Redis Pub/Sub:

  • Meddelandet levereras till en klient. Den första läsblockerade klienten kommer att ta emot data först.
  • Clint måste själv initiera läsoperationen för varje meddelande. List vet ingenting om kunder.
  • Meddelanden lagras tills någon läser dem eller uttryckligen tar bort dem. Om du konfigurerar Redis-servern att spola data till disken, ökar systemets tillförlitlighet dramatiskt.

Introduktion till Stream

Lägga till en post i en stream

Team XADD lägger till en ny post i strömmen. En post är inte bara en sträng, den består av ett eller flera nyckel-värdepar. Således är varje post redan strukturerad och liknar strukturen hos en CSV-fil.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

I exemplet ovan lägger vi till två fält till strömmen med namnet (nyckel) "mystream": "sensor-id" och "temperatur" med värdena "1234" respektive "19.8". Som det andra argumentet tar kommandot en identifierare som kommer att tilldelas posten - denna identifierare identifierar unikt varje post i strömmen. Men i det här fallet klarade vi * eftersom vi vill att Redis ska generera ett nytt ID åt oss. Varje nytt ID kommer att öka. Därför kommer varje ny post att ha en högre identifierare i förhållande till tidigare poster.

Identifieringsformat

Det inmatnings-ID som returneras av kommandot XADD, består av två delar:

{millisecondsTime}-{sequenceNumber}

millisecondsTime — Unix-tid i millisekunder (Redis-servertid). Men om den aktuella tiden är samma eller kortare än tiden för den föregående inspelningen, används tidsstämpeln för den föregående inspelningen. Därför, om servertiden går tillbaka i tiden, kommer den nya identifieraren fortfarande att behålla inkrementegenskapen.

sekvensnummer används för poster skapade på samma millisekund. sekvensnummer kommer att ökas med 1 i förhållande till föregående post. Eftersom den sekvensnummer är 64 bitar i storlek, bör du i praktiken inte stöta på en gräns för antalet poster som kan genereras inom en millisekund.

Formatet på sådana identifierare kan verka konstigt vid första anblicken. En misstänksam läsare kanske undrar varför tid är en del av identifieraren. Anledningen är att Redis-strömmar stöder intervallfrågor efter ID. Eftersom identifieraren är associerad med den tidpunkt då posten skapades, gör detta det möjligt att fråga efter tidsintervall. Vi ska titta på ett specifikt exempel när vi tittar på kommandot XRANGE.

Om användaren av någon anledning behöver ange sin egen identifierare, som till exempel är associerad med något externt system, så kan vi skicka det till kommandot XADD istället för * som visas nedan:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Observera att du i detta fall måste övervaka ID-ökningen själv. I vårt exempel är den minsta identifieraren "0-1", så kommandot accepterar inte en annan identifierare som är lika med eller mindre än "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Antal poster per stream

Det är möjligt att få antalet poster i en ström helt enkelt genom att använda kommandot XLEN. För vårt exempel kommer detta kommando att returnera följande värde:

> XLEN somestream
(integer) 2

Områdesfrågor - XRANGE och XREVRANGE

För att begära data efter intervall måste vi ange två identifierare - början och slutet av intervallet. Det returnerade intervallet kommer att inkludera alla element, inklusive gränserna. Det finns också två speciella identifierare "-" och "+", respektive betyder den minsta (första posten) och största (sista posten) identifierare i strömmen. Exemplet nedan kommer att lista alla strömposter.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Varje returnerad post är en matris med två element: en identifierare och en lista med nyckel-värdepar. Vi har redan sagt att postidentifierare är relaterade till tid. Därför kan vi begära ett intervall för en viss tidsperiod. Vi kan dock ange i begäran inte den fullständiga identifieraren, utan endast Unix-tiden, och utelämna den del som är relaterad till sekvensnummer. Den utelämnade delen av identifieraren ställs automatiskt in på noll i början av intervallet och till maximalt möjliga värde i slutet av intervallet. Nedan är ett exempel på hur du kan begära ett intervall på två millisekunder.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Vi har bara en post i det här intervallet, men i verkliga datamängder kan resultatet vara enormt. Av denna anledning XRANGE stöder alternativet COUNT. Genom att specificera kvantiteten kan vi helt enkelt få de första N posterna. Om vi ​​behöver få nästa N poster (paginering), kan vi använda det senast mottagna ID:t, öka det sekvensnummer av en och fråga igen. Låt oss titta på detta i följande exempel. Vi börjar lägga till 10 element med XADD (förutsatt att mystream redan var fylld med 10 element). För att starta iterationen och få 2 element per kommando börjar vi med hela intervallet men med COUNT lika med 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

För att fortsätta att iterera med de nästa två elementen måste vi välja det senast mottagna ID:t, dvs. 1519073279157-0, och lägga till 1 till sekvensnummer.
Det resulterande ID:t, i det här fallet 1519073279157-1, kan nu användas som det nya argumentet för start av intervall för nästa anrop XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Och så vidare. För komplexitet XRANGE är O(log(N)) för att söka och sedan O(M) för att returnera M element, då är varje iterationssteg snabbt. Således använder XRANGE strömmar kan itereras effektivt.

Team XREVRANGE är motsvarande XRANGE, men returnerar elementen i omvänd ordning:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Observera att kommandot XREVRANGE tar avståndsargument start och stopp i omvänd ordning.

Läser nya poster med XREAD

Ofta uppstår uppgiften att prenumerera på en stream och bara ta emot nya meddelanden. Det här konceptet kan likna Redis Pub/Sub eller blockering av Redis List, men det finns grundläggande skillnader i hur man använder Redis Stream:

  1. Varje nytt meddelande levereras till varje prenumerant som standard. Detta beteende skiljer sig från en blockerande Redis-lista, där ett nytt meddelande endast kommer att läsas av en prenumerant.
  2. Medan i Redis Pub/Sub alla meddelanden glöms bort och aldrig kvarstår, i Stream behålls alla meddelanden på obestämd tid (såvida inte klienten uttryckligen orsakar radering).
  3. Redis Stream låter dig skilja åtkomst till meddelanden inom en stream. En specifik prenumerant kan bara se sin personliga meddelandehistorik.

Du kan prenumerera på en tråd och ta emot nya meddelanden med kommandot XLÄS. Det är lite mer komplicerat än XRANGE, så vi börjar med de enklare exemplen först.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Exemplet ovan visar ett icke-blockerande formulär XLÄS. Observera att alternativet COUNT är valfritt. Faktum är att det enda nödvändiga kommandoalternativet är alternativet STREAMS, som specificerar en lista över strömmar tillsammans med motsvarande maximala identifierare. Vi skrev "STREAMS mystream 0" - vi vill ta emot alla poster i mystream-strömmen med en identifierare som är större än "0-0". Som du kan se i exemplet returnerar kommandot namnet på tråden eftersom vi kan prenumerera på flera trådar samtidigt. Vi skulle till exempel kunna skriva "STREAMS mystream otherstream 0 0". Observera att efter alternativet STREAMS måste vi först ange namnen på alla nödvändiga strömmar och först därefter en lista med identifierare.

I denna enkla form gör kommandot inget speciellt jämfört med XRANGE. Det intressanta är dock att vi lätt kan vända XLÄS till ett blockeringskommando, som anger BLOCK-argumentet:

> XREAD BLOCK 0 STREAMS mystream $

I exemplet ovan anges ett nytt BLOCK-alternativ med en timeout på 0 millisekunder (det betyder att man väntar på obestämd tid). Dessutom, istället för att skicka den vanliga identifieraren för streamen mystream, skickades en speciell identifierare $. Denna speciella identifierare betyder att XLÄS måste använda den maximala identifieraren i mystream som identifierare. Så vi kommer bara att få nya meddelanden från det ögonblick vi började lyssna. På vissa sätt liknar detta Unix-kommandot "tail -f".

Observera att när vi använder alternativet BLOCK behöver vi inte nödvändigtvis använda den speciella identifieraren $. Vi kan använda vilken identifierare som helst som finns i flödet. Om teamet kan betjäna vår begäran omedelbart utan att blockera kommer det att göra det, annars blockeras det.

Blockering XLÄS kan också lyssna på flera trådar samtidigt, du behöver bara ange deras namn. I det här fallet kommer kommandot att returnera en post över den första strömmen som tog emot data. Den första abonnenten som är blockerad för en given tråd kommer att få data först.

Konsumentgrupper

I vissa uppgifter vill vi begränsa prenumerantåtkomsten till meddelanden inom en tråd. Ett exempel där detta kan vara användbart är en meddelandekö med arbetare som kommer att ta emot olika meddelanden från en tråd, vilket gör att meddelandebearbetningen kan skalas.

Om vi ​​föreställer oss att vi har tre prenumeranter C1, C2, C3 och en tråd som innehåller meddelanden 1, 2, 3, 4, 5, 6, 7, kommer meddelandena att visas som i diagrammet nedan:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

För att uppnå denna effekt använder Redis Stream ett koncept som kallas Consumer Group. Detta koncept liknar en pseudo-abonnent, som tar emot data från en ström, men som faktiskt betjänas av flera abonnenter inom en grupp, vilket ger vissa garantier:

  1. Varje meddelande levereras till en annan abonnent inom gruppen.
  2. Inom en grupp identifieras prenumeranter med sitt namn, vilket är en skiftlägeskänslig sträng. Om en prenumerant tillfälligt hoppar ur gruppen kan han återställas till gruppen med sitt eget unika namn.
  3. Varje konsumentgrupp följer konceptet "första olästa meddelandet". När en prenumerant begär nya meddelanden kan den bara ta emot meddelanden som aldrig tidigare har levererats till någon prenumerant inom gruppen.
  4. Det finns ett kommando för att uttryckligen bekräfta att meddelandet bearbetades framgångsrikt av abonnenten. Tills detta kommando anropas kommer det begärda meddelandet att förbli i "väntande" status.
  5. Inom konsumentgruppen kan varje prenumerant begära en historik över meddelanden som har levererats till honom, men som ännu inte har behandlats (i "väntande" status)

På ett sätt kan gruppens tillstånd uttryckas på följande sätt:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nu är det dags att bekanta sig med huvudkommandona för Konsumentgruppen, nämligen:

  • XGROUP används för att skapa, förstöra och hantera grupper
  • XREADGROUP används för att läsa stream genom gruppen
  • XACK - detta kommando låter abonnenten markera meddelandet som framgångsrikt bearbetat

Skapande av konsumentgrupp

Låt oss anta att mystream redan finns. Då kommer kommandot för att skapa grupp se ut så här:

> XGROUP CREATE mystream mygroup $
OK

När vi skapar en grupp måste vi skicka en identifierare, från vilken gruppen kommer att ta emot meddelanden. Om vi ​​bara vill ta emot alla nya meddelanden kan vi använda den speciella identifieraren $ (som i vårt exempel ovan). Om du anger 0 istället för en speciell identifierare, kommer alla meddelanden i tråden att vara tillgängliga för gruppen.

Nu när gruppen är skapad kan vi omedelbart börja läsa meddelanden med kommandot XREADGROUP. Detta kommando är mycket likt XLÄS och stöder det valfria alternativet BLOCK. Det finns dock ett obligatoriskt GROUP-alternativ som alltid måste anges med två argument: gruppnamnet och abonnentnamnet. Alternativet COUNT stöds också.

Innan vi läser tråden, låt oss lägga några meddelanden där:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Låt oss nu försöka läsa denna ström genom gruppen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ovanstående kommando lyder ordagrant enligt följande:

"Jag, prenumerant Alice, en medlem av min grupp, vill läsa ett meddelande från mystream som aldrig har levererats till någon tidigare."

Varje gång en abonnent utför en operation på en grupp måste den ange sitt namn, vilket unikt identifierar sig inom gruppen. Det finns ytterligare en mycket viktig detalj i kommandot ovan - den speciella identifieraren ">". Denna speciella identifierare filtrerar meddelanden och lämnar bara de som aldrig har levererats tidigare.

I speciella fall kan du också ange en riktig identifierare som 0 eller någon annan giltig identifierare. I det här fallet kommandot XREADGROUP kommer att returnera dig en historik över meddelanden med statusen "väntande" som levererades till den angivna abonnenten (Alice) men som ännu inte har bekräftats med kommandot XACK.

Vi kan testa detta beteende genom att omedelbart ange ID 0, utan alternativet RÄKNA. Vi kommer helt enkelt att se ett enda väntande meddelande, det vill säga Apple-meddelandet:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Men om vi bekräftar att meddelandet har bearbetats kommer det inte längre att visas:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nu är det Bobs tur att läsa något:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, medlem i min grupp, bad om högst två meddelanden. Kommandot rapporterar endast ej levererade meddelanden på grund av den speciella identifieraren ">". Som du kan se kommer meddelandet "äpple" inte att visas eftersom det redan har levererats till Alice, så Bob får "apelsin" och "jordgubbe".

På så sätt kan Alice, Bob och alla andra prenumeranter på gruppen läsa olika meddelanden från samma ström. De kan också läsa sin historik över obehandlade meddelanden eller markera meddelanden som bearbetade.

Det finns några saker att tänka på:

  • Så snart abonnenten anser att meddelandet är ett kommando XREADGROUP, det här meddelandet går in i läget "väntande" och tilldelas den specifika abonnenten. Andra gruppprenumeranter kommer inte att kunna läsa detta meddelande.
  • Prenumeranter skapas automatiskt vid första omnämnande, det finns inget behov av att uttryckligen skapa dem.
  • Med XREADGROUP du kan läsa meddelanden från flera olika trådar samtidigt, men för att detta ska fungera måste du först skapa grupper med samma namn för varje tråd med XGROUP

Återhämtning efter ett misslyckande

Prenumeranten kan återhämta sig från felet och läsa om sin lista över meddelanden med statusen "väntande". Men i den verkliga världen kan prenumeranter i slutändan misslyckas. Vad händer med en abonnents meddelanden som har fastnat om abonnenten inte kan återhämta sig efter ett fel?
Consumer Group erbjuder en funktion som används för just sådana fall – när du behöver byta ägare till meddelanden.

Det första du behöver göra är att anropa kommandot XPENDING, som visar alla meddelanden i gruppen med statusen "väntande". I sin enklaste form anropas kommandot med endast två argument: trådnamnet och gruppnamnet:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Teamet visade antalet obehandlade meddelanden för hela gruppen och för varje prenumerant. Vi har bara Bob med två utestående meddelanden eftersom det enda meddelande som Alice begärde bekräftades med XACK.

Vi kan begära mer information med fler argument:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - intervall av identifierare (du kan använda "-" och "+")
{count} — antal leveransförsök
{konsumentnamn} – gruppnamn

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nu har vi detaljer för varje meddelande: ID, abonnentnamn, vilotid i millisekunder och slutligen antalet leveransförsök. Vi har två meddelanden från Bob och de har varit inaktiva i 74170458 millisekunder, ungefär 20 timmar.

Observera att ingen hindrar oss från att kontrollera innehållet i meddelandet helt enkelt genom att använda XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Vi måste bara upprepa samma identifierare två gånger i argumenten. Nu när vi har en idé kanske Alice bestämmer sig för att Bob förmodligen inte kommer att återhämta sig efter 20 timmars driftstopp, och det är dags att fråga efter dessa meddelanden och återuppta bearbetningen av dem för Bob. För detta använder vi kommandot XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Med detta kommando kan vi ta emot ett "utländskt" meddelande som ännu inte har behandlats genom att ändra ägaren till {konsument}. Men vi kan också tillhandahålla en minsta vilotid {min-idle-time}. Detta hjälper till att undvika en situation där två klienter försöker byta ägare till samma meddelanden samtidigt:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Den första kunden kommer att återställa stilleståndstiden och öka leveransräknaren. Så den andra klienten kommer inte att kunna begära det.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Meddelandet gjordes anspråk på framgångsrikt av Alice, som nu kan bearbeta meddelandet och bekräfta det.

Från exemplet ovan kan du se att en lyckad begäran returnerar innehållet i själva meddelandet. Detta är dock inte nödvändigt. Alternativet JUSTID kan endast användas för att returnera meddelande-ID:n. Detta är användbart om du inte är intresserad av detaljerna i meddelandet och vill öka systemets prestanda.

Leveransdisk

Räknaren du ser i utgången XPENDING är antalet leveranser av varje meddelande. En sådan räknare inkrementeras på två sätt: när ett meddelande framgångsrikt begärs via XCLAIM eller när ett samtal används XREADGROUP.

Det är normalt att vissa meddelanden levereras flera gånger. Huvudsaken är att alla meddelanden så småningom behandlas. Ibland uppstår problem vid bearbetning av ett meddelande eftersom själva meddelandet är skadat, eller att meddelandebehandling orsakar ett fel i hanterarkoden. I det här fallet kan det visa sig att ingen kommer att kunna behandla detta meddelande. Eftersom vi har en leveransförsöksräknare kan vi använda denna räknare för att upptäcka sådana situationer. Därför, när leveransantalet når det höga antalet du anger, skulle det förmodligen vara klokare att lägga ett sådant meddelande på en annan tråd och skicka ett meddelande till systemadministratören.

Trådtillstånd

Team XINFO används för att begära olika information om en tråd och dess grupper. Till exempel ser ett grundläggande kommando ut så här:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Kommandot ovan visar allmän information om den angivna strömmen. Nu ett lite mer komplext exempel:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Kommandot ovan visar allmän information för alla grupper i den angivna tråden

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Kommandot ovan visar information för alla prenumeranter av den angivna strömmen och gruppen.
Om du glömmer kommandosyntaxen, fråga bara själva kommandot om hjälp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Strömstorleksgräns

Många applikationer vill inte samla in data i en ström för alltid. Det är ofta användbart att ha ett maximalt antal meddelanden per tråd. I andra fall är det användbart att flytta alla meddelanden från en tråd till en annan beständig lagring när den angivna trådstorleken har nåtts. Du kan begränsa storleken på en ström genom att använda parametern MAXLEN i kommandot XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

När du använder MAXLEN raderas gamla poster automatiskt när de når en angiven längd, så strömmen har en konstant storlek. Men beskärning i detta fall sker inte på det mest effektiva sättet i Redis minne. Du kan förbättra situationen enligt följande:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argumentet ~ i exemplet ovan betyder att vi inte nödvändigtvis behöver begränsa strömlängden till ett specifikt värde. I vårt exempel kan detta vara vilket tal som helst som är större än eller lika med 1000 (till exempel 1000, 1010 eller 1030). Vi har precis angett att vi vill att vår stream ska lagra minst 1000 poster. Detta gör minneshanteringen mycket effektivare inuti Redis.

Det finns också ett separat team XTRIM, som gör samma sak:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Beständig lagring och replikering

Redis Stream replikeras asynkront till slavnoder och sparas i filer som AOF (ögonblicksbild av all data) och RDB (logg över alla skrivoperationer). Replikering av Consumer Groups-tillstånd stöds också. Därför, om ett meddelande har statusen "väntande" på masternoden, kommer detta meddelande att ha samma status på slavnoderna.

Ta bort enskilda element från en ström

Det finns ett speciellt kommando för att radera meddelanden XDEL. Kommandot får namnet på tråden följt av meddelande-ID:n som ska raderas:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

När du använder det här kommandot måste du ta hänsyn till att det faktiska minnet inte kommer att släppas omedelbart.

Strömmar med noll längd

Skillnaden mellan strömmar och andra Redis-datastrukturer är att när andra datastrukturer inte längre har element inom sig, som en bieffekt, kommer själva datastrukturen att tas bort från minnet. Så till exempel kommer den sorterade uppsättningen att tas bort helt när ZREM-anropet tar bort det sista elementet. Istället får trådar finnas kvar i minnet även utan att ha några element inuti.

Slutsats

Redis Stream är idealiskt för att skapa meddelandeförmedlare, meddelandeköer, enhetlig loggning och historikbevarande chattsystem.

Som jag sa en gång Niklaus Wirth, program är algoritmer plus datastrukturer, och Redis ger dig redan båda.

Källa: will.com

Lägg en kommentar