Redis Stream - besueshmëria dhe shkallëzueshmëria e sistemeve tuaja të mesazheve

Redis Stream - besueshmëria dhe shkallëzueshmëria e sistemeve tuaja të mesazheve

Redis Stream është një lloj i ri i të dhënave abstrakte i prezantuar në Redis me versionin 5.0
Konceptualisht, Redis Stream është një listë në të cilën mund të shtoni hyrje. Çdo hyrje ka një identifikues unik. Si parazgjedhje, ID-ja gjenerohet automatikisht dhe përfshin një vulë kohore. Prandaj, ju mund të kërkoni një varg të dhënash me kalimin e kohës, ose të merrni të dhëna të reja kur ato mbërrin në transmetim, ashtu si komanda Unix "tail -f" lexon një skedar log dhe ngrin ndërsa pret për të dhëna të reja. Vini re se klientë të shumtë mund të dëgjojnë një thread në të njëjtën kohë, po aq shumë procese "tail -f" mund të lexojnë një skedar në të njëjtën kohë pa u konfliktuar me njëri-tjetrin.

Për të kuptuar të gjitha përfitimet e llojit të ri të të dhënave, le t'i hedhim një vështrim të shpejtë strukturave ekzistuese të Redis që pjesërisht përsërisin funksionalitetin e Redis Stream.

Redis PUB/SUB

Redis Pub/Sub është një sistem i thjeshtë mesazhesh i integruar tashmë në dyqanin tuaj me vlerë kyçe. Sidoqoftë, thjeshtësia ka një çmim:

  • Nëse botuesi për ndonjë arsye dështon, atëherë ai humbet të gjithë pajtimtarët e tij
  • Botuesi duhet të dijë adresën e saktë të të gjithë abonentëve të tij
  • Një botues mund të mbingarkojë abonentët e tij me punë nëse të dhënat publikohen më shpejt sesa përpunohen
  • Mesazhi fshihet nga buferi i botuesit menjëherë pas publikimit, pavarësisht sa abonentëve u dërgua dhe sa shpejt mundën ta përpunonin këtë mesazh.
  • Të gjithë abonentët do të marrin mesazhin në të njëjtën kohë. Vetë pajtimtarët duhet disi të bien dakord mes tyre për rendin e përpunimit të të njëjtit mesazh.
  • Nuk ka asnjë mekanizëm të integruar për të konfirmuar që një pajtimtar ka përpunuar me sukses një mesazh. Nëse një pajtimtar merr një mesazh dhe rrëzohet gjatë përpunimit, botuesi nuk do të dijë për të.

Lista Redis

Lista Redis është një strukturë të dhënash që mbështet bllokimin e komandave të leximit. Ju mund të shtoni dhe lexoni mesazhe nga fillimi ose fundi i listës. Bazuar në këtë strukturë, ju mund të krijoni një pirg ose radhë të mirë për sistemin tuaj të shpërndarë, dhe në shumicën e rasteve kjo do të jetë e mjaftueshme. Dallimet kryesore nga Redis Pub/Sub:

  • Mesazhi i dërgohet një klienti. Klienti i parë i bllokuar nga leximi do të marrë së pari të dhënat.
  • Clint duhet të fillojë vetë operacionin e leximit për çdo mesazh. Lista nuk di asgjë për klientët.
  • Mesazhet ruhen derisa dikush t'i lexojë ose t'i fshijë në mënyrë të qartë. Nëse konfiguroni serverin Redis për të hedhur të dhënat në disk, atëherë besueshmëria e sistemit rritet në mënyrë dramatike.

Hyrje në Stream

Shtimi i një hyrjeje në një transmetim

Ekip XADD shton një hyrje të re në transmetim. Një rekord nuk është thjesht një varg, ai përbëhet nga një ose më shumë çifte çelës-vlerë. Kështu, çdo hyrje është tashmë e strukturuar dhe i ngjan strukturës së një skedari CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Në shembullin e mësipërm, ne shtojmë dy fusha në transmetim me emrin (çelësin) "mystream": "sensor-id" dhe "temperature" me vlerat "1234" dhe "19.8", përkatësisht. Si argument i dytë, komanda merr një identifikues që do t'i caktohet hyrjes - ky identifikues identifikon në mënyrë unike çdo hyrje në rrjedhë. Megjithatë, në këtë rast kaluam * sepse duam që Redis të gjenerojë një ID të re për ne. Çdo ID e re do të rritet. Prandaj, çdo hyrje e re do të ketë një identifikues më të lartë në raport me hyrjet e mëparshme.

