Introducing Debezium: CDC for Apache Kafka

In my work I often come across new technical solutions and software products that are poorly covered on the Russian-language Internet. In this article I will try to fill one such gap with an example from my recent practice, when I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this review, which grew out of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium is a representative of the CDC (Change Data Capture) class of software or, more precisely, a set of connectors for various DBMSs compatible with the Apache Kafka Connect framework.

It is an open-source project, licensed under the Apache License v2.0 and sponsored by Red Hat. Development has been under way since 2016, and the project currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB, SQL Server. There are also connectors for Cassandra and Oracle, but at the moment they are in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (where the application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability and high availability. The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the end application does not have to worry about the operational nuances of different DBMSs.

Finally, using a message broker allows the applications that track data changes to scale horizontally. At the same time, the impact on the data source is minimized, since the data is obtained not directly from the DBMS but from the Kafka cluster.

About the Debezium architecture

Using Debezium boils down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Figure: Debezium architecture diagram from the project website]

However, I don't really like this diagram, because it gives the impression that only a sink connector can be used.

In reality the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be consumed by your applications to handle a variety of situations. For example:

  • removing stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit logs;
  • ...

If you have a Java application and there is no need (or no possibility) to use a Kafka cluster, you can also work through the embedded connector. Its obvious advantage is that it eliminates the need for additional infrastructure (in the form of a Kafka Connect connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended (support may be removed in future releases).

This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes to the most important asset, the data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples has a ready-made docker-compose.yaml.

We will look at the last two points in more detail.

0. Kafka Connect

Here and throughout the article, all configuration examples are discussed in the context of the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and allows Kafka Connect to be configured through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the JAR files of the required connectors yourself, either to the directory specified in plugin.path or via the CLASSPATH environment variable. The settings of the Kafka Connect worker and connectors are defined in configuration files that are passed as arguments to the worker start command. For details, see the documentation.

Setting up Debezium in connector form is done in two stages. Let's look at each of them:

1. Setting up the Kafka Connect framework

To stream data to the Apache Kafka cluster, specific parameters are set in the Kafka Connect framework, such as:

  • cluster connection parameters,
  • the names of the topics where the connector's own configuration will be stored,
  • the name of the group in which the connector runs (if distributed mode is used).

The project's official Docker image supports configuration via environment variables, which is what we will use. So, let's pull the image:

docker pull debezium/connect

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to obtain the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic that stores the connector's current positions;
  • STATUS_STORAGE_TOPIC=connector-status - the topic that stores the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic that stores the configuration of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector's tasks can run; required in distributed mode.

We start the container with these variables (port 8083 is published so that the REST API used below is reachable on localhost):

docker run \
  -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
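Before starting the container, the minimum variable set listed above can be checked programmatically. The Python sketch below is purely illustrative: `missing_vars` and `to_worker_properties` are hypothetical helpers, and the name mapping (`GROUP_ID` becomes `group.id`, a `CONNECT_` prefix is stripped) is an assumption about how the Debezium image's entrypoint turns variables into worker properties, so check the image documentation before relying on it.

```python
# Hypothetical helpers; not part of Debezium or Kafka Connect.
REQUIRED_VARS = (
    "BOOTSTRAP_SERVERS",
    "GROUP_ID",
    "CONFIG_STORAGE_TOPIC",
    "OFFSET_STORAGE_TOPIC",
    "STATUS_STORAGE_TOPIC",
)


def missing_vars(env: dict[str, str]) -> list[str]:
    """Return the required variables that are absent or empty in env."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


def to_worker_properties(env: dict[str, str]) -> dict[str, str]:
    """Assumed convention: drop a CONNECT_ prefix, lowercase, replace '_' with '.'."""
    props = {}
    for name, value in env.items():
        key = name.removeprefix("CONNECT_")
        props[key.lower().replace("_", ".")] = value
    return props


env = {
    "BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "GROUP_ID": "1",
    "CONFIG_STORAGE_TOPIC": "my_connect_configs",
    "OFFSET_STORAGE_TOPIC": "my_connect_offsets",
    "STATUS_STORAGE_TOPIC": "my_connect_statuses",
}
print(missing_vars(env))  # []
print(to_worker_properties(env)["group.id"])  # 1
```

The helpers only look at the dictionary passed in; applying `to_worker_properties` to a full `os.environ` would also convert unrelated variables such as `PATH`.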

A note on Avro

By default, Debezium writes data in JSON format, which is fine for sandboxes and small volumes of data but can become a problem for heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the Apache Kafka I/O subsystem.

To use Avro, you need to deploy a separate schema registry (to store the schemas). The converter variables will look like this:

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter

The details of using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.
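To get a feel for why the converter choice matters: with the JSON converter and schemas enabled, every message carries its full schema next to the payload, as the sample output later in this article shows. With Avro and a schema registry, the schema is stored once in the registry and each message refers to it by an ID. The hypothetical `schema_share` function below measures what fraction of a compactly serialized JSON message is schema, using the key message from this article's PostgreSQL example.

```python
import json


def schema_share(message: dict) -> float:
    """Fraction of a compact-serialized JSON message occupied by its 'schema' part."""
    compact = {"separators": (",", ":")}
    total = len(json.dumps(message, **compact))
    schema = len(json.dumps(message["schema"], **compact))
    return schema / total


# Key message for row id=1005, as produced by the JSON converter in this article.
key_message = {
    "schema": {
        "type": "struct",
        "fields": [{"type": "int32", "optional": False, "field": "id"}],
        "optional": False,
        "name": "data.cdc.dbname.Key",
    },
    "payload": {"id": 1005},
}

print(f"{schema_share(key_message):.0%} of the key message is schema")
```

For a one-column key, most of the bytes are schema; the value envelope shown later repeats its much larger schema in every message as well.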

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connector examples for two DBMSs, PostgreSQL and MongoDB, which I have experience with and which differ from each other (slightly, but in some cases significantly!).

The configuration is described in JSON and loaded into Kafka Connect with a POST request.

2.1. PostgreSQL

Example PostgreSQL connector configuration:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

With this configuration, the connector works on a simple principle:

  • On first start, it connects to the database specified in the configuration and starts in initial snapshot mode, sending to Kafka the initial data set obtained with what is effectively a SELECT * FROM table_name query.
  • Once initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector to which the configuration below applies; this name is later used to work with the connector (i.e. check its status, restart it, update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The available options are wal2json, decoderbufs and pgoutput. The first two require installing the corresponding DBMS extensions, while pgoutput requires no additional steps on PostgreSQL 10 and later;
  • database.* - database connection options, where database.server.name is the PostgreSQL instance name used to form topic names in the Kafka cluster;
  • table.include.list - the list of tables whose changes we want to track, specified in the schema.table_name format; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot the connector will use;
  • publication.name - the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user the connector connects as lacks sufficient privileges for this, the connector will stop with an error;
  • transforms - defines exactly how the target topic name is changed:
    • transforms.AddPrefix.type indicates that we use regular expressions;
    • transforms.AddPrefix.regex - the mask used to redefine the target topic name;
    • transforms.AddPrefix.replacement - what exactly we redefine it to.
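Some of the constraints listed above can be checked before the configuration is submitted. This is a hypothetical pre-flight check, not a Debezium API: it covers only the rules described in this section (the three logical decoding plugins, mutually exclusive include/exclude lists, and a few required connection settings). Kafka Connect performs its own validation when the connector is created.

```python
LOGICAL_DECODING_PLUGINS = {"wal2json", "decoderbufs", "pgoutput"}
REQUIRED_KEYS = ("connector.class", "database.hostname", "database.dbname",
                 "database.server.name", "slot.name")


def check_pg_config(config: dict[str, str]) -> list[str]:
    """Return human-readable problems found in a PostgreSQL connector config."""
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not config.get(key)]
    plugin = config.get("plugin.name")
    if plugin is not None and plugin not in LOGICAL_DECODING_PLUGINS:
        problems.append(f"unknown plugin.name {plugin!r}")
    if "table.include.list" in config and "table.exclude.list" in config:
        problems.append("table.include.list and table.exclude.list are mutually exclusive")
    return problems


# A subset of the "config" section from the example above.
pg_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.dbname": "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "slot.name": "dbname_debezium",
}
print(check_pg_config(pg_config))  # []
```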

More on heartbeat and transforms

By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the service offsets topic. But what happens if the connector is configured to read not the whole database but only some of its tables (in which data changes rarely)?

  • The connector will read the WAL files and find no committed transactions in the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause WAL files to be retained on disk, which is likely to eventually exhaust the disk space.

This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Used together, they make it possible to run a query that modifies data in a separate table every time a heartbeat message is sent. As a result, the LSN at which the connector currently stands (in the replication slot) is constantly updated, which allows the DBMS to remove WAL files that are no longer needed. You can learn more about how these options work in the documentation.
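As an illustration of that pairing, the helper below adds both heartbeat options to a connector configuration. It is a sketch under assumptions: the `debezium_heartbeat` table and its columns are hypothetical and would have to be created in the monitored database beforehand, and depending on your setup the table may also need to be covered by the connector's publication and table filter for its changes to reach the connector.

```python
HEARTBEAT_TABLE = "debezium_heartbeat"  # hypothetical table: (id int primary key, ts timestamptz)


def with_heartbeat(config: dict[str, str], interval_ms: int = 5000,
                   table: str = HEARTBEAT_TABLE) -> dict[str, str]:
    """Return a copy of config with heartbeat.interval.ms and heartbeat.action.query set."""
    updated = dict(config)
    updated["heartbeat.interval.ms"] = str(interval_ms)
    # An upsert keeps the table at a single row while still producing a WAL change.
    updated["heartbeat.action.query"] = (
        f"INSERT INTO {table} (id, ts) VALUES (1, now()) "
        "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
    )
    return updated


base = {"name": "pg-connector", "slot.name": "dbname_debezium"}
cfg = with_heartbeat(base)
print(cfg["heartbeat.action.query"])
```

The original dictionary is left untouched, so the same base configuration can be reused for connectors that do not need a heartbeat query.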

Another option worth special attention is transforms. Although it is more about convenience and aesthetics...

By default, Debezium creates topics using the following naming policy: serverName.schemaName.tableName. This is not always convenient. The transforms options let you use regular expressions to define a list of tables whose events should be routed to a topic with a specific name.

In our configuration, transforms does the following: all CDC events from the monitored database go to a topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a topic per table of the form pg-dev.public.<table_name>.
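To check what a given regex and replacement will produce before deploying, the renaming can be approximated in Python. This sketch mimics the RegexRouter behavior (rename only when the regex matches the whole topic name, then replace the first match); it is not the actual Java implementation, and Java-style `$1` group references are converted to Python syntax.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename the topic only if regex matches it entirely."""
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic  # non-matching topics keep their original name
    # Java-style group references ($1, $2, ...) -> Python's \g<1>, \g<2>, ...
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return pattern.sub(py_replacement, topic, count=1)


# PostgreSQL example from this article: every table goes to one topic.
print(route_topic("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.dbname"))
```

The same function works for the MongoDB configuration below, where the `$1` group keeps the database name in the target topic.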

Connector limitations

To round off the description of the PostgreSQL connector setup, it is worth mentioning the following features and limitations of how it works:

  1. The PostgreSQL connector relies on logical decoding. Therefore it does not track statements that change the database structure (DDL), and such changes will not appear in the topics.
  2. Since replication slots are used, the connector can only be attached to the primary DBMS instance.
  3. If the user the connector connects as has read-only privileges, then before the first start you will need to manually create the replication slot and the publication in the database.
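For the read-only case, a user with sufficient privileges has to create the slot and the publication in advance, using the names from the connector configuration (`slot.name`, `publication.name`). The helper below only generates the SQL; it is a hypothetical sketch, the table list is an example, and the names are interpolated directly, so they must be trusted identifiers rather than user input.

```python
def manual_setup_sql(slot: str, publication: str, tables: list[str],
                     plugin: str = "pgoutput") -> list[str]:
    """Build statements a privileged user could run before the connector's first start."""
    if not tables:
        raise ValueError("at least one table is required for FOR TABLE")
    return [
        # Logical replication slot that the connector will read from.
        f"SELECT pg_create_logical_replication_slot('{slot}', '{plugin}');",
        # Publication listing the tables to stream (used by the pgoutput plugin).
        f"CREATE PUBLICATION {publication} FOR TABLE {', '.join(tables)};",
    ]


for statement in manual_setup_sql("dbname_debezium", "dbname_publication",
                                  ["public.customers"]):
    print(statement)
```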

Loading the configuration

So, let's load our connector configuration:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
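The same request can be sent from Python using only the standard library, which is handy when connector configurations are generated by a script. A minimal sketch, assuming the Kafka Connect REST API is reachable at `http://localhost:8083` as in the curl call; `build_create_request` is a hypothetical helper, and the actual send is left commented out.

```python
import json
import urllib.request


def build_create_request(config: dict,
                         base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build a POST /connectors/ request carrying the connector's JSON configuration."""
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Abbreviated version of pg-con.json; in practice, load the full file with json.load.
connector = {
    "name": "pg-connector",
    "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
}
req = build_create_request(connector)
# with urllib.request.urlopen(req) as resp:  # requires a running Kafka Connect worker
#     print(resp.status, resp.read().decode())
```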

We check that the configuration was loaded successfully and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
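For scripts and health checks, the status response can be interpreted programmatically. A minimal sketch: the hypothetical `is_running` treats the connector as healthy only when the connector and every one of its tasks report `RUNNING`; the sample body is the response shown above.

```python
import json


def is_running(status_body: str) -> bool:
    """True if the connector and all of its (at least one) tasks are RUNNING."""
    status = json.loads(status_body)
    tasks = status.get("tasks", [])
    return (
        status["connector"]["state"] == "RUNNING"
        and bool(tasks)
        and all(task["state"] == "RUNNING" for task in tasks)
    )


body = ('{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},'
        '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}')
print(is_running(body))  # True
```

Checking the tasks matters: a connector can report `RUNNING` while one of its tasks has already moved to `FAILED`.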

Great: it is configured and ready to go. Now let's act as a consumer and connect to Kafka, and then add and modify a row in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

The topic will show the following:

A very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the changed row and the substance of the change: what the row was before and what it became after.

  • For INSERT: the previous value (before) is null, and after contains the inserted row.
  • For UPDATE: payload.before holds the previous state of the row, and payload.after holds the new state with the changes applied.
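A consumer typically branches on the op field and compares before with after. The sketch below is hypothetical and assumes the envelope shown above (JSON converter with schemas, so the change sits under payload); it maps Debezium's op codes (c, u, d, and r for snapshot reads) to names and lists the columns whose values changed. A null value is treated as a tombstone, which Debezium emits after a delete by default.

```python
import json

OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(value_json: str) -> str:
    """Summarize a Debezium change event value serialized with JSON schemas enabled."""
    message = json.loads(value_json)
    if message is None:
        return "tombstone"
    payload = message["payload"]
    op = OPS.get(payload["op"], payload["op"])
    table = f'{payload["source"]["schema"]}.{payload["source"]["table"]}'
    before, after = payload["before"], payload["after"]
    if before and after:
        changed = sorted(col for col in after if before.get(col) != after[col])
        return f"{op} {table}: {', '.join(changed) or 'no column changes'}"
    return f"{op} {table}"


# Trimmed version of the UPDATE event above (schema and most fields omitted).
update_event = json.dumps({
    "schema": {},
    "payload": {
        "before": {"id": 1005, "first_name": "foo", "last_name": "bar"},
        "after": {"id": 1005, "first_name": "egg", "last_name": "bar"},
        "source": {"schema": "public", "table": "customers"},
        "op": "u",
    },
})
print(describe_change(update_event))  # update public.customers: first_name
```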

2.2 MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the DBMS primary node.

As with the PgSQL connector described above, on first start a snapshot of the initial data is taken, after which the connector switches to reading the oplog.

Configuration example:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

As you can see, there are no new options compared with the previous example; there are simply fewer options responsible for the database connection, and their prefix has changed (mongodb.* instead of database.*).

This time the transforms settings do the following: they rename the target topic from the pattern <server_name>.<db_name>.<collection_name> to data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability matter more than ever today, especially when it comes to data and transactions, and change data capture is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to run in distributed mode, this requires several workers with the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
  2. Loss of connectivity to the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically retries until an attempt succeeds.
  3. The data source becomes unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed, and you will need to restart it manually through the Kafka Connect REST interface.
    • For PostgreSQL, no data will be lost, because the replication slot prevents the deletion of WAL files that the connector has not yet read. There is a downside, though: if network connectivity between the connector and the DBMS is lost for a long time, the disk may run out of space, which can bring down the entire DBMS.
    • For MySQL, the DBMS itself may rotate the binlog files before connectivity is restored. This puts the connector into the failed state, and to restore normal operation you will need to restart it in initial snapshot mode so that it can continue reading binlogs.
    • For MongoDB, the documentation states that when the log/oplog files have been deleted and the connector cannot resume reading where it left off, it behaves the same way as with all the other DBMSs: the connector goes into the failed state and has to be restarted in initial snapshot mode.

      However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that period, then once the connection is restored the connector will quietly continue reading from the first available position, which means some of the data will never reach Kafka.
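To get a sense of the time window involved in the third scenario, the sketch below computes an exponential backoff schedule and builds the REST call used for the manual restart. The delays are assumptions for illustration (1 second initially, doubling, capped at 120 seconds, 16 attempts); the actual options and defaults differ between connectors, so check your connector's documentation. The restart uses Kafka Connect's `POST /connectors/<name>/tasks/<id>/restart` endpoint; the helper names are hypothetical.

```python
import urllib.request


def backoff_schedule(attempts: int = 16, initial_s: float = 1.0,
                     max_s: float = 120.0) -> list[float]:
    """Delays before each reconnection attempt: doubling, capped at max_s (assumed values)."""
    delays = []
    delay = initial_s
    for _ in range(attempts):
        delays.append(min(delay, max_s))
        delay *= 2
    return delays


def restart_task_request(connector: str, task_id: int = 0,
                         base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the Kafka Connect REST request that restarts a single connector task."""
    return urllib.request.Request(
        f"{base_url}/connectors/{connector}/tasks/{task_id}/restart", method="POST"
    )


schedule = backoff_schedule()
print(f"gives up after ~{sum(schedule) / 60:.0f} min")
req = restart_task_request("pg-connector")
# urllib.request.urlopen(req)  # send it against a live Kafka Connect worker
```

With these assumed values the 16 attempts span 1207 seconds, about 20 minutes, so an outage of the source longer than that leaves the task in the failed state until someone restarts it.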

Conclusion

Debezium is my first experience with CDC systems, and a very positive one overall. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and active community. For those interested in hands-on practice, I recommend the guides for Kafka Connect and Debezium.

Compared with the JDBC connector for Kafka Connect, Debezium's main advantage is that changes are read from the DBMS logs, which makes it possible to receive data with minimal latency. The JDBC connector (from Kafka Connect) queries the monitored table at a fixed interval and, for the same reason, does not generate messages when data is deleted (how can you query data that no longer exists?).

For similar tasks, you may also want to look at the following solutions (besides Debezium):
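The difference can be illustrated with a toy model. In this hypothetical in-memory sketch (not the JDBC connector's actual code), `poll_since` mimics timestamp-based polling, which returns only rows whose `updated_at` is newer than the last watermark, while `apply_change` also appends every change to a log, the way a database's transaction log records it. After a delete, polling returns nothing, whereas the log still holds the delete.

```python
from typing import Optional


def poll_since(table: dict[int, dict], watermark: int) -> list[dict]:
    """Timestamp-based polling: rows modified after the watermark."""
    return [row for row in table.values() if row["updated_at"] > watermark]


def apply_change(table: dict[int, dict], log: list[tuple], op: str,
                 row_id: int, row: Optional[dict] = None) -> None:
    """Apply a change to the table and record it in a change log."""
    if op == "d":
        table.pop(row_id, None)
    else:
        table[row_id] = row
    log.append((op, row_id))


table: dict[int, dict] = {}
log: list[tuple] = []
apply_change(table, log, "c", 1, {"id": 1, "updated_at": 10})
apply_change(table, log, "c", 2, {"id": 2, "updated_at": 11})
watermark = max(row["updated_at"] for row in poll_since(table, 0))  # 11
apply_change(table, log, "d", 2)

print(poll_since(table, watermark))  # []: the delete is invisible to polling
print(log[-1])                       # ('d', 2): the log still records it
```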

Source: www.habr.com
