Redis Stream - билдирүү системаларыңыздын ишенимдүүлүгү жана масштабдуулугу

Redis Stream - билдирүү системаларыңыздын ишенимдүүлүгү жана масштабдуулугу

Redis Stream - бул 5.0 версиясы менен Redisге киргизилген жаңы абстрактуу маалымат түрү
Концептуалдык жактан алганда, Redis Stream - бул жазууларды кошо турган тизме. Ар бир жазуу уникалдуу идентификаторго ээ. Демейки боюнча, ID автоматтык түрдө түзүлөт жана убакыт белгисин камтыйт. Ошондуктан, сиз убакыттын өтүшү менен жазуулардын диапазондорун сурасаңыз болот же агымга келгенде жаңы маалыматтарды ала аласыз, Unix "tail -f" буйругу журнал файлын окуп, жаңы маалыматтарды күтүп жатканда катып калат. Көптөгөн "tail -f" процесстери файлды бир эле учурда бири-бирине карама-каршы келбей окуй алгандай эле, бир нече кардарлар жипти уга алышарын эске алыңыз.

Берилиштердин жаңы түрүнүн бардык артыкчылыктарын түшүнүү үчүн, келгиле, Redis Stream функционалдуулугун жарым-жартылай кайталаган көптөн бери иштеп келе жаткан Redis структураларын карап чыгалы.

Redis PUB/SUB

Redis Pub/Sub – бул сиздин негизги баалуулуктар дүкөнүңүзгө орнотулган жөнөкөй билдирүү системасы. Бирок, жөнөкөйлүк баада келет:

  • Эгерде басмачы кандайдыр бир себептер менен иштебей калса, анда ал бардык жазылуучуларын жоготот
  • Басмачы өзүнүн бардык жазылуучуларынын так дарегин билиши керек
  • Маалыматтар иштетилгенден тезирээк жарыяланса, жарыялоочу өзүнүн жазылуучуларын жумуш менен ашыкча жүктөй алат
  • Билдирүү жарыялангандан кийин дароо чыгаруучунун буферинен өчүрүлөт, ал канча абонентке жеткирилгенине жана алар бул билдирүүнү канчалык тез иштете алганына карабастан.
  • Бардык абоненттер билдирүүнү бир эле учурда алышат. Абоненттер өздөрү кандайдыр бир жол менен бир эле билдирүүнү иштетүү тартибин макулдашып алышы керек.
  • Абонент билдирүүнү ийгиликтүү иштеткенин тастыктоо үчүн орнотулган механизм жок. Абонент билдирүү алып, иштеп жатканда бузулуп калса, басмачы бул тууралуу билбей калат.

Redis тизмеси

Redis List окуу буйруктарын бөгөттөөнү колдогон маалымат структурасы. Сиз билдирүүлөрдү тизменин башынан же аягынан кошуп жана окуй аласыз. Бул түзүмдүн негизинде сиз бөлүштүрүлгөн тутумуңуз үчүн жакшы стек же кезек түзө аласыз жана көпчүлүк учурларда бул жетиштүү болот. Redis Pub/Subдан негизги айырмачылыктар:

  • Билдирүү бир кардарга жеткирилет. Биринчи окуу бөгөттөлгөн кардар биринчи маалыматты алат.
  • Клинт ар бир билдирүү үчүн окуу операциясын өзү башташы керек. Тизме кардарлар жөнүндө эч нерсе билбейт.
  • Билдирүүлөр кимдир бирөө аларды окумайынча же ачыктан-ачык жок кылмайынча сакталат. Эгер сиз Redis серверин дискке маалыматтарды жууш үчүн конфигурацияласаңыз, анда системанын ишенимдүүлүгү кескин жогорулайт.

Агымга киришүү

Агымга жазуу кошуу

команда XADD агымга жаңы жазууну кошот. Жазуу жөн эле сап эмес, ал бир же бир нече ачкыч-маани жуптарынан турат. Ошентип, ар бир жазуу мурунтан эле структураланган жана CSV файлынын структурасына окшош.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Жогорудагы мисалда биз агымга эки талааны кошобуз (ачкыч) "mystream": "сенсор-ид" жана "температура" тиешелүүлүгүнө жараша "1234" жана "19.8" маанилери менен. Экинчи аргумент катары, команда жазууга дайындала турган идентификаторду алат - бул идентификатор агымдагы ар бир жазууну уникалдуу түрдө аныктайт. Бирок, бул учурда биз * өттүк, анткени биз Redis биз үчүн жаңы ID түзүшүн каалайбыз. Ар бир жаңы ID көбөйөт. Демек, ар бир жаңы жазуу мурунку жазууларга салыштырмалуу жогорураак идентификаторго ээ болот.

