Cyflwyno Debezium - CDC ar gyfer Apache Kafka

Cyflwyno Debezium - CDC ar gyfer Apache Kafka

Yn fy ngwaith, rwy'n aml yn dod ar draws datrysiadau technegol / cynhyrchion meddalwedd newydd, y mae gwybodaeth amdanynt braidd yn brin ar y Rhyngrwyd sy'n siarad Rwsieg. Gyda'r erthygl hon, byddaf yn ceisio llenwi un bwlch o'r fath gydag enghraifft o fy arfer diweddar, pan oedd angen i mi sefydlu anfon digwyddiadau CDC o ddau DBMS poblogaidd (PostgreSQL a MongoDB) i glwstwr Kafka gan ddefnyddio Debezium. Rwy'n gobeithio y bydd yr erthygl adolygu hon, a ymddangosodd o ganlyniad i'r gwaith a wnaed, yn ddefnyddiol i eraill.

Beth yw Debezium a CDC yn gyffredinol?

Debezium - Cynrychiolydd y categori meddalwedd CDC (Dal newid data), neu yn fwy manwl gywir, mae'n set o gysylltwyr ar gyfer amrywiol DBMSs sy'n gydnaws â fframwaith Apache Kafka Connect.

Mae'n prosiect ffynhonnell agored, wedi'i drwyddedu o dan Drwydded Apache v2.0 ac wedi'i noddi gan Red Hat. Mae datblygiad wedi bod ar y gweill ers 2016 ac ar hyn o bryd mae'n darparu cefnogaeth swyddogol ar gyfer y DBMS canlynol: MySQL, PostgreSQL, MongoDB, SQL Server. Mae yna hefyd gysylltwyr ar gyfer Cassandra ac Oracle, ond ar hyn o bryd maent mewn statws "mynediad cynnar", ac nid yw datganiadau newydd yn gwarantu cydnawsedd yn ôl.

Os byddwn yn cymharu CDC â'r dull traddodiadol (pan fydd y rhaglen yn darllen data o'r DBMS yn uniongyrchol), yna mae ei brif fanteision yn cynnwys gweithredu ffrydio newid data ar lefel rhes gyda hwyrni isel, dibynadwyedd uchel ac argaeledd. Cyflawnir y ddau bwynt olaf trwy ddefnyddio clwstwr Kafka fel ystorfa ar gyfer digwyddiadau CDC.

Hefyd, mae'r manteision yn cynnwys y ffaith bod un model yn cael ei ddefnyddio i storio digwyddiadau, felly nid oes rhaid i'r cais terfynol boeni am naws gweithredu gwahanol DBMS.

Yn olaf, mae defnyddio brocer negeseuon yn agor y drws ar gyfer graddio cymwysiadau sy'n olrhain newidiadau mewn data yn llorweddol. Ar yr un pryd, mae'r effaith ar y ffynhonnell ddata yn cael ei lleihau, gan fod data'n cael ei dderbyn nid yn uniongyrchol gan y DBMS, ond gan glwstwr Kafka.

Ynglŷn â phensaernïaeth Debezium

Mae defnyddio Debezium yn dibynnu ar y cynllun syml hwn:

DBMS (fel ffynhonnell ddata) → cysylltydd yn Kafka Connect → Apache Kafka → defnyddiwr

Fel enghraifft, byddaf yn rhoi diagram o wefan y prosiect:

Cyflwyno Debezium - CDC ar gyfer Apache Kafka

Fodd bynnag, nid wyf yn hoff iawn o'r cynllun hwn, oherwydd mae'n ymddangos mai dim ond cysylltydd sinc sy'n bosibl.

Mewn gwirionedd, mae'r sefyllfa'n wahanol: llenwi'ch Llyn Data (dolen olaf yn y diagram uchod) nid dyma'r unig ffordd i ddefnyddio Debezium. Gall digwyddiadau a anfonir at Apache Kafka gael eu defnyddio gan eich ceisiadau i ddatrys sefyllfaoedd amrywiol. Er enghraifft:

  • tynnu data amherthnasol o'r storfa;
  • anfon hysbysiadau;
  • diweddariadau mynegai chwilio;
  • rhyw fath o gofnodion archwilio;
  • ...

