Redis Stream – spolehlivost a škálovatelnost vašich systémů zasílání zpráv

Redis Stream – spolehlivost a škálovatelnost vašich systémů zasílání zpráv

Redis Stream je nový abstraktní datový typ představený v Redis ve verzi 5.0
Koncepčně je Redis Stream seznam, do kterého můžete přidávat položky. Každý záznam má jedinečný identifikátor. Ve výchozím nastavení je ID generováno automaticky a obsahuje časové razítko. Proto se můžete v průběhu času dotazovat na rozsahy záznamů nebo přijímat nová data, když přicházejí do proudu, podobně jako Unixový příkaz "tail -f" čte soubor protokolu a zamrzne při čekání na nová data. Všimněte si, že více klientů může naslouchat vláknu současně, stejně jako mnoho procesů "tail -f" může číst soubor současně bez vzájemného konfliktu.

Abychom pochopili všechny výhody nového datového typu, pojďme se rychle podívat na dlouho existující struktury Redis, které částečně replikují funkcionalitu Redis Stream.

Redis PUB/SUB

Redis Pub/Sub je jednoduchý systém zasílání zpráv, který je již zabudován do vašeho úložiště párů klíč–hodnota. Jednoduchost však něco stojí:

  • Pokud vydavatel z nějakého důvodu selže, přijde o všechny své předplatitele
  • Vydavatel potřebuje znát přesnou adresu všech svých předplatitelů
  • Vydavatel může přetížit své předplatitele prací, pokud jsou data publikována rychleji, než jsou zpracovávána
  • Zpráva je vymazána z vyrovnávací paměti vydavatele ihned po zveřejnění, bez ohledu na to, kolika odběratelům byla doručena a jak rychle byli schopni tuto zprávu zpracovat.
  • Všichni předplatitelé obdrží zprávu ve stejnou dobu. Sami předplatitelé se mezi sebou musí nějak dohodnout na pořadí zpracování stejné zprávy.
  • Neexistuje žádný vestavěný mechanismus, který by potvrdil, že předplatitel úspěšně zpracoval zprávu. Pokud předplatitel obdrží zprávu a během zpracování dojde k chybě, vydavatel se o tom nedozví.

Seznam Redis

Redis List je datová struktura, která podporuje blokování příkazů čtení. Můžete přidávat a číst zprávy od začátku nebo konce seznamu. Na základě této struktury můžete vytvořit dobrý zásobník nebo frontu pro váš distribuovaný systém a ve většině případů to bude stačit. Hlavní rozdíly oproti Redis Pub/Sub:

  • Zpráva je doručena jednomu klientovi. První klient s blokovaným čtením obdrží data jako první.
  • Clint musí zahájit operaci čtení pro každou zprávu sám. Seznam neví nic o klientech.
  • Zprávy jsou uloženy, dokud si je někdo nepřečte nebo je výslovně nesmaže. Pokud nakonfigurujete server Redis tak, aby vyprázdnil data na disk, spolehlivost systému se dramaticky zvýší.

Úvod do Stream

Přidání záznamu do streamu

Tým XADD přidá do streamu nový záznam. Záznam není jen řetězec, skládá se z jednoho nebo více párů klíč-hodnota. Každý záznam je tedy již strukturován a připomíná strukturu souboru CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Ve výše uvedeném příkladu přidáme do streamu dvě pole s názvem (klíčem) „mystream“: „id-senzoru“ a „teplota“ s hodnotami „1234“ a „19.8“. Jako druhý argument příkaz přebírá identifikátor, který bude přiřazen položce – tento identifikátor jednoznačně identifikuje každou položku v proudu. V tomto případě jsme však prošli *, protože chceme, aby nám Redis vygeneroval nové ID. Každé nové ID se zvýší. Proto bude mít každý nový záznam vyšší identifikátor než předchozí záznamy.

Formát identifikátoru

ID položky vrácené příkazem XADD, se skládá ze dvou částí:

{millisecondsTime}-{sequenceNumber}

