Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka

Продовження перекладу невеликої книги:
"Understanding Message Brokers",
автор: Jakub Korab, видавництво: O'Reilly Media, Inc., дата видання: June 2017, ISBN: 9781492049296.

Попередня перекладена частина: Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 1. Вступ

ГЛАВА 3

Кафка

Kafka була розроблена в LinkedIn для того, щоб обійти деякі обмеження традиційних брокерів повідомлень і уникнути необхідності настроювати кілька брокерів повідомлень для різних взаємодій «точка-точка», що описано в цій книзі в розділі «Вертикальне та горизонтальне масштабування» на сторінці 28. Сценарії використання в LinkedIn в основному ґрунтувалися на односпрямованому поглинанні дуже великих обсягів даних, таких як кліки на сторінках та журнали доступу, водночас дозволяючи використовувати ці дані кільком системам, не впливаючи на продуктивність продюсерів чи інших консюмерів. Фактично причина існування Kafka полягає в тому, щоб отримати таку архітектуру обміну повідомленнями, яку описує Universal Data Pipeline.

З урахуванням цієї кінцевої мети, звісно, ​​виникли інші вимоги. Kafka повинна:

  • Бути надзвичайно швидкою
  • Надавати велику пропускну здатність під час роботи з повідомленнями
  • Підтримувати моделі «Видавець-Підписчик» та «Точка-Точка»
  • Не сповільнюватися з додаванням споживачів. Наприклад, продуктивність і черги, і топіка ActiveMQ погіршується при зростанні кількості споживачів на адресаті
  • бути горизонтально масштабованою; якщо один брокер, що зберігає (persists) повідомлення, може робити це тільки на максимальній швидкості диска, то для збільшення продуктивності є сенс вийти за межі одного екземпляра брокера
  • Розмежовувати доступ до зберігання та повторного вилучення повідомлень

Щоб досягти всього цього, Kafka прийнята архітектура, яка перевизначила ролі та обов'язки клієнтів і брокерів обміну повідомленнями. Модель JMS дуже орієнтована на брокер, де він відповідає за поширення повідомлень, а клієнти повинні турбуватися лише про відправлення та отримання повідомлень. Kafka, з іншого боку, орієнтована на клієнта, при цьому клієнт бере на себе багато функцій традиційного брокера, такі як справедливий розподіл відповідних повідомлень серед споживачів, в обмін отримуючи надзвичайно швидкий та масштабований брокер. Для людей, які працювали з традиційними системами обміну повідомленнями, робота з Kafka потребує фундаментальних змін у поглядах.
Це інженерне спрямування призвело до створення інфраструктури обміну повідомленнями, здатної на багато порядків збільшити пропускну здатність проти звичайним брокером. Як ми побачимо, цей підхід пов'язаний із компромісами, які означають, що Kafka не підходить для певних типів навантажень та встановленого програмного забезпечення.

Уніфікована модель адресата

Щоб виконати вимоги, описані вище, Kafka об'єднала обмін повідомленнями на кшталт «публікація-підписка» та «точка-точка» в рамках одного виду адресата. топіка. Це спантеличує людей, які працювали з системами обміну повідомленнями, де слово «топік» відноситься до широкомовного механізму, з якого (з топіка) читання не є надійним (is nondurable). Топики Kafka слід розглядати як гібридний тип адресата, відповідно до визначення, даного у вступі до цієї книги.

У частині цього розділу, якщо ми явно не вкажемо інше, термін «топік» буде відноситися до топіка Kafka.

Щоб повністю зрозуміти, як поводяться топіки і які гарантії вони надають, нам потрібно спочатку розглянути, як вони реалізовані в Kafka.
Кожен топік у Kafka має свій журнал.
Продюсери, які надсилають повідомлення в Kafka, дописують до цього журналу, а консюмери читають із журналу за допомогою покажчиків, які постійно переміщуються вперед. Періодично Kafka видаляє найстаріші частини журналу, незалежно від того, були повідомлення прочитані в цих частинах чи ні. Центральною частиною дизайну Kafka є те, що брокер не дбає про те, чи прочитані повідомлення чи ні – це відповідальність клієнта.

Терміни «журнал» та «покажчик» не зустрічаються в документації Kafka. Ці добре відомі терміни використовуються тут, щоб допомогти розумінню.

