Na-ewebata Debezium - CDC maka Apache Kafka

Na-ewebata Debezium - CDC maka Apache Kafka

N'ọrụ m, m na-ahụkarị ngwọta teknụzụ ọhụrụ / ngwaahịa ngwanrọ, ozi gbasara nke dị ụkọ na ịntanetị n'asụsụ Russian. N'isiokwu a, m ga-agbalị imeju otu ọdịiche dị otú ahụ na ihe atụ sitere na omume m na-adịbeghị anya, mgbe m kwesịrị ịhazi izipu ihe omume CDC site na DBMS abụọ a ma ama (PostgreSQL na MongoDB) na ụyọkọ Kafka na-eji Debezium. Enwere m olileanya na isiokwu nyocha a, nke pụtara n'ihi ọrụ a rụrụ, ga-abara ndị ọzọ uru.

Kedu ihe bụ Debezium na CDC n'ozuzu?

Debezium - onye nnọchi anya otu sọftụwia CDC (Weghara Mgbanwe Data), ma ọ bụ karịa kpọmkwem, ọ bụ njikọ njikọ maka DBMS dị iche iche dakọtara na Apache Kafka Connect framework.

a Ọrụ mepere emepe, nyere ikike n'okpuru ikike Apache v2.0 yana Red Hat kwadoro. Mmepe na-aga n'ihu kemgbe 2016 ma ugbu a ọ na-enye nkwado gọọmentị maka DBMS ndị a: MySQL, PostgreSQL, MongoDB, SQL Server. Enwekwara njikọ maka Cassandra na Oracle, mana n'oge a ha nọ na ọkwa "nnweta mmalite", na mwepụta ọhụrụ anaghị ekwe nkwa ndakọrịta azụ.

Ọ bụrụ na anyị atụnyere CDC na usoro ọdịnala (mgbe ngwa ahụ na-agụ data sitere na DBMS ozugbo), uru ya bụ isi gụnyere ntinye mgbanwe mgbanwe data na ọkwa dị n'ahịrị na obere latency, ntụkwasị obi dị elu na nnweta. A na-enweta isi ihe abụọ ikpeazụ site na iji ụyọkọ Kafka dị ka ebe nchekwa maka ihe omume CDC.

Uru ọzọ bụ eziokwu ahụ bụ na a na-eji otu ụdị eme ihe iji chekwaa ihe omume, ya mere ngwa njedebe adịghị echegbu onwe ya banyere nuances nke ịrụ ọrụ DBMS dị iche iche.

N'ikpeazụ, iji onye na-ere ahịa ozi na-enye ohere ngwa ndị na-enyocha mgbanwe na data iji gbasaa n'usoro. N'otu oge ahụ, a na-ebelata mmetụta na isi iyi data, ebe ọ bụ na a na-enweta data ahụ ọ bụghị kpọmkwem na DBMS, kama site na ụyọkọ Kafka.

Banyere ihe owuwu Debezium

Iji Debezium gbadata na atụmatụ a dị mfe:

DBMS (dị ka isi iyi data) → njikọ na Kafka Jikọọ → Apache Kafka → ndị ahịa

Dịka ọmụmaatụ, ebe a bụ eserese sitere na webụsaịtị ọrụ:

Na-ewebata Debezium - CDC maka Apache Kafka

Otú ọ dị, enweghị m mmasị na atụmatụ a, n'ihi na ọ dị ka ọ bụ naanị iji njikọ sink ga-ekwe omume.

N'ezie, ọnọdụ dị iche: na-ejuputa Data Lake gị (njikọ ikpeazụ na eserese dị n'elu) Nke a abụghị naanị ụzọ iji Debezium. Ihe omume ezigara na Apache Kafka nwere ike iji ngwa gị mee ihe maka ọnọdụ dị iche iche. Ọmụmaatụ:

  • wepụ data na-adịghị mkpa na cache;
  • izipu ọkwa;
  • mmelite index ọchụchọ;
  • ụfọdụ ụdị ndekọ ndekọ;
  • ...

