Presentació de Debezium - CDC per a Apache Kafka

Presentació de Debezium - CDC per a Apache Kafka

En el meu treball, sovint em trobo amb noves solucions tècniques / productes de programari, la informació sobre les quals és força escassa a Internet de parla russa. Amb aquest article, intentaré omplir un d'aquests buits amb un exemple de la meva pràctica recent, quan necessitava configurar l'enviament d'esdeveniments CDC des de dos DBMS populars (PostgreSQL i MongoDB) a un clúster de Kafka mitjançant Debezium. Espero que aquest article de ressenya, que va aparèixer com a resultat de la feina feta, sigui útil als altres.

Què és Debezium i CDC en general?

Debezium - Representant de la categoria de programari CDC (Captura el canvi de dades), o més precisament, és un conjunt de connectors per a diversos DBMS compatibles amb el framework Apache Kafka Connect.

El projecte de codi obert, amb llicència sota l'Apache License v2.0 i patrocinat per Red Hat. El desenvolupament està en marxa des de l'any 2016 i actualment ofereix suport oficial per als següents DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. També hi ha connectors per a Cassandra i Oracle, però actualment es troben en estat d'accés anticipat i les noves versions no garanteixen la compatibilitat enrere.

Si comparem CDC amb l'enfocament tradicional (quan l'aplicació llegeix dades del DBMS directament), els seus principals avantatges inclouen la implementació de la transmissió de canvis de dades a nivell de fila amb baixa latència, alta fiabilitat i disponibilitat. Els dos darrers punts s'aconsegueixen utilitzant un clúster Kafka com a dipòsit d'esdeveniments de CDC.

A més, els avantatges inclouen el fet que s'utilitza un únic model per emmagatzemar esdeveniments, de manera que l'aplicació final no s'ha de preocupar pels matisos d'operar diferents DBMS.

Finalment, l'ús d'un intermediari de missatges obre l'abast per a l'escala horitzontal d'aplicacions que fan un seguiment dels canvis en les dades. Al mateix temps, es minimitza l'impacte sobre la font de dades, ja que les dades no es reben directament del SGBD, sinó del clúster Kafka.

Sobre l'arquitectura Debezium

L'ús de Debezium es redueix a aquest esquema senzill:

DBMS (com a font de dades) → connector a Kafka Connect → Apache Kafka → consumidor

Com a il·lustració, donaré un diagrama del lloc web del projecte:

Presentació de Debezium - CDC per a Apache Kafka

Tanmateix, no m'agrada molt aquest esquema, perquè sembla que només és possible un connector d'aigüera.

En realitat, la situació és diferent: omplir el vostre Data Lake (últim enllaç del diagrama de dalt) no és l'única manera d'utilitzar Debezium. Els esdeveniments enviats a Apache Kafka poden ser utilitzats per les vostres aplicacions per fer front a diverses situacions. Per exemple:

  • eliminació de dades irrellevants de la memòria cau;
  • enviament de notificacions;
  • actualitzacions de l'índex de cerca;
  • algun tipus de registres d'auditoria;
  • ...

En cas que tingueu una aplicació Java i no hi ha necessitat/possibilitat d'utilitzar un clúster Kafka, també hi ha la possibilitat de treballar amb connector incrustat. L'avantatge evident és que amb ell podeu rebutjar una infraestructura addicional (en forma de connector i Kafka). Tanmateix, aquesta solució ha quedat obsoleta des de la versió 1.1 i ja no es recomana utilitzar-la (és possible que s'elimini en versions futures).

Aquest article tractarà l'arquitectura recomanada pels desenvolupadors, que proporciona tolerància a errors i escalabilitat.

Configuració del connector

Per començar a fer el seguiment dels canvis en el valor més important (dades) necessitem:

  1. font de dades, que pot ser MySQL a partir de la versió 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (la llista completa);
  2. Clúster Apache Kafka
  3. Instància de Kafka Connect (versions 1.x, 2.x);
  4. connector Debezium configurat.

Treballar els dos primers punts, és a dir. el procés d'instal·lació d'un SGBD i Apache Kafka està fora de l'abast de l'article. No obstant això, per a aquells que vulguin desplegar-ho tot en un sandbox, n'hi ha un de ja fet al repositori oficial amb exemples docker-compose.yaml.

Ens centrarem en els dos últims punts amb més detall.

0. Kafka Connect

Aquí i més endavant a l'article, tots els exemples de configuració es consideren en el context de la imatge de Docker distribuïda pels desenvolupadors de Debezium. Conté tots els fitxers de connectors necessaris (connectors) i proporciona la configuració de Kafka Connect mitjançant variables d'entorn.

Si teniu intenció d'utilitzar Kafka Connect des de Confluent, haureu d'afegir vosaltres mateixos els connectors dels connectors necessaris al directori especificat a plugin.path o establir mitjançant una variable d'entorn CLASSPATH. La configuració del treballador i els connectors de Kafka Connect es defineixen mitjançant fitxers de configuració que es passen com a arguments a l'ordre d'inici del treballador. Per a més detalls vegeu documentació.

Tot el procés de configuració de Debeizum en la versió de connector es realitza en dues etapes. Considerem cadascun d'ells:

1. Configuració del marc Kafka Connect

Per transmetre dades a un clúster Apache Kafka, s'estableixen paràmetres específics al marc de Kafka Connect, com ara:

  • configuració de connexió del clúster,
  • noms dels temes en què s'emmagatzemarà la configuració del propi connector,
  • el nom del grup en què s'executa el connector (en cas d'utilitzar el mode distribuït).

