Знакомство с Debezium β€” CDC для Apache Kafka

Знакомство с Debezium — CDC для Apache Kafka

Π’ своСй Ρ€Π°Π±ΠΎΡ‚Π΅ я часто ΡΡ‚Π°Π»ΠΊΠΈΠ²Π°ΡŽΡΡŒ с Π½ΠΎΠ²Ρ‹ΠΌΠΈ тСхничСскими Ρ€Π΅ΡˆΠ΅Π½ΠΈΡΠΌΠΈ/ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ½Ρ‹ΠΌΠΈ ΠΏΡ€ΠΎΠ΄ΡƒΠΊΡ‚Π°ΠΌΠΈ, ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ ΠΎ ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π² русскоязычном ΠΈΠ½Ρ‚Π΅Ρ€Π½Π΅Ρ‚Π΅ довольно ΠΌΠ°Π»ΠΎ. Π­Ρ‚ΠΎΠΉ ΡΡ‚Π°Ρ‚ΡŒΠ΅ΠΉ ΠΏΠΎΡΡ‚Π°Ρ€Π°ΡŽΡΡŒ Π²ΠΎΡΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΠΎΠ΄ΠΈΠ½ Ρ‚Π°ΠΊΠΎΠΉ ΠΏΡ€ΠΎΠ±Π΅Π» ΠΏΡ€ΠΈΠΌΠ΅Ρ€ΠΎΠΌ ΠΈΠ· своСй Π½Π΅Π΄Π°Π²Π½Π΅ΠΉ ΠΏΡ€Π°ΠΊΡ‚ΠΈΠΊΠΈ, ΠΊΠΎΠ³Π΄Π° ΠΏΠΎΡ‚Ρ€Π΅Π±ΠΎΠ²Π°Π»ΠΎΡΡŒ Π½Π°ΡΡ‚Ρ€ΠΎΠΈΡ‚ΡŒ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΡƒ CDC-событий ΠΈΠ· Π΄Π²ΡƒΡ… популярных Π‘Π£Π‘Π” (PostgreSQL ΠΈ MongoDB) Π² кластСр Kafka ΠΏΡ€ΠΈ ΠΏΠΎΠΌΠΎΡ‰ΠΈ Debezium. НадСюсь, эта обзорная ΡΡ‚Π°Ρ‚ΡŒΡ, появившаяся ΠΏΠΎ ΠΈΡ‚ΠΎΠ³Π°ΠΌ ΠΏΡ€ΠΎΠ΄Π΅Π»Π°Π½Π½ΠΎΠΉ Ρ€Π°Π±ΠΎΡ‚Ρ‹, окаТСтся ΠΏΠΎΠ»Π΅Π·Π½ΠΎΠΉ ΠΈ Π΄Ρ€ΡƒΠ³ΠΈΠΌ.

Π§Ρ‚ΠΎ Π·Π° Debezium ΠΈ Π²ΠΎΠΎΠ±Ρ‰Π΅ CDC?

Debezium β€” ΠΏΡ€Π΅Π΄ΡΡ‚Π°Π²ΠΈΡ‚Π΅Π»ΡŒ ΠΊΠ°Ρ‚Π΅Π³ΠΎΡ€ΠΈΠΈ ΠΏΡ€ΠΎΠ³Ρ€Π°ΠΌΠΌΠ½ΠΎΠ³ΠΎ обСспСчСния CDC (Capture Data Change), Π° Ссли Ρ‚ΠΎΡ‡Π½Π΅Π΅ β€” это Π½Π°Π±ΠΎΡ€ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠ² для Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Ρ… Π‘Π£Π‘Π”, совмСстимых с Ρ„Ρ€Π΅ΠΉΠΌΠ²ΠΎΡ€ΠΊΠΎΠΌ Apache Kafka Connect.

Π­Ρ‚ΠΎ Open Source-ΠΏΡ€ΠΎΠ΅ΠΊΡ‚, ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‰ΠΈΠΉ Π»ΠΈΡ†Π΅Π½Π·ΠΈΡŽ Apache License v2.0 ΠΈ спонсируСмый ΠΊΠΎΠΌΠΏΠ°Π½ΠΈΠ΅ΠΉ Red Hat. Π Π°Π·Ρ€Π°Π±ΠΎΡ‚ΠΊΠ° вСдётся с 2016 Π³ΠΎΠ΄Π° ΠΈ Π½Π° Π΄Π°Π½Π½Ρ‹ΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ‚ Π² Π½Π΅ΠΌ прСдставлСна ΠΎΡ„ΠΈΡ†ΠΈΠ°Π»ΡŒΠ½Π°Ρ ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠ° ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΡ… Π‘Π£Π‘Π”: MySQL, PostgreSQL, MongoDB, SQL Server. Π’Π°ΠΊΠΆΠ΅ ΡΡƒΡ‰Π΅ΡΡ‚Π²ΡƒΡŽΡ‚ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Ρ‹ для Cassandra ΠΈ Oracle, Π½ΠΎ Π½Π° Π΄Π°Π½Π½Ρ‹ΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ‚ ΠΎΠ½ΠΈ находятся Π² статусС Β«Ρ€Π°Π½Π½Π΅Π³ΠΎ доступа», Π° Π½ΠΎΠ²Ρ‹Π΅ Ρ€Π΅Π»ΠΈΠ·Ρ‹ Π½Π΅ Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡ€ΡƒΡŽΡ‚ ΠΎΠ±Ρ€Π°Ρ‚Π½ΠΎΠΉ совмСстимости.

Если ΡΡ€Π°Π²Π½ΠΈΠ²Π°Ρ‚ΡŒ CDC с Ρ‚Ρ€Π°Π΄ΠΈΡ†ΠΈΠΎΠ½Π½Ρ‹ΠΌ ΠΏΠΎΠ΄Ρ…ΠΎΠ΄ΠΎΠΌ (ΠΊΠΎΠ³Π΄Π° ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Ρ‡ΠΈΡ‚Π°Π΅Ρ‚ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· Π‘Π£Π‘Π” Π½Π°ΠΏΡ€ΡΠΌΡƒΡŽ), Ρ‚ΠΎ ΠΊ Π΅Π³ΠΎ Π³Π»Π°Π²Π½Ρ‹ΠΌ прСимущСствам относят Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΡŽ стриминга измСнСния Π΄Π°Π½Π½Ρ‹Ρ… Π½Π° ΡƒΡ€ΠΎΠ²Π½Π΅ строк с Π½ΠΈΠ·ΠΊΠΎΠΉ Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ, высокой Π½Π°Π΄Π΅ΠΆΠ½ΠΎΡΡ‚ΡŒΡŽ ΠΈ Π΄ΠΎΡΡ‚ΡƒΠΏΠ½ΠΎΡΡ‚ΡŒΡŽ. ПослСдниС Π΄Π²Π° ΠΏΡƒΠ½ΠΊΡ‚Π° Π΄ΠΎΡΡ‚ΠΈΠ³Π°ΡŽΡ‚ΡΡ благодаря использованию кластСра Kafka Π² качСствС Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π° CDC-событий.

Π’Π°ΠΊΠΆΠ΅ ΠΊ достоинствам ΠΌΠΎΠΆΠ½ΠΎ отнСсти Ρ‚ΠΎΡ‚ Ρ„Π°ΠΊΡ‚, Ρ‡Ρ‚ΠΎ для хранСния событий ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ Сдиная модСль, поэтому ΠΊΠΎΠ½Π΅Ρ‡Π½ΠΎΠΌΡƒ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡŽ Π½Π΅ придётся Π±Π΅ΡΠΏΠΎΠΊΠΎΠΈΡ‚ΡŒΡΡ ΠΎ Π½ΡŽΠ°Π½ΡΠ°Ρ… эксплуатации Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Ρ… Π‘Π£Π‘Π”.