Ця модель повністю відрізняється від ActiveMQ, де повідомлення з усіх черг зберігаються в одному журналі, а брокер позначає повідомлення як віддалені, після того як вони були прочитані.
Давайте тепер трохи заглибимося та розглянемо журнал топіка докладніше.
Журнал Kafka складається з кількох партицій (Малюнок 3-1). Kafka гарантує строгу впорядкованість у кожній партиції. Це означає, що повідомлення, записані в партицію у визначеному порядку, будуть прочитані у тому порядку. Кожна партиція реалізована у вигляді циклічного (rolling) файлу журналу, що містить підмножина (subset) всіх повідомлень, відправлених у топік його продюсерами. Створений топік містить за замовчуванням одну партицію. Ідея партицій – це центральна ідея Kafka для горизонтального масштабування.

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
3-1. Партії Kafka

Коли продюсер відправляє повідомлення в топік Kafka, він вирішує, яку партицію відправити повідомлення. Ми розглянемо це докладніше пізніше.

Читання повідомлень

Клієнт, який хоче прочитати повідомлення, керує іменованим покажчиком, що називається група консюмерів (consumer group), який вказує на зміщення (offset) повідомлення у партиції. Зміщення - це позиція зі зростаючим номером, яка починається з 0 на початку партиції. Ця група консюмерів, на яку посилаються в API через ідентифікатор group_id, що визначається користувачем, відповідає одному логічному споживачеві чи системі.

Більшість систем, які використовують обмін повідомленнями, читають дані з адресата за допомогою кількох екземплярів та потоків для паралельної обробки повідомлень. Таким чином, зазвичай буде багато екземплярів консюмерів, спільно використовують одну й ту саму групу консюмерів.

Проблему читання можна так:

  • Топік має кілька партицій
  • Використовувати топ може одночасно безліч груп консюмерів
  • Група консюмерів може мати кілька окремих екземплярів

Це нетривіальна проблема «багато хто до багатьох». Щоб зрозуміти, як Kafka поводиться з відносинами між групами консюмерів, екземплярами консюмерів і партіціями, розглянемо ряд сценаріїв читання, що поступово ускладнюються.

Консюмери та групи консюмерів

Давайте візьмемо як відправну точку топік з однією партицією (Малюнок 3-2).

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
Figure 3-2. Консюмер читає із партиції

Коли екземпляр консюмера підключається зі своїм власним group_id до цього топіка, йому призначається партиція для читання та зміщення в цій партиції. Положення цього зміщення конфігурується в клієнті, як покажчик на останню позицію (найновіше повідомлення) або ранню позицію (найстаріше повідомлення). Консюмер запитує (polls) повідомлення з топіка, що призводить до їх послідовного читання з журналу.
Позиція усунення регулярно комітується назад у Kafka і зберігається, як повідомлення у внутрішньому топіці _consumer_offsets. Прочитані повідомлення не видаляються, на відміну від звичайного брокера, і клієнт може перемотати (rewind) зсув, щоб повторно обробити вже переглянуті повідомлення.

Коли підключається другий логічний консюмер, використовуючи інший group_id, він управляє другим покажчиком, який залежить від першого (Малюнок 3-3). Таким чином, топік Kafka діє як черга, в якій існує один консюмер і, як звичайний топік видавець-передплатник (pub-sub), на який підписано кілька консюмерів, з додатковою перевагою, що всі повідомлення зберігаються і можуть оброблятися кілька разів.

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
Figure 3-3. Два консюмери в різних групах консюмерів читають із однієї партиції

Консюмери у групі консюмерів

Коли один екземпляр консюмера читає дані з партиції, він повністю контролює покажчик та обробляє повідомлення, як описано у попередньому розділі.
Якщо кілька екземплярів консюмерів були підключені з одним і тим же group_id до топіка з однією партицією, то екземпляру, який підключився останнім, буде передано контроль над покажчиком і з цього моменту він отримуватиме всі повідомлення (Малюнок 3-4).

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
Figure 3-4. Два консюмери в одній і тій же групі консюмерів читають із однієї партиції

Цей режим обробки, у якому кількість екземплярів консюмерів перевищує число партицій, можна як різновид монопольного споживача. Це може бути корисно, якщо вам потрібна «активно-пасивна» (або «гаряча-тепла») кластеризація ваших екземплярів консюмерів, хоча паралельна робота кількох консюмерів («активно-активна» або «гаряча-гаряча») набагато типовіша, ніж консюмери в режимі очікування.

