Redis Stream – jūsų pranešimų sistemų patikimumas ir mastelio keitimas

Redis Stream – jūsų pranešimų sistemų patikimumas ir mastelio keitimas

Redis Stream yra naujas abstrakčių duomenų tipas, pristatytas Redis su 5.0 versija
Iš esmės Redis Stream yra sąrašas, į kurį galite įtraukti įrašų. Kiekvienas įrašas turi unikalų identifikatorių. Pagal numatytuosius nustatymus ID generuojamas automatiškai ir apima laiko žymą. Todėl laikui bėgant galite pateikti užklausą apie įrašų diapazonus arba gauti naujų duomenų, kai jie patenka į srautą, panašiai kaip Unix komanda „tail -f“ nuskaito žurnalo failą ir užstringa, kol laukiama naujų duomenų. Atminkite, kad keli klientai gali klausytis gijos vienu metu, lygiai taip pat daugelis „tail -f“ procesų gali skaityti failą vienu metu neprieštaraudami vienas kitam.

Norėdami suprasti visus naujo duomenų tipo privalumus, trumpai pažvelkime į seniai egzistuojančias Redis struktūras, kurios iš dalies atkartoja Redis Stream funkcijas.

Redis PUB/SUB

„Redis Pub/Sub“ yra paprasta pranešimų siuntimo sistema, jau įdiegta jūsų raktų vertės saugykloje. Tačiau paprastumas kainuoja:

  • Jei leidėjui dėl kokių nors priežasčių nepavyksta, jis praranda visus savo prenumeratorius
  • Leidėjas turi žinoti tikslų visų savo prenumeratorių adresą
  • Leidėjas gali perkrauti savo prenumeratorius darbu, jei duomenys skelbiami greičiau nei apdorojami
  • Pranešimas ištrinamas iš leidėjo buferio iš karto po paskelbimo, neatsižvelgiant į tai, kiek prenumeratorių jis buvo pristatytas ir kaip greitai jie sugebėjo apdoroti šį pranešimą.
  • Visi abonentai pranešimą gaus vienu metu. Patys abonentai turi kažkaip susitarti dėl tos pačios žinutės apdorojimo tvarkos.
  • Nėra integruoto mechanizmo, patvirtinančio, kad abonentas sėkmingai apdorojo pranešimą. Jei prenumeratorius gauna pranešimą ir apdorojimo metu sugenda, leidėjas apie tai nežinos.

Redis sąrašas

„Redis List“ yra duomenų struktūra, palaikanti skaitymo komandų blokavimą. Galite pridėti ir skaityti pranešimus nuo sąrašo pradžios arba pabaigos. Remdamiesi šia struktūra, savo paskirstytoje sistemoje galite sukurti gerą krūvą arba eilę, ir daugeliu atvejų to pakaks. Pagrindiniai skirtumai nuo Redis Pub/Sub:

  • Pranešimas pristatomas vienam klientui. Pirmasis skaitymo blokuotas klientas gaus duomenis pirmiausia.
  • Clintas turi pats inicijuoti kiekvieno pranešimo skaitymo operaciją. Sąrašas nieko nežino apie klientus.
  • Pranešimai saugomi tol, kol kas nors juos perskaito arba aiškiai ištrina. Jei sukonfigūravote Redis serverį, kad duomenys būtų išplauti į diską, sistemos patikimumas labai padidės.

Įvadas į srautą

Įrašo įtraukimas į srautą

Komanda XADD prideda naują įrašą į srautą. Įrašas nėra tik eilutė, jį sudaro viena ar daugiau raktų ir reikšmių porų. Taigi kiekvienas įrašas jau yra struktūrizuotas ir panašus į CSV failo struktūrą.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Aukščiau pateiktame pavyzdyje prie srauto pridedame du laukus pavadinimu (raktu) „mystream“: „sensor-id“ ir „temperature“ su reikšmėmis „1234“ ir „19.8“. Kaip antrasis argumentas, komanda paima identifikatorių, kuris bus priskirtas įrašui – šis identifikatorius unikaliai identifikuoja kiekvieną srauto įrašą. Tačiau šiuo atveju išlaikėme *, nes norime, kad Redis sugeneruotų mums naują ID. Kiekvienas naujas ID didės. Todėl kiekvienas naujas įrašas turės didesnį identifikatorių, palyginti su ankstesniais įrašais.

