ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Π°Ρ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° событий, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½Ρ‹Ρ… ΠΈΠ· Kafka

ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Π°Ρ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° событий, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½Ρ‹Ρ… ΠΈΠ· Kafka

ΠŸΡ€ΠΈΠ²Π΅Ρ‚, Π₯Π°Π±Ρ€.

НСдавно я подСлился ΠΎΠΏΡ‹Ρ‚ΠΎΠΌ ΠΎ Ρ‚ΠΎΠΌ, ΠΊΠ°ΠΊΠΈΠ΅ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹ ΠΌΡ‹ Π² ΠΊΠΎΠΌΠ°Π½Π΄Π΅ Ρ‡Π°Ρ‰Π΅ всСго ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ для Kafka Producer ΠΈ Consumer, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΡ€ΠΈΠ±Π»ΠΈΠ·ΠΈΡ‚ΡŒΡΡ ΠΊ Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡ€ΠΎΠ²Π°Π½Π½ΠΎΠΉ доставкС. Π’ этой ΡΡ‚Π°Ρ‚ΡŒΠ΅ Ρ…ΠΎΡ‡Ρƒ Ρ€Π°ΡΡΠΊΠ°Π·Π°Ρ‚ΡŒ, ΠΊΠ°ΠΊ ΠΌΡ‹ ΠΎΡ€Π³Π°Π½ΠΈΠ·ΠΎΠ²Π°Π»ΠΈ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΡƒΡŽ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ события, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½ΠΎΠ³ΠΎ ΠΈΠ· Kafka, Π² Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π΅ Π²Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ нСдоступности внСшнСй систСмы.

Π‘ΠΎΠ²Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ прилоТСния Ρ€Π°Π±ΠΎΡ‚Π°ΡŽΡ‚ Π² ΠΎΡ‡Π΅Π½ΡŒ слоТной срСдС. БизнСс-Π»ΠΎΠ³ΠΈΠΊΠ°, обСрнутая Π² соврСмСнный тСхнологичСский стСк, Ρ€Π°Π±ΠΎΡ‚Π°ΡŽΡ‰Π°Ρ Π² Docker-ΠΎΠ±Ρ€Π°Π·Π΅, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ управляСтся оркСстратором Π²Ρ€ΠΎΠ΄Π΅ Kubernetes ΠΈΠ»ΠΈ OpenShift, ΠΈ ΠΊΠΎΠΌΠΌΡƒΠ½ΠΈΡ†ΠΈΡ€ΡƒΡŽΡ‰Π°Ρ с Π΄Ρ€ΡƒΠ³ΠΈΠΌΠΈ прилоТСниями ΠΈΠ»ΠΈ enterprise-Ρ€Π΅ΡˆΠ΅Π½ΠΈΡΠΌΠΈ Ρ‡Π΅Ρ€Π΅Π· Ρ†Π΅ΠΏΠΎΡ‡ΠΊΡƒ физичСских ΠΈ Π²ΠΈΡ€Ρ‚ΡƒΠ°Π»ΡŒΠ½Ρ‹Ρ… ΠΌΠ°Ρ€ΡˆΡ€ΡƒΡ‚ΠΈΠ·Π°Ρ‚ΠΎΡ€ΠΎΠ². Π’ Ρ‚Π°ΠΊΠΎΠΌ ΠΎΠΊΡ€ΡƒΠΆΠ΅Π½ΠΈΠΈ всСгда Ρ‡Ρ‚ΠΎ-Ρ‚ΠΎ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΠ»ΠΎΠΌΠ°Ρ‚ΡŒΡΡ, поэтому повторная ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° событий Π² случаС нСдоступности ΠΎΠ΄Π½ΠΎΠΉ ΠΈΠ· Π²Π½Π΅ΡˆΠ½ΠΈΡ… систСм β€” ваТная Ρ‡Π°ΡΡ‚ΡŒ Π½Π°ΡˆΠΈΡ… бизнСс-процСссов.

Как Π±Ρ‹Π»ΠΎ Π΄ΠΎ Kafka

Π Π°Π½Π΅Π΅ Π² ΠΏΡ€ΠΎΠ΅ΠΊΡ‚Π΅ ΠΌΡ‹ использовали IBM MQ для асинхронной доставки сообщСний. ΠŸΡ€ΠΈ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΠΎΠ²Π΅Π½ΠΈΠΈ ΠΊΠ°ΠΊΠΎΠΉ-Π»ΠΈΠ±ΠΎ ошибки Π² процСссС Ρ€Π°Π±ΠΎΡ‚Ρ‹ сСрвиса ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½ΠΎΠ΅ сообщСниС ΠΌΠΎΠ³Π»ΠΎ Π±Ρ‹Ρ‚ΡŒ ΠΏΠΎΠΌΠ΅Ρ‰Π΅Π½ΠΎ Π² dead-letter-queue (DLQ) для дальнСйшСго Ρ€ΡƒΡ‡Π½ΠΎΠ³ΠΎ Ρ€Π°Π·Π±ΠΎΡ€Π°. DLQ создавался рядом с входящСй ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒΡŽ, ΠΏΠ΅Ρ€Π΅ΠΊΠ»Π°Π΄Ρ‹Π²Π°Π½ΠΈΠ΅ сообщСния происходило Π²Π½ΡƒΡ‚Ρ€ΠΈ IBM MQ.