Ọ bụrụ na ị nwere ngwa Java na enweghị mkpa / ohere iji ụyọkọ Kafka, enwere ike ịrụ ọrụ site na ya. agbakwunyere-njikọ. Uru doro anya bụ na ọ na-ewepụ mkpa maka akụrụngwa ndị ọzọ (n'ụdị njikọ na Kafka). Agbanyeghị, ewepụla ihe ngwọta a kemgbe ụdị 1.1 na anaghị akwado ya maka ojiji (enwere ike iwepụ nkwado maka ya na mwepụta n'ọdịnihu).

Edemede a ga-atụle ihe owuwu nke ndị mmepe kwadoro, nke na-enye nnabata mmejọ na scalability.

Nhazi njikọ

Iji malite nsuso mgbanwe na uru kacha mkpa - data - anyị chọrọ:

  1. isi iyi data, nke nwere ike ịbụ MySQL malite na ụdị 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (ndepụta zuru ezu);
  2. Ụyọkọ Apache Kafka;
  3. Ihe atụ Jikọọ Kafka (ụdị 1.x, 2.x);
  4. njikọ Debezium ahaziri.

Na-arụ ọrụ na isi ihe abụọ mbụ, i.e. Usoro ntinye nke DBMS na Apache Kafka karịrị akarị nke isiokwu. Agbanyeghị, maka ndị chọrọ ibuga ihe niile dị na igbe ájá, ebe nchekwa gọọmentị nwere ihe atụ nwere ihe ejikere. docker-dee.yaml.

Anyị ga-ebikwu nkọwa n'isi ihe abụọ ikpeazụ.

0. Kafka Jikọọ

N'ebe a na n'ihu n'isiokwu ahụ, a na-atụle ihe atụ nhazi niile na ọnọdụ nke ihe oyiyi Docker nke ndị mmepụta Debezium kesara. Ọ nwere faịlụ ngwa mgbakwunye niile dị mkpa (njikọ) ma na-enye nhazi nke Kafka Jikọọ site na iji mgbanwe gburugburu ebe obibi.

Ọ bụrụ n’ịchọrọ iji Kafka Connect si Confluent, ị ga-achọ itinye onwe gị plugins nke njikọ dị mkpa na ndekọ aha akọwapụtara na ya. plugin.path ma ọ bụ tọọ site na mgbanwe gburugburu ebe obibi CLASSPATH. A na-ekpebi ntọala maka onye ọrụ Kafka Jikọọ na njikọ site na faịlụ nhazi nke gafere dị ka arụmụka na iwu mmalite onye ọrụ. Maka nkọwa ndị ọzọ, lee akwụkwọ.

A na-eme usoro niile nke ịtọlite ​​​​Debeizum na ụdị njikọ ahụ na nkebi abụọ. Ka anyị leba anya na nke ọ bụla n'ime ha:

1. Ịtọlite ​​​​Kafka Jikọọ framework

Iji bunye data na ụyọkọ Apache Kafka, a na-edobe paramita dị iche iche na usoro njikọ Kafka, dị ka:

  • paramita maka ijikọ na ụyọkọ ahụ,
  • aha isiokwu nke a ga-echekwa nhazi nke njikọ n'onwe ya ozugbo,
  • aha otu nke njikọ na-agba ọsọ (ọ bụrụ na ejiri ọnọdụ kesaa).

Ihe onyonyo Docker gọọmentị nke ọrụ ahụ na-akwado nhazi site na iji mgbanwe gburugburu ebe obibi - nke a bụ ihe anyị ga-eji. Yabụ, budata onyonyo a:

docker pull debezium/connect

Opekempe nke mgbanwe gburugburu ebe obibi achọrọ iji mee njikọ dị ka ndị a:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - ndepụta mbụ nke sava ụyọkọ Kafka iji nweta ndepụta zuru oke nke ndị otu ụyọkọ;
  • OFFSET_STORAGE_TOPIC=connector-offsets - isiokwu maka ịchekwa ọnọdụ ebe njikọ dị ugbu a;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - isiokwu maka ịchekwa ọnọdụ nke njikọ na ọrụ ya;
  • CONFIG_STORAGE_TOPIC=connector-config - isiokwu maka ịchekwa data nhazi njikọ na ọrụ ya;
  • GROUP_ID=1 - njirimara nke otu ndị ọrụ nke njikọ njikọ ọrụ nwere ike igbu; dị mkpa mgbe eji ekesa (ekesa) ọchịchị.

Anyị na-eji mgbanwe ndị a malite akpa ahụ:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Rịba ama banyere Avro

Site na ndabara, Debezium na-ede data na usoro JSON, nke a na-anabata maka igbe ájá na obere data, mana ọ nwere ike ịghọ nsogbu na ọdụ data nke ukwuu. Nhọrọ ọzọ maka ntụgharị JSON bụ iji serialize ozi AVRO n'ime usoro ọnụọgụ abụọ, nke na-ebelata ibu dị na sistemụ I/O na Apache Kafka.

Iji jiri Avro, ịkwesịrị ibuga ihe dị iche schema-ndebanye aha (maka ịchekwa eserese). Ụdị mgbanwe maka onye ntụgharị ga-adị ka nke a:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Nkọwa maka iji Avro na ịtọlite ​​​​ndebanye aha maka ya karịrị oke nke isiokwu a - n'ihu na, maka idoanya, anyị ga-eji JSON.

2. Ịhazi njikọ n'onwe ya

Ugbu a ị nwere ike ịga ozugbo na nhazi nke njikọ ahụ n'onwe ya, nke ga-agụ data sitere na isi iyi.

Ka anyị leba anya n'ihe atụ nke njikọ maka DBMS abụọ: PostgreSQL na MongoDB, nke m nwere ahụmahụ na nke dị iche iche (n'agbanyeghị obere, ma n'ọnọdụ ụfọdụ dị ịrịba ama!).

A kọwapụtara nhazi ahụ na nrịbama JSON wee bulite ya na Jikọọ Kafka site na iji arịrịọ POST.

2.1. PostgreSQL

Nhazi njikọ ihe atụ maka PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ụkpụrụ nke ọrụ nke njikọ mgbe nhazi a dị nnọọ mfe:

  • Mgbe emebere ya maka oge mbụ, ọ na-ejikọ na nchekwa data akọwapụtara na nhazi wee malite na ọnọdụ foto mbụ, na-ezigara Kafka nchịkọta mbụ nke data enwetara site na iji ọnọdụ SELECT * FROM table_name.
  • Mgbe ịmalitechara mmalite, njikọ ahụ na-abanye na ọnọdụ iji gụọ mgbanwe na faịlụ PostgreSQL WAL.

