Redis Stream - Zouverlässegkeet an Skalierbarkeet vun Äre Messageriesystemer

Redis Stream - Zouverlässegkeet an Skalierbarkeet vun Äre Messageriesystemer

Redis Stream ass en neien abstrakte Datentyp deen a Redis mat Versioun 5.0 agefouert gëtt
Konzeptuell ass de Redis Stream eng Lëscht op déi Dir Entréen bäidroe kënnt. All Entrée huet en eenzegaartegen Identifizéierer. Par défaut gëtt d'ID automatesch generéiert an enthält en Zäitstempel. Dofir kënnt Dir d'Bande vu Rekorder iwwer Zäit ufroen, oder nei Daten kréien wéi se an de Stream ukommen, sou wéi den Unix "Schwanz -f" Kommando eng Logdatei liest a freet wärend op nei Daten waart. Notéiert datt verschidde Cliente gläichzäiteg op e Fuedem lauschtere kënnen, sou wéi vill "Schwanz -f" Prozesser kënnen eng Datei gläichzäiteg liesen ouni mateneen ze konfliktéieren.

Fir all d'Virdeeler vum neien Datentyp ze verstoen, kucke mer séier op déi laang existent Redis Strukturen déi deelweis d'Funktionalitéit vum Redis Stream replizéieren.

Redis PUB/SUB

Redis Pub/Sub ass en einfachen Messagerie System schonn an Ärem Schlësselwäertgeschäft gebaut. Wéi och ëmmer, Einfachheet kënnt zu engem Präis:

  • Wann de Verlag aus irgendege Grënn feelt, da verléiert hien all seng Abonnenten
  • De Verlag muss déi exakt Adress vun all sengen Abonnente wëssen
  • E Verlag kann seng Abonnente mat Aarbecht iwwerlaascht wann Daten méi séier publizéiert ginn wéi se veraarbecht ginn
  • De Message gëtt direkt no der Verëffentlechung aus dem Puffer vum Verlag geläscht, egal wéi vill Abonnente et geliwwert gouf a wéi séier se dëse Message konnten veraarbecht.
  • All Abonnente kréien de Message zur selwechter Zäit. Abonnente selwer mussen iergendwéi ënnereneen averstane sinn iwwer d'Uerdnung vun der Veraarbechtung vum selwechte Message.
  • Et gëtt keen agebaute Mechanismus fir ze bestätegen datt en Abonnent e Message erfollegräich veraarbecht huet. Wann en Abonnent e Message kritt a während der Veraarbechtung crasht, weess de Verlag net doriwwer.

Redis Lëscht

Redis Lëscht ass eng Datestruktur déi d'Blockéierung vun Liesbefehle ënnerstëtzt. Dir kënnt Messagen vum Ufank oder Enn vun der Lëscht addéieren a liesen. Baséiert op dëser Struktur, kënnt Dir e gudde Stack oder Schlaang fir Är verdeelt System maachen, an am meeschte Fäll wäert dat genuch ginn. Haapt Differenzen vum Redis Pub / Sub:

  • De Message gëtt un ee Client geliwwert. Den éischte gelies-blockéierte Client kritt d'Donnéeën als éischt.
  • De Clint muss d'Liesoperatioun fir all Message selwer initiéieren. Lëscht weess näischt iwwer Clienten.
  • Messagen ginn gespäichert bis een se liest oder se explizit läscht. Wann Dir de Redis Server konfiguréiert fir Daten op Disk ze spülen, da erhéicht d'Zouverlässegkeet vum System dramatesch.

Aféierung fir Stream

Füügt en Entrée an e Stream

Equipe XADD füügt en neien Entrée an de Stream. E Rekord ass net nëmmen e String, et besteet aus engem oder méi Schlësselwäertpaaren. Also ass all Entrée scho strukturéiert a gläicht d'Struktur vun enger CSV Datei.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Am Beispill hei uewen addéiere mir zwee Felder zum Stream mam Numm (Schlëssel) "mystream": "Sensor-ID" an "Temperatur" mat de Wäerter "1234" respektiv "19.8". Als zweet Argument hëlt de Kommando en Identifizéierer deen dem Entrée zougewisen gëtt - dësen Identifizéierer identifizéiert all Entrée am Stream eenzegaarteg. Wéi och ëmmer, an dësem Fall hu mir * passéiert well mir wëllen datt Redis eng nei ID fir eis generéiert. All nei ID wäert eropgoen. Dofir wäert all nei Entrée e méi héijen Identifizéierer par rapport zu virdrun Entréen hunn.

