Ifihan Debezium - CDC fun Apache Kafka

Ifihan Debezium - CDC fun Apache Kafka

Ninu iṣẹ mi, Mo nigbagbogbo wa awọn solusan imọ-ẹrọ tuntun / awọn ọja sọfitiwia, alaye nipa eyiti o jẹ kuku ṣọwọn lori Intanẹẹti ti n sọ Russian. Pẹlu nkan yii, Emi yoo gbiyanju lati kun iru aafo kan pẹlu apẹẹrẹ lati adaṣe aipẹ mi, nigbati Mo nilo lati ṣeto fifiranṣẹ awọn iṣẹlẹ CDC lati awọn DBMS olokiki meji (PostgreSQL ati MongoDB) si iṣupọ Kafka nipa lilo Debezium. Mo nireti pe nkan atunyẹwo yii, eyiti o han bi abajade ti iṣẹ ti a ṣe, yoo wulo fun awọn miiran.

Kini Debezium ati CDC ni apapọ?

Debezium - Aṣoju ti ẹya sọfitiwia CDC (Yaworan iyipada data), tabi diẹ sii ni deede, o jẹ eto awọn asopọ fun ọpọlọpọ awọn DBMS ti o ni ibamu pẹlu ilana Apache Kafka Connect.

Eyi jẹ iṣẹ orisun ṣiṣi, iwe-aṣẹ labẹ Iwe-aṣẹ Apache v2.0 ati atilẹyin nipasẹ Red Hat. Idagbasoke ti nlọ lọwọ lati ọdun 2016 ati ni akoko ti o pese atilẹyin osise fun DBMS wọnyi: MySQL, PostgreSQL, MongoDB, SQL Server. Awọn asopọ tun wa fun Cassandra ati Oracle, ṣugbọn wọn wa lọwọlọwọ ni ipo “iwọle ni kutukutu”, ati awọn idasilẹ tuntun ko ṣe iṣeduro ibamu sẹhin.

Ti a ba ṣe afiwe CDC pẹlu ọna ibile (nigbati ohun elo ba ka data lati DBMS taara), lẹhinna awọn anfani akọkọ rẹ pẹlu imuse ti ṣiṣan iyipada data ni ipele ila pẹlu lairi kekere, igbẹkẹle giga ati wiwa. Awọn aaye meji ti o kẹhin jẹ aṣeyọri nipasẹ lilo iṣupọ Kafka kan bi ibi ipamọ fun awọn iṣẹlẹ CDC.

Pẹlupẹlu, awọn anfani pẹlu otitọ pe awoṣe kan ṣoṣo ni a lo lati tọju awọn iṣẹlẹ, nitorinaa ohun elo ikẹhin ko ni lati ṣe aibalẹ nipa awọn nuances ti ṣiṣẹ oriṣiriṣi DBMS.

Lakotan, lilo alagbata ifiranṣẹ kan ṣii aaye fun iwọn petele ti awọn ohun elo ti o tọpa awọn ayipada ninu data. Ni akoko kanna, ipa lori orisun data ti dinku, nitori data ko gba taara lati DBMS, ṣugbọn lati inu iṣupọ Kafka.

Nipa Debezium faaji

Lilo Debezium wa silẹ si ero ti o rọrun yii:

DBMS (bi orisun data) → asopo ni Kafka Sopọ → Apache Kafka → olumulo

Gẹgẹbi apejuwe, Emi yoo fun aworan kan lati oju opo wẹẹbu iṣẹ akanṣe:

Ifihan Debezium - CDC fun Apache Kafka

Bibẹẹkọ, Emi ko fẹran ero yii gaan, nitori o dabi pe asopo ifọwọ nikan ṣee ṣe.

Ni otitọ, ipo naa yatọ: kikun Adagun Data rẹ (ọna asopọ ti o kẹhin ninu aworan atọka loke) kii ṣe ọna kan ṣoṣo lati lo Debezium. Awọn iṣẹlẹ ti a firanṣẹ si Apache Kafka le ṣee lo nipasẹ awọn ohun elo rẹ lati koju ọpọlọpọ awọn ipo. Fun apere:

  • yiyọ data ti ko ṣe pataki lati kaṣe;
  • fifiranṣẹ awọn iwifunni;
  • awọn imudojuiwọn atọka wiwa;
  • diẹ ninu awọn iru ti se ayewo àkọọlẹ;
  • ...

