Wir stellen Debezium vor – CDC für Apache Kafka

Wir stellen Debezium vor – CDC für Apache Kafka

Bei meiner Arbeit stoße ich oft auf neue technische Lösungen/Softwareprodukte, über die es im russischsprachigen Internet eher wenig Informationen gibt. Mit diesem Artikel werde ich versuchen, eine solche Lücke mit einem Beispiel aus meiner jüngsten Praxis zu schließen, als ich das Senden von CDC-Ereignissen von zwei beliebten DBMS (PostgreSQL und MongoDB) an einen Kafka-Cluster mithilfe von Debezium konfigurieren musste. Ich hoffe, dass dieser Übersichtsartikel, der als Ergebnis der geleisteten Arbeit erscheint, für andere nützlich sein wird.

Was ist Debezium und CDC im Allgemeinen?

Debezium — Vertreter der CDC-Softwarekategorie (Erfassen Sie Datenänderungen), oder genauer gesagt handelt es sich um eine Reihe von Konnektoren für verschiedene DBMS, die mit dem Apache Kafka Connect-Framework kompatibel sind.

Es Open-Source-Projekt, lizenziert unter der Apache-Lizenz v2.0 und gesponsert von Red Hat. Die Entwicklung läuft seit 2016 und bietet derzeit offizielle Unterstützung für die folgenden DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Es gibt auch Konnektoren für Cassandra und Oracle, diese befinden sich jedoch derzeit im „Early Access“-Status und neue Releases garantieren keine Abwärtskompatibilität.

Wenn wir CDC mit dem herkömmlichen Ansatz vergleichen (bei dem die Anwendung Daten direkt vom DBMS liest), liegen seine Hauptvorteile in der Implementierung des Datenänderungs-Streamings auf Zeilenebene mit geringer Latenz, hoher Zuverlässigkeit und Verfügbarkeit. Die letzten beiden Punkte werden durch die Verwendung eines Kafka-Clusters als Repository für CDC-Ereignisse erreicht.

Ein weiterer Vorteil ist die Tatsache, dass ein einziges Modell zum Speichern von Ereignissen verwendet wird, sodass sich die Endanwendung nicht um die Feinheiten des Betriebs verschiedener DBMS kümmern muss.

Schließlich ermöglicht die Verwendung eines Nachrichtenbrokers Anwendungen, die Datenänderungen überwachen, eine horizontale Skalierung. Gleichzeitig werden die Auswirkungen auf die Datenquelle minimiert, da die Daten nicht direkt vom DBMS, sondern vom Kafka-Cluster bezogen werden.

Über die Debezium-Architektur

Die Verwendung von Debezium läuft auf dieses einfache Schema hinaus:

DBMS (als Datenquelle) → Connector in Kafka Connect → Apache Kafka → Consumer

Zur Veranschaulichung hier ein Diagramm von der Projektwebsite:

Wir stellen Debezium vor – CDC für Apache Kafka

Allerdings gefällt mir dieses Schema nicht wirklich, da scheinbar nur die Verwendung eines Spülenanschlusses möglich ist.

In Wirklichkeit sieht die Situation anders aus: Das Füllen Ihres Data Lake (letzter Link im Diagramm oben) Dies ist nicht die einzige Möglichkeit, Debezium zu verwenden. An Apache Kafka gesendete Ereignisse können von Ihren Anwendungen zur Bewältigung verschiedener Situationen verwendet werden. Zum Beispiel:

  • Entfernen irrelevanter Daten aus dem Cache;
  • Senden von Benachrichtigungen;
  • Suchindexaktualisierungen;
  • eine Art Audit-Protokolle;
  • ...

Falls Sie eine Java-Anwendung haben und keine Notwendigkeit/Möglichkeit besteht, einen Kafka-Cluster zu verwenden, besteht auch die Möglichkeit, durchzuarbeiten eingebetteter Stecker. Der offensichtliche Vorteil besteht darin, dass keine zusätzliche Infrastruktur (in Form eines Connectors und Kafka) erforderlich ist. Diese Lösung ist jedoch seit Version 1.1 veraltet und wird nicht mehr zur Verwendung empfohlen (die Unterstützung dafür wird in zukünftigen Versionen möglicherweise entfernt).

In diesem Artikel wird die von Entwicklern empfohlene Architektur erläutert, die Fehlertoleranz und Skalierbarkeit bietet.

