Redis Stream – az üzenetküldő rendszerek megbízhatósága és méretezhetősége

Redis Stream – az üzenetküldő rendszerek megbízhatósága és méretezhetősége

A Redis Stream egy új absztrakt adattípus, amelyet a Redis 5.0-s verziójával vezettek be
Elméletileg a Redis Stream egy lista, amelyhez bejegyzéseket adhat hozzá. Minden bejegyzés egyedi azonosítóval rendelkezik. Alapértelmezés szerint az azonosító automatikusan jön létre, és időbélyeget is tartalmaz. Ezért lekérdezhet rekordtartományokat az idő múlásával, vagy új adatokat fogadhat, amint azok megérkeznek az adatfolyamba, hasonlóan ahhoz, ahogy a Unix "tail -f" parancsa beolvassa a naplófájlt, és lefagy, miközben új adatokra vár. Vegye figyelembe, hogy egyszerre több kliens is hallgathat egy szálat, mint ahogy sok "tail -f" folyamat is képes egyidejűleg olvasni egy fájlt anélkül, hogy konfliktusba kerülne egymással.

Az új adattípus minden előnyének megértéséhez vessünk egy gyors pillantást a régóta létező Redis-struktúrákra, amelyek részben replikálják a Redis Stream funkcióit.

Redis PUB/SUB

A Redis Pub/Sub egy egyszerű üzenetküldő rendszer, amely már be van építve a kulcsérték-tárba. Az egyszerűségnek azonban ára van:

  • Ha a kiadó valamilyen okból megbukik, akkor elveszíti az összes előfizetőjét
  • A kiadónak tudnia kell minden előfizetőjének pontos címét
  • A kiadó túlterhelheti előfizetőit munkával, ha az adatok gyorsabban kerülnek közzétételre, mint a feldolgozás
  • Az üzenet a közzététel után azonnal törlődik a kiadó pufferéből, függetlenül attól, hogy hány előfizetőnek kézbesítették, és milyen gyorsan tudták feldolgozni az üzenetet.
  • Minden előfizető egyszerre kapja meg az üzenetet. Maguknak az előfizetőknek valamilyen módon meg kell állapodniuk egymás között ugyanazon üzenet feldolgozásának sorrendjében.
  • Nincs beépített mechanizmus annak ellenőrzésére, hogy az előfizető sikeresen feldolgozta-e az üzenetet. Ha egy előfizető üzenetet kap és a feldolgozás során összeomlik, a kiadó nem tud róla.

Redis lista

A Redis List egy adatstruktúra, amely támogatja az olvasási parancsok blokkolását. Hozzáadhat és elolvashat üzeneteket a lista elejétől vagy végétől. E struktúra alapján jó verem vagy sor készíthető az elosztott rendszerhez, és a legtöbb esetben ez elegendő is lesz. A fő különbségek a Redis Pub/Subhoz képest:

  • Az üzenetet egy ügyfélnek kézbesítik. Az első olvasási blokkolt ügyfél kapja meg először az adatokat.
  • Clintnek magának kell kezdeményeznie az egyes üzenetek olvasási műveletét. A lista semmit sem tud az ügyfelekről.
  • Az üzenetek addig tárolódnak, amíg valaki el nem olvassa vagy kifejezetten törli őket. Ha úgy konfigurálja a Redis szervert, hogy az adatokat a lemezre ürítse, akkor a rendszer megbízhatósága drámaian megnő.

Bevezetés a Streambe

Bejegyzés hozzáadása egy adatfolyamhoz

Csapat XADD új bejegyzést ad az adatfolyamhoz. Egy rekord nem csak egy karakterlánc, hanem egy vagy több kulcs-érték párból áll. Így minden bejegyzés már strukturált, és egy CSV-fájl szerkezetére hasonlít.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

A fenti példában két mezőt adunk az adatfolyamhoz „mystream” névvel (kulcs): „sensor-id” és „temperatura” „1234” és „19.8” értékekkel. Második argumentumként a parancs egy azonosítót vesz fel, amely hozzá lesz rendelve a bejegyzéshez – ez az azonosító egyedileg azonosítja az adatfolyam minden bejegyzését. Ebben az esetben azonban átadtuk a *-ot, mert azt akarjuk, hogy a Redis új azonosítót generáljon nekünk. Minden új azonosító növekszik. Ezért minden új bejegyzés magasabb azonosítóval rendelkezik a korábbi bejegyzésekhez képest.

Azonosító formátuma

