Redis Stream - pålidelighed og skalerbarhed af dine beskedsystemer

Redis Stream - pålidelighed og skalerbarhed af dine beskedsystemer

Redis Stream er en ny abstrakt datatype introduceret i Redis med version 5.0
Konceptuelt er Redis Stream en liste, som du kan tilføje poster til. Hver post har en unik identifikator. Som standard genereres ID'et automatisk og inkluderer et tidsstempel. Derfor kan du forespørge på rækker af poster over tid eller modtage nye data, når de ankommer i strømmen, ligesom Unix-kommandoen "tail -f" læser en logfil og fryser, mens du venter på nye data. Bemærk, at flere klienter kan lytte til en tråd på samme tid, ligesom mange "hale -f" processer kan læse en fil samtidigt uden at komme i konflikt med hinanden.

For at forstå alle fordelene ved den nye datatype, lad os tage et hurtigt kig på de længe eksisterende Redis-strukturer, der delvist replikerer funktionaliteten af ​​Redis Stream.

Redis PUB/SUB

Redis Pub/Sub er et simpelt meddelelsessystem, der allerede er indbygget i dit nøgleværdilager. Men enkelhed har en pris:

  • Hvis udgiveren af ​​en eller anden grund fejler, så mister han alle sine abonnenter
  • Udgiveren skal kende den nøjagtige adresse på alle sine abonnenter
  • En udgiver kan overbelaste sine abonnenter med arbejde, hvis data udgives hurtigere, end de behandles
  • Beskeden slettes fra udgiverens buffer umiddelbart efter offentliggørelsen, uanset hvor mange abonnenter den blev leveret til, og hvor hurtigt de var i stand til at behandle denne meddelelse.
  • Alle abonnenter vil modtage beskeden på samme tid. Abonnenter skal selv på en eller anden måde indbyrdes blive enige om rækkefølgen for behandling af den samme besked.
  • Der er ingen indbygget mekanisme til at bekræfte, at en abonnent har behandlet en besked. Hvis en abonnent modtager en besked og går ned under behandlingen, vil udgiveren ikke vide om det.

Redis liste

Redis List er en datastruktur, der understøtter blokering af læsekommandoer. Du kan tilføje og læse beskeder fra begyndelsen eller slutningen af ​​listen. Ud fra denne struktur kan du lave en god stack eller kø til dit distribuerede system, og i de fleste tilfælde vil det være nok. Vigtigste forskelle fra Redis Pub/Sub:

  • Beskeden leveres til én klient. Den første læseblokerede klient vil modtage dataene først.
  • Clint skal selv starte læseoperationen for hver besked. List ved intet om kunder.
  • Beskeder gemmes, indtil nogen læser dem eller eksplicit sletter dem. Hvis du konfigurerer Redis-serveren til at skylle data til disken, øges systemets pålidelighed dramatisk.

Introduktion til Stream

Tilføjelse af en post til en stream

Team XADD tilføjer en ny post til strømmen. En post er ikke bare en streng, den består af et eller flere nøgleværdi-par. Således er hver post allerede struktureret og ligner strukturen af ​​en CSV-fil.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

I eksemplet ovenfor tilføjer vi to felter til streamen med navnet (nøgle) "mystream": "sensor-id" og "temperatur" med henholdsvis værdierne "1234" og "19.8". Som det andet argument tager kommandoen en identifikator, der vil blive tildelt til posten - denne identifikator identificerer entydigt hver post i strømmen. Men i dette tilfælde bestod vi *, fordi vi vil have Redis til at generere et nyt ID til os. Hvert nyt ID vil stige. Derfor vil hver ny post have en højere identifikator i forhold til tidligere poster.

Identifikatorformat

Indtastnings-id'et returneret af kommandoen XADD, består af to dele:

{millisecondsTime}-{sequenceNumber}

millisekunderTid — Unix-tid i millisekunder (Redis-servertid). Men hvis det aktuelle klokkeslæt er det samme eller mindre end tidspunktet for den forrige optagelse, bruges tidsstemplet for den forrige optagelse. Derfor, hvis servertiden går tilbage i tiden, vil den nye identifikator stadig beholde inkrementegenskaben.