Connector-Konfiguration

Um mit der Verfolgung von Änderungen des wichtigsten Werts – Daten – zu beginnen, benötigen wir:

  1. Datenquelle, die MySQL ab Version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ sein kann (Die vollständige Liste);
  2. Apache Kafka-Cluster;
  3. Kafka Connect-Instanz (Versionen 1.x, 2.x);
  4. konfigurierter Debezium-Connector.

Arbeiten Sie an den ersten beiden Punkten, d.h. Der Installationsprozess des DBMS und Apache Kafka geht über den Rahmen des Artikels hinaus. Für diejenigen, die jedoch alles in der Sandbox bereitstellen möchten, gibt es im offiziellen Repository mit Beispielen ein fertiges Paket docker-compose.yaml.

Auf die letzten beiden Punkte gehen wir näher ein.

0. Kafka Connect

Hier und weiter im Artikel werden alle Konfigurationsbeispiele im Kontext des von den Debezium-Entwicklern verteilten Docker-Images besprochen. Es enthält alle notwendigen Plugin-Dateien (Connectoren) und ermöglicht die Konfiguration von Kafka Connect mithilfe von Umgebungsvariablen.

Wenn Sie Kafka Connect von Confluent verwenden möchten, müssen Sie die Plugins der erforderlichen Konnektoren unabhängig zum in angegebenen Verzeichnis hinzufügen plugin.path oder über eine Umgebungsvariable festgelegt werden CLASSPATH. Einstellungen für den Kafka Connect-Worker und die Connectors werden durch Konfigurationsdateien bestimmt, die als Argumente an den Worker-Startbefehl übergeben werden. Weitere Einzelheiten finden Sie unter Dokumentation.

Der gesamte Prozess der Einrichtung von Debeizum in der Connector-Version erfolgt in zwei Schritten. Schauen wir uns jeden von ihnen an:

1. Einrichten des Kafka Connect-Frameworks

Um Daten an den Apache Kafka-Cluster zu streamen, werden im Kafka Connect-Framework bestimmte Parameter festgelegt, wie zum Beispiel:

  • Parameter für die Verbindung zum Cluster,
  • Namen von Themen, in denen die Konfiguration des Connectors selbst direkt gespeichert wird,
  • der Name der Gruppe, in der der Connector ausgeführt wird (wenn der verteilte Modus verwendet wird).

Das offizielle Docker-Image des Projekts unterstützt die Konfiguration mithilfe von Umgebungsvariablen – diese werden wir verwenden. Laden Sie also das Bild herunter:

docker pull debezium/connect

Der Mindestsatz an Umgebungsvariablen, der zum Ausführen des Connectors erforderlich ist, ist wie folgt:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 – anfängliche Liste der Kafka-Cluster-Server, um eine vollständige Liste der Cluster-Mitglieder zu erhalten;
  • OFFSET_STORAGE_TOPIC=connector-offsets – ein Thema zum Speichern von Positionen, an denen sich der Stecker aktuell befindet;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — Thema zum Speichern des Status des Connectors und seiner Aufgaben;
  • CONFIG_STORAGE_TOPIC=connector-config – Thema zum Speichern von Connector-Konfigurationsdaten und seinen Aufgaben;
  • GROUP_ID=1 — Kennung der Gruppe von Workern, für die die Connector-Aufgabe ausgeführt werden kann; notwendig bei verteilter Verwendung (verteilt) Modus.

Wir starten den Container mit diesen Variablen:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Hinweis zu Avro

Standardmäßig schreibt Debezium Daten im JSON-Format, was für Sandboxes und kleine Datenmengen akzeptabel ist, in stark ausgelasteten Datenbanken jedoch zum Problem werden kann. Eine Alternative zu einem JSON-Konverter besteht darin, Nachrichten mithilfe von zu serialisieren Avro in ein Binärformat, was die Belastung des I/O-Subsystems in Apache Kafka reduziert.

Um Avro verwenden zu können, müssen Sie ein separates bereitstellen Schema-Registrierung (zum Speichern von Diagrammen). Die Variablen für den Konverter sehen folgendermaßen aus:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Einzelheiten zur Verwendung von Avro und zum Einrichten der Registrierung dafür gehen über den Rahmen dieses Artikels hinaus. Im Folgenden verwenden wir aus Gründen der Übersichtlichkeit JSON.

2. Konfigurieren des Connectors selbst

