Présentation de Debezium - CDC pour Apache Kafka

Présentation de Debezium - CDC pour Apache Kafka

Dans mon travail, je tombe souvent sur de nouvelles solutions techniques / produits logiciels, dont les informations sont plutôt rares sur l'Internet russophone. Avec cet article, je vais essayer de combler une de ces lacunes avec un exemple de ma pratique récente, lorsque j'avais besoin de configurer l'envoi d'événements CDC à partir de deux SGBD populaires (PostgreSQL et MongoDB) vers un cluster Kafka en utilisant Debezium. J'espère que cet article de revue, qui est paru à la suite du travail effectué, sera utile à d'autres.

Qu'est-ce que Debezium et CDC en général ?

Débézium - Représentant de la catégorie logiciel CDC (Capturer le changement de données), ou plus précisément, il s'agit d'un ensemble de connecteurs pour différents SGBD compatibles avec le framework Apache Kafka Connect.

Il projet open source, sous licence Apache License v2.0 et sponsorisé par Red Hat. Le développement est en cours depuis 2016 et il fournit actuellement un support officiel pour les SGBD suivants : MySQL, PostgreSQL, MongoDB, SQL Server. Il existe également des connecteurs pour Cassandra et Oracle, mais ils sont actuellement en "accès anticipé", et les nouvelles versions ne garantissent pas la rétrocompatibilité.

Si nous comparons CDC à l'approche traditionnelle (lorsque l'application lit directement les données du SGBD), ses principaux avantages incluent la mise en œuvre du flux de changement de données au niveau de la ligne avec une faible latence, une fiabilité et une disponibilité élevées. Les deux derniers points sont atteints en utilisant un cluster Kafka comme référentiel pour les événements CDC.

De plus, les avantages incluent le fait qu'un seul modèle est utilisé pour stocker les événements, de sorte que l'application finale n'a pas à se soucier des nuances d'exploitation de différents SGBD.

Enfin, l'utilisation d'un courtier de messages ouvre la possibilité d'une mise à l'échelle horizontale des applications qui suivent les modifications des données. Dans le même temps, l'impact sur la source de données est minimisé, car les données ne sont pas reçues directement du SGBD, mais du cluster Kafka.

À propos de l'architecture Debezium

L'utilisation de Debezium se résume à ce schéma simple :

SGBD (comme source de données) → connecteur dans Kafka Connect → Apache Kafka → consommateur

À titre d'illustration, je vais donner un schéma du site Web du projet :

Présentation de Debezium - CDC pour Apache Kafka

Cependant, je n'aime pas vraiment ce schéma, car il semble que seul un connecteur d'évier soit possible.

En réalité, la situation est différente : remplir votre Data Lake (dernier lien dans le schéma ci-dessus) n'est pas la seule façon d'utiliser Debezium. Les événements envoyés à Apache Kafka peuvent être utilisés par vos applications pour faire face à diverses situations. Par exemple:

  • suppression des données non pertinentes du cache ;
  • envoyer des notifications ;
  • mises à jour de l'index de recherche ;
  • une sorte de journaux d'audit ;
  • ...

Dans le cas où vous avez une application Java et qu'il n'y a pas besoin/possibilité d'utiliser un cluster Kafka, il y a aussi la possibilité de travailler à travers connecteur intégré. Le plus évident est qu'avec lui, vous pouvez refuser une infrastructure supplémentaire (sous la forme d'un connecteur et de Kafka). Cependant, cette solution est obsolète depuis la version 1.1 et n'est plus recommandée (elle pourra être supprimée dans les versions futures).

Cet article traite de l'architecture recommandée par les développeurs, qui offre une tolérance aux pannes et une évolutivité.

Configuration du connecteur

Afin de commencer à suivre les changements dans la valeur la plus importante - les données - nous avons besoin :

  1. source de données, qui peut être MySQL à partir de la version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (La liste complète);
  2. Grappe Apache Kafka
  3. Instance Kafka Connect (versions 1.x, 2.x) ;
  4. connecteur Debezium configuré.

Travaillez sur les deux premiers points, c'est-à-dire le processus d'installation d'un SGBD et d'Apache Kafka dépasse le cadre de l'article. Cependant, pour ceux qui veulent tout déployer dans un bac à sable, il y en a un prêt à l'emploi dans le référentiel officiel avec des exemples docker-compose.yaml.

Nous nous attarderons plus en détail sur les deux derniers points.

0. Connexion Kafka

Ici et plus loin dans l'article, tous les exemples de configuration sont considérés dans le contexte de l'image Docker distribuée par les développeurs Debezium. Il contient tous les fichiers de plug-in nécessaires (connecteurs) et fournit la configuration de Kafka Connect à l'aide de variables d'environnement.

Si vous avez l'intention d'utiliser Kafka Connect de Confluent, vous devrez ajouter vous-même les plugins des connecteurs nécessaires dans le répertoire spécifié dans plugin.path ou défini via une variable d'environnement CLASSPATH. Les paramètres du travailleur et des connecteurs Kafka Connect sont définis via des fichiers de configuration transmis en tant qu'arguments à la commande de démarrage du travailleur. Pour plus de détails, voir documentation.

L'ensemble du processus de mise en place de Debeizum dans la version connecteur s'effectue en deux étapes. Considérons chacun d'eux:

1. Mise en place du framework Kafka Connect

Pour diffuser des données vers un cluster Apache Kafka, des paramètres spécifiques sont définis dans le framework Kafka Connect, tels que :

  • paramètres de connexion au cluster,
  • les noms des sujets dans lesquels la configuration du connecteur lui-même sera stockée,
  • le nom du groupe dans lequel le connecteur s'exécute (en cas d'utilisation du mode distribué).

