Hvordan Kafka blev til virkelighed

Hvordan Kafka blev til virkelighed

Hej Habr!

Jeg arbejder på Tinkoff-teamet, som er ved at udvikle sit eget notifikationscenter. Jeg udvikler mest i Java ved hjælp af Spring boot og løser forskellige tekniske problemer, der opstår i et projekt.

De fleste af vores mikrotjenester kommunikerer med hinanden asynkront gennem en meddelelsesmægler. Tidligere brugte vi IBM MQ som mægler, der ikke længere kunne klare belastningen, men samtidig havde høje leveringsgarantier.

Som erstatning fik vi tilbudt Apache Kafka, som har et højt skaleringspotentiale, men som desværre kræver en næsten individuel tilgang til konfiguration til forskellige scenarier. Derudover tillod den mindst én gang leveringsmekanisme, der fungerer i Kafka som standard, ikke at opretholde det krævede niveau af konsistens ud af kassen. Dernæst vil jeg dele vores erfaring med Kafka-konfiguration, især vil jeg fortælle dig, hvordan du konfigurerer og lever med præcis én gang levering.

Garanteret levering og mere

Indstillingerne beskrevet nedenfor hjælper med at forhindre en række problemer med standardforbindelsesindstillingerne. Men først vil jeg gerne være opmærksom på en parameter, der vil lette en eventuel debug.

Dette vil hjælpe klient.id for producent og forbruger. Ved første øjekast kan du bruge applikationsnavnet som værdi, og i de fleste tilfælde vil dette virke. Selvom situationen, hvor en applikation bruger flere forbrugere, og du giver dem det samme client.id, resulterer i følgende advarsel:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Hvis du vil bruge JMX i en applikation med Kafka, så kan dette være et problem. I dette tilfælde er det bedst at bruge en kombination af applikationsnavnet og f.eks. emnenavnet som client.id-værdien. Resultatet af vores konfiguration kan ses i kommandoudgangen kafka-forbruger-grupper fra hjælpeprogrammer fra Confluent:

Hvordan Kafka blev til virkelighed

Lad os nu se på scenariet for garanteret beskedlevering. Kafka Producer har en parameter ack'er, som giver dig mulighed for at konfigurere efter, hvor mange anerkendelser klyngelederen skal bruge for at betragte meddelelsen som vellykket skrevet. Denne parameter kan have følgende værdier:

  • 0 — anerkend vil ikke blive taget i betragtning.
  • 1 er standardparameteren, kun 1 replika er nødvendig for at bekræfte.
  • −1 — bekræftelse fra alle synkroniserede replikaer er påkrævet (klyngeopsætning min.insync.replikaer).

Fra de anførte værdier er det klart, at acks lig med -1 giver den stærkeste garanti for, at beskeden ikke går tabt.

Som vi alle ved, er distribuerede systemer upålidelige. For at beskytte mod forbigående fejl giver Kafka Producer muligheden prøver igen, som giver dig mulighed for at indstille antallet af genafsendelsesforsøg inden for levering.timeout.ms. Da genforsøgsparameteren har en standardværdi på Integer.MAX_VALUE (2147483647), kan antallet af meddelelsesforsøg justeres ved kun at ændre levering.timeout.ms.

Vi bevæger os mod præcis én gang levering

De anførte indstillinger giver vores producent mulighed for at levere beskeder med en høj garanti. Lad os nu tale om, hvordan man sikrer, at kun én kopi af en besked er skrevet til et Kafka-emne? I det enkleste tilfælde, for at gøre dette, skal du indstille parameteren på Producer aktivere.idempotens til sandt. Idempotens garanterer, at kun én besked skrives til en specifik partition af ét emne. Forudsætningen for at muliggøre idempotens er værdierne acks = alle, prøv igen > 0, max.in.flight.requests.per.connection ≤ 5. Hvis disse parametre ikke er angivet af udvikleren, indstilles ovenstående værdier automatisk.

