Redis Stream - betroubaarheid en skaalbaarheid van jou boodskapstelsels

Redis Stream - betroubaarheid en skaalbaarheid van jou boodskapstelsels

Redis Stream is 'n nuwe abstrakte datatipe wat in Redis bekendgestel is met weergawe 5.0
Konseptueel is Redis Stream 'n lys waarby u inskrywings kan voeg. Elke inskrywing het 'n unieke identifiseerder. By verstek word die ID outomaties gegenereer en sluit 'n tydstempel in. Daarom kan jy met verloop van tyd navraag doen oor reekse rekords, of nuwe data ontvang soos dit in die stroom aankom, baie soos die Unix "tail -f"-opdrag 'n loglêer lees en vries terwyl jy wag vir nuwe data. Let daarop dat verskeie kliënte tegelykertyd na 'n draad kan luister, net soos baie "stert -f"-prosesse 'n lêer gelyktydig kan lees sonder om met mekaar te bots.

Om al die voordele van die nuwe datatipe te verstaan, kom ons kyk vinnig na die lank bestaande Redis-strukture wat die funksionaliteit van Redis Stream gedeeltelik herhaal.

Redis PUB/SUB

Redis Pub/Sub is 'n eenvoudige boodskapstelsel wat reeds in jou sleutelwaardewinkel ingebou is. Eenvoud kom egter teen 'n prys:

  • As die uitgewer om een ​​of ander rede misluk, verloor hy al sy intekenare
  • Die uitgewer moet die presiese adres van al sy intekenare weet
  • 'n Uitgewer kan sy intekenare oorlaai met werk as data vinniger gepubliseer word as wat dit verwerk word
  • Die boodskap word onmiddellik na publikasie van die uitgewer se buffer uitgevee, ongeag aan hoeveel intekenare dit afgelewer is en hoe vinnig hulle hierdie boodskap kon verwerk.
  • Alle intekenare sal die boodskap op dieselfde tyd ontvang. Intekenare self moet op een of ander manier onder mekaar ooreenkom oor die volgorde van verwerking van dieselfde boodskap.
  • Daar is geen ingeboude meganisme om te bevestig dat 'n intekenaar 'n boodskap suksesvol verwerk het nie. As 'n intekenaar 'n boodskap ontvang en ineenstort tydens verwerking, sal die uitgewer nie daarvan weet nie.

Redis Lys

Redis List is 'n datastruktuur wat blokkeer leesopdragte ondersteun. Jy kan boodskappe vanaf die begin of einde van die lys byvoeg en lees. Op grond van hierdie struktuur kan jy 'n goeie stapel of tou vir jou verspreide stelsel maak, en in die meeste gevalle sal dit genoeg wees. Belangrikste verskille van Redis Pub/Sub:

  • Die boodskap word aan een kliënt afgelewer. Die eerste leesgeblokkeerde kliënt sal die data eerste ontvang.
  • Clint moet self die leesoperasie vir elke boodskap begin. Lys weet niks van kliënte nie.
  • Boodskappe word gestoor totdat iemand dit lees of dit uitdruklik uitvee. As jy die Redis-bediener instel om data na skyf te spoel, verhoog die betroubaarheid van die stelsel dramaties.

Inleiding tot stroom

Voeg 'n inskrywing by 'n stroom

Span XADD voeg 'n nuwe inskrywing by die stroom. 'n Rekord is nie net 'n string nie, dit bestaan ​​uit een of meer sleutel-waarde-pare. Elke inskrywing is dus reeds gestruktureer en lyk soos die struktuur van 'n CSV-lêer.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

