A’ toirt a-steach Debezium - CDC airson Apache Kafka

A’ toirt a-steach Debezium - CDC airson Apache Kafka

Anns an obair agam, bidh mi gu tric a’ tighinn tarsainn air fuasglaidhean teignigeach / bathar-bog ùr, a tha fiosrachadh mu dheidhinn a tha caran gann air an eadar-lìn Ruiseanach. Leis an artaigil seo, feuchaidh mi ri aon bheàrn mar sin a lìonadh le eisimpleir bhon chleachdadh a rinn mi o chionn ghoirid, nuair a dh’ fheumadh mi tachartasan CDC a chuir air dòigh bho dhà DBMS mòr-chòrdte (PostgreSQL agus MongoDB) gu cruinneachadh Kafka a’ cleachdadh Debezium. Tha mi an dòchas gum bi an artaigil ath-bhreithneachaidh seo, a nochd mar thoradh air an obair a chaidh a dhèanamh, feumail do dhaoine eile.

Dè a th’ ann an Debezium agus CDC san fharsaingeachd?

Debezium - Riochdaire roinn bathar-bog CDC (Glac atharrachadh dàta), no nas mionaidiche, is e seata de luchd-ceangail a th’ ann airson diofar DBMS a tha co-chosmhail ri frèam Apache Kafka Connect.

seo pròiseact le còd fosgailte, le cead fo cheadachas Apache v2.0 agus le taic bho Red Hat. Tha leasachadh air a bhith a’ dol air adhart bho 2016 agus aig an àm seo tha e a’ toirt taic oifigeil dha na DBMS a leanas: MySQL, PostgreSQL, MongoDB, SQL Server. Tha luchd-ceangail ann cuideachd airson Cassandra agus Oracle, ach tha iad an-dràsta ann an inbhe “ruigsinneachd tràth”, agus chan eil fiosan ùra a’ gealltainn co-fhreagarrachd air ais.

Ma nì sinn coimeas eadar CDC agus an dòigh-obrach thraidiseanta (nuair a leughas an tagradh dàta bhon DBMS gu dìreach), tha na prìomh bhuannachdan aige a’ toirt a-steach buileachadh sruthadh atharrachadh dàta aig ìre loidhne le latency ìosal, earbsachd àrd agus ruigsinneachd. Tha an dà phuing mu dheireadh air an coileanadh le bhith a’ cleachdadh cruinneachadh Kafka mar stòr airson tachartasan CDC.

Cuideachd, tha na buannachdan a 'toirt a-steach gu bheil aon mhodail air a chleachdadh airson tachartasan a stòradh, agus mar sin chan fheum an tagradh mu dheireadh a bhith draghail mu na h-nursaichean a th' ann a bhith ag obrachadh diofar DBMS.

Mu dheireadh, le bhith a ’cleachdadh broker teachdaireachd a’ fosgladh cothrom airson sgèileadh còmhnard de thagraidhean a bhios a ’cumail sùil air atharrachaidhean ann an dàta. Aig an aon àm, tha a 'bhuaidh air an stòr dàta air a lùghdachadh, oir gheibhear dàta chan ann dìreach bhon DBMS, ach bho bhuidheann Kafka.

Mu ailtireachd Debezium

Tha cleachdadh Debezium a’ tighinn sìos chun sgeama shìmplidh seo:

DBMS (mar stòr dàta) → ceanglaiche ann an Kafka Connect → Apache Kafka → neach-cleachdaidh

Mar eisimpleir, bheir mi diagram bho làrach-lìn a’ phròiseict:

A’ toirt a-steach Debezium - CDC airson Apache Kafka

Ach, chan eil an sgeama seo a 'còrdadh rium gu mòr, oir tha e coltach nach eil e comasach ach ceanglaiche sinc.

Ann an da-rìribh, tha an suidheachadh eadar-dhealaichte: lìonadh do Data Lake (ceangal mu dheireadh san dealbh gu h-àrd) chan e seo an aon dòigh air Debezium a chleachdadh. Faodar tachartasan a chuirear gu Apache Kafka a chleachdadh leis na tagraidhean agad gus dèiligeadh ri diofar shuidheachaidhean. Mar eisimpleir:

  • toirt air falbh dàta neo-iomchaidh bhon tasgadan;
  • a 'cur fiosan;
  • ùrachadh clàr-amais rannsachaidh;
  • seòrsa de chlàran sgrùdaidh;
  • ...

