Kumaha Kafka janten kanyataan

Kumaha Kafka janten kanyataan

Héy Habr!

Kuring damel di tim Tinkoff, anu ngembangkeun pusat béwara sorangan. Kuring lolobana ngamekarkeun di Java maké Spring boot jeung ngajawab sagala rupa masalah teknis anu timbul dina hiji proyék.

Kalolobaan microservices urang komunikasi saling asynchronously ngaliwatan calo pesen. Saméméhna, urang dipaké IBM MQ salaku calo, nu teu bisa Cope jeung beban, tapi di waktu nu sami gaduh jaminan pangiriman tinggi.

Salaku gaganti, kami ditawarkeun Apache Kafka, nu boga poténsi skala tinggi, tapi hanjakalna, merlukeun pendekatan ampir individual pikeun konfigurasi pikeun skenario béda. Salaku tambahan, sahenteuna sakali mékanisme pangiriman anu dianggo dina Kafka sacara standar henteu ngijinkeun ngajaga tingkat konsistensi anu diperyogikeun di luar kotak. Salajengna, abdi bakal babagi pangalaman urang dina konfigurasi Kafka, hususna, kuring bakal ngabejaan ka maneh kumaha ngonpigurasikeun tur hirup kalawan persis sakali pangiriman.

Dijamin pangiriman sareng seueur deui

Setélan anu dibahas di handap bakal ngabantosan nyegah sababaraha masalah sareng setélan sambungan standar. Tapi mimitina Abdi hoyong nengetan hiji parameter anu bakal mempermudah hiji mungkin debug.

Ieu bakal nulungan klien.id pikeun Produsén sareng Konsumén. Dina glance kahiji, anjeun tiasa nganggo nami aplikasi salaku nilai, sareng dina kalolobaan kasus ieu bakal jalan. Sanaos kaayaan nalika aplikasi nganggo sababaraha Konsumén sareng anjeun masihan aranjeunna klien.id anu sami, nyababkeun peringatan ieu:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Upami anjeun hoyong nganggo JMX dina aplikasi sareng Kafka, maka ieu tiasa janten masalah. Pikeun hal ieu, éta pangalusna ngagunakeun kombinasi nami aplikasi tur, contona, ngaran topik salaku nilai client.id. Hasil tina konfigurasi urang tiasa ditingali dina kaluaran paréntah kafka-konsumén-grup ti utilitas ti Confluent:

Kumaha Kafka janten kanyataan

Ayeuna hayu urang tingali dina skenario pikeun pangiriman pesen dijamin. Produser Kafka gaduh parameter acks, nu ngidinan Anjeun pikeun ngonpigurasikeun sanggeus sabaraha ngakuan pamimpin klaster perlu mertimbangkeun pesen hasil ditulis. Parameter ieu tiasa nyandak nilai-nilai ieu:

  • 0 - ngaku moal dianggap.
  • 1 mangrupikeun parameter standar, ngan ukur 1 réplika anu diperyogikeun pikeun ngaku.
  • −1 - ngaku ti sadaya réplika anu disingkronkeun diperyogikeun (setelan klaster min.insync.replicas).

Tina nilai anu didaptarkeun jelas yén acks sami sareng −1 masihan jaminan anu paling kuat yén pesenna moal leungit.

Sakumaha urang terang, sistem anu disebarkeun henteu tiasa dipercaya. Pikeun ngajaga tina kasalahan sementara, Produser Kafka nyayogikeun pilihan nyobian deui, nu ngidinan Anjeun pikeun nyetel jumlah usaha ngirim ulang dina delivery.timeout.ms. Kusabab parameter retries boga nilai standar Integer.MAX_VALUE (2147483647), jumlah retries pesen bisa disaluyukeun ku ngan ngarobah delivery.timeout.ms.

Urang pindah ka arah persis sakali pangiriman

Setélan anu didaptarkeun ngamungkinkeun Produsén kami pikeun ngirim pesen kalayan jaminan anu luhur. Hayu urang ayeuna ngobrol ngeunaan kumaha carana mastikeun yén ngan hiji salinan pesen ditulis kana topik Kafka? Dina kasus pangbasajanna, pikeun ngalakukeun ieu, anjeun kedah nyetél parameter dina Produser ngaktifkeun.dempotensi mun leres. Idempotency ngajamin yén ngan ukur hiji pesen anu ditulis kana partisi khusus tina hiji topik. Prasyarat pikeun ngaktipkeun idempotensi nyaéta nilai acks = sadayana, coba deui > 0, max.in.flight.requests.per.connection ≤ 5. Upami parameter ieu henteu ditangtukeun ku pamekar, nilai-nilai di luhur bakal otomatis disetel.