Banyere nhọrọ ndị ejiri:

  • name - aha njikọ nke a na-eji nhazi nke akọwapụtara n'okpuru ebe a; n'ọdịnihu, a na-eji aha a na-arụ ọrụ na njikọ (ya bụ, lelee ọnọdụ / malitegharịa / melite nhazi) site na Kafka Connect REST API;
  • connector.class - klaasị njikọ DBMS nke njikọ ahazi ga-eji;
  • plugin.name - aha ngwa mgbakwunye maka ngbanwe ezi uche dị na data sitere na faịlụ WAL. Dị ịhọrọ site na wal2json, decoderbuffs и pgoutput. Abụọ ndị mbụ chọrọ ntinye nke ndọtị kwesịrị ekwesị na DBMS, na pgoutput maka ụdị PostgreSQL nke 10 na nke dị elu anaghị achọ nhazi ọzọ;
  • database.* - nhọrọ maka ijikọ na nchekwa data, ebe database.server.name - Aha ihe atụ PostgreSQL ejiri mepụta aha isiokwu na ụyọkọ Kafka;
  • table.include.list - ndepụta nke tebụl nke anyị chọrọ soro mgbanwe; kpọmkwem na usoro schema.table_name; enweghị ike iji ọnụ na table.exclude.list;
  • heartbeat.interval.ms - etiti oge (na milliseconds) nke njikọ na-eziga ozi obi na-akụ na isiokwu pụrụ iche;
  • heartbeat.action.query - arịrịọ nke a ga-eme mgbe ị na-eziga ozi obi mgbawa ọ bụla (nhọrọ ahụ pụtara na ụdị 1.1);
  • slot.name - aha oghere mmeghari nke njikọ ga-eji;
  • publication.name - Aha akwụkwọ na PostgreSQL, nke njikọ na-eji. Ọ bụrụ na ọ dịghị, Debezium ga-agbalị ịmepụta ya. Ọ bụrụ na onye ọrụ nke emebere njikọ ahụ enweghị ikike zuru oke maka omume a, njikọ ahụ ga-akwụsị na njehie;
  • transforms na-ekpebi otu esi agbanwe aha isiokwu ebumnuche:
    • transforms.AddPrefix.type na-egosi na anyị ga-eji okwu mgbe nile;
    • transforms.AddPrefix.regex - ihe nkpuchi nke na-akọwapụta aha nke isiokwu ezubere iche;
    • transforms.AddPrefix.replacement - ozugbo ihe anyị na-akọwapụta.

Ihe ndị ọzọ gbasara nkụchi obi na mgbanwe

Site na ndabara, njikọ ahụ na-eziga data na Kafka maka azụmahịa ọ bụla etinyere, yana LSN ya (nọmba nke ndekọ aha) ka edere na isiokwu ọrụ. offset. Mana gịnị ga - eme ma ọ bụrụ na ahazi njikọ ahụ ka ọ gụọ ọ bụghị nchekwa data niile, mana naanị akụkụ nke tebụl ya (nke mmelite data anaghị eme ugboro ugboro)?

  • Njikọ ahụ ga-agụ faịlụ WAL ma ọ gaghị achọpụta azụmahịa ọ bụla na-eme na tebụl ọ na-enyocha.
  • Ya mere, ọ gaghị emelite ọnọdụ ya ugbu a ma na isiokwu ma ọ bụ na oghere mmeghari.
  • Nke a, n'aka nke ya, ga-ebute faịlụ WAL na-ejide na diski ma eleghị anya ohere diski ga-agwụ.

Na nke a bụ ebe nhọrọ na-abịa napụta. heartbeat.interval.ms и heartbeat.action.query. Iji nhọrọ ndị a na ụzọ abụọ na-eme ka o kwe omume ịme arịrịọ ịgbanwe data na tebụl dị iche mgbe ọ bụla ezigara ozi nkụta obi. Ya mere, LSN nke njikọ dị ugbu a (na oghere mmeghari) na-emelite mgbe niile. Nke a na-enye ohere ka DBMS wepụ faịlụ WAL na-adịkwaghị mkpa. Ị nwere ike mụtakwuo maka ka nhọrọ ndị ahụ si arụ ọrụ akwụkwọ.

Nhọrọ ọzọ kwesịrị ka e lebara ya anya bụ transforms. Agbanyeghị na ọ bụ maka ịdị mma na ịma mma ...

Site na ndabara, Debezium na-emepụta isiokwu site na iji amụma ịkpọ aha ndị a: serverName.schemaName.tableName. Nke a nwere ike ọ gaghị adị mma mgbe niile. Nhọrọ transforms Ị nwere ike iji okwu oge niile kọwaa ndepụta tebụl, ihe omume ndị ekwesịrị ibuga na isiokwu nwere aha akọwapụtara.

Na nhazi anyị daalụ transforms Ihe ndị a na-eme: ihe niile CDC sitere na nchekwa data a na-enyocha ga-aga na isiokwu nwere aha data.cdc.dbname. Ma ọ bụghị ya (na-enweghị ntọala ndị a), Debezium ga-eji ndabara mepụta isiokwu maka tebụl ọ bụla dị ka: pg-dev.public.<table_name>.

Njikọ njikọ

