Ngenalke Debezium - CDC kanggo Apache Kafka

Ngenalke Debezium - CDC kanggo Apache Kafka

Ing karya, aku kerep nemokake solusi teknis / produk piranti lunak anyar, informasi babagan sing rada langka ing Internet sing nganggo basa Rusia. Kanthi artikel iki, aku bakal nyoba ngisi celah kasebut kanthi conto saka praktik anyar, nalika aku kudu nyiyapake ngirim acara CDC saka rong DBMS populer (PostgreSQL lan MongoDB) menyang kluster Kafka nggunakake Debezium. Muga-muga artikel review iki, sing muncul minangka asil karya sing ditindakake, bakal migunani kanggo wong liya.

Apa Debezium lan CDC ing umum?

Debesium - Perwakilan saka kategori piranti lunak CDC (Njupuk owah-owahan data), utawa luwih tepat, iki minangka set konektor kanggo macem-macem DBMS sing kompatibel karo kerangka Apache Kafka Connect.

iki proyek open source, dilisensi ing Apache License v2.0 lan disponsori dening Red Hat. Pangembangan wis ditindakake wiwit 2016 lan saiki menehi dhukungan resmi kanggo DBMS ing ngisor iki: MySQL, PostgreSQL, MongoDB, SQL Server. Ana uga konektor kanggo Cassandra lan Oracle, nanging saiki ana ing status "akses awal", lan rilis anyar ora njamin kompatibilitas mundur.

Yen kita mbandhingake CDC karo pendekatan tradisional (nalika aplikasi kasebut langsung maca data saka DBMS), mula kaluwihan utamane kalebu implementasi streaming pangowahan data ing tingkat baris kanthi latensi sing kurang, linuwih lan kasedhiyan. Rong poin pungkasan digayuh kanthi nggunakake kluster Kafka minangka gudang kanggo acara CDC.

Kajaba iku, kaluwihan kalebu kasunyatan manawa model siji digunakake kanggo nyimpen acara, saengga aplikasi pungkasan ora kudu kuwatir babagan nuansa operasi DBMS sing beda.

Pungkasan, nggunakake makelar pesen mbukak ruang lingkup kanggo skala horisontal aplikasi sing nglacak owah-owahan data. Ing wektu sing padha, impact ing sumber data diminimalisir, amarga data ditampa ora langsung saka DBMS, nanging saka klompok Kafka.

Babagan arsitektur Debezium

Nggunakake Debezium nerangake skema prasaja iki:

DBMS (minangka sumber data) → konektor ing Kafka Connect → Apache Kafka → konsumen

Minangka ilustrasi, aku bakal menehi diagram saka situs web proyek:

Ngenalke Debezium - CDC kanggo Apache Kafka

Nanging, aku ora seneng karo skema iki, amarga mung konektor sink sing bisa ditindakake.

Ing kasunyatan, kahanan beda: ngisi Data Lake (link pungkasan ing diagram ndhuwur) ora mung cara kanggo nggunakake Debezium. Acara sing dikirim menyang Apache Kafka bisa digunakake dening aplikasi sampeyan kanggo ngatasi macem-macem kahanan. Tuladhane:

  • mbusak data sing ora cocog saka cache;
  • ngirim kabar;
  • nganyari indeks panelusuran;
  • sawetara jinis log audit;
  • ...

Yen sampeyan duwe aplikasi Java lan ora perlu/kamungkinan nggunakake kluster Kafka, ana uga kamungkinan kanggo nggarap konektor ditempelake. Kaluwihan sing jelas yaiku sampeyan bisa nolak infrastruktur tambahan (ing wangun konektor lan Kafka). Nanging, solusi iki wis ora digunakake wiwit versi 1.1 lan ora dianjurake kanggo digunakake (bisa uga dicopot ing rilis sabanjure).

Artikel iki bakal ngrembug arsitektur sing disaranake para pangembang, sing menehi toleransi lan skalabilitas kesalahan.