Identifikatiounsformat

D'Entrée ID zréck vum Kommando XADD, besteet aus zwee Deeler:

{millisecondsTime}-{sequenceNumber}

MillisekonnenTime - Unix Zäit a Millisekonnen (Redis Server Zäit). Wéi och ëmmer, wann déi aktuell Zäit d'selwecht oder manner ass wéi d'Zäit vun der viregter Opnam, da gëtt den Zäitstempel vun der viregter Opnam benotzt. Dofir, wann d'Serverzäit zréck an d'Zäit geet, behält den neien Identifizéierer nach ëmmer d'Inkrementeigenschaft.

Sequenznummer benotzt fir records erstallt an der selwechter Millisekonnen. Sequenznummer gëtt ëm 1 vergréissert par rapport zu der viregter Entrée. Well déi Sequenznummer ass 64 Stécker an der Gréisst, da sollt Dir an der Praxis net an eng Limit op d'Zuel vun den Opzeechnunge lafen, déi bannent enger Millisekonnen generéiert kënne ginn.

D'Format vun esou Identifizéierer kann op den éischte Bléck komesch schéngen. E mësstrauensvolle Lieser kéint sech froen firwat d'Zäit Deel vum Identifizéierer ass. De Grond ass datt Redis Streams Range Ufroen duerch ID ënnerstëtzen. Well den Identifizéierer mat der Zäit verbonnen ass wou de Rekord erstallt gouf, mécht dat et méiglech Zäitbereich ze froen. Mir kucken op e spezifescht Beispill wa mir de Kommando kucken XRANGE.

Wann aus irgendege Grënn de Benotzer säin eegenen Identifizéierer muss spezifizéieren, deen zum Beispill mat engem externen System assoziéiert ass, da kënne mir et un de Kommando weiderginn XADD amplaz * wéi hei ënnendrënner:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Notéiert w.e.g. datt an dësem Fall Dir d'ID-Inkrement selwer iwwerwaache musst. An eisem Beispill ass de Minimum Identifizéierer "0-1", sou datt de Kommando keen aneren Identifizéierer akzeptéiert deen gläich oder manner wéi "0-1" ass.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Zuel vun records pro Baach

Et ass méiglech d'Zuel vun den Opzeechnungen an engem Stream einfach ze kréien andeems Dir de Kommando benotzt XLEN. Fir eist Beispill gëtt dëse Kommando de folgende Wäert zréck:

> XLEN somestream
(integer) 2

Range Ufroen - XRANGE an XREVRANGE

Fir Daten no Gamme ze froen, musse mir zwee Identifizéierer spezifizéieren - den Ufank an d'Enn vun der Gamme. Déi zréckginn Gamme wäert all Elementer enthalen, dorënner d'Grenze. Et ginn och zwee speziell Identifizéierer "-" an "+", respektiv de klengste (éischte Rekord) a gréisste (leschte Rekord) Identifizéierer am Stroum bedeit. D'Beispill hei drënner wäert all d'Stream Entréen oplëschten.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

