Redis Stream — надійність та масштабованість ваших систем повідомлень

Redis Stream — надійність та масштабованість ваших систем повідомлень

Redis Stream — новий абстрактний тип даних, представлений у Redis із виходом версії 5.0
Концептуально Redis Stream – це List, до якого ви можете додавати записи. Кожен запис має унікальний ідентифікатор. За замовчуванням ідентифікатор генерується автоматично і включає тимчасову мітку. Тому ви можете запитувати діапазони записів за часом або отримувати нові дані в міру їх надходження в потік, як Unix команда "tail-f" читає лог-файл і завмирає в очікуванні нових даних. Зверніть увагу, що потік можуть слухати одночасно кілька клієнтів, як багато "tail -f" процеси можуть одночасно читати файл, не конфліктуючи один з одним.

Щоб зрозуміти всі переваги нового типу даних, давайте швидко згадаємо давно існуючі структури Redis, які частково повторюють функціональність Redis Stream.

Redis PUB/SUB

Redis Pub/Sub — проста система повідомлень, що вже вбудована у ваше key-value сховище. Проте за простоту доводиться платити:

  • Якщо видавець з якихось причин виходить з ладу, він втрачає всіх своїх передплатників
  • Видавцеві необхідно знати точну адресу всіх його передплатників
  • Видавець може перевантажити роботою своїх передплатників, якщо дані публікуються швидше, ніж обробляються
  • Повідомлення видаляється з буфера видавця відразу після публікації, незалежно від того, якій кількості передплатників воно доставлено і як швидко ті зуміли обробити це повідомлення.
  • Усі передплатники отримають повідомлення одночасно. Передплатники самі повинні якось між собою узгоджувати порядок обробки того самого повідомлення.
  • Немає вбудованого механізму підтвердження успішної обробки повідомлення передплатником. Якщо передплатник отримав повідомлення та впав під час обробки, то видавець про це не дізнається.

Redis List

Redis List - структура даних, що підтримує команди читання із блокуванням. Ви можете додавати та зчитувати повідомлення з початку або кінця списку. На базі цієї структури можна зробити непоганий стек або чергу для вашої розподіленої системи і цього в більшості випадків буде достатньо. Основні відмінності від Redis Pub/Sub:

  • Повідомлення надсилається одному клієнту. Перший заблокований читанням клієнт отримає дані першим.
  • Клінт повинен сам ініціювати операцію для читання кожного повідомлення. List нічого не знає про клієнтів.
  • Повідомлення зберігаються доти, доки їх хтось не рахує або не видалить явно. Якщо ви налаштували Redis сервер, щоб він скидав дані на диск, надійність системи різко зростає.

Введення у Stream

Додавання запису до потоку

Команда XADD додає новий запис до потоку. Запис - це не просто рядок, він складається з однієї або декількох пар ключ-значення. Таким чином, кожен запис вже структурований і нагадує структуру CSV файлу.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

У прикладі вище ми додаємо в потік з ім'ям (ключом) "mystream" два поля: "sensor-id" і "temperature" зі значеннями "1234" і "19.8" відповідно. Як другий аргумент команда приймає ідентифікатор, який буде присвоєно записи — цей ідентифікатор однозначно ідентифікує кожен запис у потоці. Однак у цьому випадку ми передали *, тому що хочемо, щоб Redis згенерував для нас новий ідентифікатор. Кожен новий ідентифікатор збільшуватиметься. Тому кожен новий запис матиме більший ідентифікатор по відношенню до попередніх записів.

Формат ідентифікатора

Ідентифікатор запису, що повертається командою XADD, складається з двох частин:

{millisecondsTime}-{sequenceNumber}

millisecondsTime - Unix час у мілісекундах (час сервера Redis). Однак якщо поточний час виявляється таким же або меншим, ніж час попереднього запису, то використовується тимчасова позначка попереднього запису. Тому якщо час сервера повертається в минуле, новий ідентифікатор все ще буде зберігати властивість збільшення.

порядковий номер використовується для записів, створених в одну і ту ж мілісекунду. порядковий номер буде збільшено на 1 щодо попереднього запису. Оскільки порядковий номер має розмір 64 біти, то на практиці ви не повинні впертися в обмеження на кількість записів, які можуть бути згенеровані протягом однієї мілісекунди.

Формат таких ідентифікаторів, на перший погляд, може здатися дивним. Недовірливий читач може поставити запитання, чому час є частиною ідентифікатора. Причина в тому, що потоки Redis підтримують запити діапазону ідентифікаторів. Оскільки ідентифікатор пов'язаний з часом створення запису, це дає можливість вимагати діапазони часу. Ми розглянемо конкретний приклад, коли перейдемо до вивчення команди XRANGE.

