Знайомство з Debezium - CDC для Apache Kafka

Знайомство з Debezium - CDC для Apache Kafka

У своїй роботі часто стикаюся з новими технічними рішеннями/програмними продуктами, інформації про які в російськомовному інтернеті досить мало. Цією статтею постараюся заповнити одну таку прогалину прикладом зі своєї недавньої практики, коли потрібно було налаштувати відправлення CDC-подій із двох популярних СУБД (PostgreSQL та MongoDB) до кластеру Kafka за допомогою Debezium. Сподіваюся, ця оглядова стаття, що з'явилася за підсумками виконаної роботи, виявиться корисною та іншим.

Що за Debezium та взагалі CDC?

дебезій - Представник категорії програмного забезпечення CDC (Capture Data Change), а якщо точніше, це набір конекторів для різних СУБД, сумісних з фреймворком Apache Kafka Connect.

Це Open Source-проект, використовує ліцензію Apache License v2.0 та спонсорований компанією Red Hat. Розробка ведеться з 2016 року і зараз у ньому представлена ​​офіційна підтримка наступних СУБД: MySQL, PostgreSQL, MongoDB, SQL Server. Також існують конектори для Cassandra і Oracle, але на даний момент вони знаходяться в статусі раннього доступу, а нові релізи не гарантують зворотної сумісності.

Якщо порівнювати CDC із традиційним підходом (коли додаток читає дані із СУБД безпосередньо), то до його головних переваг відносять реалізацію стрімінгу зміни даних на рівні рядків із низькою затримкою, високою надійністю та доступністю. Останні два пункти досягаються завдяки використанню кластера Kafka як сховища CDC-подій.

Також до переваг можна віднести той факт, що для зберігання подій використовується єдина модель, тому кінцевому додатку не доведеться турбуватися про нюанси експлуатації різних СУБД.

Нарешті, завдяки використанню брокера повідомлень відкривається простір для горизонтального масштабування додатків, які відстежують зміни даних. У цьому впливом геть джерело даних зводиться до мінімуму, оскільки отримання даних відбувається безпосередньо з СУБД, та якщо з кластера Kafka.

Про архітектуру Debezium

Використання Debezium зводиться до такої простої схеми:

СУБД (як джерело даних) → конектор в Kafka Connect → Apache Kafka → консьюмер

Як ілюстрацію наведу схему із сайту проекту:

Знайомство з Debezium - CDC для Apache Kafka

Однак ця схема мені не дуже подобається, оскільки складається враження, що можливе лише використання sink-конектора.

Насправді ситуація відрізняється: наповнення вашого Data Lake (Остання ланка на схемі вище) - Це не єдиний спосіб застосування Debezium. Події, надіслані в Apache Kafka, можуть використовуватися вашими програмами для вирішення різних ситуацій. Наприклад:

  • видалення неактуальних даних із кешу;
  • відправлення повідомлень;
  • оновлення пошукових індексів;
  • подібність логів аудиту;
  • ...

У випадку, якщо у вас додаток на Java і немає необхідності використовувати кластер Kafka, існує також можливість роботи через embedded-конектор. Очевидний плюс у тому, що з ним можна відмовитися від додаткової інфраструктури (у вигляді конектора та Kafka). Однак це рішення оголошено застарілим (deprecated) з версії 1.1 та більше не рекомендується до використання (у майбутніх релізах його підтримку можуть усунути).

У цій статті розглядатиметься рекомендована розробниками архітектура, яка забезпечує відмовостійкість та можливість масштабування.

Конфігурація конектора

Для того, щоб почати відслідковувати зміни найголовнішої цінності даних, нам знадобляться:

  1. джерело даних, яким може бути MySQL, починаючи з версії 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (повний список);
  2. кластер Apache Kafka;
  3. інстанс Kafka Connect (версії 1. x, 2. x);
  4. конфігурований конектор Debezium.

Роботи за першими двома пунктами, тобто. процес інсталяції СУБД та Apache Kafka, виходять за рамки статті. Однак для тих, хто хоче розгорнути все у пісочниці, в офіційному репозиторії з прикладами є готовий docker-compose.yaml.

Ми зупинимося докладніше на двох останніх пунктах.

0. Kafka Connect

