Apache Kafka-д зориулсан Debezium - CDC-г танилцуулж байна

Apache Kafka-д зориулсан Debezium - CDC-г танилцуулж байна

Ажил дээрээ би шинэ техникийн шийдлүүд / програм хангамжийн бүтээгдэхүүнүүдтэй байнга тааралддаг бөгөөд энэ тухай мэдээлэл орос хэлээр ярьдаг интернетэд маш ховор байдаг. Энэ нийтлэлээр би Debezium ашиглан хоёр алдартай DBMS (PostgreSQL болон MongoDB)-аас Кафка кластер руу CDC үйл явдлуудыг илгээх шаардлагатай болсон үед сүүлийн үеийн туршлагаасаа жишээ авч нэг ийм цоорхойг нөхөхийг хичээх болно. Хийсэн ажлын үр дүнд гарсан энэхүү тойм нийтлэл бусдад хэрэг болно гэж найдаж байна.

Дебезиум ба CDC гэж юу вэ?

Дебезиум - CDC програм хангамжийн категорийн төлөөлөгч (Өгөгдлийн өөрчлөлтийг авах), эсвэл илүү нарийвчлалтайгаар, энэ нь Apache Kafka Connect хүрээтэй нийцэх янз бүрийн DBMS-д зориулсан холбогчдын багц юм.

энэ нээлттэй эхийн төсөл, Apache License v2.0 лицензтэй бөгөөд Red Hat ивээн тэтгэсэн. 2016 оноос хойш хөгжүүлэлт хийгдэж байгаа бөгөөд одоогоор дараах DBMS-д албан ёсны дэмжлэг үзүүлж байна: MySQL, PostgreSQL, MongoDB, SQL Server. Мөн Кассандра болон Оракл-д зориулсан холбогч байдаг боловч тэдгээр нь одоогоор "эрт нэвтрэх" статустай байгаа бөгөөд шинэ хувилбарууд нь хоцрогдсон нийцтэй байдлыг баталгаажуулахгүй.

Хэрэв бид CDC-ийг уламжлалт арга барилтай харьцуулж үзвэл (програм нь DBMS-ээс өгөгдлийг шууд унших үед) түүний гол давуу тал нь бага хоцролттой, өндөр найдвартай байдал, хүртээмжтэй мөрийн түвшинд өгөгдлийн өөрчлөлтийн урсгалыг хэрэгжүүлэх явдал юм. Сүүлийн хоёр оноог Кафка кластерыг CDC-ийн үйл явдлын агуулах болгон ашиглах замаар олж авдаг.

Түүнчлэн, давуу талууд нь үйл явдлыг хадгалахад нэг загварыг ашигладаг тул эцсийн програм нь өөр өөр DBMS-ийн үйл ажиллагааны онцлог шинж чанаруудын талаар санаа зовох шаардлагагүй болно.

Эцэст нь, мессеж брокер ашиглах нь өгөгдлийн өөрчлөлтийг хянадаг програмуудыг хэвтээ масштабаар нэмэгдүүлэх боломжийг нээж өгдөг. Үүний зэрэгцээ өгөгдлийг DBMS-ээс шууд хүлээн авдаггүй, харин Кафка кластераас авдаг тул мэдээллийн эх үүсвэрт үзүүлэх нөлөөллийг багасгадаг.

Debezium архитектурын тухай

Debezium-ийг ашиглах нь дараахь энгийн схемд хүргэдэг.

DBMS (өгөгдлийн эх сурвалж болгон) → Kafka Connect дахь холбогч → Apache Kafka → хэрэглэгч

Жишээ болгон би төслийн вэбсайтаас диаграммыг өгөх болно.

Apache Kafka-д зориулсан Debezium - CDC-г танилцуулж байна

Гэсэн хэдий ч би энэ схемд үнэхээр дургүй, учир нь зөвхөн угаалтуур холбогч боломжтой юм шиг санагддаг.

Бодит байдал дээр нөхцөл байдал өөр байна: Дата нуураа дүүргэх (дээрх диаграмын сүүлчийн холбоос) нь Debezium ашиглах цорын ганц арга биш юм. Apache Кафка руу илгээсэн үйл явдлуудыг таны программууд янз бүрийн нөхцөл байдлыг шийдвэрлэхэд ашиглаж болно. Жишээлбэл:

  • кэшээс хамааралгүй өгөгдлийг устгах;
  • мэдэгдэл илгээх;
  • хайлтын индексийн шинэчлэлтүүд;
  • зарим төрлийн аудитын бүртгэл;
  • ...

