Redis Stream ā€” jÅ«su ziņojumapmaiņas sistēmu uzticamÄ«ba un mērogojamÄ«ba

Redis Stream ā€” jÅ«su ziņojumapmaiņas sistēmu uzticamÄ«ba un mērogojamÄ«ba

Redis Stream ir jauns abstrakts datu veids, kas ieviests Redis ar versiju 5.0
Konceptuāli Redis Stream ir saraksts, kuram varat pievienot ierakstus. Katram ierakstam ir unikāls identifikators. Pēc noklusējuma ID tiek ģenerēts automātiski un ietver laikspiedolu. Tāpēc jūs varat vaicāt ierakstu diapazonus laika gaitā vai saņemt jaunus datus, tiklīdz tie nonāk straumē, līdzīgi kā Unix komanda "tail -f" nolasa žurnāla failu un sasalst, gaidot jaunus datus. Ņemiet vērā, ka vairāki klienti var klausīties pavedienu vienlaikus, tāpat kā daudzi "tail -f" procesi var nolasīt failu vienlaikus, neradot konfliktus vienam ar otru.

Lai saprastu visas jaunā datu veida priekÅ”rocÄ«bas, Ä«si apskatÄ«sim jau sen pastāvoŔās Redis struktÅ«ras, kas daļēji atkārto Redis Stream funkcionalitāti.

Redis PUB/SUB

Redis Pub/Sub ir vienkārÅ”a ziņojumapmaiņas sistēma, kas jau ir iebÅ«vēta jÅ«su atslēgu vērtÄ«bu veikalā. Tomēr vienkārŔībai ir sava cena:

  • Ja izdevējam kāda iemesla dēļ neizdodas, viņŔ zaudē visus savus abonentus
  • Izdevējam ir jāzina precÄ«za visu savu abonentu adrese
  • Izdevējs var pārslogot savus abonentus ar darbu, ja dati tiek publicēti ātrāk, nekā tie tiek apstrādāti
  • Ziņojums tiek dzēsts no izdevēja bufera tÅ«lÄ«t pēc publicÄ“Å”anas neatkarÄ«gi no tā, cik abonentiem tas tika piegādāts un cik ātri viņi varēja apstrādāt Å”o ziņojumu.
  • Visi abonenti saņems ziņu vienlaikus. PaÅ”iem abonentiem kaut kādā veidā jāvienojas savā starpā par vienas ziņas apstrādes kārtÄ«bu.
  • Nav iebÅ«vēta mehānisma, kas apstiprinātu, ka abonents ir veiksmÄ«gi apstrādājis ziņojumu. Ja abonents saņem ziņojumu un apstrādes laikā avarē, izdevējs par to nezinās.

Redisa saraksts

Redis saraksts ir datu struktÅ«ra, kas atbalsta lasÄ«Å”anas komandu bloÄ·Ä“Å”anu. Varat pievienot un lasÄ«t ziņas no saraksta sākuma vai beigām. Pamatojoties uz Å”o struktÅ«ru, jÅ«s varat izveidot labu steku vai rindu savai izplatÄ«tajai sistēmai, un vairumā gadÄ«jumu ar to pietiks. Galvenās atŔķirÄ«bas no Redis Pub/Sub:

  • Ziņa tiek piegādāta vienam klientam. Pirmais nolasÄ«Å”anas bloķētais klients saņems datus pirmais.
  • Klintam paÅ”am ir jāuzsāk katra ziņojuma lasÄ«Å”anas darbÄ«ba. Lists neko nezina par klientiem.
  • Ziņojumi tiek glabāti, lÄ«dz kāds tos izlasa vai nepārprotami izdzÄ“Å”. Ja konfigurējat Redis serveri, lai izskalotu datus diskā, sistēmas uzticamÄ«ba ievērojami palielinās.

Ievads pakalpojumā Stream

Ieraksta pievienoŔana straumei

