Danasîna Debezium - CDC ji bo Apache Kafka

Danasîna Debezium - CDC ji bo Apache Kafka

Di xebata xwe de, ez pir caran rastî çareseriyên teknîkî / hilberên nermalavê yên nû tê, ku agahdariya di derbarê wan de li ser Înterneta bi zimanê rûsî pir kêm e. Bi vê gotarê re ez ê hewl bidim ku valahiyek weha bi mînakek ji pratîka xweya vê dawiyê tijî bikim, dema ku min hewce kir ku şandina bûyerên CDC ji du DBMS-yên populer (PostgreSQL û MongoDB) ji komek Kafka re bi karanîna Debezium veava bike. Ez hêvî dikim ku ev gotara vekolînê, ku di encama xebata hatî kirin de xuya dike, dê ji yên din re kêrhatî be.

Debezium û CDC bi gelemperî çi ye?

Debezium - nûnerê kategoriya nermalava CDC (Guhertina Daneyên Capture), an jî rasttir, ew komek girêdanên ji bo DBMS-yên cihêreng ên ku bi çarçoveya Apache Kafka Connect-ê re hevaheng e.

ev Projeya Çavkaniya Vekirî, di bin lîsansa Apache v2.0 de û ji hêla Red Hat ve tê piştgirî kirin. Pêşveçûn ji 2016-an vir ve berdewam e û naha ew ji bo DBMS-yên jêrîn piştgirîya fermî peyda dike: MySQL, PostgreSQL, MongoDB, SQL Server. Ji bo Cassandra û Oracle jî girêdan hene, lê niha ew di rewşa "gihîştina zû" de ne, û serbestberdanên nû lihevhatina paşdemayî garantî nakin.

Ger em CDC-ê bi nêzîkatiya kevneşopî re bidin hev (gava ku serîlêdan rasterast daneyên ji DBMS-ê dixwîne), feydeyên wê yên sereke cîbicîkirina guheztina daneyê di asta rêzê de bi derengiya kêm, pêbaweriya bilind û hebûna heye. Du xalên paşîn bi karanîna komek Kafka wekî depoyek ji bo bûyerên CDC têne bidestxistin.

Feydeyek din ev e ku modelek yekane ji bo hilanîna bûyeran tê bikar anîn, ji ber vê yekê serîlêdana paşîn ne hewce ye ku ji hûrguliyên xebitandina DBMS-ên cihêreng xeman bike.

Di dawiyê de, karanîna brokerek peyamê dihêle ku serîlêdanên ku guhartinên daneyan bişopînin ku bi rengek horizontî mezin bibin. Di heman demê de, bandora li ser çavkaniya daneyê kêm dibe, ji ber ku dane ne rasterast ji DBMS, lê ji koma Kafka têne wergirtin.

Li ser mîmariya Debezium

Bikaranîna Debezium bi vê nexşeya hêsan tê:

DBMS (wek çavkaniyek daneyê) → girêdana li Kafka Connect → Apache Kafka → xerîdar

Wekî mînakek, li vir diagramek ji malpera projeyê heye:

Danasîna Debezium - CDC ji bo Apache Kafka

Lêbelê, ez bi rastî ji vê nexşeyê hez nakim, ji ber ku wusa dixuye ku tenê karanîna girêdanek lavaboyê gengaz e.

Di rastiyê de, rewş cûda ye: Dagirtina Gola Daneyên xwe (girêdana paşîn a di diagrama jor de) Ev ne tenê awayê karanîna Debezium e. Bûyerên ku ji Apache Kafka re hatine şandin dikarin ji hêla serîlêdanên we ve werin bikar anîn da ku cûrbecûr rewşan birêve bibin. Bo nimûne:

  • rakirina daneyên negirêdayî ji cache;
  • şandina ragihandinê;
  • nûvekirinên navnîşa lêgerînê;
  • cûreyek têketinên kontrolê;
  • ...