Идентификатор форматы

команда тарабынан кайтарылган кирүү ID XADD, эки бөлүктөн турат:

{millisecondsTime}-{sequenceNumber}

миллисекундУбакыт — Unix убактысы миллисекунд менен (Redis серверинин убактысы). Бирок, эгерде учурдагы убакыт мурунку жаздыруу убактысы менен бирдей же азыраак болсо, анда мурунку жазуунун убакыт белгиси колдонулат. Демек, сервердин убактысы артка кетсе, жаңы идентификатор дагы эле өсүү касиетин сактап калат.

sequenceNumber ошол эле миллисекундда түзүлгөн жазуулар үчүн колдонулат. sequenceNumber мурунку жазууга салыштырмалуу 1 көбөйөт. Себеби sequenceNumber өлчөмү 64 бит болсо, анда иш жүзүндө бир миллисекунд ичинде түзүлө турган жазуулардын санына чек койбош керек.

Мындай идентификаторлордун форматы бир караганда кызыктай көрүнүшү мүмкүн. Ишенбеген окурман эмне үчүн убакыт идентификатордун бир бөлүгү экенине таң калышы мүмкүн. Себеби, Redis агымдары ID боюнча диапазондун сурамдарын колдойт. Идентификатор жазуу түзүлгөн убакыт менен байланышкандыктан, бул убакыт диапазондорун суроого мүмкүндүк берет. Биз буйрукту карап жатканда конкреттүү мисалды карап чыгабыз XRANGE.

Эгер кандайдыр бир себептерден улам колдонуучу, мисалы, кандайдыр бир тышкы система менен байланышкан өзүнүн идентификаторун көрсөтүү керек болсо, анда биз аны буйрукка өткөрүп бере алабыз. XADD төмөндө көрсөтүлгөндөй * ордуна:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Сураныч, бул учурда сиз ID өсүштү өзүңүз көзөмөлдөшүңүз керек экенин эске алыңыз. Биздин мисалда минималдуу идентификатор "0-1" болуп саналат, андыктан команда "0-1ге" барабар же андан азыраак башка идентификаторду кабыл албайт.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Ар бир агымдагы жазуулардын саны

Жөн гана буйрукту колдонуу менен агымдагы жазуулардын санын алууга болот XLEN. Биздин мисал үчүн, бул буйрук төмөнкү маанини кайтарат:

> XLEN somestream
(integer) 2

Диапазон сурамдары - XRANGE жана XREVRANGE

Дайындарды диапазон боюнча суроо үчүн биз эки идентификаторду көрсөтүүбүз керек - диапазонун башталышы жана аягы. Кайтарылган диапазон бардык элементтерди, анын ичинде чектерди камтыйт. Ошондой эле эки атайын “-” жана “+” идентификаторлору бар, алар агымдагы эң кичине (биринчи жазуу) жана эң чоң (акыркы жазуу) идентификаторун билдирет. Төмөндөгү мисал бардык агым жазууларын тизмелейт.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Ар бир кайтарылган жазуу эки элементтен турган массив: идентификатор жана ачкыч-маани жуптарынын тизмеси. Биз буга чейин рекорддук идентификаторлор убакытка байланыштуу экенин айтканбыз. Ошондуктан, биз белгилүү бир убакыт аралыгын сурай алабыз. Бирок, биз суроо-талапта толук идентификаторду эмес, Unix убактысын гана көрсөтө алабыз. sequenceNumber. Идентификатордун калтырылган бөлүгү диапазонун башында нөлгө жана диапазонун аягында мүмкүн болгон максималдуу мааниге автоматтык түрдө коюлат. Төмөндө эки миллисекунддук диапазонду кантип сураса боло турганыңыздын мисалы келтирилген.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Бул диапазондо бизде бир гана жазуу бар, бирок чыныгы маалымат топтомдорунда кайтарылган натыйжа чоң болушу мүмкүн. Ушул себептүү XRANGE COUNT параметрин колдойт. Санды көрсөтүү менен, биз жөн гана биринчи N жазууларды ала алабыз. Биз кийинки N жазууларды (беттөө) алуу үчүн керек болсо, биз акыркы кабыл алынган ID колдоно аласыз, аны көбөйтүү sequenceNumber бирден жана кайра сура. Муну төмөнкү мисалда карап көрөлү. менен 10 элементти кошо баштайбыз XADD (mystream мурунтан эле 10 элемент менен толтурулган болсо). Ар бир командага 2 элементти алуу үчүн итерацияны баштоо үчүн, биз толук диапазондон баштайбыз, бирок 2ге барабар COUNT менен.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Кийинки эки элемент менен итерациялоону улантуу үчүн, биз акыркы кабыл алынган идентификаторду тандап алышыбыз керек, б.а. 1519073279157-0 жана 1 кошуу керек sequenceNumber.
Натыйжадагы ID, бул учурда 1519073279157-1, эми кийинки чалуу үчүн диапазонун жаңы башталышы катары колдонулушу мүмкүн XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Жана башка. Анткени татаалдык XRANGE издөө үчүн O(log(N)), андан кийин M элементтерди кайтаруу үчүн O(M) болсо, анда ар бир итерация кадамы тез болот. Ошентип, колдонуу XRANGE агымдарды натыйжалуу кайталаса болот.