Если ошибка ΠΈΠΌΠ΅Π»Π° Π²Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹ΠΉ Ρ…Π°Ρ€Π°ΠΊΡ‚Π΅Ρ€ ΠΈ ΠΌΡ‹ ΠΌΠΎΠ³Π»ΠΈ это ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΠΈΡ‚ΡŒ (Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ResourceAccessException ΠΏΡ€ΠΈ HTTP-Π²Ρ‹Π·ΠΎΠ²Π΅ ΠΈΠ»ΠΈ MongoTimeoutException ΠΏΡ€ΠΈ запросС Π² MongoDb), Ρ‚ΠΎ Π² силу вступала стратСгия ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ². Π’Π½Π΅ зависимости ΠΎΡ‚ вСтвлСния Π»ΠΎΠ³ΠΈΠΊΠΈ прилоТСния, исходноС сообщСниС ΠΏΠ΅Ρ€Π΅ΠΊΠ»Π°Π΄Ρ‹Π²Π°Π»ΠΎΡΡŒ ΠΈΠ»ΠΈ Π² ΡΠΈΡΡ‚Π΅ΠΌΠ½ΡƒΡŽ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ для ΠΎΡ‚Π»ΠΎΠΆΠ΅Π½Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ, ΠΈΠ»ΠΈ Π² ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½ΠΎΠ΅ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ ΠΊΠΎΠ³Π΄Π°-Ρ‚ΠΎ Π΄Π°Π²Π½ΠΎ Π±Ρ‹Π»ΠΎ сдСлано для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ сообщСний. ΠŸΡ€ΠΈ этом Π² Π·Π°Π³ΠΎΠ»ΠΎΠ²ΠΎΠΊ сообщСния записываСтся Π½ΠΎΠΌΠ΅Ρ€ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ привязан ΠΊ ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π»Ρƒ Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΈ ΠΈΠ»ΠΈ ΠΊ ΠΊΠΎΠ½Ρ†Ρƒ стратСгии Π½Π° ΡƒΡ€ΠΎΠ²Π½Π΅ прилоТСния. Если ΠΌΡ‹ достигли ΠΊΠΎΠ½Ρ†Π° стратСгии, Π½ΠΎ внСшняя систСма всС Π΅Ρ‰Π΅ нСдоступна, Ρ‚ΠΎ сообщСниС Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠΌΠ΅Ρ‰Π΅Π½ΠΎ Π² DLQ для Ρ€ΡƒΡ‡Π½ΠΎΠ³ΠΎ Ρ€Π°Π·Π±ΠΎΡ€Π°.

Поиск Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ

Поискав Π² ΠΈΠ½Ρ‚Π΅Ρ€Π½Π΅Ρ‚Π΅, ΠΌΠΎΠΆΠ½ΠΎ Π½Π°ΠΉΡ‚ΠΈ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅Π΅ Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅. Если ΠΊΠΎΡ€ΠΎΡ‚ΠΊΠΎ, Ρ‚ΠΎ прСдлагаСтся завСсти ΠΏΠΎ Ρ‚ΠΎΠΏΠΈΠΊΡƒ Π½Π° ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΈ ΠΈ Ρ€Π΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Ρ‚ΡŒ Π½Π° сторонС прилоТСния Consumer’ы, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Ρ‚ΡŒ сообщСния с Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎΠΉ Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ.

ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Π°Ρ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° событий, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½Ρ‹Ρ… ΠΈΠ· Kafka

НСсмотря Π½Π° большоС количСство ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… ΠΎΡ‚Π·Ρ‹Π²ΠΎΠ², ΠΎΠ½ΠΎ каТСтся ΠΌΠ½Π΅ Π½Π΅ совсСм ΡƒΠ΄Π°Ρ‡Π½Ρ‹ΠΌ. Π’ ΠΏΠ΅Ρ€Π²ΡƒΡŽ ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ ΠΏΠΎΡ‚ΠΎΠΌΡƒ, Ρ‡Ρ‚ΠΎ Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΡƒ, ΠΊΡ€ΠΎΠΌΠ΅ Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ бизнСс-Ρ‚Ρ€Π΅Π±ΠΎΠ²Π°Π½ΠΈΠΉ, придСтся ΠΏΠΎΡ‚Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ ΠΌΠ½ΠΎΠ³ΠΎ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ Π½Π° Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΡŽ описанного ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌΠ°.

