Redis Stream - pagiging maaasahan at scalability ng iyong mga system sa pagmemensahe

Redis Stream - pagiging maaasahan at scalability ng iyong mga system sa pagmemensahe

Ang Redis Stream ay isang bagong abstract na uri ng data na ipinakilala sa Redis na may bersyon 5.0
Sa konsepto, ang Redis Stream ay isang Listahan kung saan maaari kang magdagdag ng mga entry. Ang bawat entry ay may natatanging identifier. Bilang default, awtomatikong nabubuo ang ID at may kasamang timestamp. Samakatuwid, maaari kang mag-query ng mga hanay ng mga tala sa paglipas ng panahon, o tumanggap ng bagong data sa pagdating nito sa stream, katulad ng pagbabasa ng utos ng Unix na "tail -f" ng isang log file at nag-freeze habang naghihintay ng bagong data. Tandaan na maraming kliyente ang maaaring makinig sa isang thread nang sabay-sabay, tulad ng maraming proseso ng "tail -f" na makakapagbasa ng file nang sabay-sabay nang hindi sumasalungat sa isa't isa.

Upang maunawaan ang lahat ng mga benepisyo ng bagong uri ng data, tingnan natin ang matagal nang umiiral na mga istruktura ng Redis na bahagyang ginagaya ang functionality ng Redis Stream.

Redis PUB/SUB

Ang Redis Pub/Sub ay isang simpleng messaging system na naka-built na sa iyong key-value store. Gayunpaman, ang pagiging simple ay may halaga:

  • Kung nabigo ang publisher sa ilang kadahilanan, mawawala ang lahat ng kanyang mga subscriber
  • Kailangang malaman ng publisher ang eksaktong address ng lahat ng subscriber nito
  • Maaaring mag-overload ng trabaho ang isang publisher sa mga subscriber nito kung ang data ay nai-publish nang mas mabilis kaysa sa naproseso
  • Ang mensahe ay tinanggal mula sa buffer ng publisher kaagad pagkatapos ma-publish, hindi alintana kung gaano karaming mga subscriber ito naihatid at kung gaano kabilis nila naproseso ang mensaheng ito.
  • Ang lahat ng mga subscriber ay makakatanggap ng mensahe sa parehong oras. Ang mga subscriber mismo ay dapat magkasundo sa kanilang mga sarili sa pagkakasunud-sunod ng pagproseso ng parehong mensahe.
  • Walang built-in na mekanismo upang kumpirmahin na matagumpay na naproseso ng isang subscriber ang isang mensahe. Kung ang isang subscriber ay nakatanggap ng mensahe at nag-crash habang pinoproseso, hindi malalaman ng publisher ang tungkol dito.

Listahan ng Redis

Ang Redis List ay isang istraktura ng data na sumusuporta sa pagharang sa mga read command. Maaari kang magdagdag at magbasa ng mga mensahe mula sa simula o dulo ng listahan. Batay sa istrukturang ito, makakagawa ka ng magandang stack o queue para sa iyong distributed system, at sa karamihan ng mga kaso, ito ay magiging sapat na. Mga pangunahing pagkakaiba mula sa Redis Pub/Sub:

  • Ang mensahe ay inihahatid sa isang kliyente. Ang unang na-read-block na kliyente ay unang makakatanggap ng data.
  • Dapat simulan ni Clint ang read operation para sa bawat mensahe mismo. Walang alam ang listahan tungkol sa mga kliyente.
  • Iniimbak ang mga mensahe hanggang sa may magbasa sa kanila o tahasang tanggalin ang mga ito. Kung iko-configure mo ang Redis server upang i-flush ang data sa disk, ang pagiging maaasahan ng system ay tumataas nang husto.

Panimula sa Stream

Pagdaragdag ng entry sa isang stream

