Introducing Debezium - CDC for Apache Kafka

In my work I often come across new technical solutions and software products that are poorly covered on the Russian-language internet. With this article I will try to fill one such gap, using an example from my recent practice. I needed to set up the delivery of CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview, which grew out of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium belongs to the CDC (Change Data Capture) class of software. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open-source project, licensed under the Apache License v2.0 and sponsored by Red Hat. It has been in development since 2016. It currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB and SQL Server. There are also connectors for Cassandra and Oracle, but for now they are in "early access" status, and new releases do not guarantee backward compatibility.

Consider the traditional approach, where an application reads data from the DBMS directly. Compared with it, the main advantages of CDC are:

- row-level streaming of data changes with low latency;
- high reliability;
- high availability.

The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events. As a result, the end application does not have to deal with the operational nuances of different DBMSs.

Finally, using a message broker lets the applications that track data changes scale horizontally. At the same time, the load on the data source is minimized, because data is read from the Kafka cluster rather than from the DBMS directly.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Figure: Debezium architecture diagram from the project website]

However, I don't really like this diagram, because it gives the impression that only a sink connector can be used.

In reality, things are different. Populating your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Your applications can consume the events sent to Apache Kafka to handle a variety of scenarios, for example:

  • invalidating stale data in a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit logs;
  • ...
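To illustrate the first use case, here is a minimal sketch of a consumer-side handler that evicts cache entries based on Debezium change events. It makes several assumptions:

- the JSON converter is used with schemas enabled, so the event body sits under `payload`, as in the PostgreSQL example later in the article;
- every table has an `id` primary key column;
- a plain `dict` stands in for a real cache;
- cache keys follow a hypothetical `<table>:<id>` scheme.

Reading the messages from Kafka itself is left out.

```python
import json
from typing import Optional


def cache_key_for(payload: dict) -> Optional[str]:
    """Derive a hypothetical "<table>:<id>" cache key from a change event payload."""
    # Deletes carry only "before"; inserts, updates and snapshot reads carry "after".
    row = payload.get("after") or payload.get("before")
    if row is None or "id" not in row:
        return None
    return f"{payload['source']['table']}:{row['id']}"


def handle_change(raw_value: Optional[str], cache: dict) -> Optional[str]:
    """Evict the cache entry affected by one change event and return its key."""
    if raw_value is None:  # tombstone: the Kafka record value is null
        return None
    event = json.loads(raw_value)
    if not event or event.get("payload") is None:
        return None
    key = cache_key_for(event["payload"])
    if key is not None:
        # Evict rather than update: the next read repopulates the entry from the DB.
        cache.pop(key, None)
    return key
```

Evicting instead of writing the new row into the cache keeps the handler independent of the cache's value format. The cost is one extra database read after each change.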

If you have a Java application and there is no need for a Kafka cluster (or no possibility of running one), you can also work through the embedded connector. The obvious advantage is that it removes the need for additional infrastructure (a connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended. Support for it may be removed in future releases.

This article discusses the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes to our most valuable asset, data, we need:

  1. a data source, which can be MySQL 5.7 or later, PostgreSQL 9.6+ or MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

We will skip the first two points, since installing a DBMS and Apache Kafka is beyond the scope of this article. If you want to deploy everything in a sandbox, the official repository with examples contains a ready-made docker-compose.yaml.

We will look at the last two points in more detail.

0. Kafka Connect

Here and later in the article, all configuration examples assume the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and lets you configure Kafka Connect through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself. Either place them in the directory specified in plugin.path, or set them via the CLASSPATH environment variable. The settings of the Kafka Connect worker and of the connectors are defined in configuration files, which are passed as arguments to the worker start command. For details, see the documentation.

Setting up Debezium as a connector happens in two stages. Let's look at each of them:

1. Setting up the Kafka Connect framework

To stream data to an Apache Kafka cluster, certain parameters are set in the Kafka Connect framework, such as:

  • the parameters for connecting to the cluster,
  • the names of the topics in which the connector's own configuration will be stored,
  • the name of the group in which the connector runs (when distributed mode is used).

The project's official Docker image supports configuration through environment variables, and that is what we will use. So let's pull the image:

docker pull debezium/connect

The minimal set of environment variables needed to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to discover the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic that stores the positions the connector has currently reached;
  • STATUS_STORAGE_TOPIC=connector-status - the topic that stores the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic that stores the configuration of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector's tasks can run; required in distributed mode.

We start the container with these variables:

docker run \
  -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
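The Confluent distribution mentioned above configures the worker through a properties file instead of environment variables. There, the same minimal settings correspond to standard Kafka Connect distributed-worker properties. The sketch below renders such a file from the values used in the `docker run` example.

The variable-to-property mapping reflects my understanding of how the Debezium image translates these variables. Treat it as an assumption and check it against the image documentation. A complete worker file usually needs further settings as well, for example the converters.

```python
# Assumed mapping from the Debezium image's environment variables to the
# corresponding Kafka Connect distributed-worker properties.
ENV_TO_WORKER_PROPERTY = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "GROUP_ID": "group.id",
    "CONFIG_STORAGE_TOPIC": "config.storage.topic",
    "OFFSET_STORAGE_TOPIC": "offset.storage.topic",
    "STATUS_STORAGE_TOPIC": "status.storage.topic",
}


def render_worker_properties(env: dict) -> str:
    """Render a connect-distributed.properties fragment from image-style variables."""
    missing = [name for name in ENV_TO_WORKER_PROPERTY if name not in env]
    if missing:
        raise ValueError(f"missing required settings: {', '.join(missing)}")
    lines = [f"{prop}={env[name]}" for name, prop in ENV_TO_WORKER_PROPERTY.items()]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_worker_properties({
        "BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "GROUP_ID": "1",
        "CONFIG_STORAGE_TOPIC": "my_connect_configs",
        "OFFSET_STORAGE_TOPIC": "my_connect_offsets",
        "STATUS_STORAGE_TOPIC": "my_connect_statuses",
    }))
```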

A note about Avro

By default, Debezium writes data in JSON format. This is fine for sandboxes and small volumes of data, but it can become a problem with a heavily loaded database. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry for storing schemas. The converter variables will look like this:

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
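As far as I understand the image's entrypoint, the `CONNECT_`-prefixed names above follow a simple rule. Any variable starting with `CONNECT_` becomes a worker property by dropping the prefix, lower-casing the rest and replacing underscores with dots. Unprefixed variables such as `VALUE_CONVERTER` are handled separately by the image. A small sketch of that rule follows; it is an assumption about the image, not its actual code.

```python
def connect_env_to_property(env_name: str) -> str:
    """Convert a CONNECT_* environment variable name into a worker property name (assumed rule)."""
    prefix = "CONNECT_"
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name} does not start with {prefix}")
    # Drop the prefix, lower-case, and turn underscores into dots.
    return env_name[len(prefix):].lower().replace("_", ".")


for name in ("CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL",
             "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL"):
    print(name, "->", connect_env_to_property(name))
```

One consequence of this rule is that a property whose name itself contains an underscore cannot be expressed this way.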

The details of using Avro and setting up its registry are beyond the scope of this article. From here on, for clarity, we will use JSON.

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connectors for two DBMSs: PostgreSQL and MongoDB. I have experience with both, and they differ from each other (slightly, but in some cases significantly!).

The configuration is written in JSON notation and uploaded to Kafka Connect with a POST request.

2.1. PostgreSQL

An example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

With this configuration, the connector works on a simple principle:

  • On first launch, it connects to the database specified in the configuration and starts in initial snapshot mode. In this mode it sends Kafka the initial data set, obtained roughly as SELECT * FROM table_name.
  • After initialization completes, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector to which the configuration below applies. This name is later used to work with the connector through the Kafka Connect REST API, for example to view its status, restart it or update its configuration;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin used for logical decoding of data from the WAL files. The available options are wal2json, decoderbufs and pgoutput. The first two require the corresponding extensions to be installed in the DBMS, while pgoutput needs no extra steps in PostgreSQL 10 and later;
  • database.* - options for connecting to the database. Here database.server.name is the name of the PostgreSQL instance, used to form the topic names in the Kafka cluster;
  • table.include.list - the list of tables in which we want to track changes, in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot that the connector will use;
  • publication.name - the name of the PostgreSQL publication that the connector uses. If it does not exist, Debezium will try to create it. If the user the connector connects as lacks the privileges to do so, the connector will stop with an error;
  • transforms - defines how exactly the name of the target topic is changed:
    • transforms.AddPrefix.type indicates that we will use regular expressions;
    • transforms.AddPrefix.regex - the mask that selects which topic names are redefined;
    • transforms.AddPrefix.replacement - the name they are redefined to.
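Some of the rules in this list can be checked before the configuration is ever sent to Kafka Connect. Below is a minimal pre-flight check covering three of them:

- the known `plugin.name` values;
- the mutual exclusion of `table.include.list` and `table.exclude.list`;
- the presence of the `database.*` connection settings.

The function name and the exact set of checks are my own choice. Kafka Connect performs its own validation as well.

```python
from typing import List

KNOWN_PLUGINS = {"pgoutput", "wal2json", "decoderbufs"}
REQUIRED_DB_OPTIONS = (
    "database.hostname",
    "database.port",
    "database.user",
    "database.dbname",
    "database.server.name",
)


def check_pg_connector(connector: dict) -> List[str]:
    """Return a list of problems found in a PostgreSQL connector definition."""
    problems = []
    config = connector.get("config", {})
    if not connector.get("name"):
        problems.append("connector name is missing")
    plugin = config.get("plugin.name")
    if plugin is not None and plugin not in KNOWN_PLUGINS:
        problems.append(f"unknown plugin.name: {plugin}")
    if "table.include.list" in config and "table.exclude.list" in config:
        problems.append("table.include.list and table.exclude.list cannot be used together")
    for option in REQUIRED_DB_OPTIONS:
        if not config.get(option):
            problems.append(f"{option} is not set")
    return problems
```

A typical use would be running `check_pg_connector(json.load(f))` on pg-con.json before the upload and aborting if the returned list is not empty.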

More on heartbeat and transforms

By default, the connector sends data to Kafka for each committed transaction and records its LSN (Log Sequence Number) in the offsets service topic. But what happens if the connector reads only some of the database's tables, and those tables are rarely updated?

  • The connector will read the WAL files and find no transactions in them for the tables it monitors.
  • Therefore, it will update its current position neither in the topic nor in the replication slot.
  • This, in turn, will cause WAL files to be retained on disk, and the disk is likely to run out of space eventually.

This is where the heartbeat.interval.ms and heartbeat.action.query options help. Used together, they run a query that modifies data in a separate table every time a heartbeat message is sent. As a result, the LSN at which the connector currently stands (in the replication slot) is updated regularly, which lets the DBMS remove WAL files that are no longer needed. You can learn more about how these options work in the documentation.
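Here is a sketch of how the two options might be combined in the PostgreSQL connector configuration. It is written as a small helper that extends the config dictionary.

The `debezium_heartbeat` table and its columns are hypothetical. The table has to be created beforehand, and the connector's user needs write access to it. The upsert keeps the table at a single row while still producing a WAL record on every heartbeat.

Placing the table in the `public` schema means the existing `public.(.*)` include pattern captures it too. I assume this is needed for the connector to see those changes.

```python
import json

# Hypothetical single-row table that exists only to generate WAL traffic:
#   CREATE TABLE public.debezium_heartbeat (id int PRIMARY KEY, ts timestamptz NOT NULL);
HEARTBEAT_QUERY = (
    "INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) "
    "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
)


def with_heartbeat(config: dict, interval_ms: int = 5000) -> dict:
    """Return a copy of a connector config with the heartbeat options added."""
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    updated = dict(config)  # leave the caller's dictionary untouched
    # The article's configuration writes every value as a string, so we do too.
    updated["heartbeat.interval.ms"] = str(interval_ms)
    updated["heartbeat.action.query"] = HEARTBEAT_QUERY
    return updated


if __name__ == "__main__":
    base = {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"}
    print(json.dumps(with_heartbeat(base), indent=2))
```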

Another option that deserves closer attention is transforms, although it is more about convenience and aesthetics...

By default, Debezium names topics using the following policy: serverName.schemaName.tableName. This is not always convenient. The transforms options let you use regular expressions to select tables whose events should be routed to a topic with a specific name.

In our configuration, thanks to transforms, all CDC events from the monitored database go to a single topic named data.cdc.dbname. Without these settings, Debezium would create a separate topic for each table, such as pg-dev.public.<table_name>.
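To check a `regex`/`replacement` pair before deploying it, you can simulate the renaming offline. The sketch below mirrors what I understand `RegexRouter` to do:

- the regular expression must match the whole topic name, otherwise the topic is left unchanged;
- `$1`-style group references in the replacement (Java syntax) are translated to Python's `\g<1>`.

It is an approximation for simple patterns like the ones in this article, not a port of the Kafka code.

```python
import re


def _java_groups_to_python(replacement: str) -> str:
    # Java replacement strings reference groups as $1; Python templates use \g<1>.
    return re.sub(r"\$(\d+)", lambda m: "\\g<" + m.group(1) + ">", replacement)


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename the topic only if the regex matches all of it."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    return match.expand(_java_groups_to_python(replacement))


for table in ("customers", "orders"):
    source_topic = f"pg-dev.public.{table}"
    print(source_topic, "->", route_topic(source_topic, r"pg-dev.public.(.*)", "data.cdc.dbname"))
```

Both example tables end up in the same topic, data.cdc.dbname. This is exactly the many-to-one routing the configuration asks for.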

Connector limitations

To conclude the description of the PostgreSQL connector configuration, it is worth mentioning some features and limitations of how it works:

  1. The PostgreSQL connector relies on logical decoding. Therefore it does not track changes to the database structure (DDL), and this data will not appear in the topics.
  2. Because replication slots are used, the connector can only connect to the primary DBMS instance.
  3. If the user the connector connects as has read-only privileges, you will need to create the replication slot and the publication in the database manually before the first launch.
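For the third case, a privileged user can create the slot and publication from the example configuration in advance. The helper below only generates the SQL. It takes the names from `slot.name` and `publication.name`.

`FOR ALL TABLES` is a simplification for the sketch. You can also publish only the tracked tables, and `FOR ALL TABLES` requires a superuser. `CREATE PUBLICATION` and `pg_create_logical_replication_slot` are standard PostgreSQL 10+ features. Note that the connector's user still needs the REPLICATION attribute to stream from the slot.

```python
import re
from typing import List

# Replication slot names may contain only lower-case letters, digits and "_";
# the same restriction is applied to the publication name here for safety.
_SIMPLE_IDENTIFIER = re.compile(r"[a-z0-9_]+")


def bootstrap_sql(config: dict) -> List[str]:
    """Generate SQL that pre-creates the objects the connector would otherwise create."""
    slot = config["slot.name"]
    plugin = config["plugin.name"]
    statements = []
    if plugin == "pgoutput":
        # Publications are only used by the pgoutput plugin.
        publication = config["publication.name"]
        if not _SIMPLE_IDENTIFIER.fullmatch(publication):
            raise ValueError(f"unexpected publication name: {publication!r}")
        statements.append(f"CREATE PUBLICATION {publication} FOR ALL TABLES;")
    if not _SIMPLE_IDENTIFIER.fullmatch(slot):
        raise ValueError(f"unexpected slot name: {slot!r}")
    statements.append(f"SELECT pg_create_logical_replication_slot('{slot}', '{plugin}');")
    return statements


for statement in bootstrap_sql({
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "plugin.name": "pgoutput",
}):
    print(statement)
```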

Applying the configuration

So let's submit our connector configuration to Kafka Connect:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
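The same request can be issued from Python with only the standard library, which is convenient when scripts generate the connector definitions. In this minimal sketch, the base URL assumes the Kafka Connect REST port 8083 is reachable on localhost, as in the curl example. The requests are only built here, not sent. The second helper targets the Kafka Connect REST endpoint that restarts a single task, which is what the failure section below calls a manual restart.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: same host/port as the curl example


def create_connector_request(definition: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build POST /connectors/ with the connector definition as the JSON body."""
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def restart_task_request(name: str, task_id: int, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build POST /connectors/<name>/tasks/<id>/restart, e.g. for a FAILED task."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/tasks/{task_id}/restart", method="POST"
    )
```

To actually send the definition, pass the request to `urllib.request.urlopen`, for example with a definition loaded via `json.load` from pg-con.json.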

We check that the upload succeeded and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
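When the status check is scripted, for example in a deployment pipeline, the response above can be reduced to a yes/no answer. This minimal sketch treats the connector as healthy only if the connector itself and every one of its tasks report `RUNNING`. The function name is hypothetical.

```python
import json
from typing import List


def failed_parts(status: dict) -> List[str]:
    """List the parts of a connector status response that are not RUNNING."""
    problems = []
    connector_state = status["connector"]["state"]
    if connector_state != "RUNNING":
        problems.append(f"connector: {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        problems.append("no tasks assigned")
    for task in tasks:
        if task["state"] != "RUNNING":
            problems.append(f"task {task['id']}: {task['state']}")
    return problems


response = ('{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},'
            '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}')
print(failed_parts(json.loads(response)) or "all RUNNING")  # prints: all RUNNING
```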

Great: it is configured and ready to go. Now let's pretend to be a consumer and connect to Kafka. Then we will insert a row into the table and modify it:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this will look as follows:

A very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records contain the key (PK) of the changed record and the essence of the change: what the record was before and what it became afterwards.

  • For INSERT: the previous value (before) is null, and the new value (after) is the inserted row.
  • For UPDATE: payload.before shows the previous state of the row, and payload.after shows the new state with the changes applied.
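A consumer usually needs only the `payload` part of each message. The sketch below turns a value like the ones above into a short description. It reports the operation: `c` for insert, `u` for update, `d` for delete, and `r` for rows read during the initial snapshot. For updates, it also lists the columns whose values differ between `before` and `after`. It assumes the JSON converter with schemas enabled, as in this article.

One caveat: with PostgreSQL's default replica identity, `before` of an UPDATE may not contain the full old row, or may be null. It is fully populated when the table uses `REPLICA IDENTITY FULL`, so the code falls back to reporting just the operation.

```python
import json

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw_value: str) -> str:
    """Summarise one Debezium change event (JSON converter, schemas enabled)."""
    payload = json.loads(raw_value)["payload"]
    op = OPERATIONS.get(payload["op"], payload["op"])
    table = payload["source"]["table"]
    before, after = payload.get("before"), payload.get("after")
    if op == "update" and before is not None and after is not None:
        # Only meaningful when the old row image is available.
        changed = sorted(col for col in after if before.get(col) != after[col])
        return f"{op} {table}: {', '.join(changed)}"
    return f"{op} {table}"
```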

2.2. MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the primary node.

As with the PostgreSQL connector described above, the connector first takes an initial data snapshot on its first launch. It then switches to reading the oplog.

Configuration example:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, there are no fundamentally new options compared with the previous example. There are simply fewer options for connecting to the database, and their prefixes are different.

In this case the transforms settings rename the target topic from the scheme <server_name>.<db_name>.<collection_name> to data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability matter more today than ever, especially where data and transactions are concerned, and tracking data changes is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect fails. In distributed mode, Connect requires several workers with the same group.id. If one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
  2. Connectivity to the Kafka cluster is lost. The connector simply stops reading at the position it failed to send to Kafka. It periodically retries sending until an attempt succeeds.
  3. The data source becomes unavailable. The connector tries to reconnect to the source according to its configuration; the default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually via the Kafka Connect REST interface.
    • With PostgreSQL, data will not be lost, because the replication slot prevents the deletion of WAL files the connector has not yet read. There is a flip side, though: if the network connection between the connector and the DBMS is down for a long time, the disk may run out of space. This can bring down the entire DBMS.
    • With MySQL, the DBMS itself may rotate binlog files before connectivity is restored. This puts the connector into the failed state. To restore normal operation, you will need to restart it in initial snapshot mode so that it can continue reading from the binlogs.
    • As for MongoDB, the documentation says that deleted log/oplog files are handled the same way as for the other DBMSs: if the connector cannot continue reading from where it stopped, it goes into the failed state. It then has to be restarted in initial snapshot mode.

      However, there is an exception. Suppose the connector was disconnected for a long time (or could not reach the MongoDB instance), and the oplog was rotated in the meantime. Once the connection is restored, the connector silently continues reading from the first available position, so some of the data never reaches Kafka.
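To get a feel for how long "16 attempts with exponential backoff" actually lasts, here is a generic calculation. It assumes an initial delay of 1 second, a doubling factor and a 120-second cap, purely for illustration. The MongoDB connector exposes similar settings (`connect.backoff.initial.delay.ms`, `connect.backoff.max.delay.ms`, `connect.max.attempts`), but check the actual defaults for your connector and version. This is not Debezium's own code.

```python
from typing import List


def backoff_delays(attempts: int = 16, initial_s: float = 1.0,
                   max_s: float = 120.0, factor: float = 2.0) -> List[float]:
    """Delay before each retry: initial_s * factor**n, capped at max_s."""
    return [min(initial_s * factor ** n, max_s) for n in range(attempts)]


delays = backoff_delays()
print(delays[:8])  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 120.0]
print(f"total wait before the task is marked FAILED: ~{sum(delays) / 60:.1f} min")  # ~20.1 min
```

Under these assumptions, an outage of the data source longer than about 20 minutes leaves the task in the failed state until someone restarts it.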

Conclusion

Debezium was my first experience with CDC systems, and overall a very positive one. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and active community. If you are interested in hands-on practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared with the JDBC connector for Kafka Connect, the main advantage of Debezium is that it reads changes from the DBMS logs, which delivers data with minimal latency. The JDBC connector (from Kafka Connect) queries the monitored table at a fixed interval. For the same reason, it does not produce messages when data is deleted: how can you query data that is no longer there?
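A toy model shows the point about deletes. The sketch below compares two approaches. The first is a polling source that selects rows changed since the last poll, using a hypothetical `updated_at` column; it is similar in spirit to the JDBC connector's timestamp mode, but not its code. The second is a change log that records every operation as it is applied. The deleted row simply never shows up in the polling result.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, int]


def poll_changes(table: Dict[int, Row], since: int) -> List[Row]:
    """Polling approach: return rows whose updated_at is newer than the last poll."""
    return [row for row in table.values() if row["updated_at"] > since]


def apply_with_log(table: Dict[int, Row], log: List[Tuple[str, int]],
                   op: str, row_id: int, row: Optional[Row] = None) -> None:
    """Log-based approach: every operation is recorded as it is applied."""
    if op == "d":
        del table[row_id]
    else:
        table[row_id] = row
    log.append((op, row_id))


table: Dict[int, Row] = {}
log: List[Tuple[str, int]] = []
apply_with_log(table, log, "c", 1, {"id": 1, "updated_at": 10})
apply_with_log(table, log, "c", 2, {"id": 2, "updated_at": 11})
last_poll = 11  # a poller has already seen both rows
apply_with_log(table, log, "d", 1)
apply_with_log(table, log, "u", 2, {"id": 2, "updated_at": 12})

print(poll_changes(table, last_poll))  # [{'id': 2, 'updated_at': 12}]: the delete is invisible
print(log[-2:])                        # [('d', 1), ('u', 2)]
```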

For similar tasks, you may also want to look at the following solutions (besides Debezium):


Source: www.habr.com
