Re hlahisa Debezium - CDC bakeng sa Apache Kafka

Re hlahisa Debezium - CDC bakeng sa Apache Kafka

Mosebetsing oa ka, hangata ke kopana le litharollo tse ncha tsa tekheniki / lihlahisoa tsa software, tlhahisoleseling eo ka eona e haellang Inthaneteng ea batho ba buang Serussia. Ka sengoloa sena, ke tla leka ho tlatsa lekhalo le le leng le joalo ka mohlala oa tloaelo ea ka ea morao-rao, ha ke ne ke hloka ho theha ho romella liketsahalo tsa CDC ho tsoa ho li-DBMS tse peli tse tsebahalang (PostgreSQL le MongoDB) ho sehlopha sa Kafka se sebelisang Debezium. Ke tšepa hore sehlooho sena sa tlhahlobo, se hlahileng ka lebaka la mosebetsi o entsoeng, se tla ba molemo ho ba bang.

Debezium le CDC ke eng ka kakaretso?

Debezium - Moemeli oa sehlopha sa software sa CDC (Tšoara phetoho ea data), kapa ka nepo, ke sehlopha sa likhokahano tsa li-DBMS tse fapaneng tse lumellanang le moralo oa Apache Kafka Connect.

sena morero oa mohloli o bulehileng, e nang le laesense tlasa Apache License v2.0 mme e tšehelitsoe ke Red Hat. Nts'etsopele esale e tsoela pele ho tloha ka 2016 mme hajoale e fana ka ts'ehetso ea semmuso bakeng sa DBMS e latelang: MySQL, PostgreSQL, MongoDB, SQL Server. Ho boetse ho na le likhokahano tsa Cassandra le Oracle, empa hajoale li maemong a "phihlello ea pele", 'me likhatiso tse ncha ha li tiise hore li tla lumellana.

Haeba re bapisa CDC le mokhoa oa setso (ha kopo e bala lintlha tse tsoang ho DBMS ka ho toba), joale melemo ea eona e ka sehloohong e kenyelletsa ts'ebetsong ea phetoho ea data ho phallela boemong ba mola o nang le latency e tlaase, ho tšepahala ho phahameng le ho fumaneha. Lintlha tse peli tsa ho qetela li finyelloa ka ho sebelisa sehlopha sa Kafka e le sebaka sa polokelo ea liketsahalo tsa CDC.

Hape, melemo e kenyelletsa taba ea hore mohlala o le mong o sebelisetsoa ho boloka liketsahalo, kahoo kopo ea ho qetela ha ea lokela ho tšoenyeha ka li-nuances tsa ho sebetsa DBMS e fapaneng.

Qetellong, ho sebelisa morekisi oa melaetsa ho bula monyetla oa ho lekanya lits'ebetso tse latelang liphetoho ho data. Ka nako e ts'oanang, phello ea mohloli oa data e fokotsehile, kaha data ha e amoheloe ka kotloloho ho tsoa ho DBMS, empa ho tsoa sehlopheng sa Kafka.

Mabapi le meralo ea Debezium

Ho sebelisa Debezium ho theohela morerong ona o bonolo:

DBMS (e le mohloli oa data) → sehokelo ho Kafka Connect → Apache Kafka → moreki

E le papiso, ke tla fana ka setšoantšo se tsoang webosaeteng ea morero:

Re hlahisa Debezium - CDC bakeng sa Apache Kafka

Leha ho le joalo, ha ke rate morero ona, hobane ho bonahala eka ho ka khoneha feela sehokelo sa sink.

Ha e le hantle, boemo bo fapane: ho tlatsa Letša la hau la Data (sehokelo sa ho qetela setšoantšong se kaholimo) ha se eona feela tsela ea ho sebelisa Debezium. Liketsahalo tse rometsoeng ho Apache Kafka li ka sebelisoa ke likopo tsa hau ho sebetsana le maemo a fapaneng. Ka mohlala:

  • ho tlosoa ha data e sa lokelang ho tsoa ho cache;
  • ho romela litsebiso;
  • lintlafatso tsa index ea lipatlisiso;
  • mofuta o mong oa li-log tsa tlhahlobo;
  • ...

