Memperkenalkan Debezium - CDC untuk Apache Kafka

Memperkenalkan Debezium - CDC untuk Apache Kafka

Dalam kerja saya, saya sering menemui penyelesaian teknikal / produk perisian baru, yang mana terdapat sedikit maklumat mengenai Internet berbahasa Rusia. Dengan artikel ini, saya akan cuba mengisi satu jurang tersebut dengan contoh daripada amalan saya baru-baru ini, apabila saya perlu menyediakan penghantaran acara CDC daripada dua DBMS popular (PostgreSQL dan MongoDB) ke gugusan Kafka menggunakan Debezium. Saya berharap artikel ulasan ini, yang muncul sebagai hasil kerja yang dilakukan, akan berguna kepada orang lain.

Apakah Debezium dan CDC secara umum?

Debezium - Wakil kategori perisian CDC (Tangkap perubahan data), atau lebih tepat lagi, ia adalah satu set penyambung untuk pelbagai DBMS yang serasi dengan rangka kerja Apache Kafka Connect.

ini projek sumber terbuka, dilesenkan di bawah Lesen Apache v2.0 dan ditaja oleh Red Hat. Pembangunan telah dijalankan sejak 2016 dan pada masa ini ia menyediakan sokongan rasmi untuk DBMS berikut: MySQL, PostgreSQL, MongoDB, SQL Server. Terdapat juga penyambung untuk Cassandra dan Oracle, tetapi pada masa ini ia berada dalam status "akses awal", dan keluaran baharu tidak menjamin keserasian ke belakang.

Jika kita membandingkan CDC dengan pendekatan tradisional (apabila aplikasi membaca data daripada DBMS secara langsung), maka kelebihan utamanya termasuk pelaksanaan penstriman perubahan data pada peringkat baris dengan kependaman rendah, kebolehpercayaan tinggi dan ketersediaan. Dua mata terakhir dicapai dengan menggunakan kluster Kafka sebagai repositori untuk acara CDC.

Selain itu, kelebihannya termasuk fakta bahawa model tunggal digunakan untuk menyimpan acara, jadi aplikasi akhir tidak perlu risau tentang nuansa pengendalian DBMS yang berbeza.

Akhirnya, menggunakan broker mesej membuka skop untuk penskalaan mendatar aplikasi yang menjejaki perubahan dalam data. Pada masa yang sama, impak pada sumber data diminimumkan, kerana data diterima bukan terus daripada DBMS, tetapi daripada kelompok Kafka.

Mengenai seni bina Debezium

Menggunakan Debezium datang ke skema mudah ini:

DBMS (sebagai sumber data) β†’ penyambung dalam Kafka Connect β†’ Apache Kafka β†’ pengguna

Sebagai ilustrasi, saya akan memberikan gambar rajah dari laman web projek:

Memperkenalkan Debezium - CDC untuk Apache Kafka

Walau bagaimanapun, saya tidak begitu menyukai skim ini, kerana nampaknya hanya penyambung sinki yang mungkin.

Pada hakikatnya, keadaannya berbeza: mengisi Data Lake anda (pautan terakhir dalam rajah di atas) bukan satu-satunya cara untuk menggunakan Debezium. Acara yang dihantar ke Apache Kafka boleh digunakan oleh aplikasi anda untuk menangani pelbagai situasi. Sebagai contoh:

  • penyingkiran data yang tidak berkaitan daripada cache;
  • menghantar pemberitahuan;
  • kemas kini indeks carian;
  • beberapa jenis log audit;
  • ...

Sekiranya anda mempunyai aplikasi Java dan tidak ada keperluan/kemungkinan untuk menggunakan kluster Kafka, terdapat juga kemungkinan untuk berfungsi penyambung tertanam. Kelebihan yang jelas ialah dengan itu anda boleh menolak infrastruktur tambahan (dalam bentuk penyambung dan Kafka). Walau bagaimanapun, penyelesaian ini telah ditamatkan sejak versi 1.1 dan tidak lagi disyorkan untuk digunakan (ia mungkin dialih keluar dalam keluaran akan datang).

Artikel ini akan membincangkan seni bina yang disyorkan oleh pembangun, yang menyediakan toleransi kesalahan dan kebolehskalaan.

Konfigurasi penyambung

Untuk mula menjejaki perubahan dalam nilai yang paling penting - data - kita perlukan:

  1. sumber data, yang boleh menjadi MySQL bermula dari versi 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (senarai penuh);
  2. Kelompok Apache Kafka
  3. Contoh Kafka Connect (versi 1.x, 2.x);
  4. penyambung Debezium yang dikonfigurasikan.

Bekerja pada dua mata pertama, i.e. proses memasang DBMS dan Apache Kafka berada di luar skop artikel. Walau bagaimanapun, bagi mereka yang ingin menggunakan segala-galanya dalam kotak pasir, terdapat satu siap sedia dalam repositori rasmi dengan contoh docker-compose.yaml.

Kami akan memberi tumpuan kepada dua mata terakhir dengan lebih terperinci.

0. Kafka Connect

Di sini dan kemudian dalam artikel, semua contoh konfigurasi dipertimbangkan dalam konteks imej Docker yang diedarkan oleh pembangun Debezium. Ia mengandungi semua fail pemalam yang diperlukan (penyambung) dan menyediakan konfigurasi Kafka Connect menggunakan pembolehubah persekitaran.

Jika anda berhasrat untuk menggunakan Kafka Connect daripada Confluent, anda perlu menambah sendiri pemalam penyambung yang diperlukan pada direktori yang dinyatakan dalam plugin.path atau ditetapkan melalui pembolehubah persekitaran CLASSPATH. Tetapan untuk pekerja dan penyambung Kafka Connect ditakrifkan melalui fail konfigurasi yang dihantar sebagai hujah kepada arahan mula pekerja. Untuk butiran, lihat dokumentasi.

Seluruh proses menyediakan Debeizum dalam versi penyambung dijalankan dalam dua peringkat. Mari kita pertimbangkan setiap daripada mereka:

1. Menyediakan rangka kerja Kafka Connect

Untuk menstrim data ke gugusan Apache Kafka, parameter khusus ditetapkan dalam rangka kerja Kafka Connect, seperti:

  • tetapan sambungan kluster,
  • nama topik di mana konfigurasi penyambung itu sendiri akan disimpan,
  • nama kumpulan di mana penyambung sedang berjalan (sekiranya menggunakan mod teragih).

Imej Docker rasmi projek menyokong konfigurasi menggunakan pembolehubah persekitaran - inilah yang akan kami gunakan. Jadi mari muat turun imej:

docker pull debezium/connect

Set minimum pembolehubah persekitaran yang diperlukan untuk menjalankan penyambung adalah seperti berikut:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - senarai awal pelayan kluster Kafka untuk mendapatkan senarai lengkap ahli kluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets β€” topik untuk menyimpan kedudukan di mana penyambung berada pada masa ini;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - topik untuk menyimpan status penyambung dan tugasnya;
  • CONFIG_STORAGE_TOPIC=connector-config - topik untuk menyimpan data konfigurasi penyambung dan tugasnya;
  • GROUP_ID=1 β€” pengecam kumpulan pekerja yang mana tugas penyambung boleh dilaksanakan; diperlukan apabila menggunakan diedarkan (diedarkan) rejim.

Kami memulakan bekas dengan pembolehubah ini:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota tentang Avro

Secara lalai, Debezium menulis data dalam format JSON, yang boleh diterima untuk kotak pasir dan sejumlah kecil data, tetapi boleh menjadi masalah dalam pangkalan data yang banyak dimuatkan. Alternatif kepada penukar JSON ialah mensirikan mesej menggunakan Avro kepada format binari, yang mengurangkan beban pada subsistem I/O dalam Apache Kafka.

Untuk menggunakan Avro, anda perlu menggunakan yang berasingan pendaftaran skema (untuk menyimpan skema). Pembolehubah untuk penukar akan kelihatan seperti ini:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Butiran tentang menggunakan Avro dan menyediakan pendaftaran untuknya adalah di luar skop artikel - selanjutnya, untuk kejelasan, kami akan menggunakan JSON.

2. Menyediakan penyambung itu sendiri

Kini anda boleh pergi terus ke konfigurasi penyambung itu sendiri, yang akan membaca data daripada sumber.

Mari lihat contoh penyambung untuk dua DBMS: PostgreSQL dan MongoDB, yang mana saya mempunyai pengalaman dan yang mana terdapat perbezaan (walaupun kecil, tetapi dalam beberapa kes penting!).

Konfigurasi diterangkan dalam tatatanda JSON dan dimuat naik ke Kafka Connect menggunakan permintaan POST.

2.1. PostgreSQL

Contoh konfigurasi penyambung untuk PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prinsip operasi penyambung selepas konfigurasi ini agak mudah:

  • Pada permulaan pertama, ia menyambung ke pangkalan data yang dinyatakan dalam konfigurasi dan bermula dalam mod petikan awal, menghantar kepada Kafka set awal data yang diterima dengan bersyarat SELECT * FROM table_name.
  • Selepas pemulaan selesai, penyambung memasuki mod membaca perubahan daripada fail PostgreSQL WAL.

Mengenai pilihan yang digunakan:

  • name β€” nama penyambung yang mana konfigurasi yang diterangkan di bawah digunakan; kemudiannya, nama ini digunakan untuk berfungsi dengan penyambung (iaitu melihat status / mulakan semula / kemas kini konfigurasi) melalui API REST Kafka Connect;
  • connector.class β€” kelas penyambung DBMS yang akan digunakan oleh penyambung yang dikonfigurasikan;
  • plugin.name ialah nama pemalam untuk penyahkodan logik data daripada fail WAL. Tersedia untuk dipilih wal2json, decoderbuffs ΠΈ pgoutput. Dua yang pertama memerlukan pemasangan sambungan yang sesuai dalam DBMS, dan pgoutput untuk PostgreSQL versi 10 dan lebih tinggi tidak memerlukan manipulasi tambahan;
  • database.* β€” pilihan untuk menyambung ke pangkalan data, di mana database.server.name - nama contoh PostgreSQL yang digunakan untuk membentuk nama topik dalam kelompok Kafka;
  • table.include.list - senarai jadual di mana kami ingin menjejaki perubahan; diberikan dalam format schema.table_name; tidak boleh digunakan bersama-sama dengan table.exclude.list;
  • heartbeat.interval.ms β€” selang (dalam milisaat) yang mana penyambung menghantar mesej degupan jantung ke topik khas;
  • heartbeat.action.query - permintaan yang akan dilaksanakan apabila menghantar setiap mesej degupan jantung (pilihan telah muncul sejak versi 1.1);
  • slot.name β€” nama slot replikasi yang akan digunakan oleh penyambung;
  • publication.name - Nama penerbitan dalam PostgreSQL yang digunakan oleh penyambung. Sekiranya ia tidak wujud, Debezium akan cuba menciptanya. Jika pengguna di mana sambungan dibuat tidak mempunyai hak yang mencukupi untuk tindakan ini, penyambung akan keluar dengan ralat;
  • transforms menentukan cara tepat untuk menukar nama topik sasaran:
    • transforms.AddPrefix.type menunjukkan bahawa kami akan menggunakan ungkapan biasa;
    • transforms.AddPrefix.regex β€” topeng yang mana nama topik sasaran ditakrifkan semula;
    • transforms.AddPrefix.replacement - secara langsung apa yang kami takrifkan semula.

Lebih lanjut mengenai degupan jantung dan perubahan

Secara lalai, penyambung menghantar data kepada Kafka untuk setiap transaksi yang dilakukan dan menulis LSN (Nombor Urutan Log)nya kepada topik perkhidmatan offset. Tetapi apa yang berlaku jika penyambung dikonfigurasikan untuk membaca bukan keseluruhan pangkalan data, tetapi hanya sebahagian daripada jadualnya (yang datanya jarang dikemas kini)?

  • Penyambung akan membaca fail WAL dan tidak mengesan transaksi yang dilakukan di dalamnya ke jadual yang dipantau.
  • Oleh itu, ia tidak akan mengemas kini kedudukan semasanya sama ada dalam topik atau dalam slot replikasi.
  • Ini, seterusnya, akan menyebabkan fail WAL "terperangkap" pada cakera dan kemungkinan akan kehabisan ruang cakera.

Dan di sini pilihan datang untuk menyelamatkan. heartbeat.interval.ms ΠΈ heartbeat.action.query. Menggunakan pilihan ini secara berpasangan membolehkan untuk melaksanakan permintaan untuk menukar data dalam jadual berasingan setiap kali mesej degupan jantung dihantar. Oleh itu, LSN di mana penyambung berada pada masa ini (dalam slot replikasi) sentiasa dikemas kini. Ini membolehkan DBMS mengalih keluar fail WAL yang tidak diperlukan lagi. Untuk maklumat lanjut tentang cara pilihan berfungsi, lihat dokumentasi.

Pilihan lain yang patut diberi perhatian lebih dekat ialah transforms. Walaupun ia lebih kepada keselesaan dan kecantikan...

Secara lalai, Debezium mencipta topik menggunakan dasar penamaan berikut: serverName.schemaName.tableName. Ini mungkin tidak selalunya sesuai. Pilihan transforms menggunakan ungkapan biasa, anda boleh menentukan senarai jadual yang acaranya perlu dihalakan ke topik dengan nama tertentu.

Dalam konfigurasi kami terima kasih kepada transforms perkara berikut berlaku: semua acara CDC daripada pangkalan data yang dijejaki akan pergi ke topik dengan nama data.cdc.dbname. Jika tidak (tanpa tetapan ini), Debezium secara lalai akan mencipta topik untuk setiap jadual borang: pg-dev.public.<table_name>.

Had penyambung

Pada penghujung huraian konfigurasi penyambung untuk PostgreSQL, patut dibincangkan tentang ciri / batasan kerja berikut:

  1. Fungsi penyambung untuk PostgreSQL bergantung pada konsep penyahkodan logik. Oleh itu dia tidak menjejaki permintaan untuk menukar struktur pangkalan data (DDL) - oleh itu, data ini tidak akan berada dalam topik.
  2. Oleh kerana slot replikasi digunakan, sambungan penyambung adalah mungkin sahaja kepada contoh DBMS induk.
  3. Jika pengguna di mana penyambung menyambung ke pangkalan data mempunyai hak baca sahaja, maka sebelum pelancaran pertama, anda perlu membuat slot replikasi secara manual dan menerbitkan ke pangkalan data.

Menggunakan Konfigurasi

Jadi mari muatkan konfigurasi kami ke dalam penyambung:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Kami menyemak sama ada muat turun berjaya dan penyambung bermula:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Hebat: ia telah disediakan dan sedia untuk digunakan. Sekarang mari kita berpura-pura menjadi pengguna dan menyambung ke Kafka, selepas itu kita menambah dan menukar entri dalam jadual:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Dalam topik kami, ini akan dipaparkan seperti berikut:

JSON yang sangat panjang dengan perubahan kami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Dalam kedua-dua kes, rekod terdiri daripada kunci (PK) rekod yang telah diubah, dan intipati perubahan itu: apakah rekod itu sebelum dan apa yang berlaku selepasnya.

  • Dalam kes INSERT: nilai sebelum (before) sama nulldiikuti dengan rentetan yang dimasukkan.
  • Dalam kes UPDATE: pada payload.before keadaan baris sebelumnya dipaparkan, dan dalam payload.after - baru dengan intipati perubahan.

2.2 MongoDB

Penyambung ini menggunakan mekanisme replikasi MongoDB standard, membaca maklumat daripada oplog nod utama DBMS.

Begitu juga dengan penyambung yang telah diterangkan untuk PgSQL, di sini juga, pada permulaan pertama, petikan data utama diambil, selepas itu penyambung bertukar kepada mod bacaan oplog.

Contoh konfigurasi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Seperti yang anda lihat, tiada pilihan baharu berbanding contoh sebelumnya, tetapi hanya bilangan pilihan yang bertanggungjawab untuk menyambung ke pangkalan data dan awalan mereka telah dikurangkan.

Tetapan transforms kali ini mereka melakukan perkara berikut: menukar nama topik sasaran daripada skema <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

toleransi kesalahan

Isu toleransi kesalahan dan ketersediaan yang tinggi pada zaman kita adalah lebih meruncing berbanding sebelum ini - terutamanya apabila kita bercakap tentang data dan transaksi, dan penjejakan perubahan data tidak diketepikan dalam perkara ini. Mari kita lihat apa yang boleh berlaku pada dasarnya dan apa yang akan berlaku kepada Debezium dalam setiap kes.

Terdapat tiga pilihan menarik diri:

  1. Kegagalan Kafka Connect. Jika Connect dikonfigurasikan untuk berfungsi dalam mod teragih, ini memerlukan berbilang pekerja untuk menetapkan group.id yang sama. Kemudian, jika salah satu daripadanya gagal, penyambung akan dimulakan semula pada pekerja lain dan terus membaca dari kedudukan komited terakhir dalam topik dalam Kafka.
  2. Kehilangan sambungan dengan gugusan Kafka. Penyambung hanya akan berhenti membaca pada kedudukan yang gagal dihantar kepada Kafka dan cuba menghantarnya semula secara berkala sehingga percubaan itu berjaya.
  3. Sumber data tidak tersedia. Penyambung akan cuba menyambung semula ke sumber mengikut konfigurasi. Lalai ialah 16 percubaan menggunakan mundur eksponen. Selepas percubaan ke-16 gagal, tugasan akan ditandakan sebagai gagal dan ia perlu dimulakan semula secara manual melalui antara muka REST Kafka Connect.
    • Dalam kes PostgreSQL data tidak akan hilang, kerana menggunakan slot replikasi akan menghalang pemadaman fail WAL yang tidak dibaca oleh penyambung. Dalam kes ini, terdapat kelemahan: jika sambungan rangkaian antara penyambung dan DBMS terganggu untuk masa yang lama, terdapat kemungkinan ruang cakera akan kehabisan, dan ini boleh menyebabkan kegagalan keseluruhan DBMS.
    • Dalam kes MySQL fail binlog boleh diputar oleh DBMS itu sendiri sebelum sambungan dipulihkan. Ini akan menyebabkan penyambung masuk ke dalam keadaan gagal, dan ia perlu dimulakan semula dalam mod syot kilat awal untuk meneruskan bacaan daripada binlog untuk memulihkan operasi biasa.
    • pada MongoDB. Dokumentasi mengatakan bahawa tingkah laku penyambung sekiranya fail log/oplog telah dipadamkan dan penyambung tidak dapat meneruskan bacaan dari kedudukan di mana ia berhenti adalah sama untuk semua DBMS. Ia terletak pada hakikat bahawa penyambung akan masuk ke negeri ini gagal dan akan memerlukan permulaan semula dalam mod petikan awal.

      Walau bagaimanapun, terdapat pengecualian. Jika penyambung berada dalam keadaan terputus untuk masa yang lama (atau tidak dapat mencapai contoh MongoDB), dan oplog diputar pada masa ini, maka apabila sambungan dipulihkan, penyambung akan terus membaca data dari kedudukan pertama yang tersedia dengan tenang , itulah sebabnya beberapa data dalam Kafka tiada akan memukul.

Kesimpulan

Debezium adalah pengalaman pertama saya dengan sistem CDC dan secara keseluruhannya sangat positif. Projek itu menyogok sokongan DBMS utama, kemudahan konfigurasi, sokongan untuk pengelompokan dan komuniti yang aktif. Bagi mereka yang berminat dalam amalan, saya mengesyorkan anda membaca panduan untuk Kafka Connect ΠΈ Debezium.

Berbanding dengan penyambung JDBC untuk Kafka Connect, kelebihan utama Debezium ialah perubahan dibaca daripada log DBMS, yang membolehkan data diterima dengan kelewatan yang minimum. Penyambung JDBC (disediakan oleh Kafka Connect) menanyakan jadual yang dijejaki pada selang masa tetap dan (atas sebab yang sama) tidak menjana mesej apabila data dipadamkan (bagaimana anda boleh membuat pertanyaan untuk data yang tiada?).

Untuk menyelesaikan masalah yang sama, anda boleh memberi perhatian kepada penyelesaian berikut (sebagai tambahan kepada Debezium):

PS

Baca juga di blog kami:

Sumber: www.habr.com

Tambah komen