Redis Stream – хабар алмасу жүйелерінің сенімділігі мен ауқымдылығы

Redis Stream – хабар алмасу жүйелерінің сенімділігі мен ауқымдылығы

Redis Stream – 5.0 нұсқасымен Redis жүйесінде енгізілген жаңа дерексіз деректер түрі
Тұжырымдама бойынша Redis Stream - бұл жазбаларды қосуға болатын Тізім. Әрбір жазбаның бірегей идентификаторы болады. Әдепкі бойынша, идентификатор автоматты түрде жасалады және оған уақыт белгісі кіреді. Сондықтан, уақыт өте келе жазбалар ауқымын сұрауға немесе Unix "tail -f" пәрмені журнал файлын оқып, жаңа деректерді күту кезінде қатып қалуы сияқты, ағынға келгенде жаңа деректерді алуға болады. Бірнеше клиент бір уақытта ағынды тыңдай алатынын ескеріңіз, сол сияқты көптеген "tail -f" процестері бір уақытта файлды бір-біріне қайшы келмей оқи алады.

Жаңа деректер түрінің барлық артықшылықтарын түсіну үшін Redis Stream функционалдығын ішінара қайталайтын бұрыннан бар Redis құрылымдарын жылдам қарастырайық.

Redis PUB/SUB

Redis Pub/Sub - бұл кілт-мән дүкеніне ендірілген қарапайым хабар алмасу жүйесі. Дегенмен, қарапайымдылық бағасы бар:

  • Егер баспагер қандай да бір себептермен сәтсіздікке ұшыраса, ол барлық жазылушыларын жоғалтады
  • Баспагер барлық жазылушылардың нақты мекенжайын білуі керек
  • Деректер өңделгеннен жылдамырақ жарияланса, баспагер өз жазылушыларын жұмыспен шамадан тыс жүктеуі мүмкін
  • Хабарлама қанша жазылушыға жеткізілгеніне және бұл хабарды қаншалықты жылдам өңдей алғанына қарамастан, жарияланғаннан кейін бірден баспагер буферінен жойылады.
  • Барлық жазылушылар хабарламаны бір уақытта алады. Жазылушылар өздері бір хабарламаны өңдеу тәртібін қандай да бір түрде келісуі керек.
  • Абонент хабарламаны сәтті өңдегенін растайтын кіріктірілген механизм жоқ. Егер жазылушы хабарлама алса және өңдеу кезінде бұзылса, баспагер бұл туралы білмейді.

Redis тізімі

Redis List – оқу пәрмендерін блоктауды қолдайтын деректер құрылымы. Хабарларды тізімнің басынан немесе соңынан қосуға және оқуға болады. Осы құрылымға сүйене отырып, сіз өзіңіздің таратылған жүйеңіз үшін жақсы стек немесе кезек жасай аласыз және көп жағдайда бұл жеткілікті болады. Redis Pub/Sub-тен негізгі айырмашылықтар:

  • Хабарлама бір клиентке жеткізіледі. Бірінші оқуға тыйым салынған клиент алдымен деректерді алады.
  • Клинт әрбір хабарлама үшін оқу әрекетін өзі бастауы керек. Тізім клиенттер туралы ештеңе білмейді.
  • Хабарламалар біреу оларды оқымайынша немесе анық түрде жоймайынша сақталады. Егер Redis серверін деректерді дискіге тазарту үшін конфигурацияласаңыз, жүйенің сенімділігі күрт артады.

Ағынға кіріспе

Ағынға жазба қосу

команда XADD ағынға жаңа жазба қосады. Жазба жай ғана жол емес, ол бір немесе бірнеше кілт-мән жұптарынан тұрады. Осылайша, әрбір жазба әлдеқашан құрылымдалған және CSV файлының құрылымына ұқсайды.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Жоғарыдағы мысалда біз «mystream» атауы (кілт) бар екі өрісті қосамыз: сәйкесінше «1234» және «19.8» мәндері бар «сенсор-идентификатор» және «температура». Екінші аргумент ретінде пәрмен жазбаға тағайындалатын идентификаторды алады - бұл идентификатор ағындағы әрбір жазбаны бірегей түрде анықтайды. Дегенмен, бұл жағдайда біз * тапсырдық, өйткені біз Redis біз үшін жаңа идентификаторды жасағанын қалаймыз. Әрбір жаңа идентификатор артады. Сондықтан әрбір жаңа жазба алдыңғы жазбаларға қатысты жоғарырақ идентификаторға ие болады.

