Redis Stream - niezawodność i skalowalność Twoich systemów przesyłania wiadomości

Redis Stream - niezawodność i skalowalność Twoich systemów przesyłania wiadomości

Redis Stream to nowy abstrakcyjny typ danych wprowadzony w Redis w wersji 5.0
Koncepcyjnie strumień Redis to lista, do której można dodawać wpisy. Każdy wpis ma unikalny identyfikator. Domyślnie identyfikator jest generowany automatycznie i zawiera znacznik czasu. W związku z tym można na przestrzeni czasu wysyłać zapytania do zakresów rekordów lub odbierać nowe dane w miarę ich pojawiania się w strumieniu, podobnie jak uniksowe polecenie „tail -f” odczytuje plik dziennika i zawiesza się w oczekiwaniu na nowe dane. Zauważ, że wielu klientów może nasłuchiwać wątku w tym samym czasie, tak samo wiele procesów „tail -f” może czytać plik jednocześnie bez konfliktów między sobą.

Aby zrozumieć wszystkie zalety nowego typu danych, rzućmy okiem na istniejące od dawna struktury Redis, które częściowo replikują funkcjonalność Redis Stream.

Redis PUB/SUB

Redis Pub/Sub to prosty system przesyłania wiadomości wbudowany już w Twój magazyn klucz-wartość. Jednak prostota ma swoją cenę:

  • Jeśli wydawca z jakiegoś powodu poniesie porażkę, traci wszystkich swoich subskrybentów
  • Wydawca musi znać dokładny adres wszystkich swoich subskrybentów
  • Wydawca może przeciążać swoich subskrybentów pracą, jeśli dane są publikowane szybciej niż przetwarzane
  • Wiadomość jest usuwana z bufora wydawcy natychmiast po publikacji, niezależnie od tego, do ilu subskrybentów została dostarczona i jak szybko udało im się ją przetworzyć.
  • Wszyscy subskrybenci otrzymają wiadomość w tym samym czasie. Abonenci sami muszą w jakiś sposób uzgodnić między sobą kolejność przetwarzania tej samej wiadomości.
  • Nie ma wbudowanego mechanizmu potwierdzającego, że subskrybent pomyślnie przetworzył wiadomość. Jeśli subskrybent otrzyma wiadomość i podczas przetwarzania nastąpi awaria, wydawca nie będzie o tym wiedział.

Lista Redisa

Redis List to struktura danych obsługująca blokowanie poleceń odczytu. Możesz dodawać i czytać wiadomości od początku lub końca listy. W oparciu o tę strukturę możesz stworzyć dobry stos lub kolejkę dla swojego systemu rozproszonego i w większości przypadków to wystarczy. Główne różnice w stosunku do Redis Pub/Sub:

  • Wiadomość jest dostarczana do jednego klienta. Pierwszy klient z zablokowanym odczytem otrzyma dane jako pierwszy.
  • Clint musi sam zainicjować operację odczytu każdej wiadomości. Lista nic nie wie o klientach.
  • Wiadomości są przechowywane do czasu, aż ktoś je przeczyta lub wyraźnie usunie. Jeśli skonfigurujesz serwer Redis tak, aby zrzucał dane na dysk, niezawodność systemu znacznie wzrośnie.

Wprowadzenie do strumienia

Dodanie wpisu do strumienia

Zespół XDODAJ dodaje nowy wpis do strumienia. Rekord to nie tylko ciąg znaków, składa się z jednej lub większej liczby par klucz-wartość. Tym samym każdy wpis ma już strukturę i przypomina strukturę pliku CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