Ger we serîlêdanek Java-yê hebe û hewcedarî/îmkana karanîna komek Kafka tune be, di heman demê de îmkana xebatê jî heye. bicîbûyî-girêdan. Feydeya eşkere ev e ku ew hewcedariya binesaziya zêde (di forma girêdan û Kafka de) ji holê radike. Lêbelê, ev çareserî ji guhertoya 1.1-ê ve hatî paşve xistin û êdî ji bo karanîna nayê pêşniyar kirin (piştgiriya wê dibe ku di weşanên pêşerojê de were rakirin).

Vê gotarê dê mîmariya ku ji hêla pêşdebiran ve hatî pêşniyar kirin nîqaş bike, ku tolerasyona xeletiyê û mezinbûnê peyda dike.

Veavakirina Connector

Ji bo ku em dest bi şopandina guhertinên di nirxa herî girîng - daneyan de bikin - em hewce ne:

  1. çavkaniya daneyê, ku dikare bibe MySQL ji guhertoya 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (navnîşa bêkêmasî);
  2. Koma Apache Kafka;
  3. Mînaka Kafka Connect (guhertoyên 1.x, 2.x);
  4. girêdana Debezium mîheng kirin.

Li ser du xalên pêşîn bixebitin, ango. Pêvajoya sazkirinê ya DBMS û Apache Kafka li derveyî çarçoveya gotarê ye. Lêbelê, ji bo kesên ku dixwazin her tiştî di sandboxê de bicîh bikin, depoya fermî ya bi mînakan re amade ye. docker-compose.yaml.

Em ê bi berfirehî li ser du xalên dawî rawestin.

0. Kafka Têkilî

Li vir û bêtir di gotarê de, hemî nimûneyên mîhengê di çarçoveya wêneya Docker de ku ji hêla pêşdebirên Debezium ve hatî belav kirin têne nîqaş kirin. Ew hemî pelên pêvekê (girêdan) hewce dike û veavakirina Kafka Connect bi karanîna guhêrbarên hawîrdorê peyda dike.

Ger hûn dixwazin Kafka Connect ji Confluent bikar bînin, hûn ê hewce bikin ku pêvekên girêdanên pêwîst bi rengek serbixwe li pelrêça ku tê de hatî destnîşan kirin zêde bikin. plugin.path an jî bi guhêrbarek jîngehê ve hatî danîn CLASSPATH. Mîhengên ji bo xebatkar û girêdanên Kafka Connect bi navgîniya pelên vesazkirinê yên ku wekî arguman ji fermana destpêkirina karker re têne şandin têne destnîşankirin. Ji bo bêtir agahdarî, binêre belgekirin.

Tevahiya pêvajoya sazkirina Debeizum di guhertoya girêdanê de di du qonaxan de pêk tê. Ka em li her yek ji wan binêrin:

1. Sazkirina çarçoveya Kafka Connect

Ji bo guheztina daneyan li koma Apache Kafka, pîvanên taybetî di çarçoveya Kafka Connect de têne danîn, wek:

  • parametreyên ji bo girêdana bi komê,
  • navên mijarên ku tê de veavakirina girêdanê bixwe dê rasterast were hilanîn,
  • navê koma ku tê de girêdan dimeşe (heke moda belavkirî tê bikar anîn).

Wêneya fermî ya Docker ya projeyê veavakirinê bi karanîna guhêrbarên hawîrdorê piştgirî dike - ya ku em ê bikar bînin ev e. Ji ber vê yekê, wêneyê dakêşin:

docker pull debezium/connect

