Ke hoʻolauna nei iā Debezium - CDC no Apache Kafka

Ke hoʻolauna nei iā Debezium - CDC no Apache Kafka

Ma kaʻu hana, ʻike pinepine au i nā ʻenehana loea hou / huahana polokalamu, ʻike e pili ana i kahi mea liʻiliʻi ma ka Pūnaewele ʻōlelo Lūkini. Me kēia ʻatikala, e hoʻāʻo wau e hoʻopiha i kahi āpau me kahi laʻana mai kaʻu hana hou, i ka wā e pono ai wau e hoʻonohonoho i ka hoʻouna ʻana i nā hanana CDC mai ʻelua DBMS kaulana (PostgreSQL a me MongoDB) i kahi hui Kafka me Debezium. Manaʻo wau e lilo kēia ʻatikala loiloi, i ʻike ʻia ma muli o ka hana i hana ʻia, e pono ai nā poʻe ʻē aʻe.

He aha ka Debezium a me CDC ma ka laulā?

ʻO Debezium — ʻelele o ka māhele polokalamu CDC (Hoʻololi i ka ʻikepili hopu), a ʻoi aku ka pololei, he pūʻulu hoʻohui ia no nā DBMS like ʻole i kūpono me ka ʻōnaehana Apache Kafka Connect.

keia Pāhana Open Source, laikini ʻia ma lalo o ka Apache License v2.0 a kākoʻo ʻia e Red Hat. Ke hoʻomau nei ka hoʻomohala ʻana mai ka makahiki 2016 a i kēia manawa ke hāʻawi nei i ke kākoʻo kūhelu no nā DBMS aʻe: MySQL, PostgreSQL, MongoDB, SQL Server. Aia kekahi mau mea hoʻohui no Cassandra a me Oracle, akā i kēia manawa aia lākou i ke kūlana "ʻike mua", a ʻaʻole hōʻoia nā hoʻokuʻu hou i ka hoʻohālikelike hope.

Inā mākou e hoʻohālikelike i ka CDC me ke ala kuʻuna (i ka manawa e heluhelu pono ai ka palapala noi i ka ʻikepili mai ka DBMS), ʻo kāna mau mea maikaʻi nui e komo i ka hoʻokō ʻana i ka hoʻololi ʻana o ka ʻikepili e kahe ana ma ka pae lālani me ka latency haʻahaʻa, hilinaʻi kiʻekiʻe a me ka loaʻa. Loaʻa nā helu hope ʻelua ma ka hoʻohana ʻana i kahi hui Kafka ma ke ʻano he waihona no nā hanana CDC.

ʻO kekahi pōmaikaʻi ʻo ia ka hoʻohana ʻana i hoʻokahi hiʻohiʻona e mālama i nā hanana, no laila ʻaʻole hopohopo ka noi hope e pili ana i nā nuances o ka hana ʻana i nā DBMS like ʻole.

ʻO ka mea hope loa, ʻo ka hoʻohana ʻana i kahi mea hoʻolaha memo e ʻae i nā noi e nānā i nā loli i ka ʻikepili e hoʻonui i ka pae ākea. I ka manawa like, ua hoʻemi ʻia ka hopena ma ke kumu ʻikepili, no ka mea ʻaʻole i loaʻa pololei ka ʻikepili mai ka DBMS, akā mai ka pūʻulu Kafka.

E pili ana i ka hoʻolālā Debesium

ʻO ka hoʻohana ʻana iā Debezium e iho mai i kēia ʻano maʻalahi:

DBMS (ma ke ʻano he kumu ʻikepili) → mea hoʻohui i Kafka Connect → Apache Kafka → mea kūʻai

Ma ke ʻano he kiʻi, eia kahi kiʻi mai ka pūnaewele papahana:

Ke hoʻolauna nei iā Debezium - CDC no Apache Kafka

Eia nō naʻe, ʻaʻole wau makemake nui i kēia hoʻolālā, no ka mea me he mea lā hiki ke hoʻohana wale i kahi mea hoʻohui.

