Повторна обробка подій, отриманих з Kafka

Повторна обробка подій, отриманих з Kafka

Привіт Хабр.

Нещодавно я поділився досвідом про те, які параметри ми в команді найчастіше використовуємо для Kafka Producer та Consumer, щоб наблизитись до гарантованої доставки. У цій статті хочу розповісти, як ми організували повторну обробку події, отриманої з Kafka, внаслідок тимчасової недоступності зовнішньої системи.

Сучасні програми працюють у дуже складному середовищі. Бізнес-логіка, обернена в сучасний технологічний стек, що працює в Docker-образі, який управляється оркестратором на кшталт Kubernetes або OpenShift, і комунікує з іншими додатками або enterprise-рішеннями через ланцюжок фізичних та віртуальних маршрутизаторів. У такому оточенні завжди щось може зламатися, тому повторне опрацювання подій у разі недоступності однієї із зовнішніх систем — важлива частина наших бізнес-процесів.

Як було до Kafka

Раніше у проекті ми використовували IBM MQ для асинхронної доставки повідомлень. У разі виникнення будь-якої помилки в процесі роботи сервісу отримане повідомлення могло бути поміщене в dead-letter-queue (DLQ) для подальшого ручного розбору. DLQ створювався поруч із вхідною чергою, перекладання повідомлення відбувалося всередині IBM MQ.

Якщо помилка мала тимчасовий характер і ми могли це визначити (наприклад, ResourceAccessException при HTTP-дзвінку або MongoTimeoutException при запиті до MongoDb), то набула чинності стратегія повторних викликів. Незалежно від розгалуження логіки програми, вихідне повідомлення перекладалося або в системну чергу для відкладеного відправлення, або в окрему програму, яка колись давно була зроблена для повторного надсилання повідомлень. При цьому в заголовок повідомлення записується номер повторного надсилання, який прив'язаний до інтервалу затримки або до кінця стратегії на рівні програми. Якщо ми досягли кінця стратегії, але зовнішня система все ще недоступна, повідомлення буде поміщене в DLQ для ручного розбору.

Пошук рішення

Пошукавши в інтернеті, можна знайти наступне рішення. Якщо коротко, то пропонується завести по топіку на кожен інтервал затримки та реалізувати на стороні програми Consumer'и, які вичитуватимуть повідомлення з необхідною затримкою.

Повторна обробка подій, отриманих з Kafka

Незважаючи на велику кількість позитивних відгуків, воно здається мені не зовсім вдалим. Насамперед тому, що розробнику, окрім реалізації бізнес-вимог, доведеться витратити багато часу на реалізацію описаного механізму.

Крім того, якщо на Kafka-кластері включено управління доступами, доведеться витратити якийсь час на заклад топиків і забезпечення необхідних доступів до них. На додаток до цього потрібно буде підбирати правильний параметр retention.ms для кожного з ретрай-топиків, щоб повідомлення встигали повторно відправлятися та не пропадали з нього. Реалізацію та запит доступів доведеться повторювати для кожного існуючого чи нового сервісу.

Давайте тепер подивимося, які механізми для повторної обробки повідомлення надає нам spring загалом і spring-kafka зокрема. Spring-kafka має транзитивну залежність на spring-retry, яка надає абстракції для керування різними BackOffPolicy. Це досить гнучкий інструмент, але його значним недоліком є ​​зберігання повідомлень для повторного надсилання в пам'яті програми. Це означає, що перезапуск програми через оновлення або помилку під час експлуатації призведе до втрати всіх повідомлень, які очікують повторної обробки. Оскільки цей пункт критичний нашій системі, ми почали розглядати його далі.

Сама spring-kafka надає кілька реалізацій ContainerAwareErrorHandler, наприклад SeekToCurrentErrorHandler, за допомогою якого можна, не зміщуючи offset у разі виникнення помилки, обробити повідомлення пізніше. Починаючи з версії spring-kafka 2.3, з'явилася можливість задавати BackOffPolicy.

Цей підхід дозволяє повторно оброблюваним повідомленням переживати рестарт програми, але механізм DLQ, як і раніше, відсутній. Саме цей варіант ми вибрали на початку 2019 року, оптимістично вважаючи, що DLQ не знадобиться (нам пощастило і він дійсно не знадобився за кілька місяців експлуатації програми з такою системою повторної обробки). Тимчасові помилки призводили до спрацьовування SeekToCurrentErrorHandler. Інші помилки друкувалися в балку, приводили до зміщення offset, і обробка тривала з наступним повідомленням.

Підсумкове рішення

Реалізація, заснована на SeekToCurrentErrorHandler, спонукала нас до розробки власного механізму для повторного надсилання повідомлень.

Насамперед ми хотіли використати вже наявний досвід та розширити його залежно від логіки програми. Для застосування з лінійною логікою оптимальним було б припинити зчитування нових повідомлень протягом невеликого проміжку часу, заданого в рамках стратегії повторних викликів. Для інших додатків хотілося мати єдину точку, яка забезпечувала б виконання стратегії повторних викликів. На додаток ця єдина точка повинна мати функціональність DLQ для обох підходів.

Сама стратегія повторних дзвінків повинна зберігатися у додатку, який відповідає за отримання наступного інтервалу у разі виникнення тимчасової помилки.

Зупинка Consumer'a для застосування з лінійною логікою

При роботі з spring-kafka код для зупинки Consumer'a може виглядати приблизно так:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

