Pagpaila sa Debezium - CDC para sa Apache Kafka

Pagpaila sa Debezium - CDC para sa Apache Kafka

Sa akong trabaho, kanunay akong makit-an ang mga bag-ong teknikal nga solusyon / mga produkto sa software, kasayuran bahin sa kung diin nihit sa Internet nga nagsultig Ruso. Uban niini nga artikulo, sulayan nako nga pun-on ang usa sa ingon nga gintang sa usa ka pananglitan gikan sa akong bag-o nga praktis, kung kinahanglan nako nga i-set up ang pagpadala sa mga panghitabo sa CDC gikan sa duha ka sikat nga DBMS (PostgreSQL ug MongoDB) sa usa ka cluster sa Kafka gamit ang Debezium. Nanghinaut ko nga kini nga artikulo sa pagrepaso, nga nagpakita isip resulta sa trabaho nga nahimo, mahimong mapuslanon sa uban.

Unsa ang Debezium ug CDC sa kinatibuk-an?

Debezium - Representante sa kategorya sa software sa CDC (Pagkuha sa pagbag-o sa datos), o mas tukma, kini usa ka hugpong sa mga konektor alang sa lainlaing mga DBMS nga nahiuyon sa balangkas sa Apache Kafka Connect.

kini open source nga proyekto, lisensyado ubos sa Apache License v2.0 ug gipasiugdahan sa Red Hat. Ang pag-uswag nagsugod sukad sa 2016 ug sa pagkakaron naghatag kini og opisyal nga suporta alang sa mosunod nga DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Adunay usab mga konektor alang sa Cassandra ug Oracle, apan sila sa pagkakaron anaa sa "sayo nga pag-access" nga kahimtang, ug ang mga bag-ong pagpagawas dili makagarantiya sa atrasado nga pagkaangay.

Kung atong itandi ang CDC sa tradisyonal nga pamaagi (sa diha nga ang aplikasyon nagbasa sa datos gikan sa DBMS direkta), nan ang mga nag-unang bentaha niini naglakip sa pagpatuman sa pagbag-o sa datos nga streaming sa lebel sa laray nga adunay ubos nga latency, taas nga kasaligan ug anaa. Ang katapusan nga duha ka mga punto nakab-ot pinaagi sa paggamit sa usa ka Kafka cluster isip usa ka repository alang sa CDC nga mga panghitabo.

Usab, ang mga bentaha naglakip sa kamatuoran nga ang usa ka modelo gigamit sa pagtipig sa mga panghitabo, mao nga ang katapusan nga aplikasyon dili kinahanglan nga mabalaka mahitungod sa mga nuances sa operating lain-laing mga DBMS.

Sa katapusan, ang paggamit sa usa ka mensahe broker nagbukas sa sakup alang sa pinahigda nga pag-scale sa mga aplikasyon nga nagsubay sa mga pagbag-o sa datos. Sa samang higayon, ang epekto sa tinubdan sa datos gipakunhod, tungod kay ang datos nadawat dili direkta gikan sa DBMS, apan gikan sa Kafka cluster.

Mahitungod sa arkitektura sa Debezium

Ang paggamit sa Debezium moabut sa kini nga yano nga laraw:

DBMS (isip tinubdan sa datos) β†’ connector sa Kafka Connect β†’ Apache Kafka β†’ consumer

Ingon usa ka ilustrasyon, maghatag ako usa ka diagram gikan sa website sa proyekto:

Pagpaila sa Debezium - CDC para sa Apache Kafka

Bisan pa, dili gyud ko ganahan niini nga laraw, tungod kay ingon og usa ra ka konektor sa lababo ang posible.

Sa tinuud, lahi ang kahimtang: pagpuno sa imong Data Lake (katapusang link sa dayagram sa ibabaw) dili lamang ang paagi sa paggamit sa Debezium. Ang mga panghitabo nga gipadala sa Apache Kafka mahimong gamiton sa imong mga aplikasyon sa pag-atubang sa lainlaing mga sitwasyon. Pananglitan:

  • pagtangtang sa wala'y kalabutan nga datos gikan sa cache;
  • pagpadala mga pahibalo;
  • mga update sa index sa pagpangita;
  • pipila ka matang sa audit logs;
  • ...