Identifikatoriaus formatas

Komandos grąžintas įrašo ID XADD, susideda iš dviejų dalių:

{millisecondsTime}-{sequenceNumber}

milisekundėsLaikas — Unix laikas milisekundėmis (Redis serverio laikas). Tačiau jei dabartinis laikas yra toks pat arba mažesnis nei ankstesnio įrašymo laikas, naudojama ankstesnio įrašo laiko žyma. Todėl, jei serverio laikas grįžta atgal, naujasis identifikatorius vis tiek išsaugos prieaugio ypatybę.

sekaNumber naudojamas įrašams, sukurtiems per tą pačią milisekundę. sekaNumber bus padidintas 1, palyginti su ankstesniu įrašu. Nes sekaNumber yra 64 bitų dydžio, tada praktiškai neturėtumėte apriboti įrašų, kuriuos galima sugeneruoti per vieną milisekundę, skaičiaus.

Tokių identifikatorių formatas iš pirmo žvilgsnio gali pasirodyti keistas. Nepasitikintis skaitytojas gali susimąstyti, kodėl laikas yra identifikatoriaus dalis. Priežastis ta, kad Redis srautai palaiko diapazono užklausas pagal ID. Kadangi identifikatorius yra susietas su įrašo sukūrimo laiku, tai leidžia užklausti laiko intervalus. Pažvelgsime į konkretų pavyzdį, kai pažvelgsime į komandą XRANGE.

Jei dėl kokių nors priežasčių vartotojui reikia nurodyti savo identifikatorių, kuris, pavyzdžiui, yra susietas su kokia nors išorine sistema, galime jį perduoti komandai XADD vietoj *, kaip parodyta žemiau:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Atkreipkite dėmesį, kad tokiu atveju turite patys stebėti ID prieaugį. Mūsų pavyzdyje minimalus identifikatorius yra „0-1“, todėl komanda nepriims kito identifikatoriaus, kuris yra lygus „0-1“ arba mažesnis už jį.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Įrašų skaičius sraute

Įrašų skaičių sraute galima gauti tiesiog naudojant komandą XLEN. Mūsų pavyzdyje ši komanda grąžins šią reikšmę:

> XLEN somestream
(integer) 2

Diapazono užklausos – XRANGE ir XREVRANGE

