Паўторная апрацоўка падзей, атрыманых з Kafka

Паўторная апрацоўка падзей, атрыманых з Kafka

Прывітанне, Хабр.

Нядаўна я падзяліўся вопытам пра тое, якія параметры мы ў камандзе часцей за ўсё выкарыстоўваем для Kafka Producer і Consumer, каб наблізіцца да гарантаванай дастаўкі. У гэтым артыкуле хачу расказаць, як мы арганізавалі паўторную апрацоўку падзеі, атрыманай з Kafka, у выніку часовай недаступнасці знешняй сістэмы.

Сучасныя прыкладанні працуюць у вельмі складаным асяроддзі. Бізнес-логіка, абгорнутая ў сучасны тэхналагічны стэк, якая працуе ў Docker-вобразе, які кіруецца аркестратарам накшталт Kubernetes або OpenShift, і камунікуючая з іншымі праграмамі або enterprise-рашэннямі праз ланцужок фізічных і віртуальных маршрутызатараў. У такім асяроддзі заўсёды нешта можа зламацца, таму паўторная апрацоўка падзей у выпадку недаступнасці адной са знешніх сістэм - важная частка нашых бізнес-працэсаў.

Як было да Kafka

Раней у праекце мы выкарыстоўвалі IBM MQ для асінхроннай дастаўкі паведамленняў. Пры ўзнікненні якой-небудзь памылкі падчас працы сэрвісу атрыманае паведамленне магло быць змешчана ў dead-letter-queue (DLQ) для наступнага ручнога разбору. DLQ ствараўся побач з уваходнай чаргой, перакладанне паведамлення адбывалася ўсярэдзіне IBM MQ.

Калі памылка мела часовы характар ​​і мы маглі гэта вызначыць (напрыклад, ResourceAccessException пры HTTP-выкліку ці MongoTimeoutException пры запыце ў MongoDb), то ў сілу ўступала стратэгія паўторных выклікаў. Незалежна ад галінавання логікі прыкладання, зыходнае паведамленне перакладалася ці ў сістэмную чаргу для адкладзенай адпраўкі, ці ў асобнае прыкладанне, якое калісьці даўно было зроблена для паўторнай адпраўкі паведамленняў. Пры гэтым у загаловак паведамлення запісваецца нумар паўторнай адпраўкі, які прывязаны да інтэрвалу затрымкі ці да канца стратэгіі на ўзроўні прыкладання. Калі мы дасягнулі канца стратэгіі, але знешняя сістэма ўсё яшчэ недаступная, тое паведамленне будзе змешчана ў DLQ для ручнога разбору.

пошук рашэння

Пашукаўшы ў інтэрнэце, можна знайсці наступнае рашэнне. Калі сцісла, то прапануецца завесці па топіку на кожны інтэрвал затрымкі і рэалізаваць на баку прыкладання Consumer'ы, якія будуць вычытваць паведамленні з неабходнай затрымкай.

Паўторная апрацоўка падзей, атрыманых з Kafka

Нягледзячы на ​​вялікую колькасць станоўчых водгукаў, яно здаецца мне не зусім удалым. У першую чаргу таму, што распрацоўшчыку, акрамя рэалізацыі бізнес-патрабаванняў, давядзецца патраціць шмат часу на рэалізацыю апісанага механізму.

Акрамя таго, калі на Kafka-кластары ўключана кіраванне доступамі, то прыйдзецца выдаткаваць нейкі час на ўстанову топікаў і забеспячэнне неабходных доступаў да іх. У дадатак да гэтага трэба будзе падбіраць правільны параметр retention.ms для кожнага з рэтрай-топікаў, каб паведамленні паспявалі паўторна адпраўляцца і не знікалі з яго. Рэалізацыю і запыт доступаў давядзецца паўтараць для кожнага існуючага або новага сэрвісу.

Давайце зараз паглядзім, якія механізмы для паўторнай апрацоўкі паведамлення дае нам spring у цэлым і spring-kafka у прыватнасці. Spring-kafka мае транзітыўную залежнасць на spring-retry, які дае абстракцыі для кіравання рознымі BackOffPolicy. Гэта даволі гнуткая прылада, але яго значным недахопам з'яўляецца захоўванне паведамленняў для паўторнай адпраўкі ў памяці прыкладання. Гэта значыць, што перазапуск прыкладання з-за абнаўлення ці памылкі падчас эксплуатацыі прывядзе да страты ўсіх паведамленняў, якія чакаюць паўторнай апрацоўкі. Бо гэты пункт крытычны для нашай сістэмы, мы не сталі разглядаць яго далей.