Идентификатор пішімі

Пәрмен арқылы қайтарылған жазба идентификаторы XADD, екі бөліктен тұрады:

{millisecondsTime}-{sequenceNumber}

миллисекунд Уақыт — Unix уақыты миллисекундпен (Redis сервер уақыты). Дегенмен, егер ағымдағы уақыт алдыңғы жазбаның уақытымен бірдей немесе аз болса, онда алдыңғы жазбаның уақыт белгісі пайдаланылады. Сондықтан, егер сервер уақыты уақыт бойынша кері қайтарылса, жаңа идентификатор әлі де өсу сипатын сақтайды.

реттік нөмірі сол миллисекундта жасалған жазбалар үшін пайдаланылады. реттік нөмірі алдыңғы жазбаға қатысты 1-ге артады. Өйткені реттік нөмірі өлшемі 64 бит болса, онда іс жүзінде бір миллисекунд ішінде жасалуы мүмкін жазбалар санына шектеу қоюға болмайды.

Мұндай идентификаторлардың пішімі бір қарағанда оғаш көрінуі мүмкін. Сенімсіз оқырман уақыт неліктен идентификатордың бөлігі болып табылады деген сұрақ туындауы мүмкін. Себебі, Redis ағындары ID бойынша ауқым сұрауларын қолдайды. Идентификатор жазба жасалған уақытпен байланысты болғандықтан, бұл уақыт ауқымдарын сұрауға мүмкіндік береді. Біз пәрменді қарастырған кезде нақты мысалды қарастырамыз XRANGE.

Егер қандай да бір себептермен пайдаланушыға, мысалы, қандай да бір сыртқы жүйемен байланысқан өзінің идентификаторын көрсету қажет болса, біз оны пәрменге бере аламыз. XADD төменде көрсетілгендей * орнына:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Бұл жағдайда идентификатордың өсуін өзіңіз бақылауыңыз керек екенін ескеріңіз. Біздің мысалда минималды идентификатор «0-1» болып табылады, сондықтан команда «0-1» мәніне тең немесе одан аз басқа идентификаторды қабылдамайды.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Әр ағындағы жазбалар саны

Пәрменді қолдану арқылы ағындағы жазбалар санын алуға болады XLEN. Біздің мысал үшін бұл пәрмен келесі мәнді қайтарады:

> XLEN somestream
(integer) 2

Ауқым сұраулары - XRANGE және XREVRANGE

Деректерді диапазон бойынша сұрау үшін біз екі идентификаторды көрсетуіміз керек - диапазонның басы мен соңы. Қайтарылған ауқым барлық элементтерді, соның ішінде шекараларды қамтиды. Сондай-ақ екі арнайы идентификатор «-» және «+» бар, олар сәйкесінше ағындағы ең кіші (бірінші жазба) және ең үлкен (соңғы жазба) идентификаторды білдіреді. Төмендегі мысал барлық ағындық жазбаларды тізімдейді.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Әрбір қайтарылған жазба екі элементтің массиві болып табылады: идентификатор және кілт-мән жұптарының тізімі. Жазба идентификаторларының уақытқа байланысты екенін жоғарыда айттық. Сондықтан біз белгілі бір уақыт аралығын сұрай аламыз. Дегенмен, біз сұрауда толық идентификаторды емес, тек Unix уақытын ғана көрсете аламыз, оған қатысты бөлікті өткізіп жібереміз. реттік нөмірі. Идентификатордың түсірілген бөлігі автоматты түрде диапазонның басында нөлге және ауқымның соңында мүмкін болатын ең үлкен мәнге орнатылады. Төменде екі миллисекунд ауқымын қалай сұрауға болатынының мысалы берілген.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Бұл ауқымда бізде тек бір жазба бар, бірақ нақты деректер жиындарында қайтарылған нәтиже үлкен болуы мүмкін. Осы себептен XRANGE COUNT опциясын қолдайды. Санды көрсету арқылы біз жай ғана алғашқы N жазбаны ала аламыз. Келесі N жазбаны (беттеу) алу қажет болса, біз соңғы қабылданған идентификаторды пайдалана аламыз, оны көбейтеміз реттік нөмірі бір рет және қайтадан сұраңыз. Мұны келесі мысалда қарастырайық. Біз 10 элементті қосуды бастаймыз XADD (mystream қазірдің өзінде 10 элементпен толтырылған болса). Бір пәрменге 2 элемент алатын итерацияны бастау үшін біз толық ауқымнан бастаймыз, бірақ 2-ге тең COUNT.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Келесі екі элементпен итерацияны жалғастыру үшін бізге алынған соңғы идентификаторды, яғни 1519073279157-0 таңдап, 1 қосу керек. реттік нөмірі.
Нәтиже идентификаторы, бұл жағдайда 1519073279157-1, енді келесі қоңырау үшін ауқымның жаңа аргументі ретінде пайдаланылуы мүмкін XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Тағыда басқа. Өйткені күрделілік XRANGE іздеу үшін O(log(N)), содан кейін M элементтерді қайтару үшін O(M) болса, әрбір итерация қадамы жылдам. Осылайша, пайдалану XRANGE ағындарды тиімді қайталауға болады.

команда XREVRANGE эквивалент болып табылады XRANGE, бірақ элементтерді кері ретпен қайтарады:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Команда екенін ескеріңіз XREVRANGE ауқым аргументтерін кері ретпен бастау және тоқтату.

XREAD көмегімен жаңа жазбаларды оқу

Көбінесе ағынға жазылу және тек жаңа хабарламаларды алу міндеті туындайды. Бұл тұжырымдама Redis Pub/Sub немесе Redis тізімін бұғаттауға ұқсас болып көрінуі мүмкін, бірақ Redis Stream пайдалану жолында түбегейлі айырмашылықтар бар:

  1. Әрбір жаңа хабарлама әдепкі бойынша әрбір жазылушыға жеткізіледі. Бұл әрекет жаңа хабарды тек бір жазылушы оқитын блоктаушы Redis тізімінен ерекшеленеді.
  2. Redis Pub/Sub қолданбасында барлық хабарлар ұмытылады және ешқашан сақталмайды, Stream бағдарламасында барлық хабарлар шексіз сақталады (егер клиент анық жоюды тудырмаса).
  3. Redis Stream бір ағындағы хабарларға қол жеткізуді ажыратуға мүмкіндік береді. Белгілі бір жазылушы өзінің жеке хабарлама тарихын ғана көре алады.

Пәрменді пайдаланып ағынға жазылуға және жаңа хабарламаларды алуға болады XREAD. Бұл сәл күрделірек XRANGE, сондықтан біз алдымен қарапайым мысалдардан бастаймыз.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Жоғарыдағы мысал бұғаттамайтын пішінді көрсетеді XREAD. COUNT опциясының міндетті емес екенін ескеріңіз. Шын мәнінде, жалғыз талап етілетін пәрмен опциясы сәйкес максималды идентификатормен бірге ағындар тізімін көрсететін STREAMS опциясы болып табылады. Біз «STREAMS mystream 0» деп жаздық - біз «0-0» идентификаторы бар mystream ағынының барлық жазбаларын алғымыз келеді. Мысалдан көріп отырғаныңыздай, пәрмен ағынның атын қайтарады, себебі біз бір уақытта бірнеше ағынға жазыла аламыз. Біз, мысалы, "STREAMS mystream otherstream 0 0" деп жаза аламыз. STREAMS опциясынан кейін алдымен барлық қажетті ағындардың атауларын, содан кейін ғана идентификаторлар тізімін беру керек екенін ескеріңіз.

Бұл қарапайым пішінде пәрмен салыстырғанда ерекше ештеңе жасамайды XRANGE. Дегенмен, бір қызығы, біз оңай бұрыла аламыз XREAD BLOCK аргументін көрсете отырып, блоктау пәрменіне:

> XREAD BLOCK 0 STREAMS mystream $

Жоғарыдағы мысалда жаңа BLOCK опциясы 0 миллисекунд күту уақытымен көрсетілген (бұл шексіз күтуді білдіреді). Сонымен қатар, mystream ағыны үшін әдеттегі идентификаторды берудің орнына арнайы $ идентификаторы жіберілді. Бұл арнайы идентификатор мынаны білдіреді XREAD идентификатор ретінде mystream ішіндегі максималды идентификаторды пайдалану керек. Сондықтан біз тыңдай бастаған сәттен бастап жаңа хабарламаларды аламыз. Кейбір жолдармен бұл Unix «tail -f» пәрменіне ұқсас.

BLOCK опциясын пайдаланған кезде бізге арнайы $ идентификаторын пайдалану міндетті емес екенін ескеріңіз. Біз ағында бар кез келген идентификаторды пайдалана аламыз. Егер команда біздің сұрауымызды бұғаттамай дереу қызмет ете алса, ол мұны жасайды, әйтпесе ол блоктайды.

Блоктау XREAD сонымен қатар бірден бірнеше ағындарды тыңдай алады, тек олардың аттарын көрсету керек. Бұл жағдайда пәрмен деректерді алған бірінші ағынның жазбасын қайтарады. Берілген ағын үшін бұғатталған бірінші жазылушы деректерді алдымен алады.

Тұтынушылар топтары

Белгілі бір тапсырмаларда жазылушының бір ағындағы хабарларға қол жеткізуін шектегіміз келеді. Бұл пайдалы болуы мүмкін мысал - хабарды өңдеуді масштабтауға мүмкіндік беретін ағыннан әртүрлі хабарларды алатын жұмысшылар бар хабарлама кезегі.

Егер бізде C1, C2, C3 үш жазылушы және 1, 2, 3, 4, 5, 6, 7 хабарламалары бар ағын бар деп елестетсек, хабарламалар төмендегі диаграммадағыдай қызмет етеді:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Бұл әсерге жету үшін Redis Stream тұтынушылар тобы деп аталатын тұжырымдаманы пайдаланады. Бұл концепция ағыннан деректерді алатын псевдо-абонентке ұқсайды, бірақ нақты кепілдіктерді қамтамасыз ететін топтағы бірнеше жазылушыларға қызмет көрсетеді:

  1. Әрбір хабарлама топтағы басқа жазылушыға жеткізіледі.
  2. Топ ішінде жазылушылар өздерінің аты бойынша анықталады, бұл регистрді ескеретін жол. Егер жазылушы топтан уақытша шығып кетсе, оны өзінің бірегей атын пайдаланып топқа қалпына келтіруге болады.
  3. Әрбір Тұтынушы тобы «бірінші оқылмаған хабарлама» тұжырымдамасын ұстанады. Абонент жаңа хабарларды сұраған кезде, ол топтағы ешбір жазылушыға бұрын ешқашан жеткізілмеген хабарламаларды ғана қабылдай алады.
  4. Хабарламаның жазылушымен сәтті өңделгенін нақты растау пәрмені бар. Бұл пәрмен шақырылғанға дейін сұралған хабарлама «күтуде» күйінде қалады.
  5. Тұтынушылар тобында әрбір абонент өзіне жеткізілген, бірақ әлі өңделмеген («күтуде» күйінде) хабарламалар тарихын сұрай алады.

Белгілі бір мағынада топтың күйін келесідей көрсетуге болады:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Енді тұтынушылар тобына арналған негізгі командалармен танысу уақыты келді, атап айтқанда:

  • XGROUP топтарды құру, жою және басқару үшін қолданылады
  • XREADGROUP топ арқылы ағынды оқу үшін қолданылады
  • XACK - бұл пәрмен жазылушыға хабарламаны сәтті өңделген деп белгілеуге мүмкіндік береді

Тұтынушылар тобын құру

Mystream бұрыннан бар деп есептейік. Содан кейін топ құру пәрмені келесідей болады:

> XGROUP CREATE mystream mygroup $
OK

Топты құру кезінде біз идентификаторды беруіміз керек, одан топ хабарламалар алады. Егер біз барлық жаңа хабарламаларды алғымыз келсе, онда арнайы $ идентификаторын пайдалана аламыз (жоғарыдағы мысалдағыдай). Арнайы идентификатордың орнына 0 мәнін көрсетсеңіз, ағындағы барлық хабарлар топқа қолжетімді болады.

Енді топ құрылды, біз пәрмен арқылы хабарламаларды оқуды бірден бастай аламыз XREADGROUP. Бұл пәрменге өте ұқсас XREAD және қосымша БЛОКТАУ опциясын қолдайды. Дегенмен, әрқашан екі аргументпен көрсетілуі тиіс міндетті GROUP опциясы бар: топ атауы және жазылушы аты. COUNT опциясына да қолдау көрсетіледі.

Жіпті оқымас бұрын, сол жерге бірнеше хабарламалар жіберейік:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Енді осы ағынды топ арқылы оқып көрейік:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Жоғарыдағы пәрмен келесідей сөзбе-сөз оқылады:

«Мен, жазылушы Алиса, менің тобымның мүшесі, бұрын ешкімге жеткізілмеген mystream хабарламасын оқығым келеді.»

Абонент топта әрекетті орындаған сайын, ол топ ішінде өзін бірегей түрде сәйкестендіретін өз атын көрсетуі керек. Жоғарыдағы пәрменде тағы бір өте маңызды деталь бар - арнайы идентификатор «>». Бұл арнайы идентификатор хабарларды сүзеді, тек бұрын ешқашан жеткізілмегендерін қалдырады.

Сондай-ақ, ерекше жағдайларда 0 немесе кез келген басқа жарамды идентификатор сияқты нақты идентификаторды көрсетуге болады. Бұл жағдайда пәрмен XREADGROUP көрсетілген жазылушыға (Алиса) жеткізілген, бірақ пәрмен арқылы әлі расталмаған «күтуде» күйі бар хабарламалар тарихын қайтарады. XACK.

Біз бұл әрекетті опциясыз бірден 0 идентификаторын көрсету арқылы тексере аламыз COUNT. Біз жай ғана күтудегі хабарды, яғни алма хабарын көреміз:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Алайда, егер хабардың сәтті өңделгенін растасақ, ол енді көрсетілмейді:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Енді Бобтың бірдеңені оқитын кезегі:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Менің тобымның мүшесі Боб екі хабарламадан артық емес сұрады. Пәрмен тек «>» арнайы идентификаторына байланысты жеткізілмеген хабарламалар туралы хабарлайды. Көріп отырғаныңыздай, «алма» хабары көрсетілмейді, өйткені ол Алисаға жеткізілген, сондықтан Боб «апельсин» және «құлпынай» алады.

Осылайша, Элис, Боб және топтың кез келген басқа жазылушысы бір ағыннан әртүрлі хабарламаларды оқи алады. Олар сондай-ақ өңделмеген хабарлар тарихын оқи алады немесе хабарларды өңделген деп белгілей алады.

Есте сақтау керек бірнеше нәрсе бар:

  • Абонент хабарламаны бұйрық деп санаған бойда XREADGROUP, бұл хабар «күтудегі» күйге өтеді және сол нақты жазылушыға тағайындалады. Басқа топ жазылушылары бұл хабарламаны оқи алмайды.
  • Жазылушылар бірінші айтылған кезде автоматты түрде жасалады, оларды нақты жасаудың қажеті жоқ.
  • Көмегімен XREADGROUP сіз бір уақытта бірнеше түрлі ағындардан хабарларды оқи аласыз, бірақ бұл жұмыс істеуі үшін алдымен әр ағын үшін бірдей атаумен топтар жасау керек. XGROUP

Сәтсіздіктен кейін қалпына келтіру

Абонент сәтсіздікті қалпына келтіріп, «күтуде» күйі бар хабарламалар тізімін қайта оқи алады. Дегенмен, нақты әлемде жазылушылар ақыр соңында сәтсіздікке ұшырауы мүмкін. Абонент ақаулықтан арыла алмаса, жазылушының тұрып қалған хабарламаларымен не болады?
Consumer Group дәл осындай жағдайларда – хабарлардың иесін өзгерту қажет болғанда қолданылатын мүмкіндікті ұсынады.

Сізге бірінші кезекте пәрменді шақыру керек XPENDING, ол «күтуде» күйімен топтағы барлық хабарларды көрсетеді. Ең қарапайым түрде пәрмен тек екі аргументпен шақырылады: ағын аты және топ атауы:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Команда бүкіл топ пен әрбір жазылушы үшін өңделмеген хабарламалардың санын көрсетті. Бізде тек екі тамаша хабары бар Боб бар, себебі Алиса сұраған жалғыз хабар расталды XACK.

Қосымша аргументтер арқылы қосымша ақпаратты сұрай аламыз:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - идентификаторлар ауқымы («-» және «+» қолдануға болады)
{count} — жеткізу әрекеттерінің саны
{consumer-name} - топ атауы

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Енді бізде әрбір хабарламаның мәліметтері бар: идентификатор, жазылушы аты, миллисекундтағы бос тұру уақыты және соңында жеткізу әрекеттерінің саны. Бізде Бобтан екі хабар бар және олар 74170458 миллисекунд, шамамен 20 сағат бойы жұмыс істемей тұрды.

Хабарламаның мазмұнын жай пайдалану арқылы тексеруге бізге ешкім кедергі келтірмейтінін ескеріңіз XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Дәлелдемелерде бір идентификаторды екі рет қайталау керек. Енді бізде біраз идея бар, Алиса 20 сағаттық үзілістен кейін Боб қалпына келмейтінін және сол хабарларды сұрап, оларды Боб үшін өңдеуді жалғастыратын уақыт келді деп шешуі мүмкін. Ол үшін пәрменді қолданамыз XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Бұл пәрменді пайдаланып, иесін {consumer} деп өзгерту арқылы әлі өңделмеген "бөтен" хабарды аламыз. Дегенмен, біз ең аз бос тұру уақытын {min-idle-time} қамтамасыз ете аламыз. Бұл екі клиент бір хабарламалардың иесін бір уақытта өзгертуге тырысатын жағдайды болдырмауға көмектеседі:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Бірінші тұтынушы тоқтау уақытын қалпына келтіреді және жеткізу есептегішін арттырады. Сондықтан екінші клиент оны сұрай алмайды.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Хабарды Алиса сәтті талап етті, енді ол хабарды өңдеп, оны мойындай алады.

Жоғарыдағы мысалдан сәтті сұрау хабарламаның мазмұнын қайтаратынын көруге болады. Дегенмен, бұл қажет емес. JUSTID опциясын тек хабар идентификаторларын қайтару үшін пайдалануға болады. Бұл хабардың егжей-тегжейлері сізді қызықтырмаса және жүйе өнімділігін арттырғыңыз келсе пайдалы.

Жеткізу есептегіші

Шығуда көріп тұрған есептегіш XPENDING әрбір хабарламаның жеткізілім саны болып табылады. Мұндай санауыш екі жолмен көбейтіледі: хабарлама арқылы сәтті сұралғанда XCLAIM немесе қоңырау пайдаланылғанда XREADGROUP.

Кейбір хабарламалардың бірнеше рет жеткізілуі қалыпты жағдай. Ең бастысы, барлық хабарламалар ақырында өңделеді. Кейде хабарламаны өңдеу кезінде ақаулықтар туындайды, себебі хабарламаның өзі бүлінген немесе хабарды өңдеу өңдеуші кодында қатені тудырады. Бұл жағдайда бұл хабарламаны ешкім өңдей алмайтыны белгілі болуы мүмкін. Бізде жеткізу әрекетінің есептегіші болғандықтан, біз мұндай жағдайларды анықтау үшін осы есептегішті пайдалана аламыз. Сондықтан, жеткізу саны сіз көрсеткен жоғары санға жеткенде, мұндай хабарламаны басқа ағынға қойып, жүйе әкімшісіне хабарландыру жіберген дұрыс болар еді.

Жіп күйі

команда XINFO ағын және оның топтары туралы әртүрлі ақпаратты сұрау үшін қолданылады. Мысалы, негізгі пәрмен келесідей көрінеді:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Жоғарыдағы пәрмен көрсетілген ағын туралы жалпы ақпаратты көрсетеді. Енді сәл күрделірек мысал:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Жоғарыдағы пәрмен көрсетілген ағынның барлық топтары үшін жалпы ақпаратты көрсетеді

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Жоғарыдағы пәрмен көрсетілген ағын мен топтың барлық жазылушыларына арналған ақпаратты көрсетеді.
Пәрмен синтаксисін ұмытып қалсаңыз, пәрменнің өзінен көмек сұраңыз:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Ағын өлшемі шегі

Көптеген қолданбалар деректерді ағынға мәңгілікке жинағысы келмейді. Әр ағынға рұқсат етілген хабарлардың максималды саны болуы жиі пайдалы. Басқа жағдайларда, көрсетілген ағын өлшеміне жеткенде барлық хабарларды ағыннан басқа тұрақты қоймаға жылжыту пайдалы. Пәрмендегі MAXLEN параметрін пайдаланып ағынның өлшемін шектеуге болады XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLEN пайдаланған кезде ескі жазбалар белгіленген ұзындыққа жеткенде автоматты түрде жойылады, сондықтан ағынның тұрақты өлшемі болады. Дегенмен, бұл жағдайда кесу Redis жадында ең тиімді түрде болмайды. Жағдайды келесідей жақсартуға болады:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Жоғарыдағы мысалдағы ~ аргументі ағын ұзындығын белгілі бір мәнге шектеудің міндетті емес екенін білдіреді. Біздің мысалда бұл 1000-нан үлкен немесе оған тең кез келген сан болуы мүмкін (мысалы, 1000, 1010 немесе 1030). Біз ағынның кемінде 1000 жазбаны сақтауын қалайтынымызды анық көрсеттік. Бұл Redis ішінде жадты басқаруды әлдеқайда тиімді етеді.

Сондай-ақ бөлек команда бар XTRIM, ол бірдей нәрсені жасайды:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Тұрақты сақтау және қайталау

Redis Stream бағынышты түйіндерге асинхронды түрде көшіріледі және AOF (барлық деректердің суреті) және RDB (барлық жазу әрекеттерінің журналы) сияқты файлдарға сақталады. Тұтынушы топтары күйінің репликациясына да қолдау көрсетіледі. Сондықтан, егер хабар негізгі түйінде «күтуде» күйінде болса, бағынышты түйіндерде бұл хабар бірдей күйге ие болады.

Ағыннан жеке элементтерді жою

Хабарламаларды жою үшін арнайы команда бар XDEL. Пәрмен ағынның атын, одан кейін жойылатын хабар идентификаторларын алады:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Бұл пәрменді пайдаланған кезде нақты жад бірден босатылмайтынын ескеру қажет.

Ұзындығы нөлдік ағындар

Ағындар мен басқа Redis деректер құрылымдарының арасындағы айырмашылық мынада: басқа деректер құрылымдарында енді элементтер болмаған кезде жанама әсер ретінде деректер құрылымының өзі жадтан жойылады. Мәселен, мысалы, ZREM шақыруы соңғы элементті жойған кезде сұрыпталған жиын толығымен жойылады. Оның орнына, ағындар ішінде ешқандай элементтер болмаса да жадта қалуға рұқсат етіледі.

қорытынды

Redis Stream хабар брокерлерін, хабарламалар кезегін, бірыңғай журналды және тарихты сақтайтын чат жүйелерін құру үшін өте қолайлы.

Бірде айтқанымдай Никлаус Вирт, бағдарламалар алгоритмдер және деректер құрылымдары болып табылады және Redis сізге екеуін де береді.

Ақпарат көзі: www.habr.com

пікір қалдыру