НаконСц, благодаря использованию Π±Ρ€ΠΎΠΊΠ΅Ρ€Π° сообщСний открываСтся простор для Π³ΠΎΡ€ΠΈΠ·ΠΎΠ½Ρ‚Π°Π»ΡŒΠ½ΠΎΠ³ΠΎ ΠΌΠ°ΡΡˆΡ‚Π°Π±ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΡ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ, ΠΎΡ‚ΡΠ»Π΅ΠΆΠΈΠ²Π°ΡŽΡ‰ΠΈΡ… измСнСния Π² Π΄Π°Π½Π½Ρ‹Ρ…. ΠŸΡ€ΠΈ этом влияниС Π½Π° источник Π΄Π°Π½Π½Ρ‹Ρ… сводится ΠΊ ΠΌΠΈΠ½ΠΈΠΌΡƒΠΌΡƒ, ΠΏΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… происходит Π½Π΅ Π½Π°ΠΏΡ€ΡΠΌΡƒΡŽ ΠΈΠ· Π‘Π£Π‘Π”, Π° ΠΈΠ· кластСра Kafka.

Об Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Π΅ Debezium

ИспользованиС Debezium сводится ΠΊ Ρ‚Π°ΠΊΠΎΠΉ простой схСмС:

Π‘Π£Π‘Π” (ΠΊΠ°ΠΊ источник Π΄Π°Π½Π½Ρ‹Ρ…) β†’ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π² Kafka Connect β†’ Apache Kafka β†’ ΠΊΠΎΠ½ΡΡŒΡŽΠΌΠ΅Ρ€

Π’ качСствС ΠΈΠ»Π»ΡŽΡΡ‚Ρ€Π°Ρ†ΠΈΠΈ ΠΏΡ€ΠΈΠ²Π΅Π΄Ρƒ схСму с сайта ΠΏΡ€ΠΎΠ΅ΠΊΡ‚Π°:

Знакомство с Debezium — CDC для Apache Kafka

Однако эта схСма ΠΌΠ½Π΅ Π½Π΅ ΠΎΡ‡Π΅Π½ΡŒ нравится, ΠΏΠΎΡΠΊΠΎΠ»ΡŒΠΊΡƒ складываСтся Π²ΠΏΠ΅Ρ‡Π°Ρ‚Π»Π΅Π½ΠΈΠ΅, Ρ‡Ρ‚ΠΎ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ использованиС sink-ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°.

Π’ Π΄Π΅ΠΉΡΡ‚Π²ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΡΡ‚ΠΈ ΠΆΠ΅ ситуация отличаСтся: Π½Π°ΠΏΠΎΠ»Π½Π΅Π½ΠΈΠ΅ вашСго Data Lake (послСднСС Π·Π²Π΅Π½ΠΎ Π½Π° схСмС Π²Ρ‹ΡˆΠ΅) Β­β€” это Π½Π΅ СдинствСнный способ примСнСния Debezium. Бобытия, ΠΎΡ‚ΠΏΡ€Π°Π²Π»Π΅Π½Π½Ρ‹Π΅ Π² Apache Kafka, ΠΌΠΎΠ³ΡƒΡ‚ Π±Ρ‹Ρ‚ΡŒ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ вашими прилоТСниями для Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹Ρ… ситуаций. НапримСр:

  • ΡƒΠ΄Π°Π»Π΅Π½ΠΈΠ΅ Π½Π΅Π°ΠΊΡ‚ΡƒΠ°Π»ΡŒΠ½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· кэша;
  • ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ° ΡƒΠ²Π΅Π΄ΠΎΠΌΠ»Π΅Π½ΠΈΠΉ;
  • обновлСния поисковых индСксов;
  • Π½Π΅ΠΊΠΎΠ΅ ΠΏΠΎΠ΄ΠΎΠ±ΠΈΠ΅ Π»ΠΎΠ³ΠΎΠ² Π°ΡƒΠ΄ΠΈΡ‚Π°;
  • …

Π’ случаС, Ссли Ρƒ вас ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Π½Π° Java ΠΈ Π½Π΅Ρ‚ нСобходимости/возмоТности ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ кластСр Kafka, сущСствуСт Ρ‚Π°ΠΊΠΆΠ΅ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Ρ€Π°Π±ΠΎΡ‚Ρ‹ Ρ‡Π΅Ρ€Π΅Π· embedded-ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€. ΠžΡ‡Π΅Π²ΠΈΠ΄Π½Ρ‹ΠΉ плюс Π² Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ с Π½ΠΈΠΌ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΡ‚ΠΊΠ°Π·Π°Ρ‚ΡŒΡΡ ΠΎΡ‚ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠΉ инфраструктуры (Π² Π²ΠΈΠ΄Π΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° ΠΈ Kafka). Однако это Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅ объявлСно ΡƒΡΡ‚Π°Ρ€Π΅Π²ΡˆΠΈΠΌ (deprecated) с вСрсии 1.1 ΠΈ большС Π½Π΅ рСкомСндуСтся ΠΊ использованию (Π² Π±ΡƒΠ΄ΡƒΡ‰ΠΈΡ… Ρ€Π΅Π»ΠΈΠ·Π°Ρ… Π΅Π³ΠΎ ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΡƒ ΠΌΠΎΠ³ΡƒΡ‚ ΡƒΠ±Ρ€Π°Ρ‚ΡŒ).

Π’ Π΄Π°Π½Π½ΠΎΠΉ ΡΡ‚Π°Ρ‚ΡŒΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ Ρ€Π°ΡΡΠΌΠ°Ρ‚Ρ€ΠΈΠ²Π°Ρ‚ΡŒΡΡ рСкомСндуСмая Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠ°ΠΌΠΈ Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Π°, которая обСспСчиваСт ΠΎΡ‚ΠΊΠ°Π·ΠΎΡƒΡΡ‚ΠΎΠΉΡ‡ΠΈΠ²ΠΎΡΡ‚ΡŒ ΠΈ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΌΠ°ΡΡˆΡ‚Π°Π±ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΡ.

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°

Для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π½Π°Ρ‡Π°Ρ‚ΡŒ ΠΎΡ‚ΡΠ»Π΅ΠΆΠΈΠ²Π°Ρ‚ΡŒ измСнСния самой Π³Π»Π°Π²Π½ΠΎΠΉ цСнности β€” Π΄Π°Π½Π½Ρ‹Ρ…, β€” Π½Π°ΠΌ ΠΏΠΎΡ‚Ρ€Π΅Π±ΡƒΡŽΡ‚ΡΡ:

  1. источник Π΄Π°Π½Π½Ρ‹Ρ…, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΠ²Π»ΡΡ‚ΡŒΡΡ MySQL начиная с вСрсии 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (ΠΏΠΎΠ»Π½Ρ‹ΠΉ список);
  2. кластСр Apache Kafka;
  3. инстанс Kafka Connect (вСрсии 1.x, 2.x);
  4. сконфигурированный ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Debezium.

Π Π°Π±ΠΎΡ‚Ρ‹ ΠΏΠΎ ΠΏΠ΅Ρ€Π²Ρ‹ΠΌ Π΄Π²ΡƒΠΌ ΠΏΡƒΠ½ΠΊΡ‚Π°ΠΌ, Ρ‚.Π΅. процСсс инсталляции Π‘Π£Π‘Π” ΠΈ Apache Kafka, выходят Π·Π° Ρ€Π°ΠΌΠΊΠΈ ΡΡ‚Π°Ρ‚ΡŒΠΈ. Однако для Ρ‚Π΅Ρ…, ΠΊΡ‚ΠΎ Ρ…ΠΎΡ‡Π΅Ρ‚ Ρ€Π°Π·Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ всё Π² пСсочницС, Π² ΠΎΡ„ΠΈΡ†ΠΈΠ°Π»ΡŒΠ½ΠΎΠΌ Ρ€Π΅ΠΏΠΎΠ·ΠΈΡ‚ΠΎΡ€ΠΈΠΈ с ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π°ΠΌΠΈ Π΅ΡΡ‚ΡŒ Π³ΠΎΡ‚ΠΎΠ²Ρ‹ΠΉ docker-compose.yaml.