команда XREVRANGE эквивалент болуп саналат XRANGE, бирок элементтерди тескери тартипте кайтарат:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Буйрутма экенин эске алыңыз XREVRANGE диапазондун аргументтерин тескери тартипте баштоо жана токтотууну алат.

XREAD аркылуу жаңы жазууларды окуу

Көбүнчө агымга жазылуу жана жаңы билдирүүлөрдү гана алуу милдети пайда болот. Бул концепция Redis Pub/Sub же Redis List бөгөттөө сыяктуу көрүнүшү мүмкүн, бирок Redis Streamди кантип колдонууда негизги айырмачылыктар бар:

  1. Ар бир жаңы билдирүү демейки боюнча ар бир абонентке жеткирилет. Бул жүрүм-турум жаңы билдирүүнү бир гана абонент окуй турган бөгөттөөчү Redis тизмесинен айырмаланат.
  2. Redis Pub/Subда бардык билдирүүлөр унутулуп, эч качан сакталбайт, Streamде бардык билдирүүлөр чексиз сакталат (эгер кардар ачыктан-ачык өчүрүүгө себеп болбосо).
  3. Redis Stream бир агымдын ичиндеги билдирүүлөргө жетүүнү айырмалоого мүмкүндүк берет. Белгилүү бир абонент өзүнүн жеке билдирүү таржымалын гана көрө алат.

Сиз жипке жазылып, буйрукту колдонуу менен жаңы билдирүүлөрдү ала аласыз XREAD. Бул бир аз татаалыраак XRANGE, ошондуктан биз адегенде жөнөкөй мисалдардан баштайбыз.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Жогорудагы мисал бөгөттөлбөгөн форманы көрсөтөт XREAD. COUNT параметр милдеттүү эмес экенин эске алыңыз. Чынында, бир гана талап кылынган буйрук опциясы - бул STREAMS опциясы, ал агымдардын тизмесин жана тиешелүү максималдуу идентификаторду көрсөтөт. Биз “STREAMS mystream 0” деп жаздык – биз “0-0” идентификатору менен mystream агымынын бардык жазууларын алгыбыз келет. Мисалдан көрүнүп тургандай, буйрук жиптин атын кайтарып берет, анткени биз бир эле учурда бир нече жипке жазыла алабыз. Биз, мисалы, "STREAMS mystream otherstream 0 0" деп жазсак болот. STREAMS опциясынан кийин биз адегенде бардык керектүү агымдардын атын, андан кийин гана идентификаторлордун тизмесин беришибиз керек экенин эске алыңыз.

Бул жөнөкөй түрдө буйрук салыштырмалуу өзгөчө эч нерсе кылбайт XRANGE. Бирок, эң кызыгы, биз оңой эле бура алабыз XREAD BLOCK аргументин көрсөтүү менен бөгөттөө буйругуна:

> XREAD BLOCK 0 STREAMS mystream $