Norėdami prašyti duomenų pagal diapazoną, turime nurodyti du identifikatorius – diapazono pradžią ir pabaigą. Grąžintame diapazone bus visi elementai, įskaitant ribas. Taip pat yra du specialūs identifikatoriai „-“ ir „+“, atitinkamai reiškiantys mažiausią (pirmasis įrašas) ir didžiausią (paskutinis įrašas) identifikatorius sraute. Toliau pateiktame pavyzdyje bus pateikti visi srauto įrašai.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Kiekvienas grąžintas įrašas yra dviejų elementų masyvas: identifikatorius ir raktų-reikšmių porų sąrašas. Jau sakėme, kad įrašų identifikatoriai yra susiję su laiku. Todėl galime prašyti tam tikro laikotarpio. Tačiau užklausoje galime nurodyti ne visą identifikatorių, o tik Unix laiką, praleisdami dalį, susijusią su sekaNumber. Praleista identifikatoriaus dalis diapazono pradžioje bus automatiškai nustatyta į nulį, o diapazono pabaigoje – į didžiausią įmanomą reikšmę. Toliau pateikiamas pavyzdys, kaip galite prašyti dviejų milisekundžių diapazono.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Šiame diapazone turime tik vieną įrašą, tačiau realiuose duomenų rinkiniuose grąžinamas rezultatas gali būti didžiulis. Dėl šios priežasties XRANGE palaiko parinktį COUNT. Nurodę kiekį, galime tiesiog gauti pirmųjų N įrašų. Jei mums reikia gauti sekančius N įrašus (puslapius), galime naudoti paskutinį gautą ID, jį padidinti sekaNumber po vieną ir paklausk dar kartą. Pažvelkime į tai kitame pavyzdyje. Mes pradedame pridėti 10 elementų su XADD (darant prielaidą, kad mystream jau buvo užpildytas 10 elementų). Norėdami pradėti iteraciją, gaudami 2 elementus vienai komandai, pradedame nuo viso diapazono, bet su COUNT lygiu 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Norėdami tęsti kartojimą su kitais dviem elementais, turime pasirinkti paskutinį gautą ID, t. y. 1519073279157-0, ir pridėti 1 prie sekaNumber.
Gautas ID, šiuo atveju 1519073279157-1, dabar gali būti naudojamas kaip naujas diapazono pradžios argumentas kitam skambučiui XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Ir taip toliau. Dėl sudėtingumo XRANGE yra O(log(N)) paieškai, o po to O(M), kad grąžintų M elementų, tada kiekvienas iteracijos žingsnis yra greitas. Taigi, naudojant XRANGE srautai gali būti kartojami efektyviai.

Komanda XREVRANGE yra lygiavertis XRANGE, bet grąžina elementus atvirkštine tvarka:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Atkreipkite dėmesį, kad komanda XREVRANGE paima diapazono argumentus pradžia ir pabaiga atvirkštine tvarka.

Naujų įrašų skaitymas naudojant XREAD

Dažnai iškyla užduotis užsiprenumeruoti srautą ir gauti tik naujus pranešimus. Ši koncepcija gali atrodyti panaši į Redis Pub/Sub arba Redis List blokavimą, tačiau yra esminių skirtumų, kaip naudoti Redis Stream:

  1. Kiekviena nauja žinutė pagal numatytuosius nustatymus pristatoma kiekvienam abonentui. Šis elgesys skiriasi nuo blokuojančio Redis sąrašo, kuriame naują pranešimą skaitys tik vienas prenumeratorius.
  2. Nors Redis Pub/Sub visi pranešimai pamirštami ir niekada neišsaugomi, sraute visi pranešimai saugomi neribotą laiką (nebent klientas aiškiai ištrina).
  3. Redis Stream leidžia atskirti prieigą prie pranešimų viename sraute. Konkretus abonentas gali matyti tik savo asmeninių pranešimų istoriją.

Galite užsiprenumeruoti giją ir gauti naujų pranešimų naudodami komandą PERSKAITYTI. Tai šiek tiek sudėtingiau nei XRANGE, todėl pirmiausia pradėsime nuo paprastesnių pavyzdžių.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Aukščiau pateiktame pavyzdyje parodyta neblokuojanti forma PERSKAITYTI. Atminkite, kad parinktis COUNT yra neprivaloma. Tiesą sakant, vienintelė reikalinga komandos parinktis yra STREAMS parinktis, kuri nurodo srautų sąrašą kartu su atitinkamu didžiausiu identifikatoriumi. Parašėme „STREAMS mystream 0“ – norime gauti visus „mystream“ srauto įrašus su didesniu nei „0-0“ identifikatoriumi. Kaip matote iš pavyzdžio, komanda grąžina gijos pavadinimą, nes vienu metu galime užsiprenumeruoti kelias gijas. Pavyzdžiui, galėtume parašyti „STREAMS mystream otherstream 0 0“. Atkreipkite dėmesį, kad po parinkties STREAMS pirmiausia turime pateikti visų reikiamų srautų pavadinimus ir tik tada identifikatorių sąrašą.

Šia paprasta forma komanda nedaro nieko ypatingo, palyginti su XRANGE. Tačiau įdomiausia tai, kad galime lengvai pasukti PERSKAITYTI į blokavimo komandą, nurodant argumentą BLOKUOTI:

> XREAD BLOCK 0 STREAMS mystream $

Aukščiau pateiktame pavyzdyje nurodyta nauja BLOKUOTI parinktis, kurios laikas yra 0 milisekundžių (tai reiškia, kad reikia laukti neribotą laiką). Be to, užuot perdavęs įprastą srauto mystream identifikatorių, buvo perduotas specialus identifikatorius $. Šis specialus identifikatorius tai reiškia PERSKAITYTI kaip identifikatorių turi naudoti maksimalų mystream identifikatorių. Taigi naujus pranešimus gausime tik nuo to momento, kai pradėjome klausytis. Kai kuriais atžvilgiais tai panašu į Unix komandą "tail -f".

Atminkite, kad naudojant parinktį BLOKUOTI mums nebūtinai reikia naudoti specialų identifikatorių $. Galime naudoti bet kokį sraute esantį identifikatorių. Jei komanda gali nedelsiant aptarnauti mūsų užklausą be blokavimo, ji tai padarys, kitu atveju blokuos.

Blokavimas PERSKAITYTI taip pat gali klausytis kelių gijų vienu metu, tereikia nurodyti jų pavadinimus. Tokiu atveju komanda grąžins pirmojo srauto, kuris gavo duomenis, įrašą. Pirmasis tam tikros gijos užblokuotas abonentas pirmiausia gaus duomenis.

Vartotojų grupės

Tam tikromis užduotimis norime apriboti abonento prieigą prie pranešimų vienoje gijoje. Pavyzdys, kai tai gali būti naudinga, yra pranešimų eilė su darbuotojais, kurie gaus skirtingus pranešimus iš gijos, todėl pranešimų apdorojimas gali padidėti.

Jei įsivaizduosime, kad turime tris abonentus C1, C2, C3 ir giją, kurioje yra pranešimai 1, 2, 3, 4, 5, 6, 7, tada pranešimai bus pateikiami taip, kaip parodyta toliau pateiktoje diagramoje:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Kad pasiektų šį efektą, „Redis Stream“ naudoja koncepciją, pavadintą „Consumer Group“. Ši koncepcija yra panaši į pseudoabonentą, kuris gauna duomenis iš srauto, bet iš tikrųjų jį aptarnauja keli grupės abonentai, suteikdami tam tikras garantijas:

  1. Kiekvienas pranešimas pristatomas kitam grupės abonentui.
  2. Grupėje prenumeratoriai identifikuojami pagal jų vardą, kuris yra didžiosios ir mažosios raidės. Jei abonentas laikinai išeina iš grupės, jis gali būti grąžintas į grupę naudojant savo unikalų pavadinimą.
  3. Kiekviena vartotojų grupė vadovaujasi „pirmosios neskaitytos žinutės“ koncepcija. Kai abonentas prašo naujų pranešimų, jis gali gauti tik tuos pranešimus, kurie anksčiau nebuvo pristatyti jokiam grupės abonentui.
  4. Yra komanda, kuri aiškiai patvirtina, kad abonentas sėkmingai apdorojo pranešimą. Kol nebus iškviesta ši komanda, prašomo pranešimo būsena bus „laukiama“.
  5. Vartotojų grupėje kiekvienas abonentas gali prašyti jam pristatytų, bet dar neapdorotų pranešimų istorijos (būsena „laukiama“).

Tam tikra prasme grupės būsena gali būti išreikšta taip:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Dabar atėjo laikas susipažinti su pagrindinėmis vartotojų grupės komandomis, būtent:

  • XGROUP naudojamas grupėms kurti, naikinti ir valdyti
  • XREADGROUP naudojamas srautui skaityti per grupę
  • XACK - ši komanda leidžia abonentui pažymėti pranešimą kaip sėkmingai apdorotą

Vartotojų grupės sukūrimas

Tarkime, kad mano srautas jau egzistuoja. Tada grupės kūrimo komanda atrodys taip:

> XGROUP CREATE mystream mygroup $
OK

Kurdami grupę turime perduoti identifikatorių, nuo kurio grupė gaus pranešimus. Jei norime tik gauti visus naujus pranešimus, galime naudoti specialų identifikatorių $ (kaip mūsų pavyzdyje aukščiau). Jei vietoj specialaus identifikatoriaus nurodysite 0, visi gijos pranešimai bus pasiekiami grupei.

Dabar, kai grupė sukurta, galime iškart pradėti skaityti pranešimus naudodami komandą XREADGROUP. Ši komanda yra labai panaši į PERSKAITYTI ir palaiko pasirenkamą BLOKUOTI parinktį. Tačiau yra būtina parinktis GROUP, kuri visada turi būti nurodyta dviem argumentais: grupės pavadinimu ir abonento vardu. Taip pat palaikoma parinktis COUNT.

Prieš skaitydami temą, įdėkime keletą pranešimų:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Dabar pabandykime perskaityti šį srautą per grupę:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Aukščiau pateikta komanda pažodžiui skamba taip:

„Aš, prenumeratorė Alisa, mano grupės narė, noriu perskaityti vieną pranešimą iš mano srauto, kuris dar niekada niekam nebuvo pristatytas.

Kiekvieną kartą, kai abonentas atlieka operaciją grupėje, jis turi nurodyti savo pavadinimą, unikaliai identifikuojantis save grupėje. Aukščiau pateiktoje komandoje yra dar viena labai svarbi detalė – specialusis identifikatorius ">". Šis specialus identifikatorius filtruoja pranešimus, paliekant tik tuos, kurie niekada anksčiau nebuvo pristatyti.

Be to, ypatingais atvejais galite nurodyti tikrą identifikatorių, pvz., 0 arba bet kurį kitą galiojantį identifikatorių. Šiuo atveju komanda XREADGROUP grąžins jums istoriją žinučių, kurių būsena „laukiama“, kurios buvo pristatytos nurodytam abonentui (Alisa), bet dar nebuvo patvirtintos naudojant komandą XACK.

Šį elgesį galime išbandyti iš karto nurodydami ID 0, be parinkties SKAIČIAVIMAS. Tiesiog pamatysime vieną laukiantį pranešimą, tai yra, obuolio pranešimą:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Tačiau jei patvirtinsime, kad pranešimas sėkmingai apdorotas, jis nebebus rodomas:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Dabar atėjo Bobo eilė ką nors perskaityti:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bobas, mano grupės narys, paprašė ne daugiau kaip dviejų žinučių. Komanda praneša tik apie nepristatytus pranešimus dėl specialaus identifikatoriaus ">". Kaip matote, pranešimas „obuolys“ nebus rodomas, nes jis jau buvo pristatytas Alisai, todėl Bobas gauna „apelsiną“ ir „braškę“.

Tokiu būdu Alisa, Bobas ir bet kuris kitas grupės prenumeratorius gali skaityti skirtingus pranešimus iš to paties srauto. Jie taip pat gali skaityti savo neapdorotų pranešimų istoriją arba pažymėti pranešimus kaip apdorotus.

Reikia atsiminti keletą dalykų:

  • Kai tik abonentas mano, kad pranešimas yra komanda XREADGROUP, šis pranešimas pereina į būseną „laukiama“ ir priskiriamas tam konkrečiam abonentui. Kiti grupės prenumeratoriai negalės perskaityti šio pranešimo.
  • Abonentai automatiškai sukuriami pirmą kartą paminėjus, nereikia jų tiesiogiai kurti.
  • naudojant XREADGROUP vienu metu galite skaityti pranešimus iš kelių skirtingų gijų, tačiau kad tai veiktų, pirmiausia turite sukurti grupes tuo pačiu pavadinimu kiekvienai gijai naudodami XGROUP

Atsigavimas po nesėkmės

