Redis Stream - zanesljivost in razširljivost vaših sporočilnih sistemov

Redis Stream - zanesljivost in razširljivost vaših sporočilnih sistemov

Redis Stream je nov abstraktni podatkovni tip, uveden v Redis z različico 5.0
Konceptualno je Redis Stream seznam, na katerega lahko dodajate vnose. Vsak vnos ima edinstven identifikator. ID se privzeto ustvari samodejno in vključuje časovni žig. Zato lahko sčasoma poizvedujete po obsegih zapisov ali prejemate nove podatke, ko prispejo v tok, podobno kot ukaz Unix "tail -f" prebere datoteko dnevnika in zamrzne med čakanjem na nove podatke. Upoštevajte, da lahko več odjemalcev posluša nit hkrati, tako kot lahko več procesov "tail -f" hkrati bere datoteko, ne da bi bili v nasprotju med seboj.

Da bi razumeli vse prednosti novega tipa podatkov, si na hitro oglejmo že dolgo obstoječe strukture Redis, ki delno posnemajo funkcionalnost Redis Stream.

Redis PUB/SUB

Redis Pub/Sub je preprost sistem za sporočanje, ki je že vgrajen v vašo shrambo ključev in vrednosti. Vendar ima preprostost svojo ceno:

  • Če založnik iz nekega razloga propade, potem izgubi vse svoje naročnike
  • Založnik mora poznati točen naslov vseh svojih naročnikov
  • Založnik lahko preobremeni svoje naročnike z delom, če se podatki objavljajo hitreje kot se obdelujejo
  • Sporočilo se takoj po objavi izbriše iz medpomnilnika založnika, ne glede na to, koliko naročnikom je bilo dostavljeno in kako hitro so lahko to sporočilo obdelali.
  • Vsi naročniki bodo sporočilo prejeli hkrati. Naročniki se morajo sami med seboj nekako dogovoriti o vrstnem redu obdelave istega sporočila.
  • Ni vgrajenega mehanizma za potrditev, da je naročnik uspešno obdelal sporočilo. Če naročnik prejme sporočilo in se med obdelavo zruši, založnik za to ne bo vedel.

Seznam Redis

Seznam Redis je podatkovna struktura, ki podpira blokiranje ukazov za branje. Sporočila lahko dodajate in berete z začetka ali konca seznama. Na podlagi te strukture lahko naredite dober sklad ali čakalno vrsto za svoj porazdeljeni sistem in v večini primerov bo to dovolj. Glavne razlike od Redis Pub/Sub:

  • Sporočilo je dostavljeno eni stranki. Prvi odjemalec z blokiranim branjem bo prvi prejel podatke.
  • Clint mora sam sprožiti operacijo branja za vsako sporočilo. List ne ve ničesar o strankah.
  • Sporočila so shranjena, dokler jih nekdo ne prebere ali izrecno izbriše. Če strežnik Redis konfigurirate za izpiranje podatkov na disk, se zanesljivost sistema dramatično poveča.

Uvod v Stream

Dodajanje vnosa v tok

Ekipa XDODAJ doda nov vnos v tok. Zapis ni samo niz, sestavljen je iz enega ali več parov ključ-vrednost. Tako je vsak vnos že strukturiran in je podoben strukturi datoteke CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

V zgornjem primeru v tok dodamo dve polji z imenom (ključem) "mystream": "sensor-id" in "temperature" z vrednostma "1234" oziroma "19.8". Kot drugi argument vzame ukaz identifikator, ki bo dodeljen vnosu - ta identifikator enolično identificira vsak vnos v toku. Vendar smo v tem primeru posredovali *, ker želimo, da nam Redis ustvari nov ID. Vsaka nova ID se bo povečala. Zato bo imel vsak nov vnos višji identifikator v primerjavi s prejšnjimi vnosi.

Oblika identifikatorja

ID vnosa, ki ga vrne ukaz XDODAJ, je sestavljen iz dveh delov:

{millisecondsTime}-{sequenceNumber}