Хэрэв танд Java програм байгаа бөгөөд Кафка кластер ашиглах шаардлагагүй/боломжгүй бол энэ програмаар дамжуулан ажиллах боломжтой. суулгагдсан холбогч. Үүний тод давуу тал нь та нэмэлт дэд бүтцээс (холбогч ба Кафка хэлбэрээр) татгалзаж болно. Гэсэн хэдий ч, энэ шийдэл нь 1.1 хувилбараас хойш хуучирсан бөгөөд цаашид ашиглахыг зөвлөдөггүй (ирээдүйн хувилбаруудад үүнийг устгаж магадгүй).

Энэ нийтлэлд алдааг тэсвэрлэх чадвар, өргөтгөх чадварыг хангадаг хөгжүүлэгчид санал болгосон архитектурыг авч үзэх болно.

Холбогчийн тохиргоо

Хамгийн чухал утга болох өгөгдлийн өөрчлөлтийг хянаж эхлэхийн тулд бидэнд дараахь зүйлс хэрэгтэй болно.

  1. өгөгдлийн эх үүсвэр нь 5.7 хувилбараас эхлэн MySQL, PostgreSQL 9.6+, MongoDB 3.2+ (бүрэн жагсаалт);
  2. Апачи Кафка кластер
  3. Kafka Connect жишээ (1.x, 2.x хувилбарууд);
  4. тохируулсан Debezium холбогч.

Эхний хоёр цэг дээр ажиллах, өөрөөр хэлбэл. DBMS болон Apache Kafka-г суулгах үйл явц нь нийтлэлийн хамрах хүрээнээс гадуур юм. Гэсэн хэдий ч, бүх зүйлийг хамгаалагдсан хязгаарлагдмал орчинд байрлуулахыг хүсч буй хүмүүсийн хувьд жишээнүүдийн хамт албан ёсны репозитор дээр бэлэн хувилбар байдаг. docker-compose.yaml.

Бид сүүлийн хоёр цэг дээр илүү дэлгэрэнгүй анхаарал хандуулах болно.

0. Кафка холболт

Энд болон нийтлэлд бүх тохиргооны жишээг Debezium хөгжүүлэгчид тараасан Docker зургийн хүрээнд авч үзэх болно. Энэ нь шаардлагатай бүх залгаас файлуудыг (холбогч) агуулдаг бөгөөд орчны хувьсагчдыг ашиглан Kafka Connect тохиргоог өгдөг.

Хэрэв та Confluent-ээс Kafka Connect-ийг ашиглахаар төлөвлөж байгаа бол шаардлагатай холбогчуудын залгаасуудыг доор заасан лавлах хэсэгт нэмэх шаардлагатай болно. plugin.path эсвэл орчны хувьсагчаар тохируулна CLASSPATH. Kafka Connect-ийн ажилтан болон холбогчдод зориулсан тохиргоонууд нь ажилчны эхлүүлэх команд руу аргумент болгон дамжуулагдсан тохиргооны файлуудаар тодорхойлогддог. Дэлгэрэнгүйг үзнэ үү баримт бичиг.

Debeizum-ийг холбогч хувилбарт тохируулах бүх үйл явц хоёр үе шаттайгаар явагдана. Тэд тус бүрийг авч үзье:

1. Kafka Connect хүрээг тохируулах

Өгөгдлийг Apache Кафка кластер руу дамжуулахын тулд Кафка Холбогч системд дараах параметрүүдийг тохируулсан болно.

  • кластер холболтын тохиргоо,
  • холбогчийн тохиргоог өөрөө хадгалах сэдвүүдийн нэрс,
  • холбогч ажиллаж байгаа бүлгийн нэр (тархсан горимыг ашиглах тохиолдолд).

Төслийн албан ёсны Docker дүрс нь орчны хувьсагчдыг ашиглан тохиргоог дэмждэг - үүнийг бид ашиглах болно. Тиймээс зургийг татаж авцгаая:

docker pull debezium/connect

