Reprocessing acara nampi ti Kafka

Reprocessing acara nampi ti Kafka

Hejo Habr.

Nembe I dibagikeun pangalamanana ngeunaan parameter naon anu kami salaku tim anu paling sering dianggo pikeun Produser sareng Konsumén Kafka pikeun ngadeukeutan pangiriman anu dijamin. Dina tulisan ieu kuring badé nyarioskeun ka anjeun kumaha urang ngatur ngolah ulang acara anu ditampi ti Kafka salaku hasil tina henteuna samentawis sistem éksternal.

Aplikasi modern beroperasi dina lingkungan anu kompleks pisan. Logika bisnis dibungkus dina tumpukan téknologi modéren, dijalankeun dina gambar Docker anu dikelola ku orkestra sapertos Kubernetes atanapi OpenShift, sareng komunikasi sareng aplikasi atanapi solusi perusahaan sanés ngalangkungan ranté router fisik sareng virtual. Dina lingkungan sapertos kitu, hiji hal salawasna bisa megatkeun, jadi reprocessing acara lamun salah sahiji sistem éksternal teu sadia mangrupa bagian penting tina prosés bisnis urang.

Kumaha ieu sateuacan Kafka

Saméméhna dina proyék kami nganggo IBM MQ pikeun pangiriman pesen asinkron. Upami aya kasalahan nalika ngajalankeun jasa, pesen anu ditampi tiasa ditempatkeun dina antrian-hurup-antri (DLQ) pikeun parsing manual salajengna. DLQ dijieun gigireun antrian asup, pesen ditransferkeun ka jero IBM MQ.

Upami kasalahan éta samentawis sareng urang tiasa nangtoskeunana (contona, ResourceAccessException dina telepon HTTP atanapi MongoTimeoutException dina pamundut MongoDb), teras strategi cobian deui bakal dianggo. Paduli logika branching tina aplikasi, pesen aslina ieu dipindahkeun boh ka antrian sistem pikeun nyangsang ngirim, atawa ka aplikasi misah nu dijieun lila pisan pikeun resend pesen. Ieu kalebet nomer kiriman ulang dina lulugu pesen, anu dihijikeun kana interval reureuh atanapi tungtung strategi tingkat aplikasi. Lamun urang geus nepi ka ahir strategi tapi sistem éksternal masih teu sadia, teras pesen bakal disimpen dina DLQ pikeun parsing manual.

Milarian jalan kaluarna

Pilarian dina Internét, anjeun tiasa mendakan di handap ieu solusi. Pondokna, eta diusulkeun pikeun nyieun hiji topik pikeun tiap interval reureuh sarta nerapkeun aplikasi Consumer di sisi, nu bakal maca pesen kalawan reureuh diperlukeun.

Reprocessing acara nampi ti Kafka

Sanaos seueur ulasan anu positif, sigana kuring henteu suksés pisan. Anu mimiti, sabab pamekar, salian ngalaksanakeun syarat bisnis, kedah nyéépkeun seueur waktos pikeun ngalaksanakeun mékanisme anu dijelaskeun.

Salaku tambahan, upami kontrol aksés diaktipkeun dina klaster Kafka, anjeun kedah nyéépkeun waktos nyiptakeun topik sareng nyayogikeun aksés anu diperyogikeun. Salian ti ieu, anjeun kedah milih parameter retention.ms anu leres pikeun unggal jejer cobian deui supados pesen gaduh waktos pikeun ambek sareng henteu ngaleungit. Palaksanaan sareng pamundut aksés kedah diulang pikeun unggal jasa anu aya atanapi énggal.

Hayu urang tingali naon mékanisme spring umumna jeung spring-kafka hususna nyadiakeun kami pikeun reprocessing pesen. Spring-kafka boga katergantungan transitif dina spring-retry, nu nyadiakeun abstraksi pikeun ngatur BackOffPolicies béda. Ieu mangrupikeun alat anu cukup fleksibel, tapi kalemahan anu penting nyaéta nyimpen pesen pikeun ngirim ulang dina mémori aplikasi. Ieu ngandung harti yén ngabalikan deui aplikasi kusabab pembaruan atanapi kasalahan operasional bakal nyababkeun kaleungitan sadaya pesen anu ngantosan diolah deui. Kusabab titik ieu kritis pikeun sistem kami, kami henteu nganggap éta salajengna.

