Redis Stream - pouzdanost i skalabilnost vaših sustava za razmjenu poruka

Redis Stream - pouzdanost i skalabilnost vaših sustava za razmjenu poruka

Redis Stream novi je apstraktni tip podataka uveden u Redis s verzijom 5.0
Konceptualno, Redis Stream je popis na koji možete dodavati unose. Svaki unos ima jedinstveni identifikator. Prema zadanim postavkama, ID se automatski generira i uključuje vremensku oznaku. Stoga možete postavljati upite o rasponima zapisa tijekom vremena ili primati nove podatke kako stignu u tok, slično kao što naredba Unix "tail -f" čita datoteku dnevnika i zamrzava se dok čeka nove podatke. Imajte na umu da više klijenata može slušati nit u isto vrijeme, baš kao što mnogi "tail -f" procesi mogu čitati datoteku istovremeno bez međusobnog sukoba.

Kako bismo razumjeli sve prednosti novog tipa podataka, bacimo brzi pogled na dugo postojeće Redis strukture koje djelomično kopiraju funkcionalnost Redis Streama.

Redis PUB/SUB

Redis Pub/Sub jednostavan je sustav za razmjenu poruka već ugrađen u vašu pohranu ključeva i vrijednosti. Međutim, jednostavnost ima svoju cijenu:

  • Ako izdavač iz nekog razloga propadne, onda gubi sve svoje pretplatnike
  • Izdavač mora znati točnu adresu svih svojih pretplatnika
  • Izdavač može preopteretiti svoje pretplatnike poslom ako se podaci objavljuju brže nego što se obrađuju
  • Poruka se briše iz međuspremnika izdavača odmah nakon objave, bez obzira na to koliko je pretplatnika isporučena i koliko brzo su uspjeli obraditi ovu poruku.
  • Svi pretplatnici će primiti poruku u isto vrijeme. Pretplatnici se sami moraju nekako međusobno dogovoriti o redoslijedu obrade iste poruke.
  • Ne postoji ugrađeni mehanizam za potvrdu da je pretplatnik uspješno obradio poruku. Ako pretplatnik primi poruku i sruši se tijekom obrade, izdavač neće znati za to.

Redis popis

Redis List je struktura podataka koja podržava blokiranje naredbi za čitanje. Poruke možete dodavati i čitati s početka ili s kraja popisa. Na temelju ove strukture možete napraviti dobar stog ili red čekanja za svoj distribuirani sustav, au većini slučajeva to će biti dovoljno. Glavne razlike u odnosu na Redis Pub/Sub:

  • Poruka se dostavlja jednom klijentu. Prvi klijent s blokiranim čitanjem prvi će primiti podatke.
  • Clint mora sam pokrenuti operaciju čitanja svake poruke. List ne zna ništa o klijentima.
  • Poruke se pohranjuju dok ih netko ne pročita ili izričito izbriše. Ako konfigurirate Redis poslužitelj za ispiranje podataka na disk, tada se pouzdanost sustava dramatično povećava.

Uvod u Stream

Dodavanje unosa u tok

Momčad XADD dodaje novi unos u tok. Zapis nije samo niz, on se sastoji od jednog ili više parova ključ-vrijednost. Stoga je svaki unos već strukturiran i nalikuje strukturi CSV datoteke.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

U gornjem primjeru dodajemo dva polja u stream s nazivom (ključem) "mystream": "sensor-id" i "temperature" s vrijednostima "1234" i "19.8", respektivno. Kao drugi argument, naredba uzima identifikator koji će biti dodijeljen unosu - ovaj identifikator jedinstveno identificira svaki unos u toku. Međutim, u ovom smo slučaju prošli * jer želimo da nam Redis generira novi ID. Svaki novi ID će se povećati. Stoga će svaki novi unos imati viši identifikator u odnosu na prethodne unose.

Format identifikatora

ID unosa koji vraća naredba XADD, sastoji se od dva dijela:

{millisecondsTime}-{sequenceNumber}