ΠœΡ‹ ΠΆΠ΅ остановимся ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ Π½Π° Π΄Π²ΡƒΡ… послСдних ΠΏΡƒΠ½ΠΊΡ‚Π°Ρ….

0. Kafka Connect

Π—Π΄Π΅ΡΡŒ ΠΈ Π΄Π°Π»Π΅Π΅ Π² ΡΡ‚Π°Ρ‚ΡŒΠ΅ всС ΠΏΡ€ΠΈΠΌΠ΅Ρ€Ρ‹ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ Ρ€Π°ΡΡΠΌΠ°Ρ‚Ρ€ΠΈΠ²Π°ΡŽΡ‚ΡΡ Π² контСкстС Docker-ΠΎΠ±Ρ€Π°Π·Π°, распространяСмого Ρ€Π°Π·Ρ€Π°Π±ΠΎΡ‚Ρ‡ΠΈΠΊΠ°ΠΌΠΈ Debezium. Он содСрТит всС Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΡ‹Π΅ Ρ„Π°ΠΉΠ»Ρ‹ ΠΏΠ»Π°Π³ΠΈΠ½ΠΎΠ² (ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Ρ‹) ΠΈ прСдусматриваСт ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡŽ Kafka Connect ΠΏΡ€ΠΈ ΠΏΠΎΠΌΠΎΡ‰ΠΈ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… окруТСния.

Π’ случаС, Ссли прСдполагаСтся использованиС Kafka Connect ΠΎΡ‚ Confluent, потрСбуСтся ΡΠ°ΠΌΠΎΡΡ‚ΠΎΡΡ‚Π΅Π»ΡŒΠ½ΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ ΠΏΠ»Π°Π³ΠΈΠ½Ρ‹ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΡ‹Ρ… ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠ² Π² Π΄ΠΈΡ€Π΅ΠΊΡ‚ΠΎΡ€ΠΈΡŽ, ΡƒΠΊΠ°Π·Π°Π½Π½ΡƒΡŽ Π² plugin.path ΠΈΠ»ΠΈ Π·Π°Π΄Π°Π²Π°Π΅ΠΌΡƒΡŽ Ρ‡Π΅Ρ€Π΅Π· ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ окруТСния CLASSPATH. Настройки Π²ΠΎΡ€ΠΊΠ΅Ρ€Π° Kafka Connect ΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠ² ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΡΡŽΡ‚ΡΡ Ρ‡Π΅Ρ€Π΅Π· ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΎΠ½Π½Ρ‹Π΅ Ρ„Π°ΠΉΠ»Ρ‹, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΏΠ΅Ρ€Π΅Π΄Π°ΡŽΡ‚ΡΡ Π°Ρ€Π³ΡƒΠΌΠ΅Π½Ρ‚Π°ΠΌΠΈ ΠΊ ΠΊΠΎΠΌΠ°Π½Π΄Π΅ запуска Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°. ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ см. Π² Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ.

Π’Π΅ΡΡŒ процСсс ΠΏΠΎ настройкС Debeizum Π² Π²Π°Ρ€ΠΈΠ°Π½Ρ‚Π΅ с ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ осущСствляСтся Π² Π΄Π²Π° этапа. Рассмотрим ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ ΠΈΠ· Π½ΠΈΡ…:

1. Настройка Ρ„Ρ€Π΅ΠΉΠΌΠ²ΠΎΡ€ΠΊΠ° Kafka Connect

Для стриминга Π΄Π°Π½Π½Ρ‹Ρ… Π² кластСр Apache Kafka Π²ΠΎ Ρ„Ρ€Π΅ΠΉΠΌΠ²ΠΎΡ€ΠΊΠ΅ Kafka Connect Π·Π°Π΄Π°ΡŽΡ‚ΡΡ спСцифичныС ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹, Ρ‚Π°ΠΊΠΈΠ΅ ΠΊΠ°ΠΊ:

  • ΠΏΠ°Ρ€Π°ΠΌΠ΅Ρ‚Ρ€Ρ‹ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ кластСру,
  • названия Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ², Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π±ΡƒΠ΄Π΅Ρ‚ Ρ…Ρ€Π°Π½ΠΈΡ‚ΡŒΡΡ нСпосрСдствСнно конфигурация самого ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°,
  • имя Π³Ρ€ΡƒΠΏΠΏΡ‹, Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ Π·Π°ΠΏΡƒΡ‰Π΅Π½ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ (Π² случаС использования distributed-Ρ€Π΅ΠΆΠΈΠΌΠ°).

ΠžΡ„ΠΈΡ†ΠΈΠ°Π»ΡŒΠ½Ρ‹ΠΉ Docker-ΠΎΠ±Ρ€Π°Π· ΠΏΡ€ΠΎΠ΅ΠΊΡ‚Π° ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΈΠ²Π°Π΅Ρ‚ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡŽ ΠΏΡ€ΠΈ ΠΏΠΎΠΌΠΎΡ‰ΠΈ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… окруТСния β€” этим ΠΈ Π²ΠΎΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌΡΡ. Π˜Ρ‚Π°ΠΊ, скачиваСм ΠΎΠ±Ρ€Π°Π·:

docker pull debezium/connect

ΠœΠΈΠ½ΠΈΠΌΠ°Π»ΡŒΠ½Ρ‹ΠΉ Π½Π°Π±ΠΎΡ€ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Ρ… окруТСния, Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΡ‹ΠΉ для запуска ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°, выглядит ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 β€” Π½Π°Ρ‡Π°Π»ΡŒΠ½Ρ‹ΠΉ список сСрвСров кластСра Kafka для получСния ΠΏΠΎΠ»Π½ΠΎΠ³ΠΎ списка Ρ‡Π»Π΅Π½ΠΎΠ² кластСра;
  • OFFSET_STORAGE_TOPIC=connector-offsets β€” Ρ‚ΠΎΠΏΠΈΠΊ для хранСния ΠΏΠΎΠ·ΠΈΡ†ΠΈΠΉ, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π½Π° Π΄Π°Π½Π½Ρ‹ΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ‚ находится ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status β€” Ρ‚ΠΎΠΏΠΈΠΊ для хранСния статуса ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° ΠΈ Π΅Π³ΠΎ Π·Π°Π΄Π°Π½ΠΈΠΉ;
  • CONFIG_STORAGE_TOPIC=connector-config β€” Ρ‚ΠΎΠΏΠΈΠΊ для хранСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° ΠΈ Π΅Π³ΠΎ Π·Π°Π΄Π°Π½ΠΈΠΉ;
  • GROUP_ID=1 β€” ΠΈΠ΄Π΅Π½Ρ‚ΠΈΡ„ΠΈΠΊΠ°Ρ‚ΠΎΡ€ Π³Ρ€ΡƒΠΏΠΏΡ‹ Π²ΠΎΡ€ΠΊΠ΅Ρ€ΠΎΠ², Π½Π° ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… ΠΌΠΎΠΆΠ΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒΡΡ Π·Π°Π΄Π°Π½ΠΈΠ΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°; Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌ ΠΏΡ€ΠΈ использовании распрСдСлённого (distributed) Ρ€Π΅ΠΆΠΈΠΌΠ°.

ЗапускаСм ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€ с этими ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹ΠΌΠΈ:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

ΠŸΡ€ΠΈΠΌΠ΅Ρ‡Π°Π½ΠΈΠ΅ ΠΏΡ€ΠΎ Avro