Така поведінка розподілу повідомлень, описана вище, може викликати подив у порівнянні з тим, як поводиться звичайна черга JMS. У цій моделі повідомлення, надіслані у чергу, будуть рівномірно розподілені між двома консюмерами.

Найчастіше, коли ми створюємо кілька екземплярів консюмерів, ми робимо це або для паралельної обробки повідомлень, або для збільшення швидкості читання, або для підвищення стійкості читання. Оскільки читати дані з партиції може одночасно лише один екземпляр консюмера, як це досягається в Kafka?

Один із способів зробити це – використовувати один екземпляр консюмера, щоб прочитати всі повідомлення та передати їх у пул потоків. Хоча цей підхід збільшує пропускну спроможність обробки, він збільшує складність логіки консюмерів і нічого не робить для підвищення стійкості читання. Якщо один екземпляр консюмера відключається через збій живлення або аналогічну подію, то вичитка припиняється.

Канонічним способом вирішення цієї проблеми в Kafka є використання бОбільшої кількості партицій.

Партіціювання

Партиції є основним механізмом розпаралелювання читання та масштабування топіка за межі пропускної спроможності одного екземпляра брокера. Щоб краще зрозуміти це, давайте розглянемо ситуацію, коли існує топік з двома партіціями і на цей топік підписується один консюмер (Малюнок 3-5).

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
Figure 3-5. Один консюмер читає з кількох партицій

У цьому сценарії консюмеру дається контроль над покажчиками, що відповідають його group_id в обох партіях, і починається читання повідомлень з обох партій.
Коли до цього топіка додається додатковий консюмер для того ж group_id, Kafka перепризначає (reallocate) одну з партицій з першого на другий консюмер. Після чого кожен екземпляр консюмера вичитуватиме з однієї партиції топіка (Малюнок 3-6).

Щоб забезпечити обробку повідомлень паралельно до 20 потоків, вам знадобиться щонайменше 20 партицій. Якщо партицій буде менше, у вас залишаться консюмери, яким нема над чим працювати, що описано раніше в обговоренні монопольних консюмерів.

Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Розділ 3. Kafka
Figure 3-6. Два консюмери в одній і тій же групі консюмерів читають із різних партицій

Ця схема значно знижує складність роботи брокера Kafka порівняно з розподілом повідомлень, необхідним підтримки черги JMS. Тут не потрібно дбати про наступні моменти:

  • Який консюмер повинен отримати наступне повідомлення, ґрунтуючись на циклічному (round-robin) розподілі, поточній ємності буферів попередньої вибірки або попередніх повідомленнях (як груп повідомлень JMS).
  • Які повідомлення надіслані якимсь консюмерам і чи повинні вони бути доставлені повторно у разі збою.

Все, що повинен зробити брокер Kafka - це послідовно передавати повідомлення консюмер, коли останній запитує їх.

Однак, вимоги до розпаралелювання вичитки та повторного відправлення невдалих повідомлень нікуди не діваються — відповідальність за них просто переходить від брокера до клієнта. Це означає, що вони мають бути враховані у вашому коді.

Відправка повідомлень

Відповідальність за рішення, до якої партиції надіслати повідомлення, покладається на продюсер цього повідомлення. Щоб зрозуміти механізм, за допомогою якого це робиться, спочатку слід розглянути, що саме ми насправді відправляємо.

У той час, як у JMS ми використовуємо структуру повідомлення з метаданими (заголовками та властивостями) та тілом, що містить корисне навантаження (payload), у Kafka повідомлення є парою «ключ-значення». Корисне навантаження повідомлення надсилається, як значення (value). Ключ, з іншого боку, використовується головним чином для партиціонування і має містити специфічний для бізнес-логіки ключ, щоб помістити пов'язані повідомлення в ту саму партію.

У Главі 2 ми обговорювали сценарій онлайн-ставок, коли пов'язані події повинні оброблятися по порядку одним консюмером:

  1. Обліковий запис користувача налаштований.
  2. Гроші зараховуються на рахунок.
  3. Робиться ставка, яка виводить гроші з рахунку.