A parancs által visszaadott bejegyzésazonosító XADD, két részből áll:

{millisecondsTime}-{sequenceNumber}

ezredmásodpercTime — Unix idő ezredmásodpercben (Redis szerveridő). Ha azonban az aktuális idő megegyezik vagy kevesebb, mint az előző felvétel időpontja, akkor a rendszer az előző felvétel időbélyegét használja. Ezért, ha a szerver ideje visszamegy az időben, az új azonosító továbbra is megtartja a növekedési tulajdonságot.

sorszám ugyanazon ezredmásodperc alatt létrehozott rekordokhoz használják. sorszám 1-gyel nő az előző bejegyzéshez képest. Mert a sorszám 64 bites, akkor a gyakorlatban nem szabad belefutni az egy ezredmásodperc alatt generálható rekordok számának korlátozásába.

Az ilyen azonosítók formátuma első pillantásra furcsának tűnhet. A bizalmatlan olvasó elgondolkozhat azon, hogy miért része az idő az azonosítónak. Ennek az az oka, hogy a Redis adatfolyamok támogatják az azonosító alapján történő tartománylekérdezéseket. Mivel az azonosító a rekord létrehozásának idejéhez van társítva, ez lehetővé teszi az időtartományok lekérdezését. Megnézünk egy konkrét példát, amikor megnézzük a parancsot XRANGE.

Ha valamilyen oknál fogva a felhasználónak meg kell adnia saját azonosítóját, amely például valamilyen külső rendszerhez van társítva, akkor átadhatjuk a parancsnak. XADD * helyett az alábbiak szerint:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Felhívjuk figyelmét, hogy ebben az esetben magának kell figyelnie az azonosítónövekedést. Példánkban a minimális azonosító "0-1", így a parancs nem fogad el más olyan azonosítót, amely egyenlő vagy kisebb, mint "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Adatfolyamonkénti rekordok száma

A paranccsal egyszerűen lekérhető a rekordok száma egy adatfolyamban XLEN. Példánkban ez a parancs a következő értéket adja vissza:

> XLEN somestream
(integer) 2

Tartománylekérdezések – XRANGE és XREVRANGE

Az adatok tartomány szerinti lekéréséhez két azonosítót kell megadnunk - a tartomány elejét és végét. A visszaadott tartomány minden elemet tartalmazni fog, beleértve a határokat is. Két speciális azonosító is található: „-” és „+”, amelyek rendre a legkisebb (első rekord) és a legnagyobb (utolsó rekord) azonosítót jelentik az adatfolyamban. Az alábbi példa felsorolja az összes adatfolyam-bejegyzést.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Minden visszaadott rekord két elem tömbje: egy azonosító és egy kulcs-érték párok listája. Már említettük, hogy a rekordazonosítók az időhöz kapcsolódnak. Ezért kérhetünk egy meghatározott időtartamra vonatkozó tartományt. A kérésben azonban nem a teljes azonosítót, hanem csak a Unix-időt adhatjuk meg, a vonatkozó részt elhagyva. sorszám. Az azonosító kihagyott része a tartomány elején automatikusan nullára, a tartomány végén pedig a maximális lehetséges értékre áll be. Az alábbiakban egy példa látható arra, hogyan kérhet két ezredmásodperces tartományt.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Ebben a tartományban csak egy bejegyzésünk van, de valós adathalmazokban a visszaadott eredmény hatalmas lehet. Emiatt XRANGE támogatja a COUNT opciót. A mennyiség megadásával egyszerűen megkaphatjuk az első N rekordot. Ha meg kell szereznünk a következő N rekordot (oldalszámozás), használhatjuk az utoljára kapott azonosítót, növeljük azt sorszám egyenként, és kérdezd meg újra. Nézzük meg ezt a következő példában. Elkezdjük 10 elem hozzáadását XADD (feltételezve, hogy a mystream már tele volt 10 elemmel). Ahhoz, hogy az iteráció parancsonként 2 elemet kapjon, a teljes tartománnyal kezdjük, de a COUNT egyenlő 2-vel.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

A következő két elemmel való iteráció folytatásához ki kell választanunk az utoljára kapott azonosítót, azaz a 1519073279157-0-t, és adjunk hozzá 1-et sorszám.
Az eredményül kapott azonosító, ebben az esetben 1519073279157-1, mostantól használható a következő hívás új tartománykezdési argumentumaként. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Stb. A komplexitás miatt XRANGE O(log(N)) a kereséshez, majd O(M) M elem visszaadásához, akkor minden iterációs lépés gyors. Így felhasználva XRANGE A folyamok hatékonyan iterálhatók.