Koponan XADD nagdaragdag ng bagong entry sa stream. Ang record ay hindi lamang isang string, ito ay binubuo ng isa o higit pang key-value pairs. Kaya, ang bawat entry ay nakabalangkas na at kahawig ng istraktura ng isang CSV file.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Sa halimbawa sa itaas, nagdaragdag kami ng dalawang field sa stream na may pangalan (key) na "mystream": "sensor-id" at "temperatura" na may mga value na "1234" at "19.8", ayon sa pagkakabanggit. Bilang pangalawang argumento, ang command ay kumukuha ng isang identifier na itatalaga sa entry - ang identifier na ito ay natatanging kinikilala ang bawat entry sa stream. Gayunpaman, sa kasong ito, ipinasa namin ang * dahil gusto naming bumuo si Redis ng bagong ID para sa amin. Ang bawat bagong ID ay tataas. Samakatuwid, ang bawat bagong entry ay magkakaroon ng mas mataas na identifier kaugnay ng mga nakaraang entry.

Format ng pagkakakilanlan

Ang entry ID ay ibinalik ng command XADD, ay binubuo ng dalawang bahagi:

{millisecondsTime}-{sequenceNumber}

millisecondOras β€” Unix time sa milliseconds (Redis server time). Gayunpaman, kung ang kasalukuyang oras ay pareho o mas mababa kaysa sa oras ng nakaraang pag-record, ang timestamp ng nakaraang pag-record ay gagamitin. Samakatuwid, kung babalik ang oras ng server sa nakaraan, pananatilihin pa rin ng bagong identifier ang increment property.

sequenceNumber ginagamit para sa mga record na ginawa sa parehong millisecond. sequenceNumber ay tataas ng 1 kaugnay sa nakaraang entry. Dahil ang sequenceNumber ay 64 bits ang laki, kung gayon sa pagsasanay ay hindi ka dapat magkaroon ng limitasyon sa bilang ng mga tala na maaaring mabuo sa loob ng isang millisecond.

Ang format ng mga naturang identifier ay maaaring mukhang kakaiba sa unang tingin. Maaaring magtaka ang isang hindi mapagkakatiwalaang mambabasa kung bakit bahagi ng identifier ang oras. Ang dahilan ay sinusuportahan ng mga stream ng Redis ang mga query sa hanay sa pamamagitan ng ID. Dahil nauugnay ang identifier sa oras na ginawa ang tala, ginagawa nitong posible na mag-query ng mga hanay ng oras. Titingnan natin ang isang partikular na halimbawa kapag tinitingnan natin ang utos XRANGE.

Kung sa ilang kadahilanan ay kailangang tukuyin ng user ang kanyang sariling identifier, na, halimbawa, ay nauugnay sa ilang panlabas na sistema, maaari naming ipasa ito sa utos XADD sa halip na * tulad ng ipinapakita sa ibaba:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Pakitandaan na sa kasong ito, dapat mong subaybayan ang iyong sarili sa pagtaas ng ID. Sa aming halimbawa, ang minimum na identifier ay "0-1", kaya ang command ay hindi tatanggap ng isa pang identifier na katumbas o mas mababa sa "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Bilang ng mga tala sa bawat stream

Posibleng makuha ang bilang ng mga tala sa isang stream sa pamamagitan lamang ng paggamit ng command XLEN. Para sa aming halimbawa, ibabalik ng utos na ito ang sumusunod na halaga:

> XLEN somestream
(integer) 2

Mga query sa hanay - XRANGE at XREVRANGE