millisecondsTime — Čas Unix v milisekundah (čas strežnika Redis). Če pa je trenutni čas enak ali krajši od časa prejšnjega posnetka, se uporabi časovni žig prejšnjega posnetka. Torej, če se čas strežnika vrne nazaj v preteklost, bo novi identifikator še vedno ohranil lastnost prirastka.

Zaporedna številka uporablja se za zapise, ustvarjene v isti milisekundi. Zaporedna številka bo povečan za 1 glede na prejšnji vnos. Zaradi Zaporedna številka je velik 64 bitov, potem v praksi ne bi smeli naleteti na omejitev števila zapisov, ki jih je mogoče ustvariti v eni milisekundi.

Oblika takšnih identifikatorjev se lahko na prvi pogled zdi čudna. Nezaupljiv bralec bi se morda vprašal, zakaj je čas del identifikatorja. Razlog je v tem, da tokovi Redis podpirajo poizvedbe obsega po ID-ju. Ker je identifikator povezan s časom, ko je bil zapis ustvarjen, to omogoča poizvedovanje po časovnih razponih. Ko bomo pogledali ukaz, si bomo ogledali poseben primer XRANGE.

Če mora uporabnik iz nekega razloga določiti svoj identifikator, ki je na primer povezan z nekim zunanjim sistemom, ga lahko posredujemo ukazu XDODAJ namesto *, kot je prikazano spodaj:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Upoštevajte, da morate v tem primeru sami spremljati povečanje ID-ja. V našem primeru je najmanjši identifikator "0-1", zato ukaz ne bo sprejel drugega identifikatorja, ki je enak ali manjši od "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Število zapisov na tok

Število zapisov v toku je mogoče dobiti preprosto z uporabo ukaza XLEN. Za naš primer bo ta ukaz vrnil naslednjo vrednost:

> XLEN somestream
(integer) 2

Poizvedbe obsega - XRANGE in XREVRANGE

Če želite zahtevati podatke po obsegu, moramo določiti dva identifikatorja - začetek in konec obsega. Vrnjeni obseg bo vključeval vse elemente, vključno z mejami. Obstajata tudi dva posebna identifikatorja »-« in »+«, ki pomenita najmanjši (prvi zapis) oziroma največji (zadnji zapis) identifikator v toku. V spodnjem primeru bodo navedeni vsi vnosi toka.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Vsak vrnjeni zapis je niz dveh elementov: identifikatorja in seznama parov ključ-vrednost. Rekli smo že, da so identifikatorji zapisa povezani s časom. Zato lahko zahtevamo razpon določenega časovnega obdobja. Vendar pa lahko v zahtevi ne navedemo celotnega identifikatorja, temveč le čas Unix, pri čemer izpustimo del, ki se nanaša na Zaporedna številka. Izpuščeni del identifikatorja bo samodejno nastavljen na nič na začetku obsega in na največjo možno vrednost na koncu obsega. Spodaj je primer, kako lahko zahtevate obseg dveh milisekund.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

V tem obsegu imamo samo en vnos, vendar je v resničnih nizih podatkov lahko vrnjeni rezultat ogromen. Zaradi tega razloga XRANGE podpira možnost COUNT. Z določitvijo količine lahko enostavno dobimo prvih N zapisov. Če moramo dobiti naslednjih N zapisov (paginacija), lahko uporabimo zadnji prejeti ID, ga povečamo Zaporedna številka za eno in vprašajte znova. Poglejmo si to na naslednjem primeru. Začnemo dodajati 10 elementov z XDODAJ (ob predpostavki, da je bil mystream že napolnjen z 10 elementi). Za začetek iteracije, ki dobi 2 elementa na ukaz, začnemo s celotnim obsegom, vendar s COUNT enakim 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Za nadaljevanje ponavljanja z naslednjima dvema elementoma moramo izbrati zadnji prejeti ID, tj. 1519073279157-0, in dodati 1 Zaporedna številka.
Dobljeni ID, v tem primeru 1519073279157-1, lahko zdaj uporabite kot nov začetni argument obsega za naslednji klic XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

In tako naprej. Zaradi kompleksnosti XRANGE je O(log(N)) za iskanje in nato O(M) za vrnitev M elementov, potem je vsak korak ponovitve hiter. Tako z uporabo XRANGE tokove je mogoče učinkovito ponoviti.