Холбогчийг ажиллуулахад шаардагдах орчны хувьсагчийн хамгийн бага багц нь дараах байдалтай байна.

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - кластерийн гишүүдийн бүрэн жагсаалтыг авахын тулд Кафка кластер серверүүдийн анхны жагсаалт;
  • OFFSET_STORAGE_TOPIC=connector-offsets - холбогч одоо байгаа байрлалыг хадгалах сэдэв;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - холбогч болон түүний даалгаврын статусыг хадгалах сэдэв;
  • CONFIG_STORAGE_TOPIC=connector-config - холбогчийн тохиргооны өгөгдөл, түүний даалгавруудыг хадгалах сэдэв;
  • GROUP_ID=1 - холбогч даалгаврыг гүйцэтгэж болох ажилчдын бүлгийн тодорхойлогч; хуваарилагдсан ашиглах үед шаардлагатай (тараагдсан) дэглэм.

Бид савыг дараах хувьсагчаар эхлүүлнэ:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Аврогийн тухай тэмдэглэл

Анхдагч байдлаар, Debezium нь JSON форматаар өгөгдлийг бичдэг бөгөөд энэ нь хамгаалагдсан хязгаарлагдмал орчинд болон бага хэмжээний өгөгдөлд зөвшөөрөгдөх боломжтой боловч ачаалал ихтэй мэдээллийн санд асуудал үүсгэдэг. JSON хөрвүүлэгчийн өөр нэг хувилбар бол мессежийг ашиглан цуваа болгох явдал юм Авро Ар- Apache Кафка дахь I / O дэд системийн ачааллыг бууруулдаг хоёртын формат руу шилжүүлнэ.

Avro-г ашиглахын тулд та тусдаа програмыг байрлуулах хэрэгтэй схем бүртгэл (схем хадгалахад зориулагдсан). Хөрвүүлэгчийн хувьсагчид дараах байдлаар харагдах болно.

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro-г ашиглах, түүнд зориулж бүртгэл үүсгэх талаархи дэлгэрэнгүй мэдээлэл нь нийтлэлийн хамрах хүрээнээс гадуур - цаашид тодорхой болгохын тулд бид JSON ашиглах болно.

2. Холбогчийг өөрөө тохируулах

Одоо та эх сурвалжаас өгөгдлийг унших холбогчийн тохиргоо руу шууд очиж болно.

Миний туршлагатай PostgreSQL ба MongoDB гэсэн хоёр DBMS-ийн холбогчуудын жишээг харцгаая (бага боловч зарим тохиолдолд мэдэгдэхүйц!).

Тохиргоог JSON тэмдэглэгээнд тайлбарласан бөгөөд POST хүсэлтийг ашиглан Kafka Connect-д байршуулсан.

2.1. PostgreSQL

PostgreSQL-д зориулсан холбогч тохиргооны жишээ:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Энэ тохиргооны дараа холбогчийг ажиллуулах зарчим нь маш энгийн:

  • Эхний эхлэхэд тохиргоонд заасан мэдээллийн сантай холбогдож, горимд эхэлнэ анхны агшин зуурын зураг, Кафка руу нөхцөлтэй хүлээн авсан өгөгдлийн эхний багцыг илгээх SELECT * FROM table_name.
  • Эхлэх ажил дууссаны дараа холбогч нь PostgreSQL WAL файлуудаас өөрчлөлтүүдийг унших горимд ордог.

