Memperkenalkan Debezium - CDC untuk Apache Kafka

Memperkenalkan Debezium - CDC untuk Apache Kafka

Dalam pekerjaan saya, saya sering menemukan solusi teknis/produk perangkat lunak baru, yang informasinya agak langka di Internet berbahasa Rusia. Dengan artikel ini saya akan mencoba mengisi salah satu celah tersebut dengan contoh dari praktik saya baru-baru ini, ketika saya perlu mengonfigurasi pengiriman peristiwa CDC dari dua DBMS populer (PostgreSQL dan MongoDB) ke cluster Kafka menggunakan Debezium. Semoga artikel review yang muncul sebagai hasil kerja keras ini dapat bermanfaat bagi orang lain.

Apa itu Debezium dan CDC secara umum?

Debezium β€” perwakilan dari kategori perangkat lunak CDC (Tangkap Perubahan Data), atau lebih tepatnya, ini adalah sekumpulan konektor untuk berbagai DBMS yang kompatibel dengan kerangka Apache Kafka Connect.

Itu proyek Sumber Terbuka, berlisensi di bawah Lisensi Apache v2.0 dan disponsori oleh Red Hat. Pengembangan telah berlangsung sejak 2016 dan saat ini memberikan dukungan resmi untuk DBMS berikut: MySQL, PostgreSQL, MongoDB, SQL Server. Ada juga konektor untuk Cassandra dan Oracle, tetapi saat ini keduanya berada dalam status β€œakses awal”, dan rilis baru tidak menjamin kompatibilitas ke belakang.

Jika kita membandingkan CDC dengan pendekatan tradisional (ketika aplikasi membaca data dari DBMS secara langsung), keunggulan utamanya mencakup implementasi streaming perubahan data pada tingkat baris dengan latensi rendah, keandalan dan ketersediaan tinggi. Dua poin terakhir dicapai dengan menggunakan cluster Kafka sebagai tempat penyimpanan acara CDC.

Keuntungan lainnya adalah kenyataan bahwa satu model digunakan untuk menyimpan peristiwa, sehingga aplikasi akhir tidak perlu khawatir tentang nuansa pengoperasian DBMS yang berbeda.

Terakhir, penggunaan perantara pesan memungkinkan aplikasi yang memantau perubahan data untuk diperluas secara horizontal. Pada saat yang sama, dampak terhadap sumber data diminimalkan, karena data diperoleh tidak langsung dari DBMS, namun dari cluster Kafka.

Tentang arsitektur Debezium

Penggunaan Debezium dilakukan dengan skema sederhana ini:

DBMS (sebagai sumber data) β†’ konektor di Kafka Connect β†’ Apache Kafka β†’ konsumen

Sebagai ilustrasi, berikut adalah diagram dari situs proyek:

Memperkenalkan Debezium - CDC untuk Apache Kafka

Namun, saya kurang begitu menyukai skema ini, karena sepertinya hanya penggunaan konektor wastafel saja yang memungkinkan.

Kenyataannya, situasinya berbeda: mengisi Data Lake Anda (tautan terakhir pada diagram di atas) Ini bukan satu-satunya cara untuk menggunakan Debezium. Peristiwa yang dikirim ke Apache Kafka dapat digunakan oleh aplikasi Anda untuk menangani berbagai situasi. Misalnya:

  • menghapus data yang tidak relevan dari cache;
  • mengirim pemberitahuan;
  • pembaruan indeks pencarian;
  • semacam log audit;
  • ...

Jika Anda memiliki aplikasi Java dan tidak ada kebutuhan/kemungkinan untuk menggunakan cluster Kafka, ada juga kemungkinan untuk bekerja melalui konektor tertanam. Keuntungan yang jelas adalah menghilangkan kebutuhan akan infrastruktur tambahan (dalam bentuk konektor dan Kafka). Namun, solusi ini sudah tidak digunakan lagi sejak versi 1.1 dan tidak lagi direkomendasikan untuk digunakan (dukungan untuk solusi ini mungkin akan dihapus pada rilis mendatang).

