Bagaimana Kafka menjadi realiti

Bagaimana Kafka menjadi realiti

Hai Habr!

Saya bekerja dengan pasukan Tinkoff, yang sedang membangunkan pusat pemberitahuannya sendiri. Saya kebanyakannya membangun di Jawa menggunakan but Spring dan menyelesaikan pelbagai masalah teknikal yang timbul dalam projek.

Kebanyakan perkhidmatan mikro kami berkomunikasi antara satu sama lain secara tidak segerak melalui broker mesej. Sebelum ini, kami menggunakan IBM MQ sebagai broker, yang tidak lagi dapat menampung beban, tetapi pada masa yang sama mempunyai jaminan penghantaran yang tinggi.

Sebagai pengganti, kami ditawarkan Apache Kafka, yang mempunyai potensi penskalaan yang tinggi, tetapi, malangnya, memerlukan pendekatan hampir individu untuk konfigurasi untuk senario yang berbeza. Di samping itu, sekurang-kurangnya sekali mekanisme penghantaran yang berfungsi di Kafka secara lalai tidak membenarkan mengekalkan tahap konsistensi yang diperlukan di luar kotak. Seterusnya, saya akan berkongsi pengalaman kami dalam konfigurasi Kafka, khususnya, saya akan memberitahu anda cara mengkonfigurasi dan hidup dengan tepat sekali penghantaran.

Penghantaran terjamin dan banyak lagi

Tetapan yang dibincangkan di bawah akan membantu mengelakkan beberapa masalah dengan tetapan sambungan lalai. Tetapi pertama-tama saya ingin memberi perhatian kepada satu parameter yang akan memudahkan kemungkinan nyahpepijat.

Ini akan membantu klien.id untuk Pengeluar dan Pengguna. Pada pandangan pertama, anda boleh menggunakan nama aplikasi sebagai nilai, dan dalam kebanyakan kes ini akan berfungsi. Walaupun situasi apabila aplikasi menggunakan beberapa Pengguna dan anda memberi mereka client.id yang sama, mengakibatkan amaran berikut:

org.apache.kafka.common.utils.AppInfoParser β€” Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Jika anda ingin menggunakan JMX dalam aplikasi dengan Kafka, maka ini boleh menjadi masalah. Untuk kes ini, sebaiknya gunakan gabungan nama aplikasi dan, sebagai contoh, nama topik sebagai nilai client.id. Hasil konfigurasi kami boleh dilihat dalam output arahan kumpulan-pengguna-kafka daripada utiliti daripada Confluent:

Bagaimana Kafka menjadi realiti

Sekarang mari kita lihat senario untuk penghantaran mesej terjamin. Kafka Producer mempunyai parameter acks, yang membolehkan anda mengkonfigurasi selepas berapa ramai yang mengakui ketua kluster perlu mempertimbangkan mesej yang berjaya ditulis. Parameter ini boleh mengambil nilai berikut:

  • 0 β€” pengakuan tidak akan dipertimbangkan.
  • 1 ialah parameter lalai, hanya 1 replika diperlukan untuk diakui.
  • βˆ’1 β€” pengakuan daripada semua replika yang disegerakkan diperlukan (persediaan kelompok min.insync.replicas).

Daripada nilai yang disenaraikan adalah jelas bahawa acks sama dengan -1 memberikan jaminan paling kuat bahawa mesej tidak akan hilang.

Seperti yang kita sedia maklum, sistem teragih tidak boleh dipercayai. Untuk melindungi daripada kerosakan sementara, Kafka Producer menyediakan pilihan mencuba semula, yang membolehkan anda menetapkan bilangan percubaan hantar semula dalam penghantaran.masa tamat.ms. Memandangkan parameter cuba semula mempunyai nilai lalai Integer.MAX_VALUE (2147483647), bilangan percubaan semula mesej boleh dilaraskan dengan menukar hanya delivery.timeout.ms.

Kami bergerak ke arah penghantaran tepat sekali

Tetapan yang disenaraikan membolehkan Pengeluar kami menghantar mesej dengan jaminan yang tinggi. Sekarang mari kita bincangkan tentang cara memastikan hanya satu salinan mesej ditulis kepada topik Kafka? Dalam kes paling mudah, untuk melakukan ini, anda perlu menetapkan parameter pada Producer membolehkan.dempotensi kepada benar. Idempotensi menjamin bahawa hanya satu mesej ditulis pada partition tertentu bagi satu topik. Prasyarat untuk mendayakan idempotensi ialah nilai acks = semua, cuba semula > 0, max.in.flight.requests.per.connection ≀ 5. Jika parameter ini tidak ditentukan oleh pembangun, nilai di atas akan ditetapkan secara automatik.

