Hur Kafka blev verklighet

Hur Kafka blev verklighet

Hej Habr!

Jag arbetar på Tinkoff-teamet, som håller på att utveckla ett eget meddelandecenter. Jag utvecklar mest i Java med Spring boot och löser olika tekniska problem som uppstår i ett projekt.

De flesta av våra mikrotjänster kommunicerar med varandra asynkront genom en meddelandeförmedlare. Tidigare använde vi IBM MQ som mäklare, som inte längre klarade belastningen, men samtidigt hade höga leveransgarantier.

Som ersättare erbjöds vi Apache Kafka, som har hög skalningspotential, men som tyvärr kräver ett nästan individuellt förhållningssätt till konfiguration för olika scenarier. Dessutom tillät den minst en gång leveransmekanismen som fungerar i Kafka som standard inte att upprätthålla den erforderliga nivån av konsistens direkt. Därefter kommer jag att dela vår erfarenhet av Kafka-konfiguration, i synnerhet kommer jag att berätta hur du konfigurerar och lever med exakt en gång leverans.

Garanterad leverans med mera

Inställningarna som diskuteras nedan hjälper till att förhindra ett antal problem med standardinställningarna för anslutning. Men först skulle jag vilja uppmärksamma en parameter som kommer att underlätta en eventuell felsökning.

Detta kommer hjälpa Klient ID för producent och konsument. Vid första anblicken kan du använda applikationsnamnet som värde, och i de flesta fall kommer detta att fungera. Även om situationen när en applikation använder flera konsumenter och du ger dem samma client.id, resulterar i följande varning:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Om du vill använda JMX i en applikation med Kafka kan detta vara ett problem. I det här fallet är det bäst att använda en kombination av applikationsnamnet och till exempel ämnesnamnet som client.id-värde. Resultatet av vår konfiguration kan ses i kommandoutgången kafka-konsument-grupper från verktyg från Confluent:

Hur Kafka blev verklighet

Låt oss nu titta på scenariot för garanterad meddelandeleverans. Kafka Producer har en parameter acks, vilket låter dig konfigurera efter hur många bekräftelser klusterledaren behöver för att betrakta meddelandet som framgångsrikt skrivet. Denna parameter kan ha följande värden:

  • 0 — bekräfta kommer inte att beaktas.
  • 1 är standardparametern, endast 1 replik krävs för att bekräfta.
  • −1 — bekräftelse från alla synkroniserade repliker krävs (klusterinställning min.insync.repliker).

Från de listade värdena är det tydligt att acks lika med -1 ger den starkaste garantin att meddelandet inte kommer att gå förlorat.

Som vi alla vet är distribuerade system opålitliga. För att skydda mot övergående fel tillhandahåller Kafka Producer alternativet försöker igen, som låter dig ställa in antalet återsändningsförsök inom leverans.timeout.ms. Eftersom parametern omförsök har ett standardvärde på Integer.MAX_VALUE (2147483647), kan antalet meddelandeförsök justeras genom att endast ändra delivery.timeout.ms.

Vi går mot exakt en gång leverans

De listade inställningarna tillåter vår producent att leverera meddelanden med en hög garanti. Låt oss nu prata om hur man säkerställer att endast en kopia av ett meddelande skrivs till ett Kafka-ämne? I det enklaste fallet, för att göra detta, måste du ställa in parametern på Producer aktivera.idempotens till sant. Idempotens garanterar att endast ett meddelande skrivs till en specifik partition av ett ämne. Förutsättningen för att möjliggöra idempotens är värdena acks = alla, försök igen > 0, max.in.flight.requests.per.connection ≤ 5. Om dessa parametrar inte anges av utvecklaren, kommer värdena ovan att ställas in automatiskt.

