Introducing Debezium - CDC for Apache Kafka

In my work I often come across new technical solutions and software products about which there is little information on the Russian-language Internet. With this article I will try to fill one such gap with an example from my recent practice, when I needed to set up the delivery of CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview article, which grew out of the work done, will be useful to others.

What are Debezium and CDC in general?

Debezium is a representative of the CDC (Change Data Capture) category of software or, more precisely, a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project, licensed under the Apache License v2.0 and sponsored by Red Hat. Development has been under way since 2016, and at the moment it officially supports the following DBMSs: MySQL, PostgreSQL, MongoDB, SQL Server. There are also connectors for Cassandra and Oracle, but for now they are in "early access" status, and new releases do not guarantee backward compatibility.

If we compare CDC with the traditional approach (where the application reads data from the DBMS directly), its main advantages are row-level streaming of data changes with low latency, high reliability and availability. The last two points are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the end application does not have to worry about the nuances of operating different DBMSs.

Finally, using a message broker allows the applications that track data changes to scale horizontally. At the same time, the impact on the data source is minimized, since data is obtained not directly from the DBMS but from the Kafka cluster.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Diagram: Debezium architecture, from the project website]

However, I do not really like this diagram, because it gives the impression that only a sink connector can be used.

In reality, the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be used by your own applications to handle various situations. For example:

  • removing stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit log;
  • ...

If you have a Java application and there is no need or no possibility to use a Kafka cluster, there is also the option of working through the embedded connector. The obvious advantage is that it removes the need for additional infrastructure (in the form of a connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended for use (support for it may be removed in future releases).

This article discusses the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes in the most important asset, the data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

Work on the first two points, i.e. installing the DBMS and Apache Kafka, is beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples contains a ready-made docker-compose.yaml.

We will dwell on the last two points in more detail.

0. Kafka Connect

Here and later in the article, all configuration examples are considered in the context of the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and allows Kafka Connect to be configured through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins of the required connectors yourself to the directory specified in plugin.path or set via the CLASSPATH environment variable. The settings for the Kafka Connect worker and the connectors are defined in configuration files that are passed as arguments to the worker start command. For details, see the documentation.

The whole process of setting up Debezium in its connector form is carried out in two stages. Let's look at each of them:

1. Setting up the Kafka Connect framework

To stream data to an Apache Kafka cluster, certain parameters are set in the Kafka Connect framework, such as:

  • the parameters for connecting to the cluster,
  • the names of the topics in which the connector's own configuration will be stored,
  • the name of the group in which the connector runs (if distributed mode is used).

The project's official Docker image supports configuration through environment variables, and this is what we will use. So, download the image:

docker pull debezium/connect

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to obtain the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic for storing the positions the connector has currently reached;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - the topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic for storing the configuration data of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector task can run; required when using distributed mode.

We start the container with these variables:

docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2

A note about Avro

By default, Debezium writes data in JSON format, which is acceptable for sandboxes and small amounts of data, but can become a problem with heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry (for storing schemas). The variables for the converter look like this:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Details on using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.

2. Configuring the connector itself

Now we can move on to the configuration of the connector itself, which will read data from the source.

Let's look at the example of connectors for two DBMSs, PostgreSQL and MongoDB, with which I have experience and between which there are differences (small, but in some cases significant!).

The configuration is described in JSON notation and uploaded to Kafka Connect with a POST request.

2.1. PostgreSQL

Example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

The way the connector works with this configuration is quite simple:

  • At the first start, it connects to the database specified in the configuration and runs in initial snapshot mode, sending to Kafka the initial set of data obtained with what is, roughly speaking, SELECT * FROM table_name.
  • After initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector to which the configuration described below applies; later on, this name is used to work with the connector (i.e. view its status / restart it / update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The available choices are wal2json, decoderbufs and pgoutput. The first two require installing the corresponding extensions in the DBMS, while pgoutput needs no additional manipulations for PostgreSQL version 10 and higher;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance used to form the topic name in the Kafka cluster;
  • table.include.list - the list of tables in which we want to track changes; specified in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query that is executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot that the connector will use;
  • publication.name - the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user under which the connection is made does not have sufficient rights for this action, the connector will exit with an error;
  • transforms determines exactly how the name of the target topic is changed:
    • transforms.AddPrefix.type indicates that we will use regular expressions;
    • transforms.AddPrefix.regex - the mask by which the name of the target topic is redefined;
    • transforms.AddPrefix.replacement - what exactly we redefine it to.

More about heartbeat and transforms

By default, the connector sends data to Kafka for each committed transaction and records its LSN (Log Sequence Number) in the service topic offset. But what happens if the connector is configured to read not the whole database but only some of its tables (in which data is updated infrequently)?

  • The connector will read the WAL files and will not find in them any committed transactions for the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause the WAL files to be retained on disk, and disk space is likely to run out.

This is where the options heartbeat.interval.ms and heartbeat.action.query come to the rescue. Using them as a pair makes it possible to execute a query that changes data in a separate table each time a heartbeat message is sent. As a result, the LSN at which the connector is currently positioned (in the replication slot) is constantly updated. This allows the DBMS to remove WAL files that are no longer needed. You can learn more about how these options work in the documentation.
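As a rough sketch of how the two options might be combined, the snippet below merges them into a connector configuration and prints the JSON that would be uploaded. The table name debezium_heartbeat and the query itself are hypothetical: such a table would have to exist in the source database, and the Debezium user would need write access to it.

```python
import json

# A shortened copy of the PostgreSQL connector settings shown earlier.
base_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "slot.name": "dbname_debezium",
}

# Heartbeat settings. The table "debezium_heartbeat" is a hypothetical name
# used only for illustration; it is not created by Debezium.
heartbeat = {
    "heartbeat.interval.ms": "5000",
    "heartbeat.action.query": "INSERT INTO debezium_heartbeat (ts) VALUES (now())",
}


def with_heartbeat(config, heartbeat_options):
    """Return a new config dict with the heartbeat options merged in."""
    merged = dict(config)
    merged.update(heartbeat_options)
    return merged


connector = {"name": "pg-connector", "config": with_heartbeat(base_config, heartbeat)}
print(json.dumps(connector, indent=2))
```

The original dictionary is left untouched, so the same base settings can be reused for a connector without a heartbeat.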

Another option that deserves closer attention is transforms, although it is more about convenience and aesthetics...

By default, Debezium creates topics using the following naming policy: serverName.schemaName.tableName. This is not always convenient. With the transforms options, you can use regular expressions to define a list of tables whose events should be routed to a topic with a specific name.

In our configuration, thanks to transforms, the following happens: all CDC events from the tracked database go to the topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a topic for each table, of the form pg-dev.public.<table_name>.
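The renaming can be tried out offline before touching the connector. The sketch below is only a Python approximation of what RegexRouter does (rename the topic when the whole name matches the pattern); the function name route_topic and the second pattern with a "$1" group reference are illustrative assumptions, not part of the configuration above.

```python
import re


def route_topic(topic, regex, replacement):
    """Approximate RegexRouter: rename the topic only if the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics keep their original name
    # Java-style "$1" group references become Python-style "\g<1>".
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


# Settings from the article: every table topic collapses into one topic.
print(route_topic("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.dbname"))

# Hypothetical variant that keeps the table name through a group reference.
print(route_topic("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.$1"))

# A topic that does not match the mask is left as is.
print(route_topic("some.other.topic", "pg-dev.public.(.*)", "data.cdc.dbname"))
```

Note that the dots in the mask are regex wildcards, so the pattern is slightly broader than it looks; escaping them (pg-dev\.public\.(.*)) would make the match stricter.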

Connector limitations

To conclude the description of the connector configuration for PostgreSQL, it is worth mentioning the following features and limitations of its operation:

  1. The PostgreSQL connector relies on the concept of logical decoding. Therefore it does not track requests that change the database structure (DDL), and accordingly this data will not appear in the topics.
  2. Since replication slots are used, the connector can only be connected to the primary DBMS instance.
  3. If the user under which the connector connects to the database has read-only rights, then before the first launch you will need to create the replication slot and the publication in the database manually.

Applying the configuration

So let's upload our configuration to the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json

We check that the upload was successful and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
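For scripted checks, the same response body can be evaluated programmatically. The helper below is a minimal sketch that assumes the status JSON has the shape shown above; the function name connector_is_healthy is made up for this example, and fetching the body over HTTP is left out.

```python
import json


def connector_is_healthy(status_body):
    """Return True when the connector and all of its tasks report RUNNING."""
    status = json.loads(status_body)
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# The response body from the status request above.
body = (
    '{"name":"pg-connector",'
    '"connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],'
    '"type":"source"}'
)
print(connector_is_healthy(body))
```

Checking the tasks as well as the connector matters because a connector can report RUNNING while one of its tasks has already failed.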

Great: it is set up and ready to go. Now let's pretend to be a consumer and connect to Kafka, after which we will add and change a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will be displayed as follows:

Very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the record that was changed and the change itself: what the record was before and what it became after.

  • In the case of INSERT: the value before (before) equals null, and after contains the row that was inserted.
  • In the case of UPDATE: payload.before shows the previous state of the row, and payload.after shows the new state with the change applied.
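A consumer can use these two fields to work out what actually changed. The following is a minimal sketch, assuming the JSON converter with the schema/payload wrapper shown above; the function name summarize_change and the shortened sample event are illustrative only.

```python
import json

# Debezium operation codes mapped to readable names.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change(value_json):
    """Return (operation, changed_fields) for one change event value."""
    payload = json.loads(value_json)["payload"]
    before = payload.get("before") or {}  # null for inserts
    after = payload.get("after") or {}    # null for deletes
    changed = {
        field: (before.get(field), after.get(field))
        for field in sorted(set(before) | set(after))
        if before.get(field) != after.get(field)
    }
    return OPERATIONS.get(payload["op"], payload["op"]), changed


# A shortened version of the UPDATE event above (schema part omitted).
event = json.dumps({
    "payload": {
        "before": {"id": 1005, "first_name": "foo", "last_name": "bar"},
        "after": {"id": 1005, "first_name": "egg", "last_name": "bar"},
        "op": "u",
    }
})
print(summarize_change(event))
```

Each changed field maps to an (old, new) pair, which is usually enough for cache invalidation or notification logic.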

2.2. MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the primary DBMS node.

As with the PostgreSQL connector already described, at the first start an initial snapshot of the data is taken, after which the connector switches to oplog reading mode.

Configuration example:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

As you can see, there are no new options here compared to the previous example; only the number of options responsible for connecting to the database has been reduced and their prefixes have changed.

This time the transforms settings do the following: they convert the name of the target topic from the scheme <server_name>.<db_name>.<collection_name> to data.cdc.mongo_<db_name>.

Fault tolerance

The issue of fault tolerance and high availability is more acute today than ever, especially when we talk about data and transactions, and data change tracking is no exception. Let's look at what can go wrong in principle and what will happen to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to work in distributed mode, several workers must be given the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
  2. Loss of connectivity with the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically tries to resend it until the attempt succeeds.
  3. Data source unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts using exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually through the Kafka Connect REST interface.
    • In the case of PostgreSQL, data will not be lost, because replication slots prevent the deletion of WAL files that the connector has not yet read. There is a downside, though: if network connectivity between the connector and the DBMS is disrupted for a long time, disk space may run out, and this can lead to the failure of the entire DBMS.
    • In the case of MySQL, the binlog files can be rotated by the DBMS itself before connectivity is restored. This causes the connector to go into the failed state, and to restore normal operation you will need to restart it in initial snapshot mode so that it can continue reading from the binlogs.
    • About MongoDB. The documentation says that the connector's behaviour when the log/oplog files have been deleted and it cannot continue reading from the position where it left off is the same for all DBMSs. That is, the connector goes into the failed state and requires a restart in initial snapshot mode.

      However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then once the connection is restored the connector will calmly continue reading data from the first available position, so some of the data will not reach Kafka.
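To get a feel for how long 16 attempts with exponential backoff can take, here is a small sketch of a generic doubling schedule. Only the number of attempts comes from the text above; the initial delay of 1 second and the cap of 120 seconds are assumptions chosen for illustration, and the connector's real schedule depends on its configuration.

```python
def backoff_delays(max_attempts=16, initial_delay_s=1.0, max_delay_s=120.0):
    """Return the wait before each retry: doubles every attempt, capped at max_delay_s."""
    delays = []
    delay = initial_delay_s
    for _ in range(max_attempts):
        delays.append(min(delay, max_delay_s))
        delay *= 2
    return delays


delays = backoff_delays()
print(delays)
print("total wait, seconds:", sum(delays))
```

With these assumed values the delays reach the cap after a handful of attempts, so most of the total waiting time is spent at the maximum delay. That total is roughly the window the source has to come back before the task is marked as failed.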

Conclusion

Debezium is my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the main DBMSs, ease of configuration, clustering support, and an active community. For those interested in hands-on practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared to the JDBC connector for Kafka Connect, the main advantage of Debezium is that changes are read from the DBMS logs, which allows data to be received with minimal latency. The JDBC Connector (from Kafka Connect) queries the tracked table at a fixed interval and (for the same reason) does not generate messages when data is deleted (how can you query data that is not there?).

To solve similar problems, you can also look at the following solutions (in addition to Debezium):

Source: www.habr.com