Якщо з якоїсь причини користувачеві потрібно вказати свій власний ідентифікатор, який, наприклад, пов'язаний із якоюсь зовнішньою системою, ми можемо його передати команді XADD замість знака * як показано нижче:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Зауважте, що в цьому випадку ви повинні самі стежити за збільшенням ідентифікатора. У прикладі мінімальний ідентифікатор дорівнює «0-1», тому команда не прийме ще один ідентифікатор, який дорівнює або менше «0-1».

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Число записів у потоці

Можна отримати кількість записів у потоці, просто використовуючи команду XLEN. Для нашого прикладу ця команда поверне таке значення:

> XLEN somestream
(integer) 2

Запити по діапазону - XRANGE та XREVRANGE

Щоб запросити дані щодо діапазону, нам потрібно вказати два ідентифікатори – початок та кінець діапазону. Повертається діапазон включатиме всі елементи, включаючи межі. Також існують два спеціальні ідентифікатори «-» і «+», що відповідно означають найменший (перший запис) і найбільший (останній запис) ідентифікатор у потоці. Приклад нижче виведе усі записи потоку.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Кожний запис, що повертається, являє собою масив з двох елементів: ідентифікатор і список пар ключ-значення. Ми вже говорили, що ідентифікатори запису стосуються часу. Тому ми можемо вимагати діапазон конкретного проміжку часу. Однак, ми можемо вказати в запиті не повний ідентифікатор, а тільки Unix час, опустивши частину, що стосується порядковий номер. Опущена частина ідентифікатора автоматично прирівнюється до нуля на початку діапазону та максимально можливого значення в кінці діапазону. Нижче наведено приклад, як можна запросити діапазон, що дорівнює двом мілісекундам.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

У нас є тільки один запис у цьому діапазоні, однак у реальних наборах даних результат, що повертається, може бути величезним. З цієї причини XRANGE підтримує опцію COUNT. Вказавши кількість, ми можемо легко отримати перші N записів. Якщо нам потрібно отримати наступні N записи (пагінація), ми можемо використовувати останній отриманий ідентифікатор, збільшити у нього порядковий номер на одиницю та запросити знову. Погляньмо на це в наступному прикладі. Ми починаємо додавати 10 елементів за допомогою XADD (Припустимо, що потік mystream вже був заповнений 10 елементами). Щоб розпочати ітерацію, отримуючи по 2 елементи на команду, ми починаємо з повного діапазону, але з COUNT рівним 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Щоб продовжити ітерацію з наступними двома елементами, нам потрібно вибрати останній отриманий ідентифікатор, тобто 1519073279157-0, та додати 1 до порядковий номер.
Результуючий ідентифікатор, в даному випадку 1519073279157-1, тепер його можна використовувати як новий аргумент початку діапазону для наступного виклику XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

І так далі. Оскільки складність XRANGE становить O(log(N)) для пошуку, а потім O(M) для повернення елементів M, то кожен крок ітерації є швидким. Таким чином, за допомогою XRANGE можна ефективно ітерувати потоки.

Команда XREVRANGE є еквівалентом XRANGE, але повертає елементи у зворотному порядку:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Зверніть увагу, що команда XREVRANGE приймає аргументи діапазону start та stop у зворотному порядку.

Читання нових записів за допомогою XREAD

Часто виникає завдання підписатися на потік та отримувати тільки нові повідомлення. Ця концепція може бути схожою на Redis Pub/Sub або блокуючий Redis List, але є принципові відмінності в тому, як використовувати Redis Stream:

  1. Кожне нове повідомлення за замовчуванням надсилається кожному передплатнику. Ця поведінка відрізняється від блокуючого Redis List, де нове повідомлення буде прочитане лише якимсь одним передплатником.
  2. У той час, як у Redis Pub/Sub всі повідомлення забуваються і ніколи не зберігаються, в Stream всі повідомлення зберігаються на невизначений термін (якщо клієнт явно не викличе видалення).
  3. Redis Stream дозволяє розмежувати доступ до повідомлень усередині одного потоку. Конкретний передплатник може бачити лише свою особисту історію повідомлень.

Ви можете підписатися на потік і отримувати нові повідомлення за допомогою команди XREAD. Це трохи складніше, ніж XRANGEТому ми спочатку почнемо з прикладів простіше.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