Formati i identifikimit

ID-ja e hyrjes e kthyer nga komanda XADD, përbëhet nga dy pjesë:

{millisecondsTime}-{sequenceNumber}

milisekonda kohë — Koha Unix në milisekonda (koha e serverit Redis). Megjithatë, nëse ora aktuale është e njëjtë ose më e vogël se koha e regjistrimit të mëparshëm, atëherë përdoret vula kohore e regjistrimit të mëparshëm. Prandaj, nëse koha e serverit kthehet prapa në kohë, identifikuesi i ri do të vazhdojë të ruajë vetinë e rritjes.

sekuencaNumër përdoret për regjistrimet e krijuara në të njëjtin milisekondë. sekuencaNumër do të rritet me 1 në krahasim me hyrjen e mëparshme. Sepse sekuencaNumër është 64 bit në madhësi, atëherë në praktikë nuk duhet të hasni në një kufi në numrin e regjistrimeve që mund të gjenerohen brenda një milisekondi.

Formati i identifikuesve të tillë mund të duket i çuditshëm në shikim të parë. Një lexues mosbesues mund të pyesë veten pse koha është pjesë e identifikuesit. Arsyeja është se transmetimet Redis mbështesin pyetjet e gamës me ID. Meqenëse identifikuesi lidhet me kohën kur u krijua rekordi, kjo bën të mundur kërkimin e intervaleve kohore. Ne do të shohim një shembull specifik kur të shikojmë komandën XRANGE.

Nëse për ndonjë arsye përdoruesi duhet të specifikojë identifikuesin e tij, i cili, për shembull, është i lidhur me ndonjë sistem të jashtëm, atëherë ne mund ta kalojmë atë në komandë XADD në vend të * siç tregohet më poshtë:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Ju lutemi vini re se në këtë rast duhet të monitoroni vetë rritjen e ID-së. Në shembullin tonë, identifikuesi minimal është "0-1", kështu që komanda nuk do të pranojë një identifikues tjetër që është i barabartë ose më i vogël se "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Numri i regjistrimeve për transmetim

Është e mundur të merret numri i regjistrimeve në një transmetim thjesht duke përdorur komandën XLEN. Për shembullin tonë, kjo komandë do të kthejë vlerën e mëposhtme:

> XLEN somestream
(integer) 2

Pyetjet e gamës - XRANGE dhe XREVRANGE

Për të kërkuar të dhëna sipas diapazonit, duhet të specifikojmë dy identifikues - fillimin dhe fundin e diapazonit. Gama e kthyer do të përfshijë të gjithë elementët, duke përfshirë kufijtë. Ekzistojnë gjithashtu dy identifikues të veçantë "-" dhe "+", përkatësisht që do të thotë identifikuesi më i vogël (rekord i parë) dhe më i madhi (rekord i fundit) në rrjedhë. Shembulli më poshtë do të listojë të gjitha hyrjet e transmetimit.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Çdo rekord i kthyer është një grup prej dy elementësh: një identifikues dhe një listë çiftesh çelës-vlerë. Ne thamë tashmë se identifikuesit e rekordit lidhen me kohën. Prandaj, ne mund të kërkojmë një gamë të një periudhe të caktuar kohore. Megjithatë, ne mund të specifikojmë në kërkesë jo identifikuesin e plotë, por vetëm kohën e Unix-it, duke hequr pjesën që lidhet me sekuencaNumër. Pjesa e hequr e identifikuesit do të vendoset automatikisht në zero në fillim të diapazonit dhe në vlerën maksimale të mundshme në fund të diapazonit. Më poshtë është një shembull se si mund të kërkoni një interval prej dy milisekondash.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Ne kemi vetëm një hyrje në këtë gamë, megjithatë në grupet reale të të dhënave rezultati i kthyer mund të jetë i madh. Per kete arsye XRANGE mbështet opsionin COUNT. Duke specifikuar sasinë, ne thjesht mund të marrim regjistrimet e para N. Nëse na duhet të marrim regjistrimet e radhës N (faqezim), mund të përdorim ID-në e fundit të marrë, ta rrisim atë sekuencaNumër nga një dhe pyesni përsëri. Le ta shohim këtë në shembullin e mëposhtëm. Fillojmë të shtojmë 10 elementë me XADD (duke supozuar se mystream ishte mbushur tashmë me 10 elementë). Për të filluar përsëritjen duke marrë 2 elementë për komandë, fillojmë me gamën e plotë, por me COUNT të barabartë me 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Për të vazhduar përsëritjen me dy elementët vijues, duhet të zgjedhim ID-në e fundit të marrë, d.m.th. 1519073279157-0, dhe të shtojmë 1 në sekuencaNumër.
ID-ja që rezulton, në këtë rast 1519073279157-1, tani mund të përdoret si argumenti i ri i fillimit të intervalit për thirrjen tjetër XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Dhe kështu me radhë. Për shkak të kompleksitetit XRANGE është O(log(N)) për të kërkuar dhe më pas O(M) për të kthyer M elementë, atëherë çdo hap përsëritjeje është i shpejtë. Kështu, duke përdorur XRANGE rrymat mund të përsëriten në mënyrë efikase.