La imatge oficial de Docker del projecte admet la configuració mitjançant variables d'entorn; això és el que farem servir. Així que descarreguem la imatge:

docker pull debezium/connect

El conjunt mínim de variables d'entorn necessaris per executar el connector és el següent:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - llista inicial dels servidors del clúster de Kafka per obtenir una llista completa dels membres del clúster;
  • OFFSET_STORAGE_TOPIC=connector-offsets — un tema per emmagatzemar les posicions on es troba actualment el connector;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un tema per emmagatzemar l'estat del connector i les seves tasques;
  • CONFIG_STORAGE_TOPIC=connector-config - un tema per emmagatzemar dades de configuració del connector i les seves tasques;
  • GROUP_ID=1 — identificador del grup de treballadors en què es pot executar la tasca de connector; necessari quan s'utilitza distribuït (distribuït) règim.

Comencem el contenidor amb aquestes variables:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota sobre Avro

De manera predeterminada, Debezium escriu dades en format JSON, que és acceptable per a caixes de sorra i petites quantitats de dades, però pot ser un problema en bases de dades amb molta càrrega. Una alternativa al convertidor JSON és serialitzar missatges mitjançant Avro a un format binari, que redueix la càrrega del subsistema d'E/S a Apache Kafka.

Per utilitzar Avro, heu de desplegar un altre esquema-registre (per emmagatzemar esquemes). Les variables per al convertidor seran així:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Els detalls sobre l'ús d'Avro i la seva configuració d'un registre estan fora de l'abast de l'article; a més, per a més claredat, utilitzarem JSON.

2. Configuració del propi connector

Ara podeu anar directament a la configuració del propi connector, que llegirà les dades de la font.

Vegem l'exemple de connectors per a dos DBMS: PostgreSQL i MongoDB, per als quals tinc experiència i per als quals hi ha diferències (encara que petites, però en alguns casos importants!).

La configuració es descriu en notació JSON i es penja a Kafka Connect mitjançant una sol·licitud POST.

2.1. PostgreSQL

Exemple de configuració del connector per a PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

El principi de funcionament del connector després d'aquesta configuració és bastant simple:

  • Al primer inici, es connecta a la base de dades especificada a la configuració i s'inicia en el mode instantània inicial, enviant a Kafka el conjunt inicial de dades rebudes amb el condicional SELECT * FROM table_name.
  • Un cop finalitzada la inicialització, el connector entra en el mode de lectura dels canvis dels fitxers WAL de PostgreSQL.