All zréckginn Rekord ass eng Array vun zwee Elementer: en Identifizéierer an eng Lëscht vu Schlësselwäertpaaren. Mir hu scho gesot datt Rekordidentifizéierer mat der Zäit verbonne sinn. Dofir kënne mir eng Rei vun enger spezifescher Zäit ufroen. Wéi och ëmmer, mir kënnen an der Ufro net de ganzen Identifizéierer spezifizéieren, awer nëmmen d'Unix Zäit, andeems den Deel am Zesummenhang mat der Sequenznummer. Den ewechgeloossenen Deel vum Identifizéierer gëtt automatesch op Null am Ufank vum Beräich an op de maximal méigleche Wäert um Enn vum Beräich gesat. Drënner ass e Beispill wéi Dir eng Rei vun zwou Millisekonnen ufroe kënnt.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Mir hunn nëmmen eng Entrée an dësem Beräich, awer an realen Datesätz kann d'Resultat enorm enorm sinn. Aus dësem Grond XRANGE ënnerstëtzt d'COUNT Optioun. Andeems Dir d'Quantitéit uginn, kënne mir einfach déi éischt N records kréien. Wa mir mussen déi nächst N records kréien (Paginéierung), kënne mir déi lescht kritt ID benotzen, Erhéijung et Sequenznummer vun engem a froen nach eng Kéier. Loosst eis dëst am folgende Beispill kucken. Mir fänken un derbäi 10 Elementer mat XADD (ugeholl datt mystream scho mat 10 Elementer gefëllt war). Fir d'Iteratioun unzefänken 2 Elementer pro Kommando ze kréien, fänken mir mat der ganzer Palette un awer mat COUNT gläich wéi 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Fir weider mat den nächsten zwee Elementer ze iteréieren, musse mir déi lescht kritt ID auswielen, dh 1519073279157-0, an 1 addéieren Sequenznummer.
Déi resultéierend ID, an dësem Fall 1519073279157-1, kann elo als neit Start vun der Range Argument fir den nächsten Uruff benotzt ginn XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

A sou weider. Wéinst Komplexitéit XRANGE ass O(log(N)) fir ze sichen an dann O(M) fir M Elementer zréckzekommen, dann ass all Iteratiounsschrëtt séier. Also benotzt XRANGE Stréimunge kënnen effizient iteréiert ginn.

Equipe XREVRANGE ass d'Äquivalent XRANGE, awer bréngt d'Elementer an ëmgedréint Uerdnung zréck:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Maacht weg datt de Kommando XREVRANGE hëlt Range Argumenter Start an stoppen an ëmgedréint Uerdnung.

Liesen nei Entréen mat XREAD