Ашигласан сонголтуудын талаар:

  • name — доор тайлбарласан тохиргоог ашигласан холбогчийн нэр; ирээдүйд энэ нэрийг Kafka Connect REST API-ээр дамжуулан холбогчтой ажиллахад ашигладаг (жишээ нь төлөвийг харах / дахин эхлүүлэх / тохиргоог шинэчлэх);
  • connector.class — тохируулсан холбогч ашиглах DBMS холбогчийн ангилал;
  • plugin.name нь WAL файлуудын өгөгдлийг логик тайлах залгаасын нэр юм. Сонгох боломжтой wal2json, decoderbuffs и pgoutput. Эхний хоёр нь DBMS-д тохирох өргөтгөлүүдийг суулгахыг шаарддаг бөгөөд pgoutput PostgreSQL 10 ба түүнээс дээш хувилбарын хувьд нэмэлт засвар хийх шаардлагагүй;
  • database.* — мэдээллийн санд холбогдох сонголтууд, хаана database.server.name - Кафка кластерт сэдвийн нэрийг бүрдүүлэхэд ашигласан PostgreSQL жишээний нэр;
  • table.include.list - өөрчлөлтийг хянахыг хүссэн хүснэгтүүдийн жагсаалт; хэлбэрээр өгсөн schema.table_name; -тай хамт хэрэглэх боломжгүй table.exclude.list;
  • heartbeat.interval.ms - холбогч нь зүрхний цохилтын мессежийг тусгай сэдэв рүү илгээдэг интервал (миллисекунд);
  • heartbeat.action.query - зүрхний цохилтын мессеж бүрийг илгээх үед гүйцэтгэх хүсэлт (1.1 хувилбараас хойш энэ сонголт гарч ирсэн);
  • slot.name - холбогч ашиглах хуулбарлах үүрний нэр;
  • publication.name -Нэр нийтлэлүүд холбогч ашигладаг PostgreSQL дээр. Хэрэв байхгүй бол Debezium үүнийг үүсгэхийг оролдох болно. Хэрэв холболт хийгдсэн хэрэглэгч энэ үйлдэлд хангалттай эрхгүй бол холбогч алдаа гарна;
  • transforms зорилтот сэдвийн нэрийг яг хэрхэн өөрчлөхийг тодорхойлдог:
    • transforms.AddPrefix.type бид тогтмол хэллэг ашиглах болно гэдгийг харуулж байна;
    • transforms.AddPrefix.regex - зорилтот сэдвийн нэрийг дахин тодорхойлох маск;
    • transforms.AddPrefix.replacement - бид юуг шинэчлэн тодорхойлдог.

Зүрхний цохилт, өөрчлөлтийн талаар илүү ихийг мэдэж аваарай

Анхдагч байдлаар, холбогч нь хийсэн гүйлгээ бүрийн мэдээллийг Кафка руу илгээж, үйлчилгээний сэдэв рүү LSN (логийн дарааллын дугаар) бичдэг. offset. Гэхдээ холбогч нь мэдээллийн баазыг бүхэлд нь биш, харин түүний хүснэгтүүдийн зөвхөн нэг хэсгийг (өгөгдлүүд байнга шинэчлэгддэггүй) уншихаар тохируулагдсан бол яах вэ?

  • Холбогч нь WAL файлуудыг уншиж, хянаж буй хүснэгтүүддээ гүйлгээний үйлдлийг илрүүлэхгүй.
  • Тиймээс энэ нь сэдэв болон хуулбарлах үүр дэх одоогийн байрлалаа шинэчлэхгүй.
  • Энэ нь эргээд WAL файлуудыг дискэн дээр "гацах" бөгөөд дискний зай дуусах магадлалтай.

Энд сонголтууд аврах ажилд ирдэг. heartbeat.interval.ms и heartbeat.action.query. Эдгээр сонголтыг хосоор нь ашигласнаар зүрхний цохилтын мессежийг илгээх бүрт тусдаа хүснэгтэд өгөгдлийг өөрчлөх хүсэлтийг гүйцэтгэх боломжтой болно. Тиймээс одоо холбогч байрладаг LSN (хуулбарлах үүрэнд) байнга шинэчлэгддэг. Энэ нь DBMS-д шаардлагагүй болсон WAL файлуудыг устгах боломжийг олгодог. Сонголтууд хэрхэн ажилладаг талаар дэлгэрэнгүй мэдээллийг үзнэ үү баримт бичиг.

Илүү анхаарал хандуулах өөр нэг сонголт бол transforms. Хэдийгээр энэ нь тав тухтай байдал, гоо үзэсгэлэнгийн тухай юм ...

Анхдагч байдлаар, Debezium дараах нэршлийн бодлогыг ашиглан сэдвүүдийг үүсгэдэг: serverName.schemaName.tableName. Энэ нь үргэлж тохиромжтой биш байж магадгүй юм. Сонголтууд transforms Тогтмол хэллэгийг ашиглан үйл явдлыг тодорхой нэртэй сэдэв рүү чиглүүлэх шаардлагатай хүснэгтүүдийн жагсаалтыг тодорхойлж болно.

Манай тохиргоонд баярлалаа transforms дараах зүйл тохиолдоно: хянасан мэдээллийн сангаас CDC-ийн бүх үйл явдлууд нэр бүхий сэдэв рүү очно data.cdc.dbname. Үгүй бол (эдгээр тохиргоогүйгээр) Debezium анхдагчаар маягтын хүснэгт бүрт сэдэв үүсгэх болно: pg-dev.public.<table_name>.