I ka ʻoiaʻiʻo, ʻokoʻa ke kūlana: hoʻopiha i kāu Data Lake (ka loulou hope loa ma ke kiʻikuhi ma luna) ʻAʻole kēia wale ke ala e hoʻohana ai iā Debezium. Hiki ke hoʻohana ʻia nā hanana i hoʻouna ʻia iā Apache Kafka e kāu mau noi no ka mālama ʻana i nā kūlana like ʻole. ʻo kahi laʻana:

  • ka wehe ʻana i nā ʻikepili pili ʻole mai ka cache;
  • hoʻouna ʻana i nā leka hoʻomaopopo;
  • huli i ka helu helu hou;
  • kekahi ʻano moʻolelo loiloi;
  • ...

Inā loaʻa iā ʻoe kahi palapala Java a ʻaʻohe pono / hiki ke hoʻohana i kahi pūʻulu Kafka, aia nō ka hiki ke hana ma o embedded-connector. ʻO ka pōmaikaʻi ʻike ʻia ʻo ia ka hoʻopau ʻana i ka pono o nā ʻoihana hou (ma ke ʻano o kahi mea hoʻohui a me Kafka). Eia naʻe, ua hoʻopau ʻia kēia hoʻonā ʻana mai ka mana 1.1 a ʻaʻole i manaʻo hou ʻia no ka hoʻohana ʻana (hiki ke hoʻoneʻe ʻia ke kākoʻo i nā hoʻokuʻu e hiki mai ana).

E kūkākūkā kēia ʻatikala i ka hoʻolālā i ʻōlelo ʻia e nā mea hoʻomohala, e hāʻawi ana i ka hoʻomanawanui hewa a me ka scalability.

Hoʻonohonoho hoʻohui

No ka hoʻomaka ʻana i ka nānā ʻana i nā loli i ka waiwai nui - ʻikepili - pono mākou:

  1. kumu ʻikepili, hiki iā MySQL ke hoʻomaka mai ka mana 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (piha piha);
  2. ʻO ka hui ʻo Apache Kafka;
  3. ʻO Kafka Connect laʻana (nā mana 1.x, 2.x);
  4. hoʻonohonoho ʻia ʻo Debezium connector.

E hana ma na helu mua elua, i.e. ʻO ke kaʻina hana o ka DBMS a me Apache Kafka ma waho o ke kiko o ka ʻatikala. Eia nō naʻe, no ka poʻe makemake e kau i nā mea āpau i loko o ka sandbox, ua mākaukau ka waihona waihona me nā hiʻohiʻona. docker-compose.yaml.

E noʻonoʻo hou mākou i nā mea hope ʻelua.

0. Hui Kafka

Ma ʻaneʻi a ʻoi aku ka ʻatikala, ua kūkākūkā ʻia nā hiʻohiʻona hoʻonohonoho āpau i ka pōʻaiapili o ke kiʻi Docker i hoʻolaha ʻia e nā mea hoʻomohala Debezium. Loaʻa iā ia nā faila plugin pono (nā mea hoʻohui) a hāʻawi i ka hoʻonohonoho ʻana o Kafka Connect me ka hoʻohana ʻana i nā ʻano hoʻololi kaiapuni.

Inā makemake ʻoe e hoʻohana iā Kafka Connect mai Confluent, pono ʻoe e hoʻohui kūʻokoʻa i nā plugins o nā mea hoʻohui pono i ka papa kuhikuhi i kuhikuhi ʻia ma plugin.path a i ʻole i hoʻonohonoho ʻia ma o ka hoʻololi kaiapuni CLASSPATH. Hoʻoholo ʻia nā hoʻonohonoho no ka mea hana a me nā mea hoʻohui Kafka Connect ma o nā faila hoʻonohonoho i hāʻawi ʻia ma ke ʻano he hoʻopaʻapaʻa i ke kauoha hoʻomaka o ka limahana. No nā kikoʻī hou aku, e ʻike palapala.