milisekundyČas — Unixový čas v milisekundách (čas serveru Redis). Pokud je však aktuální čas stejný nebo menší než čas předchozího záznamu, použije se časové razítko předchozího záznamu. Pokud se tedy čas serveru vrátí v čase, nový identifikátor si stále zachová vlastnost increment.

sekvenceNumber používá se pro záznamy vytvořené ve stejné milisekundě. sekvenceNumber se zvýší o 1 vzhledem k předchozímu záznamu. Protože sekvenceNumber má velikost 64 bitů, pak byste v praxi neměli narazit na limit počtu záznamů, které lze vygenerovat během jedné milisekundy.

Formát takových identifikátorů se může na první pohled zdát zvláštní. Nedůvěřivý čtenář by se mohl divit, proč je čas součástí identifikátoru. Důvodem je, že streamy Redis podporují dotazy na rozsah podle ID. Protože je identifikátor spojen s časem vytvoření záznamu, umožňuje to dotazovat se na časové rozsahy. Když se podíváme na příkaz, podíváme se na konkrétní příklad XRANGE.

Pokud uživatel z nějakého důvodu potřebuje specifikovat svůj vlastní identifikátor, který je například spojen s nějakým externím systémem, pak jej můžeme předat příkazu XADD místo *, jak je uvedeno níže:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Upozorňujeme, že v tomto případě musíte přírůstek ID sledovat sami. V našem příkladu je minimální identifikátor "0-1", takže příkaz nepřijme jiný identifikátor, který je roven nebo menší než "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Počet záznamů na stream

Počet záznamů v streamu je možné získat jednoduše pomocí příkazu XLEN. V našem příkladu tento příkaz vrátí následující hodnotu:

> XLEN somestream
(integer) 2

Rozsahové dotazy - XRANGE a XREVRANGE

Abychom mohli požadovat data podle rozsahu, musíme zadat dva identifikátory – začátek a konec rozsahu. Vrácený rozsah bude zahrnovat všechny prvky včetně hranic. Existují také dva speciální identifikátory „-“ a „+“, které znamenají nejmenší (první záznam) a největší (poslední záznam) identifikátor ve streamu. Níže uvedený příklad zobrazí seznam všech položek streamu.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Každý vrácený záznam je polem dvou prvků: identifikátoru a seznamu párů klíč-hodnota. Již jsme řekli, že identifikátory záznamů souvisí s časem. Proto můžeme požádat o rozsah konkrétního časového období. V požadavku však můžeme uvést nikoli celý identifikátor, ale pouze Unixový čas, přičemž vynecháme část související s sekvenceNumber. Vynechaná část identifikátoru bude automaticky nastavena na nulu na začátku rozsahu a na maximální možnou hodnotu na konci rozsahu. Níže je uveden příklad, jak můžete požádat o rozsah dvou milisekund.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

V tomto rozsahu máme pouze jednu položku, ale v reálných souborech dat může být vrácený výsledek obrovský. Z tohoto důvodu XRANGE podporuje možnost COUNT. Zadáním množství můžeme jednoduše získat prvních N záznamů. Pokud potřebujeme získat dalších N záznamů (stránkování), můžeme použít poslední přijaté ID, zvětšit jej sekvenceNumber jednou a zeptejte se znovu. Podívejme se na to v následujícím příkladu. Začneme přidávat 10 prvků s XADD (za předpokladu, že mystream byl již naplněn 10 prvky). Chcete-li zahájit iteraci získáním 2 prvků na příkaz, začneme s celým rozsahem, ale s COUNT rovným 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Chcete-li pokračovat v iteraci s dalšími dvěma prvky, musíme vybrat poslední přijaté ID, tj. 1519073279157-0, a přidat 1 k sekvenceNumber.
Výsledné ID, v tomto případě 1519073279157-1, lze nyní použít jako nový argument začátku rozsahu pro další volání XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

A tak dále. Kvůli složitosti XRANGE je O(log(N)) pro vyhledávání a poté O(M) pro návrat M prvků, pak je každý iterační krok rychlý. Tedy pomocí XRANGE streamy lze efektivně iterovat.

