Kako je Kafka postao stvarnost

Kako je Kafka postao stvarnost

Hej Habr!

Radim u Tinkoff timu, koji razvija sopstveni centar za obaveštavanje. Uglavnom razvijam u Javi koristeći Spring boot i rješavam razne tehničke probleme koji se javljaju u projektu.

Većina naših mikroservisa asinhrono komunicira jedni s drugima preko posrednika poruka. Ranije smo koristili IBM MQ kao brokera, koji više nije mogao da se nosi sa opterećenjem, ali je istovremeno imao visoke garancije isporuke.

Kao zamjena nam je ponuđen Apache Kafka, koji ima veliki potencijal skaliranja, ali, nažalost, zahtijeva gotovo individualan pristup konfiguraciji za različite scenarije. Osim toga, barem jednom mehanizam isporuke koji radi u Kafki po defaultu nije omogućio održavanje potrebnog nivoa konzistentnosti izvan kutije. Zatim ću podijeliti naše iskustvo u Kafka konfiguraciji, posebno ću vam reći kako da konfigurirate i živite s tačno jednom isporukom.

Zagarantovana dostava i ostalo

Postavke o kojima se govori u nastavku pomoći će spriječiti brojne probleme sa zadanim postavkama veze. Ali prvo bih želio da obratim pažnju na jedan parametar koji će olakšati moguće otklanjanje grešaka.

Ovo će pomoći client.id za proizvođača i potrošača. Na prvi pogled možete koristiti ime aplikacije kao vrijednost, a u većini slučajeva to će funkcionirati. Iako situacija kada aplikacija koristi nekoliko potrošača i date im isti client.id, rezultira sljedećim upozorenjem:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Ako želite da koristite JMX u aplikaciji sa Kafkom, onda bi to mogao biti problem. U ovom slučaju, najbolje je koristiti kombinaciju naziva aplikacije i, na primjer, naziva teme kao vrijednost client.id. Rezultat naše konfiguracije se može vidjeti u izlazu naredbe kafka-potrošačke grupe od komunalnih usluga iz Confluenta:

Kako je Kafka postao stvarnost

Pogledajmo sada scenario za garantovanu isporuku poruke. Kafka Producer ima parametar acks, što vam omogućava da konfigurišete nakon koliko potvrda treba vođi klastera da smatra da je poruka uspešno napisana. Ovaj parametar može imati sljedeće vrijednosti:

  • 0 — potvrda se neće uzeti u obzir.
  • 1 je zadani parametar, samo 1 replika je potrebna za potvrdu.
  • −1 — potrebna je potvrda svih sinkroniziranih replika (podešavanje klastera min.insync.replicas).

Iz navedenih vrijednosti jasno je da acks jednak -1 daje najjaču garanciju da poruka neće biti izgubljena.

Kao što svi znamo, distribuirani sistemi su nepouzdani. Za zaštitu od prolaznih kvarova, Kafka Producer nudi tu opciju ponovo pokušava, što vam omogućava da postavite broj pokušaja ponovnog slanja unutar isporuke.timeout.ms. Budući da parametar ponovnih pokušaja ima zadanu vrijednost Integer.MAX_VALUE (2147483647), broj ponovnih pokušaja poruke može se podesiti promjenom samo isporuke.timeout.ms.

Idemo ka tačno jednoj isporuci

Navedene postavke omogućavaju našem Proizvođaču da isporučuje poruke uz visoku garanciju. Hajde sada da razgovaramo o tome kako osigurati da samo jedna kopija poruke bude napisana na Kafkinu temu? U najjednostavnijem slučaju, da biste to učinili, morate postaviti parametar na Producer enable.idempotence to true. Idempotencija garantuje da se samo jedna poruka upisuje na određenu particiju jedne teme. Preduslov za omogućavanje idempotencije su vrijednosti acks = sve, ponovi > 0, max.in.flight.requests.per.connection ≤ 5. Ako ove parametre ne odredi programer, gore navedene vrijednosti će se automatski postaviti.

