Redis Stream – надзейнасць і маштабаванасць вашых сістэм паведамленняў

Redis Stream – надзейнасць і маштабаванасць вашых сістэм паведамленняў

Redis Stream - новы абстрактны тып дадзеных, прадстаўлены ў Redis з выхадам версіі 5.0
Канцэптуальна Redis Stream - гэта List, у які вы можаце дадаваць запісы. Кожны запіс мае ўнікальны ідэнтыфікатар. Па змаўчанні ідэнтыфікатар генеруецца аўтаматычна і складаецца з часавую пазнаку. Таму вы можаце запытваць дыяпазоны запісаў па часе ці атрымліваць новыя дадзеныя па меры іх паступлення ў струмень, як Unix каманда "tail -f" чытае лог-файл і замірае ў чаканні новых дадзеных. Звярніце ўвагу, што паток могуць слухаць адначасова некалькі кліентаў, як многія "tail -f" працэсы могуць адначасова чытаць файл, не канфліктуючы сябар з сябрам.

Каб зразумець усе перавагі новага тыпу дадзеных, давайце бегла ўспомнім даўно існуючыя структуры Redis, якія часткова паўтараюць функцыянальнасць Redis Stream.

Redis PUB/SUB

Redis Pub/Sub - простая сістэма паведамленняў, ужо убудаваная ў ваша key-value сховішча. Аднак за прастату даводзіцца плаціць:

  • Калі выдавец па якіх-небудзь прычынах выходзіць са строю, то ён губляе ўсіх сваіх падпісчыкаў.
  • Выдаўцу неабходна ведаць дакладны адрас усіх яго падпісчыкаў
  • Выдавец можа перагрузіць працай сваіх падпісчыкаў, калі дадзеныя публікуюцца хутчэй, чым апрацоўваюцца.
  • Паведамленне выдаляецца з буфера выдаўца адразу пасля публікацыі, незалежна ад таго, якой колькасці падпісчыкаў яно дастаўлена і як хутка тыя здолелі апрацаваць гэтае паведамленне.
  • Усе падпісанты атрымаюць паведамленне адначасова. Падпісчыкі самі павінны неяк паміж сабой узгадняць парадак апрацоўкі адно і таго ж паведамлення.
  • Няма ўбудаванага механізму пацверджання паспяховай апрацоўкі паведамлення падпісчыкам. Калі падпісчык атрымаў паведамленне і ўпаў падчас апрацоўкі, то выдавец пра гэта не даведаецца.

Redis List

Redis List - структура дадзеных, якая падтрымлівае каманды чытання з блакіроўкай. Вы можаце дадаваць і счытваць паведамленні з пачатку ці канец спісу. На базай гэтай структуры можна зрабіць нядрэнны стэк або чарга для вашай размеркаванай сістэмы і гэтага ў большасці выпадкаў будзе дастаткова. Асноўныя адрозненні ад Redis Pub/Sub:

  • Паведамленне дастаўляецца аднаму кліенту. Першы заблакаваны чытаннем кліент атрымае дадзеныя першым.
  • Клінт павінен сам ініцыяваць аперацыю чытання кожнага паведамлення. List нічога не ведае аб кліентах.
  • Паведамленні захоўваюцца да таго часу, пакуль іх нехта не лічыць ці не выдаліць відавочна. Калі вы наладзілі Redis сервер, каб ён скідаў дадзеныя на дыск, то надзейнасць сістэмы рэзка ўзрастае.

Увядзенне ў Stream

Даданне запісу ў паток

Каманда XADD дадае новы запіс у паток. Запіс - гэта не проста радок, яна складаецца з адной або некалькіх пар ключ-значэнне. Такім чынам, кожны запіс ужо структураваная і нагадвае структуру CSV файла.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

У прыкладзе вышэй мы дадаем у струмень з імем (ключом) "mystream" два палі: "sensor-id" і "temperature" са значэннямі "1234" і "19.8" адпаведна. У якасці другога аргумента каманда прымае ідэнтыфікатар, які будзе прысвоены запісы - гэты ідэнтыфікатар адназначна ідэнтыфікуе кожны запіс у струмені. Аднак у гэтым выпадку мы перадалі *, таму што мы жадаем, каб Redis згенераваў для нас новы ідэнтыфікатар. Кожны новы ідэнтыфікатар будзе павялічвацца. Таму кожны новы запіс будзе мець большы ідэнтыфікатар у адносінах да папярэдніх запісаў.