Dacks entsteet d'Aufgab op e Stream ze abonnéieren an nëmmen nei Messagen ze kréien. Dëst Konzept kann ähnlech wéi Redis Pub / Sub schéngen oder Redis List blockéieren, awer et gi fundamental Differenzen wéi Dir Redis Stream benotzt:

  1. All neie Message gëtt par défaut un all Abonnent geliwwert. Dëst Verhalen ass anescht wéi eng blockéiert Redis Lëscht, wou en neie Message nëmmen vun engem Abonnent gelies gëtt.
  2. Wärend am Redis Pub / Sub all Messagen vergiess ginn an ni bestoe bleiwen, am Stream ginn all Messagen onbestëmmt behalen (ausser wann de Client explizit d'Läschung verursaacht).
  3. Redis Stream erlaabt Iech Zougang zu Messagen bannent engem Stream z'ënnerscheeden. E spezifesche Abonnent kann nëmmen hir perséinlech Messagegeschicht gesinn.

Dir kënnt op e Fuedem abonnéieren an nei Messagen mam Kommando kréien XREAD. Et ass e bësse méi komplizéiert wéi XRANGE, also fänke mer als éischt mat de méi einfache Beispiller un.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

D'Beispill hei uewen weist eng net-blockéierend Form XREAD. Notéiert datt d'COUNT Optioun fakultativ ass. Tatsächlech ass déi eenzeg erfuerderlech Kommandooptioun d'STREAMS Optioun, déi eng Lëscht vu Streamen zesumme mam entspriechende maximalen Identifizéierer spezifizéiert. Mir hunn "STREAMS mystream 0" geschriwwen - mir wëllen all records vum mystream Stream mat engem Identifizéierer méi wéi "0-0" kréien. Wéi Dir aus dem Beispill kënnt gesinn, gëtt de Kommando den Numm vum Fuedem zréck, well mir kënnen op verschidde Threads zur selwechter Zäit abonnéieren. Mir kéinten zum Beispill schreiwen "STREAMS mystream otherstream 0 0". Notéiert w.e.g. datt no der STREAMS-Optioun fir d'éischt d'Nimm vun all de erfuerderleche Streame musse ginn an nëmmen dann eng Lëscht vun den Identifizéierer.

An dëser einfacher Form mécht de Kommando näischt Besonnesches am Verglach mat XRANGE. Wéi och ëmmer, déi interessant Saach ass datt mir einfach kënne dréien XREAD op e Spärkommando, andeems d'BLOCK Argument spezifizéiert:

> XREAD BLOCK 0 STREAMS mystream $

Am Beispill hei uewen ass eng nei BLOCK Optioun spezifizéiert mat engem Timeout vun 0 Millisekonnen (dat heescht onbestëmmt waarden). Ausserdeem, amplaz den üblechen Identifizéierer fir de Stream mystream ze passéieren, gouf e speziellen Identifizéierer $ passéiert. Dëse speziellen Identifizéierer heescht dat XREAD muss de maximalen Identifizéierer am mystream als Identifizéierer benotzen. Also mir kréien nëmmen nei Messagen ab dem Moment wou mir ugefaang hunn ze lauschteren. Op e puer Manéier ass dëst ähnlech dem Unix "Schwanz -f" Kommando.

Notéiert datt wann Dir d'BLOCK Optioun benotzt, brauche mir net onbedéngt de speziellen Identifizéierer $ ze benotzen. Mir kënnen all Identifizéierer benotzen, déi am Stream existéiert. Wann d'Team eis Ufro direkt kann servéieren ouni ze blockéieren, mécht se dat, soss blockéiert se.

Blockéieren XREAD kann och e puer Threads gläichzäiteg lauschteren, Dir musst just hir Nimm uginn. An dësem Fall gëtt de Kommando e Rekord vum éischte Stroum zréck, deen Daten kritt huet. Den éischten Abonnent, dee fir e bestëmmte Fuedem blockéiert ass, kritt als éischt Daten.

Konsument Gruppen

A verschiddenen Aufgaben wëlle mir den Abonnent Zougang zu Messagen bannent engem thread limitéieren. E Beispill wou dëst nëtzlech ka sinn ass eng Messageschlaang mat Aarbechter déi verschidde Messagen aus engem thread kréien, wat d'Messageveraarbechtung erlaabt ze skaléieren.

Wa mir eis virstellen datt mir dräi Abonnenten C1, C2, C3 hunn an e Fuedem deen d'Botschaften 1, 2, 3, 4, 5, 6, 7 enthält, da ginn d'Messagen zerwéiert wéi am Diagramm hei ënnen:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Fir dësen Effekt z'erreechen, benotzt Redis Stream e Konzept mam Numm Consumer Group. Dëst Konzept ass ähnlech wéi e Pseudo-Abonnent, deen Daten aus engem Stream kritt, awer tatsächlech vu multiple Abonnente bannent enger Grupp zerwéiert gëtt, wat gewësse Garantien ubitt:

  1. All Message gëtt un en aneren Abonnent bannent der Grupp geliwwert.
  2. Bannent enger Grupp ginn Abonnente mat hirem Numm identifizéiert, wat e case-sensibel String ass. Wann en Abonnent temporär aus der Grupp fällt, kann hien an de Grupp mat sengem eegenen eenzegaartegen Numm restauréiert ginn.
  3. All Consumer Group follegt dem "éischt ongelies Message" Konzept. Wann en Abonnent nei Messagen freet, kann et nëmme Messagen kréien déi nach ni virdrun un en Abonnent an der Grupp geliwwert goufen.
  4. Et gëtt e Kommando fir explizit ze bestätegen datt de Message erfollegräich vum Abonnent veraarbecht gouf. Bis dëse Kommando opgeruff gëtt, bleift de ugefrote Message am "pending" Status.
  5. Bannent der Consumer Group kann all Abonnent eng Geschicht vu Messagen ufroen, déi him geliwwert goufen, awer nach net veraarbecht ginn (am "pending" Status)

An engem Sënn kann den Zoustand vun der Grupp wéi follegt ausgedréckt ginn:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Elo ass et Zäit d'Haaptbefehle fir d'Konsumentengrupp kennenzeléieren, nämlech:

  • XGROUP benotzt fir Gruppen ze kreéieren, zerstéieren a managen
  • XREADGROUP benotzt fir Stream duerch Grupp ze liesen
  • XACK - dëst Kommando erlaabt dem Abonnent de Message als erfollegräich veraarbecht ze markéieren

Kreatioun vun Konsument Group

Loosst eis unhuelen datt mystream scho existéiert. Da gesäit de Kommando fir d'Gruppschafung esou aus:

> XGROUP CREATE mystream mygroup $
OK

Wann Dir e Grupp erstellt, musse mir en Identifizéierer passéieren, vun deem de Grupp Messagen kritt. Wa mir just all nei Messagen wëllen kréien, da kënne mir de speziellen Identifizéierer $ benotzen (wéi an eisem Beispill hei uewen). Wann Dir 0 anstatt e speziellen Identifizéierer spezifizéiert, da sinn all Messagen am Fuedem fir de Grupp verfügbar.

Elo datt d'Grupp erstallt ass, kënne mir direkt ufänken Messagen mat dem Kommando ze liesen XREADGROUP. Dëse Kommando ass ganz ähnlech wéi XREAD an ënnerstëtzt d'optional BLOCK Optioun. Wéi och ëmmer, et gëtt eng erfuerderlech GROUP Optioun déi ëmmer mat zwee Argumenter spezifizéiert muss ginn: de Gruppnumm an den Abonnentnumm. D'COUNT Optioun gëtt och ënnerstëtzt.

Ier mer de Fuedem liesen, loosst eis e puer Messagen do setzen:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Loosst eis elo probéieren dëse Stream duerch de Grupp ze liesen:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Deen uewe genannte Kommando liest verbatim wéi follegt:

"Ech, Abonnent Alice, e Member vun der mygroup, wëll ee Message aus mystream liesen, deen nach ni virdru bei jidderengem geliwwert gouf."

All Kéier wann en Abonnent eng Operatioun op enger Grupp ausféiert, muss en säin Numm ubidden, sech eenzegaarteg an der Grupp z'identifizéieren. Et gëtt ee méi ganz wichtegen Detail am uewe genannte Kommando - de speziellen Identifizéierer ">". Dëse speziellen Identifizéierer filtert Messagen, léisst nëmmen déi, déi nach ni geliwwert goufen.

Och, a spezielle Fäll, kënnt Dir e richtegen Identifizéierer wéi 0 oder all aner gëlteg Identifizéierer uginn. An dësem Fall de Kommando XREADGROUP wäert Iech eng Geschicht vu Messagen mat engem Status vun "pending" zréckginn, déi dem spezifizéierte Abonnent (Alice) geliwwert goufen, awer nach net mat dem Kommando unerkannt goufen XACK.

Mir kënnen dëst Verhalen testen andeems Dir direkt d'ID 0 spezifizéiert, ouni d'Optioun ZIELT. Mir wäerten einfach eng eenzeg pendend Noriicht gesinn, dat ass den Apel Message:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Wéi och ëmmer, wa mir de Message als erfollegräich veraarbecht bestätegen, da gëtt et net méi ugewisen:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Elo ass et dem Bob säin Tour eppes ze liesen:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, e Member vun mygroup, gefrot fir net méi wéi zwee Messagen. De Kommando bericht nëmmen net geliwwert Messagen wéinst dem speziellen Identifizéierer ">". Wéi Dir gesitt, gëtt de Message "Apel" net ugewisen, well et scho bei Alice geliwwert gouf, sou datt de Bob "Orange" an "Erdbeere" kritt.

Op dës Manéier kënnen d'Alice, de Bob an all aner Abonnent vum Grupp verschidde Messagen aus dem selwechte Stream liesen. Si kënnen och hir Geschicht vun onveraarbechtte Messagen liesen oder Messagen als veraarbecht markéieren.

Et ginn e puer Saachen am Kapp ze halen:

  • Soubal den Abonnent de Message als Kommando betruecht XREADGROUP, geet dëse Message an den "pending" Staat a gëtt un deem spezifesche Abonnent zougewisen. Aner Grupp Abonnente kënnen dëse Message net liesen.
  • Abonnente ginn automatesch op der éischter Ernimmung erstallt, et ass net néideg se explizit ze kreéieren.
  • Mat der Hëllef vun XREADGROUP Dir kënnt Messagen aus verschiddene Threads zur selwechter Zäit liesen, awer fir datt dëst funktionnéiert, musst Dir fir d'éischt Gruppe mam selwechten Numm fir all Thread erstellen XGROUP

Erhuelung no engem Echec

Den Abonnent kann sech vum Versoen erholen a seng Lëscht vun de Messagen mam Status "pending" nei liesen. Wéi och ëmmer, an der realer Welt kënnen Abonnente schlussendlech versoen. Wat geschitt mat engem Abonnent d'Stéck Messagen wann den Abonnent net fäeg ass sech vun engem Feeler z'erhuelen?
Consumer Group bitt eng Fonktioun déi just fir esou Fäll benotzt gëtt - wann Dir de Besëtzer vun de Messagen änneren musst.

Dat éischt wat Dir maache musst ass de Kommando ze ruffen XPENDING, déi all Messagen an der Grupp mat dem Status "pending" weist. A senger einfachster Form gëtt de Kommando mat nëmmen zwee Argumenter genannt: de Fuedemnumm an de Gruppnumm:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

D'Team huet d'Zuel vun onveraarbechtte Messagen fir de ganze Grupp a fir all Abonnent ugewisen. Mir hunn nëmmen de Bob mat zwee aussergewéinleche Messagen, well deen eenzege Message, deen Alice gefrot huet, bestätegt gouf XACK.

Mir kënne méi Informatioun ufroen mat méi Argumenter:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - Gamme vun Identifizéierer (Dir kënnt "-" an "+") benotzen)
{count} - Zuel vun Liwwerung Versich
{Consumer-Name} - Gruppnumm

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Elo hu mir Detailer fir all Message: ID, Abonnentnumm, Idle Zäit a Millisekonnen a schliisslech d'Zuel vun de Liwwerversich. Mir hunn zwee Messagen vum Bob a si ware fir 74170458 Millisekonnen, ongeféier 20 Stonnen, idle.

