Kif Kafka sar realtà

Kif Kafka sar realtà

Ħej Habr!

Naħdem fit-tim Tinkoff, li qed jiżviluppa ċ-ċentru tan-notifika tiegħu stess. Jiena l-aktar niżviluppa fil-Java billi nuża Spring boot u nsolvi diversi problemi tekniċi li jinqalgħu fi proġett.

Ħafna mill-mikroservizzi tagħna jikkomunikaw ma 'xulxin b'mod asinkroniku permezz ta' sensar tal-messaġġi. Preċedentement, użajna IBM MQ bħala sensar, li ma setax ilaħħaq aktar mat-tagħbija, iżda fl-istess ħin kellu garanziji ta 'kunsinna għolja.

Bħala sostitut, ġejna offruti Apache Kafka, li għandu potenzjal ta 'skala għoli, iżda, sfortunatament, jeħtieġ approċċ kważi individwali għall-konfigurazzjoni għal xenarji differenti. Barra minn hekk, il-mekkaniżmu tal-kunsinna tal-inqas darba li jaħdem f'Kafka awtomatikament ma ppermettiex li jinżamm il-livell meħtieġ ta' konsistenza barra mill-kaxxa. Sussegwentement, se naqsam l-esperjenza tagħna fil-konfigurazzjoni ta 'Kafka, b'mod partikolari, se ngħidlek kif tikkonfigura u tgħix b'kunsinna eżatta darba.

Kunsinna garantita u aktar

Is-settings diskussi hawn taħt se jgħinu biex jipprevjenu numru ta 'problemi bl-issettjar tal-konnessjoni default. Imma l-ewwel nixtieq nagħti attenzjoni għal parametru wieħed li jiffaċilita debug possibbli.

Dan se jgħin klijent.id għall-Produttur u l-Konsumatur. L-ewwel daqqa t'għajn, tista 'tuża l-isem tal-applikazzjoni bħala l-valur, u f'ħafna każijiet dan jaħdem. Għalkemm is-sitwazzjoni meta applikazzjoni tuża diversi Konsumaturi u inti tagħtihom l-istess client.id, tirriżulta fit-twissija li ġejja:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Jekk trid tuża JMX f'applikazzjoni ma' Kafka, allura din tista' tkun problema. Għal dan il-każ, huwa aħjar li tuża kombinazzjoni tal-isem tal-applikazzjoni u, pereżempju, l-isem tas-suġġett bħala l-valur client.id. Ir-riżultat tal-konfigurazzjoni tagħna jista 'jidher fl-output tal-kmand kafka-gruppi tal-konsumaturi minn utilitajiet minn Confluent:

Kif Kafka sar realtà

Issa ejja nħarsu lejn ix-xenarju għall-kunsinna garantita tal-messaġġi. Kafka Producer għandu parametru acks, li jippermettilek tikkonfigura wara kemm jirrikonoxxi l-mexxej tal-cluster jeħtieġ li jikkunsidra l-messaġġ miktub b'suċċess. Dan il-parametru jista' jieħu l-valuri li ġejjin:

  • 0 — rikonoxximent mhux se jiġi kkunsidrat.
  • 1 huwa l-parametru default, hija meħtieġa replika waħda biss biex tirrikonoxxi.
  • −1 — hija meħtieġa rikonoxximent mir-repliki sinkronizzati kollha (setup tal-cluster min.insync.repliki).

Mill-valuri elenkati huwa ċar li acks ugwali għal -1 jagħtu l-aktar garanzija b'saħħitha li l-messaġġ mhux se jintilef.

Kif nafu lkoll, is-sistemi distribwiti mhumiex affidabbli. Biex tipproteġi kontra ħsarat temporanji, Kafka Producer jipprovdi l-għażla ipprova mill-ġdid, li jippermettilek li tissettja n-numru ta' tentattivi ta' trasmissjoni mill-ġdid fi ħdan delivery.timeout.ms. Peress li l-parametru ta' tentattivi mill-ġdid għandu valur default ta' Integer.MAX_VALUE (2147483647), in-numru ta' tentattivi mill-ġdid tal-messaġġi jista' jiġi aġġustat billi jinbidel biss delivery.timeout.ms.

Qegħdin nimxu lejn kunsinna eżattament darba

Is-settings elenkati jippermettu lill-Produttur tagħna jwassal messaġġi b'garanzija għolja. Ejja issa nitkellmu dwar kif niżguraw li kopja waħda biss ta' messaġġ tinkiteb fuq suġġett Kafka? Fl-aktar każ sempliċi, biex tagħmel dan, trid tissettja l-parametru fuq Producer enable.idempotenza għal veru. Idempotenza tiggarantixxi li messaġġ wieħed biss jinkiteb fuq partizzjoni speċifika ta 'suġġett wieħed. Il-prekundizzjoni biex tkun permessa l-idempotenza huma l-valuri acks = kollha, ipprova mill-ġdid > 0, max.in.flight.requests.per.connection ≤ 5. Jekk dawn il-parametri mhumiex speċifikati mill-iżviluppatur, il-valuri ta 'hawn fuq jiġu stabbiliti awtomatikament.