Tým XREVRANGE je ekvivalent XRANGE, ale vrátí prvky v opačném pořadí:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Vezměte prosím na vědomí, že příkaz XREVRANGE přebírá argumenty rozsahu start a stop v opačném pořadí.

Čtení nových záznamů pomocí XREAD

Často vzniká úkol přihlásit se k odběru streamu a přijímat pouze nové zprávy. Tento koncept se může zdát podobný Redis Pub/Sub nebo blokování Redis Listu, ale existují zásadní rozdíly v tom, jak používat Redis Stream:

  1. Každá nová zpráva je standardně doručena každému účastníkovi. Toto chování se liší od blokovacího seznamu Redis, kde si novou zprávu přečte pouze jeden odběratel.
  2. Zatímco v Redis Pub/Sub jsou všechny zprávy zapomenuty a nikdy neuchovány, ve Stream jsou všechny zprávy uchovávány po neomezenou dobu (pokud klient výslovně nezpůsobí smazání).
  3. Redis Stream umožňuje rozlišit přístup ke zprávám v rámci jednoho streamu. Konkrétní odběratel může vidět pouze svou osobní historii zpráv.

Pomocí příkazu se můžete přihlásit k odběru vlákna a přijímat nové zprávy XREAD. Je to trochu složitější než XRANGE, takže nejprve začneme s jednoduššími příklady.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Výše uvedený příklad ukazuje neblokující formulář XREAD. Všimněte si, že možnost COUNT je volitelná. Ve skutečnosti je jedinou požadovanou volbou příkazu volba STREAMS, která specifikuje seznam toků spolu s odpovídajícím maximálním identifikátorem. Napsali jsme „STREAMS mystream 0“ – chceme přijímat všechny záznamy streamu mystream s identifikátorem větším než „0-0“. Jak můžete vidět z příkladu, příkaz vrací název vlákna, protože se můžeme přihlásit k odběru více vláken současně. Mohli bychom napsat například „STREAMS mystream otherstream 0 0“. Upozorňujeme, že po volbě STREAMS musíme nejprve poskytnout názvy všech požadovaných streamů a teprve potom seznam identifikátorů.

V této jednoduché podobě nedělá příkaz nic zvláštního ve srovnání s XRANGE. Zajímavostí však je, že se můžeme snadno otáčet XREAD na blokovací příkaz s uvedením argumentu BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Ve výše uvedeném příkladu je zadána nová možnost BLOK s časovým limitem 0 milisekund (to znamená čekání po neomezenou dobu). Navíc místo předání obvyklého identifikátoru pro stream mystream byl předán speciální identifikátor $. Tento speciální identifikátor to znamená XREAD musí jako identifikátor použít maximální identifikátor v mystream. Nové zprávy tedy budeme dostávat pouze od okamžiku, kdy jsme začali naslouchat. V některých ohledech je to podobné unixovému příkazu "tail -f".

Všimněte si, že při použití možnosti BLOCK nemusíme nutně používat speciální identifikátor $. Můžeme použít jakýkoli identifikátor existující ve streamu. Pokud tým může naši žádost obsloužit okamžitě bez blokování, učiní tak, v opačném případě zablokuje.

Blokování XREAD může také poslouchat více vláken najednou, stačí zadat jejich názvy. V tomto případě příkaz vrátí záznam prvního proudu, který přijal data. První odběratel blokovaný pro dané vlákno obdrží data jako první.

Skupiny spotřebitelů

V určitých úlohách chceme omezit přístup odběratelů ke zprávám v rámci jednoho vlákna. Příkladem, kde by to mohlo být užitečné, je fronta zpráv s pracovníky, kteří budou přijímat různé zprávy z vlákna, což umožňuje škálování zpracování zpráv.

Pokud si představíme, že máme tři předplatitele C1, C2, C3 a vlákno, které obsahuje zprávy 1, 2, 3, 4, 5, 6, 7, budou zprávy podávány jako na níže uvedeném diagramu:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