ΠšΡ€ΠΎΠΌΠ΅ Ρ‚ΠΎΠ³ΠΎ, Ссли Π½Π° Kafka-кластСрС Π²ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΎ ΡƒΠΏΡ€Π°Π²Π»Π΅Π½ΠΈΠ΅ доступами, Ρ‚ΠΎ придСтся ΠΏΠΎΡ‚Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ ΠΊΠ°ΠΊΠΎΠ΅-Ρ‚ΠΎ врСмя Π½Π° Π·Π°Π²Π΅Π΄Π΅Π½ΠΈΠ΅ Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ² ΠΈ обСспСчСниС Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΡ‹Ρ… доступов ΠΊ Π½ΠΈΠΌ. Π’ Π΄ΠΎΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊ этому Π½ΡƒΠΆΠ½ΠΎ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠ΄Π±ΠΈΡ€Π°Ρ‚ΡŒ ΠΏΡ€Π°Π²ΠΈΠ»ΡŒΠ½Ρ‹ΠΉ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ retention.ms для ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ ΠΈΠ· Ρ€Π΅Ρ‚Ρ€Π°ΠΉ-Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ², Ρ‡Ρ‚ΠΎΠ±Ρ‹ сообщСния успСвали ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎ ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡ‚ΡŒΡΡ ΠΈ Π½Π΅ ΠΏΡ€ΠΎΠΏΠ°Π΄Π°Π»ΠΈ ΠΈΠ· Π½Π΅Π³ΠΎ. Π Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΡŽ ΠΈ запрос доступов придСтся ΠΏΠΎΠ²Ρ‚ΠΎΡ€ΡΡ‚ΡŒ для ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ ΡΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‰Π΅Π³ΠΎ ΠΈΠ»ΠΈ Π½ΠΎΠ²ΠΎΠ³ΠΎ сСрвиса.

Π”Π°Π²Π°ΠΉΡ‚Π΅ Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ посмотрим, ΠΊΠ°ΠΊΠΈΠ΅ ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌΡ‹ для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ сообщСния прСдоставляСт Π½Π°ΠΌ spring Π² Ρ†Π΅Π»ΠΎΠΌ ΠΈ spring-kafka Π² частности. Spring-kafka ΠΈΠΌΠ΅Π΅Ρ‚ Ρ‚Ρ€Π°Π½Π·ΠΈΡ‚ΠΈΠ²Π½ΡƒΡŽ Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡ‚ΡŒ Π½Π° spring-retry, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ прСдоставляСт абстракции для управлСния Ρ€Π°Π·Π½Ρ‹ΠΌΠΈ BackOffPolicy. Π­Ρ‚ΠΎ довольно Π³ΠΈΠ±ΠΊΠΈΠΉ инструмСнт, Π½ΠΎ Π΅Π³ΠΎ Π·Π½Π°Ρ‡ΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹ΠΌ нСдостатком являСтся Ρ…Ρ€Π°Π½Π΅Π½ΠΈΠ΅ сообщСний для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ Π² памяти прилоТСния. Π­Ρ‚ΠΎ Π·Π½Π°Ρ‡ΠΈΡ‚, Ρ‡Ρ‚ΠΎ пСрСзапуск прилоТСния ΠΈΠ·-Π·Π° обновлСния ΠΈΠ»ΠΈ ошибки Π²ΠΎ врСмя эксплуатации ΠΏΡ€ΠΈΠ²Π΅Π΄Π΅Ρ‚ ΠΊ ΠΏΠΎΡ‚Π΅Ρ€Π΅ всСх сообщСний, ΠΎΠΆΠΈΠ΄Π°ΡŽΡ‰ΠΈΡ… ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ. Π’Π°ΠΊ ΠΊΠ°ΠΊ этот ΠΏΡƒΠ½ΠΊΡ‚ ΠΊΡ€ΠΈΡ‚ΠΈΡ‡Π΅Π½ для нашСй систСмы, ΠΌΡ‹ Π½Π΅ стали Ρ€Π°ΡΡΠΌΠ°Ρ‚Ρ€ΠΈΠ²Π°Ρ‚ΡŒ Π΅Π³ΠΎ дальшС.

Π‘Π°ΠΌΠ° spring-kafka прСдоставляСт нСсколько Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΉ ContainerAwareErrorHandler, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ SeekToCurrentErrorHandler, с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ ΠΌΠΎΠΆΠ½ΠΎ, Π½Π΅ смСщая offset Π² случаС возникновСния ошибки, ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ сообщСниС ΠΏΠΎΠ·ΠΆΠ΅. Начиная с вСрсии spring-kafka 2.3 появилась Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Π·Π°Π΄Π°Π²Π°Ρ‚ΡŒ BackOffPolicy.