Jetzt können Sie direkt zur Konfiguration des Connectors selbst gehen, der Daten aus der Quelle liest.

Schauen wir uns das Beispiel von Konnektoren für zwei DBMS an: PostgreSQL und MongoDB, mit denen ich Erfahrung habe und bei denen es Unterschiede gibt (wenn auch kleine, aber in einigen Fällen erhebliche!).

Die Konfiguration wird in JSON-Notation beschrieben und über eine POST-Anfrage auf Kafka Connect hochgeladen.

2.1. PostgreSQL

Beispiel-Connector-Konfiguration für PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Das Funktionsprinzip des Steckers nach diesem Setup ist recht einfach:

  • Beim ersten Start stellt es eine Verbindung zur in der Konfiguration angegebenen Datenbank her und startet im Modus erster Schnappschuss, Senden des ersten Datensatzes, der mithilfe der Bedingung erhalten wurde, an Kafka SELECT * FROM table_name.
  • Nach Abschluss der Initialisierung wechselt der Connector in den Modus zum Lesen von Änderungen aus PostgreSQL-WAL-Dateien.

Zu den verwendeten Optionen:

  • name — der Name des Connectors, für den die unten beschriebene Konfiguration verwendet wird; In Zukunft wird dieser Name verwendet, um über die Kafka Connect REST API mit dem Connector zu arbeiten (d. h. den Status anzuzeigen/Neustart/Aktualisierung der Konfiguration);
  • connector.class — DBMS-Connector-Klasse, die vom konfigurierten Connector verwendet wird;
  • plugin.name — der Name des Plugins zur logischen Dekodierung von Daten aus WAL-Dateien. Verfügbar zur Auswahl wal2json, decoderbuffs и pgoutput. Die ersten beiden erfordern die Installation der entsprechenden Erweiterungen im DBMS und pgoutput für PostgreSQL Version 10 und höher sind keine zusätzlichen Manipulationen erforderlich;
  • database.* — Optionen zum Herstellen einer Verbindung zur Datenbank, wo database.server.name – PostgreSQL-Instanzname, der zur Bildung des Themennamens im Kafka-Cluster verwendet wird;
  • table.include.list — eine Liste von Tabellen, in denen wir Änderungen verfolgen möchten; im Format angegeben schema.table_name; kann nicht zusammen mit verwendet werden table.exclude.list;
  • heartbeat.interval.ms — Intervall (in Millisekunden), mit dem der Connector Heartbeat-Nachrichten an ein spezielles Thema sendet;
  • heartbeat.action.query — eine Anfrage, die beim Senden jeder Heartbeat-Nachricht ausgeführt wird (die Option erschien in Version 1.1);
  • slot.name – der Name des Replikationssteckplatzes, der vom Connector verwendet wird;
  • publication.name - Name Veröffentlichung in PostgreSQL, das der Connector verwendet. Wenn es nicht existiert, wird Debezium versuchen, es zu erstellen. Wenn der Benutzer, unter dem die Verbindung hergestellt wird, nicht über ausreichende Rechte für diese Aktion verfügt, bricht der Connector mit einem Fehler ab;
  • transforms bestimmt genau, wie der Name des Zielthemas geändert wird:
    • transforms.AddPrefix.type zeigt an, dass wir reguläre Ausdrücke verwenden werden;
    • transforms.AddPrefix.regex – eine Maske, die den Namen des Zielthemas neu definiert;
    • transforms.AddPrefix.replacement - direkt das, was wir neu definieren.

Mehr über Herzschlag und Transformationen

Standardmäßig sendet der Connector für jede festgeschriebene Transaktion Daten an Kafka und seine LSN (Log Sequence Number) wird im Servicethema aufgezeichnet offset. Aber was passiert, wenn der Connector so konfiguriert ist, dass er nicht die gesamte Datenbank, sondern nur einen Teil ihrer Tabellen liest (in denen Datenaktualisierungen nicht häufig stattfinden)?

  • Der Connector liest WAL-Dateien und erkennt keine Transaktions-Commits für die von ihm überwachten Tabellen.
  • Daher wird die aktuelle Position weder im Thema noch im Replikationsslot aktualisiert.
  • Dies wiederum führt dazu, dass WAL-Dateien auf der Festplatte gespeichert werden und wahrscheinlich nicht mehr genügend Speicherplatz vorhanden ist.

