Redis Stream - pouzdanost i skalabilnost vaših sistema za razmenu poruka

Redis Stream - pouzdanost i skalabilnost vaših sistema za razmenu poruka

Redis Stream je novi apstraktni tip podataka uveden u Redis sa verzijom 5.0
Konceptualno, Redis Stream je lista na koju možete dodati unose. Svaki unos ima jedinstveni identifikator. Podrazumevano, ID se automatski generiše i uključuje vremensku oznaku. Stoga, možete ispitivati ​​opsege zapisa tokom vremena ili primati nove podatke kako stignu u tok, slično kao što Unix naredba "tail -f" čita datoteku dnevnika i zamrzava dok čeka nove podatke. Imajte na umu da više klijenata može slušati nit u isto vrijeme, baš kao što mnogi "tail -f" procesi mogu čitati datoteku istovremeno bez sukoba jedan s drugim.

Da bismo razumjeli sve prednosti novog tipa podataka, pogledajmo na brzinu postojeće Redis strukture koje djelomično repliciraju funkcionalnost Redis Stream-a.

Redis PUB/SUB

Redis Pub/Sub je jednostavan sistem za razmenu poruka koji je već ugrađen u vašu prodavnicu ključ/vrijednost. Međutim, jednostavnost ima cijenu:

  • Ako izdavač iz nekog razloga ne uspije, onda gubi sve svoje pretplatnike
  • Izdavač mora znati tačnu adresu svih svojih pretplatnika
  • Izdavač može preopteretiti svoje pretplatnike poslom ako se podaci objavljuju brže nego što se obrađuju
  • Poruka se briše iz međuspremnika izdavača odmah nakon objavljivanja, bez obzira na to koliko pretplatnika je isporučena i koliko brzo su mogli obraditi ovu poruku.
  • Svi pretplatnici će primiti poruku u isto vrijeme. Sami pretplatnici se moraju nekako međusobno dogovoriti o redoslijedu obrade iste poruke.
  • Ne postoji ugrađeni mehanizam za potvrdu da je pretplatnik uspješno obradio poruku. Ako pretplatnik primi poruku i padne tokom obrade, izdavač neće znati za to.

Redis List

Redis lista je struktura podataka koja podržava blokiranje naredbi za čitanje. Možete dodavati i čitati poruke s početka ili kraja liste. Na osnovu ove strukture, možete napraviti dobar stog ili red za vaš distribuirani sistem, au većini slučajeva to će biti dovoljno. Glavne razlike u odnosu na Redis Pub/Sub:

  • Poruka se isporučuje jednom klijentu. Prvi klijent koji je blokirao čitanje će prvi primiti podatke.
  • Clint mora sam pokrenuti operaciju čitanja za svaku poruku. List ne zna ništa o klijentima.
  • Poruke se pohranjuju sve dok ih neko ne pročita ili eksplicitno ne izbriše. Ako konfigurišete Redis server za izbacivanje podataka na disk, tada se pouzdanost sistema dramatično povećava.

Uvod u Stream

Dodavanje unosa u stream

tim XADD dodaje novi unos u stream. Zapis nije samo niz, on se sastoji od jednog ili više parova ključ/vrijednost. Dakle, svaki unos je već strukturiran i podsjeća na strukturu CSV datoteke.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

U gornjem primjeru dodamo dva polja u stream sa imenom (ključem) “mystream”: “sensor-id” i “temperature” sa vrijednostima “1234” i “19.8”, respektivno. Kao drugi argument, komanda uzima identifikator koji će biti dodijeljen unosu - ovaj identifikator jedinstveno identificira svaki unos u toku. Međutim, u ovom slučaju smo proslijedili * jer želimo da Redis generiše novi ID za nas. Svaki novi ID će se povećavati. Stoga će svaki novi unos imati viši identifikator u odnosu na prethodne unose.

Format identifikatora

ID unosa koji vraća naredba XADD, sastoji se iz dva dela:

{millisecondsTime}-{sequenceNumber}

