Giới thiệu Debezium - CDC cho Apache Kafka

Giới thiệu Debezium - CDC cho Apache Kafka

Trong công việc của mình, tôi thường gặp các giải pháp kỹ thuật/sản phẩm phần mềm mới, thông tin về chúng khá khan hiếm trên Internet tiếng Nga. Với bài viết này, tôi sẽ cố gắng lấp đầy một khoảng trống như vậy bằng một ví dụ từ thực tiễn gần đây của tôi, khi tôi cần định cấu hình gửi các sự kiện CDC từ hai DBMS phổ biến (PostgreSQL và MongoDB) đến cụm Kafka bằng Debezium. Tôi hy vọng rằng bài viết đánh giá này, xuất hiện như là kết quả của công việc đã hoàn thành, sẽ hữu ích cho những người khác.

Debezium và CDC nói chung là gì?

Debezi - đại diện của danh mục phần mềm CDC (Ghi lại sự thay đổi dữ liệu), hay chính xác hơn, nó là một tập hợp các trình kết nối cho nhiều DBMS khác nhau tương thích với khung công tác Apache Kafka Connect.

Dự án nguồn mở, được cấp phép theo Giấy phép Apache v2.0 và được Red Hat tài trợ. Quá trình phát triển đã được tiến hành từ năm 2016 và hiện tại nó cung cấp hỗ trợ chính thức cho các DBMS sau: MySQL, PostgreSQL, MongoDB, SQL Server. Ngoài ra còn có các trình kết nối dành cho Cassandra và Oracle, nhưng hiện tại chúng đang ở trạng thái "truy cập sớm" và các bản phát hành mới không đảm bảo khả năng tương thích ngược.

Nếu chúng ta so sánh CDC với phương pháp truyền thống (khi ứng dụng đọc dữ liệu trực tiếp từ DBMS), ưu điểm chính của nó bao gồm việc triển khai truyền phát thay đổi dữ liệu ở cấp hàng với độ trễ thấp, độ tin cậy và tính sẵn sàng cao. Hai điểm cuối cùng đạt được bằng cách sử dụng cụm Kafka làm kho lưu trữ cho các sự kiện CDC.

Một ưu điểm khác là thực tế là một mô hình duy nhất được sử dụng để lưu trữ các sự kiện, do đó ứng dụng cuối không phải lo lắng về các sắc thái vận hành các DBMS khác nhau.

Cuối cùng, việc sử dụng trình trung chuyển tin nhắn cho phép các ứng dụng giám sát thay đổi dữ liệu có thể mở rộng quy mô theo chiều ngang. Đồng thời, tác động đến nguồn dữ liệu được giảm thiểu do dữ liệu được lấy không trực tiếp từ DBMS mà từ cụm Kafka.

Về kiến ​​trúc Debezium

Sử dụng Debezium dựa trên sơ đồ đơn giản này:

DBMS (dưới dạng nguồn dữ liệu) → trình kết nối trong Kafka Connect → Apache Kafka → người tiêu dùng

Để minh họa, đây là sơ đồ từ trang web của dự án:

Giới thiệu Debezium - CDC cho Apache Kafka

Tuy nhiên, tôi không thực sự thích sơ đồ này, vì có vẻ như chỉ có thể sử dụng đầu nối bồn rửa.

Trên thực tế, tình huống lại khác: lấp đầy Hồ dữ liệu của bạn (liên kết cuối cùng trong sơ đồ trên) Đây không phải là cách duy nhất để sử dụng Debezium. Các sự kiện được gửi tới Apache Kafka có thể được ứng dụng của bạn sử dụng để xử lý nhiều tình huống khác nhau. Ví dụ:

  • xóa dữ liệu không liên quan khỏi bộ đệm;
  • gửi thông báo;
  • cập nhật chỉ mục tìm kiếm;
  • một số loại nhật ký kiểm tra;
  • ...

Trong trường hợp bạn có một ứng dụng Java và không có nhu cầu/khả năng sử dụng cụm Kafka, thì cũng có khả năng xử lý thông qua kết nối nhúng. Ưu điểm rõ ràng là nó loại bỏ nhu cầu về cơ sở hạ tầng bổ sung (dưới dạng trình kết nối và Kafka). Tuy nhiên, giải pháp này đã không được dùng nữa kể từ phiên bản 1.1 và không còn được khuyến nghị sử dụng nữa (hỗ trợ cho giải pháp này có thể bị xóa trong các bản phát hành sau này).

Bài viết này sẽ thảo luận về kiến ​​trúc được các nhà phát triển đề xuất, kiến ​​trúc này cung cấp khả năng chịu lỗi và khả năng mở rộng.

