Presentación de Debezium - CDC para Apache Kafka

Presentación de Debezium - CDC para Apache Kafka

No meu traballo, adoito atoparme con novas solucións técnicas/produtos de software, cuxa información é bastante escasa na Internet de fala rusa. Con este artigo, tentarei cubrir un deses baleiros cun exemplo da miña práctica recente, cando necesitaba configurar o envío de eventos CDC desde dous DBMS populares (PostgreSQL e MongoDB) a un clúster de Kafka usando Debezium. Espero que este artigo de revisión, que apareceu como resultado do traballo realizado, sexa útil para outros.

Que é Debezium e CDC en xeral?

Debezium - Representante da categoría de software CDC (Captura o cambio de datos), ou máis precisamente, é un conxunto de conectores para varios DBMS compatibles co framework Apache Kafka Connect.

El proxecto de código aberto, licenciado baixo a licenza Apache v2.0 e patrocinado por Red Hat. O desenvolvemento está en marcha dende 2016 e nestes momentos ofrece soporte oficial para os seguintes DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Tamén hai conectores para Cassandra e Oracle, pero actualmente están en estado de "acceso anticipado" e as novas versións non garanten a compatibilidade con versións anteriores.

Se comparamos CDC co enfoque tradicional (cando a aplicación le directamente datos do DBMS), entón as súas principais vantaxes inclúen a implementación de transmisión de cambios de datos a nivel de fila con baixa latencia, alta fiabilidade e dispoñibilidade. Os dous últimos puntos conséguense usando un clúster de Kafka como repositorio para eventos de CDC.

Ademais, as vantaxes inclúen o feito de que se utiliza un único modelo para almacenar eventos, polo que a aplicación final non ten que preocuparse polos matices de operar diferentes DBMS.

Finalmente, usar un corredor de mensaxes abre un campo para a escala horizontal das aplicacións que rastrexan os cambios nos datos. Ao mesmo tempo, o impacto na fonte de datos é minimizado, xa que os datos non se reciben directamente do DBMS, senón do clúster Kafka.

Sobre a arquitectura Debezium

Usar Debezium redúcese a este esquema sinxelo:

DBMS (como fonte de datos) → conector en Kafka Connect → Apache Kafka → consumidor

Como ilustración, darei un diagrama do sitio web do proxecto:

Presentación de Debezium - CDC para Apache Kafka

Non obstante, non me gusta moito este esquema, porque parece que só é posible un conector de lavabo.

En realidade, a situación é diferente: encher o teu Data Lake (última ligazón no diagrama anterior) non é a única forma de usar Debezium. As túas aplicacións poden usar os eventos enviados a Apache Kafka para resolver varias situacións. Por exemplo:

  • eliminación de datos irrelevantes da caché;
  • envío de notificacións;
  • actualizacións do índice de busca;
  • algún tipo de rexistros de auditoría;
  • ...

No caso de ter unha aplicación Java e non hai necesidade/posibilidade de utilizar un clúster de Kafka, tamén existe a posibilidade de traballar con conector incorporado. A vantaxe obvia é que con el pode rexeitar infraestrutura adicional (en forma de conector e Kafka). Non obstante, esta solución quedou en desuso desde a versión 1.1 e xa non se recomenda para o seu uso (pode ser eliminada en futuras versións).

Este artigo analizará a arquitectura recomendada polos desenvolvedores, que proporciona tolerancia a fallos e escalabilidade.

Configuración do conector

Para comezar a rastrexar os cambios no valor máis importante, os datos, necesitamos:

  1. fonte de datos, que pode ser MySQL a partir da versión 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (completa);
  2. Clúster Apache Kafka
  3. instancia de Kafka Connect (versións 1.x, 2.x);
  4. Conector Debezium configurado.

Traballar os dous primeiros puntos, é dicir. o proceso de instalación dun DBMS e Apache Kafka están fóra do alcance do artigo. Non obstante, para aqueles que queiran implementar todo nun sandbox, hai un preparado no repositorio oficial con exemplos docker-compose.yaml.

Centrarémonos nos dous últimos puntos con máis detalle.

0. Kafka Connect

Aquí e máis tarde no artigo, todos os exemplos de configuración considéranse no contexto da imaxe de Docker distribuída polos desenvolvedores de Debezium. Contén todos os ficheiros de complementos necesarios (conectores) e proporciona a configuración de Kafka Connect mediante variables de ambiente.

Se queres usar Kafka Connect de Confluent, terás que engadir ti mesmo os complementos dos conectores necesarios ao directorio especificado en plugin.path ou configurado mediante unha variable de ambiente CLASSPATH. A configuración para o traballador de Kafka Connect e os conectores defínense mediante ficheiros de configuración que se pasan como argumentos ao comando de inicio do traballador. Para detalles ver documentación.

Todo o proceso de configuración de Debeizum na versión do conector realízase en dúas etapas. Consideremos cada un deles:

1. Configurar o marco Kafka Connect

Para transmitir datos a un clúster de Apache Kafka, establécense parámetros específicos no marco de Kafka Connect, como:

  • configuración de conexión de clúster,
  • nomes dos temas nos que se almacenará a configuración do propio conector,
  • o nome do grupo no que se está a executar o conector (en caso de utilizar o modo distribuído).

A imaxe oficial de Docker do proxecto admite a configuración mediante variables de ambiente; isto é o que usaremos. Entón imos descargar a imaxe:

docker pull debezium/connect

O conxunto mínimo de variables de ambiente necesarios para executar o conector é o seguinte:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista inicial dos servidores do clúster de Kafka para obter unha lista completa dos membros do clúster;
  • OFFSET_STORAGE_TOPIC=connector-offsets — un tema para almacenar posicións onde se atopa actualmente o conector;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un tema para almacenar o estado do conector e as súas tarefas;
  • CONFIG_STORAGE_TOPIC=connector-config - un tema para almacenar datos de configuración do conector e as súas tarefas;
  • GROUP_ID=1 — identificador do grupo de traballadores no que se pode executar a tarefa do conector; necesario cando se usa distribuído (distribuído) réxime.

Comezamos o contedor con estas variables:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota sobre Avro

De forma predeterminada, Debezium escribe os datos en formato JSON, que é aceptable para espazos de proba e pequenas cantidades de datos, pero pode ser un problema en bases de datos moi cargadas. Unha alternativa ao conversor JSON é serializar mensaxes usando Euro a un formato binario, o que reduce a carga do subsistema de E/S en Apache Kafka.

Para usar Avro, cómpre implementar outro esquema-rexistro (para almacenar esquemas). As variables para o conversor terán o seguinte aspecto:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Os detalles sobre o uso de Avro e a configuración dun rexistro para el están fóra do alcance do artigo; ademais, para claridade, usaremos JSON.

2. Configuración do propio conector

Agora podes ir directamente á configuración do propio conector, que lerá os datos da fonte.

Vexamos o exemplo de conectores para dous DBMS: PostgreSQL e MongoDB, para os que teño experiencia e para os que hai diferenzas (aínda que pequenas, pero nalgúns casos importantes!).

A configuración descríbese en notación JSON e cárgase en Kafka Connect mediante unha solicitude POST.

2.1. PostgreSQL

Exemplo de configuración do conector para PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

O principio de funcionamento do conector despois desta configuración é bastante sinxelo:

  • No primeiro inicio, conéctase á base de datos especificada na configuración e comeza no modo instantánea inicial, enviando a Kafka o conxunto inicial de datos recibidos co condicional SELECT * FROM table_name.
  • Despois de completar a inicialización, o conector entra no modo de lectura de cambios dos ficheiros WAL de PostgreSQL.

Sobre as opcións utilizadas:

  • name — o nome do conector para o que se utiliza a configuración descrita a continuación; no futuro, este nome utilízase para traballar co conector (é dicir, ver o estado / reiniciar / actualizar a configuración) a través da API REST de Kafka Connect;
  • connector.class — a clase do conector DBMS que utilizará o conector configurado;
  • plugin.name é o nome do complemento para a decodificación lóxica de datos dos ficheiros WAL. Dispoñible para escoller wal2json, decoderbuffs и pgoutput. Os dous primeiros requiren a instalación das extensións adecuadas no DBMS, e pgoutput para PostgreSQL versión 10 e superior non require manipulacións adicionais;
  • database.* — opcións para conectarse á base de datos, onde database.server.name - o nome da instancia de PostgreSQL utilizada para formar o nome do tema no clúster de Kafka;
  • table.include.list - unha lista de táboas nas que queremos facer un seguimento dos cambios; dado no formato schema.table_name; non se pode usar xunto con table.exclude.list;
  • heartbeat.interval.ms — intervalo (en milisegundos) co que o conector envía mensaxes de latido cardíaco a un tema especial;
  • heartbeat.action.query - unha solicitude que se executará ao enviar cada mensaxe de latido (a opción apareceu desde a versión 1.1);
  • slot.name — o nome do slot de replicación que utilizará o conector;
  • publication.name - Nome Publicación en PostgreSQL que usa o conector. No caso de que non exista, Debezium tentará crealo. Se o usuario baixo o cal se realiza a conexión non ten dereitos suficientes para esta acción, o conector sairá cun erro;
  • transforms determina como cambiar exactamente o nome do tema de destino:
    • transforms.AddPrefix.type indica que utilizaremos expresións regulares;
    • transforms.AddPrefix.regex — máscara coa que se redefine o nome do tema de destino;
    • transforms.AddPrefix.replacement - directamente o que redefinimos.

Máis información sobre o latido do corazón e as transformacións

Por defecto, o conector envía datos a Kafka para cada transacción comprometida e escribe o seu LSN (número de secuencia de rexistro) no tema do servizo. offset. Pero que pasa se o conector está configurado para ler non toda a base de datos, senón só parte das súas táboas (nas que os datos se actualizan con pouca frecuencia)?

  • O conector lerá ficheiros WAL e non detectará transaccións confirmadas neles nas táboas que supervisa.
  • Polo tanto, non actualizará a súa posición actual nin no tema nin no espazo de replicación.
  • Isto, á súa vez, fará que os ficheiros WAL estean "pegados" no disco e probablemente se quede sen espazo no disco.

E aquí as opcións veñen ao rescate. heartbeat.interval.ms и heartbeat.action.query. Usar estas opcións en parellas fai posible executar unha solicitude de cambio de datos nunha táboa separada cada vez que se envía unha mensaxe de latido. Así, o LSN no que se atopa actualmente o conector (no slot de replicación) actualízase constantemente. Isto permite que o DBMS elimine os ficheiros WAL que xa non son necesarios. Para obter máis información sobre como funcionan as opcións, consulte documentación.

Outra opción que merece máis atención é transforms. Aínda que se trata máis de comodidade e beleza...

Por defecto, Debezium crea temas usando a seguinte política de nomenclatura: serverName.schemaName.tableName. Isto pode non ser sempre conveniente. Opcións transforms usando expresións regulares, pode definir unha lista de táboas cuxos eventos deben ser encamiñados a un tema cun nome específico.

Na nosa configuración grazas a transforms ocorre o seguinte: todos os eventos de CDC da base de datos rastrexada irán ao tema co nome data.cdc.dbname. En caso contrario (sen esta configuración), Debezium crearía por defecto un tema para cada táboa do formulario: pg-dev.public.<table_name>.

Limitacións dos conectores

Ao final da descrición da configuración do conector para PostgreSQL, paga a pena falar das seguintes características/limitacións do seu traballo:

  1. A funcionalidade do conector para PostgreSQL depende do concepto de decodificación lóxica. Polo tanto el non rastrexa as solicitudes para cambiar a estrutura da base de datos (DDL) - en consecuencia, estes datos non estarán nos temas.
  2. Dado que se utilizan ranuras de replicación, é posible a conexión do conector á instancia de DBMS mestra.
  3. Se o usuario baixo o cal o conector se conecta á base de datos ten dereitos de só lectura, entón antes do primeiro lanzamento, terá que crear manualmente un slot de replicación e publicalo na base de datos.

Aplicando unha configuración

Entón, carguemos a nosa configuración no conector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Comprobamos que a descarga foi exitosa e que o conector comezou:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Genial: está configurado e listo para funcionar. Agora imos finxir ser un consumidor e conectarnos a Kafka, despois engadimos e cambiamos unha entrada na táboa:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

No noso tema, isto amosarase do seguinte xeito:

JSON moi longo cos nosos cambios

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

En ambos os casos, os rexistros consisten na clave (PK) do rexistro que foi modificado e a esencia mesma dos cambios: o que era o rexistro antes e o que se converteu despois.

  • No caso de INSERT: valor antes (before) é igual nullseguido da cadea que se inseriu.
  • No caso de UPDATE: en payload.before móstrase o estado anterior da fila e en payload.after - novo coa esencia do cambio.

2.2 MongoDB

Este conector usa o mecanismo de replicación estándar de MongoDB, lendo información do oplog do nodo principal do DBMS.

Do mesmo xeito que o conector xa descrito para PgSQL, aquí tamén, ao primeiro inicio, tómase a instantánea de datos primaria, despois de que o conector cambia ao modo de lectura de oplog.

Exemplo de configuración:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Como podes ver, non hai novas opcións en comparación co exemplo anterior, senón que só se reduciu o número de opcións responsables da conexión á base de datos e os seus prefixos.

Configuración transforms nesta ocasión fan o seguinte: desvía o nome do tema obxectivo do esquema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerancia a fallos

O problema da tolerancia a fallos e da alta dispoñibilidade no noso tempo é máis agudo que nunca, especialmente cando falamos de datos e transaccións, e o seguimento do cambio de datos non está á marxe neste asunto. Vexamos o que pode saír mal en principio e que pasará con Debezium en cada caso.

Hai tres opcións de exclusión:

  1. Fallo de Kafka Connect. Se Connect está configurado para funcionar en modo distribuído, isto require que varios traballadores establezan o mesmo group.id. Entón, se un deles falla, o conector reiniciarase no outro traballador e seguirá lendo desde a última posición comprometida no tema en Kafka.
  2. Perda de conectividade co clúster de Kafka. O conector simplemente deixará de ler na posición que non puido enviar a Kafka e tentará reenviar periodicamente ata que o intento teña éxito.
  3. Fonte de datos non dispoñible. O conector tentará volver conectarse á fonte segundo a configuración. O valor predeterminado é 16 intentos de uso retroceso exponencial. Despois do décimo sexto intento fallido, a tarefa marcarase como fracasado e terá que reiniciarse manualmente a través da interface REST de Kafka Connect.
    • No caso de PostgreSQL os datos non se perderán, porque o uso de ranuras de replicación evitará a eliminación dos ficheiros WAL non lidos polo conector. Neste caso, hai unha desvantaxe: se a conectividade de rede entre o conector e o DBMS se interrompe durante moito tempo, existe a posibilidade de que o espazo no disco se esgote, e isto pode provocar o fallo de todo o DBMS.
    • No caso de MySQL Os ficheiros binlog poden ser rotados polo propio DBMS antes de que se restaure a conectividade. Isto fará que o conector pase ao estado de fallo e terá que reiniciarse no modo de instantánea inicial para continuar lendo os binlogs para restaurar o funcionamento normal.
    • en MongoDB. A documentación di: o comportamento do conector no caso de que os ficheiros de rexistro/oplog foron eliminados e o conector non pode continuar lendo desde a posición onde o deixou é o mesmo para todos os DBMS. Está no feito de que o conector entrará no estado fracasado e requirirá un reinicio no modo instantánea inicial.

      Non obstante, hai excepcións. Se o conector estivo desconectado durante moito tempo (ou non puido chegar á instancia de MongoDB) e oplog xirou durante este tempo, cando se restaure a conexión, o conector seguirá lendo os datos desde a primeira posición dispoñible con calma. , por iso algúns dos datos en Kafka non vai bater.

Conclusión

Debezium é a miña primeira experiencia cos sistemas CDC e foi moi positiva en xeral. O proxecto subornou o soporte do DBMS principal, a facilidade de configuración, o soporte para a agrupación e unha comunidade activa. Para os interesados ​​na práctica, recoméndolles que leas as guías para Kafka Connect и Debezium.

En comparación co conector JDBC para Kafka Connect, a principal vantaxe de Debezium é que se len os cambios dos rexistros do DBMS, o que permite recibir datos cun atraso mínimo. O conector JDBC (proporcionado por Kafka Connect) consulta a táboa rastrexada nun intervalo fixo e (polo mesmo motivo) non xera mensaxes cando se eliminan os datos (como podes consultar datos que non están alí?).

Para resolver problemas similares, podes prestar atención ás seguintes solucións (ademais de Debezium):

PS

Lea tamén no noso blog:

Fonte: www.habr.com

Engadir un comentario