sekvensnummer bruges til poster oprettet i samme millisekund. sekvensnummer vil blive øget med 1 i forhold til den forrige post. Fordi sekvensnummer er 64 bit store, så bør du i praksis ikke løbe ind i en grænse for antallet af poster, der kan genereres inden for et millisekund.

Formatet af sådanne identifikatorer kan virke underligt ved første øjekast. En mistroisk læser kan undre sig over, hvorfor tid er en del af identifikatoren. Årsagen er, at Redis-streams understøtter rækkeviddeforespørgsler efter ID. Da identifikatoren er knyttet til det tidspunkt, hvor posten blev oprettet, gør dette det muligt at forespørge tidsintervaller. Vi vil se på et specifikt eksempel, når vi ser på kommandoen XRANGE.

Hvis brugeren af ​​en eller anden grund har brug for at angive sin egen identifikator, som for eksempel er forbundet med et eksternt system, så kan vi videregive det til kommandoen XADD i stedet for * som vist nedenfor:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Bemærk venligst, at du i dette tilfælde selv skal overvåge ID-stigningen. I vores eksempel er minimumsidentifikatoren "0-1", så kommandoen vil ikke acceptere en anden identifikator, der er lig med eller mindre end "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Antal poster pr. stream

Det er muligt at få antallet af poster i en strøm blot ved at bruge kommandoen XLEN. For vores eksempel vil denne kommando returnere følgende værdi:

> XLEN somestream
(integer) 2

Områdeforespørgsler - XRANGE og XREVRANGE

For at anmode om data efter område skal vi angive to identifikatorer - begyndelsen og slutningen af ​​området. Det returnerede område vil omfatte alle elementer, inklusive grænserne. Der er også to specielle identifikatorer "-" og "+", der henholdsvis betyder den mindste (første post) og største (sidste post) identifikator i strømmen. Eksemplet nedenfor viser alle stream-indgange.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Hver returneret post er en matrix af to elementer: en identifikator og en liste over nøgleværdi-par. Vi har allerede sagt, at registreringsidentifikatorer er relateret til tid. Derfor kan vi anmode om en række af en bestemt tidsperiode. Vi kan dog i anmodningen angive ikke den fulde identifikator, men kun Unix-tiden, idet vi udelader den del, der vedrører sekvensnummer. Den udeladte del af identifikatoren vil automatisk blive sat til nul i begyndelsen af ​​området og til den maksimalt mulige værdi i slutningen af ​​området. Nedenfor er et eksempel på, hvordan du kan anmode om et interval på to millisekunder.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Vi har kun én post i dette interval, men i rigtige datasæt kan det returnerede resultat være enormt. Af denne grund XRANGE understøtter indstillingen COUNT. Ved at angive mængden kan vi blot få de første N poster. Hvis vi har brug for at få de næste N poster (paginering), kan vi bruge det sidst modtagne ID, øge det sekvensnummer af en og spørg igen. Lad os se på dette i det følgende eksempel. Vi begynder at tilføje 10 elementer med XADD (forudsat at mystream allerede var fyldt med 10 elementer). For at starte iterationen med at få 2 elementer pr. kommando, starter vi med hele området, men med COUNT lig med 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

For at fortsætte med at iterere med de næste to elementer, skal vi vælge det sidst modtagne ID, dvs. 1519073279157-0, og tilføje 1 til sekvensnummer.
Det resulterende ID, i dette tilfælde 1519073279157-1, kan nu bruges som det nye start af interval-argument for det næste kald XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Og så videre. Fordi kompleksitet XRANGE er O(log(N)) for at søge og derefter O(M) for at returnere M elementer, så er hvert iterationstrin hurtigt. Således bruger XRANGE strømme kan itereres effektivt.

Team XREVRANGE er det tilsvarende XRANGE, men returnerer elementerne i omvendt rækkefølge:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Bemærk venligst, at kommandoen XREVRANGE tager afstandsargumenter start og stop i omvendt rækkefølge.

