Memproses ulang peristiwa yang diterima dari Kafka

Memproses ulang peristiwa yang diterima dari Kafka

Hei Habr.

Baru-baru ini saya berbagi pengalamannya tentang parameter apa yang paling sering kami gunakan sebagai tim untuk Produser dan Konsumen Kafka agar lebih dekat dengan jaminan pengiriman. Dalam artikel ini saya ingin memberi tahu Anda bagaimana kami mengatur pemrosesan ulang peristiwa yang diterima dari Kafka sebagai akibat dari tidak tersedianya sistem eksternal untuk sementara.

Aplikasi modern beroperasi di lingkungan yang sangat kompleks. Logika bisnis dibungkus dalam tumpukan teknologi modern, berjalan dalam image Docker yang dikelola oleh orkestrator seperti Kubernetes atau OpenShift, dan berkomunikasi dengan aplikasi lain atau solusi perusahaan melalui rantai router fisik dan virtual. Dalam lingkungan seperti itu, sesuatu bisa saja rusak, jadi memproses ulang kejadian jika salah satu sistem eksternal tidak tersedia adalah bagian penting dari proses bisnis kami.

Bagaimana keadaannya sebelum Kafka

Sebelumnya dalam proyek ini kami menggunakan IBM MQ untuk pengiriman pesan asinkron. Jika terjadi kesalahan selama pengoperasian layanan, pesan yang diterima dapat ditempatkan di antrian surat mati (DLQ) untuk penguraian manual lebih lanjut. DLQ dibuat di sebelah antrian masuk, pesan ditransfer ke dalam IBM MQ.

Jika kesalahan bersifat sementara dan kami dapat menentukannya (misalnya, ResourceAccessException pada panggilan HTTP atau MongoTimeoutException pada permintaan MongoDb), maka strategi percobaan ulang akan berlaku. Terlepas dari logika percabangan aplikasi, pesan asli dipindahkan ke antrian sistem untuk pengiriman tertunda, atau ke aplikasi terpisah yang dibuat sejak lama untuk mengirim ulang pesan. Ini termasuk nomor pengiriman ulang di header pesan, yang terkait dengan interval penundaan atau akhir dari strategi tingkat aplikasi. Jika kita telah mencapai akhir strategi tetapi sistem eksternal masih tidak tersedia, maka pesan akan ditempatkan di DLQ untuk diurai secara manual.

Cari solusinya

Mencari di Internet, Anda dapat menemukan yang berikut ini keputusan. Singkatnya, diusulkan untuk membuat topik untuk setiap interval penundaan dan mengimplementasikan aplikasi Konsumen di samping, yang akan membaca pesan dengan penundaan yang diperlukan.

Memproses ulang peristiwa yang diterima dari Kafka

Meskipun banyak ulasan positif, menurut saya tampaknya tidak sepenuhnya berhasil. Pertama-tama, karena pengembang, selain menerapkan persyaratan bisnis, harus menghabiskan banyak waktu untuk menerapkan mekanisme yang dijelaskan.

Selain itu, jika kontrol akses diaktifkan di kluster Kafka, Anda harus meluangkan waktu untuk membuat topik dan menyediakan akses yang diperlukan ke topik tersebut. Selain itu, Anda harus memilih parameter retensi.ms yang benar untuk setiap topik percobaan ulang sehingga pesan punya waktu untuk dikirim ulang dan tidak hilang. Implementasi dan permintaan akses harus diulang untuk setiap layanan yang ada atau baru.

Sekarang mari kita lihat mekanisme apa yang disediakan spring secara umum dan spring-kafka pada khususnya untuk pemrosesan ulang pesan. Spring-kafka memiliki ketergantungan transitif pada spring-retry, yang menyediakan abstraksi untuk mengelola BackOffPolicies yang berbeda. Ini adalah alat yang cukup fleksibel, namun kelemahan signifikannya adalah menyimpan pesan untuk dikirim ulang di memori aplikasi. Artinya, memulai ulang aplikasi karena pembaruan atau kesalahan operasional akan mengakibatkan hilangnya semua pesan yang menunggu pemrosesan ulang. Karena poin ini penting untuk sistem kami, kami tidak mempertimbangkannya lebih jauh.

