Kafkadan olingan voqealarni qayta ishlash

Kafkadan olingan voqealarni qayta ishlash

Salom, Xabr.

Yaqinda men tajribasi bilan o‘rtoqlashdi Kafka ishlab chiqaruvchisi va iste'molchisi uchun kafolatlangan yetkazib berishga yaqinlashish uchun biz jamoa sifatida qaysi parametrlardan ko'proq foydalanamiz. Ushbu maqolada men sizga tashqi tizimning vaqtincha ishlamay qolishi natijasida Kafkadan olingan voqeani qayta ishlashni qanday tashkil qilganimizni aytib bermoqchiman.

Zamonaviy ilovalar juda murakkab muhitda ishlaydi. Kubernetes yoki OpenShift kabi orkestr tomonidan boshqariladigan Docker tasvirida ishlaydigan va jismoniy va virtual marshrutizatorlar zanjiri orqali boshqa ilovalar yoki korporativ yechimlar bilan bog‘langan biznes mantig‘i zamonaviy texnologiya stekiga o‘ralgan. Bunday muhitda biror narsa har doim buzilishi mumkin, shuning uchun tashqi tizimlardan biri mavjud bo'lmasa, hodisalarni qayta ishlash bizning biznes jarayonlarimizning muhim qismidir.

Kafkadan oldin qanday edi

Ilgari loyihada biz asinxron xabarlarni yetkazib berish uchun IBM MQ dan foydalanganmiz. Agar xizmatning ishlashi paytida biron bir xatolik yuzaga kelsa, qabul qilingan xabar keyingi qo'lda tahlil qilish uchun o'lik harflar navbatiga (DLQ) joylashtirilishi mumkin edi. DLQ kiruvchi navbat yonida yaratilgan, xabar IBM MQ ichiga uzatilgan.

Agar xatolik vaqtinchalik bo‘lsa va biz uni aniqlay olsak (masalan, HTTP qo‘ng‘irog‘idagi ResourceAccessException yoki MongoDb so‘rovidagi MongoTimeoutException), u holda qayta urinish strategiyasi kuchga kiradi. Ilovaning tarmoqlanish mantig'idan qat'i nazar, asl xabar yo kechiktirilgan jo'natish uchun tizim navbatiga yoki xabarlarni qayta jo'natish uchun ancha oldin yaratilgan alohida ilovaga ko'chirildi. Bunga xabar sarlavhasida kechikish oralig'iga yoki dastur darajasidagi strategiyaning oxiriga bog'langan qayta yuborish raqami kiradi. Agar biz strategiyaning oxiriga yetgan bo'lsak-da, lekin tashqi tizim hali ham mavjud bo'lmasa, xabar qo'lda tahlil qilish uchun DLQ-ga joylashtiriladi.

Yechim topish

Internetda qidirish, siz quyidagilarni topishingiz mumkin qaror. Muxtasar qilib aytganda, har bir kechikish oralig'i uchun mavzu yaratish va kerakli kechikish bilan xabarlarni o'qiy oladigan iste'molchi ilovalarini yon tomonda amalga oshirish taklif etiladi.

Kafkadan olingan voqealarni qayta ishlash

Ko'p sonli ijobiy sharhlarga qaramay, menimcha, bu unchalik muvaffaqiyatli emas. Birinchidan, chunki ishlab chiquvchi biznes talablarini amalga oshirishdan tashqari, tavsiflangan mexanizmni amalga oshirish uchun ko'p vaqt sarflashi kerak bo'ladi.

Bundan tashqari, agar Kafka klasterida kirishni boshqarish yoqilgan bo'lsa, siz mavzularni yaratish va ularga kerakli kirishni ta'minlash uchun biroz vaqt sarflashingiz kerak bo'ladi. Bunga qo'shimcha ravishda, xabarlar qayta yuborilishi va undan yo'qolib qolmasligi uchun har bir qayta urinish mavzusi uchun to'g'ri retention.ms parametrini tanlashingiz kerak bo'ladi. Amalga oshirish va kirish so'rovi har bir mavjud yoki yangi xizmat uchun takrorlanishi kerak.

Keling, xabarlarni qayta ishlash uchun bizga qanday mexanizmlar va umuman bahor-kafka taqdim etishini ko'rib chiqaylik. Spring-kafka turli BackOffPolicies-ni boshqarish uchun abstraktsiyalarni ta'minlovchi bahor-qayta urinishga o'tish davriga bog'liq. Bu juda moslashuvchan vosita, ammo uning muhim kamchiliklari xabarlarni dastur xotirasida qayta yuborish uchun saqlashdir. Bu shuni anglatadiki, yangilanish yoki operatsion xato tufayli dasturni qayta ishga tushirish qayta ishlash kutilayotgan barcha xabarlarning yo'qolishiga olib keladi. Bu nuqta bizning tizimimiz uchun juda muhim bo'lganligi sababli, biz buni boshqa ko'rib chiqmadik.