Rhag ofn bod gennych raglen Java ac nad oes angen/posibilrwydd i ddefnyddio clwstwr Kafka, mae posibilrwydd hefyd i weithio drwyddo. cysylltydd wedi'i fewnosod. Y fantais amlwg yw y gallwch chi wrthod seilwaith ychwanegol gydag ef (ar ffurf cysylltydd a Kafka). Fodd bynnag, mae'r datrysiad hwn wedi'i anghymeradwyo ers fersiwn 1.1 ac nid yw bellach yn cael ei argymell i'w ddefnyddio (gellir ei ddileu mewn datganiadau yn y dyfodol).

Bydd yr erthygl hon yn trafod y bensaernïaeth a argymhellir gan ddatblygwyr, sy'n darparu goddefgarwch bai a scalability.

Ffurfweddiad cysylltydd

Er mwyn dechrau olrhain newidiadau yn y gwerth - data pwysicaf - mae angen:

  1. ffynhonnell ddata, a all fod yn MySQL gan ddechrau o fersiwn 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (rhestr gyflawn);
  2. Clwstwr Apache Kafka
  3. Enghraifft Kafka Connect (fersiynau 1.x, 2.x);
  4. cysylltydd Debezium wedi'i ffurfweddu.

Gweithiwch ar y ddau bwynt cyntaf, h.y. mae'r broses o osod DBMS ac Apache Kafka y tu hwnt i gwmpas yr erthygl. Fodd bynnag, i'r rhai sydd am ddefnyddio popeth mewn blwch tywod, mae un parod yn y gadwrfa swyddogol gydag enghreifftiau docwr-gyfansoddi.yaml.

Byddwn yn canolbwyntio ar y ddau bwynt olaf yn fwy manwl.

0. Cyswllt Kafka

Yma ac yn ddiweddarach yn yr erthygl, ystyrir yr holl enghreifftiau cyfluniad yng nghyd-destun delwedd Docker a ddosberthir gan ddatblygwyr Debezium. Mae'n cynnwys yr holl ffeiliau ategyn angenrheidiol (cysylltwyr) ac yn darparu cyfluniad Kafka Connect gan ddefnyddio newidynnau amgylchedd.

Os ydych yn bwriadu defnyddio Kafka Connect o Confluent, bydd angen i chi ychwanegu ategion y cysylltwyr angenrheidiol eich hun i'r cyfeiriadur a nodir yn plugin.path neu wedi'i osod trwy newidyn amgylchedd CLASSPATH. Mae'r gosodiadau ar gyfer y gweithiwr Kafka Connect a'r cysylltwyr yn cael eu diffinio trwy ffeiliau ffurfweddu sy'n cael eu trosglwyddo fel dadleuon i'r gorchymyn cychwyn gweithiwr. Am fanylion gweler dogfennaeth.

Mae'r broses gyfan o sefydlu Debeizum yn y fersiwn cysylltydd yn cael ei wneud mewn dau gam. Gadewch i ni ystyried pob un ohonynt:

1. Sefydlu fframwaith Kafka Connect

I ffrydio data i glwstwr Apache Kafka, gosodir paramedrau penodol yn fframwaith Kafka Connect, megis:

  • gosodiadau cysylltiad clwstwr,
  • enwau pynciau y bydd cyfluniad y cysylltydd ei hun yn cael ei storio ynddynt,
  • enw'r grŵp y mae'r cysylltydd yn rhedeg ynddo (rhag ofn defnyddio modd dosbarthedig).

Mae delwedd swyddogol Docker y prosiect yn cefnogi cyfluniad gan ddefnyddio newidynnau amgylchedd - dyma'r hyn y byddwn yn ei ddefnyddio. Felly gadewch i ni lawrlwytho'r ddelwedd:

docker pull debezium/connect

