Debezium 简介 - Apache Kafka 的 CDC

Debezium 简介 - Apache Kafka 的 CDC

在工作中,我经常遇到新的技术解决方案/软件产品,而这些信息在俄语互联网上相当稀缺。 在本文中,我将尝试用我最近实践中的一个示例来填补这样的空白,当时我需要配置使用 Debezium 将 CDC 事件从两个流行的 DBMS(PostgreSQL 和 MongoDB)发送到 Kafka 集群。 我希望这篇评论文章作为已完成工作的结果而出现,对其他人有用。

Debezium 和 CDC 是什么?

去角质 — CDC 软件类别的代表(捕获数据变化),或者更准确地说,它是一组与 Apache Kafka Connect 框架兼容的各种 DBMS 的连接器。

开源项目, 根据 Apache License v2.0 获得许可并由 Red Hat 赞助。 自 2016 年以来一直在进行开发,目前它为以下 DBMS 提供官方支持:MySQL、PostgreSQL、MongoDB、SQL Server。 还有针对 Cassandra 和 Oracle 的连接器,但目前它们处于“早期访问”状态,新版本不保证向后兼容性。

如果我们将 CDC 与传统方法(当应用程序直接从 DBMS 读取数据时)进行比较,它的主要优点包括在行级别实现数据更改流,具有低延迟、高可靠性和可用性。 最后两点是通过使用 Kafka 集群作为 CDC 事件的存储库来实现的。

另一个优点是使用单一模型来存储事件,因此最终应用程序不必担心操作不同 DBMS 的细微差别。

最后,使用消息代理允许监视数据变化的应用程序水平扩展。 同时,对数据源的影响也降到最低,因为数据不是直接从DBMS获取,而是从Kafka集群获取。

关于 Debezium 架构

使用 Debezium 可以归结为这个简单的方案:

DBMS(作为数据源)→ Kafka Connect 中的连接器→ Apache Kafka → 消费者

作为说明,以下是项目网站上的图表:

Debezium 简介 - Apache Kafka 的 CDC

不过,我不太喜欢这个方案,因为似乎只能使用sink连接器。

实际上,情况有所不同:填充您的数据湖 (上图中的最后一个链接) 这不是使用 Debezium 的唯一方法。 您的应用程序可以使用发送到 Apache Kafka 的事件来处理各种情况。 例如:

  • 从缓存中删除不相关的数据;
  • 发送通知;
  • 搜索索引更新;
  • 某种审计日志;
  • ...

如果您有 Java 应用程序并且不需要/不可能使用 Kafka 集群,也可以通过 嵌入式连接器。 明显的优点是它不需要额外的基础设施(以连接器和 Kafka 的形式)。 但是,此解决方案自 1.1 版本起已被弃用,不再建议使用(未来版本中可能会删除对其的支持)。

本文将讨论开发人员推荐的架构,该架构提供容错性和可扩展性。

连接器配置

为了开始跟踪最重要的价值(数据)的变化,我们需要:

  1. 数据源,可以是MySQL 5.7以上版本、PostgreSQL 9.6+、MongoDB 3.2+(完整列表);
  2. 阿帕奇卡夫卡集群;
  3. Kafka Connect 实例(版本 1.x、2.x);
  4. 配置 Debezium 连接器。

解决前两点,即DBMS 和 Apache Kafka 的安装过程超出了本文的范围。 然而,对于那些想要在沙盒中部署所有内容的人来说,带有示例的官方存储库有一个现成的 docker-compose.yaml.

我们将更详细地讨论最后两点。

0.卡夫卡连接

此处以及本文的后续内容中,所有配置示例都是在 Debezium 开发人员分发的 Docker 映像的上下文中讨论的。 它包含所有必要的插件文件(连接器),并使用环境变量提供 Kafka Connect 的配置。

如果您打算使用 Confluence 中的 Kafka Connect,则需要将必要连接器的插件独立添加到中指定的目录中 plugin.path 或通过环境变量设置 CLASSPATH。 Kafka Connect 工作线程和连接器的设置是通过配置文件确定的,这些配置文件作为参数传递给工作线程启动命令。 有关更多详细信息,请参阅 文件资料.

在连接器版本中设置 Debeizum 的整个过程分两个阶段进行。 让我们分别看看:

1. 设置Kafka Connect框架

为了将数据流式传输到 Apache Kafka 集群,需要在 Kafka Connect 框架中设置特定参数,例如:

  • 连接到集群的参数,
  • 连接器本身的配置将直接存储在主题的名称中,
  • 连接器在其中运行的组的名称(如果使用分布式模式)。

该项目的官方 Docker 镜像支持使用环境变量进行配置 - 这就是我们将使用的。 因此,下载图像:

docker pull debezium/connect

运行连接器所需的最小环境变量集如下:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — Kafka集群服务器的初始列表,以获得集群成员的完整列表;
  • OFFSET_STORAGE_TOPIC=connector-offsets — 用于存储连接器当前所在位置的主题;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — 用于存储连接器状态及其任务的主题;
  • CONFIG_STORAGE_TOPIC=connector-config — 用于存储连接器配置数据及其任务的主题;
  • GROUP_ID=1 — 可以执行连接器任务的工作组的标识符; 使用分布式时必要的 (分散式) 模式。

我们使用这些变量启动容器:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

关于 Avro 的注意事项

默认情况下,Debezium 以 JSON 格式写入数据,这对于沙箱和少量数据来说是可以接受的,但在高负载数据库中可能会成为问题。 JSON 转换器的替代方案是使用序列化消息 阿夫罗 转换为二进制格式,从而减少 Apache Kafka 中 I/O 子系统的负载。

要使用 Avro,您需要部署单独的 模式注册表 (用于存储图表)。 转换器的变量将如下所示:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

有关使用 Avro 以及为其设置注册表的详细信息超出了本文的范围 - 进一步,为了清楚起见,我们将使用 JSON。

2. 配置连接器本身

现在您可以直接进行连接器本身的配置,它将从源读取数据。

让我们看一下两个 DBMS 的连接器示例:PostgreSQL 和 MongoDB,我在其中有经验并且存在差异(尽管很小,但在某些情况下很重要!)。

配置以 JSON 表示法描述,并使用 POST 请求上传到 Kafka Connect。

2.1. 数据库

PostgreSQL 连接器配置示例:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

设置完成后连接器的工作原理非常简单:

  • 第一次启动时,它连接到配置中指定的数据库并以模式启动 初始快照,将使用条件获得的初始数据集发送到 Kafka SELECT * FROM table_name.
  • 初始化完成后,连接器进入从 PostgreSQL WAL 文件读取更改的模式。

关于使用的选项:

  • name — 使用下述配置的连接器的名称; 将来,该名称用于通过 Kafka Connect REST API 与连接器配合使用(即查看状态/重新启动/更新配置);
  • connector.class — 将由配置的连接器使用的 DBMS 连接器类;
  • plugin.name — 用于对 WAL 文件中的数据进行逻辑解码的插件的名称。 可供选择 wal2json, decoderbuffs и pgoutput。 前两个需要在 DBMS 中安装适当的扩展,并且 pgoutput 对于 PostgreSQL 版本 10 及更高版本不需要额外的操作;
  • database.* — 连接数据库的选项,其中 database.server.name — PostgreSQL实例名称,用于构成Kafka集群中的主题名称;
  • table.include.list — 我们想要跟踪更改的表列表; 格式中指定的 schema.table_name; 不能与以下一起使用 table.exclude.list;
  • heartbeat.interval.ms — 连接器向特殊主题发送心跳消息的时间间隔(以毫秒为单位);
  • heartbeat.action.query — 发送每条心跳消息时都会执行的请求(该选项出现在1.1版本中);
  • slot.name — 连接器将使用的复制槽的名称;
  • publication.name - 姓名 发表 在连接器使用的 PostgreSQL 中。 如果它不存在,Debezium 将尝试创建它。 如果建立连接的用户没有足够的权限执行此操作,连接器将终止并出现错误;
  • transforms 确定如何更改目标主题的名称:
    • transforms.AddPrefix.type 表明我们将使用正则表达式;
    • transforms.AddPrefix.regex — 重新定义目标主题名称的掩码;
    • transforms.AddPrefix.replacement - 直接我们正在重新定义的内容。

有关心跳和变换的更多信息

默认情况下,连接器将每个已提交事务的数据发送到 Kafka,其 LSN(日志序列号)记录在服务主题中 offset。 但是,如果连接器配置为不读取整个数据库,而仅读取其部分表(其中数据更新不频繁发生),会发生什么情况?

  • 连接器将读取 WAL 文件,并且不会检测到对其正在监视的表的任何事务提交。
  • 因此,它不会更新其在主题或复制槽中的当前位置。
  • 反过来,这将导致 WAL 文件保留在磁盘上并可能耗尽磁盘空间。

这就是选择可以发挥作用的地方。 heartbeat.interval.ms и heartbeat.action.query。 成对使用这些选项使得每次发送心跳消息时都可以执行更改单独表中数据的请求。 因此,连接器当前所在的LSN(在复制槽中)会不断更新。 这允许 DBMS 删除不再需要的 WAL 文件。 您可以详细了解这些选项如何工作 文件资料.

另一个值得密切关注的选项是 transforms。 虽然更多的是为了方便和美观……

默认情况下,Debezium 使用以下命名策略创建主题: serverName.schemaName.tableName。 这可能并不总是很方便。 选项 transforms 您可以使用正则表达式来定义表列表,需要将这些事件路由到具有特定名称的主题。