Hana ʻia ke kaʻina holoʻokoʻa o ka hoʻonohonoho ʻana iā Debeizum i ka mana hoʻohui i ʻelua mau pae. E nānā kākou i kēlā me kēia o lākou:

1. Hoʻonohonoho i ka hoʻonohonoho Kafka Connect

No ke kahe ʻana i ka ʻikepili i ka pūʻulu Apache Kafka, ua hoʻonohonoho ʻia nā ʻāpana kikoʻī ma ka framework Kafka Connect, e like me:

  • nā ʻāpana no ka hoʻopili ʻana i ka pūʻulu,
  • nā inoa o nā kumuhana kahi e mālama pono ʻia ai ka hoʻonohonoho o ka mea hoʻohui ponoʻī,
  • ka inoa o ka pūʻulu kahi e holo ai ka mea hoʻohui (inā hoʻohana ʻia ke ʻano mahele).

Kākoʻo ke kiʻi Docker mana o ka papahana i ka hoʻonohonoho ʻana me ka hoʻohana ʻana i nā ʻano hoʻololi kaiapuni - ʻo ia kā mākou e hoʻohana ai. No laila, hoʻoiho i ke kiʻi:

docker pull debezium/connect

ʻO ka palena liʻiliʻi o nā mea hoʻololi kaiapuni i koi ʻia e holo i ka mea hoʻohui penei:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - ka papa inoa mua o nā kikowaena puʻupuʻu Kafka e loaʻa i kahi papa inoa piha o nā lālā hui;
  • OFFSET_STORAGE_TOPIC=connector-offsets - he kumuhana no ka mālama ʻana i nā kūlana kahi o ka mea hoʻohui i kēia manawa;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - kumuhana no ka mālama ʻana i ke kūlana o ka mea hoʻohui a me kāna mau hana;
  • CONFIG_STORAGE_TOPIC=connector-config - kumuhana no ka mālama ʻana i ka ʻikepili hoʻonohonoho hoʻohui a me kāna mau hana;
  • GROUP_ID=1 - ka mea hōʻike o ka hui o nā limahana i hiki ke hoʻokō ʻia ka hana hoʻohui; pono i ka hoʻohana ʻana i puʻunaue ʻia (māhele ʻia) hoʻomalu.

Hoʻomaka mākou i ka ipu me kēia mau ʻano hoʻololi:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nānā e pili ana iā Avro

Ma ka maʻamau, kākau ʻo Debezium i ka ʻikepili ma ka format JSON, kahi i ʻae ʻia no nā sandboxes a me nā helu liʻiliʻi o ka ʻikepili, akā hiki ke lilo i pilikia i nā ʻikepili i hoʻouka nui ʻia. ʻO kahi koho ʻē aʻe i kahi mea hoʻololi JSON e hoʻohana i nā memo Avro i loko o kahi ʻano binary, e hōʻemi ana i ka ukana ma ka subsystem I/O ma Apache Kafka.

No ka hoʻohana ʻana iā Avro pono ʻoe e kau i kahi kaʻawale schema-registry (no ka mālama ʻana i nā kiʻi). ʻO nā mea hoʻololi no ka mea hoʻololi e like me kēia:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

ʻO nā kikoʻī e pili ana i ka hoʻohana ʻana iā Avro a me ka hoʻonohonoho ʻana i ka hoʻopaʻa inoa no ia mea ma waho o ke kiko o kēia ʻatikala - ma mua, no ka maopopo, e hoʻohana mākou iā JSON.

2. Ka hoʻonohonoho ʻana i ka mea hoʻohui ponoʻī

I kēia manawa hiki iā ʻoe ke hele pololei i ka hoʻonohonoho ʻana o ka mea hoʻohui ponoʻī, nāna e heluhelu i ka ʻikepili mai ke kumu.

E nānā i ka laʻana o nā mea hoʻohui no nā DBMS ʻelua: PostgreSQL a me MongoDB, kahi i loaʻa iaʻu ka ʻike a aia nā ʻokoʻa (ʻoiai he liʻiliʻi, akā i kekahi mau mea nui!).

