
Hei Habr!
Lucrez în echipa Tinkoff, care își dezvoltă propriul centru de notificare. Dezvolt în principal în Java folosind Spring boot și rezolv diverse probleme tehnice care apar într-un proiect.
Majoritatea microserviciilor noastre comunică între ele în mod asincron printr-un broker de mesaje. Anterior, am folosit IBM MQ ca broker, care nu mai putea face față încărcăturii, dar în același timp avea garanții mari de livrare.
Ca înlocuitor, ni s-a oferit Apache Kafka, care are un potențial mare de scalare, dar, din păcate, necesită o abordare aproape individuală a configurației pentru diferite scenarii. În plus, mecanismul de livrare cel puțin o dată care funcționează în mod implicit în Kafka nu a permis menținerea nivelului necesar de consistență din cutie. În continuare, voi împărtăși experiența noastră în configurarea Kafka, în special, vă voi spune cum să configurați și să trăiți cu exact o dată livrare.
Livrare garantată și nu numai
Setările discutate mai jos vor ajuta la prevenirea o serie de probleme cu setările implicite de conexiune. Dar mai întâi aș dori să fiu atent la un parametru care va facilita o posibilă depanare.
Aceasta va ajuta client.id pentru Producător și Consumator. La prima vedere, puteți utiliza numele aplicației ca valoare și, în majoritatea cazurilor, aceasta va funcționa. Deși situația în care o aplicație folosește mai mulți consumatori și le oferiți același client.id, are ca rezultat următorul avertisment:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0Dacă doriți să utilizați JMX într-o aplicație cu Kafka, atunci aceasta ar putea fi o problemă. În acest caz, cel mai bine este să utilizați o combinație a numelui aplicației și, de exemplu, a numelui subiectului ca valoare client.id. Rezultatul configurației noastre poate fi văzut în rezultatul comenzii kafka-grupuri de consumatori de la utilitatile de la Confluent:

Acum să ne uităm la scenariul pentru livrarea garantată a mesajelor. Kafka Producer are un parametru acks, care vă permite să configurați după câte confirmări are nevoie liderul clusterului pentru a lua în considerare mesajul scris cu succes. Acest parametru poate lua următoarele valori:
- 0 — confirmarea nu va fi luată în considerare.
- 1 este parametrul implicit, este necesară doar 1 replică pentru confirmare.
- −1 — este necesară confirmarea de la toate replicile sincronizate (configurarea clusterului min.insync.replicas).
Din valorile enumerate este clar că valorile egale cu −1 oferă cea mai puternică garanție că mesajul nu se va pierde.
După cum știm cu toții, sistemele distribuite nu sunt de încredere. Pentru a proteja împotriva defecțiunilor tranzitorii, Kafka Producer oferă opțiunea reîncearcă, care vă permite să setați numărul de încercări de retrimitere livrare.timeout.ms. Deoarece parametrul de reîncercări are o valoare implicită Integer.MAX_VALUE (2147483647), numărul de reîncercări ale mesajelor poate fi ajustat prin modificarea numai a delivery.timeout.ms.
Ne îndreptăm către o livrare exactă
Setările enumerate permit Producătorului nostru să livreze mesaje cu o garanție ridicată. Să vorbim acum despre cum să ne asigurăm că doar o copie a unui mesaj este scrisă într-un subiect Kafka? În cel mai simplu caz, pentru a face acest lucru, trebuie să setați parametrul pe Producer activa.idempotenta la adevărat. Idempotency garantează că un singur mesaj este scris într-o anumită partiție a unui subiect. Condiția prealabilă pentru activarea idempotnței sunt valorile acks = toate, reîncercați > 0, max.în.flight.requests.per.connection ≤ 5. Dacă acești parametri nu sunt specificați de dezvoltator, valorile de mai sus vor fi setate automat.
Când idempotence este configurat, este necesar să vă asigurați că aceleași mesaje ajung de fiecare dată în aceleași partiții. Acest lucru se poate face setând cheia și parametrul partitioner.class la Producer. Să începem cu cheia. Trebuie să fie același pentru fiecare depunere. Acest lucru poate fi realizat cu ușurință folosind oricare dintre ID-urile companiei din postarea inițială. Parametrul partitioner.class are o valoare implicită − . Cu această strategie de partiționare, în mod implicit procedăm astfel:
- Dacă partiția este specificată în mod explicit la trimiterea mesajului, atunci o folosim.
- Dacă partiția nu este specificată, dar cheia este specificată, selectați partiția după codul hash al cheii.
- Dacă partiția și cheia nu sunt specificate, selectați partițiile una câte una (round-robin).
De asemenea, folosind o cheie și trimitere idempotent cu un parametru max.in.flight.requests.per.connection = 1 vă oferă procesare eficientă a mesajelor pentru Consumator. De asemenea, merită să ne amintim că, dacă controlul accesului este configurat pe clusterul dvs., atunci veți avea nevoie de drepturi pentru a scrie în mod idempotent la un subiect.
Dacă dintr-o dată vă lipsesc capacitățile de trimitere idempotent prin cheie sau logica din partea Producătorului necesită menținerea coerenței datelor între diferite partiții, atunci tranzacțiile vor veni în ajutor. În plus, folosind o tranzacție în lanț, puteți sincroniza condiționat o înregistrare în Kafka, de exemplu, cu o înregistrare în baza de date. Pentru a permite trimiterea tranzacțională către Producător, aceasta trebuie să fie idempotent și setat suplimentar tranzacțional.id. Dacă clusterul tău Kafka are controlul accesului configurat, atunci o înregistrare tranzacțională, cum ar fi o înregistrare idempotent, va avea nevoie de permisiuni de scriere, care pot fi acordate prin mască folosind valoarea stocată în transactional.id.
În mod formal, orice șir, cum ar fi numele aplicației, poate fi folosit ca identificator de tranzacție. Dar dacă lansați mai multe instanțe ale aceleiași aplicații cu același transactional.id, atunci prima instanță lansată va fi oprită cu o eroare, deoarece Kafka îl va considera un proces zombi.
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.Pentru a rezolva această problemă, adăugăm un sufix la numele aplicației sub forma numelui de gazdă, pe care îl obținem din variabilele de mediu.
Producătorul este configurat, dar tranzacțiile pe Kafka controlează doar domeniul de aplicare al mesajului. Indiferent de starea tranzacției, mesajul trece imediat la subiect, dar are atribute suplimentare de sistem.
Pentru a preveni ca astfel de mesaje să fie citite din timp de către Consumator, acesta trebuie să seteze parametrul izolare.nivel la valoarea read_committed. Un astfel de Consumator va putea citi mesajele netranzacționale ca înainte, iar mesajele tranzacționale numai după o comitere.
Dacă ați setat toate setările enumerate mai devreme, atunci ați configurat exact o dată livrare. Felicitări!
Dar mai este o nuanță. Transactional.id, pe care l-am configurat mai sus, este de fapt prefixul tranzacției. Pe managerul de tranzacții, i se adaugă un număr de secvență. Identificatorul primit este emis către tranzacțional.id.expiration.ms, care este configurat pe un cluster Kafka și are o valoare implicită de „7 zile”. Dacă în acest timp aplicația nu a primit niciun mesaj, atunci când încercați următoarea trimitere tranzacțională veți primi InvalidPidMappingException. Coordonatorul tranzacției va emite apoi un nou număr de ordine pentru următoarea tranzacție. Cu toate acestea, mesajul se poate pierde dacă InvalidPidMappingException nu este tratată corect.
În loc de totaluri
După cum puteți vedea, nu este suficient să trimiteți pur și simplu mesaje lui Kafka. Trebuie să alegeți o combinație de parametri și să fiți pregătit să faceți modificări rapide. În acest articol, am încercat să arăt în detaliu configurația de livrare exactă o dată și am descris mai multe probleme cu configurațiile client.id și transactional.id pe care le-am întâlnit. Mai jos este un rezumat al setărilor Producătorului și Consumatorului.
Producer:
- acks = all
- reîncercări > 0
- enable.idempotenţă = adevărat
- max.în.flight.cereri.per.conexiune ≤ 5 (1 pentru trimitere ordonată)
- transactional.id = ${nume-aplicație}-${nume gazdă}
Consumator:
- isolation.level = read_committed
Pentru a minimiza erorile în aplicațiile viitoare, am realizat propriul nostru înveliș peste configurația arcului, unde sunt deja setate valori pentru unii dintre parametrii enumerați.
Iată câteva materiale pentru auto-studiu:
Sursa: www.habr.com
