Redis Stream - pålitelighet og skalerbarhet for meldingssystemene dine

Redis Stream - pålitelighet og skalerbarhet for meldingssystemene dine

Redis Stream er en ny abstrakt datatype introdusert i Redis med versjon 5.0
Konseptuelt er en Redis Stream en liste som du kan legge til oppføringer i. Hver oppføring har en unik identifikator. Som standard genereres IDen automatisk og inkluderer et tidsstempel. Derfor kan du forespørre rekkevidder av poster over tid, eller motta nye data når de kommer i strømmen, omtrent som Unix "tail -f"-kommandoen leser en loggfil og fryser mens du venter på nye data. Merk at flere klienter kan lytte til en tråd samtidig, akkurat som mange "tail -f" prosesser kan lese en fil samtidig uten å komme i konflikt med hverandre.

For å forstå alle fordelene med den nye datatypen, la oss ta en rask titt på de lenge eksisterende Redis-strukturene som delvis gjenskaper funksjonaliteten til Redis Stream.

Redis PUB/SUB

Redis Pub/Sub er et enkelt meldingssystem som allerede er innebygd i nøkkelverdibutikken din. Imidlertid har enkelhet en pris:

  • Hvis utgiveren av en eller annen grunn mislykkes, mister han alle abonnentene sine
  • Utgiveren må vite den nøyaktige adressen til alle sine abonnenter
  • En utgiver kan overbelaste sine abonnenter med arbeid hvis data publiseres raskere enn det blir behandlet
  • Meldingen slettes fra utgiverens buffer umiddelbart etter publisering, uavhengig av hvor mange abonnenter den ble levert til og hvor raskt de klarte å behandle denne meldingen.
  • Alle abonnenter vil motta meldingen samtidig. Abonnentene må på en eller annen måte bli enige seg imellom om rekkefølgen for behandling av den samme meldingen.
  • Det er ingen innebygd mekanisme for å bekrefte at en abonnent har behandlet en melding. Hvis en abonnent mottar en melding og krasjer under behandlingen, vil ikke utgiveren vite om det.

Redis liste

Redis List er en datastruktur som støtter blokkering av lesekommandoer. Du kan legge til og lese meldinger fra begynnelsen eller slutten av listen. Basert på denne strukturen kan du lage en god stack eller kø for ditt distribuerte system, og i de fleste tilfeller vil dette være nok. Hovedforskjeller fra Redis Pub/Sub:

  • Meldingen leveres til én klient. Den første leseblokkerte klienten vil motta dataene først.
  • Clint må selv starte leseoperasjonen for hver melding. List vet ingenting om klienter.
  • Meldinger lagres til noen leser dem eller eksplisitt sletter dem. Hvis du konfigurerer Redis-serveren til å skylle data til disk, øker påliteligheten til systemet dramatisk.

Introduksjon til Stream

Legge til en oppføring i en strøm

Lag XADD legger til en ny oppføring i strømmen. En post er ikke bare en streng, den består av ett eller flere nøkkelverdi-par. Dermed er hver oppføring allerede strukturert og ligner strukturen til en CSV-fil.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

I eksemplet ovenfor legger vi til to felt til strømmen med navnet (nøkkel) "mystream": "sensor-id" og "temperatur" med verdiene "1234" og "19.8", henholdsvis. Som det andre argumentet tar kommandoen en identifikator som vil bli tildelt til oppføringen - denne identifikatoren identifiserer unikt hver oppføring i strømmen. Men i dette tilfellet passerte vi * fordi vi vil at Redis skal generere en ny ID for oss. Hver ny ID vil øke. Derfor vil hver ny oppføring ha en høyere identifikator i forhold til tidligere oppføringer.

Identifikatorformat

Oppførings-IDen returnert av kommandoen XADD, består av to deler:

{millisecondsTime}-{sequenceNumber}