Hōʻike ʻia ka hoʻonohonoho ʻana ma JSON notation a hoʻouka ʻia i Kafka Connect me ka hoʻohana ʻana i kahi noi POST.

2.1. PostgreSQL

Laʻana hoʻonohonoho hoʻohui no PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

ʻO ke kumu o ka hana o ka mea hoʻohui ma hope o kēia hoʻonohonoho maʻalahi:

  • Ke hoʻokuʻu ʻia no ka manawa mua, pili ia i ka ʻikepili i kuhikuhi ʻia i ka hoʻonohonoho ʻana a hoʻomaka i ke ʻano kiʻi mua, e hoʻouna ana iā Kafka i ka ʻikepili mua i loaʻa me ka hoʻohana ʻana i ke kūlana SELECT * FROM table_name.
  • Ma hope o ka hoʻomaka ʻana, hele ka mea hoʻohui i ke ʻano e heluhelu i nā loli mai nā faila PostgreSQL WAL.

E pili ana i nā koho i hoʻohana ʻia:

  • name - ka inoa o ka mea hoʻohui i hoʻohana ʻia ai ka hoʻonohonoho i wehewehe ʻia ma lalo nei; i ka wā e hiki mai ana, hoʻohana ʻia kēia inoa e hana me ka mea hoʻohui (ʻo ia hoʻi, e nānā i ke kūlana / hoʻomaka hou / hoʻololi i ka hoʻonohonoho) ma o ka Kafka Connect REST API;
  • connector.class - Ka papa hoʻohui DBMS e hoʻohana ʻia e ka mea hoʻohui i hoʻonohonoho ʻia;
  • plugin.name - ka inoa o ka plugin no ka decoding logical o ka ʻikepili mai nā faila WAL. Hiki ke koho mai wal2json, decoderbuffs и pgoutput. Pono nā mea ʻelua e hoʻokomo i nā hoʻonui kūpono i ka DBMS, a pgoutput no ka PostgreSQL version 10 a kiʻekiʻe aʻe ʻaʻole ia e koi i nā manipulations hou;
  • database.* — nā koho no ka hoʻohui ʻana i ka waihona, kahi database.server.name — Hoʻohana ʻia ka inoa instance PostgreSQL e hoʻokumu i ka inoa kumuhana ma ka pūʻulu Kafka;
  • table.include.list - he papa inoa o nā papa a mākou e makemake ai e hahai i nā loli; i hoakakaia ma ke ano schema.table_name; hiki ole ke hoohana pu me table.exclude.list;
  • heartbeat.interval.ms - ka wā (i nā milliseconds) kahi e hoʻouna ai ka mea hoʻohui i nā memo puʻuwai i kahi kumuhana kūikawā;
  • heartbeat.action.query - kahi noi e hoʻokō ʻia i ka wā e hoʻouna ai i kēlā me kēia memo puʻuwai puʻuwai (ʻike ʻia ke koho ma ka mana 1.1);
  • slot.name - ka inoa o ka slot replication e hoʻohana ʻia e ka mea hoʻohui;
  • publication.name - Ka inoa nā palapala i PostgreSQL, kahi e hoʻohana ai ka mea hoʻohui. Inā ʻaʻole ia, e hoʻāʻo ʻo Debezium e hana. Inā ʻaʻole lawa nā kuleana o ka mea hoʻohana ma lalo o ka pilina no kēia hana, e hoʻopau ka mea hoʻohui me kahi hewa;
  • transforms e hoʻoholo pololei pehea e hoʻololi ai i ka inoa o ke kumuhana i manaʻo ʻia:
    • transforms.AddPrefix.type hōʻike e hoʻohana mākou i nā ʻōlelo maʻamau;
    • transforms.AddPrefix.regex - he mask e wehewehe hou i ka inoa o ke kumuhana i manaʻo ʻia;
    • transforms.AddPrefix.replacement - pololei ka mea a mākou e wehewehe hou nei.

ʻOi aku e pili ana i ka puʻuwai a me ka hoʻololi