У прикладі retryAt це час, коли потрібно заново запустити MessageListenerContainer, якщо він ще працює. Повторний запуск відбудеться в окремому потоці, запущеному TaskScheduler, реалізацію якого теж надає spring.

Значення retryAt ми знаходимо в такий спосіб:

  1. Шукається значення лічильника повторних дзвінків.
  2. Відповідно до значення лічильника шукається поточний інтервал затримки у стратегії повторних дзвінків. Стратегія оголошується в самій програмі, для її зберігання ми вибрали формат JSON.
  3. Знайдений в JSON-масиві інтервал містить кількість секунд, через яке необхідно буде повторити обробку. Ця кількість секунд додається до поточного часу, утворюючи значення для retryAt.
  4. Якщо інтервал не знайдено, значення retryAt дорівнює null і повідомлення відправиться в DLQ для ручного розбору.

При такому підході залишається лише зберегти кількість повторних дзвінків для кожного повідомлення, яке зараз перебуває на обробці, наприклад у пам'яті програми. Збереження лічильника спроб у пам'яті не є критичним для цього підходу, оскільки додаток з лінійною логікою не може виконувати обробку в цілому. На відміну від spring-retry, перезапуск програми призведе не до втрати всіх повідомлень для повторної обробки, а просто до перезапуску стратегії.

Цей підхід допомагає зняти навантаження із зовнішньої системи, яка може бути недоступна через дуже велике навантаження. Іншими словами, на додаток до повторної обробки ми домоглися реалізації патерну. автоматичний вимикач.

У нашому випадку поріг помилки дорівнює всього 1, а щоб мінімізувати простий системи через тимчасовий мережевий перебій, ми використовуємо дуже гранулярну стратегію повторних викликів з невеликими інтервалами затримки. Це може не підійти для всіх програм групи компаній, тому співвідношення між порогом помилки і величиною інтервалу потрібно підбирати, спираючись на особливості системи.

Окрема програма для обробки повідомлень від програм з недетермінованою логікою

Ось приклад коду, що відправляє повідомлення в таку програму (Retryer), яка виконає повторне відправлення в топік DESTINATION при досягненні часу RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

З прикладу видно, що багато інформації передається у хедерах. Значення RETRY_AT перебуває так само, як і механізму повтору через зупинку Consumer'a. Крім DESTINATION та RETRY_AT ми передаємо:

  • GROUP_ID, за яким групуємо повідомлення для ручного аналізу та спрощення пошуку.
  • ORIGINAL_PARTITION спробувати зберегти той же Consumer для повторної обробки. Цей параметр може дорівнювати null, у такому разі нова partition буде отримана за ключом record.key() оригінального повідомлення.
  • Оновлене значення COUNTER слідувати за стратегією повторних викликів.
  • SEND_TO — константа, яка показує, чи надіслати повідомлення на повторну обробку після досягнення RETRY_AT або помістити в DLQ.
  • REASON — причина, через яку обробку повідомлення було перервано.

Retryer зберігає повідомлення для повторного відправлення та ручного розбору в PostgreSQL. За таймером запускається завдання, що знаходить повідомлення з RETRY_AT, що настав, і відправляє їх назад до партиції ORIGINAL_PARTITION топіка DESTINATION з ключем record.key().

Після надсилання повідомлення видаляються з PostgreSQL. Ручний розбір повідомлень відбувається у простому UI, який взаємодіє з Retryer за REST API. Основними його особливостями є перенаправлення або видалення повідомлень з DLQ, перегляд інформації про помилку та пошук повідомлень, наприклад, по імені помилки.

Так як на наших кластерах включено керування доступом, необхідно додатково запитувати доступи до топіка, який слухає Retryer, і дати можливість Retryer'у писати в DESTINATION топік. Це незручно, але, на відміну від підходу з топіком на інтервал, у нас з'являється повноцінна DLQ та UI для керування нею.

Бувають випадки, коли вхідний топік читають кілька різних consumer-груп, додатки яких реалізують різну логіку. Повторне оброблення повідомлення через Retryer для одного з таких програм призведе до дублікату на іншому. Щоб захиститися від цього, ми заводимо окремий топік для повторної обробки. Вхідний та retry-топік може читати один і той же Consumer без будь-яких обмежень.

Повторна обробка подій, отриманих з Kafka

За замовчуванням цей підхід не надає можливості circuit breaker'a, проте його можна додати до програми за допомогою spring-cloud-netflix або нового spring cloud circuit breaker, обернувши місця викликів зовнішніх сервісів у відповідні абстракції. Крім того, з'являється можливість вибору стратегії для перегородка патерну, що теж може бути корисним. Наприклад, у spring-cloud-netflix це може бути thread pool або семафор.

Висновок

В результаті у нас вийшла окрема програма, яка дозволяє повторити обробку повідомлення при тимчасовій недоступності будь-якої зовнішньої системи.

Однією з головних переваг програми є те, що ним можуть користуватися зовнішні системи, що працюють на тому ж Kafka-кластері, без значних доробок на своїй стороні! Такому додатку необхідно буде лише отримати доступ до retry-топіка, заповнити кілька Kafka-заголовків і надіслати повідомлення Retryer. Не потрібно піднімати жодної додаткової інфраструктури. А щоб зменшити кількість повідомлень, що перекладаються з програми в Retryer і назад, ми виділили програми з лінійною логікою і зробили в них повторну обробку через зупинку Consumer.

Джерело: habr.com

Додати коментар або відгук