Як Kafka стала буллю

Як Kafka стала буллю

Привіт, Хабре!

Я працюю у команді Tinkoff, яка займається розробкою власного центру нотифікацій. Здебільшого я розробляю на Java з використанням Spring boot та вирішую різні технічні проблеми, що виникають у проекті.

Більшість наших мікросервісів асинхронно взаємодіють між собою через брокер повідомлень. Раніше як брокер ми використовували IBM MQ, який перестав справлятися з навантаженням, але при цьому мав високі гарантії доставки.

Як заміну нам запропонували Apache Kafka, яка має високий потенціал масштабування, але, на жаль, вимагає практично індивідуального підходу до конфігурування для різних сценаріїв. Крім того, механізм at least once delivery, який працює в Kafka за умовчанням, не дозволяв підтримувати необхідний рівень консистентності з коробки. Далі я поділюся нашим досвідом конфігурації Kafka, зокрема розповім, як налаштувати та жити з exactly once delivery.

Гарантована доставка і не лише

Параметри, про які йтиметься далі, допоможуть запобігти ряду проблем з налаштуваннями підключення за замовчуванням. Але спочатку хочеться приділити увагу одному параметру, що полегшить можливий дебаг.

У цьому допоможе client.id для Producer та Consumer. На перший погляд, як значення можна використовувати ім'я програми, і в більшості випадків це працюватиме. Хоча ситуація, коли у додатку використовується кілька Consumer'ів і ви задаєте їм однаковий client.id, призводить до наступного попередження:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Якщо ви хочете використовувати JMX у додатку з Kafka, це може бути проблемою. Для цього випадку найкраще використовувати як значення client.id комбінацію з імені програми та, наприклад, імені топіка. Результат нашої конфігурації можна переглянути у виведенні команди kafka-consumer-groups з утиліт від Confluent:

Як Kafka стала буллю

Тепер розберемо сценарій гарантованої доставки повідомлення. Kafka Producer має параметр acks, який дозволяє налаштовувати, після скількох об'єднань лідера кластера необхідно вважати повідомлення успішно записаним. Цей параметр може приймати такі значення:

  • 0 - acknowledge не будуть вважатися.
  • 1 — параметр за замовчуванням, необхідний acknowledge тільки від 1 репліки.
  • −1 — необхідні acknowledge від усіх синхронізованих реплік (налаштування кластера min.insync.replicas).

З перелічених значень видно, що acks рівний −1 дає найсильніші гарантії, що повідомлення не загубиться.

Як ми знаємо, розподілені системи ненадійні. Щоб захиститись від тимчасових несправностей, Kafka Producer надає параметр повторити спроби, який дозволяє задавати кількість спроб повторного відправлення протягом delivery.timeout.ms. Оскільки параметр retries має значення за замовчуванням Integer.MAX_VALUE (2147483647), кількість повторних надсилань повідомлення можна регулювати, змінюючи лише delivery.timeout.ms.

Рухаємося до exactly once delivery

Перераховані налаштування дозволяють нашому Producer'у доставляти повідомлення з високою гарантією. Давайте поговоримо про те, як гарантувати запис тільки однієї копії повідомлення в Kafka-топік? У найпростішому випадку для цього на Producer необхідно встановити параметр enable.idempotence значення true. Ідемпотентність гарантує запис лише одного повідомлення до конкретної партиції одного топіка. Попередньою умовою для включення ідемпотентності є значення acks = all, retry > 0, max.in.flight.requests.per.connection ≤ 5. Якщо ці параметри не задані розробником, то автоматично буде виставлено вказані вище значення.

