Hoe Kafka werkelijkheid werd

Hoe Kafka werkelijkheid werd

Hé Habr!

Ik werk in het Tinkoff-team, dat een eigen meldingscentrum ontwikkelt. Ik ontwikkel voornamelijk in Java met behulp van Spring Boot en los verschillende technische problemen op die zich in een project voordoen.

De meeste van onze microservices communiceren asynchroon met elkaar via een berichtenmakelaar. Voorheen maakten wij gebruik van IBM MQ als makelaar, die de last niet meer aankon, maar tegelijkertijd hoge leveringsgaranties had.

Ter vervanging kregen we Apache Kafka aangeboden, dat een groot schaalpotentieel heeft, maar helaas een bijna individuele configuratiebenadering voor verschillende scenario's vereist. Bovendien maakte het mechanisme voor ten minste één levering, dat standaard in Kafka werkt, het niet mogelijk om het vereiste niveau van consistentie out-of-the-box te handhaven. Vervolgens zal ik onze ervaring met de Kafka-configuratie delen, in het bijzonder zal ik u vertellen hoe u precies één keer moet configureren en ermee kunt leven.

Gegarandeerde levering en meer

De hieronder besproken instellingen helpen een aantal problemen met de standaardverbindingsinstellingen te voorkomen. Maar eerst zou ik aandacht willen besteden aan één parameter die een mogelijke foutopsporing zal vergemakkelijken.

Dit zal helpen klant identificatie voor producent en consument. Op het eerste gezicht kunt u de applicatienaam als waarde gebruiken, en in de meeste gevallen zal dit werken. Hoewel de situatie waarin een applicatie meerdere Consumers gebruikt en u hen dezelfde client.id geeft, resulteert in de volgende waarschuwing:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Als je JMX wilt gebruiken in een applicatie met Kafka, dan kan dit een probleem zijn. In dit geval kunt u het beste een combinatie van de applicatienaam en bijvoorbeeld de topicnaam als client.id-waarde gebruiken. Het resultaat van onze configuratie is te zien in de opdrachtuitvoer kafka-consumentengroepen van nutsbedrijven van Confluent:

Hoe Kafka werkelijkheid werd

Laten we nu eens kijken naar het scenario voor gegarandeerde bezorging van berichten. Kafka Producer heeft een parameter aks, waarmee u kunt configureren na hoeveel bevestigingen de clusterleider nodig heeft om het bericht als succesvol geschreven te beschouwen. Deze parameter kan de volgende waarden aannemen:

  • 0 — bevestiging wordt niet in overweging genomen.
  • 1 is de standaardparameter; er is slechts 1 replica vereist om te bevestigen.
  • −1 — bevestiging van alle gesynchroniseerde replica's is vereist (clusterconfiguratie min.insync.replica's).

Uit de genoemde waarden blijkt duidelijk dat acks gelijk aan −1 de sterkste garantie geven dat het bericht niet verloren gaat.

Zoals we allemaal weten zijn gedistribueerde systemen onbetrouwbaar. Ter bescherming tegen tijdelijke fouten biedt Kafka Producer deze optie pogingen, waarmee u het aantal pogingen tot opnieuw verzenden kunt instellen bezorgingstime-out.ms. Omdat de parameter voor nieuwe pogingen de standaardwaarde Integer.MAX_VALUE (2147483647) heeft, kan het aantal nieuwe pogingen voor berichten worden aangepast door alleen delivery.timeout.ms te wijzigen.

We gaan richting eenmalig leveren

Met de vermelde instellingen kan onze Producer berichten met een hoge garantie bezorgen. Laten we het nu hebben over hoe we ervoor kunnen zorgen dat slechts één kopie van een bericht naar een Kafka-onderwerp wordt geschreven? In het eenvoudigste geval moet u hiervoor de parameter op Producer instellen enable.idempotence naar waar. Idempotency garandeert dat er slechts één bericht naar een specifieke partitie van één onderwerp wordt geschreven. De voorwaarde voor het mogelijk maken van idempotentie zijn de waarden acks = alles, opnieuw proberen > 0, max.in.flight.requests.per.connection ≤ 5. Als deze parameters niet door de ontwikkelaar zijn opgegeven, worden de bovenstaande waarden automatisch ingesteld.