Komanda XADD pievieno straumei jaunu ierakstu. Ieraksts nav tikai virkne, tas sastāv no viena vai vairākiem atslēgu-vērtību pāriem. Tādējādi katrs ieraksts jau ir strukturēts un atgādina CSV faila struktūru.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

IepriekÅ” minētajā piemērā straumei pievienojam divus laukus ar nosaukumu (atslēgu) ā€œmystreamā€: ā€œsensor-idā€ un ā€œtemperatÅ«raā€ ar vērtÄ«bām attiecÄ«gi ā€œ1234ā€ un ā€œ19.8ā€. Kā otro argumentu komanda ņem identifikatoru, kas tiks pieŔķirts ierakstam - Å”is identifikators unikāli identificē katru ierakstu straumē. Tomēr Å”ajā gadÄ«jumā mēs izturējām *, jo vēlamies, lai Redis mums Ä£enerē jaunu ID. Katrs jauns ID palielināsies. Tāpēc katram jaunajam ierakstam bÅ«s augstāks identifikators salÄ«dzinājumā ar iepriekŔējiem ierakstiem.

Identifikatora formāts

Komandas atgrieztais ieraksta ID XADD, sastāv no divām daļām:

{millisecondsTime}-{sequenceNumber}

milisekundesLaiks ā€” Unix laiks milisekundēs (Redis servera laiks). Tomēr, ja paÅ”reizējais laiks ir tāds pats vai mazāks nekā iepriekŔējā ieraksta laiks, tiek izmantots iepriekŔējā ieraksta laikspiedols. Tāpēc, ja servera laiks atgriežas pagātnē, jaunais identifikators joprojām saglabās pieauguma rekvizÄ«tu.

secÄ«basNumber izmanto ierakstiem, kas izveidoti tajā paŔā milisekundē. secÄ«basNumber tiks palielināts par 1 attiecÄ«bā pret iepriekŔējo ierakstu. Tāpēc ka secÄ«basNumber ir 64 biti liels, tad praksē nevajadzētu ierobežot ierakstu skaitu, ko var Ä£enerēt vienas milisekundes laikā.

Šādu identifikatoru formāts no pirmā acu uzmetiena var Ŕķist dÄ«vains. NeuzticÄ«gs lasÄ«tājs varētu brÄ«nÄ«ties, kāpēc laiks ir daļa no identifikatora. Iemesls ir tāds, ka Redis straumes atbalsta diapazona vaicājumus pēc ID. Tā kā identifikators ir saistÄ«ts ar ieraksta izveides laiku, tas ļauj vaicāt laika diapazonus. Mēs apskatÄ«sim konkrētu piemēru, kad aplÅ«kosim komandu XRANGE.

Ja kāda iemesla dēļ lietotājam ir jānorāda savs identifikators, kas, piemēram, ir saistīts ar kādu ārēju sistēmu, tad mēs varam to nodot komandai XADD * vietā, kā parādīts zemāk:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

LÅ«dzu, ņemiet vērā, ka Å”ajā gadÄ«jumā jums paÅ”am jāuzrauga ID palielinājums. MÅ«su piemērā minimālais identifikators ir "0-1", tāpēc komanda nepieņems citu identifikatoru, kas ir vienāds ar "0-1" vai mazāks par to.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Ierakstu skaits vienā straumē

Ir iespējams iegÅ«t ierakstu skaitu straumē, vienkārÅ”i izmantojot komandu XLEN. MÅ«su piemērā Ŕī komanda atgriezÄ«s Ŕādu vērtÄ«bu:

> XLEN somestream
(integer) 2

Diapazona vaicājumi - XRANGE un XREVRANGE