Якщо кожна подія є повідомленням, надісланим у топік, то в цьому випадку природним ключем буде ідентифікатор облікового запису.
Коли повідомлення надсилається за допомогою Kafka Producer API, воно передається функції партиціонування, яка, враховуючи повідомлення та поточний стан кластера Kafka, повертає ідентифікатор партиції, до якої має бути надіслано повідомлення. Ця функція реалізована Java через інтерфейс Partitioner.

Цей інтерфейс виглядає так:

interface Partitioner {
    int partition(String topic,
        Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
}

Реалізація Partitioner для визначення партиції використовує за умовчанням алгоритм хешування ключа (general-purpose hashing algorithm over the key) або циклічний перебір (round-robin), якщо ключ не вказано. Це значення за замовчуванням працює добре в більшості випадків. Однак у майбутньому ви захочете написати свій власний.

Написання власної стратегії партиціонування

Давайте розглянемо приклад, коли ви хочете відправити метадані разом із корисним навантаженням повідомлення. Корисне навантаження у нашому прикладі – це інструкція для внесення депозиту на ігровий рахунок. Інструкція - це те, що ми хотіли б гарантовано не модифікувати при передачі і хочемо бути впевнені, що тільки довірена система, що стоїть вище, може ініціювати цю інструкцію. У цьому випадку система, що відправляє та приймає, узгоджують використання підпису для перевірки автентичності повідомлення.
У звичайному JMS ми просто визначаємо властивість «підпис повідомлення» та додаємо його до повідомлення. Тим не менш, Kafka не надає нам механізму передачі метаданих — тільки ключ і значення.

Оскільки значення – це корисне навантаження банківського переказу (bank transfer payload), цілісність якого ми хочемо зберегти, у нас не залишається іншого вибору, крім визначення структури даних для використання у ключі. Припускаючи, що нам потрібен ідентифікатор облікового запису для партиціонування, тому що всі повідомлення, які стосуються облікового запису, повинні оброблятися по порядку, ми вигадаємо таку структуру JSON:

{
  "signature": "541661622185851c248b41bf0cea7ad0",
  "accountId": "10007865234"
}

Оскільки значення підпису варіюватиметься залежно від корисного навантаження, дефолтна стратегія хешування інтерфейсу Partitioner не надійно групуватиме пов'язані повідомлення. Тому нам потрібно буде написати свою власну стратегію, яка аналізуватиме цей ключ і розділятиме (partition) значення accountId.

Kafka включає контрольні суми для виявлення пошкодження повідомлень у сховищі та має повний набір функцій безпеки. Навіть у цьому випадку іноді з'являються специфічні для галузі вимоги, такі як вище.

Користувацька стратегія партиціонування повинна гарантувати, що всі пов'язані повідомлення виявляться в одній партиції. Хоча це здається простим, але вимога може бути ускладнена через важливість упорядкування пов'язаних повідомлень та того, наскільки фіксовано кількість партій у топіці.

Кількість партій у топіці може змінюватися з часом, оскільки їх можна додати, якщо трафік виходить за межі початкових очікувань. Таким чином, ключі повідомлень можуть бути пов'язані з партицією, в яку вони були спочатку відправлені, маючи на увазі частину стану, що має бути розподілено між екземплярами продюсера.

Іншим чинником, який слід зважати, є рівномірність розподілу повідомлень між партіціями. Як правило, ключі не розподіляються рівномірно за повідомленнями, і хеш функції не гарантують справедливий розподіл повідомлень для невеликого набору ключів.
Важливо відзначити, що, хоч би як ви вирішили розділити повідомлення, сам роздільник, можливо, доведеться використовувати повторно.

Розглянемо вимогу реплікації даних між кластерами Kafka у різних географічних розташуваннях. Для цієї мети Kafka поставляється з інструментом командного рядка під назвою MirrorMaker, який використовується для читання повідомлень з одного кластера та передачі їх до іншого.

MirrorMaker повинен розуміти ключі топіка, що реплікується, щоб підтримувати відносний порядок між повідомленнями при реплікації між кластерами, оскільки кількість партій для цього топіка може не збігатися в двох кластерах.

Користувальницькі стратегії партіціонування зустрічаються відносно рідко, оскільки дефолтні хешування або циклічний перебір успішно працюють у більшості сценаріїв. Однак, якщо вам потрібні суворі гарантії впорядкування або вам необхідно витягти метадані з корисних навантажень, то партіціонування - це те, на що вам слід подивитись детальніше.

Переваги масштабованості та продуктивності Kafka обумовлені перенесенням деяких обов'язків традиційного брокера на клієнта. У цьому випадку приймається рішення про розподіл потенційно пов'язаних повідомлень за кількома консюмерами, що працюють паралельно.

JMS брокери також повинні мати справу з такими вимогами. Цікаво, що механізм відправлення пов'язаних повідомлень одному і тому ж консюмеру, реалізований через JMS Message Groups (різновид стратегії балансування sticky load balancing (SLB)), також вимагає, щоб відправник позначав повідомлення як пов'язані. У випадку JMS, брокер відповідає за відправку цієї групи пов'язаних повідомлень одному консюмеру з багатьох та передачу прав власності на групу, якщо консюмер відвалився.

Угоди щодо продюсера

Партиціонування – це не єдине, що необхідно враховувати при надсиланні повідомлень. Давайте розглянемо методи send() класу Producer у Java API:

Future < RecordMetadata > send(ProducerRecord < K, V > record);
Future < RecordMetadata > send(ProducerRecord < K, V > record, Callback callback);

Слід відразу зазначити, що обидва методи повертають Future, що свідчить про те, що операція відправки не виконується негайно. В результаті виходить, що повідомлення (ProducerRecord) записується в буфер відправки кожної активної партиції і передається брокеру фоновим потоком в бібліотеці клієнта Kafka. Хоча це робить роботу неймовірно швидкою, це означає, що недосвідчено написана програма може втратити повідомлення, якщо його процес буде зупинено.

Як завжди, є спосіб зробити операцію відправлення надійнішою за рахунок продуктивності. Розмір цього буфера можна встановити в 0, і потік відправляє програми буде змушений чекати, поки передача повідомлення брокеру не буде завершена, таким чином:

RecordMetadata metadata = producer.send(record).get();

Ще раз про читання повідомлень

Читання повідомлень має додаткові складнощі, про які потрібно поміркувати. На відміну від API JMS, який може запускати слухача повідомлень (message listener) у відповідь надходження повідомлення, інтерфейс Споживач Kafka лише опитує (polling). Давайте детальніше розглянемо метод poll (), що використовується для цієї мети:

ConsumerRecords < K, V > poll(long timeout);

Значення методу, що повертається, — це контейнерна структура, що містить кілька об'єктів ConsumerRecord з потенційно кількох партицій. ConsumerRecord сам по собі є об'єктом-холдером для пари ключ-значення з відповідними метаданими, такими як партиція, з якої він отриманий.

Як було обговорено в Розділі 2, ми повинні постійно пам'ятати, що відбувається з повідомленнями після їх успішної або неуспішної обробки, наприклад, якщо клієнт не може обробити повідомлення або якщо він перериває роботу. У JMS це оброблялося через режим підтвердження (acknowledgement mode). Брокер або видаляє успішно оброблене повідомлення, або повторно доставить необроблене або зафейлене (за умови використання транзакції).
Kafka працює зовсім по-іншому. Повідомлення не видаляються в брокері після вичитування і відповідальність за те, що відбувається при збої, лежить на коді, що вичитує.

Як ми вже говорили, група консюмерів пов'язана зі зміщенням у журналі. Позиція в журналі, пов'язана з цим усуненням, відповідає наступному повідомленню, яке буде видано у відповідь poll (). Вирішальне значення під час читання має час, коли це зсув збільшується.

Повертаючись до моделі читання, розглянутої раніше, обробка повідомлення складається із трьох етапів:

  1. Вийняти повідомлення для читання.
  2. Обробити повідомлення.
  3. Підтвердити повідомлення.

Консюмер Kafka поставляється з опцією конфігурації enable.auto.commit. Це часто використовується стандартне налаштування, як це зазвичай буває з налаштуваннями, що містять слово «авто».

До Kafka 0.10 клієнт, який використовував цей параметр, відправляв усунення останнього прочитаного повідомлення під час наступного виклику poll () після обробки. Це означало, що будь-які повідомлення, які вже були вилучені (fetched), могли бути повторно опрацьовані, якщо клієнт їх вже обробив, але був несподівано знищений перед викликом poll (). Оскільки брокер не зберігає жодного стану щодо того, скільки разів повідомлення було прочитано, наступний консюмер, який витягує це повідомлення, не знатиме, що сталося щось погане. Ця поведінка була псевдо-транзакційною. Зміщення комітілося тільки у разі успішної обробки повідомлення, але якщо клієнт переривав роботу, брокер знову надсилав те саме повідомлення іншому клієнту. Така поведінка відповідала гарантії доставки повідомлень.принаймні один раз".

У Kafka 0.10 код клієнта був змінений таким чином, що коміт став періодично запускатися бібліотекою клієнта відповідно до налаштування auto.commit.interval.ms. Ця поведінка знаходиться десь між режимами JMS AUTO_ACKNOWLEDGE та DUPS_OK_ACKNOWLEDGE. При використанні автокомміта повідомлення могли бути підтверджені незалежно від того, чи вони фактично були оброблені — це могло статися у разі повільного консюмера. Якщо консюмер переривав роботу, повідомлення витягувалися наступним консюмером, починаючи із закомміченої позиції, що могло призвести до пропуску повідомлення. У цьому випадку Kafka не втрачала повідомлення, код просто не обробляв їх.

Цей режим має ті ж перспективи, що й у версії 0.9: повідомлення можуть бути оброблені, але у разі збою, усунення може бути не закоммічене, що потенційно може призвести до подвоєння доставки. Чим більше повідомлень ви отримуєте під час виконання poll ()тим більше ця проблема.

Як обговорювалося в розділі «Читання повідомлень з черги» на стор. 21, у системі обміну повідомленнями немає такого поняття, як одноразова доставка повідомлення, якщо взяти до уваги режими збоїв.

У Kafka є два способи зафіксувати (закоммітити) зміщення (офсет): автоматично та вручну. В обох випадках повідомлення можуть оброблятися кілька разів, якщо повідомлення було оброблено, але стався збій до комміту. Ви також можете взагалі не обробляти повідомлення, якщо коміт відбувся у фоні і ваш код був завершений до того, як він приступив до обробки (можливо в Kafka 0.9 і раніше версіях).

Керувати процесом комміту зміщення вручну можна в API консюмера Kafka, встановивши параметр enable.auto.commit у значення false і явно викликавши один із таких методів:

void commitSync();
void commitAsync();

Якщо ви прагнете обробити повідомлення «хоч би один раз», ви повинні закоммітити зміщення вручну за допомогою commitSync()Виконавши цю команду відразу після обробки повідомлень.

Ці методи не дозволяють підтверджувати (acknowledged) повідомлення до того, як вони будуть оброблені, але вони нічого не роблять для усунення потенційного задваювання обробки, водночас створюючи видимість транзакційності. У Kafka відсутні транзакції. У клієнта немає можливості зробити таке:

  • Автоматично відкотити (roll back) зафейлене повідомлення. Консюмери самі повинні обробляти винятки, що виникають через проблемні пейлоади та відключення бекенда, оскільки вони не можуть покладатися на повторну доставку повідомлень брокером.
  • Надіслати повідомлення в кілька топіків в рамках однієї атомарної операції. Як ми скоро побачимо, контроль над різними топиками та партіціями може перебувати на різних машинах у кластері Kafka, які не координують транзакції під час відправлення. На момент написання цієї статті було виконано певну роботу, щоб зробити це можливим за допомогою KIP-98.
  • Зв'язати читання одного повідомлення з одного топіка з відправкою іншого повідомлення до іншого топіка. Знову ж таки, архітектура Kafka залежить від безлічі незалежних машин, що працюють як одна шина і не робиться жодних спроб приховати це. Наприклад, немає компонентів API, які б зв'язати Консюмер и продюсер у транзакції. У JMS це забезпечується об'єктом Session, з якого створюються MessageProducers и MessageConsumers.

Якщо ми не можемо покладатися на транзакції, як ми можемо забезпечити семантику, ближчу до тієї, яку надають традиційні системи обміну повідомленнями?

Якщо існує ймовірність того, що зсув консюмера може збільшитися до того, як повідомлення було оброблено, наприклад, під час збою консюмера, то консюмер не має способу дізнатися, чи пропустила його група консюмерів повідомлення, коли їй призначають партицію. Таким чином, одна із стратегій полягає у перемотуванні (rewind) зміщення на попередню позицію. API консюмера Kafka надає такі методи для цього:

void seek(TopicPartition partition, long offset);
void seekToBeginning(Collection < TopicPartition > partitions);

метод seek() може використовуватися з методом
offsetsForTimes (Map timestampsToSearch) для перемотування в стан у певний момент у минулому.

Неявно, використання цього підходу означає, що, можливо, деякі повідомлення, які були оброблені раніше, будуть прочитані і оброблені заново. Щоб уникнути цього, ми можемо використовувати ідемпотентне читання, як описано в Розділі 4, для відстеження раніше переглянутих повідомлень та виключення дублікатів.

Як альтернатива, код вашого консюмера може бути простим, якщо допустима втрата або дублювання повідомлень. Коли ми розглядаємо сценарії використання, для яких зазвичай використовується Kafka, наприклад, обробка подій логів, метрик, відстеження кліків і т.д., ми розуміємо, що втрата окремих повідомлень навряд чи вплине на навколишні програми. У разі значення за замовчуванням цілком припустимі. З іншого боку, якщо вашому додатку потрібно передавати платежі, ви повинні ретельно дбати про кожне окреме повідомлення. Все зводиться до контексту.

Особисті спостереження показують, що зі зростанням інтенсивності повідомлень цінність кожного окремого повідомлення знижується. Повідомлення великого обсягу стають зазвичай цінними, якщо їх розглядати в агрегованій формі.

Висока доступність (High Availability)

Підхід Kafka щодо високої доступності значно відрізняється від підходу ActiveMQ. Kafka розроблена на базі кластерів, що горизонтально масштабуються, в яких всі екземпляри брокера приймають і роздають повідомлення одночасно.

Кластер Kafka складається з кількох екземплярів брокера, що працюють на різних серверах. Kafka була розроблена для роботи на звичайному автономному залізі, де кожен вузол має власне виділене сховище. Використання мережевих сховищ (SAN) не рекомендується, оскільки множинні обчислювальні вузли можуть конкурувати за часомЫе інтервали сховища та створювати конфлікти.

Kafka - це постійно включена система. Багато великих користувачів Kafka ніколи не гасять свої кластери і програмне забезпечення завжди забезпечує оновлення шляхом послідовного рестарту. Це досягається за рахунок гарантування сумісності з попередньою версією для повідомлень та взаємодій між брокерами.

Брокери підключені до кластера серверів ZooKeeper, що діє як реєстр конфігураційних даний і використовується для координації ролей кожного брокера. ZooKeeper сам є розподіленою системою, яка забезпечує високу доступність за допомогою реплікації інформації шляхом встановлення кворуму.

У базовому випадку топік створюється в кластері Kafka з наступними властивостями:

  • Кількість партицій. Як було обговорено раніше, точне значення, що використовується тут, залежить від бажаного рівня паралельного читання.
  • Коефіцієнт (фактор) реплікації визначає, скільки екземплярів брокера в кластері повинні містити журнали для цієї партиції.

Використовуючи ZooKeepers для координації, Kafka намагається справедливо розподілити нові партиції між брокерами у кластері. Це робиться одним екземпляром, який виконує роль Контролера.

У рантаймі для кожної партиції топіка контролер призначає брокеру ролі лідера (leader, master, ведучого) та послідовників (followers, slaves, підлеглих). Брокер, який виступає як лідер для даної партиції, відповідає за прийом усіх повідомлень, надісланих йому продюсерами, і поширення повідомлень по консюмерах. При надсиланні повідомлень у партицію топіка вони реплікуються на всі вузли брокера, які є послідовниками для цієї партиції. Кожен вузол, що містить журнали для партиції, називається реплікою. Брокер може виступати як лідер для одних партицій і як послідовник для інших.

Послідовник, що містить усі повідомлення, що зберігаються у лідера, називається синхронізованою реплікою (Реплікою, що знаходиться в синхронізованому стані, in-sync replica). Якщо брокер, який виступає як лідер для партиції, відключається, будь-який брокер, який знаходиться в актуалізованому або синхронізованому стані для цієї партиції, може взяти на себе роль лідера. Це надзвичайно стійкий дизайн.

Частиною конфігурації продюсера є параметр acks, Який визначає, скільки реплік має підтвердити (acknowledge) отримання повідомлення, перш ніж потік програми продовжить відправку: 0, 1 або всі. Якщо встановлено значення всі, то при отриманні повідомлення лідер відправить підтвердження (confirmation) назад продюсеру, як тільки отримає підтвердження (acknowledgements) запису від кількох реплік (включаючи саму себе), визначених налаштуванням топіка min.insync.replicas (за умовчанням 1). Якщо повідомлення не може бути успішно репліковано, продюсер викличе виняток для програми (NotEnoughReplicas або NotEnoughReplicasAfterAppend).

У типовій конфігурації створюється топік з коефіцієнтом реплікації 3 (1 лідер, 2 послідовники для кожної партиції) і параметр min.insync.replicas встановлюється значення 2. У цьому випадку кластер буде допускати, щоб один з брокерів, що управляють партицією топіка, міг відключатися без впливу на клієнтські додатки.

Це повертає нас до вже знайомого компромісу між продуктивністю та надійністю. Реплікація відбувається за рахунок додаткового часу очікування підтверджень (acknowledgments) від послідовників. Хоча, оскільки вона виконується паралельно, реплікація як мінімум на три вузли має таку ж продуктивність, як і на два (ігноруючи збільшення використання пропускної спроможності мережі).

Використовуючи цю схему реплікації, Kafka спритно уникає необхідності забезпечувати фізичний запис кожного повідомлення на диск за допомогою операції sync(). Кожне повідомлення, надіслане продюсером, буде записано в журнал партиції, але, як обговорювалося в Розділі 2, запис файл спочатку виконується в буфер операційної системи. Якщо це повідомлення репліковане на інший екземпляр Kafka і знаходиться в його пам'яті, втрата лідера не означає, що саме повідомлення було втрачено, його може взяти на себе синхронізована репліка.
Відмова від необхідності виконувати операцію sync() означає, що Kafka може приймати повідомлення зі швидкістю, з якою може записувати їх у пам'ять. І навпаки, що довше можна уникнути скидання (flushing) пам'яті на диск, то краще. З цієї причини трапляються випадки, коли брокерам Kafka виділяється 64 Гб пам'яті або більше. Таке використання пам'яті означає, що один екземпляр Kafka може легко працювати на швидкостях багато тисяч разів швидше, ніж традиційний брокер повідомлень.

Kafka також можна налаштувати для застосування операції sync() до пакетів повідомлень. Оскільки все в Kafka орієнтоване на роботу з пакетами, це насправді працює досить добре для багатьох сценаріїв використання та є корисним інструментом для користувачів, які потребують дуже сильних гарантій. Більшість чистої продуктивності Kafka пов'язана з повідомленнями, які відправляються брокеру у вигляді пакетів, і з тим, що ці повідомлення зчитуються з брокера послідовними блоками за допомогою нульовий примірник операцій (операціями, у ході яких виконується завдання копіювання даних із однієї області пам'яті до іншої). Останнє є великим виграшем з точки зору продуктивності та ресурсів і можливе лише завдяки використанню лежачої в основі структури даних журналу, що визначає схему партиції.