Ekip XREVRANGE është ekuivalenti XRANGE, por i kthen elementet në rend të kundërt:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Ju lutemi vini re se komanda XREVRANGE merr argumentet e diapazonit fillimin dhe ndalimin në rend të kundërt.

Leximi i hyrjeve të reja duke përdorur XREAD

Shpesh lind detyra për t'u abonuar në një transmetim dhe për të marrë vetëm mesazhe të reja. Ky koncept mund të duket i ngjashëm me Redis Pub/Sub ose bllokimin e Listës Redis, por ka dallime thelbësore në mënyrën e përdorimit të Redis Stream:

  1. Çdo mesazh i ri i dërgohet çdo abonenti si parazgjedhje. Kjo sjellje është e ndryshme nga një listë bllokuese Redis, ku një mesazh i ri do të lexohet vetëm nga një pajtimtar.
  2. Ndërsa në Redis Pub/Sub të gjitha mesazhet harrohen dhe nuk vazhdojnë kurrë, në Stream të gjitha mesazhet mbahen për një kohë të pacaktuar (përveç nëse klienti shkakton fshirje në mënyrë të qartë).
  3. Redis Stream ju lejon të dalloni aksesin në mesazhe brenda një transmetimi. Një pajtimtar specifik mund të shohë vetëm historinë e tij personale të mesazheve.

Ju mund të regjistroheni në një temë dhe të merrni mesazhe të reja duke përdorur komandën XREAD. Është pak më e komplikuar se XRANGE, kështu që fillimisht do të fillojmë me shembujt më të thjeshtë.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Shembulli i mësipërm tregon një formë jo-bllokuese XREAD. Vini re se opsioni COUNT është opsional. Në fakt, i vetmi opsion i kërkuar i komandës është opsioni STREAMS, i cili specifikon një listë të rrymave së bashku me identifikuesin maksimal përkatës. Ne kemi shkruar "STREAMS mystream 0" - duam të marrim të gjitha regjistrimet e transmetimit mystream me një identifikues më të madh se "0-0". Siç mund ta shihni nga shembulli, komanda kthen emrin e thread-it sepse ne mund të regjistrohemi në shumë thread në të njëjtën kohë. Mund të shkruajmë, për shembull, "STREAMS mystream otherstream 0 0". Ju lutemi vini re se pas opsionit STREAMS ne fillimisht duhet të japim emrat e të gjitha transmetimeve të kërkuara dhe vetëm më pas një listë identifikuesish.