In die voorbeeld hierbo voeg ons twee velde by die stroom met die naam (sleutel) "mystream": "sensor-id" en "temperatuur" met die waardes "1234" en "19.8", onderskeidelik. As die tweede argument neem die opdrag 'n identifiseerder wat aan die inskrywing toegeken sal word - hierdie identifiseerder identifiseer elke inskrywing in die stroom uniek. In hierdie geval het ons egter * geslaag omdat ons wil hê Redis moet 'n nuwe ID vir ons genereer. Elke nuwe ID sal toeneem. Daarom sal elke nuwe inskrywing 'n hoër identifiseerder hê in verhouding tot vorige inskrywings.

Identifiseerder formaat

Die inskrywing-ID wat deur die opdrag teruggestuur word XADD, bestaan ​​uit twee dele:

{millisecondsTime}-{sequenceNumber}

millisekondesTyd - Unix-tyd in millisekondes (Redis-bedienertyd). As die huidige tyd egter dieselfde of minder is as die tyd van die vorige opname, dan word die tydstempel van die vorige opname gebruik. As die bedienertyd dus in tyd teruggaan, sal die nuwe identifiseerder steeds die inkrementeienskap behou.

volgordenommer gebruik vir rekords wat in dieselfde millisekonde geskep is. volgordenommer sal met 1 verhoog word relatief tot die vorige inskrywing. Omdat die volgordenommer is 64 bisse groot, dan moet jy in die praktyk nie 'n beperking raak op die aantal rekords wat binne een millisekonde gegenereer kan word nie.

Die formaat van sulke identifiseerders lyk dalk met die eerste oogopslag vreemd. ’n Wantrouvolle leser sal dalk wonder hoekom tyd deel van die identifiseerder is. Die rede is dat Redis-strome reeksnavrae volgens ID ondersteun. Aangesien die identifiseerder geassosieer word met die tyd wat die rekord geskep is, maak dit dit moontlik om navraag te doen oor tydreekse. Ons sal na 'n spesifieke voorbeeld kyk wanneer ons na die opdrag kyk XRANGE.

As die gebruiker om een ​​of ander rede sy eie identifiseerder moet spesifiseer, wat byvoorbeeld met een of ander eksterne stelsel geassosieer word, dan kan ons dit aan die opdrag gee XADD in plaas van * soos hieronder getoon:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Neem asseblief kennis dat u in hierdie geval self die ID-inkrement moet monitor. In ons voorbeeld is die minimum identifiseerder "0-1", so die opdrag sal nie 'n ander identifiseerder aanvaar wat gelyk is aan of minder as "0-1" is.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Aantal rekords per stroom

Dit is moontlik om die aantal rekords in 'n stroom te kry bloot deur die opdrag te gebruik XLEN. Vir ons voorbeeld sal hierdie opdrag die volgende waarde terugstuur:

> XLEN somestream
(integer) 2

Reeksnavrae - XRANGE en XREVRANGE

Om data volgens reeks aan te vra, moet ons twee identifiseerders spesifiseer - die begin en die einde van die reeks. Die teruggekeerde reeks sal alle elemente insluit, insluitend die grense. Daar is ook twee spesiale identifiseerders "-" en "+", wat onderskeidelik die kleinste (eerste rekord) en grootste (laaste rekord) identifiseerder in die stroom beteken. Die voorbeeld hieronder sal al die stroominskrywings lys.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Elke teruggestuurde rekord is 'n reeks van twee elemente: 'n identifiseerder en 'n lys sleutel-waarde-pare. Ons het reeds gesê dat rekord identifiseerders met tyd verband hou. Daarom kan ons 'n reeks van 'n spesifieke tydperk aanvra. Ons kan egter in die versoek nie die volle identifiseerder spesifiseer nie, maar slegs die Unix-tyd, en die deel wat verband hou met weglaat volgordenommer. Die weggelaat deel van die identifiseerder sal outomaties op nul gestel word aan die begin van die reeks en op die maksimum moontlike waarde aan die einde van die reeks. Hieronder is 'n voorbeeld van hoe jy 'n reeks van twee millisekondes kan aanvra.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Ons het net een inskrywing in hierdie reeks, maar in werklike datastelle kan die resultaat wat opgelewer word, groot wees. Vir hierdie rede XRANGE ondersteun die COUNT opsie. Deur die hoeveelheid te spesifiseer, kan ons eenvoudig die eerste N rekords kry. As ons die volgende N rekords (paginering) moet kry, kan ons die laaste ontvang ID gebruik, dit verhoog volgordenommer een en vra weer. Kom ons kyk hierna in die volgende voorbeeld. Ons begin 10 elemente byvoeg met XADD (aanvaar mystream was reeds gevul met 10 elemente). Om die iterasie te begin om 2 elemente per opdrag te kry, begin ons met die volle reeks, maar met COUNT gelyk aan 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Om voort te gaan om met die volgende twee elemente te herhaal, moet ons die laaste ID wat ontvang is, dws 1519073279157-0, kies en 1 byvoeg by volgordenommer.
Die gevolglike ID, in hierdie geval 1519073279157-1, kan nou gebruik word as die nuwe begin van reeks argument vir die volgende oproep XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

