Memproses semula acara yang diterima daripada Kafka

Memproses semula acara yang diterima daripada Kafka

Hai Habr.

Baru-baru ini saya berkongsi pengalamannya tentang parameter yang kami sepasukan paling kerap gunakan untuk Pengeluar dan Pengguna Kafka untuk mendekati penghantaran yang terjamin. Dalam artikel ini saya ingin memberitahu anda bagaimana kami mengatur pemprosesan semula acara yang diterima daripada Kafka akibat ketiadaan sementara sistem luaran.

Aplikasi moden beroperasi dalam persekitaran yang sangat kompleks. Logik perniagaan dibalut dengan timbunan teknologi moden, berjalan dalam imej Docker yang diuruskan oleh orkestra seperti Kubernetes atau OpenShift, dan berkomunikasi dengan aplikasi lain atau penyelesaian perusahaan melalui rangkaian penghala fizikal dan maya. Dalam persekitaran sedemikian, sesuatu sentiasa boleh pecah, jadi memproses semula peristiwa jika salah satu sistem luaran tidak tersedia adalah bahagian penting dalam proses perniagaan kami.

Bagaimana keadaan sebelum Kafka

Terdahulu dalam projek kami menggunakan IBM MQ untuk penghantaran mesej tak segerak. Jika sebarang ralat berlaku semasa pengendalian perkhidmatan, mesej yang diterima boleh diletakkan dalam baris gilir huruf mati (DLQ) untuk penghuraian manual selanjutnya. DLQ telah dibuat di sebelah baris gilir masuk, mesej telah dipindahkan ke dalam IBM MQ.

Jika ralat itu bersifat sementara dan kami boleh menentukannya (contohnya, ResourceAccessException pada panggilan HTTP atau MongoTimeoutException pada permintaan MongoDb), maka strategi cuba semula akan berkuat kuasa. Tanpa mengira logik percabangan aplikasi, mesej asal telah dialihkan sama ada ke baris gilir sistem untuk penghantaran tertunda, atau ke aplikasi berasingan yang telah dibuat lama dahulu untuk menghantar semula mesej. Ini termasuk nombor hantar semula dalam pengepala mesej, yang terikat pada selang kelewatan atau penghujung strategi peringkat aplikasi. Jika kita telah mencapai penghujung strategi tetapi sistem luaran masih tidak tersedia, maka mesej akan diletakkan dalam DLQ untuk penghuraian manual.

Cari penyelesaian

Mencari di Internet, anda boleh mencari perkara berikut keputusan. Ringkasnya, adalah dicadangkan untuk mencipta topik untuk setiap selang kelewatan dan melaksanakan aplikasi Pengguna di sebelah, yang akan membaca mesej dengan kelewatan yang diperlukan.

Memproses semula acara yang diterima daripada Kafka

Walaupun banyak ulasan positif, nampaknya saya tidak berjaya sepenuhnya. Pertama sekali, kerana pemaju, sebagai tambahan kepada melaksanakan keperluan perniagaan, perlu menghabiskan banyak masa untuk melaksanakan mekanisme yang diterangkan.

Selain itu, jika kawalan akses didayakan pada gugusan Kafka, anda perlu meluangkan sedikit masa untuk mencipta topik dan menyediakan akses yang diperlukan kepada topik tersebut. Di samping itu, anda perlu memilih parameter retention.ms yang betul untuk setiap topik cuba semula supaya mesej mempunyai masa untuk dihantar semula dan tidak hilang daripadanya. Pelaksanaan dan permintaan akses perlu diulang untuk setiap perkhidmatan sedia ada atau baharu.

Sekarang mari kita lihat mekanisme spring secara umum dan spring-kafka khususnya yang disediakan untuk pemprosesan semula mesej. Spring-kafka mempunyai kebergantungan transitif pada spring-retry, yang menyediakan abstraksi untuk mengurus BackOffPolicies yang berbeza. Ini adalah alat yang agak fleksibel, tetapi kelemahan ketaranya ialah menyimpan mesej untuk dihantar semula dalam memori aplikasi. Ini bermakna memulakan semula aplikasi kerana kemas kini atau ralat operasi akan mengakibatkan kehilangan semua mesej sementara menunggu pemprosesan semula. Memandangkan perkara ini penting untuk sistem kami, kami tidak mempertimbangkannya lagi.