milisecondsTime — Unix vrijeme u milisekundama (vrijeme Redis poslužitelja). Međutim, ako je trenutno vrijeme isto ili manje od vremena prethodnog snimanja, koristi se vremenska oznaka prethodnog snimanja. Stoga, ako se vrijeme poslužitelja vrati u prošlost, novi identifikator će i dalje zadržati svojstvo povećanja.

redni broj koristi se za zapise stvorene u istoj milisekundi. redni broj će se povećati za 1 u odnosu na prethodni unos. Jer redni broj je veličine 64 bita, onda u praksi ne biste trebali naići na ograničenje broja zapisa koji se mogu generirati unutar jedne milisekunde.

Format takvih identifikatora može se na prvi pogled činiti čudnim. Nepovjerljivi čitatelj mogao bi se zapitati zašto je vrijeme dio identifikatora. Razlog je taj što Redis streamovi podržavaju upite raspona prema ID-u. Budući da je identifikator povezan s vremenom kada je zapis kreiran, to omogućuje postavljanje upita o vremenskim rasponima. Pogledat ćemo konkretan primjer kada budemo gledali naredbu XRANGE.

Ako iz nekog razloga korisnik treba navesti vlastiti identifikator, koji je, na primjer, povezan s nekim vanjskim sustavom, tada ga možemo proslijediti naredbi XADD umjesto * kao što je prikazano u nastavku:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Imajte na umu da u ovom slučaju morate sami pratiti povećanje ID-a. U našem primjeru, minimalni identifikator je "0-1", tako da naredba neće prihvatiti drugi identifikator koji je jednak ili manji od "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Broj zapisa po toku

Moguće je dobiti broj zapisa u toku jednostavnim korištenjem naredbe XLEN. Za naš primjer, ova naredba će vratiti sljedeću vrijednost:

> XLEN somestream
(integer) 2

Raspon upita - XRANGE i XREVRANGE

Da bismo zatražili podatke po rasponu, moramo navesti dva identifikatora - početak i kraj raspona. Vraćeni raspon uključivat će sve elemente, uključujući granice. Postoje i dva posebna identifikatora "-" i "+", što znači najmanji (prvi zapis) i najveći (posljednji zapis) identifikator u toku. U primjeru u nastavku bit će navedeni svi unosi toka.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Svaki vraćeni zapis je polje od dva elementa: identifikator i popis parova ključ-vrijednost. Već smo rekli da su identifikatori zapisa povezani s vremenom. Stoga možemo zatražiti raspon određenog vremenskog razdoblja. Međutim, u zahtjevu možemo navesti ne puni identifikator, već samo Unix vrijeme, izostavljajući dio koji se odnosi na redni broj. Izostavljeni dio identifikatora automatski će se postaviti na nulu na početku raspona i na najveću moguću vrijednost na kraju raspona. Dolje je primjer kako možete zatražiti raspon od dvije milisekunde.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Imamo samo jedan unos u ovom rasponu, međutim u stvarnim skupovima podataka vraćeni rezultat može biti ogroman. Zbog ovog razloga XRANGE podržava opciju COUNT. Određivanjem količine možemo jednostavno dobiti prvih N zapisa. Ako trebamo dobiti sljedećih N zapisa (paginacija), možemo koristiti zadnji primljeni ID, povećati ga redni broj po jedan i pitaj ponovno. Pogledajmo to na sljedećem primjeru. Počinjemo dodavati 10 elemenata s XADD (pod pretpostavkom da je moj tok već ispunjen s 10 elemenata). Da bismo započeli iteraciju dobivajući 2 elementa po naredbi, počinjemo s punim rasponom, ali s COUNT jednakim 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Za nastavak ponavljanja sa sljedeća dva elementa, moramo odabrati zadnji primljeni ID, tj. 1519073279157-0, i dodati 1 na redni broj.
Rezultirajući ID, u ovom slučaju 1519073279157-1, sada se može koristiti kao novi argument početka raspona za sljedeći poziv XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

I tako dalje. Zbog složenosti XRANGE je O(log(N)) za pretraživanje i zatim O(M) za vraćanje M elemenata, tada je svaki korak iteracije brz. Dakle, koristeći XRANGE tokovi se mogu učinkovito ponavljati.

