Introducing Debezium: CDC for Apache Kafka


In my work I regularly come across new technical solutions and software products about which there is little information on the Russian-speaking Internet. With this article I will try to fill one such gap using an example from my recent practice, when I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview, written as a result of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium is a representative of the CDC (Change Data Capture) class of software.

It is an open source project licensed under the Apache License v2.0 and sponsored by Red Hat. Development has been under way since 2016, and at the moment the project officially supports the following DBMSs: MySQL, PostgreSQL, MongoDB and SQL Server. There are also connectors for Cassandra and Oracle, but they are currently in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (where an application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability and high availability. The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the end application does not need to worry about the operational nuances of different DBMSs.

Finally, using a message broker lets the applications that track data changes scale horizontally. At the same time, the load on the data source is minimized, because the data is consumed not directly from the DBMS but from the Kafka cluster.

About Debezium's architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Figure: Debezium architecture diagram from the project website]

However, I don't really like this diagram, because it suggests that a sink connector is the only possible way to consume the data.

In reality, the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. The events sent to Apache Kafka can be consumed by your own applications to handle various situations, for example:

  • evicting stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit logs;
  • ...
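
To make the first use case more concrete, here is a minimal sketch of the decision a consumer might take for each event. It assumes the message value has already been parsed from the JSON format shown later in this article (a schema/payload envelope) and that cache entries are keyed by the row's id column; cache_action is a hypothetical helper, and reading from Kafka is left out entirely.

```python
from typing import Any, Dict, Optional, Tuple


def cache_action(event: Optional[Dict[str, Any]]) -> Optional[Tuple[Any, ...]]:
    """Decide what to do with a cache entry for one Debezium change event.

    Returns ("delete", key) to evict the entry, ("put", key, row) to refresh it,
    or None when the event needs no cache action.
    """
    if event is None or event.get("payload") is None:
        # Tombstone (null value) that follows a delete: the delete itself was already handled.
        return None
    payload = event["payload"]
    op = payload.get("op")
    if op == "d":
        # For deletes, only the "before" image identifies the row.
        return ("delete", payload["before"]["id"])
    if op in ("c", "u", "r"):
        # Create, update or snapshot read: store the new state of the row.
        row = payload["after"]
        return ("put", row["id"], row)
    return None
```

Keeping the decision in a pure function like this makes it easy to test independently of both the Kafka client and the cache implementation.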

If you have a Java application and there is no need (or no possibility) to use a Kafka cluster, you can also work through the embedded connector. Its obvious advantage is that it removes the need for additional infrastructure (a connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended (support for it may be removed in future releases).

This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes to the most valuable asset, data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples provides a ready-made docker-compose.yaml.

We will look at the last two points in more detail.

0. Kafka Connect

Here and throughout the article, all configuration examples refer to the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and lets you configure Kafka Connect through environment variables.

If you plan to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself, either to the directory specified in plugin.path or via the CLASSPATH environment variable. The settings of the Kafka Connect worker and of the connectors are defined in configuration files passed as arguments to the worker start command. For details, see the documentation.

The whole process of setting up Debezium in the connector variant consists of two stages. Let's look at each of them:

1. Configuring the Kafka Connect framework

To stream data to the Apache Kafka cluster, a number of parameters are set in the Kafka Connect framework, such as:

  • parameters for connecting to the cluster;
  • the names of the topics in which the connector configuration itself will be stored;
  • the name of the group in which the connector runs (when distributed mode is used).

The project's official Docker image supports configuration through environment variables, and that is what we will use. So, let's pull the image:

docker pull debezium/connect:1.2

The minimum set of environment variables needed to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to discover the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic for storing the positions the connector is currently at;
  • STATUS_STORAGE_TOPIC=connector-status - the topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic for storing the configuration data of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which connector tasks can be executed; required when using distributed mode.

We start the container with these variables:

docker run \
  -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2

A note on Avro

By default, Debezium writes data in JSON format. This is fine for sandboxes and small amounts of data, but can become a problem in heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema registry (to store the schemas). The converter variables will look like this:

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
- name: KEY_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
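
As far as I understand the entrypoint script of the debezium/connect image, the short variables used earlier (BOOTSTRAP_SERVERS, GROUP_ID, the *_STORAGE_TOPIC variables, KEY_CONVERTER, VALUE_CONVERTER) are re-exported with a CONNECT_ prefix, and every CONNECT_* variable then becomes a worker property by dropping the prefix, lowercasing the name and turning underscores into dots. The sketch below models that convention in simplified form; the SHORTHAND tuple and worker_properties are hypothetical names, and the script shipped with your image version is the authority.

```python
from typing import Dict

# Short variable names that the image is assumed to re-export with a CONNECT_ prefix.
SHORTHAND = (
    "BOOTSTRAP_SERVERS",
    "GROUP_ID",
    "CONFIG_STORAGE_TOPIC",
    "OFFSET_STORAGE_TOPIC",
    "STATUS_STORAGE_TOPIC",
    "KEY_CONVERTER",
    "VALUE_CONVERTER",
)


def worker_properties(env: Dict[str, str]) -> Dict[str, str]:
    """Approximate how container environment variables become Kafka Connect worker properties."""
    expanded = dict(env)
    for name in SHORTHAND:
        if name in env:
            expanded["CONNECT_" + name] = env[name]
    props: Dict[str, str] = {}
    for name, value in expanded.items():
        if name.startswith("CONNECT_"):
            # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL -> value.converter.schema.registry.url
            props[name[len("CONNECT_"):].lower().replace("_", ".")] = value
    return props
```

For example, worker_properties({"GROUP_ID": "1"}) yields {"group.id": "1"}, and unrelated variables such as HOME are ignored.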

The details of using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connector examples for two DBMSs, PostgreSQL and MongoDB, which I have experience with and which differ from each other (only slightly, but in some cases significantly!).

The configuration is written in JSON and loaded into Kafka Connect with a POST request.

2.1. PostgreSQL

An example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

With this configuration, the connector works in a fairly simple way:

  • On the first start, it connects to the database specified in the configuration and starts in initial snapshot mode, sending to Kafka the initial data set obtained with what is essentially a SELECT * FROM table_name.
  • Once initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.
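
To tell the two phases apart on the consumer side, one can look at op and source.snapshot in each event: in the JSON shown later in this article, streamed changes carry "snapshot":"false", and the event schema lists true, last and false as the allowed values. Snapshot rows are, to my knowledge, emitted with op set to "r" (read). A minimal sketch under those assumptions (phase is a hypothetical helper):

```python
from typing import Any, Dict


def phase(payload: Dict[str, Any]) -> str:
    """Return "snapshot" for events from the initial snapshot and "streaming" for WAL changes."""
    source = payload.get("source") or {}
    # Allowed values per the event schema: "true", "last" (final snapshot record), "false".
    if payload.get("op") == "r" or source.get("snapshot") in ("true", "last"):
        return "snapshot"
    return "streaming"
```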

About the options used:

  • name - the name of the connector to which the configuration below applies; this name is later used to work with the connector (view its status, restart it, update its configuration) via the Kafka Connect REST API;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. You can choose from wal2json, decoderbufs and pgoutput. The first two require installing the corresponding extensions in the DBMS, while pgoutput needs no additional steps on PostgreSQL 10 and later;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance, used to form topic names in the Kafka cluster;
  • table.include.list - the list of tables whose changes we want to track, in the format schema.table_name; it cannot be used together with table.exclude.list (in Debezium versions before 1.3 these options are called table.whitelist and table.blacklist);
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot the connector will use;
  • publication.name - the name of the PostgreSQL publication the connector uses. If it does not exist, Debezium will try to create it. If the user the connector connects as lacks sufficient privileges for this, the connector will stop with an error;
  • transforms - defines exactly how the name of the target topic is changed:
    • transforms.AddPrefix.type indicates that we will use regular expressions;
    • transforms.AddPrefix.regex - the pattern by which the target topic name is redefined;
    • transforms.AddPrefix.replacement - what exactly we redefine it to.
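
The name option is what ties a connector to the Kafka Connect REST API. As a sketch, the requests for the operations mentioned above (create, status, restart, config update) can be built with the Python standard library. The base URL http://localhost:8083 is an assumption matching the curl examples in this article, and the helper names are hypothetical; the functions only build requests and do not send them.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

# Assumption: the Kafka Connect REST API is reachable here, as in the curl examples.
CONNECT_URL = "http://localhost:8083"


def _request(method: str, path: str, body: Optional[Any] = None) -> urllib.request.Request:
    """Build (but do not send) a JSON request to the Kafka Connect REST API."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    req = urllib.request.Request(CONNECT_URL + path, data=data, method=method)
    req.add_header("Accept", "application/json")
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req


def create(name: str, config: Dict[str, str]) -> urllib.request.Request:
    # POST /connectors expects {"name": ..., "config": {...}}, the same shape as pg-con.json.
    return _request("POST", "/connectors", {"name": name, "config": config})


def status(name: str) -> urllib.request.Request:
    return _request("GET", f"/connectors/{name}/status")


def restart(name: str) -> urllib.request.Request:
    return _request("POST", f"/connectors/{name}/restart")


def restart_task(name: str, task_id: int) -> urllib.request.Request:
    # Useful when a single task ends up in the FAILED state.
    return _request("POST", f"/connectors/{name}/tasks/{task_id}/restart")


def update_config(name: str, config: Dict[str, str]) -> urllib.request.Request:
    # PUT /connectors/{name}/config takes the bare config map, without the "name" wrapper.
    return _request("PUT", f"/connectors/{name}/config", config)
```

Sending a request is then a matter of urllib.request.urlopen(status("pg-connector")) and json.load on the response.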

A bit more about heartbeat and transforms

By default, the connector sends data to Kafka for every committed transaction, and its LSN (Log Sequence Number) is written to the offset service topic. But what happens if the connector is configured to read not the whole database but only some of its tables (where data is updated infrequently)?

  • The connector will read the WAL files and will not see any committed transactions in the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause the WAL files to be retained on disk, which will likely lead to running out of disk space.
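
To see how much WAL a lagging slot is holding back, one can compare the server's current WAL position with the slot's confirmed_flush_lsn; on PostgreSQL 10+ this is SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots;. PostgreSQL prints an LSN as two hexadecimal halves separated by a slash (X/Y), i.e. a 64-bit byte position. The sketch below reproduces that difference client-side; the function names and sample LSN values are made up for illustration.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the server must keep because the slot has not confirmed them yet."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)
```

For instance, retained_wal_bytes("0/3000000", "0/1000000") is 0x2000000, i.e. 32 MiB that cannot be recycled until the slot moves forward.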

This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Used together, they make it possible to run a query that modifies data in a separate table every time a heartbeat message is sent. As a result, the LSN at which the connector currently stands (in the replication slot) is constantly updated, which allows the DBMS to remove WAL files that are no longer needed. You can learn more about how these options work in the documentation.
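
A minimal sketch of enabling this pair of options on the "config" map from the example above. The heartbeat table public.debezium_heartbeat and its upsert query are hypothetical; I assume the table lives in the public schema so that it is covered by table.include.list. The 5000 ms interval matches the example configuration.

```python
from typing import Dict

# Hypothetical heartbeat table in the public schema, created beforehand, e.g.:
#   CREATE TABLE public.debezium_heartbeat (id int PRIMARY KEY, ts timestamptz);
HEARTBEAT_QUERY = (
    "INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) "
    "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
)


def with_heartbeat(config: Dict[str, str], interval_ms: int, action_query: str) -> Dict[str, str]:
    """Return a copy of a connector "config" map with the heartbeat options set."""
    updated = dict(config)
    # The example configuration passes all values as strings, so the interval is a string too.
    updated["heartbeat.interval.ms"] = str(interval_ms)
    # Run on every heartbeat: the write produces a change the connector sees,
    # so its confirmed position in the replication slot keeps moving.
    updated["heartbeat.action.query"] = action_query
    return updated
```

Using a single-row upsert keeps the heartbeat table from growing, unlike a plain INSERT on every heartbeat.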

Another option worth a closer look is transforms, even though it is mostly about convenience and aesthetics...

By default, Debezium creates topics using the following naming policy: serverName.schemaName.tableName. This is not always convenient. The transforms options let you use regular expressions to define a list of tables whose events must be routed to a topic with a specific name.

In our configuration, transforms does the following: all CDC events from the monitored database go to the topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a separate topic for each table, like this: pg-dev.public.<table_name>.
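
To check which topic an event will end up in, one can emulate the RegexRouter logic locally. As far as I know, Kafka's RegexRouter renames a topic only when the regular expression matches the whole topic name (Java's Matcher.matches()), and the replacement refers to groups as $1, $2 and so on; non-matching topics keep their name. The sketch below mirrors that with re.fullmatch. Java and Python regex dialects are close but not identical, so treat it as an approximation; the function names are hypothetical.

```python
import re


def java_replacement_to_python(replacement: str) -> str:
    """Translate Java-style group references ($1) into Python's \\g<1> syntax."""
    return re.sub(r"\$(\d+)", r"\\g<\1>", replacement)


def route(topic: str, regex: str, replacement: str) -> str:
    """Emulate RegexRouter: rename the topic only if the regex matches the whole name."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    return match.expand(java_replacement_to_python(replacement))
```

With the PostgreSQL settings, route("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.dbname") gives data.cdc.dbname. Note that the unescaped dots in these patterns match any character; writing "pg-dev\\.public\\.(.*)" in the JSON would be stricter.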

Connector limitations

To wrap up the description of the PostgreSQL connector setup, it is worth mentioning the following features and limitations of its operation:

  1. The PostgreSQL connector relies on logical decoding. As a result, it does not track changes to the database structure (DDL), so this data will not appear in the topics.
  2. Since replication slots are used, the connector can only be connected to the primary DBMS instance.
  3. If the user the connector connects as has read-only privileges, you will need to create the replication slot and the publication in the database manually before the first start.
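
For the third point, the preparatory steps boil down to two SQL statements run by a sufficiently privileged user; pg_create_logical_replication_slot and CREATE PUBLICATION are standard PostgreSQL 10+ commands. They are generated from Python here only to keep all examples in one language. The slot and publication names come from the configuration above, while the table list is an assumption; bootstrap_sql is a hypothetical helper that interpolates names verbatim, so they must be trusted identifiers.

```python
from typing import List


def bootstrap_sql(slot_name: str, publication_name: str, tables: List[str]) -> List[str]:
    """SQL a privileged user could run before the connector's first start.

    Names are interpolated verbatim, so they must be trusted identifiers.
    """
    return [
        # Logical replication slot that decodes the WAL with the pgoutput plugin.
        f"SELECT pg_create_logical_replication_slot('{slot_name}', 'pgoutput');",
        # Publication covering only the tables the connector should capture.
        f"CREATE PUBLICATION {publication_name} FOR TABLE {', '.join(tables)};",
    ]
```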

Applying the configuration

So, let's load our configuration into the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json

Let's check that the upload succeeded and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
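
In scripts or health checks, the same status response can be checked programmatically. A minimal sketch that treats the connector as healthy only when the connector itself and all of its tasks report RUNNING (is_healthy is a hypothetical name):

```python
import json


def is_healthy(status_body: str) -> bool:
    """True if the connector and all of its tasks are RUNNING."""
    status = json.loads(status_body)
    if status["connector"]["state"] != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # No tasks yet means nothing is actually reading from the source.
    return bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)
```

Checking the tasks matters because the connector can be RUNNING while one of its tasks is FAILED.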

Great: everything is set up and ready to go. Now let's pretend to be a consumer and connect to Kafka, then add and change a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will show up as follows:

Very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the changed record and the essence of the change itself: what the record was before and what it became afterwards.

  • For INSERT: the previous value (before) is null, and after is the inserted row.
  • For UPDATE: payload.before holds the previous state of the row, and payload.after the new state with the change applied.
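
A consumer that only cares about what actually changed can compare the two images. A minimal sketch, assuming payload is the already-parsed payload object from the messages above (changed_fields is a hypothetical helper). As far as I know, PostgreSQL only provides the full old row in before when the table's REPLICA IDENTITY is FULL; otherwise before may be null for updates and every column will show up as changed.

```python
from typing import Any, Dict, Tuple


def changed_fields(payload: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs between "before" and "after" to (old, new)."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in set(before) | set(after)
        if before.get(column) != after.get(column)
    }
```

For the UPDATE above, this reduces the event to {"first_name": ("foo", "egg")}.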

2.2 MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the DBMS primary node.

As with the PgSQL connector described above, an initial snapshot of the data is taken on the first start, after which the connector switches to reading the oplog.

Configuration example:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, there are no fundamentally new options compared with the previous example; only the options responsible for connecting to the database have changed: there are fewer of them and they use a different prefix.

This time the transforms settings do the following: they change the target topic name from the <server_name>.<db_name>.<collection_name> pattern to data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability matter more today than ever, especially when it comes to data and transactions, and change data tracking is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to run in distributed mode, several workers need to share the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
  2. Loss of connectivity to the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and keeps retrying until an attempt succeeds.
  3. The data source becomes unavailable. The connector will try to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually via the Kafka Connect REST interface.
    • With PostgreSQL, data will not be lost, because the replication slot prevents the deletion of WAL files the connector has not yet read. There is a flip side, though: if network connectivity between the connector and the DBMS is broken for a long time, the disk may run out of space, which can bring down the whole DBMS.
    • With MySQL, the binlog files may be rotated by the DBMS itself before connectivity is restored. This puts the connector into the failed state, and to restore normal operation you will need to restart it in initial snapshot mode so that it can continue reading the binlogs.
    • As for MongoDB, the documentation says that the connector's behaviour when the log files/oplog have been deleted and it cannot resume from the position where it stopped is the same for all DBMSs: the connector goes into the failed state and must be restarted in initial snapshot mode.

      However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then once the connection is restored the connector will quietly continue reading from the first available position, which means some of the data will never reach Kafka.
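
The retry behaviour from point 3 can be illustrated with a small calculation. This sketch assumes a delay that doubles from 1 second and is capped at 120 seconds, with 16 attempts; these numbers are illustrative assumptions (the MongoDB connector, for example, exposes similar settings as connect.backoff.initial.delay.ms, connect.backoff.max.delay.ms and connect.max.attempts), so check your connector's documentation for the actual names and defaults.

```python
from typing import List


def backoff_schedule(attempts: int = 16, initial_ms: int = 1000, max_ms: int = 120_000) -> List[int]:
    """Delay applied after each failed attempt, doubling from initial_ms up to max_ms."""
    return [min(initial_ms * 2 ** i, max_ms) for i in range(attempts)]


delays = backoff_schedule()
# Total time spent retrying before the task is marked as failed.
total_seconds = sum(delays) / 1000
```

Under these assumptions the delays are 1, 2, 4, ..., 64 seconds followed by nine capped 120-second waits, 1207 seconds in total, so a source outage longer than about 20 minutes ends with a failed task that needs a manual restart.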

Conclusion

Debezium was my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and active community. For those interested in trying it in practice, I recommend reading the Kafka Connect and Debezium guides.

Compared with the JDBC connector for Kafka Connect, the main advantage of Debezium is that changes are read from the DBMS logs, which allows data to be received with minimal latency. The JDBC Connector (for Kafka Connect) queries the monitored table at a fixed interval and, for the same reason, does not generate messages when data is deleted (how can you query data that no longer exists?).
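
The problem with deletes can be shown with a toy model of incrementing-column polling, where the poller remembers the largest id it has seen and periodically asks for newer rows. The table is a plain dict and poll_new_rows is a hypothetical stand-in for the periodic query; this is not the JDBC connector's code.

```python
from typing import Dict, List, Tuple


def poll_new_rows(table: Dict[int, str], last_seen_id: int) -> List[Tuple[int, str]]:
    """Toy incrementing-column poll: roughly SELECT * FROM t WHERE id > :last_seen_id ORDER BY id."""
    return sorted((row_id, value) for row_id, value in table.items() if row_id > last_seen_id)


table = {1: "foo", 2: "bar"}  # hypothetical table contents
first = poll_new_rows(table, last_seen_id=0)
del table[1]  # a DELETE between two polls...
table[3] = "baz"  # ...and an INSERT
second = poll_new_rows(table, last_seen_id=max(row_id for row_id, _ in first))
# second contains only (3, "baz"): nothing tells the consumer that id 1 is gone,
# whereas a log-based connector reads an explicit delete event ("op": "d") from the WAL.
```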

For similar tasks, you can also consider the following solutions (in addition to Debezium):


Source: www.habr.com