Фармат ідэнтыфікатара

Ідэнтыфікар запісу, які вяртаецца камандай XADD, складаецца з двух частак:

{millisecondsTime}-{sequenceNumber}

millisecondsTime - Unix час у мілісекундах (час сервера Redis). Аднак калі бягучы час аказваецца такім жа або меншым, чым час папярэдняга запісу, то выкарыстоўваецца часавая пазнака папярэдняга запісу. Таму калі час сервера вяртаецца ў мінулае, то новы ідэнтыфікатар усё яшчэ будзе захоўваць уласцівасць павелічэння.

паслядоўныНумар выкарыстоўваецца для запісаў, створаных у адну і тую ж мілісекунду. паслядоўныНумар будзе павялічаны на 1 адносна папярэдняга запісу. Паколькі паслядоўныНумар мае памер 64 біта, то на практыцы вы не павінны ўперціся ў абмежаванне на колькасць запісаў, якія могуць быць згенераваны на працягу адной мілісекунды.

Фармат такіх ідэнтыфікатараў на першы погляд можа падацца дзіўным. Недаверлівы чытач можа задацца пытаннем, чаму час з'яўляецца часткай ідэнтыфікатара. Чыннік у тым, што струмені Redis падтрымліваюць запыты дыяпазону па ідэнтыфікатарах. Паколькі ідэнтыфікатар злучаны з часам стварэння запісу, тое гэта дае магчымасць запытваць дыяпазоны часу. Мы разгледзім канкрэтны прыклад, калі пяройдзем да вывучэння каманды XRANGE.

Калі па якім-небудзь чынніку карыстачу патрабуецца паказаць свой уласны ідэнтыфікатар, які, напрыклад, злучаны з якой-небудзь вонкавай сістэмай, то мы можам яго перадаць камандзе XADD замест знака * як паказана ніжэй:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Звярніце ўвагу, што ў гэтым выпадку вы павінны сачыць за павелічэннем ідэнтыфікатара. У нашым прыкладзе мінімальны ідэнтыфікатар роўны "0-1", таму каманда не прыме яшчэ адзін ідэнтыфікатар, які роўны ці менш "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Колькасць запісаў у патоку

Можна атрымаць колькасць запісаў у патоку, проста выкарыстоўваючы каманду XLEN. Для нашага прыкладу гэтая каманда верне наступнае значэнне:

> XLEN somestream
(integer) 2

Запыты па дыяпазоне - XRANGE і XREVRANGE

Каб запытаць дадзеныя па дыяпазоне, нам трэба пазначыць два ідэнтыфікатары — пачатак і канец дыяпазону. Які вяртаецца дыяпазон будзе ўключаць усе элементы, уключаючы межы. Таксама існуюць два спецыяльныя ідэнтыфікатары «-» і «+», адпаведна якія азначаюць самы маленькі (першы запіс) і самы вялікі (апошні запіс) ідэнтыфікатар у струмені. Прыклад ніжэй выведзе ўсе запісы патоку.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Кожны які вяртаецца запіс уяўляе сабой масіў з двух элементаў: ідэнтыфікатар і спіс пар ключ-значэнне. Мы ўжо казалі, што ідэнтыфікатары запісу маюць дачыненне да часу. Таму мы можам запытваць дыяпазон канкрэтнага прамежку часу. Аднак, мы можам паказаць у запыце не поўны ідэнтыфікатар, а толькі Unix час, апусціўшы частку, якая адносіцца да паслядоўныНумар. Апушчаная частка ідэнтыфікатара аўтаматычна прыраўняецца да нуля ў пачатку дыяпазону і да максімальна магчымага значэння ў канцы дыяпазону. Ніжэй прыведзены прыклад, як можна запытаць дыяпазон, роўны двум мілісекундам.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