Π­Ρ‚ΠΎΡ‚ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ позволяСт ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎ ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅ΠΌΡ‹ΠΌ сообщСниям ΠΏΠ΅Ρ€Π΅ΠΆΠΈΠ²Π°Ρ‚ΡŒ рСстарт прилоТСния, Π½ΠΎ ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌ DLQ ΠΏΠΎ-ΠΏΡ€Π΅ΠΆΠ½Π΅ΠΌΡƒ отсутствуСт. ИмСнно этот Π²Π°Ρ€ΠΈΠ°Π½Ρ‚ ΠΌΡ‹ Π²Ρ‹Π±Ρ€Π°Π»ΠΈ Π² Π½Π°Ρ‡Π°Π»Π΅ 2019 Π³ΠΎΠ΄Π°, оптимистично полагая, Ρ‡Ρ‚ΠΎ DLQ Π½Π΅ понадобится (Π½Π°ΠΌ ΠΏΠΎΠ²Π΅Π·Π»ΠΎ ΠΈ ΠΎΠ½ Π΄Π΅ΠΉΡΡ‚Π²ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎ Π½Π΅ понадобился Π·Π° нСсколько мСсяцСв эксплуатации прилоТСния с Ρ‚Π°ΠΊΠΎΠΉ систСмой ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ). Π’Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ ошибки ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ ΡΡ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π½ΠΈΡŽ SeekToCurrentErrorHandler. ΠžΡΡ‚Π°Π»ΡŒΠ½Ρ‹Π΅ ошибки ΠΏΠ΅Ρ‡Π°Ρ‚Π°Π»ΠΈΡΡŒ Π² Π»ΠΎΠ³, ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ ΡΠΌΠ΅Ρ‰Π΅Π½ΠΈΡŽ offset, ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°Π»Π°ΡΡŒ со ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ сообщСниСм.

Π˜Ρ‚ΠΎΠ³ΠΎΠ²ΠΎΠ΅ Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅

РСализация, основанная Π½Π° SeekToCurrentErrorHandler, ΠΏΠΎΠ΄Ρ‚ΠΎΠ»ΠΊΠ½ΡƒΠ»Π° нас ΠΊ Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚ΠΊΠ΅ собствСнного ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌΠ° для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ сообщСний.

ΠŸΡ€Π΅ΠΆΠ΄Π΅ всСго ΠΌΡ‹ Ρ…ΠΎΡ‚Π΅Π»ΠΈ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΡƒΠΆΠ΅ ΠΈΠΌΠ΅ΡŽΡ‰ΠΈΠΉΡΡ ΠΎΠΏΡ‹Ρ‚ ΠΈ Ρ€Π°ΡΡˆΠΈΡ€ΠΈΡ‚ΡŒ Π΅Π³ΠΎ Π² зависимости ΠΎΡ‚ Π»ΠΎΠ³ΠΈΠΊΠΈ прилоТСния. Для прилоТСния с Π»ΠΈΠ½Π΅ΠΉΠ½ΠΎΠΉ Π»ΠΎΠ³ΠΈΠΊΠΎΠΉ ΠΎΠΏΡ‚ΠΈΠΌΠ°Π»ΡŒΠ½Ρ‹ΠΌ Π±Ρ‹Π»ΠΎ Π±Ρ‹ ΠΏΡ€Π΅ΠΊΡ€Π°Ρ‚ΠΈΡ‚ΡŒ считываниС Π½ΠΎΠ²Ρ‹Ρ… сообщСний Π² Ρ‚Π΅Ρ‡Π΅Π½ΠΈΠ΅ нСбольшого ΠΏΡ€ΠΎΠΌΠ΅ΠΆΡƒΡ‚ΠΊΠ° Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ, Π·Π°Π΄Π°Π½Π½ΠΎΠ³ΠΎ Π² Ρ€Π°ΠΌΠΊΠ°Ρ… стратСгии ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ². Для ΠΎΡΡ‚Π°Π»ΡŒΠ½Ρ‹Ρ… ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ Ρ…ΠΎΡ‚Π΅Π»ΠΎΡΡŒ ΠΈΠΌΠ΅Ρ‚ΡŒ Π΅Π΄ΠΈΠ½ΡƒΡŽ Ρ‚ΠΎΡ‡ΠΊΡƒ, которая Π±Ρ‹ обСспСчивала Π²Ρ‹ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ стратСгии ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ². Π’ Π΄ΠΎΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ эта Сдиная Ρ‚ΠΎΡ‡ΠΊΠ° Π΄ΠΎΠ»ΠΆΠ½Π° ΠΎΠ±Π»Π°Π΄Π°Ρ‚ΡŒ Ρ„ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½Π°Π»ΡŒΠ½ΠΎΡΡ‚ΡŒΡŽ DLQ для ΠΎΠ±ΠΎΠΈΡ… ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΎΠ².

Π‘Π°ΠΌΠ° стратСгия ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ² Π΄ΠΎΠ»ΠΆΠ½Π° Ρ…Ρ€Π°Π½ΠΈΡ‚ΡŒΡΡ Π² ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ ΠΎΡ‚Π²Π΅Ρ‡Π°Π΅Ρ‚ Π·Π° ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠ΅ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅Π³ΠΎ ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π»Π° ΠΏΡ€ΠΈ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΠΎΠ²Π΅Π½ΠΈΠΈ Π²Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ ошибки.

ΠžΡΡ‚Π°Π½ΠΎΠ²ΠΊΠ° Consumer’a для прилоТСния с Π»ΠΈΠ½Π΅ΠΉΠ½ΠΎΠΉ Π»ΠΎΠ³ΠΈΠΊΠΎΠΉ

