Hoe't Kafka werklikheid waard

Hoe't Kafka werklikheid waard

Hoi Habr!

Ik wurkje oan it Tinkoff-team, dat in eigen notifikaasjesintrum ûntwikkelet. Ik ûntwikkelje meast yn Java mei Spring boot en oplosse ferskate technyske problemen dy't ûntsteane yn in projekt.

De measte fan ús mikrotsjinsten kommunisearje asynchronysk mei elkoar fia in berjochtmakelaar. Earder brûkten wy IBM MQ as broker, dy't de lading net mear koe, mar tagelyk hege leveringsgarânsjes hie.

As ferfanging waarden wy Apache Kafka oanbean, dy't hege skaalpotinsjeel hat, mar, spitigernôch, in hast yndividuele oanpak fan konfiguraasje fereasket foar ferskate senario's. Derneist liet it op syn minst ien kear leveringsmeganisme dat standert yn Kafka wurket net it fereaske nivo fan konsistinsje út 'e doaze behâlde. Folgjende sil ik ús ûnderfining diele yn Kafka-konfiguraasje, yn 't bysûnder sil ik jo fertelle hoe't jo kinne konfigurearje en libje mei krekt ien kear levering.

Garandearre levering en mear

De hjirûnder besprutsen ynstellingen sille helpe om in oantal problemen te foarkommen mei de standertferbiningynstellingen. Mar earst wol ik omtinken jaan oan ien parameter dy't in mooglike debug sil fasilitearje.

Dit sil helpe client.id foar produsint en konsumint. Op it earste each kinne jo de applikaasjenamme brûke as de wearde, en yn 'e measte gefallen sil dit wurkje. Hoewol de situaasje as in applikaasje ferskate konsuminten brûkt en jo har deselde client.id jouwe, resultearret yn 'e folgjende warskôging:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

As jo ​​​​JMX wolle brûke yn in applikaasje mei Kafka, dan kin dit in probleem wêze. Foar dit gefal is it it bêste om in kombinaasje fan de applikaasjenamme te brûken en bygelyks de ûnderwerpnamme as de client.id-wearde. It resultaat fan ús konfiguraasje kin sjoen wurde yn 'e kommando-útfier kafka-konsumint-groepen fan nutsbedriuwen fan Confluent:

Hoe't Kafka werklikheid waard

Litte wy no sjen nei it senario foar garandearre berjochtlevering. Kafka Producer hat in parameter acks, wêrmei jo te konfigurearjen nei hoefolle erkent de klusterlieder moat beskôgje it berjocht mei súkses skreaun. Dizze parameter kin de folgjende wearden nimme:

  • 0 - erkenne sil net wurde beskôge.
  • 1 is de standertparameter, mar 1 replika is nedich om te erkennen.
  • -1 - befêstiging fan alle syngronisearre replika's is fereaske (cluster opset min.insync.replicas).

Ut de neamde wearden is it dúdlik dat acks lyk oan -1 de sterkste garânsje jouwe dat it berjocht net ferlern sil.

Lykas wy allegearre witte, binne ferspraat systemen ûnbetrouber. Om te beskermjen tsjin transiente fouten, biedt Kafka Producer de opsje opnij besykjen, wêrmei jo it oantal opnij ferstjoerpogingen binnen ynstelle kinne delivery.timeout.ms. Sûnt de retries parameter hat in standert wearde fan Integer.MAX_VALUE (2147483647), kin it oantal berjocht opnij oanpast troch in feroaring allinnich delivery.timeout.ms.

Wy geane nei presys ien kear levering

De neamde ynstellingen kinne ús Produsint berjochten leverje mei in hege garânsje. Litte wy no prate oer hoe't jo soargje kinne dat mar ien kopy fan in berjocht wurdt skreaun nei in Kafka-ûnderwerp? Yn it ienfâldichste gefal, om dit te dwaan, moatte jo de parameter ynstelle op Producer enable.idempotence oan wier. Idempotinsje garandearret dat mar ien berjocht wurdt skreaun nei in spesifike dieling fan ien ûnderwerp. De betingst foar it ynskeakeljen fan idempotinsje binne de wearden acks = alles, besykje op 'e nij > 0, max.in.flight.requests.per.connection ≤ 5. As dizze parameters net wurde oantsjutte troch de ûntwikkelder, wurde de boppesteande wearden automatysk ynsteld.

Wannear't idempotens is ynsteld, is it nedich om te soargjen dat deselde berjochten elke kear yn deselde partysjes einigje. Dit kin dien wurde troch de partitioner.class kaai en parameter te setten op Producer. Litte wy begjinne mei de kaai. It moat itselde wêze foar elke yntsjinjen. Dit kin maklik wurde berikt troch ien fan 'e saaklike ID's te brûken fan' e orizjinele post. De parameter partitioner.class hat in standertwearde - StandertPartitioner. Mei dizze partitioneringsstrategy hannelje wy standert sa:

  • As de partysje eksplisyt oanjûn is by it ferstjoeren fan it berjocht, dan brûke wy it.
  • As de partysje net oantsjutte is, mar de kaai is oantsjutte, selektearje de partysje troch de hash fan 'e kaai.
  • As de partysje en kaai net oantsjutte binne, selektearje de partysjes ien foar ien (round-robin).