Notéiert w.e.g. datt keen eis verhënnert fir ze iwwerpréiwen wat den Inhalt vum Message war einfach andeems Dir benotzt XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mir mussen just déiselwecht Identifizéierer zweemol an den Argumenter widderhuelen. Elo wou mir e puer Iddi hunn, kann d'Alice entscheeden datt no 20 Stonnen Ausdauer de Bob wahrscheinlech net erholen, an et ass Zäit dës Messagen ze froen an d'Veraarbechtung fir de Bob weiderzeféieren. Fir dëst benotze mir de Kommando XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Mat dësem Kommando kënne mir e "auslännesche" Message kréien, deen nach net veraarbecht gouf andeems Dir de Besëtzer op {Consumer} ännert. Mir kënnen awer och e Minimum Idle Zäit {min-idle-time} ubidden. Dëst hëlleft eng Situatioun ze vermeiden wou zwee Cliente probéieren de Besëtzer vun de selwechte Messagen gläichzäiteg z'änneren:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Den éischte Client wäert d'Downtime zrécksetzen an d'Liwwerzähler erhéijen. Also den zweete Client kann et net ufroen.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

De Message gouf erfollegräich vun Alice behaapt, déi elo de Message veraarbecht an unerkennen kann.

Aus dem uewe Beispill kënnt Dir gesinn datt eng erfollegräich Ufro den Inhalt vun der Noriicht selwer zréckkënnt. Dëst ass awer net néideg. D'JUSTID Optioun kann nëmme benotzt ginn fir Message IDen zréckzeginn. Dëst ass nëtzlech wann Dir net un d'Detailer vum Message interesséiert sidd an Dir wëllt d'Systemleistung erhéijen.