Konfigurasi konektor

Kanggo miwiti nglacak owah-owahan ing nilai sing paling penting - data - kita kudu:

  1. sumber data, sing bisa dadi MySQL wiwit saka versi 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (daftar lengkap);
  2. Kluster Apache Kafka
  3. Kayata Kafka Connect (versi 1.x, 2.x);
  4. konektor Debezium diatur.

Nggarap loro TCTerms pisanan, i.e. proses nginstal DBMS lan Apache Kafka ngluwihi orane katrangan saka artikel. Nanging, kanggo sing pengin masang kabeh ing kothak wedhi, ana sing wis siap ing gudang resmi kanthi conto. docker-compose.yaml.

Kita bakal fokus ing rong titik pungkasan kanthi luwih rinci.

0. Kafka Connect

Ing kene lan mengko ing artikel kasebut, kabeh conto konfigurasi dianggep ing konteks gambar Docker sing disebarake dening pangembang Debezium. Isine kabeh file plugin (konektor) sing dibutuhake lan nyedhiyakake konfigurasi Kafka Connect nggunakake variabel lingkungan.

Yen sampeyan pengin nggunakake Kafka Connect saka Confluent, sampeyan kudu nambahake plug-in konektor sing dibutuhake menyang direktori sing ditemtokake ing plugin.path utawa disetel liwat variabel lingkungan CLASSPATH. Setelan kanggo buruh Kafka Connect lan konektor ditetepake liwat file konfigurasi sing liwati minangka bantahan kanggo printah wiwitan buruh. Kanggo rincian ndeleng dokumentasi.

Proses kabeh nyetel Debeizum ing versi konektor digawa metu ing rong tahap. Ayo dipikirake saben wong:

1. Nyetel kerangka Kafka Connect

Kanggo stream data menyang kluster Apache Kafka, paramèter tartamtu disetel ing kerangka Kafka Connect, kayata:

  • setelan sambungan cluster,
  • jeneng topik ing ngendi konfigurasi konektor dhewe bakal disimpen,
  • jeneng klompok ing ngendi konektor mlaku (ing kasus nggunakake mode mbagekke).

Gambar Docker resmi proyek ndhukung konfigurasi nggunakake variabel lingkungan - iki sing bakal digunakake. Dadi ayo ngundhuh gambar kasebut:

docker pull debezium/connect

Set minimal variabel lingkungan sing dibutuhake kanggo mbukak konektor yaiku:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - dhaptar wiwitan server kluster Kafka kanggo entuk dhaptar lengkap anggota kluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets - topik kanggo nyimpen posisi ing ngendi konektor saiki;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - topik kanggo nyimpen status konektor lan tugas;
  • CONFIG_STORAGE_TOPIC=connector-config - topik kanggo nyimpen data konfigurasi konektor lan tugas;
  • GROUP_ID=1 - pengenal klompok buruh ing ngendi tugas konektor bisa dileksanakake; dibutuhake nalika nggunakake disebarake (didistribusikan) rezim.

Kita miwiti wadhah kanthi variabel kasebut:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Cathetan babagan Avro

Kanthi gawan, Debezium nulis data ing format JSON, sing bisa ditampa kanggo kothak wedhi lan jumlah data sing cilik, nanging bisa dadi masalah ing basis data sing akeh banget. Alternatif kanggo konverter JSON yaiku kanggo nggawe serialisasi pesen nggunakake Avro menyang format binar, sing nyuda beban ing subsistem I / O ing Apache Kafka.

Kanggo nggunakake Avro, sampeyan kudu masang kapisah skema-registri (kanggo nyimpen skema). Variabel kanggo konverter bakal katon kaya iki:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Rincian babagan nggunakake Avro lan nyetel registri kasebut ngluwihi ruang lingkup artikel - luwih, kanggo gamblang, kita bakal nggunakake JSON.

2. Nyetel konektor dhewe