Ni ọran ti o ba ni ohun elo Java ati pe ko si iwulo / o ṣeeṣe lati lo iṣupọ Kafka, o tun ṣee ṣe lati ṣiṣẹ nipasẹ ifibọ asopo ohun. Afikun ti o han gedegbe ni pe pẹlu rẹ o le kọ awọn amayederun afikun (ni ọna asopọ ati Kafka). Sibẹsibẹ, ojutu yii ti parẹ lati ẹya 1.1 ati pe ko ṣe iṣeduro fun lilo (o le yọkuro ni awọn idasilẹ ọjọ iwaju).

Nkan yii yoo jiroro lori faaji ti a ṣeduro nipasẹ awọn olupilẹṣẹ, eyiti o pese ifarada ẹbi ati iwọn.

Asopọmọra iṣeto ni

Lati bẹrẹ ipasẹ awọn ayipada ninu iye pataki julọ - data - a nilo:

  1. orisun data, eyiti o le jẹ MySQL ti o bẹrẹ lati ẹya 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (atokọ pipe);
  2. Apache Kafka iṣupọ
  3. Kafka Sopọ apẹẹrẹ (awọn ẹya 1.x, 2.x);
  4. tunto Debezium asopo ohun.

Ṣiṣẹ lori awọn aaye meji akọkọ, i.e. ilana fifi sori ẹrọ DBMS ati Apache Kafka kọja ipari ti nkan naa. Sibẹsibẹ, fun awọn ti o fẹ lati fi ohun gbogbo ranṣẹ sinu apoti iyanrin, ọkan ti a ti ṣetan wa ni ibi ipamọ osise pẹlu awọn apẹẹrẹ. docker-compose.yaml.

A yoo dojukọ awọn aaye meji ti o kẹhin ni awọn alaye diẹ sii.

0. Kafka Sopọ

Nibi ati nigbamii ninu nkan naa, gbogbo awọn apẹẹrẹ iṣeto ni a gbero ni aaye ti aworan Docker ti o pin nipasẹ awọn olupilẹṣẹ Debezium. O ni gbogbo awọn faili itanna pataki (awọn asopọ) ati pese iṣeto ni asopọ Kafka nipa lilo awọn oniyipada ayika.

Ti o ba pinnu lati lo Asopọ Kafka lati Confluent, iwọ yoo nilo lati ṣafikun awọn afikun ti awọn asopọ pataki funrararẹ si itọsọna ti o pato ninu plugin.path tabi ṣeto nipasẹ ohun ayika oniyipada CLASSPATH. Awọn eto fun oṣiṣẹ Kafka Sopọ ati awọn asopọ ti wa ni asọye nipasẹ awọn faili atunto ti o kọja bi awọn ariyanjiyan si aṣẹ oṣiṣẹ bẹrẹ. Fun alaye wo iwe.

Gbogbo ilana ti eto Debeizum ni ẹya asopo ohun ni a ṣe ni awọn ipele meji. Jẹ ki a ro ọkọọkan wọn:

1. Eto soke Kafka So ilana

Lati san data si iṣupọ Apache Kafka, awọn paramita kan pato ni a ṣeto sinu ilana Asopọ Kafka, gẹgẹbi:

  • awọn eto asopọ iṣupọ,
  • awọn orukọ ti awọn akọle ninu eyiti iṣeto ti asopo naa funrararẹ yoo wa ni ipamọ,
  • Orukọ ẹgbẹ ninu eyiti asopo naa nṣiṣẹ (ni ọran ti lilo ipo pinpin).

Aworan Docker osise ti iṣẹ akanṣe ṣe atilẹyin iṣeto ni lilo awọn oniyipada ayika - eyi ni ohun ti a yoo lo. Nitorinaa jẹ ki a ṣe igbasilẹ aworan naa:

docker pull debezium/connect