Iji mechie nkọwa nke nhazi njikọ maka PostgreSQL, ọ bara uru ikwu maka njirimara / njedebe nke ọrụ ya:

  1. Ọrụ nke njikọ maka PostgreSQL dabere na echiche nke ngbanwe ezi uche dị na ya. Ya mere ọ anaghị eso arịrịọ ka ịgbanwe usoro nchekwa data (DDL) - ya mere, data a agaghị adị na isiokwu.
  2. Ebe ọ bụ na a na-eji oghere mmegharị, jikọọ njikọ ga-ekwe omume naanị na ihe atụ DBMS na-eduga.
  3. Ọ bụrụ na onye ọrụ n'okpuru onye njikọ njikọ na nchekwa data nwere ikike ịgụ naanị, mgbe ahụ tupu mmalite mbụ ị ga-achọ iji aka gị mepụta oghere mmeghari ma bipụta ya na nchekwa data.

Na-etinye nhazi ahụ

Yabụ, ka anyị tinye nhazi anyị na njikọ njikọ:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Anyị na-elele na nbudata ahụ gara nke ọma na njikọ malitere:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Ọ dị mma: edobere ya ma dị njikere ịga. Ugbu a, ka anyị mee ka ọ bụrụ onye na-azụ ahịa ma jikọọ na Kafka, mgbe nke ahụ gasịrị, anyị ga-agbakwunye ma gbanwee ntinye na tebụl:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

N'isiokwu anyị, a ga-egosipụta ya dị ka ndị a:

JSON dị ogologo na mgbanwe anyị

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

N'okwu abụọ ahụ, ihe ndekọ na-agụnye igodo (PK) nke ndekọ ahụ gbanwere, yana ihe kachasị mkpa nke mgbanwe: ihe ndekọ ahụ dị na mbụ na ihe ọ ghọrọ mgbe ọ gasịrị.

  • N'ihe banyere INSERT: uru tupu (before) nhata null, na mgbe - akara nke etinyere.
  • N'ihe banyere UPDATE: n'ime payload.before egosipụtara ọnọdụ ahịrị gara aga, na n'ime payload.after - ọhụrụ na isi mgbanwe.

2.2 MongoDB

Njikọ a na-eji usoro mmegharị MongoDB ọkọlọtọ, na-agụ ozi sitere na oplog nke isi DBMS node.

Yiri njikọ nke akọwarala maka PgSQL, ebe a kwa, na mbido mbụ, a na-ewere foto data bụ isi, emesia njikọ ahụ na-agbanye na ọnọdụ ọgụgụ oplog.

Ọmụmaatụ nhazi:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Dịka ị na-ahụ, ọ nweghị nhọrọ ọhụrụ ebe a ma e jiri ya tụnyere ihe atụ gara aga, mana naanị ọnụọgụ nhọrọ maka ijikọ na nchekwa data na prefixes ha belatara.

Ntọala transforms oge a, ha na-eme ihe ndị a: ha na-agbanwe aha isiokwu a na-achọsi ike site na atụmatụ ahụ <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

nnabata mmejọ

Okwu nke nnabata mmejọ na ịdị elu dị elu n'oge anyị dị oke njọ karịa mgbe ọ bụla - ọkachasị mgbe anyị na-ekwu maka data na azụmahịa, yana mgbanwe mgbanwe data adịghị eguzo n'akụkụ na mbipụta a. Ka anyị leba anya n'ihe nwere ike ịgahie ụzọ na ihe ga-eme Debezium na nke ọ bụla.