Холбогчийн хязгаарлалт

PostgreSQL-д зориулсан холбогч тохиргооны тайлбарын төгсгөлд түүний ажлын дараах боломжууд / хязгаарлалтуудын талаар ярих нь зүйтэй.

  1. PostgreSQL-ийн холбогч функц нь логик тайлах үзэл баримтлалд тулгуурладаг. Тиймээс тэр өгөгдлийн сангийн бүтцийг өөрчлөх хүсэлтийг хянахгүй (DDL) - үүний дагуу энэ өгөгдөл нь сэдвүүдэд байхгүй болно.
  2. Хуулбарлах слотуудыг ашигладаг тул холбогчийг холбох боломжтой зөвхөн мастер DBMS жишээ рүү.
  3. Хэрэв холбогч нь мэдээллийн санд холбогдсон хэрэглэгч зөвхөн унших эрхтэй бол эхний эхлүүлэхийн өмнө хуулбарлах үүрийг гараар үүсгэж, мэдээллийн санд нийтлэх шаардлагатай болно.

Тохиргоог ашиглаж байна

Тиймээс бид тохиргоогоо холбогч руу ачаалъя:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Татаж авалт амжилттай болж, холбогч ажиллаж эхэлсэн эсэхийг шалгана.

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Гайхалтай: үүнийг тохируулсан, ашиглахад бэлэн байна. Одоо бид хэрэглэгч гэж дүр эсгэж, Кафкатай холбогдож, дараа нь хүснэгтэд оруулгыг нэмж, өөрчилье.

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Манай сэдвээр үүнийг дараах байдлаар харуулах болно.

Бидний өөрчлөлттэй маш урт JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Аль ч тохиолдолд бүртгэл нь өөрчлөгдсөн бичлэгийн түлхүүр (PK) ба өөрчлөлтийн мөн чанараас бүрддэг: бичлэг өмнө нь юу байсан, дараа нь юу болсон.

  • Тохиолдолд INSERT: өмнөх утга (before) тэнцүү байна nullдараа нь оруулсан тэмдэгт мөр байна.
  • Тохиолдолд UPDATE: дээр payload.before мөрийн өмнөх төлөв харагдах ба дотор payload.after - өөрчлөлтийн мөн чанар бүхий шинэ.

2.2 MongoDB

Энэ холбогч нь стандарт MongoDB хуулбарлах механизмыг ашигладаг бөгөөд DBMS үндсэн зангилааны оплогоос мэдээллийг уншдаг.

PgSQL-ийн аль хэдийн тайлбарласан холбогчтой адил энд мөн эхний эхлэлд анхдагч өгөгдлийн агшин зуурын зургийг авдаг бөгөөд үүний дараа холбогч нь oplog унших горимд шилждэг.

Тохиргооны жишээ:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Таны харж байгаагаар өмнөх жишээтэй харьцуулахад шинэ сонголт байхгүй, зөвхөн мэдээллийн сантай холбогдох үүрэгтэй сонголтуудын тоо болон тэдгээрийн угтваруудыг багасгасан.

Тохиргоо transforms энэ удаад тэд дараахь зүйлийг хийдэг: зорилтот сэдвийн нэрийг схемээс эргүүлнэ <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

алдааг тэсвэрлэх чадвар

Бидний цаг үед алдааг тэсвэрлэх чадвар, өндөр хүртээмжтэй байдлын асуудал урьд өмнөхөөсөө илүү хурцаар тавигдаж байна - ялангуяа бид өгөгдөл, гүйлгээний талаар ярихад энэ асуудалд өгөгдлийн өөрчлөлтийг хянах нь хажуу тийшээ байдаггүй. Зарчмын хувьд юу нь буруу байж болох, тухайн тохиолдол бүрт Дебезиумд юу тохиолдохыг харцгаая.