Në këtë formë të thjeshtë komanda nuk bën asgjë të veçantë në krahasim me XRANGE. Megjithatë, gjëja interesante është se ne mund të kthehemi lehtësisht XREAD në një komandë bllokimi, duke specifikuar argumentin BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Në shembullin e mësipërm, një opsion i ri BLOCK specifikohet me një afat kohor prej 0 milisekonda (kjo do të thotë të presësh pafundësisht). Për më tepër, në vend që të kalonte identifikuesin e zakonshëm për transmetimin mystream, u kalua një identifikues i veçantë $. Ky identifikues special do të thotë se XREAD duhet të përdorë identifikuesin maksimal në mystream si identifikues. Pra, ne do të marrim vetëm mesazhe të reja duke filluar nga momenti kur filluam të dëgjojmë. Në disa mënyra kjo është e ngjashme me komandën Unix "tail -f".

Vini re se kur përdorim opsionin BLOCK nuk kemi nevojë të përdorim domosdoshmërisht identifikuesin special $. Ne mund të përdorim çdo identifikues që ekziston në transmetim. Nëse ekipi mund të shërbejë menjëherë kërkesën tonë pa bllokuar, ai do ta bëjë këtë, përndryshe do të bllokojë.

Bllokimi XREAD gjithashtu mund të dëgjojnë tema të shumta në të njëjtën kohë, ju vetëm duhet të specifikoni emrat e tyre. Në këtë rast, komanda do të kthejë një rekord të rrymës së parë që ka marrë të dhëna. Abonenti i parë i bllokuar për një bashkëbisedim të caktuar do të marrë së pari të dhënat.

Grupet e Konsumatorëve

Në detyra të caktuara, ne duam të kufizojmë aksesin e abonentëve në mesazhe brenda një teme. Një shembull ku kjo mund të jetë e dobishme është një radhë mesazhesh me punëtorë që do të marrin mesazhe të ndryshme nga një thread, duke lejuar përpunimin e mesazhit të shkallëzohet.

Nëse imagjinojmë se kemi tre abonentë C1, C2, C3 dhe një thread që përmban mesazhet 1, 2, 3, 4, 5, 6, 7, atëherë mesazhet do të shërbehen si në diagramin më poshtë:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Për të arritur këtë efekt, Redis Stream përdor një koncept të quajtur Grupi i Konsumatorëve. Ky koncept është i ngjashëm me një pseudo-abonent, i cili merr të dhëna nga një transmetim, por në fakt shërbehet nga abonentë të shumtë brenda një grupi, duke ofruar garanci të caktuara:

  1. Çdo mesazh i dërgohet një abonenti të ndryshëm brenda grupit.
  2. Brenda një grupi, abonentët identifikohen me emrin e tyre, i cili është një varg i ndjeshëm ndaj shkronjave të vogla. Nëse një pajtimtar del përkohësisht nga grupi, ai mund të rikthehet në grup duke përdorur emrin e tij unik.
  3. Çdo Grup Konsumatorësh ndjek konceptin e "mesazhit të parë të palexuar". Kur një pajtimtar kërkon mesazhe të reja, ai mund të marrë vetëm mesazhe që nuk i janë dorëzuar kurrë më parë ndonjë pajtimtari brenda grupit.
  4. Ekziston një komandë për të konfirmuar në mënyrë eksplicite që mesazhi është përpunuar me sukses nga pajtimtari. Derisa të thirret kjo komandë, mesazhi i kërkuar do të mbetet në statusin "në pritje".
  5. Brenda Grupit të Konsumatorëve, çdo pajtimtar mund të kërkojë një histori të mesazheve që i janë dorëzuar, por nuk janë përpunuar ende (në statusin "në pritje")

Në një farë kuptimi, gjendja e grupit mund të shprehet si më poshtë:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Tani është koha për t'u njohur me komandat kryesore për Grupin e Konsumatorëve, përkatësisht:

  • XGRUPI përdoret për të krijuar, shkatërruar dhe menaxhuar grupe
  • XREADGRUPI përdoret për të lexuar transmetimin përmes grupit
  • XACK - kjo komandë i mundëson pajtimtarit të shënojë mesazhin si të përpunuar me sukses

Krijimi i Grupit të Konsumatorëve

Le të supozojmë se mystream tashmë ekziston. Pastaj komanda e krijimit të grupit do të duket si kjo:

> XGROUP CREATE mystream mygroup $
OK