Læsning af nye poster ved hjælp af XREAD

Ofte opstår opgaven med at abonnere på en stream og kun modtage nye beskeder. Dette koncept kan ligne Redis Pub/Sub eller blokering af Redis List, men der er grundlæggende forskelle i, hvordan du bruger Redis Stream:

  1. Hver ny besked leveres som standard til hver abonnent. Denne adfærd er forskellig fra en blokerende Redis-liste, hvor en ny besked kun vil blive læst af én abonnent.
  2. Mens alle beskeder i Redis Pub/Sub bliver glemt og aldrig bliver ved, bevares alle beskeder i Stream på ubestemt tid (medmindre klienten eksplicit forårsager sletning).
  3. Redis Stream giver dig mulighed for at differentiere adgangen til beskeder inden for en strøm. En bestemt abonnent kan kun se deres personlige beskedhistorik.

Du kan abonnere på en tråd og modtage nye beskeder ved hjælp af kommandoen XLÆS. Det er lidt mere kompliceret end XRANGE, så vi starter med de enklere eksempler først.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Eksemplet ovenfor viser en ikke-blokerende formular XLÆS. Bemærk, at indstillingen COUNT er valgfri. Faktisk er den eneste nødvendige kommandomulighed indstillingen STREAMS, som specificerer en liste over streams sammen med den tilsvarende maksimale identifikator. Vi skrev "STREAMS mystream 0" - vi ønsker at modtage alle registreringer af mystream-strømmen med en identifikator større end "0-0". Som du kan se fra eksemplet, returnerer kommandoen navnet på tråden, fordi vi kan abonnere på flere tråde på samme tid. Vi kunne for eksempel skrive "STREAMS mystream otherstream 0 0". Bemærk venligst, at efter indstillingen STREAMS skal vi først angive navnene på alle de nødvendige streams og først derefter en liste over identifikatorer.

I denne enkle form gør kommandoen ikke noget særligt i forhold til XRANGE. Det interessante er dog, at vi sagtens kan vende XLÆS til en blokeringskommando, der angiver BLOCK-argumentet:

> XREAD BLOCK 0 STREAMS mystream $

I eksemplet ovenfor er en ny BLOCK-indstilling angivet med en timeout på 0 millisekunder (dette betyder, at man venter på ubestemt tid). Desuden, i stedet for at videregive den sædvanlige identifikator for streamen mystream, blev en speciel identifikator $ videregivet. Denne særlige identifikator betyder det XLÆS skal bruge den maksimale identifikator i mystream som identifikator. Så vi vil kun modtage nye beskeder fra det øjeblik, vi begyndte at lytte. På nogle måder ligner dette Unix "tail -f" kommandoen.

Bemærk, at når du bruger BLOCK-indstillingen, behøver vi ikke nødvendigvis at bruge den specielle identifikator $. Vi kan bruge enhver identifikator, der findes i strømmen. Hvis teamet kan servicere vores anmodning med det samme uden at blokere, vil det gøre det, ellers blokerer det.

Blokering XLÆS kan også lytte til flere tråde på én gang, du skal blot angive deres navne. I dette tilfælde vil kommandoen returnere en registrering af den første strøm, der modtog data. Den første abonnent, der er blokeret for en given tråd, vil modtage data først.

Forbrugergrupper

I visse opgaver ønsker vi at begrænse abonnenternes adgang til beskeder inden for en tråd. Et eksempel, hvor dette kunne være nyttigt, er en meddelelseskø med arbejdere, der vil modtage forskellige meddelelser fra en tråd, hvilket gør det muligt at skalere meddelelsesbehandling.

Hvis vi forestiller os, at vi har tre abonnenter C1, C2, C3 og en tråd, der indeholder beskeder 1, 2, 3, 4, 5, 6, 7, så vil beskederne blive serveret som i diagrammet nedenfor:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