K dosažení tohoto efektu používá Redis Stream koncept nazvaný Consumer Group. Tento koncept je podobný pseudopředplatiteli, který přijímá data z proudu, ale ve skutečnosti je obsluhován více předplatiteli v rámci skupiny, což poskytuje určité záruky:

  1. Každá zpráva je doručena jinému odběrateli ve skupině.
  2. V rámci skupiny jsou účastníci identifikováni svým jménem, ​​což je řetězec citlivý na velká a malá písmena. Pokud účastník dočasně opustí skupinu, může být do skupiny obnoven pomocí svého vlastního jedinečného jména.
  3. Každá skupina spotřebitelů se řídí konceptem „první nepřečtené zprávy“. Když si předplatitel vyžádá nové zprávy, může přijímat pouze zprávy, které ještě nikdy nebyly doručeny žádnému předplatiteli ve skupině.
  4. Existuje příkaz, který výslovně potvrzuje, že zpráva byla úspěšně zpracována předplatitelem. Dokud nebude tento příkaz vyvolán, zůstane požadovaná zpráva ve stavu „čekající“.
  5. V rámci spotřebitelské skupiny si každý předplatitel může vyžádat historii zpráv, které mu byly doručeny, ale ještě nebyly zpracovány (ve stavu „nevyřízeno“)

V jistém smyslu lze stav skupiny vyjádřit takto:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nyní je čas seznámit se s hlavními příkazy pro spotřebitelskou skupinu, konkrétně:

  • XGROUP slouží k vytváření, ničení a správě skupin
  • XREADGROUP používá se ke čtení proudu přes skupinu
  • XACK - tento příkaz umožňuje účastníkovi označit zprávu jako úspěšně zpracovanou

Vytvoření skupiny spotřebitelů

Předpokládejme, že mystream již existuje. Poté bude příkaz pro vytvoření skupiny vypadat takto:

> XGROUP CREATE mystream mygroup $
OK

Při vytváření skupiny musíme předat identifikátor, od kterého bude skupina přijímat zprávy. Pokud chceme pouze přijímat všechny nové zprávy, pak můžeme použít speciální identifikátor $ (jako v našem příkladu výše). Pokud místo speciálního identifikátoru zadáte 0, budou skupině k dispozici všechny zprávy ve vláknu.

Nyní, když je skupina vytvořena, můžeme okamžitě začít číst zprávy pomocí příkazu XREADGROUP. Tento příkaz je velmi podobný XREAD a podporuje volitelnou možnost BLOCK. Existuje však povinná volba GROUP, která musí být vždy zadána dvěma argumenty: názvem skupiny a jménem účastníka. Podporována je také možnost COUNT.

Než si přečtete vlákno, vložte tam několik zpráv:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Nyní se pokusíme přečíst tento stream prostřednictvím skupiny:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Výše uvedený příkaz se čte doslovně takto:

"Já, předplatitelka Alice, členka mygroup, si chci přečíst jednu zprávu z mystreamu, která ještě nikdy nebyla nikomu doručena."

Pokaždé, když předplatitel provádí operaci na skupině, musí poskytnout své jméno, které se v rámci skupiny jednoznačně identifikuje. Ve výše uvedeném příkazu je ještě jeden velmi důležitý detail - speciální identifikátor ">". Tento speciální identifikátor filtruje zprávy a ponechává pouze ty, které ještě nikdy nebyly doručeny.

Ve zvláštních případech můžete také zadat skutečný identifikátor, jako je 0 nebo jakýkoli jiný platný identifikátor. V tomto případě příkaz XREADGROUP vám vrátí historii zpráv se stavem „nevyřízeno“, které byly doručeny zadanému odběrateli (Alice), ale ještě nebyly potvrzeny pomocí příkazu XACK.

Toto chování můžeme otestovat okamžitým zadáním ID 0, bez možnosti COUNT. Jednoduše uvidíme jednu nevyřízenou zprávu, tedy zprávu jablka:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Pokud však zprávu potvrdíme jako úspěšně zpracovanou, pak se již nebude zobrazovat:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Teď je řada na Bobovi, aby si něco přečetl:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, člen mé skupiny, nepožádal o více než dvě zprávy. Příkaz pouze hlásí nedoručené zprávy kvůli speciálnímu identifikátoru ">". Jak vidíte, zpráva „jablko“ se nezobrazí, protože již byla doručena Alici, takže Bob obdrží „pomeranč“ a „jahoda“.