Kur krijojmë një grup, duhet të kalojmë një identifikues, nga i cili grupi do të marrë mesazhe. Nëse thjesht duam të marrim të gjitha mesazhet e reja, atëherë mund të përdorim identifikuesin special $ (si në shembullin tonë më lart). Nëse specifikoni 0 në vend të një identifikuesi të veçantë, atëherë të gjitha mesazhet në thread do të jenë të disponueshme për grupin.

Tani që grupi është krijuar, ne mund të fillojmë menjëherë të lexojmë mesazhe duke përdorur komandën XREADGRUPI. Kjo komandë është shumë e ngjashme me XREAD dhe mbështet opsionin opsional BLOCK. Megjithatë, ekziston një opsion i kërkuar GROUP që duhet të specifikohet gjithmonë me dy argumente: emrin e grupit dhe emrin e pajtimtarit. Opsioni COUNT mbështetet gjithashtu.

Përpara se të lexojmë temën, le të vendosim disa mesazhe atje:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Tani le të përpiqemi ta lexojmë këtë transmetim përmes grupit:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Komanda e mësipërme lexohet fjalë për fjalë si më poshtë:

"Unë, abonentja Alice, një anëtare e grupit tim, dua të lexoj një mesazh nga mystream që nuk i është dorëzuar askujt më parë."

Sa herë që një pajtimtar kryen një operacion në një grup, ai duhet të japë emrin e tij, duke u identifikuar në mënyrë unike brenda grupit. Ekziston edhe një detaj shumë i rëndësishëm në komandën e mësipërme - identifikuesi special ">". Ky identifikues i veçantë filtron mesazhet, duke lënë vetëm ato që nuk janë dorëzuar kurrë më parë.

Gjithashtu, në raste të veçanta, mund të specifikoni një identifikues real si 0 ose ndonjë identifikues tjetër të vlefshëm. Në këtë rast komanda XREADGRUPI do t'ju kthejë një histori të mesazheve me një status "në pritje" që i janë dorëzuar pajtimtarit të specifikuar (Alice), por nuk janë pranuar ende duke përdorur komandën XACK.

Ne mund ta testojmë këtë sjellje duke specifikuar menjëherë ID 0, pa opsionin COUNT. Ne thjesht do të shohim një mesazh të vetëm në pritje, domethënë mesazhin e mollës:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Megjithatë, nëse konfirmojmë se mesazhi është përpunuar me sukses, atëherë ai nuk do të shfaqet më:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Tani është radha e Bobit të lexojë diçka:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, një anëtar i mygroup, kërkoi jo më shumë se dy mesazhe. Komanda raporton vetëm mesazhe të padorëzuara për shkak të identifikuesit special ">". Siç mund ta shihni, mesazhi "mollë" nuk do të shfaqet pasi tashmë i është dorëzuar Alice, kështu që Bob merr "portokalli" dhe "luleshtrydhe".

Në këtë mënyrë, Alice, Bob dhe çdo pajtimtar tjetër në grup mund të lexojnë mesazhe të ndryshme nga i njëjti transmetim. Ata gjithashtu mund të lexojnë historinë e tyre të mesazheve të papërpunuara ose të shënojnë mesazhe si të përpunuara.

Ka disa gjëra që duhen mbajtur parasysh:

  • Sapo abonenti e konsideron mesazhin si komandë XREADGRUPI, ky mesazh shkon në gjendjen "në pritje" dhe i caktohet atij pajtimtari specifik. Abonentët e tjerë të grupit nuk do të mund ta lexojnë këtë mesazh.
  • Abonentët krijohen automatikisht me përmendjen e parë, nuk ka nevojë t'i krijoni në mënyrë eksplicite.
  • Me XREADGRUPI ju mund të lexoni mesazhe nga shumë tema të ndryshme në të njëjtën kohë, megjithatë që kjo të funksionojë, fillimisht duhet të krijoni grupe me të njëjtin emër për secilën temë duke përdorur XGRUPI

Rimëkëmbja pas një dështimi

Abonenti mund të shërohet nga dështimi dhe të rilexojë listën e tij të mesazheve me statusin "në pritje". Sidoqoftë, në botën reale, abonentët përfundimisht mund të dështojnë. Çfarë ndodh me mesazhet e bllokuara të një pajtimtari nëse pajtimtari nuk është në gjendje të rikuperohet nga një dështim?
Grupi i Konsumatorëve ofron një veçori që përdoret pikërisht për raste të tilla - kur duhet të ndryshoni pronarin e mesazheve.