在我们的配置中谢谢 transforms 发生以下情况:来自受监控数据库的所有 CDC 事件都将转到具有名称的主题 data.cdc.dbname。 否则(没有这些设置),Debezium 默认情况下会为每个表创建一个主题,例如: pg-dev.public.<table_name>.

连接器限制

为了结束 PostgreSQL 连接器配置的描述,值得讨论其操作的以下功能/限制:

  1. PostgreSQL 连接器的功能依赖于逻辑解码的概念。 因此他 不跟踪更改数据库结构的请求 (DDL) - 因此,该数据不会出现在主题中。
  2. 由于使用了复制槽,因此可以连接连接器 到领先的 DBMS 实例。
  3. 如果连接器连接到数据库的用户具有只读权限,则在首次启动之前,您将需要手动创建复制槽并发布到数据库。

应用配置

因此,让我们将配置加载到连接器中:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

我们检查下载是否成功并且连接器已启动:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

太棒了:它已设置完毕并准备就绪。 现在让我们假装是消费者并连接到 Kafka,之后我们将添加和更改表中的条目:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

在我们的主题中它将显示如下:

非常长的 JSON 以及我们的更改

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

在这两种情况下,记录都由已更改记录的键 (PK) 以及更改的本质组成:记录之前是什么以及之后变成了什么。

  • 在的情况下 INSERT: 之前的值 (before) 等于 null,以及之后 - 插入的行。
  • 在的情况下 UPDATE:在 payload.before 显示线路的先前状态,并且 payload.after ——新与变的本质。

2.2 MongoDB

该连接器使用标准 MongoDB 复制机制,从主 DBMS 节点的 oplog 读取信息。

与已经描述的 PgSQL 连接器类似,这里也是在第一次启动时拍摄主数据快照,之后连接器切换到 oplog 读取模式。

配置示例:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

正如您所看到的,与前面的示例相比,这里没有新的选项,只是减少了负责连接数据库的选项数量及其前缀。

设置 transforms 这次他们执行以下操作:从架构中转换目标主题的名称 <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

容错

我们这个时代的容错和高可用性问题比以往任何时候都更加尖锐——尤其是当我们谈论数据和事务时,跟踪数据变化在这个问题上并不被忽视。 让我们看看原则上可能会出现什么问题,以及在每种情况下 Debezium 会发生什么。

共有三种退出选项:

  1. 卡夫卡连接失败。 如果Connect配置为以分布式模式工作,则需要多个worker设置相同的group.id。 然后,如果其中一个失败,连接器将在另一个工作线程上重新启动,并继续从 Kafka 主题中最后提交的位置读取。
  2. 与 Kafka 集群的连接丢失。 连接器将简单地在发送到 Kafka 失败的位置停止读取,并定期尝试重新发送,直到尝试成功。
  3. 数据源不可用。 连接器将尝试按照配置重新连接到源。 默认为 16 次尝试 指数退避。 第 16 次尝试失败后,该任务将被标记为 失败 您需要通过 Kafka Connect REST 接口手动重新启动它。
    • 在的情况下 PostgreSQL的 数据不会丢失,因为使用复制槽将阻止您删除连接器未读取的 WAL 文件。 在这种情况下,硬币也有一个缺点:如果连接器和 DBMS 之间的网络连接长时间中断,则磁盘空间有可能耗尽,这可能会导致失败整个数据库管理系统。
    • 在的情况下 MySQL的 在连接恢复之前,DBMS 本身可以轮换 binlog 文件。 这将导致连接器进入故障状态,并且要恢复正常操作,您将需要在初始快照模式下重新启动以继续从二进制日志中读取。
    • Про MongoDB的。 文档指出:在日志/操作日志文件已被删除并且连接器无法从其停止位置继续读取的情况下,连接器的行为对于所有 DBMS 都是相同的。 这意味着连接器将进入状态 失败 并且需要在模式下重新启动 初始快照.

      不过,也有例外。 如果连接器长时间断开(或者无法到达MongoDB实例),并且这段时间oplog经历了轮转,那么当连接恢复时,连接器会从容地继续从第一个可用位置读取数据,这就是为什么 Kafka 中的一些数据 没有 会击中。

结论

Debezium 是我第一次使用 CDC 系统,总体来说非常积极。 该项目以其对主要 DBMS 的支持、易于配置、集群支持和活跃的社区而赢得了胜利。 对于那些对实践感兴趣的人,我建议您阅读以下指南 卡夫卡连接 и 去角质.

与 Kafka Connect 的 JDBC 连接器相比,Debezium 的主要优点是从 DBMS 日志中读取更改,从而可以以最小的延迟接收数据。 JDBC Connector(来自 Kafka Connect)以固定的时间间隔查询受监控的表,并且(出于同样的原因)当数据被删除时不会生成消息(如何查询不存在的数据?)。

解决类似问题,可以关注以下解决方案(除了Debezium):

PS

另请阅读我们的博客:

来源: habr.com

添加评论