For at opnå denne effekt bruger Redis Stream et koncept kaldet Consumer Group. Dette koncept ligner en pseudo-abonnent, som modtager data fra en strøm, men som faktisk betjenes af flere abonnenter inden for en gruppe, hvilket giver visse garantier:

  1. Hver besked leveres til en anden abonnent i gruppen.
  2. Inden for en gruppe identificeres abonnenter ved deres navn, som er en streng, der skelner mellem store og små bogstaver. Hvis en abonnent midlertidigt forlader gruppen, kan han blive gendannet til gruppen ved hjælp af sit eget unikke navn.
  3. Hver forbrugergruppe følger konceptet "første ulæste besked". Når en abonnent anmoder om nye beskeder, kan den kun modtage beskeder, der aldrig tidligere er blevet leveret til nogen abonnent i gruppen.
  4. Der er en kommando til eksplicit at bekræfte, at meddelelsen blev behandlet af abonnenten. Indtil denne kommando kaldes, vil den anmodede besked forblive i "afventer"-status.
  5. Inden for forbrugergruppen kan hver abonnent anmode om en historik over beskeder, der blev leveret til ham, men som endnu ikke er blevet behandlet (i status "afventer")

På en måde kan gruppens tilstand udtrykkes som følger:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nu er det tid til at stifte bekendtskab med de vigtigste kommandoer for Forbrugergruppen, nemlig:

  • XGROUP bruges til at oprette, ødelægge og administrere grupper
  • XREADGROUP bruges til at læse stream gennem gruppen
  • XACK - denne kommando giver abonnenten mulighed for at markere meddelelsen som behandlet

Oprettelse af forbrugergruppe

Lad os antage, at mystream allerede eksisterer. Så vil kommandoen til oprettelse af gruppe se sådan ud:

> XGROUP CREATE mystream mygroup $
OK

Når vi opretter en gruppe, skal vi videregive en identifikator, hvorfra gruppen vil modtage beskeder. Hvis vi blot ønsker at modtage alle nye beskeder, så kan vi bruge den specielle identifikator $ (som i vores eksempel ovenfor). Hvis du angiver 0 i stedet for en speciel identifikator, vil alle meddelelser i tråden være tilgængelige for gruppen.

Nu hvor gruppen er oprettet, kan vi straks begynde at læse beskeder ved hjælp af kommandoen XREADGROUP. Denne kommando ligner meget XLÆS og understøtter den valgfrie BLOCK-indstilling. Der er dog en påkrævet GROUP-indstilling, som altid skal angives med to argumenter: gruppenavnet og abonnentnavnet. Indstillingen COUNT er også understøttet.

Før vi læser tråden, lad os lægge nogle beskeder der:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Lad os nu prøve at læse denne strøm gennem gruppen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ovenstående kommando lyder ordret som følger:

"Jeg, abonnent Alice, et medlem af min gruppe, ønsker at læse en besked fra mystream, som aldrig er blevet leveret til nogen før."

Hver gang en abonnent udfører en operation på en gruppe, skal den oplyse sit navn, der entydigt identificerer sig selv i gruppen. Der er endnu en meget vigtig detalje i kommandoen ovenfor - den særlige identifikator ">". Denne specielle identifikator filtrerer beskeder og efterlader kun dem, der aldrig er blevet leveret før.

I særlige tilfælde kan du også angive en reel identifikator, såsom 0 eller en hvilken som helst anden gyldig identifikator. I dette tilfælde kommandoen XREADGROUP vil returnere dig en historie med beskeder med status "afventer", der blev leveret til den angivne abonnent (Alice), men som endnu ikke er blevet bekræftet ved hjælp af kommandoen XACK.

Vi kan teste denne adfærd ved straks at angive ID 0 uden muligheden COUNT. Vi vil blot se en enkelt afventende meddelelse, det vil sige æblemeddelelsen:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Men hvis vi bekræfter meddelelsen som vellykket behandlet, vil den ikke længere blive vist:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nu er det Bobs tur til at læse noget:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, medlem af min gruppe, bad om ikke mere end to beskeder. Kommandoen rapporterer kun ikke-leverede meddelelser på grund af den særlige identifikator ">". Som du kan se, vil beskeden "æble" ikke blive vist, da den allerede er leveret til Alice, så Bob modtager "appelsin" og "jordbær".