När idempotens är konfigurerad är det nödvändigt att se till att samma meddelanden hamnar i samma partitioner varje gång. Detta kan göras genom att ställa in nyckeln partitioner.class och parametern till Producer. Låt oss börja med nyckeln. Det måste vara samma för varje inlämning. Detta kan enkelt uppnås genom att använda valfritt företags-ID från det ursprungliga inlägget. Parametern partitioner.class har ett standardvärde − DefaultPartitioner. Med denna partitioneringsstrategi agerar vi som standard så här:

  • Om partitionen uttryckligen anges när meddelandet skickas, använder vi den.
  • Om partitionen inte är specificerad, men nyckeln är specificerad, välj partitionen med nyckelns hash.
  • Om partitionen och nyckeln inte är specificerade, välj partitionerna en efter en (round-robin).

Också att använda en nyckel och idempotent sändning med en parameter max.in.flight.requests.per.connection = 1 ger dig strömlinjeformad meddelandebehandling på konsumenten. Det är också värt att komma ihåg att om åtkomstkontroll är konfigurerad på ditt kluster, kommer du att behöva rättigheter att idempotent skriva till ett ämne.

Om du plötsligt saknar kapaciteten för idempotent sändning med nyckel eller logiken på producentsidan kräver att datakonsistensen mellan olika partitioner upprätthålls, kommer transaktioner att komma till undsättning. Med hjälp av en kedjetransaktion kan du dessutom villkorligt synkronisera en post i Kafka, till exempel med en post i databasen. För att möjliggöra transaktionssändning till producenten måste den vara idempotent och extra inställd transaktions-id. Om ditt Kafka-kluster har åtkomstkontroll konfigurerad, kommer en transaktionspost, som en idempotent post, att behöva skrivbehörighet, som kan beviljas genom mask med värdet lagrat i transactional.id.

Formellt kan vilken sträng som helst, som applikationsnamnet, användas som en transaktionsidentifierare. Men om du startar flera instanser av samma applikation med samma transactional.id, kommer den först startade instansen att stoppas med ett fel, eftersom Kafka kommer att betrakta det som en zombieprocess.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

För att lösa detta problem lägger vi till ett suffix till applikationsnamnet i form av värdnamnet, som vi hämtar från miljövariabler.

Producenten är konfigurerad, men transaktioner på Kafka styr bara omfattningen av meddelandet. Oavsett transaktionsstatus går meddelandet omedelbart till ämnet, men har ytterligare systemattribut.

För att förhindra att sådana meddelanden läses av konsumenten i förväg måste den ställa in parametern isoleringsnivå till read_committed värde. En sådan konsument kommer att kunna läsa icke-transaktionella meddelanden som tidigare, och transaktionsmeddelanden endast efter en commit.
Om du har angett alla inställningar som anges tidigare, har du konfigurerat exakt en gång leverans. Grattis!

Men det finns en nyans till. Transactional.id, som vi konfigurerade ovan, är faktiskt transaktionsprefixet. På transaktionshanteraren läggs ett sekvensnummer till den. Den mottagna identifieraren utfärdas till transaktions-id.expiration.ms, som är konfigurerat på ett Kafka-kluster och har ett standardvärde på "7 dagar". Om applikationen inte har tagit emot några meddelanden under denna tid kommer du att få när du försöker nästa transaktionssändning InvalidPidMappingException. Transaktionskoordinatorn kommer sedan att utfärda ett nytt sekvensnummer för nästa transaktion. Meddelandet kan dock gå förlorat om InvalidPidMappingException inte hanteras korrekt.

I stället för totals

Som du kan se räcker det inte att bara skicka meddelanden till Kafka. Du måste välja en kombination av parametrar och vara beredd på att göra snabba ändringar. I den här artikeln försökte jag i detalj visa inställningarna för exakt en gång leverans och beskrev flera problem med client.id och transactional.id-konfigurationerna som vi stötte på. Nedan finns en sammanfattning av inställningarna för producent och konsument.

Producent:

  1. acks = alla
  2. försöker igen > 0
  3. enable.idempotence = sant
  4. max.in.flight.requests.per.connection ≤ 5 (1 för ordnad sändning)
  5. transactional.id = ${application-name}-${hostname}

Konsument:

  1. isolation.level = read_committed

För att minimera fel i framtida applikationer gjorde vi vår egen omslag över fjäderkonfigurationen, där värden för några av de listade parametrarna redan är inställda.

Här är ett par material för självstudier:

Källa: will.com

Lägg en kommentar