Redis Stream – spoľahlivosť a škálovateľnosť vašich systémov zasielania správ

Redis Stream – spoľahlivosť a škálovateľnosť vašich systémov zasielania správ

Redis Stream je nový abstraktný dátový typ predstavený v Redis vo verzii 5.0
Koncepčne je Redis Stream zoznam, do ktorého môžete pridávať položky. Každý záznam má jedinečný identifikátor. Štandardne sa ID generuje automaticky a obsahuje časovú pečiatku. Preto môžete vyhľadávať rozsahy záznamov v priebehu času alebo prijímať nové údaje, keď prichádzajú do prúdu, podobne ako príkaz Unix "tail -f" načíta súbor denníka a zamrzne počas čakania na nové údaje. Všimnite si, že viacerí klienti môžu počúvať vlákno súčasne, rovnako ako mnoho procesov "tail -f" môže čítať súbor súčasne bez toho, aby boli vo vzájomnom konflikte.

Aby sme pochopili všetky výhody nového dátového typu, pozrime sa v krátkosti na dlho existujúce štruktúry Redis, ktoré čiastočne replikujú funkčnosť Redis Stream.

Redis PUB/SUB

Redis Pub/Sub je jednoduchý systém zasielania správ, ktorý je už zabudovaný do vášho obchodu s hodnotami kľúča. Jednoduchosť však má svoju cenu:

  • Ak vydavateľ z nejakého dôvodu zlyhá, stratí všetkých svojich predplatiteľov
  • Vydavateľ potrebuje poznať presnú adresu všetkých svojich predplatiteľov
  • Vydavateľ môže preťažiť svojich predplatiteľov prácou, ak sa údaje zverejňujú rýchlejšie, ako sa spracúvajú
  • Správa sa vymaže z vyrovnávacej pamäte vydavateľa ihneď po zverejnení, bez ohľadu na to, koľkým odberateľom bola doručená a ako rýchlo boli schopní túto správu spracovať.
  • Všetci odberatelia dostanú správu v rovnakom čase. Samotní odberatelia sa musia medzi sebou nejako dohodnúť na poradí spracovania tej istej správy.
  • Neexistuje žiadny vstavaný mechanizmus na potvrdenie, že predplatiteľ úspešne spracoval správu. Ak predplatiteľ dostane správu a počas spracovania zlyhá, vydavateľ sa o tom nedozvie.

Zoznam Redis

Redis List je dátová štruktúra, ktorá podporuje blokovanie príkazov na čítanie. Môžete pridávať a čítať správy od začiatku alebo konca zoznamu. Na základe tejto štruktúry môžete vytvoriť dobrý zásobník alebo front pre váš distribuovaný systém a vo väčšine prípadov to bude stačiť. Hlavné rozdiely oproti Redis Pub/Sub:

  • Správa je doručená jednému klientovi. Prvý klient s blokovaním čítania dostane údaje ako prvý.
  • Clint musí spustiť operáciu čítania pre každú správu sám. Zoznam nevie nič o klientoch.
  • Správy sú uložené, kým si ich niekto neprečíta alebo kým ich explicitne nevymaže. Ak server Redis nakonfigurujete na vyprázdnenie údajov na disk, spoľahlivosť systému sa dramaticky zvýši.

Úvod do streamu

Pridanie záznamu do streamu

Tím XADD pridá nový záznam do streamu. Záznam nie je len reťazec, pozostáva z jedného alebo viacerých párov kľúč – hodnota. Každý záznam je teda už štruktúrovaný a pripomína štruktúru súboru CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Vo vyššie uvedenom príklade pridáme do streamu dve polia s názvom (kľúčom) „mystream“: „sensor-id“ a „temperature“ s hodnotami „1234“ a „19.8“. Ako druhý argument príkaz berie identifikátor, ktorý bude priradený k záznamu – tento identifikátor jednoznačne identifikuje každý záznam v streame. V tomto prípade sme však prešli *, pretože chceme, aby nám Redis vygeneroval nové ID. Každé nové ID sa zvýši. Preto bude mať každý nový záznam vyšší identifikátor v porovnaní s predchádzajúcimi záznamami.

Formát identifikátora

ID položky vrátené príkazom XADD, pozostáva z dvoch častí:

{millisecondsTime}-{sequenceNumber}