Sa kaso nga ikaw adunay usa ka Java nga aplikasyon ug walay panginahanglan/posibilidad sa paggamit sa usa ka Kafka cluster, adunay usab ang posibilidad sa pagtrabaho pinaagi sa embedded connector. Ang klaro nga dugang mao nga uban niini mahimo nimong isalikway ang dugang nga imprastraktura (sa porma sa usa ka konektor ug Kafka). Bisan pa, kini nga solusyon wala na gigamit sukad sa bersyon 1.1 ug wala na girekomenda alang sa paggamit (kini mahimong tangtangon sa umaabot nga mga pagpagawas).

Kini nga artikulo maghisgot sa arkitektura nga girekomenda sa mga developers, nga naghatag ug fault tolerance ug scalability.

Konfigurasyon sa konektor

Aron masugdan ang pagsubay sa mga pagbag-o sa labing hinungdanon nga kantidad - data - kinahanglan namon:

  1. tinubdan sa datos, nga mahimong MySQL sugod sa bersyon 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (bug-os nga listahan);
  2. Ang Apache Kafka cluster
  3. Kafka Connect pananglitan (bersyon 1.x, 2.x);
  4. gi-configure nga Debezium connector.

Pagtrabaho sa unang duha ka punto, i.e. ang proseso sa pag-instalar sa usa ka DBMS ug Apache Kafka lapas sa kasangkaran sa artikulo. Bisan pa, alang niadtong gusto nga i-deploy ang tanan sa usa ka sandbox, adunay usa nga andam sa opisyal nga repository nga adunay mga pananglitan. docker-compose.yaml.

Atong hisgotan ang katapusang duha ka punto sa mas detalyado.

0. Kafka Sumpaysumpaya

Dinhi ug sa ulahi sa artikulo, ang tanan nga mga pananglitan sa pag-configure gikonsiderar sa konteksto sa imahe sa Docker nga gipang-apod-apod sa mga developer sa Debezium. Naglangkob kini sa tanan nga gikinahanglan nga mga file sa plugin (mga konektor) ug naghatag og configuration sa Kafka Connect gamit ang environment variables.

Kung gusto nimong gamiton ang Kafka Connect gikan sa Confluent, kinahanglan nimo nga idugang ang mga plugin sa kinahanglan nga mga konektor sa imong kaugalingon sa direktoryo nga gitakda sa plugin.path o gibutang pinaagi sa usa ka variable sa palibot CLASSPATH. Ang mga setting alang sa trabahante sa Kafka Connect ug mga konektor gihubit pinaagi sa mga file sa pag-configure nga gipasa ingon mga argumento sa mando sa pagsugod sa trabahante. Para sa mga detalye tan-awa dokumentasyon.

Ang tibuuk nga proseso sa pag-set up sa Debeizum sa bersyon sa konektor gihimo sa duha ka yugto. Atong tagdon ang matag usa kanila:

1. Pag-set up sa framework sa Kafka Connect

Aron ma-stream ang data sa usa ka Apache Kafka cluster, ang piho nga mga parameter gitakda sa Kafka Connect framework, sama sa:

  • mga setting sa koneksyon sa cluster,
  • mga ngalan sa mga hilisgutan diin ang pag-configure sa konektor mismo itago,
  • ang ngalan sa grupo diin ang connector nagdagan (sa kaso sa paggamit sa distributed mode).

Ang opisyal nga imahe sa Docker sa proyekto nagsuporta sa pag-configure gamit ang mga variable sa palibot - kini ang among gamiton. Busa atong i-download ang hulagway:

docker pull debezium/connect

