KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Queue (kolejka) – struktura danych na dysku lub w pamięci RAM, która przechowuje odwołania do wiadomości i zwraca ich kopie consumers (do konsumentów). Queue jest a Proces Erlanga ze stanem (gdzie same wiadomości mogą być buforowane). 1 kolejek może zająć około 80Mb.

Binding (wiążąca) – reguła informująca giełdę, do której kolejki mają trafić wiadomości.

Spis treści

Tymczasowe kolejki

Jeżeli kolejka jest tworzona z zestawem parametrów autoDelete, wówczas taka kolejka nabywa zdolność automatycznie usuń siebie. Takie kolejki są zazwyczaj tworzone, gdy łączy się pierwszy klient, i usuwane, gdy wszyscy klienci się rozłączą.

Jeżeli kolejka jest tworzona z zestawem parametrów exclusive, to taka kolejka umożliwia podłączenie tylko jednego konsumenta i zostaje usunięty po zamknięciu kanału. Dopóki kanał nie zostanie zamknięty, klient może się rozłączać i łączyć, ale tylko w ramach tego samego połączenia. Jeśli parametr exclusive jest ustawiony, wówczas parametr autoDelete nie ma żadnego efektu.

Cechy:

  • jeśli nastąpi krótkotrwała przerwa w połączeniu, utracimy wiadomości, które jeszcze nie dotarły do ​​konsumenta
  • możesz złapać to zjawisko binding churn. Zjawisko to występuje, gdy liczba operacji tworzenia/usuwania kolejek i powiązań osiąga bardzo duże wartości. W trybie klastrowym taki przepływ operacji rozłoży się na wszystkie węzły i spowoduje duże obciążenie. Proces ten można zoptymalizować poprzez kontrolowanie liczby subskrypcji.

Ciągłe kolejki

Jeżeli kolejka jest tworzona z zestawem parametrów durable, to taka kolejka utrzymuje swój stan i zostanie przywrócony po ponownym uruchomieniu serwera/brokera. Kolejka ta będzie istnieć do momentu wywołania polecenia. Queue.Delete.

Kolejka o wysokiej dostępności

Kolejki HA wymagają środowiska klastra RabbitMQ. W trybie klastra wszystkie informacje o wymiennikach, kolejkach, powiązaniach i konsumentach zostaną skopiowane do wszystkich węzłów.

Gdy wiadomość zostanie opublikowana w kolejce HA, jest ona zapisywana na każdym węźle należącym do kolejki HA. Po odebraniu wiadomości na dowolnym węźle wszystkie kopie tej wiadomości na innych węzłach zostaną usunięte.

Kolejki HA mogą być dystrybuowane do wszystkich węzłów w klastrze lub tylko do pojedynczych węzłów.

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Cechy:

  • Korzystanie z kolejek HA skutkuje obniżeniem wydajności. Podczas umieszczania wiadomości w kolejce HA lub pobierania wiadomości z kolejki HA, RabbitMQ musi wykonać koordynację na wszystkich węzłach (zwykle wystarczają 2–3 węzły)

Tworzenie kolejki

Kolejka jest tworzona przy użyciu metody synchronicznej RPC żądanie do serwera. Żądanie jest składane za pomocą metody Queue.Declare, wywołane z parametrami:

  • nazwa kolejki
  • inne opcje

Przykład tworzenia kolejki za pomocą Klient RabbitMQ:

// ...
channel.QueueDeclare(
    queue: "my_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null
);
// ...

  • queue — nazwa kolejki, którą chcemy utworzyć. Nazwa musi być unikatowa i nie może być taka sama, jak nazwa kolejki systemowej.
  • durable - jeśli to prawda, to kolejka będzie utrzymuj swój stan i zostanie przywrócony po ponownym uruchomieniu serwera/brokera
  • exclusive - jeśli prawda, kolejka pozwoli na podłączenie tylko jednego konsumenta
  • autoDelete - jeśli prawda, to kolejka zyskuje możliwość automatycznie usuń siebie
  • arguments — argumenty opcjonalne. Przyjrzymy się temu bliżej poniżej.