Upang humiling ng data ayon sa hanay, kailangan naming tumukoy ng dalawang identifier - ang simula at dulo ng hanay. Kasama sa ibinalik na hanay ang lahat ng elemento, kabilang ang mga hangganan. Mayroon ding dalawang espesyal na identifier na "-" at "+", ayon sa pagkakabanggit, ang pinakamaliit (unang record) at pinakamalaking (huling record) na identifier sa stream. Ililista ng halimbawa sa ibaba ang lahat ng mga entry sa stream.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Ang bawat ibinalik na tala ay isang hanay ng dalawang elemento: isang identifier at isang listahan ng mga pares ng key-value. Nasabi na namin na ang mga record identifier ay nauugnay sa oras. Samakatuwid, maaari kaming humiling ng isang hanay ng isang partikular na yugto ng panahon. Gayunpaman, maaari naming tukuyin sa kahilingan hindi ang buong identifier, ngunit ang oras ng Unix lamang, na inaalis ang bahaging nauugnay sa sequenceNumber. Ang inalis na bahagi ng identifier ay awtomatikong itatakda sa zero sa simula ng range at sa maximum na posibleng value sa dulo ng range. Nasa ibaba ang isang halimbawa kung paano ka makakahiling ng hanay na dalawang millisecond.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Mayroon lang kaming isang entry sa hanay na ito, gayunpaman sa mga totoong set ng data, maaaring malaki ang resulta na ibinalik. Dahil dito XRANGE sumusuporta sa COUNT na opsyon. Sa pamamagitan ng pagtukoy ng dami, makukuha lang natin ang unang N record. Kung kailangan nating makuha ang susunod na N record (pagination), maaari nating gamitin ang huling natanggap na ID, dagdagan ito sequenceNumber ng isa at magtanong muli. Tingnan natin ito sa sumusunod na halimbawa. Nagsisimula kaming magdagdag ng 10 elemento na may XADD (ipagpalagay na ang mystream ay napuno na ng 10 elemento). Upang simulan ang pag-ulit sa pagkuha ng 2 elemento sa bawat command, magsisimula tayo sa buong hanay ngunit may COUNT na katumbas ng 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Upang magpatuloy sa pag-ulit sa susunod na dalawang elemento, kailangan nating piliin ang huling natanggap na ID, ibig sabihin, 1519073279157-0, at magdagdag ng 1 sa sequenceNumber.
Ang resultang ID, sa kasong ito, 1519073279157-1, ay magagamit na ngayon bilang bagong simula ng argument ng range para sa susunod na tawag XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

At iba pa. Dahil kumplikado XRANGE ay O(log(N)) upang maghanap at pagkatapos ay O(M) upang ibalik ang mga elemento ng M, kung gayon ang bawat hakbang sa pag-ulit ay mabilis. Kaya, gamit XRANGE stream ay maaaring umulit nang mahusay.

Koponan XREVRANGE ay ang katumbas XRANGE, ngunit ibinabalik ang mga elemento sa reverse order:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Mangyaring tandaan na ang utos XREVRANGE tumatagal ang mga argumento ng hanay na magsimula at huminto sa reverse order.

Pagbabasa ng mga bagong entry gamit ang XREAD

Kadalasan ang gawain ay lumitaw sa pag-subscribe sa isang stream at pagtanggap lamang ng mga bagong mensahe. Ang konseptong ito ay maaaring mukhang katulad sa Redis Pub/Sub o pagharang sa Redis List, ngunit may mga pangunahing pagkakaiba sa kung paano gamitin ang Redis Stream:

  1. Ang bawat bagong mensahe ay inihahatid sa bawat subscriber bilang default. Ang gawi na ito ay iba sa isang nakaharang na Listahan ng Redis, kung saan ang isang bagong mensahe ay babasahin lamang ng isang subscriber.
  2. Habang nasa Redis Pub/Sub ang lahat ng mga mensahe ay nakalimutan at hindi nagpapatuloy, sa Stream lahat ng mga mensahe ay pinananatili nang walang katiyakan (maliban kung ang kliyente ay tahasang nagdudulot ng pagtanggal).
  3. Binibigyang-daan ka ng Redis Stream na ibahin ang pag-access sa mga mensahe sa loob ng isang stream. Makikita lang ng isang partikular na subscriber ang kanilang kasaysayan ng personal na mensahe.