Saiki sampeyan bisa langsung menyang konfigurasi konektor dhewe, sing bakal maca data saka sumber.

Ayo kang katon ing conto konektor kanggo loro DBMS: PostgreSQL lan MongoDB, kang aku duwe pengalaman lan kang ana beda (sanajan cilik, nanging ing sawetara kasus pinunjul!).

Konfigurasi kasebut diterangake ing notasi JSON lan diunggah menyang Kafka Connect nggunakake panyuwunan POST.

2.1. PostgreSQL

Conto konfigurasi konektor kanggo PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prinsip operasi konektor sawise konfigurasi iki cukup prasaja:

  • Ing wiwitan pisanan, nyambung menyang database kasebut ing konfigurasi lan miwiti ing mode snapshot awal, ngirim menyang Kafka set data awal sing ditampa kanthi kondisional SELECT * FROM table_name.
  • Sawise initialization rampung, konektor ngetik mode maca owahan saka file PostgreSQL WAL.

Babagan pilihan sing digunakake:

  • name - jeneng konektor sing konfigurasi diterangake ing ngisor iki digunakake; ing mangsa ngarep, jeneng iki digunakake kanggo nggarap konektor (i.e. ndeleng status / miwiti maneh / nganyari konfigurasi) liwat Kafka Connect REST API;
  • connector.class - kelas konektor DBMS sing bakal digunakake dening konektor sing dikonfigurasi;
  • plugin.name iku jeneng plugin kanggo dekoding logis data saka file WAL. Kasedhiya kanggo milih saka wal2json, decoderbuffs и pgoutput. Loro pisanan mbutuhake instalasi ekstensi sing cocog ing DBMS, lan pgoutput kanggo versi PostgreSQL 10 lan sing luwih dhuwur ora mbutuhake manipulasi tambahan;
  • database.* - opsi kanggo nyambungake menyang database, ngendi database.server.name - jeneng conto PostgreSQL sing digunakake kanggo mbentuk jeneng topik ing kluster Kafka;
  • table.include.list - dhaptar tabel sing pengin dilacak owah-owahan; diwenehi ing format schema.table_name; ora bisa digunakake bebarengan karo table.exclude.list;
  • heartbeat.interval.ms - interval (ing milliseconds) karo konektor ngirim pesen deg-degan menyang topik khusus;
  • heartbeat.action.query - panjalukan sing bakal ditindakake nalika ngirim saben pesen deg-degan (pilihan wis muncul wiwit versi 1.1);
  • slot.name - jeneng slot réplikasi sing bakal digunakake dening konektor;
  • publication.name - Jeneng publikasi ing PostgreSQL sing digunakake konektor. Yen ora ana, Debezium bakal nyoba nggawe. Yen pangguna sing nggawe sambungan ora duwe hak sing cukup kanggo tumindak iki, konektor bakal metu kanthi kesalahan;
  • transforms nemtokake cara persis ngganti jeneng topik target:
    • transforms.AddPrefix.type nuduhake yen kita bakal nggunakake ekspresi reguler;
    • transforms.AddPrefix.regex - topeng kanthi jeneng topik target didefinisikan maneh;
    • transforms.AddPrefix.replacement - langsung apa kita redefine.

Liyane babagan detak jantung lan owah-owahan

Kanthi gawan, konektor ngirim data menyang Kafka kanggo saben transaksi, lan nulis LSN (Log Sequence Number) menyang topik layanan. offset. Nanging apa mengkono yen konektor dikonfigurasi kanggo maca ora kabeh database, nanging mung bagean saka tabel (kang data dianyari arang)?

  • Konektor bakal maca file WAL lan ora ndeteksi transaksi sing ditindakake ing tabel sing dipantau.
  • Mulane, ora bakal nganyari posisi saiki ing topik utawa ing slot réplikasi.
  • Iki bakal nyebabake file WAL "macet" ing disk lan kemungkinan bakal entek ruang disk.