milisekundyČas — Unixový čas v milisekundách (čas servera Redis). Ak je však aktuálny čas rovnaký alebo menší ako čas predchádzajúceho záznamu, použije sa časová značka predchádzajúceho záznamu. Preto, ak sa čas servera vráti späť v čase, nový identifikátor si stále zachová vlastnosť prírastku.

poradové číslo používa sa pre záznamy vytvorené v rovnakej milisekunde. poradové číslo sa zvýši o 1 v porovnaní s predchádzajúcim záznamom. Pretože poradové číslo má veľkosť 64 bitov, potom by ste v praxi nemali naraziť na obmedzenie počtu záznamov, ktoré je možné vygenerovať v priebehu jednej milisekúnd.

Formát takýchto identifikátorov sa môže zdať na prvý pohľad zvláštny. Nedôverčivý čitateľ by sa mohol čudovať, prečo je čas súčasťou identifikátora. Dôvodom je, že streamy Redis podporujú dotazy na rozsah podľa ID. Keďže identifikátor je spojený s časom vytvorenia záznamu, umožňuje to dotazovať sa na časové rozsahy. Na konkrétny príklad sa pozrieme, keď sa pozrieme na príkaz XRANGE.

Ak z nejakého dôvodu používateľ potrebuje zadať svoj vlastný identifikátor, ktorý je napríklad spojený s nejakým externým systémom, potom ho môžeme odovzdať príkazu XADD namiesto *, ako je uvedené nižšie:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Upozorňujeme, že v tomto prípade musíte prírastok ID sledovať sami. V našom príklade je minimálny identifikátor "0-1", takže príkaz nebude akceptovať iný identifikátor, ktorý je rovný alebo menší ako "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Počet záznamov na stream

Počet záznamov v streame je možné získať jednoducho pomocou príkazu XLEN. V našom príklade tento príkaz vráti nasledujúcu hodnotu:

> XLEN somestream
(integer) 2

Rozsahové dotazy - XRANGE a XREVRANGE

Na vyžiadanie údajov podľa rozsahu musíme zadať dva identifikátory – začiatok a koniec rozsahu. Vrátený rozsah bude zahŕňať všetky prvky vrátane hraníc. Existujú aj dva špeciálne identifikátory „-“ a „+“, ktoré znamenajú najmenší (prvý záznam) a najväčší (posledný záznam) identifikátor v streame. V nižšie uvedenom príklade sú uvedené všetky položky streamu.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Každý vrátený záznam je pole dvoch prvkov: identifikátor a zoznam párov kľúč – hodnota. Už sme povedali, že identifikátory záznamov súvisia s časom. Preto môžeme požiadať o rozsah konkrétneho časového obdobia. V žiadosti však môžeme uviesť nie celý identifikátor, ale iba Unixový čas, pričom vynecháme časť týkajúcu sa poradové číslo. Vynechaná časť identifikátora sa automaticky nastaví na nulu na začiatku rozsahu a na maximálnu možnú hodnotu na konci rozsahu. Nižšie je uvedený príklad, ako môžete požiadať o rozsah dvoch milisekúnd.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

V tomto rozsahu máme iba jednu položku, ale v skutočných súboroch údajov môže byť vrátený výsledok obrovský. Pre tento dôvod XRANGE podporuje možnosť COUNT. Zadaním množstva jednoducho získame prvých N záznamov. Ak potrebujeme získať ďalších N záznamov (stránkovanie), môžeme použiť posledné prijaté ID, zvýšiť ho poradové číslo po jednej a spýtaj sa znova. Pozrime sa na to v nasledujúcom príklade. Začneme pridávať 10 prvkov s XADD (za predpokladu, že mystream už bol naplnený 10 prvkami). Ak chcete začať iteráciu získaním 2 prvkov na príkaz, začneme s celým rozsahom, ale s COUNT rovným 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Ak chcete pokračovať v iterácii s ďalšími dvoma prvkami, musíme vybrať posledné prijaté ID, t. j. 1519073279157-0, a pridať 1 k poradové číslo.
Výsledné ID, v tomto prípade 1519073279157-1, možno teraz použiť ako nový argument začiatku rozsahu pre ďalšie volanie XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

A tak ďalej. Pretože zložitosť XRANGE je O(log(N)) na vyhľadávanie a potom O(M) na vrátenie M prvkov, potom je každý krok iterácie rýchly. Teda pomocou XRANGE streamy je možné efektívne iterovať.