W powyższym przykładzie dodajemy do strumienia dwa pola o nazwie (kluczu) „mystream”: „sensor-id” i „temperatura” o wartościach odpowiednio „1234” i „19.8”. Jako drugi argument polecenie przyjmuje identyfikator, który zostanie przypisany do wpisu - identyfikator ten jednoznacznie identyfikuje każdy wpis w strumieniu. Jednak w tym przypadku przekazaliśmy *, ponieważ chcemy, aby Redis wygenerował dla nas nowy identyfikator. Każdy nowy identyfikator będzie się zwiększał. Dlatego każdy nowy wpis będzie miał wyższy identyfikator w stosunku do poprzednich wpisów.

Format identyfikatora

Identyfikator wpisu zwrócony przez polecenie XDODAJ, składa się z dwóch części:

{millisecondsTime}-{sequenceNumber}

milisekundyCzas — Czas uniksowy w milisekundach (czas serwera Redis). Jeśli jednak bieżący czas jest taki sam lub krótszy niż czas poprzedniego nagrania, wówczas używany jest znacznik czasu poprzedniego nagrania. Dlatego też, jeśli czas serwera cofnie się w czasie, nowy identyfikator nadal zachowa właściwość przyrostu.

numer sekwencji używany do rekordów utworzonych w tej samej milisekundie. numer sekwencji zostanie zwiększona o 1 w stosunku do poprzedniego wpisu. Ponieważ numer sekwencji ma rozmiar 64 bitów, to w praktyce nie należy napotykać ograniczenia liczby rekordów, które można wygenerować w ciągu jednej milisekundy.

Format takich identyfikatorów może na pierwszy rzut oka wydawać się dziwny. Nieufny czytelnik może zastanawiać się, dlaczego czas jest częścią identyfikatora. Powodem jest to, że strumienie Redis obsługują zapytania o zakres według identyfikatora. Ponieważ identyfikator jest powiązany z czasem utworzenia rekordu, umożliwia to zapytanie o przedziały czasowe. Przyjrzymy się konkretnemu przykładowi, gdy spojrzymy na polecenie XRANGE.

Jeśli z jakiegoś powodu użytkownik musi podać swój własny identyfikator, który jest np. powiązany z jakimś zewnętrznym systemem, to możemy przekazać go do polecenia XDODAJ zamiast * jak pokazano poniżej:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Pamiętaj, że w tym przypadku musisz samodzielnie monitorować przyrost identyfikatora. W naszym przykładzie minimalny identyfikator to „0-1”, zatem polecenie nie przyjmie innego identyfikatora równego lub mniejszego niż „0-1”.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Liczba rekordów na strumień

Liczbę rekordów w strumieniu można uzyskać po prostu za pomocą polecenia XLEN. W naszym przykładzie to polecenie zwróci następującą wartość:

> XLEN somestream
(integer) 2

Zapytania o zakres - XRANGE i XREVRANGE

Aby zażądać danych według zakresu, musimy podać dwa identyfikatory - początek i koniec zakresu. Zwrócony zakres będzie zawierał wszystkie elementy, łącznie z granicami. Istnieją również dwa specjalne identyfikatory „-” i „+”, oznaczające odpowiednio najmniejszy (pierwszy rekord) i największy (ostatni rekord) identyfikator w strumieniu. Poniższy przykład wyświetli listę wszystkich wpisów strumienia.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Każdy zwrócony rekord jest tablicą składającą się z dwóch elementów: identyfikatora i listy par klucz-wartość. Powiedzieliśmy już, że identyfikatory rekordów są powiązane z czasem. Możemy zatem poprosić o zakres o określonym przedziale czasu. Możemy jednak podać w żądaniu nie pełny identyfikator, a jedynie czas uniksowy, pomijając część dot numer sekwencji. Pominięta część identyfikatora zostanie automatycznie ustawiona na zero na początku zakresu i na maksymalną możliwą wartość na końcu zakresu. Poniżej znajduje się przykład żądania zakresu dwóch milisekund.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Mamy tylko jeden wpis w tym zakresie, jednak w rzeczywistych zbiorach danych zwracany wynik może być ogromny. Z tego powodu XRANGE obsługuje opcję COUNT. Określając ilość, możemy po prostu uzyskać N pierwszych rekordów. Jeśli potrzebujemy uzyskać kolejnych N rekordów (paginacja), możemy wykorzystać ostatnio otrzymany identyfikator, zwiększyć go numer sekwencji o jeden i zapytaj ponownie. Przyjrzyjmy się temu w poniższym przykładzie. Zaczynamy dodawanie 10 elementów od XDODAJ (zakładając, że mój strumień był już wypełniony 10 elementami). Aby rozpocząć iterację otrzymując 2 elementy na polecenie, zaczynamy od pełnego zakresu, ale z LICZBĄ równą 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Aby kontynuować iterację z dwoma kolejnymi elementami, musimy wybrać ostatni otrzymany identyfikator, czyli 1519073279157-0 i dodać 1 do numer sekwencji.
Wynikowy identyfikator, w tym przypadku 1519073279157-1, może teraz zostać użyty jako nowy argument początku zakresu dla następnego wywołania XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