Сама spring-kafka дае некалькі рэалізацый ContainerAwareErrorHandler, напрыклад SeekToCurrentErrorHandler, з дапамогай якога можна, не ссоўваючы offset у выпадку ўзнікнення памылкі, апрацаваць паведамленне пазней. Пачынальна з версіі spring-kafka 2.3 з'явілася магчымасць задаваць BackOffPolicy.

Гэты падыход дазваляе паўторна апрацоўваным паведамленням перажываць рэстарт прыкладання, але механізм DLQ па-ранейшаму адсутнічае. Менавіта гэты варыянт мы абралі ў пачатку 2019 года, аптымістычна мяркуючы, што DLQ не спатрэбіцца (нам пашанцавала і ён сапраўды не спатрэбіўся за некалькі месяцаў эксплуатацыі дадатку з такой сістэмай паўторнай апрацоўкі). Часавыя памылкі прыводзілі да спрацоўвання SeekToCurrentErrorHandler. Астатнія памылкі друкаваліся ў лог, прыводзілі да зрушэння offset, і апрацоўка працягвалася з наступным паведамленнем.

Выніковае рашэнне

Рэалізацыя, заснаваная на SeekToCurrentErrorHandler, падштурхнула нас да распрацоўкі ўласнага механізму для паўторнай адпраўкі паведамленняў.

Першым чынам мы жадалі выкарыстаць ужо наяўны досвед і пашырыць яго ў залежнасці ад логікі прыкладання. Для дадатку з лінейнай логікай аптымальным было б спыніць счытванне новых паведамленняў на працягу невялікага прамежку часу, зададзенага ў рамках стратэгіі паўторных выклікаў. Для астатніх прыкладанняў хацелася мець адзіную кропку, якая б забяспечвала выкананне стратэгіі паўторных выклікаў. У дадатак гэтая адзіная кропка павінна валодаць функцыянальнасцю DLQ для абодвух падыходаў.

Сама стратэгія паўторных выклікаў павінна захоўвацца ў дадатку, які адказвае за атрыманне наступнага інтэрвалу пры ўзнікненні часовай памылкі.

Прыпынак Consumer'a для прыкладання з лінейнай логікай

Пры працы з spring-kafka код для прыпынку Consumer'a можа выглядаць прыкладна так:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

У прыкладзе retryAt - гэта час, калі трэба зноўку запусціць MessageListenerContainer, калі ён яшчэ працуе. Паўторны запуск адбудзецца ў асобным струмені, запушчаным у TaskScheduler, рэалізацыю якога таксама падае spring.

Значэнне retryAt мы знаходзім наступным чынам:

  1. Шукаецца значэнне лічыльніка паўторных выклікаў.
  2. У адпаведнасці са значэннем лічыльніка шукаецца бягучы інтэрвал затрымкі ў стратэгіі паўторных выклікаў. Стратэгія аб'яўляецца ў самім дадатку, для яе захоўвання мы выбралі фармат JSON.
  3. Знойдзены ў JSON-масіве інтэрвал утрымоўвае ў сабе колькасць секунд, праз якое неабходна будзе паўтарыць апрацоўку. Гэтая колькасць секунд дадаецца да бягучага часу, утвараючы значэнне для retryAt.
  4. Калі інтэрвал не знойдзены, тое значэнне retryAt роўна null і паведамленне адправіцца ў DLQ для ручнога разбору.

Пры такім падыходзе застаецца толькі захаваць колькасць паўторных выклікаў для кожнага паведамлення, якое знаходзіцца зараз на апрацоўцы, напрыклад у памяці прыкладання. Захаванне лічыльніка спроб у памяці не крытычна для гэтага падыходу, бо дадатак з лінейнай логікай не можа выконваць апрацоўку ў цэлым. У адрозненне ад spring-retry перазапуск прыкладання прывядзе не да страты ўсіх паведамленняў для паўторнай апрацоўкі, а проста да перазапуску стратэгіі.

Гэты падыход дапамагае зняць нагрузку са знешняй сістэмы, якая можа быць недаступная з-за вельмі вялікай нагрузкі. Іншымі словамі, у дадатак да паўторнай апрацоўкі мы дамагліся рэалізацыі патэрна. выключальнік.

У нашым выпадку парог памылкі роўны ўсяго 1, а каб мінімізаваць просты сістэмы з-за часавага сеткавага перабою, мы выкарыстоўваем вельмі гранулярную стратэгію паўторных выклікаў з невялікімі інтэрваламі затрымкі. Гэта можа не падысці для ўсіх прыкладанняў групы кампаній, таму суадносіны паміж парогам памылкі і велічынёй інтэрвалу трэба падбіраць, абапіраючыся на асаблівасці сістэмы.