Eto ti o kere ju ti awọn oniyipada ayika ti o nilo lati ṣiṣẹ asopo jẹ bi atẹle:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - atokọ akọkọ ti awọn olupin iṣupọ Kafka lati gba atokọ pipe ti awọn ọmọ ẹgbẹ iṣupọ;
  • OFFSET_STORAGE_TOPIC=connector-offsets - koko-ọrọ kan fun titoju awọn ipo nibiti asopọ ti wa ni lọwọlọwọ;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - koko kan fun titoju ipo ti asopo ati awọn iṣẹ-ṣiṣe rẹ;
  • CONFIG_STORAGE_TOPIC=connector-config - koko kan fun titoju data iṣeto ni asopo ati awọn iṣẹ ṣiṣe rẹ;
  • GROUP_ID=1 - idamo ti ẹgbẹ awọn oṣiṣẹ lori eyiti o le ṣe iṣẹ-ṣiṣe asopọ; beere nigba lilo pin (pinpin) ijọba.

A bẹrẹ apoti pẹlu awọn oniyipada wọnyi:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Akiyesi nipa Avro

Nipa aiyipada, Debezium kọ data ni ọna kika JSON, eyiti o jẹ itẹwọgba fun awọn apoti iyanrin ati awọn iwọn kekere ti data, ṣugbọn o le jẹ iṣoro ninu awọn apoti isura data ti kojọpọ. Yiyan si JSON oluyipada ni lati serialize awọn ifiranṣẹ nipa lilo Avro si ọna kika alakomeji, eyiti o dinku fifuye lori I / O subsystem ni Apache Kafka.

Lati lo Avro, o nilo lati ran lọtọ eto-igbasilẹ (fun titoju awọn eto). Awọn oniyipada fun oluyipada yoo dabi eyi:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Awọn alaye lori lilo Avro ati ṣiṣeto iforukọsilẹ fun o kọja ipari ti nkan naa - siwaju, fun mimọ, a yoo lo JSON.

2. Eto soke asopo ohun ara

Bayi o le lọ taara si iṣeto ti asopo naa funrararẹ, eyiti yoo ka data lati orisun.

Jẹ ki a wo apẹẹrẹ awọn asopọ fun DBMS meji: PostgreSQL ati MongoDB, eyiti Mo ni iriri ati eyiti awọn iyatọ wa (botilẹjẹpe kekere, ṣugbọn ni awọn ọran pataki!).

Iṣeto ni a ṣapejuwe ninu akiyesi JSON ati gbejade si Sopọ Kafka nipa lilo ibeere POST kan.

2.1. PostgreSQL

Iṣeto asopo apẹẹrẹ fun PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ilana iṣiṣẹ ti asopo lẹhin iṣeto yii jẹ ohun rọrun:

  • Ni ibẹrẹ akọkọ, o sopọ si ibi-ipamọ data pato ninu iṣeto ati bẹrẹ ni ipo naa aworan ibẹrẹ, fifiranṣẹ si Kafka ipilẹ akọkọ ti data ti o gba pẹlu ipo SELECT * FROM table_name.
  • Lẹhin ti ipilẹṣẹ ti pari, asopo naa wọ inu ipo kika awọn ayipada lati awọn faili PostgreSQL WAL.