Тут і далі у статті усі приклади конфігурації розглядаються в контексті Docker-образу, що розповсюджується розробниками Debezium. Він містить усі необхідні файли плагінів (конектори) та передбачає конфігурацію Kafka Connect за допомогою змінних оточення.

У випадку, якщо передбачається використання Kafka Connect від Confluent, потрібно самостійно додати плагіни необхідних конекторів до директорії, зазначеної в plugin.path або задається через змінну оточення CLASSPATH. Налаштування воркера Kafka Connect та конекторів визначаються через конфігураційні файли, що передаються аргументами до команди запуску воркера. Детальніше див. документації.

Весь процес налаштування Debeizum у варіанті з конектором здійснюється в два етапи. Розглянемо кожен із них:

1. Налаштування фреймворку Kafka Connect

Для стримінгу даних у кластер Apache Kafka у фреймворку Kafka Connect задаються специфічні параметри, такі як:

  • параметри підключення до кластера,
  • назви топіків, в яких зберігатиметься безпосередньо конфігурація самого конектора,
  • ім'я групи, в якій запущено конектор (у разі використання distributed-режиму).

Офіційний Docker образ проекту підтримує конфігурацію за допомогою змінних оточення - цим і скористаємося. Отже, завантажуємо образ:

docker pull debezium/connect

Мінімальний набір змінних оточення, необхідний для запуску конектора, виглядає так:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - Початковий список серверів кластера Kafka для отримання повного списку членів кластера;
  • OFFSET_STORAGE_TOPIC=connector-offsets - Топік для зберігання позицій, на яких на даний момент знаходиться конектор;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - Топік для зберігання статусу конектора та його завдань;
  • CONFIG_STORAGE_TOPIC=connector-config - Топік для зберігання даних конфігурації конектора та його завдань;
  • GROUP_ID=1 - Ідентифікатор групи воркерів, на яких може виконуватися завдання конектора; необхідний при використанні розподіленого (Distributed) режиму.

Запускаємо контейнер із цими змінними:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Примітка про Avro

За замовчуванням Debezium пише дані у форматі JSON, що є прийнятним для пісочниць і невеликих обсягів даних, але може стати проблемою у високонавантажених базах. Альтернативою JSON-конвертеру є серіалізація повідомлень за допомогою Avro у бінарний формат, що дозволяє знизити навантаження на підсистему I/O в Apache Kafka.

Для використання Avro потрібно розгорнути окремий schema-registry (Для зберігання схем). Змінні для конвертера виглядатимуть так:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Деталі щодо використання Avro та налаштування registry за нього виходять за рамки статті – далі для наочності ми будемо використовувати JSON.

2. Налаштування самого конектора

Тепер можна перейти безпосередньо до конфігурації самого конектора, який читатиме дані з джерела.

Розглянемо на прикладі конекторів для двох СУБД: PostgreSQL і MongoDB, — за якими я маю досвід і за якими є відмінності (нехай і невеликі, але в деяких випадках — суттєві!).

Конфігурація описується в нотації JSON і завантажується в Kafka Connect за допомогою запиту POST.

2.1. PostgreSQL

Приклад конфігурації конектора для PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Принцип роботи конектора після такого налаштування досить простий:

  • При першому запуску він підключається до бази, вказаної у конфігурації, та запускається в режимі initial snapshotнадсилаючи в Kafka початковий набір даних, отриманих за допомогою умовного SELECT * FROM table_name.
  • Після завершення ініціалізації конектор переходить у режим читання змін з WAL-файлів PostgreSQL.