Cấu hình trình kết nối

Để bắt đầu theo dõi những thay đổi về giá trị quan trọng nhất - dữ liệu - chúng tôi cần:

  1. nguồn dữ liệu, có thể là MySQL bắt đầu từ phiên bản 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (danh sách đầy đủ);
  2. Cụm Apache Kafka;
  3. Phiên bản Kafka Connect (phiên bản 1.x, 2.x);
  4. Trình kết nối Debezium được cấu hình.

Làm việc trên hai điểm đầu tiên, tức là. Quá trình cài đặt DBMS và Apache Kafka nằm ngoài phạm vi của bài viết. Tuy nhiên, đối với những người muốn triển khai mọi thứ trong hộp cát, kho lưu trữ chính thức với các ví dụ có sẵn docker-compose.yaml.

Chúng tôi sẽ đi sâu vào chi tiết hơn về hai điểm cuối cùng.

0. Kết nối Kafka

Ở đây và xa hơn nữa trong bài viết, tất cả các ví dụ về cấu hình sẽ được thảo luận trong bối cảnh hình ảnh Docker do các nhà phát triển Debezium phân phối. Nó chứa tất cả các tệp plugin (trình kết nối) cần thiết và cung cấp cấu hình của Kafka Connect bằng các biến môi trường.

Nếu bạn định sử dụng Kafka Connect từ Confluent, bạn sẽ cần thêm độc lập các plugin của các trình kết nối cần thiết vào thư mục được chỉ định trong plugin.path hoặc đặt thông qua biến môi trường CLASSPATH. Các cài đặt cho trình kết nối và trình kết nối Kafka Connect được xác định thông qua các tệp cấu hình được chuyển dưới dạng đối số cho lệnh khởi chạy trình chạy. Để biết thêm chi tiết, xem tài liệu.

Toàn bộ quá trình thiết lập Debeizum trong phiên bản trình kết nối được thực hiện theo hai giai đoạn. Chúng ta hãy nhìn vào từng người trong số họ:

1. Thiết lập khung Kafka Connect

Để truyền dữ liệu đến cụm Apache Kafka, các tham số cụ thể được đặt trong khung Kafka Connect, chẳng hạn như:

  • các tham số để kết nối với cụm,
  • tên của các chủ đề trong đó cấu hình của trình kết nối sẽ được lưu trữ trực tiếp,
  • tên của nhóm mà trình kết nối đang chạy (nếu sử dụng chế độ phân tán).

Hình ảnh Docker chính thức của dự án hỗ trợ cấu hình bằng cách sử dụng các biến môi trường - đây là những gì chúng ta sẽ sử dụng. Vì vậy, hãy tải xuống hình ảnh:

docker pull debezium/connect

Tập hợp các biến môi trường tối thiểu cần thiết để chạy trình kết nối như sau:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - danh sách ban đầu của các máy chủ cụm Kafka để có được danh sách đầy đủ các thành viên của cụm;
  • OFFSET_STORAGE_TOPIC=connector-offsets - chủ đề để lưu trữ các vị trí hiện tại của đầu nối;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — chủ đề để lưu trữ trạng thái của trình kết nối và các nhiệm vụ của nó;
  • CONFIG_STORAGE_TOPIC=connector-config - chủ đề để lưu trữ dữ liệu cấu hình đầu nối và các nhiệm vụ của nó;
  • GROUP_ID=1 - mã định danh của nhóm công nhân mà nhiệm vụ kết nối có thể được thực thi trên đó; cần thiết khi sử dụng phân phối (phân phối) chế độ.

Chúng tôi khởi chạy vùng chứa với các biến sau:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Lưu ý về Avro

Theo mặc định, Debezium ghi dữ liệu ở định dạng JSON, có thể chấp nhận được đối với hộp cát và lượng nhỏ dữ liệu, nhưng có thể trở thành vấn đề trong cơ sở dữ liệu có tải cao. Một cách thay thế cho trình chuyển đổi JSON là tuần tự hóa các tin nhắn bằng cách sử dụng Avro sang định dạng nhị phân, giúp giảm tải cho hệ thống con I/O trong Apache Kafka.

Để sử dụng Avro bạn cần triển khai một phần mềm riêng biệt đăng ký lược đồ (để lưu trữ sơ đồ). Các biến cho bộ chuyển đổi sẽ trông như thế này:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Thông tin chi tiết về cách sử dụng Avro và thiết lập sổ đăng ký cho nó nằm ngoài phạm vi của bài viết này - để rõ ràng hơn, chúng tôi sẽ sử dụng JSON.

2. Tự cấu hình trình kết nối

