Bagaimana Kafka menjadi kenyataan

Bagaimana Kafka menjadi kenyataan

Hei Habr!

Saya bekerja di tim Tinkoff, yang mengembangkan pusat notifikasinya sendiri. Saya kebanyakan mengembangkan di Java menggunakan Spring boot dan menyelesaikan berbagai masalah teknis yang muncul dalam sebuah proyek.

Sebagian besar layanan mikro kami berkomunikasi satu sama lain secara asinkron melalui perantara pesan. Sebelumnya, kami menggunakan IBM MQ sebagai broker, yang tidak mampu lagi menahan beban, namun pada saat yang sama memiliki jaminan pengiriman yang tinggi.

Sebagai gantinya, kami ditawari Apache Kafka, yang memiliki potensi penskalaan tinggi, namun sayangnya memerlukan pendekatan konfigurasi yang hampir individual untuk skenario yang berbeda. Selain itu, mekanisme pengiriman setidaknya satu kali yang berfungsi di Kafka secara default tidak memungkinkan pemeliharaan tingkat konsistensi yang diperlukan secara langsung. Selanjutnya, saya akan berbagi pengalaman kami dalam konfigurasi Kafka, khususnya, saya akan memberi tahu Anda cara mengkonfigurasi dan menjalankan pengiriman tepat satu kali.

Pengiriman terjamin dan banyak lagi

Pengaturan yang dibahas di bawah ini akan membantu mencegah sejumlah masalah dengan pengaturan koneksi default. Tapi pertama-tama saya ingin memperhatikan satu parameter yang akan memfasilitasi kemungkinan debug.

Ini akan membantu klien.id bagi Produsen dan Konsumen. Sekilas, Anda dapat menggunakan nama aplikasi sebagai nilainya, dan dalam banyak kasus ini akan berhasil. Meskipun situasi ketika aplikasi menggunakan beberapa Konsumen dan Anda memberi mereka client.id yang sama, menghasilkan peringatan berikut:

org.apache.kafka.common.utils.AppInfoParser β€” Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Jika Anda ingin menggunakan JMX dalam aplikasi dengan Kafka, ini bisa menjadi masalah. Untuk kasus ini, yang terbaik adalah menggunakan kombinasi nama aplikasi dan, misalnya, nama topik sebagai nilai client.id. Hasil konfigurasi kita dapat dilihat pada output perintah kelompok konsumen kafka dari utilitas dari Confluent:

Bagaimana Kafka menjadi kenyataan

Sekarang mari kita lihat skenario pengiriman pesan yang terjamin. Produser Kafka memiliki parameter pantat, yang memungkinkan Anda mengonfigurasi berapa banyak pengakuan yang diperlukan pemimpin klaster agar pesan berhasil ditulis. Parameter ini dapat mengambil nilai berikut:

  • 0 β€” pengakuan tidak akan dipertimbangkan.
  • 1 adalah parameter default, hanya 1 replika yang diperlukan untuk mengonfirmasi.
  • βˆ’1 β€” diperlukan pengakuan dari semua replika yang disinkronkan (penyiapan klaster min.insync.replika).

Dari nilai yang tercantum jelas bahwa acks sama dengan βˆ’1 memberikan jaminan terkuat bahwa pesan tidak akan hilang.

Seperti kita ketahui, sistem terdistribusi tidak dapat diandalkan. Untuk melindungi dari kesalahan sementara, Kafka Producer menyediakan opsi tersebut coba lagi, yang memungkinkan Anda mengatur jumlah upaya pengiriman ulang di dalamnya pengiriman.timeout.ms. Karena parameter percobaan ulang memiliki nilai default Integer.MAX_VALUE (2147483647), jumlah percobaan ulang pesan dapat disesuaikan dengan mengubah hanya delivery.timeout.ms.

Kami bergerak menuju pengiriman tepat satu kali

Pengaturan yang tercantum memungkinkan Produser kami mengirimkan pesan dengan jaminan tinggi. Sekarang mari kita bahas tentang cara memastikan bahwa hanya satu salinan pesan yang ditulis untuk topik Kafka? Dalam kasus paling sederhana, untuk melakukan ini, Anda perlu mengatur parameter pada Produser aktifkan.idempotensi menjadi benar. Idempotensi menjamin bahwa hanya satu pesan yang ditulis ke partisi tertentu dari satu topik. Prasyarat untuk memungkinkan idempotensi adalah nilai-nilai acks = semua, coba lagi > 0, max.in.flight.requests.per.connection ≀ 5. Jika parameter ini tidak ditentukan oleh pengembang, nilai di atas akan ditetapkan secara otomatis.

