Nepangkeun Debezium - CDC pikeun Apache Kafka

Nepangkeun Debezium - CDC pikeun Apache Kafka

Dina karya kuring, kuring sering mendakan solusi téknis / produk parangkat lunak énggal, inpormasi ngeunaan anu rada langka dina Internét nganggo basa Rusia. Kalayan tulisan ieu, kuring bakal nyobian ngeusian hiji gap sapertos conto tina prakték panganyarna kuring, nalika kuring peryogi nyetél ngirim acara CDC tina dua DBMS populér (PostgreSQL sareng MongoDB) ka klaster Kafka nganggo Debezium. Abdi ngarepkeun tulisan ulasan ieu, anu muncul salaku hasil tina padamelan anu dilakukeun, bakal mangpaat pikeun anu sanés.

Naon Debezium sareng CDC sacara umum?

Debesium - Wawakil kategori parangkat lunak CDC (Capture data robah), atanapi langkung tepatna, éta mangrupikeun sakumpulan panyambung pikeun sagala rupa DBMS anu cocog sareng kerangka Apache Kafka Connect.

ieu proyék open source, dilisensikeun dina Apache Lisensi v2.0 sarta disponsoran ku Red Hat. Pangwangunan parantos lumangsung saprak 2016 sareng ayeuna nyayogikeun dukungan resmi pikeun DBMS ieu: MySQL, PostgreSQL, MongoDB, SQL Server. Aya ogé panyambungna pikeun Cassandra na Oracle, tapi di momen aranjeunna dina status "aksés awal", sarta Kaluaran anyar teu ngajamin kasaluyuan mundur.

Upami urang ngabandingkeun CDC sareng pendekatan tradisional (nalika aplikasi langsung maca data tina DBMS), maka kaunggulan utamina kalebet palaksanaan parobahan data streaming dina tingkat baris kalayan latency rendah, réliabilitas anu luhur sareng kasadiaan. Dua titik anu terakhir dihontal ku ngagunakeun klaster Kafka salaku gudang pikeun acara CDC.

Ogé, kaunggulan kaasup kanyataan yén modél tunggal dipaké pikeun nyimpen acara, jadi aplikasi final teu kudu salempang ngeunaan nuances operasi DBMS béda.

Tungtungna, nganggo calo pesen ngamungkinkeun aplikasi anu ngawas parobahan data sacara horisontal. Dina waktos anu sami, dampak dina sumber data diminimalkeun, sabab data dicandak henteu langsung ti DBMS, tapi tina klaster Kafka.

Ngeunaan arsitektur Debezium

Ngagunakeun Debezium turun ka skéma basajan ieu:

DBMS (salaku sumber data) → konektor dina Kafka Connect → Apache Kafka → konsumen

Salaku ilustrasi, abdi bakal masihan diagram ti ramatloka proyék:

Nepangkeun Debezium - CDC pikeun Apache Kafka

Nanging, kuring henteu resep pisan kana skéma ieu, sabab sigana ngan ukur konektor tilelep anu mungkin.

Dina kanyataanana, kaayaan béda: ngeusian Data Lake anjeun (Tumbu panungtungan dina diagram di luhur) sanes hiji-hijina cara ngagunakeun Debezium. Kajadian anu dikirim ka Apache Kafka tiasa dianggo ku aplikasi anjeun pikeun nanganan rupa-rupa kaayaan. Salaku conto:

  • miceun data anu teu relevan tina cache;
  • ngirim bewara;
  • apdet indéks pilarian;
  • sababaraha jenis log audit;
  • ...

Upami anjeun gaduh aplikasi Java sareng henteu peryogi / kamungkinan ngagunakeun klaster Kafka, aya ogé kamungkinan pikeun ngaliwat. panyambungna dipasang. Kauntungannana atra nyaéta yén anjeun tiasa nampik infrastruktur tambahan (dina bentuk konektor sareng Kafka). Sanajan kitu, solusi ieu geus deprecated saprak vérsi 1.1 sarta geus teu dianjurkeun deui pikeun pamakéan (eta bisa dihapus dina release nu bakal datang).

Tulisan ieu bakal ngabahas arsitéktur anu disarankeun ku pamekar, anu nyayogikeun kasabaran sareng skalabilitas.