En so aan. Omdat kompleksiteit XRANGE is O(log(N)) om te soek en dan O(M) om M elemente terug te gee, dan is elke iterasiestap vinnig. Dus, gebruik XRANGE strome kan doeltreffend herhaal word.

Span XREVRANGE is die ekwivalent XRANGE, maar gee die elemente in omgekeerde volgorde terug:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Neem asseblief kennis dat die opdrag XREVRANGE neem reeks argumente begin en stop in omgekeerde volgorde.

Lees nuwe inskrywings met XREAD

Dikwels ontstaan ​​die taak om op 'n stroom in te teken en slegs nuwe boodskappe te ontvang. Hierdie konsep lyk dalk soortgelyk aan Redis Pub/Sub of blokkeer Redis List, maar daar is fundamentele verskille in hoe om Redis Stream te gebruik:

  1. Elke nuwe boodskap word by verstek aan elke intekenaar afgelewer. Hierdie gedrag verskil van 'n blokkerende Redis-lys, waar 'n nuwe boodskap slegs deur een intekenaar gelees sal word.
  2. Terwyl in Redis Pub/Sub alle boodskappe vergeet word en nooit volgehou word nie, word alle boodskappe in Stream onbepaald behou (tensy die kliënt uitdruklik uitvee veroorsaak).
  3. Redis Stream laat jou toe om toegang tot boodskappe binne een stroom te onderskei. 'n Spesifieke intekenaar kan slegs hul persoonlike boodskapgeskiedenis sien.

Jy kan op 'n draad inteken en nuwe boodskappe ontvang deur die opdrag te gebruik XLEES. Dit is 'n bietjie meer ingewikkeld as XRANGE, so ons sal eers met die eenvoudiger voorbeelde begin.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Die voorbeeld hierbo toon 'n nie-blokkerende vorm XLEES. Let daarop dat die COUNT opsie opsioneel is. Trouens, die enigste vereiste opdragopsie is die STREAMS-opsie, wat 'n lys strome spesifiseer saam met die ooreenstemmende maksimum identifiseerder. Ons het "STREAMS mystream 0" geskryf - ons wil alle rekords van die mystream-stroom ontvang met 'n identifiseerder groter as "0-0". Soos u uit die voorbeeld kan sien, gee die opdrag die naam van die draad terug omdat ons op verskeie drade gelyktydig kan inteken. Ons kan byvoorbeeld skryf "STREAMS mystream otherstream 0 0". Neem asseblief kennis dat ons na die STREAMS-opsie eers die name van al die vereiste strome moet verskaf en eers daarna 'n lys identifiseerders.

In hierdie eenvoudige vorm doen die opdrag niks spesiaals in vergelyking met XRANGE. Die interessante ding is egter dat ons maklik kan draai XLEES na 'n blokkeeropdrag, wat die BLOCK-argument spesifiseer:

> XREAD BLOCK 0 STREAMS mystream $

In die voorbeeld hierbo word 'n nuwe BLOKKIE-opsie gespesifiseer met 'n time-out van 0 millisekondes (dit beteken om onbepaald te wag). Boonop, in plaas daarvan om die gewone identifiseerder vir die stroom mystream deur te gee, is 'n spesiale identifiseerder $ geslaag. Hierdie spesiale identifiseerder beteken dat XLEES moet die maksimum identifiseerder in mystream as die identifiseerder gebruik. Ons sal dus net nuwe boodskappe ontvang vanaf die oomblik dat ons begin luister het. Op sommige maniere is dit soortgelyk aan die Unix "stert -f" opdrag.

Let daarop dat wanneer ons die BLOCK-opsie gebruik, ons nie noodwendig die spesiale identifiseerder $ hoef te gebruik nie. Ons kan enige identifiseerder wat in die stroom bestaan, gebruik. As die span ons versoek onmiddellik kan bedien sonder om te blokkeer, sal dit dit doen, anders sal dit blokkeer.

Blokkering XLEES kan ook na verskeie drade gelyktydig luister, jy hoef net hul name te spesifiseer. In hierdie geval sal die opdrag 'n rekord terugstuur van die eerste stroom wat data ontvang het. Die eerste intekenaar wat vir 'n gegewe draad geblokkeer is, sal eerste data ontvang.

Verbruikersgroepe

In sekere take wil ons intekenaartoegang tot boodskappe binne een draad beperk. 'n Voorbeeld waar dit nuttig kan wees, is 'n boodskap-tou met werkers wat verskillende boodskappe van 'n draad sal ontvang, wat boodskapverwerking toelaat om te skaal.

As ons ons voorstel dat ons drie intekenare C1, C2, C3 het en 'n draad wat boodskappe 1, 2, 3, 4, 5, 6, 7 bevat, sal die boodskappe bedien word soos in die diagram hieronder:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Om hierdie effek te bereik, gebruik Redis Stream 'n konsep genaamd Consumer Group. Hierdie konsep is soortgelyk aan 'n pseudo-intekenaar, wat data van 'n stroom ontvang, maar word eintlik bedien deur verskeie intekenare binne 'n groep, wat sekere waarborge bied:

  1. Elke boodskap word aan 'n ander intekenaar binne die groep afgelewer.
  2. Binne 'n groep word intekenare geïdentifiseer deur hul naam, wat 'n hooflettersensitiewe string is. As 'n intekenaar tydelik uit die groep val, kan hy met sy eie unieke naam na die groep herstel word.
  3. Elke Verbruikersgroep volg die "eerste ongeleesde boodskap"-konsep. Wanneer 'n intekenaar nuwe boodskappe versoek, kan dit slegs boodskappe ontvang wat nog nooit voorheen aan enige intekenaar binne die groep afgelewer is nie.
  4. Daar is 'n opdrag om uitdruklik te bevestig dat die boodskap suksesvol deur die intekenaar verwerk is. Totdat hierdie opdrag geroep word, sal die versoekte boodskap in die "hangende" status bly.
  5. Binne die Verbruikersgroep kan elke intekenaar 'n geskiedenis aanvra van boodskappe wat aan hom afgelewer is, maar nog nie verwerk is nie (in die "hangende" status)

In 'n sekere sin kan die toestand van die groep soos volg uitgedruk word:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nou is dit tyd om kennis te maak met die hoofopdragte vir die Verbruikersgroep, naamlik:

  • XGROUP gebruik om groepe te skep, te vernietig en te bestuur
  • XREADGROEP gebruik om stroom deur groep te lees
  • XACK - hierdie opdrag laat die intekenaar toe om die boodskap as suksesvol verwerk te merk

Skep van Verbruikersgroep

Kom ons neem aan dat mystream reeds bestaan. Dan sal die groepskeppingsbevel soos volg lyk:

> XGROUP CREATE mystream mygroup $
OK

Wanneer 'n groep geskep word, moet ons 'n identifiseerder deurgee, vanaf waar die groep boodskappe sal ontvang. As ons net alle nuwe boodskappe wil ontvang, kan ons die spesiale identifiseerder $ gebruik (soos in ons voorbeeld hierbo). As jy 0 in plaas van 'n spesiale identifiseerder spesifiseer, sal alle boodskappe in die draad vir die groep beskikbaar wees.

Noudat die groep geskep is, kan ons dadelik begin om boodskappe te lees deur die opdrag te gebruik XREADGROEP. Hierdie opdrag is baie soortgelyk aan XLEES en ondersteun die opsionele BLOCK-opsie. Daar is egter 'n vereiste GROEP-opsie wat altyd met twee argumente gespesifiseer moet word: die groepnaam en die intekenaarnaam. Die COUNT-opsie word ook ondersteun.

Voordat ons die draad lees, laat ons 'n paar boodskappe daar plaas:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Kom ons probeer nou om hierdie stroom deur die groep te lees:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Die bogenoemde opdrag lees woordeliks soos volg:

"Ek, intekenaar Alice, 'n lid van die mygroep, wil een boodskap van mystream lees wat nog nooit voorheen aan iemand afgelewer is nie."

Elke keer as 'n intekenaar 'n bewerking op 'n groep uitvoer, moet dit sy naam verskaf, wat homself uniek binne die groep identifiseer. Daar is nog een baie belangrike detail in die bogenoemde opdrag - die spesiale identifiseerder ">". Hierdie spesiale identifiseerder filtreer boodskappe en laat net dié wat nog nooit voorheen afgelewer is nie.

Ook, in spesiale gevalle, kan jy 'n werklike identifiseerder soos 0 of enige ander geldige identifiseerder spesifiseer. In hierdie geval die opdrag XREADGROEP sal vir jou 'n geskiedenis terugstuur van boodskappe met 'n status van "hangend" wat aan die gespesifiseerde intekenaar (Alice) afgelewer is, maar nog nie erken is deur die opdrag te gebruik nie XACK.

Ons kan hierdie gedrag toets deur onmiddellik die ID 0 te spesifiseer, sonder die opsie COUNT. Ons sal bloot 'n enkele hangende boodskap sien, dit wil sê die appelboodskap:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

As ons egter bevestig dat die boodskap suksesvol verwerk is, sal dit nie meer vertoon word nie:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nou is dit Bob se beurt om iets te lees:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, 'n lid van mygroup, het nie meer as twee boodskappe gevra nie. Die opdrag rapporteer slegs onafgelewerde boodskappe as gevolg van die spesiale identifiseerder ">". Soos jy kan sien, sal die boodskap "appel" nie vertoon word nie aangesien dit reeds by Alice afgelewer is, so Bob ontvang "oranje" en "aarbei".

Op hierdie manier kan Alice, Bob en enige ander intekenaar op die groep verskillende boodskappe uit dieselfde stroom lees. Hulle kan ook hul geskiedenis van onverwerkte boodskappe lees of boodskappe as verwerk merk.

Daar is 'n paar dinge om in gedagte te hou:

  • Sodra die intekenaar die boodskap as 'n opdrag beskou XREADGROEP, gaan hierdie boodskap in die "hangende" toestand en word aan daardie spesifieke intekenaar toegewys. Ander groepintekenare sal nie hierdie boodskap kan lees nie.
  • Intekenare word outomaties geskep by eerste vermelding, dit is nie nodig om hulle uitdruklik te skep nie.
  • Met XREADGROEP jy kan boodskappe van verskeie verskillende drade op dieselfde tyd lees, maar om dit te werk moet jy eers groepe met dieselfde naam vir elke draad skep met XGROUP

Herstel van mislukkings