millisekunderTid — Unix-tid i millisekunder (Redis-servertid). Men hvis gjeldende klokkeslett er det samme eller mindre enn klokkeslettet for forrige opptak, brukes tidsstemplet til forrige opptak. Derfor, hvis servertiden går tilbake i tid, vil den nye identifikatoren fortsatt beholde inkrementegenskapen.

sekvensnummer brukes for poster opprettet i samme millisekund. sekvensnummer økes med 1 i forhold til forrige oppføring. Fordi det sekvensnummer er 64 biter i størrelse, så bør du i praksis ikke kjøre inn i en grense for antall poster som kan genereres innen ett millisekund.

Formatet til slike identifikatorer kan virke rart ved første øyekast. En mistroisk leser kan lure på hvorfor tid er en del av identifikatoren. Årsaken er at Redis-strømmer støtter rekkeviddespørringer etter ID. Siden identifikatoren er knyttet til tidspunktet posten ble opprettet, gjør dette det mulig å spørre etter tidsområder. Vi skal se på et spesifikt eksempel når vi ser på kommandoen XRANGE.

Hvis brukeren av en eller annen grunn trenger å spesifisere sin egen identifikator, som for eksempel er knyttet til et eksternt system, kan vi sende det til kommandoen XADD i stedet for * som vist nedenfor:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Vær oppmerksom på at du i dette tilfellet må overvåke ID-inkrementet selv. I vårt eksempel er minimumsidentifikatoren "0-1", så kommandoen vil ikke akseptere en annen identifikator som er lik eller mindre enn "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Antall poster per strøm

Det er mulig å få opp antall poster i en strøm ganske enkelt ved å bruke kommandoen XLEN. For vårt eksempel vil denne kommandoen returnere følgende verdi:

> XLEN somestream
(integer) 2

Områdesøk - XRANGE og XREVRANGE

For å be om data etter område, må vi spesifisere to identifikatorer – begynnelsen og slutten av området. Det returnerte området vil inkludere alle elementene, inkludert grensene. Det er også to spesielle identifikatorer "-" og "+", som henholdsvis betyr den minste (første posten) og største (siste posten) identifikatoren i strømmen. Eksemplet nedenfor viser alle strømoppføringene.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Hver returnerte post er en matrise med to elementer: en identifikator og en liste over nøkkelverdi-par. Vi har allerede sagt at postidentifikatorer er relatert til tid. Derfor kan vi be om en rekkevidde for en bestemt tidsperiode. Vi kan imidlertid spesifisere i forespørselen ikke den fullstendige identifikatoren, men bare Unix-tiden, og utelate delen relatert til sekvensnummer. Den utelatte delen av identifikatoren vil automatisk bli satt til null i begynnelsen av området og til maksimalt mulig verdi på slutten av området. Nedenfor er et eksempel på hvordan du kan be om en rekkevidde på to millisekunder.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Vi har bare én oppføring i dette området, men i ekte datasett kan resultatet som returneres være enormt. Av denne grunn XRANGE støtter alternativet COUNT. Ved å spesifisere mengden kan vi ganske enkelt få de første N postene. Hvis vi trenger å få de neste N-postene (paginering), kan vi bruke sist mottatte ID, øke den sekvensnummer av én og spør igjen. La oss se på dette i følgende eksempel. Vi begynner å legge til 10 elementer med XADD (forutsatt at mystream allerede var fylt med 10 elementer). For å starte iterasjonen med å få 2 elementer per kommando, starter vi med hele området, men med COUNT lik 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

For å fortsette å iterere med de neste to elementene, må vi velge den sist mottatte ID-en, dvs. 1519073279157-0, og legge til 1 til sekvensnummer.
Den resulterende ID-en, i dette tilfellet 1519073279157-1, kan nå brukes som det nye start av rekkevidde-argumentet for neste anrop XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Og så videre. Fordi kompleksitet XRANGE er O(log(N)) for å søke og deretter O(M) for å returnere M elementer, så er hvert iterasjonstrinn raskt. Dermed bruker XRANGE bekker kan itereres effektivt.