Air eagal gu bheil tagradh Java agad agus nach eil feum / comasach air cruinneachadh Kafka a chleachdadh, tha cothrom ann cuideachd obrachadh troimhe freumhaichte - ceanglaiche. Is e a ’bhuannachd follaiseach gu bheil e a’ cur às don fheum air bun-structar a bharrachd (ann an cruth ceanglaiche agus Kafka). Ach, cha deach am fuasgladh seo a mholadh bho dhreach 1.1 agus chan eilear ga mholadh airson a chleachdadh (dh’ fhaodadh taic air a shon a bhith air a thoirt air falbh ann am fiosan san àm ri teachd).

Bruidhnidh an artaigil seo air an ailtireachd a mhol luchd-leasachaidh, a bheir seachad fulangas sgàinidhean agus scalability.

Suidheachadh ceangail

Gus tòiseachadh air lorg atharrachaidhean anns an luach - dàta as cudromaiche - feumaidh sinn:

  1. stòr dàta, a dh’ fhaodadh a bhith MySQL a’ tòiseachadh bho dhreach 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (làn liosta);
  2. brabhsair Apache Kafka;
  3. Eisimpleir Kafka Connect (tionndaidhean 1.x, 2.x);
  4. ceanglaiche Debezium a rèiteachadh.

Obraich air a’ chiad dà phuing, i.e. tha am pròiseas airson DBMS agus Apache Kafka a chuir a-steach taobh a-muigh raon an artaigil. Ach, dhaibhsan a tha airson a h-uile càil a chuir a-steach ann am bogsa gainmhich, tha fear deiseil anns an stòr oifigeil le eisimpleirean docker-compose.yaml.

Cuiridh sinn fòcas nas mionaidiche air an dà phuing mu dheireadh.

0. Ceangal Kafka

An seo agus nas fhaide san artaigil, tha na h-eisimpleirean rèiteachaidh uile air an deasbad ann an co-theacsa ìomhaigh Docker air a sgaoileadh le luchd-leasachaidh Debezium. Tha na faidhlichean plugan riatanach (luchd-ceangail) ann agus bheir e seachad rèiteachadh Kafka Connect a’ cleachdadh caochladairean àrainneachd.

Ma tha thu am beachd Kafka Connect bho Confluent a chleachdadh, feumaidh tu na plugins de na ceanglaichean riatanach thu fhèin a chur ris an eòlaire a tha air a shònrachadh ann an plugin.path no air a shuidheachadh tro chaochladair àrainneachd CLASSPATH. Tha na roghainnean airson neach-obrach Kafka Connect agus luchd-ceangail air am mìneachadh tro fhaidhlichean rèiteachaidh a thèid a thoirt seachad mar argamaidean gu àithne tòiseachaidh an neach-obrach. Airson tuilleadh fiosrachaidh, faic sgrìobhainnean.

Tha am pròiseas gu lèir airson stèidheachadh Debeizum anns an dreach ceangail air a dhèanamh ann an dà ìre. Beachdaichidh sinn air gach fear dhiubh:

1. A 'stèidheachadh frèam Kafka Connect

Gus dàta a shruthladh gu cruinneachadh Apache Kafka, tha crìochan sònraichte air an suidheachadh ann am frèam Kafka Connect, leithid:

  • crìochan airson ceangal ris a’ bhuidheann,
  • ainmean chuspairean anns am bi rèiteachadh a’ cheangail fhèin air a stòradh,
  • ainm na buidhne anns a bheil an ceanglaiche a’ ruith (gun fhios nach cleachd thu modh sgaoilte).

Tha ìomhaigh oifigeil Docker den phròiseact a’ toirt taic do rèiteachadh a’ cleachdadh caochladairean àrainneachd - is e seo a chleachdas sinn. Mar sin, luchdaich sìos an dealbh:

docker pull debezium/connect

Tha an seata as lugha de chaochladairean àrainneachd a dh’ fheumar gus an ceanglaiche a ruith mar a leanas:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - liosta tùsail de luchd-frithealaidh brabhsair Kafka gus liosta iomlan de bhuill brabhsair fhaighinn;
  • OFFSET_STORAGE_TOPIC=connector-offsets - cuspair airson àiteachan a stòradh far a bheil an ceanglaiche suidhichte an-dràsta;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - cuspair airson inbhe a ’cheangail agus na gnìomhan aige a stòradh;
  • CONFIG_STORAGE_TOPIC=connector-config - cuspair airson stòradh dàta rèiteachaidh ceangail agus na gnìomhan aige;
  • GROUP_ID=1 - aithnichear a’ bhuidheann de luchd-obrach air am faodar an obair ceangail a choileanadh; riatanach nuair a thathar a’ cleachdadh sgaoileadh (air a chuairteachadh) modh.