Ang minimum nga set sa environment variables nga gikinahanglan sa pagpadagan sa connector mao ang mosunod:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - pasiunang lista sa mga server sa cluster sa Kafka aron makakuha og kompletong listahan sa mga miyembro sa cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets - usa ka hilisgutan alang sa pagtipig sa mga posisyon diin ang konektor karon nahimutang;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - usa ka hilisgutan alang sa pagtipig sa kahimtang sa konektor ug mga buluhaton niini;
  • CONFIG_STORAGE_TOPIC=connector-config - usa ka hilisgutan alang sa pagtipig sa data sa pagsumpo sa connector ug sa mga buluhaton niini;
  • GROUP_ID=1 - identifier sa grupo sa mga trabahante diin ang tahas sa pagkonekta mahimong ipatuman; gikinahanglan kon gamiton ang gipang-apod-apod (giapod-apod) rehimen.

Gisugdan namon ang sudlanan nga adunay kini nga mga variable:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Matikdi mahitungod sa Avro

Sa kasagaran, ang Debezium nagsulat sa datos sa JSON nga format, nga madawat alang sa mga sandbox ug gamay nga gidaghanon sa datos, apan mahimong usa ka problema sa mga database nga daghan kaayo. Usa ka alternatibo sa JSON converter mao ang pag-serialize sa mga mensahe gamit Avro ngadto sa binary format, nga makapamenos sa load sa I/O subsystem sa Apache Kafka.

Aron magamit ang Avro, kinahanglan nimo nga magbutang usa ka lahi schema-registry (alang sa pagtipig sa mga schemas). Ang mga baryable alang sa converter tan-awon sama niini:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Ang mga detalye sa paggamit sa Avro ug pag-set up sa usa ka registry alang niini lapas pa sa sakup sa artikulo - dugang pa, alang sa katin-awan, among gamiton ang JSON.

2. Pag-set up sa connector mismo

Karon mahimo ka nga direktang moadto sa pag-configure sa konektor mismo, nga magbasa sa datos gikan sa gigikanan.

Atong tan-awon ang pananglitan sa mga konektor alang sa duha ka DBMS: PostgreSQL ug MongoDB, diin ako adunay kasinatian ug diin adunay mga kalainan (bisan gamay, apan sa pipila ka mga kaso hinungdanon!).

Ang configuration gihulagway sa JSON notation ug gi-upload sa Kafka Connect gamit ang POST request.

2.1. PostgreSQL

Panig-ingnan sa pag-configure sa konektor alang sa PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ang prinsipyo sa operasyon sa connector human niini nga pagsumpo kay yano ra:

  • Sa una nga pagsugod, kini nagkonektar sa database nga gitakda sa configuration ug magsugod sa mode inisyal nga snapshot, pagpadala sa Kafka sa inisyal nga set sa datos nga nadawat nga adunay kondisyon SELECT * FROM table_name.
  • Human makompleto ang initialization, ang connector mosulod sa mode sa pagbasa sa mga kausaban gikan sa PostgreSQL WAL files.