Csapat XREVRANGE az egyenértékű XRANGE, de az elemeket fordított sorrendben adja vissza:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Felhívjuk figyelmét, hogy a parancs XREVRANGE fordított sorrendben veszi a start és stop tartomány argumentumokat.

Új bejegyzések olvasása az XREAD segítségével

Gyakran felmerül a feladat, hogy feliratkozzon egy adatfolyamra, és csak új üzeneteket kapjon. Ez a koncepció hasonlónak tűnhet a Redis Pub/Sub-hoz vagy a Redis List blokkolásához, de alapvető különbségek vannak a Redis Stream használatában:

  1. Alapértelmezés szerint minden új üzenetet minden előfizető megkap. Ez a viselkedés különbözik a blokkoló Redis-listától, ahol az új üzenetet csak egy előfizető olvassa el.
  2. Míg a Redis Pub/Sub alkalmazásban az összes üzenet elfelejtődik, és soha nem marad fenn, a Streamben az összes üzenet korlátlan ideig megmarad (kivéve, ha a kliens kifejezetten törli).
  3. A Redis Stream lehetővé teszi az üzenetekhez való hozzáférés megkülönböztetését egy adatfolyamon belül. Egy adott előfizető csak személyes üzenetelőzményeit láthatja.

A parancs segítségével feliratkozhat egy szálra, és új üzeneteket fogadhat XREAD. Ez egy kicsit bonyolultabb annál XRANGE, ezért először az egyszerűbb példákkal kezdjük.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

A fenti példa egy nem blokkoló űrlapot mutat be XREAD. Vegye figyelembe, hogy a COUNT opció nem kötelező. Valójában az egyetlen kötelező parancsbeállítás a STREAMS opció, amely megadja a folyamok listáját a megfelelő maximális azonosítóval együtt. Azt írtuk, hogy „STREAMS mystream 0” – a mystream adatfolyam összes rekordját „0-0”-nál nagyobb azonosítóval szeretnénk megkapni. Ahogy a példából is látható, a parancs a szál nevét adja vissza, mert egyszerre több szálra is feliratkozhatunk. Írhatnánk például: "STREAMS mystream otherstream 0 0". Kérjük, vegye figyelembe, hogy a STREAMS opció után először meg kell adnunk az összes szükséges adatfolyam nevét, és csak utána az azonosítók listáját.

Ebben az egyszerű formában a parancs nem csinál semmi különöset ehhez képest XRANGE. Viszont az az érdekes, hogy könnyen tudunk fordulni XREAD egy blokkoló parancshoz, megadva a BLOCK argumentumot:

> XREAD BLOCK 0 STREAMS mystream $

A fenti példában egy új BLOCK opció van megadva 0 ezredmásodperces időkorláttal (ez azt jelenti, hogy határozatlan ideig kell várni). Sőt, ahelyett, hogy a stream mystream szokásos azonosítóját adta volna át, egy speciális $ azonosítót adtak át. Ez a speciális azonosító azt jelenti XREAD a mystream maximális azonosítóját kell használnia azonosítóként. Így csak attól a pillanattól fogva kapunk új üzeneteket, amikor elkezdtük hallgatni. Bizonyos szempontból ez hasonlít a Unix "tail -f" parancsához.

Vegye figyelembe, hogy a BLOCK opció használatakor nem feltétlenül szükséges a $ speciális azonosítót használnunk. Bármilyen azonosítót használhatunk a streamben. Ha a csapat letiltás nélkül azonnal teljesíteni tudja kérésünket, akkor ezt megteszi, ellenkező esetben letilt.

Blokkolás XREAD több szálat is hallgathat egyszerre, csak a nevüket kell megadni. Ebben az esetben a parancs az első adatfolyam rekordját adja vissza. Az adott szálhoz elsőként letiltott előfizető kapja meg először az adatokat.

Fogyasztói Csoportok

Bizonyos feladatoknál korlátozni szeretnénk az előfizetők hozzáférését az egy szálon belüli üzenetekhez. Egy példa, ahol ez hasznos lehet, egy üzenetsor olyan dolgozókkal, akik különböző üzeneteket kapnak egy száltól, lehetővé téve az üzenetfeldolgozás méretezését.

Ha úgy képzeljük el, hogy három előfizetőnk van C1, C2, C3 és egy szál, amely 1, 2, 3, 4, 5, 6, 7 üzeneteket tartalmaz, akkor az üzenetek az alábbi diagram szerint jelennek meg:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