Apabila idempotency dikonfigurasikan, adalah perlu untuk memastikan bahawa mesej yang sama berakhir dalam partition yang sama setiap kali. Ini boleh dilakukan dengan menetapkan kunci partitioner.class dan parameter kepada Producer. Mari kita mulakan dengan kunci. Ia mestilah sama untuk setiap penyerahan. Ini boleh dicapai dengan mudah dengan menggunakan mana-mana ID perniagaan daripada siaran asal. Parameter partitioner.class mempunyai nilai lalai βˆ’ DefaultPartitioner. Dengan strategi pembahagian ini, secara lalai kami bertindak seperti ini:

  • Jika partition dinyatakan secara eksplisit semasa menghantar mesej, maka kami menggunakannya.
  • Jika partition tidak ditentukan, tetapi kunci ditentukan, pilih partition dengan cincangan kekunci.
  • Jika partition dan key tidak dinyatakan, pilih partition satu persatu (round-robin).

Juga, menggunakan penghantaran kunci dan idempoten dengan parameter max.in.flight.requests.per.connection = 1 memberi anda pemprosesan mesej diperkemas pada Pengguna. Perlu diingat juga bahawa jika kawalan akses dikonfigurasikan pada kelompok anda, maka anda memerlukan hak untuk menulis topik secara idempoten.

Jika tiba-tiba anda kekurangan keupayaan penghantaran idempoten melalui kunci atau logik di pihak Pengeluar memerlukan mengekalkan konsistensi data antara partition yang berbeza, maka transaksi akan datang untuk menyelamatkan. Di samping itu, menggunakan transaksi rantaian, anda boleh menyegerakkan rekod dalam Kafka secara bersyarat, contohnya, dengan rekod dalam pangkalan data. Untuk membolehkan penghantaran transaksi kepada Pengeluar, ia mestilah idempoten dan juga ditetapkan transactional.id. Jika gugusan Kafka anda mempunyai kawalan akses yang dikonfigurasikan, maka rekod transaksi, seperti rekod idempoten, memerlukan kebenaran menulis, yang boleh diberikan dengan topeng menggunakan nilai yang disimpan dalam transactional.id.

Secara rasmi, sebarang rentetan, seperti nama aplikasi, boleh digunakan sebagai pengecam transaksi. Tetapi jika anda melancarkan beberapa contoh aplikasi yang sama dengan transactional.id yang sama, maka contoh pertama yang dilancarkan akan dihentikan dengan ralat, kerana Kafka akan menganggapnya sebagai proses zombi.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Untuk menyelesaikan masalah ini, kami menambah akhiran pada nama aplikasi dalam bentuk nama hos, yang kami peroleh daripada pembolehubah persekitaran.

Pengeluar dikonfigurasikan, tetapi transaksi di Kafka hanya mengawal skop mesej. Tanpa mengira status urus niaga, mesej segera pergi ke topik, tetapi mempunyai atribut sistem tambahan.

Untuk mengelakkan mesej sedemikian daripada dibaca oleh Pengguna lebih awal daripada masa, ia perlu menetapkan parameter pengasingan.peringkat untuk membaca_komitmen nilai. Pengguna sedemikian akan dapat membaca mesej bukan transaksi seperti sebelumnya, dan mesej transaksi hanya selepas komitmen.
Jika anda telah menetapkan semua tetapan yang disenaraikan sebelum ini, maka anda telah mengkonfigurasi tepat sekali penghantaran. tahniah!

Tetapi ada satu lagi nuansa. Transactional.id, yang kami konfigurasikan di atas, sebenarnya adalah awalan transaksi. Pada pengurus urus niaga, nombor turutan ditambahkan padanya. Pengecam yang diterima dikeluarkan kepada transactional.id.expiration.ms, yang dikonfigurasikan pada gugusan Kafka dan mempunyai nilai lalai "7 hari". Jika pada masa ini aplikasi tidak menerima sebarang mesej, maka apabila anda mencuba penghantaran transaksi seterusnya anda akan menerima InvalidPidMappingException. Penyelaras transaksi kemudiannya akan mengeluarkan nombor urutan baharu untuk transaksi seterusnya. Walau bagaimanapun, mesej mungkin hilang jika InvalidPidMappingException tidak dikendalikan dengan betul.

Daripada hasil

Seperti yang anda lihat, tidak cukup dengan hanya menghantar mesej kepada Kafka. Anda perlu memilih gabungan parameter dan bersedia untuk membuat perubahan pantas. Dalam artikel ini, saya cuba menunjukkan secara terperinci persediaan penghantaran tepat sekali dan menerangkan beberapa masalah dengan konfigurasi client.id dan transactional.id yang kami hadapi. Di bawah ialah ringkasan tetapan Pengeluar dan Pengguna.

Pengeluar:

  1. acks = semua
  2. cuba semula > 0
  3. enable.dempotensi = benar
  4. max.in.flight.requests.per.connection ≀ 5 (1 untuk penghantaran teratur)
  5. transactional.id = ${nama-permohonan}-${nama hos}

Pengguna:

  1. pengasingan.peringkat = read_committed

Untuk meminimumkan ralat dalam aplikasi masa hadapan, kami membuat pembalut kami sendiri pada konfigurasi spring, di mana nilai untuk beberapa parameter yang disenaraikan telah ditetapkan.

Berikut adalah beberapa bahan untuk belajar sendiri:

Sumber: www.habr.com

Tambah komen