Liwwerung Konter

De Konter deen Dir am Ausgang gesitt XPENDING ass d'Zuel vun de Liwwerunge vun all Message. Sou e Konter gëtt op zwou Manéieren erhéicht: wann e Message erfollegräich ugefrot gëtt XCLAIM oder wann en Uruff benotzt gëtt XREADGROUP.

Et ass normal datt verschidde Messagen e puer Mol geliwwert ginn. Den Haapt Saach ass datt all Messagen schlussendlech veraarbecht ginn. Heiansdo geschéien Problemer beim Veraarbechtung vun engem Message well de Message selwer beschiedegt ass, oder Message Veraarbechtung verursaacht e Feeler am Handlercode. An dësem Fall kann et sech erausstellen datt keen dëse Message veraarbecht ka ginn. Well mir hunn eng Liwwerung Versuch Konter, mir kënnen dëse Konter benotzen esou Situatiounen ze entdecken. Dofir, wann d'Liwwerzuel déi héich Zuel erreecht, déi Dir uginn hutt, wier et wahrscheinlech méi schlau fir sou e Message op en anere Fuedem ze setzen an eng Notifikatioun un de Systemadministrator ze schécken.

Thread Staat

Equipe XINFO benotzt fir verschidde Informatioun iwwer e Fuedem a seng Gruppen ze froen. Zum Beispill gesäit e Basisbefehl esou aus:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