Татгалзах гурван сонголт байна:

  1. Кафка холболтын алдаа. Хэрэв Connect нь тархсан горимд ажиллахаар тохируулагдсан бол энэ нь олон ажилчдад ижил group.id тохируулах шаардлагатай. Дараа нь тэдгээрийн аль нэг нь бүтэлгүйтвэл холбогчийг нөгөө ажилчин дээр дахин эхлүүлж, Кафка дахь сэдвийн хамгийн сүүлд хийсэн байрлалаас үргэлжлүүлэн унших болно.
  2. Кафка кластертай холбоо тасарсан. Холбогч нь Кафка руу илгээж чадаагүй байрлалдаа уншихаа зогсоож, оролдлого амжилттай болох хүртэл үе үе дахин илгээхийг оролдох болно.
  3. Өгөгдлийн эх үүсвэр боломжгүй. Холбогч нь тохиргооны дагуу эх үүсвэрт дахин холбогдохыг оролдох болно. Анхдагч нь 16 оролдлого юм экспоненциал ухралт. 16 дахь оролдлого амжилтгүй болсны дараа даалгаврыг дараах байдлаар тэмдэглэнэ амжилтгүй болсон мөн үүнийг Kafka Connect REST интерфейсээр дамжуулан гараар дахин эхлүүлэх шаардлагатай болно.
    • Тохиолдолд PostgreSQL өгөгдөл алдагдахгүй, учир нь хуулбарлах үүр ашиглах нь холбогч уншаагүй WAL файлуудыг устгахаас сэргийлнэ. Энэ тохиолдолд сул тал бий: хэрэв холбогч ба DBMS-ийн хоорондох сүлжээний холболт удаан хугацаагаар тасалдвал дискний зай дуусч, энэ нь бүхэл бүтэн DBMS-ийн эвдрэлд хүргэж болзошгүй юм.
    • Тохиолдолд MySQL Холболтыг сэргээхээс өмнө binlog файлуудыг DBMS өөрөө эргүүлж болно. Энэ нь холбогчийг бүтэлгүйтсэн төлөвт оруулах бөгөөд хэвийн ажиллагааг сэргээхийн тулд бинлогоос үргэлжлүүлэн уншихын тулд анхны агшин зуурын горимд дахин эхлүүлэх шаардлагатай болно.
    • дээр МонгоБ. Баримт бичигт: Бүртгэл/oplog файлууд устгагдсан, холбогч нь орхисон газраасаа үргэлжлүүлэн унших боломжгүй тохиолдолд холбогчийн үйлдэл бүх DBMS-д ижил байна. Энэ нь холбогч нь муж руу ороход оршино амжилтгүй болсон горимд дахин эхлүүлэх шаардлагатай болно анхны агшин зуурын зураг.

      Гэсэн хэдий ч үл хамаарах зүйлүүд байдаг. Хэрэв холбогч нь удаан хугацааны турш салгагдсан төлөвт байсан (эсвэл MongoDB-ийн инстанц руу хүрч чадаагүй) бөгөөд энэ хугацаанд oplog эргэлдсэн бол холболт сэргээгдэх үед холбогч анхны боломжтой байрлалаас өгөгдлийг тайван уншина. , ийм учраас Кафка дахь зарим өгөгдөл үгүй цохих болно.

дүгнэлт

Дебезиум бол миний CDC системтэй анхны туршлага бөгөөд ерөнхийдөө маш эерэг байсан. Төсөл нь үндсэн DBMS-ийн дэмжлэг, тохиргооны хялбар байдал, кластерын дэмжлэг, идэвхтэй нийгэмлэгийн дэмжлэгийг авав. Практикт сонирхолтой хүмүүст зориулсан гарын авлагыг уншихыг зөвлөж байна Кафка холболт и Дебезиум.

Kafka Connect-ийн JDBC холбогчтой харьцуулахад Debezium-ийн гол давуу тал нь DBMS бүртгэлээс өөрчлөлтүүдийг уншдаг бөгөөд энэ нь өгөгдлийг хамгийн бага сааталтайгаар хүлээн авах боломжийг олгодог. JDBC холбогч (Кафка Холбоосоос хангагдсан) хянагдсан хүснэгтийг тогтмол хугацаанд асуудаг ба (ижил шалтгаанаар) өгөгдлийг устгах үед мессеж үүсгэдэггүй (байгаагүй өгөгдлийг хэрхэн хайх вэ?).

Үүнтэй төстэй асуудлыг шийдэхийн тулд та дараахь шийдлүүдэд анхаарлаа хандуулж болно (Debezium-аас гадна):

PS

Мөн манай блог дээрээс уншина уу:

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх