Apache Kafka uchun Debezium - CDC bilan tanishtirish

Apache Kafka uchun Debezium - CDC bilan tanishtirish

Mening ishimda men tez-tez yangi texnik echimlar/dasturiy ta'minot mahsulotlariga duch kelaman, ular haqida ma'lumot rus tilidagi Internetda juda kam. Ushbu maqola bilan men ikkita mashhur DBMS (PostgreSQL va MongoDB) dan Debezium yordamida Kafka klasteriga CDC hodisalarini jo'natishni sozlashim kerak bo'lganda, yaqindagi amaliyotimdan misol bilan shunday bo'shliqni to'ldirishga harakat qilaman. Bajarilgan ishlar natijasida paydo bo'lgan ushbu sharh maqolasi boshqalarga foydali bo'ladi deb umid qilaman.

Debezium va CDC umuman nima?

Debezium - CDC dasturiy ta'minot toifasi vakili (Ma'lumotlar o'zgarishini yozib oling), yoki aniqrog'i, bu Apache Kafka Connect ramkasi bilan mos keladigan turli xil ma'lumotlar bazalari uchun ulagichlar to'plami.

bu Ochiq kodli loyiha, Apache License v2.0 ostida litsenziyalangan va Red Hat homiyligida. Rivojlanish 2016 yildan beri davom etmoqda va hozirda u quyidagi ma'lumotlar bazalarini rasmiy qo'llab-quvvatlaydi: MySQL, PostgreSQL, MongoDB, SQL Server. Shuningdek, Cassandra va Oracle uchun ulagichlar mavjud, ammo ular hozirda "erta kirish" holatida va yangi nashrlar orqaga qarab muvofiqlikni kafolatlamaydi.

Agar biz CDCni an'anaviy yondashuv bilan solishtirsak (ilova ma'lumotlar bazasidan ma'lumotlarni to'g'ridan-to'g'ri o'qiganda), uning asosiy afzalliklari qator darajasida past kechikish, yuqori ishonchlilik va mavjudlik bilan ma'lumotlarni o'zgartirish oqimini amalga oshirishni o'z ichiga oladi. Oxirgi ikki nuqtaga Kafka klasteridan CDC hodisalari uchun ombor sifatida foydalanish orqali erishiladi.

Yana bir afzallik shundaki, hodisalarni saqlash uchun bitta modeldan foydalaniladi, shuning uchun oxirgi dastur turli DBMSlarni ishlatish nuanslari haqida tashvishlanmaydi.

Nihoyat, xabar brokeridan foydalanish ma'lumotlardagi o'zgarishlarni kuzatuvchi ilovalarga gorizontal ravishda kengayish imkonini beradi. Shu bilan birga, ma'lumotlar manbasiga ta'sir minimallashtiriladi, chunki ma'lumotlar to'g'ridan-to'g'ri DBMSdan emas, balki Kafka klasteridan olinadi.

Debezium arxitekturasi haqida

Debezium-dan foydalanish ushbu oddiy sxemaga tushadi:

DBMS (ma'lumotlar manbai sifatida) → Kafka Connect-dagi ulagich → Apache Kafka → iste'molchi

Misol tariqasida loyiha veb-saytidagi diagrammani keltiramiz:

Apache Kafka uchun Debezium - CDC bilan tanishtirish

Biroq, men bu sxemani yoqtirmayman, chunki faqat lavabo ulagichidan foydalanish mumkin.

Aslida, vaziyat boshqacha: ma'lumotlar ko'lingizni to'ldirish (yuqoridagi diagrammadagi oxirgi havola) Bu Debeziumdan foydalanishning yagona usuli emas. Apache Kafkaga yuborilgan voqealar turli vaziyatlarni hal qilish uchun ilovalaringiz tomonidan ishlatilishi mumkin. Masalan:

  • keshdan keraksiz ma'lumotlarni olib tashlash;
  • bildirishnomalarni yuborish;
  • qidiruv indeksi yangilanishlari;
  • qandaydir audit jurnallari;
  • ...

Agar sizda Java ilovasi bo'lsa va Kafka klasteridan foydalanishga hojat/imkoniyat bo'lmasa, u orqali ishlash imkoniyati ham mavjud. o'rnatilgan ulagich. Aniq afzalligi shundaki, u qo'shimcha infratuzilmaga bo'lgan ehtiyojni yo'q qiladi (ulagich va Kafka shaklida). Biroq, bu yechim 1.1 versiyasidan beri eskirgan va endi foydalanish tavsiya etilmaydi (uni qo'llab-quvvatlash kelgusi versiyalarda olib tashlanishi mumkin).

Ushbu maqolada ishlab chiquvchilar tomonidan tavsiya etilgan arxitektura muhokama qilinadi, bu esa xatolarga chidamlilik va kengayishni ta'minlaydi.

Ulagich konfiguratsiyasi

Eng muhim qiymat - ma'lumotlardagi o'zgarishlarni kuzatishni boshlash uchun bizga kerak:

  1. 5.7 versiyasidan boshlab MySQL, PostgreSQL 9.6+, MongoDB 3.2+ boʻlishi mumkin boʻlgan maʼlumotlar manbai (to'liq ro'yxat);
  2. Apache Kafka klasteri;
  3. Kafka Connect namunasi (1.x, 2.x versiyalari);
  4. sozlangan Debezium ulagichi.

Birinchi ikkita nuqta ustida ishlang, ya'ni. DBMS va Apache Kafka ni o'rnatish jarayoni maqola doirasidan tashqarida. Biroq, hamma narsani qum qutisiga joylashtirmoqchi bo'lganlar uchun misollar bilan rasmiy omborda tayyor mavjud. docker-compose.yaml.

Oxirgi ikki nuqtada batafsilroq to'xtalamiz.

0. Kafka Connect

Bu erda va keyingi maqolada barcha konfiguratsiya misollari Debezium ishlab chiquvchilari tomonidan tarqatilgan Docker tasviri kontekstida muhokama qilinadi. U barcha kerakli plagin fayllarini (ulagichlar) o'z ichiga oladi va muhit o'zgaruvchilari yordamida Kafka Connect konfiguratsiyasini ta'minlaydi.

Agar siz Confluent-dan Kafka Connect-dan foydalanmoqchi bo'lsangiz, unda ko'rsatilgan katalogga kerakli konnektorlarning plaginlarini mustaqil ravishda qo'shishingiz kerak bo'ladi. plugin.path yoki muhit o'zgaruvchisi orqali o'rnatiladi CLASSPATH. Kafka Connect ishchisi va ulagichlari uchun sozlamalar ishchi ishga tushirish buyrug'iga argument sifatida uzatiladigan konfiguratsiya fayllari orqali aniqlanadi. Batafsil ma'lumot uchun qarang hujjatlar.

Debeizumni ulagich versiyasida o'rnatishning butun jarayoni ikki bosqichda amalga oshiriladi. Keling, ularning har birini ko'rib chiqaylik:

1. Kafka Connect ramkasini sozlash

Apache Kafka klasteriga ma'lumotlarni uzatish uchun Kafka Connect tizimida maxsus parametrlar o'rnatiladi, masalan:

  • klasterga ulanish parametrlari,
  • ulagichning konfiguratsiyasi to'g'ridan-to'g'ri saqlanadigan mavzular nomlari,
  • ulagich ishlaydigan guruhning nomi (agar taqsimlangan rejim ishlatilsa).

Loyihaning rasmiy Docker tasviri muhit o'zgaruvchilari yordamida konfiguratsiyani qo'llab-quvvatlaydi - biz bundan foydalanamiz. Shunday qilib, rasmni yuklab oling:

docker pull debezium/connect

Ulagichni ishga tushirish uchun zarur bo'lgan minimal muhit o'zgaruvchilari to'plami quyidagicha:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — klaster aʼzolarining toʻliq roʻyxatini olish uchun Kafka klaster serverlarining dastlabki roʻyxati;
  • OFFSET_STORAGE_TOPIC=connector-offsets — hozirda ulagich joylashgan pozitsiyalarni saqlash mavzusi;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — ulagichning holatini va uning vazifalarini saqlash mavzusi;
  • CONFIG_STORAGE_TOPIC=connector-config — ulagich konfiguratsiyasi ma'lumotlarini va uning vazifalarini saqlash mavzusi;
  • GROUP_ID=1 — ulagich vazifasini bajarish mumkin bo'lgan ishchilar guruhining identifikatori; taqsimlangan foydalanishda zarur (tarqatilgan) tartib.

Biz ushbu o'zgaruvchilar bilan konteynerni ishga tushiramiz:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro haqida eslatma

Odatiy bo'lib, Debezium ma'lumotlarni JSON formatida yozadi, bu sandboxlar va kichik hajmdagi ma'lumotlar uchun maqbuldir, lekin yuqori yuklangan ma'lumotlar bazalarida muammoga aylanishi mumkin. JSON konvertoriga alternativa xabarlarni ketma-ketlashtirishdir Evro ikkilik formatga o'tkazing, bu Apache Kafkadagi I/U quyi tizimidagi yukni kamaytiradi.

Avro-dan foydalanish uchun siz alohida o'rnatishingiz kerak sxema-reestr (diagrammalarni saqlash uchun). Konverter uchun o'zgaruvchilar quyidagicha ko'rinadi:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro-dan foydalanish va uning registrini o'rnatish bo'yicha tafsilotlar ushbu maqola doirasidan tashqarida - bundan keyin aniqlik uchun biz JSON-dan foydalanamiz.

2. Ulagichning o'zini sozlash

Endi siz to'g'ridan-to'g'ri ulagichning konfiguratsiyasiga o'tishingiz mumkin, u manbadan ma'lumotlarni o'qiydi.

Keling, ikkita DBMS uchun ulagichlar misolini ko'rib chiqaylik: PostgreSQL va MongoDB, ularda men tajribaga egaman va ularda farqlar mavjud (kichik bo'lsa ham, lekin ba'zi hollarda muhim!).

Konfiguratsiya JSON yozuvida tasvirlangan va POST so'rovi yordamida Kafka Connect-ga yuklangan.

2.1. PostgreSQL

PostgreSQL uchun konnektor konfiguratsiyasiga misol:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ushbu o'rnatishdan keyin ulagichning ishlash printsipi juda oddiy:

  • Birinchi marta ishga tushirilganda, u konfiguratsiyada ko'rsatilgan ma'lumotlar bazasiga ulanadi va rejimda boshlanadi dastlabki surat, shartli yordamida olingan dastlabki ma'lumotlar to'plamini Kafkaga yuborish SELECT * FROM table_name.
  • Boshlash tugallangandan so'ng, ulagich PostgreSQL WAL fayllaridagi o'zgarishlarni o'qish rejimiga o'tadi.

Amaldagi variantlar haqida:

  • name — quyida tavsiflangan konfiguratsiya ishlatiladigan ulagichning nomi; kelajakda bu nom ulagich bilan ishlash uchun ishlatiladi (ya'ni, holatni ko'rish/qayta ishga tushirish/konfiguratsiyani yangilash) Kafka Connect REST API orqali;
  • connector.class — konfiguratsiya qilingan ulagich tomonidan foydalaniladigan DBMS ulagichi sinfi;
  • plugin.name — WAL fayllaridan ma'lumotlarni mantiqiy dekodlash uchun plagin nomi. Tanlash uchun mavjud wal2json, decoderbuffs и pgoutput. Birinchi ikkitasi DBMSda tegishli kengaytmalarni o'rnatishni talab qiladi va pgoutput PostgreSQL 10 va undan yuqori versiyalari uchun qo'shimcha manipulyatsiyalarni talab qilmaydi;
  • database.* — maʼlumotlar bazasiga ulanish imkoniyatlari, bu yerda database.server.name — Kafka klasterida mavzu nomini shakllantirish uchun foydalaniladigan PostgreSQL misol nomi;
  • table.include.list — biz o'zgarishlarni kuzatmoqchi bo'lgan jadvallar ro'yxati; formatda belgilangan schema.table_name; bilan birga foydalanish mumkin emas table.exclude.list;
  • heartbeat.interval.ms — ulagich maxsus mavzuga yurak urishi haqidagi xabarlarni yuboradigan interval (millisekundlarda);
  • heartbeat.action.query — har bir yurak urishi haqidagi xabarni yuborishda bajariladigan so'rov (variant 1.1 versiyada paydo bo'lgan);
  • slot.name — ulagich tomonidan ishlatiladigan replikatsiya uyasi nomi;
  • publication.name - Ism adabiyotlar ulagich foydalanadigan PostgreSQL da. Agar u mavjud bo'lmasa, Debezium uni yaratishga harakat qiladi. Agar ulanish amalga oshirilgan foydalanuvchi ushbu harakat uchun etarli huquqlarga ega bo'lmasa, ulagich xato bilan tugaydi;
  • transforms maqsadli mavzu nomini qanday o'zgartirishni aniq belgilaydi:
    • transforms.AddPrefix.type muntazam iboralardan foydalanishimizni bildiradi;
    • transforms.AddPrefix.regex — maqsadli mavzu nomini qayta belgilovchi niqob;
    • transforms.AddPrefix.replacement - to'g'ridan-to'g'ri biz nimani qayta belgilaymiz.

Yurak urishi va o'zgarishlar haqida ko'proq

Odatiy bo'lib, ulagich har bir amalga oshirilgan tranzaksiya uchun ma'lumotlarni Kafkaga yuboradi va uning LSN (log tartib raqami) xizmat mavzusida qayd etiladi. offset. Ammo ulagich butun ma'lumotlar bazasini emas, balki uning jadvallarining faqat bir qismini (ma'lumotlar yangilanishi tez-tez sodir bo'lmaydigan) o'qish uchun tuzilgan bo'lsa nima bo'ladi?

  • Ulagich WAL fayllarini o'qiydi va o'zi kuzatayotgan jadvallarga hech qanday tranzaksiya majburiyatlarini aniqlamaydi.
  • Shuning uchun, u mavzudagi ham, replikatsiya uyasidagi ham joriy holatini yangilamaydi.
  • Bu, o'z navbatida, WAL fayllari diskda saqlanishiga va diskda bo'sh joy tugashiga olib keladi.

Va bu erda variantlar yordamga keladi. heartbeat.interval.ms и heartbeat.action.query. Ushbu parametrlarni juftlikda ishlatish yurak urishi xabari yuborilganda alohida jadvaldagi ma'lumotlarni o'zgartirish so'rovini bajarish imkonini beradi. Shunday qilib, hozirda ulagich joylashgan LSN (replikatsiya uyasida) doimiy ravishda yangilanadi. Bu DBMSga endi kerak bo'lmagan WAL fayllarini olib tashlash imkonini beradi. Variantlar qanday ishlashi haqida ko'proq bilib olishingiz mumkin hujjatlar.

E'tiborga loyiq yana bir variant transforms. Garchi u ko'proq qulaylik va go'zallik haqida bo'lsa-da ...

Odatiy bo'lib, Debezium quyidagi nomlash siyosatidan foydalangan holda mavzularni yaratadi: serverName.schemaName.tableName. Bu har doim ham qulay bo'lmasligi mumkin. Variantlar transforms Muntazam iboralardan ma'lum bir nomga ega mavzuga yo'naltirilishi kerak bo'lgan jadvallar, voqealar ro'yxatini aniqlash uchun foydalanishingiz mumkin.

Bizning konfiguratsiyamizda rahmat transforms quyidagicha sodir bo'ladi: kuzatilgan ma'lumotlar bazasidagi barcha CDC hodisalari nomli mavzuga o'tadi data.cdc.dbname. Aks holda (bu sozlamalarsiz), Debezium sukut bo'yicha har bir jadval uchun mavzu yaratadi: pg-dev.public.<table_name>.

Ulagich cheklovlari

PostgreSQL uchun ulagich konfiguratsiyasi tavsifini yakunlash uchun uning ishlashining quyidagi xususiyatlari/cheklovlari haqida gapirish kerak:

  1. PostgreSQL uchun ulagichning funksionalligi mantiqiy dekodlash kontseptsiyasiga tayanadi. Shuning uchun u ma'lumotlar bazasi tuzilishini o'zgartirish so'rovlarini kuzatmaydi (DDL) - shunga ko'ra, bu ma'lumotlar mavzularda bo'lmaydi.
  2. Replikatsiya uyalari ishlatilganligi sababli ulagichni ulash mumkin faqatgina yetakchi DBMS misoliga.
  3. Agar ulagich ma'lumotlar bazasiga ulangan foydalanuvchi faqat o'qish huquqiga ega bo'lsa, birinchi ishga tushirishdan oldin siz qo'lda replikatsiya slotini yaratishingiz va ma'lumotlar bazasiga nashr qilishingiz kerak bo'ladi.

Konfiguratsiyani qo'llash

Shunday qilib, konfiguratsiyani ulagichga yuklaymiz:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Yuklash muvaffaqiyatli bo'lganini va ulagich ishga tushirilganligini tekshiramiz:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Ajoyib: u sozlangan va ishlashga tayyor. Keling, o'zimizni iste'molchi sifatida ko'rsatamiz va Kafka bilan bog'lanamiz, shundan so'ng biz jadvalga yozuv qo'shamiz va o'zgartiramiz:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Bizning mavzuimizda u quyidagicha ko'rsatiladi:

O'zgarishlarimiz bilan juda uzoq JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Ikkala holatda ham yozuvlar o'zgartirilgan yozuvning kalitidan (PK) va o'zgarishlarning mohiyatidan iborat: yozuv avval nima bo'lgan va keyin nima bo'lgan.

  • Bunday holda INSERT: oldingi qiymat (before) teng null, va keyin - kiritilgan qator.
  • Bunday holda UPDATE: ichida payload.before qatorning oldingi holati ko'rsatiladi va ichida payload.after - o'zgarishlarning mohiyati bilan yangi.

2.2 MongoDB

Ushbu ulagich asosiy DBMS tugunining oplogidan ma'lumotlarni o'qib, standart MongoDB replikatsiya mexanizmidan foydalanadi.

PgSQL uchun allaqachon tasvirlangan ulagichga o'xshab, bu erda ham birinchi ishga tushirishda birlamchi ma'lumotlar surati olinadi, shundan so'ng ulagich oplog o'qish rejimiga o'tadi.

Konfiguratsiyaga misol:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Ko'rib turganingizdek, bu erda oldingi misol bilan solishtirganda yangi variantlar yo'q, faqat ma'lumotlar bazasiga ulanish uchun mas'ul bo'lgan variantlar soni va ularning prefikslari qisqartirildi.

Sozlamalar transforms bu safar ular quyidagilarni bajaradilar: ular maqsadli mavzu nomini sxemadan o'zgartiradilar <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

xatolarga chidamlilik

Bizning zamonamizda nosozliklarga chidamlilik va yuqori mavjudlik masalasi har qachongidan ham keskinroq - ayniqsa, biz ma'lumotlar va tranzaktsiyalar haqida gapirganda va ma'lumotlar o'zgarishini kuzatish bu masalada chetda qolmaydi. Keling, printsipial jihatdan nima noto'g'ri bo'lishi mumkinligini va har bir holatda Debezium bilan nima bo'lishini ko'rib chiqaylik.

O'chirishning uchta varianti mavjud:

  1. Kafka Connect ishlamay qoldi. Agar Connect taqsimlangan rejimda ishlash uchun sozlangan bo'lsa, bu bir xil group.id ni o'rnatish uchun bir nechta ishchilarni talab qiladi. Keyin, agar ulardan biri muvaffaqiyatsiz bo'lsa, ulagich boshqa ishchida qayta ishga tushiriladi va Kafkadagi mavzudagi oxirgi belgilangan pozitsiyadan o'qishni davom ettiradi.
  2. Kafka klasteri bilan aloqani yo'qotish. Ulagich shunchaki Kafkaga yubora olmagan pozitsiyada o'qishni to'xtatadi va urinish muvaffaqiyatli bo'lmaguncha vaqti-vaqti bilan uni qayta yuborishga harakat qiladi.
  3. Ma'lumotlar manbasining mavjud emasligi. Ulagich konfiguratsiya qilinganidek manbaga qayta ulanishga harakat qiladi. Odatiy bo'lib - 16 marta foydalanishga urinish eksponentsial orqaga qaytish. 16- muvaffaqiyatsiz urinishdan so'ng, vazifa sifatida belgilanadi muvaffaqiyatsiz va uni Kafka Connect REST interfeysi orqali qo'lda qayta ishga tushirishingiz kerak bo'ladi.
    • Bunday holda PostgreSQL ma'lumotlar yo'qolmaydi, chunki Replikatsiya uyalaridan foydalanish ulagich tomonidan o'qilmaydigan WAL fayllarini o'chirishdan saqlaydi. Bunday holda, tanganing salbiy tomoni ham bor: agar ulagich va DBMS o'rtasidagi tarmoq ulanishi uzoq vaqt davomida buzilgan bo'lsa, diskdagi bo'sh joy tugashi ehtimoli bor va bu ishlamay qolishiga olib kelishi mumkin. butun DBMS.
    • Bunday holda MySQL binlog fayllari ulanishni tiklashdan oldin DBMS tomonidan aylantirilishi mumkin. Bu ulagichning muvaffaqiyatsiz holatiga o'tishiga olib keladi va normal ishlashni tiklash uchun binloglardan o'qishni davom ettirish uchun dastlabki surat rejimida qayta ishga tushirishingiz kerak bo'ladi.
    • haqida MongoDB. Hujjatlarda aytilishicha: log/oplog fayllari o'chirilgan va ulagich to'xtagan joydan o'qishni davom ettira olmasa, ulagichning xatti-harakati barcha DBMSlar uchun bir xil. Bu ulagichning holatga o'tishini anglatadi muvaffaqiyatsiz va rejimda qayta ishga tushirishni talab qiladi dastlabki surat.

      Biroq, istisnolar mavjud. Agar ulagich uzoq vaqt davomida uzilib qolgan bo'lsa (yoki MongoDB misoliga etib bo'lmasa) va oplog shu vaqt ichida aylanishdan o'tgan bo'lsa, u holda ulanish tiklanganda, ulagich ma'lumotlarni birinchi mavjud pozitsiyadan xotirjam o'qishni davom ettiradi, shuning uchun Kafkadagi ba'zi ma'lumotlar yo'q uradi.