ΠŸΡ€ΠΈ Ρ€Π°Π±ΠΎΡ‚Π΅ с spring-kafka ΠΊΠΎΠ΄ для остановки Consumer’a ΠΌΠΎΠΆΠ΅Ρ‚ Π²Ρ‹Π³Π»ΡΠ΄Π΅Ρ‚ΡŒ ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π½ΠΎ Ρ‚Π°ΠΊ:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Π’ ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π΅ retryAt β€” это врСмя, ΠΊΠΎΠ³Π΄Π° Π½ΡƒΠΆΠ½ΠΎ Π·Π°Π½ΠΎΠ²ΠΎ Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ MessageListenerContainer, Ссли ΠΎΠ½ Π΅Ρ‰Π΅ Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚. ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹ΠΉ запуск ΠΏΡ€ΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ‚ Π² ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½ΠΎΠΌ ΠΏΠΎΡ‚ΠΎΠΊΠ΅, Π·Π°ΠΏΡƒΡ‰Π΅Π½Π½ΠΎΠΌ Π² TaskScheduler, Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΡŽ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ Ρ‚ΠΎΠΆΠ΅ прСдоставляСт spring.

Π—Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ retryAt ΠΌΡ‹ Π½Π°Ρ…ΠΎΠ΄ΠΈΠΌ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ способом:

  1. Π˜Ρ‰Π΅Ρ‚ΡΡ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ счСтчика ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ².
  2. Π’ соотвСтствии со Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ΠΌ счСтчика ищСтся Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΈ Π² стратСгии ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ². БтратСгия ΠΎΠ±ΡŠΡΠ²Π»ΡΠ΅Ρ‚ΡΡ Π² самом ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΈ, для Π΅Π΅ хранСния ΠΌΡ‹ Π²Ρ‹Π±Ρ€Π°Π»ΠΈ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚ JSON.
  3. НайдСнный Π² JSON-массивС ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» содСрТит Π² сСбС количСство сСкунд, Ρ‡Π΅Ρ€Π΅Π· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠ²Ρ‚ΠΎΡ€ΠΈΡ‚ΡŒ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ. Π­Ρ‚ΠΎ количСство сСкунд прибавляСтся ΠΊ Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΌΡƒ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ, образуя Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ для retryAt.
  4. Если ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» Π½Π΅ Π½Π°ΠΉΠ΄Π΅Π½, Ρ‚ΠΎ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ retryAt Ρ€Π°Π²Π½ΠΎ null ΠΈ сообщСниС отправится Π² DLQ для Ρ€ΡƒΡ‡Π½ΠΎΠ³ΠΎ Ρ€Π°Π·Π±ΠΎΡ€Π°.

ΠŸΡ€ΠΈ Ρ‚Π°ΠΊΠΎΠΌ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄Π΅ остаСтся Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΡΠΎΡ…Ρ€Π°Π½ΠΈΡ‚ΡŒ количСство ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ² для ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ сообщСния, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ находится сСйчас Π½Π° ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ΅, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ Π² памяти прилоТСния. Π‘ΠΎΡ…Ρ€Π°Π½Π΅Π½ΠΈΠ΅ счСтчика ΠΏΠΎΠΏΡ‹Ρ‚ΠΎΠΊ Π² памяти Π½Π΅ ΠΊΡ€ΠΈΡ‚ΠΈΡ‡Π½ΠΎ для этого ΠΏΠΎΠ΄Ρ…ΠΎΠ΄Π°, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ с Π»ΠΈΠ½Π΅ΠΉΠ½ΠΎΠΉ Π»ΠΎΠ³ΠΈΠΊΠΎΠΉ Π½Π΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ Π² Ρ†Π΅Π»ΠΎΠΌ. Π’ ΠΎΡ‚Π»ΠΈΡ‡ΠΈΠ΅ ΠΎΡ‚ spring-retry пСрСзапуск прилоТСния ΠΏΡ€ΠΈΠ²Π΅Π΄Π΅Ρ‚ Π½Π΅ ΠΊ ΠΏΠΎΡ‚Π΅Ρ€Π΅ всСх сообщСний для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ, Π° просто ΠΊ пСрСзапуску стратСгии.

Π­Ρ‚ΠΎΡ‚ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ ΠΏΠΎΠΌΠΎΠ³Π°Π΅Ρ‚ ΡΠ½ΡΡ‚ΡŒ Π½Π°Π³Ρ€ΡƒΠ·ΠΊΡƒ с внСшнСй систСмы, которая ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ нСдоступна ΠΈΠ·-Π·Π° ΠΎΡ‡Π΅Π½ΡŒ большой Π½Π°Π³Ρ€ΡƒΠ·ΠΊΠΈ. Π”Ρ€ΡƒΠ³ΠΈΠΌΠΈ словами, Π² Π΄ΠΎΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ ΠΊ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ΅ ΠΌΡ‹ добились Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ ΠΏΠ°Ρ‚Ρ‚Π΅Ρ€Π½Π° circuit breaker.