spring-kafka sorangan nyadiakeun sababaraha palaksanaan ContainerAwareErrorHandler, contona SeekToCurrentErrorHandler, anu anjeun tiasa ngolah pesen engké tanpa mindahkeun offset upami aya kasalahan. Dimimitian ku vérsi spring-kafka 2.3, janten kamungkinan pikeun nyetél BackOffPolicy.

Pendekatan ieu ngamungkinkeun pesen anu diprosés deui pikeun salamet ngamimitian deui aplikasi, tapi masih teu aya mékanisme DLQ. Kami milih pilihan ieu dina awal 2019, optimis yakin yén DLQ henteu diperyogikeun (kami untung sareng saleresna henteu peryogina saatos sababaraha bulan ngoperasikeun aplikasi sareng sistem reprocessing sapertos kitu). Kasalahan samentara nyababkeun SeekToCurrentErrorHandler kahuruan. Kasalahan sésana dicitak dina log, hasilna offset, jeung ngolah dituluykeun jeung pesen salajengna.

Kaputusan ahir

Palaksanaan dumasar kana SeekToCurrentErrorHandler ngajurung kami pikeun ngembangkeun mékanisme sorangan pikeun ngirimkeun deui pesen.

Anu mimiti, urang hoyong nganggo pangalaman anu tos aya sareng dilegakeun gumantung kana logika aplikasi. Pikeun aplikasi logika linier, éta bakal optimal pikeun ngeureunkeun maca pesen anyar pikeun periode pondok waktu dieusian ku strategi cobian deui. Pikeun aplikasi anu sanés, kuring hoyong gaduh titik tunggal anu bakal ngalaksanakeun strategi cobian deui. Salaku tambahan, titik tunggal ieu kedah gaduh fungsionalitas DLQ pikeun duanana pendekatan.

Strategi coba deui sorangan kudu disimpen dina aplikasi, nu jawab retrieving interval salajengna lamun kasalahan samentara lumangsung.

Ngeureunkeun Konsumén pikeun Aplikasi Logika Linier

Nalika damel sareng spring-kafka, kode pikeun ngeureunkeun Konsumén sigana sapertos kieu:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Dina conto, retryAt mangrupikeun waktos pikeun ngamimitian deui MessageListenerContainer upami masih jalan. Re-peluncuran bakal lumangsung dina thread misah dibuka dina TaskScheduler, palaksanaan nu ogé disadiakeun ku spring.

Kami mendakan nilai retryAt ku cara kieu:

  1. Nilai counter panggero ulang kasampak up.
  2. Dumasar kana nilai counter, interval reureuh ayeuna dina strategi coba deui searched. Strategi dinyatakeun dina aplikasi sorangan; kami milih format JSON pikeun nyimpen éta.
  3. Interval kapanggih dina Asép Sunandar Sunarya JSON ngandung jumlah detik sanggeus processing bakal perlu diulang. Jumlah detik ieu ditambahkeun kana waktu ayeuna pikeun ngabentuk nilai pikeun retryAt.
  4. Upami interval henteu kapendak, maka nilai retryAt nol sareng pesen bakal dikirim ka DLQ pikeun parsing manual.

Kalayan pendekatan ieu, ngan ukur nyimpen jumlah telepon anu diulang-ulang pikeun unggal pesen anu ayeuna diolah, contona dina mémori aplikasi. Ngajaga count cobian deui dina mémori henteu kritis pikeun pendekatan ieu, sabab aplikasi logika linier teu tiasa ngadamel ngolah sacara gembleng. Beda sareng spring-retry, ngabalikan deui aplikasi moal nyababkeun sadaya pesen leungit diolah deui, tapi ngan saukur bakal ngamimitian deui strategi.

Pendekatan ieu ngabantosan ngaleungitkeun beban tina sistem éksternal, anu tiasa henteu sayogi kusabab beban anu beurat pisan. Kalayan kecap séjén, sajaba reprocessing, urang ngahontal palaksanaan pola pemutus sirkuit.

Dina kasus urang, bangbarung kasalahan ngan 1, sarta pikeun ngaleutikan downtime sistem alatan outages jaringan samentara, urang ngagunakeun strategi ulang pisan granular kalawan interval latency leutik. Ieu bisa jadi teu cocog pikeun sakabéh aplikasi grup, jadi hubungan antara bangbarung kasalahan jeung nilai interval kudu dipilih dumasar kana karakteristik sistem.

Aplikasi anu misah pikeun ngolah pesen tina aplikasi kalayan logika non-deterministik

Ieu conto kode anu ngirim pesen ka aplikasi sapertos kitu (Retryer), anu bakal dikirim deui ka topik DESTINATION nalika waktos RETRY_AT parantos ngahontal:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Conto nunjukkeun yén seueur inpormasi anu dikirimkeun dina header. Nilai RETRY_AT kapanggih dina cara nu sarua jeung mékanisme retry ngaliwatan Konsumén eureun. Salian DESTINATION sareng RETRY_AT kami lulus:

  • GROUP_ID, dimana urang ngakelompokkeun pesen pikeun analisa manual sareng milarian saderhana.
  • ORIGINAL_PARTITION pikeun nyobaan ngajaga Konsumén anu sami pikeun diolah deui. Parameter ieu tiasa null, bisi nu partisi anyar bakal diala maké record.key () konci pesen aslina.
  • Nilai COUNTER diropéa pikeun nuturkeun strategi cobian deui.
  • SEND_TO nyaéta konstanta anu nuduhkeun naha suratna dikirim pikeun diolah deui nalika ngahontal RETRY_AT atanapi disimpen dina DLQ.
  • ALASAN - alesan kunaon pamrosésan pesen diganggu.

Retryer nyimpen pesen pikeun ngirim ulang sareng parsing manual dina PostgreSQL. A timer dimimitian tugas nu manggihan pesen kalawan RETRY_AT sarta ngirimkeunana deui ka partisi ORIGINAL_PARTITION topik DESTINATION kalawan konci record.key ().

Sakali dikirim, pesen dihapus tina PostgreSQL. Parsing manual pesen lumangsung dina UI basajan nu berinteraksi sareng Retryer via REST API. Fitur utami nyaéta ngirim deui atanapi mupus pesen tina DLQ, ningali inpormasi kasalahan sareng milarian pesen, contona ku nami kasalahan.

Kusabab kontrol aksés diaktipkeun dina klaster urang, perlu ogé ménta aksés ka topik nu Retryer dengekeun, sarta ngidinan Retryer nulis ka topik DESTINATION. Ieu teu merenah, tapi, teu saperti pendekatan topik interval, urang boga DLQ full-fledged jeung UI pikeun ngatur eta.

Aya kasus nalika topik anu datang dibaca ku sababaraha grup konsumen anu béda, anu aplikasina ngalaksanakeun logika anu béda. Ngolah deui pesen ngaliwatan Retryer pikeun salah sahiji aplikasi ieu bakal ngahasilkeun duplikat dina aplikasi anu sanés. Pikeun ngajaga ngalawan ieu, urang nyieun hiji topik misah pikeun ulang processing. Topik anu asup sareng coba deui tiasa dibaca ku Konsumén anu sami tanpa aya larangan.

Reprocessing acara nampi ti Kafka

Sacara standar pendekatan ieu teu nyadiakeun pungsi circuit breaker, tapi bisa ditambahkeun kana aplikasi ngagunakeun cinyusu-awan-netflix atawa anyar cinyusu awan circuit breaker, wrapping tempat dimana jasa éksternal disebut abstraksi luyu. Sajaba ti éta, janten mungkin pikeun milih strategi pikeun sirah gedé pola, nu ogé bisa jadi mangpaat. Contona, dina spring-cloud-netflix ieu bisa jadi thread pool atawa semafor a.

kacindekan

Hasilna, urang gaduh aplikasi anu misah anu ngamungkinkeun urang ngulang ngolah pesen upami aya sistem éksternal anu samentawis henteu sayogi.

Salah sahiji kaunggulan utama aplikasi téh nya éta bisa dipaké ku sistem éksternal ngajalankeun dina klaster Kafka sarua, tanpa modifikasi signifikan di sisi maranéhna! Aplikasi sapertos kitu ngan ukur kedah ngaksés topik cobian deui, eusian sababaraha header Kafka sareng ngirim pesen ka Retryer. Teu perlu ningkatkeun infrastruktur tambahan. Sareng pikeun ngirangan jumlah pesen anu ditransfer tina aplikasi ka Retryer sareng deui, kami ngaidentipikasi aplikasi nganggo logika linier sareng ngolah deui ngaliwatan Konsumén eureun.

sumber: www.habr.com

Tambahkeun komentar