E hatás eléréséhez a Redis Stream a Consumer Group nevű koncepciót használja. Ez a koncepció hasonlít egy pszeudo-előfizetőhöz, amely adatfolyamból kap adatokat, de valójában több előfizető szolgálja ki egy csoporton belül, bizonyos garanciákat nyújtva:

  1. Minden üzenet a csoporton belül más előfizetőhöz érkezik.
  2. Egy csoporton belül az előfizetőket a nevük alapján azonosítják, ami egy kis- és nagybetűk megkülönböztetése. Ha egy előfizető ideiglenesen kiesik a csoportból, a saját egyedi nevével vissza lehet állítani a csoportba.
  3. Minden fogyasztói csoport az „első olvasatlan üzenet” koncepciót követi. Amikor egy előfizető új üzeneteket kér, csak olyan üzeneteket tud fogadni, amelyeket korábban még nem kézbesítettek a csoporton belüli egyik előfizetőnek sem.
  4. Van egy parancs, amely kifejezetten megerősíti, hogy az üzenetet az előfizető sikeresen feldolgozta. Amíg ezt a parancsot nem hívják, a kért üzenet „függőben” állapotban marad.
  5. A Fogyasztói Csoporton belül minden előfizető kérheti a neki kézbesített, de még nem feldolgozott ("függőben" státuszban) üzenetek előzményeit.

Bizonyos értelemben a csoport állapota a következőképpen fejezhető ki:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Itt az ideje, hogy megismerkedjünk a fogyasztói csoport főbb parancsaival, nevezetesen:

  • XGROUP csoportok létrehozására, megsemmisítésére és kezelésére használják
  • XREADGROUP a csoporton keresztüli adatfolyam olvasására szolgál
  • XACK - ez a parancs lehetővé teszi az előfizető számára, hogy sikeresen feldolgozottként jelölje meg az üzenetet

Fogyasztói Csoport létrehozása

Tegyük fel, hogy a mystream már létezik. Ezután a csoportlétrehozási parancs így fog kinézni:

> XGROUP CREATE mystream mygroup $
OK

Csoport létrehozásakor egy azonosítót kell átadnunk, amelytől kezdve a csoport üzeneteket fog kapni. Ha csak az összes új üzenetet szeretnénk megkapni, akkor használhatjuk a $ speciális azonosítót (mint a fenti példánkban). Ha speciális azonosító helyett 0-t ad meg, akkor a szál összes üzenete elérhető lesz a csoport számára.

Most, hogy a csoport létrejött, azonnal elkezdhetjük az üzenetek olvasását a paranccsal XREADGROUP. Ez a parancs nagyon hasonlít a XREAD és támogatja az opcionális BLOCK opciót. Van azonban egy kötelező GROUP beállítás, amelyet mindig két argumentummal kell megadni: a csoportnévvel és az előfizető nevével. A COUNT opció is támogatott.

Mielőtt elolvasná a szálat, tegyünk oda néhány üzenetet:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Most próbáljuk meg elolvasni ezt az adatfolyamot a csoporton keresztül:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

A fenti parancs szó szerint így szól:

"Én, Alice előfizető, a csoportom tagja, el akarok olvasni egy olyan üzenetet a mystreamből, amelyet még soha senkinek nem juttattak el."

Minden alkalommal, amikor egy előfizető műveletet hajt végre egy csoporton, meg kell adnia a nevét, amely egyedileg azonosítja magát a csoporton belül. Van még egy nagyon fontos részlet a fenti parancsban - a speciális azonosító ">". Ez a speciális azonosító szűri az üzeneteket, és csak azokat hagyja meg, amelyeket még soha nem kézbesítettek.

Ezenkívül speciális esetekben valódi azonosítót is megadhat, például 0-t vagy bármely más érvényes azonosítót. Ebben az esetben a parancs XREADGROUP visszaadja a „függőben” állapotú üzenetek előzményeit, amelyeket a megadott előfizetőnek (Alice) kézbesítettek, de még nem nyugtáztak a paranccsal XACK.

Ezt a viselkedést úgy tudjuk tesztelni, ha azonnal megadjuk a 0 azonosítót, opció nélkül COUNT. Egyszerűen egyetlen függőben lévő üzenetet fogunk látni, vagyis az alma üzenetet:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ha azonban megerősítjük az üzenet sikeres feldolgozását, akkor az többé nem jelenik meg:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Most Bobon a sor, hogy olvasson valamit:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, a csoportom tagja legfeljebb két üzenetet kért. A parancs csak a nem kézbesített üzeneteket jelenti a speciális ">" azonosító miatt. Amint láthatja, az "alma" üzenet nem jelenik meg, mivel azt már kézbesítették Alice-nek, így Bob "narancsot" és "epret" kap.