I tak dalej. Ponieważ złożoność XRANGE to O(log(N)) do wyszukiwania, a następnie O(M) do zwrócenia M elementów, wtedy każdy krok iteracji jest szybki. Zatem używając XRANGE strumienie można efektywnie iterować.

Zespół XREVRANGE jest odpowiednikiem XRANGE, ale zwraca elementy w odwrotnej kolejności:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Należy pamiętać, że polecenie XREVRANGE przyjmuje argumenty zakresu start i stop w odwrotnej kolejności.

Czytanie nowych wpisów za pomocą XREAD

Często pojawia się zadanie subskrybowania strumienia i odbierania tylko nowych wiadomości. Ta koncepcja może wydawać się podobna do Redis Pub/Sub lub blokowania Redis List, ale istnieją zasadnicze różnice w sposobie korzystania z Redis Stream:

  1. Każda nowa wiadomość domyślnie jest dostarczana do każdego abonenta. To zachowanie różni się od blokującej listy Redis, gdzie nowa wiadomość będzie czytana tylko przez jednego abonenta.
  2. Podczas gdy w Redis Pub/Sub wszystkie wiadomości są zapominane i nigdy nie są utrwalane, w Stream wszystkie wiadomości są przechowywane przez czas nieokreślony (chyba że klient jawnie spowoduje usunięcie).
  3. Redis Stream pozwala różnicować dostęp do wiadomości w ramach jednego strumienia. Konkretny subskrybent może zobaczyć tylko swoją osobistą historię wiadomości.

Za pomocą polecenia możesz subskrybować wątek i otrzymywać nowe wiadomości XCZYTAJ. To trochę bardziej skomplikowane niż XRANGE, więc zaczniemy najpierw od prostszych przykładów.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Powyższy przykład przedstawia formularz nieblokujący XCZYTAJ. Należy pamiętać, że opcja COUNT jest opcjonalna. Tak naprawdę jedyną wymaganą opcją polecenia jest opcja STREAMS, która określa listę strumieni wraz z odpowiadającym im maksymalnym identyfikatorem. Napisaliśmy „STREAMS mystream 0” – chcemy otrzymać wszystkie rekordy strumienia mystream o identyfikatorze większym niż „0-0”. Jak widać na przykładzie polecenie zwraca nazwę wątku, ponieważ możemy subskrybować wiele wątków jednocześnie. Moglibyśmy napisać na przykład „STREAMS mój strumień inny strumień 0 0”. Należy pamiętać, że po opcji STREAMS należy najpierw podać nazwy wszystkich wymaganych strumieni, a dopiero potem listę identyfikatorów.

W tej prostej formie polecenie nie robi nic specjalnego w porównaniu do XRANGE. Ciekawostką jest jednak to, że możemy łatwo zawrócić XCZYTAJ do polecenia blokującego, podając argument BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