Spring-kafkaning o'zi, masalan, ContainerAwareErrorHandlerning bir nechta ilovalarini taqdim etadi SeekToCurrentErrorHandler, uning yordamida siz xatolik yuz berganda ofsetni o'zgartirmasdan xabarni keyinroq qayta ishlashingiz mumkin. Spring-kafka 2.3 versiyasidan boshlab BackOffPolicy-ni o'rnatish mumkin bo'ldi.

Ushbu yondashuv qayta ishlangan xabarlarga dasturni qayta ishga tushirishdan omon qolishga imkon beradi, ammo hali ham DLQ mexanizmi mavjud emas. Biz bu variantni 2019 yil boshida tanladik va DLQ kerak emasligiga optimistik tarzda ishondik (biz omadli edik va dasturni bunday qayta ishlash tizimi bilan bir necha oy ishlatganimizdan keyin bunga muhtoj emas edik). Vaqtinchalik xatolar SeekToCurrentErrorHandler dasturini ishga tushirishga olib keldi. Qolgan xatolar jurnalda chop etildi, natijada ofset paydo bo'ldi va qayta ishlash keyingi xabar bilan davom etdi.

Yakuniy qaror

SeekToCurrentErrorHandler-ga asoslangan dastur bizni xabarlarni qayta jo'natish uchun o'z mexanizmimizni ishlab chiqishga undadi.

Avvalo, biz mavjud tajribadan foydalanishni va uni dastur mantig'iga qarab kengaytirishni xohladik. Chiziqli mantiqiy dastur uchun qayta urinish strategiyasida ko'rsatilgan qisqa vaqt ichida yangi xabarlarni o'qishni to'xtatish maqbul bo'ladi. Boshqa ilovalar uchun men qayta urinish strategiyasini amalga oshiradigan yagona nuqtaga ega bo'lishni xohlardim. Bundan tashqari, ushbu bitta nuqta har ikkala yondashuv uchun DLQ funksiyasiga ega bo'lishi kerak.

Qayta urinish strategiyasining o'zi vaqtinchalik xatolik yuzaga kelganda keyingi intervalni olish uchun mas'ul bo'lgan ilovada saqlanishi kerak.

Lineer mantiqiy dastur uchun iste'molchini to'xtatish

Spring-kafka bilan ishlaganda, iste'molchini to'xtatish uchun kod quyidagicha ko'rinishi mumkin:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Misolda, retryAt, agar u hali ham ishlayotgan bo'lsa, MessageListenerContainerni qayta ishga tushirish vaqtidir. Qayta ishga tushirish TaskScheduler-da ishga tushirilgan alohida oqimda sodir bo'ladi, uni amalga oshirish ham bahorda ta'minlanadi.

RetryAt qiymatini quyidagi tarzda topamiz:

  1. Qayta qo'ng'iroq qilish hisoblagichining qiymati ko'rib chiqiladi.
  2. Hisoblagich qiymatiga asoslanib, qayta urinish strategiyasida joriy kechikish oralig'i qidiriladi. Strategiya ilovaning o'zida e'lon qilingan; biz uni saqlash uchun JSON formatini tanladik.
  3. JSON massivida topilgan interval qayta ishlashni takrorlash kerak bo'lgan soniyalar sonini o'z ichiga oladi. Bu soniyalar soni retryAt qiymatini shakllantirish uchun joriy vaqtga qo'shiladi.
  4. Agar interval topilmasa, retryAt qiymati null bo'ladi va xabar qo'lda tahlil qilish uchun DLQ ga yuboriladi.

Ushbu yondashuv bilan, hozirda qayta ishlanayotgan har bir xabar uchun takroriy qo'ng'iroqlar sonini, masalan, dastur xotirasida saqlash qoladi. Qayta urinishlar sonini xotirada saqlash ushbu yondashuv uchun juda muhim emas, chunki chiziqli mantiqiy dastur ishlov berishni umuman boshqara olmaydi. Bahor-qayta urinishdan farqli o'laroq, dasturni qayta ishga tushirish barcha xabarlarni qayta ishlashga yo'qolishiga olib kelmaydi, shunchaki strategiyani qayta ishga tushiradi.

Ushbu yondashuv tashqi tizimdan yukni olib tashlashga yordam beradi, bu juda og'ir yuk tufayli mavjud bo'lmasligi mumkin. Boshqacha qilib aytganda, qayta ishlashdan tashqari, biz naqshni amalga oshirishga erishdik elektron to'xtatuvchidir.

Bizning holatlarimizda xato chegarasi atigi 1 ga teng va tarmoqning vaqtinchalik uzilishlari tufayli tizimning ishlamay qolish vaqtini minimallashtirish uchun biz kichik kechikish oraliqlari bilan juda batafsil qayta urinish strategiyasidan foydalanamiz. Bu barcha guruh ilovalari uchun mos kelmasligi mumkin, shuning uchun xato chegarasi va interval qiymati o'rtasidagi munosabatlar tizimning xususiyatlaridan kelib chiqqan holda tanlanishi kerak.