Жогорудагы мисалда жаңы BLOCK опциясы 0 миллисекунд тайм-ауту менен көрсөтүлгөн (бул чексиз күтүүнү билдирет). Мындан тышкары, mystream агымы үчүн кадимки идентификаторду өткөрүүнүн ордуна, атайын $ идентификатору өттү. Бул атайын идентификатор ушуну билдирет XREAD идентификатор катары mystream ичиндеги максималдуу идентификаторду колдонушу керек. Ошентип, биз угуп баштаган учурдан баштап гана жаңы билдирүүлөрдү алабыз. Кээ бир жагынан бул Unixтин "tail -f" буйругуна окшош.

BLOCK опциясын колдонууда сөзсүз түрдө $ атайын идентификаторун колдонуунун кереги жок экенин эске алыңыз. Биз агымда бар каалаган идентификаторду колдоно алабыз. Эгерде команда биздин өтүнүчүбүздү бөгөттөбөй дароо тейлей алса, анда аны аткарат, антпесе бөгөт коёт.

Бөгөттөө XREAD бир эле учурда бир нече жипти уга алат, сиз жөн гана алардын аттарын көрсөтүшүңүз керек. Бул учурда, буйрук маалыматтарды кабыл алган биринчи агымдын жазуусун кайтарат. Берилген жип үчүн бөгөттөлгөн биринчи жазылуучу маалыматты биринчи алат.

Керектөөчү топтор

Белгилүү тапшырмаларда биз бир жиптин ичиндеги билдирүүлөргө жазылуучу мүмкүнчүлүгүн чектөөнү каалайбыз. Бул пайдалуу болушу мүмкүн болгон мисал катары, билдирүүлөрдү иштетүүнү масштабдуу кылууга мүмкүндүк берүүчү жиптен ар кандай билдирүүлөрдү кабыл ала турган жумушчулар менен билдирүү кезеги болуп саналат.

Эгерде бизде C1, C2, C3 үч абоненти жана 1, 2, 3, 4, 5, 6, 7 билдирүүлөрүн камтыган жип бар деп элестетсек, анда билдирүүлөр төмөндөгү диаграммадагыдай кызмат кылат:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Бул эффектке жетүү үчүн Redis Stream Consumer Group деп аталган концепцияны колдонот. Бул концепция псевдо-абонентке окшош, ал агымдан маалыматтарды алат, бирок иш жүзүндө бир топтун ичинде бир нече абоненттер тейлеп, белгилүү кепилдиктерди берет:

  1. Ар бир билдирүү топтун ичиндеги башка абонентке жеткирилет.
  2. Топтун ичинде жазылуучулар аты-жөнү менен аныкталат, ал регистрге сезимтал сап болуп саналат. Эгерде абонент топтон убактылуу чыгып кетсе, ал өзүнүн уникалдуу ысымы менен топко калыбына келтирилиши мүмкүн.
  3. Ар бир керектөөчү тобу "биринчи окула элек билдирүү" концепциясын карманат. Абонент жаңы билдирүүлөрдү сураганда, ал топтун ичиндеги бир дагы абонентке мурда эч качан жеткирилбеген билдирүүлөрдү гана ала алат.
  4. Билдирүүнүн абонент тарабынан ийгиликтүү иштетилгенин ачык ырастоо үчүн буйрук бар. Бул буйрук чакырылганга чейин, суралган билдирүү "күтүүдөгү" абалында кала берет.
  5. Керектөөчүлөр тобунун ичинде ар бир абонент ага жеткирилген, бирок иштетиле элек билдирүүлөрдүн тарыхын сурай алат («күтүү» абалында)

Кандайдыр бир мааниде топтун абалы төмөнкүчө чагылдырууга болот:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Эми Керектөөчүлөр тобунун негизги командалары менен таанышууга убакыт келди, атап айтканда:

  • XGROUP топторду түзүү, жок кылуу жана башкаруу үчүн колдонулат
  • XREADGROUP топ аркылуу агымды окуу үчүн колдонулат
  • XACK - бул буйрук абонентке билдирүүнү ийгиликтүү иштетилген деп белгилөөгө мүмкүндүк берет

Керектөөчүлөр тобун түзүү

mystream мурунтан эле бар деп ойлойлу. Андан кийин топ түзүү буйругу төмөнкүдөй болот:

> XGROUP CREATE mystream mygroup $
OK