Π’ нашСм случаС ΠΏΠΎΡ€ΠΎΠ³ ошибки Ρ€Π°Π²Π΅Π½ всСго 1, Π° Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΌΠΈΠ½ΠΈΠΌΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ простой систСмы ΠΈΠ·-Π·Π° Π²Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠ³ΠΎ сСтСвого пСрСбоя, ΠΌΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ ΠΎΡ‡Π΅Π½ΡŒ Π³Ρ€Π°Π½ΡƒΠ»ΡΡ€Π½ΡƒΡŽ ΡΡ‚Ρ€Π°Ρ‚Π΅Π³ΠΈΡŽ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ² с нСбольшими ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π»Π°ΠΌΠΈ Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΈ. Π­Ρ‚ΠΎ ΠΌΠΎΠΆΠ΅Ρ‚ Π½Π΅ ΠΏΠΎΠ΄ΠΎΠΉΡ‚ΠΈ для всСх ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ Π³Ρ€ΡƒΠΏΠΏΡ‹ ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠΉ, поэтому ΡΠΎΠΎΡ‚Π½ΠΎΡˆΠ΅Π½ΠΈΠ΅ ΠΌΠ΅ΠΆΠ΄Ρƒ ΠΏΠΎΡ€ΠΎΠ³ΠΎΠΌ ошибки ΠΈ Π²Π΅Π»ΠΈΡ‡ΠΈΠ½ΠΎΠΉ ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π»Π° Π½ΡƒΠΆΠ½ΠΎ ΠΏΠΎΠ΄Π±ΠΈΡ€Π°Ρ‚ΡŒ, ΠΎΠΏΠΈΡ€Π°ΡΡΡŒ Π½Π° особСнности систСмы.

ΠžΡ‚Π΄Π΅Π»ΡŒΠ½ΠΎΠ΅ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ для ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ сообщСний ΠΎΡ‚ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ с Π½Π΅Π΄Π΅Ρ‚Π΅Ρ€ΠΌΠΈΠ½ΠΈΡ€ΠΎΠ²Π°Π½Π½ΠΎΠΉ Π»ΠΎΠ³ΠΈΠΊΠΎΠΉ

Π’ΠΎΡ‚ ΠΏΡ€ΠΈΠΌΠ΅Ρ€ ΠΊΠΎΠ΄Π°, ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡŽΡ‰Π΅Π³ΠΎ сообщСниС Π² Ρ‚Π°ΠΊΠΎΠ΅ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ (Retryer), ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΡƒΡŽ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΡƒ Π² Ρ‚ΠΎΠΏΠΈΠΊ DESTINATION ΠΏΡ€ΠΈ достиТСнии Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Из ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π° Π²ΠΈΠ΄Π½ΠΎ, Ρ‡Ρ‚ΠΎ ΠΌΠ½ΠΎΠ³ΠΎ ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ пСрСдаСтся Π² Ρ…Π΅Π΄Π΅Ρ€Π°Ρ…. Π—Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ RETRY_AT находится Ρ‚Π°ΠΊ ΠΆΠ΅, ΠΊΠ°ΠΊ ΠΈ для ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌΠ° ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π° Ρ‡Π΅Ρ€Π΅Π· остановку Consumer’a. Помимо DESTINATION ΠΈ RETRY_AT ΠΌΡ‹ ΠΏΠ΅Ρ€Π΅Π΄Π°Π΅ΠΌ:

  • GROUP_ID, ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ Π³Ρ€ΡƒΠΏΠΏΠΈΡ€ΡƒΠ΅ΠΌ сообщСния для Ρ€ΡƒΡ‡Π½ΠΎΠ³ΠΎ Π°Π½Π°Π»ΠΈΠ·Π° ΠΈ упрощСния поиска.
  • ORIGINAL_PARTITION, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΠΎΡΡ‚Π°Ρ€Π°Ρ‚ΡŒΡΡ ΡΠΎΡ…Ρ€Π°Π½ΠΈΡ‚ΡŒ Ρ‚ΠΎΡ‚ ΠΆΠ΅ Consumer для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ. Π­Ρ‚ΠΎΡ‚ ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ Ρ€Π°Π²Π΅Π½ null, Π² Ρ‚Π°ΠΊΠΎΠΌ случаС новая partition Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π° ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ record.key() ΠΎΡ€ΠΈΠ³ΠΈΠ½Π°Π»ΡŒΠ½ΠΎΠ³ΠΎ сообщСния.
  • ОбновлСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ COUNTER, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΡΠ»Π΅Π΄ΠΎΠ²Π°Ρ‚ΡŒ стратСгии ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹Ρ… Π²Ρ‹Π·ΠΎΠ²ΠΎΠ².
  • SEND_TO β€” константа, ΠΏΠΎΠΊΠ°Π·Ρ‹Π²Π°ΡŽΡ‰Π°Ρ, ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ Π»ΠΈ сообщСниС Π½Π° ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΡƒΡŽ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ ΠΏΠΎ достиТСнии RETRY_AT ΠΈΠ»ΠΈ ΠΏΠΎΠΌΠ΅ΡΡ‚ΠΈΡ‚ΡŒ Π² DLQ.
  • REASON β€” ΠΏΡ€ΠΈΡ‡ΠΈΠ½Π°, ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° сообщСния Π±Ρ‹Π»Π° ΠΏΡ€Π΅Ρ€Π²Π°Π½Π°.