По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Debezium ΠΏΠΈΡˆΠ΅Ρ‚ Π΄Π°Π½Π½Ρ‹Π΅ Π² Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ JSON, Ρ‡Ρ‚ΠΎ ΠΏΡ€ΠΈΠ΅ΠΌΠ»Π΅ΠΌΠΎ для пСсочниц ΠΈ Π½Π΅Π±ΠΎΠ»ΡŒΡˆΠΈΡ… ΠΎΠ±ΡŠΡ‘ΠΌΠΎΠ² Π΄Π°Π½Π½Ρ‹Ρ…, Π½ΠΎ ΠΌΠΎΠΆΠ΅Ρ‚ ΡΡ‚Π°Ρ‚ΡŒ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠΎΠΉ Π² высоконагруТСнных Π±Π°Π·Π°Ρ…. ΠΠ»ΡŒΡ‚Π΅Ρ€Π½Π°Ρ‚ΠΈΠ²ΠΎΠΉ JSON-ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π΅Ρ€Ρƒ являСтся сСриализация сообщСний ΠΏΡ€ΠΈ ΠΏΠΎΠΌΠΎΡ‰ΠΈ Avro Π² Π±ΠΈΠ½Π°Ρ€Π½Ρ‹ΠΉ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚, Ρ‡Ρ‚ΠΎ позволяСт ΡΠ½ΠΈΠ·ΠΈΡ‚ΡŒ Π½Π°Π³Ρ€ΡƒΠ·ΠΊΡƒ Π½Π° подсистСму I/O Π² Apache Kafka.

Для использования Avro трСбуСтся Ρ€Π°Π·Π²Π΅Ρ€Π½ΡƒΡ‚ΡŒ ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½Ρ‹ΠΉ schema-registry (для хранСния схСм). ΠŸΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½Ρ‹Π΅ для ΠΊΠΎΠ½Π²Π΅Ρ€Ρ‚Π΅Ρ€Π° Π±ΡƒΠ΄ΡƒΡ‚ Π²Ρ‹Π³Π»ΡΠ΄Π΅Ρ‚ΡŒ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Π”Π΅Ρ‚Π°Π»ΠΈ ΠΏΠΎ использованию Avro ΠΈ настройкС registry Π·Π° Π½Π΅Π³ΠΎ выходят Π·Π° Ρ€Π°ΠΌΠΊΠΈ ΡΡ‚Π°Ρ‚ΡŒΠΈ β€” Π΄Π°Π»Π΅Π΅ для наглядности ΠΌΡ‹ Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ JSON.

2. Настройка самого ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°

Π’Π΅ΠΏΠ΅Ρ€ΡŒ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠ΅Ρ€Π΅ΠΉΡ‚ΠΈ нСпосрСдствСнно ΠΊ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ самого ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ Ρ‡ΠΈΡ‚Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· источника.

Рассмотрим Π½Π° ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠ² для Π΄Π²ΡƒΡ… Π‘Π£Π‘Π”: PostgreSQL ΠΈ MongoDB, β€” ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ Ρƒ мСня Π΅ΡΡ‚ΡŒ ΠΎΠΏΡ‹Ρ‚ ΠΈ ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ ΠΈΠΌΠ΅ΡŽΡ‚ΡΡ отличия (ΠΏΡƒΡΡ‚ΡŒ ΠΈ нСбольшиС, Π½ΠΎ Π² Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… случаях β€” сущСствСнныС!).

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ описываСтся Π² Π½ΠΎΡ‚Π°Ρ†ΠΈΠΈ JSON ΠΈ загруТаСтся Π² Kafka Connect ΠΏΡ€ΠΈ ΠΏΠΎΠΌΠΎΡ‰ΠΈ POST-запроса.

2.1. PostgreSQL

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° для PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

ΠŸΡ€ΠΈΠ½Ρ†ΠΈΠΏ Ρ€Π°Π±ΠΎΡ‚Ρ‹ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° послС Ρ‚Π°ΠΊΠΎΠΉ настройки довольно прост:

  • ΠŸΡ€ΠΈ ΠΏΠ΅Ρ€Π²ΠΎΠΌ запускС ΠΎΠ½ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅Ρ‚ΡΡ ΠΊ Π±Π°Π·Π΅, ΡƒΠΊΠ°Π·Π°Π½Π½ΠΎΠΉ Π² ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ, ΠΈ запускаСтся Π² Ρ€Π΅ΠΆΠΈΠΌΠ΅ initial snapshot, отправляя Π² Kafka Π½Π°Ρ‡Π°Π»ΡŒΠ½Ρ‹ΠΉ Π½Π°Π±ΠΎΡ€ Π΄Π°Π½Π½Ρ‹Ρ…, ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½Π½Ρ‹Ρ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ условного SELECT * FROM table_name.
  • ПослС Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ инициализация Π±ΡƒΠ΄Π΅Ρ‚ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½Π°, ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΠ΅Ρ€Π΅Ρ…ΠΎΠ΄ΠΈΡ‚ Π² Ρ€Π΅ΠΆΠΈΠΌ чтСния ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ ΠΈΠ· WAL-Ρ„Π°ΠΉΠ»ΠΎΠ² PostgreSQL.

Об ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌΡ‹Ρ… опциях:

  • name β€” имя ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°, для ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ конфигурация, описанная Π½ΠΈΠΆΠ΅; Π² дальнСйшСм это имя ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ для Ρ€Π°Π±ΠΎΡ‚Ρ‹ с ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ (Ρ‚.Π΅. просмотра статуса/пСрСзапуска/обновлСния ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ) Ρ‡Π΅Ρ€Π΅Π· REST API Kafka Connect;
  • connector.class β€” класс ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° Π‘Π£Π‘Π”, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΡƒΠ΅ΠΌΡ‹ΠΌ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ;
  • plugin.name β€” Π½Π°Π·Π²Π°Π½ΠΈΠ΅ ΠΏΠ»Π°Π³ΠΈΠ½Π° для логичСского дСкодирования Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· WAL-Ρ„Π°ΠΉΠ»ΠΎΠ². На Π²Ρ‹Π±ΠΎΡ€ доступны wal2json, decoderbuffs ΠΈ pgoutput. ΠŸΠ΅Ρ€Π²Ρ‹Π΅ Π΄Π²Π° Ρ‚Ρ€Π΅Π±ΡƒΡŽΡ‚ установки ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΡ… Ρ€Π°ΡΡˆΠΈΡ€Π΅Π½ΠΈΠΉ Π² Π‘Π£Π‘Π”, Π° pgoutput для PostgreSQL вСрсии 10 ΠΈ Π²Ρ‹ΡˆΠ΅ Π½Π΅ Ρ‚Ρ€Π΅Π±ΡƒΠ΅Ρ‚ Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… манипуляций;
  • database.* β€” ΠΎΠΏΡ†ΠΈΠΈ для ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ Π‘Π”, Π³Π΄Π΅ database.server.name β€” имя инстанса PostgreSQL, ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌΠΎΠ΅ для формирования ΠΈΠΌΠ΅Π½ΠΈ Ρ‚ΠΎΠΏΠΈΠΊΠ° Π² кластСрС Kafka;
  • table.include.list β€” список Ρ‚Π°Π±Π»ΠΈΡ†, Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… ΠΌΡ‹ Ρ…ΠΎΡ‚ΠΈΠΌ ΠΎΡ‚ΡΠ»Π΅ΠΆΠΈΠ²Π°Ρ‚ΡŒ измСнСния; задаётся Π² Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ schema.table_name; нСльзя ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ вмСстС с table.exclude.list;
  • heartbeat.interval.ms β€” ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π» (Π² миллисСкундах), с ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ отправляСт heartbeat-сообщСния Π² ΡΠΏΠ΅Ρ†ΠΈΠ°Π»ΡŒΠ½Ρ‹ΠΉ Ρ‚ΠΎΠΏΠΈΠΊ;
  • heartbeat.action.query β€” запрос, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒΡΡ ΠΏΡ€ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ΅ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ heartbeat-сообщСния (опция появилась с вСрсии 1.1);
  • slot.name β€” имя слота Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ;
  • publication.name β€” имя ΠΏΡƒΠ±Π»ΠΈΠΊΠ°Ρ†ΠΈΠΈ Π² PostgreSQL, ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€. Π’ случаС, Ссли Π΅Ρ‘ Π½Π΅ сущСствуСт, Debezium попытаСтся Π΅Ρ‘ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ. Π’ случаС, Ссли Ρƒ ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Ρ, ΠΏΠΎΠ΄ ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ происходит ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅, нСдостаточно ΠΏΡ€Π°Π² для этого дСйствия β€” ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π·Π°Π²Π΅Ρ€ΡˆΠΈΡ‚ Ρ€Π°Π±ΠΎΡ‚Ρƒ с ошибкой;
  • transforms опрСдСляСт, ΠΊΠ°ΠΊ ΠΈΠΌΠ΅Π½Π½ΠΎ ΠΈΠ·ΠΌΠ΅Π½ΡΡ‚ΡŒ Π½Π°Π·Π²Π°Π½ΠΈΠ΅ Ρ†Π΅Π»Π΅Π²ΠΎΠ³ΠΎ Ρ‚ΠΎΠΏΠΈΠΊΠ°:
    • transforms.AddPrefix.type ΡƒΠΊΠ°Π·Ρ‹Π²Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ Π±ΡƒΠ΄Π΅ΠΌ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ рСгулярныС выраТСния;
    • transforms.AddPrefix.regex β€” маска, ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΉ пСрСопрСдСляСтся Π½Π°Π·Π²Π°Π½ΠΈΠ΅ Ρ†Π΅Π»Π΅Π²ΠΎΠ³ΠΎ Ρ‚ΠΎΠΏΠΈΠΊΠ°;
    • transforms.AddPrefix.replacement β€” нСпосрСдствСнно Ρ‚ΠΎ, Π½Π° Ρ‡Ρ‚ΠΎ пСрСопрСдСляСм.

ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ ΠΏΡ€ΠΎ heartbeat ΠΈ transforms