L'image Docker officielle du projet prend en charge la configuration à l'aide de variables d'environnement - c'est ce que nous utiliserons. Alors téléchargeons l'image :

docker pull debezium/connect

L'ensemble minimal de variables d'environnement requis pour exécuter le connecteur est le suivant :

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - liste initiale des serveurs du cluster Kafka pour obtenir une liste complète des membres du cluster ;
  • OFFSET_STORAGE_TOPIC=connector-offsets — une rubrique pour stocker les positions où se trouve actuellement le connecteur ;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un topic pour stocker l'état du connecteur et ses tâches ;
  • CONFIG_STORAGE_TOPIC=connector-config - un sujet pour stocker les données de configuration du connecteur et ses tâches ;
  • GROUP_ID=1 — identifiant du groupe de travailleurs sur lequel la tâche du connecteur peut être exécutée ; requis lors de l'utilisation distribuée (distribué) mode.

Nous commençons le conteneur avec ces variables :

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Remarque sur Avro

Par défaut, Debezium écrit les données au format JSON, ce qui est acceptable pour les bacs à sable et les petites quantités de données, mais peut poser problème dans les bases de données fortement chargées. Une alternative au convertisseur JSON consiste à sérialiser les messages en utilisant Avro à un format binaire, ce qui réduit la charge sur le sous-système d'E/S dans Apache Kafka.

Pour utiliser Avro, vous devez déployer un schéma-registre (pour stocker les schémas). Les variables du convertisseur ressembleront à ceci :

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Les détails sur l'utilisation d'Avro et la configuration d'un registre pour celui-ci dépassent la portée de l'article - de plus, pour plus de clarté, nous utiliserons JSON.

2. Mise en place du connecteur lui-même

Vous pouvez maintenant accéder directement à la configuration du connecteur lui-même, qui lira les données de la source.

Prenons l'exemple des connecteurs pour deux SGBD : PostgreSQL et MongoDB, pour lesquels j'ai de l'expérience et pour lesquels il existe des différences (quoique minimes, mais dans certains cas significatives !).

La configuration est décrite en notation JSON et téléchargée sur Kafka Connect à l'aide d'une requête POST.

2.1. PostgreSQLName

Exemple de configuration de connecteur pour PostgreSQL :

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Le principe de fonctionnement du connecteur après cette configuration est assez simple :

  • Au premier démarrage, il se connecte à la base de données spécifiée dans la configuration et démarre en mode instantané initial, en envoyant à Kafka l'ensemble initial de données reçues avec le conditionnel SELECT * FROM table_name.
  • Une fois l'initialisation terminée, le connecteur passe en mode de lecture des modifications à partir des fichiers WAL PostgreSQL.