Meta l-idempotenza hija kkonfigurata, huwa meħtieġ li jiġi żgurat li l-istess messaġġi jispiċċaw fl-istess diviżorji kull darba. Dan jista' jsir billi tissettja ċ-ċavetta u l-parametru partitioner.class għal Producer. Nibdew biċ-ċavetta. Għandu jkun l-istess għal kull sottomissjoni. Dan jista 'jinkiseb faċilment billi tuża kwalunkwe waħda mill-IDs tan-negozju mill-post oriġinali. Il-parametru partitioner.class għandu valur default - DefaultPartitioner. B'din l-istrateġija ta 'qsim, b'mod awtomatiku naġixxu hekk:

  • Jekk il-partizzjoni hija speċifikata b'mod espliċitu meta tibgħat il-messaġġ, allura nużawha.
  • Jekk il-partizzjoni mhix speċifikata, iżda ċ-ċavetta hija speċifikata, agħżel il-partizzjoni mill-hash taċ-ċavetta.
  • Jekk il-partizzjoni u ċ-ċavetta mhumiex speċifikati, agħżel il-ħitan wieħed wieħed (round-robin).

Ukoll, bl-użu ta 'ċavetta u idempotent jibgħat b'parametru max.in.flight.requests.per.connection = 1 jagħtik proċessar ta' messaġġi razzjonalizzat fuq il-Konsumatur. Ta 'min jiftakar ukoll li jekk il-kontroll tal-aċċess huwa kkonfigurat fuq il-cluster tiegħek, allura jkollok bżonn drittijiet biex tikteb b'mod idempotenti għal suġġett.

Jekk f'daqqa waħda jkollok nuqqas ta 'kapaċitajiet ta' idempotent jibgħat permezz ta 'ċavetta jew il-loġika fuq in-naħa tal-Produttur teħtieġ iż-żamma tal-konsistenza tad-dejta bejn diviżorji differenti, allura t-tranżazzjonijiet jiġu salvati. Barra minn hekk, bl-użu ta 'transazzjoni katina, tista' tissinkronizza b'mod kondizzjonali rekord f'Kafka, pereżempju, ma 'rekord fid-database. Biex ikun jista' jintbagħat transazzjonali lill-Produttur, għandu jkun idempotenti u addizzjonalment issettjat transazzjonali.id. Jekk ir-raggruppament Kafka tiegħek għandu kontroll ta 'aċċess konfigurat, allura rekord transazzjonali, bħal rekord idempotent, ikollu bżonn permessi ta' kitba, li jistgħu jingħataw b'maskra bl-użu tal-valur maħżun fi transactional.id.

Formalment, kwalunkwe string, bħall-isem tal-applikazzjoni, tista 'tintuża bħala identifikatur tat-tranżazzjoni. Imma jekk tniedi diversi każijiet tal-istess applikazzjoni bl-istess transazzjonali.id, allura l-ewwel istanza mnedija titwaqqaf bi żball, peress li Kafka tqisha bħala proċess zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Biex issolvi din il-problema, aħna nżidu suffiss mal-isem tal-applikazzjoni fil-forma tal-hostname, li niksbu minn varjabbli tal-ambjent.

Il-produttur huwa kkonfigurat, iżda t-tranżazzjonijiet fuq Kafka jikkontrollaw biss l-ambitu tal-messaġġ. Irrispettivament mill-istatus tat-tranżazzjoni, il-messaġġ immedjatament imur għas-suġġett, iżda għandu attributi tas-sistema addizzjonali.

Biex tevita messaġġi bħal dawn milli jinqraw mill-Konsumatur minn qabel, jeħtieġ li jissettja l-parametru iżolament.livell to read_committed value. Konsumatur bħal dan ikun jista' jaqra messaġġi mhux transazzjonali bħal qabel, u messaġġi transazzjonali biss wara impenn.
Jekk issettjajt is-settings kollha elenkati qabel, allura kkonfigurajt eżattament darba l-kunsinna. Prosit!

Iżda hemm sfumatura oħra. Transactional.id, li kkonfigurajna hawn fuq, huwa fil-fatt il-prefiss tat-tranżazzjoni. Fuq il-maniġer tat-tranżazzjonijiet, numru ta 'sekwenza huwa miżjud miegħu. L-identifikatur riċevut jinħareġ lil transazzjonali.id.expiration.ms, li huwa kkonfigurat fuq cluster Kafka u għandu valur default ta '"7 ijiem". Jekk matul dan iż-żmien l-applikazzjoni ma rċeviet l-ebda messaġġ, allura meta tipprova l-ibgħat transazzjonali li jmiss tirċievi InvalidPidMappingException. Il-koordinatur tat-tranżazzjoni mbagħad joħroġ numru ta' sekwenza ġdid għat-tranżazzjoni li jmiss. Madankollu, il-messaġġ jista' jintilef jekk l-InvalidPidMappingException ma tiġix immaniġġjata b'mod korrett.

Minflok totali

Kif tistgħu taraw, mhux biżżejjed li sempliċiment tibgħat messaġġi lil Kafka. Trid tagħżel taħlita ta 'parametri u tkun ippreparat biex tagħmel bidliet malajr. F'dan l-artikolu, ippruvajt nuri fid-dettall is-setup tal-kunsinna eżattament darba u ddeskriviet diversi problemi bil-konfigurazzjonijiet client.id u transactional.id li ltqajna magħhom. Hawn taħt hemm sommarju tas-settings tal-Produttur u tal-Konsumatur.

Produttur:

  1. acks = kollha
  2. jerġa' jipprova > 0
  3. enable.idempotenza = veru
  4. max.in.flight.requests.per.connection ≤ 5 (1 għall-bgħit ordnat)
  5. transactional.id = ${application-name}-${hostname}

Konsumatur:

  1. isolation.level = read_committed

Biex innaqqsu l-iżbalji fl-applikazzjonijiet futuri, għamilna l-ippakkjar tagħna stess fuq il-konfigurazzjoni tar-rebbiegħa, fejn il-valuri għal xi wħud mill-parametri elenkati huma diġà stabbiliti.

Hawn huma ftit materjali għall-awtostudju:

Sors: www.habr.com

Żid kumment