argumenty

  • x-message-ttl(x-message-time-to-live) - umożliwia ustawienie czasu wygaśnięcia wiadomości w milisekundach. Jeśli kolejka jest tworzona z ustawioną wartością argumentu x-message-ttl, to będzie taka kolejka automatycznie wykluczaj wygasłe wiadomości. Ustawianie wartości argumentu x-message-ttl ustawia maksymalny wiek wszystkich wiadomości w tej kolejce. Tworzenie takiej kolejki pozwala zapobiec otrzymywaniu nieaktualnych informacji. Można go stosować w systemach czasu rzeczywistego. Jeżeli kolejka, dla której określono wymieniacz odrzuconych wiadomości, jest określona, ​​ustaw wartość argumentu x-message-ttl, a następnie odrzucone wiadomości w tej kolejce zacznie mieć żywotność.
  • x-expires — określa wartość w milisekundach, po której kolejka zostanie usunięta. Kolejka może wygasnąć tylko wtedy, gdy nie ma żadnych subskrybentów. Jeśli kolejka ma podłączonych abonentów, będzie można ją automatycznie usunąć dopiero po tym, jak wszyscy abonenci zadzwonią. Basic.Cancel lub rozłącz się. Czas życia kolejki może wygasnąć tylko wtedy, gdy nie ma żadnego żądania. Basic.Get. W przeciwnym wypadku bieżąca wartość ustawienia czasu życia zostanie zresetowana do zera, a kolejka nie będzie już automatycznie usuwana. Również nie ma gwarancji, jak szybko kolejka zostanie usunięta po upływie jej żywotności.
  • x-max-length — ustawia maksymalną liczbę wiadomości w kolejce. Jeżeli liczba wiadomości w kolejce zaczyna przekraczać wartość maksymalną, najstarsze wiadomości zaczynają być usuwane.

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

  • x-max-lenght-bytes — ustawia maksymalny dopuszczalny całkowity rozmiar ładunku wiadomości w kolejce. W przypadku przekroczenia ustawionej wartości (kolejka zapełni się w czasie kolejnej publikacji wiadomości) najstarsze wiadomości zostaną usunięte
  • x-overflow — ten argument służy do skonfigurowania zachowania w przypadku przepełnienia kolejki. Dostępne są dwie wartości: drop-head (wartość domyślna) i reject-publish. Jeśli wybierzesz drop-head, wówczas najstarsze wiadomości zostaną usunięte. Jeśli wybierzesz reject-publish, wówczas odbiór wiadomości zostanie zawieszony
  • x-dead-letter-exchange — określa giełdę, do której wysyłane są odrzucone wiadomości, które nie zostały ponownie umieszczone w kolejce
  • x-dead-letter-routing-key — określa opcjonalny klucz routingu dla odrzuconych wiadomości
  • x-max-priority — umożliwia sortowanie według priorytetu w kolejce z maksymalną wartością priorytetu 255 (wersje RabbitMQ 3.5.0 i nowsze). Liczba ta określa maksymalny priorytet, jaki kolejka będzie obsługiwać. Jeśli argument nie zostanie ustawiony, kolejka nie będzie obsługiwać priorytetu wiadomości.
  • x-queue-mode - umożliwia przeniesienie kolejki do tryb leniwy. W tym trybie na dysku będzie przechowywana możliwie jak największa liczba wiadomości. Użycie pamięci RAM będzie minimalne. Jeżeli nie jest ustawiona, kolejka będzie przechowywać wiadomości w pamięci w celu dostarczania ich tak szybko, jak to możliwe.
  • x-queue-master-locator - jeśli mamy klaster, możemy ustawić kolejkę główną
  • x-ha-policy — używany podczas tworzenia kolejek HA i określa sposób dystrybucji wiadomości pomiędzy węzłami. Jeśli wartość jest ustawiona all, wówczas wiadomość zostanie zapisana na wszystkich węzłach. Jeśli wartość jest ustawiona nodes, a następnie wiadomość zostanie zapisana na określonych węzłach klastra
  • x-ha-nodes — określa węzły, do których będzie należeć konkretna kolejka HA

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Jeśli tworzysz kolejkę być może, następnie serwer wyśle ​​synchroniczną wiadomość do klienta RPC odpowiedź Queue.DeclareOk. Jeśli tworzysz kolejkę jest niemożliwe (prośba została odrzucona Queue.Declare) następnie kanał zostanie zamknięty serwer za pomocą polecenia Channel.Close a klient otrzyma wyjątek Wyjątek przerwania operacji, który będzie zawierał kod błędu i jego opis.

Przypomnienie sobie czegoś Queue.Declare o podobnych parametrach zwróci przydatne informacje o tej kolejce. Na przykład całkowita liczba wiadomości oczekujących w danej kolejce i całkowita liczba subskrybentów.

Zadzwoń Queue.Declare na podstawie danych uwierzytelniających użytkownika, któremu nie przypisano niezbędnych uprawnień zamknie kanał za pomocą polecenia Channel.Close a klient otrzyma wyjątek Wyjątek przerwania operacji, który będzie zawierał kod błędu 403 i jego opis.

Po upływie 10 sekund bezczynności kolejki przechodzi w stan hibernacji, powodując GC w kolejce, co skutkuje znaczną redukcją pamięci wymaganej dla tej kolejki.

Tworzenie kolejki za pomocą GUI