Tímto způsobem mohou Alice, Bob a jakýkoli jiný odběratel skupiny číst různé zprávy ze stejného streamu. Mohou si také přečíst svou historii nezpracovaných zpráv nebo označit zprávy jako zpracované.

Je třeba mít na paměti několik věcí:

  • Jakmile předplatitel považuje zprávu za příkaz XREADGROUP, tato zpráva přejde do stavu „nevyřízeno“ a je přiřazena tomuto konkrétnímu účastníkovi. Ostatní účastníci skupiny nebudou moci tuto zprávu číst.
  • Odběratelé se vytvářejí automaticky při první zmínce, není třeba je explicitně vytvářet.
  • S XREADGROUP můžete číst zprávy z více různých vláken současně, ale aby to fungovalo, musíte nejprve vytvořit skupiny se stejným názvem pro každé vlákno pomocí XGROUP

Obnova po selhání

Účastník se může zotavit ze selhání a znovu si přečíst svůj seznam zpráv se stavem „čekající“. V reálném světě však mohou předplatitelé nakonec selhat. Co se stane se zaseknutými zprávami předplatitele, pokud se předplatitel nedokáže zotavit po selhání?
Consumer Group nabízí funkci, která se používá právě pro takové případy – když potřebujete změnit vlastníka zpráv.

První věc, kterou musíte udělat, je zavolat příkaz EXPENDING, která zobrazuje všechny zprávy ve skupině se stavem „čekající“. Ve své nejjednodušší podobě je příkaz volán pouze se dvěma argumenty: názvem vlákna a názvem skupiny:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tým zobrazil počet nezpracovaných zpráv pro celou skupinu a pro každého odběratele. Máme pouze Boba se dvěma nevyřízenými zprávami, protože jediná zpráva, kterou Alice požadovala, byla potvrzena XACK.

Můžeme požádat o další informace pomocí více argumentů:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} – rozsah identifikátorů (můžete použít „-“ a „+“)
{count} — počet pokusů o doručení
{consumer-name} – název skupiny

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nyní máme podrobnosti pro každou zprávu: ID, jméno účastníka, doba nečinnosti v milisekundách a nakonec počet pokusů o doručení. Máme dvě zprávy od Boba a byly nečinné po dobu 74170458 milisekund, přibližně 20 hodin.

Vezměte prosím na vědomí, že nám nikdo nebrání zkontrolovat, co bylo obsahem zprávy, pouhým použitím XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Musíme jen dvakrát zopakovat stejný identifikátor v argumentech. Teď, když máme nějakou představu, se Alice může rozhodnout, že po 20 hodinách odstávky se Bob pravděpodobně nevzpamatuje a je čas se na tyto zprávy zeptat a obnovit je pro Boba. K tomu použijeme příkaz XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Pomocí tohoto příkazu můžeme obdržet „cizí“ zprávu, která ještě nebyla zpracována, a to změnou vlastníka na {consumer}. Můžeme však také poskytnout minimální dobu nečinnosti {min-idle-time}. To pomáhá vyhnout se situaci, kdy se dva klienti pokoušejí současně změnit vlastníka stejných zpráv:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

První zákazník vynuluje prostoj a zvýší počítadlo doručení. Takže druhý klient o to nebude moci požádat.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Zpráva byla úspěšně vyzvednuta Alice, která nyní může zprávu zpracovat a potvrdit ji.

Z výše uvedeného příkladu můžete vidět, že úspěšný požadavek vrátí obsah samotné zprávy. To však není nutné. Volbu JUSTID lze použít pouze k vrácení ID zpráv. To je užitečné, pokud vás nezajímají podrobnosti zprávy a chcete zvýšit výkon systému.

Doručovací pult