Топту түзүүдө биз идентификаторду өткөрүп беришибиз керек, андан баштап топ билдирүүлөрдү кабыл алат. Эгерде биз бардык жаңы билдирүүлөрдү алгыбыз келсе, анда биз $ атайын идентификаторун колдонсок болот (жогорку мисалдагыдай). Эгерде сиз атайын идентификатордун ордуна 0ду көрсөтсөңүз, анда жиптеги бардык билдирүүлөр топко жеткиликтүү болот.

Эми топ түзүлгөндөн кийин, биз дароо буйрукту колдонуп билдирүүлөрдү окуй баштайбыз XREADGROUP. Бул буйрук абдан окшош XREAD жана кошумча БЛОК опциясын колдойт. Бирок, талап кылынган GROUP параметри бар, ал ар дайым эки аргумент менен көрсөтүлүшү керек: топтун аты жана жазылуучунун аты. COUNT опциясы да колдоого алынат.

Теманы окуудан мурун, келгиле, ал жерге бир нече билдирүүлөрдү коёлу:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Эми бул агымды топ аркылуу окуганга аракет кылалы:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Жогорудагы буйрук сөзмө-сөз төмөндөгүдөй окулат:

"Мен, Алиса абоненти, менин группанын мүчөсү, mystreamден мурда эч кимге жеткирилбеген бир билдирүүнү окугум келет."

Абонент топко операция жасаган сайын, ал өзүн топтун ичинде уникалдуу түрдө идентификациялоо менен өзүнүн атын бериши керек. Жогорудагы буйрукта дагы бир абдан маанилүү деталь бар - атайын идентификатор ">". Бул атайын идентификатор билдирүүлөрдү чыпкалап, мурда эч качан жеткирилбегендерди гана калтырат.

Ошондой эле, өзгөчө учурларда, сиз 0 же башка жарактуу идентификатор сыяктуу чыныгы идентификаторду көрсөтө аласыз. Бул учурда буйрук XREADGROUP көрсөтүлгөн абонентке (Алиса) жеткирилген, бирок буйрукту колдонуу менен али тааныла элек билдирүүлөрдүн тарыхын сизге кайтарып берет. XACK.

Биз дароо ID 0 көрсөтүү менен бул жүрүм-турумду текшере алабыз, параметр жок COUNT. Биз жөн гана күтүүдөгү бир билдирүүнү, башкача айтканда, алма билдирүүсүн көрөбүз:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Бирок, эгер биз билдирүүнүн ийгиликтүү иштетилгенин ырастасак, анда ал мындан ары көрсөтүлбөйт:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Эми Бобдун бир нерсе окуй турган кезеги:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Боб, mygroup мүчөсү, экиден ашык эмес билдирүү сурады. Буйрук ">" атайын идентификаторуна байланыштуу жеткирилбеген билдирүүлөрдү гана билдирет. Көрүнүп тургандай, "алма" билдирүүсү көрүнбөйт, анткени ал Алисага жеткирилген, ошондуктан Боб "апельсин" жана "кулпунайды" алат.

Ошентип, Элис, Боб жана топтун башка абоненттери бир эле агымдан ар кандай билдирүүлөрдү окуй алышат. Алар ошондой эле иштетилбеген билдирүүлөрдүн тарыхын окуй алышат же кабарларды иштетилген деп белгилей алышат.

Эске ала турган бир нече нерселер бар:

  • Абонент кабарды буйрук деп эсептегенде эле XREADGROUP, бул билдирүү "күтүүдөгү" абалга өтөт жана ошол белгилүү абонентке дайындалат. Топтун башка жазылуучулары бул билдирүүнү окуй алышпайт.
  • Абоненттер биринчи айтылганда автоматтык түрдө түзүлөт, аларды түзүүнүн кереги жок.
  • Жардамы менен XREADGROUP сиз бир эле учурда бир нече түрдүү темалардан келген билдирүүлөрдү окуй аласыз, бирок бул иштеши үчүн, адегенде ар бир жип үчүн бирдей аталыштагы топторду түзүшүңүз керек. XGROUP

Катачылыктан кийин калыбына келтирүү