Ma ka maʻamau, hoʻouna ka mea hoʻohui i ka ʻikepili iā Kafka no kēlā me kēia hana i hana ʻia, a ua kākau ʻia kāna LSN (Log Sequence Number) ma ke kumuhana lawelawe. offset. He aha ka hopena inā hoʻonohonoho ʻia ka mea hoʻohui e heluhelu ʻaʻole i ka ʻikepili holoʻokoʻa, akā ʻo kahi hapa wale nō o kāna mau papa (kahi i loaʻa ʻole ai nā hoʻonui ʻikepili)?

  • E heluhelu ka mea hoʻohui i nā faila WAL a ʻaʻole e ʻike i nā hana i hana ʻia i nā papa e nānā ʻia ana.
  • No laila, ʻaʻole ia e hōʻano hou i kona kūlana i kēia manawa ma ke kumuhana a i ʻole ma ka slot replication.
  • ʻO kēia ka hopena i ka paʻa ʻana o nā faila WAL ma ka disk a pau paha i waho o ka hakahaka disk.

A ʻo kēia kahi e hiki ai i nā koho ke hoʻopakele. heartbeat.interval.ms и heartbeat.action.query. ʻO ka hoʻohana ʻana i kēia mau koho ʻelua e hiki ai ke hana i kahi noi e hoʻololi i ka ʻikepili i kahi papa ʻokoʻa i kēlā me kēia manawa i hoʻouna ʻia kahi leka puʻuwai. No laila, ʻo ka LSN kahi i loaʻa ai ka mea hoʻohui i kēia manawa (ma ka slot replication) e hoʻonui mau ʻia. ʻAe kēia i ka DBMS e wehe i nā faila WAL i pono ʻole. Hiki iā ʻoe ke aʻo hou e pili ana i ka hana ʻana o nā koho palapala.

ʻO kekahi koho kūpono e nānā pono transforms. ʻOiai e pili ana i ka maʻalahi a me ka nani ...

Ma ka paʻamau, hana ʻo Debezium i nā kumuhana me ka hoʻohana ʻana i kēia kulekele inoa: serverName.schemaName.tableName. ʻAʻole kūpono paha kēia i nā manawa a pau. Nā koho transforms Hiki iā ʻoe ke hoʻohana i nā ʻōlelo maʻamau e wehewehe i kahi papa inoa o nā papa, nā hanana mai kahi e pono ai ke alakaʻi ʻia i kahi kumuhana me kahi inoa kikoʻī.

Ma kā mākou hoʻonohonoho mahalo transforms e hana ʻia kēia: e hele nā ​​hanana CDC a pau mai ka waihona i nānā ʻia i kahi kumuhana me ka inoa data.cdc.dbname. A i ʻole (me ka ʻole o kēia mau hoʻonohonoho), hana ʻo Debezium i kahi kumuhana no kēlā me kēia papa e like me: pg-dev.public.<table_name>.

Nā palena hoʻohui

No ka hoʻopau ʻana i ka wehewehe ʻana o ka hoʻonohonoho hoʻohui no PostgreSQL, pono e kamaʻilio e pili ana i kēia mau hiʻohiʻona a me nā palena o kāna hana:

  1. ʻO ka hana o ka mea hoʻohui no PostgreSQL e hilinaʻi ana i ka manaʻo o ka decoding logical. No laila ʻo ia ʻAʻole ia e hahai i nā noi e hoʻololi i ka ʻōnaehana waihona (DDL) - no laila, ʻaʻole e loaʻa kēia ʻikepili i nā kumuhana.
  2. Ma muli o ka hoʻohana ʻia ʻana o nā slot replication, hiki ke hoʻohui i kahi mea hoʻohui wale i ka laʻana DBMS alakaʻi.
  3. Inā loaʻa i ka mea hoʻohana ma lalo o ka mea hoʻohui e hoʻopili ai i ka waihona i nā kuleana heluhelu wale nō, a laila ma mua o ka hoʻomaka mua ʻana e pono ʻoe e hana lima i kahi slot replication a hoʻolaha i ka waihona.

Ke noi nei i ka hoʻonohonoho