Tím XREVRANGE je ekvivalent XRANGE, ale vráti prvky v opačnom poradí:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Upozorňujeme, že príkaz XREVRANGE berie argumenty rozsahu štart a stop v opačnom poradí.

Čítanie nových záznamov pomocou XREAD

Často vzniká úloha prihlásiť sa na odber streamu a prijímať iba nové správy. Tento koncept sa môže zdať podobný Redis Pub/Sub alebo blokovaniu zoznamu Redis, existujú však zásadné rozdiely v tom, ako používať Redis Stream:

  1. Každá nová správa je štandardne doručená každému účastníkovi. Toto správanie sa líši od blokovania zoznamu Redis, kde si novú správu prečíta iba jeden účastník.
  2. Zatiaľ čo v Redis Pub/Sub sú všetky správy zabudnuté a nikdy sa neuložia, v Streame sú všetky správy uchovávané na dobu neurčitú (pokiaľ klient výslovne nespôsobí vymazanie).
  3. Redis Stream umožňuje rozlišovať prístup k správam v rámci jedného streamu. Konkrétny predplatiteľ môže vidieť iba svoju históriu osobných správ.

Pomocou príkazu sa môžete prihlásiť na odber vlákna a prijímať nové správy XREAD. Je to trochu zložitejšie ako XRANGE, takže najskôr začneme s jednoduchšími príkladmi.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Vyššie uvedený príklad ukazuje neblokujúci formulár XREAD. Upozorňujeme, že možnosť COUNT je voliteľná. V skutočnosti je jedinou požadovanou voľbou príkazu voľba STREAMS, ktorá špecifikuje zoznam tokov spolu s príslušným maximálnym identifikátorom. Napísali sme „STREAMS mystream 0“ – chceme dostávať všetky záznamy streamu mystream s identifikátorom väčším ako „0-0“. Ako môžete vidieť z príkladu, príkaz vráti názov vlákna, pretože sa môžeme prihlásiť na odber viacerých vlákien súčasne. Mohli by sme napísať napríklad „STREAMS mystream otherstream 0 0“. Upozorňujeme, že po voľbe STREAMS musíme najprv poskytnúť názvy všetkých požadovaných streamov a až potom zoznam identifikátorov.

V tejto jednoduchej forme príkaz nerobí nič zvláštne v porovnaní s XRANGE. Zaujímavosťou však je, že sa môžeme ľahko otočiť XREAD na blokovací príkaz s uvedením argumentu BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Vo vyššie uvedenom príklade je špecifikovaná nová možnosť BLOCK s časovým limitom 0 milisekúnd (to znamená čakanie na neurčito). Navyše namiesto odovzdania obvyklého identifikátora pre stream mystream bol odovzdaný špeciálny identifikátor $. Tento špeciálny identifikátor to znamená XREAD musí ako identifikátor použiť maximálny identifikátor v mystream. Nové správy teda budeme dostávať len od okamihu, keď sme začali počúvať. V niektorých ohľadoch je to podobné príkazu Unix "tail -f".

Všimnite si, že pri použití možnosti BLOCK nemusíme nevyhnutne použiť špeciálny identifikátor $. Môžeme použiť akýkoľvek identifikátor existujúci v streame. Ak tím môže obslúžiť našu požiadavku okamžite bez blokovania, urobí tak, v opačnom prípade zablokuje.

Blokovanie XREAD môže tiež počúvať viacero vlákien naraz, stačí zadať ich názvy. V tomto prípade príkaz vráti záznam prvého toku, ktorý prijal dáta. Prvý účastník zablokovaný pre dané vlákno dostane dáta ako prvý.

Spotrebiteľské skupiny

V určitých úlohách chceme obmedziť prístup účastníkov k správam v rámci jedného vlákna. Príkladom, kde by to mohlo byť užitočné, je front správ s pracovníkmi, ktorí budú prijímať rôzne správy z vlákna, čo umožňuje škálovanie spracovania správ.

