Как Кафка стана реалност

Как Кафка стана реалност

Хей Хабр!

Работя в екипа на Tinkoff, който разработва собствен център за уведомяване. В по-голямата си част разработвам в Java с помощта на Spring boot и решавам различни технически проблеми, които възникват в проекта.

Повечето от нашите микроуслуги комуникират една с друга асинхронно чрез брокер на съобщения. Преди това използвахме IBM MQ като брокер, който вече не можеше да се справи с товара, но в същото време имаше високи гаранции за доставка.

Като заместител ни беше предложен Apache Kafka, който има висок потенциал за мащабиране, но, за съжаление, изисква почти индивидуален подход за конфигуриране за различни сценарии. В допълнение, механизмът за доставка поне веднъж, който работи в Kafka по подразбиране, не позволи поддържането на необходимото ниво на последователност извън кутията. След това ще споделя нашия опит с конфигурирането на Kafka, по-специално как да настроите и да живеете с доставка точно веднъж.

Гарантирана доставка и др

Обсъдените по-долу опции ще помогнат за предотвратяване на редица проблеми с настройките за връзка по подразбиране. Но първо бих искал да обърна внимание на един параметър, който ще улесни евентуално отстраняване на грешки.

Това ще помогне client.id за производител и потребител. На пръв поглед можете да използвате името на приложението като стойност и в повечето случаи това ще работи. Въпреки че ситуацията, когато има няколко потребители в приложението и вие им давате един и същ client.id, води до следното предупреждение:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Ако искате да използвате JMX в приложение с Kafka, това може да е проблем. В този случай е най-добре да използвате комбинация от името на приложението и, например, името на темата като стойност на client.id. Резултатът от нашата конфигурация може да се види в изхода на командата kafka-потребителски групи от помощни програми от Confluent:

Как Кафка стана реалност

Сега нека разгледаме сценария за гарантирана доставка на съобщения. Kafka Producer има параметър АСК, което ви позволява да конфигурирате след колко потвърждения лидерът на клъстера трябва да счита съобщението за успешно написано. Този параметър може да приема следните стойности:

  • 0 - потвърждението няма да се зачита.
  • 1 — параметър по подразбиране, само 1 реплика трябва да бъде потвърдена.
  • −1 — изисква се потвърждение от всички синхронизирани реплики (настройка на клъстер min.insync.реплики).

От изброените стойности се вижда, че acks равно на -1 дава най-силната гаранция, че съобщението няма да бъде загубено.

Както всички знаем, разпределените системи са ненадеждни. За да се предпази от преходни повреди, Kafka Producer предоставя параметъра повторни опити, което ви позволява да зададете броя на опитите за повторно изпращане в рамките доставка.изчакване.ms. Тъй като параметърът за повторни опити има стойност по подразбиране Integer.MAX_VALUE (2147483647), броят на повторните изпращания на съобщението може да се коригира чрез промяна само на delivery.timeout.ms.

Преминаване към доставка точно веднъж

Тези настройки позволяват на нашия продуцент да доставя съобщения с висока гаранция. Нека сега да поговорим за това как да гарантираме, че само едно копие на съобщение е написано в Kafka тема? В най-простия случай за това трябва да зададете параметъра Producer позволявам.идемпотентност до вярно. Идемпотентността гарантира, че само едно съобщение е написано в определен дял на една тема. Предпоставката за разрешаване на идемпотентност са ценностите acks = всички, повторен опит > 0, max.in.flight.requests.per.connection ≤ 5. Ако тези параметри не са зададени от разработчика, горните стойности ще бъдат зададени автоматично.

Когато е настроена идемпотентност, е необходимо да се гарантира, че едни и същи съобщения завършват в едни и същи дялове всеки път. Това може да стане чрез задаване на ключа и параметъра partitioner.class на Producer. Да започнем с ключа. Трябва да е еднакъв за всяко изпращане. Това се постига лесно с помощта на бизнес идентификатор от оригиналната публикация. Параметърът partitioner.class има стойност по подразбиране − DefaultPartitioner. С тази стратегия за разделяне по подразбиране ние действаме по следния начин:

  • Ако дялът е изрично посочен при изпращане на съобщението, тогава ние го използваме.
  • Ако дялът не е посочен, но ключът е посочен, изберете дяла по хеша на ключа.
  • Ако дялът и ключът не са зададени, изберете дяловете на свой ред (кръгло).