Коли ідемпотентність налаштована, необхідно домогтися того, щоб однакові повідомлення потрапляли щоразу в ті самі партиції. Це можна зробити, настроюючи ключ та параметр partitioner.class на Producer. Почнемо з ключа. Для кожного відправлення він має бути однаковим. Цього легко досягти, використовуючи будь-який бізнес-ідентифікатор із оригінального повідомлення. Параметр partitioner.class має значення за замовчуванням. DefaultPartitioner. За цієї стратегії партиціонування за умовчанням діємо так:

  • Якщо партиція явно вказана під час надсилання повідомлення, використовуємо її.
  • Якщо партиція не вказана, але вказаний ключ – вибираємо хешу від ключа.
  • Якщо партиція і ключ не вказані, вибираємо партиції по черзі (round-robin).

Крім того, використання ключа та ідемпотентної відправки з параметром max.in.flight.requests.per.connection = 1 дає вам впорядковану обробку повідомлень на Consumer. Окремо варто пам'ятати, що, якщо на вашому кластері налаштовано керування доступом, то вам знадобляться права на ідемпотентний запис у топік.

Якщо раптом вам не вистачає можливостей ідемпотентної відправки по ключу або логіка на боці Producer вимагає збереження консистентності даних між різними партіціями, то на допомогу прийдуть транзакції. Крім того, за допомогою ланцюгової транзакції можна умовно синхронізувати запис Kafka, наприклад, із записом в БД. Для включення транзакційної відправки на Producer необхідно, щоб він мав ідемпотентність, і додатково задати transactional.id. Якщо на вашому Kafka-кластері налаштовано керування доступом, то для транзакційного запису, як і для ідемпотентного, знадобляться права на запис, які можуть бути надані маскою з використанням значення, що зберігається в transactional.id.

Формально як ідентифікатор транзакції можна використовувати будь-який рядок, наприклад ім'я програми. Але якщо ви запускаєте кілька інстансів однієї програми з однаковим transactional.id, то перший запущений інстанс буде зупинено з помилкою, оскільки Kafka вважатиме його зомбі-процесом.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Щоб вирішити цю проблему, ми додаємо до імені програми суфікс як ім'я хоста, який отримуємо зі змінних оточення.

Producer налаштований, але транзакції на Kafka керують лише областю видимості повідомлення. Незалежно від статусу транзакції, повідомлення відразу потрапляє в топік, але має додаткові системні атрибути.

Щоб такі повідомлення не зчитувалися Consumer'ом раніше, йому необхідно встановити параметр isolation.level значення read_committed. Такий Consumer зможе читати нетранзакційні повідомлення як і раніше, а транзакційні лише після комміту.
Якщо ви встановили всі наведені раніше налаштування, то ви налаштували exactly once delivery. Вітаю!

Але є ще один аспект. Transactional.id, який ми налаштовували вище, є насправді префіксом транзакції. На менеджері транзакцій до нього дописується порядковий номер. Отриманий ідентифікатор видається на transactional.id.expiration.ms, який конфігурується на Kafka кластері і має значення за умовчанням «7 днів». Якщо за цей час програма не отримувала жодних повідомлень, то при спробі наступного транзакційного відправлення ви отримаєте InvalidPidMappingException. Після цього координатор транзакцій видасть новий порядковий номер наступної транзакції. При цьому повідомлення може бути втрачено, якщо InvalidPidMappingException не буде правильно оброблено.

замість підсумків

Як можна помітити, недостатньо просто надсилати повідомлення до Kafka. Потрібно вибирати комбінацію параметрів та бути готовим до внесення швидких змін. У цій статті я постарався детально показати налаштування exactly once delivery і описав кілька проблем конфігурацій client.id і transactional.id, з якими ми зіткнулися. Нижче наведено налаштування Producer і Consumer.

Виробник:

  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 — для впорядкованого відправлення)
  5. transactional.id = ${application-name}-${hostname}

Споживач:

  1. isolation.level = read_committed

Щоб мінімізувати помилки в майбутніх додатках, ми зробили свою обгортку над конфігурацією spring, де вже задані значення для деяких з перерахованих параметрів.

А ось пара матеріалів для самостійного вивчення:

Джерело: habr.com

Додати коментар або відгук