Ekipa XREVRANGE je enakovredno XRANGE, vendar vrne elemente v obratnem vrstnem redu:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Upoštevajte, da ukaz XREVRANGE sprejme argumente obsega začetek in konec v obratnem vrstnem redu.

Branje novih vnosov z uporabo XREAD

Pogosto se pojavi naloga, da se naročite na tok in prejemate samo nova sporočila. Ta koncept se morda zdi podoben Redis Pub/Sub ali blokiranju Redis List, vendar obstajajo temeljne razlike pri uporabi Redis Stream:

  1. Vsako novo sporočilo je privzeto dostavljeno vsakemu naročniku. To vedenje se razlikuje od blokiranja seznama Redis, kjer bo novo sporočilo prebral samo en naročnik.
  2. Medtem ko so v programu Redis Pub/Sub vsa sporočila pozabljena in se nikoli ne ohranijo, se v sistemu Stream vsa sporočila hranijo za nedoločen čas (razen če odjemalec izrecno povzroči izbris).
  3. Redis Stream vam omogoča razlikovanje dostopa do sporočil znotraj enega toka. Določen naročnik lahko vidi samo svojo osebno zgodovino sporočil.

Z ukazom se lahko naročite na nit in prejemate nova sporočila XREAD. Je malo bolj zapleteno kot XRANGE, zato bomo najprej začeli s preprostejšimi primeri.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Zgornji primer prikazuje neblokirno obliko XREAD. Upoštevajte, da možnost COUNT ni obvezna. Pravzaprav je edina zahtevana ukazna možnost možnost STREAMS, ki podaja seznam tokov skupaj z ustreznim največjim identifikatorjem. Napisali smo "STREAMS mystream 0" - želimo prejeti vse zapise toka mystream z identifikatorjem, večjim od "0-0". Kot lahko vidite iz primera, ukaz vrne ime niti, ker se lahko naročimo na več niti hkrati. Lahko bi napisali na primer "STREAMS mystream otherstream 0 0". Upoštevajte, da moramo po možnosti STREAMS najprej podati imena vseh zahtevanih tokov in šele nato seznam identifikatorjev.

V tej preprosti obliki ukaz ne naredi nič posebnega v primerjavi z XRANGE. Je pa zanimivo to, da se zlahka obrnemo XREAD na ukaz za blokiranje, ki določa argument BLOK:

> XREAD BLOCK 0 STREAMS mystream $

V zgornjem primeru je podana nova možnost BLOKIRANJE s časovno omejitvijo 0 milisekund (to pomeni čakanje za nedoločen čas). Poleg tega je bil namesto posredovanja običajnega identifikatorja za tok mystream posredovan poseben identifikator $. Ta posebni identifikator pomeni, da XREAD mora kot identifikator uporabiti največji identifikator v mojem toku. Nova sporočila bomo torej prejemali šele od trenutka, ko smo začeli poslušati. Na nek način je to podobno ukazu Unix "tail -f".

Upoštevajte, da pri uporabi možnosti BLOK ni nujno, da uporabimo poseben identifikator $. Uporabimo lahko kateri koli identifikator, ki obstaja v toku. Če lahko ekipa takoj servisira našo zahtevo brez blokade, bo to tudi storila, sicer bo blokirala.

Blokiranje XREAD lahko tudi posluša več niti hkrati, samo določiti morate njihova imena. V tem primeru bo ukaz vrnil zapis prvega toka, ki je prejel podatke. Prvi naročnik, blokiran za dano nit, bo prvi prejel podatke.

Skupine potrošnikov

Pri določenih opravilih želimo naročnikom omejiti dostop do sporočil znotraj ene niti. Primer, kjer bi to lahko bilo koristno, je čakalna vrsta sporočil z delavci, ki bodo prejeli različna sporočila iz niti, kar omogoča prilagajanje obdelave sporočil.

