Presentamos Debezium - CDC para Apache Kafka

Presentamos Debezium - CDC para Apache Kafka

En mi trabajo, a menudo me encuentro con nuevas soluciones técnicas / productos de software, cuya información es bastante escasa en Internet de habla rusa. Con este artículo, intentaré llenar uno de esos vacíos con un ejemplo de mi práctica reciente, cuando necesitaba configurar el envío de eventos CDC desde dos DBMS populares (PostgreSQL y MongoDB) a un clúster de Kafka usando Debezium. Espero que este artículo de revisión, que apareció como resultado del trabajo realizado, sea útil para otros.

¿Qué es Debezium y CDC en general?

debecio - Representante de la categoría de software CDC (Capturar cambio de datos), o más precisamente, es un conjunto de conectores para varios DBMS que son compatibles con el marco Apache Kafka Connect.

Lo proyecto de código abierto, con licencia Apache License v2.0 y patrocinado por Red Hat. El desarrollo ha estado en marcha desde 2016 y en este momento brinda soporte oficial para los siguientes DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. También hay conectores para Cassandra y Oracle, pero actualmente se encuentran en estado de "acceso anticipado" y las nuevas versiones no garantizan la compatibilidad con versiones anteriores.

Si comparamos CDC con el enfoque tradicional (cuando la aplicación lee datos del DBMS directamente), sus principales ventajas incluyen la implementación de transmisión de cambios de datos a nivel de fila con baja latencia, alta confiabilidad y disponibilidad. Los dos últimos puntos se logran mediante el uso de un clúster de Kafka como repositorio de eventos de CDC.

Además, las ventajas incluyen el hecho de que se usa un solo modelo para almacenar eventos, por lo que la aplicación final no tiene que preocuparse por los matices de operar diferentes DBMS.

Por último, el uso de un intermediario de mensajes abre un margen para la escalabilidad horizontal de las aplicaciones que rastrean los cambios en los datos. Al mismo tiempo, se minimiza el impacto en la fuente de datos, ya que los datos no se reciben directamente del DBMS, sino del clúster de Kafka.

Sobre la arquitectura Debezium

El uso de Debezium se reduce a este esquema simple:

DBMS (como fuente de datos) → conector en Kafka Connect → Apache Kafka → consumidor

Como ilustración, daré un diagrama del sitio web del proyecto:

Presentamos Debezium - CDC para Apache Kafka

Sin embargo, no me gusta mucho este esquema, porque parece que solo es posible un conector de fregadero.

En realidad, la situación es diferente: llenar su Data Lake (último enlace en el diagrama de arriba) no es la única manera de utilizar Debezium. Sus aplicaciones pueden utilizar los eventos enviados a Apache Kafka para resolver diversas situaciones. Por ejemplo:

  • eliminación de datos irrelevantes del caché;
  • envío de notificaciones;
  • actualizaciones del índice de búsqueda;
  • algún tipo de registros de auditoría;
  • ...

En caso de que tenga una aplicación Java y no haya necesidad/posibilidad de usar un clúster de Kafka, también existe la posibilidad de trabajar a través de conector integrado. La ventaja obvia es que con él puede rechazar infraestructura adicional (en forma de conector y Kafka). Sin embargo, esta solución ha quedado obsoleta desde la versión 1.1 y ya no se recomienda su uso (es posible que se elimine en versiones futuras).

Este artículo discutirá la arquitectura recomendada por los desarrolladores, que brinda tolerancia a fallas y escalabilidad.

Configuración del conector

Para comenzar a rastrear los cambios en el valor más importante, los datos, necesitamos:

  1. fuente de datos, que puede ser MySQL a partir de la versión 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (la lista completa);
  2. Clúster de Apache Kafka
  3. Instancia de Kafka Connect (versiones 1.x, 2.x);
  4. conector Debezium configurado.

Trabaje en los dos primeros puntos, es decir. el proceso de instalación de DBMS y Apache Kafka está más allá del alcance de este artículo. Sin embargo, para aquellos que quieran implementar todo en un sandbox, hay uno listo en el repositorio oficial con ejemplos. docker-compose.yaml.

Nos centraremos en los dos últimos puntos con más detalle.

0. Conexión Kafka

Aquí y más adelante en el artículo, todos los ejemplos de configuración se consideran en el contexto de la imagen de Docker distribuida por los desarrolladores de Debezium. Contiene todos los archivos de complemento necesarios (conectores) y proporciona la configuración de Kafka Connect mediante variables de entorno.