Bidh sinn a’ tòiseachadh a’ bhogsa leis na caochladairean seo:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota mu Avro

Gu gnàthach, bidh Debezium a’ sgrìobhadh dàta ann an cruth JSON, a tha iomchaidh airson bogsaichean gainmhich agus meudan beaga de dhàta, ach a dh’ fhaodadh a bhith na dhuilgheadas ann an stòran-dàta làn luchdaichte. Is e roghainn eile an àite an inneal-tionndaidh JSON teachdaireachdan a chuir gu sreath a’ cleachdadh Avro gu cruth binary, a lughdaicheas an luchd air an fho-shiostam I / O ann an Apache Kafka.

Gus Avro a chleachdadh feumaidh tu fear air leth a chleachdadh clàr-sgeama (airson sgeamaichean a stòradh). Bidh na caochladairean airson an inneal-tionndaidh a’ coimhead mar seo:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Tha mion-fhiosrachadh mu bhith a’ cleachdadh Avro agus a’ stèidheachadh clàr air a shon taobh a-muigh farsaingeachd an artaigil - nas fhaide, airson soilleireachd, cleachdaidh sinn JSON.

2. A 'stèidheachadh an ceanglaiche fhèin

A-nis faodaidh tu a dhol dìreach gu rèiteachadh an ceanglaiche fhèin, a leughas dàta bhon stòr.

Bheir sinn sùil air an eisimpleir de luchd-ceangail airson dà DBMS: PostgreSQL agus MongoDB, air a bheil eòlas agam agus air a bheil eadar-dhealachaidhean (ged a tha e beag, ach ann an cuid de chùisean cudromach!).

Tha an rèiteachadh air a mhìneachadh ann an comharradh JSON agus air a luchdachadh suas gu Kafka Connect a’ cleachdadh iarrtas POST.

2.1. PostgreSQL

Eisimpleir de rèiteachadh ceangail airson PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Tha prionnsabal obrachadh a 'cheangail an dèidh an stèidheachadh seo gu math sìmplidh:

  • Aig a 'chiad thoiseach, bidh e a' ceangal ris an stòr-dàta a tha air a shònrachadh anns an rèiteachadh agus a 'tòiseachadh sa mhodh dealbh tùsail, a’ cur gu Kafka a’ chiad sheata de dhàta a fhuaireadh leis a’ chumha SELECT * FROM table_name.
  • Às deidh an tòiseachadh a bhith deiseil, thèid an ceanglaiche a-steach don mhodh gus atharrachaidhean bho fhaidhlichean PostgreSQL WAL a leughadh.

