Kafka necə reallığa çevrildi

Kafka necə reallığa çevrildi

Hey Habr!

Mən öz bildiriş mərkəzini inkişaf etdirən Tinkoff komandasında işləyirəm. Mən əsasən Spring boot istifadə edərək Java-da inkişaf edirəm və layihədə yaranan müxtəlif texniki problemləri həll edirəm.

Mikroxidmətlərimizin əksəriyyəti mesaj brokeri vasitəsilə bir-biri ilə asinxron əlaqə qurur. Əvvəllər biz artıq yükün öhdəsindən gələ bilməyən, eyni zamanda yüksək çatdırılma zəmanətinə malik olan IBM MQ-dan broker kimi istifadə edirdik.

Əvəzedici olaraq bizə yüksək miqyas potensialına malik olan, lakin təəssüf ki, müxtəlif ssenarilər üçün konfiqurasiyaya demək olar ki, fərdi yanaşma tələb edən Apache Kafka təklif edildi. Bundan əlavə, Kafkada standart olaraq işləyən ən azı bir dəfə çatdırılma mexanizmi qutudan kənarda lazımi ardıcıllıq səviyyəsini saxlamağa imkan vermədi. Bundan sonra, Kafka konfiqurasiyası ilə bağlı təcrübəmizi bölüşəcəyəm, xüsusən də bir dəfə çatdırılma ilə necə konfiqurasiya və yaşamaq lazım olduğunu söyləyəcəyəm.

Zəmanətli çatdırılma və s

Aşağıda müzakirə edilən parametrlər standart əlaqə parametrləri ilə bağlı bir sıra problemlərin qarşısını almağa kömək edəcək. Ancaq əvvəlcə mümkün səhvləri aradan qaldırmağa kömək edəcək bir parametrə diqqət yetirmək istərdim.

Bu kömək edəcək client.id İstehsalçı və İstehlakçı üçün. İlk baxışdan dəyər kimi tətbiqin adını istifadə edə bilərsiniz və əksər hallarda bu işləyəcək. Tətbiqin bir neçə İstehlakçıdan istifadə etməsi və onlara eyni client.id-ni verdiyiniz vəziyyət aşağıdakı xəbərdarlıqla nəticələnir:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

JMX-i Kafka ilə bir proqramda istifadə etmək istəyirsinizsə, bu problem ola bilər. Bu halda, müştəri.id dəyəri kimi proqram adının və məsələn, mövzu adının birləşməsindən istifadə etmək ən yaxşısıdır. Konfiqurasiyamızın nəticəsini komanda çıxışında görmək olar kafka-istehlakçı qrupları Confluent-in kommunal xidmətlərindən:

Kafka necə reallığa çevrildi

İndi zəmanətli mesajın çatdırılması üçün ssenariyə baxaq. Kafka Producer parametri var acks, bu, klaster liderinin mesajı uğurla yazılmış hesab etməsi üçün neçə təsdiqdən sonra konfiqurasiya etməyə imkan verir. Bu parametr aşağıdakı dəyərləri qəbul edə bilər:

  • 0 - etiraf nəzərə alınmayacaq.
  • 1 standart parametrdir, təsdiq etmək üçün yalnız 1 replika tələb olunur.
  • −1 — bütün sinxronlaşdırılmış replikalardan təsdiq tələb olunur (klaster quraşdırması min.insync.replikalar).

Sadalanan dəyərlərdən aydın olur ki, -1-ə bərabər olan acks mesajın itirilməyəcəyinə ən güclü zəmanət verir.

Hamımızın bildiyimiz kimi paylanmış sistemlər etibarsızdır. Keçici nasazlıqlardan qorunmaq üçün Kafka Producer seçimi təmin edir yenidən cəhd edirdaxilində təkrar göndərmə cəhdlərinin sayını təyin etməyə imkan verir çatdırılma.zaman aşımı.ms. Yenidən cəhd parametri Integer.MAX_VALUE (2147483647) defolt dəyərinə malik olduğundan, mesaj təkrar cəhdlərinin sayı yalnız çatdırılma.timeout.ms dəyişdirilərək tənzimlənə bilər.

Tam bir dəfə çatdırılma istiqamətində irəliləyirik

Sadalanan parametrlər İstehsalçımıza mesajları yüksək zəmanətlə çatdırmağa imkan verir. İndi gəlin Kafka mövzusuna mesajın yalnız bir nüsxəsinin yazılmasını necə təmin etmək barədə danışaq? Ən sadə halda, bunu etmək üçün Producer-də parametr təyin etməlisiniz aktivləşdirmək.idempotence doğruya. Idempotency bir mövzunun xüsusi bölməsinə yalnız bir mesaj yazılmasına zəmanət verir. İdempotentliyi təmin etmək üçün ilkin şərt dəyərlərdir acks = hamısı, yenidən cəhd edin > 0, max.in.flight.requests.per.connection ≤ 5. Əgər bu parametrlər tərtibatçı tərəfindən göstərilməyibsə, yuxarıdakı dəyərlər avtomatik olaraq təyin olunacaq.

İdempotentlik konfiqurasiya edildikdə, eyni mesajların hər dəfə eyni bölmələrdə bitməsini təmin etmək lazımdır. Bu, partitioner.class açarını və parametrini Producer olaraq təyin etməklə edilə bilər. Açardan başlayaq. Hər təqdimat üçün eyni olmalıdır. Bu, orijinal yazıdakı biznes identifikatorlarından hər hansı birini istifadə etməklə asanlıqla əldə edilə bilər. Partitioner.class parametrinin standart dəyəri var - DefaultPartitioner. Bu bölmə strategiyası ilə standart olaraq biz belə hərəkət edirik:

  • Mesaj göndərilərkən bölmə açıq şəkildə göstərilibsə, biz ondan istifadə edirik.
  • Bölmə göstərilməyibsə, lakin açar göstərilibsə, bölməni açarın hash ilə seçin.
  • Bölmə və açar göstərilməyibsə, bölmələri bir-bir seçin (dairəvi rejimdə).