spring-kafka sendiri menyediakan beberapa pelaksanaan ContainerAwareErrorHandler, sebagai contoh SeekToCurrentErrorHandler, yang dengannya anda boleh memproses mesej kemudian tanpa mengalihkan offset sekiranya berlaku ralat. Bermula dengan versi spring-kafka 2.3, anda boleh menetapkan BackOffPolicy.

Pendekatan ini membenarkan mesej yang diproses semula untuk meneruskan aplikasi dimulakan semula, tetapi masih tiada mekanisme DLQ. Kami memilih pilihan ini pada awal 2019, secara optimis mempercayai bahawa DLQ tidak akan diperlukan (kami bernasib baik dan sebenarnya tidak memerlukannya selepas beberapa bulan mengendalikan aplikasi dengan sistem pemprosesan semula sedemikian). Ralat sementara menyebabkan SeekToCurrentErrorHandler tercetus. Ralat yang selebihnya telah dicetak dalam log, mengakibatkan pengimbangan, dan pemprosesan diteruskan dengan mesej seterusnya.

Keputusan terakhir

Pelaksanaan berdasarkan SeekToCurrentErrorHandler mendorong kami untuk membangunkan mekanisme kami sendiri untuk menghantar semula mesej.

Pertama sekali, kami ingin menggunakan pengalaman sedia ada dan mengembangkannya bergantung pada logik aplikasi. Untuk aplikasi logik linear, adalah optimum untuk berhenti membaca mesej baharu untuk tempoh masa yang singkat yang ditentukan oleh strategi cuba semula. Untuk aplikasi lain, saya mahu mempunyai satu titik yang akan menguatkuasakan strategi cuba semula. Di samping itu, titik tunggal ini mesti mempunyai fungsi DLQ untuk kedua-dua pendekatan.

Strategi cuba semula itu sendiri mesti disimpan dalam aplikasi, yang bertanggungjawab untuk mendapatkan semula selang seterusnya apabila ralat sementara berlaku.

Menghentikan Pengguna untuk Aplikasi Logik Linear

Apabila bekerja dengan spring-kafka, kod untuk menghentikan Pengguna mungkin kelihatan seperti ini:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Dalam contoh, retryAt ialah masa untuk memulakan semula MessageListenerContainer jika ia masih berjalan. Pelancaran semula akan berlaku dalam urutan berasingan yang dilancarkan dalam TaskScheduler, yang pelaksanaannya juga disediakan oleh musim bunga.

Kami mencari nilai retryAt dengan cara berikut:

  1. Nilai kaunter panggilan semula dicari.
  2. Berdasarkan nilai pembilang, selang kelewatan semasa dalam strategi cuba semula dicari. Strategi diisytiharkan dalam aplikasi itu sendiri; kami memilih format JSON untuk menyimpannya.
  3. Selang yang ditemui dalam tatasusunan JSON mengandungi bilangan saat selepas pemprosesan perlu diulang. Bilangan saat ini ditambahkan pada masa semasa untuk membentuk nilai untuk retryAt.
  4. Jika selang tidak dijumpai, maka nilai retryAt adalah batal dan mesej akan dihantar ke DLQ untuk penghuraian manual.

Dengan pendekatan ini, yang tinggal hanyalah menyimpan bilangan panggilan berulang untuk setiap mesej yang sedang diproses, contohnya dalam memori aplikasi. Mengekalkan kiraan cuba semula dalam ingatan adalah tidak kritikal untuk pendekatan ini, kerana aplikasi logik linear tidak dapat mengendalikan pemprosesan secara keseluruhan. Tidak seperti percubaan semula musim bunga, memulakan semula aplikasi tidak akan menyebabkan semua mesej hilang diproses semula, tetapi hanya akan memulakan semula strategi.

Pendekatan ini membantu mengeluarkan beban dari sistem luaran, yang mungkin tidak tersedia kerana beban yang sangat berat. Dalam erti kata lain, sebagai tambahan kepada pemprosesan semula, kami mencapai pelaksanaan corak pemutus litar.

Dalam kes kami, ambang ralat hanya 1, dan untuk meminimumkan masa henti sistem akibat gangguan rangkaian sementara, kami menggunakan strategi percubaan semula yang sangat terperinci dengan selang kependaman yang kecil. Ini mungkin tidak sesuai untuk semua aplikasi kumpulan, jadi hubungan antara ambang ralat dan nilai selang mesti dipilih berdasarkan ciri-ciri sistem.