Mae'r set leiaf o newidynnau amgylchedd sydd eu hangen i redeg y cysylltydd fel a ganlyn:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - rhestr gychwynnol o weinyddion clwstwr Kafka i gael rhestr gyflawn o aelodau'r clwstwr;
  • OFFSET_STORAGE_TOPIC=connector-offsets - pwnc ar gyfer storio safleoedd lle mae'r cysylltydd wedi'i leoli ar hyn o bryd;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - pwnc ar gyfer storio statws y cysylltydd a'i dasgau;
  • CONFIG_STORAGE_TOPIC=connector-config - pwnc ar gyfer storio data cyfluniad cysylltydd a'i dasgau;
  • GROUP_ID=1 — dynodwr y grŵp o weithwyr y gellir cyflawni tasg y cysylltydd arno; sy'n ofynnol wrth ddefnyddio dosbarthedig (dosbarthwyd) drefn.

Rydyn ni'n dechrau'r cynhwysydd gyda'r newidynnau hyn:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nodyn am Avro

Yn ddiofyn, mae Debezium yn ysgrifennu data mewn fformat JSON, sy'n dderbyniol ar gyfer blychau tywod a symiau bach o ddata, ond gall fod yn broblem mewn cronfeydd data sydd wedi'u llwytho'n drwm. Dewis arall yn lle'r trawsnewidydd JSON yw cyfresoli negeseuon gan ddefnyddio ewro i fformat deuaidd, sy'n lleihau'r llwyth ar yr is-system I / O yn Apache Kafka.

I ddefnyddio Avro, mae angen i chi ddefnyddio un ar wahân sgema-gofrestrfa (ar gyfer storio sgemâu). Bydd y newidynnau ar gyfer y trawsnewidydd yn edrych fel hyn:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Mae manylion ar ddefnyddio Avro a sefydlu cofrestrfa ar ei gyfer y tu hwnt i gwmpas yr erthygl - ymhellach, er eglurder, byddwn yn defnyddio JSON.

2. Sefydlu'r cysylltydd ei hun

Nawr gallwch chi fynd yn uniongyrchol i gyfluniad y cysylltydd ei hun, a fydd yn darllen data o'r ffynhonnell.

Edrychwn ar yr enghraifft o gysylltwyr ar gyfer dau DBMS: PostgreSQL a MongoDB, y mae gennyf brofiad ar eu cyfer ac y mae gwahaniaethau ar eu cyfer (er eu bod yn fach, ond mewn rhai achosion yn arwyddocaol!).

Disgrifir y ffurfweddiad yn nodiant JSON a'i uwchlwytho i Kafka Connect gan ddefnyddio cais POST.

2.1. PostgreSQL

Ffurfweddiad cysylltydd enghreifftiol ar gyfer PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Mae egwyddor gweithredu'r cysylltydd ar ôl y cyfluniad hwn yn eithaf syml:

  • Ar y cychwyn cyntaf, mae'n cysylltu â'r gronfa ddata a nodir yn y ffurfweddiad ac yn dechrau yn y modd cipolwg cychwynnol, anfon at Kafka y set gychwynnol o ddata a dderbyniwyd gyda'r amodol SELECT * FROM table_name.
  • Ar ôl cwblhau'r cychwyniad, mae'r cysylltydd yn mynd i mewn i'r modd darllen newidiadau o ffeiliau PostgreSQL WAL.

