Kako je Kafka postao stvarnost

Kako je Kafka postao stvarnost

Hej Habr!

Radim u Tinkoff timu koji razvija vlastiti centar za obavijesti. Najviše se razvijam u Javi koristeći Spring boot i rješavam razne tehničke probleme koji se pojave u projektu.

Većina naših mikroservisa međusobno komunicira asinkrono putem brokera poruka. Prethodno smo koristili IBM MQ kao posrednika, koji se više nije mogao nositi s opterećenjem, ali je istovremeno imao visoka jamstva isporuke.

Kao zamjena ponuđen nam je Apache Kafka koji ima veliki potencijal skaliranja, ali nažalost zahtijeva gotovo individualan pristup konfiguraciji za različite scenarije. Osim toga, mehanizam za isporuku barem jednom koji prema zadanim postavkama radi u Kafki nije omogućio održavanje potrebne razine dosljednosti izvan kutije. Zatim ću podijeliti naše iskustvo u Kafka konfiguraciji, posebno ću vam reći kako konfigurirati i živjeti s točno jednom isporukom.

Zajamčena dostava i više

Dolje navedene postavke pomoći će u sprječavanju brojnih problema sa zadanim postavkama veze. Ali prvo bih želio obratiti pozornost na jedan parametar koji će olakšati moguće otklanjanje pogrešaka.

Ovo će pomoći client.id za proizvođača i potrošača. Na prvi pogled možete koristiti naziv aplikacije kao vrijednost i u većini slučajeva to će raditi. Iako situacija kada aplikacija koristi nekoliko potrošača i vi im date isti client.id, rezultira sljedećim upozorenjem:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Ako želite koristiti JMX u aplikaciji s Kafkom, to bi mogao biti problem. U ovom slučaju, najbolje je koristiti kombinaciju naziva aplikacije i, na primjer, naziva teme kao client.id vrijednost. Rezultat naše konfiguracije može se vidjeti u izlazu naredbe kafka-skupine-potrošača od pomoćnih programa iz Confluenta:

Kako je Kafka postao stvarnost

Sada pogledajmo scenarij za zajamčenu isporuku poruka. Kafka Producer ima parametar acks, što vam omogućuje da konfigurirate nakon koliko potvrda vođa klastera treba da smatra da je poruka uspješno napisana. Ovaj parametar može poprimiti sljedeće vrijednosti:

  • 0 — potvrda se neće uzeti u obzir.
  • 1 je zadani parametar, potrebna je samo 1 replika za potvrdu.
  • −1 — potrebna je potvrda od svih sinkroniziranih replika (postavljanje klastera min.insync.replike).

Iz navedenih vrijednosti jasno je da acks jednak −1 daje najjaču garanciju da poruka neće biti izgubljena.

Kao što svi znamo, distribuirani sustavi su nepouzdani. Za zaštitu od prolaznih grešaka, Kafka Producer nudi opciju ponovnih pokušaja, što vam omogućuje postavljanje broja pokušaja ponovnog slanja unutar isporuka.timeout.ms. Budući da parametar ponovnih pokušaja ima zadanu vrijednost Integer.MAX_VALUE (2147483647), broj ponovnih pokušaja poruke može se podesiti mijenjanjem samo delivery.timeout.ms.

Idemo prema točno jednoj isporuci

Navedene postavke omogućuju našem producentu isporuku poruka s visokim jamstvom. Razgovarajmo sada o tome kako osigurati da samo jedna kopija poruke bude napisana u Kafkinoj temi? U najjednostavnijem slučaju, da biste to učinili, trebate postaviti parametar na Producer omogućiti.idempotencija na istinito. Idempotencija jamči da je samo jedna poruka napisana u određenu particiju jedne teme. Preduvjet za omogućavanje idempotencije su vrijednosti acks = sve, ponovni pokušaj > 0, max.in.flight.requests.per.connection ≤ 5. Ako razvojni programer ne odredi ove parametre, gornje vrijednosti bit će automatski postavljene.