Nipa awọn aṣayan ti a lo:

  • name - orukọ asopọ fun eyiti a ti lo iṣeto ti a ṣalaye ni isalẹ; ni ojo iwaju, orukọ yii ni a lo lati ṣiṣẹ pẹlu asopọ (ie wo ipo / tun bẹrẹ / imudojuiwọn iṣeto ni) nipasẹ Kafka Connect REST API;
  • connector.class - kilasi asopọ DBMS ti yoo ṣee lo nipasẹ asopo ti a tunto;
  • plugin.name ni orukọ ohun itanna fun iyipada ọgbọn ti data lati awọn faili WAL. Wa lati yan lati wal2json, decoderbuffs и pgoutput. Awọn meji akọkọ nilo fifi sori ẹrọ ti awọn amugbooro ti o yẹ ni DBMS, ati pgoutput fun ẹya PostgreSQL 10 ati giga julọ ko nilo awọn ifọwọyi ni afikun;
  • database.* - awọn aṣayan fun sisopọ si database, nibo database.server.name - orukọ apẹẹrẹ PostgreSQL ti a lo lati ṣe agbekalẹ orukọ koko-ọrọ ninu iṣupọ Kafka;
  • table.include.list - atokọ ti awọn tabili ninu eyiti a fẹ lati tọpa awọn ayipada; fun ni ọna kika schema.table_name; ko le ṣee lo pọ pẹlu table.exclude.list;
  • heartbeat.interval.ms - aarin (ni milliseconds) pẹlu eyiti asopo naa firanṣẹ awọn ifiranṣẹ lilu ọkan si koko pataki kan;
  • heartbeat.action.query - ibeere ti yoo ṣee ṣe nigbati fifiranṣẹ ifiranṣẹ lilu ọkan kọọkan (aṣayan naa ti han lati ẹya 1.1);
  • slot.name - orukọ ti Iho ẹda ti yoo ṣee lo nipasẹ asopo;
  • publication.name - Orukọ awọn iwe-aṣẹ ni PostgreSQL ti asopo nlo. Ti ko ba si tẹlẹ, Debezium yoo gbiyanju lati ṣẹda rẹ. Ti olumulo labẹ eyiti asopọ ti ṣe ko ni awọn ẹtọ to fun iṣe yii, asopo naa yoo jade pẹlu aṣiṣe kan;
  • transforms pinnu bi o ṣe le yi orukọ koko-ọrọ ibi-afẹde pada ni pato:
    • transforms.AddPrefix.type tọkasi pe a yoo lo awọn ọrọ deede;
    • transforms.AddPrefix.regex - boju-boju nipasẹ eyiti orukọ ti koko-ọrọ ibi-afẹde ti tun-tumọ;
    • transforms.AddPrefix.replacement - taara ohun ti a redefine.

Diẹ ẹ sii nipa lilu ọkan ati awọn iyipada

Nipa aiyipada, asopo naa nfi data ranṣẹ si Kafka fun idunadura olufaraji kọọkan, ati kọwe LSN rẹ (Nọmba Ọna Wọle) si koko iṣẹ naa. offset. Ṣugbọn kini o ṣẹlẹ ti o ba tunto asopo naa lati ka kii ṣe gbogbo data data, ṣugbọn apakan nikan ti awọn tabili rẹ (ninu eyiti data ti ni imudojuiwọn nigbagbogbo)?

  • Asopọmọra yoo ka awọn faili WAL ati pe ko rii awọn adehun iṣowo ninu wọn si awọn tabili ti o ṣe abojuto.
  • Nitorinaa, kii yoo ṣe imudojuiwọn ipo lọwọlọwọ boya ni koko tabi ni Iho ẹda.
  • Eyi, ni ọna, yoo fa ki awọn faili WAL jẹ "di" lori disk ati pe yoo pari ni aaye disk.

Ati nibi awọn aṣayan wa si igbala. heartbeat.interval.ms и heartbeat.action.query. Lilo awọn aṣayan wọnyi ni awọn orisii jẹ ki o ṣee ṣe lati ṣe ibeere kan lati yi data pada ni tabili lọtọ ni gbogbo igba ti ifiranṣẹ ikọlu ọkan ba ti firanṣẹ. Nitorinaa, LSN lori eyiti asopo naa wa lọwọlọwọ (ninu iho isọdọtun) ti ni imudojuiwọn nigbagbogbo. Eyi ngbanilaaye DBMS lati yọ awọn faili WAL ti ko nilo mọ. Fun alaye siwaju sii lori bi awọn aṣayan ṣiṣẹ, wo iwe.

Aṣayan miiran ti o yẹ akiyesi isunmọ ni transforms. Botilẹjẹpe o jẹ diẹ sii nipa irọrun ati ẹwa…

Nipa aiyipada, Debezium ṣẹda awọn koko-ọrọ nipa lilo ilana isọkọ atẹle: serverName.schemaName.tableName. Eyi le ma rọrun nigbagbogbo. Awọn aṣayan transforms lilo awọn ikosile deede, o le ṣalaye atokọ ti awọn tabili ti awọn iṣẹlẹ wọn nilo lati lọ si koko-ọrọ pẹlu orukọ kan pato.

Ni wa iṣeto ni ọpẹ si transforms atẹle naa ṣẹlẹ: gbogbo awọn iṣẹlẹ CDC lati ibi ipamọ data tọpinpin yoo lọ si koko-ọrọ pẹlu orukọ naa data.cdc.dbname. Bibẹẹkọ (laisi awọn eto wọnyi), Debezium yoo nipasẹ aiyipada ṣẹda koko kan fun tabili kọọkan ti fọọmu naa: pg-dev.public.<table_name>.