Momčad XREVRANGE je ekvivalent XRANGE, ali vraća elemente obrnutim redoslijedom:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Imajte na umu da naredba XREVRANGE uzima argumente raspona start i stop obrnutim redoslijedom.

Čitanje novih unosa pomoću XREAD-a

Često se postavlja zadatak pretplate na stream i primanja samo novih poruka. Ovaj koncept se može činiti sličnim Redis Pub/Sub ili blokiranju Redis List, ali postoje temeljne razlike u tome kako koristiti Redis Stream:

  1. Svaka nova poruka prema zadanim postavkama isporučuje se svakom pretplatniku. Ovo se ponašanje razlikuje od blokirajućeg Redis popisa, gdje će novu poruku pročitati samo jedan pretplatnik.
  2. Dok se u Redis Pub/Sub sve poruke zaboravljaju i nikad se ne zadržavaju, u Streamu se sve poruke zadržavaju na neodređeno vrijeme (osim ako klijent izričito ne uzrokuje brisanje).
  3. Redis Stream vam omogućuje da razlikujete pristup porukama unutar jednog toka. Određeni pretplatnik može vidjeti samo svoju osobnu povijest poruka.

Možete se pretplatiti na nit i primati nove poruke pomoću naredbe XREAD. Malo je kompliciranije od XRANGE, pa ćemo prvo početi s jednostavnijim primjerima.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Gornji primjer prikazuje obrazac koji ne blokira XREAD. Imajte na umu da opcija COUNT nije obavezna. Zapravo, jedina potrebna opcija naredbe je STREAMS opcija, koja navodi popis tokova zajedno s odgovarajućim maksimalnim identifikatorom. Napisali smo “STREAMS mystream 0” - želimo primati sve zapise mystream streama s identifikatorom većim od “0-0”. Kao što možete vidjeti iz primjera, naredba vraća naziv niti jer se možemo pretplatiti na više niti istovremeno. Mogli bismo napisati, na primjer, "STREAMS mystream otherstream 0 0". Imajte na umu da nakon opcije STREAMS prvo trebamo dati nazive svih potrebnih tokova, a tek onda popis identifikatora.

U ovom jednostavnom obliku naredba ne čini ništa posebno u usporedbi s XRANGE. Međutim, zanimljivo je da se lako možemo okrenuti XREAD na naredbu za blokiranje, navodeći argument BLOK:

> XREAD BLOCK 0 STREAMS mystream $

U gornjem primjeru navedena je nova opcija BLOK s vremenskim ograničenjem od 0 milisekundi (ovo znači čekanje na neodređeno vrijeme). Štoviše, umjesto prosljeđivanja uobičajenog identifikatora za stream mystream, proslijeđen je poseban identifikator $. Ovaj posebni identifikator znači da XREAD mora koristiti maksimalni identifikator u mystreamu kao identifikator. Dakle, nove poruke ćemo primati samo od trenutka kada smo počeli slušati. Na neki način ovo je slično Unix naredbi "tail -f".

Imajte na umu da kada koristimo opciju BLOK ne moramo nužno koristiti poseban identifikator $. Možemo koristiti bilo koji identifikator koji postoji u streamu. Ako tim može odmah uslužiti naš zahtjev bez blokiranja, to će učiniti, inače će blokirati.

Blokiranje XREAD također može slušati više niti odjednom, samo trebate navesti njihova imena. U ovom slučaju, naredba će vratiti zapis prvog toka koji je primio podatke. Prvi pretplatnik blokiran za određenu nit će prvi primiti podatke.

Skupine potrošača

U određenim zadacima želimo ograničiti pretplatnički pristup porukama unutar jedne niti. Primjer gdje bi ovo moglo biti korisno je red poruka s radnicima koji će primati različite poruke iz niti, dopuštajući skaliranje obrade poruka.