Artikel ini akan membahas arsitektur yang direkomendasikan oleh pengembang, yang memberikan toleransi kesalahan dan skalabilitas.

Konfigurasi konektor

Untuk mulai melacak perubahan pada nilai terpenting - data - kita memerlukan:

  1. sumber datanya bisa MySQL mulai versi 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (daftar lengkap);
  2. kluster Apache Kafka;
  3. Instans Kafka Connect (versi 1.x, 2.x);
  4. konektor Debezium yang dikonfigurasi.

Kerjakan dua poin pertama, yaitu. Proses instalasi DBMS dan Apache Kafka berada di luar cakupan artikel. Namun, bagi mereka yang ingin menerapkan semuanya di sandbox, repositori resmi dengan contoh sudah siap pakai buruh pelabuhan-menulis.yaml.

Kami akan membahas lebih detail dua poin terakhir.

0. Koneksi Kafka

Di sini dan selanjutnya dalam artikel, semua contoh konfigurasi dibahas dalam konteks image Docker yang didistribusikan oleh pengembang Debezium. Ini berisi semua file plugin (konektor) yang diperlukan dan menyediakan konfigurasi Kafka Connect menggunakan variabel lingkungan.

Jika Anda ingin menggunakan Kafka Connect dari Confluent, Anda perlu menambahkan sendiri plugin konektor yang diperlukan ke direktori yang ditentukan di plugin.path atau diatur melalui variabel lingkungan CLASSPATH. Pengaturan untuk pekerja dan konektor Kafka Connect ditentukan melalui file konfigurasi yang diteruskan sebagai argumen ke perintah peluncuran pekerja. Untuk lebih jelasnya, lihat dokumentasi.

Seluruh proses pengaturan Debeizum dalam versi konektor dilakukan dalam dua tahap. Mari kita lihat masing-masing:

1. Menyiapkan kerangka Kafka Connect

Untuk mengalirkan data ke klaster Apache Kafka, parameter tertentu diatur dalam kerangka Kafka Connect, seperti:

  • parameter untuk menghubungkan ke cluster,
  • nama topik di mana konfigurasi konektor itu sendiri akan disimpan secara langsung,
  • nama grup tempat konektor dijalankan (jika mode terdistribusi digunakan).

Gambar Docker resmi proyek mendukung konfigurasi menggunakan variabel lingkungan - inilah yang akan kita gunakan. Jadi, unduh gambarnya:

docker pull debezium/connect

Kumpulan minimum variabel lingkungan yang diperlukan untuk menjalankan konektor adalah sebagai berikut:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 β€” daftar awal server cluster Kafka untuk mendapatkan daftar lengkap anggota cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets β€” topik untuk menyimpan posisi di mana konektor berada saat ini;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status β€” topik untuk menyimpan status konektor dan tugasnya;
  • CONFIG_STORAGE_TOPIC=connector-config - topik untuk menyimpan data konfigurasi konektor dan tugasnya;
  • GROUP_ID=1 β€” pengidentifikasi kelompok pekerja yang dapat menjalankan tugas konektor; diperlukan saat menggunakan terdistribusi (didistribusikan) rezim.

Kami meluncurkan kontainer dengan variabel berikut:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Catatan tentang Avro

Secara default, Debezium menulis data dalam format JSON, yang dapat diterima untuk kotak pasir dan data dalam jumlah kecil, namun dapat menjadi masalah dalam database yang sarat muatan. Alternatif untuk konverter JSON adalah membuat pesan bersambung menggunakan Avro ke dalam format biner, yang mengurangi beban pada subsistem I/O di Apache Kafka.

Untuk menggunakan Avro Anda perlu menerapkan yang terpisah skema-registrasi (untuk menyimpan diagram). Variabel untuk konverter akan terlihat seperti ini:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detail tentang penggunaan Avro dan pengaturan registrinya berada di luar cakupan artikel ini - selanjutnya, untuk kejelasan, kami akan menggunakan JSON.

2. Konfigurasi konektor itu sendiri

