Kaip Kafka tapo realybe

Kaip Kafka tapo realybe

Sveiki, Habr!

Dirbu Tinkoff komandoje, kuri kuria savo pranešimų centrą. Dažniausiai kuriu Java naudodamas Spring boot ir sprendžiu įvairias technines problemas, kurios kyla projekte.

Dauguma mūsų mikropaslaugų tarpusavyje bendrauja asinchroniškai per pranešimų tarpininką. Anksčiau kaip brokeris naudojome IBM MQ, kuris nebeatlaikė apkrovos, bet tuo pačiu turėjo aukštas pristatymo garantijas.

Kaip pakaitalą mums buvo pasiūlyta „Apache Kafka“, kuri turi didelį mastelio keitimo potencialą, bet, deja, reikalauja beveik individualaus požiūrio į konfigūraciją įvairiems scenarijams. Be to, pagal numatytuosius nustatymus Kafkoje veikiantis bent kartą pristatymo mechanizmas neleido išlaikyti reikiamo nuoseklumo lygio. Toliau pasidalinsiu savo patirtimi, susijusia su Kafka konfigūravimu, visų pirma, papasakosiu, kaip sukonfigūruoti ir gyventi su juo tiksliai vieną kartą.

Garantuotas pristatymas ir kt

Toliau aptariami nustatymai padės išvengti daugelio problemų dėl numatytųjų ryšio nustatymų. Bet pirmiausia norėčiau atkreipti dėmesį į vieną parametrą, kuris palengvins galimą derinimą.

Tai padės klientas.id gamintojui ir vartotojui. Iš pirmo žvilgsnio kaip reikšmę galite naudoti programos pavadinimą ir daugeliu atvejų tai veiks. Nors situacija, kai programa naudoja kelis vartotojus ir jūs suteikiate jiems tą patį client.id, rodomas toks įspėjimas:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Jei norite naudoti JMX programoje su Kafka, tai gali būti problema. Šiuo atveju geriausia naudoti programos pavadinimo ir, pavyzdžiui, temos pavadinimo derinį kaip client.id reikšmę. Mūsų konfigūracijos rezultatą galima pamatyti komandos išvestyje kafka-vartotojų grupės iš „Confluent“ komunalinių paslaugų:

Kaip Kafka tapo realybe

Dabar pažvelkime į garantuoto pranešimų pristatymo scenarijų. „Kafka Producer“ turi parametrą acks, kuri leidžia konfigūruoti, kiek patvirtinimų klasterio lyderis turi laikyti sėkmingai parašytu. Šis parametras gali turėti šias reikšmes:

  • 0 – patvirtinimas nebus svarstomas.
  • 1 yra numatytasis parametras, patvirtinimui reikalinga tik 1 kopija.
  • −1 – reikia patvirtinti visas sinchronizuotas kopijas (grupės sąranka min.insync.replicas).

Iš išvardytų verčių aišku, kad acks lygus −1 suteikia stipriausią garantiją, kad pranešimas nebus prarastas.

Kaip visi žinome, paskirstytos sistemos yra nepatikimos. Kad apsisaugotų nuo trumpalaikių gedimų, „Kafka Producer“ suteikia galimybę bando dar kartą, kuri leidžia nustatyti pakartotinio siuntimo bandymų skaičių pristatymas.laikas.ms. Kadangi parametro kartojimosi reikšmė yra Integer.MAX_VALUE (2147483647), pranešimų pakartotinių bandymų skaičius gali būti koreguojamas keičiant tik delivery.timeout.ms.

Mes judame link pristatymo tiksliai vieną kartą

Išvardinti nustatymai leidžia mūsų gamintojui pristatyti pranešimus su didele garantija. Dabar pakalbėkime apie tai, kaip užtikrinti, kad Kafkos tema būtų parašyta tik viena pranešimo kopija? Paprasčiausiu atveju, norėdami tai padaryti, turite nustatyti „Producer“ parametrą įgalinti.idempotencija iki tiesa. Idempotiškumas garantuoja, kad į konkrečią vienos temos skaidinį bus parašytas tik vienas pranešimas. Idempotencijos įgalinimo prielaida yra vertybės acks = visi, bandyti iš naujo > 0, maksimalus skrydžio.requests.per.connection skaičius ≤ 5. Jei kūrėjas šių parametrų nenurodė, aukščiau nurodytos reikšmės bus nustatytos automatiškai.

Sukonfigūravus idempotenciją, būtina užtikrinti, kad tie patys pranešimai kaskart patektų į tuos pačius skaidinius. Tai galima padaryti nustatant partitioner.class raktą ir parametrą į Producer. Pradėkime nuo rakto. Jis turi būti vienodas kiekvienam pateikimui. Tai galima lengvai pasiekti naudojant bet kurį verslo ID iš pradinio įrašo. Parametras partitioner.class turi numatytąją reikšmę − DefaultPartitioner. Naudodami šią skaidymo strategiją, pagal numatytuosius nustatymus elgiamės taip:

  • Jei skaidinys yra aiškiai nurodytas siunčiant pranešimą, mes jį naudojame.
  • Jei skaidinys nenurodytas, bet nurodytas raktas, pasirinkite skaidinį pagal rakto maišą.
  • Jei skaidinys ir raktas nenurodyti, pasirinkite skaidinius po vieną (apvalus patikrinimas).