Mu na roghainnean a chaidh a chleachdadh:

  • name - ainm a 'cheangail airson a bheil an rèiteachadh a tha air a mhìneachadh gu h-ìosal air a chleachdadh; san àm ri teachd, tha an t-ainm seo air a chleachdadh gus obrachadh leis a’ cheangail (ie coimhead air an inbhe / ath-thòisich / ùraich an rèiteachadh) tro Kafka Connect REST API;
  • connector.class - an clas ceangail DBMS a chleachdas an ceanglaiche rèiteachaidh;
  • plugin.name - ainm a’ phlug airson dì-chòdachadh loidsigeach air dàta bho fhaidhlichean WAL. Ri fhaighinn airson taghadh wal2json, decoderbuffs и pgoutput. Feumaidh a’ chiad dhà leudachadh iomchaidh a chuir a-steach san DBMS, agus pgoutput airson PostgreSQL dreach 10 agus nas àirde chan eil feum air làimhseachadh a bharrachd;
  • database.* - roghainnean airson ceangal ris an stòr-dàta, càite database.server.name - ainm an eisimpleir PostgreSQL a chaidh a chleachdadh gus ainm a’ chuspair a chruthachadh ann am buidheann Kafka;
  • table.include.list - liosta de chlàran anns a bheil sinn airson sùil a chumail air atharrachaidhean; air a shònrachadh ann an cruth schema.table_name; chan urrainnear a chleachdadh còmhla ri table.exclude.list;
  • heartbeat.interval.ms - eadar-ama (ann am milliseconds) leis am bi an ceanglaiche a’ cur teachdaireachdan buille cridhe gu cuspair sònraichte;
  • heartbeat.action.query - iarrtas a thèid a chuir gu bàs nuair a chuireas tu a h-uile teachdaireachd buille cridhe (tha an roghainn air nochdadh bhon dreach 1.1);
  • slot.name - ainm an t-slot mac-samhail a chleachdas an ceanglaiche;
  • publication.name — Ainm foillseachaidhean ann am PostgreSQL a bhios an ceanglaiche a’ cleachdadh. Mura h-eil e ann, feuchaidh Debezium ri chruthachadh. Mura h-eil còraichean gu leòr aig a’ chleachdaiche fon deach an ceangal a dhèanamh airson a’ ghnìomh seo, falbhaidh an ceanglaiche le mearachd;
  • transforms a’ dearbhadh mar a dh’ atharraicheas tu ainm a’ chuspair targaid:
    • transforms.AddPrefix.type a’ nochdadh gun cleachd sinn abairtean cunbhalach;
    • transforms.AddPrefix.regex - masg a bhios ag ath-mhìneachadh ainm a’ chuspair targaid;
    • transforms.AddPrefix.replacement - gu dìreach na tha sinn ag ath-mhìneachadh.

Tuilleadh mu bhuille-cridhe agus cruth-atharrachadh

Gu gnàthach, bidh an ceanglaiche a’ cur dàta gu Kafka airson gach gnothach dealasach, agus a’ sgrìobhadh a LSN (Àireamh Seicheamh Log) gu cuspair na seirbheis offset. Ach dè a thachras ma tha an ceanglaiche air a rèiteachadh gus nach leugh e an stòr-dàta gu lèir, ach dìreach pàirt de na clàran aige (anns nach bi ùrachaidhean dàta a ’tachairt gu tric)?

  • Leughaidh an ceanglaiche faidhlichean WAL agus chan lorg e gealltanasan malairt annta gu na bùird air a bheil e a’ cumail sùil.
  • Mar sin, chan ùraich e an suidheachadh làithreach aige an dàrna cuid sa chuspair no san t-slot ath-riochdachadh.
  • Mar thoradh air seo, thèid faidhlichean WAL a chumail air diosc agus is dòcha gun ruith iad a-mach à àite diosc.

Agus an seo thig roghainnean gu teasairginn. heartbeat.interval.ms и heartbeat.action.query. Le bhith a’ cleachdadh nan roghainnean sin ann an càraidean bidh e comasach iarrtas a chuir an gnìomh airson dàta atharrachadh ann an clàr air leth gach uair a thèid teachdaireachd buille-cridhe a chuir. Mar sin, tha an LSN air a bheil an ceanglaiche suidhichte an-dràsta (anns an t-slot mac-samhail) air ùrachadh gu cunbhalach. Leigidh seo leis an DBMS faidhlichean WAL a thoirt air falbh nach eil a dhìth tuilleadh. Faodaidh tu barrachd ionnsachadh mu mar a tha na roghainnean ag obair ann sgrìobhainnean.

Is e roghainn eile a tha airidh air aire nas mionaidiche transforms. Ged a tha e nas motha mu dheidhinn goireasachd agus bòidhchead ...

Gu gnàthach, bidh Debezium a’ cruthachadh chuspairean a’ cleachdadh a’ phoileasaidh ainmeachaidh a leanas: serverName.schemaName.tableName. Is dòcha nach bi seo an-còmhnaidh goireasach. Roghainnean transforms a’ cleachdadh abairtean cunbhalach, faodaidh tu liosta de chlàran a mhìneachadh aig am feum tachartasan a bhith air an stiùireadh gu cuspair le ainm sònraichte.

Anns an rèiteachadh againn taing do transforms bidh na leanas a’ tachairt: thèid a h-uile tachartas CDC bhon stòr-dàta rianail chun chuspair leis an ainm data.cdc.dbname. Rud eile (às aonais na roghainnean sin), chruthaicheadh ​​Debezium cuspair gu bunaiteach airson gach clàr den fhoirm: pg-dev.public.<table_name>.

Cuingealachaidhean ceangail