У прикладі вище вказано неблокуючу форму XREAD. Зауважте, що опція COUNT не є обов'язковою. Фактично єдиною обов'язковою опцією команди є опція STREAMS, яка задає список потоків разом із максимальним ідентифікатором. Ми написали "STREAMS mystream 0" - ми хочемо отримувати всі записи потоку mystream з ідентифікатором більше ніж "0-0". Як очевидно з прикладу, команда повертає ім'я потоку, оскільки ми можемо підписатися кілька потоків одночасно. Ми могли б написати, наприклад, "STREAMS mystream otherstream 0 0". Зверніть увагу, що після налаштування STREAMS нам потрібно спочатку надати імена всіх потрібних потоків і тільки потім список ідентифікаторів.

У цій простій формі команда не робить нічого особливого в порівнянні з XRANGE. Однак цікаво те, що ми можемо легко перетворити XREAD до блокуючої команди, вказавши аргумент BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

У наведеному вище прикладі вказана нова опція BLOCK з часом очікування 0 мілісекунд (це означає нескінченне очікування). Більше того, замість передачі звичайного ідентифікатора для потоку mystream, було передано спеціальний ідентифікатор $. Цей спеціальний ідентифікатор означає, що XREAD повинен використовувати як ідентифікатор максимальний ідентифікатор у потоці mystream. Отже, ми будемо отримувати тільки нові повідомлення, починаючи з моменту, коли ми почали прослуховування. У певному сенсі це схоже на Unix команду tail-f.

Зауважте, що при використанні опції BLOCK нам не обов'язково потрібно використовувати спеціальний ідентифікатор $. Ми можемо використовувати будь-який ідентифікатор, що існує в потоці. Якщо команда зможе обслужити наш запит негайно, без блокування, вона зробить це, інакше вона заблокується.

Блокуючий XREAD також може прослуховувати одразу кілька потоків, просто потрібно вказати їхні імена. У цьому випадку команда поверне запис першого потоку, до якого надійшли дані. Перший передплатник, заблокований для цього потоку, отримуватиме дані першим.

Групи споживачів

У певних завданнях ми хочемо розмежувати доступ передплатників до повідомлень усередині одного потоку. Прикладом, коли це може бути корисно - черга повідомлень з воркерами, які будуть отримувати різні повідомлення потоку, що дозволяє масштабувати обробку повідомлень.

Якщо ми уявимо, що у нас є три передплатники C1, C2, C3 та потік, який містить повідомлення 1, 2, 3, 4, 5, 6, 7, то обслуговування повідомлень буде відбуватися як на діаграмі нижче:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Щоб отримати цей ефект, Redis Stream використовує концепцію Consumer Group. Ця концепція подібна до псевдо-передплатника, який отримує дані з потоку, але фактично обслуговується кількома передплатниками всередині групи, надаючи певні гарантії:

  1. Кожне повідомлення надсилається різним передплатникам усередині групи.
  2. У межах групи передплатники ідентифікуються на ім'я, яке є рядком з урахуванням регістру. Якщо якийсь передплатник тимчасово випаде з групи, він може відновитися в групу за власним унікальним ім'ям.
  3. Кожна Consumer Group слідує концепції «перше непрочитане повідомлення». Коли передплатник запитує нові повідомлення, він може отримати лише ті повідомлення, які ніколи раніше не доставлялися жодному передплатнику всередині групи.
  4. Існує команда явного підтвердження успішної обробки повідомлення передплатником. Поки не буде викликана ця команда, повідомлення буде залишатися в статусі «pending» .
  5. Усередині Consumer Group кожен передплатник може запитувати історію повідомлень, які були доставлені саме йому, але ще не були опрацьовані (у статусі «pending»)

У певному сенсі, стан групи може бути поставлено так:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Тепер настав час познайомитися з основними командами для Consumer Group, а саме:

  • XGROUP використовується для створення, знищення та управління групами
  • XREADGROUP використовується для читання потоку через групу
  • XACK — ця команда дозволяє передплатнику позначити повідомлення як успішно оброблене

Створення Consumer Group

Припустимо, що потік mystream вже існує. Тоді команда створення групи матиме вигляд:

> XGROUP CREATE mystream mygroup $
OK

При створенні групи ми повинні передати ідентифікатор, з якого група отримуватиме повідомлення. Якщо ми хочемо просто отримувати нові повідомлення, то ми можемо використовувати спеціальний ідентифікатор $ (як у нашому прикладі вище). Якщо замість спеціального ідентифікатора вказати 0, групі будуть доступні всі повідомлення потоку.

Тепер, коли група створена, ми можемо відразу ж почати читати повідомлення за допомогою команди XREADGROUP. Ця команда дуже схожа на XREAD та підтримує необов'язкову опцію BLOCK. Однак є обов'язкова опція GROUP, яка повинна завжди бути вказана з двома аргументами: ім'я групи та ім'я передплатника. Опція COUNT також підтримується.