W powyższym przykładzie określono nową opcję BLOCK z limitem czasu wynoszącym 0 milisekund (oznacza to czas oczekiwania). Co więcej, zamiast przekazać zwykły identyfikator strumienia mystream, przekazany został specjalny identyfikator $. Oznacza to ten specjalny identyfikator XCZYTAJ jako identyfikatora należy użyć maksymalnego identyfikatora w mystream. Dlatego będziemy otrzymywać nowe wiadomości tylko od momentu, w którym zaczęliśmy słuchać. W pewnym sensie jest to podobne do uniksowego polecenia „tail -f”.

Należy pamiętać, że korzystając z opcji BLOCK nie musimy koniecznie używać specjalnego identyfikatora $. Możemy użyć dowolnego identyfikatora istniejącego w strumieniu. Jeśli zespół będzie w stanie obsłużyć naszą prośbę natychmiast, bez blokowania, zrobi to, w przeciwnym razie zablokuje.

Bloking XCZYTAJ może także słuchać wielu wątków jednocześnie, wystarczy podać ich nazwy. W takim przypadku polecenie zwróci zapis pierwszego strumienia, który odebrał dane. Pierwszy abonent zablokowany dla danego wątku otrzyma dane jako pierwszy.

Grupy konsumentów

W niektórych zadaniach chcemy ograniczyć dostęp abonenta do wiadomości w ramach jednego wątku. Przykładem, gdzie może to być przydatne, jest kolejka komunikatów z procesami roboczymi, które będą otrzymywać różne komunikaty z wątku, co pozwala na skalowanie przetwarzania komunikatów.

Jeśli wyobrazimy sobie, że mamy trzech abonentów C1, C2, C3 i wątek zawierający wiadomości 1, 2, 3, 4, 5, 6, 7, to wiadomości będą obsługiwane jak na poniższym schemacie:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Aby osiągnąć taki efekt, Redis Stream wykorzystuje koncepcję zwaną Grupą Konsumencką. Koncepcja ta przypomina pseudoabonenta, który odbiera dane ze strumienia, ale w rzeczywistości jest obsługiwany przez wielu abonentów w ramach grupy, zapewniając pewne gwarancje:

  1. Każda wiadomość jest dostarczana do innego abonenta w ramach grupy.
  2. W grupie subskrybenci są identyfikowani na podstawie nazwy, która jest ciągiem znaków, w którym rozróżniana jest wielkość liter. Jeśli abonent chwilowo opuści grupę, może zostać przywrócony do grupy przy użyciu własnej, unikalnej nazwy.
  3. Każda Grupa Konsumencka kieruje się zasadą „pierwszej nieprzeczytanej wiadomości”. Kiedy abonent żąda nowych wiadomości, może otrzymać tylko te wiadomości, które nigdy wcześniej nie zostały dostarczone żadnemu abonentowi w grupie.
  4. Istnieje polecenie jawnego potwierdzenia, że ​​wiadomość została pomyślnie przetworzona przez subskrybenta. Dopóki nie zostanie wywołane to polecenie, żądana wiadomość pozostanie w statusie „oczekująca”.
  5. W ramach Grupy Konsumenckiej każdy abonent może zażądać historii wiadomości, które zostały mu dostarczone, ale nie zostały jeszcze przetworzone (w statusie „oczekujące”)

W pewnym sensie stan grupy można wyrazić następująco:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Teraz czas zapoznać się z głównymi poleceniami dla Grupy Konsumenckiej, a mianowicie:

  • XGRUPA używany do tworzenia, niszczenia i zarządzania grupami
  • XREADGROUP używany do odczytywania strumienia przez grupę
  • XAK - polecenie to pozwala abonentowi oznaczyć wiadomość jako pomyślnie przetworzoną

Utworzenie Grupy Konsumenckiej

Załóżmy, że mójstream już istnieje. Następnie polecenie utworzenia grupy będzie wyglądać następująco:

> XGROUP CREATE mystream mygroup $
OK