Rêjeya herî kêm a guhêrbarên jîngehê yên ku ji bo xebitandina girêdanê hewce ne wiha ye:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - navnîşa destpêkê ya pêşkêşkerên koma Kafka ji bo bidestxistina navnîşek bêkêmasî ya endamên komê;
  • OFFSET_STORAGE_TOPIC=connector-offsets - mijarek ji bo hilanîna pozîsyonên ku girêdana niha lê ye;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - mijar ji bo hilanîna statûya girêdanê û karên wê;
  • CONFIG_STORAGE_TOPIC=connector-config - mijar ji bo hilanîna daneyên veavakirina girêdanê û karên wê;
  • GROUP_ID=1 - Nasnameya koma karkeran a ku peywira girêdanê dikare li ser were meşandin; dema ku tê belavkirin pêdivî ye (belav kirin) rejîm.

Em konteynerê bi van guherbaran dest pê dikin:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nîşe li ser Avro

Bi xwerû, Debezium daneyan di formata JSON de dinivîse, ku ji bo sandbox û mîqdarên piçûk ên daneyê tê pejirandin, lê dikare di databasên pir barkirî de bibe pirsgirêk. Alternatîfek ji veguherînerek JSON ev e ku meriv bi karanîna peyaman serialîze bike Avro nav formatek binary, ku barkirina li ser binepergala I/O ya di Apache Kafka de kêm dike.

Ji bo ku Avro bikar bînin, hûn hewce ne ku cîhek cûda bicîh bikin schema-qeydê (ji bo hilanîna diagraman). Guherbarên ji bo veguherîner dê bi vî rengî xuya bikin:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Hûrguliyên li ser karanîna Avro û sazkirina qeydê ji bo wê li derveyî çarçoweya vê gotarê ne - bêtir, ji bo zelaliyê, em ê JSON bikar bînin.

2. Veavakirina connector xwe

Naha hûn dikarin rasterast biçin veavakirina girêdanê bixwe, ku dê daneyan ji çavkaniyê bixwîne.

Ka em li mînaka girêdanên ji bo du DBMS-an binêrin: PostgreSQL û MongoDB, ku tê de ezmûna min heye û tê de cûdahî hene (her çend piçûk, lê di hin rewşan de girîng in!).

Veavakirin bi nîşana JSON-ê tê vegotin û bi karanîna daxwazek POST-ê li Kafka Connect tê barkirin.

2.1. PostgreSQL

Mînak veavakirina girêdana ji bo PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prensîba xebata girêdanê piştî vê sazkirinê pir hêsan e:

  • Dema ku yekem car tê destpêkirin, ew bi databasa ku di veavakirinê de hatî destnîşan kirin ve girêdide û di modê de dest pê dike wêneya destpêkê, ji Kafka re berhevoka destpêkê ya daneyên ku bi karanîna şertê hatine bidestxistin dişîne SELECT * FROM table_name.
  • Piştî ku destpêkkirin qediya, girêdan diçe modê da ku guhartinên ji pelên PostgreSQL WAL bixwînin.

