Queue (kolejka) – struktura danych na dysku lub w pamięci RAM, która przechowuje odwołania do wiadomości i zwraca ich kopie consumers (do konsumentów). Queue jest a ze stanem (gdzie same wiadomości mogą być buforowane). 1 kolejek może zająć około 80Mb.
Binding (wiążąca) – reguła informująca giełdę, do której kolejki mają trafić wiadomości.
Spis treści
- KrólikMQ. Część 4. Zrozumienie, czym są wiadomości i ramki
- KrólikMQ. Część 5: Wydajność publikowania i wykorzystywania wiadomości
- KrólikMQ. Część 6. Omówienie modułów Federacji i Łopaty
- KrólikMQ. Część 7. Szczegóły dotyczące Connection i Chanel
- KrólikMQ. Część 8. RabbitMQ w .NET
- KrólikMQ. Część 9. Monitorowanie
Tymczasowe kolejki
Jeżeli kolejka jest tworzona z zestawem parametrów autoDelete, wówczas taka kolejka nabywa zdolność automatycznie usuń siebie. Takie kolejki są zazwyczaj tworzone, gdy łączy się pierwszy klient, i usuwane, gdy wszyscy klienci się rozłączą.
Jeżeli kolejka jest tworzona z zestawem parametrów exclusive, to taka kolejka umożliwia podłączenie tylko jednego konsumenta i zostaje usunięty po zamknięciu kanału. Dopóki kanał nie zostanie zamknięty, klient może się rozłączać i łączyć, ale tylko w ramach tego samego połączenia. Jeśli parametr exclusive jest ustawiony, wówczas parametr autoDelete nie ma żadnego efektu.
Cechy:
- jeśli nastąpi krótkotrwała przerwa w połączeniu, utracimy wiadomości, które jeszcze nie dotarły do konsumenta
- możesz złapać to zjawisko
binding churn. Zjawisko to występuje, gdy liczba operacji tworzenia/usuwania kolejek i powiązań osiąga bardzo duże wartości. W trybie klastrowym taki przepływ operacji rozłoży się na wszystkie węzły i spowoduje duże obciążenie. Proces ten można zoptymalizować poprzez kontrolowanie liczby subskrypcji.
Ciągłe kolejki
Jeżeli kolejka jest tworzona z zestawem parametrów durable, to taka kolejka utrzymuje swój stan i zostanie przywrócony po ponownym uruchomieniu serwera/brokera. Kolejka ta będzie istnieć do momentu wywołania polecenia. Queue.Delete.
Kolejka o wysokiej dostępności
Kolejki HA wymagają środowiska klastra RabbitMQ. W trybie klastra wszystkie informacje o wymiennikach, kolejkach, powiązaniach i konsumentach zostaną skopiowane do wszystkich węzłów.
Gdy wiadomość zostanie opublikowana w kolejce HA, jest ona zapisywana na każdym węźle należącym do kolejki HA. Po odebraniu wiadomości na dowolnym węźle wszystkie kopie tej wiadomości na innych węzłach zostaną usunięte.
Kolejki HA mogą być dystrybuowane do wszystkich węzłów w klastrze lub tylko do pojedynczych węzłów.