Mahitungod sa mga opsyon nga gigamit:

  • name - ang ngalan sa connector diin gigamit ang configuration nga gihulagway sa ubos; sa umaabot, kini nga ngalan gigamit sa pagtrabaho uban sa connector (ie tan-awa ang status / restart / update sa configuration) pinaagi sa Kafka Connect REST API;
  • connector.class β€” ang DBMS connector class nga gamiton sa configured connector;
  • plugin.name mao ang ngalan sa plugin alang sa lohikal nga pag-decode sa datos gikan sa WAL files. Mahimong mapilian wal2json, decoderbuffs ΠΈ pgoutput. Ang unang duha nagkinahanglan sa pag-instalar sa angay nga mga extension sa DBMS, ug pgoutput alang sa PostgreSQL nga bersyon 10 ug mas taas wala magkinahanglan og dugang nga mga manipulasyon;
  • database.* β€” mga kapilian sa pagkonektar sa database, diin database.server.name - ang ngalan sa PostgreSQL instance nga gigamit sa pagporma sa ngalan sa hilisgutan sa Kafka cluster;
  • table.include.list - usa ka lista sa mga lamesa diin gusto namon nga masubay ang mga pagbag-o; gihatag sa porma schema.table_name; dili mahimong gamiton uban sa table.exclude.list;
  • heartbeat.interval.ms - agwat (sa milliseconds) diin ang konektor nagpadala mga mensahe sa pinitik sa kasingkasing sa usa ka espesyal nga hilisgutan;
  • heartbeat.action.query - usa ka hangyo nga ipatuman kung ipadala ang matag mensahe sa pinitik sa kasingkasing (ang kapilian nagpakita sukad sa bersyon 1.1);
  • slot.name - ang ngalan sa replication slot nga gamiton sa connector;
  • publication.name - Ngalan mga publikasyon sa PostgreSQL nga gigamit sa connector. Kung wala kini, ang Debezium mosulay sa paghimo niini. Kung ang tiggamit diin gihimo ang koneksyon wala’y igong katungod alang niini nga aksyon, ang konektor mogawas nga adunay sayup;
  • transforms nagtino kon unsaon pag-usab ang ngalan sa gipunting nga hilisgutan:
    • transforms.AddPrefix.type nagpakita nga mogamit kita ug regular nga mga ekspresyon;
    • transforms.AddPrefix.regex - maskara diin ang ngalan sa gipunting nga hilisgutan gibag-o;
    • transforms.AddPrefix.replacement - direkta kung unsa ang among gi-redefine.

Dugang pa bahin sa pinitik sa kasingkasing ug pagbag-o

Sa kasagaran, ang connector nagpadala ug data ngadto sa Kafka alang sa matag nahimo nga transaksyon, ug isulat ang LSN (Log Sequence Number) niini ngadto sa service topic. offset. Apan unsa ang mahitabo kung ang connector gi-configure aron dili basahon ang tibuok database, apan bahin lamang sa mga lamesa niini (diin ang data dili kanunay nga gi-update)?

  • Ang connector mobasa sa WAL files ug dili makamatikod sa mga transaksyon nga gihimo niini ngadto sa mga lamesa nga gibantayan niini.
  • Busa, dili kini mag-update sa kasamtangan nga posisyon niini sa hilisgutan o sa replication slot.
  • Kini, sa baylo, magpahinabo sa mga WAL nga mga file nga "ma-stuck" sa disk ug lagmit mahurot ang espasyo sa disk.

Ug dinhi ang mga kapilian moabut aron sa pagluwas. heartbeat.interval.ms ΠΈ heartbeat.action.query. Ang paggamit niini nga mga kapilian nga magkapares nagpaposible sa pagpatuman sa usa ka hangyo nga usbon ang datos sa usa ka bulag nga lamesa sa matag higayon nga ipadala ang usa ka mensahe sa pinitik sa kasingkasing. Sa ingon, ang LSN diin ang konektor karon nahimutang (sa replication slot) kanunay nga gi-update. Gitugotan niini ang DBMS sa pagtangtang sa mga WAL file nga dili na kinahanglan. Alang sa dugang nga impormasyon kon sa unsang paagi ang mga opsyon molihok, tan-awa dokumentasyon.

Ang laing kapilian nga angay nga hatagan ug pagtagad mao ang transforms. Bisan kung kini labi pa bahin sa kasayon ​​​​ug katahum ...

Sa kasagaran, ang Debezium nagmugna og mga hilisgutan gamit ang mosunod nga palisiya sa pagngalan: serverName.schemaName.tableName. Mahimong dili kini kanunay nga kombenyente. Mga kapilian transforms gamit ang regular nga mga ekspresyon, mahimo nimong ipasabut ang usa ka lista sa mga lamesa kansang mga panghitabo kinahanglan nga madala sa usa ka hilisgutan nga adunay usa ka piho nga ngalan.