spring-kafka sendiri menyediakan beberapa implementasi ContainerAwareErrorHandler, misalnya SeekToCurrentErrorHandler, yang dengannya Anda dapat memproses pesan nanti tanpa menggeser offset jika terjadi kesalahan. Dimulai dengan versi spring-kafka 2.3, BackOffPolicy dapat disetel.

Pendekatan ini memungkinkan pesan yang diproses ulang untuk bertahan saat aplikasi dimulai ulang, namun masih belum ada mekanisme DLQ. Kami memilih opsi ini pada awal tahun 2019, dengan optimisme yakin bahwa DLQ tidak diperlukan (kami beruntung dan sebenarnya tidak memerlukannya setelah beberapa bulan mengoperasikan aplikasi dengan sistem pemrosesan ulang seperti itu). Kesalahan sementara menyebabkan SeekToCurrentErrorHandler diaktifkan. Kesalahan yang tersisa dicetak di log, menghasilkan offset, dan pemrosesan dilanjutkan dengan pesan berikutnya.

Keputusan akhir

Implementasi berdasarkan SeekToCurrentErrorHandler mendorong kami untuk mengembangkan mekanisme kami sendiri untuk mengirim ulang pesan.

Pertama-tama, kami ingin menggunakan pengalaman yang ada dan memperluasnya bergantung pada logika aplikasi. Untuk aplikasi logika linier, sebaiknya berhenti membaca pesan baru dalam jangka waktu singkat yang ditentukan oleh strategi coba lagi. Untuk aplikasi lain, saya ingin memiliki satu poin yang akan menerapkan strategi coba lagi. Selain itu, titik tunggal ini harus memiliki fungsionalitas DLQ untuk kedua pendekatan.

Strategi percobaan ulang itu sendiri harus disimpan dalam aplikasi, yang bertanggung jawab untuk mengambil interval berikutnya ketika terjadi kesalahan sementara.

Menghentikan Konsumen untuk Aplikasi Logika Linier

Saat bekerja dengan spring-kafka, kode untuk menghentikan Konsumen mungkin terlihat seperti ini:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Pada contoh, retryAt adalah waktu untuk memulai ulang MessageListenerContainer jika masih berjalan. Peluncuran ulang akan dilakukan di thread terpisah yang diluncurkan di TaskScheduler, yang implementasinya juga disediakan pada musim semi.

Kami menemukan nilai retryAt dengan cara berikut:

  1. Nilai penghitung panggilan ulang dicari.
  2. Berdasarkan nilai penghitung, interval penundaan saat ini dalam strategi percobaan ulang dicari. Strateginya dideklarasikan dalam aplikasi itu sendiri; kami memilih format JSON untuk menyimpannya.
  3. Interval yang ditemukan dalam array JSON berisi jumlah detik setelah pemrosesan perlu diulang. Jumlah detik ini ditambahkan ke waktu saat ini untuk membentuk nilai retryAt.
  4. Jika interval tidak ditemukan, maka nilai retryAt adalah null dan pesan akan dikirim ke DLQ untuk diurai secara manual.

Dengan pendekatan ini, yang tersisa hanyalah menyimpan jumlah panggilan berulang untuk setiap pesan yang sedang diproses, misalnya di memori aplikasi. Menyimpan jumlah percobaan ulang dalam memori tidak penting untuk pendekatan ini, karena aplikasi logika linier tidak dapat menangani pemrosesan secara keseluruhan. Berbeda dengan spring-retry, memulai ulang aplikasi tidak akan menyebabkan semua pesan hilang untuk diproses ulang, namun hanya akan memulai ulang strategi.

Pendekatan ini membantu menghilangkan beban sistem eksternal, yang mungkin tidak tersedia karena beban yang sangat berat. Dengan kata lain, selain pemrosesan ulang, kami mencapai implementasi pola tersebut pemutus arus.

Dalam kasus kami, ambang kesalahan hanya 1, dan untuk meminimalkan waktu henti sistem karena pemadaman jaringan sementara, kami menggunakan strategi percobaan ulang yang sangat terperinci dengan interval latensi yang kecil. Ini mungkin tidak cocok untuk semua aplikasi grup, sehingga hubungan antara ambang kesalahan dan nilai interval harus dipilih berdasarkan karakteristik sistem.