Tworząc grupę musimy przekazać identyfikator, od którego grupa będzie otrzymywać wiadomości. Jeśli chcemy tylko otrzymywać wszystkie nowe wiadomości, możemy użyć specjalnego identyfikatora $ (jak w naszym przykładzie powyżej). Jeśli zamiast specjalnego identyfikatora podasz 0, wszystkie wiadomości w wątku będą dostępne dla grupy.

Po utworzeniu grupy możemy od razu przystąpić do czytania wiadomości za pomocą polecenia XREADGROUP. To polecenie jest bardzo podobne do XCZYTAJ i obsługuje opcjonalną opcję BLOK. Wymagana jest jednak opcja GROUP, którą należy zawsze podać z dwoma argumentami: nazwą grupy i nazwą abonenta. Obsługiwana jest także opcja COUNT.

Zanim przeczytasz wątek, umieśćmy tam kilka wiadomości:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Spróbujmy teraz przeczytać ten strumień w grupie:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Powyższe polecenie brzmi dosłownie w następujący sposób:

„Ja, subskrybentka Alice, członkini mygroup, chcę przeczytać jedną wiadomość z mystream, która nigdy wcześniej nie została nikomu dostarczona”.

Za każdym razem, gdy abonent wykonuje operację na grupie, musi podać swoją nazwę, jednoznacznie identyfikując się w obrębie grupy. W powyższym poleceniu jest jeszcze jeden bardzo ważny szczegół - specjalny identyfikator „>”. Ten specjalny identyfikator filtruje wiadomości, pozostawiając tylko te, które nigdy wcześniej nie zostały dostarczone.

Ponadto w szczególnych przypadkach możesz podać identyfikator rzeczywisty, taki jak 0, lub inny ważny identyfikator. W tym przypadku polecenie XREADGROUP zwróci Ci historię wiadomości o statusie „oczekujące”, które zostały dostarczone do wskazanego abonenta (Alicja), ale nie zostały jeszcze potwierdzone komendą XAK.

Możemy przetestować to zachowanie, natychmiast podając identyfikator 0, bez opcji COUNT. Zobaczymy po prostu pojedynczą oczekującą wiadomość, czyli wiadomość Apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Jeśli jednak potwierdzimy, że wiadomość została pomyślnie przetworzona, wówczas nie będzie już ona wyświetlana:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Teraz kolej Boba, żeby coś przeczytać:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, członek mojej grupy, poprosił o nie więcej niż dwie wiadomości. Polecenie raportuje tylko niedostarczone wiadomości ze względu na specjalny identyfikator „>”. Jak widać, wiadomość „jabłko” nie zostanie wyświetlona, ​​ponieważ została już dostarczona Alicji, więc Bob otrzymuje „pomarańczę” i „truskawkę”.

W ten sposób Alicja, Bob i każdy inny subskrybent grupy może czytać różne wiadomości z tego samego strumienia. Mogą także zapoznać się z historią nieprzetworzonych wiadomości lub oznaczyć wiadomości jako przetworzone.

Jest kilka rzeczy, o których warto pamiętać:

  • Gdy tylko abonent uzna wiadomość za polecenie XREADGROUP, wiadomość ta przechodzi w stan „oczekujący” i jest przypisana do tego konkretnego abonenta. Inni subskrybenci grupy nie będą mogli przeczytać tej wiadomości.
  • Subskrybenci są tworzeni automatycznie po pierwszej wzmiance, nie ma potrzeby ich jawnego tworzenia.
  • Z XREADGROUP możesz czytać wiadomości z wielu różnych wątków jednocześnie, jednak aby to zadziałało, musisz najpierw utworzyć grupy o tej samej nazwie dla każdego wątku za pomocą XGRUPA

Odzyskiwanie po awarii

