á áľáŤáŹ ááľáĽ ᣠáĽá áá á áłá˛áľ á´áááŤá áááľááá˝ / á¨áśááľáá áááśá˝ áŤááĽááá ᣠáľá áŠá˛áŤá á°áááŞá á áááá¨áĽ á áŁá á ááł á¨áá áá¨áᢠá áá
ááŁáĽáᣠá°áĽáá¨áá á áá áá á¨á˛á˛á˛ ááá
áśá˝á á¨áááľ áłáá á˛á˘á¤áá¤áľ (PostgreSQL áĽá MongoDB) áá° áŤá áááľá°á ááá á˛áŤáľáááá á¨á
áἠáá ááá´ ááłá áá á ááľ ááá°áľ áááááľ áĽáááŤááᢠá á°á áŤá áĽáŤ ááááŤáľ á¨ááŁá áá
á¨áááá á˝áá áááá˝ á áá áĽáá°ááá á°áľá á á°ááááá˘
Debezium áĽá CDC á á á ááá áááľáá?
áá
á˛á˛á˛á á¨á°áááˇá á ááŤá¨áĽ áá áŤááťá¸áá (áá°áá áŞáŤá á¨á˛á˘á¤áá¤áľ á ááĽáł áá¨á á˛áŤááĽ) áá áĽá ááš áá á°á áááá¨áľáŁ á¨áá°á á áľá°áááááľ áĽá á°ááááľ á á¨áľá á°á¨á á¨ááἠááἠáĽá¨áľ áá°áá áá áŤá ááááᢠá¨áá¨á¨áťááš áááľ ááĽáŚá˝ á¨áŤá፠áááľá°á áĽáá° á˛á˛á˛ ááľá°áśá˝ áá¨ááťááľ á áá áá á¨á°áá áá¸áá˘
áĽáá˛áá ᣠáĽá ááš á ááľ áá á áá´á ááľá°áśá˝á ááá¨áá¸áľ áĽá á áá á¨ááá áááá¸á áŤáŤáľáłá ᣠáľáááá á¨áá¨á¨áťá áá°áá áŞáŤ á¨á°ááŤáŠ á˛á˘á¤áá¤áľá áľáááĽáŤáľ ááŠááśá˝ áá¨áá á¨áá áľáá˘
á áá¨á¨áťá á¨áááĽááľ á°ááá áá áá á ááἠáá á¨áá°á¨á áááŚá˝á á¨áá¨áłá°á á áááŹá˝áá˝á á á ááľá ááááŤáľ áá°á áá¨ááłáᢠá á°ááłáłá áá áá¨á á¨á˛á˘á¤áá¤áľ á ááĽáł áłááá á¨áŤá፠áááľá°á áľááá°áá°á á áá¨áá ááá áá áŤáá á°á˝áĽá ááááłáá˘
áľá á°á áá¨á á ááá´áá¸á
Debezium á áá áá áá°áá ááá áĽá áľ áááŁááĄ-
DBMS (áĽáá° á¨ááἠááá) â áááá á áŤá፠á ááá â Apache Kafka â á¸áá˝
áĽáá° ááłáᣠá¨ááŽáááą áľá ᨠáá˝ áá áĽááá ááá፠áĽá°áŁáááĄ-
ááá ᣠáá
áá áĽá
áľ á ááá°áá ᣠááááŤáąá á¨áá áá á˘áŤ áááá áĽáť á¨ááťá áááľááá˘
áĽáá° áĽáááą á¨áá, áááłá ââââá¨á°áᨠáá: á¨áĽááľáá á¨ááἠááá ááááľ (á¨áá áŁáá áĽáá áá á¨áá¨á¨áťá á ááá) Debezium ááá áá áĽá¸áá ááááľ á áá°áá. áá° Apache Kafka á¨á°áአááľá°áśá˝ á¨á°ááŤáŠ áááłáá˝á ááááłáľ á áá°áá áŞáŤáá˝á áá áá áá˝ááᢠáááłá:
- á ááŁáĽááľ á¨áááá áá¨á á¨áá¸ááŤá ááľáĽ ááľáááľ;
- ááłáááŤáá˝á ááá;
- á¨ááá á˘áá´ááľ ááááá˝;
- á ááłááľ ááááľ á¨áŚá˛áľ ááááŚá˝;
- ...
á¨á፠á áááŹá˝á áŤáá
áĽá á¨áŤá፠áááľá°á ááá áá áŤááľáááá
á¨ááĽáŤáľ áĽáľáá á á
áá ááŁáĽá á ááá˘áá˝ á¨á°áá¨á¨áá á¨áľá-á áá ááľá áŤáĽáŤáŤáá˘
á¨áááá áá á
á áŁá á áľááá á ááá áĽá´áľ ááľáĽ áááŚá˝á áá¨áłá°á ááááá - ááἠ- áĽá áĽáááááá
- á¨ááἠáááᣠá¨áľáŞáľ 5.7ᣠPostgreSQL 9.6+ᣠMongoDB 3.2+ á¨áááá MySQL ááá áá˝áá (
á¨á°áá ááááᢠ); - Apache Kafka ááá
- á¨áŤá፠á ááá ááłá (áľáŞáśá˝ 1.x, 2.x);
- á¨á°ááᨠDebezium á áŤáŤáĽ.
á ááááŞáŤááš áááľ ááĽáŚá˝ áá ááľáŠ, áááľá. DBMS áĽá Apache Kafka á¨ááŤá áá°áľ á¨á˝áá áá°á á áá áá¸áᢠááá áá, áááá ááá á á á¸á áłáĽá ááľáĽ ááá°ááŤáľ ááááá, á áŚáá´ááá áá¨ááť ááľáĽ ááłááá˝á á¨áŤá ááá á¨áá á á.
á áá¨á¨áťááš áááľ ááĽáŚá˝ áá á áááá áĽáá°áŠáŤááá˘
0. á¨áŤá፠áááááľ
áĽáá áĽá á áá á á áááš ááľáĽ ááá á¨ááááŞáŤ ááłááá˝ á á˛á¤áá¨á ááá˘áá˝ á á°á°áŤá¨á á¨áśá¨á ááľá á ááľ ááľáĽ á°ááľá°ááᢠáááá á áľááá á°á°áŞ áááá˝ (áááááá˝) áááá áĽá á¨á áŤáŁá˘ á°ááááŽá˝á á áá áá á¨áŤá፠á ááá áá á áŤáááŁáá˘
ᨠKafka Connect from Confluent ááá áá áŤá°áĄ á áľááá á¨áááľá áááááá˝ áááá áĽáŤáľá á á°ááá¸á áá፠áá áá¨á áŤáľáááááłá plugin.path
ááá á á áŤáŁá˘ á°áááá á áŠá á°ááá
áˇá CLASSPATH
. á¨áŤá፠á ááá á°áŤá°á áĽá áááááá˝ á
áá
áśá˝ á¨áááášáľ áá áŤá°áá á
áá áľáĽáá áĽáá° áááá á áá°ááá á¨áá
á¨áľ áááá˝ ááᢠááááአáááá¨áą
á á ááá áĽáŞáľ ááľáĽ Debeizum á á¨áááááľ á á ááá áá°áľ á áááľ á°á¨ááá˝ áá¨ááááᢠáĽáŤááłááłá¸áá áĽáááá¨áłá¸ááĄ-
1. á¨áŤá፠áááá ááĽáá áááááľ
áááĽá áá° Apache Kafka áááľá°á ááá°áŤá¨áľ á¨á°áá°á áááŞáŤáá˝ á áŤá፠áááá áááá ááľáĽ á°ááá ááᣠáááłááĄ-
- á¨áááľá°á áááááľ á ááĽáŽá˝ áŁ
- á¨ááááá áá á áŤáą á¨áá¨áá˝áŁá¸á á¨áááśá˝ áľáá˝ áŁ
- ááááá á¨áá°áŤá áľ á¨áĄáľá áľá (á¨á°á¨ááá áááłá á áá ááá áľ áá).
á¨ááŽáááą áŚáá´áá Docker ááľá á¨á áŤáŁá˘ á°ááááŽá˝á á áá áá áá á¨áľá áá°ááá - áĽá á¨ááá ááá áá áá ááᢠáľááá ááľáá áĽááááľáĄ-
docker pull debezium/connect
áááááá ááááľ á¨ááŤáľáááá áá á°áá á¨á áŤáŁá˘ á°ááááŽá˝ áľáĽáľáĽ áĽáá°áá¨á°áá ááá˘
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
- á¨á°áá á¨áááľá°á á áŁááľá áááá áááááľ á¨áŤá፠áááľá°á á ááááŽá˝ á¨ááááŞáŤ áááá; -
OFFSET_STORAGE_TOPIC=connector-offsets
- ááááá á á áá áá á¨áááá áľá áŚáłáá˝á ááá¨áá¸áľ áááľ; -
CONNECT_STATUS_STORAGE_TOPIC=connector-status
- á¨áááááľ áááłá áĽá á°ááŁáŽášá ááá¨áá¸áľ áááľ; -
CONFIG_STORAGE_TOPIC=connector-config
- á¨á ááá áá á áááĽá áĽá á°ááŁáŽášá ááá¨áá¸áľ áááľ; -
GROUP_ID=1
- á¨ááááá á°ááŁá á¨áá¨áááá áľ á¨á°áŤá°áá˝ áĄáľá áááŤ; á˛á°áŤá áŤáľáááá (á¨á°á¨ááá) á áááá˘
ááŤáŁáá á áá¨á°ááľ á°ááááŽá˝ áĽááááŤáá-
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
áľá á áᎠááľáłááť
á ááŁáŞáŁ Debezium áááĽá á JSON á
áá¸áľ áá˝ááᣠáá
á ááá áŞáŤ áłáĽáá˝ áĽá á ááľá°á áá á áŤáá ááἠá°ááŁáááľ áŤááᣠááá áá á áŁá á á°áŤá á¨ááἠááłáá˝ ááľáĽ á˝áá ááá áá˝ááᢠá¨JSON ááá¨áŞáŤ áá á ááŤá áááĽááśá˝á á áá áá á°á¨áłáłá ááľá¨á ááá˘
Avroá ááá áá á¨á°áᨠáá°ááŤáľ áŤáľáááááłá
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
Avroá áľááá áá áĽá ááĽáą áááἠáľááááááľ ááááŽá˝ á¨á˝áá áá°á á áá áá¸á - á¨á áá ááá áááľá¨á JSON á áĽáá ááááá˘
2. áááááá á áŤáą áááááľ
á áá á ááĽáł áá° ááááá áŤáą áá á áááľ áá˝áá, áá á á¨ááአáá áá¨áá áŤááŁá.
ááááľ á˛á˘á¤áá¤áľ á¨áááááá˝ ááłá áĽáááá¨áľáĄ PostgreSQL áĽá MongoDBᣠááá á áááľ á áá áĽá áááąá ááŠááśá˝ á á (ááá áĽááłá áľáá˝ á˘ááá á á ááłááľ áááłáá˝ áá á áľááá áá!)á˘
áá አá JSON ááľáłááť ááľáĽ á°áááżá áĽá á¨POST áĽáŤáá á áá áá áá° Kafka Connect á°á°á ááá˘
2.1. PostgreSQL
á¨PostgreSQL ááłá á áŤáŤáĽ áá ááĄ
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
á¨áá áá á á áá á¨áááááľ ááá á áŁá ááá áá-
- á ááááŞáŤá á
áá áá, á ááááŞáŤá ááľáĽ á¨á°á áá°á á¨ááἠááł áá ááááá áĽá á áááł ááááŤá á¨ááááŞáŤ á
á˝á áłá áĽááłá¨á
áľá áááłá ââáá á¨á°áá ááá á¨ááááŞáŤ á¨ááἠáľáĽáľáĽ ááŤá፠á ááá áá
SELECT * FROM table_name
. - á áá á¨á°á ááá á áá áááá ᨠPostgreSQL WAL áááá˝ áááŚá˝á á¨ááá ἠáá´á áŤáľááŁáá˘
áľáá°á áááŁá¸á á ááŤáŽá˝áĄ-
-
name
- á¨áá á áłá˝ á¨á°ááá¸á áá á áĽá á áá á¨áááá áľ á¨áááá áľá; ááá°ááą áá áľá á¨áááá áá á áĽáŽ áááľáŤáľ áĽá á áá áááá (áááľá áááłáá áááá¨áą / áĽáá°áá áŤáľááአ/ á ááááŠá áŤááá) á Kafka Connect REST API; -
connector.class
- á á°ááá¨á áááá áĽá á áá á¨áááá ᨠDBMS áááá ááá; -
plugin.name
á¨WAL áááá˝ áá¨áá áááŤá á˛áŽá˛áá áááľá¨á á¨á°á°áŞá áľá ááᢠáááá¨áĽ ááááá˘wal2json
,decoderbuffs
иpgoutput
. á¨ááááŞáŤááš áááą á á˛á˘á¤áá¤áľ ááľáĽ á°áá˘áá á áĽáŤ ááŤá áŤáľáááá¸áá, áĽápgoutput
á PostgreSQL áľáŞáľ 10 áĽá á¨á፠á áá á°á¨á᪠ááá áá áŽá˝á á áŤáľáááá; -
database.*
- á¨áá¨á ááą áá á¨áááááľ á ááŤáŽá˝ ᣠá¨áľdatabase.server.name
- á áŤá፠áááľá°á ááľáĽ á¨áááąá áľá ááááľá¨áľ áĽá á áá á¨ááá ᨠPostgreSQL ááłá áľá; -
table.include.list
- áááŚá˝á ááá¨áłá°á á¨ááááááŁá¸á á¨á á¨á´ááá˝ áááá; á á áá¸áľ á°á°áĽáˇáschema.table_name
; áá á áĽáŽ áá áá á ááťáátable.exclude.list
; -
heartbeat.interval.ms
- ááááá á¨áἠááľ áááááśá˝á áá° áአáááľ á¨áááá áľ ááá°áľ (á ááá°á¨ááśá˝); -
heartbeat.action.query
- áĽáŤááłááąá á¨áἠááľ áááĽááľ á˛áá á¨áá°áá á áĽáŤá (á ááŤáŠ á¨áľáŞáľ 1.1 ááᎠáłááˇá); -
slot.name
- á áááá áĽá á áá á¨áááá á¨ááŁááľ ááľáá˘áŤ áľá; publication.name
- áľáá˝ááá˝ ááááá á áá ááá PostgreSQL ááľáĽá˘ á¨áá á°á áá¨á áááá á ááááŤáᢠáááááą á¨á°á°á¨áá áľ á°á áá ááá áľáááľ á á ááĽáśá˝ á¨áááľ, ááááá á áľá á°áľ áááŁá;-
transforms
á¨áłáááá áááľ áľá á áľááá áĽáá´áľ ááá¨á áĽáá°ááťá áááľáááĄ--
transforms.AddPrefix.type
áá°á á ááááŤáá˝á áĽáá°ááá áá áá ááá; -
transforms.AddPrefix.regex
- á¨áłááá áááľ áľá áĽáá°áá á¨áááá˝á áľ áááĽá; -
transforms.AddPrefix.replacement
- á ááĽáł á¨ááááá¸á.
-
áľá á¨áἠááľ áĽá áááŚá˝ á°á¨ááŞ
á ááŁáŞááľ ááááá ááĽáŤááłááą á¨á°áá¸á ááĽááľ ááἠáá° áŤá áááŤá áĽá LSN (Log Sequence Number) á á áááááľ áááľ áá áá˝áá offset
. áá ááááá áááá áłáłá¤á áłááá áĽáá˛áŤáἠá¨á°ááᨠáá áááá á¨á áá á¨áŚáš ááá áĽáť (á á¨áľáá ááᥠáĽáá áŤááááá)?
- ááááá á¨WAL áááá˝á áŤááŁá áĽá á ááľáŁá¸á áŤááá ááĽááľ áá° áá¨áłá°áá¸á á á¨á´ááá˝ á áŤáááá˘
- áľááá ᣠá áááąá áá á ááŁááľ ááľáá˘áŤ ááľáĽ á áá áŤááá áŚáł á áŤááááá˘
- áá á°áá á¨WAL áááá˝ á á˛áľá áá "áĽáá˛áŁá á" áĽá á¨á˛áľá áŚáł ááŤáá áŁá¸á áá˝ááá˘
áĽá áĽáá
á ááŤáŽá˝ áááłá áááŁá. heartbeat.interval.ms
и heartbeat.action.query
. áĽááá
á á ááŤáŽá˝ á áĽááľ áá áá á¨áἠááľ áááĽááľ á á°áᨠááĽá á á°áᨠá áá á¨áĽ ááľáĽ áŤááá áá¨á á¨ááá¨á áĽáŤáá áááľáá¸á áŤáľá˝ááᢠáľááá
, ááááá á á áá áá á¨áááá áľ LSN (á ááŁááľ ááľáá˘áŤ ááľáĽ) áŤáááá¨áĽ ááťáťáá. áá
DBMS á¨á áá á áá á¨ááŤáľááááľá á¨WAL áááá˝ áĽáá˛áŤáľáááľ áŤáľá˝áááᢠá ááŤáŽá˝ áĽáá´áľ áĽáá°áá አá¨á áá áá¨á áááááľ áááá¨áą
á¨á áá áľáŠá¨áľ áá°á á á¨áááŁá ááá á ááŤá áá transforms
. ááá áĽááłá áľá áážáľ áĽá áá áľ á¨á áá á˘ááá ...
á ááŁáŞáŁ Debezium á¨áá¨á°ááá á¨áľá ááᲠá áá áá áááśá˝á áááĽáŤááĄ- serverName.schemaName.tableName
. áá
áááá ááš áááá áá˝áá. á ááŤáŽá˝ transforms
áá°á á á áááážá˝á á áá áá ááľá°áśáťá¸á á¨á°áá°á áľá ááłáá áááľ áááŤáľ áŤááŁá¸áá á¨á°áá á¨áŚá˝ áááá áááá˝ áá˝ááá˘
á áĽá áá
á áĽááá°áááá transforms
á¨áá¨á°ááľ áá¨ááááᥠááá á¨á°á¨áłá°áá áłáłá¤á á¨ááᥠá¨á˛á˛á˛ ááľá°áśá˝ á áľá áá° áááľ áááłá data.cdc.dbname
. áŤáá áá፠(áŤá áĽááá
á
ááĽáŽá˝) Debezium á ááŁáŞááľ ááĽáŤááłááą á¨á
áš á áá á¨áĽ áááľ áááĽáŤáᥠpg-dev.public.<table_name>
.
á¨áááááľ áá°áŚá˝
á PostgreSQL á áŤáŤáĽ áá á ááá፠áá¨á¨áť áá áľá áĽáŤá áŁá áŞáá˝ / áá°áŚá˝ áááŤáľ á áá áá-
- ᨠPostgreSQL á áŤáŤáĽ á°ááŁá á ááá á˛áŽá˛áá á˝áá°-ááłáĽ áá á¨á°áá á¨á° ááᢠáľááá á áĽáą á¨ááἠááłáá ááá á ááááἠáĽáŤááá˝á á áá¨áłá°áá (á˛á¤áá¤á) - á áá áá á¨áľ, áá ááἠá áááśá˝ ááľáĽ á áááá.
- á¨ááŁááľ ááá°áśá˝ áĽá á áá áľáááá, á¨ááááá áááááľ ááťáá áĽáť áá° áá DBMS ááłáá˘
- ááááá á¨áá¨á ááą áá á¨ááááá áľ á°á áá á¨ááá ἠáĽáť ááĽáśá˝ áŤááľáŁ á¨ááááŞáŤá á áá á ááľ áĽáŤáľá á¨ááŁááľ ááľáá˘áŤ ááá á áĽá áá° áłáłá¤á áá°á áŤáľáááááłáá˘
áá á¨áľá á áá°áá á áá
áľááá á¨áĽáá áá á¨áľ áá° áááá ááľáĽ áĽááŤááĄ-
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
ááá¨áą á¨á°áłáŤ áááá áĽá ááááá ááááŠá áĽáá¨áááŁááá˘
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
á áŁá áĽáŠáĄ á°ááá áś ááááľ ááá ááᢠá áá á¸áá˝ ááľáá á¨áŤá፠áá áĽáááá á¨á á áá á á°áá á¨áĄ áá áá¤áľ á¨áá¨á áĽááááŁááá˘
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
á áĽá áááľ ááľáĽ, áá áĽáá°áá¨á°áá ááłáŤá.
á áŁá á¨á á JSON á¨áááŚáťá˝á áá
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
á áááąá áááłáá˝ ááááŚáš á¨á°áá¨á¨áá á¨áááᥠááá (ááŹ) áĽá á¨áááŚášá áááľ áŤáŤáľáłáᥠáááᥠá¨áá á ááľ áá áĽáá°áá ᨠáĽá á áá áá áĽáá° ááá˘
- á áá
á¨ááľ
INSERT
áá á ááľbefore
) áĽáŠá ááá˘null
á¨ááŁá ááĽá¨ááá á°á¨áľá. - á áá
á¨ááľ
UPDATE
: ápayload.before
á¨á¨áľá ááłáá áááł ááłáŤá áĽá á ááľáĽpayload.after
- á¨ááἠáááľ áá á á˛áľá˘
2.2 MongoDB
áá áááá á¨á˛á˘á¤áá¤áľ á¨ááááŞáŤ á°á¨á ááľááá ááááľ áŚááá áá áá¨áá á ááá ἠáá°á ááá MongoDB á¨ááŁááľ áá´á áá áááá˘
á á°ááłáłá á PgSQL áá°á á˛á á¨á°ááá¸á áááá ááᣠáĽáá á áĽáá˛áᣠá ááááŞáŤá á áá áá ááá á¨ááἠá á˝á áłá áá˝ áĽááł ááá°áłáᣠá¨á፠á áá ááááá áá° áŚááá ááŁáĽ áááł ááá¨áŤáá˘
á¨áááá ááłááĄ-
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
áĽáá°áááá¨áąáľ ᣠá¨ááłáá ááłá áá á˛áááአááá á á˛áľ á ááŤáŽá˝ á¨áá ᣠáá á¨áá¨á ááą áá ááááááľ áááááľ áŤáá¸á á¨á ááŤáŽá˝ áĽááľ áĽá á áľá á áĽáŤááťá¸á ááá°ááá˘
á
ááĽáŽá˝ transforms
á áá
áá á¨áá¨á°ááľá áŤá°ááá: á¨áłáááá áááľ áľá á¨áááááĽáŠ áá áŤááŠáľ <server_name>.<db_name>.<collection_name>
в data.cdc.mongo_<db_name>
.
áľá á°áľá ááłááľ
á áááá˝á á¨áľá á°áľ ááťáťá áĽá á¨áá°á á á ááŚáľ ááłá á¨ááźáá áá á á áá á áłáłá˘ áá - á á°ááá áľá áá¨á áĽá ááĽááśá˝ áľáá፠áĽá á¨ááἠáááĽá áá¨áłá°á á áá ááłá áá á¨áá á áá°áá ᢠá ááá á°á¨á áľá á°áľ á¨ááááá áĽá á áĽáŤááłááą ááłá áá á°á áá¨á áá áĽáá°ááá áĽááá˘
áśáľáľ á¨ááጠáááŁáľ á ááŤáŽá˝ á ááĄ-
- á¨áŤá፠áááááľ á áááłáŤáľá˘. Connect á á°á¨ááá áááł áĽáá˛á°áŤ á¨á°ááᨠáá á°ááłáłá group.id ááááááľ áĽá á°áŤá°áá˝á áááááᢠá¨ááŤá á¨ááŤá¨áá¸á á ááą áŤáá°áłáŤ, ááááá á áááá á°áŤá°á áá áĽáá°áá ááááŤá áĽá á áŤá፠ááľáĽ áŁáá áááľ ááľáĽ á¨áá¨á¨áťá ááá á á áá ááá ἠáááĽáá.
- á¨áŤá፠áááľá°á áá áŤáá áááááľ ááŁáľ. ááááá á ááá áá° áŤá፠ááá áŤááťááá áŚáł ááá ἠáŤááá áĽá áá¨áŤá áĽáľáŞáłáŤ áľá¨áľ á á¨ááá áĽáá°áá áááá ááááŤáá˘
- á¨ááἠááá á áááá. ááááá áĽáá° á áááአáá°á¨áľ á¨ááአáá áĽáá°áá ááááááľ ááááŤáᢠááŁáŞá 16 áá¨áŤáá˝á áá áá ááá˘
ááá ááᣠ. ᨠ16 áá áŤáá°áłáŤ áá¨áŤ á áá, á°ááŁáŠ áĽáá° ááááľ áá°á¨áá áłá á áá°áłáŤá áĽá á áŤá፠á ááá REST á áááá˝ á áŠá áĽáŤáľá áĽáá°áá áááá áŤáľááááá.- á áá á¨ááľ PostgreSQL ááἠá áá áá, ááááŤáąá á¨ááŁááľ ááá°áśá˝á áá áá á áŽááá°áŠ áŤáá°áá ᥠá¨WAL áááá˝ áĽááłáá°á¨á áá¨áá¨ááᢠá áá áááł, á ááłá ááá˝ á á-á áááá áĽá á á˛á˘á¤áá¤áľ ááŤá¨á áŤáá á¨á ááłá¨ áá¨áĽ áááááľ áá¨á á áá á¨á°áľá°ááá, á¨á˛áľá áŚáłá ááŤáá á¨áá˝áá áľ áĽáľá á á, áĽá áá áá° á á ááá á˛á˘á¤áá¤áľ ááľááľ ááŤá፠áá˝áá.
- á áá á¨ááľ MySQL á¨á˘ááá áááá˝ áááááą áá°áá á¨á áľ á¨ááááą á ááľ á áŤáą á DBMS áá˝á¨á¨á¨á áá˝ááᢠáá ááááá áá° á áá°áłáŤááľ áááł áĽáá˛áᣠáŤá°ááááᣠáĽá áá°á á áľáŤáá áá°áá á¨á áľ áááááľ á¨á˘ááá ááá ἠáááá á á áááť á á˝á áłá áá˝ áĽááł ááľáĽ áĽáá°áá áááá á áá áľá˘
- áá MongODB. á°ááą áĽáá˛á
áááᥠá¨áá/oplog áááá˝ á¨á°á°á¨á áĽá ááááá áŤááá áľ áŚáł ááá ἠááá á áŤááťá á¨ááááá áŁá
᪠áááá DBMS á°ááłáłá ááᢠááááá áá° áľá´áą ááľáĽ áľáááᣠáá á áá°áłáŤá áĽá á áááł ááľáĽ áĽáá°áá ááľááá áŤáľááááá á¨ááááŞáŤ á
á˝á áłá áĽááł.
ááá áá, áአáááłáá˝ á á. ááááá áá¨á á áá á á°áá¨á áááł ááľáĽ á¨áá ᨠ(ááá MongoDB ááłá ááľá¨áľ áŤááťá) áĽá áŚááá á áá áá ááŻáᣠá¨á፠áááááą áá°áá á¨á áľ á˛áááľ ááááá á á¨á ááááľ á¨ááááŞáŤá á¨ááá áŚáł áá¨áá ááá ἠáááĽááᢠ, ááá á áá á áŤá፠ááľáĽ áŤá á ááłááľ áá¨ááá˝ á áá°áá áááłá á˘
áá°áá°ááŤ
á´á¤áá¨á á á˛á˛á˛ á˛áľá°áá˝ á¨ááááŞáŤ ááá´ áá áĽá á á á ááá á áŁá á áááłá áá áᢠááŽáááą á¨áááá á˛á˘á¤áá¤áľ áľááᣠá¨ááá
á áááááľáŁ ááááľá°á áľáá áĽá á¨áá áá
á á¨á°áĽ áľáá á áľáááᢠá á°ááŁá áá áááľ, áááŞáŤáášá áĽáá˛áŤáᥠáĽáááŤá˝ááá
ááŤá፠áŽááá°á á¨JDBC á áŤáŤáĽ áá á˛ááłá°á á¨á°áĽáá¨á ááá áĽá á á¨á˛á˘á¤áá¤áľ áááἠááľáłááťáá˝ áá áááŚá˝ ááá ᥠááᣠáá á áá¨á á áľááš áááá¨áľ áĽáá˛áá á áŤáľá˝ááᢠá¨JDBC áŽááá°á (á áŤá፠áŽááá°á á¨áá¨á ) á¨á°á¨áłá°ááá á áá á¨áĽ á á°áá°á á¨áá ááá°áľ áá ááá áĽá (á á°ááłáłá ááááŤáľ) ááᥠá áá°á¨áá áľ áá áááááśá˝á á áŤáááá (áĽá፠á¨ááá áľá ááἠáĽáá´áľ áá á¨á áá˝áá?)á˘
á°ááłáłá á˝ááŽá˝á ááááłáľ ááá¨á°ááľ áááľááá˝ áľáŠá¨áľ ááľá áľ áá˝áá (á¨á°áĽáá¨á á á°á¨ááŞ)
-
JDBC á áŤáŤáĽ Kafka á ááá - áĽááľ MySQL-áĽáť áááľááá˝:
-
Oracle ááááá á á , áá áá áá˝á á¨á°áᨠ"á¨ááĽá°áľ ááľáĽ" áá.
PS
á áĽááá˝á áá áŤááĽáĄáĄ-
- ÂŤ
á áŠá áááľáľ ááľáĽ ááŤá፠áááľá°á á°áá˘áá áá á áááľá "; - ÂŤ
á¨SRE á¨áááľ á°áááľ ááááłá˝á á°ááŁáŤá áłáŞáŽá˝á˘ ááá 2 "; - ÂŤ
ᨠPostgreSQL ááááŤáá˝ ááŠá áááľáľáŁ áááŤááťá˝á áĽá áááśáťá˝á á áá ááá፠.
ááá: hab.com