Ako zamislimo da imamo tri pretplatnika C1, C2, C3 i nit koja sadrži poruke 1, 2, 3, 4, 5, 6, 7, tada će se poruke posluživati ​​kao na dijagramu ispod:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Kako bi postigao ovaj učinak, Redis Stream koristi koncept pod nazivom Consumer Group. Ovaj koncept je sličan pseudo-pretplatniku, koji prima podatke iz streama, ali ga zapravo opslužuje više pretplatnika unutar grupe, pružajući određena jamstva:

  1. Svaka poruka isporučuje se drugom pretplatniku unutar grupe.
  2. Unutar grupe, pretplatnici se identificiraju svojim imenom, koje je niz koji razlikuje velika i mala slova. Ako pretplatnik privremeno ispadne iz grupe, može se vratiti u grupu koristeći svoje jedinstveno ime.
  3. Svaka grupa potrošača slijedi koncept "prve nepročitane poruke". Kada pretplatnik zatraži nove poruke, može primiti samo poruke koje nikada ranije nisu bile isporučene nijednom pretplatniku unutar grupe.
  4. Postoji naredba za eksplicitnu potvrdu da je pretplatnik uspješno obradio poruku. Sve dok se ova naredba ne pozove, tražena poruka ostat će u statusu "na čekanju".
  5. Unutar Grupe potrošača svaki pretplatnik može zatražiti povijest poruka koje su mu isporučene, a još nisu obrađene (u statusu “na čekanju”)

U određenom smislu, stanje grupe može se izraziti na sljedeći način:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Sada je vrijeme da se upoznate s glavnim naredbama za Consumer Group, naime:

  • XGROUP koristi za stvaranje, uništavanje i upravljanje grupama
  • XREADGROUP koristi se za čitanje toka kroz grupu
  • XACK - ova naredba omogućuje pretplatniku da označi poruku kao uspješno obrađenu

Stvaranje grupe potrošača

Pretpostavimo da mystream već postoji. Tada će naredba za stvaranje grupe izgledati ovako:

> XGROUP CREATE mystream mygroup $
OK

Prilikom kreiranja grupe, moramo proslijediti identifikator, počevši od kojeg će grupa primati poruke. Ako samo želimo primati sve nove poruke, tada možemo koristiti poseban identifikator $ (kao u našem primjeru iznad). Ako navedete 0 umjesto posebnog identifikatora, tada će sve poruke u niti biti dostupne grupi.

Sada kada je grupa stvorena, možemo odmah početi čitati poruke pomoću naredbe XREADGROUP. Ova je naredba vrlo slična XREAD i podržava izbornu opciju BLOK. Međutim, postoji obavezna opcija GROUP koja se uvijek mora navesti s dva argumenta: nazivom grupe i imenom pretplatnika. Podržana je i opcija COUNT.

Prije čitanja teme, stavimo neke poruke tamo:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Pokušajmo sada čitati ovaj stream kroz grupu:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Gornja naredba doslovce glasi na sljedeći način:

"Ja, pretplatnik Alice, član moje grupe, želim pročitati jednu poruku iz svog streama koja nikada prije nije nikome isporučena."

Svaki put kada pretplatnik izvrši operaciju na grupi, mora dati svoje ime, jedinstveno se identificirajući unutar grupe. Postoji još jedan vrlo važan detalj u gornjoj naredbi - poseban identifikator ">". Ovaj posebni identifikator filtrira poruke, ostavljajući samo one koje nikad prije nisu isporučene.

Također, u posebnim slučajevima, možete navesti pravi identifikator kao što je 0 ili bilo koji drugi važeći identifikator. U ovom slučaju naredba XREADGROUP će vam vratiti povijest poruka sa statusom "na čekanju" koje su isporučene navedenom pretplatniku (Alice), ali još nisu potvrđene korištenjem naredbe XACK.

Ovo ponašanje možemo testirati tako da odmah navedemo ID 0, bez opcije TOČKA. Jednostavno ćemo vidjeti jednu poruku na čekanju, to jest poruku jabuke:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Međutim, ako potvrdimo da je poruka uspješno obrađena, više se neće prikazivati:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Sada je red na Boba da pročita nešto:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, član moje grupe, nije tražio više od dvije poruke. Naredba prijavljuje samo neisporučene poruke zbog posebnog identifikatora ">". Kao što vidite, poruka "jabuka" neće biti prikazana jer je već isporučena Alisi, tako da Bob prima "naranču" i "jagodu".

