Hogyan vált Kafka valósággá

Hogyan vált Kafka valósággá

Szia Habr!

A Tinkoff csapatánál dolgozom, amely saját értesítési központot fejleszt. Leginkább Java-ban fejlesztek Spring boot segítségével, és oldok meg különféle technikai problémákat, amelyek egy projektben adódnak.

A legtöbb mikroszolgáltatásunk aszinkron módon kommunikál egymással üzenetközvetítőn keresztül. Korábban az IBM MQ-t használtuk brókerként, amely már nem bírta a terhelést, ugyanakkor magas szállítási garanciákkal rendelkezett.

Csereként az Apache Kafkát ajánlották fel, amely nagy skálázási potenciállal rendelkezik, de sajnos szinte egyéni megközelítést igényel a különböző forgatókönyvek konfigurációjához. Ezenkívül a legalább egyszeri szállítási mechanizmus, amely alapértelmezés szerint működik a Kafkában, nem tette lehetővé a szükséges konzisztenciaszint fenntartását a dobozból. Ezután megosztom a Kafka konfigurációval kapcsolatos tapasztalatainkat, különösen azt, hogyan kell konfigurálni és élni pontosan az egyszeri szállítással.

Garantált szállítás és így tovább

Az alábbiakban tárgyalt beállítások segítenek megelőzni az alapértelmezett kapcsolati beállításokkal kapcsolatos számos problémát. Először azonban szeretnék egy olyan paraméterre figyelni, amely megkönnyíti az esetleges hibakeresést.

Ez segíteni fog Ügyfélazonosító Termelők és Fogyasztók számára. Első pillantásra az alkalmazás nevét használhatja értékként, és a legtöbb esetben ez működni fog. Bár az a helyzet, amikor egy alkalmazás több fogyasztót használ, és Ön ugyanazt a client.id-t adja meg, a következő figyelmeztetést eredményezi:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Ha a JMX-et egy Kafka-alkalmazásban szeretné használni, akkor ez probléma lehet. Ebben az esetben a legjobb az alkalmazásnév és például a témakör nevének kombinációját használni ügyfél.id értékként. Konfigurációnk eredménye a parancskimenetben látható kafka-fogyasztói csoportok a Confluent közművekből:

Hogyan vált Kafka valósággá

Most nézzük meg a garantált üzenetkézbesítés forgatókönyvét. A Kafka Producernek van egy paramétere kakasok, amely lehetővé teszi annak konfigurálását, hogy hány nyugtázás után kell a fürtvezetőnek az üzenet sikeres megírásához. Ez a paraméter a következő értékeket veheti fel:

  • 0 – a nyugtázást nem veszi figyelembe.
  • Az 1 az alapértelmezett paraméter, csak 1 replika szükséges a nyugtázáshoz.
  • −1 — minden szinkronizált replikától nyugtázás szükséges (fürtbeállítás min.insync.replicas).

A felsorolt ​​értékekből jól látható, hogy a −1-gyel egyenlő akksik adják a legerősebb garanciát arra, hogy az üzenet nem vész el.

Mint mindannyian tudjuk, az elosztott rendszerek megbízhatatlanok. Az átmeneti hibák elleni védelem érdekében a Kafka Producer lehetőséget biztosít újrapróbálkozik, amely lehetővé teszi az újraküldési kísérletek számának beállítását belül szállítás.időtúllépés.ms. Mivel az újrapróbálkozások paraméter alapértelmezett értéke Integer.MAX_VALUE (2147483647), az üzenetújrapróbálkozások száma csak a delivery.timeout.ms módosításával módosítható.

A pontosan egyszeri szállítás felé haladunk

A felsorolt ​​beállítások lehetővé teszik, hogy Termelőnk magas garanciával kézbesítse az üzeneteket. Most beszéljünk arról, hogyan biztosítható, hogy egy Kafka-témához csak egy üzenetet írjunk? A legegyszerűbb esetben ehhez be kell állítani a paramétert a Produceren lehetővé teszi.idempotencia igaznak. Az Idempotencia garantálja, hogy egy témakör egy adott partíciójára csak egy üzenetet írnak. Az idempotencia lehetővé tételének előfeltétele az értékek acks = minden, újrapróbálkozás > 0, max.in.flight.requests.per.connection ≤ 5. Ha ezeket a paramétereket a fejlesztő nem adja meg, a fenti értékek automatikusan beállnak.

