Introducing Debezium - CDC for Apache Kafka

In my work, I often come across new technical solutions and software products about which information is rather scarce on the Russian-speaking Internet. With this article I will try to fill one such gap, using an example from my recent practice, when I needed to set up the delivery of CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview article, written as a result of that work, will be useful to others.

What is Debezium and CDC in general?

Debezium is a representative of the CDC (Change Data Capture) software category. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project, licensed under the Apache License v2.0 and sponsored by Red Hat. Development has been underway since 2016, and at the moment it officially supports the following DBMSs: MySQL, PostgreSQL, MongoDB, and SQL Server. There are also connectors for Cassandra and Oracle, but they are currently in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (when the application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability, and high availability. The last two points are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the consuming application does not have to worry about the peculiarities of operating different DBMSs.

Finally, using a message broker opens up scope for horizontal scaling of applications that track changes in data. At the same time, the impact on the data source is minimized, since data is received not directly from the DBMS, but from the Kafka cluster.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, I will give a diagram from the project website:

(Diagram from debezium.io: source databases feed Debezium source connectors running in Kafka Connect, the events land in Apache Kafka, and sink connectors then deliver them to downstream systems such as a Data Lake.)

However, I am not a big fan of this diagram, because it gives the impression that the only way to consume the events is through a sink connector.

In reality, the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be consumed by your own applications to handle a variety of situations. For example:

  • removal of irrelevant data from the cache;
  • sending notifications;
  • search index updates;
  • some kind of audit logs;
  • ...

If you have a Java application and there is no need (or no possibility) to use a Kafka cluster, there is also the option of working through an embedded connector. The obvious plus is that it lets you do without additional infrastructure (in the form of Kafka Connect and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended for use (it may be removed in future releases).

This article will discuss the architecture recommended by developers, which provides fault tolerance and scalability.

Connector configuration

In order to start tracking changes to the most valuable thing we have - data - we need:

  1. data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. Apache Kafka cluster;
  3. Kafka Connect instance (versions 1.x, 2.x);
  4. configured Debezium connector.

Work on the first two points, i.e. installing a DBMS and Apache Kafka, is beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples contains a ready-made docker-compose.yaml.

We will focus on the last two points in more detail.

0. Kafka Connect

Here and later in the article, all configuration examples are considered in the context of the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and provides Kafka Connect configuration using environment variables.

If you intend to use Kafka Connect from the Confluent distribution, you will need to add the plugins of the required connectors yourself to the directory specified in plugin.path, or make them available via the CLASSPATH environment variable. The settings for the Kafka Connect worker and connectors are then defined through configuration files that are passed as arguments to the worker start command. For details, see the documentation.
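For reference, a minimal sketch of what such a worker configuration might look like for the distributed mode used in this article (the hostnames, topic names, and plugin directory are illustrative assumptions):

# connect-distributed.properties (sketch)
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connector-config
offset.storage.topic=connector-offsets
status.storage.topic=connector-status
plugin.path=/opt/kafka/plugins

# the worker is then started with:
#   bin/connect-distributed.sh config/connect-distributed.properties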

The whole process of setting up Debezium in its connector form is carried out in two stages. Let's consider each of them:

1. Setting up the Kafka Connect framework

To stream data to an Apache Kafka cluster, specific parameters are set in the Kafka Connect framework, such as:

  • cluster connection settings,
  • names of topics in which the configuration of the connector itself will be stored,
  • the name of the group in which the connector is running (in case of using distributed mode).

The official Docker image of the project supports configuration using environment variables - this is what we will use. So let's download the image:

docker pull debezium/connect

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to obtain the complete list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - a topic for storing the offsets (positions) the connector has currently reached;
  • STATUS_STORAGE_TOPIC=connector-status - a topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - a topic for storing the configuration of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector task can be executed; required when using distributed mode.

We start the container with these variables:

docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
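For a quick sanity check that the worker is up, you can query its REST API (this assumes the container's port 8083 is published to the host, for example by adding -p 8083:8083 to the command above):

curl -s http://localhost:8083/connectors
# a freshly started worker returns an empty list of connectors: []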

Note about Avro

By default, Debezium writes data in JSON format, which is acceptable for sandboxes and small amounts of data, but can become a problem with heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry (for storing schemas). The variables for the converter will look like this:

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
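If you launch the stock Docker image with docker run instead of Kubernetes, the same settings can be passed as environment variables. This is only a sketch, under the assumption that the image maps CONNECT_-prefixed variables onto worker properties (as the variable names above suggest) and that the Avro converter classes are available in it; the registry address is illustrative:

docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://kafka-registry-01:8081/ \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://kafka-registry-01:8081/ \
  debezium/connect:1.2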

Details on using Avro and setting up a registry for it are beyond the scope of the article - further, for clarity, we will use JSON.

2. Setting up the connector itself

Now you can go directly to the configuration of the connector itself, which will read data from the source.

Let's look at example connectors for two DBMSs, PostgreSQL and MongoDB, with which I have hands-on experience and between which there are differences (small, but in some cases significant!).

The configuration is described in JSON notation and uploaded to Kafka Connect using a POST request.

2.1. PostgreSQL

Example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

The principle of operation of the connector after this configuration is quite simple:

  • On the first start, it connects to the database specified in the configuration and runs in initial snapshot mode, sending to Kafka the initial data set obtained with a conditional SELECT * FROM table_name.
  • Once the snapshot is complete, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector for which the configuration described below is used; later, this name is used to work with the connector (i.e. to view its status, restart it, or update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class to be used by the configured connector;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The available choices are wal2json, decoderbufs, and pgoutput. The first two require installing the appropriate extensions in the DBMS, while pgoutput (for PostgreSQL version 10 and higher) requires no additional manipulations;
  • database.* - options for connecting to the database, where database.server.name is the logical name of the PostgreSQL instance, used to form topic names in the Kafka cluster;
  • table.include.list - a list of the tables in which we want to track changes, given in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query that is executed when each heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot that will be used by the connector;
  • publication.name - the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user the connection is made under does not have enough rights for this action, the connector will exit with an error;
  • transforms determines how exactly to change the name of the target topic:
    • transforms.AddPrefix.type indicates that we are using the RegexRouter transformation, i.e. regular expressions;
    • transforms.AddPrefix.regex - the mask by which the name of the target topic is matched;
    • transforms.AddPrefix.replacement - what we rewrite the matched name to.

More about heartbeat and transforms

By default, the connector sends data to Kafka for each committed transaction and writes its LSN (Log Sequence Number) to the service offsets topic. But what happens if the connector is configured to read not the entire database, but only some of its tables (in which data is updated infrequently)?

  • The connector will read the WAL files and not detect any transaction commits in them for the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause the WAL files to be retained on disk and will likely lead to running out of disk space.

This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Using these options in tandem makes it possible to execute a query that changes data in a separate table every time a heartbeat message is sent. This way the LSN the connector is currently at (in the replication slot) is constantly updated, which allows the DBMS to remove WAL files that are no longer needed. For more information on how these options work, see the documentation.
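As a sketch (the table name here is an arbitrary choice for illustration), you could create a dedicated single-row heartbeat table in the source database; with our table.include.list of public.(.*) it is automatically covered by the connector:

CREATE TABLE public.debezium_heartbeat (id int PRIMARY KEY, ts timestamptz NOT NULL DEFAULT now());
INSERT INTO public.debezium_heartbeat (id) VALUES (1);

... and then point the connector at it in the configuration:

"heartbeat.interval.ms": "5000",
"heartbeat.action.query": "UPDATE public.debezium_heartbeat SET ts = now() WHERE id = 1"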

Another option that deserves closer attention is transforms, although it is more about convenience and cosmetics...

By default, Debezium creates topics using the naming policy serverName.schemaName.tableName, which is not always convenient. With the transforms options and regular expressions, you can define a set of tables whose events should be routed to a topic with a specific name.

In our configuration, thanks to transforms, the following happens: all CDC events from the tracked database go to the single topic data.cdc.dbname. Without these settings, Debezium would by default create a separate topic per table, named like pg-dev.public.<table_name>.

Connector limitations

To wrap up the description of the PostgreSQL connector configuration, it is worth mentioning the following features/limitations of its operation:

  1. The connector functionality for PostgreSQL relies on the concept of logical decoding. Therefore, it does not track requests that change the structure of the database (DDL), so this data will not appear in the topics.
  2. Since replication slots are used, the connector can only be connected to the primary (master) DBMS instance.
  3. If the user the connector connects to the database under has read-only rights, then before the first launch you will need to manually create the replication slot and the publication in the database (see the sketch below).
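A minimal sketch of that manual preparation, using the slot and publication names from the configuration above (to be run by a sufficiently privileged user):

CREATE PUBLICATION dbname_publication FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('dbname_debezium', 'pgoutput');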

Applying a configuration

So let's load our configuration into the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json

We check that the upload was successful and the connector started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Great: it is set up and ready to go. Now let's pretend to be a consumer and connect to Kafka, after which we insert and update a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will be displayed as follows:

Very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the row that was changed and the essence of the change itself: what the row was before and what it became after.

  • In the case of INSERT: the value of before is null, and after contains the row that was inserted.
  • In the case of UPDATE: payload.before shows the previous state of the row, and payload.after shows the new one with the changes applied.
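If you only care about the change itself, the schema part can be stripped away on the consumer side. A small sketch, assuming jq is installed and the default JSON converter with schemas enabled is in use (as in the output above):

kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --from-beginning --topic data.cdc.dbname \
  | jq '{op: .payload.op, before: .payload.before, after: .payload.after}'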

2.2. MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the DBMS primary node.

As with the PostgreSQL connector described above, an initial data snapshot is taken on the first start, after which the connector switches to oplog reading mode.

Configuration example:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, there are no new options here compared to the previous example; only the options responsible for connecting to the database and their prefixes have changed.

The transforms settings this time do the following: they turn the target topic name from the scheme <server_name>.<db_name>.<collection_name> into data.cdc.mongo_<db_name>.
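The connector is registered in the same way as for PostgreSQL, by POSTing the JSON above to the Kafka Connect REST API (the file name here is just an assumption):

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @mongo-con.json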

Fault tolerance

The issue of fault tolerance and high availability is more acute today than ever, especially when we talk about data and transactions, and change data capture is no exception here. Let's look at what can go wrong in principle and what will happen to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to work in distributed mode, multiple workers must set the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the topic in Kafka.
  2. Loss of connectivity with the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically retries sending it until the attempt succeeds.
  3. Data source unavailable. The connector will attempt to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed, and it will need to be restarted manually via the Kafka Connect REST interface (see the example after this list).
    • In the case of PostgreSQL, data will not be lost, because the replication slot prevents the deletion of WAL files that the connector has not yet read. There is a downside, though: if connectivity between the connector and the DBMS is disrupted for a long time, there is a chance that disk space will run out, and this may lead to the failure of the entire DBMS.
    • In the case of MySQL, binlog files can be rotated by the DBMS itself before connectivity is restored. This will cause the connector to go into the failed state; to restore normal operation, it will need to be restarted in initial snapshot mode to continue reading from the binlogs.
    • As for MongoDB, the documentation says that the behavior of the connector when the log/oplog files have been deleted and the connector cannot continue reading from the position where it left off is the same for all DBMSs: the connector goes into the failed state and requires a restart in initial snapshot mode.

      However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then when the connection is restored the connector will calmly continue reading data from the first available position, which is why some of the data will never reach Kafka.
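A task that has ended up in the failed state can be restarted through the Kafka Connect REST API; for the connector configured above it might look like this:

# restart task 0 of the pg-connector connector
curl -X POST http://localhost:8083/connectors/pg-connector/tasks/0/restart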

Conclusion

Debezium was my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the major DBMSs, ease of configuration, clustering support, and an active community. For those interested in practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared to the JDBC source connector for Kafka Connect, the main advantage of Debezium is that changes are read from the DBMS logs, which allows data to be received with minimal delay. The JDBC connector queries the tracked table at a fixed interval and (for the same reason) does not generate messages when data is deleted (how can you query data that is no longer there?).

To solve similar problems, you can pay attention to the following solutions (in addition to Debezium):

Source: habr.com