Aig deireadh an tuairisgeul air rèiteachadh an ceanglaiche airson PostgreSQL, is fhiach bruidhinn mu na feartan / crìochan obrach a leanas:

  1. Tha gnìomhachd ceangail airson PostgreSQL an urra ris a’ bhun-bheachd de chòdachadh loidsigeach. Uime sin esan chan eil e a’ cumail sùil air iarrtasan airson structar an stòr-dàta atharrachadh (DDL) - a rèir sin, cha bhi an dàta seo anns na cuspairean.
  2. Leis gu bheilear a’ cleachdadh sliotan mac-samhail, tha e comasach ceangal a cheangal a-mhàin gu prìomh eisimpleir DBMS.
  3. Ma tha còraichean leughaidh a-mhàin aig an neach-cleachdaidh fon bheil an ceanglaiche a’ ceangal ris an stòr-dàta, an uairsin ron chiad fhoillseachadh, feumaidh tu slot mac-samhail a chruthachadh le làimh agus fhoillseachadh don stòr-dàta.

A 'cur a-steach Configuration

Mar sin luchdaich sinn ar rèiteachadh a-steach don cheangal:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Nì sinn cinnteach gun robh an luchdachadh sìos soirbheachail agus thòisich an ceanglaiche:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Sgoinneil: tha e deiseil agus deiseil airson a dhol. A-nis leig dhuinn a bhith nad neach-cleachdaidh agus ceangal a dhèanamh ri Kafka, às deidh sin cuiridh sinn ris agus atharraich sinn inntrig sa chlàr:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Anns a 'chuspair againn, bidh seo air a thaisbeanadh mar a leanas:

JSON glè fhada leis na h-atharrachaidhean againn

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Anns gach cùis, tha na clàran a 'gabhail a-steach an iuchair (PK) den chlàr a chaidh atharrachadh, agus fìor bhrìgh nan atharrachaidhean: dè bha an clàr roimhe agus dè a thàinig às a dhèidh.

  • Ann an cùis INSERT: luach roimhe (before) co-ionann nullair a leantainn leis an t-sreang a chaidh a chuir a-steach.
  • Ann an cùis UPDATE: aig payload.before tha staid an t-sreath roimhe air a thaisbeanadh, agus ann an payload.after - ùr le brìgh an atharrachaidh.

2.2 MongoDB

Bidh an ceanglaiche seo a’ cleachdadh an uidheamachd mac-samhail àbhaisteach MongoDB, a’ leughadh fiosrachadh bho oplog prìomh nód DBMS.

Mar an ceudna ris a’ cheangail a chaidh a mhìneachadh mar-thà airson PgSQL, an seo, cuideachd, aig a’ chiad dol-a-mach, thathas a’ togail a’ phrìomh dhealbh dàta, às deidh sin bidh an ceanglaiche ag atharrachadh gu modh leughaidh oplog.

Eisimpleir rèiteachaidh:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Mar a chì thu, chan eil roghainnean ùra ann an seo an taca ris an eisimpleir roimhe, ach dìreach chaidh an àireamh de roghainnean a tha an urra ri ceangal ris an stòr-dàta agus na ro-leasachain aca a lughdachadh.

Roghainnean transforms an turas seo nì iad na leanas: tionndaidh ainm a’ chuspair targaid bhon sgeama <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

fulangas lochd

Tha cùis fulangas sgàinidhean agus ruigsinneachd àrd nar n-ùine nas gèire na bha e a-riamh - gu sònraichte nuair a bhios sinn a’ bruidhinn mu dhàta agus gnothaichean, agus nach eil lorg atharrachadh dàta air an taobh sa chùis seo. Bheir sinn sùil air dè as urrainn a dhol ceàrr ann am prionnsapal agus dè a thachras do Debezium anns gach cùis.