De Kommando uewen weist allgemeng Informatioun iwwer de spezifizéierte Stroum. Elo e bësse méi komplex Beispill:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

De Kommando uewen weist allgemeng Informatioun fir all Gruppe vum spezifizéierte Fuedem

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

De Kommando uewen weist Informatioun fir all Abonnente vum spezifizéierte Stream a Grupp.
Wann Dir de Kommando Syntax vergiesst, frot just de Kommando selwer fir Hëllef:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stream Gréisst Limit

Vill Applikatiounen wëllen net fir ëmmer Daten an e Stroum sammelen. Et ass dacks nëtzlech eng maximal Unzuel u Messagen pro thread erlaabt ze hunn. An anere Fäll ass et nëtzlech all Messagen vun engem Fuedem an en anert persistent Geschäft ze réckelen wann déi spezifizéiert Fuedemgréisst erreecht gëtt. Dir kënnt d'Gréisst vun engem Stroum limitéieren mam MAXLEN Parameter am Kommando XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Wann Dir MAXLEN benotzt, ginn al records automatesch geläscht wann se eng spezifizéiert Längt erreechen, sou datt de Stroum eng konstant Gréisst huet. Wéi och ëmmer, Pruning an dësem Fall geschitt net am effizientesten Wee am Redis Erënnerung. Dir kënnt d'Situatioun verbesseren wéi follegt:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Den ~ Argument am Beispill hei uewen bedeit datt mir net onbedéngt d'Streamlängt op e spezifesche Wäert limitéiere mussen. An eisem Beispill kann dëst all Zuel méi grouss wéi oder gläich wéi 1000 sinn (zum Beispill 1000, 1010 oder 1030). Mir hunn just explizit uginn datt mir wëllen datt eise Stream op d'mannst 1000 records späichert. Dëst mécht Erënnerung Gestioun vill méi efficace bannent Redis.

Et gëtt och eng separat Equipe XTRIM, wat datselwecht mécht:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Persistent Lagerung a Replikatioun

Redis Stream gëtt asynchron op Sklavennoden replizéiert a gespäichert op Dateie wéi AOF (Snapshot vun all Daten) an RDB (Log vun all Schreifoperatioune). Replikatioun vum Consumer Groups Staat gëtt och ënnerstëtzt. Dofir, wann e Message am "pending" Status um Master Node ass, dann op de Sklavennoden wäert dëse Message dee selwechte Status hunn.

Ewechzehuelen eenzel Elementer aus engem Baach

Et gëtt e spezielle Kommando fir Messagen ze läschen XDEL. De Kommando kritt den Numm vum Fuedem gefollegt vun de Message IDen déi geläscht ginn:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Wann Dir dëse Kommando benotzt, musst Dir berücksichtegen datt déi aktuell Erënnerung net direkt verëffentlecht gëtt.

Null Längt Baachen

Den Ënnerscheed tëscht Streamen an anere Redis Datestrukturen ass datt wann aner Datestrukturen net méi Elementer an hinnen hunn, als Nebenwirkung, gëtt d'Datestruktur selwer aus der Erënnerung geläscht. Also, zum Beispill, gëtt de zortéierte Set komplett geläscht wann den ZREM-Uruff dat lescht Element läscht. Amplaz sinn Threads erlaabt an der Erënnerung ze bleiwen och ouni Elementer dobannen ze hunn.

Konklusioun

Redis Stream ass ideal fir Messagen Brokeren, Messageschlaangen, vereenegt Logbicher, a Geschichtshalend Chatsystemer ze kreéieren.

Wéi ech eemol gesot hunn Niklaus Wirth, Programmer sinn Algorithmen plus Datestrukturen, a Redis gëtt Iech scho béid.

Source: will.com

Setzt e Commentaire