По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ отправляСт Π΄Π°Π½Π½Ρ‹Π΅ Π² Kafka ΠΏΠΎ ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΊΠΎΠΌΠΌΠΈΡ‚Π½ΡƒΡ‚ΠΎΠΉ Ρ‚Ρ€Π°Π½Π·Π°ΠΊΡ†ΠΈΠΈ, Π° Π΅Ρ‘ LSN (Log Sequence Number) записываСт Π² слуТСбный Ρ‚ΠΎΠΏΠΈΠΊ offset. Но Ρ‡Ρ‚ΠΎ ΠΏΡ€ΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ‚, Ссли ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ настроСн Π½Π° Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ Π½Π΅ всСй Π±Π°Π·Ρ‹ Ρ†Π΅Π»ΠΈΠΊΠΎΠΌ, Π° Ρ‚ΠΎΠ»ΡŒΠΊΠΎ части Π΅Ρ‘ Ρ‚Π°Π±Π»ΠΈΡ† (Π² ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… происходит Π½Π΅ часто)?

  • ΠšΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π±ΡƒΠ΄Π΅Ρ‚ Ρ‡ΠΈΡ‚Π°Ρ‚ΡŒ WAL-Ρ„Π°ΠΉΠ»Ρ‹ ΠΈ Π½Π΅ ΠΎΠ±Π½Π°Ρ€ΡƒΠΆΠΈΠ²Π°Ρ‚ΡŒ Π² Π½ΠΈΡ… ΠΊΠΎΠΌΠΌΠΈΡ‚Π° Ρ‚Ρ€Π°Π½Π·Π°ΠΊΡ†ΠΈΠΉ Π² Ρ‚Π΅ Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹, Π·Π° ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌΠΈ ΠΎΠ½ слСдит.
  • ΠŸΠΎΡΡ‚ΠΎΠΌΡƒ ΠΎΠ½ Π½Π΅ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΠ±Π½ΠΎΠ²Π»ΡΡ‚ΡŒ свою Ρ‚Π΅ΠΊΡƒΡ‰ΡƒΡŽ ΠΏΠΎΠ·ΠΈΡ†ΠΈΡŽ Π½ΠΈ Π² Ρ‚ΠΎΠΏΠΈΠΊΠ΅, Π½ΠΈ Π² слотС Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ.
  • Π­Ρ‚ΠΎ, Π² свою ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, ΠΏΡ€ΠΈΠ²Π΅Π΄Ρ‘Ρ‚ ΠΊ Β«ΡƒΠ΄Π΅Ρ€ΠΆΠ°Π½ΠΈΡŽΒ» WAL-Ρ„Π°ΠΉΠ»ΠΎΠ² Π½Π° дискС ΠΈ вСроятному ΠΈΡΡ‡Π΅Ρ€ΠΏΠ°Π½ΠΈΡŽ всСго дискового пространства.

И Ρ‚ΡƒΡ‚ Π½Π° ΠΏΠΎΠΌΠΎΡ‰ΡŒ приходят ΠΎΠΏΡ†ΠΈΠΈ heartbeat.interval.ms ΠΈ heartbeat.action.query. ИспользованиС этих ΠΎΠΏΡ†ΠΈΠΉ Π² ΠΏΠ°Ρ€Π΅ Π΄Π°Ρ‘Ρ‚ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ Ρ€Π°Π· ΠΏΡ€ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ΅ heartbeat-сообщСния Π²Ρ‹ΠΏΠΎΠ»Π½ΡΡ‚ΡŒ запрос Π½Π° ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… Π² ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½ΠΎΠΉ Ρ‚Π°Π±Π»ΠΈΡ†Π΅. Π’Π΅ΠΌ самым постоянно актуализируСтся LSN, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ сСйчас находится ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ (Π² слотС Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ). Π­Ρ‚ΠΎ позволяСт Π‘Π£Π‘Π” ΡƒΠ΄Π°Π»ΠΈΡ‚ΡŒ WAL-Ρ„Π°ΠΉΠ»Ρ‹, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΠΎΠ»Π΅Π΅ Π½Π΅ Π½ΡƒΠΆΠ½Ρ‹. ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ ΡƒΠ·Π½Π°Ρ‚ΡŒ ΠΎ Ρ€Π°Π±ΠΎΡ‚Π΅ ΠΎΠΏΡ†ΠΈΠΉ ΠΌΠΎΠΆΠ½ΠΎ Π² Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ.

Другая опция, достойная Π±ΠΎΠ»Π΅Π΅ ΠΏΡ€ΠΈΡΡ‚Π°Π»ΡŒΠ½ΠΎΠ³ΠΎ внимания, β€” это transforms. Π₯отя ΠΎΠ½Π° скорСС ΠΏΡ€ΠΎ удобство ΠΈ красоту…

По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Debezium создаёт Ρ‚ΠΎΠΏΠΈΠΊΠΈ, Ρ€ΡƒΠΊΠΎΠ²ΠΎΠ΄ΡΡ‚Π²ΡƒΡΡΡŒ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅ΠΉ ΠΏΠΎΠ»ΠΈΡ‚ΠΈΠΊΠΎΠΉ имСнования: serverName.schemaName.tableName. Π­Ρ‚ΠΎ Π½Π΅ всСгда ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΡƒΠ΄ΠΎΠ±Π½ΠΎ. ΠžΠΏΡ†ΠΈΡΠΌΠΈ transforms ΠΌΠΎΠΆΠ½ΠΎ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ рСгулярных Π²Ρ‹Ρ€Π°ΠΆΠ΅Π½ΠΈΠΉ ΠΎΠΏΡ€Π΅Π΄Π΅Π»ΡΡ‚ΡŒ список Ρ‚Π°Π±Π»ΠΈΡ†, эвСнты ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π½ΡƒΠΆΠ½ΠΎ ΠΌΠ°Ρ€ΡˆΡ€ΡƒΡ‚ΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π² Ρ‚ΠΎΠΏΠΈΠΊ с ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½Ρ‹ΠΌ Π½Π°Π·Π²Π°Π½ΠΈΠ΅ΠΌ.