Maaari kang mag-subscribe sa isang thread at makatanggap ng mga bagong mensahe gamit ang command XREAD. Ito ay medyo mas kumplikado kaysa sa XRANGE, kaya magsisimula muna tayo sa mas simpleng mga halimbawa.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Ang halimbawa sa itaas ay nagpapakita ng isang hindi nakaharang na form XREAD. Tandaan na ang opsyon na COUNT ay opsyonal. Sa katunayan, ang tanging kinakailangang command na opsyon ay ang STREAMS na opsyon, na tumutukoy sa isang listahan ng mga stream kasama ang kaukulang maximum identifier. Isinulat namin ang "STREAMS mystream 0" - gusto naming matanggap ang lahat ng record ng mystream stream na may identifier na mas malaki kaysa sa "0-0". Tulad ng nakikita mo mula sa halimbawa, ibinabalik ng command ang pangalan ng thread dahil maaari tayong mag-subscribe sa maraming mga thread nang sabay-sabay. Maaari naming isulat, halimbawa, ang "STREAMS mystream otherstream 0 0". Pakitandaan na pagkatapos ng opsyong STREAMS kailangan muna naming ibigay ang mga pangalan ng lahat ng kinakailangang stream at pagkatapos lamang ng isang listahan ng mga identifier.

Sa simpleng form na ito ang utos ay hindi gumagawa ng anumang espesyal kumpara sa XRANGE. Gayunpaman, ang kawili-wiling bagay ay madali tayong lumiko XREAD sa isang blocking command, na tumutukoy sa BLOCK argument:

> XREAD BLOCK 0 STREAMS mystream $

Sa halimbawa sa itaas, isang bagong opsyon na BLOCK ang tinukoy na may timeout na 0 milliseconds (nangangahulugan ito ng paghihintay nang walang katiyakan). Bukod dito, sa halip na ipasa ang karaniwang identifier para sa stream mystream, isang espesyal na identifier na $ ang ipinasa. Ang espesyal na identifier na ito ay nangangahulugan na XREAD dapat gamitin ang maximum na identifier sa mystream bilang identifier. Kaya makakatanggap lang kami ng mga bagong mensahe simula sa sandaling nagsimula kaming makinig. Sa ilang mga paraan ito ay katulad ng Unix "tail -f" na utos.

Tandaan na kapag ginagamit ang opsyon na BLOCK hindi namin kinakailangang gamitin ang espesyal na identifier na $. Maaari naming gamitin ang anumang identifier na umiiral sa stream. Kung maseserbisyuhan kaagad ng team ang aming kahilingan nang hindi nagba-block, gagawin nito, kung hindi, haharangin nito.

Hinaharang XREAD maaari ding makinig sa maramihang mga thread nang sabay-sabay, kailangan mo lang tukuyin ang kanilang mga pangalan. Sa kasong ito, ibabalik ng command ang isang talaan ng unang stream na nakatanggap ng data. Ang unang subscriber na na-block para sa isang partikular na thread ay unang makakatanggap ng data.

Mga Grupo ng Consumer

Sa ilang partikular na gawain, gusto naming limitahan ang access ng subscriber sa mga mensahe sa loob ng isang thread. Ang isang halimbawa kung saan maaari itong maging kapaki-pakinabang ay isang pila ng mensahe sa mga manggagawa na makakatanggap ng iba't ibang mga mensahe mula sa isang thread, na nagpapahintulot sa pagpoproseso ng mensahe na lumaki.