Če si predstavljamo, da imamo tri naročnike C1, C2, C3 in nit, ki vsebuje sporočila 1, 2, 3, 4, 5, 6, 7, bodo sporočila postrežena kot na spodnjem diagramu:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Za dosego tega učinka Redis Stream uporablja koncept, imenovan Consumer Group. Ta koncept je podoben psevdonaročniku, ki prejema podatke iz toka, vendar ga dejansko streže več naročnikov znotraj skupine, kar zagotavlja določena jamstva:

  1. Vsako sporočilo je dostavljeno drugemu naročniku znotraj skupine.
  2. Znotraj skupine so naročniki identificirani z imenom, ki je niz, ki razlikuje med velikimi in malimi črkami. Če naročnik začasno izstopi iz skupine, se lahko vrne v skupino z njegovim edinstvenim imenom.
  3. Vsaka skupina potrošnikov sledi konceptu "prvega neprebranega sporočila". Ko naročnik zahteva nova sporočila, lahko prejme samo sporočila, ki še nikoli niso bila dostavljena nobenemu naročniku v skupini.
  4. Obstaja ukaz za izrecno potrditev, da je naročnik uspešno obdelal sporočilo. Dokler tega ukaza ne pokličete, bo zahtevano sporočilo ostalo v statusu "na čakanju".
  5. V okviru Skupine potrošnikov lahko vsak naročnik zahteva zgodovino sporočil, ki so mu bila dostavljena, a še niso bila obdelana (v statusu “na čakanju”).

V nekem smislu lahko stanje skupine izrazimo na naslednji način:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Zdaj je čas, da se seznanimo z glavnimi ukazi za skupino potrošnikov, in sicer:

  • XGROUP uporablja za ustvarjanje, uničenje in upravljanje skupin
  • XREADGROUP uporablja se za branje toka skozi skupino
  • XACK - ta ukaz omogoča naročniku, da sporočilo označi kot uspešno obdelano

Ustanovitev skupine potrošnikov

Predpostavimo, da mystream že obstaja. Potem bo ukaz za ustvarjanje skupine videti tako:

> XGROUP CREATE mystream mygroup $
OK

Pri ustvarjanju skupine moramo posredovati identifikator, od katerega bo skupina prejemala sporočila. Če želimo samo prejemati vsa nova sporočila, potem lahko uporabimo poseben identifikator $ (kot v našem zgornjem primeru). Če namesto posebnega identifikatorja podate 0, bodo vsa sporočila v niti na voljo skupini.

Zdaj, ko je skupina ustvarjena, lahko takoj začnemo brati sporočila z ukazom XREADGROUP. Ta ukaz je zelo podoben XREAD in podpira izbirno možnost BLOK. Vendar obstaja zahtevana možnost SKUPINA, ki mora biti vedno navedena z dvema argumentoma: imenom skupine in imenom naročnika. Podprta je tudi možnost COUNT.

Pred branjem teme vstavimo nekaj sporočil:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Zdaj pa poskusimo prebrati ta tok skozi skupino:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Zgornji ukaz se dobesedno glasi takole:

"Jaz, naročnik Alice, član moje skupine, želim prebrati eno sporočilo iz svojega toka, ki še nikoli ni bilo dostavljeno nikomur."

Vsakič, ko naročnik izvede operacijo v skupini, mora podati svoje ime, ki se enolično identificira znotraj skupine. V zgornjem ukazu je še ena zelo pomembna podrobnost - posebni identifikator ">". Ta posebni identifikator filtrira sporočila in pusti le tista, ki še nikoli niso bila dostavljena.

Prav tako lahko v posebnih primerih določite pravi identifikator, kot je 0, ali kateri koli drug veljaven identifikator. V tem primeru ukaz XREADGROUP vam bo vrnil zgodovino sporočil s statusom "v teku", ki so bila dostavljena navedenemu naročniku (Alice), vendar še niso bila potrjena z ukazom XACK.

To vedenje lahko preizkusimo tako, da takoj navedemo ID 0, brez možnosti COUNT. Videli bomo eno samo čakajoče sporočilo, to je jabolčno sporočilo:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Če pa potrdimo, da je sporočilo uspešno obdelano, potem ne bo več prikazano:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Zdaj je Bob na vrsti, da nekaj prebere:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, član moje skupine, ni zahteval več kot dveh sporočil. Ukaz poroča samo o nedostavljenih sporočilih zaradi posebnega identifikatorja ">". Kot lahko vidite, sporočilo "jabolko" ne bo prikazano, ker je že bilo dostavljeno Alice, tako da Bob prejme "pomarančo" in "jagodo".