Абонент катадан айыгып, "күтүүдөгү" статусу менен өзүнүн билдирүүлөрүнүн тизмесин кайра окуй алат. Бирок, чыныгы дүйнөдө, абоненттер акыры ишке ашпай калышы мүмкүн. Абонент катадан калыбына келе албаса, анын тыгылып калган билдирүүлөрү эмне болот?
Керектөөчүлөр тобу дал ушундай учурларда колдонулуучу функцияны сунуштайт - билдирүүлөрдүн ээсин өзгөртүү керек болгондо.

Биринчи нерсе - бул буйрукту чакыруу XPENDING, ал "күтүүдөгү" статусу менен топтогу бардык билдирүүлөрдү көрсөтөт. Эң жөнөкөй формада буйрук эки гана аргумент менен чакырылат: жиптин аты жана топтун аты:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Команда бүтүндөй топ жана ар бир абонент үчүн иштетилбеген билдирүүлөрдүн санын көрсөттү. Бизде эки көрүнүктүү билдирүүсү бар Боб гана бар, анткени Элис сураган жалгыз билдирүү менен тастыкталган XACK.

Көбүрөөк аргументтер аркылуу көбүрөөк маалымат сурай алабыз:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - идентификаторлордун диапазону (сиз “-” жана “+” колдонсоңуз болот)
{count} — жеткирүү аракеттеринин саны
{consumer-name} - топтун аталышы

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Эми бизде ар бир билдирүүнүн чоо-жайы бар: идентификатор, абоненттин аты, миллисекунддагы бош убакыт жана акырында жеткирүү аракеттеринин саны. Бизде Бобдон эки билдирүү бар жана алар 74170458 миллисекунд, болжол менен 20 саат бою иштебей турушат.

Сураныч, эч ким бизге билдирүүнүн мазмунун жөн гана колдонуу менен текшерүүгө тоскоол болбогонун эске алыңыз XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Аргументтерде бир эле идентификаторду эки жолу кайталашыбыз керек. Эми бизде кандайдыр бир идея бар, Алиса 20 сааттык иштен кийин Боб калыбына келбей калат деп чечиши мүмкүн жана ал билдирүүлөрдү сурап, аларды Боб үчүн иштетүүнү улантууга убакыт келди. Бул үчүн биз буйрукту колдонобуз XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Бул буйрукту колдонуу менен биз ээсин {consumer} деп өзгөртүү менен иштетиле элек "чет элдик" билдирүүнү ала алабыз. Бирок, биз ошондой эле минималдуу бош туруп калуу убактысын да камсыздай алабыз {min-idle-time}. Бул эки кардар бир эле билдирүүлөрдүн ээсин өзгөртүүгө аракет кылган кырдаалдан качууга жардам берет:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Биринчи кардар токтоп калуу убактысын баштапкы абалга келтирет жана жеткирүү эсептегичти көбөйтөт. Ошентип, экинчи кардар аны сурай албайт.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Билдирүү Алиса тарабынан ийгиликтүү талап кылынды, ал эми билдирүүнү иштеп чыгып, аны тааный алат.

Жогорудагы мисалдан ийгиликтүү сурам билдирүүнүн мазмунун кайтарып берерин көрө аласыз. Бирок, бул зарыл эмес. JUSTID опциясы билдирүү ID'лерин гана кайтаруу үчүн колдонулушу мүмкүн. Бул билдирүүнүн чоо-жайына кызыкпасаңыз жана системанын иштешин жогорулатууну кааласаңыз пайдалуу.

Жеткирүү эсептегичи

Сиз чыгарууда көргөн эсептегич XPENDING ар бир билдирүүнүн жеткирүүлөрдүн саны болуп саналат. Мындай эсептегич эки жол менен көбөйөт: билдирүү ийгиликтүү аркылуу суралганда XCLAIM же чалуу колдонулганда XREADGROUP.

Кээ бир билдирүүлөрдүн бир нече жолу жеткирилиши нормалдуу көрүнүш. Эң негизгиси, бардык билдирүүлөр акыры иштетилет. Кээде билдирүүнү иштеп чыгууда көйгөйлөр пайда болот, анткени билдирүүнүн өзү бузулган же билдирүүнү иштетүү иштеткич кодунда катаны пайда кылат. Бул учурда, бул билдирүүнү эч ким иштете албай калышы мүмкүн. Бизде жеткирүү аракети эсептегич болгондуктан, биз бул эсептегичти мындай жагдайларды аныктоо үчүн колдоно алабыз. Ошондуктан, жеткирүүлөрдүн саны сиз көрсөткөн эң жогорку санга жеткенде, мындай билдирүүнү башка жипке коюп, системанын администраторуна эскертме жөнөтсөңүз керек.