Und hier kommen Optionen zur Rettung. heartbeat.interval.ms и heartbeat.action.query. Durch die paarweise Verwendung dieser Optionen ist es möglich, jedes Mal, wenn eine Heartbeat-Nachricht gesendet wird, eine Anforderung zum Ändern von Daten in einer separaten Tabelle durchzuführen. Somit wird die LSN, auf der sich der Connector aktuell befindet (im Replikationsslot), ständig aktualisiert. Dadurch kann das DBMS WAL-Dateien entfernen, die nicht mehr benötigt werden. Weitere Informationen zur Funktionsweise der Optionen finden Sie hier Dokumentation.

Eine weitere Option, die genauere Aufmerksamkeit verdient, ist transforms. Obwohl es mehr um Bequemlichkeit und Schönheit geht ...

Standardmäßig erstellt Debezium Themen mit der folgenden Benennungsrichtlinie: serverName.schemaName.tableName. Dies ist möglicherweise nicht immer bequem. Optionen transforms Sie können reguläre Ausdrücke verwenden, um eine Liste von Tabellen zu definieren, von denen Ereignisse an ein Thema mit einem bestimmten Namen weitergeleitet werden müssen.

In unserer Konfiguration danke transforms Folgendes geschieht: Alle CDC-Ereignisse aus der überwachten Datenbank werden zu einem Thema mit dem Namen verschoben data.cdc.dbname. Andernfalls (ohne diese Einstellungen) würde Debezium standardmäßig für jede Tabelle ein Thema erstellen, etwa: pg-dev.public.<table_name>.

Connector-Einschränkungen

Um die Beschreibung der Connector-Konfiguration für PostgreSQL abzuschließen, lohnt es sich, über die folgenden Funktionen/Einschränkungen seines Betriebs zu sprechen:

  1. Die Funktionalität des Connectors für PostgreSQL basiert auf dem Konzept der logischen Dekodierung. Deshalb er verfolgt keine Anfragen zur Änderung der Datenbankstruktur (DDL) - dementsprechend werden diese Daten nicht in den Themen enthalten sein.
  2. Da Replikationssteckplätze verwendet werden, ist der Anschluss eines Connectors möglich nur zur führenden DBMS-Instanz.
  3. Wenn der Benutzer, unter dem der Connector eine Verbindung zur Datenbank herstellt, nur über Leserechte verfügt, müssen Sie vor dem ersten Start manuell einen Replikationsslot erstellen und in der Datenbank veröffentlichen.

Anwenden einer Konfiguration

Laden wir also unsere Konfiguration in den Connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Wir überprüfen, ob der Download erfolgreich war und der Connector gestartet wurde:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Großartig: Es ist aufgebaut und einsatzbereit. Stellen wir uns nun vor, ein Verbraucher zu sein und stellen eine Verbindung zu Kafka her. Anschließend fügen wir einen Eintrag in der Tabelle hinzu und ändern ihn:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In unserem Thema wird es wie folgt angezeigt:

Sehr langes JSON mit unseren Änderungen

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In beiden Fällen bestehen Datensätze aus dem Schlüssel (PK) des geänderten Datensatzes und dem eigentlichen Wesen der Änderungen: dem, was der Datensatz vorher war und was er danach wurde.

  • Im Falle von INSERT: Wert vor (before) gleich null, und danach - die Zeile, die eingefügt wurde.
  • Im Falle von UPDATE: in payload.before der vorherige Zustand der Leitung wird angezeigt, und in payload.after – neu mit dem Wesen der Veränderung.

2.2 MongoDB

Dieser Connector verwendet den standardmäßigen MongoDB-Replikationsmechanismus und liest Informationen aus dem Oplog des primären DBMS-Knotens.

Ähnlich wie beim bereits beschriebenen Connector für PgSQL wird auch hier beim ersten Start der primäre Daten-Snapshot erstellt, woraufhin der Connector in den Oplog-Lesemodus wechselt.

Konfigurationsbeispiel:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Wie Sie sehen, gibt es hier keine neuen Optionen im Vergleich zum vorherigen Beispiel, sondern lediglich die Anzahl der Optionen, die für die Verbindung zur Datenbank zuständig sind, und deren Präfixe wurde reduziert.

Einstellungen transforms Dieses Mal machen sie Folgendes: Sie transformieren den Namen des Zielthemas aus dem Schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

Fehlertoleranz