Ak si predstavíme, že máme troch predplatiteľov C1, C2, C3 a vlákno, ktoré obsahuje správy 1, 2, 3, 4, 5, 6, 7, správy sa budú podávať ako na obrázku nižšie:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Na dosiahnutie tohto efektu používa Redis Stream koncept s názvom Consumer Group. Tento koncept je podobný pseudopredplatiteľovi, ktorý prijíma údaje z toku, ale v skutočnosti ho obsluhuje viacero predplatiteľov v rámci skupiny, čo poskytuje určité záruky:

  1. Každá správa je doručená inému účastníkovi v rámci skupiny.
  2. V rámci skupiny sú predplatitelia identifikovaní podľa mena, čo je reťazec, v ktorom sa rozlišujú malé a veľké písmená. Ak účastník dočasne vypadne zo skupiny, môže byť do skupiny obnovený pomocou jeho vlastného jedinečného mena.
  3. Každá skupina spotrebiteľov sa riadi konceptom „prvá neprečítaná správa“. Keď si predplatiteľ vyžiada nové správy, môže prijímať iba správy, ktoré ešte nikdy neboli doručené žiadnemu predplatiteľovi v rámci skupiny.
  4. Existuje príkaz na explicitné potvrdenie, že správa bola úspešne spracovaná predplatiteľom. Kým nie je zavolaný tento príkaz, požadovaná správa zostane v stave „čakajúca“.
  5. V rámci skupiny spotrebiteľov si každý predplatiteľ môže vyžiadať históriu správ, ktoré mu boli doručené, ale ešte neboli spracované (v stave „čakajúce“).

V určitom zmysle možno stav skupiny vyjadriť takto:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Teraz je čas zoznámiť sa s hlavnými príkazmi pre skupinu spotrebiteľov, konkrétne:

  • XGROUP používa sa na vytváranie, ničenie a správu skupín
  • XREADGROUP používa sa na čítanie prúdu cez skupinu
  • XACK - tento príkaz umožňuje účastníkovi označiť správu ako úspešne spracovanú

Vytvorenie skupiny spotrebiteľov

Predpokladajme, že mystream už existuje. Potom bude príkaz na vytvorenie skupiny vyzerať takto:

> XGROUP CREATE mystream mygroup $
OK

Pri vytváraní skupiny musíme odovzdať identifikátor, od ktorého bude skupina prijímať správy. Ak chceme iba prijímať všetky nové správy, potom môžeme použiť špeciálny identifikátor $ (ako v našom príklade vyššie). Ak zadáte 0 namiesto špeciálneho identifikátora, všetky správy vo vlákne budú dostupné skupine.

Teraz, keď je skupina vytvorená, môžeme okamžite začať čítať správy pomocou príkazu XREADGROUP. Tento príkaz je veľmi podobný XREAD a podporuje voliteľnú možnosť BLOKOVAŤ. Existuje však požadovaná voľba GROUP, ktorá musí byť vždy špecifikovaná dvoma argumentmi: názvom skupiny a názvom účastníka. Podporovaná je aj možnosť COUNT.

Pred čítaním vlákna tam dajme niekoľko správ:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Teraz sa pokúsme prečítať tento stream cez skupinu:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Vyššie uvedený príkaz znie doslovne takto:

„Ja, predplatiteľka Alice, členka mygroup, si chcem prečítať jednu správu z mystreamu, ktorá ešte nebola nikomu doručená.“

Zakaždým, keď predplatiteľ vykoná operáciu na skupine, musí poskytnúť svoje meno, ktoré sa v rámci skupiny jednoznačne identifikuje. Vo vyššie uvedenom príkaze je ešte jeden veľmi dôležitý detail - špeciálny identifikátor ">". Tento špeciálny identifikátor filtruje správy a ponecháva len tie, ktoré ešte neboli nikdy doručené.

V špeciálnych prípadoch môžete zadať aj skutočný identifikátor, napríklad 0 alebo akýkoľvek iný platný identifikátor. V tomto prípade príkaz XREADGROUP vráti vám históriu správ so stavom „nevybavené“, ktoré boli doručené určenému účastníkovi (Alice), ale ešte neboli potvrdené príkazom XACK.

Toto správanie môžeme otestovať okamžitým zadaním ID 0 bez možnosti COUNT. Jednoducho uvidíme jednu čakajúcu správu, teda správu jablka:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ak však správu potvrdíme ako úspešne spracovanú, už sa nebude zobrazovať:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Teraz je na rade Bob, aby si niečo prečítal:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, člen mojej skupiny, nepožiadal o viac ako dve správy. Príkaz hlási iba nedoručené správy kvôli špeciálnemu identifikátoru ">". Ako vidíte, správa „jablko“ sa nezobrazí, pretože už bola doručená Alici, takže Bob dostane „pomaranč“ a „jahoda“.