Konfigurasi panyambungna

Pikeun ngamimitian nyukcruk parobahan dina nilai pangpentingna - data - urang peryogi:

  1. sumber data, anu tiasa MySQL mimitian ti versi 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (daptar lengkep);
  2. klaster Apache Kafka;
  3. conto Kafka Connect (versi 1.x, 2.x);
  4. ngonpigurasi konektor Debezium.

Gawé dina dua titik kahiji, i.e. Prosés pamasangan DBMS sareng Apache Kafka saluareun ruang lingkup tulisan. Nanging, pikeun anu hoyong nyebarkeun sadayana dina kotak keusik, gudang resmi sareng conto ngagaduhan siap-siap. docker-compose.yaml.

Urang bakal difokuskeun dua titik panungtungan dina leuwih jéntré.

0. Kafka Connect

Di dieu sareng engké dina tulisan, sadaya conto konfigurasi dianggap dina konteks gambar Docker anu disebarkeun ku pamekar Debezium. Ieu ngandung sakabéh file plugin perlu (konektor) jeung nyadiakeun konfigurasi Kafka Connect ngagunakeun variabel lingkungan.

Upami anjeun badé nganggo Kafka Connect ti Confluent, anjeun kedah sacara mandiri nambihan plugins panyambungna anu diperyogikeun kana diréktori anu dijelaskeun dina plugin.path atawa diatur via variabel lingkungan CLASSPATH. Setélan pikeun worker Kafka Connect jeung panyambungna diartikeun ngaliwatan file konfigurasi nu diliwatan salaku argumen pikeun worker ngamimitian paréntah. Pikeun detil tingali dokuméntasi.

Sakabéh prosés nyetél Debeizum dina versi konektor dilumangsungkeun dina dua tahap. Hayu urang nganggap unggal sahijina:

1. Nyetel kerangka Kafka Connect

Pikeun ngalirkeun data kana klaster Apache Kafka, parameter husus disetel dina kerangka Kafka Connect, sapertos:

  • setélan sambungan klaster,
  • nami topik dimana konfigurasi konektor sorangan bakal disimpen,
  • nami grup dimana panyambungna dijalankeun (bisi nganggo modeu disebarkeun).

Gambar Docker resmi proyék ngadukung konfigurasi nganggo variabel lingkungan - ieu anu bakal kami anggo. Janten, unduh gambarna:

docker pull debezium/connect

Set minimum variabel lingkungan anu diperlukeun pikeun ngajalankeun konektor nyaéta kieu:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - daptar awal server klaster Kafka pikeun ménta daptar lengkep anggota klaster;
  • OFFSET_STORAGE_TOPIC=connector-offsets - topik pikeun nyimpen posisi dimana konektor ayeuna lokasina;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - topik pikeun nyimpen status konektor jeung tugas na;
  • CONFIG_STORAGE_TOPIC=connector-config - topik pikeun nyimpen data konfigurasi konektor sareng tugasna;
  • GROUP_ID=1 - identifier sahiji grup pagawe di mana tugas panyambungna bisa dieksekusi; diperlukeun nalika ngagunakeun disebarkeun (disebarkeun) réjim.

Urang ngamimitian wadahna kalayan variabel ieu:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Catetan ngeunaan Avro

Sacara standar, Debezium nyerat data dina format JSON, anu tiasa ditampi pikeun kotak pasir sareng data sajumlah leutik, tapi tiasa janten masalah dina pangkalan data anu sarat pisan. Alternatif pikeun konverter JSON nyaéta pikeun ngatur séri pesen nganggo Avro ka format binér, nu ngurangan beban dina I / O subsistem di Apache Kafka.

Pikeun make Avro, anjeun kudu nyebarkeun hiji misah skéma-pendaptaran (pikeun nyimpen skéma). Variabel pikeun konverter bakal katingali sapertos kieu:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Rincian ngeunaan ngagunakeun Avro sareng nyetél pendaptaran pikeun éta saluareun ruang lingkup tulisan - salajengna, pikeun kajelasan, kami bakal nganggo JSON.

2. Nyetel konektor sorangan

Ayeuna anjeun tiasa langsung kana konfigurasi konektor sorangan, anu bakal maca data tina sumberna.

Hayu urang nempo conto konektor pikeun dua DBMS: PostgreSQL na MongoDB, nu kuring boga pangalaman jeung nu aya béda (sanajan leutik, tapi dina sababaraha kasus signifikan!).

Konfigurasi dijelaskeun dina notasi JSON sareng diunggah ka Kafka Connect nganggo pamundut POST.

2.1. PostgreSQL

Conto konfigurasi konektor pikeun PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prinsip operasi konektor sanggeus konfigurasi ieu cukup basajan:

  • Nalika diluncurkeun pikeun kahiji kalina, éta nyambung ka pangkalan data anu ditangtukeun dina konfigurasi sareng ngamimitian dina modeu snapshot awal, ngirimkeun ka Kafka set awal data anu ditampi kalayan kondisional SELECT * FROM table_name.
  • Saatos initialization réngsé, panyambungna asup kana modeu maca parobahan tina file PostgreSQL WAL.

Ngeunaan pilihan anu dianggo:

  • name - nami konektor anu nganggo konfigurasi anu dijelaskeun di handap ieu; Dina mangsa nu bakal datang, ngaran ieu dipaké pikeun gawé bareng konektor (i.e. nempo status / balikan deui / update konfigurasi) ngaliwatan Kafka Connect REST API;
  • connector.class - Kelas konektor DBMS anu bakal dianggo ku konektor anu dikonpigurasi;
  • plugin.name nyaeta nami plugin pikeun decoding logis data tina file WAL. Sadia pikeun milih tina wal2json, decoderbuffs и pgoutput. Dua kahiji merlukeun pamasangan ekstensi luyu dina DBMS, sarta pgoutput pikeun versi PostgreSQL 10 sarta luhur teu merlukeun manipulasi tambahan;
  • database.* - pilihan pikeun nyambungkeun kana database, dimana database.server.name - Ngaran conto PostgreSQL dipaké pikeun ngabentuk ngaran topik dina klaster Kafka;
  • table.include.list - daptar tabel dimana urang hoyong ngalacak parobahan; dibikeun dina format schema.table_name; teu bisa dipaké babarengan jeung table.exclude.list;
  • heartbeat.interval.ms - interval (dina milliseconds) jeung nu konektor ngirimkeun pesen keteg jajantung ka topik husus;
  • heartbeat.action.query - pamundut anu bakal dieksekusi nalika ngirim unggal pesen keteg jajantung (pilihan muncul dina versi 1.1);
  • slot.name - nami slot réplikasi anu bakal dianggo ku konektor;
  • publication.name - Ngaran publikasi dina PostgreSQL, anu nganggo konektor. Upami teu aya, Debezium bakal nyobian nyiptakeunana. Lamun pamaké handapeun saha sambungan dijieun teu boga cukup hak pikeun aksi ieu, konektor bakal nungtungan ku kasalahan;
  • transforms nangtukeun kumaha persisna pikeun ngarobah ngaran topik udagan:
    • transforms.AddPrefix.type nunjukkeun yén urang bakal ngagunakeun ungkapan biasa;
    • transforms.AddPrefix.regex - topeng ku nu ngaran topik targétna didefinisikeun deui;
    • transforms.AddPrefix.replacement - langsung naon urang ngartikeun ulang.

Langkung seueur ngeunaan keteg jajantung sareng transformasi

Sacara standar, konektor ngirim data ka Kafka pikeun tiap transaksi komitmen, sarta LSN na (Log Sequence Number) kacatet dina topik layanan. offset. Tapi naon anu lumangsung lamun konektor ieu ngonpigurasi maca teu sakabéh database, tapi ngan bagian tina tabel na (nu data diropéa jarang)?

  • Konektor bakal maca file WAL sareng moal ngadeteksi transaksi anu dilakukeun kana tabel anu dipantau.
  • Ku alatan éta, éta moal ngamutahirkeun posisi na ayeuna boh dina topik atawa dina slot réplikasi.
  • Ieu, kahareupna bakal nyababkeun file WAL dicekel dina disk sareng kamungkinan kehabisan rohangan disk.

Sarta di dieu pilihan datang ka nyalametkeun teh. heartbeat.interval.ms и heartbeat.action.query. Ngagunakeun pilihan ieu dina pasangan ngamungkinkeun pikeun ngalakukeun pamundut pikeun ngarobah data dina tabel misah unggal waktos pesen keteg jajantung dikirim. Ku kituna, LSN dimana konektor ayeuna aya (dina slot réplikasi) terus diropéa. Hal ieu ngamungkinkeun DBMS ngahapus file WAL anu henteu diperyogikeun deui. Anjeun tiasa diajar langkung seueur ngeunaan kumaha pilihan dianggo dokuméntasi.

pilihan sejen pantes perhatian ngadeukeutan nyaeta transforms. Sanaos langkung seueur ngeunaan genah sareng kaéndahan ...

Sacara standar, Debezium nyiptakeun topik nganggo kabijakan pangaranan ieu: serverName.schemaName.tableName. Ieu bisa jadi teu salawasna merenah. Pilihan transforms Anjeun tiasa make ungkapan biasa keur ngartikeun daptar tabel, acara ti mana kudu routed ka topik kalawan ngaran husus.

Dina konfigurasi kami hatur nuhun kana transforms di handap ieu kajadian: sagala acara CDC ti database dilacak bakal buka topik kalawan nami data.cdc.dbname. Upami teu kitu (tanpa setélan ieu), Debezium sacara standar bakal nyiptakeun topik pikeun unggal méja sapertos: pg-dev.public.<table_name>.

watesan panyambungna

Pikeun nyimpulkeun pedaran ngeunaan konfigurasi konektor pikeun PostgreSQL, éta patut ngobrol ngeunaan fitur handap / watesan operasi na:

  1. Fungsi panyambungna pikeun PostgreSQL ngandelkeun konsép decoding logis. Ku kituna manéhna teu lagu requests pikeun ngarobah struktur database (DDL) - sasuai, data ieu moal aya dina jejer.
  2. Kusabab slot réplikasi dipaké, sambungan tina konektor mungkin ngan kana conto master DBMS.
  3. Lamun pamaké ngabawah panyambungna nyambung ka database boga hak baca-hijina, mangka saméméh peluncuran kahiji, anjeun bakal kudu sacara manual nyieun slot réplikasi jeung nyebarkeun ka database.

Nerapkeun Konfigurasi

Ku kituna hayu urang muka konfigurasi urang kana konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Kami pariksa yén unduhan parantos suksés sareng panyambungna dimimitian:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Hébat: tos siap sareng siap angkat. Ayeuna hayu urang pura-pura janten konsumen sareng sambungkeun ka Kafka, teras urang tambahkeun sareng robih éntri dina tabél:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Dina topik urang bakal dipintonkeun saperti kieu:

JSON anu panjang pisan sareng parobihan kami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Dina duanana kasus, rékaman diwangun ku konci (PK) tina catetan anu dirobah, sarta hakekat parobahan: naon catetan éta saméméh jeung naon deui sanggeus.

  • Dina kasus INSERT: nilai saméméh (before) sarua nulldituturkeun ku senar anu diselapkeun.
  • Dina kasus UPDATE: at payload.before kaayaan baris saméméhna dipintonkeun, sarta dina payload.after - anyar jeung hakekat robah.

2.2 MongoDB

Konektor ieu nganggo mékanisme réplikasi MongoDB standar, maca inpormasi tina oplog titik primér DBMS.

Nya kitu jeung panyambungna geus dijelaskeun pikeun PgSQL, di dieu, teuing, dina mimiti mimiti, snapshot data primér dicokot, nu satutasna konektor pindah ka modeu bacaan oplog.

Conto konfigurasi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Sakumaha anjeun tiasa tingali, teu aya pilihan anyar di dieu dibandingkeun sareng conto sateuacana, tapi ngan ukur jumlah pilihan anu tanggung jawab pikeun nyambungkeun kana pangkalan data sareng awalanna parantos dikirangan.

setélan transforms waktos ieu aranjeunna ngalakukeun di handap: balikkeun nami topik udagan tina skéma <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

toleransi kasalahan