Π’ нашСй ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ благодаря transforms происходит ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅Π΅: всС CDC-события ΠΈΠ· отслСТиваСмой Π‘Π” ΠΏΠΎΠΏΠ°Π΄ΡƒΡ‚ Π² Ρ‚ΠΎΠΏΠΈΠΊ с ΠΈΠΌΠ΅Π½Π΅ΠΌ data.cdc.dbname. Π’ ΠΏΡ€ΠΎΡ‚ΠΈΠ²Π½ΠΎΠΌ случаС (Π±Π΅Π· этих настроСк) Debezium ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Π±Ρ‹ создавал ΠΏΠΎ Ρ‚ΠΎΠΏΠΈΠΊΡƒ Π½Π° ΠΊΠ°ΠΆΠ΄ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ Π²ΠΈΠ΄Π°: pg-dev.public.<table_name>.

ΠžΠ³Ρ€Π°Π½ΠΈΡ‡Π΅Π½ΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°

Π’ Π·Π°Π²Π΅Ρ€ΡˆΠ΅Π½ΠΈΠΈ описания ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° для PostgreSQL стоит Ρ€Π°ΡΡΠΊΠ°Π·Π°Ρ‚ΡŒ ΠΎ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΡ… особСнностях/ограничСниях Π΅Π³ΠΎ Ρ€Π°Π±ΠΎΡ‚Ρ‹:

  1. Π€ΡƒΠ½ΠΊΡ†ΠΈΠΎΠ½Π°Π» ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° для PostgreSQL полагаСтся Π½Π° ΠΊΠΎΠ½Ρ†Π΅ΠΏΡ†ΠΈΡŽ логичСского дСкодирования. ΠŸΠΎΡΡ‚ΠΎΠΌΡƒ ΠΎΠ½ Π½Π΅ отслСТиваСт запросы Π½Π° ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠ΅ структуры Π‘Π” (DDL) β€” соотвСтствСнно, Π² Ρ‚ΠΎΠΏΠΈΠΊΠ°Ρ… этих Π΄Π°Π½Π½Ρ‹Ρ… Π½Π΅ Π±ΡƒΠ΄Π΅Ρ‚.
  2. Π’Π°ΠΊ ΠΊΠ°ΠΊ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‚ΡΡ слоты Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ, ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΊ Π²Π΅Π΄ΡƒΡ‰Π΅ΠΌΡƒ экзСмпляру Π‘Π£Π‘Π”.
  3. Если ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»ΡŽ, ΠΏΠΎΠ΄ ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π°Π΅Ρ‚ΡΡ ΠΊ Π±Π°Π·Π΅ Π΄Π°Π½Π½Ρ‹Ρ…, Π²Ρ‹Π΄Π°Π½Ρ‹ ΠΏΡ€Π°Π²Π° Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π½Π° Ρ‡Ρ‚Π΅Π½ΠΈΠ΅, Ρ‚ΠΎ ΠΏΠ΅Ρ€Π΅Π΄ ΠΏΠ΅Ρ€Π²Ρ‹ΠΌ запуском потрСбуСтся Π²Ρ€ΡƒΡ‡Π½ΡƒΡŽ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ слот Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ ΠΈ ΠΏΡƒΠ±Π»ΠΈΠΊΠ°Ρ†ΠΈΡŽ Π² Π‘Π”.

ΠŸΡ€ΠΈΠΌΠ΅Π½Π΅Π½ΠΈΠ΅ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ

Π˜Ρ‚Π°ΠΊ, Π·Π°Π³Ρ€ΡƒΠ·ΠΈΠΌ Π½Π°ΡˆΡƒ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡŽ Π² ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

ΠŸΡ€ΠΎΠ²Π΅Ρ€ΡΠ΅ΠΌ, Ρ‡Ρ‚ΠΎ Π·Π°Π³Ρ€ΡƒΠ·ΠΊΠ° ΠΏΡ€ΠΎΡˆΠ»Π° ΡƒΡΠΏΠ΅ΡˆΠ½ΠΎ ΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ запустился:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

ΠžΡ‚Π»ΠΈΡ‡Π½ΠΎ: ΠΎΠ½ настроСн ΠΈ Π³ΠΎΡ‚ΠΎΠ² ΠΊ Ρ€Π°Π±ΠΎΡ‚Π΅. Π’Π΅ΠΏΠ΅Ρ€ΡŒ прикинСмся ΠΊΠΎΠ½ΡΡŒΡŽΠΌΠ΅Ρ€ΠΎΠΌ ΠΈ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡ΠΈΠΌΡΡ ΠΊ Kafka, послС Ρ‡Π΅Π³ΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΠΌ ΠΈ ΠΈΠ·ΠΌΠ΅Π½ΠΈΠΌ запись Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Π’ нашСм Ρ‚ΠΎΠΏΠΈΠΊΠ΅ это отобразится ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ:

ΠžΡ‡Π΅Π½ΡŒ Π΄Π»ΠΈΠ½Π½Ρ‹ΠΉ JSON с нашими измСнСниями

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Π’ ΠΎΠ±ΠΎΠΈΡ… случаях записи состоят ΠΈΠ· ΠΊΠ»ΡŽΡ‡Π° (PK) записи, которая Π±Ρ‹Π»Π° ΠΈΠ·ΠΌΠ΅Π½Π΅Π½Π°, ΠΈ нСпосрСдствСнно самой сути ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ: ΠΊΠ°ΠΊΠΎΠΉ Π±Ρ‹Π»Π° запись Π΄ΠΎ ΠΈ ΠΊΠ°ΠΊΠΎΠΉ стала послС.

  • Π’ случаС с INSERT: Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ Π΄ΠΎ (before) Ρ€Π°Π²Π½ΠΎ null, Π° послС β€” строка, которая Π±Ρ‹Π»Π° вставлСна.
  • Π’ случаС с UPDATE: Π² payload.before отобраТаСтся ΠΏΡ€Π΅Π΄Ρ‹Π΄ΡƒΡ‰Π΅Π΅ состояниС строки, Π° Π² payload.after β€” Π½ΠΎΠ²ΠΎΠ΅ с ΡΡƒΡ‚ΡŒΡŽ ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ.

2.2 MongoDB

Π­Ρ‚ΠΎΡ‚ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ стандартный ΠΌΠ΅Ρ…Π°Π½ΠΈΠ·ΠΌ Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ MongoDB, считывая ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ ΠΈΠ· oplog’Π° primary-ΡƒΠ·Π»Π° Π‘Π£Π‘Π”.

Аналогично ΡƒΠΆΠ΅ описанному ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Ρƒ для PgSQL, здСсь Ρ‚ΠΎΠΆΠ΅ ΠΏΡ€ΠΈ ΠΏΠ΅Ρ€Π²ΠΎΠΌ запускС снимаСтся ΠΏΠ΅Ρ€Π²ΠΈΡ‡Π½Ρ‹ΠΉ ΡΠ½Π°ΠΏΡˆΠΎΡ‚ Π΄Π°Π½Π½Ρ‹Ρ…, послС Ρ‡Π΅Π³ΠΎ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΠ΅Ρ€Π΅ΠΊΠ»ΡŽΡ‡Π°Π΅Ρ‚ΡΡ Π½Π° Ρ€Π΅ΠΆΠΈΠΌ чтСния oplog’а.

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Как ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ, здСсь Π½Π΅Ρ‚ Π½ΠΎΠ²Ρ‹Ρ… ΠΎΠΏΡ†ΠΈΠΉ ΠΏΠΎ ΡΡ€Π°Π²Π½Π΅Π½ΠΈΡŽ с ΠΏΡ€ΠΎΡˆΠ»Ρ‹ΠΌ ΠΏΡ€ΠΈΠΌΠ΅Ρ€ΠΎΠΌ, Π½ΠΎ ΡΠΎΠΊΡ€Π°Ρ‚ΠΈΠ»ΠΎΡΡŒ лишь количСство ΠΎΠΏΡ†ΠΈΠΉ, ΠΎΡ‚Π²Π΅Ρ‡Π°ΡŽΡ‰ΠΈΡ… Π·Π° ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅ ΠΊ Π‘Π” ΠΈ ΠΈΡ… прСфиксы.

