Paano naging realidad si Kafka

Paano naging realidad si Kafka

Hoy Habr!

Nagtatrabaho ako sa Tinkoff team, na gumagawa ng sarili nitong notification center. Karamihan ay nabubuo ako sa Java gamit ang Spring boot at nilulutas ang iba't ibang mga teknikal na problema na lumitaw sa isang proyekto.

Karamihan sa aming mga microservice ay nakikipag-ugnayan sa isa't isa nang asynchronous sa pamamagitan ng isang message broker. Noong nakaraan, ginamit namin ang IBM MQ bilang isang broker, na hindi na makayanan ang pagkarga, ngunit sa parehong oras ay may mataas na garantiya sa paghahatid.

Bilang kapalit, inalok kami ng Apache Kafka, na may mataas na potensyal sa pag-scale, ngunit, sa kasamaang-palad, ay nangangailangan ng halos indibidwal na diskarte sa pagsasaayos para sa iba't ibang mga sitwasyon. Bilang karagdagan, ang hindi bababa sa isang beses na mekanismo ng paghahatid na gumagana sa Kafka bilang default ay hindi nagpapahintulot sa pagpapanatili ng kinakailangang antas ng pagkakapare-pareho sa labas ng kahon. Susunod, ibabahagi ko ang aming karanasan sa pagsasaayos ng Kafka, sa partikular, sasabihin ko sa iyo kung paano i-configure at mabuhay nang eksaktong isang beses na paghahatid.

Garantisadong paghahatid at higit pa

Ang mga setting na tinalakay sa ibaba ay makakatulong na maiwasan ang ilang mga problema sa mga default na setting ng koneksyon. Ngunit una, nais kong bigyang-pansin ang isang parameter na magpapadali sa isang posibleng pag-debug.

Makakatulong ito ID ng kliyente para sa Producer at Consumer. Sa unang tingin, maaari mong gamitin ang pangalan ng application bilang halaga, at sa karamihan ng mga kaso ito ay gagana. Bagama't ang sitwasyon kapag ang isang application ay gumagamit ng ilang Consumer at binigyan mo sila ng parehong client.id, nagreresulta sa sumusunod na babala:

org.apache.kafka.common.utils.AppInfoParser β€” Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Kung gusto mong gumamit ng JMX sa isang application na may Kafka, maaaring ito ay isang problema. Para sa kasong ito, pinakamahusay na gumamit ng kumbinasyon ng pangalan ng application at, halimbawa, ang pangalan ng paksa bilang value ng client.id. Ang resulta ng aming pagsasaayos ay makikita sa output ng command kafka-consumer-groups mula sa mga utility mula sa Confluent:

Paano naging realidad si Kafka

Ngayon tingnan natin ang senaryo para sa garantisadong paghahatid ng mensahe. May parameter ang Kafka Producer mga acks, na nagbibigay-daan sa iyong i-configure pagkatapos kung gaano karaming kumikilala ang cluster leader na kailangang isaalang-alang na matagumpay na naisulat ang mensahe. Maaaring kunin ng parameter na ito ang mga sumusunod na halaga:

  • 0 β€” ang pagkilala ay hindi isasaalang-alang.
  • 1 ang default na parameter, 1 replica lang ang kailangan na kilalanin.
  • βˆ’1 β€” ang pagkilala mula sa lahat ng naka-synchronize na mga replika ay kinakailangan (cluster setup min.insync.replicas).

Mula sa mga nakalistang halaga ay malinaw na ang mga acks na katumbas ng βˆ’1 ay nagbibigay ng pinakamatibay na garantiya na ang mensahe ay hindi mawawala.

Tulad ng alam nating lahat, ang mga distributed system ay hindi maaasahan. Upang maprotektahan laban sa lumilipas na mga pagkakamali, ang Kafka Producer ay nagbibigay ng opsyon muling sinusubukan, na nagbibigay-daan sa iyong itakda ang bilang ng mga pagsubok na muling ipadala sa loob delivery.timeout.ms. Dahil ang parameter ng muling pagsubok ay may default na halaga na Integer.MAX_VALUE (2147483647), ang bilang ng mga muling pagsubok ng mensahe ay maaaring isaayos sa pamamagitan ng pagpapalit lamang ng delivery.timeout.ms.

Kami ay lumilipat patungo sa eksaktong isang beses na paghahatid

Ang mga nakalistang setting ay nagbibigay-daan sa aming Producer na maghatid ng mga mensahe na may mataas na garantiya. Pag-usapan natin ngayon kung paano masisiguro na isang kopya lang ng mensahe ang isinulat sa isang paksa ng Kafka? Sa pinakasimpleng kaso, upang gawin ito, kailangan mong itakda ang parameter sa Producer paganahin.idempotence sa totoo. Ginagarantiyahan ng Idempotency na isang mensahe lamang ang isinulat sa isang partikular na partisyon ng isang paksa. Ang paunang kondisyon para sa pagpapagana ng idempotency ay ang mga halaga acks = lahat, subukan muli > 0, max.in.flight.requests.per.connection ≀ 5. Kung ang mga parameter na ito ay hindi tinukoy ng developer, ang mga halaga sa itaas ay awtomatikong itatakda.