Na taj način Alice, Bob i bilo koji drugi pretplatnik grupe mogu čitati različite poruke iz istog toka. Također mogu pročitati svoju povijest neobrađenih poruka ili označiti poruke kao obrađene.

Imajte na umu nekoliko stvari:

  • Čim pretplatnik poruku smatra naredbom XREADGROUP, ova poruka prelazi u stanje "na čekanju" i dodjeljuje se tom određenom pretplatniku. Ostali pretplatnici grupe neće moći pročitati ovu poruku.
  • Pretplatnici se automatski stvaraju nakon prvog spomena, nema potrebe da ih izričito kreirate.
  • S XREADGROUP možete čitati poruke iz više različitih niti u isto vrijeme, ali da bi ovo radilo morate prvo stvoriti grupe s istim imenom za svaku nit pomoću XGROUP

Oporavak nakon neuspjeha

Pretplatnik se može oporaviti od kvara i ponovno pročitati svoj popis poruka sa statusom "na čekanju". Međutim, u stvarnom svijetu, pretplatnici mogu u konačnici propasti. Što se događa sa zaglavljenim porukama pretplatnika ako se pretplatnik ne može oporaviti od kvara?
Consumer Group nudi značajku koja se koristi upravo za takve slučajeve – kada trebate promijeniti vlasnika poruka.

Prvo što trebate učiniti je pozvati naredbu POTREBA, koji prikazuje sve poruke u grupi sa statusom “na čekanju”. U svom najjednostavnijem obliku, naredba se poziva sa samo dva argumenta: nazivom niti i nazivom grupe:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tim je prikazao broj neobrađenih poruka za cijelu grupu i za svakog pretplatnika. Imamo samo Boba s dvije otvorene poruke jer je jedina poruka koju je Alice zatražila potvrđena XACK.

Možemo zatražiti više informacija koristeći više argumenata:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - raspon identifikatora (možete koristiti “-” i “+”)
{count} — broj pokušaja isporuke
{consumer-name} - naziv grupe

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Sada imamo detalje za svaku poruku: ID, ime pretplatnika, vrijeme mirovanja u milisekundama i na kraju broj pokušaja isporuke. Imamo dvije poruke od Boba i neaktivne su 74170458 milisekundi, oko 20 sati.

Napominjemo da nas nitko ne sprječava da provjerimo sadržaj poruke jednostavnim korištenjem XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Samo moramo ponoviti isti identifikator dva puta u argumentima. Sada kada imamo neku ideju, Alice bi mogla odlučiti da se Bob vjerojatno neće oporaviti nakon 20 sati pauze i da je vrijeme da se potraže te poruke i nastavi njihova obrada za Boba. Za to koristimo naredbu XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Pomoću ove naredbe možemo primiti "stranu" poruku koja još nije obrađena promjenom vlasnika u {consumer}. Međutim, također možemo osigurati minimalno vrijeme mirovanja {min-idle-time}. Ovo pomaže u izbjegavanju situacije u kojoj dva klijenta pokušavaju istovremeno promijeniti vlasnika istih poruka:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Prvi kupac će resetirati vrijeme prekida i povećati brojač isporuke. Dakle, drugi klijent to neće moći zatražiti.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Poruku je uspješno preuzela Alice, koja sada može obraditi poruku i potvrditi je.

Iz gornjeg primjera možete vidjeti da uspješan zahtjev vraća sadržaj same poruke. Međutim, to nije potrebno. Opcija JUSTID može se koristiti samo za vraćanje ID-ova poruka. Ovo je korisno ako vas ne zanimaju detalji poruke i želite povećati performanse sustava.

Šalter za dostavu