millisecondsTime — Unix vrijeme u milisekundama (vrijeme Redis servera). Međutim, ako je trenutno vrijeme isto ili manje od vremena prethodnog snimanja, tada se koristi vremenska oznaka prethodnog snimanja. Stoga, ako se vrijeme servera vrati u prošlost, novi identifikator će i dalje zadržati svojstvo inkrementa.

sekvencaNumber koristi se za zapise kreirane u istoj milisekundi. sekvencaNumber će biti povećan za 1 u odnosu na prethodni unos. Zbog sekvencaNumber je veličine 64 bita, onda u praksi ne biste trebali naići na ograničenje broja zapisa koji se mogu generirati unutar jedne milisekundi.

Format takvih identifikatora može izgledati čudno na prvi pogled. Nepovjerljiv čitalac mogao bi se zapitati zašto je vrijeme dio identifikatora. Razlog je taj što Redis tokovi podržavaju upite opsega prema ID-u. Budući da je identifikator povezan s vremenom kada je zapis kreiran, to omogućava ispitivanje vremenskih raspona. Pogledaćemo konkretan primer kada pogledamo naredbu XRANGE.

Ako iz nekog razloga korisnik treba da navede svoj identifikator, koji je, na primer, povezan sa nekim eksternim sistemom, onda ga možemo proslediti naredbi XADD umjesto * kao što je prikazano ispod:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Imajte na umu da u ovom slučaju morate sami pratiti povećanje ID-a. U našem primjeru, minimalni identifikator je "0-1", tako da naredba neće prihvatiti drugi identifikator koji je jednak ili manji od "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Broj zapisa po streamu

Moguće je dobiti broj zapisa u toku jednostavnim korištenjem naredbe XLEN. Za naš primjer, ova naredba će vratiti sljedeću vrijednost:

> XLEN somestream
(integer) 2

Upiti raspona - XRANGE i XREVRANGE

Da bismo zatražili podatke po opsegu, moramo navesti dva identifikatora - početak i kraj raspona. Vraćeni opseg će uključivati ​​sve elemente, uključujući granice. Postoje i dva posebna identifikatora “-” i “+”, što znači najmanji (prvi zapis) i najveći (posljednji zapis) identifikator u toku. Primjer ispod će prikazati sve unose toka.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Svaki vraćeni zapis je niz od dva elementa: identifikatora i liste parova ključ/vrijednost. Već smo rekli da su identifikatori zapisa povezani sa vremenom. Stoga možemo zatražiti raspon određenog vremenskog perioda. Međutim, u zahtjevu možemo navesti ne puni identifikator, već samo Unix vrijeme, izostavljajući dio koji se odnosi na sekvencaNumber. Izostavljeni dio identifikatora će se automatski postaviti na nulu na početku raspona i na maksimalnu moguću vrijednost na kraju raspona. Ispod je primjer kako možete zatražiti raspon od dvije milisekunde.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Imamo samo jedan unos u ovom rasponu, međutim u stvarnim skupovima podataka vraćeni rezultat može biti ogroman. Iz ovog razloga XRANGE podržava opciju COUNT. Određivanjem količine možemo jednostavno dobiti prvih N zapisa. Ako trebamo dobiti sljedećih N zapisa (paginacija), možemo koristiti posljednji primljeni ID, povećati ga sekvencaNumber po jedan i pitaj ponovo. Pogledajmo ovo u sljedećem primjeru. Počinjemo sa dodavanjem 10 elemenata XADD (pod pretpostavkom da je mystream već popunjen sa 10 elemenata). Da bismo započeli iteraciju dobijajući 2 elementa po naredbi, počinjemo s punim rasponom, ali sa COUNT jednakim 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Da bismo nastavili ponavljati sa sljedeća dva elementa, moramo odabrati posljednji primljeni ID, tj. 1519073279157-0, i dodati 1 na sekvencaNumber.
Rezultirajući ID, u ovom slučaju 1519073279157-1, sada se može koristiti kao novi argument početka raspona za sljedeći poziv XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