У нас ёсць толькі адна запіс у гэтым дыяпазоне, аднак у рэальных наборах дадзеных які вяртаецца вынік можа быць велізарным. Па гэтай прычыне XRANGE падтрымлівае опцыю COUNT. Указаўшы колькасць, мы можам проста атрымаць першыя N запісаў. Калі нам трэба атрымаць наступныя N запісаў (пагінацыя), мы можам выкарыстоўваць апошні атрыманы ідэнтыфікатар, павялічыць у яго паслядоўныНумар на адзінку і запытаць зноў. Давайце паглядзім на гэта ў наступным прыкладзе. Мы пачынаем дадаваць 10 элементаў з дапамогай XADD (Выкажам здагадку, што струмень mystream ужо быў запоўнены 10 элементамі). Каб пачаць ітэрацыю, атрымліваючы па 2 элементы на каманду, мы пачынаем з поўнага дыяпазону, але з COUNT роўным 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Каб працягнуць ітэрацыю з наступнымі двума элементамі, нам трэба абраць апошні атрыманы ідэнтыфікатар, гэта значыць 1519073279157-0, і дадаць 1 да паслядоўныНумар.
Выніковы ідэнтыфікатар, у дадзеным выпадку 1519073279157-1, зараз яго можна выкарыстоўваць у якасці новага аргументу пачатку дыяпазону для наступнага выкліку XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

І гэтак далей. Паколькі складанасць XRANGE складае O(log(N)) для пошуку, а затым O(M) для вяртання M элементаў, то кожны крок ітэрацыі з'яўляецца хуткім. Такім чынам, з дапамогай XRANGE можна эфектыўна ітэраваць патокі.

Каманда XREVRANGE з'яўляецца эквівалентам XRANGE, але вяртае элементы ў зваротным парадку:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Звярніце ўвагу, што каманда XREVRANGE прымае аргументы дыяпазону start і stop у зваротным парадку.

Чытанне новых запісаў з дапамогай XREAD

Часта ўзнікае задача падпісацца на паток і атрымліваць толькі новыя паведамленні. Гэтая канцэпцыя можа здацца падобнай на Redis Pub/Sub або які блакуе Redis List, але ёсць прынцыповыя адрозненні ў тым, як выкарыстоўваць Redis Stream:

  1. Кожнае новае паведамленне па змаўчанні дастаўляецца кожнаму падпісанту. Гэтыя паводзіны адрозніваюцца ад блакіруючага Redis List, дзе новае паведамленне будзе прачытана толькі нейкім адным падпісчыкам.
  2. У той час як у Redis Pub/Sub усе паведамленні забываюцца і ніколі не захоўваюцца, у Stream усе паведамленні захоўваюцца на нявызначаны тэрмін (калі кліент відавочна не выкліча выдаленне).
  3. Redis Stream дазваляе размежаваць доступ да паведамленняў усярэдзіне аднаго струменя. Канкрэтны падпісчык можа бачыць толькі сваю асабістую гісторыю паведамленняў.

Вы можаце падпісацца на паток і атрымліваць новыя паведамленні, выкарыстоўваючы каманду XREAD. Гэта крыху больш складана, чым XRANGE, таму мы спачатку пачнем з прыкладаў прасцей.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

У прыкладзе вышэй указана неблакіруючая форма XREAD. Звярніце ўвагу, што опцыя COUNT не з'яўляецца абавязковай. Фактычна адзінай абавязковай опцыяй каманды з'яўляецца опцыя STREAMS, якая задае спіс плыняў разам з адпаведным максімальным ідэнтыфікатарам. Мы напісалі "STREAMS mystream 0" - мы хочам атрымліваць усе запісы патоку mystream з ідэнтыфікатарам больш чым "0-0". Як бачна з прыкладу, каманда вяртае імя патоку, таму што мы можам падпісацца на некалькі патокаў адначасова. Мы маглі б напісаць, напрыклад, "STREAMS mystream otherstream 0 0". Звярніце ўвагу, што пасля опцыі STREAMS нам трэба спачатку даць імёны ўсіх патрэбных патокаў і толькі потым спіс ідэнтыфікатараў.

У гэтай простай форме каманда не робіць нічога асаблівага ў параўнанні з XRANGE. Аднак цікава тое, што мы можам лёгка ператварыць XREAD у блакавальную каманду, паказаўшы аргумент BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