Aplikasi terpisah untuk memproses pesan dari aplikasi dengan logika non-deterministik

Berikut adalah contoh kode yang mengirimkan pesan ke aplikasi tersebut (Retryer), yang akan dikirim ulang ke topik DESTINATION ketika waktu RETRY_AT tercapai:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Contoh menunjukkan bahwa banyak informasi dikirimkan dalam header. Nilai RETRY_AT ditemukan dengan cara yang sama seperti mekanisme percobaan ulang melalui penghentian Konsumen. Selain DESTINATION dan RETRY_AT kami meneruskan:

  • GROUP_ID, yang dengannya kami mengelompokkan pesan untuk analisis manual dan pencarian yang disederhanakan.
  • ASLI_PARTISI untuk mencoba mempertahankan Konsumen yang sama untuk diproses ulang. Parameter ini bisa berupa null, dalam hal ini partisi baru akan diperoleh menggunakan kunci record.key() dari pesan asli.
  • Nilai COUNTER diperbarui untuk mengikuti strategi percobaan ulang.
  • SEND_TO adalah konstanta yang menunjukkan apakah pesan dikirim untuk diproses ulang setelah mencapai RETRY_AT atau ditempatkan di DLQ.
  • ALASAN - alasan mengapa pemrosesan pesan terhenti.

Retryer menyimpan pesan untuk dikirim ulang dan diurai secara manual di PostgreSQL. Pengatur waktu memulai tugas yang menemukan pesan dengan RETRY_AT dan mengirimkannya kembali ke partisi ASLI_PARTITION topik DESTINATION dengan kunci record.key().

Setelah terkirim, pesan akan dihapus dari PostgreSQL. Penguraian pesan secara manual terjadi di UI sederhana yang berinteraksi dengan Retryer melalui REST API. Fitur utamanya adalah mengirim ulang atau menghapus pesan dari DLQ, melihat informasi kesalahan dan mencari pesan, misalnya berdasarkan nama kesalahan.

Karena kontrol akses diaktifkan di cluster kami, maka perlu untuk meminta akses tambahan ke topik yang sedang didengarkan Retryer, dan mengizinkan Retryer untuk menulis ke topik DESTINATION. Ini merepotkan, namun, tidak seperti pendekatan topik interval, kami memiliki DLQ dan UI lengkap untuk mengelolanya.

Ada kalanya topik masuk dibaca oleh beberapa kelompok konsumen berbeda, yang aplikasinya menerapkan logika berbeda. Memproses ulang pesan melalui Retryer untuk salah satu aplikasi ini akan menghasilkan duplikat di aplikasi lainnya. Untuk melindungi dari hal ini, kami membuat topik terpisah untuk diproses ulang. Topik yang masuk dan dicoba kembali dapat dibaca oleh Konsumen yang sama tanpa batasan apa pun.

Memproses ulang peristiwa yang diterima dari Kafka

Secara default, pendekatan ini tidak menyediakan fungsionalitas pemutus sirkuit, namun dapat ditambahkan ke aplikasi menggunakan musim semi-cloud-netflix atau baru pemutus sirkuit awan pegas, membungkus tempat di mana layanan eksternal dipanggil ke dalam abstraksi yang sesuai. Selain itu, menjadi mungkin untuk memilih strategi sekat pola, yang juga dapat berguna. Misalnya, di spring-cloud-netflix, ini bisa berupa kumpulan thread atau semaphore.

Keluaran

Hasilnya, kami memiliki aplikasi terpisah yang memungkinkan kami mengulangi pemrosesan pesan jika sistem eksternal untuk sementara tidak tersedia.

Salah satu keunggulan utama aplikasi ini adalah dapat digunakan oleh sistem eksternal yang berjalan pada cluster Kafka yang sama, tanpa modifikasi signifikan di pihaknya! Aplikasi semacam itu hanya perlu mengakses topik percobaan ulang, mengisi beberapa header Kafka dan mengirim pesan ke Retryer. Tidak perlu membangun infrastruktur tambahan apa pun. Dan untuk mengurangi jumlah pesan yang ditransfer dari aplikasi ke Retryer dan sebaliknya, kami mengidentifikasi aplikasi dengan logika linier dan memprosesnya kembali melalui penghentian Konsumen.

Sumber: www.habr.com

Tambah komentar