Wéi de Kafka Realitéit gouf

Wéi de Kafka Realitéit gouf

Hey Habr!

Ech schaffen am Tinkoff Team, deen hiren eegene Notifikatiounszentrum entwéckelt. Ech entwéckelen meeschtens am Java mam Spring Boot a léisen verschidden technesch Problemer déi an engem Projet entstinn.

Déi meescht vun eise Mikroservicer kommunizéieren asynchron mateneen duerch e Message Broker. Virdrun hu mir IBM MQ als Broker benotzt, deen net méi mat der Laascht eens konnt, awer gläichzäiteg héich Liwwerungsgarantien haten.

Als Ersatz goufe mir Apache Kafka ugebueden, deen héich Skaléierungspotenzial huet, awer leider erfuerdert eng bal individuell Approche fir d'Konfiguratioun fir verschidden Szenarien. Zousätzlech huet den op d'mannst eemol Liwwermechanismus, deen am Kafka als Standard funktionnéiert, net erlaabt den erfuerderleche Konsistenzniveau aus der Këscht z'erhalen. Als nächst wäert ech eis Erfahrung an der Kafka Konfiguratioun deelen, besonnesch, ech wäert Iech soen wéi Dir konfiguréiert a liewen mat exakt eemol Liwwerung.

Garantéiert Liwwerung a méi

D'Astellungen, déi hei drënner diskutéiert ginn, hëllefen eng Rei Probleemer mat de Standardverbindungsastellungen ze vermeiden. Awer als éischt wëll ech op ee Parameter oppassen, deen e méiglechen Debug erliichtert.

Dëst wäert hëllefen client.id fir Produzent a Konsument. Op den éischte Bléck kënnt Dir den Numm vun der Applikatioun als Wäert benotzen, an de meeschte Fäll funktionnéiert dat. Och wann d'Situatioun wann eng Applikatioun verschidde Konsumenten benotzt an Dir hinnen dee selwechte client.id gitt, resultéiert an der folgender Warnung:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Wann Dir JMX an enger Applikatioun mat Kafka benotze wëllt, da kéint dëst e Problem sinn. Fir dëse Fall ass et am beschten eng Kombinatioun vum Applikatiounsnumm an zum Beispill dem Thema Numm als Client.id Wäert ze benotzen. D'Resultat vun eiser Konfiguratioun kann am Kommandoausgang gesi ginn Kafka-Konsumenten-Gruppen vun Utilities vu Confluent:

Wéi de Kafka Realitéit gouf

Loosst eis elo den Szenario fir garantéiert Message Liwwerung kucken. Kafka Produzent huet e Parameter acks, wat Iech erlaabt ze konfiguréieren no wéi vill unerkannt de Cluster Leader muss de Message erfollegräich geschriwwe betruechten. Dëse Parameter kann déi folgend Wäerter huelen:

  • 0 - unerkennen gëtt net berücksichtegt.
  • 1 ass de Standardparameter, nëmmen 1 Replika ass erfuerderlech fir z'erkennen.
  • -1 - Unerkennung vun all synchroniséierten Repliken ass erfuerderlech (Cluster Setup min.insync.repliken).

Vun de opgezielte Wäerter ass et kloer datt Acks gläich wéi -1 déi stäerkst Garantie gëtt datt de Message net verluer geet.

Wéi mir all wëssen, verdeelt Systemer sinn onzouverlässeg. Fir géint transient Feeler ze schützen, bitt de Kafka Produzent d'Optioun erëm probéiert, wat Iech erlaabt d'Zuel vun de Versuche fir nei Sendungen anzestellen Liwwerung.Timeout.ms. Zanter der Retry-Parameter huet e Standardwäert vun Integer.MAX_VALUE (2147483647), kann d'Zuel vun de Message-Reversiounen ugepasst ginn andeems Dir nëmmen delivery.timeout.ms ännert.

Mir plënneren Richtung genee eemol Liwwerung

Déi opgelëscht Astellungen erlaben eise Produzent Messagen mat enger héijer Garantie ze liwweren. Loosst eis elo schwätzen iwwer wéi sécherzestellen datt nëmmen eng Kopie vun engem Message un e Kafka Thema geschriwwe gëtt? Am einfachste Fall, fir dëst ze maachen, musst Dir de Parameter op Produzent setzen enable.idempotence zu wouer. Idempotenz garantéiert datt nëmmen ee Message op eng spezifesch Partition vun engem Thema geschriwwe gëtt. D'Viraussetzung fir d'Idempotenz z'erméiglechen sinn d'Wäerter acks = all, probéieren > 0, max.in.flight.requests.per.connection ≤ 5. Wann dës Parameteren net vum Entwéckler spezifizéiert ginn, ginn déi uewe genannte Wäerter automatesch agestallt.