Lag XREVRANGE er tilsvarende XRANGE, men returnerer elementene i omvendt rekkefølge:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Vær oppmerksom på at kommandoen XREVRANGE tar avstandsargumenter start og stopp i omvendt rekkefølge.

Leser nye oppføringer ved hjelp av XREAD

Ofte oppstår oppgaven med å abonnere på en strøm og kun motta nye meldinger. Dette konseptet kan virke likt Redis Pub/Sub eller blokkering av Redis List, men det er grunnleggende forskjeller i hvordan du bruker Redis Stream:

  1. Hver ny melding leveres til hver abonnent som standard. Denne oppførselen er forskjellig fra en blokkerende Redis-liste, der en ny melding bare vil bli lest av én abonnent.
  2. Mens alle meldinger i Redis Pub/Sub blir glemt og aldri vedvarer, beholdes alle meldinger i Stream på ubestemt tid (med mindre klienten eksplisitt forårsaker sletting).
  3. Redis Stream lar deg skille tilgang til meldinger i én strøm. En spesifikk abonnent kan bare se sin personlige meldingshistorikk.

Du kan abonnere på en tråd og motta nye meldinger ved å bruke kommandoen XLES. Det er litt mer komplisert enn XRANGE, så vi starter med de enklere eksemplene først.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Eksemplet ovenfor viser et ikke-blokkerende skjema XLES. Merk at COUNT-alternativet er valgfritt. Faktisk er det eneste nødvendige kommandoalternativet STREAMS-alternativet, som spesifiserer en liste over strømmer sammen med den tilsvarende maksimale identifikatoren. Vi skrev "STREAMS mystream 0" - vi ønsker å motta alle poster for mystream-strømmen med en identifikator større enn "0-0". Som du kan se fra eksempelet, returnerer kommandoen navnet på tråden fordi vi kan abonnere på flere tråder samtidig. Vi kan for eksempel skrive "STREAMS mystream otherstream 0 0". Vær oppmerksom på at etter STREAMS-alternativet må vi først oppgi navnene på alle nødvendige strømmer og først deretter en liste over identifikatorer.

I denne enkle formen gjør ikke kommandoen noe spesielt i forhold til XRANGE. Det interessante er imidlertid at vi lett kan snu XLES til en blokkeringskommando, som spesifiserer BLOCK-argumentet:

> XREAD BLOCK 0 STREAMS mystream $

I eksemplet ovenfor er et nytt BLOKK-alternativ spesifisert med en timeout på 0 millisekunder (dette betyr å vente på ubestemt tid). Dessuten, i stedet for å sende den vanlige identifikatoren for strømmen mystream, ble en spesiell identifikator $ sendt. Denne spesielle identifikatoren betyr det XLES må bruke den maksimale identifikatoren i mystream som identifikator. Så vi vil bare motta nye meldinger fra det øyeblikket vi begynte å lytte. På noen måter ligner dette på Unix "tail -f"-kommandoen.

Merk at når du bruker BLOKK-alternativet, trenger vi ikke nødvendigvis å bruke den spesielle identifikatoren $. Vi kan bruke hvilken som helst identifikator som finnes i strømmen. Hvis teamet kan betjene forespørselen vår umiddelbart uten å blokkere, vil det gjøre det, ellers blokkeres det.

Blokkering XLES kan også lytte til flere tråder samtidig, du trenger bare å spesifisere navnene deres. I dette tilfellet vil kommandoen returnere en registrering av den første strømmen som mottok data. Den første abonnenten som er blokkert for en gitt tråd vil motta data først.

Forbrukergrupper

I visse oppgaver ønsker vi å begrense abonnentens tilgang til meldinger innenfor én tråd. Et eksempel der dette kan være nyttig er en meldingskø med arbeidere som vil motta forskjellige meldinger fra en tråd, slik at meldingsbehandlingen kan skaleres.

Hvis vi forestiller oss at vi har tre abonnenter C1, C2, C3 og en tråd som inneholder meldinger 1, 2, 3, 4, 5, 6, 7, vil meldingene bli servert som i diagrammet nedenfor:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