Bây giờ bạn có thể truy cập trực tiếp vào cấu hình của chính trình kết nối, trình kết nối này sẽ đọc dữ liệu từ nguồn.

Hãy xem ví dụ về trình kết nối cho hai DBMS: PostgreSQL và MongoDB, trong đó tôi có kinh nghiệm và trong đó có những khác biệt (mặc dù nhỏ nhưng trong một số trường hợp là đáng kể!).

Cấu hình được mô tả bằng ký hiệu JSON và được tải lên Kafka Connect bằng yêu cầu POST.

2.1. PostgreSQL

Cấu hình trình kết nối ví dụ cho PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Nguyên lý hoạt động của đầu nối sau khi thiết lập này khá đơn giản:

  • Khi khởi chạy lần đầu tiên, nó sẽ kết nối với cơ sở dữ liệu được chỉ định trong cấu hình và khởi động ở chế độ ảnh chụp nhanh ban đầu, gửi tới Kafka bộ dữ liệu ban đầu thu được bằng cách sử dụng câu lệnh có điều kiện SELECT * FROM table_name.
  • Sau khi quá trình khởi tạo hoàn tất, trình kết nối sẽ chuyển sang chế độ đọc các thay đổi từ tệp WAL PostgreSQL.

Về các tùy chọn được sử dụng:

  • name - tên của đầu nối sử dụng cấu hình được mô tả dưới đây; trong tương lai, tên này được sử dụng để hoạt động với trình kết nối (tức là xem trạng thái/khởi động lại/cập nhật cấu hình) thông qua API Kafka Connect REST;
  • connector.class - Lớp trình kết nối DBMS sẽ được sử dụng bởi trình kết nối được cấu hình;
  • plugin.name — tên của plugin để giải mã logic dữ liệu từ các tệp WAL. Có sẵn để lựa chọn wal2json, decoderbuffs и pgoutput. Hai cái đầu tiên yêu cầu cài đặt các phần mở rộng thích hợp trong DBMS và pgoutput đối với PostgreSQL phiên bản 10 trở lên không yêu cầu thao tác bổ sung;
  • database.* — các tùy chọn để kết nối với cơ sở dữ liệu, trong đó database.server.name - Tên phiên bản PostgreSQL được sử dụng để tạo thành tên chủ đề trong cụm Kafka;
  • table.include.list — danh sách các bảng mà chúng tôi muốn theo dõi các thay đổi; được chỉ định ở định dạng schema.table_name; không thể được sử dụng cùng với table.exclude.list;
  • heartbeat.interval.ms — khoảng thời gian (tính bằng mili giây) mà trình kết nối gửi thông báo nhịp tim đến một chủ đề đặc biệt;
  • heartbeat.action.query — một yêu cầu sẽ được thực thi khi gửi từng tin nhắn nhịp tim (tùy chọn xuất hiện trong phiên bản 1.1);
  • slot.name - tên của khe sao chép sẽ được trình kết nối sử dụng;
  • publication.name - Tên ấn phẩm trong PostgreSQL mà trình kết nối sử dụng. Nếu nó không tồn tại, Debezium sẽ cố gắng tạo ra nó. Nếu người dùng thực hiện kết nối không có đủ quyền đối với hành động này, trình kết nối sẽ kết thúc với lỗi;
  • transforms xác định chính xác cách thay đổi tên của chủ đề mục tiêu:
    • transforms.AddPrefix.type chỉ ra rằng chúng tôi sẽ sử dụng biểu thức chính quy;
    • transforms.AddPrefix.regex - một mặt nạ xác định lại tên của chủ đề mục tiêu;
    • transforms.AddPrefix.replacement - trực tiếp những gì chúng tôi đang xác định lại.

Tìm hiểu thêm về nhịp tim và sự biến đổi

Theo mặc định, trình kết nối gửi dữ liệu tới Kafka cho mỗi giao dịch đã cam kết và LSN (Số thứ tự nhật ký) của nó được ghi lại trong chủ đề dịch vụ offset. Nhưng điều gì sẽ xảy ra nếu trình kết nối được định cấu hình để không đọc toàn bộ cơ sở dữ liệu mà chỉ đọc một phần bảng của nó (trong đó việc cập nhật dữ liệu không diễn ra thường xuyên)?

  • Trình kết nối sẽ đọc các tệp WAL và sẽ không phát hiện bất kỳ cam kết giao dịch nào đối với các bảng mà nó đang theo dõi.
  • Do đó, nó sẽ không cập nhật vị trí hiện tại của nó trong chủ đề hoặc trong vùng sao chép.
  • Ngược lại, điều này sẽ dẫn đến việc các tệp WAL được giữ trên đĩa và có khả năng hết dung lượng đĩa.