Tako lahko Alice, Bob in kateri koli drugi naročnik skupine berejo različna sporočila iz istega toka. Prav tako lahko preberejo svojo zgodovino neobdelanih sporočil ali označijo sporočila kot obdelana.

Upoštevati je treba nekaj stvari:

  • Takoj ko naročnik meni, da je sporočilo ukaz XREADGROUP, gre to sporočilo v stanje »čakajoče« in je dodeljeno temu določenemu naročniku. Drugi naročniki skupine ne bodo mogli prebrati tega sporočila.
  • Naročniki se samodejno ustvarijo ob prvi omembi, ni jih treba izrecno ustvarjati.
  • Z XREADGROUP lahko berete sporočila iz več različnih niti hkrati, a da bo to delovalo, morate najprej ustvariti skupine z enakim imenom za vsako nit z XGROUP

Okrevanje po neuspehu

Naročnik si lahko opomore od napake in ponovno prebere svoj seznam sporočil s statusom »na čakanju«. Vendar pa lahko v resničnem svetu naročniki na koncu propadejo. Kaj se zgodi z naročnikovimi zagozdenimi sporočili, če se naročnik ne more obnoviti po napaki?
Consumer Group ponuja funkcijo, ki se uporablja prav za takšne primere – ko morate zamenjati lastnika sporočil.

Prva stvar, ki jo morate storiti, je, da pokličete ukaz POTEKA, ki prikaže vsa sporočila v skupini s statusom “na čakanju”. V najpreprostejši obliki se ukaz kliče samo z dvema argumentoma: imenom niti in imenom skupine:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Ekipa je prikazala število neobdelanih sporočil za celotno skupino in za vsakega naročnika. Imamo samo Boba z dvema odprtima sporočiloma, ker je bilo potrjeno edino sporočilo, ki ga je zahtevala Alice XACK.

Zahtevamo lahko več informacij z več argumenti:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - obseg identifikatorjev (lahko uporabite »-« in »+«)
{count} — število poskusov dostave
{consumer-name} - ime skupine

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Zdaj imamo podrobnosti za vsako sporočilo: ID, ime naročnika, čas mirovanja v milisekundah in končno število poskusov dostave. Imamo dve sporočili od Boba, ki sta mirovali 74170458 milisekund, približno 20 ur.

Upoštevajte, da nam nihče ne preprečuje, da bi preprosto z uporabo preverili, kakšna je bila vsebina sporočila XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Samo dvakrat moramo ponoviti isti identifikator v argumentih. Zdaj, ko imamo nekaj zamisli, se lahko Alice odloči, da si po 20 urah izpada Bob verjetno ne bo opomogel, in da je čas, da poizvedujemo po teh sporočilih in nadaljujemo z njihovo obdelavo za Boba. Za to uporabimo ukaz XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

S tem ukazom lahko prejmemo »tuje« sporočilo, ki še ni bilo obdelano s spremembo lastnika v {consumer}. Vendar pa lahko zagotovimo tudi minimalni čas mirovanja {min-idle-time}. To pomaga preprečiti situacijo, ko dve stranki poskušata hkrati spremeniti lastnika istih sporočil:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Prva stranka bo ponastavila izpad in povečala števec dostave. Torej druga stranka tega ne bo mogla zahtevati.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Sporočilo je uspešno prevzela Alice, ki lahko zdaj obdela sporočilo in ga potrdi.

Iz zgornjega primera lahko vidite, da uspešna zahteva vrne vsebino samega sporočila. Vendar to ni potrebno. Možnost JUSTID lahko uporabite samo za vrnitev ID-jev sporočil. To je uporabno, če vas ne zanimajo podrobnosti sporočila in želite povečati zmogljivost sistema.

Števec dostave

Števec, ki ga vidite v izhodu POTEKA je število dostav posameznega sporočila. Tak števec se poveča na dva načina: ko je sporočilo uspešno zahtevano prek XCLAIM ali ko je uporabljen klic XREADGROUP.

