Aféierung Debezium - CDC fir Apache Kafka

Aféierung Debezium - CDC fir Apache Kafka

A menger Aarbecht stinn ech dacks op nei technesch Léisungen/Softwareprodukter, iwwer déi am russeschsproochege Internet zimlech knapp Informatioun ass. Mat dësem Artikel probéieren ech esou eng Lück mat engem Beispill aus menger rezenter Praxis ze fëllen, wann ech d'Konfiguratioun vun CDC Eventer vun zwee populäre DBMSs (PostgreSQL a MongoDB) an e Kafka Cluster mat Debezium konfiguréieren. Ech hoffen, datt dësen Iwwerpréiwungsartikel, deen als Resultat vun der Aarbecht erschéngt, fir anerer nëtzlech ass.

Wat ass Debezium an CDC am Allgemengen?

Debezium - Vertrieder vun der CDC Software Kategorie (Capture Data Change), oder méi präzis, et ass e Set vu Stecker fir verschidde DBMSs kompatibel mam Apache Kafka Connect Kader.

dëser Open Source Projet, lizenzéiert ënner der Apache Lizenz v2.0 a gesponsert vum Red Hat. D'Entwécklung ass amgaang zënter 2016 a gëtt de Moment offiziell Ënnerstëtzung fir déi folgend DBMSen: MySQL, PostgreSQL, MongoDB, SQL Server. Et ginn och Connectoren fir Cassandra an Oracle, awer am Moment sinn se am "fréi Zougang" Status, an nei Verëffentlechungen garantéieren keng Réckkompatibilitéit.

