Apresentando Debezium - CDC para Apache Kafka

Apresentando Debezium - CDC para Apache Kafka

Em meu trabalho, frequentemente encontro novas soluções técnicas / produtos de software, cujas informações são bastante escassas na Internet de língua russa. Com este artigo, tentarei preencher uma dessas lacunas com um exemplo de minha prática recente, quando precisei configurar o envio de eventos CDC de dois DBMSs populares (PostgreSQL e MongoDB) para um cluster Kafka usando Debezium. Espero que este artigo de revisão, que surgiu como resultado do trabalho realizado, seja útil para outras pessoas.

O que é Debezium e CDC em geral?

Debézium - Representante da categoria de software CDC (Capturar alteração de dados), ou mais precisamente, é um conjunto de conectores para vários SGBDs compatíveis com o framework Apache Kafka Connect.

Ele projeto de código aberto, licenciado sob a Apache License v2.0 e patrocinado pela Red Hat. O desenvolvimento está em andamento desde 2016 e no momento fornece suporte oficial para os seguintes SGBD: MySQL, PostgreSQL, MongoDB, SQL Server. Também existem conectores para Cassandra e Oracle, mas eles estão atualmente no status de "acesso antecipado" e novos lançamentos não garantem compatibilidade com versões anteriores.

Se compararmos o CDC com a abordagem tradicional (quando o aplicativo lê os dados do DBMS diretamente), suas principais vantagens incluem a implementação do fluxo de alteração de dados no nível da linha com baixa latência, alta confiabilidade e disponibilidade. Os dois últimos pontos são alcançados usando um cluster Kafka como um repositório para eventos CDC.

Além disso, as vantagens incluem o fato de que um único modelo é usado para armazenar eventos, de modo que o aplicativo final não precisa se preocupar com as nuances de operação de diferentes DBMS.

Por fim, o uso de um agente de mensagens abre espaço para o dimensionamento horizontal de aplicativos que rastreiam alterações nos dados. Ao mesmo tempo, o impacto na fonte de dados é minimizado, pois os dados não são recebidos diretamente do DBMS, mas do cluster Kafka.

Sobre a arquitetura Debezium

O uso do Debezium se resume a este esquema simples:

DBMS (como fonte de dados) → conector no Kafka Connect → Apache Kafka → consumidor

Como ilustração, darei um diagrama do site do projeto:

Apresentando Debezium - CDC para Apache Kafka

No entanto, não gosto muito desse esquema, porque parece que apenas um conector de pia é possível.

Na realidade, a situação é diferente: preencher seu Data Lake (último link no diagrama acima) não é a única maneira de usar o Debezium. Os eventos enviados ao Apache Kafka podem ser usados ​​por seus aplicativos para lidar com várias situações. Por exemplo:

  • remoção de dados irrelevantes do cache;
  • envio de notificações;
  • atualizações do índice de pesquisa;
  • algum tipo de logs de auditoria;
  • ...

Caso tenha uma aplicação Java e não haja necessidade/possibilidade de utilizar um cluster Kafka, existe também a possibilidade de trabalhar através conector embutido. A vantagem óbvia é que com ele você pode recusar infraestrutura adicional (na forma de um conector e Kafka). No entanto, esta solução foi descontinuada desde a versão 1.1 e não é mais recomendada para uso (ela pode ser removida em versões futuras).

Este artigo discutirá a arquitetura recomendada pelos desenvolvedores, que fornece tolerância a falhas e escalabilidade.

Configuração do conector

Para começar a rastrear as mudanças no valor mais importante - dados - precisamos:

  1. fonte de dados, que pode ser MySQL a partir da versão 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lista completa);
  2. Cluster do Apache Kafka
  3. Instância do Kafka Connect (versões 1.x, 2.x);
  4. Conector Debezium configurado.

Trabalhe nos dois primeiros pontos, ou seja, o processo de instalação de um DBMS e Apache Kafka está além do escopo do artigo. Porém, para quem quer implantar tudo em um sandbox, existe um já pronto no repositório oficial com exemplos docker-compose.yaml.

Vamos nos concentrar nos dois últimos pontos com mais detalhes.

0. Conexão Kafka