For å oppnå denne effekten bruker Redis Stream et konsept kalt Consumer Group. Dette konseptet ligner på en pseudo-abonnent, som mottar data fra en strøm, men som faktisk betjenes av flere abonnenter i en gruppe, og gir visse garantier:

  1. Hver melding leveres til en annen abonnent i gruppen.
  2. Innenfor en gruppe identifiseres abonnenter med navnet sitt, som er en streng som skiller mellom store og små bokstaver. Hvis en abonnent midlertidig faller ut av gruppen, kan han gjenopprettes til gruppen ved å bruke sitt eget unike navn.
  3. Hver forbrukergruppe følger konseptet "første uleste melding". Når en abonnent ber om nye meldinger, kan den bare motta meldinger som aldri tidligere har blitt levert til noen abonnent i gruppen.
  4. Det er en kommando for å eksplisitt bekrefte at meldingen ble behandlet av abonnenten. Inntil denne kommandoen blir kalt, vil den forespurte meldingen forbli i "venter"-status.
  5. Innenfor forbrukergruppen kan hver abonnent be om en historikk over meldinger som ble levert til ham, men som ennå ikke er behandlet (i «venter»-status)

På en måte kan gruppens tilstand uttrykkes som følger:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nå er det på tide å bli kjent med hovedkommandoene til Forbrukergruppen, nemlig:

  • XGROUP brukes til å opprette, ødelegge og administrere grupper
  • XREADGROUP brukes til å lese strøm gjennom gruppen
  • XACK - denne kommandoen lar abonnenten merke meldingen som vellykket behandlet

Opprettelse av forbrukergruppe

La oss anta at mystream allerede eksisterer. Deretter vil kommandoen for opprettelse av gruppe se slik ut:

> XGROUP CREATE mystream mygroup $
OK

Når vi oppretter en gruppe, må vi sende en identifikator, fra hvilken gruppen vil motta meldinger. Hvis vi bare ønsker å motta alle nye meldinger, kan vi bruke den spesielle identifikatoren $ (som i vårt eksempel ovenfor). Hvis du angir 0 i stedet for en spesiell identifikator, vil alle meldinger i tråden være tilgjengelige for gruppen.

Nå som gruppen er opprettet, kan vi umiddelbart begynne å lese meldinger ved å bruke kommandoen XREADGROUP. Denne kommandoen er veldig lik XLES og støtter det valgfrie BLOKK-alternativet. Imidlertid er det et obligatorisk GROUP-alternativ som alltid må spesifiseres med to argumenter: gruppenavnet og abonnentnavnet. Alternativet COUNT støttes også.

Før du leser tråden, la oss legge inn noen meldinger der:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

La oss nå prøve å lese denne strømmen gjennom gruppen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Kommandoen ovenfor lyder ordrett som følger:

"Jeg, abonnent Alice, et medlem av mygroup, ønsker å lese en melding fra mystream som aldri har blitt levert til noen før."

Hver gang en abonnent utfører en operasjon på en gruppe, må den oppgi navnet sitt, og identifisere seg unikt i gruppen. Det er en annen svært viktig detalj i kommandoen ovenfor - den spesielle identifikatoren ">". Denne spesielle identifikatoren filtrerer meldinger, og etterlater bare de som aldri har blitt levert før.

I spesielle tilfeller kan du også spesifisere en reell identifikator som 0 eller en annen gyldig identifikator. I dette tilfellet kommandoen XREADGROUP vil returnere deg en historie med meldinger med statusen "venter" som ble levert til den spesifiserte abonnenten (Alice), men som ennå ikke er bekreftet ved hjelp av kommandoen XACK.

Vi kan teste denne oppførselen ved å spesifisere ID 0 umiddelbart, uten alternativet COUNT. Vi vil ganske enkelt se en enkelt ventende melding, det vil si eplemeldingen:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Men hvis vi bekrefter meldingen som vellykket behandlet, vil den ikke lenger vises:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nå er det Bobs tur til å lese noe:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, et medlem av min gruppe, ba om ikke mer enn to meldinger. Kommandoen rapporterer kun uleverte meldinger på grunn av den spesielle identifikatoren ">". Som du kan se, vil ikke meldingen "eple" vises siden den allerede er levert til Alice, så Bob mottar "oransje" og "jordbær".