Sa among configuration salamat sa transforms ang mosunod nga mahitabo: ang tanan nga mga panghitabo sa CDC gikan sa gisubay nga database moadto sa hilisgutan nga adunay ngalan data.cdc.dbname. Kung dili (kung wala kini nga mga setting), ang Debezium sa default maghimo usa ka hilisgutan alang sa matag lamesa sa porma: pg-dev.public.<table_name>.

Mga limitasyon sa konektor

Sa katapusan sa paghulagway sa connector configuration alang sa PostgreSQL, kini mao ang bili sa paghisgot mahitungod sa mosunod nga mga bahin / limitasyon sa iyang buhat:

  1. Ang connector functionality para sa PostgreSQL nagsalig sa konsepto sa logical decoding. Busa siya wala magsubay sa mga hangyo nga usbon ang istruktura sa database (DDL) - sa ingon, kini nga datos dili sa mga hilisgutan.
  2. Tungod kay gigamit ang mga slot sa replikasyon, posible ang koneksyon sa konektor lamang ngadto sa master nga DBMS nga pananglitan.
  3. Kung ang user diin ang connector nagkonektar sa database adunay read-only nga mga katungod, unya sa dili pa ang unang paglansad, kinahanglan nimo nga mano-mano ang paghimo og replication slot ug i-publish sa database.

Pagpadapat sa usa ka Configuration

Busa atong i-load ang atong configuration ngadto sa connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Gisusi namo nga malampuson ang pag-download ug nagsugod ang connector:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Maayo: kini gipahimutang ug andam na nga moadto. Karon magpakaaron-ingnon ta nga usa ka konsumidor ug magkonektar sa Kafka, pagkahuman atong idugang ug usbon ang usa ka entry sa lamesa:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Sa among hilisgutan, kini ipakita sama sa mosunod:

Taas kaayo nga JSON sa among mga pagbag-o

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Sa duha ka mga kaso, ang mga rekord naglangkob sa yawe (PK) sa rekord nga giusab, ug ang esensya sa mga pagbag-o: unsa ang rekord kaniadto ug unsa kini pagkahuman.

  • Sa kaso sa INSERT: bili kaniadto (before) managsama nullgisundan sa pisi nga gisulod.
  • Sa kaso sa UPDATE: sa payload.before ang miaging kahimtang sa laray gipakita, ug sa payload.after - bag-o nga adunay esensya sa pagbag-o.

2.2 MongoDB

Kini nga connector naggamit sa standard nga mekanismo sa replikasyon sa MongoDB, nagbasa sa impormasyon gikan sa oplog sa DBMS primary node.

Susama sa gihulagway na nga konektor para sa PgSQL, dinhi usab, sa una nga pagsugod, ang panguna nga snapshot sa datos gikuha, pagkahuman ang konektor nagbalhin sa oplog reading mode.

Pananglitan sa pag-configure:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Sama sa imong nakita, wala’y bag-ong mga kapilian kung itandi sa miaging pananglitan, apan ang gidaghanon lamang sa mga kapilian nga responsable sa pagkonektar sa database ug ang ilang mga prefix ang nakunhuran.

Mga Setting transforms niining higayona ilang buhaton ang mosunod: ibalik ang ngalan sa target nga hilisgutan gikan sa laraw <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

pagtugot sa sayop

Ang isyu sa fault tolerance ug taas nga pagkaanaa sa atong panahon mas grabe kaysa kaniadto - labi na kung maghisgot kita bahin sa mga datos ug mga transaksyon, ug ang pagsubay sa pagbag-o sa datos wala sa sideline bahin niini nga butang. Atong tan-awon kung unsa ang mahimong sayup sa prinsipyo ug kung unsa ang mahitabo sa Debezium sa matag kaso.