Brojač koji vidite u izlazu POTREBA je broj isporuka svake poruke. Takav brojač se povećava na dva načina: kada je poruka uspješno zatražena putem XCLAIM ili kada se koristi poziv XREADGROUP.

Normalno je da se neke poruke isporučuju više puta. Glavno je da se sve poruke na kraju obrade. Ponekad dolazi do problema prilikom obrade poruke jer je sama poruka oštećena ili obrada poruke uzrokuje pogrešku u kodu rukovatelja. U tom slučaju može se ispostaviti da nitko neće moći obraditi ovu poruku. Budući da imamo brojač pokušaja isporuke, možemo koristiti ovaj brojač za otkrivanje takvih situacija. Stoga, kada broj isporuka dosegne visoku brojku koju navedete, vjerojatno bi bilo pametnije staviti takvu poruku na drugu nit i poslati obavijest administratoru sustava.

Stanje niti

Momčad XINFO koristi se za traženje raznih informacija o niti i njenim grupama. Na primjer, osnovna naredba izgleda ovako:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Gornja naredba prikazuje opće informacije o navedenom toku. Sada malo složeniji primjer:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Gornja naredba prikazuje opće informacije za sve grupe navedene niti

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Gornja naredba prikazuje informacije za sve pretplatnike navedenog toka i grupe.
Ako zaboravite sintaksu naredbe, samo zatražite pomoć od same naredbe:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Ograničenje veličine toka

Mnoge aplikacije ne žele zauvijek prikupljati podatke u tok. Često je korisno imati najveći dopušteni broj poruka po niti. U drugim slučajevima, korisno je premjestiti sve poruke iz niti u drugu trajnu pohranu kada se dosegne navedena veličina niti. Možete ograničiti veličinu toka pomoću parametra MAXLEN u naredbi XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kada koristite MAXLEN, stari zapisi se automatski brišu kada dostignu određenu duljinu, tako da tok ima stalnu veličinu. Međutim, obrezivanje se u ovom slučaju ne događa na najučinkovitiji način u Redis memoriji. Situaciju možete poboljšati na sljedeći način:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ u gornjem primjeru znači da ne moramo nužno ograničiti duljinu toka na određenu vrijednost. U našem primjeru to može biti bilo koji broj veći ili jednak 1000 (na primjer, 1000, 1010 ili 1030). Upravo smo eksplicitno naveli da želimo da naš tok pohranjuje najmanje 1000 zapisa. To čini upravljanje memorijom mnogo učinkovitijim unutar Redisa.

Postoji i zaseban tim XTRIM, koji radi istu stvar:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trajna pohrana i replikacija

Redis Stream se asinkrono replicira na podređene čvorove i sprema u datoteke kao što su AOF (snimak svih podataka) i RDB (dnevnik svih operacija pisanja). Također je podržana replikacija stanja potrošačkih grupa. Stoga, ako je poruka u statusu "na čekanju" na glavnom čvoru, tada će na podređenim čvorovima ova poruka imati isti status.

Uklanjanje pojedinačnih elemenata iz toka

Postoji posebna naredba za brisanje poruka XDEL. Naredba dobiva naziv niti nakon kojeg slijede ID-ovi poruka koje treba izbrisati:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kada koristite ovu naredbu, morate uzeti u obzir da se stvarna memorija neće odmah osloboditi.

Potoci nulte duljine

Razlika između tokova i drugih Redis podatkovnih struktura je u tome što kada druge podatkovne strukture više nemaju elemente u sebi, kao nuspojava, sama podatkovna struktura bit će uklonjena iz memorije. Tako će, na primjer, sortirani skup biti potpuno uklonjen kada ZREM poziv ukloni posljednji element. Umjesto toga, nitima je dopušteno da ostanu u memoriji čak i bez bilo kakvih elemenata unutra.

Zaključak

Redis Stream idealan je za stvaranje brokera za poruke, redova poruka, objedinjeno bilježenje i sustave za chat koji čuvaju povijest.

Kao što sam jednom rekao Niklaus Wirth, programi su algoritmi plus strukture podataka, a Redis vam već daje oboje.

Izvor: www.habr.com

Dodajte komentar