Die intekenaar kan herstel van die mislukking en weer sy lys boodskappe met die "hangende" status lees. In die regte wêreld kan intekenare egter uiteindelik misluk. Wat gebeur met 'n intekenaar se boodskappe wat vashaak as die intekenaar nie van 'n mislukking kan herstel nie?
Consumer Group bied 'n kenmerk wat gebruik word vir net sulke gevalle - wanneer jy die eienaar van boodskappe moet verander.

Die eerste ding wat jy moet doen is om die opdrag te roep VERKEER, wat alle boodskappe in die groep met die status "hangend" vertoon. In sy eenvoudigste vorm word die opdrag met slegs twee argumente genoem: die draadnaam en die groepnaam:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Die span het die aantal onverwerkte boodskappe vir die hele groep en vir elke intekenaar vertoon. Ons het net vir Bob met twee uitstaande boodskappe, want die enigste boodskap waarmee Alice versoek is, is bevestig XACK.

Ons kan meer inligting aanvra deur meer argumente te gebruik:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - reeks identifiseerders (jy kan "-" en "+") gebruik)
{count} — aantal afleweringspogings
{consumer-name} – groepnaam

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nou het ons besonderhede vir elke boodskap: ID, intekenaarnaam, ledige tyd in millisekondes en laastens die aantal afleweringspogings. Ons het twee boodskappe van Bob af en hulle was vir 74170458 millisekondes ledig, ongeveer 20 uur.

Neem asseblief kennis dat niemand ons keer om te kyk wat die inhoud van die boodskap was bloot deur te gebruik nie XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Ons moet net dieselfde identifiseerder twee keer in die argumente herhaal. Noudat ons 'n idee het, kan Alice besluit dat Bob waarskynlik nie sal herstel na 20 uur se stilstand nie, en dit is tyd om na daardie boodskappe navraag te doen en die verwerking daarvan vir Bob te hervat. Hiervoor gebruik ons ​​die opdrag XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Deur hierdie opdrag te gebruik, kan ons 'n "vreemde" boodskap ontvang wat nog nie verwerk is nie deur die eienaar na {consumer} te verander. Ons kan egter ook 'n minimum ledige tyd {min-idle-time} verskaf. Dit help om 'n situasie te vermy waar twee kliënte gelyktydig probeer om die eienaar van dieselfde boodskappe te verander:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Die eerste kliënt sal die stilstand terugstel en die afleweringsteller verhoog. Die tweede kliënt sal dit dus nie kan versoek nie.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Die boodskap is suksesvol opgeëis deur Alice, wat nou die boodskap kan verwerk en dit kan erken.

Uit die voorbeeld hierbo kan u sien dat 'n suksesvolle versoek die inhoud van die boodskap self terugstuur. Dit is egter nie nodig nie. Die JUSTID-opsie kan slegs gebruik word om boodskap-ID's terug te gee. Dit is nuttig as jy nie in die besonderhede van die boodskap belangstel nie en stelselwerkverrigting wil verhoog.

Afleweringstoonbank

Die teller wat jy in die uitset sien VERKEER is die aantal aflewerings van elke boodskap. So 'n teller word op twee maniere verhoog: wanneer 'n boodskap suksesvol aangevra word via XCLAIM of wanneer 'n oproep gebruik word XREADGROEP.

Dit is normaal dat sommige boodskappe verskeie kere afgelewer word. Die belangrikste ding is dat alle boodskappe uiteindelik verwerk word. Soms kom probleme voor wanneer 'n boodskap verwerk word omdat die boodskap self korrup is, of boodskapverwerking 'n fout in die hanteerderkode veroorsaak. In hierdie geval kan dit blyk dat niemand hierdie boodskap sal kan verwerk nie. Aangesien ons 'n afleweringspogingteller het, kan ons hierdie teller gebruik om sulke situasies op te spoor. Dus, sodra die afleweringstelling die hoë getal bereik wat jy spesifiseer, sal dit waarskynlik wyser wees om so 'n boodskap op 'n ander draad te plaas en 'n kennisgewing aan die stelseladministrateur te stuur.