Wann d'Idempotenz konfiguréiert ass, ass et néideg ze garantéieren datt déiselwecht Messagen all Kéier op déiselwecht Partitionen ophalen. Dëst kann gemaach ginn andeems de Partitioner.class Schlëssel a Parameter op Produzent setzt. Loosst eis mam Schlëssel ufänken. Et muss d'selwecht sinn fir all Soumissioun. Dëst kann einfach erreecht ginn andeems Dir eng vun de Geschäfts-IDen aus dem Originalpost benotzt. De Parameter partitioner.class huet e Standardwäert - DefaultPartitioner. Mat dëser Partitionéierungsstrategie handele mir par défaut esou:

  • Wann d'Partition explizit spezifizéiert gëtt wann Dir de Message schéckt, da benotze mir se.
  • Wann d'Partition net spezifizéiert ass, awer de Schlëssel ass spezifizéiert, wielt d'Partition mam Hash vum Schlëssel.
  • Wann d'Partition an de Schlëssel net spezifizéiert sinn, wielt d'Partitionen een nom aneren (Ronn-Robin).

Och benotzt e Schlëssel an idempotent Schécken mat engem Parameter max.in.flight.requests.per.connection = 1 gëtt Iech streamlined Message Veraarbechtung op de Konsument. Et ass och derwäert ze erënneren datt wann Zougangskontroll op Ärem Cluster konfiguréiert ass, da braucht Dir Rechter fir idempotent zu engem Thema ze schreiwen.

Wann Dir op eemol d'Fähigkeiten vun der idempotenter Schécken duerch Schlëssel feelt oder d'Logik op der Produzent Säit erfuerdert d'Datekonsistenz tëscht verschiddene Partitionen z'erhalen, da kommen Transaktiounen zur Rettung. Zousätzlech, mat enger Kettentransaktioun, kënnt Dir e Rekord an Kafka bedingt synchroniséieren, zum Beispill mat engem Rekord an der Datebank. Fir Transaktiounsschécken un de Produzent z'erméiglechen, muss et idempotent sinn an zousätzlech gesat ginn transactional.id. Wann Äre Kafka-Cluster Zougangskontroll konfiguréiert huet, da brauch en Transaktiounsrekord, wéi en idempotent Rekord, Schreifrechter, déi duerch Maske mat Hëllef vum Wäert gelagert kënne ginn an transactional.id.

Formell kann all String, wéi den Numm vun der Applikatioun, als Transaktiounsidentifizéierer benotzt ginn. Awer wann Dir e puer Instanzen vun der selwechter Applikatioun mat derselwechter transactional.id lancéiert, da gëtt déi éischt lancéiert Instanz mat engem Fehler gestoppt, well de Kafka et als Zombie-Prozess betruecht.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Fir dëse Problem ze léisen, addéiere mir e Suffix zum Applikatiounsnumm a Form vum Hostnumm, dee mir aus Ëmfeldvariablen kréien.

De Produzent ass konfiguréiert, awer Transaktiounen op Kafka kontrolléieren nëmmen den Ëmfang vun der Noriicht. Onofhängeg vun der Transaktioun Status, de Message geet direkt op d'Thema, mee huet zousätzlech System Attributer.

Fir ze verhënneren datt esou Messagen vum Konsument virdru gelies ginn, muss de Parameter setzen Isolatioun.Niveau ze liesen_engagéierten Wäert. Sou e Konsument wäert fäeg sinn net-transaktionell Messagen ze liesen wéi virdrun, an Transaktiounsmessagen nëmmen no engem Engagement.
Wann Dir all déi virdrun opgezielt Astellungen agestallt hutt, dann hutt Dir exakt eemol d'Liwwerung konfiguréiert. Gratulatioun!

Awer et gëtt eng méi Nuance. Transactional.id, déi mir uewe konfiguréiert hunn, ass tatsächlech den Transaktiounspräfix. Am Transaktiounsmanager gëtt eng Sequenznummer derbäigesat. De kritt Identifizéierer gëtt ausgestallt transactional.id.expiration.ms, deen op engem Kafka-Cluster konfiguréiert ass an e Standardwäert vun "7 Deeg" huet. Wann während dëser Zäit d'Applikatioun keng Messagen kritt huet, da wann Dir déi nächst Transaktiounsschéck probéiert, kritt Dir InvalidPidMappingException. Den Transaktiounskoordinator wäert dann eng nei Sequenznummer fir déi nächst Transaktioun erausginn. Allerdéngs kann de Message verluer goen wann d'InvalidPidMappingException net korrekt gehandhabt gëtt.

Amplaz Gesamtzuelen

Wéi Dir kënnt gesinn, ass et net genuch fir einfach Messagen un de Kafka ze schécken. Dir musst eng Kombinatioun vu Parameteren wielen a bereet sinn fir séier Ännerungen ze maachen. An dësem Artikel hunn ech probéiert am Detail de genau eemol Liwweropbau ze weisen an e puer Probleemer mat der Client.id an Transactional.id Konfiguratiounen beschriwwen, déi mir begéint hunn. Drënner ass e Resumé vun de Produzent a Konsument Astellunge.

Produzent:

  1. acks = all
  2. widderholl > 0
  3. enable.idempotence = wouer
  4. max.in.flight.requests.per.connection ≤ 5 (1 fir uerdentlech Sendung)
  5. transactional.id = ${application-name}-${hostname}

Konsument:

  1. isolation.level = liesen_engagéiert

Fir Feeler an zukünfteg Uwendungen ze minimiséieren, hu mir eisen eegene Wrapper iwwer d'Fréijoerkonfiguratioun gemaach, wou d'Wäerter fir e puer vun de opgelëschte Parameter scho festgeluecht sinn.

Hei sinn e puer Material fir Selbststudie:

Source: will.com

Setzt e Commentaire