Kung iniisip namin na mayroon kaming tatlong subscriber na C1, C2, C3 at isang thread na naglalaman ng mga mensahe 1, 2, 3, 4, 5, 6, 7, ang mga mensahe ay ihahatid tulad ng sa diagram sa ibaba:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Upang makamit ang epektong ito, gumagamit ang Redis Stream ng isang konsepto na tinatawag na Consumer Group. Ang konseptong ito ay katulad ng isang pseudo-subscriber, na tumatanggap ng data mula sa isang stream, ngunit aktwal na inihahatid ng maraming subscriber sa loob ng isang grupo, na nagbibigay ng ilang partikular na garantiya:

  1. Ang bawat mensahe ay inihahatid sa ibang subscriber sa loob ng grupo.
  2. Sa loob ng isang grupo, ang mga subscriber ay nakikilala sa pamamagitan ng kanilang pangalan, na isang case-sensitive na string. Kung ang isang subscriber ay pansamantalang umalis sa grupo, maaari siyang maibalik sa grupo gamit ang kanyang sariling natatanging pangalan.
  3. Ang bawat Consumer Group ay sumusunod sa konsepto ng "unang hindi pa nababasang mensahe". Kapag humiling ang isang subscriber ng mga bagong mensahe, maaari lamang itong makatanggap ng mga mensahe na hindi pa naihatid sa sinumang subscriber sa loob ng grupo.
  4. Mayroong utos na tahasang kumpirmahin na matagumpay na naproseso ng subscriber ang mensahe. Hanggang sa tawagin ang command na ito, ang hiniling na mensahe ay mananatili sa status na "nakabinbin".
  5. Sa loob ng Consumer Group, maaaring humiling ang bawat subscriber ng history ng mga mensaheng naihatid sa kanya, ngunit hindi pa napoproseso (sa status na "nakabinbin")

Sa isang kahulugan, ang estado ng grupo ay maaaring ipahayag tulad ng sumusunod:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Ngayon ay oras na upang maging pamilyar sa mga pangunahing utos para sa Consumer Group, lalo na:

  • XGROUP ginagamit upang lumikha, sirain at pamahalaan ang mga grupo
  • XREADGROUP ginagamit upang basahin ang stream sa pamamagitan ng grupo
  • XACK - pinapayagan ng command na ito ang subscriber na markahan ang mensahe bilang matagumpay na naproseso

Paglikha ng Consumer Group

Ipagpalagay natin na umiral na ang mystream. Pagkatapos ang utos ng paggawa ng grupo ay magiging ganito:

> XGROUP CREATE mystream mygroup $
OK

Kapag gumagawa ng grupo, dapat tayong magpasa ng identifier, simula kung saan makakatanggap ang grupo ng mga mensahe. Kung gusto lang naming makatanggap ng lahat ng bagong mensahe, maaari naming gamitin ang espesyal na identifier na $ (tulad ng sa aming halimbawa sa itaas). Kung tutukuyin mo ang 0 sa halip na isang espesyal na identifier, ang lahat ng mensahe sa thread ay magiging available sa grupo.

Ngayong nabuo na ang grupo, maaari na nating simulan agad ang pagbabasa ng mga mensahe gamit ang command XREADGROUP. Ang utos na ito ay halos kapareho sa XREAD at sumusuporta sa opsyonal na opsyon na BLOCK. Gayunpaman, mayroong kinakailangang opsyon na GROUP na dapat palaging tukuyin na may dalawang argumento: ang pangalan ng grupo at ang pangalan ng subscriber. Ang opsyon na COUNT ay sinusuportahan din.

Bago basahin ang thread, maglagay tayo ng ilang mensahe doon:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Ngayon subukan nating basahin ang stream na ito sa pamamagitan ng grupo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ang utos sa itaas ay nagbabasa ng verbatim tulad ng sumusunod:

"Ako, ang subscriber na si Alice, isang miyembro ng mygroup, ay gustong magbasa ng isang mensahe mula sa mystream na hindi pa naihatid sa sinuman."

Sa bawat oras na ang isang subscriber ay nagsasagawa ng isang operasyon sa isang grupo, dapat itong magbigay ng pangalan nito, na natatanging nagpapakilala sa sarili nito sa loob ng grupo. May isa pang napakahalagang detalye sa utos sa itaas - ang espesyal na identifier na ">". Pini-filter ng espesyal na identifier na ito ang mga mensahe, na iniiwan lamang ang mga hindi pa naihatid dati.

Gayundin, sa mga espesyal na kaso, maaari kang tumukoy ng totoong identifier gaya ng 0 o anumang iba pang wastong identifier. Sa kasong ito ang utos XREADGROUP ibabalik sa iyo ang isang kasaysayan ng mga mensahe na may katayuang "nakabinbin" na naihatid sa tinukoy na subscriber (Alice) ngunit hindi pa kinikilala gamit ang command XACK.

Maaari naming subukan ang gawi na ito sa pamamagitan ng pagtukoy kaagad ng ID 0, nang walang opsyon COUNT. Makakakita lang kami ng isang nakabinbing mensahe, iyon ay, ang mensahe ng mansanas:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Gayunpaman, kung kinumpirma namin ang mensahe bilang matagumpay na naproseso, hindi na ito ipapakita:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Ngayon ay si Bob na ang magbasa ng isang bagay:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Si Bob, isang miyembro ng mygroup, ay humiling ng hindi hihigit sa dalawang mensahe. Ang command ay nag-uulat lamang ng mga hindi naihatid na mensahe dahil sa espesyal na identifier na ">". Tulad ng nakikita mo, ang mensaheng "mansanas" ay hindi ipapakita dahil naihatid na ito kay Alice, kaya natanggap ni Bob ang "orange" at "strawberry".

Sa ganitong paraan, makakabasa sina Alice, Bob, at sinumang iba pang subscriber sa grupo ng iba't ibang mensahe mula sa parehong stream. Maaari din nilang basahin ang kanilang kasaysayan ng mga hindi naprosesong mensahe o markahan ang mga mensahe bilang naproseso.

Mayroong ilang mga bagay na dapat tandaan:

  • Sa sandaling isaalang-alang ng subscriber ang mensahe bilang isang utos XREADGROUP, ang mensaheng ito ay mapupunta sa "nakabinbin" na estado at itinalaga sa partikular na subscriber na iyon. Hindi mababasa ng ibang mga subscriber ng grupo ang mensaheng ito.
  • Awtomatikong nagagawa ang mga subscriber sa unang pagbanggit, hindi na kailangang tahasang likhain ang mga ito.
  • May XREADGROUP maaari kang magbasa ng mga mensahe mula sa maraming magkakaibang mga thread nang sabay-sabay, gayunpaman para gumana ito kailangan mo munang lumikha ng mga pangkat na may parehong pangalan para sa bawat thread gamit ang XGROUP

Pagbawi pagkatapos ng kabiguan

Maaaring makabawi ang subscriber mula sa pagkabigo at muling basahin ang kanyang listahan ng mga mensahe na may katayuang "nakabinbin". Gayunpaman, sa totoong mundo, maaaring mabigo ang mga subscriber. Ano ang mangyayari sa mga nakatigil na mensahe ng subscriber kung hindi maka-recover ang subscriber mula sa isang pagkabigo?
Nag-aalok ang Consumer Group ng feature na ginagamit para lang sa mga ganitong kaso - kapag kailangan mong palitan ang may-ari ng mga mensahe.

Ang unang bagay na kailangan mong gawin ay tawagan ang utos XPENDING, na nagpapakita ng lahat ng mensahe sa pangkat na may katayuang β€œnakabinbin”. Sa pinakasimpleng anyo nito, ang utos ay tinatawag na may dalawang argumento lamang: ang pangalan ng thread at ang pangalan ng grupo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Ipinakita ng team ang bilang ng mga hindi naprosesong mensahe para sa buong grupo at para sa bawat subscriber. Mayroon lang kaming Bob na may dalawang natitirang mensahe dahil ang tanging mensahe na hiniling ni Alice ay nakumpirma XACK.