Retryer сохраняСт сообщСния для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ ΠΈ Ρ€ΡƒΡ‡Π½ΠΎΠ³ΠΎ Ρ€Π°Π·Π±ΠΎΡ€Π° Π² PostgreSQL. По Ρ‚Π°ΠΉΠΌΠ΅Ρ€Ρƒ запускаСтся Π·Π°Π΄Π°Ρ‡Π°, которая Π½Π°Ρ…ΠΎΠ΄ΠΈΡ‚ сообщСния с Π½Π°ΡΡ‚ΡƒΠΏΠΈΠ²ΡˆΠΈΠΌ RETRY_AT ΠΈ отправляСт ΠΈΡ… ΠΎΠ±Ρ€Π°Ρ‚Π½ΠΎ Π² ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΡŽ ORIGINAL_PARTITION Ρ‚ΠΎΠΏΠΈΠΊΠ° DESTINATION с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ record.key().

ПослС ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ сообщСния ΡƒΠ΄Π°Π»ΡΡŽΡ‚ΡΡ ΠΈΠ· PostgreSQL. Π ΡƒΡ‡Π½ΠΎΠΉ Ρ€Π°Π·Π±ΠΎΡ€ сообщСний происходит Π² простом UI, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ взаимодСйствуСт с Retryer ΠΏΠΎ REST API. ΠžΡΠ½ΠΎΠ²Π½Ρ‹ΠΌΠΈ Π΅Π³ΠΎ особСнностями ΡΠ²Π»ΡΡŽΡ‚ΡΡ ΠΏΠ΅Ρ€Π΅ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ° ΠΈΠ»ΠΈ ΡƒΠ΄Π°Π»Π΅Π½ΠΈΠ΅ сообщСний ΠΈΠ· DLQ, просмотр ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ ΠΎΠ± ошибкС ΠΈ поиск сообщСний, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€ ΠΏΠΎ ΠΈΠΌΠ΅Π½ΠΈ ошибки.

Π’Π°ΠΊ ΠΊΠ°ΠΊ Π½Π° Π½Π°ΡˆΠΈΡ… кластСрах Π²ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΎ ΡƒΠΏΡ€Π°Π²Π»Π΅Π½ΠΈΠ΅ доступом, Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎ Π·Π°ΠΏΡ€Π°ΡˆΠΈΠ²Π°Ρ‚ΡŒ доступы ΠΊ Ρ‚ΠΎΠΏΠΈΠΊΡƒ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΡΠ»ΡƒΡˆΠ°Π΅Ρ‚ Retryer, ΠΈ Π΄Π°Ρ‚ΡŒ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Retryer’у ΠΏΠΈΡΠ°Ρ‚ΡŒ Π² DESTINATION Ρ‚ΠΎΠΏΠΈΠΊ. Π­Ρ‚ΠΎ Π½Π΅ΡƒΠ΄ΠΎΠ±Π½ΠΎ, Π½ΠΎ, Π² ΠΎΡ‚Π»ΠΈΡ‡ΠΈΠ΅ ΠΎΡ‚ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄Π° с Ρ‚ΠΎΠΏΠΈΠΊΠΎΠΌ Π½Π° ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π», Ρƒ нас появляСтся полноцСнная DLQ ΠΈ UI для управлСния Сю.

Π‘Ρ‹Π²Π°ΡŽΡ‚ случаи, ΠΊΠΎΠ³Π΄Π° входящий Ρ‚ΠΎΠΏΠΈΠΊ Ρ‡ΠΈΡ‚Π°ΡŽΡ‚ нСсколько Ρ€Π°Π·Π½Ρ‹Ρ… consumer-Π³Ρ€ΡƒΠΏΠΏ, прилоТСния ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Ρ€Π΅Π°Π»ΠΈΠ·ΡƒΡŽΡ‚ Ρ€Π°Π·Π½ΡƒΡŽ Π»ΠΎΠ³ΠΈΠΊΡƒ. ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Π°Ρ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° сообщСния Ρ‡Π΅Ρ€Π΅Π· Retryer для ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΈΠ· Ρ‚Π°ΠΊΠΈΡ… ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ ΠΏΡ€ΠΈΠ²Π΅Π΄Π΅Ρ‚ ΠΊ Π΄ΡƒΠ±Π»ΠΈΠΊΠ°Ρ‚Ρƒ Π½Π° Π΄Ρ€ΡƒΠ³ΠΎΠΌ. Π§Ρ‚ΠΎΠ±Ρ‹ Π·Π°Ρ‰ΠΈΡ‚ΠΈΡ‚ΡŒΡΡ ΠΎΡ‚ этого, ΠΌΡ‹ Π·Π°Π²ΠΎΠ΄ΠΈΠΌ ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½Ρ‹ΠΉ Ρ‚ΠΎΠΏΠΈΠΊ для ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠΉ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠΈ. Входящий ΠΈ retry-Ρ‚ΠΎΠΏΠΈΠΊ ΠΌΠΎΠΆΠ΅Ρ‚ Ρ‡ΠΈΡ‚Π°Ρ‚ΡŒ ΠΎΠ΄ΠΈΠ½ ΠΈ Ρ‚ΠΎΡ‚ ΠΆΠ΅ Consumer Π±Π΅Π· ΠΊΠ°ΠΊΠΈΡ…-Π»ΠΈΠ±ΠΎ ΠΎΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΠΉ.