Li ser vebijarkên bikar anîn:

  • name - navê girêdana ku ji bo veavakirina ku li jêr hatî destnîşan kirin tê bikar anîn; di pêşerojê de, ev nav ji bo xebitandina bi girêdanê (ango, dîtina statûyê / ji nû ve / veavakirinê / nûvekirina veavakirinê) bi riya Kafka Connect REST API-yê tê bikar anîn;
  • connector.class - Dersa girêdana DBMS-ê ya ku dê ji hêla girêdana mîhengkirî ve were bikar anîn;
  • plugin.name - navê pêvekê ji bo deşîfrekirina mentiqî ya daneyên ji pelên WAL. Ji bo hilbijartina ji berdest e wal2json, decoderbuffs и pgoutput. Du ya yekem hewceyê sazkirina pêvekên guncan di DBMS-ê de, û pgoutput ji bo PostgreSQL guhertoya 10 û jortir hewcedarî manipulasyonên zêde nake;
  • database.* - vebijarkên ji bo girêdana bi databasê, li ku database.server.name - Navê mînaka PostgreSQL ji bo avakirina navê mijarê di koma Kafka de tê bikar anîn;
  • table.include.list - navnîşek tabloyên ku em dixwazin tê de guhertinan bişopînin; di formatê de diyar kirin schema.table_name; bi hev re nayê bikar anîn table.exclude.list;
  • heartbeat.interval.ms - navber (di milî çirkeyan de) ku pêveker ji mijarek taybetî re peyamên lêdana dil dişîne;
  • heartbeat.action.query - daxwazek ku dê di şandina her peyamek dil de were bicîh kirin (vebijark di guhertoya 1.1 de xuya bû);
  • slot.name - navê hêlîna dubarekirinê ya ku dê ji hêla girêdanê ve were bikar anîn;
  • publication.name - Name weşanên di PostgreSQL de, ku girêdan bikar tîne. Ger ew nebe, Debezium dê hewl bide ku wê biafirîne. Ger bikarhênerê ku di bin pêwendiyê de tê çêkirin ji bo vê çalakiyê ne xwediyê mafên têr be, dê girêdan bi xeletiyek biqede;
  • transforms bi rastî diyar dike ka meriv çawa navê mijara armancê biguhezîne:
    • transforms.AddPrefix.type nîşan dide ku em ê bêjeyên rêkûpêk bikar bînin;
    • transforms.AddPrefix.regex - maskek ku navê mijara armanc ji nû ve diyar dike;
    • transforms.AddPrefix.replacement - rasterast tiştê ku em ji nû ve pênase dikin.

Zêdetir li ser lêdana dil û veguherînan

Bi xwerû, pêveker ji bo her danûstendinek peywirdar daneyan ji Kafka re dişîne, û LSN (Hejmara Rêza Têketinê) di mijara karûbarê de tê tomar kirin. offset. Lê çi diqewime heke girêdan were mîheng kirin ku ne tevahiya databasê, lê tenê beşek ji tabloyên wê bixwîne (ku tê de nûvekirinên daneyê pir caran çênabin)?

  • Têkilî dê pelên WAL-ê bixwîne û dê li ser tabloyên ku ew çavdêrî dike ti peywirên danûstendinê nas neke.
  • Ji ber vê yekê, ew ê pozîsyona xwe ya heyî ne di mijar û ne jî di hêlîna dubarekirinê de nûve neke.
  • Ev, di encamê de, dê bibe sedem ku pelên WAL li ser dîskê werin girtin û dibe ku cîhê dîskê biqede.

Û ev e ku vebijark ji bo rizgariyê tê. heartbeat.interval.ms и heartbeat.action.query. Bikaranîna van vebijarkan bi cotan gengaz dike ku her carê ku peyamek lêdana dil tê şandin daxwazek ji bo guhertina daneyan di tabloyek cihê de pêk bîne. Bi vî rengî, LSN-ya ku pêvekêş niha li ser tê de ye (di hêlîna dubarekirinê de) bi domdarî tê nûve kirin. Ev dihêle DBMS pelên WAL-ê yên ku êdî ne hewce ne rake. Hûn dikarin bêtir fêr bibin ka vebijarkan çawa tê de dixebitin belgekirin.

Vebijarkek din a ku hêjayî baldariya nêzîk e transforms. Her çend ew bêtir li ser rehetî û bedewiyê ye ...

Bi xwerû, Debezium mijaran bi karanîna polîtîkaya navên jêrîn diafirîne: serverName.schemaName.tableName. Dibe ku ev her gav ne rehet be. Vebijêrk transforms Hûn dikarin bêjeyên birêkûpêk bikar bînin da ku navnîşek tabloyan diyar bikin, bûyerên ku ji wan hewce ne ku ji mijarek bi navek taybetî re werin rêve kirin.

Di veavakirina me de spas transforms jêrîn diqewime: hemî bûyerên CDC ji databasa çavdêrîkirî dê biçin mijarek bi navê data.cdc.dbname. Wekî din (bêyî van mîhengan), Debezium dê ji hêla xwerû ve mijarek ji bo her tabloyê biafirîne: pg-dev.public.<table_name>.

Sînorên Connector

Ji bo qedandina danasîna veavakirina girêdana ji bo PostgreSQL, hêja ye ku meriv li ser taybetmendiyên jêrîn / tixûbên xebata wê biaxive:

  1. Fonksiyona girêdanê ya ji bo PostgreSQL xwe dispêre têgeha deşîfrekirina mentiqî. Ji ber vê yekê ew daxwazên ji bo guhertina avahiya databasê naşopîne (DDL) - Li gorî vê yekê, ev dane dê di mijaran de nebe.
  2. Ji ber ku hêlînên dubarekirinê têne bikar anîn, girêdana girêdanek gengaz e bi tenê ji mînaka sereke ya DBMS re.
  3. Ger bikarhênerê ku di binê wî de girêdan bi databasê ve girêdide xwedan mafên tenê xwendinê ye, wê hingê berî destpêkirina yekem hûn ê hewce bikin ku bi destan hêlînek dubarekirinê biafirînin û li databasê biweşînin.

Sepandina veavakirinê

Ji ber vê yekê, werin em veavakirina xwe di girêdanê de bar bikin:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Em kontrol dikin ku dakêşandin serketî bû û girêdan dest pê kir:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Mezin: ew sazkirî ye û amade ye ku biçe. Naha em îdia bikin ku bibin xerîdar û bi Kafka ve girêbidin, pişt re em ê navnîşek di tabloyê de zêde bikin û biguhezînin:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Di mijara me de ew ê wekî jêrîn were xuyang kirin:

JSON bi guhertinên me re pir dirêj

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Di her du rewşan de, tomar ji kilîta (PK) ya tomara ku hatî guheztin pêk tê, û cewhera guhertinan pêk tê: çi tomar berê bû û çi bû piştî.

  • Di doza INSERT: nirxa berê (before) wekhev null, û piştî - xêza ku hate danîn.
  • Di doza UPDATE: di payload.before rewşa berê ya rêzê tê xuyang kirin, û tê de payload.after - nû bi cewhera guhertinan.

2.2 MongoDB

Ev girêdan mekanîzmaya dubarekirina MongoDB-ya standard bikar tîne, agahdariya ji oploga girêka DBMS-a bingehîn dixwîne.

Mîna girêdana ku berê hatî destnîşan kirin ji bo PgSQL, li vir jî, di destpêka yekem de, wêneya daneya seretayî tê kişandin, piştî ku girêdan diguhezîne moda xwendina oplogê.

Mînaka veavakirinê:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Wekî ku hûn dibînin, li vir li gorî mînaka berê vebijarkên nû tune ne, lê tenê hejmara vebijarkên ku ji bo girêdana databasê û pêşgirên wan berpirsiyar in kêm bûne.

Mîhengên transforms vê carê ew jêrîn dikin: ew navê mijara armancê ji şemayê veguherînin <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerans xelet

Pirsgirêka tolerasyona xeletiyê û hebûna zêde di dema me de ji her demê bêtir tûjtir e - nemaze dema ku em li ser dane û danûstendinan diaxivin, û şopandina guheztinên daneyê di vê pirsgirêkê de namîne. Werin em binihêrin ka di prensîbê de çi dikare xelet bibe û di her rewşê de dê çi bibe serê Debezium.