Настройки transforms Π² этот Ρ€Π°Π· Π΄Π΅Π»Π°ΡŽΡ‚ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅Π΅: ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°ΡŽΡ‚ имя Ρ†Π΅Π»Π΅Π²ΠΎΠ³ΠΎ Ρ‚ΠΎΠΏΠΈΠΊΠ° ΠΈΠ· схСмы <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

ΠžΡ‚ΠΊΠ°Π·ΠΎΡƒΡΡ‚ΠΎΠΉΡ‡ΠΈΠ²ΠΎΡΡ‚ΡŒ

Вопрос отказоустойчивости ΠΈ высокой доступности Π² нашС врСмя стоит ΠΊΠ°ΠΊ Π½ΠΈΠΊΠΎΠ³Π΄Π° остро β€” особСнно ΠΊΠΎΠ³Π΄Π° ΠΌΡ‹ Π³ΠΎΠ²ΠΎΡ€ΠΈΠΌ ΠΏΡ€ΠΎ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈ Ρ‚Ρ€Π°Π½Π·Π°ΠΊΡ†ΠΈΠΈ, ΠΈ отслСТиваниС ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ Π΄Π°Π½Π½Ρ‹Ρ… Π½Π΅ стоит Π² этом вопросС Π² сторонС. Рассмотрим, Ρ‡Ρ‚ΠΎ Π² ΠΏΡ€ΠΈΠ½Ρ†ΠΈΠΏΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΠΏΠΎΠΉΡ‚ΠΈ Π½Π΅ Ρ‚Π°ΠΊ ΠΈ Ρ‡Ρ‚ΠΎ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΡ€ΠΎΠΈΡΡ…ΠΎΠ΄ΠΈΡ‚ΡŒ с Debezium Π² ΠΊΠ°ΠΆΠ΄ΠΎΠΌ ΠΈΠ· случаСв.

Π•ΡΡ‚ΡŒ Ρ‚Ρ€ΠΈ Π²Π°Ρ€ΠΈΠ°Π½Ρ‚Π° ΠΎΡ‚ΠΊΠ°Π·Π°:

  1. ΠžΡ‚ΠΊΠ°Π· Kafka Connect. Если Connect настроСн Π½Π° Ρ€Π°Π±ΠΎΡ‚Ρƒ Π² распрСдСлСнном Ρ€Π΅ΠΆΠΈΠΌΠ΅, для этого Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ нСскольким Π²ΠΎΡ€ΠΊΠ΅Ρ€Π°ΠΌ Π·Π°Π΄Π°Ρ‚ΡŒ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²Ρ‹ΠΉ group.id. Π’ΠΎΠ³Π΄Π° ΠΏΡ€ΠΈ ΠΎΡ‚ΠΊΠ°Π·Π΅ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΠΈΠ· Π½ΠΈΡ… ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠ΅Ρ€Π΅Π·Π°ΠΏΡƒΡ‰Π΅Π½ Π½Π° Π΄Ρ€ΡƒΠ³ΠΎΠΌ Π²ΠΎΡ€ΠΊΠ΅Ρ€Π΅ ΠΈ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠΈΡ‚ Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ с послСднСй ΠΊΠΎΠΌΠΌΠΈΡ‚Π½ΡƒΡ‚ΠΎΠΉ ΠΏΠΎΠ·ΠΈΡ†ΠΈΠΈ Π² Ρ‚ΠΎΠΏΠΈΠΊΠ΅ Π² Kafka.
  2. ΠŸΠΎΡ‚Π΅Ρ€Ρ связности с Kafka-кластСром. ΠšΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ просто остановит Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ Π½Π° ΠΏΠΎΠ·ΠΈΡ†ΠΈΠΈ, ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ Π½Π΅ ΡƒΠ΄Π°Π»ΠΎΡΡŒ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ Π² Kafka, ΠΈ Π±ΡƒΠ΄Π΅Ρ‚ пСриодичСски ΠΏΡ‹Ρ‚Π°Ρ‚ΡŒΡΡ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ Π΅Ρ‘, ΠΏΠΎΠΊΠ° ΠΏΠΎΠΏΡ‹Ρ‚ΠΊΠ° Π½Π΅ Π·Π°Π²Π΅Ρ€ΡˆΠΈΡ‚ΡΡ успСхом.
  3. ΠΠ΅Π΄ΠΎΡΡ‚ΡƒΠΏΠ½ΠΎΡΡ‚ΡŒ источника Π΄Π°Π½Π½Ρ‹Ρ…. ΠšΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΡ€ΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡ‚ΡŒ ΠΏΠΎΠΏΡ‹Ρ‚ΠΊΠΈ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠ³ΠΎ ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊ источнику Π² соотвСтствии с ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠ΅ΠΉ. По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ это 16 ΠΏΠΎΠΏΡ‹Ρ‚ΠΎΠΊ с использованиСм exponential backoff. ПослС 16-ΠΉ Π½Π΅ΡƒΠ΄Π°Ρ‡Π½ΠΎΠΉ ΠΏΠΎΠΏΡ‹Ρ‚ΠΊΠΈ таск Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠΌΠ΅Ρ‡Π΅Π½ ΠΊΠ°ΠΊ failed ΠΈ потрСбуСтся Π΅Π³ΠΎ Ρ€ΡƒΡ‡Π½ΠΎΠΉ пСрСзапуск Ρ‡Π΅Ρ€Π΅Π· REST-интСрфСйс Kafka Connect.
    • Π’ случаС с PostgreSQL Π΄Π°Π½Π½Ρ‹Π΅ Π½Π΅ ΠΏΡ€ΠΎΠΏΠ°Π΄ΡƒΡ‚, Ρ‚.ΠΊ. использованиС слотов Ρ€Π΅ΠΏΠ»ΠΈΠΊΠ°Ρ†ΠΈΠΈ Π½Π΅ даст ΡƒΠ΄Π°Π»ΠΈΡ‚ΡŒ WAL-Ρ„Π°ΠΉΠ»Ρ‹, Π½Π΅ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Π½Π½Ρ‹Π΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ. Π’ этом случаС Π΅ΡΡ‚ΡŒ ΠΈ обратная сторона ΠΌΠ΅Π΄Π°Π»ΠΈ: Ссли Π½Π° ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠ΅ врСмя Π±ΡƒΠ΄Π΅Ρ‚ Π½Π°Ρ€ΡƒΡˆΠ΅Π½Π° сСтСвая ΡΠ²ΡΠ·Π½ΠΎΡΡ‚ΡŒ ΠΌΠ΅ΠΆΠ΄Ρƒ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ ΠΈ Π‘Π£Π‘Π”, Π΅ΡΡ‚ΡŒ Π²Π΅Ρ€ΠΎΡΡ‚Π½ΠΎΡΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ мСсто Π½Π° дискС закончится, Π° это ΠΌΠΎΠΆΠ΅Ρ‚ ΠΌΠΎΠΆΠ΅Ρ‚ привСсти ΠΊ ΠΎΡ‚ΠΊΠ°Π·Ρƒ Π‘Π£Π‘Π” Ρ†Π΅Π»ΠΈΠΊΠΎΠΌ.
    • Π’ случаС с MySQL Ρ„Π°ΠΉΠ»Ρ‹ Π±ΠΈΠ½Π»ΠΎΠ³ΠΎΠ² ΠΌΠΎΠ³ΡƒΡ‚ Π±Ρ‹Ρ‚ΡŒ ΠΎΡ‚Ρ€ΠΎΡ‚ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹ самой Π‘Π£Π‘Π” Ρ€Π°Π½ΡŒΡˆΠ΅, Ρ‡Π΅ΠΌ восстановится ΡΠ²ΡΠ·Π½ΠΎΡΡ‚ΡŒ. Π­Ρ‚ΠΎ ΠΏΡ€ΠΈΠ²Π΅Π΄Ρ‘Ρ‚ ΠΊ Ρ‚ΠΎΠΌΡƒ, Ρ‡Ρ‚ΠΎ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΠ΅Ρ€Π΅ΠΉΠ΄Ρ‘Ρ‚ Π² состояниС failed, Π° для восстановлСния Π½ΠΎΡ€ΠΌΠ°Π»ΡŒΠ½ΠΎΠ³ΠΎ функционирования потрСбуСтся ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½Ρ‹ΠΉ запуск Π² Ρ€Π΅ΠΆΠΈΠΌΠ΅ initial snapshot для продолТСния чтСния ΠΈΠ· Π±ΠΈΠ½Π»ΠΎΠ³ΠΎΠ².
    • ΠŸΡ€ΠΎ MongoDB. ДокумСнтация гласит: ΠΏΠΎΠ²Π΅Π΄Π΅Π½ΠΈΠ΅ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π° Π² случаС, Ссли Ρ„Π°ΠΉΠ»Ρ‹ ΠΆΡƒΡ€Π½Π°Π»ΠΎΠ²/oplog’Π° Π±Ρ‹Π»ΠΈ ΡƒΠ΄Π°Π»Π΅Π½Ρ‹ ΠΈ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π½Π΅ ΠΌΠΎΠΆΠ΅Ρ‚ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠΈΡ‚ΡŒ Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ с Ρ‚ΠΎΠΉ ΠΏΠΎΠ·ΠΈΡ†ΠΈΠΈ, Π³Π΄Π΅ остановился, ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΠΎ для всСх Π‘Π£Π‘Π”. Оно Π·Π°ΠΊΠ»ΡŽΡ‡Π°Π΅Ρ‚ΡΡ Π² Ρ‚ΠΎΠΌ, Ρ‡Ρ‚ΠΎ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΠ΅Ρ€Π΅ΠΉΠ΄Ρ‘Ρ‚ Π² состояниС failed ΠΈ ΠΏΠΎΡ‚Ρ€Π΅Π±ΡƒΠ΅Ρ‚ ΠΏΠΎΠ²Ρ‚ΠΎΡ€Π½ΠΎΠ³ΠΎ запуска Π² Ρ€Π΅ΠΆΠΈΠΌΠ΅ initial snapshot.

      Однако Π±Ρ‹Π²Π°ΡŽΡ‚ ΠΈΡΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ. Если ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠ΅ врСмя находился Π² ΠΎΡ‚ΠΊΠ»ΡŽΡ‡Π΅Π½Π½ΠΎΠΌ состоянии (ΠΈΠ»ΠΈ Π½Π΅ ΠΌΠΎΠ³ Π΄ΠΎΡΡ‚ΡƒΡ‡Π°Ρ‚ΡŒΡΡ Π΄ΠΎ экзСмпляра MongoDB), Π° oplog Π·Π° это врСмя ΠΏΡ€ΠΎΡˆΡ‘Π» Ρ€ΠΎΡ‚Π°Ρ†ΠΈΡŽ, Ρ‚ΠΎ ΠΏΡ€ΠΈ восстановлСнии ΠΏΠΎΠ΄ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ Π½Π΅Π²ΠΎΠ·ΠΌΡƒΡ‚ΠΈΠΌΠΎ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠΈΡ‚ Ρ‡ΠΈΡ‚Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ с ΠΏΠ΅Ρ€Π²ΠΎΠΉ доступной ΠΏΠΎΠ·ΠΈΡ†ΠΈΠΈ, ΠΈΠ·-Π·Π° Ρ‡Π΅Π³ΠΎ Ρ‡Π°ΡΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Ρ… Π² Kafka Π½Π΅ ΠΏΠΎΠΏΠ°Π΄Ρ‘Ρ‚.