Deterministik bo'lmagan mantiqqa ega ilovalardan xabarlarni qayta ishlash uchun alohida dastur

Mana shunday dasturga (Qayta urinish) xabar yuboruvchi kod misoli, RETRY_AT vaqti yetganda DESTINATION mavzusiga qayta yuboriladi:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Misol ko'rsatadiki, sarlavhalarda juda ko'p ma'lumotlar uzatiladi. RETRY_AT qiymati Consumer stop orqali qayta urinish mexanizmi bilan bir xil tarzda topiladi. DESTINATION va RETRY_AT yo‘nalishlaridan tashqari biz:

  • GROUP_ID, bu orqali biz qo‘lda tahlil qilish va soddalashtirilgan qidiruv uchun xabarlarni guruhlaymiz.
  • ORIGINAL_PARTITION bir xil iste'molchini qayta ishlash uchun saqlab qolish uchun. Ushbu parametr null bo'lishi mumkin, bu holda yangi bo'lim asl xabarning record.key() tugmasi yordamida olinadi.
  • Qayta urinish strategiyasiga amal qilish uchun COUNTER qiymati yangilandi.
  • SEND_TO - bu xabar RETRY_AT ga yetgandan so'ng qayta ishlash uchun yuborilganligini yoki DLQga joylashtirilganligini ko'rsatadigan doimiydir.
  • REASON - xabarni qayta ishlash to'xtatilganining sababi.

Retryer xabarlarni PostgreSQL-da qayta yuborish va qo'lda tahlil qilish uchun saqlaydi. Taymer RETRY_AT yordamida xabarlarni topadigan vazifani boshlaydi va ularni record.key() kaliti bilan DESTINATION mavzusining ORIGINAL_PARTITION bo'limiga yuboradi.

Yuborilgandan so'ng, xabarlar PostgreSQL-dan o'chiriladi. Xabarlarni qo'lda tahlil qilish REST API orqali Retryer bilan o'zaro aloqada bo'lgan oddiy foydalanuvchi interfeysida sodir bo'ladi. Uning asosiy xususiyatlari DLQ-dan xabarlarni qayta yuborish yoki o'chirish, xato ma'lumotlarini ko'rish va xabarlarni qidirish, masalan, xato nomi bo'yicha.

Klasterlarimizda kirishni boshqarish yoqilganligi sababli, Retryer tinglayotgan mavzuga qo'shimcha ravishda ruxsat so'rash va Retryerga DESTINATION mavzusiga yozishga ruxsat berish kerak. Bu noqulay, ammo intervalli mavzu yondashuvidan farqli o'laroq, bizda uni boshqarish uchun to'liq DLQ va UI mavjud.

Kiruvchi mavzuni ilovalari turli mantiqni amalga oshiradigan bir nechta turli iste'molchilar guruhlari tomonidan o'qiladigan holatlar mavjud. Ushbu ilovalardan biri uchun Retryer orqali xabarni qayta ishlash ikkinchisida dublikatga olib keladi. Bundan himoya qilish uchun biz qayta ishlash uchun alohida mavzu yaratamiz. Kiruvchi va qayta urinish mavzulari bir xil iste'molchi tomonidan hech qanday cheklovlarsiz o'qilishi mumkin.

Kafkadan olingan voqealarni qayta ishlash

Odatiy bo'lib, bu yondashuv elektron to'xtatuvchining funksiyasini ta'minlamaydi, biroq u yordamida ilovaga qo'shilishi mumkin bahor-bulut-netflix yoki yangi bahor bulutli o'chirgich, tashqi xizmatlar chaqiriladigan joylarni tegishli abstraktsiyalarga o'rash. Bundan tashqari, strategiyani tanlash mumkin bo'ladi panjara naqsh, bu ham foydali bo'lishi mumkin. Masalan, spring-cloud-netflix-da bu iplar hovuzi yoki semafor bo'lishi mumkin.

xulosa

Natijada, har qanday tashqi tizim vaqtincha mavjud bo'lmasa, xabarlarni qayta ishlashni takrorlash imkonini beruvchi alohida dastur mavjud.

Ilovaning asosiy afzalliklaridan biri shundaki, u bir xil Kafka klasterida ishlaydigan tashqi tizimlar tomonidan muhim o'zgarishlarsiz foydalanishi mumkin! Bunday dastur faqat qayta urinish mavzusiga kirishi, bir nechta Kafka sarlavhalarini to'ldirishi va Retryerga xabar yuborishi kerak bo'ladi. Hech qanday qo'shimcha infratuzilmani ko'tarishning hojati yo'q. Ilovadan Retryerga va orqaga uzatiladigan xabarlar sonini kamaytirish uchun biz chiziqli mantiqqa ega ilovalarni aniqladik va ularni Consumer stop orqali qayta ishladik.

Manba: www.habr.com

a Izoh qo'shish