Gjëja e parë që duhet të bëni është të telefononi komandën NË PRERJE, i cili shfaq të gjitha mesazhet në grup me statusin "në pritje". Në formën e saj më të thjeshtë, komanda thirret vetëm me dy argumente: emrin e fillit dhe emrin e grupit:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Ekipi shfaqi numrin e mesazheve të papërpunuara për të gjithë grupin dhe për çdo pajtimtar. Ne kemi vetëm Bobin me dy mesazhe të pazgjidhura sepse mesazhi i vetëm që kërkoi Alice u konfirmua XACK.

Ne mund të kërkojmë më shumë informacion duke përdorur më shumë argumente:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - diapazoni i identifikuesve (mund të përdorni "-" dhe "+")
{count} - numri i përpjekjeve për dorëzim
{consumer-name} - emri i grupit

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Tani kemi detaje për çdo mesazh: ID-ja, emri i pajtimtarit, koha e papunësisë në milisekonda dhe së fundi numri i përpjekjeve për dërgim. Kemi dy mesazhe nga Bob dhe ata kanë qenë të papunë për 74170458 milisekonda, rreth 20 orë.

Ju lutemi vini re se askush nuk po na ndalon të kontrollojmë se çfarë ishte përmbajtja e mesazhit thjesht duke përdorur XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Thjesht duhet të përsërisim të njëjtin identifikues dy herë në argumente. Tani që kemi një ide, Alice mund të vendosë që pas 20 orësh pushimi, Bob ndoshta nuk do të shërohet dhe është koha për t'i kërkuar ato mesazhe dhe për të rifilluar përpunimin e tyre për Bob. Për këtë përdorim komandën XPREJTIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Duke përdorur këtë komandë, ne mund të marrim një mesazh "të huaj" që ende nuk është përpunuar duke ndryshuar pronarin në {consumer}. Megjithatë, ne mund të ofrojmë gjithashtu një kohë minimale të papunësisë {min-idle-time}. Kjo ndihmon për të shmangur një situatë ku dy klientë përpiqen të ndryshojnë njëkohësisht pronarin e të njëjtave mesazhe:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Klienti i parë do të rivendosë kohën e ndërprerjes dhe do të rrisë sportelin e dorëzimit. Kështu që klienti i dytë nuk do të jetë në gjendje ta kërkojë atë.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mesazhi u pretendua me sukses nga Alice, e cila tani mund ta përpunojë mesazhin dhe ta pranojë atë.

Nga shembulli i mësipërm, mund të shihni se një kërkesë e suksesshme kthen përmbajtjen e vetë mesazhit. Megjithatë, kjo nuk është e nevojshme. Opsioni JUSTID mund të përdoret vetëm për të kthyer ID-të e mesazheve. Kjo është e dobishme nëse nuk jeni të interesuar për detajet e mesazhit dhe dëshironi të rrisni performancën e sistemit.

Sporteli i dorëzimit

Numëruesi që shihni në dalje NË PRERJE është numri i dërgesave të çdo mesazhi. Një numërues i tillë rritet në dy mënyra: kur një mesazh kërkohet me sukses nëpërmjet XPREJTIM ose kur përdoret një telefonatë XREADGRUPI.

Është normale që disa mesazhe të dërgohen disa herë. Gjëja kryesore është që të gjitha mesazhet të përpunohen përfundimisht. Ndonjëherë shfaqen probleme gjatë përpunimit të një mesazhi sepse vetë mesazhi është i dëmtuar ose përpunimi i mesazhit shkakton një gabim në kodin e mbajtësit. Në këtë rast, mund të rezultojë se askush nuk do të jetë në gjendje ta përpunojë këtë mesazh. Meqenëse kemi një numërues të përpjekjeve për dorëzim, mund ta përdorim këtë numërues për të zbuluar situata të tilla. Prandaj, pasi numri i dërgesave të arrijë numrin e lartë që specifikoni, ndoshta do të ishte më e mençur të vendosni një mesazh të tillë në një temë tjetër dhe t'i dërgoni një njoftim administratorit të sistemit.