Và đây là lúc các lựa chọn xuất hiện để giải cứu. heartbeat.interval.ms и heartbeat.action.query. Việc sử dụng các tùy chọn này theo cặp giúp có thể thực hiện yêu cầu thay đổi dữ liệu trong một bảng riêng biệt mỗi khi gửi tin nhắn nhịp tim. Do đó, LSN mà trình kết nối hiện đang được đặt (trong khe sao chép) được cập nhật liên tục. Điều này cho phép DBMS xóa các tệp WAL không còn cần thiết. Bạn có thể tìm hiểu thêm về cách các tùy chọn hoạt động trong tài liệu.

Một lựa chọn khác đáng được quan tâm hơn là transforms. Mặc dù thiên về sự tiện lợi và đẹp mắt hơn...

Theo mặc định, Debezium tạo chủ đề bằng chính sách đặt tên sau: serverName.schemaName.tableName. Điều này có thể không phải lúc nào cũng thuận tiện. Tùy chọn transforms Bạn có thể sử dụng biểu thức chính quy để xác định danh sách các bảng, sự kiện cần được chuyển đến một chủ đề có tên cụ thể.

Trong cấu hình của chúng tôi cảm ơn transforms điều sau đây xảy ra: tất cả các sự kiện CDC từ cơ sở dữ liệu được giám sát sẽ chuyển đến một chủ đề có tên data.cdc.dbname. Mặt khác (không có các cài đặt này), theo mặc định, Debezium sẽ tạo một chủ đề cho mỗi bảng như: pg-dev.public.<table_name>.

Hạn chế của trình kết nối

Để kết thúc mô tả về cấu hình trình kết nối cho PostgreSQL, cần nói về các tính năng/hạn chế hoạt động của nó sau:

  1. Chức năng của trình kết nối cho PostgreSQL dựa trên khái niệm giải mã logic. Vì thế anh ấy không theo dõi các yêu cầu thay đổi cấu trúc cơ sở dữ liệu (DDL) - theo đó, dữ liệu này sẽ không có trong chủ đề.
  2. Vì các khe sao chép được sử dụng nên có thể kết nối một đầu nối chỉ đến phiên bản DBMS hàng đầu.
  3. Nếu người dùng mà trình kết nối kết nối với cơ sở dữ liệu có quyền chỉ đọc thì trước lần khởi chạy đầu tiên, bạn sẽ cần tạo một khe sao chép theo cách thủ công và xuất bản lên cơ sở dữ liệu.

Áp dụng cấu hình

Vì vậy, hãy tải cấu hình của chúng tôi vào trình kết nối:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Chúng tôi kiểm tra xem quá trình tải xuống có thành công hay không và trình kết nối đã bắt đầu:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Tuyệt vời: nó đã được thiết lập và sẵn sàng hoạt động. Bây giờ, hãy giả vờ là người tiêu dùng và kết nối với Kafka, sau đó chúng ta sẽ thêm và thay đổi một mục trong bảng:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Trong chủ đề của chúng tôi, nó sẽ được hiển thị như sau:

JSON rất dài với những thay đổi của chúng tôi

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Trong cả hai trường hợp, các bản ghi bao gồm khóa (PK) của bản ghi đã được thay đổi và bản chất của những thay đổi đó: bản ghi đó là gì trước và nó trở thành gì sau đó.

  • Trong trường hợp INSERT: giá trị trước (before) bằng nullvà sau - dòng được chèn vào.
  • Trong trường hợp UPDATE: trong payload.before trạng thái trước đó của dòng được hiển thị và trong payload.after - mới với bản chất của những thay đổi.

2.2 MongoDB

Trình kết nối này sử dụng cơ chế sao chép MongoDB tiêu chuẩn, đọc thông tin từ oplog của nút DBMS chính.

Tương tự như trình kết nối đã được mô tả cho PGSQL, ở đây, ở lần khởi động đầu tiên, ảnh chụp nhanh dữ liệu chính sẽ được thực hiện, sau đó trình kết nối chuyển sang chế độ đọc oplog.

Ví dụ về cấu hình:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Như bạn có thể thấy, không có tùy chọn mới nào ở đây so với ví dụ trước mà chỉ giảm số lượng tùy chọn chịu trách nhiệm kết nối với cơ sở dữ liệu và tiền tố của chúng.

Cài đặt transforms lần này họ làm như sau: họ chuyển đổi tên của chủ đề mục tiêu từ lược đồ <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

khả năng chịu lỗi