Haeba u na le ts'ebeliso ea Java 'me ha ho na tlhoko / monyetla oa ho sebelisa sehlopha sa Kafka, ho na le monyetla oa ho sebetsa. sehokedi se kenyeleditsweng. Ntho e totobetseng ke hore ka eona u ka hana lisebelisoa tse eketsehileng (ka mokhoa oa sehokelo le Kafka). Leha ho le joalo, tharollo ena e tlositsoe ho tloha mofuta oa 1.1 mme ha e sa khothaletsoa hore e sebelisoe (e kanna ea tlosoa litokollong tse tlang).

Sengoliloeng sena se tla tšohla meralo e khothalelitsoeng ke bahlahisi, e fanang ka mamello ea liphoso le scalability.

Tlhophiso ea sehokelo

E le hore u qale ho latela liphetoho tsa bohlokoa ka ho fetisisa - data - re hloka:

  1. mohloli oa data, e ka bang MySQL ho tloha ho mofuta oa 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lenane le feletseng);
  2. Sehlopha sa Apache Kafka
  3. Mohlala oa Kafka Connect (liphetolelo tsa 1.x, 2.x);
  4. sehokelo sa Debezium se hlophisitsoeng.

Sebetsa lintlheng tse peli tsa pele, i.e. ts'ebetso ea ho kenya DBMS le Apache Kafka li feta boholo ba sengoloa. Leha ho le joalo, bakeng sa ba batlang ho kenya ntho e 'ngoe le e' ngoe ka lebokoseng la lehlabathe, ho na le e lokiselitsoeng sebakeng sa polokelo ea molao e nang le mehlala. docker-compose.yaml.

Re tla tsepamisa maikutlo lintlheng tse peli tsa ho qetela ka botlalo.

0. Kafka Connect

Mona le hamorao sengolong, mehlala eohle ea tlhophiso e nahanoa molemong oa setšoantšo sa Docker se ajoang ke baetsi ba Debezium. E na le lifaele tsohle tse hlokahalang tsa plugin (lihokelo) mme e fana ka tlhophiso ea Kafka Connect e sebelisa mefuta e fapaneng ea tikoloho.

Haeba u ikemiselitse ho sebelisa Kafka Connect ho tsoa ho Confluent, u tla hloka ho kenyelletsa li-plugins tsa lihokelo tse hlokahalang ho bukana e boletsoeng ho. plugin.path kapa sete ka sebopeho sa tikoloho CLASSPATH. Litlhophiso tsa mosebeletsi oa Kafka Connect le lihokelo li hlalosoa ka lifaele tsa tlhophiso tse fetisitsoeng e le likhang ho taelo ea ho qala ea mosebeletsi. Bakeng sa lintlha bona litokomane.

Ts'ebetso eohle ea ho theha Debeizum mofuta oa sehokelo e etsoa ka mekhahlelo e 'meli. A re hlahlobeng e ’ngoe le e ’ngoe ea tsona:

1. Ho theha moralo oa Kafka Connect

Ho tsamaisa data ho sehlopha sa Apache Kafka, litekanyo tse ikhethileng li behiloe ka har'a moralo oa Kafka Connect, joalo ka:

  • litlhophiso tsa khokahano ea cluster,
  • mabitso a lihlooho tseo tlhophiso ea sehokelo ka boeona e tla bolokoa,
  • lebitso la sehlopha seo sehokedi se sebetsang ho sona (haeba ho sebediswa mokgwa o abuwang).

Setšoantšo sa semmuso sa Docker sa projeke se ts'ehetsa tlhophiso e sebelisang mefuta e fapaneng ea tikoloho - ke seo re tla se sebelisa. Kahoo ha re khoasolle setšoantšo:

docker pull debezium/connect