Gjendja e fillit

Ekip XINFO përdoret për të kërkuar informacione të ndryshme rreth një thread dhe grupeve të tij. Për shembull, një komandë bazë duket si kjo:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Komanda e mësipërme shfaq informacion të përgjithshëm në lidhje me rrymën e specifikuar. Tani një shembull pak më kompleks:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Komanda e mësipërme shfaq informacion të përgjithshëm për të gjitha grupet e thread-it të specifikuar

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Komanda e mësipërme shfaq informacion për të gjithë pajtimtarët e transmetimit dhe grupit të specifikuar.
Nëse harroni sintaksën e komandës, thjesht kërkoni ndihmë nga vetë komanda:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Kufiri i madhësisë së transmetimit

Shumë aplikacione nuk duan të mbledhin të dhëna në një transmetim përgjithmonë. Shpesh është e dobishme të kesh një numër maksimal mesazhesh të lejuara për thread. Në raste të tjera, është e dobishme që të zhvendosen të gjitha mesazhet nga një thread në një dyqan tjetër të vazhdueshëm kur arrihet madhësia e specifikuar e fillit. Ju mund të kufizoni madhësinë e një transmetimi duke përdorur parametrin MAXLEN në komandë XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kur përdorni MAXLEN, regjistrimet e vjetra fshihen automatikisht kur arrijnë një gjatësi të caktuar, kështu që transmetimi ka një madhësi konstante. Megjithatë, krasitja në këtë rast nuk ndodh në mënyrën më efikase në kujtesën Redis. Ju mund ta përmirësoni situatën si më poshtë:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argumenti ~ në shembullin e mësipërm do të thotë se nuk kemi nevojë të kufizojmë gjatësinë e transmetimit në një vlerë specifike. Në shembullin tonë, ky mund të jetë çdo numër më i madh ose i barabartë me 1000 (për shembull, 1000, 1010 ose 1030). Sapo specifikuam në mënyrë eksplicite se duam që transmetimi ynë të ruajë të paktën 1000 regjistrime. Kjo e bën menaxhimin e kujtesës shumë më efikas brenda Redis.

Ekziston edhe një ekip i veçantë XTRIM, e cila bën të njëjtën gjë:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Ruajtja dhe përsëritja e vazhdueshme

Redis Stream përsëritet në mënyrë asinkrone në nyjet skllav dhe ruhet në skedarë si AOF (fotografi e të gjitha të dhënave) dhe RDB (regjistri i të gjitha operacioneve të shkrimit). Replikimi i gjendjes së Grupeve të Konsumatorëve është gjithashtu i mbështetur. Prandaj, nëse një mesazh është në statusin "në pritje" në nyjen kryesore, atëherë në nyjet skllav ky mesazh do të ketë të njëjtin status.

Heqja e elementeve individuale nga një rrymë

Ekziston një komandë e veçantë për të fshirë mesazhet XDEL. Komanda merr emrin e fillit të ndjekur nga ID-të e mesazheve që do të fshihen:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kur përdorni këtë komandë, duhet të keni parasysh që memoria aktuale nuk do të lëshohet menjëherë.

Rrjedhat me gjatësi zero

Dallimi midis streams dhe strukturave të tjera të të dhënave Redis është se kur strukturat e tjera të të dhënave nuk kanë më elementë brenda tyre, si një efekt anësor, vetë struktura e të dhënave do të hiqet nga memoria. Kështu, për shembull, grupi i renditur do të hiqet plotësisht kur thirrja ZREM heq elementin e fundit. Në vend të kësaj, fijet lejohen të mbeten në kujtesë edhe pa pasur ndonjë element brenda.

Përfundim

Redis Stream është ideal për krijimin e ndërmjetësve të mesazheve, radhëve të mesazheve, regjistrimeve të unifikuara dhe sistemeve të bisedës për ruajtjen e historisë.

Siç thashë dikur Niklaus Wirth, programet janë algoritme plus struktura të dhënash, dhe Redis tashmë ju jep të dyja.

Burimi: www.habr.com

Shto një koment