Lan ing kene opsi teka kanggo ngluwari. heartbeat.interval.ms и heartbeat.action.query. Nggunakake opsi iki ing pasangan ndadekake iku bisa kanggo nindakake panjalukan kanggo ngganti data ing tabel kapisah saben pesen deg-degan dikirim. Mangkono, LSN sing konektor saiki (ing slot réplikasi) terus dianyari. Iki ngidini DBMS mbusak file WAL sing ora dibutuhake maneh. Kanggo informasi luwih lengkap babagan carane opsi bisa, waca dokumentasi.

Pilihan liyane sing kudu digatekake yaiku transforms. Sanajan luwih babagan kepenak lan kaendahan ...

Kanthi gawan, Debezium nggawe topik nggunakake kabijakan jeneng ing ngisor iki: serverName.schemaName.tableName. Iki bisa uga ora mesthi trep. Pilihan transforms nggunakake ekspresi biasa, sampeyan bisa nemtokake dhaptar tabel sing acara kudu diarahake menyang topik kanthi jeneng tartamtu.

Ing konfigurasi kita thanks kanggo transforms ing ngisor iki mengkono: kabeh acara CDC saka database dilacak bakal menyang topik karo jeneng data.cdc.dbname. Yen ora (tanpa setelan kasebut), Debezium bakal nggawe topik kanggo saben tabel kanthi gawan: pg-dev.public.<table_name>.

Watesan konektor

Ing pungkasan katrangan babagan konfigurasi konektor kanggo PostgreSQL, sampeyan kudu ngomong babagan fitur / watesan ing ngisor iki:

  1. Fungsi konektor kanggo PostgreSQL gumantung ing konsep dekoding logis. Mula dheweke ora trek panjalukan kanggo ngganti struktur database (DDL) - miturut, data iki ora bakal ana ing topik.
  2. Wiwit slot réplikasi digunakake, sambungan konektor bisa mung menyang conto DBMS master.
  3. Yen pangguna sing nyambungake konektor menyang database duwe hak mung diwaca, banjur sadurunge peluncuran pisanan, sampeyan kudu nggawe slot replikasi kanthi manual lan nerbitake menyang database.

Nglamar Konfigurasi

Dadi ayo mbukak konfigurasi kita menyang konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Priksa manawa download wis sukses lan konektor diwiwiti:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Agung: wis diatur lan siap kanggo pindhah. Saiki ayo ndalang dadi konsumen lan nyambung menyang Kafka, banjur nambah lan ngganti entri ing tabel:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Ing topik kita, iki bakal ditampilake kaya ing ngisor iki:

JSON sing dawa banget karo owah-owahan kita

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Ing kaloro kasus kasebut, cathetan kasebut kalebu kunci (PK) rekaman sing diganti, lan inti saka owah-owahan: apa cathetan sadurunge lan apa sawise.

  • Ing kasus INSERT: nilai sadurunge (before) padha nullngiring dening senar sing dilebokake.
  • Ing kasus UPDATE: ing payload.before negara sadurunge baris ditampilake, lan ing payload.after - anyar karo inti saka owah-owahan.

2.2 MongoDB

Konektor iki nggunakake mekanisme replikasi MongoDB standar, maca informasi saka oplog saka simpul utami DBMS.

Kajaba kanggo konektor sing wis diterangake kanggo PgSQL, ing kene uga, ing wiwitan, gambar asli data dijupuk, sawise konektor ngalih menyang mode maca oplog.

Conto konfigurasi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Nalika sampeyan bisa ndeleng, ora ana opsi anyar dibandhingake conto sadurungé, nanging mung nomer opsi tanggung jawab kanggo nyambung menyang database lan ater-ater sing wis suda.

Setelan transforms wektu iki padha nindakake ing ngisor iki: nguripake jeneng topik target saka rencana <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

toleransi kesalahan