Lai pieprasÄ«tu datus pēc diapazona, mums ir jānorāda divi identifikatori - diapazona sākums un beigas. Atgrieztais diapazons ietvers visus elementus, tostarp robežas. Ir arÄ« divi Ä«paÅ”i identifikatori ā€œ-ā€ un ā€œ+ā€, kas attiecÄ«gi nozÄ«mē mazāko (pirmais ieraksts) un lielāko (pēdējais ieraksts) identifikatoru straumē. Tālāk esoÅ”ajā piemērā bÅ«s uzskaitÄ«ti visi straumes ieraksti.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Katrs atgrieztais ieraksts ir divu elementu masīvs: identifikators un atslēgu un vērtību pāru saraksts. Mēs jau teicām, ka ierakstu identifikatori ir saistīti ar laiku. Tāpēc mēs varam pieprasīt noteikta laika perioda diapazonu. Taču pieprasījumā varam norādīt nevis pilnu identifikatoru, bet tikai Unix laiku, izlaižot daļu, kas attiecas uz secībasNumber. Izlaistā identifikatora daļa diapazona sākumā tiks automātiski iestatīta uz nulli un diapazona beigās uz maksimālo iespējamo vērtību. Tālāk ir sniegts piemērs tam, kā varat pieprasīt divu milisekundu diapazonu.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Å ajā diapazonā mums ir tikai viens ieraksts, taču reālos datu kopās atgrieztais rezultāts var bÅ«t milzÄ«gs. Å Ä« iemesla dēļ XRANGE atbalsta opciju COUNT. Norādot daudzumu, mēs varam vienkārÅ”i iegÅ«t pirmos N ierakstus. Ja mums ir jāiegÅ«st nākamie N ieraksti (lapu ŔķiroÅ”ana), varam izmantot pēdējo saņemto ID, to palielināt secÄ«basNumber pa vienam un jautā vēlreiz. ApskatÄ«sim to nākamajā piemērā. Mēs sākam pievienot 10 elementus ar XADD (pieņemot, ka mystream jau bija piepildÄ«ts ar 10 elementiem). Lai sāktu iterāciju, iegÅ«stot 2 elementus katrā komandā, mēs sākam ar pilnu diapazonu, bet ar COUNT, kas vienāds ar 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Lai turpinātu atkārtoÅ”anu ar nākamajiem diviem elementiem, mums ir jāatlasa pēdējais saņemtais ID, t.i., 1519073279157-0, un jāpievieno 1. secÄ«basNumber.
IegÅ«to ID, Å”ajā gadÄ«jumā 1519073279157-1, tagad var izmantot kā jauno diapazona sākuma argumentu nākamajam izsaukumam. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Un tā tālāk. Sarežģītības dēļ XRANGE ir O(log(N)), lai meklētu un pēc tam O(M), lai atgrieztu M elementus, tad katrs iterācijas solis ir ātrs. Tādējādi, izmantojot XRANGE straumes var efektīvi atkārtot.

Komanda XREVRANGE ir līdzvērtīgs XRANGE, bet atgriež elementus apgrieztā secībā:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

LÅ«dzu, ņemiet vērā, ka komanda XREVRANGE ņem diapazona argumentus sākuma un beigÅ”anas apgrieztā secÄ«bā.

Jaunu ierakstu lasīŔana, izmantojot XREAD

Bieži vien rodas uzdevums abonēt straumi un saņemt tikai jaunas ziņas. Å is jēdziens var Ŕķist lÄ«dzÄ«gs Redis Pub/Sub vai Redis List bloÄ·Ä“Å”anai, taču Redis Stream lietoÅ”anā ir bÅ«tiskas atŔķirÄ«bas:

  1. Katra jauna ziņa pēc noklusējuma tiek piegādāta katram abonentam. Å Ä« darbÄ«ba atŔķiras no bloķējoŔā Redis saraksta, kurā jaunu ziņojumu lasÄ«s tikai viens abonents.
  2. Kamēr pakalpojumā Redis Pub/Sub visi ziņojumi tiek aizmirsti un nekad netiek saglabāti, pakalpojumā Stream visi ziņojumi tiek saglabāti neierobežotu laiku (ja vien klients nepārprotami neizraisa dzÄ“Å”anu).
  3. Redis Stream ļauj atŔķirt piekļuvi ziņojumiem vienā straumē. Konkrēts abonents var redzēt tikai savu personÄ«go ziņojumu vēsturi.