Bonyane sete e fapaneng ea tikoloho e hlokehang ho tsamaisa sehokelo ke e latelang:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - Lethathamo la pele la li-server tsa sehlopha sa Kafka ho fumana lenane le felletseng la litho tsa sehlopha;
  • OFFSET_STORAGE_TOPIC=connector-offsets - sehlooho sa ho boloka maemo moo sehokelo se leng teng hajoale;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - sehlooho sa ho boloka boemo ba sehokelo le mesebetsi ea sona;
  • CONFIG_STORAGE_TOPIC=connector-config - sehlooho sa ho boloka data ea tlhophiso ea sehokelo le mesebetsi ea eona;
  • GROUP_ID=1 - sekhetho sa sehlopha sa basebetsi seo mosebetsi oa sehokelo o ka etsoang ho sona; e hlokahalang ha o sebedisoa (ea ajoa) puso.

Re qala setshelo ka mefuta ena:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Hlokomela ka Avro

Ka nako e sa lekanyetsoang, Debezium e ngola lintlha ka mokhoa oa JSON, o amohelehang bakeng sa mabokose a lehlabathe le lintlha tse nyenyane, empa e ka ba bothata ho databases tse tletseng haholo. Mokhoa o mong oa ho fetolela JSON ke ho hlophisa melaetsa ka ho sebelisa la euro ho sebopeho sa binary, se fokotsang mojaro ho I / O subsystem ho Apache Kafka.

Ho sebelisa Avro, o hloka ho tsamaisa e arohaneng schema-registry (bakeng sa ho boloka meralo). Liphetoho tsa converter li tla shebahala tjena:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Lintlha mabapi le ho sebelisa Avro le ho theha ngoliso bakeng sa eona li ka nqane ho sengoloa - ho feta moo, bakeng sa ho hlaka, re tla sebelisa JSON.

2. Ho theha sehokelo ka bosona

Hona joale o ka ea ka ho toba ho tlhophiso ea sehokelo ka boeona, se tla bala data ho tsoa mohloling.

Ha re shebeng mohlala oa lihokelo tsa DBMS tse peli: PostgreSQL le MongoDB, tseo ke nang le boiphihlelo ho tsona le tseo ho nang le liphapang ho tsona (leha li le nyane, empa maemong a mang li bohlokoa!).

Tokiso e hlalositsoe ka mongolo oa JSON mme e kentsoe ho Kafka Connect ho sebelisoa kopo ea POST.

2.1. PostgreSQL

Mohlala oa tlhophiso ea sehokelo bakeng sa PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Molao-motheo oa ts'ebetso ea sehokelo ka mor'a tlhophiso ena o bonolo haholo:

  • Qalong ea pele, e hokela ho database e boletsoeng ho tlhophiso mme e qala ka mokhoa senepe sa pele, ho romella Kafka sete ea pele ea data e amohetsoeng ka maemo SELECT * FROM table_name.
  • Ka mor'a hore ho qalisoa ho phethoe, sehokelo se kenya mokhoa oa ho bala liphetoho ho tloha lifaeleng tsa PostgreSQL WAL.