Про опції, що використовуються:

  • name - Ім'я конектора, для якого використовується конфігурація, описана нижче; надалі це ім'я використовується для роботи з конектором (тобто перегляду статусу/перезапуску/оновлення конфігурації) через REST API Kafka Connect;
  • connector.class - Клас конектора СУБД, який буде використовуватися конектором конектором;
  • plugin.name - Назва плагіна для логічного декодування даних із WAL-файлів. На вибір доступні wal2json, decoderbuffs и pgoutput. Перші два вимагають встановлення відповідних розширень у СУБД, а pgoutput для PostgreSQL версії 10 та вище не потребує додаткових маніпуляцій;
  • database.* - Опції для підключення до БД, де database.server.name - Ім'я інстансу PostgreSQL, що використовується для формування імені топіка в кластері Kafka;
  • table.include.list — список таблиць, у яких хочемо відстежувати зміни; задається у форматі schema.table_name; не можна використовувати разом з table.exclude.list;
  • heartbeat.interval.ms - інтервал (у мілісекундах), з яким конектор відправляє heartbeat-повідомлення до спеціального топіка;
  • heartbeat.action.query — запит, який буде виконуватися під час надсилання кожного heartbeat-повідомлення (опція з'явилася з версії 1.1);
  • slot.name - Ім'я слота реплікації, який буде використовуватися конектором;
  • publication.name - ім'я публікації у PostgreSQL, яку використовує конектор. Якщо її не існує, Debezium спробує її створити. У випадку, якщо у користувача, під яким відбувається підключення, недостатньо прав для цієї дії, конектор завершить роботу з помилкою;
  • transforms визначає, як саме змінювати назву цільового топіка:
    • transforms.AddPrefix.type вказує, що будемо використовувати регулярні вирази;
    • transforms.AddPrefix.regex - маска, за якою перевизначається назва цільового топіка;
    • transforms.AddPrefix.replacement - Саме те, на що перевизначаємо.

Детальніше про heartbeat та transforms

За умовчанням конектор надсилає дані в Kafka по кожній комітну транзакції, а її LSN (Log Sequence Number) записує в службовий топік offset. Але що станеться, якщо конектор налаштований на читання не всієї бази повністю, лише частини її таблиць (у яких оновлення даних відбувається не часто)?

  • Конектор читатиме WAL-файли і не виявлятиме в них коміту транзакцій у ті таблиці, за якими він стежить.
  • Тому він не оновлюватиме свою поточну позицію ні в топіці, ні в слоті реплікації.
  • Це, у свою чергу, призведе до «утримання» WAL-файлів на диску та ймовірного вичерпання всього дискового простору.

І тут на допомогу приходять опції heartbeat.interval.ms и heartbeat.action.query. Використання цих опцій у парі дає можливість щоразу при надсиланні heartbeat-повідомлення виконувати запит на зміну даних в окремій таблиці. Тим самим постійно актуалізується LSN, на якому зараз знаходиться конектор (у слоті реплікації). Це дозволяє СУБД видалити WAL-файли, які не потрібні. Детальніше дізнатися про роботу опцій можна у документації.

Інша опція, варта більш пильної уваги, - це transforms. Хоча вона скоріше про зручність і красу.

За замовчуванням Debezium створює топики, керуючись наступною політикою іменування: serverName.schemaName.tableName. Це не завжди може бути зручним. Опціями transforms можна за допомогою регулярних виразів визначати список таблиць, евенти з яких потрібно маршрутизувати до топіка з конкретною назвою.

У нашій конфігурації завдяки transforms відбувається таке: всі CDC-події з відстежуваної БД потраплять у топік з ім'ям data.cdc.dbname. В іншому випадку (без цих налаштувань) Debezium за замовчуванням створював би по топіку на кожну таблицю виду: pg-dev.public.<table_name>.

Обмеження конектора

На завершення опису конфігурації конектора для PostgreSQL варто розповісти про наступні особливості/обмеження його роботи:

  1. Функціонал конектора для PostgreSQL покладається концепцію логічного декодування. Тому він не відслідковує запити на зміну структури БД (DDL) — відповідно, у топах цих даних не буде.
  2. Оскільки використовуються слоти реплікації, підключення конектора можливе лише до провідного екземпляра СУБД.
  3. Якщо користувачеві, під яким конектор підключається до бази даних, видано права лише на читання, перед першим запуском потрібно вручну створити слот реплікації та публікацію в БД.

Застосування конфігурації

Отже, завантажимо нашу конфігурацію в конектор:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Перевіряємо, що завантаження пройшло успішно і конектор запустився:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Відмінно: він налаштований та готовий до роботи. Тепер прикинемось консьюмером і підключимося до Kafka, після чого додамо та змінимо запис у таблиці:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

У нашому топіці це відобразиться таким чином:

Дуже довгий JSON з нашими змінами

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

В обох випадках записи складаються з ключа (PK) запису, який був змінений, і безпосередньо самої суті змін: який був запис до і який став після.

  • У випадку з INSERT: значення до (before) одно null, а після — рядок, який було вставлено.
  • У випадку з UPDATEpayload.before відображається попередній стан рядка, а в payload.after — нове із суттю змін.

2.2 MongoDB

Цей конектор використовує стандартний механізм реплікації MongoDB, зчитуючи інформацію з oplog'а primary-вузла СУБД.

Аналогічно вже описаному конектору для PgSQL, тут також при першому запуску знімається первинний снапшот даних, після чого конектор перемикається на режим читання oplog'а.

Приклад конфігурації:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Як можна помітити, тут немає нових опцій у порівнянні з минулим прикладом, але скоротилася лише кількість опцій, які відповідають за підключення до БД та їх префікси.

Налаштування transforms цього разу роблять таке: перетворюють ім'я цільового топіка зі схеми <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

Відмовостійкість

Питання відмовостійкості та високої доступності в наш час стоїть як ніколи гостро — особливо коли ми говоримо про дані та транзакції, і відстеження змін даних не стоїть у цьому питанні осторонь. Розглянемо, що в принципі може піти не так і що відбуватиметься з Debezium у кожному з випадків.

Є три варіанти відмови:

  1. Відмова Kafka Connect. Якщо Connect налаштований на роботу у розподіленому режимі, для цього необхідно кільком воркерам задати однаковий group.id. Тоді при відмові одного з них конектор буде перезапущено на іншому воркері і продовжить читання з останньої позиції в топіку в Kafka.
  2. Втрата зв'язків з Kafka-кластером. Конектор просто зупинить читання на позиції, яку не вдалося відправити в Kafka, і періодично намагатиметься повторно відправити її, поки спроба не завершиться успіхом.
  3. Недоступність джерела даних. Конектор буде робити спроби повторного підключення до джерела відповідно до конфігурації. За промовчанням це 16 спроб з використанням експоненційний відкат. Після 16-ї невдалої спроби таск буде помічений як не вдалося і буде потрібно його перезапуск через REST-інтерфейс Kafka Connect.
    • У випадку з PostgreSQL дані пропадуть, т.к. використання слотів реплікації не дасть видалити WAL-файли, не прочитані конектором. У цьому випадку є і зворотний бік медалі: якщо на тривалий час буде порушено мережевий зв'язок між конектором і СУБД, є ймовірність, що місце на диску закінчиться, а це може призвести до відмови СУБД цілком.
    • У випадку з MySQL файли бінлогів можуть бути відротовані СУБД раніше, ніж відновиться зв'язність. Це призведе до того, що конектор перейде в стан failed, а для відновлення нормального функціонування буде потрібно повторний запуск режиму initial snapshot для продовження читання з бінлогів.
    • Про MongoDB. Документація говорить: поведінка коннектора у разі, якщо файли журналів/oplog'а були видалені і коннектор неспроможна продовжити читання з позиції, де зупинився, однаково всім СУБД. Воно полягає в тому, що конектор перейде в стан не вдалося і вимагатиме повторного запуску в режимі initial snapshot.

      Проте бувають винятки. Якщо конектор тривалий час перебував у відключеному стані (або не міг достукатися до екземпляра MongoDB), а oplog за цей час пройшов ротацію, то при відновленні підключення конектор незворушно продовжить читати дані з першої доступної позиції, через що частина даних у Kafka НЕ потрапить.

Висновок

Debezium - мій перший досвід роботи з CDC-системами і загалом дуже позитивний. Проект підкупив підтримкою основних СУБД, простотою конфігурації, підтримкою кластеризації та активною спільнотою. Зацікавився практикою рекомендую ознайомитися з гайдами для Kafka Connect и дебезій.

У порівнянні з JDBC-конектором для Kafka Connect основною перевагою Debezium є те, що зміни зчитуються з журналів СУБД, що дозволяє отримувати дані з мінімальною затримкою. JDBC Connector (з поставки Kafka Connect) робить запити до таблиці, що відстежується, з фіксованим інтервалом і (з цієї ж причини) не генерує повідомлення при видаленні даних (як можна запитати дані, яких немає?).

Для вирішення подібних завдань можна звернути увагу на такі рішення (крім Debezium):

PS

Читайте також у нашому блозі:

Джерело: habr.com

Додати коментар або відгук