Aplikasi berasingan untuk memproses mesej daripada aplikasi dengan logik bukan deterministik

Berikut ialah contoh kod yang menghantar mesej kepada aplikasi sedemikian (Retryer), yang akan menghantar semula ke topik DESTINATION apabila masa RETRY_AT dicapai:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Contoh menunjukkan bahawa banyak maklumat dihantar dalam pengepala. Nilai RETRY_AT ditemui dengan cara yang sama seperti mekanisme percubaan semula melalui hentian Pengguna. Selain DESTINATION dan RETRY_AT kami lulus:

  • GROUP_ID, yang mana kami mengumpulkan mesej untuk analisis manual dan carian yang dipermudahkan.
  • ORIGINAL_PARTITION untuk cuba mengekalkan Pengguna yang sama untuk diproses semula. Parameter ini boleh menjadi batal, dalam hal ini partition baharu akan diperoleh menggunakan kunci record.key() bagi mesej asal.
  • Nilai COUNTER dikemas kini untuk mengikuti strategi cuba semula.
  • SEND_TO ialah pemalar yang menunjukkan sama ada mesej dihantar untuk diproses semula apabila mencapai RETRY_AT atau diletakkan dalam DLQ.
  • SEBAB - sebab pemprosesan mesej terganggu.

Retryer menyimpan mesej untuk penghantaran semula dan penghuraian manual dalam PostgreSQL. Pemasa memulakan tugas yang mencari mesej dengan RETRY_AT dan menghantarnya kembali ke partition ORIGINAL_PARTITION topik DESTINATION dengan rekod kunci.key().

Setelah dihantar, mesej dipadamkan daripada PostgreSQL. Penghuraian mesej secara manual berlaku dalam UI mudah yang berinteraksi dengan Retryer melalui REST API. Ciri utamanya ialah menghantar semula atau memadam mesej daripada DLQ, melihat maklumat ralat dan mencari mesej, contohnya dengan nama ralat.

Memandangkan kawalan akses didayakan pada kluster kami, anda juga perlu meminta akses kepada topik yang sedang didengari oleh Pencuba Semula dan membenarkan Pencuba Semula menulis ke topik DESTINATION. Ini menyusahkan, tetapi, tidak seperti pendekatan topik selang, kami mempunyai DLQ dan UI yang lengkap untuk mengurusnya.

Terdapat kes apabila topik masuk dibaca oleh beberapa kumpulan pengguna yang berbeza, yang aplikasinya melaksanakan logik yang berbeza. Memproses semula mesej melalui Percubaan Semula untuk salah satu aplikasi ini akan menghasilkan pendua pada yang lain. Untuk melindungi daripada perkara ini, kami mencipta topik berasingan untuk diproses semula. Topik masuk dan cuba semula boleh dibaca oleh Pengguna yang sama tanpa sebarang sekatan.

Memproses semula acara yang diterima daripada Kafka

Secara lalai pendekatan ini tidak menyediakan fungsi pemutus litar, namun ia boleh ditambah pada aplikasi menggunakan spring-cloud-netflix atau baru pemutus litar awan musim bunga, membalut tempat di mana perkhidmatan luaran dipanggil ke dalam abstraksi yang sesuai. Di samping itu, ia menjadi mungkin untuk memilih strategi untuk sekat corak, yang juga boleh berguna. Contohnya, dalam spring-cloud-netflix ini boleh menjadi kumpulan benang atau semafor.

Output

Akibatnya, kami mempunyai aplikasi berasingan yang membolehkan kami mengulangi pemprosesan mesej jika mana-mana sistem luaran tidak tersedia buat sementara waktu.

Salah satu kelebihan utama aplikasi ini ialah ia boleh digunakan oleh sistem luaran yang berjalan pada gugusan Kafka yang sama, tanpa pengubahsuaian ketara di sisi mereka! Aplikasi sedemikian hanya perlu mengakses topik cuba semula, mengisi beberapa tajuk Kafka dan menghantar mesej kepada Pencuba Semula. Tidak perlu menaikkan sebarang infrastruktur tambahan. Dan untuk mengurangkan bilangan mesej yang dipindahkan daripada aplikasi ke Retryer dan kembali, kami mengenal pasti aplikasi dengan logik linear dan memprosesnya semula melalui hentian Pengguna.

Sumber: www.habr.com

Tambah komen