Mabapi le likhetho tse sebelisitsoeng:

  • name - lebitso la sehokelo seo tlhophiso e hlalositsoeng ka tlase e sebelisetsoang; nakong e tlang, lebitso lena le sebelisetsoa ho sebetsa le sehokelo (ke hore, sheba boemo / restart / update configuration) ka Kafka Connect REST API;
  • connector.class - sehlopha sa sehokelo sa DBMS se tla sebelisoa ke sehokelo se hlophisitsoeng;
  • plugin.name ke lebitso la plugin bakeng sa ho hlakisa data ka mokhoa o hlakileng ho tsoa lifaeleng tsa WAL. E fumaneha ho khetha ho tloha wal2json, decoderbuffs и pgoutput. Tse peli tsa pele li hloka ho kenya lisebelisoa tse nepahetseng ho DBMS, le pgoutput bakeng sa mofuta oa 10 oa PostgreSQL ho ea holimo ha o hloke ho qhekella ho eketsehileng;
  • database.* — dikgetho tsa ho hokela polokelongtshedimosetso, moo database.server.name - lebitso la mohlala oa PostgreSQL o sebelisitsoeng ho theha lebitso la sehlooho sehlopheng sa Kafka;
  • table.include.list - lethathamo la litafole tseo re batlang ho latela liphetoho ho tsona; e fanoeng ka sebopeho schema.table_name; ha e khone ho sebelisoa hammoho table.exclude.list;
  • heartbeat.interval.ms - karohano (ka milliseconds) eo sehokelo se romellang melaetsa ea ho otla ha pelo ho sehlooho se ikhethileng;
  • heartbeat.action.query - kopo e tla etsoa ha o romella molaetsa o mong le o mong oa ho otla ha pelo (khetho e hlahile ho tloha phetolelong ea 1.1);
  • slot.name - lebitso la sekotjana sa ho ikatisa se tla sebelisoa ke sehokedi;
  • publication.name - Lebitso lingoliloeng ho PostgreSQL eo sehokelo se e sebelisang. Haeba e le sieo, Debezium e tla leka ho e etsa. Haeba mosebelisi eo khokahano e etsoang tlas'a eona a se na litokelo tse lekaneng bakeng sa ketso ena, sehokelo se tla tsoa ka phoso;
  • transforms e khetha mokhoa oa ho fetola lebitso la sehlooho se reriloeng:
    • transforms.AddPrefix.type e bontša hore re tla sebelisa lipolelo tse tloaelehileng;
    • transforms.AddPrefix.regex - mask eo lebitso la sehlooho se reretsoeng le hlalosoang bocha;
    • transforms.AddPrefix.replacement - ka kotloloho seo re se hlalosang bocha.

Tse ling ka ho otla ha pelo le ho fetoha

Ka nako e sa lekanyetsoang, sehokelo se romella data ho Kafka bakeng sa ts'ebetso e 'ngoe le e' ngoe e ikemiselitseng, 'me se ngola LSN ea eona (Nomoro ea Tatelano ea Log) ho sehlooho sa tšebeletso. offset. Empa ho etsahala'ng haeba sehokelo se lokiselitsoe ho bala eseng database eohle, empa karolo feela ea litafole tsa eona (tseo data e nchafatsoang hangata)?

  • Sehokelo se tla bala lifaele tsa WAL mme se se ke sa bona tšebetso ea transaction ho tsona ho ea litafoleng tseo e li hlokomelang.
  • Ka hona, e ke ke ea nchafatsa boemo ba eona ba hajoale ebang ke sehloohong kapa sekoting sa ho pheta-pheta.
  • Sena le sona se tla etsa hore lifaele tsa WAL li "khomarele" ho disk mme mohlomong li tla felloa ke sebaka sa disk.

'Me mona likhetho li tla ho thusa. heartbeat.interval.ms и heartbeat.action.query. Ho sebelisa likhetho tsena ka bobeli ho etsa hore ho khonehe ho etsa kopo ea ho fetola data tafoleng e arohaneng nako le nako ha molaetsa oa ho otla ha pelo o romelloa. Kahoo, LSN eo sehokelo se teng hona joale (ka har'a slot ea ho pheta-pheta) e lula e ntlafatsoa. Sena se lumella DBMS ho tlosa lifaele tsa WAL tse seng li sa hlokahale. Ho fumana lintlha tse ling mabapi le hore na likhetho li sebetsa joang, bona litokomane.

Khetho e 'ngoe e tšoaneloang ke tlhokomelo e haufi ke transforms. Leha e bua haholo ka boiketlo le botle ...

Ka mokhoa o ikhethileng, Debezium e theha lihlooho ka ho sebelisa leano le latelang la mabitso: serverName.schemaName.tableName. Sena se ka 'na sa se be bonolo kamehla. Dikgetho transforms ka ho sebelisa lipolelo tse tloaelehileng, u ka hlalosa lethathamo la litafole tseo liketsahalo tsa tsona li hlokang ho fetisetsoa sehloohong se nang le lebitso le itseng.

Ka tlhophiso ea rona leboha ho transforms se latelang sea etsahala: liketsahalo tsohle tsa CDC ho tsoa ho database tse lateloang li tla ea sehloohong se nang le lebitso data.cdc.dbname. Ho seng joalo (ntle le litlhophiso tsena), Debezium e ne e tla iketsetsa sehlooho bakeng sa tafole ka 'ngoe ea foromo: pg-dev.public.<table_name>.