Izmantojot komandu, varat abonēt pavedienu un saņemt jaunus ziņojumus IZLASÄŖT. Tas ir nedaudz sarežģītāk nekā XRANGE, tāpēc vispirms sāksim ar vienkārŔākiem piemēriem.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

IepriekÅ” minētajā piemērā ir parādÄ«ta nebloķējoÅ”a forma IZLASÄŖT. Ņemiet vērā, ka opcija COUNT nav obligāta. Faktiski vienÄ«gā nepiecieÅ”amā komandas opcija ir opcija STREAMS, kas norāda straumju sarakstu kopā ar atbilstoÅ”o maksimālo identifikatoru. Mēs uzrakstÄ«jām "STREAMS mystream 0" - mēs vēlamies saņemt visus mystream straumes ierakstus ar identifikatoru, kas ir lielāks par "0-0". Kā redzat piemērā, komanda atgriež pavediena nosaukumu, jo mēs varam abonēt vairākus pavedienus vienlaikus. Mēs varētu rakstÄ«t, piemēram, "STREAMS mystream otherstream 0 0". LÅ«dzu, ņemiet vērā, ka pēc opcijas STREAMS vispirms ir jānorāda visu nepiecieÅ”amo straumju nosaukumi un tikai pēc tam identifikatoru saraksts.

Å ajā vienkārÅ”ajā formā komanda nedara neko Ä«paÅ”u salÄ«dzinājumā ar XRANGE. Tomēr interesanti ir tas, ka mēs varam viegli pagriezties IZLASÄŖT uz bloÄ·Ä“Å”anas komandu, norādot argumentu BLOĶĒT:

> XREAD BLOCK 0 STREAMS mystream $

IepriekÅ” minētajā piemērā jauna opcija BLOĶĒT ir norādÄ«ta ar taimautu 0 milisekundes (tas nozÄ«mē, ka jāgaida bezgalÄ«gi). Turklāt tā vietā, lai nodotu parasto straumes mystream identifikatoru, tika nodots Ä«paÅ”s identifikators $. Å is Ä«paÅ”ais identifikators to nozÄ«mē IZLASÄŖT kā identifikators ir jāizmanto maksimālais mystream identifikators. Tātad jaunus ziņojumus saņemsim tikai no brīža, kad sākām klausÄ«ties. Dažos veidos tas ir lÄ«dzÄ«gs Unix komandai "tail -f".

Ņemiet vērā, ka, izmantojot opciju BLOĶĒT, mums nav obligāti jāizmanto Ä«paÅ”ais identifikators $. Mēs varam izmantot jebkuru straumē esoÅ”o identifikatoru. Ja komanda var nekavējoties apkalpot mÅ«su pieprasÄ«jumu bez bloÄ·Ä“Å”anas, tā to darÄ«s, pretējā gadÄ«jumā tā tiks bloķēta.

BloÄ·Ä“Å”ana IZLASÄŖT var arÄ« klausÄ«ties vairākus pavedienus vienlaikus, jums vienkārÅ”i jānorāda to nosaukumi. Å ajā gadÄ«jumā komanda atgriezÄ«s pirmās straumes ierakstu, kas saņēma datus. Pirmais abonents, kas bloķēts konkrētajam pavedienam, vispirms saņems datus.

Patērētāju grupas

Dažos uzdevumos mēs vēlamies ierobežot abonentu piekļuvi ziņojumiem vienā pavedienā. Piemērs, kur tas varētu bÅ«t noderÄ«gi, ir ziņojumu rinda ar darbiniekiem, kuri saņems dažādus ziņojumus no pavediena, ļaujot ziņojumu apstrādei palielināt mērogoÅ”anu.

Ja iedomājamies, ka mums ir trÄ«s abonenti C1, C2, C3 un pavediens, kas satur ziņojumus 1, 2, 3, 4, 5, 6, 7, tad ziņojumi tiks pasniegti, kā parādÄ«ts zemāk esoÅ”ajā diagrammā:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Lai sasniegtu Å”o efektu, Redis Stream izmanto koncepciju, ko sauc par Consumer Group. Å Ä« koncepcija ir lÄ«dzÄ«ga pseidoabonentam, kas saņem datus no straumes, bet faktiski apkalpo vairāki abonenti grupā, nodroÅ”inot noteiktas garantijas:

  1. Katrs ziņojums tiek piegādāts citam grupas abonentam.
  2. Grupas ietvaros abonenti tiek identificēti pēc viņu vārda, kas ir reģistrjutīga virkne. Ja abonents uz laiku izstājas no grupas, viņu var atjaunot grupā, izmantojot viņa unikālo nosaukumu.
  3. Katra patērētāju grupa ievēro ā€œpirmā nelasÄ«tā ziņojumaā€ koncepciju. Kad abonents pieprasa jaunus ziņojumus, tas var saņemt tikai tādas ziņas, kuras nekad iepriekÅ” nav piegādātas nevienam grupas abonentam.
  4. Ir komanda, kas nepārprotami apstiprina, ka abonents ir veiksmÄ«gi apstrādājis ziņojumu. Kamēr Ŕī komanda netiek izsaukta, pieprasÄ«tais ziņojums paliks statusā "gaida".
  5. Patērētāju grupā katrs abonents var pieprasÄ«t to ziņojumu vēsturi, kas viņam tika piegādāti, bet vēl nav apstrādāti (statusā ā€œgaidaā€).

Savā ziņā grupas stāvokli var izteikt Ŕādi:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Tagad ir pienācis laiks iepazīties ar galvenajām patērētāju grupas komandām, proti:

  • XGROUP izmanto, lai izveidotu, iznÄ«cinātu un pārvaldÄ«tu grupas
  • XREADGROUP izmanto, lai lasÄ«tu straumi caur grupu
  • XACK - Ŕī komanda ļauj abonentam atzÄ«mēt ziņojumu kā veiksmÄ«gi apstrādātu

Patērētāju grupas izveide

Pieņemsim, ka mystream jau pastāv. Pēc tam grupas izveides komanda izskatÄ«sies Ŕādi:

> XGROUP CREATE mystream mygroup $
OK

Veidojot grupu, mums ir jānodod identifikators, no kura grupa saņems ziņojumus. Ja mēs tikai vēlamies saņemt visus jaunos ziņojumus, mēs varam izmantot Ä«paÅ”o identifikatoru $ (kā mÅ«su piemērā iepriekÅ”). Ja Ä«paŔā identifikatora vietā norādāt 0, visi pavedienā esoÅ”ie ziņojumi bÅ«s pieejami grupai.

Tagad, kad grupa ir izveidota, mēs varam nekavējoties sākt lasÄ«t ziņojumus, izmantojot komandu XREADGROUP. Å Ä« komanda ir ļoti lÄ«dzÄ«ga IZLASÄŖT un atbalsta opciju BLOĶĒT. Tomēr ir obligāta opcija GROUP, kas vienmēr jānorāda ar diviem argumentiem: grupas nosaukumu un abonenta vārdu. Tiek atbalstÄ«ta arÄ« opcija COUNT.

Pirms pavediena lasÄ«Å”anas ievietosim dažus ziņojumus:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Tagad mēģināsim lasÄ«t Å”o straumi caur grupu:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

IepriekÅ” minētā komanda burtiski skan Ŕādi:

"Es, abonente Alise, manas grupas dalÄ«bniece, vēlos izlasÄ«t vienu ziņojumu no manas straumes, kas nekad iepriekÅ” nevienam nav piegādāts."

Katru reizi, kad abonents veic darbÄ«bu grupā, tam ir jānorāda savs nosaukums, unikāli identificējot sevi grupā. IepriekÅ” minētajā komandā ir vēl viena ļoti svarÄ«ga detaļa - Ä«paÅ”ais identifikators ">". Å is Ä«paÅ”ais identifikators filtrē ziņojumus, atstājot tikai tos, kas nekad iepriekÅ” nav piegādāti.

Tāpat Ä«paÅ”os gadÄ«jumos varat norādÄ«t reālu identifikatoru, piemēram, 0 vai jebkuru citu derÄ«gu identifikatoru. Å ajā gadÄ«jumā komanda XREADGROUP atgriezÄ«s jums to ziņojumu vēsturi ar statusu "gaida", kas tika piegādāti norādÄ«tajam abonentam (Alisei), bet vēl nav apstiprināti, izmantojot komandu XACK.

Mēs varam pārbaudÄ«t Å”o darbÄ«bu, nekavējoties norādot ID 0, bez opcijas COUNT. Mēs vienkārÅ”i redzēsim vienu neapstiprinātu ziņojumu, tas ir, Apple ziņojumu:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Tomēr, ja mēs apstiprināsim, ka ziņojums ir veiksmīgi apstrādāts, tas vairs netiks rādīts:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Tagad ir Boba kārta kaut ko lasīt:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bobs, manas grupas dalÄ«bnieks, lÅ«dza ne vairāk kā divas ziņas. Komanda ziņo tikai par nepiegādātajiem ziņojumiem Ä«paŔā identifikatora ">" dēļ. Kā redzat, ziņojums "ābols" netiks parādÄ«ts, jo tas jau ir piegādāts Alisei, tāpēc Bobs saņem "apelsÄ«nu" un "zemeņu".

Tādā veidā Alise, Bobs un jebkurÅ” cits grupas abonents var lasÄ«t dažādus ziņojumus no vienas straumes. Viņi var arÄ« lasÄ«t savu neapstrādāto ziņojumu vēsturi vai atzÄ«mēt ziņojumus kā apstrādātus.

Ir dažas lietas, kas jāpatur prātā:

  • TiklÄ«dz abonents uzskata ziņojumu par komandu XREADGROUP, Å”is ziņojums nonāk stāvoklÄ« ā€œgaidaā€ un tiek pieŔķirts konkrētajam abonentam. Citi grupas abonenti nevarēs izlasÄ«t Å”o ziņojumu.
  • Abonenti tiek automātiski izveidoti pēc pirmās pieminÄ“Å”anas, tie nav Ä«paÅ”i jāizveido.
  • Ar XREADGROUP JÅ«s varat lasÄ«t ziņas no vairākiem dažādiem pavedieniem vienlaikus, taču, lai tas darbotos, vispirms ir jāizveido grupas ar vienādu nosaukumu katram pavedienam, izmantojot XGROUP

AtveseļoÅ”anās pēc neveiksmes

Abonents var atgÅ«ties no kļūmes un atkārtoti izlasÄ«t savu ziņojumu sarakstu ar statusu ā€œgaidaā€. Tomēr reālajā pasaulē abonenti galu galā var neizdoties. Kas notiek ar abonenta iestrēguÅ”ajiem ziņojumiem, ja abonents nevar atgÅ«ties no neveiksmes?
Consumer Group piedāvā iespēju, kas tiek izmantota tieÅ”i tādiem gadÄ«jumiem ā€“ kad jāmaina ziņu Ä«paÅ”nieks.

Pirmā lieta, kas jums jādara, ir izsaukt komandu LÄŖDZEKÄ»I, kurā tiek parādÄ«ti visi grupas ziņojumi ar statusu ā€œgaidaā€. VienkārŔākajā formā komanda tiek izsaukta tikai ar diviem argumentiem: pavediena nosaukumu un grupas nosaukumu:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Komanda parādÄ«ja neapstrādāto ziņojumu skaitu visai grupai un katram abonentam. Mums ir tikai Bobs ar diviem izciliem ziņojumiem, jo ā€‹ā€‹vienÄ«gais ziņojums, ko Alise pieprasÄ«ja, tika apstiprināts XACK.

