Hoe Kafka 'n werklikheid geword het

Hoe Kafka 'n werklikheid geword het

Haai Habr!

Ek werk in die Tinkoff-span, wat besig is om sy eie kennisgewingsentrum te ontwikkel. Vir die grootste deel ontwikkel ek in Java met behulp van Spring boot en los verskeie tegniese probleme op wat in die projek opduik.

Die meeste van ons mikrodienste kommunikeer asynchronies met mekaar deur 'n boodskapmakelaar. Voorheen het ons IBM MQ as 'n makelaar gebruik, wat nie meer die vrag kon hanteer nie, maar terselfdertyd hoë afleweringswaarborge gehad het.

As 'n plaasvervanger is ons Apache Kafka aangebied, wat 'n hoë skaalpotensiaal het, maar ongelukkig 'n byna individuele benadering tot konfigurasie vir verskillende scenario's vereis. Daarbenewens het die ten minste een keer afleweringsmeganisme wat by verstek in Kafka werk, nie toegelaat om die vereiste vlak van konsekwentheid uit die boks te handhaaf nie. Vervolgens sal ek ons ​​ervaring met die konfigurasie van Kafka deel, veral hoe om op te stel en te leef met presies een keer aflewering.

Gewaarborgde aflewering en meer

Die opsies wat hieronder bespreek word, sal help om 'n aantal probleme met die verstekverbindinginstellings te voorkom. Maar eers wil ek aandag gee aan een parameter wat 'n moontlike ontfouting sal vergemaklik.

Dit sal help kliënt.id vir produsent en verbruiker. Met die eerste oogopslag kan jy die naam van die toepassing as die waarde gebruik, en in die meeste gevalle sal dit werk. Alhoewel die situasie wanneer daar verskeie verbruikers in die aansoek is en jy hulle dieselfde client.id gee, lei tot die volgende waarskuwing:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

As jy JMX in 'n toepassing met Kafka wil gebruik, kan dit 'n probleem wees. Vir hierdie geval is dit die beste om 'n kombinasie van die toepassingnaam en, byvoorbeeld, die onderwerpnaam as die client.id-waarde te gebruik. Die resultaat van ons konfigurasie kan gesien word in die uitvoer van die opdrag kafka-verbruikersgroepe van hulpprogramme van Confluent:

Hoe Kafka 'n werklikheid geword het

Kom ons kyk nou na die scenario van gewaarborgde boodskaplewering. Kafka Producer het 'n parameter akks, wat jou toelaat om op te stel na hoeveel erkennings die groepleier die boodskap as suksesvol geskryf moet beskou. Hierdie parameter kan die volgende waardes neem:

  • 0 - erken sal nie oorweeg word nie.
  • 1 — verstek parameter, slegs 1 replika hoef erken te word.
  • −1 — erkenning word vereis van alle gesinchroniseerde replikas (klusterinstelling min.insync.replikas).

Uit die gelyste waardes kan gesien word dat acks gelyk aan -1 die sterkste waarborg gee dat die boodskap nie verlore gaan nie.

Soos ons almal weet, is verspreide stelsels onbetroubaar. Om te waak teen verbygaande mislukkings, verskaf Kafka Producer die parameter herprobeer, wat jou toelaat om die aantal herindieningspogings binne te stel aflewering.time-out.ms. Aangesien die herproberingsparameter 'n verstekwaarde van Integer.MAX_VALUE (2147483647) het, kan die aantal herindienings van die boodskap aangepas word deur slegs delivery.timeout.ms te verander.

Beweeg na presies een keer aflewering

Hierdie instellings stel ons vervaardiger in staat om boodskappe met 'n hoë waarborg te lewer. Kom ons praat nou oor hoe om te verseker dat slegs een kopie van 'n boodskap na 'n Kafka-onderwerp geskryf word? In die eenvoudigste geval, hiervoor moet u die Producer-parameter instel enable.idempotence tot waar. Idempotensie waarborg dat slegs een boodskap na 'n spesifieke partisie van een onderwerp geskryf word. Die voorwaarde om idempotensie moontlik te maak, is die waardes acks = almal, herprobeer > 0, max.in.flight.requests.per.connection ≤ 5. As hierdie parameters nie deur die ontwikkelaar gestel word nie, sal die bogenoemde waardes outomaties ingestel word.