Draad Staat

Span XINFO gebruik om verskeie inligting oor 'n draad en sy groepe aan te vra. Byvoorbeeld, 'n basiese opdrag lyk soos volg:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Die opdrag hierbo vertoon algemene inligting oor die gespesifiseerde stroom. Nou 'n effens meer komplekse voorbeeld:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Die opdrag hierbo vertoon algemene inligting vir alle groepe van die gespesifiseerde draad

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Die opdrag hierbo vertoon inligting vir alle intekenare van die gespesifiseerde stroom en groep.
As jy die opdragsintaksis vergeet, vra net die opdrag self vir hulp:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stroomgrootte beperking

Baie toepassings wil nie vir ewig data in 'n stroom insamel nie. Dit is dikwels nuttig om 'n maksimum aantal boodskappe per draad te hê. In ander gevalle is dit nuttig om alle boodskappe van 'n draad na 'n ander aanhoudende winkel te skuif wanneer die gespesifiseerde draadgrootte bereik is. U kan die grootte van 'n stroom beperk deur die MAXLEN-parameter in die opdrag te gebruik XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Wanneer MAXLEN gebruik word, word ou rekords outomaties uitgevee wanneer hulle 'n gespesifiseerde lengte bereik, dus het die stroom 'n konstante grootte. Snoei vind in hierdie geval egter nie op die doeltreffendste manier in Redis-geheue plaas nie. U kan die situasie soos volg verbeter:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Die ~ argument in die voorbeeld hierbo beteken dat ons nie noodwendig die stroomlengte tot 'n spesifieke waarde hoef te beperk nie. In ons voorbeeld kan dit enige getal groter as of gelyk aan 1000 wees (byvoorbeeld 1000, 1010 of 1030). Ons het net uitdruklik gespesifiseer dat ons wil hê dat ons stroom ten minste 1000 rekords moet stoor. Dit maak geheuebestuur baie doeltreffender binne Redis.

Daar is ook 'n aparte span XTRIM, wat dieselfde ding doen:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Aanhoudende berging en replikasie

Redis Stream word asynchronies na slawe-nodusse gerepliseer en gestoor in lêers soos AOF (snapshot van alle data) en RDB (logboek van alle skryfbewerkings). Replikasie van Verbruikersgroepe-toestand word ook ondersteun. As 'n boodskap dus in die "hangende" status op die meesterknoop is, sal hierdie boodskap op die slaafnodusse dieselfde status hê.

Verwyder individuele elemente uit 'n stroom

Daar is 'n spesiale opdrag om boodskappe uit te vee XDEL. Die opdrag kry die naam van die draad, gevolg deur die boodskap-ID's wat uitgevee moet word:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Wanneer u hierdie opdrag gebruik, moet u in ag neem dat die werklike geheue nie onmiddellik vrygestel sal word nie.

Strome van nul lengte

Die verskil tussen strome en ander Redis-datastrukture is dat wanneer ander datastrukture nie meer elemente binne hulle het nie, as 'n newe-effek, die datastruktuur self uit die geheue verwyder sal word. So, byvoorbeeld, sal die gesorteerde stel heeltemal verwyder word wanneer die ZREM-oproep die laaste element verwyder. In plaas daarvan word drade toegelaat om in die geheue te bly, selfs sonder om enige elemente binne te hê.

Gevolgtrekking

Redis Stream is ideaal vir die skep van boodskapmakelaars, boodskaprye, verenigde aantekening en kletsstelsels wat geskiedenis hou.

Soos ek eenkeer gesê het Niklaus Wirth, programme is algoritmes plus datastrukture, en Redis gee jou reeds albei.

Bron: will.com

Voeg 'n opmerking