Abonent może odzyskać siły po awarii i ponownie przeczytać swoją listę wiadomości ze statusem „oczekujące”. Jednak w prawdziwym świecie abonenci mogą ostatecznie ponieść porażkę. Co dzieje się z zablokowanymi wiadomościami subskrybenta, jeśli subskrybent nie jest w stanie odzyskać sprawności po awarii?
Consumer Group oferuje funkcję, która sprawdza się właśnie w takich przypadkach – gdy zachodzi potrzeba zmiany właściciela wiadomości.

Pierwszą rzeczą, którą musisz zrobić, to wywołać polecenie WYDAWANIE, który wyświetla wszystkie wiadomości w grupie ze statusem „oczekujące”. W najprostszej formie polecenie jest wywoływane tylko z dwoma argumentami: nazwą wątku i nazwą grupy:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Zespół wyświetlił liczbę nieprzetworzonych wiadomości dla całej grupy i dla każdego abonenta. Mamy tylko Boba z dwiema zaległymi wiadomościami, ponieważ jedyna wiadomość, o którą prosiła Alicja, została potwierdzona XAK.

Możemy poprosić o więcej informacji, używając większej liczby argumentów:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - zakres identyfikatorów (można użyć „-” i „+”)
{count} — liczba prób doręczenia
{consumer-name} - nazwa grupy

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Teraz mamy szczegółowe informacje na temat każdej wiadomości: identyfikator, nazwę subskrybenta, czas bezczynności w milisekundach i na koniec liczbę prób dostarczenia. Mamy dwie wiadomości od Boba, które były bezczynne przez 74170458 milisekund, czyli około 20 godzin.

Pamiętaj, że nikt nam nie przeszkadza, abyśmy sprawdzili, jaka była treść wiadomości, po prostu za pomocą jej użycia XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Musimy po prostu dwukrotnie powtórzyć ten sam identyfikator w argumentach. Teraz, gdy mamy już pewne pojęcie, Alicja może zdecydować, że po 20 godzinach przestoju Bob prawdopodobnie nie odzyska sił i nadszedł czas, aby sprawdzić te wiadomości i wznowić ich przetwarzanie w imieniu Boba. W tym celu używamy polecenia XROSZCZENIE:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Za pomocą tego polecenia możemy otrzymać „obcą” wiadomość, która nie została jeszcze przetworzona, zmieniając właściciela na {consumer}. Możemy jednak podać również minimalny czas bezczynności {min-idle-time}. Pomaga to uniknąć sytuacji, w której dwóch klientów próbuje jednocześnie zmienić właściciela tych samych wiadomości:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pierwszy klient zresetuje przestój i zwiększy licznik dostaw. Zatem drugi klient nie będzie mógł o to poprosić.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alicja pomyślnie odebrała wiadomość, która może teraz ją przetworzyć i potwierdzić.

Z powyższego przykładu widać, że pomyślne żądanie zwraca treść samej wiadomości. Jednakże nie jest to konieczne. Opcji JUSTID można używać do zwracania wyłącznie identyfikatorów wiadomości. Jest to przydatne, jeśli nie interesują Cię szczegóły wiadomości i chcesz zwiększyć wydajność systemu.

Licznik dostaw

Licznik widoczny na wyjściu WYDAWANIE to liczba dostaw każdej wiadomości. Licznik taki jest zwiększany na dwa sposoby: po pomyślnym zażądaniu wiadomości przez XROSZCZENIE lub gdy używane jest połączenie XREADGROUP.

To normalne, że niektóre wiadomości są dostarczane wielokrotnie. Najważniejsze jest to, że wszystkie wiadomości są ostatecznie przetwarzane. Czasami podczas przetwarzania wiadomości pojawiają się problemy, ponieważ sama wiadomość jest uszkodzona lub przetwarzanie wiadomości powoduje błąd w kodzie obsługi. W takim przypadku może się okazać, że nikt nie będzie w stanie przetworzyć tej wiadomości. Ponieważ mamy licznik prób doręczenia, możemy go używać do wykrywania takich sytuacji. Dlatego też, gdy liczba dostaw osiągnie określoną przez Ciebie liczbę, prawdopodobnie rozsądniej będzie umieścić taką wiadomość w innym wątku i wysłać powiadomienie do administratora systemu.