Týmto spôsobom môžu Alice, Bob a ďalší predplatiteľ skupiny čítať rôzne správy z toho istého streamu. Môžu si tiež prečítať svoju históriu nespracovaných správ alebo označiť správy ako spracované.

Je potrebné mať na pamäti niekoľko vecí:

  • Akonáhle predplatiteľ považuje správu za príkaz XREADGROUP, táto správa prejde do stavu „čakajúca“ a je priradená tomuto konkrétnemu účastníkovi. Ostatní odberatelia skupiny si túto správu nebudú môcť prečítať.
  • Odberatelia sa vytvárajú automaticky pri prvej zmienke, nie je potrebné ich explicitne vytvárať.
  • S XREADGROUP môžete čítať správy z viacerých rôznych vlákien súčasne, ale aby to fungovalo, musíte najprv vytvoriť skupiny s rovnakým názvom pre každé vlákno pomocou XGROUP

Obnova po zlyhaní

Predplatiteľ sa môže po zlyhaní zotaviť a znovu si prečítať svoj zoznam správ so stavom „čakajúce“. V reálnom svete však môžu predplatitelia nakoniec zlyhať. Čo sa stane so zaseknutými správami predplatiteľa, ak sa predplatiteľ nedokáže zotaviť z zlyhania?
Consumer Group ponúka funkciu, ktorá sa používa práve pre takéto prípady – keď potrebujete zmeniť vlastníka správ.

Prvá vec, ktorú musíte urobiť, je zavolať príkaz EXPENDING, ktorá zobrazuje všetky správy v skupine so stavom „čakajúce“. Vo svojej najjednoduchšej forme sa príkaz volá iba s dvoma argumentmi: názvom vlákna a názvom skupiny:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tím zobrazil počet nespracovaných správ pre celú skupinu a pre každého odberateľa. Máme len Boba s dvoma nevybavenými správami, pretože jediná správa, ktorú Alice požadovala, bola potvrdená XACK.

Môžeme požiadať o ďalšie informácie pomocou viacerých argumentov:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} – rozsah identifikátorov (môžete použiť „-“ a „+“)
{count} — počet pokusov o doručenie
{consumer-name} – názov skupiny

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Teraz máme podrobnosti pre každú správu: ID, meno účastníka, čas nečinnosti v milisekundách a nakoniec počet pokusov o doručenie. Máme dve správy od Boba a boli nečinné 74170458 20 XNUMX milisekúnd, približne XNUMX hodín.

Upozorňujeme, že nikto nám nebráni v tom, aby sme skontrolovali, čo bolo obsahom správy, jednoduchým použitím XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Musíme len dvakrát zopakovať rovnaký identifikátor v argumentoch. Teraz, keď už máme nejakú predstavu, sa Alice môže rozhodnúť, že po 20 hodinách odstávky sa Bob pravdepodobne nezotaví a je načase spýtať sa týchto správ a obnoviť ich spracovanie pre Boba. Na to použijeme príkaz XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Pomocou tohto príkazu môžeme prijať „cudziu“ správu, ktorá ešte nebola spracovaná, a to zmenou vlastníka na {consumer}. Môžeme však poskytnúť aj minimálny čas nečinnosti {min-idle-time}. To pomáha vyhnúť sa situácii, keď sa dvaja klienti pokúšajú súčasne zmeniť vlastníka rovnakých správ:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Prvý zákazník vynuluje prestoj a zvýši počítadlo dodávok. Takže druhý klient o to nebude môcť požiadať.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Správu úspešne uplatnila Alice, ktorá ju teraz môže spracovať a potvrdiť.

Z vyššie uvedeného príkladu môžete vidieť, že úspešná požiadavka vráti obsah samotnej správy. Nie je to však potrebné. Voľba JUSTID sa dá použiť len na vrátenie ID správ. To je užitočné, ak vás nezaujímajú detaily správy a chcete zvýšiť výkon systému.

Doručovací pult

Počítadlo, ktoré vidíte na výstupe EXPENDING je počet doručení každej správy. Takéto počítadlo sa zvyšuje dvoma spôsobmi: keď je úspešne vyžiadaná správa cez XCLAIM alebo pri použití hovoru XREADGROUP.