Mefokolo ea sehokelo

Qetellong ea tlhaloso ea tlhophiso ea sehokelo bakeng sa PostgreSQL, ho bohlokoa ho bua ka likarolo tse latelang / meeli ea mosebetsi oa eona:

  1. Ts'ebetso ea sehokelo bakeng sa PostgreSQL e its'etleha holim'a mohopolo oa decoding e hlakileng. Ka hona o ha e latele likopo tsa ho fetola sebopeho sa database (DDL) - ka hona, data ena e ke ke ea e-ba lihloohong.
  2. Kaha ho sebelisoa li-slots tsa ho pheta-pheta, ho khoneha ho hokahanya sehokelo feela ho mohlala oa master DBMS.
  3. Haeba mosebelisi eo sehokelo se hokahantsoeng le database a na le litokelo tsa ho bala feela, joale pele o qala ts'ebetso ea pele, o tla hloka ho iketsetsa sebaka sa ho ikatisa ebe o se phatlalatsa ho database.

Ho sebelisa tlhophiso

Kahoo a re ke re kenye tlhophiso ea rona ho sehokelo:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Re etsa bonnete ba hore ho khoasolla ho atlehile le hore sehokelo se qalile:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

E kholo: e setiloe 'me e loketse ho tsamaea. Joale ha re iketse eka re moreki 'me re hokahane le Kafka, ka mor'a moo re eketsa le ho fetola se kentsoeng tafoleng:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Sehloohong sa rona, sena se tla hlahisoa ka tsela e latelang:

JSON e telele haholo ka liphetoho tsa rona

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Maemong ana ka bobeli, litlaleho li na le senotlolo (PK) sa rekoto e fetotsoeng, le moelelo oa liphetoho: hore na rekoto e ne e le eng pele le hore na e bile eng kamora moo.

  • Boemong ba INSERT: boleng pele (before) lekana nulle lateloe ke khoele e kentsoeng.
  • Boemong ba UPDATE: payload.before boemo bo fetileng ba mola bo bontšoa, le ho payload.after - e ncha ka moelelo oa phetoho.

2.2 MongoDB

Sehokelo sena se sebelisa mokhoa o tloaelehileng oa ho pheta-pheta oa MongoDB, ho bala tlhahisoleseling ho tsoa ho oplog ea node ea mantlha ea DBMS.

Ka mokhoa o ts'oanang le sehokelo se seng se hlalositsoe bakeng sa PgSQL, mona, hape, qalong ea pele, setšoantšo sa mantlha sa data se nkuoa, ka mor'a moo sehokelo se fetohela ho mokhoa oa ho bala oa oplog.

Mohlala oa tlhophiso:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Joalokaha u ka bona, ha ho na likhetho tse ncha ha li bapisoa le mohlala o fetileng, empa ke palo feela ea likhetho tse ikarabellang bakeng sa ho hokahanya le database le li-prefixes tsa tsona li fokotsehile.

Lisebelisoa transforms khetlong lena ba etsa tse latelang: fetola lebitso la sehlooho se tobisitsoeng ho tloha morerong <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

mamello ea liphoso

Taba ea mamello ea liphoso le ho fumaneha ho phahameng mehleng ea rona e matla ho feta leha e le neng pele - haholo-holo ha re bua ka data le transactions, 'me data change tracking ha e ka thōko tabeng ena. Ha re shebeng se ka senyehang ka molao-motheo le hore na ho tla etsahala eng ka Debezium boemong bo bong le bo bong.