Így Alice, Bob és a csoport bármely más előfizetője különböző üzeneteket olvashat ugyanabból az adatfolyamból. Ezenkívül elolvashatják a feldolgozatlan üzenetek előzményeit, vagy feldolgozottként jelölhetik meg az üzeneteket.

Néhány dolgot érdemes szem előtt tartani:

  • Amint az előfizető parancsnak tekinti az üzenetet XREADGROUP, ez az üzenet „függőben” állapotba kerül, és az adott előfizetőhöz van hozzárendelve. A csoport többi előfizetője nem fogja tudni elolvasni ezt az üzenetet.
  • Az előfizetők az első említésre automatikusan létrejönnek, nincs szükség kifejezetten létrehozásukra.
  • -Val XREADGROUP egyszerre több különböző szálból is olvashat üzeneteket, de ahhoz, hogy ez működjön, először létre kell hoznia minden szálhoz azonos nevű csoportokat a XGROUP

Felépülés kudarc után

Az előfizető felépülhet a hibából, és újra elolvashatja a „függőben” állapotú üzenetek listáját. A valós világban azonban az előfizetők végül kudarcot vallanak. Mi történik az előfizető elakadt üzeneteivel, ha az előfizető nem tud felépülni egy hibából?
A Consumer Group egy olyan funkciót kínál, amelyet csak az ilyen esetekre használnak - amikor meg kell változtatni az üzenetek tulajdonosát.

Az első dolog, amit meg kell tennie, hogy hívja a parancsot KIFEJEZÉS, amely megjeleníti a csoport összes „függőben” állapotú üzenetét. A legegyszerűbb formájában a parancsot csak két argumentum hívja meg: a szálnév és a csoportnév:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

A csapat megjelenítette a feldolgozatlan üzenetek számát a teljes csoportra és minden előfizetőre vonatkozóan. Bobnak csak két kiemelkedő üzenete van, mert az egyetlen üzenetet, amelyet Alice kért, megerősítették XACK.

További érvekkel több információt kérhetünk:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - azonosítók tartománya (használhat „-” és „+” karaktereket)
{count} – kézbesítési kísérletek száma
{consumer-name} – csoport neve

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Mostantól minden üzenethez részletes adatokat találunk: azonosító, előfizető neve, tétlenségi idő ezredmásodpercben és végül a kézbesítési kísérletek száma. Két üzenetünk érkezett Bobtól, és 74170458 ezredmásodpercig tétlenek voltak, körülbelül 20 órán keresztül.