Je normálne, že niektoré správy sú doručené viackrát. Hlavná vec je, že všetky správy sú nakoniec spracované. Niekedy sa vyskytnú problémy pri spracovaní správy, pretože samotná správa je poškodená alebo spracovanie správy spôsobuje chybu v kóde obsluhy. V takom prípade sa môže ukázať, že túto správu nebude môcť nikto spracovať. Keďže máme počítadlo pokusov o doručenie, môžeme pomocou tohto počítadla odhaliť takéto situácie. Preto, keď počet doručenia dosiahne vysoké číslo, ktoré zadáte, bude pravdepodobne rozumnejšie umiestniť takúto správu do iného vlákna a poslať upozornenie správcovi systému.

Stav vlákna

Tím XINFO používa sa na vyžiadanie rôznych informácií o vlákne a jeho skupinách. Napríklad základný príkaz vyzerá takto:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Vyššie uvedený príkaz zobrazí všeobecné informácie o zadanom streame. Teraz trochu zložitejší príklad:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Vyššie uvedený príkaz zobrazí všeobecné informácie pre všetky skupiny zadaného vlákna

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Vyššie uvedený príkaz zobrazuje informácie pre všetkých odberateľov zadaného streamu a skupiny.
Ak zabudnete syntax príkazu, požiadajte o pomoc samotný príkaz:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limit veľkosti streamu

Mnoho aplikácií nechce navždy zhromažďovať údaje do streamu. Často je užitočné mať povolený maximálny počet správ na vlákno. V iných prípadoch je užitočné presunúť všetky správy z vlákna do iného trvalého úložiska, keď sa dosiahne zadaná veľkosť vlákna. Veľkosť toku môžete obmedziť pomocou parametra MAXLEN v príkaze XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Pri použití MAXLEN sa staré záznamy automaticky vymažú, keď dosiahnu určenú dĺžku, takže tok má konštantnú veľkosť. Orezávanie však v tomto prípade neprebieha v pamäti Redis tým najefektívnejším spôsobom. Situáciu môžete zlepšiť takto:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ vo vyššie uvedenom príklade znamená, že nemusíme nevyhnutne obmedziť dĺžku prúdu na konkrétnu hodnotu. V našom príklade to môže byť akékoľvek číslo väčšie alebo rovné 1000 (napríklad 1000, 1010 alebo 1030). Explicitne sme špecifikovali, že chceme, aby náš stream ukladal aspoň 1000 záznamov. Vďaka tomu je správa pamäte v Redis oveľa efektívnejšia.

Existuje aj samostatný tím XTRIM, ktorý robí to isté:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trvalé ukladanie a replikácia

Redis Stream sa asynchrónne replikuje do podriadených uzlov a ukladá sa do súborov ako AOF (snímka všetkých údajov) a RDB (záznam všetkých operácií zápisu). Podporovaná je aj replikácia stavu spotrebiteľských skupín. Preto, ak je správa v stave „čakajúca“ na nadradenom uzle, potom na podriadených uzloch bude mať táto správa rovnaký stav.

Odstránenie jednotlivých prvkov z prúdu

Existuje špeciálny príkaz na odstránenie správ XDEL. Príkaz získa názov vlákna, za ktorým nasledujú ID správ, ktoré sa majú odstrániť:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Pri použití tohto príkazu musíte počítať s tým, že skutočná pamäť sa neuvoľní okamžite.

Prúdy s nulovou dĺžkou

Rozdiel medzi prúdmi a inými dátovými štruktúrami Redis je v tom, že keď ostatné dátové štruktúry už v sebe nemajú prvky, ako vedľajší efekt sa samotná dátová štruktúra odstráni z pamäte. Takže napríklad triedená množina bude úplne odstránená, keď volanie ZREM odstráni posledný prvok. Namiesto toho môžu vlákna zostať v pamäti aj bez akýchkoľvek prvkov vo vnútri.

Záver

Redis Stream je ideálny na vytváranie sprostredkovateľov správ, frontov správ, jednotného protokolovania a chatovacích systémov na uchovávanie histórie.

Ako som už raz povedal Niklaus Wirth, programy sú algoritmy plus dátové štruktúry a Redis vám už poskytuje oboje.

Zdroj: hab.com

Pridať komentár