På denne måde kan Alice, Bob og enhver anden abonnent på gruppen læse forskellige beskeder fra den samme strøm. De kan også læse deres historie med ubehandlede meddelelser eller markere meddelelser som behandlede.

Der er et par ting at huske på:

  • Så snart abonnenten anser beskeden for at være en kommando XREADGROUP, går denne meddelelse i "afventende" tilstand og tildeles den specifikke abonnent. Andre gruppeabonnenter vil ikke kunne læse denne besked.
  • Abonnenter oprettes automatisk ved første omtale, det er ikke nødvendigt at oprette dem eksplicit.
  • Med XREADGROUP du kan læse beskeder fra flere forskellige tråde på samme tid, men for at dette skal fungere skal du først oprette grupper med det samme navn for hver tråd vha. XGROUP

Gendannelse af fejl

Abonnenten kan komme sig over fejlen og genlæse sin liste over beskeder med statussen "afventer". Men i den virkelige verden kan abonnenter i sidste ende fejle. Hvad sker der med en abonnents fastlåste beskeder, hvis abonnenten ikke er i stand til at komme sig efter en fejl?
Consumer Group tilbyder en funktion, som bruges til netop sådanne sager – når du skal skifte ejer af beskeder.

Den første ting du skal gøre er at kalde kommandoen XPENDING, som viser alle beskeder i gruppen med status "afventer". I sin enkleste form kaldes kommandoen med kun to argumenter: trådnavnet og gruppenavnet:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Holdet viste antallet af ubehandlede beskeder for hele gruppen og for hver abonnent. Vi har kun Bob med to udestående beskeder, fordi den eneste besked Alice anmodede om blev bekræftet med XACK.

Vi kan anmode om flere oplysninger ved at bruge flere argumenter:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - række af identifikatorer (du kan bruge "-" og "+")
{count} — antal leveringsforsøg
{consumer-name} - gruppenavn

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nu har vi detaljer for hver besked: ID, abonnentnavn, inaktiv tid i millisekunder og til sidst antallet af leveringsforsøg. Vi har to beskeder fra Bob, og de har været inaktive i 74170458 millisekunder, omkring 20 timer.

Bemærk venligst, at ingen forhindrer os i at kontrollere, hvad indholdet af beskeden var ved blot at bruge XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Vi skal bare gentage den samme identifikator to gange i argumenterne. Nu hvor vi har en idé, kan Alice beslutte, at Bob sandsynligvis ikke vil komme sig efter 20 timers nedetid, og det er tid til at forespørge på disse beskeder og genoptage behandlingen af ​​dem for Bob. Til dette bruger vi kommandoen XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Ved at bruge denne kommando kan vi modtage en "fremmed" besked, som endnu ikke er blevet behandlet, ved at ændre ejeren til {consumer}. Vi kan dog også give en minimum ledig tid {min-idle-time}. Dette hjælper med at undgå en situation, hvor to klienter forsøger at ændre ejeren af ​​de samme meddelelser samtidigt:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Den første kunde nulstiller nedetiden og øger leveringstælleren. Så den anden klient vil ikke kunne anmode om det.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Beskeden blev med succes gjort krav på af Alice, som nu kan behandle meddelelsen og bekræfte den.

Fra ovenstående eksempel kan du se, at en vellykket anmodning returnerer indholdet af selve beskeden. Dette er dog ikke nødvendigt. Indstillingen JUSTID kan kun bruges til at returnere besked-id'er. Dette er nyttigt, hvis du ikke er interesseret i detaljerne i meddelelsen og ønsker at øge systemets ydeevne.

Leveringstæller

Tælleren du ser i outputtet XPENDING er antallet af leveringer af hver besked. En sådan tæller øges på to måder: når en meddelelse er vellykket anmodet via XCLAIM eller når et opkald bruges XREADGROUP.

