Introducing Debezium - CDC for Apache Kafka

In my work I often come across new technical solutions and software products about which there is little information on the Russian-speaking Internet. With this article I will try to fill one such gap with an example from my recent practice: I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this review article, which grew out of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium is a representative of the CDC (Change Data Capture) software category. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project, licensed under Apache License v2.0 and sponsored by Red Hat. Development has been under way since 2016, and it currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB, SQL Server. There are also connectors for Cassandra and Oracle, but they are currently in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (where an application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability and availability. The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the end application does not need to worry about the nuances of operating different DBMSs.

Finally, using a message broker opens up room for horizontal scaling of the applications that track changes in the data. At the same time, the load on the data source is minimized, since data is received not directly from the DBMS but from the Kafka cluster.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Figure: Debezium architecture diagram from the project website]

However, I do not really like this diagram, because it gives the impression that only a sink connector can be used.

In reality, the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be consumed by your applications to handle a variety of situations. For example:

  • removing stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit logs;
  • ...
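To make the first use case concrete, here is a minimal Python sketch of a cache-invalidation handler. It assumes the record key and value have already been deserialized from the default JSON converter format (with "schema" and "payload", as in the output shown later in this article), that the cache is a plain dict keyed by a single-column primary key named id, and that `invalidate_from_event` is a hypothetical helper; the Kafka consumer loop itself is left out.

```python
from typing import Any, Dict, Optional


def invalidate_from_event(cache: Dict[Any, Any],
                          key_msg: Optional[dict],
                          value_msg: Optional[dict],
                          pk_field: str = "id") -> Optional[Any]:
    """Evict the cache entry for the row a Debezium change event refers to.

    Returns the evicted primary key, or None if nothing was evicted.
    Assumes a single-column primary key stored under `pk_field`.
    """
    if key_msg is None:
        return None
    pk = key_msg["payload"][pk_field]
    # A tombstone (value is None) follows a delete: evict in that case too.
    if value_msg is not None and value_msg["payload"]["op"] == "r":
        # "r" marks a row read during the initial snapshot: nothing changed.
        return None
    if pk in cache:
        del cache[pk]
        return pk
    return None
```

Evicting instead of updating the cached value keeps the handler idempotent: processing the same event twice does no harm.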

If you have a Java application and there is no need (or no possibility) to use a Kafka cluster, you can also work through an embedded connector. The obvious advantage is that it lets you avoid additional infrastructure (a connector and Kafka). However, the original EmbeddedEngine API for this mode has been deprecated since version 1.1 in favor of the new DebeziumEngine API and is no longer recommended (it may be removed in future releases).

This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes to the most important asset, the data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the repository with examples contains a ready-made docker-compose.yaml.

We will focus on the last two points in more detail.

0. Kafka Connect

Here and throughout the article, all configuration examples assume the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and allows Kafka Connect to be configured through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins of the required connectors yourself to the directory specified in plugin.path or set via the CLASSPATH environment variable. The settings for the Kafka Connect worker and the connectors are defined in configuration files passed as arguments to the worker start command. For details, see the documentation.

The whole process of setting up Debezium in the connector variant is carried out in two stages. Let's look at each of them:

1. Setting up the Kafka Connect framework

To stream data to the Apache Kafka cluster, certain parameters are set in the Kafka Connect framework, such as:

  • cluster connection settings,
  • the names of the topics in which the configuration of the connector itself will be stored,
  • the name of the group in which the connector runs (when using distributed mode).

The project's Docker image supports configuration through environment variables, which is what we will use. So let's pull the image:

docker pull debezium/connect

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers used to obtain the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - a topic for storing the positions the connector is currently at;
  • STATUS_STORAGE_TOPIC=connector-status - a topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - a topic for storing the configuration data of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the worker group on which connector tasks can run; required when using distributed mode.

We start the container with these variables:

docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2

A note on Avro

By default, Debezium writes data in JSON format, which is fine for sandboxes and small amounts of data but can become a problem in heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.
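For orientation: with Confluent's AvroConverter, a message stored in Kafka does not embed the full schema. It starts with a one-byte magic value (0) and a four-byte big-endian schema ID that refers to the schema in the registry, followed by the Avro-encoded body. A minimal Python sketch that splits such a frame; decoding the Avro body itself would need an Avro library and the schema, which are out of scope here, and `split_confluent_frame` is a hypothetical helper name.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Confluent-framed Kafka message."""
    if len(message) < 5:
        raise ValueError("message too short for the 5-byte header")
    # ">bI": big-endian, 1-byte magic value followed by a 4-byte unsigned ID.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]
```

The small header is what keeps messages compact, and it is also why producers and consumers both need access to the registry: without it, the ID cannot be resolved to a schema.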

To use Avro, you need to deploy a separate schema-registry (for storing schemas). The variables for the converter will look like this:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter
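The variable names above follow a convention. As far as I know, the image's startup script turns every variable prefixed with CONNECT_ into a Kafka Connect worker property by dropping the prefix, lowercasing the rest and replacing underscores with dots, while short names such as VALUE_CONVERTER are handled by the image as separate aliases. A minimal Python sketch of that naming rule, assuming it holds; `env_to_worker_properties` is a hypothetical helper, not part of the image.

```python
from typing import Dict, Mapping

PREFIX = "CONNECT_"


def env_to_worker_properties(env: Mapping[str, str]) -> Dict[str, str]:
    """Map CONNECT_* environment variables to worker property names (assumed rule)."""
    props: Dict[str, str] = {}
    for name, value in env.items():
        if not name.startswith(PREFIX):
            continue  # aliases like VALUE_CONVERTER are handled separately by the image
        prop = name[len(PREFIX):].lower().replace("_", ".")
        props[prop] = value
    return props
```

One consequence of such a rule is that a property whose own name contains an underscore cannot be expressed this way.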

The details of using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connector examples for two DBMSs, PostgreSQL and MongoDB, which I have experience with and which differ from each other (slightly, but in some cases significantly!).

The configuration is written in JSON and uploaded to Kafka Connect with a POST request.
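The same upload can also be done from code. A minimal Python sketch using only the standard library; it assumes the Kafka Connect REST API listens on localhost:8083 (as in the curl examples in this article) and that the connector JSON is held as a dict. The request is only built, not sent, so the sketch runs without a cluster; `build_create_request` is a hypothetical helper name.

```python
import json
import urllib.request


def build_create_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
        method="POST",
    )


# Sending it requires a running Kafka Connect worker:
# with urllib.request.urlopen(build_create_request("http://localhost:8083", cfg)) as resp:
#     print(resp.status)
```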

2.1. PostgreSQL

An example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

With this configuration, the connector works on a fairly simple principle:

  • On the first start, it connects to the database specified in the configuration and starts in initial snapshot mode, sending to Kafka the initial data set obtained with a conditional SELECT * FROM table_name.
  • Once initialization is complete, the connector switches to reading changes from PostgreSQL WAL files.
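The two phases can be told apart in the events themselves. Debezium marks rows emitted during the initial snapshot with op "r" (read) and sets source.snapshot to "true" (or "last" for the final snapshot record), while streamed changes carry op "c", "u" or "d" and source.snapshot "false", as in the output later in this article. A minimal Python sketch; `event_phase` is a hypothetical helper name.

```python
def event_phase(payload: dict) -> str:
    """Classify a Debezium change-event payload as 'snapshot' or 'streaming'."""
    # source.snapshot is optional in the schema, so it may be missing or None.
    snapshot_flag = (payload.get("source") or {}).get("snapshot", "false")
    if payload.get("op") == "r" or snapshot_flag in ("true", "last"):
        return "snapshot"
    return "streaming"
```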

About the options used:

  • name - the name of the connector to which the configuration below applies; later, this name is used to work with the connector (i.e. view its status, restart it or update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from WAL files. The options are wal2json, decoderbufs and pgoutput. The first two require installing the corresponding plugins in the DBMS, while pgoutput on PostgreSQL 10 and later requires no extra steps;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance used to form topic names in the Kafka cluster;
  • table.include.list - the list of tables in which we want to track changes, given in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query that is executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot the connector will use;
  • publication.name - the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user the connector connects as lacks the privileges for this, the connector will exit with an error;
  • transforms defines how exactly the name of the target topic is changed:
    • transforms.AddPrefix.type specifies that we will use regular expressions;
    • transforms.AddPrefix.regex - the pattern by which the target topic name is redefined;
    • transforms.AddPrefix.replacement - what we redefine it to.
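Two of the rules in this list can be checked before the configuration is submitted. A minimal Python sketch, assuming the configuration is held as the JSON structure shown above; it checks only that table.include.list and table.exclude.list are not combined and that plugin.name is one of the three plugins mentioned. `check_pg_connector_config` is a hypothetical helper and no substitute for the connector's own validation.

```python
from typing import List

# The three decoding plugins mentioned in the option list above.
KNOWN_PLUGINS = {"wal2json", "decoderbufs", "pgoutput"}


def check_pg_connector_config(connector: dict) -> List[str]:
    """Return a list of problems found in a PostgreSQL connector config."""
    config = connector.get("config", {})
    problems: List[str] = []
    if "table.include.list" in config and "table.exclude.list" in config:
        problems.append("table.include.list and table.exclude.list are mutually exclusive")
    plugin = config.get("plugin.name")
    if plugin is not None and plugin not in KNOWN_PLUGINS:
        problems.append(f"unknown plugin.name: {plugin}")
    return problems
```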

More about heartbeat and transforms

By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the offset service topic. But what happens if the connector is configured to read not the whole database but only some of its tables (in which data is updated infrequently)?

  • The connector will read WAL files and find no transactions in them affecting the tables it monitors.
  • Therefore, it will not update its current position, either in the topic or in the replication slot.
  • This, in turn, will cause WAL files to be "held" on disk, and the disk is likely to run out of space.

This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Used together, they make it possible to run a query that modifies data in a separate table every time a heartbeat message is sent. As a result, the LSN at which the connector currently stands (in the replication slot) is constantly updated, which allows the DBMS to remove WAL files that are no longer needed. For more on how these options work, see the documentation.
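To gauge how far a slot lags, it helps to know that a PostgreSQL LSN is a 64-bit byte position in the WAL, printed as two hexadecimal halves ("X/Y"); the lsn field in Debezium events carries the same position as a plain integer. The byte distance between two positions is simply their difference, which gives a rough measure of how much WAL a lagging slot is holding back. A minimal Python sketch; the function names are hypothetical, and the sample LSN values are taken from the events shown later in this article.

```python
def lsn_to_text(lsn: int) -> str:
    """Format a 64-bit LSN the way PostgreSQL prints it, e.g. '0/2082618'."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def text_to_lsn(text: str) -> int:
    """Parse PostgreSQL's 'X/Y' LSN notation into a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(confirmed: str, current: str) -> int:
    """Bytes of WAL between a slot's confirmed position and the current LSN."""
    return text_to_lsn(current) - text_to_lsn(confirmed)
```

On the server side, PostgreSQL provides the same arithmetic natively through pg_wal_lsn_diff (PostgreSQL 10 and later).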

Another option that deserves closer attention is transforms, although it is more about convenience and aesthetics...

By default, Debezium names topics according to the following policy: serverName.schemaName.tableName. This is not always convenient. With the transforms option and regular expressions, you can define a list of tables whose events should be routed to a topic with a specific name.

In our configuration, transforms does the following: all CDC events from the tracked database go to a topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a separate topic for each table, in the form pg-dev.public.<table_name>.
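The renaming done by RegexRouter can be reproduced locally to check a regex before deploying it. As far as I know, the transform renames a topic only when the regex matches the whole topic name, and the replacement uses Java-style $1 group references. A minimal Python emulation under those assumptions (Python's re syntax is close to, but not identical with, Java's); `route_topic` is a hypothetical helper name.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Emulate RegexRouter: rename the topic if the regex matches all of it."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no full match: the record keeps its original topic
    # Translate Java-style $1 group references into Python's \g<1>.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)
```

Note the unescaped dots in the example regexes: they match any character, which is harmless here but makes the pattern slightly broader than the literal topic prefix.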

Connector limitations

To wrap up the description of the PostgreSQL connector configuration, it is worth mentioning the following features and limitations of how it works:

  1. The PostgreSQL connector relies on logical decoding. Therefore, it does not track statements that change the database structure (DDL), and such changes will not appear in the topics.
  2. Since replication slots are used, the connector can only be attached to the primary (master) DBMS instance.
  3. If the user the connector connects as has read-only privileges, you will need to create a replication slot and a publication in the database manually before the first launch.
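For the read-only case in point 3, the slot and the publication have to be created in advance by a sufficiently privileged user. A minimal Python sketch that only generates the SQL, using the slot and publication names from the configuration above and assuming pgoutput and a publication covering all tables (a narrower FOR TABLE list also works); names are interpolated without quoting, so they are assumed to be trusted identifiers. Executing the statements (via psql or a driver) is left to you, and `bootstrap_sql` is a hypothetical helper name.

```python
from typing import List


def bootstrap_sql(slot_name: str, publication_name: str,
                  plugin: str = "pgoutput") -> List[str]:
    """Return SQL that pre-creates a logical replication slot and a publication."""
    return [
        # Logical slot whose changes are decoded by the given output plugin.
        f"SELECT pg_create_logical_replication_slot('{slot_name}', '{plugin}');",
        # pgoutput streams the tables of a publication; FOR ALL TABLES needs superuser.
        f"CREATE PUBLICATION {publication_name} FOR ALL TABLES;",
    ]
```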

Applying the configuration

So let's load our configuration (saved as pg-con.json) into the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json

We check that the upload succeeded and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
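When this check is scripted (for example in a deployment pipeline), it is enough to look at the state of the connector and of every task. A minimal Python sketch that parses a status response like the one above; fetching it is left out, and `failed_parts` is a hypothetical helper name.

```python
import json
from typing import List


def failed_parts(status_json: str) -> List[str]:
    """Return the parts of a connector status response that are not RUNNING."""
    status = json.loads(status_json)
    problems: List[str] = []
    if status["connector"]["state"] != "RUNNING":
        problems.append("connector: " + status["connector"]["state"])
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems
```

A task reported as FAILED can then be restarted through the same REST API with POST /connectors/<name>/tasks/<id>/restart.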

Great: it is configured and ready to go. Now let's pretend to be a consumer and connect to Kafka, and then insert and modify a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will appear as follows:

Very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the changed record and the essence of the change: what the record was before and what it became after.

  • For INSERT: the before value is null, and after contains the inserted row.
  • For UPDATE: payload.before contains the previous state of the row, and payload.after the new state with the change applied.
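Because an update event carries both states, a consumer can work out which columns changed without querying the database. A minimal Python sketch over the payload part of the event; `changed_columns` is a hypothetical helper name. It assumes the table emits full before images: as far as I know, for PostgreSQL this depends on the table's REPLICA IDENTITY setting (with the default setting, before is usually null for updates).

```python
from typing import Any, Dict, Optional, Tuple


def changed_columns(payload: Dict[str, Any]) -> Dict[str, Tuple[Optional[Any], Optional[Any]]]:
    """Map each column whose value differs between before and after to (old, new)."""
    before = payload.get("before") or {}  # null for INSERT
    after = payload.get("after") or {}    # null for DELETE
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }
```

For an INSERT every column shows up as (None, value), which is consistent with before being null.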

2.2. MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the primary node of the DBMS.

As with the PgSQL connector described above, an initial data snapshot is taken on the first start, after which the connector switches to oplog reading mode.

Configuration example:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, no new options were added compared with the previous example; only the number of options responsible for connecting to the database was reduced, and their prefixes changed.

This time the transforms settings do the following: they turn the target topic name from the <server_name>.<db_name>.<collection_name> scheme into data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability are more pressing than ever these days, especially when it comes to data and transactions, and change data tracking is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to run in distributed mode, this requires several workers to share the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
  2. Loss of connectivity with the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically tries to resend it until an attempt succeeds.
  3. The data source becomes unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually through the Kafka Connect REST interface.
    • With PostgreSQL, data will not be lost, because replication slots prevent the deletion of WAL files the connector has not read. There is a downside, though: if network connectivity between the connector and the DBMS is lost for a long time, the disk may run out of space, which can bring down the whole DBMS.
    • With MySQL, the DBMS itself may rotate binlog files before connectivity is restored. This puts the connector into a failed state, and to restore normal operation it has to be restarted in initial snapshot mode so that it can continue reading from the binlogs.
    • For MongoDB, the documentation says that the connector's behavior when log/oplog files have been deleted and it cannot continue reading from the position where it left off is the same for all DBMSs: the connector goes into a failed state and has to be restarted in initial snapshot mode.

      However, there is an exception. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then once the connection is restored the connector will quietly continue reading from the first available position, which means that some of the data will never reach Kafka.
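To get a feel for how long the connector keeps retrying in scenario 3 before the task is marked as failed, the retry schedule can be computed. A minimal Python sketch, assuming the delay starts at an initial value, doubles after each failed attempt and is capped at a maximum; the 1 s initial delay and 120 s cap used below are illustrative assumptions, since the actual parameters and their defaults are connector-specific (see the documentation). `backoff_schedule` is a hypothetical helper name.

```python
from typing import List


def backoff_schedule(attempts: int, initial_ms: int, max_ms: int) -> List[int]:
    """Delays (ms) before each reconnect attempt under capped exponential backoff."""
    delays: List[int] = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(min(delay, max_ms))  # never wait longer than the cap
        delay *= 2                         # double the delay after each failure
    return delays


# 16 attempts, as described above; 1 s initial delay and 120 s cap are assumptions.
schedule = backoff_schedule(16, 1000, 120_000)
total_seconds = sum(schedule) / 1000
```

With these assumed values the delays reach the cap at the eighth attempt and add up to 1207 seconds, so the connector would give up roughly 20 minutes into a source outage.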

Conclusion

Debezium was my first experience with CDC systems, and a very positive one overall. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and an active community. For those interested in trying it in practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared with the JDBC connector for Kafka Connect, Debezium's main advantage is that changes are read from the DBMS logs, which allows data to be received with minimal delay. The JDBC Connector (provided by Confluent) queries the tracked table at a fixed interval and, for the same reason, does not generate messages when data is deleted (how can you query data that is no longer there?).
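The deletion problem can be illustrated with a toy model. The sketch below is plain Python with no database: table rows are dicts with an updated_at counter, and `poll` is a hypothetical helper that imitates timestamp-based polling in the spirit of the JDBC connector, returning the rows modified since the last poll. A deleted row simply stops being returned, so no message about the deletion is ever produced.

```python
from typing import Dict, List, Tuple

Row = Dict[str, int]


def poll(table: Dict[int, Row], last_seen: int) -> Tuple[List[Row], int]:
    """Return rows with updated_at > last_seen and the new high-water mark."""
    rows = sorted((row for row in table.values() if row["updated_at"] > last_seen),
                  key=lambda row: row["updated_at"])
    new_mark = max((row["updated_at"] for row in rows), default=last_seen)
    return rows, new_mark


table = {1: {"id": 1, "updated_at": 1}, 2: {"id": 2, "updated_at": 2}}
first_rows, mark = poll(table, 0)       # both rows are seen
del table[2]                            # the row disappears from the table
second_rows, mark2 = poll(table, mark)  # nothing to select, so no event at all
```

A log-based connector such as Debezium, by contrast, sees the delete as an entry in the DBMS log and emits an event with op "d".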

For similar tasks, you may also want to look at the following solutions (besides Debezium):

Source: www.hab.com