Når idempotens er konfigureret, er det nødvendigt at sikre, at de samme beskeder ender i de samme partitioner hver gang. Dette kan gøres ved at indstille partitioner.class nøglen og parameteren til Producer. Lad os starte med nøglen. Det skal være det samme for hver indsendelse. Dette kan nemt opnås ved at bruge ethvert af forretnings-id'erne fra det originale indlæg. Parameteren partitioner.class har en standardværdi − DefaultPartitioner. Med denne partitioneringsstrategi handler vi som standard sådan:

  • Hvis partitionen udtrykkeligt er angivet, når meddelelsen sendes, bruger vi den.
  • Hvis partitionen ikke er angivet, men nøglen er specificeret, skal du vælge partitionen ved nøglens hash.
  • Hvis partitionen og nøglen ikke er specificeret, skal du vælge partitionerne én efter én (round-robin).

Også ved hjælp af en nøgle og idempotent afsendelse med en parameter max.in.flight.requests.per.connection = 1 giver dig strømlinet beskedbehandling på Forbrugeren. Det er også værd at huske, at hvis adgangskontrol er konfigureret på din klynge, så skal du have rettigheder til idempotent at skrive til et emne.

Hvis du pludselig mangler evnerne til idempotent afsendelse med nøgle, eller logikken på producentsiden kræver at opretholde datakonsistens mellem forskellige partitioner, så vil transaktioner komme til undsætning. Derudover kan du ved hjælp af en kædetransaktion betinget synkronisere en post i for eksempel Kafka med en post i databasen. For at muliggøre transaktionel afsendelse til producenten skal den være idempotent og yderligere indstillet transaktions-id. Hvis din Kafka-klynge har adgangskontrol konfigureret, så vil en transaktionspost, som en idempotent post, have behov for skrivetilladelser, som kan gives ved maske ved hjælp af værdien gemt i transactional.id.

Formelt kan enhver streng, såsom applikationsnavnet, bruges som en transaktions-id. Men hvis du starter flere forekomster af den samme applikation med det samme transaktions-id, så stoppes den første forekomst med en fejl, da Kafka vil betragte det som en zombieproces.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

For at løse dette problem tilføjer vi et suffiks til applikationsnavnet i form af værtsnavnet, som vi henter fra miljøvariabler.

Producenten er konfigureret, men transaktioner på Kafka styrer kun meddelelsens omfang. Uanset transaktionsstatus går meddelelsen straks til emnet, men har yderligere systemattributter.

For at forhindre sådanne meddelelser i at blive læst af forbrugeren i forvejen, skal den indstille parameteren isolationsniveau til read_committed værdi. En sådan forbruger vil være i stand til at læse ikke-transaktionelle meddelelser som før, og transaktionsmeddelelser kun efter en commit.
Hvis du har angivet alle de indstillinger, der er angivet tidligere, har du konfigureret nøjagtigt én gang levering. Tillykke!

Men der er en nuance mere. Transactional.id, som vi konfigurerede ovenfor, er faktisk transaktionspræfikset. På transaktionsmanageren tilføjes et sekvensnummer til den. Den modtagne identifikator udstedes til transaktions-id.udløb.ms, som er konfigureret på en Kafka-klynge og har en standardværdi på "7 dage". Hvis applikationen i løbet af dette tidsrum ikke har modtaget nogen beskeder, vil du modtage den, når du prøver den næste transaktionssending InvalidPidMappingException. Transaktionskoordinatoren vil derefter udstede et nyt sekvensnummer til den næste transaktion. Beskeden kan dog gå tabt, hvis InvalidPidMappingException ikke håndteres korrekt.

I stedet for totals

Som du kan se, er det ikke nok blot at sende beskeder til Kafka. Du skal vælge en kombination af parametre og være forberedt på at foretage hurtige ændringer. I denne artikel forsøgte jeg i detaljer at vise opsætningen af ​​præcis én gang levering og beskrev adskillige problemer med client.id og transactional.id konfigurationerne, som vi stødte på. Nedenfor er en oversigt over producent- og forbrugerindstillingerne.

Producer:

  1. acks = alle
  2. genforsøg > 0
  3. enable.idempotence = sand
  4. max.in.flight.anmodninger.per.forbindelse ≤ 5 (1 for ordnet afsendelse)
  5. transactional.id = ${application-name}-${hostname}

Forbruger:

  1. isolation.level = read_committed

For at minimere fejl i fremtidige applikationer, lavede vi vores egen indpakning over fjederkonfigurationen, hvor værdier for nogle af de anførte parametre allerede er indstillet.

Her er et par materialer til selvstudie:

Kilde: www.habr.com

Tilføj en kommentar