Przejdź do panelu administracyjnego RabbitMQ pod użytkownikiem guest (nazwa użytkownika: guest i hasło: guest). Należy pamiętać, że użytkownik guest można połączyć się tylko z localhosta. Przejdźmy teraz do zakładki Queues i kliknij Add a new queue. Wypełnij właściwości:

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Po wprowadzeniu wszystkich niezbędnych danych i kliknięciu Add queueskolejka pojawi się na liście ogólnej.

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Kliknięcie na nazwę kolejki spowoduje wyświetlenie szczegółowych informacji na jej temat. Tutaj możesz skonfigurować połączenie między giełdą a kolejką, wyświetlić listę consumers, publikuj/odbieraj wiadomości, usuwaj kolejki i przeglądaj statystyki.

Tworzenie powiązania

Wiązanie jest tworzone przy użyciu synchronicznego RPC żądanie do serwera. Żądanie jest składane za pomocą metody Queue.Bind, wywołane z parametrami:

  • nazwa kolejki
  • nazwa punktu wymiany
  • inne opcje

Przykład tworzenia powiązania za pomocą Klient RabbitMQ:

//...
channel.QueueBind(
    queue: queueName,
    exchange: "my_exchange",
    routingKey: "my_key",
    arguments: null
);
//...

  • queue — nazwa kolejki
  • exchange — nazwa wymieniającego
  • routingKey — klucz routingu
  • arguments — argumenty opcjonalne

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Jeśli tworzysz powiązanie być może, następnie serwer wyśle ​​synchroniczną wiadomość do klienta RPC odpowiedź Queue.BindOk.

Tworzenie powiązania za pomocą interfejsu graficznego

Przejdź do panelu administracyjnego RabbitMQ pod użytkownikiem guest (nazwa użytkownika: guest i hasło: guest). Należy pamiętać, że użytkownik guest można połączyć się tylko z localhosta. Przejdźmy teraz do zakładki Queues i kliknij na kolejkę my_queue. Wypełnij pola sekcji bindings:

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Po wprowadzeniu wszystkich niezbędnych danych i kliknięciu Bind, powiązanie zostanie wyświetlone na liście ogólnej:

KrólikMQ. Część 3. Zrozumienie kolejek i powiązań

Code

W tej sekcji opiszemy kolejkę i wiązanie w kodzie C#, tak jakbyśmy mieli opracować bibliotekę. Być może będzie to przydatne dla percepcji.

public interface IQueue
    {        
        string Name { get; }

        /// <summary>
        ///     Если установить true, то queue будет являться постоянным. 
        ///     Она будет храниться на диске и сможет 
        ///     пережить перезапуск сервера/брокера. 
        ///     Если значение false, то queue является временной и будет удаляться, 
        ///     когда сервер/брокер будет перезагружен
        /// </summary>
        bool IsDurable { get; }

        /// <summary>
        ///     Если значение равно true, то 
        ///     такая очередь будет разрешать подключаться 
        ///     только одному consumer-у
        /// </summary>
        bool IsExclusive { get; }

        /// <summary>
        ///     Автоматическое удаление. 
        ///     Очередь будет удалена, когда все клиенты отсоединятся.
        /// </summary>
        bool IsAutoDelete { get; }

        /// <summary>
        ///     Необязательные аргументы
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Queue : IQueue
    {
        public Queue(
             string name, 
             bool isDurable = true, 
             bool isExclusive = false, 
             bool isAutoDelete = false, 
             IDictionary<string, object> arguments = null)
        {
            Name = name ??
                throw new ArgumentNullException(name, $"{name} must not be null");

            IsDurable = isDurable;
            IsExclusive = isExclusive;
            IsAutoDelete = isAutoDelete;
            Arguments = arguments ?? new Dictionary<string, object>();
        }

        public string Name { get; }
        public bool IsDurable { get; }
        public bool IsExclusive { get; }
        public bool IsAutoDelete { get; }
        public IDictionary<string, object> Arguments { get; }
    }

public static class QueueMode
    {       
        public const string Default = "default";
        /// <summary>
        ///     Ленивый режим. Ленивый режим заставит сохранять 
        ///     как можно больше сообщений на диске, чтобы уменьшить 
        ///     использование оперативной памяти
        /// </summary>
        public const string Lazy = "lazy";
    }

public interface IBinding
    {
        /// <summary>
        ///     Обменник, который будет связываться привязкой
        /// </summary>
        IExchange Exchange { get; }

        /// <summary>
        ///     Ключ маршрутизации
        /// </summary>
        string RoutingKey { get; }

        /// <summary>
        ///     Необязательные аргументы
        /// </summary>
        IDictionary<string, object> Arguments { get; }
    }

public class Binding : IBinding
    {
        public Binding(
             IExchange exchange, 
             string routingKey, 
             IDictionary<string, object> arguments)
        {
            Exchange = exchange;
            RoutingKey = routingKey;
            Arguments = arguments;
        }

        public IExchange Exchange { get; }
        public string RoutingKey { get; }
        public IDictionary<string, object> Arguments { get; }
    }

Źródło: www.habr.com

Dodaj komentarz