I tako dalje. Zbog složenosti XRANGE je O(log(N)) za pretraživanje i zatim O(M) za vraćanje M elemenata, tada je svaki korak iteracije brz. Dakle, koristeći XRANGE tokovi se mogu efikasno ponavljati.

tim XREVRANGE je ekvivalent XRANGE, ali vraća elemente obrnutim redoslijedom:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Imajte na umu da je komanda XREVRANGE uzima argumente raspona start i stop obrnutim redoslijedom.

Čitanje novih unosa koristeći XREAD

Često se javlja zadatak pretplate na stream i primanja samo novih poruka. Ovaj koncept može izgledati slično Redis Pub/Sub-u ili blokiranju Redis liste, ali postoje fundamentalne razlike u tome kako koristiti Redis Stream:

  1. Svaka nova poruka se podrazumevano isporučuje svakom pretplatniku. Ovo ponašanje se razlikuje od blokiranja Redis liste, gdje će novu poruku pročitati samo jedan pretplatnik.
  2. Dok se u Redis Pub/Sub sve poruke zaboravljaju i nikada ne traju, u Stream-u se sve poruke zadržavaju neograničeno (osim ako klijent eksplicitno ne uzrokuje brisanje).
  3. Redis Stream vam omogućava da razlikujete pristup porukama unutar jednog toka. Određeni pretplatnik može vidjeti samo svoju ličnu historiju poruka.

Možete se pretplatiti na nit i primati nove poruke koristeći naredbu XREAD. Malo je komplikovanije od XRANGE, pa ćemo prvo početi s jednostavnijim primjerima.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Gornji primjer prikazuje obrazac koji ne blokira XREAD. Imajte na umu da je opcija COUNT opciona. U stvari, jedina potrebna opcija komande je STREAMS opcija, koja specificira listu tokova zajedno sa odgovarajućim maksimalnim identifikatorom. Napisali smo “STREAMS mystream 0” - želimo primiti sve zapise mystream toka sa identifikatorom većim od “0-0”. Kao što možete vidjeti iz primjera, naredba vraća ime niti jer se možemo pretplatiti na više niti u isto vrijeme. Mogli bismo napisati, na primjer, "STREAMS mystream otherstream 0 0". Imajte na umu da nakon opcije STREAMS prvo moramo dati nazive svih potrebnih tokova, a tek onda listu identifikatora.

U ovom jednostavnom obliku komanda ne radi ništa posebno u poređenju sa XRANGE. Međutim, zanimljivo je da se lako možemo okrenuti XREAD na komandu za blokiranje, navodeći argument BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

U gornjem primjeru, nova opcija BLOCK je specificirana s timeoutom od 0 milisekundi (to znači da se čeka neograničeno). Štaviše, umjesto prosljeđivanja uobičajenog identifikatora za stream mystream, proslijeđen je poseban identifikator $. Ovaj poseban identifikator to znači XREAD mora koristiti maksimalni identifikator u mystreamu kao identifikator. Dakle, nove poruke ćemo primati tek od trenutka kada počnemo da slušamo. Na neki način ovo je slično Unix komandi "tail -f".

Imajte na umu da kada koristimo opciju BLOK, ne moramo nužno koristiti poseban identifikator $. Možemo koristiti bilo koji identifikator koji postoji u toku. Ako tim može odmah da servisira naš zahtjev bez blokiranja, to će učiniti, inače će blokirati.

Blokiranje XREAD također možete slušati više niti odjednom, samo trebate navesti njihova imena. U ovom slučaju, komanda će vratiti zapis prvog toka koji je primio podatke. Prvi blokiran pretplatnik za datu nit će prvi primiti podatke.

Grupe potrošača

U određenim zadacima želimo ograničiti pristup pretplatnika porukama unutar jedne niti. Primjer gdje bi ovo moglo biti korisno je red poruka s radnicima koji će primati različite poruke iz niti, omogućavajući obradu poruka da se skalira.