På denne måten kan Alice, Bob og alle andre abonnenter på gruppen lese forskjellige meldinger fra samme strøm. De kan også lese historien om ubehandlede meldinger eller merke meldinger som behandlet.

Det er et par ting å huske på:

  • Så snart abonnenten anser meldingen som en kommando XREADGROUP, går denne meldingen inn i "ventende" tilstand og tilordnes den spesifikke abonnenten. Andre gruppeabonnenter vil ikke kunne lese denne meldingen.
  • Abonnenter opprettes automatisk ved første omtale, det er ikke nødvendig å eksplisitt opprette dem.
  • Med XREADGROUP du kan lese meldinger fra flere forskjellige tråder samtidig, men for at dette skal fungere må du først opprette grupper med samme navn for hver tråd ved å bruke XGROUP

Gjenoppretting etter feil

Abonnenten kan komme seg etter feilen og lese listen over meldinger på nytt med statusen "venter". Men i den virkelige verden kan abonnenter til slutt mislykkes. Hva skjer med en abonnents meldinger hvis abonnenten ikke klarer å komme seg etter en feil?
Consumer Group tilbyr en funksjon som brukes til nettopp slike tilfeller – når du skal endre eier av meldinger.

Det første du må gjøre er å ringe kommandoen XPENDING, som viser alle meldinger i gruppen med statusen «venter». I sin enkleste form kalles kommandoen med bare to argumenter: trådnavnet og gruppenavnet:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Teamet viste antall ubehandlede meldinger for hele gruppen og for hver abonnent. Vi har bare Bob med to utestående meldinger fordi den eneste meldingen Alice ba om ble bekreftet med XACK.

Vi kan be om mer informasjon ved å bruke flere argumenter:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - rekke identifikatorer (du kan bruke "-" og "+")
{count} – antall leveringsforsøk
{consumer-name} – gruppenavn

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nå har vi detaljer for hver melding: ID, abonnentnavn, inaktiv tid i millisekunder og til slutt antall leveringsforsøk. Vi har to meldinger fra Bob, og de har vært inaktive i 74170458 millisekunder, omtrent 20 timer.

Vær oppmerksom på at ingen hindrer oss i å sjekke innholdet i meldingen ganske enkelt ved å bruke XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Vi må bare gjenta den samme identifikatoren to ganger i argumentene. Nå som vi har en idé, kan det hende at Alice bestemmer seg for at Bob sannsynligvis ikke vil komme seg etter 20 timers nedetid, og det er på tide å spørre etter disse meldingene og fortsette behandlingen av dem for Bob. Til dette bruker vi kommandoen XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Ved å bruke denne kommandoen kan vi motta en "utenlandsk" melding som ennå ikke er behandlet ved å endre eieren til {consumer}. Vi kan imidlertid også gi en minimum inaktiv tid {min-idle-time}. Dette bidrar til å unngå en situasjon der to klienter prøver å endre eieren av de samme meldingene samtidig:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Den første kunden vil nullstille nedetiden og øke leveringstelleren. Så den andre klienten vil ikke kunne be om det.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Meldingen ble gjort krav på av Alice, som nå kan behandle meldingen og bekrefte den.

Fra eksemplet ovenfor kan du se at en vellykket forespørsel returnerer innholdet i selve meldingen. Dette er imidlertid ikke nødvendig. Alternativet JUSTID kan kun brukes til å returnere meldings-IDer. Dette er nyttig hvis du ikke er interessert i detaljene i meldingen og ønsker å øke systemytelsen.

Leveringsdisk

Telleren du ser i utgangen XPENDING er antall leveranser av hver melding. En slik teller økes på to måter: når en melding er vellykket forespurt via XCLAIM eller når en samtale brukes XREADGROUP.