Si tiene la intención de usar Kafka Connect de Confluent, deberá agregar los complementos de los conectores necesarios usted mismo al directorio especificado en plugin.path o establecer a través de una variable de entorno CLASSPATH. La configuración del trabajador y los conectores de Kafka Connect se definen a través de archivos de configuración que se pasan como argumentos al comando de inicio del trabajador. Para más detalles ver documentación.

Todo el proceso de instalación de Debeizum en versión conector se realiza en dos etapas. Consideremos cada uno de ellos:

1. Configuración del marco Kafka Connect

Para transmitir datos a un clúster de Apache Kafka, se establecen parámetros específicos en el marco de Kafka Connect, como:

  • configuración de conexión de clúster,
  • nombres de temas en los que se almacenará la configuración del propio conector,
  • el nombre del grupo en el que se ejecuta el conector (en caso de utilizar el modo distribuido).

La imagen oficial de Docker del proyecto admite la configuración mediante variables de entorno; esto es lo que usaremos. Así que vamos a descargar la imagen:

docker pull debezium/connect

El conjunto mínimo de variables de entorno necesario para ejecutar el conector es el siguiente:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista inicial de servidores del clúster de Kafka para obtener una lista completa de los miembros del clúster;
  • OFFSET_STORAGE_TOPIC=connector-offsets — un tema para almacenar posiciones donde se encuentra actualmente el conector;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un tema para almacenar el estado del conector y sus tareas;
  • CONFIG_STORAGE_TOPIC=connector-config - un tema para almacenar datos de configuración del conector y sus tareas;
  • GROUP_ID=1 — identificador del grupo de trabajadores sobre los que se puede ejecutar la tarea del conector; requerido cuando se usa distribuido (repartido) modo

Iniciamos el contenedor con estas variables:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota sobre Avro

De forma predeterminada, Debezium escribe datos en formato JSON, lo cual es aceptable para entornos limitados y pequeñas cantidades de datos, pero puede ser un problema en bases de datos muy cargadas. Una alternativa al convertidor JSON es serializar mensajes usando Avro a un formato binario, lo que reduce la carga en el subsistema de E/S en Apache Kafka.

Para usar Avro, debe implementar un registro de esquema (para almacenar esquemas). Las variables para el convertidor se verán así:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Los detalles sobre el uso de Avro y la configuración de un registro están fuera del alcance de este artículo; además, para mayor claridad, usaremos JSON.

2. Configuración del propio conector

Ahora puede ir directamente a la configuración del propio conector, que leerá los datos de la fuente.

Veamos el ejemplo de conectores para dos DBMS: PostgreSQL y MongoDB, para los que tengo experiencia y para los que hay diferencias (aunque pequeñas, ¡pero en algunos casos significativas!).

La configuración se describe en notación JSON y se carga en Kafka Connect mediante una solicitud POST.

2.1. PostgreSQL

Ejemplo de configuración de conector para PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

El principio de funcionamiento del conector después de esta configuración es bastante simple:

  • En el primer inicio, se conecta a la base de datos especificada en la configuración y se inicia en el modo instantánea inicial, enviando a Kafka el conjunto inicial de datos recibidos con el condicional SELECT * FROM table_name.
  • Una vez completada la inicialización, el conector ingresa al modo de lectura de cambios de los archivos WAL de PostgreSQL.