Sê vebijarkên veqetandinê hene:

  1. têkçûna Kafka Connect. Ger Connect were mîheng kirin ku di moda belavkirî de bixebite, ev hewce dike ku gelek xebatkar heman group.id saz bikin. Dûv re, heke yek ji wan têk nebe, dê girêdan li ser xebatkarek din ji nû ve were destpêkirin û xwendina ji pozîsyona paşîn a paşîn a di mijara Kafka de berdewam bike.
  2. Wendabûna girêdana bi koma Kafka re. Têkilî dê tenê li cihê ku ji Kafka re neşîne xwendinê rawestîne, û dê bi periyodîk hewl bide ku wê ji nû ve bişîne heya ku hewldan biserkeve.
  3. Çavkaniya daneyan tune. Têkilî dê hewl bide ku ji nû ve bi çavkaniyê ve wekî ku hatî mîheng kirin ve girêde. Vebijêrk e 16 hewldanên bikaranîna paşvekêşana berbiçav. Piştî hewldana 16-ê ya neserkeftî, dê peywir wekî were nîşankirin nekin û hûn ê hewce bikin ku wê bi desta bi navgîniya Kafka Connect REST ji nû ve bidin destpêkirin.
    • Di doza PostgreSQL Daneyên wê winda nebin, ji ber Bikaranîna hêlînên dubarekirinê dê pêşî li we bigire ku hûn pelên WAL-ê yên ku ji hêla girêdanê ve nayên xwendin jêbirin. Di vê rewşê de, xeletiyek drav jî heye: heke girêdana torê ya di navbera girêdan û DBMS-ê de ji bo demek dirêj ve were qut kirin, îhtîmalek heye ku cîhê dîskê biqede, û ev dikare bibe sedema têkçûna tevahiya DBMS.
    • Di doza MySQL pelên binlog dikare ji hêla DBMS-ê bixwe ve were zivirandin berî ku girêdan were nûve kirin. Ev ê bibe sedem ku girêdan têkeve rewşa têkçûyî, û ji bo vegerandina xebata normal, hûn hewce ne ku di moda wêneya destpêkê de ji nû ve bidin destpêkirin da ku xwendina ji binlogan bidomînin.
    • li ser MongoDB. Di belgeyê de tê gotin ku tevgera girêdanê di bûyera ku pelên têketin/oplogê hatine jêbirin û girêdan nikaribe xwendina xwe ji cihê ku lê hiştiye bidomîne ji bo hemî DBMS-an yek e. Ev tê wê wateyê ku girêdan dê têkeve nav dewletê nekin û dê hewce bike ku di modê de ji nû ve dest pê bike wêneya destpêkê.

      Lêbelê, îstîsna hene. Ger girêdan ji bo demek dirêj ve hate qut kirin (an jî nikarîbû bigihîje mînaka MongoDB), û oplog di vê demê de di zivirînê de derbas bû, wê hingê gava ku girêdan were nûve kirin, dê girêdan bi aramî xwendina daneyan ji pozîsyona yekem a berdest berdewam bike. ji ber vê yekê hin daneyên Kafka ne dê bixin.

encamê

Debezium ezmûna min a yekem bi pergalên CDC re ye û bi tevahî pir erênî ye. Proje bi piştgiriya xwe ya ji bo DBMS-yên sereke, hêsankirina veavakirinê, piştgirîya komkirinê, û civata çalak bi ser ket. Ji bo kesên ku bi pratîkê re eleqedar dibin, ez pêşniyar dikim ku hûn rêberan bixwînin Kafka Connect и Debezium.

Li gorî girêdana JDBC ji bo Kafka Connect, avantaja sereke ya Debezium ev e ku guheztin ji têketinên DBMS têne xwendin, ku dihêle ku dane bi derengiya hindiktirîn were wergirtin. JDBC Connector (ji Kafka Connect) di navberek diyarkirî de li tabloya çavdêrîkirî dipirse û (ji ber heman sedemê) dema ku dane têne jêbirin peyaman naafirîne (hûn çawa dikarin daneyên ku tune bipirsin?).

Ji bo çareserkirina pirsgirêkên bi vî rengî, hûn dikarin bala xwe bidin çareseriyên jêrîn (ji bilî Debezium):

PS

Li ser bloga me jî bixwînin:

Source: www.habr.com

Add a comment