Sobre les opcions utilitzades:

  • name — el nom del connector per al qual s'utilitza la configuració descrita a continuació; en el futur, aquest nom s'utilitzarà per treballar amb el connector (és a dir, veure l'estat/reiniciar/actualitzar la configuració) mitjançant l'API REST de Kafka Connect;
  • connector.class — la classe de connector DBMS que utilitzarà el connector configurat;
  • plugin.name és el nom del connector per a la descodificació lògica de dades dels fitxers WAL. Disponible per triar wal2json, decoderbuffs и pgoutput. Els dos primers requereixen la instal·lació de les extensions adequades al SGBD, i pgoutput per a PostgreSQL versió 10 i superior no requereix manipulacions addicionals;
  • database.* — opcions per connectar-se a la base de dades, on database.server.name - el nom de la instància PostgreSQL utilitzada per formar el nom del tema al clúster de Kafka;
  • table.include.list - una llista de taules en què volem fer un seguiment dels canvis; donat en el format schema.table_name; no es pot utilitzar juntament amb table.exclude.list;
  • heartbeat.interval.ms — interval (en mil·lisegons) amb el qual el connector envia missatges de batec del cor a un tema especial;
  • heartbeat.action.query - una sol·licitud que s'executarà en enviar cada missatge de batec (l'opció apareix des de la versió 1.1);
  • slot.name — el nom de la ranura de rèplica que utilitzarà el connector;
  • publication.name - Nom Publicació a PostgreSQL que utilitza el connector. En cas que no existeixi, Debezium intentarà crear-lo. Si l'usuari sota el qual es fa la connexió no té prou drets per a aquesta acció, el connector sortirà amb un error;
  • transforms determina com canviar exactament el nom del tema objectiu:
    • transforms.AddPrefix.type indica que utilitzarem expressions regulars;
    • transforms.AddPrefix.regex — màscara amb la qual es redefinirà el nom del tema objectiu;
    • transforms.AddPrefix.replacement - directament allò que redefinim.

Més informació sobre el batec del cor i les transformacions