Stan wątku

Zespół XINFO używany do żądania różnych informacji o wątku i jego grupach. Na przykład podstawowe polecenie wygląda następująco:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Powyższe polecenie wyświetla ogólne informacje o określonym strumieniu. Teraz nieco bardziej złożony przykład:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Powyższe polecenie wyświetla ogólne informacje o wszystkich grupach określonego wątku

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Powyższe polecenie wyświetla informacje o wszystkich subskrybentach określonego strumienia i grupy.
Jeśli zapomnisz składnię polecenia, po prostu poproś o pomoc samo polecenie:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limit rozmiaru strumienia

Wiele aplikacji nie chce wiecznie gromadzić danych w strumieniu. Często przydatne jest ustawienie maksymalnej liczby wiadomości na wątek. W innych przypadkach przydatne jest przeniesienie wszystkich wiadomości z wątku do innego trwałego magazynu po osiągnięciu określonego rozmiaru wątku. Możesz ograniczyć rozmiar strumienia za pomocą parametru MAXLEN w poleceniu XDODAJ:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Podczas korzystania z MAXLEN stare rekordy są automatycznie usuwane po osiągnięciu określonej długości, dzięki czemu strumień ma stały rozmiar. Jednak w tym przypadku czyszczenie w pamięci Redis nie przebiega w najbardziej efektywny sposób. Możesz poprawić sytuację w następujący sposób:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argument ~ w powyższym przykładzie oznacza, że ​​niekoniecznie musimy ograniczać długość strumienia do określonej wartości. W naszym przykładzie może to być dowolna liczba większa lub równa 1000 (na przykład 1000, 1010 lub 1030). Właśnie wyraźnie określiliśmy, że chcemy, aby nasz strumień przechowywał co najmniej 1000 rekordów. Dzięki temu zarządzanie pamięcią w Redis jest znacznie wydajniejsze.

Jest też oddzielny zespół XTRIM, który robi to samo:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Trwałe przechowywanie i replikacja

Strumień Redis jest asynchronicznie replikowany do węzłów podrzędnych i zapisywany w plikach takich jak AOF (migawka wszystkich danych) i RDB (dziennik wszystkich operacji zapisu). Obsługiwana jest także replikacja stanu grup konsumentów. Dlatego też, jeśli wiadomość w węźle głównym ma status „oczekujący”, wówczas w węzłach podrzędnych wiadomość ta będzie miała ten sam status.

Usuwanie poszczególnych elementów ze strumienia

Istnieje specjalne polecenie usuwania wiadomości XDEL. Polecenie pobiera nazwę wątku i identyfikatory wiadomości, które mają zostać usunięte:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Korzystając z tego polecenia, należy wziąć pod uwagę, że rzeczywista pamięć nie zostanie natychmiast zwolniona.

Strumienie o zerowej długości

Różnica między strumieniami a innymi strukturami danych Redis polega na tym, że gdy inne struktury danych nie zawierają już elementów, jako efekt uboczny sama struktura danych zostanie usunięta z pamięci. Na przykład posortowany zbiór zostanie całkowicie usunięty, gdy wywołanie ZREM usunie ostatni element. Zamiast tego wątki mogą pozostać w pamięci nawet bez żadnych elementów w środku.

wniosek

Redis Stream idealnie nadaje się do tworzenia brokerów wiadomości, kolejek wiadomości, ujednoliconego rejestrowania i systemów czatów przechowujących historię.

Jak kiedyś powiedziałem Niklausa Wirtha, programy to algorytmy i struktury danych, a Redis już zapewnia jedno i drugie.

Źródło: www.habr.com

Dodaj komentarz