У прыведзеным вышэй прыкладзе, пазначана новая опцыя BLOCK з часам чакання 0 мілісекунд (гэта азначае бясконцае чаканне). Больш за тое, замест перадачы звычайнага ідэнтыфікатара для патоку mystream, быў перададзены спецыяльны ідэнтыфікатар $. Гэты спецыяльны ідэнтыфікатар азначае, што XREAD павінен выкарыстоўваць у якасці ідэнтыфікатара максімальны ідэнтыфікатар у струмені mystream. Так што мы будзем атрымліваць толькі новыя паведамленні, пачынаючы з моманту, калі мы пачалі праслухоўванне. У некаторым сэнсе гэта падобна на Unix каманду "tail-f".

Звярніце ўвагу, што пры выкарыстанні опцыі BLOCK нам не абавязкова трэба выкарыстоўваць спецыяльны ідэнтыфікатар $. Мы можам выкарыстоўваць любы існуючы ў патоку ідэнтыфікатар. Калі каманда зможа абслужыць наш запыт неадкладна, без блакавання, яна зробіць гэта, у адваротным выпадку яна заблакуецца.

Блакуючы XREAD таксама можа праслухоўваць адразу некалькі патокаў, проста трэба пазначыць іх імёны. У гэтым выпадку каманда верне запіс першага патоку, у які паступілі даныя. Першы падпісчык, заблакаваны для дадзенага струменя, будзе атрымліваць дадзеныя першым.

Consumer Groups

У пэўных задачах мы хочам размежаваць доступ падпісчыкаў да паведамленняў унутры аднаго патоку. Прыкладам, калі гэта можа быць карысна - чарга паведамленняў з воркерамі, якія будуць атрымліваць розныя паведамленні патоку, дазваляючы маштабаваць апрацоўку паведамленняў.

Калі мы прадставім, што ў нас ёсць тры падпісчыка C1, C2, C3 і паток, які змяшчае паведамленні 1, 2, 3, 4, 5, 6, 7, то абслугоўванне паведамленняў будзе адбывацца як на дыяграме ніжэй:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Каб атрымаць гэты эфект, Redis Stream выкарыстоўвае канцэпцыю, званую Consumer Group. Гэтая канцэпцыя падобная да псеўда-падпісчыка, які атрымлівае дадзеныя з патоку, але фактычна абслугоўваецца некалькімі падпісчыкамі ўнутры групы, падаючы пэўныя гарантыі:

  1. Кожнае паведамленне дастаўляецца розным падпісчыкам унутры групы.
  2. У межах групы падпісчыкі ідэнтыфікуюцца па імені, якое ўяўляе сабой радок з улікам рэгістра. Калі нейкі падпісчык часова выпадзе з групы, то ён можа аднавіцца ў групу па ўласным унікальным імі.
  3. Кожная Consumer Group варта канцэпцыі "першае непрачытанае паведамленне". Калі падпісчык запытвае новыя паведамленні, ён можа атрымаць толькі тыя паведамленні, якія ніколі раней не дастаўляліся ніводнаму падпісанту ўнутры групы.
  4. Існуе каманда відавочнага пацверджання паспяховай апрацоўкі паведамлення падпісчыкам. Пакуль не будзе выклікана гэтая каманда, запытанае паведамленне будзе заставацца ў статусе "pending".
  5. Унутры Consumer Group кожны падпісчык можа запытваць гісторыю паведамленняў, якія былі дастаўлены менавіта яму, але яшчэ не былі апрацаваны (у статусе «pending»)

У некаторым сэнсе, стан групы можа быць перастаўлена так:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Цяпер прыйшоў час пазнаёміцца ​​з асноўнымі камандамі для Consumer Group, а менавіта:

  • XGROUP выкарыстоўваецца для стварэння, знішчэння і кіравання групамі
  • XREADGROUP выкарыстоўваецца для чытання патоку праз групу
  • XACK - гэта каманда дазваляе падпісанту пазначыць паведамленне як паспяхова апрацаванае

Стварэнне Consumer Group

Выкажам здагадку, што паток mystream ужо існуе. Тады каманда стварэння групы будзе мець выгляд:

> XGROUP CREATE mystream mygroup $
OK

Пры стварэнні групы мы павінны перадаць ідэнтыфікатар, пачынаючы з якога група будзе атрымліваць паведамленні. Калі мы хочам проста атрымліваць усе новыя паведамленні, то мы можам выкарыстоўваць спецыяльны ідэнтыфікатар $ (як у нашым прыкладзе вышэй). Калі замест спецыяльнага ідэнтыфікатара пазначыць 0, то групе будуць даступны ўсе паведамленні патоку.

Цяпер, калі група створана, мы можам адразу ж пачаць чытаць паведамленні з дапамогай каманды XREADGROUP. Гэтая каманда вельмі падобная на XREAD і падтрымлівае неабавязковую опцыю BLOCK. Аднак ёсць абавязковая опцыя GROUP, якая павінна быць заўсёды пазначана з двума аргументамі: імя групы і імя падпісчыка. Опцыя COUNT таксама падтрымліваецца.

Перш чым чытаць паток, давайце змесцім туды некалькі паведамленняў:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

А зараз паспрабуем прачытаць гэты струмень праз групу:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Прыведзеная вышэй каманда даслоўна абвяшчае наступнае:

"Я, Аліса-падпісчык, член гурта mygroup, хачу прачытаць з патоку mystream адно паведамленне, якое ніколі не было нікому дастаўлена раней".

Кожны раз, калі падпісчык выконвае аперацыю з групай, ён павінен пазначыць сваё імя, адназначна ідэнтыфікуючы сябе ўнутры групы. У прыведзенай вышэй камандзе ёсць яшчэ адна вельмі важная дэталь - спецыяльны ідэнтыфікатар «>». Гэты спецыяльны ідэнтыфікатар фільтруе паведамленні, пакідаючы толькі тыя, якія да гэтага часу ні разу не дастаўляліся.

Таксама, у асаблівых выпадках, вы можаце пазначыць рэальны ідэнтыфікатар, такі як 0 ці любы іншы сапраўдны ідэнтыфікатар. У гэтым выпадку каманда XREADGROUP верне вам гісторыю паведамленняў са статусам "pending", якія былі дастаўлены названаму падпісанту (Alice), але яшчэ не былі пацверджаны з дапамогай каманды XACK.

Мы можам праверыць гэтыя паводзіны, адразу паказаўшы ідэнтыфікатар 0, без опцыі COUNT. Мы проста ўбачым адзінае якое чакае паведамленне, гэта значыць паведамленне з яблыкам:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Аднак калі мы пацвердзім паведамленне як паспяхова апрацаванае, тое яно больш не будзе адлюстроўвацца:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Цяпер настала чарга Боба нешта прачытаць:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Боб, чалец групы mygroup, папрасіў не больш за два паведамленні. Каманда паведамляе толькі аб недастаўленых паведамленнях з-за спецыяльнага ідэнтыфікатара ">". Як бачыце, паведамленне "apple" не адлюструецца, бо яно ўжо дастаўлена Алісе, таму Боб атрымлівае "orange" і "strawberry".

Такім чынам, Аліса, Боб і любы іншы падпісчык групы могуць чытаць розныя паведамленні з аднаго і таго ж патоку. Таксама яны могуць чытаць сваю гісторыю неапрацаваных паведамленняў або пазначаць паведамленні як апрацаваныя.

Ёсць некалькі рэчаў, якія трэба мець на ўвазе:

  • Як толькі падпісчык лічыць паведамленне камандай XREADGROUP, гэтае паведамленне пераходзіць у стан «pending» і замацоўваецца за гэтым канкрэтным падпісчыкам. Іншыя падпісчыкі групы не змогуць прачытаць гэтае паведамленне.
  • Падпісчыкі аўтаматычна ствараюцца пры першым згадванні, няма неабходнасці ў іх яўным стварэнні.
  • З дапамогай XREADGROUP вы можаце чытаць паведамленні з некалькіх розных патокаў адначасова, аднак, каб гэта працавала, вам трэба папярэдне стварыць групы з аднолькавым імем для кожнага патоку з дапамогай XGROUP

Аднаўленне пасля збою