У кластері Kafka можлива набагато вища продуктивність, ніж при використанні одного брокера Kafka, оскільки партиції топіки можуть горизонтально масштабуватися на безлічі окремих машин.

Підсумки

У цьому розділі ми розглянули, як архітектура Kafka переосмислює відносини між клієнтами та брокерами, щоб забезпечити неймовірно стійкий конвеєр обміну повідомленнями, з пропускною спроможністю у багато разів більшою, ніж у звичайного брокера повідомлень. Ми обговорили функціональність, яку вона використовує для досягнення цієї мети, та коротко розглянули архітектуру додатків, що забезпечують цю функціональність. У наступному розділі ми розглянемо спільні проблеми, які необхідно вирішувати програмам на основі обміну повідомленнями, та обговоримо стратегії їх вирішення. Ми завершимо розділ, окресливши, як міркувати про технології обміну повідомленнями в цілому, щоб ви могли оцінити їхню придатність для ваших сценаріїв використання.

Попередня перекладена частина: Розуміння брокерів повідомлень. Вивчення механіки обміну повідомленнями за допомогою ActiveMQ та Kafka. Глава 1

Переклад виконано: tele.gg/middle_java

Далі буде ...

Тільки зареєстровані користувачі можуть брати участь в опитуванні. Увійдіть, будь ласка.

Чи використовується Kafka у вашій організації?

  • Так

  • Ні

  • Раніше використовувалася, зараз ні

  • Плануємо використати

Проголосували 38 користувачів. Утрималися 8 користувачів.

Джерело: habr.com

Додати коментар або відгук