Перш ніж читати потік, давайте помістимо туди кілька повідомлень:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

А тепер спробуємо прочитати цей потік через групу:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Наведена вище команда дослівно каже наступне:

"Я, Аліса-передплатник, член групи mygroup, хочу прочитати з потоку mystream одне повідомлення, яке ніколи не було нікому доставлене раніше".

Щоразу, коли передплатник виконує операцію з групою, він має вказати своє ім'я, однозначно ідентифікуючи себе всередині групи. У наведеній вище команді є ще одна дуже важлива деталь спеціальний ідентифікатор «>». Цей спеціальний ідентифікатор фільтрує повідомлення, залишаючи тільки ті, які досі не доставлялися.

Також, в особливих випадках, можна вказати реальний ідентифікатор, такий як 0 або будь-який інший дійсний ідентифікатор. У цьому випадку команда XREADGROUP поверне вам історію повідомлень зі статусом pending, які були доставлені вказаному передплатнику (Alice), але ще не були підтверджені за допомогою команди XACK.

Ми можемо перевірити цю поведінку, відразу вказавши ідентифікатор 0, без опції COUNT. Ми просто побачимо єдине повідомлення, що очікує, тобто повідомлення з яблуком:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Однак якщо ми підтвердимо повідомлення як успішно оброблене, воно більше не відображатиметься:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Тепер настала черга Боба щось прочитати:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Боб, член групи mygroup, попросив трохи більше двох повідомлень. Команда повідомляє лише про недоставлені повідомлення через спеціальний ідентифікатор «>». Як бачите, повідомлення "apple" не відображається, оскільки воно вже доставлене Алісі, тому Боб отримує "orange" та "strawberry".

Таким чином, Аліса, Боб та будь-який інший передплатник групи можуть читати різні повідомлення з одного й того самого потоку. Також вони можуть читати історію необроблених повідомлень або позначати повідомлення як оброблені.

Є кілька речей, які потрібно мати на увазі:

  • Як тільки передплатник вважає повідомлення командою XREADGROUP, це повідомлення перетворюється на стан «pending» і закріплюється за цим конкретним передплатником. Інші передплатники групи не зможуть прочитати це повідомлення.
  • Передплатники автоматично створюються при першій згадці, немає необхідності в їхньому явному створенні.
  • За допомогою XREADGROUP ви можете читати повідомлення з декількох різних потоків одночасно, однак, щоб це працювало, вам потрібно заздалегідь створити групи з однаковим ім'ям для кожного потоку за допомогою XGROUP

Відновлення після збою

Передплатник може відновитись після збою та перечитати свій список повідомлень зі статусом «pending». Однак у реальному світі передплатники можуть остаточно зазнати невдачі. Що відбувається з повідомленнями передплатника, що підвисли, якщо він не зміг відновитися після збою?
Consumer Group пропонує функцію, яка використовується саме для таких випадків, коли необхідно змінити власника повідомлень.

Насамперед необхідно викликати команду XPENDING, яка відображає всі повідомлення групи зі статусом pending. У своїй простій формі команда викликається лише з двома аргументами: ім'ям потоку та ім'ям групи:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Команда вивела кількість необроблених повідомлень для всієї групи та для кожного передплатника. У нас є тільки Боб із двома необробленими повідомленнями, тому що єдине повідомлення, яке запитав Аліса, було підтверджено за допомогою XACK.

Ми можемо запросити додаткову інформацію, використовуючи більше аргументів:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} — діапазон ідентифікаторів (можна використовувати «-» та «+»)
{count} — кількість спроб доставки
{consumer-name} - ім'я групи

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Тепер ми маємо деталі для кожного повідомлення: ідентифікатор, ім'я передплатника, час простою в мілісекундах і, нарешті, кількість спроб доставки. У нас є два повідомлення від Боба, і вони простоюють протягом 74170458 20 XNUMX мілісекунд, близько XNUMX годин.

Зверніть увагу, що ніхто не заважає нам перевірити, яким був зміст повідомлення, просто використовуючи XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Ми просто повинні повторити той самий ідентифікатор двічі в аргументах. Тепер, коли ми маємо деяку ідею, Аліса може вирішити, що після 20 годин простою Боб, ймовірно, не відновиться, і настав час запитати ці повідомлення і відновити їх обробку замість Боба. Для цього ми використовуємо команду XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