No laila, e hoʻouka i kā mākou hoʻonohonoho i loko o ka mea hoʻohui:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Nānā mākou ua holomua ka hoʻoiho ʻana a ua hoʻomaka ka mea hoʻohui:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Nui: ua hoʻonohonoho ʻia a mākaukau e hele. I kēia manawa e hoʻohālike mākou he mea kūʻai aku a pili iā Kafka, a laila e hoʻohui mākou a hoʻololi i kahi komo i ka papaʻaina:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Ma kā mākou kumuhana e hōʻike ʻia penei:

ʻO JSON lōʻihi loa me kā mākou hoʻololi

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Ma nā hihia ʻelua, aia nā moʻolelo i ke kī (PK) o ka moʻolelo i hoʻololi ʻia, a me ke ʻano nui o nā hoʻololi: he aha ka moʻolelo ma mua a me ka mea i lilo ma hope.

  • Ma ka hihia o INSERT: waiwai ma mua (before) like null, a ma hope - ka laina i hoʻokomo ʻia.
  • Ma ka hihia o UPDATE: i loko payload.before hōʻike ʻia ke kūlana mua o ka laina, a ma payload.after - hou me ke ʻano o nā loli.

2.2 MongoDB

Ke hoʻohana nei kēia mea hoʻohui i ka mīkini hoʻopiʻi MongoDB maʻamau, e heluhelu ana i ka ʻike mai ka oplog o ka node DBMS mua.

E like me ka mea hoʻohui i wehewehe mua ʻia no PgSQL, ma ʻaneʻi hoʻi, i ka hoʻomaka mua ʻana, lawe ʻia ke kiʻi ʻikepili mua, a laila hoʻololi ka mea hoʻohui i ke ʻano heluhelu oplog.

Laʻana hoʻonohonoho:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

E like me kāu e ʻike ai, ʻaʻohe koho hou ma aneʻi i hoʻohālikelike ʻia i ka laʻana mua, akā ua hoʻemi ʻia ka helu o nā koho i kuleana no ka hoʻopili ʻana i ka waihona a me kā lākou prefixes.

Nā Papa transforms i kēia manawa hana lākou i kēia: hoʻololi lākou i ka inoa o ke kumuhana i manaʻo ʻia mai ka schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

hoʻomanawanui hewa

ʻOi aku ka nui o ka pilikia o ka hoʻomanawanui hewa a me ka loaʻa kiʻekiʻe i ko mākou manawa ma mua o ka wā - ʻoiai ke kamaʻilio nei mākou e pili ana i ka ʻikepili a me nā kālepa, a ʻaʻole kū kaʻawale ka nānā ʻana i nā loli ʻikepili i kēia pukana. E nānā kākou i ka mea hiki ke hewa ma ke kumu a me ka mea e hiki mai ana iā Debezium i kēlā me kēia hihia.