Masalah toleransi kesalahan lan kasedhiyan dhuwur ing jaman saiki luwih akut tinimbang sadurunge - utamane nalika kita ngomong babagan data lan transaksi, lan pelacakan pangowahan data ora ana ing sisih pinggir babagan iki. Ayo goleki apa sing bisa salah ing prinsip lan apa sing bakal kedadeyan karo Debezium ing saben kasus.

Ana telung opsi milih metu:

  1. Gagal Kafka Connect. Yen Connect dikonfigurasi kanggo bisa ing mode mbagekke, iki mbutuhake sawetara buruh nyetel group.id padha. Banjur, yen salah siji gagal, konektor bakal diwiwiti maneh ing buruh liyane lan terus maca saka posisi setya pungkasan ing topik ing Kafka.
  2. Mundhut panyambungan karo kluster Kafka. Konektor mung bakal mandheg maca ing posisi sing gagal dikirim menyang Kafka lan nyoba ngirim maneh kanthi periodik nganti upaya kasebut kasil.
  3. Sumber data ora kasedhiya. Konektor bakal nyoba nyambungake maneh menyang sumber miturut konfigurasi. Default punika 16 nyoba nggunakake backoff eksponensial. Sawise nyoba gagal kaping 16, tugas kasebut bakal ditandhani minangka gagal lan kudu diwiwiti maneh kanthi manual liwat antarmuka REST Kafka Connect.
    • Ing kasus PostgreSQL data ora bakal ilang, amarga nggunakake slot réplikasi bakal nyegah pambusakan file WAL ora diwaca dening konektor. Ing kasus iki, ana downside: yen panyambungan jaringan antarane konektor lan DBMS disrupted kanggo dangu, ana kasempatan sing papan disk bakal entek, lan iki bisa mimpin kanggo Gagal kabeh DBMS.
    • Ing kasus MySQL file binlog bisa diputer dening DBMS dhewe sadurunge konektivitas dibalèkaké. Iki bakal nimbulaké konektor menyang negara gagal, lan iku kudu miwiti maneh ing mode gambar asli seko dhisikan kanggo terus maca saka binlogs kanggo mulihake operasi normal.
    • ing MongoDB. Dokumentasi ngandika: prilaku konektor ing kasus log / file oplog wis dibusak lan konektor ora bisa nerusake maca saka posisi sing ditinggalake padha kanggo kabeh DBMS. Iku dumunung ing kasunyatan sing konektor bakal pindhah menyang negara gagal lan bakal mbutuhake restart ing mode snapshot awal.

      Nanging, ana pangecualian. Yen konektor ana ing negara pedhot kanggo dangu (utawa ora bisa tekan instance MongoDB), lan oplog diputer sak iki wektu, banjur nalika sambungan dibalèkaké, konektor bakal tenang terus maca data saka posisi pisanan kasedhiya, kang sawetara saka data ing Kafka ora bakal kenek.

kesimpulan

Debezium minangka pengalaman pisanan karo sistem CDC lan wis positif banget sakabèhé. Proyek kasebut nyogok dhukungan saka DBMS utama, gampang konfigurasi, dhukungan kanggo clustering lan komunitas sing aktif. Kanggo sing kasengsem ing laku, Aku menehi saran sing maca pandhuan kanggo Sambungake Kafka и Debesium.

Dibandhingake karo konektor JDBC kanggo Kafka Connect, kauntungan utama Debezium yaiku owah-owahan diwaca saka log DBMS, sing ngidini data bisa ditampa kanthi wektu tundha minimal. Konektor JDBC (disedhiyakake dening Kafka Connect) takon tabel sing dilacak ing interval tetep lan (kanggo alesan sing padha) ora ngasilake pesen nalika data dibusak (kepiye sampeyan bisa takon data sing ora ana?).

Kanggo ngatasi masalah sing padha, sampeyan bisa menehi perhatian marang solusi ing ngisor iki (saliyane Debezium):

PS

Waca uga ing blog kita:

Source: www.habr.com

Add a comment