За допомогою цієї команди ми можемо отримати «чуже» повідомлення, яке ще не було оброблено шляхом зміни власника на {consumer}. Однак ми можемо надати мінімальний час простою {min-idle-time}. Це допомагає уникнути ситуації, коли два клієнти намагаються одночасно змінити власника в тих самих повідомлень:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Перший клієнт скине час простою та збільшить лічильник кількості доставок. Отже, другий клієнт не зможе його запросити.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Повідомлення було успішно затребуване Алісою, яка тепер може опрацювати повідомлення та підтвердити його.

З наведеного вище прикладу видно, що успішне виконання запиту повертає вміст повідомлення. Однак, це не обов'язково. Опція JUSTID може використовуватись для повернення лише ідентифікаторів повідомлення. Це корисно, якщо вам не цікаві деталі повідомлення, і ви хочете збільшити продуктивність системи.

Лічильник доставки

Лічильник, який ви спостерігаєте у висновку XPENDING — кількість доставок кожного повідомлення. Такий лічильник збільшується двома способами: коли повідомлення успішно потрібне через XCLAIM або коли використовується виклик XREADGROUP.

Це нормально, деякі повідомлення доставляються по кілька разів. Головне, щоб у результаті всі повідомлення були опрацьовані. Іноді при обробці повідомлення виникають проблеми через пошкодження повідомлення або обробка повідомлення викликає помилку в коді обробника. У такому разі може виявитися, що це повідомлення ніхто не зможе обробити. Оскільки ми маємо лічильник спроб доставки, ми можемо використовувати цей лічильник для виявлення таких ситуацій. Тому, як тільки лічильник доставки досягне заданого вами великого числа, ймовірно, буде розумніше помістити таке повідомлення в інший потік і відправити повідомлення системному адміністратору.

Стан потоків

Команда XINFO використовується для запиту різної інформації потоку та його групах. Наприклад, базовий вигляд команди виглядає так:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Команда вище відображає загальну інформацію щодо вказаного потоку. Тепер трохи складніший приклад:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Команда вище відображає загальну інформацію щодо всіх груп зазначеного потоку

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Команда вище відображає інформацію щодо всіх передплатників зазначеного потоку та групи.
Якщо ви забудете синтаксис команди, просто зверніться за допомогою до самої команди:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Обмеження розміру потоку

Багато програм не хочуть збирати дані в потік вічно. Часто корисно мати максимально допустиму кількість повідомлень у потоці. В інших випадках корисно перенести всі повідомлення з потоку до іншого постійного сховища при досягненні заданого розміру потоку. Обмежити розмір потоку можна за допомогою параметра MAXLEN у команді XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

При використанні MAXLEN старі записи автоматично видаляються при досягненні вказаної довжини, тому потік має постійний розмір. Однак обрізка в цьому випадку відбувається не найбільш продуктивним способом у пам'яті Redis. Поліпшити ситуацію можна так:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Аргумент ~ у прикладі вище означає, що нам не обов'язково потрібно обмежувати довжину потоку конкретним значенням. У прикладі це може бути будь-яке число більше або дорівнює 1000 (наприклад, 1000, 1010 або 1030). Просто ми явно вказали, що хочемо, щоб наш потік зберігав щонайменше 1000 записів. Це робить роботу з пам'яттю набагато ефективнішою всередині Redis.

Також існує окрема команда XTRIM, що виконує те саме:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Постійне зберігання та реплікація

Redis Stream асинхронно реплікується на slave ноди і зберігається у файли типу AOF (Снапшот всіх даних) і RDB (лог всіх операцій запису). Також підтримується реплікація стану Consumer Groups. Тому, якщо повідомлення знаходиться в статусі "pending" на master ноді, то на slave нодах це повідомлення матиме такий самий статус.

Видалення окремих елементів із потоку

Для видалення повідомлень існує спеціальна команда XDEL. Команда отримує ім'я потоку, за яким ідуть ідентифікатори повідомлень, який необхідно видалити:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

При використанні цієї команди потрібно врахувати, що фактично пам'ять буде вивільнено не одразу.

Потоки нульової довжини

Різниця між потоками та іншими структурами даних Redis полягає в тому, що коли інші структури даних більше не мають елементів у собі, як побічний ефект, сама структура даних буде видалена з пам'яті. Так, наприклад, відсортований набір буде повністю видалено, коли виклик ZREM видалить останній елемент. Натомість потокам дозволяється залишатися в пам'яті, навіть не маючи жодного елемента всередині.

Висновок

Redis Stream ідеально підходить для створення брокерів повідомлень, черг повідомлень, уніфікованих журналів та систем чату, що зберігають історію.

Як одного разу сказав Ніклаус Вірт, програми - це алгоритми плюс структури даних, а Redis вже дає вам і те, й інше.

Джерело: habr.com

Додати коментар або відгук