Ako zamislimo da imamo tri pretplatnika C1, C2, C3 i nit koja sadrži poruke 1, 2, 3, 4, 5, 6, 7, tada će poruke biti servirane kao na donjem dijagramu:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Da bi postigao ovaj efekat, Redis Stream koristi koncept koji se zove Consumer Group. Ovaj koncept je sličan pseudo-pretplatniku, koji prima podatke iz streama, ali ga zapravo opslužuje više pretplatnika unutar grupe, pružajući određene garancije:

  1. Svaka poruka se dostavlja drugom pretplatniku unutar grupe.
  2. Unutar grupe, pretplatnici se identifikuju svojim imenom, koje je niz koji razlikuje velika i mala slova. Ako pretplatnik privremeno napusti grupu, može se vratiti u grupu koristeći svoje jedinstveno ime.
  3. Svaka grupa potrošača slijedi koncept „prve nepročitane poruke“. Kada pretplatnik zatraži nove poruke, može primiti samo poruke koje nikada ranije nisu bile isporučene nijednom pretplatniku unutar grupe.
  4. Postoji naredba za eksplicitnu potvrdu da je pretplatnik uspješno obradio poruku. Dok se ova komanda ne pozove, tražena poruka će ostati u statusu "na čekanju".
  5. Unutar Grupe potrošača svaki pretplatnik može zatražiti historiju poruka koje su mu dostavljene, ali još nisu obrađene (u statusu „na čekanju“)

U određenom smislu, stanje grupe se može izraziti na sljedeći način:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Sada je vrijeme da se upoznate sa glavnim komandama za Grupu potrošača, a to su:

  • XGROUP koristi se za stvaranje, uništavanje i upravljanje grupama
  • XREADGROUP koristi se za čitanje toka kroz grupu
  • XACK - ova komanda omogućava pretplatniku da označi poruku kao uspješno obrađenu

Stvaranje grupe potrošača

Pretpostavimo da mystream već postoji. Tada će komanda za kreiranje grupe izgledati ovako:

> XGROUP CREATE mystream mygroup $
OK

Prilikom kreiranja grupe moramo proslijediti identifikator od kojeg će grupa primati poruke. Ako samo želimo primati sve nove poruke, onda možemo koristiti poseban identifikator $ (kao u našem primjeru iznad). Ako navedete 0 umjesto posebnog identifikatora, tada će sve poruke u niti biti dostupne grupi.

Sada kada je grupa kreirana, možemo odmah početi čitati poruke pomoću naredbe XREADGROUP. Ova naredba je vrlo slična XREAD i podržava opcionu opciju BLOK. Međutim, postoji obavezna opcija GROUP koja se uvijek mora specificirati s dva argumenta: imenom grupe i imenom pretplatnika. Podržana je i opcija COUNT.

Prije čitanja teme, stavimo nekoliko poruka tamo:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Pokušajmo sada pročitati ovaj stream kroz grupu:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Gornja naredba doslovno glasi kako slijedi:

“Ja, pretplatnica Alice, članica moje grupe, želim da pročitam jednu poruku sa mystreama koja nikada prije nikome nije dostavljena.”

Svaki put kada pretplatnik izvrši operaciju na grupi, mora dati svoje ime, jedinstveno identificirajući se unutar grupe. Postoji još jedan vrlo važan detalj u gornjoj naredbi - poseban identifikator ">". Ovaj specijalni identifikator filtrira poruke, ostavljajući samo one koje nikada ranije nisu bile isporučene.

Također, u posebnim slučajevima, možete specificirati stvarni identifikator kao što je 0 ili bilo koji drugi važeći identifikator. U ovom slučaju komanda XREADGROUP vratit će vam historiju poruka sa statusom "na čekanju" koje su isporučene navedenom pretplatniku (Alice) ali još nisu potvrđene pomoću naredbe XACK.

Ovo ponašanje možemo testirati tako što ćemo odmah navesti ID 0, bez opcije COUNT. Jednostavno ćemo vidjeti jednu poruku na čekanju, odnosno poruku Apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Međutim, ako potvrdimo da je poruka uspješno obrađena, ona se više neće prikazivati:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Sada je red na Boba da pročita nešto:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, član moje grupe, tražio je najviše dvije poruke. Komanda izvještava samo o neisporučenim porukama zbog posebnog identifikatora ">". Kao što vidite, poruka "jabuka" neće biti prikazana jer je već dostavljena Alisi, pa Bob dobija "narandžastu" i "jagodu".