Isu kasabaran sesar sareng kasadiaan anu luhur dina waktos urang langkung akut ti kantos - khususna nalika urang nyarioskeun data sareng transaksi, sareng pelacakan parobahan data henteu aya di sela-sela masalah ieu. Hayu urang tingali naon anu salah dina prinsipna sareng naon anu bakal kajadian ka Debezium dina unggal kasus.

Aya tilu pilihan milih kaluar:

  1. gagalna Kafka Connect. Lamun Connect geus ngonpigurasi pikeun digawé dina modeu disebarkeun, ieu merlukeun sababaraha pagawe pikeun nyetél group.id sarua. Lajeng, lamun salah sahijina gagal, konektor bakal restarted on worker sejen tur nuluykeun maca tina posisi komitmen panungtungan dina topik di Kafka.
  2. Leungitna konektipitas sareng klaster Kafka. Panyambungna ngan saukur bakal ngeureunkeun maca dina posisi anu gagal dikirim ka Kafka, sareng périodik bakal nyobian ngirim deui dugi usahana suksés.
  3. Sumber data teu sadia. Konektor bakal nyobian nyambung deui ka sumber numutkeun konfigurasi. standar nyaeta 16 usaha ngagunakeun backoff éksponénsial. Saatos 16 usaha gagal, tugas bakal ditandaan salaku gagal sareng éta kedah dibalikan deui sacara manual liwat antarmuka Kafka Connect REST.
    • Dina kasus PostgreSQL data moal leungit, sabab ngagunakeun slot réplikasi bakal nyegah ngahapus file WAL teu dibaca ku konektor. Dina hal ieu, aya downside a: lamun konektipitas jaringan antara konektor jeung DBMS geus kaganggu pikeun lila, aya kamungkinan yén spasi disk bakal kaluar, sarta ieu bisa ngakibatkeun gagalna sakabéh DBMS.
    • Dina kasus MySQL file binlog bisa diputer ku DBMS sorangan saméméh konektipitas dibalikeun. Ieu bakal ngakibatkeun konektor pikeun lebet kana kaayaan gagal, sarta mulangkeun operasi normal, anjeun bakal kudu balikan deui dina modeu snapshot awal pikeun nuluykeun bacaan ti binlogs.
    • dina MongoDB. Dokuméntasi nyebutkeun: paripolah konektor bisi file log / oplog geus dihapus sarta konektor teu bisa neruskeun maca ti posisi dimana eta antepkeun sarua pikeun sakabéh DBMS. Eta perenahna di kanyataan yén konektor bakal balik kana kaayaan gagal sarta bakal merlukeun balikan deui dina modeu snapshot awal.

      Sanajan kitu, aya pengecualian. Upami konektor dina kaayaan dipegatkeun kanggo waktos anu lami (atanapi henteu tiasa ngahontal conto MongoDB), sareng oplog diputar salami waktos ieu, teras nalika sambungan dibalikkeun, konektor bakal teras-terasan maca data tina posisi anu sayogi. , naha sababaraha data dina Kafka teu bakal pencét.

kacindekan

Debezium mangrupikeun pangalaman munggaran kuring sareng sistem CDC sareng parantos positip pisan. Proyék nyogok dukungan DBMS utama, betah konfigurasi, dukungan pikeun clustering sareng komunitas anu aktip. Pikeun maranéhanana museurkeun prakna, Kuring nyarankeun yén anjeun baca pituduh pikeun Kafka Nyambung и Debesium.

Dibandingkeun panyambungna JDBC pikeun Kafka Connect, Kauntungan utama Debezium nyaéta parobahan dibaca tina log DBMS, anu ngamungkinkeun data ditampi kalayan reureuh minimal. Konektor JDBC (disadiakeun ku Kafka Connect) naroskeun tabel anu dilacak dina interval anu tetep sareng (kusabab anu sami) henteu ngahasilkeun pesen nalika data dihapus (kumaha anjeun tiasa naroskeun data anu henteu aya?).

Pikeun ngabéréskeun masalah anu sami, anjeun tiasa nengetan solusi ieu (salian Debezium):

PS

Baca ogé dina blog urang:

sumber: www.habr.com

Tambahkeun komentar