Падпісчык можа аднавіцца пасля збою і перачытаць свой спіс паведамленняў са статусам "pending". Аднак у рэальным свеце падпісчыкі могуць канчаткова пацярпець няўдачу. Што адбываецца з падвіслымі паведамленнямі падпісчыка, калі ён не змог аднавіцца пасля збою?
Consumer Group прапануе функцыю, якая выкарыстоўваецца менавіта для такіх выпадкаў - калі неабходна змяніць уладальніка паведамленняў.

Перш за ўсё неабходна выклікаць каманду XPENDING, якая адлюстроўвае ўсе паведамленні групы са статусам "pending". У сваёй найпростай форме каманда выклікаецца толькі з двума аргументамі: імем патоку і імем групы:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Каманда вывела колькасць неапрацаваных паведамленняў для ўсёй групы і для кожнага падпісчыка. У нас ёсць толькі Боб з двума неапрацаванымі паведамленнямі, таму што адзінае паведамленне, запытанае Алісай, было пацверджана з дапамогай XACK.

Мы можам запытаць дадатковую інфармацыю, выкарыстоўваючы больш аргументаў:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} — дыяпазон ідэнтыфікатараў (можна выкарыстоўваць "-" і "+")
{count} - колькасць спроб дастаўкі
{consumer-name} - імя групы

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Цяпер у нас ёсць дэталі для кожнага паведамлення: ідэнтыфікатар, імя падпісчыка, час прастою ў мілісекундах і, нарэшце, колькасць спроб дастаўкі. У нас ёсць два паведамленні ад Боба, і яны прастойваюць на працягу 74170458 мілісекунд, каля 20 гадзін.

Звярніце ўвагу, што ніхто не перашкаджае нам праверыць, якім быў змест паведамлення, проста выкарыстоўваючы XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Мы проста павінны паўтарыць адзін і той самы ідэнтыфікатар двойчы ў аргумэнтах. Цяпер, калі ў нас ёсць некаторая ідэя, Аліса можа вырашыць, што пасля 20 гадзін прастою Боб, верагодна, не адновіцца, і прыйшоў час запытаць гэтыя паведамленні і аднавіць іх апрацоўку замест Боба. Для гэтага мы выкарыстоўваем каманду XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

З дапамогай гэтай каманды мы можам атрымаць "чужое" паведамленне, якое яшчэ не было апрацавана, шляхам змены ўладальніка на {consumer}. Аднак мы таксама можам даць мінімальны час прастою {min-idle-time}. Гэта дапамагае пазбегнуць сітуацыю, калі два кліенты спрабуюць адначасова змяніць уладальніка ў адных і тых жа паведамленняў:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Першы кліент скіне час прастою і павялічыць лічыльнік колькасці даставак. Так што другі кліент не зможа яго запытаць.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Паведамленне было паспяхова запатрабавана Алісай, якая зараз можа апрацаваць паведамленне і пацвердзіць яго.

З прыведзенага вышэй прыкладу бачна, што паспяховае выкананне запыту вяртае змесціва самога паведамлення. Аднак гэта не абавязкова. Опцыя JUSTID можа выкарыстоўвацца для вяртання толькі ідэнтыфікатараў паведамлення. Гэта карысна, калі вам не цікавыя дэталі паведамлення і вы хочаце павялічыць прадукцыйнасць сістэмы.

Лічыльнік дастаўкі

Лічыльнік, які вы назіраеце ў выснове XPENDING - Гэта колькасць даставак кожнага паведамлення. Такі лічыльнік павялічваецца двума спосабамі: калі паведамленне паспяхова запатрабавана праз XCLAIM ці калі выкарыстоўваецца выклік XREADGROUP.

Гэта нармальна, што некаторыя паведамленні дастаўляюцца па некалькі разоў. Галоўнае, каб у выніку ўсе паведамленні былі апрацаваны. Часам пры апрацоўцы паведамлення ўзнікаюць праблемы з-за пашкоджанні самога паведамленне ці апрацоўка паведамлення выклікае памылку ў кодзе апрацоўшчыка. У такім выпадку можа аказацца, што гэтае паведамленне ніхто не будзе ў стане апрацаваць. Паколькі ў нас ёсць лічыльнік спроб дастаўкі, мы можам выкарыстоўваць гэты лічыльнік для выяўлення такіх сітуацый. Таму, як толькі лічыльнік даставак дасягне зададзенай вамі вялікай колькасці, верагодна, будзе разумней змясціць такое паведамленне ў іншы струмень і адправіць апавяшчэнне сістэмнаму адміністратару.