Abonentas gali atsigauti po gedimo ir dar kartą perskaityti savo pranešimų sąrašą, kurio būsena yra „laukiama“. Tačiau realiame pasaulyje abonentai galiausiai gali žlugti. Kas atsitiks su įstrigusiais abonento pranešimais, jei abonentas negali atsigauti po gedimo?
Consumer Group siūlo funkciją, kuri naudojama kaip tik tokiais atvejais – kai reikia pakeisti žinučių savininką.

Pirmas dalykas, kurį reikia padaryti, yra iškviesti komandą EXPENDING, kuriame rodomi visi grupės pranešimai, kurių būsena yra „laukiama“. Paprasčiausia komanda iškviečiama tik su dviem argumentais: gijos pavadinimu ir grupės pavadinimu:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Komanda parodė visos grupės ir kiekvieno abonento neapdorotų pranešimų skaičių. Turime tik Bobą su dviem neišspręstais pranešimais, nes vienintelė Alisa prašyta žinutė buvo patvirtinta XACK.

Galime paprašyti daugiau informacijos, naudodami daugiau argumentų:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} – identifikatorių diapazonas (galite naudoti „-“ ir „+“)
{count} – pristatymo bandymų skaičius
{consumer-name} – grupės pavadinimas

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Dabar turime išsamią informaciją apie kiekvieną pranešimą: ID, abonento vardą, neveikimo laiką milisekundėmis ir galiausiai pristatymo bandymų skaičių. Turime du Bobo pranešimus ir jie buvo neaktyvūs 74170458 milisekundes, apie 20 valandų.

Atminkite, kad niekas netrukdo mums patikrinti, koks buvo pranešimo turinys, paprasčiausiai naudojant XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mes tiesiog turime pakartoti tą patį identifikatorių du kartus argumentuose. Dabar, kai jau turime idėją, Alisa gali nuspręsti, kad po 20 valandų prastovos Bobas greičiausiai neatsigaus, ir laikas užklausti tuos pranešimus ir tęsti jų apdorojimą Bobui. Tam naudojame komandą PAREIŠKIMAS:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Naudodami šią komandą galime gauti „užsienio“ pranešimą, kuris dar nebuvo apdorotas, pakeitę savininką į {vartotojas}. Tačiau taip pat galime nurodyti minimalų tuščiosios eigos laiką {min-idle-time}. Tai padeda išvengti situacijos, kai du klientai vienu metu bando pakeisti tų pačių pranešimų savininką:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pirmasis klientas iš naujo nustatys prastovą ir padidins pristatymo skaitiklį. Taigi antrasis klientas negalės to prašyti.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alisa sėkmingai atsiėmė pranešimą, kuris dabar gali apdoroti pranešimą ir jį patvirtinti.

Iš aukščiau pateikto pavyzdžio matote, kad sėkminga užklausa grąžina paties pranešimo turinį. Tačiau tai nėra būtina. Parinktį JUSTID galima naudoti tik pranešimų ID grąžinimui. Tai naudinga, jei nesate suinteresuoti pranešimo detalėmis ir norite padidinti sistemos našumą.

Pristatymo skaitiklis

Skaitiklis, kurį matote išvestyje EXPENDING yra kiekvieno pranešimo pristatymų skaičius. Toks skaitiklis didinamas dviem būdais: kai pranešimo užklausa sėkmingai pateikiama per PAREIŠKIMAS arba kai naudojamas skambutis XREADGROUP.

Normalu, kad kai kurie pranešimai pristatomi kelis kartus. Svarbiausia, kad visi pranešimai galiausiai būtų apdoroti. Kartais apdorojant pranešimą kyla problemų, nes pats pranešimas yra sugadintas arba pranešimo apdorojimas sukelia tvarkyklės kodo klaidą. Tokiu atveju gali pasirodyti, kad niekas negalės apdoroti šios žinutės. Kadangi turime pristatymo bandymų skaitiklį, galime naudoti šį skaitiklį tokioms situacijoms aptikti. Todėl pristatymo skaičiui pasiekus jūsų nurodytą aukštą skaičių, tikriausiai būtų protingiau tokį pranešimą įdėti į kitą giją ir išsiųsti pranešimą sistemos administratoriui.