Be to, naudojant raktą ir idempotentinį siuntimą su parametru max.in.flight.requests.per.connection = 1 suteikia jums supaprastintą pranešimų apdorojimą vartotojui. Taip pat verta atsiminti, kad jei jūsų klasteryje sukonfigūruotas prieigos valdymas, jums reikės teisių, kad galėtumėte bejėgiškai rašyti į temą.

Jei staiga jums pritrūksta idempotento siuntimo raktu galimybių arba gamintojo pusėje esanti logika reikalauja išlaikyti duomenų nuoseklumą tarp skirtingų skaidinių, tada operacijos ateis į pagalbą. Be to, naudodami grandininę operaciją, galite sąlygiškai sinchronizuoti Kafkos įrašą, pavyzdžiui, su įrašu duomenų bazėje. Norint įgalinti operacijų siuntimą gamintojui, jis turi būti idempotentas ir papildomai nustatytas sandorio.id. Jei jūsų Kafka klasteryje yra sukonfigūruota prieigos kontrolė, tada operacijos įrašui, kaip ir idempotentam įrašui, reikės rašymo teisių, kurias galima suteikti naudojant kaukę, naudojant transakcijos.id saugomą reikšmę.

Formaliai bet kokia eilutė, pvz., programos pavadinimas, gali būti naudojama kaip operacijos identifikatorius. Bet jei paleidžiate kelis tos pačios programos egzempliorius su tuo pačiu transakciniu.id, pirmasis paleistas egzempliorius bus sustabdytas su klaida, nes Kafka tai laikys zombių procesu.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Norėdami išspręsti šią problemą, prie programos pavadinimo pridedame priesagą pagrindinio kompiuterio pavadinimo forma, kurią gauname iš aplinkos kintamųjų.

Gamintojas sukonfigūruotas, tačiau Kafkos operacijos valdo tik pranešimo apimtį. Nepriklausomai nuo operacijos būsenos, pranešimas iškart pereina į temą, tačiau turi papildomų sistemos atributų.

Kad vartotojas neperskaitytų tokių pranešimų iš anksto, jis turi nustatyti parametrą izoliacija.lygis į read_committed vertę. Toks Vartotojas galės skaityti su sandoriu nesusijusius pranešimus kaip ir anksčiau, o su sandoriu susijusius pranešimus tik po įsipareigojimo.
Jei nustatėte visus anksčiau išvardintus nustatymus, sukonfigūravote tiksliai vieną kartą pristatymo metu. Sveikiname!

Tačiau yra dar vienas niuansas. Transaction.id, kurį sukonfigūravome aukščiau, iš tikrųjų yra operacijos priešdėlis. Operacijų tvarkyklėje prie jo pridedamas eilės numeris. Gautas identifikatorius išduodamas transakcinis.id.galiojimo laikas.ms, kuris sukonfigūruotas Kafka klasteryje ir kurio numatytoji reikšmė yra „7 dienos“. Jei per tą laiką programa negavo jokių pranešimų, tada, kai bandysite kitą operacijos siuntimą, gausite InvalidPidMappingException. Tada operacijos koordinatorius išduos naują sekančios operacijos eilės numerį. Tačiau pranešimas gali būti prarastas, jei InvalidPidMappingException nebus tinkamai tvarkomas.

Vietoj bendrų sumų

Kaip matote, vien tik siųsti žinutes Kafkai neužtenka. Turite pasirinkti parametrų derinį ir būti pasirengę greitai atlikti pakeitimus. Šiame straipsnyje pabandžiau išsamiai parodyti tiksliai vieną kartą pristatytą sąranką ir aprašiau kelias problemas, su kuriomis susidūrėme su client.id ir transactional.id konfigūracijomis. Toliau pateikiama gamintojo ir vartotojo nustatymų santrauka.

Gamintojas:

  1. acks = viskas
  2. bandymai > 0
  3. įgalinti.idempotence = tiesa
  4. max.in.flight.requests.per.connection ≤ 5 (1 už tvarkingą siuntimą)
  5. transakcinis.id = ${application-name}-${hostname}

Vartotojas:

  1. isolation.level = read_committed

Siekdami sumažinti būsimų programų klaidas, sukūrėme savo apvyniojimą ant spyruoklės konfigūracijos, kur kai kurių išvardytų parametrų reikšmės jau yra nustatytos.

Čia yra keletas medžiagų savarankiškam mokymuisi:

Šaltinis: www.habr.com

Добавить комментарий