Sobre las opciones utilizadas:

  • name — el nombre del conector para el que se utiliza la configuración descrita a continuación; en el futuro, este nombre se utilizará para trabajar con el conector (es decir, ver el estado/reiniciar/actualizar la configuración) a través de la API REST de Kafka Connect;
  • connector.class — la clase de conector DBMS que utilizará el conector configurado;
  • plugin.name es el nombre del complemento para la decodificación lógica de datos de archivos WAL. Disponible para elegir wal2json, decoderbuffs и pgoutput. Los dos primeros requieren la instalación de las extensiones adecuadas en el DBMS, y pgoutput para PostgreSQL versión 10 y superior no requiere manipulaciones adicionales;
  • database.* — opciones para conectarse a la base de datos, donde database.server.name - el nombre de la instancia de PostgreSQL utilizada para formar el nombre del tema en el clúster de Kafka;
  • table.include.list - una lista de tablas en las que queremos realizar un seguimiento de los cambios; dado en el formato schema.table_name; no se puede usar junto con table.exclude.list;
  • heartbeat.interval.ms — intervalo (en milisegundos) con el que el conector envía mensajes de latido a un tema especial;
  • heartbeat.action.query - una solicitud que se ejecutará al enviar cada mensaje de latido (la opción aparece desde la versión 1.1);
  • slot.name — el nombre de la ranura de replicación que utilizará el conector;
  • publication.name - Nombre Publicación en PostgreSQL que utiliza el conector. En caso de que no exista, Debezium intentará crearlo. Si el usuario bajo el cual se realiza la conexión no tiene suficientes derechos para esta acción, el conector saldrá con un error;
  • transforms determina cómo cambiar exactamente el nombre del tema de destino:
    • transforms.AddPrefix.type indica que usaremos expresiones regulares;
    • transforms.AddPrefix.regex — máscara por la que se redefine el nombre del tema de destino;
    • transforms.AddPrefix.replacement - directamente lo que redefinimos.

Más sobre latidos y transformaciones

De forma predeterminada, el conector envía datos a Kafka para cada transacción confirmada y escribe su LSN (Número de secuencia de registro) en el tema del servicio. offset. Pero, ¿qué sucede si el conector está configurado para leer no toda la base de datos, sino solo una parte de sus tablas (en las que los datos se actualizan con poca frecuencia)?

  • El conector leerá archivos WAL y no detectará confirmaciones de transacciones en ellos en las tablas que supervisa.
  • Por lo tanto, no actualizará su posición actual ni en el tema ni en la ranura de replicación.
  • Esto, a su vez, hará que los archivos WAL se "atasquen" en el disco y probablemente se queden sin espacio en el disco.

Y aquí las opciones vienen al rescate. heartbeat.interval.ms и heartbeat.action.query. El uso de estas opciones en pares hace posible ejecutar una solicitud para cambiar datos en una tabla separada cada vez que se envía un mensaje de latido. Por lo tanto, el LSN en el que se encuentra actualmente el conector (en la ranura de replicación) se actualiza constantemente. Esto permite que el DBMS elimine los archivos WAL que ya no se necesitan. Para obtener más información sobre cómo funcionan las opciones, consulte documentación.

Otra opción que merece mayor atención es transforms. Aunque se trata más de comodidad y belleza...

De forma predeterminada, Debezium crea temas utilizando la siguiente política de nomenclatura: serverName.schemaName.tableName. Esto puede no ser siempre conveniente. Opciones transforms usando expresiones regulares, puede definir una lista de tablas cuyos eventos deben enrutarse a un tema con un nombre específico.

En nuestra configuración gracias a transforms sucede lo siguiente: todos los eventos de CDC de la base de datos rastreada irán al tema con el nombre data.cdc.dbname. De lo contrario (sin esta configuración), Debezium crearía por defecto un tema para cada tabla del formulario: pg-dev.public.<table_name>.

Limitaciones del conector

Al final de la descripción de la configuración del conector para PostgreSQL, vale la pena hablar sobre las siguientes características/limitaciones de su trabajo:

  1. La funcionalidad del conector para PostgreSQL se basa en el concepto de decodificación lógica. por lo tanto el no rastrea las solicitudes para cambiar la estructura de la base de datos (DDL) - en consecuencia, estos datos no estarán en los temas.
  2. Dado que se utilizan ranuras de replicación, la conexión del conector es posible sólo a la instancia maestra de DBMS.
  3. Si el usuario bajo el cual el conector se conecta a la base de datos tiene derechos de solo lectura, entonces, antes del primer lanzamiento, deberá crear manualmente una ranura de replicación y publicar en la base de datos.

Aplicar una configuración

Entonces, carguemos nuestra configuración en el conector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Comprobamos que la descarga fue exitosa y se inició el conector:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Genial: está configurado y listo para funcionar. Ahora supongamos que somos un consumidor y nos conectemos a Kafka, después de lo cual agregamos y cambiamos una entrada en la tabla:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

En nuestro tema, esto se mostrará de la siguiente manera:

JSON muy largo con nuestros cambios.

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

En ambos casos, los registros consisten en la clave (PK) del registro que se modificó y la esencia misma de los cambios: qué era el registro antes y en qué se convirtió después.

  • En el caso de INSERT: valor antes de (before) es igual nullseguido de la cadena que se insertó.
  • En el caso de UPDATE: en payload.before se muestra el estado anterior de la fila, y en payload.after - nuevo con la esencia del cambio.