Ho na le likhetho tse tharo tsa ho tsoa:

  1. Kafka Connect ho hloleha. Haeba Connect e lokiselitsoe ho sebetsa ka mokhoa o ajoang, sena se hloka basebetsi ba bangata ho seta sehlopha se le seng.id. Joale, haeba e 'ngoe ea tsona e hlōleha, sehokelo se tla tsosolosoa ho mosebeletsi e mong 'me se tsoele pele ho bala ho tloha boemong ba ho qetela bo ikemiselitseng sehloohong sa Kafka.
  2. Ho lahleheloa ke khokahano le sehlopha sa Kafka. Sehokelo se tla khaotsa ho bala sebakeng seo se hlōlehileng ho se romela Kafka 'me nako le nako se leke ho se romella ho fihlela boiteko bo atleha.
  3. Mohloli oa lintlha ha o fumanehe. Sehokelo se tla leka ho hokela mohloli hape ho latela tlhophiso. Ntho ea kamehla ke liteko tse 16 tse sebelisoang mokokotlo oa exponential. Ka mor'a teko ea 16 e hlōlehileng, mosebetsi o tla tšoauoa e le E ile ea hlōleha 'me e tla hloka ho qalisoa bocha ka sebopeho sa Kafka Connect REST.
    • Boemong ba PostgreSQL data e ke ke ea lahleha, hobane ho sebelisa li-slots tsa ho pheta-pheta ho tla thibela ho hlakoloa ha lifaele tsa WAL tse sa baloeng ke sehokelo. Tabeng ena, ho na le ho fokotseha: haeba khokahanyo ea marang-rang pakeng tsa sehokelo le DBMS e senyeha ka nako e telele, ho na le monyetla oa hore sebaka sa disk se tla fela, 'me sena se ka lebisa ho hlōleha ha DBMS eohle.
    • Boemong ba MySQL lifaele tsa binlog li ka fetisoa ke DBMS ka boeona pele khokahanyo e tsosolosoa. Sena se tla etsa hore sehokelo se kene boemong bo hlōlehileng, 'me se tla hloka ho qala hape ka mokhoa oa ho nka lifoto ho tsoela pele ho bala ho tsoa ho li-binlogs ho khutlisetsa ts'ebetso e tloaelehileng.
    • ka MongoDB. Litokomane li re: boitšoaro ba sehokelo haeba lifaele tsa log/oplog li hlakotsoe 'me sehokelo se sitoa ho tsoela pele ho bala ho tloha sebakeng seo se tlohetseng ho sona se tšoana le DBMS eohle. E lutse tabeng ea hore sehokedi se tla kena mmusong E ile ea hlōleha mme e tla hloka ho qala bocha ka mokhoa senepe sa pele.

      Leha ho le joalo, ho na le mekhelo. Haeba sehokelo se ne se le maemong a khaotsoeng ka nako e telele (kapa se ne se sa khone ho fihlela mohlala oa MongoDB), mme oplog e ne e fetotsoe nakong ena, joale ha khokahano e tsosolosoa, sehokelo se tla tsoela pele ka khutso ho bala data ho tloha sebakeng sa pele se fumanehang. , ke ka lebaka leo lintlha tse ling tsa Kafka ha tla otla.

fihlela qeto e

Debezium ke boiphihlelo ba ka ba pele ka litsamaiso tsa CDC mme e bile ntle haholo ka kakaretso. Morero o fane ka tjotjo ho tšehetso ea DBMS e kholo, boiketlo ba tlhophiso, ts'ehetso ea ho kopanya le sechaba se mafolofolo. Bakeng sa ba thahasellang ho itloaetsa, ke khothaletsa hore u bale litataiso tsa Kafka Connect и Debezium.

Ha ho bapisoa le sehokelo sa JDBC bakeng sa Kafka Connect, molemo o ka sehloohong oa Debezium ke hore liphetoho li baloa ho tloha ho li-logs tsa DBMS, tse lumellang hore data e amoheloe ka ho lieha ho fokolang. JDBC Connector (e fanoeng ke Kafka Connect) e botsa tafole e lateloang ka nako e tsitsitseng mme (ka lebaka le tšoanang) ha e hlahise melaetsa ha data e hlakotsoe (o ka botsa joang bakeng sa data e sieo?).

Ho rarolla mathata a tšoanang, o ka ela hloko litharollo tse latelang (ho kenyelletsa Debezium):

PES

Bala hape ho blog ea rona:

Source: www.habr.com

Eketsa ka tlhaloso