RabbitMQ. Часть 3. Разбираемся с Queues и Bindings
Queue (очередь) — структура данных на диске или в оперативной памяти, которая хранит ссылки на сообщения и отдает их копии consumers (потребителям). Queue представляет собой Erlang-процесс с состоянием (где могут кэшироваться и сами сообщения). 1 тысяча очередей может занимать порядка 80Mb.
Binding (привязка) — правило, которое сообщает обменнику в какую из очередей должны попадать сообщения.
RabbitMQ. Часть 4. Разбираемся с тем что-такое сообщения и фреймы
RabbitMQ. Часть 5. Производительность публикации и потребления сообщений
RabbitMQ. Часть 6. Обзор модулей Federation и Shovel
RabbitMQ. Часть 7. Подробно про Connection и Chanel
RabbitMQ. Часть 8. RabbitMQ в .NET
RabbitMQ. Часть 9. Мониторинг
Временные очереди
Если создание очереди происходит с установленным параметром autoDelete, то такая очередь обретает способность автоматически удалять себя. Такие очереди обычно создаются в момент подключения первого клиента и удаляются в момент, когда все клиенты отсоединились.
Если создание очереди происходит с установленным параметром exclusive, то такая очередь разрешает подключаться только одному потребителю и удаляется если закроется канал. До тех пор пока канал не закроется, клиент может отключаться/подключаться, но только в рамках того же самого соединения. Если параметр exclusive установлен, то параметр autoDelete не имеет никакого эффекта.
Особенности:
при кратковременном разрыве связи мы будем терять сообщения, которые ещё не успели дойти до потребителя
можно поймать феномен binding churn. Феномен возникает, когда количество операций по созданию/удалению очередей и привязок достигает очень больших значений. В кластерном режиме такой поток операций будет расползаться по всем узлам и создаст большую нагрузку. Данный процесс можно оптимизировать за счет контроля количества подписок
Постоянные очереди
Если создание очереди происходит с установленным параметром durable, то такая очередь сохраняет свое состояние и восстанавливается после перезапуска сервера/брокера. Данная очередь будет существовать до тех пор пока не будет вызвана команду Queue.Delete.
Highly Available очереди
Очереди HA требуют кластерной среды RabbitMQ. В кластерном режиме вся информация об обменниках, очередях, привязках и потребителях будет скопирована на все узлы.
Когда сообщение публикуется в какую-то HA очередь, оно хранится на каждом узле, относящемуся к HA очереди. После того как сообщение потребляется на каком-то из узлов, все копии этого сообщения будут удалены на других узлах.
Очереди HA могут распространяться на все узлы в некотором кластере или только на индивидуальные.
Особенности:
использование HA очередей приводит к наказаниям в производительности. При помещении сообщения в некую HA очередь или при потреблении сообщения из HA очереди RabbitMQ должен выполнять координацию по всем узлам (2-3 узла обычно достаточно)
Создание очереди
Создание очереди происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Declare, вызываемого с параметрами:
queue — название очереди, которую мы хотим создать. Название должно быть уникальным и не может совпадать с системным именем очереди
durable — если true, то очередь будет сохранять свое состояние и восстанавливается после перезапуска сервера/брокера
exclusive — если true, то очередь будет разрешать подключаться только одному потребителю
autoDelete — если true, то очередь обретает способность автоматически удалять себя
arguments — необязательные аргументы. Ниже разберем подробнее.
arguments
x-message-ttl(x-message-time-to-live) — позволяет установить время истечения срока жизни сообщения в миллисекундах. Если создание очереди происходит с установленным значением аргумента x-message-ttl, то такая очередь будет автоматически исключать сообщения, у которых истек срок действия. Установка значения аргумента x-message-ttl задает максимальный возраст для всех сообщений в данной очереди. Создание такой очереди позволяет предотвратить получение устаревшей информации. Это можно использовать в системах реального времени. Если у очереди для которой задан обменник для отклоненных сообщений установить значение аргумента x-message-ttl, то отклоненные сообщения в данной очереди начнут обладать сроком жизни.
x-expires — задает значение в миллисекундах по истечению которого происходит удаление очереди. Очередь может израсходовать срок своего действия только если она не имеет никаких подписчиков. Если к очереди подключены подписчики, она сможет автоматически удалиться только тогда, когда все подписчики вызовут Basic.Cancel или отсоединятся. Срок жизни очереди может завершиться только в том случае, если к ней не было запроса Basic.Get. Иначе текущее значение установки времени жизни обнуляется, и очередь больше не будет автоматически удаляться. Также нет гарантий того, насколько быстро происходит удаление очереди после истечения её срока жизни.
x-max-length — задает максимальное число сообщений в очереди. Если число сообщений в очереди начинает превышать макимальное чило, то начинают удаляться самые старые
x-max-lenght-bytes — задает максимально допустимый суммарный размер полезной нагрузки сообщений в очереди. При превышении установленного значения (возникло переполнение очереди при очередной публикации сообщения) самые старые сообщения начнут удаляться
x-overflow — данный аргумент используется для настройки поведения в результате переполнения очереди. Доступны два значения: drop-head (значение по умолчанию) и reject-publish. Если выбрать drop-head, то самые старые сообщения будут удаляться. Если выбрать reject-publish, то прием сообщений будет приостановлен
x-dead-letter-exchange — задает exchange, в который направляются отвергнутые сообщения, которые не поставлены повторно в очередь
x-dead-letter-routing-key — задает не обязательный ключ маршрутизации для отвергнутых сообщений
x-max-priority — разрешает сортировку по приоритетам в очереди с максимальным значением приоритета 255 (RabbitMQ версий 3.5.0 и выше). Число указывает максимальный приоритет, который будет поддерживать очередь. Если аргумент не установлен, очередь не будет поддерживать приоритет сообщений
x-queue-mode — позволяет перевести очередь в ленивый режим. В таком режиме как можно больше сообщений будет храниться на диске. Использование оперативной памяти будет минимально. В случае, если он не установлен, очередь будет хранить сообщения в памяти, чтобы доставлять сообщения максимально быстро
x-queue-master-locator — если у нас кластер, то можно задать мастер очередь
x-ha-policy — используется при создании HA очередей и определяет как сообщение будет распространяться по узлам. Если установлено значение all, то сообщение будет сохраняться на всех узлах. Если установлено значение nodes, то сообщение будет сохраняться на определенных узлах кластера
x-ha-nodes — задает узлы, к которым будет относиться некая очередь HA
Если создание очереди возможно, то сервер отправит клиенту синхронный RPC ответ Queue.DeclareOk. Если создание очереди невозможно (произошел отказ по запросу Queue.Declare), то канал закроется сервером при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки и ее описание.
Повторный вызовQueue.Declareс аналогичными параметрами вернет полезную информацию об этой очереди. Например, общее число сообщений, находящихся в ожидании в данной очереди, и общее число подписанных на неё потребителей.
Вызов Queue.Declare под учетными данными пользователя, которому не назначены необходимые права закроет канал при помощи команды Channel.Close и клиент получит исключение OperationInterruptedException, которое будет содержать код ошибки 403 и ее описание.
После того, как очередь простаивает в течении >= 10 секунд, она впадает в спящий режим, вызывая GC в очереди, что приводит к значительному сокращению памяти, необходимой для этой очереди.
Создание Queue через графический интерфейс
Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и нажмем на Add a new queue. Заполняем свойства:
Как только мы введем все необходимые данные и нажмете на Add queues, очередь появится в общем списке.
Щелчок по имени очереди покажет ее детальную информацию. Здесь можно настроить привязку между обменом и очередью, посмотреть список consumers, опубликовать/получить сообщения, удалить очередь и посмотреть статистику.
Создание Binding
Создание привязки происходит при помощи синхронного RPC запроса к серверу. Запрос осуществляется при помощи метода Queue.Bind, вызываемого с параметрами:
Если создание привязки возможно, то сервер отправит клиенту синхронный RPC ответ Queue.BindOk.
Создание Binding через графический интерфейс
Заходим в панель администратора RabbitMQ под пользователем guest (username: guest и password: guest). Обратите внимание, что пользователь guest может подключаться только с локального хоста. Теперь перейдем на вкладку Queues и жмем на очередь my_queue. Заполняем поля раздела bindings:
Как только мы введем все необходимые данные и нажмем на Bind, привязка отобразится в общем списке:
Code
В данном разделе опишем очередь и привязку кодом на C#, так если бы нам требовалось разработать библиотеку. Возможно это будет полезно для восприятия.
public interface IQueue
{
string Name { get; }
/// <summary>
/// Если установить true, то queue будет являться постоянным.
/// Она будет храниться на диске и сможет
/// пережить перезапуск сервера/брокера.
/// Если значение false, то queue является временной и будет удаляться,
/// когда сервер/брокер будет перезагружен
/// </summary>
bool IsDurable { get; }
/// <summary>
/// Если значение равно true, то
/// такая очередь будет разрешать подключаться
/// только одному consumer-у
/// </summary>
bool IsExclusive { get; }
/// <summary>
/// Автоматическое удаление.
/// Очередь будет удалена, когда все клиенты отсоединятся.
/// </summary>
bool IsAutoDelete { get; }
/// <summary>
/// Необязательные аргументы
/// </summary>
IDictionary<string, object> Arguments { get; }
}
public class Queue : IQueue
{
public Queue(
string name,
bool isDurable = true,
bool isExclusive = false,
bool isAutoDelete = false,
IDictionary<string, object> arguments = null)
{
Name = name ??
throw new ArgumentNullException(name, $"{name} must not be null");
IsDurable = isDurable;
IsExclusive = isExclusive;
IsAutoDelete = isAutoDelete;
Arguments = arguments ?? new Dictionary<string, object>();
}
public string Name { get; }
public bool IsDurable { get; }
public bool IsExclusive { get; }
public bool IsAutoDelete { get; }
public IDictionary<string, object> Arguments { get; }
}
public static class QueueMode
{
public const string Default = "default";
/// <summary>
/// Ленивый режим. Ленивый режим заставит сохранять
/// как можно больше сообщений на диске, чтобы уменьшить
/// использование оперативной памяти
/// </summary>
public const string Lazy = "lazy";
}