Det er normalt at noen meldinger leveres flere ganger. Hovedsaken er at alle meldinger til slutt blir behandlet. Noen ganger oppstår det problemer ved behandling av en melding fordi selve meldingen er ødelagt, eller at meldingsbehandling forårsaker en feil i behandlerkoden. I dette tilfellet kan det vise seg at ingen vil kunne behandle denne meldingen. Siden vi har en teller for leveringsforsøk, kan vi bruke denne telleren til å oppdage slike situasjoner. Derfor, når leveringsantallet når det høye tallet du spesifiserer, vil det sannsynligvis være lurere å legge en slik melding på en annen tråd og sende en melding til systemadministratoren.

Trådtilstand

Lag XINFO brukes til å be om ulike opplysninger om en tråd og dens grupper. For eksempel ser en grunnleggende kommando slik ut:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Kommandoen ovenfor viser generell informasjon om den angitte strømmen. Nå et litt mer komplekst eksempel:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Kommandoen ovenfor viser generell informasjon for alle gruppene i den angitte tråden

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Kommandoen ovenfor viser informasjon for alle abonnenter av den angitte strømmen og gruppen.
Hvis du glemmer kommandosyntaksen, spør bare kommandoen selv om hjelp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Strømbegrensning

Mange applikasjoner ønsker ikke å samle data i en strøm for alltid. Det er ofte nyttig å ha et maksimalt antall tillatte meldinger per tråd. I andre tilfeller er det nyttig å flytte alle meldinger fra en tråd til en annen vedvarende lagring når den angitte trådstørrelsen er nådd. Du kan begrense størrelsen på en strøm ved å bruke MAXLEN-parameteren i kommandoen XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Når du bruker MAXLEN, slettes gamle poster automatisk når de når en spesifisert lengde, slik at strømmen har konstant størrelse. Imidlertid skjer ikke beskjæring i dette tilfellet på den mest effektive måten i Redis minne. Du kan forbedre situasjonen på følgende måte:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

~-argumentet i eksemplet ovenfor betyr at vi ikke nødvendigvis trenger å begrense strømlengden til en bestemt verdi. I vårt eksempel kan dette være et hvilket som helst tall som er større enn eller lik 1000 (for eksempel 1000, 1010 eller 1030). Vi spesifiserte nettopp at vi vil at strømmen vår skal lagre minst 1000 poster. Dette gjør minneadministrasjonen mye mer effektiv inne i Redis.

Det er også et eget lag XTRIM, som gjør det samme:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Vedvarende lagring og replikering

Redis Stream replikeres asynkront til slavenoder og lagres i filer som AOF (øyeblikksbilde av alle data) og RDB (logg over alle skriveoperasjoner). Replikering av forbrukergruppers tilstand støttes også. Derfor, hvis en melding er i "ventende" status på masternoden, vil denne meldingen på slavenodene ha samme status.

Fjerning av individuelle elementer fra en strøm

Det er en spesiell kommando for å slette meldinger XDEL. Kommandoen får navnet på tråden etterfulgt av meldings-ID-ene som skal slettes:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Når du bruker denne kommandoen, må du ta hensyn til at det faktiske minnet ikke frigjøres umiddelbart.

Strømmer med null lengde

Forskjellen mellom strømmer og andre Redis-datastrukturer er at når andre datastrukturer ikke lenger har elementer i seg, som en bieffekt, vil selve datastrukturen bli fjernet fra minnet. Så for eksempel vil det sorterte settet bli fullstendig fjernet når ZREM-kallet fjerner det siste elementet. I stedet får tråder forbli i minnet selv uten å ha noen elementer inne.

Konklusjon

Redis Stream er ideell for å lage meldingsmeglere, meldingskøer, enhetlig logging og historikkbevarende chattesystemer.

Som jeg sa en gang Niklaus Wirth, programmer er algoritmer pluss datastrukturer, og Redis gir deg allerede begge deler.

Kilde: www.habr.com

Legg til en kommentar