Vấn đề về khả năng chịu lỗi và tính sẵn sàng cao trong thời đại của chúng ta trở nên gay gắt hơn bao giờ hết - đặc biệt là khi chúng ta đang nói về dữ liệu và giao dịch, và việc theo dõi các thay đổi dữ liệu không nằm ngoài vấn đề này. Chúng ta hãy xem điều gì có thể sai về mặt nguyên tắc và điều gì sẽ xảy ra với Debezium trong từng trường hợp.

Có ba tùy chọn từ chối:

  1. Lỗi kết nối Kafka. Nếu Connect được định cấu hình để hoạt động ở chế độ phân tán, điều này yêu cầu nhiều nhân viên đặt cùng một nhóm.id. Sau đó, nếu một trong số chúng không thành công, trình kết nối sẽ được khởi động lại trên một máy chạy khác và tiếp tục đọc từ vị trí đã cam kết cuối cùng trong chủ đề trong Kafka.
  2. Mất kết nối với cụm Kafka. Trình kết nối sẽ chỉ dừng đọc ở vị trí không gửi được tới Kafka và sẽ cố gắng gửi lại định kỳ cho đến khi thử thành công.
  3. Nguồn dữ liệu không có sẵn. Trình kết nối sẽ cố gắng kết nối lại với nguồn như đã định cấu hình. Mặc định là 16 lần thử sử dụng hậu quả theo cấp số nhân. Sau lần thử thứ 16 không thành công, nhiệm vụ sẽ được đánh dấu là không và bạn sẽ cần phải khởi động lại thủ công thông qua giao diện Kafka Connect REST.
    • Trong trường hợp PostgreSQL dữ liệu sẽ không bị mất, bởi vì Việc sử dụng các khe sao chép sẽ ngăn bạn xóa các tệp WAL mà trình kết nối không đọc được. Trong trường hợp này, đồng xu cũng có một nhược điểm: nếu kết nối mạng giữa trình kết nối và DBMS bị gián đoạn trong thời gian dài, có khả năng dung lượng ổ đĩa sẽ hết và điều này có thể dẫn đến lỗi của toàn bộ DBMS.
    • Trong trường hợp MySQL Các tệp binlog có thể được xoay bởi chính DBMS trước khi kết nối được khôi phục. Điều này sẽ khiến trình kết nối chuyển sang trạng thái lỗi và để khôi phục hoạt động bình thường, bạn sẽ cần khởi động lại ở chế độ chụp nhanh ban đầu để tiếp tục đọc từ binlog.
    • trên MongoDB. Tài liệu nêu rõ: hành vi của trình kết nối trong trường hợp tệp nhật ký/oplog đã bị xóa và trình kết nối không thể tiếp tục đọc từ vị trí mà nó đã dừng lại là giống nhau đối với tất cả các DBMS. Có nghĩa là đầu nối sẽ chuyển sang trạng thái không và sẽ yêu cầu khởi động lại ở chế độ ảnh chụp nhanh ban đầu.

      Tuy nhiên, vẫn có những ngoại lệ. Nếu trình kết nối bị ngắt kết nối trong một thời gian dài (hoặc không thể truy cập phiên bản MongoDB) và oplog đã quay trong thời gian này, thì khi kết nối được khôi phục, trình kết nối sẽ bình tĩnh tiếp tục đọc dữ liệu từ vị trí khả dụng đầu tiên, đó là lý do tại sao một số dữ liệu trong Kafka không sẽ đánh.

Kết luận

Debezium là trải nghiệm đầu tiên của tôi với hệ thống CDC và nhìn chung rất tích cực. Dự án đã giành chiến thắng nhờ sự hỗ trợ cho các DBMS chính, tính dễ cấu hình, hỗ trợ phân cụm và cộng đồng tích cực. Đối với những người quan tâm đến việc thực hành, tôi khuyên bạn nên đọc hướng dẫn về Kết nối Kafka и Debezi.

So với trình kết nối JDBC cho Kafka Connect, ưu điểm chính của Debezium là các thay đổi được đọc từ nhật ký DBMS, cho phép nhận dữ liệu với độ trễ tối thiểu. Trình kết nối JDBC (từ Kafka Connect) truy vấn bảng được theo dõi theo một khoảng thời gian cố định và (vì lý do tương tự) không tạo ra thông báo khi dữ liệu bị xóa (làm cách nào bạn có thể truy vấn dữ liệu không tồn tại?).

Để giải quyết các vấn đề tương tự, bạn có thể chú ý đến các giải pháp sau (ngoài Debezium):

PS

Đọc thêm trên blog của chúng tôi:

Nguồn: www.habr.com

Thêm một lời nhận xét