À propos des options utilisées :

  • name — le nom du connecteur pour lequel la configuration décrite ci-dessous est utilisée ; à l'avenir, ce nom sera utilisé pour travailler avec le connecteur (c'est-à-dire afficher l'état / redémarrer / mettre à jour la configuration) via l'API Kafka Connect REST ;
  • connector.class — la classe de connecteur SGBD qui sera utilisée par le connecteur configuré ;
  • plugin.name est le nom du plugin pour le décodage logique des données des fichiers WAL. Disponible au choix wal2json, decoderbuffs и pgoutput. Les deux premiers nécessitent l'installation des extensions appropriées dans le SGBD, et pgoutput pour PostgreSQL version 10 et supérieure ne nécessite pas de manipulations supplémentaires ;
  • database.* — options de connexion à la base de données, où database.server.name - le nom de l'instance PostgreSQL utilisée pour former le nom du sujet dans le cluster Kafka ;
  • table.include.list - une liste de tables dans lesquelles nous voulons suivre les changements ; donné sous la forme schema.table_name; ne peut pas être utilisé avec table.exclude.list;
  • heartbeat.interval.ms — intervalle (en millisecondes) avec lequel le connecteur envoie des messages de pulsation à un sujet spécial ;
  • heartbeat.action.query - une requête qui sera exécutée lors de l'envoi de chaque message heartbeat (l'option est apparue depuis la version 1.1) ;
  • slot.name — le nom du slot de réplication qui sera utilisé par le connecteur ;
  • publication.name - Nom Publication dans PostgreSQL que le connecteur utilise. S'il n'existe pas, Debezium essaiera de le créer. Si l'utilisateur sous lequel la connexion est établie n'a pas assez de droits pour cette action, le connecteur se fermera avec une erreur ;
  • transforms détermine comment changer exactement le nom du sujet cible :
    • transforms.AddPrefix.type indique que nous utiliserons des expressions régulières ;
    • transforms.AddPrefix.regex — masque par lequel le nom du thème cible est redéfini ;
    • transforms.AddPrefix.replacement - directement ce que nous redéfinissons.

En savoir plus sur les battements de cœur et les transformations

Par défaut, le connecteur envoie des données à Kafka pour chaque transaction validée et écrit son LSN (Log Sequence Number) dans la rubrique de service offset. Mais que se passe-t-il si le connecteur est configuré pour lire non pas la totalité de la base de données, mais seulement une partie de ses tables (dans lesquelles les données sont rarement mises à jour) ?

  • Le connecteur lira les fichiers WAL et ne détectera pas les validations de transaction dans les tables qu'il surveille.
  • Par conséquent, il ne mettra pas à jour sa position actuelle dans le sujet ou dans l'emplacement de réplication.
  • Ceci, à son tour, entraînera le "blocage" des fichiers WAL sur le disque et manquera probablement d'espace disque.

Et ici, les options viennent à la rescousse. heartbeat.interval.ms и heartbeat.action.query. L'utilisation de ces options par paires permet d'exécuter une demande de modification de données dans une table distincte chaque fois qu'un message de pulsation est envoyé. Ainsi, le LSN sur lequel se trouve actuellement le connecteur (dans le slot de réplication) est constamment mis à jour. Cela permet au SGBD de supprimer les fichiers WAL qui ne sont plus nécessaires. Pour plus d'informations sur le fonctionnement des options, voir documentation.

Une autre option qui mérite une plus grande attention est transforms. Bien qu'il s'agisse davantage de commodité et de beauté ...

Par défaut, Debezium crée des sujets en utilisant la politique de nommage suivante : serverName.schemaName.tableName. Cela peut ne pas toujours être pratique. Choix transforms à l'aide d'expressions régulières, vous pouvez définir une liste de tables dont les événements doivent être acheminés vers une rubrique portant un nom spécifique.

Dans notre configuration grâce à transforms ce qui suit se produit : tous les événements CDC de la base de données suivie iront au sujet avec le nom data.cdc.dbname. Sinon (sans ces paramètres), Debezium créerait par défaut un sujet pour chaque table du formulaire : pg-dev.public.<table_name>.

Limites du connecteur

À la fin de la description de la configuration du connecteur pour PostgreSQL, il convient de parler des fonctionnalités/limitations suivantes de son travail :

  1. La fonctionnalité de connecteur pour PostgreSQL repose sur le concept de décodage logique. Par conséquent il ne suit pas les demandes de modification de la structure de la base de données (DDL) - en conséquence, ces données ne seront pas dans les rubriques.
  2. Étant donné que des slots de réplication sont utilisés, la connexion du connecteur est possible seulement à l'instance de SGBD maître.
  3. Si l'utilisateur sous lequel le connecteur se connecte à la base de données a des droits en lecture seule, alors avant le premier lancement, vous devrez créer manuellement un slot de réplication et publier dans la base de données.

Application d'une configuration

Chargeons donc notre configuration dans le connecteur :

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Nous vérifions que le téléchargement a réussi et que le connecteur a démarré :

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Génial : il est installé et prêt à l'emploi. Faisons maintenant semblant d'être un consommateur et connectons-nous à Kafka, après quoi nous ajoutons et modifions une entrée dans le tableau :

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Dans notre sujet, cela sera affiché comme suit :

JSON très long avec nos modifications

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Dans les deux cas, les enregistrements sont constitués de la clé (PK) de l'enregistrement qui a été modifié et de l'essence même des modifications : ce qu'était l'enregistrement avant et ce qu'il est devenu après.

  • Dans le cas de INSERT: valeur avant (before) équivaut à nullsuivi de la chaîne qui a été insérée.
  • Dans le cas de UPDATE: dans payload.before l'état précédent de la ligne est affiché, et dans payload.after - nouveau avec l'essence du changement.