Mēs varam pieprasīt vairāk informācijas, izmantojot vairāk argumentu:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} ā€” identifikatoru diapazons (varat izmantot ā€œ-ā€ un ā€œ+ā€)
{count} ā€” piegādes mēģinājumu skaits
{consumer-name} ā€” grupas nosaukums

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Tagad mums ir informācija par katru ziņojumu: ID, abonenta vārds, dÄ«kstāves laiks milisekundēs un visbeidzot piegādes mēģinājumu skaits. Mums ir divi ziņojumi no Boba, un tie ir bijuÅ”i dÄ«kstāvē 74170458 milisekundes, aptuveni 20 stundas.

LÅ«dzu, ņemiet vērā, ka neviens neliedz mums pārbaudÄ«t, kāds bija ziņojuma saturs, vienkārÅ”i izmantojot XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mums tikai argumentos divreiz jāatkārto viens un tas pats identifikators. Tagad, kad mums ir zināms priekÅ”stats, Alise varētu nolemt, ka pēc 20 stundu dÄ«kstāves Bobs, visticamāk, neatgÅ«sies, un ir pienācis laiks vaicāt Å”os ziņojumus un atsākt to apstrādi Bobam. Å im nolÅ«kam mēs izmantojam komandu XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Izmantojot Å”o komandu, mēs varam saņemt ā€œÄrzemjuā€ ziņojumu, kas vēl nav apstrādāts, mainot Ä«paÅ”nieku uz {consumer}. Tomēr mēs varam nodroÅ”ināt arÄ« minimālo dÄ«kstāves laiku {min-idle-time}. Tas palÄ«dz izvairÄ«ties no situācijas, kad divi klienti mēģina vienlaikus mainÄ«t vienu un to paÅ”u ziņojumu Ä«paÅ”nieku:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pirmais klients atiestatīs dīkstāves laiku un palielinās piegādes skaitītāju. Tātad otrais klients nevarēs to pieprasīt.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Ziņojumu veiksmīgi pieprasīja Alise, kura tagad var apstrādāt ziņojumu un to apstiprināt.

IepriekÅ” minētajā piemērā var redzēt, ka veiksmÄ«gs pieprasÄ«jums atgriež paÅ”a ziņojuma saturu. Tomēr tas nav nepiecieÅ”ams. Opciju JUSTID var izmantot, lai atgrieztu tikai ziņojumu ID. Tas ir noderÄ«gi, ja jÅ«s neinteresē ziņas informācija un vēlaties palielināt sistēmas veiktspēju.

Piegādes skaitītājs

SkaitÄ«tājs, ko redzat izejā LÄŖDZEKÄ»I ir katra ziņojuma piegādes reižu skaits. Šāds skaitÄ«tājs tiek palielināts divos veidos: kad ziņojums ir veiksmÄ«gi pieprasÄ«ts, izmantojot XCLAIM vai kad tiek izmantots zvans XREADGROUP.

Ir normāli, ka daži ziņojumi tiek piegādāti vairākas reizes. Galvenais ir tas, ka visi ziņojumi galu galā tiek apstrādāti. Dažreiz, apstrādājot ziņojumu, rodas problēmas, jo pats ziņojums ir bojāts vai ziņojuma apstrāde izraisa kļūdu apstrādātāja kodā. Å ajā gadÄ«jumā var izrādÄ«ties, ka neviens nevarēs apstrādāt Å”o ziņojumu. Tā kā mums ir piegādes mēģinājumu skaitÄ«tājs, mēs varam izmantot Å”o skaitÄ«tāju, lai noteiktu Ŕādas situācijas. Tāpēc, tiklÄ«dz piegādes skaits sasniedz jÅ«su norādÄ«to lielo skaitu, iespējams, bÅ«tu prātÄ«gāk ievietot Ŕādu ziņojumu citā pavedienā un nosÅ«tÄ«t paziņojumu sistēmas administratoram.

Vītnes stāvoklis

Komanda XINFO izmanto, lai pieprasÄ«tu dažādu informāciju par pavedienu un tā grupām. Piemēram, pamata komanda izskatās Ŕādi:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

IepriekÅ” esoŔā komanda parāda vispārÄ«gu informāciju par norādÄ«to straumi. Tagad nedaudz sarežģītāks piemērs:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