Det er normalt, at nogle beskeder bliver leveret flere gange. Det vigtigste er, at alle beskeder til sidst behandles. Nogle gange opstår der problemer under behandling af en meddelelse, fordi selve meddelelsen er beskadiget, eller meddelelsesbehandling forårsager en fejl i behandlerkoden. I dette tilfælde kan det vise sig, at ingen vil være i stand til at behandle denne besked. Da vi har en leveringsforsøgstæller, kan vi bruge denne tæller til at opdage sådanne situationer. Derfor, når leveringsantallet når det høje tal du angiver, ville det nok være klogere at lægge sådan en besked på en anden tråd og sende en notifikation til systemadministratoren.

Trådtilstand

Team XINFO bruges til at anmode om forskellige oplysninger om en tråd og dens grupper. For eksempel ser en grundlæggende kommando sådan ud:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Kommandoen ovenfor viser generel information om den angivne strøm. Nu et lidt mere komplekst eksempel:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Kommandoen ovenfor viser generel information for alle grupper i den angivne tråd

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Kommandoen ovenfor viser information for alle abonnenter af den angivne strøm og gruppe.
Hvis du glemmer kommandosyntaksen, skal du bare spørge selve kommandoen om hjælp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Streams størrelsesgrænse

Mange applikationer ønsker ikke at samle data i en strøm for evigt. Det er ofte nyttigt at have et maksimalt antal meddelelser tilladt pr. tråd. I andre tilfælde er det nyttigt at flytte alle meddelelser fra en tråd til et andet vedvarende lager, når den angivne trådstørrelse er nået. Du kan begrænse størrelsen af ​​en strøm ved at bruge parameteren MAXLEN i kommandoen XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Når du bruger MAXLEN, slettes gamle poster automatisk, når de når en specificeret længde, så strømmen har en konstant størrelse. Imidlertid sker beskæring i dette tilfælde ikke på den mest effektive måde i Redis hukommelse. Du kan forbedre situationen på følgende måde:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

~-argumentet i eksemplet ovenfor betyder, at vi ikke nødvendigvis behøver at begrænse strømlængden til en bestemt værdi. I vores eksempel kan dette være et hvilket som helst tal, der er større end eller lig med 1000 (f.eks. 1000, 1010 eller 1030). Vi har netop præciseret, at vi ønsker, at vores stream skal gemme mindst 1000 poster. Dette gør hukommelseshåndtering meget mere effektiv inde i Redis.

Der er også et separat hold XTRIM, som gør det samme:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Vedvarende lagring og replikering

Redis Stream replikeres asynkront til slavenoder og gemmes i filer som AOF (øjebliksbillede af alle data) og RDB (log over alle skriveoperationer). Replikering af forbrugergruppers tilstand understøttes også. Derfor, hvis en meddelelse er i "afventende" status på masterknudepunktet, så vil denne meddelelse på slaveknuderne have samme status.

Fjernelse af individuelle elementer fra en strøm

Der er en speciel kommando til at slette beskeder XDEL. Kommandoen får navnet på tråden efterfulgt af meddelelses-id'erne, der skal slettes:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Når du bruger denne kommando, skal du tage højde for, at den faktiske hukommelse ikke frigives med det samme.

Nul længde streams

Forskellen mellem streams og andre Redis-datastrukturer er, at når andre datastrukturer ikke længere har elementer i sig, vil selve datastrukturen som en bivirkning blive fjernet fra hukommelsen. Så for eksempel vil det sorterede sæt blive fuldstændig fjernet, når ZREM-kaldet fjerner det sidste element. I stedet får tråde lov til at forblive i hukommelsen, selv uden at have nogen elementer indeni.

Konklusion

Redis Stream er ideel til at oprette meddelelsesmæglere, meddelelseskøer, ensartet logning og chatsystemer til historik.

Som jeg sagde engang Niklaus Wirth, programmer er algoritmer plus datastrukturer, og Redis giver dig allerede begge dele.

Kilde: www.habr.com

Tilføj en kommentar