Aia ʻekolu mau koho koho:

  1. Kafka Connect hāʻule. Inā hoʻonohonoho ʻia ʻo Connect e hana ma ke ʻano puʻupuʻu, pono kēia i nā limahana he nui e hoʻonohonoho i ka group.id hoʻokahi. A laila, inā hāʻule kekahi o lākou, e hoʻomaka hou ka mea hoʻohui i kahi mea hana ʻē aʻe a hoʻomau i ka heluhelu ʻana mai ke kūlana hope loa i ke kumuhana ma Kafka.
  2. Nalo ka pilina me ka pūʻulu Kafka. E ho'ōki wale ka mea hoʻohui i ka heluhelu ʻana ma ke kūlana i hoʻouna ʻole ʻia iā Kafka, a e hoʻāʻo ʻo ia e hoʻouna hou a hiki i ka hoʻāʻo ʻana.
  3. Loaʻa ʻole ke kumu ʻikepili. E hoʻāʻo ka mea hoʻohui e hoʻopili hou i ke kumu e like me ka hoʻonohonoho ʻana. ʻO ka paʻamau he 16 hoʻāʻo e hoʻohana hoʻihoʻi exponential. Ma hope o ka 16 o ka ho'āʻo ʻole ʻana, e kaha ʻia ka hana pilikia ka a pono ʻoe e hoʻomaka hou iā ia ma o ka Kafka Connect REST interface.
    • Ma ka hihia o PostgreSQL ʻaʻole e nalowale ka ʻikepili, no ka mea ʻO ka hoʻohana ʻana i nā slot replication e pale iā ʻoe mai ka holoi ʻana i nā faila WAL i heluhelu ʻole ʻia e ka mea hoʻohui. I kēia hihia, aia kekahi ʻaoʻao i lalo o ke kālā: inā ua hoʻopau ʻia ka pilina pūnaewele ma waena o ka mea hoʻohui a me ka DBMS no ka manawa lōʻihi, aia paha ka pau ʻana o ka disk space, a hiki i kēia ke alakaʻi i kahi hemahema ka DBMS holoʻokoʻa.
    • Ma ka hihia o MySQL Hiki ke hoʻololi ʻia nā faila binlog e ka DBMS ponoʻī ma mua o ka hoʻihoʻi ʻia ʻana o ka pilina. ʻO kēia ke kumu e hele ai ka mea hoʻohui i ke kūlana hemahema, a e hoʻihoʻi i ka hana maʻamau, pono ʻoe e hoʻomaka hou i ke ʻano snapshot mua e hoʻomau i ka heluhelu ʻana mai binlogs.
    • maluna o ʻO MongoDB. Hōʻike ka palapala: ʻo ke ʻano o ka mea hoʻohui inā ua holoi ʻia nā faila log/oplog a ʻaʻole hiki i ka mea hoʻohui ke hoʻomau i ka heluhelu ʻana mai ke kūlana kahi i haʻalele ai ua like ia no nā DBMS āpau. ʻO ia hoʻi, e hele ka mea hoʻohui i ka mokuʻāina pilikia ka a pono e hoʻomaka hou i ke ʻano kiʻi mua.

      Eia nō naʻe, aia nā ʻokoʻa. Inā ua oki ʻia ka mea hoʻohui no ka manawa lōʻihi (a ʻaʻole hiki ke hōʻea i ka laʻana MongoDB), a ua hele ka oplog i ka hoʻololi ʻana i kēia manawa, a laila ke hoʻihoʻi ʻia ka pilina, e hoʻomau mālie ka mea hoʻohui e heluhelu i ka ʻikepili mai ke kūlana mua i loaʻa. ʻo ia ke kumu o kekahi o nā ʻikepili ma Kafka ole e hahau.

hopena

ʻO Debezium kaʻu ʻike mua me nā ʻōnaehana CDC a maikaʻi loa. Ua lanakila ka papahana me kāna kākoʻo no nā DBMS nui, maʻalahi o ka hoʻonohonoho, kākoʻo clustering, a me ke kaiāulu ikaika. No ka poʻe makemake i ka hoʻomaʻamaʻa, paipai wau iā ʻoe e heluhelu i nā alakaʻi no Hui Kafka и ʻO Debezium.

Hoʻohālikelike ʻia me ka mea hoʻohui JDBC no Kafka Connect, ʻo ka pono nui o Debezium ʻo ia ka heluhelu ʻana i nā loli mai nā log DBMS, e hiki ai ke loaʻa ka ʻikepili me ka liʻiliʻi liʻiliʻi. Nīnau ka JDBC Connector (mai Kafka Connect) i ka papa i nānā ʻia ma kahi manawa paʻa a (no ke kumu hoʻokahi) ʻaʻole e hoʻopuka i nā memo ke holoi ʻia ka ʻikepili (pehea ʻoe e nīnau ai i ka ʻikepili i loaʻa ʻole?).

No ka hoʻoponopono ʻana i nā pilikia like, hiki iā ʻoe ke hoʻolohe i nā hopena aʻe (e hoʻohui iā Debezium):

PS

E heluhelu pū ma kā mākou blog:

Source: www.habr.com

Pākuʻi i ka manaʻo hoʻopuka