Wanneer idempotentie is geconfigureerd, is het noodzakelijk om ervoor te zorgen dat dezelfde berichten elke keer op dezelfde partities terechtkomen. Dit kunt u doen door de sleutel en parameter partitioner.class in te stellen op Producer. Laten we beginnen met de sleutel. Deze moet voor elke inzending hetzelfde zijn. Dit kan eenvoudig worden bereikt door een van de bedrijfs-ID's uit het oorspronkelijke bericht te gebruiken. De parameter partitioner.class heeft een standaardwaarde − StandaardPartitioner. Met deze partitiestrategie handelen we standaard als volgt:

  • Als de partitie expliciet wordt opgegeven bij het verzenden van het bericht, gebruiken we deze.
  • Als de partitie niet is opgegeven, maar de sleutel wel is opgegeven, selecteert u de partitie op basis van de hash van de sleutel.
  • Als de partitie en sleutel niet zijn opgegeven, selecteert u de partities één voor één (round-robin).

Ook met behulp van een sleutel en idempotent verzenden met een parameter max.in.flight.requests.per.connectie = 1 geeft u een gestroomlijnde berichtverwerking voor de consument. Het is ook de moeite waard om te onthouden dat als toegangscontrole op uw cluster is geconfigureerd, u rechten nodig heeft om idempotent naar een onderwerp te schrijven.

Als je plotseling de mogelijkheden van idempotent verzenden per sleutel mist, of als de logica aan de kant van de Producer het handhaven van gegevensconsistentie tussen verschillende partities vereist, dan zullen transacties te hulp komen. Daarnaast kunt u met een ketentransactie een record in Kafka voorwaardelijk synchroniseren met bijvoorbeeld een record in de database. Om transactionele verzending naar de Producent mogelijk te maken, moet deze idempotent zijn en bovendien zijn ingesteld transactionele.id. Als voor uw Kafka-cluster toegangscontrole is geconfigureerd, heeft een transactioneel record, zoals een idempotent record, schrijfrechten nodig, die kunnen worden verleend via een masker met behulp van de waarde die is opgeslagen in transactional.id.

Formeel kan elke tekenreeks, zoals de applicatienaam, worden gebruikt als transactie-ID. Maar als u meerdere exemplaren van dezelfde applicatie start met dezelfde transactional.id, wordt de eerste gestarte instantie gestopt met een fout, omdat Kafka het als een zombieproces zal beschouwen.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Om dit probleem op te lossen, voegen we een achtervoegsel toe aan de applicatienaam in de vorm van de hostnaam, die we verkrijgen uit omgevingsvariabelen.

De producent is geconfigureerd, maar transacties op Kafka bepalen alleen de reikwijdte van het bericht. Ongeacht de transactiestatus gaat het bericht onmiddellijk naar het onderwerp, maar heeft het aanvullende systeemkenmerken.

Om te voorkomen dat dergelijke berichten vooraf door de Consument worden gelezen, moet deze de parameter instellen isolatieniveau naar read_commited waarde. Een dergelijke consument kan niet-transactionele berichten lezen zoals voorheen, en transactionele berichten pas na een commit.
Als u alle eerder genoemde instellingen heeft ingesteld, dan heeft u bij levering precies één keer geconfigureerd. Gefeliciteerd!

Maar er is nog een nuance. Transactional.id, dat we hierboven hebben geconfigureerd, is eigenlijk het transactievoorvoegsel. Op de transactiemanager wordt er een volgnummer aan toegevoegd. De ontvangen identificatie wordt uitgegeven aan transactionele.id.expiration.ms, die is geconfigureerd op een Kafka-cluster en een standaardwaarde heeft van “7 dagen”. Als de applicatie gedurende deze tijd geen berichten heeft ontvangen, ontvangt u de volgende transactionele verzending wanneer u deze probeert InvalidPidMappingException. De transactiecoördinator geeft dan een nieuw volgnummer af voor de volgende transactie. Het bericht kan echter verloren gaan als de InvalidPidMappingException niet correct wordt afgehandeld.

In plaats van totalen

Zoals je kunt zien, is het niet voldoende om alleen maar berichten naar Kafka te sturen. U moet een combinatie van parameters kiezen en bereid zijn om snelle wijzigingen aan te brengen. In dit artikel heb ik geprobeerd de exacte eenmalige leveringsconfiguratie in detail weer te geven en heb ik verschillende problemen beschreven met de client.id- en transactional.id-configuraties die we tegenkwamen. Hieronder vindt u een samenvatting van de Producent- en Consumenteninstellingen.

Producer:

  1. acks = alles
  2. nieuwe pogingen > 0
  3. enable.idempotence = waar
  4. max.in.flight.requests.per.connection ≤ 5 (1 voor ordelijk verzenden)
  5. transactioneel.id = ${applicatienaam}-${hostnaam}

Consument:

  1. isolation.level = lees_toegewijd

Om fouten in toekomstige toepassingen te minimaliseren, hebben we onze eigen wrapper over de veerconfiguratie gemaakt, waar waarden voor sommige van de genoemde parameters al zijn ingesteld.

Hier zijn een paar materialen voor zelfstudie:

Bron: www.habr.com

Voeg een reactie