Sekarang Anda dapat langsung menuju ke konfigurasi konektor itu sendiri, yang akan membaca data dari sumbernya.

Mari kita lihat contoh konektor untuk dua DBMS: PostgreSQL dan MongoDB, yang saya punya pengalaman dan ada perbedaannya (walaupun kecil, tetapi dalam beberapa kasus signifikan!).

Konfigurasinya dijelaskan dalam notasi JSON dan diunggah ke Kafka Connect menggunakan permintaan POST.

2.1. PostgreSQL

Contoh konfigurasi konektor untuk PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prinsip pengoperasian konektor setelah pengaturan ini cukup sederhana:

  • Saat diluncurkan untuk pertama kalinya, ia terhubung ke database yang ditentukan dalam konfigurasi dan dimulai dalam mode cuplikan awal, mengirimkan ke Kafka kumpulan data awal yang diperoleh menggunakan kondisional SELECT * FROM table_name.
  • Setelah inisialisasi selesai, konektor masuk ke mode untuk membaca perubahan dari file WAL PostgreSQL.

Tentang opsi yang digunakan:

  • name β€” nama konektor yang konfigurasinya dijelaskan di bawah ini digunakan; di masa depan, nama ini digunakan untuk bekerja dengan konektor (yaitu melihat status / memulai ulang / memperbarui konfigurasi) melalui REST API Kafka Connect;
  • connector.class β€” Kelas konektor DBMS yang akan digunakan oleh konektor yang dikonfigurasi;
  • plugin.name β€” nama plugin untuk decoding logis data dari file WAL. Tersedia untuk dipilih wal2json, decoderbuffs ΠΈ pgoutput. Dua yang pertama memerlukan instalasi ekstensi yang sesuai di DBMS, dan pgoutput untuk PostgreSQL versi 10 dan lebih tinggi tidak memerlukan manipulasi tambahan;
  • database.* β€” opsi untuk menyambung ke database, di mana database.server.name β€” Nama instans PostgreSQL yang digunakan untuk membentuk nama topik di cluster Kafka;
  • table.include.list β€” daftar tabel yang ingin kita lacak perubahannya; ditentukan dalam format schema.table_name; tidak dapat digunakan bersamaan dengan table.exclude.list;
  • heartbeat.interval.ms β€” interval (dalam milidetik) yang digunakan konektor untuk mengirimkan pesan detak jantung ke topik khusus;
  • heartbeat.action.query β€” permintaan yang akan dieksekusi saat mengirim setiap pesan detak jantung (opsi muncul di versi 1.1);
  • slot.name β€” nama slot replikasi yang akan digunakan oleh konektor;
  • publication.name - Nama Publikasi di PostgreSQL, yang digunakan konektor. Jika tidak ada, Debezium akan mencoba membuatnya. Jika pengguna yang membuat koneksi tidak memiliki hak yang cukup untuk tindakan ini, konektor akan berakhir dengan kesalahan;
  • transforms menentukan dengan tepat cara mengubah nama topik target:
    • transforms.AddPrefix.type menunjukkan bahwa kita akan menggunakan ekspresi reguler;
    • transforms.AddPrefix.regex β€” topeng yang mendefinisikan ulang nama topik target;
    • transforms.AddPrefix.replacement - secara langsung apa yang kami definisikan ulang.

Lebih lanjut tentang detak jantung dan transformasi

Secara default, konektor mengirimkan data ke Kafka untuk setiap transaksi yang dilakukan, dan LSN (Log Sequence Number)-nya dicatat dalam topik layanan offset. Namun apa yang terjadi jika konektor dikonfigurasi untuk membaca bukan seluruh database, namun hanya sebagian tabelnya (di mana pembaruan data tidak sering terjadi)?

  • Konektor akan membaca file WAL dan tidak akan mendeteksi transaksi apa pun yang dilakukan pada tabel yang dipantaunya.
  • Oleh karena itu, ia tidak akan memperbarui posisinya saat ini baik di topik maupun di slot replikasi.
  • Hal ini, pada gilirannya, akan mengakibatkan file WAL tertahan di disk dan kemungkinan besar kehabisan ruang disk.