Aqui e posteriormente neste artigo, todos os exemplos de configuração são considerados no contexto da imagem do Docker distribuída pelos desenvolvedores do Debezium. Ele contém todos os arquivos de plug-in necessários (conectores) e fornece a configuração do Kafka Connect usando variáveis ​​de ambiente.

Se você pretende usar o Kafka Connect do Confluent, precisará adicionar os plug-ins dos conectores necessários ao diretório especificado em plugin.path ou definido por meio de uma variável de ambiente CLASSPATH. As configurações para o trabalho e conectores do Kafka Connect são definidas por meio de arquivos de configuração que são passados ​​como argumentos para o comando de início do trabalho. Para detalhes veja documentação.

Todo o processo de configuração do Debeizum na versão conector é feito em duas etapas. Vamos considerar cada um deles:

1. Configurando a estrutura do Kafka Connect

Para transmitir dados para um cluster Apache Kafka, parâmetros específicos são definidos na estrutura do Kafka Connect, como:

  • configurações de conexão de cluster,
  • nomes de tópicos nos quais a configuração do próprio conector será armazenada,
  • o nome do grupo no qual o conector está sendo executado (no caso de usar o modo distribuído).

A imagem oficial do Docker do projeto oferece suporte à configuração usando variáveis ​​de ambiente - é isso que usaremos. Então vamos baixar a imagem:

docker pull debezium/connect

O conjunto mínimo de variáveis ​​de ambiente necessárias para executar o conector é o seguinte:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista inicial de servidores de cluster Kafka para obter uma lista completa de membros do cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets — um tópico para armazenar posições onde o conector está localizado no momento;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - um tópico para armazenar o status do conector e suas tarefas;
  • CONFIG_STORAGE_TOPIC=connector-config - um tópico para armazenar dados de configuração do conector e suas tarefas;
  • GROUP_ID=1 — identificador do grupo de trabalhadores no qual a tarefa do conector pode ser executada; necessário ao usar distribuído (distribuído) modo

Iniciamos o container com estas variáveis:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota sobre Avro

Por padrão, o Debezium grava dados no formato JSON, o que é aceitável para sandboxes e pequenas quantidades de dados, mas pode ser um problema em bancos de dados muito carregados. Uma alternativa ao conversor JSON é serializar mensagens usando Avro para um formato binário, o que reduz a carga no subsistema de E/S no Apache Kafka.

Para usar o Avro, você precisa implantar um registro de esquema (para armazenar esquemas). As variáveis ​​para o conversor ficarão assim:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Os detalhes sobre o uso do Avro e a configuração de um registro para ele estão além do escopo do artigo - além disso, para maior clareza, usaremos o JSON.

2. Configurando o próprio conector

Agora você pode ir direto para a configuração do próprio conector, que irá ler os dados da fonte.

Vejamos o exemplo de conectores para dois SGBD: PostgreSQL e MongoDB, para os quais tenho experiência e para os quais existem diferenças (embora pequenas, mas em alguns casos significativas!).

A configuração é descrita na notação JSON e carregada no Kafka Connect usando uma solicitação POST.

2.1. PostgreSQLName

Exemplo de configuração de conector para PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

O princípio de funcionamento do conector após esta configuração é bastante simples:

  • Na primeira inicialização, ele se conecta ao banco de dados especificado na configuração e inicia no modo instantâneo inicial, enviando para Kafka o conjunto inicial de dados recebidos com o condicional SELECT * FROM table_name.
  • Após a conclusão da inicialização, o conector entra no modo de leitura das alterações dos arquivos PostgreSQL WAL.