Awọn idiwọn asopọ

Ni ipari apejuwe ti iṣeto asopo fun PostgreSQL, o tọ lati sọrọ nipa awọn ẹya wọnyi / awọn idiwọn ti iṣẹ rẹ:

  1. Iṣẹ-ṣiṣe asopo fun PostgreSQL da lori ero ti iyipada ọgbọn. Nitorina oun ko tọpinpin awọn ibeere lati yi eto data data pada (DDL) - ni ibamu, data yii kii yoo wa ninu awọn akọle.
  2. Niwọn igba ti a ti lo awọn iho isọdọtun, asopọ ti asopo naa ṣee ṣe Nikan si apẹẹrẹ DBMS titunto si.
  3. Ti olumulo labẹ eyiti asopọ asopọ si ibi ipamọ data ni awọn ẹtọ kika-nikan, lẹhinna ṣaaju ifilọlẹ akọkọ, iwọ yoo nilo lati ṣẹda iho isọdọtun pẹlu ọwọ ati gbejade si data data.

Nbere a iṣeto ni

Nitorinaa jẹ ki a gbe iṣeto wa sinu asopo:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

A ṣayẹwo pe igbasilẹ naa ṣaṣeyọri ati pe asopo naa bẹrẹ:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Nla: o ti ṣeto ati setan lati lọ. Bayi jẹ ki a dibọn lati jẹ alabara ki o sopọ si Kafka, lẹhin eyi a ṣafikun ati yi titẹsi kan pada ninu tabili:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Ninu koko wa, eyi yoo han bi atẹle:

JSON pipẹ pupọ pẹlu awọn ayipada wa

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Ni awọn ọran mejeeji, awọn igbasilẹ ni bọtini (PK) ti igbasilẹ ti o yipada, ati pataki ti awọn iyipada: kini igbasilẹ naa ṣaaju ati kini o di lẹhin.

  • Ninu ọran ti INSERT: iye ṣaaju (before) dọgba nullatẹle nipa okun ti a fi sii.
  • Ninu ọran ti UPDATE: ninu payload.before ipo ti tẹlẹ ti ila ti han, ati ni payload.after - titun pẹlu awọn lodi ti ayipada.

2.2 MongoDB

Asopọmọra yii nlo ẹrọ isọdọtun MongoDB boṣewa, alaye kika lati oplog ti ipade akọkọ DBMS.

Bakanna si asopo ti a ti ṣapejuwe tẹlẹ fun PgSQL, nibi, paapaa, ni ibẹrẹ akọkọ, aworan data akọkọ ti wa ni ya, lẹhin eyi asopo naa yipada si ipo kika oplog.

Apẹẹrẹ iṣeto:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Bii o ti le rii, ko si awọn aṣayan tuntun ni akawe si apẹẹrẹ iṣaaju, ṣugbọn nọmba awọn aṣayan nikan ti o ni iduro fun sisopọ si ibi ipamọ data ati awọn ami-iṣaaju wọn ti dinku.

Eto transforms ni akoko yii wọn ṣe atẹle naa: yi orukọ koko-ọrọ ibi-afẹde pada lati ero naa <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

ifarada ẹbi

Ọrọ ti ifarada ẹbi ati wiwa giga ni akoko wa jẹ nla ju igbagbogbo lọ - paapaa nigba ti a ba sọrọ nipa data ati awọn iṣowo, ati ipasẹ iyipada data ko si ni awọn ẹgbẹ ninu ọran yii. Jẹ ki a wo kini o le jẹ aṣiṣe ni ipilẹ ati kini yoo ṣẹlẹ si Debezium ni ọran kọọkan.