Na ovaj način, Alice, Bob i bilo koji drugi pretplatnik grupe mogu čitati različite poruke iz istog toka. Oni također mogu čitati svoju historiju neobrađenih poruka ili označiti poruke kao obrađene.

Treba imati na umu nekoliko stvari:

  • Čim pretplatnik smatra da je poruka naredba XREADGROUP, ova poruka prelazi u stanje "na čekanju" i dodjeljuje se tom određenom pretplatniku. Drugi pretplatnici grupe neće moći pročitati ovu poruku.
  • Pretplatnici se automatski kreiraju nakon prvog spominjanja, nema potrebe da ih eksplicitno kreirate.
  • Uz pomoć XREADGROUP možete čitati poruke iz više različitih niti u isto vrijeme, međutim da bi ovo funkcioniralo morate prvo kreirati grupe s istim imenom za svaku nit koristeći XGROUP

Oporavak nakon greške

Pretplatnik se može oporaviti od greške i ponovo pročitati svoju listu poruka sa statusom „na čekanju“. Međutim, u stvarnom svijetu, pretplatnici na kraju mogu propasti. Šta se dešava sa zaglavljenim porukama pretplatnika ako se pretplatnik ne može oporaviti od greške?
Consumer Group nudi funkciju koja se koristi upravo za takve slučajeve - kada trebate promijeniti vlasnika poruka.

Prva stvar koju treba da uradite je da pozovete komandu XPENDING, koji prikazuje sve poruke u grupi sa statusom „na čekanju“. U svom najjednostavnijem obliku, naredba se poziva sa samo dva argumenta: imenom niti i imenom grupe:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tim je prikazao broj neobrađenih poruka za cijelu grupu i za svakog pretplatnika. Imamo samo Boba sa dvije neriješene poruke jer je jedina poruka koju je Alice tražila potvrđena XACK.

Možemo tražiti više informacija koristeći više argumenata:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - raspon identifikatora (možete koristiti “-” i “+”)
{count} — broj pokušaja isporuke
{consumer-name} - naziv grupe

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Sada imamo detalje za svaku poruku: ID, ime pretplatnika, vrijeme mirovanja u milisekundama i na kraju broj pokušaja isporuke. Imamo dvije poruke od Boba i neaktivne su 74170458 milisekundi, oko 20 sati.

Napominjemo da nas niko ne brani da jednostavno korištenjem provjerimo kakav je sadržaj poruke XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Moramo samo dvaput ponoviti isti identifikator u argumentima. Sada kada imamo neku ideju, Alice bi mogla odlučiti da se nakon 20 sati zastoja Bob vjerovatno neće oporaviti i da je vrijeme da upitamo te poruke i nastavimo njihovu obradu za Boba. Za ovo koristimo naredbu XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Koristeći ovu naredbu, možemo primiti “stranu” poruku koja još nije obrađena promjenom vlasnika u {consumer}. Međutim, također možemo osigurati minimalno vrijeme mirovanja {min-idle-time}. Ovo pomaže da se izbjegne situacija u kojoj dva klijenta pokušavaju istovremeno promijeniti vlasnika istih poruka:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Prvi kupac će poništiti vrijeme zastoja i povećati brojač isporuke. Dakle, drugi klijent to neće moći zatražiti.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Poruku je uspješno preuzela Alice, koja sada može obraditi poruku i potvrditi je.

Iz gornjeg primjera možete vidjeti da uspješan zahtjev vraća sadržaj same poruke. Međutim, to nije neophodno. Opcija JUSTID se može koristiti samo za vraćanje ID-ova poruka. Ovo je korisno ako vas ne zanimaju detalji poruke i želite povećati performanse sistema.

Brojač dostave