Стан патокаў

Каманда XINFO выкарыстоўваецца для запыту рознай інфармацыі патоку і яго групах. Напрыклад, базавы від каманды выглядае наступным чынам:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Каманда вышэй адлюстроўвае агульную інфармацыю па ўказаным патоку. Цяпер крыху больш складаны прыклад:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Каманда вышэй адлюстроўвае агульную інфармацыю па ўсіх групах названага патоку.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Каманда вышэй адлюстроўвае інфармацыю па ўсіх падпісчыкам названага патоку і групы.
Калі вы забудзецеся сінтаксіс каманды, проста звернецеся за дапамогай да самой каманды:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Абмежаванне памеру патоку

Многія прыкладанні не хочуць збіраць дадзеныя ў паток вечна. Часта карысна мець максімальна дапушчальную колькасць паведамленняў у струмені. У астатніх выпадках карысна перанесці ўсе паведамленні з струменя ў іншае сталае сховішча пры дасягненні зададзенага памеру струменя. Абмежаваць памер патоку можна з дапамогай параметра MAXLEN у камандзе XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Пры выкарыстанні MAXLEN старыя запісы аўтаматычна выдаляюцца пры дасягненні названай даўжыні, таму паток мае пастаянны памер. Аднак абразанне ў гэтым выпадку адбываецца не самым прадукцыйным спосабам у памяці Redis. Палепшыць сітуацыю можна наступным чынам:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Аргумент ~ у прыкладзе вышэй азначае, што нам не абавязкова трэба абмяжоўваць даўжыню патоку канкрэтным значэннем. У нашым прыкладзе гэта можа быць любы лік больш ці роўна 1000 (напрыклад, 1000, 1010 ці 1030). Проста мы відавочна паказалі, што жадаем, каб наш струмень захоўваў не меней 1000 запісаў. Гэта робіць працу з памяццю нашмат больш эфектыўна ўнутры Redis.

Таксама існуе асобная каманда XTRIM, Якая выконвае тое ж самае:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Пастаяннае захоўванне і рэплікацыя

Redis Stream асінхронна рэплікуецца на slave ноды і захоўваецца ў файлы тыпу AOF (снапшот усіх дадзеных) і RDB (лог усіх аперацый запісу). Таксама падтрымліваецца рэплікацыя стану Consumer Groups. Таму, калі паведамленне знаходзіцца ў статусе "pending" на master нодзе, то на slave нодах гэтае паведамленне будзе мець такі ж статус.

Выдаленне асобных элементаў з патоку

Для выдалення паведамленняў існуе спецыяльная каманда XDEL. Каманда атрымлівае імя патоку, за якім ідуць ідэнтыфікатары паведамленняў, які неабходна выдаліць:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Пры выкарыстанні гэтай каманды трэба ўлічыць, што фактычна памяць будзе вызвалена не адразу.

Патокі нулявой даўжыні

Розніца паміж струменямі і іншымі структурамі дадзеных Redis складаецца ў тым, што калі іншыя структуры дадзеных больш не маюць элементаў усярэдзіне сябе, у якасці пабочнага эфекту, сама структура дадзеных будзе выдаленая з памяці. Так, напрыклад, адсартаваны набор будзе поўнасцю выдалены, калі выклік ZREM выдаліць апошні элемент. Замест гэтага струменям дазваляецца заставацца ў памяці, нават не маючы ніводнага элемента ўсярэдзіне.

Заключэнне

Redis Stream ідэальна падыходзіць для стварэння брокераў паведамленняў, чэргаў паведамленняў, уніфікаваных часопісаў і сістэм чата, якія захоўваюць гісторыю.

Як аднойчы сказаў Ніклаўс Вірт, праграмы - гэта алгарытмы плюс структуры дадзеных, а Redis ужо дае вам і тое, і іншае.

Крыніца: habr.com

Дадаць каментар