Dan di sinilah pilihan bisa membantu. heartbeat.interval.ms ΠΈ heartbeat.action.query. Menggunakan opsi ini secara berpasangan memungkinkan untuk melakukan permintaan untuk mengubah data dalam tabel terpisah setiap kali pesan detak jantung dikirim. Dengan demikian, LSN tempat konektor saat ini berada (di slot replikasi) terus diperbarui. Hal ini memungkinkan DBMS untuk menghapus file WAL yang tidak lagi diperlukan. Anda dapat mempelajari lebih lanjut tentang cara kerja opsi dokumentasi.

Pilihan lain yang patut mendapat perhatian lebih adalah transforms. Meski lebih pada kenyamanan dan keindahan...

Secara default, Debezium membuat topik menggunakan kebijakan penamaan berikut: serverName.schemaName.tableName. Hal ini mungkin tidak selalu nyaman. Pilihan transforms menggunakan ekspresi reguler, Anda dapat menentukan daftar tabel yang acaranya perlu dirutekan ke topik dengan nama tertentu.

Dalam konfigurasi kami terima kasih transforms hal berikut terjadi: semua kejadian CDC dari database yang dipantau akan menuju ke topik dengan nama tersebut data.cdc.dbname. Jika tidak (tanpa pengaturan ini), Debezium secara default akan membuat topik untuk setiap tabel seperti: pg-dev.public.<table_name>.

Keterbatasan Konektor

Untuk menyimpulkan deskripsi konfigurasi konektor untuk PostgreSQL, ada baiknya membicarakan fitur/keterbatasan pengoperasiannya berikut ini:

  1. Fungsionalitas konektor untuk PostgreSQL bergantung pada konsep decoding logis. Oleh karena itu dia tidak melacak permintaan untuk mengubah struktur database (DDL) - oleh karena itu, data ini tidak akan ada dalam topik.
  2. Karena slot replikasi digunakan, penyambungan konektor dimungkinkan hanya ke instans DBMS terkemuka.
  3. Jika pengguna yang menghubungkan konektor ke database diberikan hak hanya baca, maka sebelum peluncuran pertama Anda perlu membuat slot replikasi secara manual dan mempublikasikannya ke database.

Menerapkan konfigurasi

Jadi, mari muat konfigurasi kita ke dalam konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Kami memeriksa apakah pengunduhan berhasil dan konektor dimulai:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Hebat: sudah siap dan siap digunakan. Sekarang mari kita berpura-pura menjadi konsumen dan terhubung ke Kafka, setelah itu kita akan menambahkan dan mengubah entri di tabel:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Dalam topik kami akan ditampilkan sebagai berikut:

JSON yang sangat panjang dengan perubahan kami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Dalam kedua kasus tersebut, catatan terdiri dari kunci (PK) dari catatan yang diubah, dan inti dari perubahan tersebut: catatan apa sebelumnya dan apa jadinya setelahnya.

  • Dalam kasus INSERT: nilai sebelum (before) sama dengan null, dan setelahnya - baris yang disisipkan.
  • Dalam kasus UPDATE: payload.before keadaan garis sebelumnya ditampilkan, dan masuk payload.after β€” baru dengan esensi perubahan.

2.2 MongoDB

Konektor ini menggunakan mekanisme replikasi MongoDB standar, membaca informasi dari oplog node DBMS utama.

Mirip dengan konektor untuk PgSQL yang sudah dijelaskan, di sini juga, pada permulaan pertama, snapshot data utama diambil, setelah itu konektor beralih ke mode membaca oplog.

Contoh konfigurasi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Seperti yang Anda lihat, tidak ada opsi baru di sini dibandingkan dengan contoh sebelumnya, tetapi hanya jumlah opsi yang bertanggung jawab untuk menyambung ke database dan awalannya telah dikurangi.

Pengaturan transforms kali ini mereka melakukan hal berikut: mereka mengubah nama topik target dari skema <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

toleransi kesalahan