Das Thema Fehlertoleranz und Hochverfügbarkeit ist in unserer Zeit aktueller denn je – insbesondere wenn es um Daten und Transaktionen geht, und die Verfolgung von Datenänderungen spielt bei diesem Thema keine Rolle. Schauen wir uns an, was grundsätzlich schief gehen kann und was jeweils mit Debezium passiert.

Es gibt drei Opt-out-Optionen:

  1. Kafka Connect-Fehler. Wenn Connect für den verteilten Modus konfiguriert ist, müssen mehrere Worker dieselbe Gruppen-ID festlegen. Wenn dann einer von ihnen ausfällt, wird der Connector auf einem anderen Worker neu gestartet und liest ab der letzten festgeschriebenen Position im Thema in Kafka weiter.
  2. Verbindungsverlust mit dem Kafka-Cluster. Der Connector stoppt das Lesen einfach an der Position, an der das Senden an Kafka fehlgeschlagen ist, und versucht regelmäßig, es erneut zu senden, bis der Versuch erfolgreich ist.
  3. Nichtverfügbarkeit der Datenquelle. Der Connector versucht, wie konfiguriert die Verbindung zur Quelle wiederherzustellen. Der Standardwert beträgt 16 Versuche exponentielles Backoff. Nach dem 16. erfolglosen Versuch wird die Aufgabe mit markiert gescheitert und Sie müssen es manuell über die Kafka Connect REST-Schnittstelle neu starten.
    • Im Falle von PostgreSQL Die Daten gehen nicht verloren, weil Durch die Verwendung von Replikationsslots wird verhindert, dass Sie WAL-Dateien löschen, die vom Connector nicht gelesen werden. In diesem Fall gibt es auch eine Kehrseite der Medaille: Wenn die Netzwerkverbindung zwischen dem Connector und dem DBMS für längere Zeit unterbrochen ist, besteht die Möglichkeit, dass der Speicherplatz knapp wird, was zu einem Ausfall von führen kann das gesamte DBMS.
    • Im Falle von MySQL Binlog-Dateien können vom DBMS selbst rotiert werden, bevor die Konnektivität wiederhergestellt wird. Dies führt dazu, dass der Connector in den ausgefallenen Zustand wechselt. Um den normalen Betrieb wiederherzustellen, müssen Sie im anfänglichen Snapshot-Modus neu starten, um mit dem Lesen aus Binlogs fortzufahren.
    • Про MongoDB. In der Dokumentation heißt es: Das Verhalten des Connectors für den Fall, dass Protokoll-/Oplog-Dateien gelöscht wurden und der Connector den Lesevorgang nicht an der Stelle fortsetzen kann, an der er aufgehört hat, ist für alle DBMS gleich. Dies bedeutet, dass der Connector in den Status wechselt gescheitert und erfordert einen Neustart im Modus erster Schnappschuss.

      Es gibt jedoch Ausnahmen. Wenn der Connector für längere Zeit getrennt war (oder die MongoDB-Instanz nicht erreichen konnte) und das Oplog während dieser Zeit rotiert wurde, liest der Connector nach Wiederherstellung der Verbindung ruhig weiterhin Daten von der ersten verfügbaren Position. Aus diesem Grund sind einige Daten in Kafka nicht wird treffen.

Abschluss

Debezium ist meine erste Erfahrung mit CDC-Systemen und insgesamt sehr positiv. Das Projekt überzeugte durch die Unterstützung wichtiger DBMS, die einfache Konfiguration, die Clustering-Unterstützung und die aktive Community. Für Praxisinteressierte empfehle ich die Lektüre der Ratgeber Kafka Connect и Debezium.

Im Vergleich zum JDBC-Connector für Kafka Connect besteht der Hauptvorteil von Debezium darin, dass Änderungen aus den DBMS-Protokollen gelesen werden, wodurch Daten mit minimaler Latenz empfangen werden können. Der JDBC Connector (von Kafka Connect) fragt die überwachte Tabelle in einem festen Intervall ab und generiert (aus dem gleichen Grund) keine Meldungen, wenn Daten gelöscht werden (wie kann man Daten abfragen, die nicht vorhanden sind?).

Um ähnliche Probleme zu lösen, können Sie (zusätzlich zu Debezium) auf die folgenden Lösungen achten:

PS

Lesen Sie auch auf unserem Blog:

Source: habr.com

Kommentar hinzufügen