Am yr opsiynau a ddefnyddiwyd:

  • name — enw'r cysylltydd y defnyddir y cyfluniad a ddisgrifir isod ar ei gyfer; yn y dyfodol, defnyddir yr enw hwn i weithio gyda'r cysylltydd (h.y. gweld y statws / ailgychwyn / diweddaru'r ffurfweddiad) trwy API REST Kafka Connect;
  • connector.class — y dosbarth cysylltydd DBMS a ddefnyddir gan y cysylltydd wedi'i ffurfweddu;
  • plugin.name yw enw'r ategyn ar gyfer datgodio data o ffeiliau WAL yn rhesymegol. Ar gael i ddewis ohonynt wal2json, decoderbuffs и pgoutput. Mae'r ddau gyntaf yn gofyn am osod yr estyniadau priodol yn y DBMS, a pgoutput ar gyfer PostgreSQL fersiwn 10 ac uwch nid oes angen manipulations ychwanegol;
  • database.* — opsiynau ar gyfer cysylltu â'r gronfa ddata, ble database.server.name - enw'r enghraifft PostgreSQL a ddefnyddiwyd i ffurfio enw'r pwnc yng nghlwstwr Kafka;
  • table.include.list - rhestr o dablau yr ydym am olrhain newidiadau ynddynt; a roddir yn y fformat schema.table_name; ni ellir ei ddefnyddio ynghyd â table.exclude.list;
  • heartbeat.interval.ms — yr egwyl (mewn milieiliadau) pan fydd y cysylltydd yn anfon negeseuon curiad y galon at bwnc arbennig;
  • heartbeat.action.query - cais a fydd yn cael ei weithredu wrth anfon pob neges curiad calon (mae'r opsiwn wedi ymddangos ers fersiwn 1.1);
  • slot.name — enw'r slot atgynhyrchu a ddefnyddir gan y cysylltydd;
  • publication.name — Enw Cyhoeddi yn PostgreSQL y mae'r cysylltydd yn ei ddefnyddio. Rhag ofn nad yw'n bodoli, bydd Debezium yn ceisio ei greu. Os nad oes gan y defnyddiwr y gwneir y cysylltiad oddi tano ddigon o hawliau ar gyfer y weithred hon, bydd y cysylltydd yn gadael gyda gwall;
  • transforms yn penderfynu sut yn union i newid enw'r pwnc targed:
    • transforms.AddPrefix.type yn dynodi y byddwn yn defnyddio ymadroddion rheolaidd;
    • transforms.AddPrefix.regex — mwgwd ar gyfer ailddiffinio enw'r testun targed;
    • transforms.AddPrefix.replacement - yn uniongyrchol yr hyn yr ydym yn ei ailddiffinio.

Mwy am guriad calon a thrawsnewid

Yn ddiofyn, mae'r cysylltydd yn anfon data i Kafka ar gyfer pob trafodiad ymrwymedig, ac yn ysgrifennu ei LSN (Rhif Dilyniant Log) i'r pwnc gwasanaeth offset. Ond beth sy'n digwydd os yw'r cysylltydd wedi'i ffurfweddu i ddarllen nid y gronfa ddata gyfan, ond dim ond rhan o'i dablau (lle mae data'n cael ei ddiweddaru'n anaml)?

  • Bydd y cysylltydd yn darllen ffeiliau WAL ac ni fydd yn canfod ymrwymiadau trafodion ynddynt i'r tablau y mae'n eu monitro.
  • Felly, ni fydd yn diweddaru ei sefyllfa bresennol naill ai yn y pwnc nac yn y slot atgynhyrchu.
  • Bydd hyn, yn ei dro, yn achosi i'r ffeiliau WAL fod yn "sownd" ar ddisg ac yn debygol o redeg allan o ofod disg.

Ac yma daw opsiynau i'r adwy. heartbeat.interval.ms и heartbeat.action.query. Mae defnyddio'r opsiynau hyn mewn parau yn ei gwneud hi'n bosibl gweithredu cais i newid data mewn tabl ar wahân bob tro yr anfonir neges curiad calon. Felly, mae'r LSN y mae'r cysylltydd wedi'i leoli arno ar hyn o bryd (yn y slot atgynhyrchu) yn cael ei ddiweddaru'n gyson. Mae hyn yn caniatáu i'r DBMS ddileu ffeiliau WAL nad oes eu hangen mwyach. I gael rhagor o wybodaeth am sut mae opsiynau’n gweithio, gweler dogfennaeth.

Opsiwn arall sy'n haeddu sylw agosach yw transforms. Er ei fod yn ymwneud yn fwy â chyfleustra a harddwch ...

Yn ddiofyn, mae Debezium yn creu pynciau gan ddefnyddio'r polisi enwi canlynol: serverName.schemaName.tableName. Efallai na fydd hyn bob amser yn gyfleus. Opsiynau transforms gan ddefnyddio ymadroddion rheolaidd, gallwch ddiffinio rhestr o dablau y mae angen cyfeirio eu digwyddiadau at bwnc ag enw penodol.

Yn ein cyfluniad diolch i transforms mae'r canlynol yn digwydd: bydd pob digwyddiad CDC o'r gronfa ddata wedi'i olrhain yn mynd i'r pwnc gyda'r enw data.cdc.dbname. Fel arall (heb y gosodiadau hyn), byddai Debezium yn ddiofyn yn creu pwnc ar gyfer pob tabl o'r ffurflen: pg-dev.public.<table_name>.

Cyfyngiadau cysylltydd

Ar ddiwedd y disgrifiad o gyfluniad y cysylltydd ar gyfer PostgreSQL, mae'n werth siarad am y nodweddion / cyfyngiadau canlynol o'i waith:

  1. Mae ymarferoldeb cysylltydd PostgreSQL yn dibynnu ar y cysyniad o ddatgodio rhesymegol. Am hynny efe nid yw'n olrhain ceisiadau i newid strwythur y gronfa ddata (DDL) - yn unol â hynny, ni fydd y data hwn yn y pynciau.
  2. Gan fod slotiau atgynhyrchu yn cael eu defnyddio, mae cysylltiad y cysylltydd yn bosibl yn unig i'r enghraifft meistr DBMS.
  3. Os oes gan y defnyddiwr y mae'r cysylltydd yn cysylltu â'r gronfa ddata oddi tano hawliau darllen yn unig, yna cyn y lansiad cyntaf, bydd angen i chi greu slot atgynhyrchu â llaw a'i gyhoeddi i'r gronfa ddata.

Cymhwyso Cyfluniad

Felly gadewch i ni lwytho ein cyfluniad i'r cysylltydd:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Rydym yn gwirio bod y lawrlwythiad yn llwyddiannus a dechreuodd y cysylltydd:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Gwych: mae wedi'i sefydlu ac yn barod i fynd. Nawr gadewch i ni esgus bod yn ddefnyddiwr a chysylltu â Kafka, ac ar ôl hynny rydyn ni'n ychwanegu a newid cofnod yn y tabl:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Yn ein pwnc, bydd hyn yn cael ei arddangos fel a ganlyn:

JSON hir iawn gyda'n newidiadau

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Yn y ddau achos, mae'r cofnodion yn cynnwys allwedd (PK) y cofnod a newidiwyd, a hanfod y newidiadau: beth oedd y cofnod cyn a beth ddaeth ar ei ôl.

  • Yn achos INSERT: gwerth cyn (before) yn gyfartal nullac yna'r llinyn a fewnosodwyd.
  • Yn achos UPDATE: yn payload.before mae cyflwr blaenorol y rhes yn cael ei arddangos, ac yn payload.after - newydd gyda hanfod newid.

2.2 MongoDB

Mae'r cysylltydd hwn yn defnyddio'r mecanwaith ail-greu MongoDB safonol, gan ddarllen gwybodaeth o oplog prif nod DBMS.

Yn yr un modd â'r cysylltydd a ddisgrifiwyd eisoes ar gyfer PgSQL, yma, hefyd, ar y cychwyn cyntaf, cymerir y ciplun data cynradd, ac ar ôl hynny mae'r cysylltydd yn newid i fodd darllen oplog.

Enghraifft ffurfweddu:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Fel y gwelwch, nid oes unrhyw opsiynau newydd o gymharu â'r enghraifft flaenorol, ond dim ond nifer yr opsiynau sy'n gyfrifol am gysylltu â'r gronfa ddata a'u rhagddodiaid sydd wedi'u lleihau.

Gosodiadau transforms y tro hwn maen nhw'n gwneud y canlynol: trowch enw'r pwnc targed o'r cynllun <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

goddefgarwch fai

Mae mater goddefgarwch namau ac argaeledd uchel yn ein hamser yn fwy difrifol nag erioed - yn enwedig pan fyddwn yn siarad am ddata a thrafodion, ac nid yw olrhain newid data ar y llinell ochr yn y mater hwn. Gadewch i ni edrych ar yr hyn a all fynd o'i le mewn egwyddor a beth fydd yn digwydd i Debezium ym mhob achos.

Mae tri opsiwn optio allan:

  1. Methiant Kafka Connect. Os yw Connect wedi'i ffurfweddu i weithio yn y modd dosbarthedig, mae hyn yn ei gwneud yn ofynnol i weithwyr lluosog osod yr un grŵp.id. Yna, os bydd un ohonynt yn methu, bydd y cysylltydd yn cael ei ailgychwyn ar y gweithiwr arall ac yn parhau i ddarllen o'r sefyllfa ymroddedig olaf yn y pwnc yn Kafka.
  2. Colli cysylltedd â chlwstwr Kafka. Bydd y cysylltydd yn rhoi'r gorau i ddarllen yn y safle y methodd ei anfon at Kafka ac o bryd i'w gilydd yn ceisio ei ail-anfon nes bod yr ymgais yn llwyddo.
  3. Ffynhonnell data ddim ar gael. Bydd y cysylltydd yn ceisio ailgysylltu â'r ffynhonnell yn ôl y ffurfweddiad. Y rhagosodiad yw 16 ymgais gan ddefnyddio wrth gefn esbonyddol. Ar ôl yr 16eg ymgais aflwyddiannus, bydd y dasg yn cael ei nodi fel wedi methu a bydd angen ei ailgychwyn â llaw trwy ryngwyneb Kafka Connect REST.
    • Yn achos PostgreSQL ni fydd data yn cael ei golli, oherwydd bydd defnyddio slotiau atgynhyrchu yn atal dileu ffeiliau WAL nad ydynt yn cael eu darllen gan y cysylltydd. Yn yr achos hwn, mae yna anfantais: os amharir ar y cysylltedd rhwydwaith rhwng y cysylltydd a'r DBMS am amser hir, mae'n debygol y bydd y gofod disg yn rhedeg allan, a gall hyn arwain at fethiant y DBMS cyfan.
    • Yn achos MySQL gall ffeiliau binlog gael eu cylchdroi gan y DBMS ei hun cyn adfer cysylltedd. Bydd hyn yn achosi i'r cysylltydd fynd i'r cyflwr methu, a bydd angen iddo ailgychwyn yn y modd ciplun cychwynnol i barhau i ddarllen o binlogs i adfer gweithrediad arferol.
    • Про MongoDB. Mae'r ddogfennaeth yn dweud: mae ymddygiad y cysylltydd rhag ofn bod y ffeiliau log/oplog wedi'u dileu ac na all y cysylltydd barhau i ddarllen o'r safle lle gadawodd i ffwrdd yr un peth ar gyfer pob DBMS. Mae'n gorwedd yn y ffaith y bydd y cysylltydd yn mynd i'r wladwriaeth wedi methu a bydd angen ailgychwyn yn y modd cipolwg cychwynnol.

      Fodd bynnag, mae yna eithriadau. Pe bai'r cysylltydd mewn cyflwr datgysylltu am amser hir (neu na allai gyrraedd enghraifft MongoDB), a bod yr oplog wedi'i gylchdroi yn ystod yr amser hwn, yna pan fydd y cysylltiad yn cael ei adfer, bydd y cysylltydd yn parhau i ddarllen data o'r safle cyntaf sydd ar gael yn dawel. , a dyna pam mae rhai o'r data yn Kafka dim bydd taro.

Casgliad

Debezium yw fy mhrofiad cyntaf gyda systemau CDC ac mae wedi bod yn gadarnhaol iawn ar y cyfan. Llwgrwobrwyodd y prosiect gefnogaeth y prif DBMS, rhwyddineb cyfluniad, cefnogaeth ar gyfer clystyru a chymuned weithgar. I'r rhai sydd â diddordeb mewn ymarfer, rwy'n argymell eich bod yn darllen y canllawiau ar gyfer Cyswllt Kafka и Debezium.

O'i gymharu â'r cysylltydd JDBC ar gyfer Kafka Connect, prif fantais Debezium yw bod newidiadau yn cael eu darllen o'r logiau DBMS, sy'n caniatáu derbyn data heb fawr o oedi. Mae'r JDBC Connector (a ddarperir gan Kafka Connect) yn cwestiynu'r tabl tracio ar gyfnod penodol ac (am yr un rheswm) nid yw'n cynhyrchu negeseuon pan fydd data'n cael ei ddileu (sut allwch chi ymholi am ddata nad yw yno?).

I ddatrys problemau tebyg, gallwch roi sylw i'r atebion canlynol (yn ogystal â Debezium):

PS

Darllenwch hefyd ar ein blog:

Ffynhonnell: hab.com

Ychwanegu sylw