Brojač koji vidite na izlazu XPENDING je broj isporuka svake poruke. Takav brojač se povećava na dva načina: kada je poruka uspješno zatražena putem XCLAIM ili kada se koristi poziv XREADGROUP.

Normalno je da se neke poruke isporučuju više puta. Glavna stvar je da se sve poruke na kraju obrađuju. Ponekad se javljaju problemi prilikom obrade poruke jer je sama poruka oštećena ili obrada poruke uzrokuje grešku u kodu rukovaoca. U ovom slučaju može se ispostaviti da niko neće moći obraditi ovu poruku. Pošto imamo brojač pokušaja isporuke, možemo koristiti ovaj brojač za otkrivanje takvih situacija. Stoga, kada broj isporuke dostigne visoki broj koji ste naveli, vjerovatno bi bilo mudrije staviti takvu poruku na drugu nit i poslati obavještenje administratoru sistema.

Thread State

tim XINFO koristi se za traženje različitih informacija o niti i njenim grupama. Na primjer, osnovna naredba izgleda ovako:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Gornja naredba prikazuje opće informacije o navedenom toku. Sada malo složeniji primjer:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Gornja naredba prikazuje opće informacije za sve grupe navedene niti

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Gornja naredba prikazuje informacije za sve pretplatnike navedenog toka i grupe.
Ako zaboravite sintaksu komande, samo zatražite pomoć od same komande:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Ograničenje veličine toka

Mnoge aplikacije ne žele zauvijek prikupljati podatke u stream. Često je korisno imati maksimalan dozvoljen broj poruka po niti. U drugim slučajevima, korisno je premjestiti sve poruke iz niti u drugu trajnu pohranu kada se dostigne navedena veličina niti. Možete ograničiti veličinu toka koristeći MAXLEN parametar u naredbi XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kada koristite MAXLEN, stari zapisi se automatski brišu kada dostignu određenu dužinu, tako da tok ima konstantnu veličinu. Međutim, obrezivanje se u ovom slučaju ne odvija na najefikasniji način u Redis memoriji. Situaciju možete poboljšati na sljedeći način:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ u gornjem primjeru znači da ne moramo nužno ograničiti dužinu toka na određenu vrijednost. U našem primjeru, to može biti bilo koji broj veći ili jednak 1000 (na primjer, 1000, 1010 ili 1030). Samo smo eksplicitno naveli da želimo da naš tok pohranjuje najmanje 1000 zapisa. Ovo čini upravljanje memorijom mnogo efikasnijim unutar Redis-a.

Postoji i zaseban tim XTRIM, koji radi istu stvar:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trajna pohrana i replikacija

Redis Stream se asinhrono replicira na podređene čvorove i čuva u fajlovima kao što su AOF (snimka svih podataka) i RDB (log svih operacija pisanja). Podržana je i replikacija stanja grupa potrošača. Stoga, ako je poruka u statusu "na čekanju" na glavnom čvoru, tada će na slave čvorovima ova poruka imati isti status.

Uklanjanje pojedinačnih elemenata iz toka

Postoji posebna komanda za brisanje poruka XDEL. Naredba dobiva ime niti nakon čega slijede ID-ovi poruka koje treba izbrisati:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kada koristite ovu naredbu, morate uzeti u obzir da se stvarna memorija neće odmah osloboditi.

Tokovi nulte dužine

Razlika između tokova i drugih Redis struktura podataka je u tome što kada druge strukture podataka više nemaju elemente u sebi, kao nuspojava, sama struktura podataka će biti uklonjena iz memorije. Tako će, na primjer, sortirani skup biti potpuno uklonjen kada ZREM poziv ukloni posljednji element. Umjesto toga, nitima je dozvoljeno da ostanu u memoriji čak i bez ikakvih elemenata unutra.

zaključak

Redis Stream je idealan za kreiranje brokera poruka, redova poruka, objedinjene evidencije i sistema za ćaskanje za čuvanje istorije.

Kao što sam jednom rekao Niklaus Wirth, programi su algoritmi plus strukture podataka, a Redis vam već daje oboje.

izvor: www.habr.com

Dodajte komentar