Π—Π°ΠΊΠ»ΡŽΡ‡Π΅Π½ΠΈΠ΅

Debezium β€” ΠΌΠΎΠΉ ΠΏΠ΅Ρ€Π²Ρ‹ΠΉ ΠΎΠΏΡ‹Ρ‚ Ρ€Π°Π±ΠΎΡ‚Ρ‹ с CDC-систСмами ΠΈ Π² Ρ†Π΅Π»ΠΎΠΌ вСсьма ΠΏΠΎΠ»ΠΎΠΆΠΈΡ‚Π΅Π»ΡŒΠ½Ρ‹ΠΉ. ΠŸΡ€ΠΎΠ΅ΠΊΡ‚ ΠΏΠΎΠ΄ΠΊΡƒΠΏΠΈΠ» ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ основных Π‘Π£Π‘Π”, простотой ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΠΈ, ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ кластСризации ΠΈ Π°ΠΊΡ‚ΠΈΠ²Π½Ρ‹ΠΌ сообщСством. Π—Π°ΠΈΠ½Ρ‚Π΅Ρ€Π΅ΡΠΎΠ²Π°Π²ΡˆΠΈΠΌΡΡ ΠΏΡ€Π°ΠΊΡ‚ΠΈΠΊΠΎΠΉ Ρ€Π΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡƒΡŽ ΠΎΠ·Π½Π°ΠΊΠΎΠΌΠΈΡ‚ΡŒΡΡ с Π³Π°ΠΉΠ΄Π°ΠΌΠΈ для Kafka Connect ΠΈ Debezium.

По ΡΡ€Π°Π²Π½Π΅Π½ΠΈΡŽ с JDBC-ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€ΠΎΠΌ для Kafka Connect основным прСимущСством Debezium являСтся Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ измСнСния ΡΡ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ ΠΈΠ· ΠΆΡƒΡ€Π½Π°Π»ΠΎΠ² Π‘Π£Π‘Π”, Ρ‡Ρ‚ΠΎ позволяСт ΠΏΠΎΠ»ΡƒΡ‡Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ с минимальной Π·Π°Π΄Π΅Ρ€ΠΆΠΊΠΎΠΉ. JDBC Connector (ΠΈΠ· поставки Kafka Connect) Π΄Π΅Π»Π°Π΅Ρ‚ запросы ΠΊ отслСТиваСмой Ρ‚Π°Π±Π»ΠΈΡ†Π΅ с фиксированным ΠΈΠ½Ρ‚Π΅Ρ€Π²Π°Π»ΠΎΠΌ ΠΈ (ΠΏΠΎ этой ΠΆΠ΅ ΠΏΡ€ΠΈΡ‡ΠΈΠ½Π΅) Π½Π΅ Π³Π΅Π½Π΅Ρ€ΠΈΡ€ΡƒΠ΅Ρ‚ сообщСния ΠΏΡ€ΠΈ ΡƒΠ΄Π°Π»Π΅Π½ΠΈΠΈ Π΄Π°Π½Π½Ρ‹Ρ… (ΠΊΠ°ΠΊ ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡ€ΠΎΡΠΈΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π½Π΅Ρ‚?).

Для Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ схоТих Π·Π°Π΄Π°Ρ‡ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ Ρ€Π΅ΡˆΠ΅Π½ΠΈΡ (ΠΏΠΎΠΌΠΈΠΌΠΎ Debezium):

P.S.

Π§ΠΈΡ‚Π°ΠΉΡ‚Π΅ Ρ‚Π°ΠΊΠΆΠ΅ Π² нашСм Π±Π»ΠΎΠ³Π΅:

Π˜ΡΡ‚ΠΎΡ‡Π½ΠΈΠΊ: habr.com