Čítač, který vidíte na výstupu EXPENDING je počet doručení každé zprávy. Takové počítadlo se zvyšuje dvěma způsoby: když je zpráva úspěšně vyžádána přes XCLAIM nebo při použití hovoru XREADGROUP.

Je normální, že některé zprávy jsou doručeny vícekrát. Hlavní věc je, že všechny zprávy jsou nakonec zpracovány. Někdy nastanou problémy při zpracování zprávy, protože samotná zpráva je poškozená nebo zpracování zprávy způsobí chybu v kódu obslužné rutiny. V takovém případě se může ukázat, že tuto zprávu nebude moci nikdo zpracovat. Protože máme počítadlo pokusů o doručení, můžeme toto počítadlo použít k detekci takových situací. Jakmile tedy počet doručení dosáhne vámi zadaného vysokého čísla, bylo by pravděpodobně moudřejší umístit takovou zprávu do jiného vlákna a poslat upozornění správci systému.

Stav vlákna

Tým XINFO používá se k vyžádání různých informací o vláknu a jeho skupinách. Základní příkaz vypadá například takto:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Výše uvedený příkaz zobrazí obecné informace o zadaném proudu. Nyní trochu složitější příklad:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Výše uvedený příkaz zobrazí obecné informace pro všechny skupiny zadaného vlákna

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Výše uvedený příkaz zobrazí informace pro všechny odběratele zadaného streamu a skupiny.
Pokud zapomenete syntaxi příkazu, požádejte o pomoc samotný příkaz:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limit velikosti streamu

Mnoho aplikací nechce shromažďovat data do streamu navždy. Často je užitečné mít maximální povolený počet zpráv na vlákno. V ostatních případech je užitečné přesunout všechny zprávy z vlákna do jiného trvalého úložiště, když je dosaženo zadané velikosti vlákna. Velikost proudu můžete omezit pomocí parametru MAXLEN v příkazu XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Při použití MAXLEN se staré záznamy automaticky vymažou, když dosáhnou zadané délky, takže stream má konstantní velikost. K ořezávání však v tomto případě nedochází v paměti Redis tím nejefektivnějším způsobem. Situaci můžete zlepšit následovně:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ ve výše uvedeném příkladu znamená, že nemusíme nutně omezovat délku proudu na konkrétní hodnotu. V našem příkladu to může být libovolné číslo větší nebo rovné 1000 (například 1000, 1010 nebo 1030). Právě jsme výslovně uvedli, že chceme, aby náš stream ukládal alespoň 1000 záznamů. Díky tomu je správa paměti v Redis mnohem efektivnější.

Existuje také samostatný tým XTRIM, který dělá to samé:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trvalé úložiště a replikace

Redis Stream je asynchronně replikován do slave uzlů a ukládán do souborů jako AOF (snímek všech dat) a RDB (protokol všech operací zápisu). Podporována je také replikace stavu spotřebitelských skupin. Pokud je tedy zpráva ve stavu „nevyřízeno“ na hlavním uzlu, pak na podřízených uzlech bude mít tato zpráva stejný stav.

Odebrání jednotlivých prvků z proudu

Pro mazání zpráv existuje speciální příkaz XDEL. Příkaz získá název vlákna následovaný ID zpráv, které mají být odstraněny:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Při použití tohoto příkazu je třeba počítat s tím, že skutečná paměť se neuvolní okamžitě.

Proudy nulové délky

Rozdíl mezi streamy a jinými datovými strukturami Redis je v tom, že když jiné datové struktury již v sobě nemají prvky, jako vedlejší efekt bude samotná datová struktura odstraněna z paměti. Takže například setříděná sada bude zcela odstraněna, když volání ZREM odstraní poslední prvek. Místo toho mohou vlákna zůstat v paměti i bez jakýchkoli prvků uvnitř.

Závěr

Redis Stream je ideální pro vytváření zprostředkovatelů zpráv, front zpráv, jednotného protokolování a chatovacích systémů s uchováváním historie.

Jak jsem kdysi řekl Niklaus Wirth, programy jsou algoritmy a datové struktury a Redis vám již nabízí obojí.

Zdroj: www.habr.com

Přidat komentář