Običajno je, da so nekatera sporočila dostavljena večkrat. Glavna stvar je, da so vsa sporočila na koncu obdelana. Včasih pride do težav pri obdelavi sporočila, ker je samo sporočilo poškodovano ali pa obdelava sporočila povzroči napako v kodi za obravnavo. V tem primeru se lahko izkaže, da nihče ne bo mogel obdelati tega sporočila. Ker imamo števec poskusov dostave, lahko s tem števcem zaznamo takšne situacije. Zato bi bilo verjetno pametneje, da bi bilo takšno sporočilo, ko število dostave doseže visoko številko, ki jo določite, v drugo nit in poslalo obvestilo skrbniku sistema.

Stanje niti

Ekipa XINFO uporablja se za zahtevanje različnih informacij o niti in njenih skupinah. Na primer, osnovni ukaz izgleda takole:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Zgornji ukaz prikaže splošne informacije o določenem toku. Zdaj pa malo bolj zapleten primer:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Zgornji ukaz prikaže splošne informacije za vse skupine podane niti

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Zgornji ukaz prikaže informacije za vse naročnike podanega toka in skupine.
Če pozabite sintakso ukaza, preprosto vprašajte sam ukaz za pomoč:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Omejitev velikosti toka

Številne aplikacije ne želijo večno zbirati podatkov v tok. Pogosto je koristno imeti največje dovoljeno število sporočil na nit. V drugih primerih je koristno premakniti vsa sporočila iz niti v drugo trajno shrambo, ko je dosežena določena velikost niti. Velikost toka lahko omejite s parametrom MAXLEN v ukazu XDODAJ:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Pri uporabi MAXLEN se stari zapisi samodejno izbrišejo, ko dosežejo določeno dolžino, tako da ima tok konstantno velikost. Vendar se obrezovanje v tem primeru ne izvede na najučinkovitejši način v pomnilniku Redis. Stanje lahko izboljšate na naslednji način:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ v zgornjem primeru pomeni, da nam ni treba nujno omejiti dolžine toka na določeno vrednost. V našem primeru je to lahko katero koli število, večje ali enako 1000 (na primer 1000, 1010 ali 1030). Pravkar smo izrecno določili, da želimo, da naš tok shrani vsaj 1000 zapisov. Zaradi tega je upravljanje pomnilnika v Redisu veliko bolj učinkovito.

Obstaja tudi ločena ekipa XTRIM, ki naredi isto:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trajno shranjevanje in podvajanje

Redis Stream se asinhrono replicira v podrejena vozlišča in shrani v datoteke, kot sta AOF (posnetek vseh podatkov) in RDB (dnevnik vseh operacij zapisovanja). Podprta je tudi replikacija stanja skupin potrošnikov. Če je torej sporočilo v statusu »čakajoče« na glavnem vozlišču, bo imelo to sporočilo na podrejenih vozliščih enak status.

Odstranjevanje posameznih elementov iz toka

Za brisanje sporočil obstaja poseben ukaz XDEL. Ukaz pridobi ime niti, ki mu sledijo ID-ji sporočil, ki jih je treba izbrisati:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Pri uporabi tega ukaza morate upoštevati, da dejanski pomnilnik ne bo sproščen takoj.

Tokovi ničelne dolžine

Razlika med tokovi in ​​drugimi podatkovnimi strukturami Redis je v tem, da ko druge podatkovne strukture v sebi nimajo več elementov, bo kot stranski učinek sama podatkovna struktura odstranjena iz pomnilnika. Tako bo na primer razvrščeni niz popolnoma odstranjen, ko klic ZREM odstrani zadnji element. Namesto tega lahko niti ostanejo v pomnilniku, tudi če nimajo nobenih elementov v sebi.

Zaključek

Redis Stream je idealen za ustvarjanje posrednikov sporočil, čakalnih vrst sporočil, poenotenega beleženja in klepetalnih sistemov za ohranjanje zgodovine.

Kot sem nekoč rekel Niklaus Wirth, programi so algoritmi in podatkovne strukture, Redis pa vam že ponuja oboje.

Vir: www.habr.com

Dodaj komentar