IepriekŔ esoŔā komanda parāda vispārīgu informāciju par visām norādītā pavediena grupām

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

IepriekŔ esoŔā komanda parāda informāciju par visiem norādītās straumes un grupas abonentiem.
Ja esat aizmirsis komandas sintaksi, vienkārŔi lūdziet palīdzību paŔai komandai:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Straumes lieluma ierobežojums

Daudzas lietojumprogrammas nevēlas mūžīgi apkopot datus straumē. Bieži vien ir lietderīgi iestatīt maksimālo atļauto ziņojumu skaitu vienā pavedienā. Citos gadījumos ir lietderīgi pārvietot visus ziņojumus no pavediena uz citu pastāvīgo krātuvi, kad ir sasniegts norādītais pavediena lielums. Varat ierobežot straumes lielumu, izmantojot komandas parametru MAXLEN XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Lietojot MAXLEN, vecie ieraksti tiek automātiski dzēsti, kad tie sasniedz noteiktu garumu, tāpēc straumei ir nemainÄ«gs izmērs. Tomēr atzaroÅ”ana Å”ajā gadÄ«jumā Redis atmiņā nenotiek visefektÄ«vākajā veidā. JÅ«s varat uzlabot situāciju Ŕādi:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Arguments ~ iepriekÅ” minētajā piemērā nozÄ«mē, ka mums nav obligāti jāierobežo straumes garums lÄ«dz noteiktai vērtÄ«bai. MÅ«su piemērā tas varētu bÅ«t jebkurÅ” skaitlis, kas ir lielāks par 1000 vai vienāds ar to (piemēram, 1000, 1010 vai 1030). Mēs tikko skaidri norādÄ«jām, ka vēlamies, lai mÅ«su straumē bÅ«tu vismaz 1000 ierakstu. Tas padara atmiņas pārvaldÄ«bu daudz efektÄ«vāku Redis iekÅ”ienē.

Ir arī atseviŔķa komanda XTRIM, kas dara to paŔu:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Pastāvīga glabāŔana un replikācija

Redis Stream tiek asinhroni replicēts pakārtotajos mezglos un saglabāts tādos failos kā AOF (visu datu momentuzņēmums) un RDB (visu rakstÄ«Å”anas darbÄ«bu žurnāls). Tiek atbalstÄ«ta arÄ« patērētāju grupu stāvokļa replikācija. Tāpēc, ja ziņojuma statuss ir ā€œgaidaā€ galvenajā mezglā, tad pakārtotajos mezglos Å”im ziņojumam bÅ«s tāds pats statuss.

AtseviŔķu elementu noņemÅ”ana no straumes

Ir Ä«paÅ”a komanda ziņojumu dzÄ“Å”anai XDEL. Komanda iegÅ«st pavediena nosaukumu, kam seko dzÄ“Å”amo ziņojumu ID:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Lietojot Å”o komandu, jāņem vērā, ka faktiskā atmiņa netiks atbrÄ«vota uzreiz.

Nulles garuma straumes

AtŔķirÄ«ba starp straumēm un citām Redis datu struktÅ«rām ir tāda, ka tad, kad citās datu struktÅ«rās vairs nav elementu, kā blakusparādÄ«ba pati datu struktÅ«ra tiks noņemta no atmiņas. Tātad, piemēram, sakārtotā kopa tiks pilnÄ«bā noņemta, kad ZREM izsaukums noņems pēdējo elementu. Tā vietā pavedieniem ir atļauts palikt atmiņā pat bez elementiem.

Secinājums

Redis Stream ir ideāli piemērots ziņojumu brokeru, ziņojumu rindu, vienotas reÄ£istrÄ“Å”anas un vēstures saglabāŔanas tērzÄ“Å”anas sistēmu izveidei.

Kā jau reiz teicu Niklauss Virts, programmas ir algoritmi plus datu struktūras, un Redis jau sniedz jums abus.

Avots: www.habr.com

Pievieno komentāru