Wanneer idempotensie opgestel word, is dit nodig om te verseker dat dieselfde boodskappe elke keer in dieselfde partisies beland. Dit kan gedoen word deur die partitioner.class sleutel en parameter op Producer te stel. Kom ons begin met die sleutel. Dit moet dieselfde wees vir elke voorlegging. Dit is maklik om te bereik deur 'n besigheids-ID van die oorspronklike pos te gebruik. Die partitioner.class parameter het 'n verstekwaarde − DefaultPartitioner. Met hierdie verstek partisiestrategie tree ons soos volg op:

  • As die partisie uitdruklik gespesifiseer word wanneer die boodskap gestuur word, gebruik ons ​​dit.
  • As die partisie nie gespesifiseer is nie, maar die sleutel is gespesifiseer, kies die partisie met hash vanaf die sleutel.
  • As die partisie en sleutel nie gespesifiseer is nie, kies die partisies om die beurt (round-robin).

Ook, met behulp van 'n sleutel en idempotente stuur met 'n parameter maksimum.in.vlug.versoeke.per.verbinding = 1 gee jou bestelde boodskaphantering op Verbruiker. Afsonderlik is dit die moeite werd om te onthou dat as toegangsbeheer op jou cluster gekonfigureer is, jy regte nodig het om idempotente skryf aan die onderwerp.

As jy skielik nie die vermoë het om idempotent per sleutel te stuur nie, of die logika aan die Produsentekant vereis dat datakonsekwentheid tussen verskillende partisies gehandhaaf word, dan sal transaksies tot die redding kom. Daarbenewens, deur 'n kettingtransaksie te gebruik, kan jy 'n rekord in Kafka voorwaardelik sinchroniseer, byvoorbeeld, met 'n rekord in 'n databasis. Om transaksionele versending na Produsent moontlik te maak, moet dit idempotent wees, en addisioneel gestel transaksionele.id. As jou Kafka-kluster toegangsbeheer gekonfigureer het, sal 'n transaksierekord, soos 'n idempotente een, skryftoestemmings benodig, wat deur 'n masker verleen kan word met die waarde wat in transactional.id gestoor is.

Formeel kan enige string, soos die toepassingnaam, as 'n transaksie-identifiseerder gebruik word. Maar as jy veelvuldige gevalle van dieselfde toepassing met dieselfde transactional.id begin, dan sal die eerste instansie wat geloods word met 'n fout gestop word, want Kafka sal dit as 'n zombieproses beskou.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Om hierdie probleem op te los, voeg ons 'n gasheernaam-agtervoegsel by die toepassingnaam, wat ons van omgewingsveranderlikes kry.

Die vervaardiger is opgestel, maar Kafka-transaksies beheer slegs die omvang van die boodskap. Ongeag die status van die transaksie, kom die boodskap onmiddellik in die onderwerp, maar het addisionele stelselkenmerke.

Om te verhoed dat sulke boodskappe voor die tyd deur die Verbruiker gelees word, moet dit die parameter stel isolasie vlak na die lees_toegewyde waarde. So 'n verbruiker sal nie-transaksieboodskappe soos voorheen kan lees, en transaksionele boodskappe eers na die commit.
As jy al die bogenoemde instellings gestel het, het jy presies een keer aflewering gekonfigureer. Baie geluk!

Maar daar is nog een nuanse. Die transactional.id wat ons hierbo opgestel het, is eintlik die transaksievoorvoegsel. Op die transaksiebestuurder word 'n reeksnommer daaraan aangeheg. Die resulterende identifiseerder word uitgereik aan transaksionele.id.verval.ms, wat op die Kafka-groepering opgestel is en 'n verstekwaarde van 7 dae het. As die aansoek gedurende hierdie tyd geen boodskappe ontvang het nie, sal jy ontvang wanneer jy die volgende transaksionele stuur probeer OngeldigePidMappingException. Daarna sal die transaksiekoördineerder 'n nuwe volgnommer vir die volgende transaksie uitreik. Die boodskap kan egter verlore gaan as die InvalidPidMappingException nie behoorlik hanteer word nie.

In plaas van totale

Soos jy kan sien, is dit nie genoeg om net boodskappe aan Kafka te stuur nie. Jy moet 'n kombinasie van parameters kies en gereed wees om vinnige veranderinge aan te bring. In hierdie artikel het ek probeer om die presiese eenmalige aflewering-instelling in detail te wys en het verskeie probleme beskryf met die client.id- en transactional.id-konfigurasies wat ons teëgekom het. Die Vervaardiger- en Verbruiker-instellings word hieronder opgesom.

produsent:

  1. acks = almal
  2. herprobeer > 0
  3. enable.idempotence = waar
  4. max.in.flight.requests.per.connection ≤ 5 (1 vir bestelde stuur)
  5. transactional.id = ${application-name}-${gasheernaam}

Verbruiker:

  1. isolation.level = read_committed

Om foute in toekomstige toepassings te verminder, het ons ons eie omhulsel oor die veerkonfigurasie gemaak, waar die waardes van sommige van die gelyste parameters reeds ingestel is.

En hier is 'n paar materiaal vir selfstudie:

Bron: will.com

Voeg 'n opmerking