Awọn aṣayan ijade mẹta wa:

  1. Ikuna Kafka Sopọ. Ti o ba tunto Sopọ lati ṣiṣẹ ni ipo pinpin, eyi nilo ọpọlọpọ awọn oṣiṣẹ lati ṣeto group.id kanna. Lẹhinna, ti ọkan ninu wọn ba kuna, asopo naa yoo tun bẹrẹ lori oṣiṣẹ miiran ati tẹsiwaju kika lati ipo igbẹhin ti o kẹhin ninu koko-ọrọ ni Kafka.
  2. Isonu ti Asopọmọra pẹlu iṣupọ Kafka. Asopọmọra yoo da kika kika nirọrun ni ipo ti o kuna lati firanṣẹ si Kafka ati lorekore gbiyanju lati fi ranṣẹ titi igbiyanju naa yoo fi ṣaṣeyọri.
  3. Orisun data ko si. Asopọmọra yoo gbiyanju lati tun sopọ si orisun ni ibamu si iṣeto. Aiyipada jẹ awọn igbiyanju 16 ni lilo apọju backoff. Lẹhin igbiyanju 16th kuna, iṣẹ-ṣiṣe yoo jẹ samisi bi kuna ati pe yoo nilo lati tun bẹrẹ pẹlu ọwọ nipasẹ wiwo Kafka Connect REST.
    • Ninu ọran ti PostgreSQL data yoo wa ko le sọnu, nitori lilo awọn iho isọdọtun yoo ṣe idiwọ piparẹ awọn faili WAL ti asopọ ko ka. Ni idi eyi, isalẹ wa: ti asopọ nẹtiwọki laarin asopo ati DBMS ba ni idilọwọ fun igba pipẹ, aye wa pe aaye disk yoo pari, ati pe eyi le ja si ikuna ti gbogbo DBMS.
    • Ninu ọran ti MySQL awọn faili binlog le jẹ yiyi nipasẹ DBMS funrararẹ ṣaaju ki asopọ pada. Eyi yoo fa ki asopo naa lọ sinu ipo ti o kuna, ati pe yoo nilo lati tun bẹrẹ ni ipo fọtoyiya akọkọ lati tẹsiwaju kika lati awọn binlogs lati mu iṣẹ ṣiṣe deede pada.
    • on MongoDB. Iwe naa sọ pe: ihuwasi ti asopo ohun ti o ba jẹ pe a ti paarẹ awọn faili log/oplog ati pe asopo naa ko le tẹsiwaju kika lati ipo ti o wa ni pipa jẹ kanna fun gbogbo DBMS. O wa ni otitọ pe asopo yoo lọ sinu ipinle kuna ati pe yoo nilo atunbere ni ipo naa aworan ibẹrẹ.

      Sibẹsibẹ, awọn imukuro wa. Ti asopo naa ba wa ni ipo ti ge asopọ fun igba pipẹ (tabi ko le de apẹẹrẹ MongoDB), ati pe oplog ti yiyi ni akoko yii, lẹhinna nigbati asopọ ba tun pada, asopo naa yoo tẹsiwaju ni idakẹjẹ lati ka data lati ipo akọkọ ti o wa. , ti o jẹ idi diẹ ninu awọn data ni Kafka kii ṣe yoo lu.

ipari

Debezium jẹ iriri akọkọ mi pẹlu awọn eto CDC ati pe o ti ni idaniloju pupọ. Ise agbese na ṣe ẹbun atilẹyin ti DBMS akọkọ, irọrun ti iṣeto, atilẹyin fun iṣupọ ati agbegbe ti nṣiṣe lọwọ. Fun awọn ti o nifẹ si adaṣe, Mo ṣeduro pe ki o ka awọn itọsọna fun Kafka Sopọ и Debezium.

Ti a ṣe afiwe si asopọ JDBC fun Kafka Connect, anfani akọkọ ti Debezium ni pe awọn iyipada ti wa ni kika lati awọn akọọlẹ DBMS, eyiti o fun laaye data lati gba pẹlu idaduro diẹ. Asopọ JDBC (ti a pese nipasẹ Kafka Sopọ) beere tabili ti a tọpinpin ni aarin aarin ati (fun idi kanna) ko ṣe ina awọn ifiranṣẹ nigbati data paarẹ (bawo ni o ṣe le beere fun data ti ko si nibẹ?).

Lati yanju awọn iṣoro ti o jọra, o le san ifojusi si awọn solusan wọnyi (ni afikun si Debezium):

PS

Ka tun lori bulọọgi wa:

orisun: www.habr.com

Fi ọrọìwòye kun