Masalah toleransi kesalahan dan ketersediaan tinggi saat ini lebih akut dari sebelumnya - terutama ketika kita berbicara tentang data dan transaksi, dan pelacakan perubahan data tidak mengesampingkan masalah ini. Mari kita lihat apa yang salah pada prinsipnya dan apa yang akan terjadi pada Debezium dalam setiap kasus.

Ada tiga opsi untuk tidak ikut serta:

  1. Kegagalan Kafka Connect. Jika Connect dikonfigurasi untuk bekerja dalam mode terdistribusi, ini memerlukan beberapa pekerja untuk mengatur group.id yang sama. Kemudian, jika salah satunya gagal, konektor akan dimulai ulang pada pekerja lain dan melanjutkan pembacaan dari posisi terakhir yang dilakukan dalam topik di Kafka.
  2. Hilangnya konektivitas dengan cluster Kafka. Konektor hanya akan berhenti membaca pada posisi yang gagal dikirim ke Kafka, dan secara berkala akan mencoba mengirim ulang hingga upaya tersebut berhasil.
  3. Tidak tersedianya sumber data. Konektor akan mencoba menyambung kembali ke sumber seperti yang dikonfigurasi. Standarnya adalah 16 upaya menggunakan kemunduran eksponensial. Setelah upaya ke-16 yang gagal, tugas akan ditandai sebagai gagal dan Anda perlu memulai ulang secara manual melalui antarmuka Kafka Connect REST.
    • Dalam kasus PostgreSQL datanya tidak akan hilang, karena Menggunakan slot replikasi akan mencegah Anda menghapus file WAL yang tidak dibaca oleh konektor. Dalam hal ini, ada juga sisi negatifnya: jika konektivitas jaringan antara konektor dan DBMS terganggu dalam waktu lama, ada kemungkinan ruang disk akan habis, dan ini dapat menyebabkan kegagalan. seluruh DBMS.
    • Dalam kasus MySQL file binlog dapat diputar oleh DBMS itu sendiri sebelum konektivitas dipulihkan. Hal ini akan menyebabkan konektor masuk ke status gagal, dan untuk memulihkan operasi normal, Anda perlu memulai ulang dalam mode snapshot awal untuk melanjutkan membaca dari binlog.
    • Tentang MongoDB. Dokumentasi menyatakan: perilaku konektor jika file log/oplog telah dihapus dan konektor tidak dapat melanjutkan pembacaan dari posisi terakhirnya adalah sama untuk semua DBMS. Artinya konektornya akan masuk ke state gagal dan akan memerlukan restart dalam mode cuplikan awal.

      Namun, ada pengecualian. Jika konektor terputus untuk waktu yang lama (atau tidak dapat menjangkau instance MongoDB), dan oplog mengalami rotasi selama waktu ini, maka ketika koneksi dipulihkan, konektor akan dengan tenang terus membaca data dari posisi pertama yang tersedia, itulah sebabnya beberapa data di Kafka tidak akan mengenai.

Kesimpulan

Debezium adalah pengalaman pertama saya dengan sistem CDC dan secara keseluruhan sangat positif. Proyek ini unggul karena dukungannya terhadap DBMS utama, kemudahan konfigurasi, dukungan pengelompokan, dan komunitas yang aktif. Bagi yang tertarik untuk berlatih, saya sarankan Anda membaca panduannya Koneksi Kafka ΠΈ Debezium.

Dibandingkan dengan konektor JDBC untuk Kafka Connect, keunggulan utama Debezium adalah perubahan dibaca dari log DBMS, sehingga data diterima dengan penundaan minimal. Konektor JDBC (disediakan oleh Kafka Connect) menanyakan tabel yang dilacak pada interval tetap dan (untuk alasan yang sama) tidak menghasilkan pesan ketika data dihapus (bagaimana Anda bisa menanyakan data yang tidak ada?).

Untuk mengatasi masalah serupa, Anda dapat memperhatikan solusi berikut (selain Debezium):

PS

Baca juga di blog kami:

Sumber: www.habr.com

Tambah komentar