Nalika idempotency dikonpigurasi, perlu pikeun mastikeun yén pesen anu sami mungkas dina partisi anu sami unggal waktos. Ieu tiasa dilakukeun ku netepkeun konci partitioner.class sareng parameter ka Produser. Hayu urang mimitian ku konci. Éta kedah sami pikeun unggal kiriman. Ieu bisa gampang dihontal ku ngagunakeun salah sahiji ID bisnis ti pos aslina. Parameter partitioner.class ngabogaan nilai standar − DefaultPartitioner. Kalayan strategi partisi ieu, sacara standar kami ngalakukeun sapertos kieu:

  • Upami partisi sacara eksplisit ditetepkeun nalika ngirim pesen, maka kami nganggo éta.
  • Lamun partisi teu dieusian, tapi konci dieusian, pilih partisi ku hash sahiji konci.
  • Lamun partisi jeung konci teu dieusian, pilih partisi hiji-hiji (round-robin).

Ogé, ngagunakeun konci na idempotent ngirim kalawan parameter max.in.flight.requests.per.connection = 1 méré Anjeun ngolah pesen streamlined on Consumer. Éta ogé patut émut yén upami kontrol aksés dikonpigurasi dina kluster anjeun, maka anjeun peryogi hak pikeun nyerat sacara idempotent kana topik.

Upami ujug-ujug anjeun kakurangan kamampuan ngirim idempotent ku konci atanapi logika di sisi Produser peryogi ngajaga konsistensi data antara partisi anu béda, maka transaksi bakal nyalametkeun. Salaku tambahan, nganggo transaksi ranté, anjeun tiasa nyinkronkeun catetan dina Kafka, contona, kalayan rékaman dina pangkalan data. Pikeun ngaktifkeun ngirim transaksional ka Produser, éta kedah idempotent sareng set tambahan transactional.id. Upami klaster Kafka anjeun gaduh kontrol aksés anu dikonpigurasi, maka rékaman transaksional, sapertos catetan idempoten, peryogi idin nyerat, anu tiasa dipasihkeun ku masker nganggo nilai anu disimpen dina transactional.id.

Sacara resmi, string naon waé, sapertos nami aplikasi, tiasa dianggo salaku identifier transaksi. Tapi upami anjeun ngaluncurkeun sababaraha conto aplikasi anu sami sareng transactional.id anu sami, maka conto anu diluncurkeun munggaran bakal dieureunkeun ku kasalahan, sabab Kafka bakal nganggap éta prosés zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Pikeun ngajawab masalah ieu, urang tambahkeun a ahiran kana ngaran aplikasi dina bentuk hostname, nu urang ménta tina variabel lingkungan.

Produser dikonpigurasikeun, tapi transaksi dina Kafka ngan ukur ngatur ruang lingkup pesen. Paduli status urus, pesen langsung ka topik, tapi boga atribut sistem tambahan.

Pikeun nyegah pesen sapertos dibaca ku Konsumén sateuacanna, éta kedah nyetél parameter isolasi.tingkat pikeun read_committed nilai. Konsumén sapertos kitu bakal tiasa maca seratan non-transaksional sapertos sateuacanna, sareng pesen transaksi ngan ukur saatos komitmen.
Upami anjeun parantos nyetél sadaya setélan anu didaptarkeun tadi, maka anjeun parantos ngonpigurasikeun pas sakali pangiriman. Wilujeng!

Tapi aya hiji deui nuansa. Transactional.id, anu kami konfigurasikeun di luhur, saleresna mangrupikeun awalan transaksi. Dina manajer transaksi, hiji nomer runtuyan ditambahkeun kana eta. Identifier nampi dikaluarkeun pikeun transactional.id.expiration.ms, anu dikonpigurasi dina klaster Kafka sareng gaduh nilai standar "7 dinten". Upami salami ieu aplikasi henteu nampi pesen, teras nalika anjeun nyobian kiriman transaksi salajengna anjeun bakal nampi InvalidPidMappingException. Koordinator transaksi lajeng bakal ngaluarkeun nomer runtuyan anyar pikeun urus salajengna. Nanging, pesenna tiasa leungit upami InvalidPidMappingException henteu diurus leres.

Gantina hasilna

Sakumaha anjeun tiasa tingali, teu cukup ngan ngirim pesen ka Kafka. Anjeun kedah milih kombinasi parameter sareng disiapkeun pikeun parobihan gancang. Dina tulisan ieu, kuring nyobian nunjukkeun sacara rinci setélan pangiriman pas sakali sareng ngajelaskeun sababaraha masalah sareng konfigurasi client.id sareng transactional.id anu kami hadapi. Di handap ieu kasimpulan setelan Produser sareng Konsumén.

produser:

  1. acks = sadayana
  2. cobaan deui > 0
  3. enable.idempotence = leres
  4. max.in.flight.requests.per.connection ≤ 5 (1 pikeun pangiriman anu teratur)
  5. transactional.id = ${ngaran-aplikasi}-${ngaran host}

Nu meuli:

  1. isolation.level = read_committed

Pikeun ngaleutikan kasalahan dina aplikasi nu bakal datang, urang nyieun wrapper sorangan dina konfigurasi spring, dimana nilai sababaraha parameter didaptarkeun geus disetel.

Ieu sababaraha bahan pikeun diajar mandiri:

sumber: www.habr.com

Tambahkeun komentar