Az idempotencia beállításakor gondoskodni kell arról, hogy minden alkalommal ugyanazok az üzenetek ugyanabban a partícióban legyenek. Ezt úgy teheti meg, hogy a partitioner.class kulcsot és paramétert Producer értékre állítja. Kezdjük a kulccsal. Minden benyújtásnál azonosnak kell lennie. Ez könnyen elérhető az eredeti bejegyzésben szereplő üzleti azonosítók bármelyikének használatával. A partitioner.class paraméter alapértelmezett értéke – DefaultPartitioner. Ezzel a particionálási stratégiával alapértelmezés szerint a következőképpen járunk el:

  • Ha az üzenet küldésekor kifejezetten meg van adva a partíció, akkor azt használjuk.
  • Ha a partíció nincs megadva, de a kulcs meg van adva, válassza ki a partíciót a kulcs hash-je alapján.
  • Ha a partíció és a kulcs nincs megadva, válassza ki a partíciókat egyenként (körbevetés).

Továbbá kulcs használata és idempotens küldés paraméterrel max.in.flight.requests.per.connection = 1 egyszerűsített üzenetfeldolgozást biztosít a fogyasztó számára. Azt is érdemes megjegyezni, hogy ha a hozzáférés-vezérlés be van állítva a fürtön, akkor jogosultságra lesz szüksége ahhoz, hogy egy témába illetéktelenül írjon.

Ha hirtelen hiányzik az idempotens kulcsos küldés képessége, vagy a Producer oldali logika megköveteli az adatok konzisztenciájának fenntartását a különböző partíciók között, akkor a tranzakciók jönnek a segítségre. Ezenkívül egy lánctranzakció használatával feltételesen szinkronizálhat egy rekordot a Kafka-ban, például egy rekorddal az adatbázisban. A Gyártónak történő tranzakciós küldés engedélyezéséhez idempotensnek és kiegészítőleg beállítottnak kell lennie tranzakciós.id. Ha a Kafka-fürt hozzáférés-vezérléssel rendelkezik, akkor a tranzakciós rekordnak, mint az idempotens rekordnak, írási engedélyekre lesz szüksége, amelyeket maszkkal lehet megadni a tranzakcióal.id fájlban tárolt érték használatával.

Formálisan bármely karakterlánc, például az alkalmazás neve, használható tranzakcióazonosítóként. De ha ugyanannak az alkalmazásnak több példányát indítja el ugyanazzal a tranzakciós.id-vel, akkor az elsőként elindított példány hibával leáll, mivel Kafka zombifolyamatnak tekinti.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

A probléma megoldására az alkalmazás nevéhez egy utótagot adunk a gazdagépnév formájában, amelyet a környezeti változókból kapunk.

Az előállító konfigurálva van, de a Kafka tranzakciói csak az üzenet hatókörét szabályozzák. A tranzakció állapotától függetlenül az üzenet azonnal a témához megy, de további rendszerattribútumokkal rendelkezik.

Annak elkerülése érdekében, hogy a Fogyasztó időben elolvassa az ilyen üzeneteket, be kell állítania a paramétert elszigeteltség.szint a read_committed értékhez. Az ilyen Fogyasztó képes lesz olvasni a nem tranzakciós üzeneteket, mint korábban, és a tranzakciós üzeneteket csak commit után.
Ha az összes korábban felsorolt ​​beállítást beállította, akkor pontosan a kiszállítást követően konfigurálta. Gratulálunk!

De van még egy árnyalat. A Transaction.id, amelyet fent konfiguráltunk, valójában a tranzakció előtagja. A tranzakciókezelőben sorszámot adnak hozzá. A kapott azonosítót a rendszer kiadja tranzakciós.id.expiration.ms, amely egy Kafka-fürtön van konfigurálva, és alapértelmezett értéke „7 nap”. Ha ezalatt az alkalmazás nem kapott üzenetet, akkor a következő tranzakciós küldés próbálkozásakor kapni fog InvalidPidMappingException. A tranzakciókoordinátor ezután új sorszámot ad ki a következő tranzakcióhoz. Az üzenet azonban elveszhet, ha az InvalidPidMappingException nem kezeli megfelelően.

Összegek helyett

Amint látja, nem elég egyszerűen üzeneteket küldeni Kafkának. Ki kell választania a paraméterek kombinációját, és fel kell készülnie a gyors változtatásokra. Ebben a cikkben megpróbáltam részletesen bemutatni a pontosan egyszeri kézbesítési beállítást, és leírtam számos problémát a client.id és a tranzakciós.id konfigurációkkal kapcsolatban, amelyekkel találkoztunk. Az alábbiakban összefoglaljuk a gyártói és fogyasztói beállításokat.

Producer:

  1. acks = minden
  2. újrapróbálkozás > 0
  3. enable.idempotence = igaz
  4. max.in.flight.requests.per.connection ≤ 5 (1 a szabályos küldéshez)
  5. tranzakciós.id = ${alkalmazásnév}-${gazdanév}

Fogyasztó:

  1. isolation.level = read_committed

A jövőbeni alkalmazások hibáinak minimalizálása érdekében saját burkolóanyagot készítettünk a rugós konfiguráció felett, ahol a felsorolt ​​paraméterek egy részének értékei már be vannak állítva.

Íme néhány anyag az önálló tanuláshoz:

Forrás: will.com

Hozzászólás