2.2 Mongo DB

Este conector utiliza el mecanismo de replicación estándar de MongoDB y lee información del registro de operaciones del nodo principal de DBMS.

De manera similar al conector ya descrito para PgSQL, aquí también, en el primer inicio, se toma la instantánea de datos primarios, después de lo cual el conector cambia al modo de lectura de registro de operaciones.

Ejemplo de configuración:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Como puede ver, no hay opciones nuevas en comparación con el ejemplo anterior, solo se ha reducido la cantidad de opciones encargadas de conectarse a la base de datos y sus prefijos.

Configuración transforms esta vez hacen lo siguiente: cambiar el nombre del tema de destino del esquema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

Tolerancia a fallos

El problema de la tolerancia a fallas y la alta disponibilidad en nuestro tiempo es más grave que nunca, especialmente cuando hablamos de datos y transacciones, y el seguimiento de cambios de datos no está al margen en este asunto. Veamos qué puede salir mal en principio y qué sucederá con Debezium en cada caso.

Hay tres opciones de exclusión voluntaria:

  1. Error de Kafka Connect. Si Connect está configurado para funcionar en modo distribuido, esto requiere que varios trabajadores configuren el mismo group.id. Luego, si uno de ellos falla, el conector se reiniciará en el otro trabajador y continuará leyendo desde la última posición confirmada en el tema en Kafka.
  2. Pérdida de conectividad con el clúster de Kafka. El conector simplemente dejará de leer en la posición que no pudo enviar a Kafka e intentará reenviarlo periódicamente hasta que el intento tenga éxito.
  3. Fuente de datos no disponible. El conector intentará volver a conectarse a la fuente según la configuración. El valor predeterminado es 16 intentos usando retroceso exponencial. Después del decimosexto intento fallido, la tarea se marcará como fracasado y deberá reiniciarse manualmente a través de la interfaz REST de Kafka Connect.
    • En el caso de PostgreSQL los datos no se perderán, porque el uso de ranuras de replicación evitará la eliminación de archivos WAL no leídos por el conector. En este caso, hay una desventaja: si la conectividad de red entre el conector y el DBMS se interrumpe durante mucho tiempo, existe la posibilidad de que se agote el espacio en disco y esto puede provocar la falla de todo el DBMS.
    • En el caso de MySQL Los archivos binlog pueden ser rotados por el propio DBMS antes de que se restablezca la conectividad. Esto hará que el conector entre en un estado de falla y deberá reiniciarse en el modo de instantánea inicial para continuar leyendo desde binlogs para restaurar la operación normal.
    • Про MongoDB. La documentación dice: el comportamiento del conector en caso de que se hayan eliminado los archivos de registro/oplog y el conector no pueda continuar leyendo desde la posición donde lo dejó es el mismo para todos los DBMS. Se encuentra en el hecho de que el conector entrará en el estado fracasado y requerirá un reinicio en el modo instantánea inicial.

      Sin embargo, hay excepciones. Si el conector estuvo en un estado desconectado durante mucho tiempo (o no pudo llegar a la instancia de MongoDB) y oplog se activó durante este tiempo, cuando se restablezca la conexión, el conector continuará leyendo tranquilamente los datos desde la primera posición disponible. , razón por la cual algunos de los datos en Kafka no golpeará

Conclusión

Debezium es mi primera experiencia con los sistemas CDC y ha sido muy positiva en general. El proyecto sobornó al soporte del DBMS principal, facilidad de configuración, soporte para agrupamiento y una comunidad activa. Para aquellos interesados ​​en la práctica, les recomiendo que lean las guías para Conexión Kafka и debecio.

En comparación con el conector JDBC para Kafka Connect, la principal ventaja de Debezium es que los cambios se leen de los registros de DBMS, lo que permite recibir los datos con un retraso mínimo. El conector JDBC (proporcionado por Kafka Connect) consulta la tabla rastreada en un intervalo fijo y (por la misma razón) no genera mensajes cuando se eliminan datos (¿cómo puede consultar datos que no están allí?).

Para resolver problemas similares, puede prestar atención a las siguientes soluciones (además de Debezium):

PS

Lea también en nuestro blog:

Fuente: habr.com

Añadir un comentario