ΠŸΠΎΠ²Ρ‚ΠΎΡ€Π½Π°Ρ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° событий, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½Ρ‹Ρ… ΠΈΠ· Kafka

По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ этот ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ Π½Π΅ прСдоставляСт возмоТности circuit breaker’a, ΠΎΠ΄Π½Π°ΠΊΠΎ Π΅Π³ΠΎ ΠΌΠΎΠΆΠ½ΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ Π² ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ spring-cloud-netflix ΠΈΠ»ΠΈ Π½ΠΎΠ²ΠΎΠ³ΠΎ spring cloud circuit breaker, ΠΎΠ±Π΅Ρ€Π½ΡƒΠ² мСста Π²Ρ‹Π·ΠΎΠ²ΠΎΠ² Π²Π½Π΅ΡˆΠ½ΠΈΡ… сСрвисов Π² ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΠ΅ абстракции. ΠšΡ€ΠΎΠΌΠ΅ Ρ‚ΠΎΠ³ΠΎ, появляСтся Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Π²Ρ‹Π±ΠΎΡ€Π° стратСгии для bulkhead ΠΏΠ°Ρ‚Ρ‚Π΅Ρ€Π½Π°, Ρ‡Ρ‚ΠΎ Ρ‚ΠΎΠΆΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΏΠΎΠ»Π΅Π·Π½ΠΎ. НапримСр, Π² spring-cloud-netflix это ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ thread pool ΠΈΠ»ΠΈ сСмафор.

Π’Ρ‹Π²ΠΎΠ΄

Π’ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Π΅ Ρƒ нас ΠΏΠΎΠ»ΡƒΡ‡ΠΈΠ»ΠΎΡΡŒ ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½ΠΎΠ΅ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ позволяСт ΠΏΠΎΠ²Ρ‚ΠΎΡ€ΠΈΡ‚ΡŒ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ сообщСния ΠΏΡ€ΠΈ Π²Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ нСдоступности ΠΊΠ°ΠΊΠΎΠΉ-Π»ΠΈΠ±ΠΎ внСшнСй систСмы.

Одним ΠΈΠ· Π³Π»Π°Π²Π½Ρ‹Ρ… прСимущСств прилоТСния являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ ΠΈΠΌ ΠΌΠΎΠ³ΡƒΡ‚ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ внСшниС систСмы, Ρ€Π°Π±ΠΎΡ‚Π°ΡŽΡ‰ΠΈΠ΅ Π½Π° Ρ‚ΠΎΠΌ ΠΆΠ΅ Kafka-кластСрС, Π±Π΅Π· Π·Π½Π°Ρ‡ΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… Π΄ΠΎΡ€Π°Π±ΠΎΡ‚ΠΎΠΊ Π½Π° своСй сторонС! Π’Π°ΠΊΠΎΠΌΡƒ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡŽ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π±ΡƒΠ΄Π΅Ρ‚ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ доступ ΠΊ retry-Ρ‚ΠΎΠΏΠΈΠΊΡƒ, Π·Π°ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ нСсколько Kafka-Π·Π°Π³ΠΎΠ»ΠΎΠ²ΠΊΠΎΠ² ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ сообщСниС Π² Retryer. НС Π½ΡƒΠΆΠ½ΠΎ ΠΏΠΎΠ΄Π½ΠΈΠΌΠ°Ρ‚ΡŒ Π½ΠΈΠΊΠ°ΠΊΠΎΠΉ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠΉ инфраструктуры. А Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΡƒΠΌΠ΅Π½ΡŒΡˆΠΈΡ‚ΡŒ количСство ΠΏΠ΅Ρ€Π΅ΠΊΠ»Π°Π΄Ρ‹Π²Π°Π΅ΠΌΡ‹Ρ… сообщСний ΠΈΠ· прилоТСния Π² Retryer ΠΈ ΠΎΠ±Ρ€Π°Ρ‚Π½ΠΎ, ΠΌΡ‹ Π²Ρ‹Π΄Π΅Π»ΠΈΠ»ΠΈ прилоТСния с Π»ΠΈΠ½Π΅ΠΉΠ½ΠΎΠΉ Π»ΠΎΠ³ΠΈΠΊΠΎΠΉ ΠΈ сдСлали Π² Π½ΠΈΡ… ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΡƒΡŽ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ Ρ‡Π΅Ρ€Π΅Π· остановку Consumer.

Π˜ΡΡ‚ΠΎΡ‡Π½ΠΈΠΊ: habr.com

Π”ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ ΠΊΠΎΠΌΠΌΠ΅Π½Ρ‚Π°Ρ€ΠΈΠΉ