Kapag na-configure ang idempotency, kinakailangan upang matiyak na ang parehong mga mensahe ay napupunta sa parehong mga partisyon sa bawat oras. Magagawa ito sa pamamagitan ng pagtatakda ng partitioner.class key at parameter sa Producer. Magsimula tayo sa susi. Dapat pareho ito para sa bawat pagsusumite. Madali itong makamit sa pamamagitan ng paggamit ng alinman sa mga business ID mula sa orihinal na post. Ang parameter ng partitioner.class ay may default na halaga βˆ’ DefaultPartitioner. Gamit ang diskarte sa partitioning na ito, bilang default, kumikilos kami nang ganito:

  • Kung ang partition ay tahasang tinukoy kapag nagpapadala ng mensahe, pagkatapos ay ginagamit namin ito.
  • Kung ang partition ay hindi tinukoy, ngunit ang key ay tinukoy, piliin ang partition sa pamamagitan ng hash ng key.
  • Kung ang partition at key ay hindi tinukoy, piliin ang mga partisyon nang paisa-isa (round-robin).

Gayundin, gamit ang isang susi at idempotent na pagpapadala na may parameter max.in.flight.requests.per.connection = 1 nagbibigay sa iyo ng streamline na pagpoproseso ng mensahe sa Consumer. Ito rin ay nagkakahalaga ng pag-alala na kung ang kontrol sa pag-access ay na-configure sa iyong cluster, kakailanganin mo ng mga karapatan upang magsulat nang walang kakayahan sa isang paksa.

Kung bigla kang kulang sa mga kakayahan ng idempotent na pagpapadala sa pamamagitan ng susi o ang lohika sa panig ng Producer ay nangangailangan ng pagpapanatili ng pagkakapare-pareho ng data sa pagitan ng iba't ibang mga partisyon, ang mga transaksyon ay darating upang iligtas. Bilang karagdagan, gamit ang isang transaksyon sa chain, maaari mong kondisyon na i-synchronize ang isang tala sa Kafka, halimbawa, na may isang talaan sa database. Upang paganahin ang transaksyonal na pagpapadala sa Producer, ito ay dapat na idempotent at bukod pa rito ay nakatakda transactional.id. Kung ang iyong Kafka cluster ay may access control na na-configure, ang isang transactional record, tulad ng isang idempotent record, ay mangangailangan ng mga pahintulot sa pagsusulat, na maaaring ibigay sa pamamagitan ng mask gamit ang value na nakaimbak sa transactional.id.

Sa pormal, anumang string, gaya ng pangalan ng application, ay maaaring gamitin bilang isang transaction identifier. Ngunit kung maglulunsad ka ng ilang pagkakataon ng parehong application na may parehong transactional.id, ang unang inilunsad na pagkakataon ay ititigil nang may error, dahil ituring ito ng Kafka na proseso ng zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Upang malutas ang problemang ito, nagdaragdag kami ng suffix sa pangalan ng application sa anyo ng hostname, na nakukuha namin mula sa mga variable ng kapaligiran.

Naka-configure ang producer, ngunit kinokontrol lamang ng mga transaksyon sa Kafka ang saklaw ng mensahe. Anuman ang katayuan ng transaksyon, ang mensahe ay agad na mapupunta sa paksa, ngunit may mga karagdagang katangian ng system.

Upang maiwasang maagang mabasa ng Consumer ang mga naturang mensahe, kailangan nitong itakda ang parameter paghihiwalay.antas sa read_committed value. Magagawa ng naturang Consumer na magbasa ng mga di-transaksyonal na mensahe tulad ng dati, at mga transaksyonal na mensahe lamang pagkatapos ng isang commit.
Kung naitakda mo ang lahat ng mga setting na nakalista nang mas maaga, pagkatapos ay na-configure mo nang eksaktong isang beses ang paghahatid. Binabati kita!

Ngunit may isa pang nuance. Ang Transactional.id, na na-configure namin sa itaas, ay talagang ang prefix ng transaksyon. Sa manager ng transaksyon, isang sequence number ang idinagdag dito. Ang natanggap na identifier ay ibinibigay sa transactional.id.expiration.ms, na naka-configure sa isang Kafka cluster at may default na value na "7 araw". Kung sa panahong ito ang application ay hindi nakatanggap ng anumang mga mensahe, pagkatapos ay kapag sinubukan mo ang susunod na transaksyonal na pagpapadala ay makakatanggap ka InvalidPidMappingException. Magbibigay ang transaction coordinator ng bagong sequence number para sa susunod na transaksyon. Gayunpaman, maaaring mawala ang mensahe kung ang InvalidPidMappingException ay hindi pinangangasiwaan nang tama.

Sa halip na kabuuan

Tulad ng nakikita mo, hindi sapat na magpadala lamang ng mga mensahe sa Kafka. Kailangan mong pumili ng kumbinasyon ng mga parameter at maging handa sa mga mabilisang pagbabago. Sa artikulong ito, sinubukan kong ipakita nang detalyado ang eksaktong isang beses na pag-setup ng paghahatid at inilarawan ang ilang mga problema sa mga configuration ng client.id at transactional.id na nakatagpo namin. Nasa ibaba ang isang buod ng mga setting ng Producer at Consumer.

producer:

  1. acks = lahat
  2. muling subukan > 0
  3. enable.idempotence = totoo
  4. max.in.flight.requests.per.connection ≀ 5 (1 para sa maayos na pagpapadala)
  5. transactional.id = ${application-name}-${hostname}

Mamimili:

  1. isolation.level = read_committed

Para mabawasan ang mga error sa hinaharap na mga application, gumawa kami ng sarili naming wrapper sa configuration ng spring, kung saan nakatakda na ang mga value para sa ilan sa mga nakalistang parameter.

Narito ang ilang mga materyales para sa sariling pag-aaral:

Pinagmulan: www.habr.com

Magdagdag ng komento