Ek mei help fan in kaai en idempotent ferstjoeren mei in parameter max.in.flight.requests.per.connection = 1 jout jo streamline berjocht ferwurking op de Consumer. It is ek de muoite wurdich om te ûnthâlden dat as tagongskontrôle is ynsteld op jo kluster, dan sille jo rjochten nedich hawwe om idempotent te skriuwen nei in ûnderwerp.

As jo ​​ynienen misse de mooglikheden fan idempotent ferstjoeren troch kaai of de logika oan de Producer kant fereasket behâld fan gegevens gearhing tusken ferskillende partysjes, dan transaksjes sille komme ta de rêding. Dêrneist, mei help fan in keten transaksje, kinne jo betingst syngronisearje in rekord yn Kafka, bygelyks, mei in rekord yn de databank. Om transaksjoneel ferstjoeren nei de produsint mooglik te meitsjen, moat it idempotint wêze en ekstra ynsteld transactional.id. As jo ​​Kafka-kluster tagongskontrôle ynsteld hat, dan sil in transaksjerecord, lykas in idempotinte record, skriuwrjochten nedich wêze, dy't kinne wurde ferliend troch masker mei de wearde opslein yn transactional.id.

Formeel kin elke tekenrige, lykas de applikaasjenamme, brûkt wurde as transaksje-identifikaasje. Mar as jo ferskate eksimplaren fan deselde applikaasje starte mei deselde transactional.id, dan sil de earste lansearre eksimplaar wurde stoppe mei in flater, om't Kafka it as in zombieproses sil beskôgje.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Om dit probleem op te lossen, foegje wy in efterheaksel ta oan de applikaasjenamme yn 'e foarm fan de hostnamme, dy't wy krije fan omjouwingsfariabelen.

De produsint is konfigureare, mar transaksjes op Kafka kontrolearje allinich de omfang fan it berjocht. Nettsjinsteande de transaksjestatus giet it berjocht fuortendaliks nei it ûnderwerp, mar hat ekstra systeemattributen.

Om foar te kommen dat sokke berjochten troch de konsumint foarôfgeand wurde lêzen, moat it de parameter ynstelle isolation.level to read_committed wearde. Sa'n konsumint sil net-transaksje-berjochten kinne lêze lykas earder, en transaksje-berjochten pas nei in commit.
As jo ​​alle earder neamde ynstellingen ynsteld hawwe, dan hawwe jo presys ien kear levering konfigureare. Lokwinske!

Mar der is noch ien nuânse. Transactional.id, dy't wy hjirboppe konfigureare, is eins it transaksjefoarheaksel. Op de transaksjebehearder wurdt der in folchoardernûmer oan tafoege. De ûntfongen identifier wurdt útjûn oan transactional.id.expiration.ms, dat is konfigurearre op in Kafka-kluster en hat in standertwearde fan "7 dagen". As yn dizze tiid de applikaasje gjin berjochten hat ûntfongen, dan sille jo as jo besykje de folgjende transaksje ferstjoere InvalidPidMappingException. De transaksjekoördinator sil dan in nij folchoardernûmer útjaan foar de folgjende transaksje. It berjocht kin lykwols ferlern gean as de InvalidPidMappingException net goed behannele wurdt.

Yn stee fan totalen

Sa't jo sjen kinne, is it net genôch om gewoan berjochten nei Kafka te stjoeren. Jo moatte in kombinaasje fan parameters kieze en ree wêze om rappe feroaringen te meitsjen. Yn dit artikel besocht ik yn detail de presys ien kear levering opset te sjen en beskreau ferskate problemen mei de client.id en transactional.id konfiguraasjes dy't wy tsjinkamen. Hjirûnder is in gearfetting fan de ynstellings foar produsint en konsumint.

Produsint:

  1. acks = alles
  2. opnij > 0
  3. enable.idempotence = wier
  4. max.in.flight.requests.per.connection ≤ 5 (1 foar oarderlik ferstjoeren)
  5. transactional.id = ${application-name}-${hostnamme}

Konsumint:

  1. isolation.level = read_committed

Om flaters yn takomstige applikaasjes te minimalisearjen, hawwe wy ús eigen wrapper makke oer de maitiidkonfiguraasje, wêr't wearden foar guon fan 'e neamde parameters al binne ynsteld.

Hjir binne in pear materialen foar selsstúdzje:

Boarne: www.habr.com

Add a comment