Maaari kaming humiling ng higit pang impormasyon gamit ang higit pang mga argumento:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - hanay ng mga identifier (maaari mong gamitin ang β€œ-” at β€œ+”)
{count} β€” bilang ng mga pagtatangka sa paghahatid
{consumer-name} - pangalan ng grupo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Ngayon ay mayroon na kaming mga detalye para sa bawat mensahe: ID, pangalan ng subscriber, idle time sa millisecond at panghuli ang bilang ng mga pagtatangka sa paghahatid. Mayroon kaming dalawang mensahe mula kay Bob at ang mga ito ay idle sa loob ng 74170458 millisecond, mga 20 oras.

Pakitandaan na walang pumipigil sa amin na suriin kung ano ang nilalaman ng mensahe sa pamamagitan lamang ng paggamit XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Kailangan lang nating ulitin ang parehong identifier nang dalawang beses sa mga argumento. Ngayon na mayroon na kaming ideya, maaaring magpasya si Alice na pagkatapos ng 20 oras ng downtime, malamang na hindi na mababawi si Bob, at oras na para i-query ang mga mensaheng iyon at ipagpatuloy ang pagproseso ng mga ito para kay Bob. Para dito ginagamit namin ang utos XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Gamit ang command na ito, makakatanggap kami ng mensaheng "banyaga" na hindi pa napoproseso sa pamamagitan ng pagpapalit ng may-ari sa {consumer}. Gayunpaman, maaari rin kaming magbigay ng minimum na idle time {min-idle-time}. Nakakatulong ito na maiwasan ang isang sitwasyon kung saan sinusubukan ng dalawang kliyente na sabay na baguhin ang may-ari ng parehong mga mensahe:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Ire-reset ng unang customer ang downtime at tataas ang delivery counter. Kaya hindi ito magagawa ng pangalawang kliyente na hilingin.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Ang mensahe ay matagumpay na na-claim ni Alice, na maaari na ngayong magproseso ng mensahe at kilalanin ito.

Mula sa halimbawa sa itaas, makikita mo na ang isang matagumpay na kahilingan ay nagbabalik ng mga nilalaman ng mensahe mismo. Gayunpaman, hindi ito kinakailangan. Ang opsyon na JUSTID ay maaaring gamitin upang ibalik ang mga message ID lamang. Ito ay kapaki-pakinabang kung hindi ka interesado sa mga detalye ng mensahe at nais na pataasin ang pagganap ng system.

Delivery counter

Ang counter na nakikita mo sa output XPENDING ay ang bilang ng mga paghahatid ng bawat mensahe. Ang naturang counter ay dinaragdagan sa dalawang paraan: kapag matagumpay na hiniling ang isang mensahe sa pamamagitan ng XCLAIM o kapag ginamit ang isang tawag XREADGROUP.

Normal para sa ilang mga mensahe na maihatid nang maraming beses. Ang pangunahing bagay ay ang lahat ng mga mensahe ay naproseso sa kalaunan. Minsan ang mga problema ay nangyayari kapag nagpoproseso ng mensahe dahil ang mensahe mismo ay sira, o ang pagpoproseso ng mensahe ay nagdudulot ng error sa handler code. Sa kasong ito, maaaring lumabas na walang makakapagproseso ng mensaheng ito. Dahil mayroon kaming counter ng pagtatangka sa paghahatid, magagamit namin ang counter na ito upang matukoy ang mga ganitong sitwasyon. Samakatuwid, kapag naabot na ng bilang ng paghahatid ang mataas na numero na iyong tinukoy, malamang na mas matalinong maglagay ng ganoong mensahe sa isa pang thread at magpadala ng notification sa administrator ng system.

Estado ng Thread

Koponan XINFO ginagamit upang humiling ng iba't ibang impormasyon tungkol sa isang thread at mga pangkat nito. Halimbawa, ganito ang hitsura ng isang pangunahing utos:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Ang command sa itaas ay nagpapakita ng pangkalahatang impormasyon tungkol sa tinukoy na stream. Ngayon isang bahagyang mas kumplikadong halimbawa:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Ang command sa itaas ay nagpapakita ng pangkalahatang impormasyon para sa lahat ng grupo ng tinukoy na thread

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Ang command sa itaas ay nagpapakita ng impormasyon para sa lahat ng mga subscriber ng tinukoy na stream at grupo.
Kung nakalimutan mo ang command syntax, humingi lamang ng tulong sa mismong command:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limitasyon sa Laki ng Stream