Enwere nhọrọ ọpụpụ atọ:

  1. Kafka Njikọ ọdịda. Ọ bụrụ na ahaziri Njikọ ka ọ rụọ ọrụ na ọnọdụ ekesa, nke a chọrọ ọtụtụ ndị ọrụ ka ịtọ otu group.id. Mgbe ahụ, ọ bụrụ na otu n'ime ha daa, njikọ ahụ ga-amaliteghachi na onye ọrụ ọzọ wee gaa n'ihu na-agụ site na njedebe ikpeazụ na isiokwu na Kafka.
  2. Ọnwụ nke njikọta na ụyọkọ Kafka. Njikọ ahụ ga-akwụsị ịgụ naanị n'ọnọdụ nke na-ezigaghị Kafka, ọ ga-anwa kwa oge iziga ya ruo mgbe mbọ ahụ ga-aga nke ọma.
  3. Enweghị isi iyi data. Njikọ ahụ ga-anwa ịjikọ ọzọ na isi iyi dị ka ahaziri. The ndabara bụ 16 mgbalị iji exponential backoff. Mgbe mbọ nke iri na isii emeghị nke ọma, a ga-akara ọrụ ahụ dị ka okpu na ị ga-achọ iji aka gị malitegharịa ya site na Kafka Jikọọ REST interface.
    • N'ihe banyere PostgreSQL data agaghị efu, n'ihi na Iji oghere mmegharị ga-egbochi gị ihichapụ faịlụ WAL nke njikọ anaghị agụ. N'okwu a, enwerekwa ala na mkpụrụ ego ahụ: ọ bụrụ na emebi njikọ netwọk n'etiti njikọ na DBMS ruo ogologo oge, ọ ga-ekwe omume na ohere diski ga-agwụ, nke a nwere ike ibute ọdịda nke ọdịda. DBMS niile.
    • N'ihe banyere MySQL DBMS n'onwe ya nwere ike ịtụgharị faịlụ binlog tupu eweghachite njikọta. Nke a ga-eme ka njikọ ahụ banye n'ime ọnọdụ dara ada, na iji weghachi ọrụ nkịtị, ị ga-achọ ịmalitegharị na ọnọdụ foto mbụ iji nọgide na-agụ site na binlogs.
    • on MongoDB. Akwụkwọ ahụ na-ekwu: omume nke njikọ ma ọ bụrụ na ehichapụ faịlụ log/oplog na njikọ enweghị ike ịga n'ihu na-agụ site na ọnọdụ ebe ọ kwụsịrị bụ otu ihe ahụ maka DBMS niile. Ọ pụtara na njikọ ahụ ga-abanye na steeti ahụ okpu ma ga-achọ ịmalitegharị na ọnọdụ foto mbụ.

      Otú ọ dị, e nwere ndị ọzọ. Ọ bụrụ na akwụsịla njikọ ahụ ogologo oge (ma ọ bụ enweghị ike iru ihe atụ MongoDB), na oplog gafere ntụgharị n'oge a, mgbe ahụ ka eweghachiri njikọ ahụ, njikọ ahụ ga-eji nwayọọ na-agụ data site na ọnọdụ mbụ dị, nke mere na ụfọdụ data na Kafka bụghị ga-akụ.

nkwubi

Debezium bụ ahụmịhe mbụ m nwere na sistemụ CDC yana n'ozuzu ya dị mma. Ọrụ ahụ meriri site na nkwado ya maka ndị isi DBMS, mfe nhazi, nkwado ụyọkọ, na obodo na-arụsi ọrụ ike. Maka ndị nwere mmasị na omume, ana m akwado ka ị gụọ ntuziaka maka Njikọ Kafka и Debezium.

E jiri ya tụnyere JDBC njikọ maka Kafka Connect, isi uru nke Debezium bụ na a na-agụ mgbanwe site na ndekọ DBMS, nke na-enye ohere ịnweta data na obere nkwụsịtụ. Njikọ JDBC (nke sitere na Kafka Jikọọ) na-ajụ tebulu a na-enyocha n'otu oge na (n'ihi otu ihe ahụ) anaghị ewepụta ozi mgbe ehichapụ data (kedu ka ị ga-esi jụọ data na-adịghị adị?).

Iji dozie nsogbu ndị yiri ya, ị nwere ike ịṅa ntị na ngwọta ndị a (na mgbakwunye na Debezium):

PS

Gụọkwa na blọọgụ anyị:

isi: www.habr.com

Tinye a comment