Wa mir CDC mat der traditioneller Approche vergläichen (wann d'Applikatioun Daten aus der DBMS direkt liest), sinn hir Haaptvirdeeler d'Ëmsetzung vun Datenännerungsstreaming um Zeilniveau mat gerénger latency, héijer Zouverlässegkeet an Disponibilitéit. Déi lescht zwee Punkte ginn erreecht andeems Dir e Kafka-Cluster als Repository fir CDC Eventer benotzt.

En anere Virdeel ass d'Tatsaach datt en eenzege Modell benotzt gëtt fir Eventer ze späicheren, sou datt d'Ennapplikatioun keng Suergen iwwer d'Nuancen vun der Operatioun vu verschiddenen DBMSs muss maachen.

Schlussendlech, andeems Dir e Message Broker benotzt, erlaabt Uwendungen déi Ännerungen an Daten iwwerwaachen fir horizontal ze skaléieren. Zur selwechter Zäit gëtt den Impakt op d'Datequell miniméiert, well d'Donnéeën net direkt vum DBMS kritt ginn, mee aus dem Kafka-Cluster.

Iwwer d'Debezium Architektur

D'Benotzung vun Debezium kënnt op dësen einfache Schema erof:

DBMS (als Datenquell) → Connector in Kafka Connect → Apache Kafka → Konsument

Als Illustratioun, hei ass en Diagramm vun der Projet Websäit:

Aféierung Debezium - CDC fir Apache Kafka

Wéi och ëmmer, ech hunn dëse Schema net wierklech gär, well et schéngt datt nëmmen d'Benotzung vun engem Spullstecker méiglech ass.

A Wierklechkeet ass d'Situatioun anescht: Fëllt Ären Data Lake (leschte Link am Diagramm uewen) Dëst ass net deen eenzege Wee fir Debezium ze benotzen. Eventer, déi op Apache Kafka geschéckt ginn, kënne vun Ären Uwendungen benotzt ginn fir verschidde Situatiounen ze handhaben. Zum Beispill:

  • irrelevant Daten aus dem Cache ewechhuelen;
  • schéckt Notifikatiounen;
  • Sich Index Aktualiséierungen;
  • eng Zort Audit Logbicher;
  • ...

Am Fall wou Dir eng Java Applikatioun hutt an et kee Besoin / Méiglechkeet ass e Kafka Cluster ze benotzen, gëtt et och d'Méiglechkeet duerch ze schaffen embedded-Connector. De offensichtleche Virdeel ass datt et de Besoin fir zousätzlech Infrastruktur eliminéiert (a Form vun engem Connector a Kafka). Wéi och ëmmer, dës Léisung gouf zënter der Versioun 1.1 ofgeschaaft an ass net méi recommandéiert fir ze benotzen (Ënnerstëtzung dofir kann an zukünfteg Verëffentlechunge geläscht ginn).

Dësen Artikel wäert d'Architektur diskutéieren déi vun Entwéckler recommandéiert ass, déi Feelertoleranz a Skalierbarkeet ubitt.

Connector Konfiguratioun

Fir unzefänken Ännerungen am wichtegste Wäert ze verfollegen - Daten - brauche mir:

  1. Datenquell, déi MySQL ka sinn ab Versioun 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (komplett Lëscht);
  2. Apache Kafka Stärekoup;
  3. Kafka Connect Instanz (Versiounen 1.x, 2.x);
  4. konfiguréiert Debezium Connector.

Aarbecht un déi éischt zwee Punkten, d.h. Den Installatiounsprozess vun der DBMS an Apache Kafka ass iwwer den Ëmfang vum Artikel. Wéi och ëmmer, fir déi, déi alles an der Sandkëscht wëllen ofsetzen, huet den offiziellen Repository mat Beispiller e fäerdeg docker-compose.yaml.

Op déi lescht zwee Punkte wäerte mir méi am Detail ophalen.

0. Kafka Connect

Hei a weider am Artikel ginn all Konfiguratiounsbeispiller diskutéiert am Kontext vum Docker Bild verdeelt vun den Debezium Entwéckler. Et enthält all déi néideg Plugin Dateien (Stecker) a bitt Konfiguratioun vu Kafka Connect mat Ëmfeldvariablen.

Wann Dir wëlles Kafka Connect vu Confluent ze benotzen, musst Dir onofhängeg d'Plugins vun den néidege Stecker an de Verzeechnes uginn plugin.path oder iwwer eng Ëmweltvariabel gesat CLASSPATH. Astellunge fir de Kafka Connect Aarbechter a Connectoren ginn duerch Konfiguratiounsdateien festgeluegt, déi als Argumenter un den Aarbechterstartbefehl weiderginn. Fir méi Detailer, kuckt Dokumentatioun.

De ganze Prozess fir Debeizum opzestellen an der Connector Versioun gëtt an zwou Etappen duerchgefouert. Loosst eis op jiddereng vun hinnen kucken:

1. Astelle vum Kafka Connect Kader

Fir Daten an den Apache Kafka Cluster ze streamen, ginn spezifesch Parameteren am Kafka Connect Kader gesat, sou wéi:

  • Parameteren fir d'Verbindung mam Cluster,
  • Nimm vun Themen an deenen d'Konfiguratioun vum Connector selwer direkt gespäichert gëtt,
  • den Numm vun der Grupp an där de Connector leeft (wann de verdeelte Modus benotzt gëtt).

Den offiziellen Docker-Bild vum Projet ënnerstëtzt d'Konfiguratioun mat Ëmfeldvariablen - dat ass wat mir benotzen. Also, download d'Bild:

docker pull debezium/connect

De Minimum Set vun Ëmfeldvariablen déi néideg ass fir de Connector ze lafen ass wéi follegt:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - initial Lëscht vu Kafka Cluster Server fir eng komplett Lëscht vu Cluster Memberen ze kréien;
  • OFFSET_STORAGE_TOPIC=connector-offsets - en Thema fir Positiounen ze späicheren wou de Connector momentan läit;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - Thema fir de Status vum Connector a seng Aufgaben ze späicheren;
  • CONFIG_STORAGE_TOPIC=connector-config - Thema fir Connector Konfiguratiounsdaten a seng Aufgaben ze späicheren;
  • GROUP_ID=1 - Identifizéierer vun der Grupp vun Aarbechter op deem d'Connector Aufgab ausgefouert ka ginn; néideg wann Dir verdeelt benotzt (verdeelt) Regime.

Mir starten de Container mat dëse Variabelen:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Notiz iwwer Avro

Par défaut schreift Debezium Daten am JSON-Format, wat akzeptabel ass fir Sandkëschten a kleng Quantitéiten un Daten, awer kann e Problem an héich geluedenen Datenbanken ginn. Eng Alternativ zu engem JSON Konverter ass d'Serialiséierung vu Messagen mat Avro an e binäre Format, wat d'Laascht op den I/O-Subsystem an Apache Kafka reduzéiert.

Fir Avro ze benotzen, musst Dir eng separat ofsetzen schema-registréieren (fir Diagrammer ze späicheren). D'Variabelen fir den Konverter wäerten esou ausgesinn:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detailer iwwer d'Benotzung vun Avro an d'Astellung vun der Registry dofir sinn iwwer den Ëmfang vun dësem Artikel - weider op, fir Kloerheet, benotze mir JSON.

2. Konfiguratioun vum Connector selwer

Elo kënnt Dir direkt an d'Konfiguratioun vum Connector selwer goen, deen Daten aus der Quell liesen.

Loosst eis d'Beispill vu Stecker fir zwee DBMS kucken: PostgreSQL a MongoDB, an deenen ech Erfahrung hunn an an deenen et Ënnerscheeder sinn (wann och kleng, awer an e puer Fäll bedeitend!).

D'Konfiguratioun gëtt an der JSON Notatioun beschriwwen an op Kafka Connect eropgelueden mat enger POST Ufro.

2.1. PostgreSQL

Beispill Connector Konfiguratioun fir PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

De Prinzip vun der Operatioun vum Stecker no dësem Setup ass ganz einfach:

  • Wann et fir d'éischte Kéier gestart gëtt, verbënnt et mat der Datebank, déi an der Konfiguratioun spezifizéiert ass, a fänkt am Modus un initial Schnappschëss, Schécken un Kafka den initialen Satz vun Donnéeën, déi mat der Bedingung kritt goufen SELECT * FROM table_name.
  • Nodeems d'Initialiséierung fäerdeg ass, geet de Connector an de Modus fir Ännerunge vu PostgreSQL WAL Dateien ze liesen.

Iwwer d'Optiounen benotzt:

  • name - den Numm vum Connector fir deen d'Konfiguratioun hei ënnendrënner benotzt gëtt; an Zukunft gëtt dësen Numm benotzt fir mam Connector ze schaffen (dh, kuckt de Status / Restart / Update d'Konfiguratioun) iwwer de Kafka Connect REST API;
  • connector.class - DBMS Connector Klass déi vum konfiguréierte Connector benotzt gëtt;
  • plugin.name - den Numm vum Plugin fir logesch Dekodéierung vun Daten aus WAL Dateien. Verfügbar fir ze wielen wal2json, decoderbuffs и pgoutput. Déi éischt zwee erfuerderen Installatioun vun de passenden Extensiounen am DBMS, an pgoutput fir PostgreSQL Versioun 10 a méi héich erfuerdert keng zousätzlech Manipulatiounen;
  • database.* - Optiounen fir eng Verbindung mat der Datebank, wou database.server.name - PostgreSQL Instanz Numm benotzt fir den Thema Numm am Kafka Cluster ze bilden;
  • table.include.list - eng Lëscht vun Dëscher an deenen mir Ännerungen verfollegen wëllen; am Format spezifizéiert schema.table_name; kann net zesumme benotzt ginn mat table.exclude.list;
  • heartbeat.interval.ms - Intervall (a Millisekonnen) mat deem de Connector Häerzschlagmeldungen op e speziellt Thema schéckt;
  • heartbeat.action.query - eng Ufro déi ausgefouert gëtt wann Dir all Häerzschlagmeldung schéckt (d'Optioun erschéngt an der Versioun 1.1);
  • slot.name - den Numm vum Replikatiounslot deen vum Connector benotzt gëtt;
  • publication.name - Numm Publikatiounen am PostgreSQL, deen de Connector benotzt. Wann et net existéiert, probéiert Debezium et ze kreéieren. Wann de Benotzer, ënner deem d'Verbindung gemaach gëtt, net genuch Rechter fir dës Aktioun huet, gëtt de Connector mat engem Feeler ofgeschloss;
  • transforms bestëmmt genau wéi den Numm vum Zilthema geännert gëtt:
    • transforms.AddPrefix.type weist datt mir regelméisseg Ausdréck benotzen;
    • transforms.AddPrefix.regex - eng Mask déi den Numm vum Zilthema nei definéiert;
    • transforms.AddPrefix.replacement - direkt wat mir nei definéieren.

Méi iwwer Häerzschlag an Transformatiounen

Par défaut schéckt de Connector Daten un Kafka fir all engagéiert Transaktioun, a seng LSN (Log Sequence Number) gëtt am Service Thema opgeholl offset. Awer wat geschitt wann de Connector konfiguréiert ass fir net déi ganz Datebank ze liesen, awer nëmmen en Deel vu sengen Dëscher (an deenen Datenupdates net dacks optrieden)?

  • De Connector liest WAL Dateien a erkennt keng Transaktiounsverpflichtungen un d'Dëscher déi se iwwerwaacht.
  • Dofir wäert et seng aktuell Positioun weder am Thema oder am Replikatiounslot aktualiséieren.
  • Dëst, am Tour, wäert zu WAL-Dateien op der Disk gehale ginn a méiglecherweis aus dem Disk Space lafen.

An dat ass wou Optiounen zur Rettung kommen. heartbeat.interval.ms и heartbeat.action.query. Dës Optiounen a Pairen ze benotzen mécht et méiglech eng Ufro ze maachen fir Daten an enger separater Tabell z'änneren all Kéier wann en Häerzschlag Message geschéckt gëtt. Also gëtt den LSN, op deem de Connector momentan läit (am Replikatiounsslot) dauernd aktualiséiert. Dëst erlaabt d'DBMS WAL Dateien ze läschen déi net méi gebraucht ginn. Dir kënnt méi léieren wéi d'Optiounen funktionnéieren Dokumentatioun.

Eng aner Optioun, déi méi no opmierksam ass transforms. Och wann et méi ëm Komfort a Schéinheet geet ...

Par défaut erstellt Debezium Themen mat der folgender Nummpolitik: serverName.schemaName.tableName. Dëst kann net ëmmer bequem sinn. Optiounen transforms Dir kënnt reegelméisseg Ausdréck benotzen fir eng Lëscht vun Dëscher ze definéieren, Eventer aus deenen op en Thema mat engem spezifeschen Numm routéiert musse ginn.

An eiser Configuratioun Merci transforms déi folgend geschitt: all CDC Eventer aus der iwwerwaachter Datebank ginn op en Thema mam Numm data.cdc.dbname. Soss (ouni dës Astellungen), géif Debezium par défaut en Thema fir all Dësch erstellen wéi: pg-dev.public.<table_name>.

Connector Aschränkungen

Fir d'Beschreiwung vun der Connectorkonfiguratioun fir PostgreSQL ofzeschléissen, ass et derwäert iwwer déi folgend Funktiounen / Aschränkungen vu senger Operatioun ze schwätzen:

  1. D'Funktionalitéit vum Connector fir PostgreSQL hänkt vum Konzept vun der logescher Decodéierung of. Dofir huet hien verfollegt keng Ufroe fir d'Datebankstruktur z'änneren (DDL) - deementspriechend sinn dës Donnéeën net an den Themen.
  2. Zënter Replikatiounsplazen benotzt ginn, ass e Connector méiglech nëmmen op déi féierend DBMS Instanz.
  3. Wann de Benotzer ënner deem de Connector mat der Datebank verbënnt nëmmen Liesrechter huet, da musst Dir virum éischte Start manuell e Replikatiounsslot erstellen an an d'Datebank publizéieren.

Uwendung vun der Konfiguratioun

Also loosst eis eis Konfiguratioun an de Connector lueden:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Mir kontrolléieren ob den Download erfollegräich war an de Connector ugefaang huet:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Super: et ass opgestallt a prett fir ze goen. Loosst eis elo wéi e Konsument sinn a mat Kafka verbannen, duerno wäerte mir eng Entrée an der Tabell addéieren an änneren:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

An eisem Thema gëtt et wéi follegt ugewisen:

Ganz laang JSON mat eisen Ännerungen

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

A béide Fäll besteet d'Records aus dem Schlëssel (PK) vum Rekord deen geännert gouf, an d'Essenz vun den Ännerungen: wat de Rekord virdru war a wat et duerno gouf.

  • Am Fall vun INSERT: Wäert virun (before) gläich null, an no - d'Linn déi agebaut gouf.
  • Am Fall vun UPDATE: an v payload.before de viregten Zoustand vun der Linn gëtt ugewisen, an an payload.after - nei mat der Essenz vun Ännerungen.

2.2 MongoDB

Dëse Connector benotzt de Standard MongoDB Replikatiounsmechanismus, liest Informatioun aus dem Oplog vum primäre DBMS Node.

Ähnlech wéi de scho beschriwwenen Connector fir PgSQL, och hei, um éischte Start, gëtt de primäre Date-Snapshot geholl, duerno wiesselt de Connector an den Oplog Liesmodus.

Konfiguratioun Beispill:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Wéi Dir gesitt, ginn et keng nei Optiounen hei am Verglach zum fréiere Beispill, awer nëmmen d'Zuel vun den Optiounen, déi verantwortlech sinn fir mat der Datebank ze verbannen an hir Präfixe gouf reduzéiert.

Настройки transforms dës Kéier maachen se déi folgend: si transforméieren den Numm vum Zilthema aus dem Schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

Feeler Toleranz

D'Fro vun der Fehltoleranz an der héijer Disponibilitéit an eiser Zäit ass méi akut wéi jee - besonnesch wa mir iwwer Daten an Transaktioune schwätzen, an d'Verfollegung vun Donnéeën Ännerungen an dësem Thema net ophalen. Loosst eis kucken wat am Prinzip falsch ka goen a wat op all Fall mam Debezium geschitt.

Et ginn dräi Opt-out Optiounen:

  1. Kafka Connect Feeler. Wann Connect konfiguréiert ass fir am verdeelt Modus ze schaffen, erfuerdert dat e puer Aarbechter fir déiselwecht group.id ze setzen. Dann, wann ee vun hinnen feelt, gëtt de Connector op engem aneren Aarbechter nei gestart a liest weider vun der leschter engagéierter Positioun am Kafka Thema.
  2. Verloscht vu Konnektivitéit mam Kafka Cluster. De Connector wäert einfach ophalen op der Positioun ze liesen déi net un de Kafka geschéckt gouf, a wäert periodesch probéieren et nei ze schécken bis de Versuch geléngt.
  3. Datequell Onverfügbarkeet. De Connector wäert probéieren op d'Quell ze konnektéieren wéi konfiguréiert. De Standard ass 16 Versuche benotzt exponentiell Réckgang. Nom 16. Mëssgléckt Versuch gëtt d'Aufgab markéiert als net fonktionnéiert an Dir musst et manuell iwwer de Kafka Connect REST Interface nei starten.
    • Am Fall vun PostgreSQL d'Donnéeë ginn net verluer, well D'Benotzung vu Replikatiounsplazen verhënnert datt Dir WAL Dateien läschen déi net vum Connector gelies ginn. An dësem Fall gëtt et och e Nodeel vun der Mënz: wann d'Netzverbindung tëscht dem Connector an dem DBMS fir eng laang Zäit ënnerbrach ass, ass et méiglech datt d'Plaz vum Disk leeft, an dëst kann zu engem Echec féieren déi ganz DBMS.
    • Am Fall vun MySQL binlog Dateien kënne vun der DBMS selwer rotéiert ginn ier d'Konnektivitéit restauréiert gëtt. Dëst wäert féieren datt de Connector an de gescheitert Staat geet, a fir normal Operatioun ze restauréieren, musst Dir am initialen Snapshot Modus nei starten fir weider vu Binlogs ze liesen.
    • op MongoDB. D'Dokumentatioun seet: d'Behuele vum Connector am Fall wou Log-/Oplog-Dateien geläscht goufen an de Connector kann net weider liesen aus der Positioun wou et opgehalen huet ass d'selwecht fir all DBMSs. Et heescht datt de Connector an de Staat geet net fonktionnéiert a wäert am Modus nei starten initial Schnappschëss.

      Allerdéngs ginn et Ausnahmen. Wann de Connector fir eng laang Zäit ofgeschalt gouf (oder d'MongoDB Instanz net konnt erreechen), an den Oplog ass während dëser Zäit duerch d'Rotatioun gaang, dann wann d'Verbindung restauréiert ass, wäert de Connector roueg weider Daten aus der éischter verfügbarer Positioun liesen, dofir e puer vun den Donnéeën am Kafka Net wäert schloen.

Konklusioun

Debezium ass meng éischt Erfahrung mat CDC Systemer an allgemeng ganz positiv. De Projet huet gewonnen mat senger Ënnerstëtzung fir grouss DBMSs, Einfachheet vun der Konfiguratioun, Cluster Support, an aktiv Gemeinschaft. Fir déi interesséiert Praxis, Ech recommandéieren, datt Dir d'Guide liesen fir Kafka Connect и Debezium.

Am Verglach zum JDBC Connector fir Kafka Connect ass den Haaptvirdeel vum Debezium datt d'Ännerunge vun den DBMS Logbicher gelies ginn, wat et erlaabt Daten mat minimaler latency ze kréien. De JDBC Connector (vum Kafka Connect) freet den iwwerwaachte Dësch mat engem fixen Intervall an (aus deemselwechte Grond) generéiert keng Messagen wann d'Donnéeën geläscht ginn (wéi kënnt Dir Daten ufroen déi net existéieren?).

Fir ähnlech Problemer ze léisen, kënnt Dir op déi folgend Léisungen oppassen (zousätzlech zu Debezium):

PS

Liest och op eisem Blog:

Source: will.com

Setzt e Commentaire