Həmçinin, bir parametr ilə bir açar və idempotent göndərmə istifadə edərək max.in.flight.requests.per.connection = 1 sizə İstehlakçıda sadələşdirilmiş mesaj emalını verir. Onu da xatırlamağa dəyər ki, əgər giriş nəzarəti klasterinizdə konfiqurasiya edilibsə, o zaman mövzuya qeyri-müəyyən şəkildə yazmaq hüququna ehtiyacınız olacaq.

Birdən açarla idempotent göndərmə imkanlarınız çatışmırsa və ya İstehsalçı tərəfindəki məntiq müxtəlif bölmələr arasında məlumat ardıcıllığının saxlanmasını tələb edirsə, əməliyyatlar köməyə gələcək. Bundan əlavə, zəncirvari əməliyyatdan istifadə edərək, Kafkadakı bir qeydi, məsələn, verilənlər bazasındakı qeydlə şərti olaraq sinxronlaşdıra bilərsiniz. İstehsalçıya əməliyyat göndərilməsini aktivləşdirmək üçün o, idempotent olmalı və əlavə olaraq təyin edilməlidir transactional.id. Əgər giriş nəzarəti Kafka klasterinizdə konfiqurasiya edilibsə, o zaman idempotent qeyd kimi tranzaksiya qeydinə transferal.id-də saxlanan dəyərdən istifadə edərək maska ​​ilə verilə bilən yazma hüquqları lazımdır.

Formal olaraq, hər hansı bir sətir, məsələn, proqram adı, əməliyyat identifikatoru kimi istifadə edilə bilər. Ancaq eyni tətbiqin bir neçə instansiyasını eyni transactional.id ilə işə salsanız, Kafka bunu zombi prosesi hesab edəcəyi üçün ilk işə salınan instansiya xəta ilə dayandırılacaq.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Bu problemi həll etmək üçün proqram adına mühit dəyişənlərindən əldə etdiyimiz host adı şəklində şəkilçi əlavə edirik.

İstehsalçı konfiqurasiya olunub, lakin Kafka üzərindəki əməliyyatlar yalnız mesajın əhatə dairəsinə nəzarət edir. Əməliyyat statusundan asılı olmayaraq, mesaj dərhal mövzuya gedir, lakin əlavə sistem atributlarına malikdir.

Belə mesajların İstehlakçı tərəfindən vaxtından əvvəl oxunmasının qarşısını almaq üçün o, parametri təyin etməlidir izolyasiya.səviyyə read_committed dəyərinə. Belə bir İstehlakçı əvvəlki kimi qeyri-transaksiya xarakterli mesajları, tranzaksiya mesajlarını isə yalnız öhdəlik götürdükdən sonra oxuya biləcək.
Əgər əvvəllər sadalanan bütün parametrləri təyin etmisinizsə, deməli, çatdırılmanı tam olaraq bir dəfə konfiqurasiya etmisiniz. Təbrik edirik!

Ancaq daha bir nüans var. Yuxarıda konfiqurasiya etdiyimiz Transactional.id əslində əməliyyat prefiksidir. Əməliyyat menecerində ona ardıcıllıq nömrəsi əlavə olunur. Qəbul edilmiş identifikator verilir əməliyyat.id.expiration.ms, Kafka klasterində konfiqurasiya edilmiş və standart dəyəri “7 gün”. Əgər bu müddət ərzində proqram heç bir mesaj almayıbsa, növbəti əməliyyat göndərməyə cəhd etdiyiniz zaman siz alacaqsınız InvalidPidMappingException. Sonra əməliyyat koordinatoru növbəti əməliyyat üçün yeni ardıcıllıq nömrəsi verəcəkdir. Bununla belə, InvalidPidMappingException düzgün idarə olunmazsa, mesaj itirilə bilər.

Nəticə əvəzinə

Göründüyü kimi, Kafkaya sadəcə mesaj göndərmək kifayət deyil. Parametrlərin birləşməsini seçməli və tez dəyişikliklər etməyə hazır olmalısınız. Bu yazıda mən tam olaraq bir dəfə çatdırılma konfiqurasiyasını ətraflı göstərməyə çalışdım və qarşılaşdığımız client.id və transactional.id konfiqurasiyaları ilə bağlı bir sıra problemləri təsvir etdim. Aşağıda İstehsalçı və İstehlakçı parametrlərinin xülasəsi verilmişdir.

İstehsalçı:

  1. acks = hamısı
  2. təkrar cəhdlər > 0
  3. enable.idempotence = doğrudur
  4. max.in.flight.requests.per.connection ≤ 5 (sifarişlə göndərmə üçün 1)
  5. transactional.id = ${application-name}-${hostname}

İstehlakçı:

  1. isolation.level = oxunan_təhsil

Gələcək tətbiqlərdə səhvləri minimuma endirmək üçün, sadalanan bəzi parametrlər üçün dəyərlərin artıq təyin olunduğu yay konfiqurasiyası üzərində öz sarğımızı etdik.

Özünü öyrənmək üçün bir neçə material var:

Mənbə: www.habr.com

Добавить комментарий