Също така, използване на ключ и идемпотентно изпращане с параметър max.in.flight.requests.per.connection = 1 ви дава подредена обработка на съобщения на Consumer. Отделно си струва да запомните, че ако контролът на достъпа е конфигуриран на вашия клъстер, тогава ще ви трябват права за идемпотентно писане в темата.

Ако внезапно ви липсва възможността да изпращате идемпотент по ключ или логиката от страната на производителя изисква поддържане на съгласуваност на данните между различни дялове, тогава транзакциите ще дойдат на помощ. Освен това, използвайки верижна транзакция, можете условно да синхронизирате запис в Kafka, например, със запис в база данни. За да активирате транзакционно изпращане до Producer, той трябва да бъде идемпотентен и допълнително зададен транзакционен.id. Ако вашият Kafka клъстер има конфигуриран контрол на достъпа, тогава транзакционен запис, като идемпотентен, ще се нуждае от разрешения за запис, които могат да бъдат предоставени чрез маска, като се използва стойността, съхранена в transactional.id.

Формално всеки низ, като името на приложението, може да се използва като идентификатор на транзакция. Но ако стартирате няколко екземпляра на едно и също приложение с един и същ транзакционен.id, тогава първият стартиран екземпляр ще бъде спрян с грешка, защото Kafka ще го счита за зомби процес.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

За да разрешим този проблем, добавяме суфикс на име на хост към името на приложението, което получаваме от променливите на средата.

Производителят е настроен, но транзакциите на Kafka контролират само обхвата на съобщението. Независимо от състоянието на транзакцията, съобщението веднага влиза в темата, но има допълнителни системни атрибути.

За да се предотвратят такива съобщения да бъдат прочетени от Потребителя преди време, той трябва да зададе параметъра ниво на изолация към стойността read_committed. Такъв Потребител ще може да чете не-транзакционни съобщения, както преди, и транзакционни съобщения само след ангажимента.
Ако сте задали всички настройки, изброени по-горе, тогава сте конфигурирали точно веднъж доставка. Честито!

Но има още един нюанс. Transactional.id, който сме задали по-горе, всъщност е префиксът на транзакцията. В мениджъра на транзакциите към него се добавя пореден номер. Полученият идентификатор се издава на transactional.id.expiration.ms, който е конфигуриран на клъстера Kafka и има стойност по подразбиране от 7 дни. Ако през това време приложението не получи никакви съобщения, тогава, когато опитате следващото транзакционно изпращане, ще получите InvalidPidMappingException. След това координаторът на транзакцията ще издаде нов пореден номер за следващата транзакция. Съобщението обаче може да се загуби, ако InvalidPidMappingException не се обработи правилно.

Вместо общо

Както можете да видите, не е достатъчно просто да изпращате съобщения до Кафка. Трябва да изберете комбинация от параметри и да сте готови за бързи промени. В тази статия се опитах да покажа подробно настройката за доставка точно веднъж и описах няколко проблема с конфигурациите client.id и transactional.id, на които се натъкнахме. Настройките на производителя и потребителя са обобщени по-долу.

производител:

  1. acks = всички
  2. повторни опити > 0
  3. enable.idempotence = вярно
  4. max.in.flight.requests.per.connection ≤ 5 (1 за поръчано изпращане)
  5. transactional.id = ${име-на-приложение}-${име на хост}

Консуматор:

  1. isolation.level = read_committed

За да сведем до минимум грешките в бъдещи приложения, ние създадохме собствена обвивка върху пролетната конфигурация, където стойностите на някои от изброените параметри вече са зададени.

И ето няколко материала за самообучение:

Източник: www.habr.com

Добавяне на нов коментар