Maraming mga application ang hindi gustong mangolekta ng data sa isang stream magpakailanman. Madalas na kapaki-pakinabang na magkaroon ng maximum na bilang ng mga mensaheng pinapayagan sa bawat thread. Sa ibang mga kaso, kapaki-pakinabang na ilipat ang lahat ng mga mensahe mula sa isang thread patungo sa isa pang paulit-ulit na tindahan kapag naabot na ang tinukoy na laki ng thread. Maaari mong limitahan ang laki ng isang stream gamit ang MAXLEN parameter sa command XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kapag gumagamit ng MAXLEN, awtomatikong nade-delete ang mga lumang record kapag umabot sila sa tinukoy na haba, kaya ang stream ay may pare-parehong laki. Gayunpaman, ang pruning sa kasong ito ay hindi nangyayari sa pinaka mahusay na paraan sa memorya ng Redis. Maaari mong pagbutihin ang sitwasyon tulad ng sumusunod:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Ang argumentong ~ sa halimbawa sa itaas ay nangangahulugan na hindi natin kailangang limitahan ang haba ng stream sa isang partikular na halaga. Sa aming halimbawa, ito ay maaaring anumang numerong mas malaki sa o katumbas ng 1000 (halimbawa, 1000, 1010, o 1030). Tahasang tinukoy lang namin na gusto naming mag-imbak ang aming stream ng hindi bababa sa 1000 record. Ginagawa nitong mas mahusay ang pamamahala ng memorya sa loob ng Redis.

Mayroon ding hiwalay na koponan XTRIM, na gumagawa ng parehong bagay:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Patuloy na imbakan at pagtitiklop

Ang Redis Stream ay asynchronous na kinokopya sa mga slave node at nai-save sa mga file tulad ng AOF (snapshot ng lahat ng data) at RDB (log ng lahat ng mga operasyon sa pagsulat). Sinusuportahan din ang pagkopya ng estado ng Consumer Groups. Samakatuwid, kung ang isang mensahe ay nasa status na "nakabinbin" sa master node, pagkatapos ay sa mga slave node ang mensaheng ito ay magkakaroon ng parehong katayuan.

Pag-alis ng mga indibidwal na elemento mula sa isang stream

Mayroong isang espesyal na utos upang tanggalin ang mga mensahe XDEL. Nakukuha ng command ang pangalan ng thread na sinusundan ng mga message ID na tatanggalin:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kapag ginagamit ang utos na ito, kailangan mong isaalang-alang na ang aktwal na memorya ay hindi ilalabas kaagad.

Walang haba na mga stream

Ang pagkakaiba sa pagitan ng mga stream at iba pang mga istruktura ng data ng Redis ay kapag ang ibang mga istruktura ng data ay wala nang mga elemento sa loob ng mga ito, bilang isang side effect, ang istraktura ng data mismo ay aalisin mula sa memorya. Kaya, halimbawa, ang pinagsunod-sunod na hanay ay ganap na aalisin kapag ang ZREM na tawag ay nag-alis ng huling elemento. Sa halip, ang mga thread ay pinapayagang manatili sa memorya kahit na walang anumang elemento sa loob.

Konklusyon

Ang Redis Stream ay perpekto para sa paglikha ng mga broker ng mensahe, mga pila ng mensahe, pinag-isang pag-log, at mga sistema ng chat na nag-iingat ng kasaysayan.

Gaya ng sinabi ko minsan Niklaus Wirth, ang mga programa ay mga algorithm at mga istruktura ng data, at pareho na kayong binibigyan ng Redis.

Pinagmulan: www.habr.com

Magdagdag ng komento