Thread State

команда XINFO жип жана анын топтору жөнүндө ар кандай маалыматтарды суроо үчүн колдонулат. Мисалы, негизги буйрук төмөнкүдөй көрүнөт:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Жогорудагы буйрук көрсөтүлгөн агым жөнүндө жалпы маалыматты көрсөтөт. Эми бир аз татаалыраак мисал:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Жогорудагы буйрук көрсөтүлгөн жиптин бардык топтору үчүн жалпы маалыматты көрсөтөт

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Жогорудагы буйрук көрсөтүлгөн агымдын жана топтун бардык абоненттери үчүн маалыматты көрсөтөт.
Эгер сиз буйруктун синтаксисин унутуп калсаңыз, жөн гана буйруктун өзүнөн жардам сураңыз:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Агымдын көлөмүнүн чеги

Көптөгөн колдонмолор маалыматтарды түбөлүккө агымга чогултууну каалабайт. Ар бир жипке уруксат берилген билдирүүлөрдүн максималдуу саны көп учурда пайдалуу. Башка учурларда, белгиленген жиптин өлчөмүнө жеткенде бардык билдирүүлөрдү жиптен башка туруктуу дүкөнгө жылдыруу пайдалуу. Сиз буйруктагы MAXLEN параметрин колдонуп агымдын өлчөмүн чектей аласыз XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLENди колдонууда, эски жазуулар белгиленген узундукка жеткенде автоматтык түрдө жок кылынат, андыктан агымдын көлөмү туруктуу болот. Бирок, бул учурда бутоо Redis эс тутумунда эң эффективдүү түрдө болбойт. Сиз кырдаалды төмөндөгүдөй жакшыртсаңыз болот:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Жогорудагы мисалдагы ~ аргументи агымдын узундугун белгилүү бир мааниге чектөөнүн кереги жок дегенди билдирет. Биздин мисалда бул 1000ден чоң же барабар каалаган сан болушу мүмкүн (мисалы, 1000, 1010 же 1030). Биз агымыбыз жок дегенде 1000 жазууну сакташын каалай турганыбызды ачык айттык. Бул Redis ичинде эстутумду башкарууну кыйла натыйжалуу кылат.

Ошондой эле өзүнчө команда бар XTRIM, ошол эле нерсени жасайт:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Туруктуу сактоо жана репликация

Redis Stream асинхрондук түрдө кул түйүндөрүнө репликацияланат жана AOF (бардык маалыматтардын сүрөтү) жана RDB (бардык жазуу операцияларынын журналы) сыяктуу файлдарга сакталат. Керектөөчү топторунун абалын репликациялоо да колдоого алынат. Демек, эгер билдирүү башкы түйүндө “күтүүдө” абалында болсо, анда кул түйүндөрүндө бул билдирүү бирдей статуска ээ болот.

Агымдан айрым элементтерди алып салуу

Билдирүүлөрдү жок кылуу үчүн атайын буйрук бар XDEL. Буйрук жиптин атын, андан кийин өчүрүлө турган билдирүү ID'лерин алат:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Бул команданы колдонууда, сиз иш жүзүндө эстутум дароо бошотулбай турганын эске алуу керек.

Нөл узундуктагы агымдар

Агымдардын башка Redis маалымат түзүмдөрүнөн айырмасы, башка маалымат структураларында алардын ичинде элементтер жок болгондо, кошумча эффект катары, маалымат структурасынын өзү эстутумдан алынып салынат. Ошентип, мисалы, ZREM чакыруусу акыркы элементти алып салганда, сорттолгон топтом толугу менен алынып салынат. Анын ордуна, жиптердин ичинде эч кандай элементтер жок болсо да эс тутумда калууга уруксат берилет.

жыйынтыктоо

Redis Stream билдирүү брокерлерин, билдирүү кезектерин, бирдиктүү журналдарды жана тарыхты сактоочу чат системаларын түзүү үчүн идеалдуу.

Мен бир жолу айткандай Никлаус Вирт, программалар алгоритмдер плюс маалымат структуралары жана Redis сизге экөөнү тең берет.

Source: www.habr.com

Комментарий кошуу