De manera predeterminada, el connector envia dades a Kafka per a cada transacció compromesa i escriu el seu LSN (Número de seqüència de registre) al tema del servei offset. Però, què passa si el connector està configurat per llegir no tota la base de dades, sinó només una part de les seves taules (en les quals les dades s'actualitzen amb poca freqüència)?

  • El connector llegirà els fitxers WAL i no detectarà transaccions compromeses en ells a les taules que supervisa.
  • Per tant, no actualitzarà la seva posició actual ni al tema ni a la ranura de rèplica.
  • Això, al seu torn, farà que els fitxers WAL quedin "encallats" al disc i probablement es quedaran sense espai al disc.

I aquí les opcions vénen al rescat. heartbeat.interval.ms и heartbeat.action.query. L'ús d'aquestes opcions en parelles fa possible executar una sol·licitud de canvi de dades en una taula separada cada vegada que s'enviï un missatge de batec. Així, el LSN on es troba actualment el connector (a la ranura de rèplica) s'actualitza constantment. Això permet que el DBMS elimine els fitxers WAL que ja no són necessaris. Per obtenir més informació sobre com funcionen les opcions, vegeu documentació.

Una altra opció que mereix més atenció és transforms. Encara que es tracta més de comoditat i bellesa...

De manera predeterminada, Debezium crea temes utilitzant la política de nomenclatura següent: serverName.schemaName.tableName. Això pot no ser sempre convenient. Opcions transforms utilitzant expressions regulars, podeu definir una llista de taules els esdeveniments de les quals s'han d'encaminar a un tema amb un nom específic.

En la nostra configuració gràcies a transforms succeeix el següent: tots els esdeveniments de CDC de la base de dades de seguiment aniran al tema amb el nom data.cdc.dbname. En cas contrari (sense aquesta configuració), Debezium crearia per defecte un tema per a cada taula del formulari: pg-dev.public.<table_name>.

Limitacions del connector

Al final de la descripció de la configuració del connector per a PostgreSQL, val la pena parlar de les següents característiques / limitacions del seu treball:

  1. La funcionalitat del connector per a PostgreSQL es basa en el concepte de descodificació lògica. Per tant ell no fa un seguiment de les sol·licituds per canviar l'estructura de la base de dades (DDL) - en conseqüència, aquestes dades no estaran als temes.
  2. Com que s'utilitzen ranures de rèplica, la connexió del connector és possible només a la instància mestra del SGBD.
  3. Si l'usuari amb el qual es connecta el connector a la base de dades té drets de només lectura, abans del primer llançament, haureu de crear manualment una ranura de rèplica i publicar-la a la base de dades.

Aplicació d'una configuració

Així que carreguem la nostra configuració al connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Comprovem que la descàrrega ha estat correcta i que el connector ha començat:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Genial: està configurat i llest per funcionar. Ara fem veure que som un consumidor i connectem a Kafka, després d'això afegim i canviem una entrada a la taula:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Al nostre tema, això es mostrarà de la següent manera:

JSON molt llarg amb els nostres canvis

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

En ambdós casos, els registres consisteixen en la clau (PK) del registre que es va canviar i l'essència mateixa dels canvis: què era el registre abans i què esdevingué després.

  • En el cas de INSERT: valor abans (before) és igual nullseguit de la cadena que s'ha inserit.
  • En el cas de UPDATE: dins payload.before es mostra l'estat anterior de la fila i en payload.after - nou amb l'essència del canvi.

2.2 MongoDB

Aquest connector utilitza el mecanisme de replicació estàndard de MongoDB, llegint informació de l'oplog del node primari del DBMS.

De manera similar al connector ja descrit per a PgSQL, aquí també, al primer inici, es fa la instantània de dades primàries, després de la qual el connector passa al mode de lectura d'oplog.

Exemple de configuració:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Com podeu veure, no hi ha opcions noves respecte a l'exemple anterior, sinó que només s'ha reduït el nombre d'opcions responsables de la connexió a la base de dades i els seus prefixos.

ajustos transforms aquesta vegada fan el següent: desviar el nom del tema objectiu de l'esquema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

falta de tolerància

El tema de la tolerància a errors i l'alta disponibilitat en el nostre temps és més agut que mai, sobretot quan parlem de dades i transaccions, i el seguiment dels canvis de dades no està al marge en aquest assumpte. Vegem què pot sortir malament en principi i què passarà amb Debezium en cada cas.

Hi ha tres opcions de desactivació:

  1. Fallada de Kafka Connect. Si Connect està configurat per funcionar en mode distribuït, això requereix que diversos treballadors estableixin el mateix group.id. Aleshores, si un d'ells falla, el connector es reiniciarà a l'altre treballador i es continuarà llegint des de l'última posició compromesa del tema a Kafka.
  2. Pèrdua de connectivitat amb el clúster Kafka. El connector simplement deixarà de llegir a la posició que no ha pogut enviar a Kafka i intentarà tornar-lo a enviar periòdicament fins que l'intent tingui èxit.
  3. Font de dades no disponible. El connector intentarà tornar a connectar-se a la font segons la configuració. El valor predeterminat és de 16 intents retrocés exponencial. Després del 16è intent fallit, la tasca es marcarà com a fracassat i caldrà reiniciar-lo manualment mitjançant la interfície REST de Kafka Connect.
    • En el cas de PostgreSQL les dades no es perdran, perquè l'ús de ranures de rèplica evitarà la supressió dels fitxers WAL que no llegeix el connector. En aquest cas, hi ha un inconvenient: si la connectivitat de xarxa entre el connector i el SGBD s'interromp durant molt de temps, hi ha la possibilitat que l'espai en disc s'esgoti i això pot provocar un fracàs de tot el SGBD.
    • En el cas de MySQL Els fitxers binlog poden ser rotats pel mateix SGBD abans que es restableixi la connectivitat. Això farà que el connector passi a l'estat d'error i s'haurà de reiniciar en el mode d'instantània inicial per continuar llegint els binlogs per restablir el funcionament normal.
    • Про MongoDB. La documentació diu: el comportament del connector en cas que s'hagin esborrat els fitxers log/oplog i el connector no pot continuar llegint des de la posició on es va deixar és el mateix per a tots els SGBD. Rau en el fet que el connector entrarà a l'estat fracassat i requerirà un reinici en el mode instantània inicial.

      Tanmateix, hi ha excepcions. Si el connector va estar en un estat desconnectat durant molt de temps (o no va poder arribar a la instància de MongoDB) i oplog es va girar durant aquest temps, aleshores, quan es restableixi la connexió, el connector continuarà llegint dades des de la primera posició disponible amb calma. , per això algunes de les dades de Kafka no colpejarà.

Conclusió

Debezium és la meva primera experiència amb sistemes CDC i ha estat molt positiva en general. El projecte va subornar el suport del SGBD principal, la facilitat de configuració, el suport per a clúster i una comunitat activa. Per als interessats en la pràctica, us recomano que llegiu les guies Kafka Connect и Debezium.

En comparació amb el connector JDBC per a Kafka Connect, el principal avantatge de Debezium és que els canvis es llegeixen dels registres de DBMS, la qual cosa permet rebre dades amb un retard mínim. El connector JDBC (proporcionat per Kafka Connect) consulta la taula de seguiment a un interval fix i (per la mateixa raó) no genera missatges quan se suprimeixen les dades (com podeu consultar les dades que no hi són?).

Per resoldre problemes similars, podeu prestar atenció a les solucions següents (a més de Debezium):

PS

Llegeix també al nostre blog:

Font: www.habr.com

Afegeix comentari