Kérjük, vegye figyelembe, hogy senki sem akadályoz meg bennünket abban, hogy pusztán annak használatával ellenőrizzük, mi volt az üzenet tartalma XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Csak kétszer kell megismételnünk ugyanazt az azonosítót az argumentumokban. Most, hogy van valami ötletünk, Alice úgy dönthet, hogy 20 órás leállás után Bob valószínűleg nem fog felépülni, és ideje lekérdezni ezeket az üzeneteket, és folytatni a feldolgozást Bob számára. Ehhez a parancsot használjuk XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Ezzel a paranccsal „idegen” üzenetet kaphatunk, amelyet még nem dolgoztunk fel, ha a tulajdonost {fogyasztóra változtatjuk. Ugyanakkor megadhatunk egy minimális üresjárati időt is {min-idle-time}. Ez segít elkerülni azt a helyzetet, amikor két ügyfél egyszerre próbálja megváltoztatni ugyanazon üzenetek tulajdonosát:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Az első ügyfél visszaállítja az állásidőt és növeli a szállítási számlálót. Így a második ügyfél nem tudja kérni.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Az üzenetet sikeresen lefoglalta Alice, aki most már képes feldolgozni és nyugtázni az üzenetet.

A fenti példából láthatja, hogy a sikeres kérés magának az üzenetnek a tartalmát adja vissza. Ez azonban nem szükséges. A JUSTID opció csak üzenetazonosítók visszaadására használható. Ez akkor hasznos, ha nem érdeklik az üzenet részletei, és növelni szeretné a rendszer teljesítményét.

Szállítási pult

A kimeneten látható számláló KIFEJEZÉS az egyes üzenetek kézbesítéseinek száma. Az ilyen számláló kétféleképpen növekszik: amikor sikeresen lekérnek egy üzenetet a következőn keresztül XCLAIM vagy ha hívást használnak XREADGROUP.

Normális, hogy egyes üzeneteket többször kézbesítenek. A lényeg az, hogy az összes üzenetet végül feldolgozzák. Néha problémák lépnek fel egy üzenet feldolgozása során, mert maga az üzenet sérült, vagy az üzenetfeldolgozás hibát okoz a kezelőkódban. Ebben az esetben kiderülhet, hogy senki sem fogja tudni feldolgozni ezt az üzenetet. Mivel van kézbesítési kísérlet számlálónk, ezt a számlálót tudjuk használni az ilyen helyzetek észlelésére. Ezért, ha a kézbesítések száma eléri az Ön által megadott magas számot, valószínűleg bölcsebb lenne egy ilyen üzenetet egy másik szálra helyezni, és értesítést küldeni a rendszergazdának.

Szál állapota

Csapat XINFO arra szolgál, hogy különféle információkat kérjen egy szálról és csoportjairól. Például egy alapparancs így néz ki:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

A fenti parancs általános információkat jelenít meg a megadott adatfolyamról. Most egy kicsit összetettebb példa:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

A fenti parancs általános információkat jelenít meg a megadott szál összes csoportjára vonatkozóan

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

A fenti parancs információkat jelenít meg a megadott adatfolyam és csoport összes előfizetőjéről.
Ha elfelejti a parancs szintaxisát, kérjen segítséget magától a parancstól:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Stream méretkorlát

Sok alkalmazás nem akar örökre adatfolyamba gyűjteni. Gyakran hasznos, ha szálanként maximális számú üzenetet engedélyezünk. Más esetekben hasznos az összes üzenetet áthelyezni egy szálból egy másik állandó tárolóba, amikor elérte a megadott szálméretet. A folyam méretét a parancs MAXLEN paraméterével korlátozhatja XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

A MAXLEN használatakor a régi rekordok automatikusan törlődnek, amikor elérik a meghatározott hosszúságot, így a folyam állandó méretű. A metszés azonban ebben az esetben nem a leghatékonyabb módon történik a Redis memóriájában. A következőképpen javíthat a helyzeten:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

A fenti példában a ~ argumentum azt jelenti, hogy nem feltétlenül kell a folyam hosszát egy adott értékre korlátoznunk. Példánkban ez bármely 1000-nél nagyobb vagy azzal egyenlő szám lehet (például 1000, 1010 vagy 1030). Az imént kifejezetten meghatároztuk, hogy legalább 1000 rekordot szeretnénk tárolni a streamünkben. Ez sokkal hatékonyabbá teszi a memóriakezelést a Redisben.

Van külön csapat is XTRIM, amely ugyanazt teszi:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Állandó tárolás és replikáció

A Redis Stream aszinkron módon replikálódik a szolga csomópontokra, és olyan fájlokba kerül mentésre, mint az AOF (az összes adat pillanatképe) és az RDB (az összes írási művelet naplója). A fogyasztói csoportok állapotának replikációja szintén támogatott. Ezért, ha egy üzenet „függőben” állapotban van a fő csomóponton, akkor a szolga csomópontokon ez az üzenet ugyanaz lesz.

Egyedi elemek eltávolítása egy adatfolyamból

Van egy speciális parancs az üzenetek törlésére XDEL. A parancs megkapja a szál nevét, majd a törölni kívánt üzenetazonosítókat:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

A parancs használatakor figyelembe kell venni, hogy a tényleges memória nem szabadul fel azonnal.

Nulla hosszúságú patakok

A streamek és a többi Redis adatstruktúra közötti különbség az, hogy amikor más adatstruktúrákban már nincsenek elemek, mellékhatásként maga az adatstruktúra törlődik a memóriából. Így például a rendezett halmaz teljesen el lesz távolítva, amikor a ZREM hívás eltávolítja az utolsó elemet. Ehelyett a szálak a memóriában maradhatnak, még akkor is, ha nincs benne elem.

Következtetés

A Redis Stream ideális üzenetközvetítők, üzenetsorok, egységes naplózási és előzményeket őrző csevegőrendszerek létrehozására.

Ahogy egyszer mondtam Niklaus Wirth, a programok algoritmusok plusz adatstruktúrák, és a Redis már mindkettőt megadja.

Forrás: will.com

Hozzászólás