xulosa

Debezium CDC tizimlari bilan birinchi tajribam va umuman olganda juda ijobiy. Loyiha asosiy DBMSlarni qo'llab-quvvatlashi, konfiguratsiya qulayligi, klasterlarni qo'llab-quvvatlashi va faol hamjamiyat bilan g'alaba qozondi. Amaliyotga qiziquvchilar uchun men sizga qo'llanmalarni o'qishni maslahat beraman Kafka Connect и Debezium.

Kafka Connect uchun JDBC ulagichi bilan solishtirganda, Debeziumning asosiy afzalligi shundaki, o'zgarishlar DBMS jurnallaridan o'qiladi, bu esa ma'lumotlarni minimal kechikish bilan qabul qilish imkonini beradi. JDBC ulagichi (Kafka Connect-dan) kuzatilgan jadvalni belgilangan vaqt oralig'ida so'raydi va (xuddi shu sababga ko'ra) ma'lumotlar o'chirilganda xabarlarni yaratmaydi (mavjud bo'lgan ma'lumotlarni qanday so'rash mumkin?).

Shu kabi muammolarni hal qilish uchun siz quyidagi echimlarga e'tibor berishingiz mumkin (Debeziumga qo'shimcha ravishda):

PS

Shuningdek, bizning blogimizda o'qing:

Manba: www.habr.com

a Izoh qo'shish