In my work I often come across new technical solutions and software products about which there is very little information on the Russian-speaking internet. With this article I will try to fill one such gap using an example from my recent practice: I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this review, which grew out of that work, will be useful to others.
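What are Debezium and CDC in general?
Debezium belongs to the CDC (Change Data Capture) class of software. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.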
Compared with the traditional approach, where the application reads data directly from the DBMS, CDC's main advantages are row-level streaming of data changes with low latency, high reliability and high availability. The last two are achieved by using a Kafka cluster as the store for CDC events.
Another advantage is that a single model is used to store events, so the end application does not have to deal with the operational specifics of different DBMSs.
Finally, using a message broker opens the way to horizontal scaling of the applications that track data changes. At the same time, the impact on the data source is minimal, because the data is consumed from the Kafka cluster rather than directly from the DBMS.
About the Debezium architecture
Using Debezium boils down to this simple scheme:
DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer
The project website illustrates this with its own diagram. Personally, I don't like that diagram much, because it gives the impression that only a sink connector is possible.
In reality, things are different: populating your Data Lake (the last link in that diagram) is not the only way to use Debezium. Your applications can use the events sent to Apache Kafka to handle a variety of situations, for example:
- evicting stale data from a cache;
- sending notifications;
- updating search indexes;
- keeping some kind of audit log;
- ...
If you have a Java application and there is no need (or no way) to use a Kafka cluster, you can also work through the embedded connector.
This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.
Connector configuration
To start tracking changes to the most important asset, the data, we need:
- a data source, which can be MySQL 5.7+, PostgreSQL 9.6+ or MongoDB 3.2+ (see the full list of supported databases in the Debezium documentation);
- an Apache Kafka cluster;
- a Kafka Connect instance (versions 1.x, 2.x);
- a configured Debezium connector.
The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. For those who want to deploy everything in a sandbox, the official Debezium examples repository has a ready-made setup.
We will look at the last two points in more detail.
0. Kafka Connect
Here and below, all configuration examples assume the Docker image distributed by the Debezium developers. It contains all the required plugin files (connectors) and lets you configure Kafka Connect through environment variables.
If you plan to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself, either to the directory specified in plugin.path or via the CLASSPATH environment variable. The settings of the Kafka Connect worker and of the connectors are defined in configuration files that are passed as arguments to the worker start command. For details, see the documentation.
In connector mode, setting up Debezium takes two stages. Let's look at each of them:
1. Configuring the Kafka Connect framework
To stream data to an Apache Kafka cluster, you set specific parameters in the Kafka Connect framework, such as:
- the cluster connection settings;
- the names of the topics in which the connector's own configuration will be stored;
- the name of the group in which the connector runs (when distributed mode is used).
The project's official Docker image supports configuration through environment variables, and that is what we will use. So let's pull the image:
docker pull debezium/connect
The minimal set of environment variables needed to run the connector is:
- BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to discover the full list of cluster members;
- OFFSET_STORAGE_TOPIC=connector-offsets - the topic that stores the positions the connector has currently reached;
- STATUS_STORAGE_TOPIC=connector-status - the topic that stores the status of the connector and its tasks;
- CONFIG_STORAGE_TOPIC=connector-config - the topic that stores the configuration data of the connector and its tasks;
- GROUP_ID=1 - the identifier of the group of workers that can run the connector's tasks; required in distributed mode.
We start the container with these variables (port 8083 is published so that the REST API can be reached at localhost:8083 later on):
docker run -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
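For comparison with the Confluent setup mentioned earlier, where worker settings live in a properties file: the five variables correspond to standard Kafka Connect worker properties. The Python sketch below renders them as the body of such a file. The helper name to_worker_properties is hypothetical, and a real worker file usually also sets key.converter and value.converter.

```python
from typing import Dict

# Standard Kafka Connect worker properties that correspond to the
# environment variables passed to the debezium/connect container above.
WORKER_ENV_TO_PROPERTY = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "GROUP_ID": "group.id",
    "CONFIG_STORAGE_TOPIC": "config.storage.topic",
    "OFFSET_STORAGE_TOPIC": "offset.storage.topic",
    "STATUS_STORAGE_TOPIC": "status.storage.topic",
}


def to_worker_properties(env: Dict[str, str]) -> str:
    """Render the worker settings as the body of a connect-distributed.properties file.

    Raises ValueError if any of the settings listed above is missing.
    """
    missing = [name for name in WORKER_ENV_TO_PROPERTY if name not in env]
    if missing:
        raise ValueError("missing worker settings: " + ", ".join(missing))
    lines = [f"{prop}={env[name]}" for name, prop in WORKER_ENV_TO_PROPERTY.items()]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    example_env = {
        "BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "GROUP_ID": "1",
        "CONFIG_STORAGE_TOPIC": "my_connect_configs",
        "OFFSET_STORAGE_TOPIC": "my_connect_offsets",
        "STATUS_STORAGE_TOPIC": "my_connect_statuses",
    }
    print(to_worker_properties(example_env), end="")
```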
A note about Avro
By default, Debezium writes data in JSON format. That is fine for sandboxes and small amounts of data, but it can become a problem for heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro, a compact binary format.
To use Avro, you need to deploy a separate Schema Registry to store the schemas. The converter variables then look like this:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
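As far as I know, variables with the CONNECT_ prefix are turned into worker properties by a simple naming rule: strip the prefix, lowercase the rest and replace underscores with dots (the same convention Confluent's images use). The Python sketch below applies that assumed rule to the registry variables above. VALUE_CONVERTER has no prefix; it is one of the image's short-name variables, so this rule leaves it alone. The function names are hypothetical.

```python
from typing import Dict, Optional

PREFIX = "CONNECT_"


def connect_env_to_property(name: str) -> Optional[str]:
    """Map e.g. CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL to
    value.converter.schema.registry.url; return None for other names.

    This is an assumed rule; it cannot express property names that
    themselves contain underscores.
    """
    if not name.startswith(PREFIX):
        return None
    return name[len(PREFIX):].lower().replace("_", ".")


def collect_connect_properties(env: Dict[str, str]) -> Dict[str, str]:
    """Keep only CONNECT_* variables, keyed by their derived property names."""
    props: Dict[str, str] = {}
    for name, value in env.items():
        prop = connect_env_to_property(name)
        if prop is not None:
            props[prop] = value
    return props


if __name__ == "__main__":
    env = {
        "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL": "http://kafka-registry-01:8081/",
        "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL": "http://kafka-registry-01:8081/",
        "VALUE_CONVERTER": "io.confluent.connect.avro.AvroConverter",
    }
    for prop, value in collect_connect_properties(env).items():
        print(f"{prop}={value}")
```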
Details on using Avro and setting up a registry for it are beyond the scope of this article; from here on, for clarity, we will use JSON.
2. Configuring the connector itself
Now we can move on to configuring the connector itself, which will read data from the source.
Let's look at connectors for two DBMSs, PostgreSQL and MongoDB. I have hands-on experience with both, and there are differences between them (small ones, but significant in some cases!).
The configuration is written in JSON and submitted to Kafka Connect with a POST request.
2.1. PostgreSQL
An example connector configuration for PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
Once configured, the connector works in a fairly simple way:
- On its first start, it connects to the database specified in the configuration and begins in initial snapshot mode, sending Kafka the initial data set obtained with the equivalent of SELECT * FROM table_name.
- After initialization completes, the connector switches to reading changes from the PostgreSQL WAL files.
About the options used:
- name - the name of the connector that the configuration below applies to; later, this name is used to work with the connector (view its status, restart it, update its configuration) through the Kafka Connect REST API;
- connector.class - the DBMS connector class used by the connector being configured;
- plugin.name - the name of the plugin that performs logical decoding of the data in the WAL files. You can choose from wal2json, decoderbufs and pgoutput. The first two require installing the corresponding extensions in the DBMS, while pgoutput needs no extra setup on PostgreSQL 10 and later;
- database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance, used to build topic names in the Kafka cluster;
- table.include.list - the list of tables whose changes we want to track, in the format schema.table_name (regular expressions are allowed); it cannot be used together with table.exclude.list;
- heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a dedicated topic;
- heartbeat.action.query - a query executed each time a heartbeat message is sent (available since version 1.1);
- slot.name - the name of the replication slot the connector will use;
- publication.name - the name of the PostgreSQL publication the connector uses. If it does not exist, Debezium will try to create it; if the user the connector connects as lacks the privileges to do so, the connector exits with an error;
- transforms - defines how exactly the name of the target topic is changed:
  - transforms.AddPrefix.type - specifies that we use regular expressions;
  - transforms.AddPrefix.regex - the pattern by which the target topic name is redefined;
  - transforms.AddPrefix.replacement - the value we redefine it to.
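Some of these rules are easy to check before the configuration is submitted. Below is a minimal Python sketch that inspects the parsed connector document. The function check_pg_connector is hypothetical and covers only the rules described above: a known plugin.name, include and exclude lists not combined, and a type for every transform alias.

```python
from typing import List

# Logical decoding plugins listed for plugin.name above.
SUPPORTED_PLUGINS = {"decoderbufs", "wal2json", "pgoutput"}


def check_pg_connector(doc: dict) -> List[str]:
    """Return human-readable problems found in a connector JSON document."""
    if "name" not in doc or "config" not in doc:
        return ["document must have top-level 'name' and 'config' keys"]
    config = doc["config"]
    problems: List[str] = []

    plugin = config.get("plugin.name")
    if plugin is not None and plugin not in SUPPORTED_PLUGINS:
        problems.append(f"unexpected plugin.name: {plugin!r}")

    if "table.include.list" in config and "table.exclude.list" in config:
        problems.append("table.include.list and table.exclude.list are mutually exclusive")

    # Every alias listed in 'transforms' needs at least a '.type' entry.
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    for alias in aliases:
        if f"transforms.{alias}.type" not in config:
            problems.append(f"transform {alias!r} has no type")
    return problems
```

For example, check_pg_connector(json.load(open("pg-con.json"))) should return an empty list for the configuration shown above.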
More on heartbeat and transforms
By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the offset service topic. But what happens if the connector is configured to read only some of the tables in the database, and the data in those tables is rarely updated?
- The connector reads the WAL files and finds no committed transactions touching the tables it tracks.
- As a result, it does not update its current position, either in the topic or in the replication slot.
- This, in turn, causes WAL files to pile up on disk, and the disk is likely to run out of space.
This is where the heartbeat.interval.ms and heartbeat.action.query options help. Used together, they make it possible to run a data-modifying query against a separate table every time a heartbeat message is sent. This keeps the LSN the connector is currently at (in the replication slot) moving forward, which lets the DBMS remove WAL files that are no longer needed. For more on how these options work, see the documentation.
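To see how far a slot lags behind, you can compare the server's current WAL position, pg_current_wal_lsn(), with the slot's confirmed_flush_lsn from the pg_replication_slots view (PostgreSQL 10+); on the server, pg_wal_lsn_diff() does the subtraction for you. The Python sketch below only illustrates the arithmetic: a textual LSN such as 0/2082618 is the two hexadecimal halves of a 64-bit byte position in the WAL. The function names are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert PostgreSQL's textual LSN 'X/Y' (two hex numbers) to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(position: int) -> str:
    """Inverse of lsn_to_int, using PostgreSQL's upper-case hex formatting."""
    return f"{position >> 32:X}/{position & 0xFFFFFFFF:X}"


def retained_wal_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the server must keep because the slot has not confirmed them yet."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)


if __name__ == "__main__":
    # "lsn" values from the two change events shown later in this article.
    insert_lsn = int_to_lsn(34088472)
    update_lsn = int_to_lsn(34089688)
    print(insert_lsn, update_lsn, retained_wal_bytes(update_lsn, insert_lsn))
```

If the heartbeat query is not running, the value computed this way for the connector's slot keeps growing on a busy server even though the tracked tables do not change.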
Another option that deserves a closer look is transforms, although it is more about convenience and aesthetics...
By default, Debezium names topics according to the scheme serverName.schemaName.tableName. That is not always convenient. With the transforms options and regular expressions, you can define a list of tables whose events should be routed to a topic with a specific name.
In our configuration, transforms does the following: all CDC events from the tracked database go to a topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a separate topic for each table, named pg-dev.public.<table_name>.
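Kafka's RegexRouter renames a topic only when the regular expression matches the whole topic name, and then substitutes the replacement ($1, $2 refer to capture groups). The Python sketch below imitates that behavior to show what the settings above do to a few topic names. It is an illustration, not the transform's own code, and Python and Java regex syntax differ in edge cases; table and schema names other than customers are hypothetical.

```python
import re


def java_to_python_replacement(replacement: str) -> str:
    """Rewrite Java-style group references ($1) into Python's \\g<1> form.

    Assumes the replacement contains no backslashes or escaped '$' characters.
    """
    return re.sub(r"\$(\d+)", r"\\g<\1>", replacement)


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Imitate RegexRouter: rename only when the whole topic name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics keep their default names
    return match.expand(java_to_python_replacement(replacement))


if __name__ == "__main__":
    for topic in ("pg-dev.public.customers", "pg-dev.public.orders", "pg-dev.billing.invoices"):
        print(topic, "->", route_topic(topic, "pg-dev.public.(.*)", "data.cdc.dbname"))
```

Both public tables end up in data.cdc.dbname, while a topic from another schema keeps its default name.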
Connector limitations
To round off the description of the PostgreSQL connector configuration, here are some features and limitations of how it works:
- The PostgreSQL connector relies on logical decoding. As a result, it does not track changes to the database structure (DDL), so this data will not appear in the topics.
- Because replication slots are used, the connector can only connect to the primary (master) DBMS instance.
- If the user the connector connects as has read-only permissions, you will need to create the replication slot and the publication in the database manually before the first start.
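If you have to prepare the slot and the publication by hand, the statements could look like the ones generated by this Python sketch. It assumes the names from the configuration above and the pgoutput plugin. bootstrap_sql is a hypothetical helper, and the statements must be run by a sufficiently privileged user: creating a slot needs the REPLICATION attribute or superuser, and FOR ALL TABLES needs superuser.

```python
import re
from typing import List

# PostgreSQL allows only lower-case letters, digits and underscores in
# replication slot names; the same check is applied to the other names
# so they can be inlined into SQL without quoting.
_SAFE_NAME = re.compile(r"[a-z0-9_]+")


def bootstrap_sql(slot_name: str, publication_name: str, plugin: str = "pgoutput") -> List[str]:
    """Statements an administrator could run before the connector's first start."""
    for value in (slot_name, publication_name, plugin):
        if not _SAFE_NAME.fullmatch(value):
            raise ValueError(f"unsafe identifier: {value!r}")
    statements = [f"SELECT pg_create_logical_replication_slot('{slot_name}', '{plugin}');"]
    if plugin == "pgoutput":
        # Publications are only used by pgoutput. To avoid needing a superuser,
        # list the tables explicitly (CREATE PUBLICATION ... FOR TABLE ...) instead.
        statements.append(f"CREATE PUBLICATION {publication_name} FOR ALL TABLES;")
    return statements


if __name__ == "__main__":
    print("\n".join(bootstrap_sql("dbname_debezium", "dbname_publication")))
```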
Applying the configuration
Let's load our configuration (saved as pg-con.json) into the connector:
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
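The same request can be sent from code. Here is a minimal sketch using only Python's standard library; the function names are hypothetical, while the endpoint and headers are the ones from the curl command above.

```python
import json
import urllib.request


def build_create_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Prepare POST /connectors/ with the connector document as the JSON body."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors/",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connect_url: str, connector: dict, timeout: float = 10.0) -> dict:
    """Send the request and return the JSON document Kafka Connect responds with.

    urllib raises HTTPError for error statuses, e.g. 409 if a connector
    with the same name already exists.
    """
    request = build_create_request(connect_url, connector)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read())
```

For example, create_connector("http://localhost:8083", json.load(open("pg-con.json"))). For repeated deployments, Kafka Connect also offers PUT /connectors/<name>/config, which creates or updates a connector and takes only the "config" object as its body.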
Check that the upload succeeded and the connector has started:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
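In scripts it is handy to turn this response into a yes/no answer. The Python sketch below (helper names are hypothetical) treats the connector as healthy only if the connector and all of its tasks report RUNNING; for tasks in the FAILED state, Kafka Connect includes a "trace" field with the error, and a task can then be restarted with POST /connectors/<name>/tasks/<id>/restart.

```python
import json
from typing import Dict


def connector_is_healthy(status_body: str) -> bool:
    """True if the connector and all of its tasks report RUNNING."""
    status = json.loads(status_body)
    if status["connector"]["state"] != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without tasks is not actually moving any data.
    return bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)


def failed_task_traces(status_body: str) -> Dict[int, str]:
    """Map task id to its stack trace for tasks in the FAILED state."""
    status = json.loads(status_body)
    return {
        task["id"]: task.get("trace", "")
        for task in status.get("tasks", [])
        if task["state"] == "FAILED"
    }
```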
Great: the connector is configured and ready to go. Now let's pretend to be a consumer and connect to Kafka, and then insert and modify a record in the table:
$ kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
In our topic, this shows up as follows (a very long JSON with our changes):
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
In both cases, the records consist of the key (PK) of the changed record and the substance of the change: what the record looked like before and what it looks like after.
- For an INSERT, the before value is null and after holds the inserted row.
- For an UPDATE, payload.before holds the previous state of the row and payload.after holds the new state with the changes applied.
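A consumer usually needs only the payload part of these messages. Below is a minimal Python sketch, assuming the JSON converter as configured in this article, that turns one message value into the operation name and the columns that changed. Besides c and u seen above, Debezium also uses d for deletes and r for rows read during the initial snapshot; describe_change is a hypothetical helper, not part of any library.

```python
import json
from typing import Any, Dict, Optional, Tuple

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

Change = Tuple[str, Dict[str, Tuple[Any, Any]]]


def describe_change(value: Optional[str]) -> Optional[Change]:
    """Return (operation, {column: (before, after)}) for the columns that differ.

    `value` is the raw message value; None stands for a tombstone, which is
    sent after deletes and carries no change data.
    """
    if value is None:
        return None
    envelope = json.loads(value)
    # With schemas enabled (as in the output above) the event sits under
    # "payload"; without schemas the message itself is the payload.
    payload = envelope["payload"] if "schema" in envelope else envelope
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    columns = sorted(set(before) | set(after))
    changed = {c: (before.get(c), after.get(c)) for c in columns if before.get(c) != after.get(c)}
    return OPERATIONS.get(payload["op"], payload["op"]), changed
```

Applied to the UPDATE event above, it would report only first_name changing from foo to egg.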
2.2 MongoDB
This connector uses MongoDB's standard replication mechanism, reading information from the oplog of the DBMS primary node.
As with the PgSQL connector described above, on the first start it takes an initial snapshot of the data, after which it switches to oplog-reading mode.
An example configuration:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
As you can see, there are no new options compared with the previous example; there are just fewer options for connecting to the database, and their prefixes are different.
This time, the transforms settings do the following: they turn the target topic name from the pattern <server_name>.<db_name>.<collection_name> into data.cdc.mongo_<db_name>.
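Note that the pattern above must match the whole topic name and that [a-zA-Z_0-9]* does not accept hyphens, so a database named, say, db-3 would keep its default topic name. The Python imitation below makes this visible. It is a sketch of RegexRouter's full-match behavior, not its actual code, and the database and collection names are hypothetical.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Imitate RegexRouter: rewrite only if the whole topic name matches.

    Java's $1-style group references are converted to Python's \\g<1>;
    the replacement is assumed to contain no backslashes.
    """
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    return match.expand(re.sub(r"\$(\d+)", r"\\g<\1>", replacement))


MONGO_REGEX = "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)"
MONGO_REPLACEMENT = "data.cdc.mongo_$1"

if __name__ == "__main__":
    for topic in ("mongo.db_1.users", "mongo.db_1.orders", "mongo.db_2.events", "mongo.db-3.users"):
        print(f"{topic} -> {route_topic(topic, MONGO_REGEX, MONGO_REPLACEMENT)}")
```

All collections of one database share a topic, so consumers that need to tell collections apart have to look at the event itself.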
Fault tolerance
Fault tolerance and high availability matter more than ever today, especially when it comes to data and transactions, and tracking data changes is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.
There are three failure scenarios:
- Kafka Connect fails. If Connect is configured in distributed mode, several workers must share the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
- Connectivity to the Kafka cluster is lost. The connector simply stops reading at the position it failed to send to Kafka and periodically retries until the attempt succeeds.
- The data source becomes unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed, and you have to restart it manually through the Kafka Connect REST interface.
  - With PostgreSQL, no data is lost, because the replication slot prevents the deletion of WAL files the connector has not read yet. There is a downside, though: if network connectivity between the connector and the DBMS is broken for a long time, the disk may run out of space, which can bring down the whole DBMS.
  - With MySQL, the DBMS itself may rotate the binlog files before connectivity is restored. The connector then goes into the failed state, and to resume normal operation it has to be restarted in initial snapshot mode so that it can continue reading from the binlogs.
  - With MongoDB, the documentation says that when the log/oplog files have been deleted and the connector cannot resume reading from where it stopped, the behavior is the same for all DBMSs: the connector goes into the failed state and must be restarted in initial snapshot mode.
    There is an exception, however. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated in the meantime, then once the connection is restored the connector quietly continues reading from the first available position, so some of the data never reaches Kafka.
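To get a feel for how long "16 attempts with exponential backoff" lasts, here is a small Python calculation. The initial delay of 1 second and the 2-minute cap are assumptions for illustration, since the exact defaults and option names depend on the connector; only the attempt count comes from the text above. Under these assumptions the total waiting time is roughly 20 minutes.

```python
from typing import List


def backoff_delays_ms(attempts: int = 16, initial_ms: int = 1_000, max_ms: int = 120_000) -> List[int]:
    """Delay before each reconnection attempt: doubles every time, capped at max_ms."""
    delays: List[int] = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(min(delay, max_ms))
        delay *= 2
    return delays


if __name__ == "__main__":
    delays = backoff_delays_ms()
    print("delays (s):", [d // 1000 for d in delays])
    print(f"total waiting time: {sum(delays) / 60_000:.1f} min")
```

Once the cap is reached, every further attempt adds the full maximum delay, so the cap rather than the initial delay dominates the total.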
Conclusion
Debezium was my first experience with CDC systems, and overall a very positive one. The project won me over with its support for the major DBMSs, ease of configuration, clustering support and active community. If you are interested in trying it in practice, I recommend reading the official guides.
Compared with the JDBC connector for Kafka Connect, Debezium's main advantage is that changes are read from the DBMS logs, which makes it possible to receive data with minimal latency. The JDBC connector, by contrast, queries the tracked table at a fixed interval and, for the same reason, does not generate messages when data is deleted (how can you query data that no longer exists?).
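The difference is easy to see in a toy model. The sketch below is hypothetical and greatly simplified (it is neither the JDBC connector's nor Debezium's code): a timestamp-based polling query only sees rows that still exist and collapses several updates into one, while reading the log sees every committed operation, including deletes.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyTable:
    """In-memory table that also keeps a change log standing in for the WAL."""
    rows: Dict[int, Tuple[str, int]] = field(default_factory=dict)  # id -> (value, updated_at)
    log: List[Tuple[str, int]] = field(default_factory=list)        # (op, id) in commit order
    clock: int = 0

    def upsert(self, row_id: int, value: str) -> None:
        self.clock += 1
        self.log.append(("u" if row_id in self.rows else "c", row_id))
        self.rows[row_id] = (value, self.clock)

    def delete(self, row_id: int) -> None:
        self.clock += 1
        del self.rows[row_id]
        self.log.append(("d", row_id))


def poll_changed_ids(table: ToyTable, since: int) -> List[int]:
    """Query-based capture, roughly SELECT id FROM t WHERE updated_at > :since."""
    return sorted(row_id for row_id, (_, ts) in table.rows.items() if ts > since)


if __name__ == "__main__":
    t = ToyTable()
    t.upsert(1, "a")
    t.upsert(2, "b")
    checkpoint = t.clock
    t.delete(1)
    print("polling sees:", poll_changed_ids(t, checkpoint))  # the delete is invisible: []
    print("log sees:", t.log[checkpoint:])                    # [('d', 1)]
```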
For similar tasks, you may also want to look at the following solutions (in addition to Debezium):
- the Kafka Connect JDBC connector;
- various MySQL-specific solutions;
- Oracle GoldenGate, although that is a completely different "weight class".
Source: www.habr.com