2.2 MongoDB

Ce connecteur utilise le mécanisme de réplication MongoDB standard, en lisant les informations de l'oplog du nœud principal du SGBD.

Comme pour le connecteur déjà décrit pour PgSQL, ici aussi, au premier démarrage, l'instantané des données primaires est pris, après quoi le connecteur passe en mode de lecture oplog.

Exemple de configuration:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Comme vous pouvez le voir, il n'y a pas de nouvelles options par rapport à l'exemple précédent, mais seul le nombre d'options chargées de se connecter à la base de données et leurs préfixes a été réduit.

réglages transforms cette fois, ils font ce qui suit : transformer le nom du sujet cible du schéma <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolérance aux pannes

La question de la tolérance aux pannes et de la haute disponibilité à notre époque est plus aiguë que jamais - en particulier lorsque nous parlons de données et de transactions, et le suivi des modifications de données n'est pas à l'écart dans ce domaine. Regardons ce qui peut mal tourner en principe et ce qui arrivera à Debezium dans chaque cas.

Il existe trois options de désinscription :

  1. Échec de Kafka Connect. Si Connect est configuré pour fonctionner en mode distribué, cela nécessite que plusieurs travailleurs définissent le même group.id. Ensuite, si l'un d'entre eux échoue, le connecteur sera redémarré sur l'autre travailleur et continuera la lecture à partir de la dernière position validée dans le sujet dans Kafka.
  2. Perte de connectivité avec le cluster Kafka. Le connecteur arrêtera simplement de lire à la position qu'il n'a pas réussi à envoyer à Kafka et essaiera périodiquement de le renvoyer jusqu'à ce que la tentative réussisse.
  3. Source de données indisponible. Le connecteur tentera de se reconnecter à la source selon la configuration. La valeur par défaut est de 16 tentatives en utilisant ralentissement exponentiel. Après la 16e tentative infructueuse, la tâche sera marquée comme manqué et il devra être redémarré manuellement via l'interface Kafka Connect REST.
    • Dans le cas de PostgreSQL les données ne seront pas perdues, car l'utilisation de slots de réplication empêchera la suppression des fichiers WAL non lus par le connecteur. Dans ce cas, il y a un inconvénient : si la connectivité réseau entre le connecteur et le SGBD est interrompue pendant une longue période, il y a un risque que l'espace disque s'épuise, ce qui peut entraîner la défaillance de l'ensemble du SGBD.
    • Dans le cas de MySQL les fichiers binlog peuvent faire l'objet d'une rotation par le SGBD lui-même avant que la connectivité ne soit restaurée. Cela entraînera le connecteur à passer à l'état d'échec et il devra redémarrer en mode instantané initial pour continuer à lire les journaux binaires afin de restaurer le fonctionnement normal.
    • Sur MongoDB. La documentation indique: le comportement du connecteur dans le cas où les fichiers log/oplog ont été supprimés et que le connecteur ne peut pas continuer à lire à partir de la position où il s'est arrêté est le même pour tous les SGBD. Cela réside dans le fait que le connecteur passera à l'état manqué et nécessitera un redémarrage en mode instantané initial.

      Cependant, il existe des exceptions. Si le connecteur était dans un état déconnecté pendant une longue période (ou n'a pas pu atteindre l'instance MongoDB), et oplog a été tourné pendant ce temps, alors lorsque la connexion est rétablie, le connecteur continuera calmement à lire les données à partir de la première position disponible , c'est pourquoi certaines des données de Kafka aucun va frapper.

Conclusion

Debezium est ma première expérience avec les systèmes CDC et a été globalement très positive. Le projet a soudoyé le support du SGBD principal, la facilité de configuration, le support du clustering et une communauté active. Pour ceux qui s'intéressent à la pratique, je vous recommande de lire les guides pour Kafka Connexion и Débézium.

Par rapport au connecteur JDBC pour Kafka Connect, le principal avantage de Debezium est que les modifications sont lues à partir des journaux du SGBD, ce qui permet de recevoir les données avec un délai minimal. Le connecteur JDBC (fourni par Kafka Connect) interroge la table suivie à intervalle fixe et (pour la même raison) ne génère pas de messages lorsque des données sont supprimées (comment interroger des données qui ne s'y trouvent pas ?).

Pour résoudre des problèmes similaires, vous pouvez faire attention aux solutions suivantes (en plus de Debezium) :

PS

A lire aussi sur notre blog :

Source: habr.com

Ajouter un commentaire