Gijos būsena

Komanda XINFO naudojamas prašyti įvairios informacijos apie giją ir jos grupes. Pavyzdžiui, pagrindinė komanda atrodo taip:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Aukščiau pateikta komanda rodo bendrą informaciją apie nurodytą srautą. Dabar šiek tiek sudėtingesnis pavyzdys:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Aukščiau pateikta komanda rodo bendrą informaciją apie visas nurodytos gijos grupes

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Aukščiau pateikta komanda rodo informaciją apie visus nurodyto srauto ir grupės abonentus.
Jei pamiršote komandos sintaksę, tiesiog paprašykite pagalbos pačios komandos:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Srauto dydžio apribojimas

Daugelis programų nenori amžinai rinkti duomenų į srautą. Dažnai naudinga turėti didžiausią leistiną pranešimų skaičių vienoje gijoje. Kitais atvejais naudinga perkelti visus pranešimus iš gijos į kitą nuolatinę saugyklą, kai pasiekiamas nurodytas gijos dydis. Srauto dydį galite apriboti naudodami komandos parametrą MAXLEN XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Naudojant MAXLEN, seni įrašai automatiškai ištrinami, kai pasiekia nurodytą ilgį, todėl srauto dydis yra pastovus. Tačiau šiuo atveju genėjimas Redis atmintyje nevyksta pačiu efektyviausiu būdu. Galite pagerinti situaciją taip:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argumentas ~ aukščiau pateiktame pavyzdyje reiškia, kad mums nebūtinai reikia apriboti srauto ilgį iki konkrečios vertės. Mūsų pavyzdyje tai gali būti bet koks skaičius, didesnis arba lygus 1000 (pavyzdžiui, 1000, 1010 arba 1030). Mes ką tik aiškiai nurodėme, kad norime, kad mūsų sraute būtų saugoma bent 1000 įrašų. Dėl to „Redis“ atminties valdymas tampa daug efektyvesnis.

Taip pat yra atskira komanda XTRIM, kuris daro tą patį:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Nuolatinis saugojimas ir replikacija

„Redis Stream“ asinchroniškai replikuojamas į pavaldinius mazgus ir išsaugomas tokiuose failuose kaip AOF (visų duomenų momentinė nuotrauka) ir RDB (visų rašymo operacijų žurnalas). Taip pat palaikoma vartotojų grupių būsenos replikacija. Todėl, jei pranešimo būsena yra „laukiama“ pagrindiniame mazge, tada pagalbiniuose mazguose šio pranešimo būsena bus tokia pati.

Atskirų elementų pašalinimas iš srauto

Yra speciali komanda, skirta ištrinti pranešimus XDEL. Komanda gauna gijos pavadinimą, po kurio nurodomi pranešimų ID, kuriuos reikia ištrinti:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Naudodami šią komandą turite atsižvelgti į tai, kad tikroji atmintis nebus iš karto atlaisvinta.

Nulinio ilgio srautai

Skirtumas tarp srautų ir kitų Redis duomenų struktūrų yra tas, kad kai kitose duomenų struktūrose nebeliks elementų, kaip šalutinis poveikis, pati duomenų struktūra bus pašalinta iš atminties. Taigi, pavyzdžiui, surūšiuotas rinkinys bus visiškai pašalintas, kai ZREM iškvietimas pašalins paskutinį elementą. Vietoj to, gijos gali likti atmintyje net ir neturint jokių elementų.

išvada

„Redis Stream“ idealiai tinka pranešimų tarpininkams, pranešimų eilėms, vieningam registravimui ir istorijos saugojimo pokalbių sistemoms kurti.

Kaip kažkada sakiau Niklausas Wirthas, programos yra algoritmai ir duomenų struktūros, o Redis jau suteikia jums abu.

Šaltinis: www.habr.com

Добавить комментарий