Kada je idempotencija konfigurisana, potrebno je osigurati da iste poruke svaki put završe na istim particijama. Ovo se može učiniti postavljanjem ključa i parametra partitioner.class na Producer. Počnimo s ključem. Mora biti isti za svaki podnesak. To se lako može postići korištenjem bilo kojeg poslovnog ID-a iz originalnog posta. Parametar partitioner.class ima zadanu vrijednost − DefaultPartitioner. Sa ovom strategijom particioniranja, po defaultu se ponašamo ovako:

  • Ako je particija eksplicitno navedena prilikom slanja poruke, onda je koristimo.
  • Ako particija nije navedena, ali je ključ naveden, odaberite particiju prema hash ključu.
  • Ako particija i ključ nisu specificirani, odaberite particije jednu po jednu (kružno).

Također, korištenje ključa i idempotentno slanje s parametrom max.in.flight.requests.per.connection = 1 daje vam pojednostavljenu obradu poruka na Potrošaču. Također je vrijedno zapamtiti da ako je kontrola pristupa konfigurisana na vašem klasteru, tada će vam trebati prava da idempotentno pišete u temu.

Ako vam odjednom nedostaju mogućnosti idempotentnog slanja po ključu ili logika na strani proizvođača zahtijeva održavanje konzistentnosti podataka između različitih particija, tada će transakcije doći u pomoć. Osim toga, koristeći lančanu transakciju, možete uvjetno sinkronizirati zapis u Kafki, na primjer, sa zapisom u bazi podataka. Da bi se omogućilo transakcijsko slanje Proizvođaču, mora biti idempotentno i dodatno podešeno transakcija.id. Ako vaš Kafka klaster ima konfiguriranu kontrolu pristupa, tada će transakcijskom zapisu, poput idempotentnog zapisa, biti potrebne dozvole za pisanje, koje se mogu dodijeliti maskom koristeći vrijednost pohranjenu u transakciji.id.

Formalno, bilo koji niz, kao što je ime aplikacije, može se koristiti kao identifikator transakcije. Ali ako pokrenete nekoliko instanci iste aplikacije s istim transakcijskim.id-om, tada će prva pokrenuta instanca biti zaustavljena s greškom, jer će Kafka to smatrati zombi procesom.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Da bismo riješili ovaj problem, imenu aplikacije dodajemo sufiks u obliku imena hosta, koji dobijamo iz varijabli okruženja.

Proizvođač je konfigurisan, ali transakcije na Kafki kontrolišu samo opseg poruke. Bez obzira na status transakcije, poruka odmah ide na temu, ali ima dodatne sistemske atribute.

Kako bi spriječio da potrošač pročita takve poruke unaprijed, potrebno je postaviti parametar nivo izolacije na read_committed vrijednost. Takav potrošač će moći čitati netransakcione poruke kao i prije, a transakcijske poruke tek nakon urezivanja.
Ako ste postavili sva prethodno navedena podešavanja, tada ste konfigurisali tačno jednom isporuku. Čestitamo!

Ali postoji još jedna nijansa. Transactional.id, koji smo konfigurisali gore, zapravo je prefiks transakcije. U upravitelju transakcija, njemu se dodaje redni broj. Primljeni identifikator se izdaje transakcija.id.expiration.ms, koji je konfiguriran na Kafka klasteru i ima zadanu vrijednost od "7 dana". Ako za to vrijeme aplikacija nije primila nijednu poruku, tada ćete primiti sljedeće transakcijsko slanje kada pokušate InvalidPidMappingException. Koordinator transakcije će tada izdati novi redni broj za sljedeću transakciju. Međutim, poruka može biti izgubljena ako se izuzetkom InvalidPidMappingException ne rukuje ispravno.

Umesto ukupnih

Kao što vidite, nije dovoljno samo poslati poruke Kafki. Morate odabrati kombinaciju parametara i biti spremni za brze promjene. U ovom članku pokušao sam detaljno prikazati postavke za točno jednokratnu isporuku i opisao nekoliko problema s konfiguracijama client.id i transactional.id na koje smo naišli. Ispod je sažetak postavki proizvođača i potrošača.

producent:

  1. acks = sve
  2. pokušaji > 0
  3. enable.idempotence = istina
  4. max.in.flight.requests.per.connection ≤ 5 (1 za uredno slanje)
  5. transakcija.id = ${ime-aplikacije}-${ime hosta}

Potrošač:

  1. isolation.level = read_committed

Da bismo sveli greške u budućim aplikacijama, napravili smo vlastiti omot preko konfiguracije opruge, gdje su vrijednosti za neke od navedenih parametara već postavljene.

Evo nekoliko materijala za samostalno učenje:

izvor: www.habr.com

Dodajte komentar