Cechy:
- Korzystanie z kolejek HA skutkuje obniżeniem wydajności. Podczas umieszczania wiadomości w kolejce HA lub pobierania wiadomości z kolejki HA, RabbitMQ musi wykonać koordynację na wszystkich węzłach (zwykle wystarczają 2–3 węzły)
Tworzenie kolejki
Kolejka jest tworzona przy użyciu metody synchronicznej RPC żądanie do serwera. Żądanie jest składane za pomocą metody Queue.Declare, wywołane z parametrami:
- nazwa kolejki
- inne opcje
Przykład tworzenia kolejki za pomocą :
// ...
channel.QueueDeclare(
queue: "my_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// ...queue— nazwa kolejki, którą chcemy utworzyć. Nazwa musi być unikatowa i nie może być taka sama, jak nazwa kolejki systemowej.durable- jeśli to prawda, to kolejka będzie utrzymuj swój stan i zostanie przywrócony po ponownym uruchomieniu serwera/brokeraexclusive- jeśli prawda, kolejka pozwoli na podłączenie tylko jednego konsumentaautoDelete- jeśli prawda, to kolejka zyskuje możliwość automatycznie usuń siebiearguments— argumenty opcjonalne. Przyjrzymy się temu bliżej poniżej.
argumenty
x-message-ttl(x-message-time-to-live) - umożliwia ustawienie czasu wygaśnięcia wiadomości w milisekundach. Jeśli kolejka jest tworzona z ustawioną wartością argumentux-message-ttl, to będzie taka kolejka automatycznie wykluczaj wygasłe wiadomości. Ustawianie wartości argumentux-message-ttlustawia maksymalny wiek wszystkich wiadomości w tej kolejce. Tworzenie takiej kolejki pozwala zapobiec otrzymywaniu nieaktualnych informacji. Można go stosować w systemach czasu rzeczywistego. Jeżeli kolejka, dla której określono wymieniacz odrzuconych wiadomości, jest określona, ustaw wartość argumentux-message-ttl, a następnie odrzucone wiadomości w tej kolejce zacznie mieć żywotność.x-expires— określa wartość w milisekundach, po której kolejka zostanie usunięta. Kolejka może wygasnąć tylko wtedy, gdy nie ma żadnych subskrybentów. Jeśli kolejka ma podłączonych abonentów, będzie można ją automatycznie usunąć dopiero po tym, jak wszyscy abonenci zadzwonią.Basic.Cancellub rozłącz się. Czas życia kolejki może wygasnąć tylko wtedy, gdy nie ma żadnego żądania.Basic.Get. W przeciwnym wypadku bieżąca wartość ustawienia czasu życia zostanie zresetowana do zera, a kolejka nie będzie już automatycznie usuwana. Również nie ma gwarancji, jak szybko kolejka zostanie usunięta po upływie jej żywotności.x-max-length— ustawia maksymalną liczbę wiadomości w kolejce. Jeżeli liczba wiadomości w kolejce zaczyna przekraczać wartość maksymalną, najstarsze wiadomości zaczynają być usuwane.

x-max-lenght-bytes— ustawia maksymalny dopuszczalny całkowity rozmiar ładunku wiadomości w kolejce. W przypadku przekroczenia ustawionej wartości (kolejka zapełni się w czasie kolejnej publikacji wiadomości) najstarsze wiadomości zostaną usuniętex-overflow— ten argument służy do skonfigurowania zachowania w przypadku przepełnienia kolejki. Dostępne są dwie wartości:drop-head(wartość domyślna) ireject-publish. Jeśli wybierzeszdrop-head, wówczas najstarsze wiadomości zostaną usunięte. Jeśli wybierzeszreject-publish, wówczas odbiór wiadomości zostanie zawieszonyx-dead-letter-exchange— określa giełdę, do której wysyłane są odrzucone wiadomości, które nie zostały ponownie umieszczone w kolejcex-dead-letter-routing-key— określa opcjonalny klucz routingu dla odrzuconych wiadomościx-max-priority— umożliwia sortowanie według priorytetu w kolejce z maksymalną wartością priorytetu 255 (wersje RabbitMQ 3.5.0 i nowsze). Liczba ta określa maksymalny priorytet, jaki kolejka będzie obsługiwać. Jeśli argument nie zostanie ustawiony, kolejka nie będzie obsługiwać priorytetu wiadomości.x-queue-mode- umożliwia przeniesienie kolejki do tryb leniwy. W tym trybie na dysku będzie przechowywana możliwie jak największa liczba wiadomości. Użycie pamięci RAM będzie minimalne. Jeżeli nie jest ustawiona, kolejka będzie przechowywać wiadomości w pamięci w celu dostarczania ich tak szybko, jak to możliwe.x-queue-master-locator- jeśli mamy klaster, możemy ustawić kolejkę głównąx-ha-policy— używany podczas tworzenia kolejek HA i określa sposób dystrybucji wiadomości pomiędzy węzłami. Jeśli wartość jest ustawionaall, wówczas wiadomość zostanie zapisana na wszystkich węzłach. Jeśli wartość jest ustawionanodes, a następnie wiadomość zostanie zapisana na określonych węzłach klastrax-ha-nodes— określa węzły, do których będzie należeć konkretna kolejkaHA

Jeśli tworzysz kolejkę być może, następnie serwer wyśle synchroniczną wiadomość do klienta RPC odpowiedź Queue.DeclareOk. Jeśli tworzysz kolejkę jest niemożliwe (prośba została odrzucona Queue.Declare) następnie kanał zostanie zamknięty serwer za pomocą polecenia Channel.Close a klient otrzyma wyjątek , który będzie zawierał kod błędu i jego opis.
Przypomnienie sobie czegoś Queue.Declare o podobnych parametrach zwróci przydatne informacje o tej kolejce. Na przykład całkowita liczba wiadomości oczekujących w danej kolejce i całkowita liczba subskrybentów.
Zadzwoń Queue.Declare na podstawie danych uwierzytelniających użytkownika, któremu nie przypisano niezbędnych uprawnień zamknie kanał za pomocą polecenia Channel.Close a klient otrzyma wyjątek , który będzie zawierał kod błędu i jego opis.
Po upływie 10 sekund bezczynności kolejki przechodzi w stan hibernacji, powodując GC w kolejce, co skutkuje znaczną redukcją pamięci wymaganej dla tej kolejki.
Tworzenie kolejki za pomocą GUI
Przejdź do panelu administracyjnego RabbitMQ pod użytkownikiem guest (nazwa użytkownika: guest i hasło: guest). Należy pamiętać, że użytkownik guest można połączyć się tylko z localhosta. Przejdźmy teraz do zakładki Queues i kliknij Add a new queue. Wypełnij właściwości:

Po wprowadzeniu wszystkich niezbędnych danych i kliknięciu Add queueskolejka pojawi się na liście ogólnej.

Kliknięcie na nazwę kolejki spowoduje wyświetlenie szczegółowych informacji na jej temat. Tutaj możesz skonfigurować połączenie między giełdą a kolejką, wyświetlić listę consumers, publikuj/odbieraj wiadomości, usuwaj kolejki i przeglądaj statystyki.
Tworzenie powiązania
Wiązanie jest tworzone przy użyciu synchronicznego RPC żądanie do serwera. Żądanie jest składane za pomocą metody Queue.Bind, wywołane z parametrami:
- nazwa kolejki
- nazwa punktu wymiany
- inne opcje
Przykład tworzenia powiązania za pomocą :
//...
channel.QueueBind(
queue: queueName,
exchange: "my_exchange",
routingKey: "my_key",
arguments: null
);
//...queue— nazwa kolejkiexchange— nazwa wymieniającegoroutingKey— klucz routinguarguments— argumenty opcjonalne

Jeśli tworzysz powiązanie być może, następnie serwer wyśle synchroniczną wiadomość do klienta RPC odpowiedź Queue.BindOk.
Tworzenie powiązania za pomocą interfejsu graficznego
Przejdź do panelu administracyjnego RabbitMQ pod użytkownikiem guest (nazwa użytkownika: guest i hasło: guest). Należy pamiętać, że użytkownik guest można połączyć się tylko z localhosta. Przejdźmy teraz do zakładki Queues i kliknij na kolejkę my_queue. Wypełnij pola sekcji bindings:

Po wprowadzeniu wszystkich niezbędnych danych i kliknięciu Bind, powiązanie zostanie wyświetlone na liście ogólnej:

Code
W tej sekcji opiszemy kolejkę i wiązanie w kodzie C#, tak jakbyśmy mieli opracować bibliotekę. Być może będzie to przydatne dla percepcji.
public interface IQueue
{
string Name { get; }
/// <summary>
/// Если установить true, то queue будет являться постоянным.
/// Она будет храниться на диске и сможет
/// пережить перезапуск сервера/брокера.
/// Если значение false, то queue является временной и будет удаляться,
/// когда сервер/брокер будет перезагружен
/// </summary>
bool IsDurable { get; }
/// <summary>
/// Если значение равно true, то
/// такая очередь будет разрешать подключаться
/// только одному consumer-у
/// </summary>
bool IsExclusive { get; }
/// <summary>
/// Автоматическое удаление.
/// Очередь будет удалена, когда все клиенты отсоединятся.
/// </summary>
bool IsAutoDelete { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}public class Queue : IQueue
{
public Queue(
string name,
bool isDurable = true,
bool isExclusive = false,
bool isAutoDelete = false,
IDictionary<string, object> arguments = null)
{
Name = name ??
throw new ArgumentNullException(name, $"{name} must not be null");
IsDurable = isDurable;
IsExclusive = isExclusive;
IsAutoDelete = isAutoDelete;
Arguments = arguments ?? new Dictionary<string, object>();
}
public string Name { get; }
public bool IsDurable { get; }
public bool IsExclusive { get; }
public bool IsAutoDelete { get; }
public IDictionary<string, object> Arguments { get; }
}public static class QueueMode
{
public const string Default = "default";
/// <summary>
/// Ленивый режим. Ленивый режим заставит сохранять
/// как можно больше сообщений на диске, чтобы уменьшить
/// использование оперативной памяти
/// </summary>
public const string Lazy = "lazy";
}public interface IBinding
{
/// <summary>
/// Обменник, который будет связываться привязкой
/// </summary>
IExchange Exchange { get; }
/// <summary>
/// Ключ маршрутизации
/// </summary>
string RoutingKey { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}public class Binding : IBinding
{
public Binding(
IExchange exchange,
string routingKey,
IDictionary<string, object> arguments)
{
Exchange = exchange;
RoutingKey = routingKey;
Arguments = arguments;
}
public IExchange Exchange { get; }
public string RoutingKey { get; }
public IDictionary<string, object> Arguments { get; }
}Źródło: www.habr.com