Adunay tulo ka mga opsyon sa pag-opt out:

  1. Kapakyasan sa Kafka Connect. Kung ang Connect gi-configure aron magtrabaho sa giapod-apod nga mode, kini nanginahanglan daghang mga trabahante nga magbutang sa parehas nga grupo.id. Dayon, kung ang usa niini mapakyas, ang connector i-restart sa laing trabahante ug ipadayon ang pagbasa gikan sa katapusang gitugyan nga posisyon sa hilisgutan sa Kafka.
  2. Pagkawala sa koneksyon sa Kafka cluster. Ang connector mohunong lamang sa pagbasa sa posisyon nga napakyas sa pagpadala sa Kafka ug matag karon ug unya mosulay sa pagpadala niini hangtud nga ang pagsulay molampos.
  3. Dili magamit ang tinubdan sa datos. Ang connector mosulay sa pagkonektar pag-usab ngadto sa tinubdan sumala sa configuration. Ang default mao ang 16 ka pagsulay sa paggamit exponential backoff. Human sa ika-16 nga napakyas nga pagsulay, ang buluhaton markahan nga napakyas ug kini kinahanglan nga mano-mano nga i-restart pinaagi sa Kafka Connect REST interface.
    • Sa kaso sa PostgreSQL data dili mawala, tungod kay ang paggamit sa mga replication slots makapugong sa pagtangtang sa WAL files nga wala mabasa sa connector. Sa kini nga kaso, adunay usa ka downside: kung ang network connectivity tali sa connector ug sa DBMS nabalda sa dugay nga panahon, adunay usa ka higayon nga ang disk space mahurot, ug kini mahimong mosangpot sa kapakyasan sa tibuok DBMS.
    • Sa kaso sa MySQL Ang mga file sa binlog mahimong i-rotate sa DBMS mismo sa dili pa ibalik ang koneksyon. Kini ang hinungdan nga ang connector moadto sa napakyas nga kahimtang, ug kini kinahanglan nga i-restart sa inisyal nga snapshot mode aron magpadayon sa pagbasa gikan sa mga binlog aron mabalik ang normal nga operasyon.
    • sa MongoDB. Ang dokumentasyon nag-ingon: ang pamatasan sa konektor kung ang mga file sa log/oplog natangtang ug ang konektor dili makapadayon sa pagbasa gikan sa posisyon kung diin kini gibiyaan parehas sa tanan nga DBMS. Kini nahimutang sa kamatuoran nga ang connector moadto sa estado napakyas ug magkinahanglan og restart sa mode inisyal nga snapshot.

      Bisan pa, adunay mga eksepsiyon. Kung ang connector naa sa usa ka disconnected state sa dugay nga panahon (o dili makaabot sa MongoDB instance), ug ang oplog gipatuyok niining panahona, unya kung ang koneksyon gipahiuli, ang connector kalmado nga magpadayon sa pagbasa sa data gikan sa unang anaa nga posisyon. , mao nga ang pipila sa mga datos sa Kafka dili moigo.

konklusyon

Ang Debezium mao ang akong una nga kasinatian sa mga sistema sa CDC ug positibo kaayo sa kinatibuk-an. Gisuhol sa proyekto ang suporta sa nag-unang DBMS, kasayon ​​sa pag-configure, suporta alang sa clustering ug usa ka aktibong komunidad. Alang sa mga interesado sa praktis, girekomenda ko nga basahon nimo ang mga giya alang sa Kafka Connect ΠΈ Debezium.

Kung itandi sa konektor sa JDBC alang sa Kafka Connect, ang panguna nga bentaha sa Debezium mao nga ang mga pagbag-o gibasa gikan sa mga log sa DBMS, nga nagtugot sa datos nga madawat nga adunay gamay nga paglangan. Ang JDBC Connector (nga gihatag sa Kafka Connect) nagpangutana sa gisubay nga lamesa sa usa ka pirmi nga agwat ug (sa parehas nga hinungdan) dili makamugna mga mensahe kung ang data matangtang (unsaon nimo mapangutana ang datos nga wala didto?).

Aron masulbad ang susamang mga problema, mahimo nimong hatagan ug pagtagad ang mosunod nga mga solusyon (dugang sa Debezium):

PS

Basaha usab sa among blog:

Source: www.habr.com

Idugang sa usa ka comment