Ketika idempotensi dikonfigurasi, penting untuk memastikan bahwa pesan yang sama berakhir di partisi yang sama setiap saat. Hal ini dapat dilakukan dengan mengatur kunci dan parameter partisi.kelas ke Produser. Mari kita mulai dengan kuncinya. Itu harus sama untuk setiap pengiriman. Ini dapat dengan mudah dicapai dengan menggunakan salah satu ID bisnis dari postingan asli. Parameter partisi.kelas memiliki nilai default - Partisi Default. Dengan strategi partisi ini, secara default kita bertindak seperti ini:

  • Jika partisi ditentukan secara eksplisit saat mengirim pesan, maka kami menggunakannya.
  • Jika partisi tidak ditentukan, tetapi kuncinya ditentukan, pilih partisi dengan hash kunci.
  • Jika partisi dan kunci tidak ditentukan, pilih partisi satu per satu (round-robin).

Juga, menggunakan kunci dan pengiriman idempoten dengan parameter max.in.flight.requests.per.koneksi = 1 memberi Anda pemrosesan pesan yang efisien pada Konsumen. Perlu juga diingat bahwa jika kontrol akses dikonfigurasi pada klaster Anda, maka Anda memerlukan hak untuk menulis topik secara idempoten.

Jika tiba-tiba Anda kekurangan kemampuan pengiriman idempoten dengan kunci atau logika di sisi Produser memerlukan menjaga konsistensi data antara partisi yang berbeda, maka transaksi akan menjadi penyelamat. Selain itu, dengan menggunakan transaksi berantai, Anda dapat menyinkronkan catatan di Kafka secara kondisional, misalnya, dengan catatan di database. Untuk mengaktifkan pengiriman transaksional ke Produser, pengiriman tersebut harus idempoten dan disetel tambahan transaksional.id. Jika klaster Kafka Anda memiliki kontrol akses yang dikonfigurasi, maka rekaman transaksional, seperti rekaman idempoten, akan memerlukan izin tulis, yang dapat diberikan melalui mask menggunakan nilai yang disimpan di transaksional.id.

Secara formal, string apa pun, seperti nama aplikasi, dapat digunakan sebagai pengidentifikasi transaksi. Namun jika Anda meluncurkan beberapa instance dari aplikasi yang sama dengan transaksional.id yang sama, maka instance yang diluncurkan pertama kali akan dihentikan karena kesalahan, karena Kafka akan menganggapnya sebagai proses zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Untuk mengatasi masalah ini, kami menambahkan akhiran pada nama aplikasi berupa nama host, yang kami peroleh dari variabel lingkungan.

Produser sudah dikonfigurasi, tetapi transaksi di Kafka hanya mengontrol cakupan pesan. Terlepas dari status transaksi, pesan langsung menuju ke topik, namun memiliki atribut sistem tambahan.

Untuk mencegah pesan tersebut dibaca oleh Konsumen sebelumnya, Konsumen perlu mengatur parameternya isolasi.level ke nilai read_commit. Konsumen seperti itu akan dapat membaca pesan non-transaksional seperti sebelumnya, dan pesan transaksional hanya setelah dilakukan.
Jika Anda telah mengatur semua pengaturan yang tercantum sebelumnya, maka Anda telah mengkonfigurasi tepat satu kali pengiriman. Selamat!

Namun ada satu nuansa lagi. Transactional.id yang kita konfigurasikan di atas sebenarnya adalah awalan transaksi. Pada manajer transaksi, nomor urut ditambahkan ke dalamnya. Pengidentifikasi yang diterima dikeluarkan untuk transaksional.id.expiration.ms, yang dikonfigurasi pada klaster Kafka dan memiliki nilai default β€œ7 hari”. Jika selama ini aplikasi belum menerima pesan apa pun, maka ketika Anda mencoba pengiriman transaksional berikutnya, Anda akan menerimanya PengecualianPidMappingTidak Valid. Koordinator transaksi kemudian akan mengeluarkan nomor urut baru untuk transaksi berikutnya. Namun, pesan tersebut mungkin hilang jika InvalidPidMappingException tidak ditangani dengan benar.

Alih-alih total

Seperti yang Anda lihat, mengirim pesan ke Kafka saja tidak cukup. Anda harus memilih kombinasi parameter dan bersiap untuk melakukan perubahan cepat. Pada artikel kali ini saya mencoba menampilkan secara detail setup pengiriman tepat sekali dan menjelaskan beberapa masalah pada konfigurasi client.id dan transaksional.id yang kami temui. Di bawah ini adalah ringkasan pengaturan Produsen dan Konsumen.

Produser:

  1. acks = semua
  2. percobaan ulang> 0
  3. aktifkan.idempotensi = benar
  4. max.in.flight.requests.per.connection ≀ 5 (1 untuk pengiriman teratur)
  5. transaksional.id = ${nama-aplikasi}-${nama host}

Konsumen:

  1. isolasi.level = read_commited

Untuk meminimalkan kesalahan dalam aplikasi masa depan, kami membuat pembungkus sendiri pada konfigurasi pegas, di mana nilai untuk beberapa parameter yang tercantum sudah ditetapkan.

Berikut adalah beberapa bahan untuk belajar mandiri:

Sumber: www.habr.com

Tambah komentar