Sobre as opções utilizadas:

  • name — o nome do conector para o qual é utilizada a configuração descrita abaixo; no futuro, esse nome é usado para trabalhar com o conector (ou seja, visualizar o status / reiniciar / atualizar a configuração) por meio da API REST do Kafka Connect;
  • connector.class — a classe do conector DBMS que será utilizada pelo conector configurado;
  • plugin.name é o nome do plug-in para decodificação lógica de dados de arquivos WAL. Disponível para escolher wal2json, decoderbuffs и pgoutput. Os dois primeiros requerem a instalação das extensões apropriadas no SGBD, e pgoutput para PostgreSQL versão 10 e superior não requer manipulações adicionais;
  • database.* — opções para conectar ao banco de dados, onde database.server.name - o nome da instância PostgreSQL usada para formar o nome do tópico no cluster Kafka;
  • table.include.list - uma lista de tabelas nas quais queremos rastrear as alterações; dado no formato schema.table_name; não pode ser usado junto com table.exclude.list;
  • heartbeat.interval.ms — intervalo (em milissegundos) com o qual o conector envia mensagens heartbeat para um tópico especial;
  • heartbeat.action.query - uma requisição que será executada ao enviar cada mensagem de heartbeat (a opção existe desde a versão 1.1);
  • slot.name — o nome do slot de replicação que será usado pelo conector;
  • publication.name - Nome publicações no PostgreSQL que o conector usa. Caso não exista, o Debezium tentará criá-lo. Se o usuário sob o qual a conexão é feita não tiver direitos suficientes para esta ação, o conector sairá com um erro;
  • transforms determina exatamente como alterar o nome do tópico de destino:
    • transforms.AddPrefix.type indica que usaremos expressões regulares;
    • transforms.AddPrefix.regex — máscara pela qual o nome do tópico de destino é redefinido;
    • transforms.AddPrefix.replacement - diretamente o que redefinimos.

Mais sobre pulsação e transformações

Por padrão, o conector envia dados para Kafka para cada transação confirmada e grava seu LSN (Log Sequence Number) no tópico de serviço offset. Mas o que acontece se o conector estiver configurado para ler não todo o banco de dados, mas apenas parte de suas tabelas (nas quais os dados são atualizados com pouca frequência)?

  • O conector lerá arquivos WAL e não detectará confirmações de transação neles para as tabelas que ele monitora.
  • Portanto, ele não atualizará sua posição atual nem no tópico nem no slot de replicação.
  • Isso, por sua vez, fará com que os arquivos WAL fiquem "presos" no disco e provavelmente ficarão sem espaço em disco.

E aqui as opções vêm em socorro. heartbeat.interval.ms и heartbeat.action.query. O uso dessas opções em pares possibilita a execução de uma solicitação de alteração de dados em uma tabela separada sempre que uma mensagem de heartbeat é enviada. Assim, o LSN no qual o conector está localizado atualmente (no slot de replicação) é constantemente atualizado. Isso permite que o DBMS remova arquivos WAL que não são mais necessários. Para obter mais informações sobre como as opções funcionam, consulte documentação.

Outra opção que merece mais atenção é transforms. Embora seja mais sobre conveniência e beleza ...

Por padrão, o Debezium cria tópicos usando a seguinte política de nomenclatura: serverName.schemaName.tableName. Isso pode nem sempre ser conveniente. Opções transforms usando expressões regulares, você pode definir uma lista de tabelas cujos eventos precisam ser roteados para um tópico com um nome específico.

Em nossa configuração graças a transforms acontece o seguinte: todos os eventos CDC do banco de dados rastreado irão para o tópico com o nome data.cdc.dbname. Caso contrário (sem essas configurações), o Debezium criaria por padrão um tópico para cada tabela do formulário: pg-dev.public.<table_name>.

Limitações do conector

Ao final da descrição da configuração do conector para PostgreSQL, vale falar sobre as seguintes características/limitações de seu funcionamento:

  1. A funcionalidade do conector para PostgreSQL depende do conceito de decodificação lógica. Portanto ele não rastreia solicitações para alterar a estrutura do banco de dados (DDL) - portanto, esses dados não estarão nos tópicos.
  2. Como os slots de replicação são usados, a conexão do conector é possível apenas para a instância mestre do DBMS.
  3. Se o usuário sob o qual o conector se conecta ao banco de dados tiver direitos somente leitura, antes da primeira inicialização, você precisará criar manualmente um slot de replicação e publicar no banco de dados.

Aplicando uma configuração

Então vamos carregar nossa configuração no conector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Verificamos se o download foi bem-sucedido e o conector foi iniciado:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Ótimo: está configurado e pronto para funcionar. Agora vamos fingir ser um consumidor e nos conectar ao Kafka, após o que adicionamos e alteramos uma entrada na tabela:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Em nosso tópico, isso será exibido da seguinte forma:

JSON muito longo com nossas alterações

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Em ambos os casos, os registros consistem na chave (PK) do registro que foi alterado e na própria essência das alterações: o que o registro era antes e o que se tornou depois.

  • No caso de INSERT: valor antes (before) é igual a nullseguido pela string que foi inserida.
  • No caso de UPDATE: em payload.before o estado anterior da linha é exibido e, em payload.after - novo com a essência da mudança.