Асобнае прыкладанне для апрацоўкі паведамленняў ад прыкладанняў з недэтэрмінаванай логікай

Вось прыклад кода, які адпраўляе паведамленне ў такое прыкладанне (Retryer), якое выканае паўторную адпраўку ў топік DESTINATION пры дасягненні часу RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

З прыкладу бачна, што шмат інфармацыі перадаецца ў хедэрах. Значэнне RETRY_AT знаходзіцца гэтак жа, як і для механізму паўтору праз прыпынак Consumer'a. Акрамя DESTINATION і RETRY_AT мы перадаем:

  • GROUP_ID, па якім групуем паведамленні для ручнога аналізу і спрашчэнні пошуку.
  • ORIGINAL_PARTITION, каб паспрабаваць захаваць той жа Consumer для паўторнай апрацоўкі. Гэты параметр можа быць роўны null, у такім разе новая partition будзе атрымана па ключы record.key() арыгінальнага паведамлення.
  • Абноўленае значэнне COUNTER, каб прытрымлівацца стратэгіі паўторных выклікаў.
  • SEND_TO - канстанта, якая паказвае, адправіць паведамленне на паўторную апрацоўку па дасягненні RETRY_AT або змясціць у DLQ.
  • REASON - прычына, па якой апрацоўка паведамлення была перапынена.

Retryer захоўвае паведамленні для паўторнай адпраўкі і ручнога разбору ў PostgreSQL. Па таймеры запускаецца задача, якая знаходзіць паведамленні з надышоўшым RETRY_AT і адпраўляе іх зваротна ў партыцыю ORIGINAL_PARTITION топіка DESTINATION з ключом record.key().

Пасля адпраўкі паведамлення выдаляюцца з PostgreSQL. Ручны разбор паведамленняў адбываецца ў простым UI, які ўзаемадзейнічае з Retryer па REST API. Асноўнымі яго асаблівасцямі з'яўляюцца пераадпраўка ці выдаленне паведамленняў з DLQ, прагляд інфармацыі аб памылцы і пошук паведамленняў, напрыклад па імі памылкі.

Так як на нашых кластарах ўключана кіраванне доступам, неабходна дадаткова запытваць доступы да топіка, які слухае Retryer, і даць магчымасць Retryer'у пісаць у DESTINATION топік. Гэта няёмка, але, у адрозненне ад падыходу з топікам на інтэрвал, у нас з'яўляецца паўнавартасная DLQ і UI для кіравання ёю.

Бываюць выпадкі, калі ўваходны топік чытаюць некалькі розных consumer-груп, прыкладанні якіх рэалізуюць розную логіку. Паўторная апрацоўка паведамлення праз Retryer для аднаго з такіх прыкладанняў прывядзе да дубліката на іншым. Каб абараніцца ад гэтага, мы заводзім асобны топік для паўторнай апрацоўкі. Уваходны і retry-топік можа чытаць адзін і той жа Consumer без якіх-небудзь абмежаванняў.

Паўторная апрацоўка падзей, атрыманых з Kafka

Па змаўчанні гэты падыход не дае магчымасці circuit breaker'a, аднак яго можна дадаць у дадатак з дапамогай spring-cloud-netflix ці новага spring cloud circuit breaker, абгарнуўшы месцы выклікаў знешніх сэрвісаў у адпаведныя абстракцыі. Акрамя таго, з'яўляецца магчымасць выбару стратэгіі для пераборка патэрна, што таксама можа быць карысна. Напрыклад, у spring-cloud-netflix гэта можа быць thread pool ці семафор.

Выснова

У выніку ў нас атрымаўся асобны дадатак, які дазваляе паўтарыць апрацоўку паведамлення пры часовай недаступнасці якой-небудзь знешняй сістэмы.

Адным з галоўных пераваг прыкладання з'яўляецца тое, што ім могуць карыстацца знешнія сістэмы, якія працуюць на тым жа Kafka-кластары, без значных дапрацовак на сваім баку! Такому з дадаткам неабходна будзе толькі атрымаць доступ да retry-топіка, запоўніць некалькі Kafka-загалоўкаў і адправіць паведамленне ў Retryer. Ня трэба паднімаць ніякай дадатковай інфраструктуры. А каб паменшыць колькасць перакладаных паведамленняў з прыкладання ў Retryer і назад, мы вылучылі прыкладанні з лінейнай логікай і зрабілі ў іх паўторную апрацоўку праз прыпынак Consumer.

Крыніца: habr.com

Дадаць каментар