Tha trì roghainnean tarraing a-mach ann:

  1. Dh'fhàillig Kafka Connect. Ma tha Connect air a rèiteachadh gus obrachadh ann am modh sgaoilte, feumaidh seo grunn luchd-obrach an aon bhuidheann.id a shuidheachadh. An uairsin, ma dh ’fhailicheas aon dhiubh, thèid an ceanglaiche ath-thòiseachadh air an neach-obrach eile agus lean air adhart a’ leughadh bhon t-suidheachadh dealasach mu dheireadh sa chuspair ann an Kafka.
  2. Caill ceangal le buidheann Kafka. Bidh an ceanglaiche dìreach a’ stad air leughadh aig an t-suidheachadh nach do chuir e gu Kafka agus bho àm gu àm feuchaidh e ri ath-chuir air ais gus an soirbhich leis an oidhirp.
  3. Stòr dàta nach eil ri fhaighinn. Feuchaidh an ceanglaiche ri ath-cheangal ris an stòr a rèir an rèiteachaidh. Is e an roghainn àbhaisteach 16 oidhirpean a’ cleachdadh cùl-taic exponential. Às deidh an 16mh oidhirp air fàiligeadh, thèid an obair a chomharrachadh mar Dh'fhàillig agus feumar ath-thòiseachadh le làimh tro eadar-aghaidh Kafka Connect REST.
    • Ann an cùis PostgreSQL cha tèid an dàta a chall, oir le bhith a’ cleachdadh sliotan mac-samhail cuiridh sin casg air cuir às do fhaidhlichean WAL nach leugh an ceanglaiche. Anns a 'chùis seo, tha eas-bhuannachd ann: ma tha an ceangal lìonra eadar an ceanglaiche agus an DBMS air a bhriseadh airson ùine mhòr, tha teansa gum bi an t-àite diosg a' ruith a-mach, agus dh'fhaodadh seo leantainn gu fàilligeadh an DBMS gu lèir.
    • Ann an cùis MySQL faodar faidhlichean binlog a thionndadh leis an DBMS fhèin mus tèid ceangal a thoirt air ais. Bidh seo ag adhbhrachadh gun tèid an ceanglaiche a-steach don staid a dh’ fhàillig, agus feumaidh e ath-thòiseachadh sa mhodh dealbh tùsail gus leantainn air adhart a’ leughadh bho binlogs gus obrachadh àbhaisteach a thoirt air ais.
    • air a ' MongoDB. Tha na sgrìobhainnean ag ràdh: tha giùlan a’ cheangail air eagal ’s gun tèid na faidhlichean log/oplog a dhubhadh às agus nach urrainn don cheangalaiche cumail a’ leughadh bhon t-suidheachadh far an do dh’ fhalbh e mar an ceudna airson a h-uile DBMS. Tha e na laighe anns an fhìrinn gun tèid an ceanglaiche a-steach don stàit Dh'fhàillig agus bidh feum air ath-thòiseachadh sa mhodh dealbh tùsail.

      Ach, tha eisgeachdan ann. Ma chaidh an ceanglaiche a dhì-cheangal airson ùine mhòr (no nach b ’urrainn dha suidheachadh MongoDB a ruighinn), agus chaidh an oplog tro chuairteachadh rè na h-ùine seo, an uairsin nuair a thèid an ceangal ath-nuadhachadh, leanaidh an ceanglaiche gu socair a’ leughadh dàta bhon chiad suidheachadh a tha ri fhaighinn, is e sin as coireach gu bheil cuid den dàta ann an Kafka chan eil buailidh.

co-dhùnadh

Is e Debezium a’ chiad eòlas agam le siostaman CDC agus tha e air a bhith gu math dòchasach san fharsaingeachd. Bhris am pròiseact taic bhon phrìomh DBMS, cho furasta ‘s a bha e rèiteachaidh, taic airson cruinneachadh agus coimhearsnachd ghnìomhach. Dhaibhsan aig a bheil ùidh ann an cleachdadh, tha mi a’ moladh gun leugh thu an stiùireadh airson Ceangal Kafka и Debezium.

An coimeas ris a’ cheangal JDBC airson Kafka Connect, is e prìomh bhuannachd Debezium gu bheil atharrachaidhean air an leughadh bho logaichean DBMS, a leigeas le dàta fhaighinn le glè bheag de dh’ ùine. Bidh an JDBC Connector (bho Kafka Connect) a’ ceasnachadh a’ bhòrd sgrùdaichte aig àm stèidhichte agus (airson an aon adhbhar) cha bhith e a’ gineadh teachdaireachdan nuair a thèid dàta a dhubhadh às (ciamar a dh’ fhaighnicheas tu dàta nach eil ann?).

Gus fuasgladh fhaighinn air duilgheadasan coltach ris, faodaidh tu aire a thoirt do na fuasglaidhean a leanas (a bharrachd air Debezium):

PS

Leugh cuideachd air ar blog:

Source: www.habr.com

Cuir beachd ann