2.2 MongoDB

Esse conector usa o mecanismo de replicação padrão do MongoDB, lendo informações do oplog do nó primário do DBMS.

Da mesma forma que o já descrito conector para PgSQL, também aqui, na primeira inicialização, o instantâneo de dados primário é obtido, após o qual o conector muda para o modo de leitura oplog.

Exemplo de configuração:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Como você pode ver, não há novas opções em relação ao exemplo anterior, mas apenas o número de opções responsáveis ​​pela conexão com o banco de dados e seus prefixos foram reduzidos.

configurações transforms desta vez eles fazem o seguinte: transformam o nome do tópico alvo do esquema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerância ao erro

A questão da tolerância a falhas e alta disponibilidade em nosso tempo é mais aguda do que nunca - especialmente quando falamos de dados e transações, e o rastreamento de alterações de dados não está à margem nesse assunto. Vejamos o que pode dar errado em princípio e o que acontecerá com Debezium em cada caso.

Existem três opções de exclusão:

  1. Falha no Kafka Connect. Se o Connect estiver configurado para funcionar no modo distribuído, isso exigirá que vários trabalhadores definam o mesmo group.id. Então, se um deles falhar, o conector será reiniciado no outro trabalhador e continuará lendo a partir da última posição confirmada no tópico em Kafka.
  2. Perda de conectividade com o cluster Kafka. O conector simplesmente parará de ler na posição em que falhou ao enviar para Kafka e tentará reenviá-lo periodicamente até que a tentativa seja bem-sucedida.
  3. Fonte de dados indisponível. O conector tentará se reconectar à fonte de acordo com a configuração. O padrão é 16 tentativas usando recuo exponencial. Após a 16ª tentativa falhada, a tarefa será marcada como fracassado e precisará ser reiniciado manualmente por meio da interface REST do Kafka Connect.
    • No caso de PostgreSQL os dados não serão perdidos, porque o uso de slots de replicação impedirá a exclusão de arquivos WAL não lidos pelo conector. Nesse caso, há uma desvantagem: se a conectividade de rede entre o conector e o DBMS for interrompida por muito tempo, existe a chance de o espaço em disco acabar e isso pode levar à falha de todo o DBMS.
    • No caso de MySQL os arquivos binlog podem ser girados pelo próprio DBMS antes que a conectividade seja restaurada. Isso fará com que o conector entre no estado de falha e precisará reiniciar no modo instantâneo inicial para continuar lendo os logs binários para restaurar a operação normal.
    • Про MongoDB. A documentação diz: o comportamento do conector caso os arquivos log/oplog tenham sido excluídos e o conector não possa continuar lendo da posição onde parou é o mesmo para todos os SGBDs. Está no fato de que o conector entrará no estado fracassado e exigirá uma reinicialização no modo instantâneo inicial.

      No entanto, há exceções. Se o conector estiver em um estado desconectado por um longo período de tempo (ou não conseguir acessar a instância do MongoDB) e o oplog for girado durante esse período, quando a conexão for restaurada, o conector continuará calmamente a ler os dados da primeira posição disponível , e é por isso que alguns dos dados em Kafka não vai bater.

Conclusão

Debezium é minha primeira experiência com sistemas CDC e tem sido muito positiva em geral. O projeto subornou o suporte do principal DBMS, facilidade de configuração, suporte para clustering e uma comunidade ativa. Aos interessados ​​na prática, recomendo a leitura dos guias de Conexão Kafka и Debézium.

Em comparação com o conector JDBC para Kafka Connect, a principal vantagem do Debezium é que as alterações são lidas nos logs do DBMS, o que permite que os dados sejam recebidos com atraso mínimo. O Conector JDBC (fornecido pelo Kafka Connect) consulta a tabela rastreada em um intervalo fixo e (pelo mesmo motivo) não gera mensagens quando os dados são excluídos (como você pode consultar dados que não estão lá?).

Para resolver problemas semelhantes, você pode prestar atenção às seguintes soluções (além do Debezium):

PS

Leia também em nosso blog:

Fonte: habr.com

Adicionar um comentário