Kada je idempotencija konfigurirana, potrebno je osigurati da iste poruke svaki put završe na istim particijama. To se može učiniti postavljanjem ključa partitioner.class i parametra na Producer. Počnimo s ključem. Mora biti isti za svaki podnesak. To se lako može postići korištenjem bilo kojeg poslovnog ID-a iz izvorne objave. Parametar partitioner.class ima zadanu vrijednost − Default Partitioner. S ovom strategijom particioniranja, prema zadanim postavkama ponašamo se ovako:

  • Ako je particija eksplicitno navedena prilikom slanja poruke, tada je koristimo.
  • Ako particija nije navedena, ali je naveden ključ, odaberite particiju prema hash ključu.
  • Ako particija i ključ nisu navedeni, odaberite particije jednu po jednu (kružno).

Također, korištenje ključa i idempotentno slanje s parametrom max.in.flight.requests.per.connection = 1 daje vam pojednostavljenu obradu poruka na Potrošaču. Također je vrijedno zapamtiti da ako je kontrola pristupa konfigurirana na vašem klasteru, tada ćete trebati prava za idempotentno pisanje teme.

Ako vam odjednom nedostaju mogućnosti idempotentnog slanja ključem ili logika na strani proizvođača zahtijeva održavanje konzistentnosti podataka između različitih particija, tada će transakcije priskočiti u pomoć. Osim toga, koristeći lančanu transakciju, možete uvjetno sinkronizirati zapis u Kafki, na primjer, sa zapisom u bazi podataka. Da bi se omogućilo transakcijsko slanje Proizvođaču, on mora biti idempotentan i dodatno postavljen transakcijski.id. Ako vaš Kafka klaster ima konfiguriranu kontrolu pristupa, tada će transakcijski zapis, poput idempotentnog zapisa, trebati dopuštenja za pisanje, koja se mogu dodijeliti maskom koristeći vrijednost pohranjenu u transactional.id.

Formalno, bilo koji niz, kao što je naziv aplikacije, može se koristiti kao identifikator transakcije. Ali ako pokrenete nekoliko instanci iste aplikacije s istim transactional.id-om, tada će prva pokrenuta instanca biti zaustavljena s pogreškom, jer će je Kafka smatrati zombi procesom.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Kako bismo riješili ovaj problem, nazivu aplikacije dodajemo sufiks u obliku naziva hosta koji dobivamo iz varijabli okoline.

Proizvođač je konfiguriran, ali transakcije na Kafki kontroliraju samo opseg poruke. Bez obzira na status transakcije, poruka odmah ide u temu, ali ima dodatne sistemske atribute.

Kako bi se spriječilo čitanje takvih poruka od strane Potrošača prije vremena, potrebno je postaviti parametar izolacija.nivo za read_committed vrijednost. Takav će potrošač moći čitati netransakcijske poruke kao i prije, a transakcijske poruke tek nakon izvršenja.
Ako ste postavili sve ranije navedene postavke, tada ste konfigurirali točno jednu isporuku. Čestitamo!

Ali postoji još jedna nijansa. Transactional.id, koji smo gore konfigurirali, zapravo je prefiks transakcije. Na upravitelju transakcija dodaje mu se redni broj. Primljeni identifikator se izdaje transakcijski.id.istek.ms, koji je konfiguriran na Kafka klasteru i ima zadanu vrijednost "7 dana". Ako tijekom tog vremena aplikacija nije primila nijednu poruku, primit ćete sljedeće transakcijsko slanje kada pokušate poslati InvalidPidMappingException. Koordinator transakcije će zatim izdati novi redni broj za sljedeću transakciju. Međutim, poruka se može izgubiti ako se InvalidPidMappingException ne postupa ispravno.

Umjesto ukupnih iznosa

Kao što vidite, Kafki nije dovoljno samo slati poruke. Morate odabrati kombinaciju parametara i biti spremni na brze promjene. U ovom sam članku pokušao detaljno prikazati postavku isporuke točno jednom i opisao nekoliko problema s konfiguracijama client.id i transactional.id na koje smo naišli. Ispod je sažetak postavki proizvođača i potrošača.

Producent:

  1. acks = sve
  2. ponovni pokušaji > 0
  3. enable.idempotence = istina
  4. max.in.flight.requests.per.connection ≤ 5 (1 za uredno slanje)
  5. transactional.id = ${application-name}-${hostname}

Potrošač:

  1. izolacija.razina = read_committed

Kako bismo minimizirali pogreške u budućim aplikacijama, napravili smo vlastiti omot preko proljetne konfiguracije, gdje su vrijednosti za neke od navedenih parametara već postavljene.

Evo nekoliko materijala za samostalno učenje:

Izvor: www.habr.com

Dodajte komentar