áá»áœááºá¯ááºáá¡áá¯ááºááœááºá áá¯ááŸá¬ážáá¬áá¬á
áá¬ážá¡ááºáá¬áááºáá±á«áºááœáẠá¡ááœááºááŸá¬ážáá«ážááá·áº áááºážááá¬ááá¯ááºáᬠááŒá±ááŸááºážáá»ááº/áá±á¬á·ááºáá²áẠáá¯ááºáá¯ááºá¡áá
áºáá»á¬ážááᯠáááŒá¬áá ááœá±á·áá±ááááºá á€áá±á¬ááºážáá«ážááŒáá·áº áá»áœááºá¯ááºááẠDebezium ááᯠá¡áá¯á¶ážááŒá¯á Kafka á¡á
á¯á¡áá±ážáá
áºáá¯ááá¯á· CDC ááŒá
áºáááºáá»á¬ážááᯠCDC ááŒá
áºáááºáá»á¬áž áá±ážááá¯á·ááŒááºážááᯠconfigure ááŒá¯áá¯ááºááẠááá¯á¡ááºáá±á¬á¡áá«ááœáẠááá¯ááœá¬ááá»ááºáá
áºáá¯ááᯠá¥ááá¬áá
áºáá¯ááŒáá·áº ááŒáá·áºáááºážááẠááŒáá¯ážá
á¬ážáá«áááºá á¡áá¯ááºááŒá®ážááá¯á·áá±á«áºáá¬áá²á· áá®áá¯á¶ážáááºáá»ááºáá±á¬ááºážáá«ážáᬠáááŒá¬ážáá°ááœá±á¡ááœáẠá¡áá¯á¶ážáááºáááºááá¯á· áá»áŸá±á¬áºááá·áºáá«áááºá
áá±áá¯áá»á¡á¬ážááŒáá·áº Debezium ááŸáá·áº CDC áá°ááẠá¡áááºáááºážá
á€
áá»áœááºá¯ááºááá¯á·ááẠCDC ááᯠááá¬ážááá¯ážáá»áá»ááºážáááºáááºážááŸáá·áº ááŸáá¯ááºážááŸááºáá«á (á¡ááá®áá±ážááŸááºážááẠDBMS á០áá±áá¬ááᯠááá¯ááºááá¯ááºáááºááá·áºá¡áá«) áááºážáá¡áááá¡á¬ážáá¬áá»ááºáá»á¬ážááŸá¬ á¡áááºážá¡ááá·áºááœáẠáá±áá¬ááŒá±á¬ááºážáá²ááŸá¯ streaming ááᯠá¡áá±á¬ááºá¡áááºáá±á¬áºáá¬ááœáẠlatency áááºážáá«ážááŒááºážá ááŒáá·áºáá¬ážáá±á¬áá¯á¶ááŒááºá áááºáá»áááŸá¯ááŸáá·áº áááŸáááá¯ááºááŸá¯ááá¯á·áá«áááºáááºá CDC ááŒá áºáááºáá»á¬ážá¡ááœáẠááá¯ááŸá±á¬ááºáá¬á¡ááŒá ẠKafka á¡á á¯á¡áá±ážááᯠá¡áá¯á¶ážááŒá¯ááŒááºážááŒáá·áº áá±á¬ááºáá¯á¶ážá¡áá»ááºááŸá áºáá»ááºááᯠáááŸááááºá
á¡ááŒá¬ážá¡á¬ážáá¬áá»ááºááŸá¬ ááŒá áºáááºáá»á¬ážááᯠááááºážáááºážáááºá¡ááœáẠáá±á¬áºáááºáá áºáá¯áááºážááᯠá¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ááŒá±á¬áá·áº á¡áá¯á¶ážá¡ááá®áá±ážááŸááºážááẠááá°áá®áá±á¬ DBMSs áááºáááºááŸá¯á ááœá²ááŒá¬ážááŸá¯áá»á¬ážááᯠá áááºáá°á áá¬áááá¯áá«á
áá±á¬ááºáá¯á¶ážááœááºá áááºáá±á·áá»áºááœá²á á¬ážááᯠá¡áá¯á¶ážááŒá¯ááŒááºážááŒáá·áº áá±áá¬ááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážááᯠá á±á¬áá·áºááŒáá·áºááá·áº á¡ááá®áá±ážááŸááºážáá»á¬ážááᯠá¡áá»á¬ážááá¯áẠá¡ááá¯ááºážá¡áá¬á¡áá á¡ááá¯ááºážá¡áá¬á¡áá ááœáá·áºááŒá¯áá±ážáááºá áá áºáá»áááºáááºážááŸá¬áááºá áá±áá¬ááᯠDBMS á០ááá¯ááºááá¯ááºááá¯ááºáá² Kafka á¡á á¯á¡áá±ážá០áááŸááá±á¬ááŒá±á¬áá·áº áá±áá¬áááºážááŒá áºá¡áá±á«áº áááºáá±á¬ááºááŸá¯ááᯠáááºážáá«ážá á±áááºá
Debezium áááá¯áá¬ááŸáá·áº áááºáááº
Debezium ááá¯á¡áá¯á¶ážááŒá¯ááŒááºážááẠá€ááá¯ážááŸááºážáá±á¬á¡á á®á¡á á¥áºááá¯á· áá±á¬ááºááŸááá¬ááẠá
DBMS (áá±áá¬á¡áááºážá¡ááŒá áºá¡ááŒá áº) â Kafka áá»áááºáááºááŸá¯ â Apache Kafka â áá¯á¶ážá áœá²áá°
ááá¯ááºáá±á¬áºáá¯á¶á¡áá±ááŒáá·áº á€áááºááŸá¬ ááá±á¬áá»ááºáááºááá¯ááºá០áá¯á¶ááŒááºážááŒá áºáááºá
ááá¯á·áá±á¬áºáááºážá sink connector ááá¯áá¬á¡áá¯á¶ážááŒá¯ááŒááºážááẠááŒá
áºááá¯ááºáá»á±ááŸááá±á¬ááŒá±á¬áá·áº á€á¡á
á®á¡á
á¥áºááᯠáá»áœááºá¯ááºááááºáááŒáá¯ááºáá«á
á¡ááŸááºááááºááœááºá áááºá Data Lake ááᯠááŒáá·áºááŒááºážá¡ááŒá±á¡áá±ááŸá¬ ááá°áá®áá«á (á¡áááºáá±á¬áºááŒáá« áá¯á¶ááœáẠáá±á¬ááºáá¯á¶ážááá·áºááº) áá«áᬠDebezium ááá¯áá¯á¶ážááá¯á·áá áºáá¯áááºážáá±á¬áááºážáááºážááá¯ááºáá«áá°ážá Apache Kafka ááá¯á·áá±ážááá¯á·áá±á¬ááŒá áºáááºáá»á¬ážááá¯á¡ááŒá±á¡áá±á¡áá»áá¯ážáá»áá¯ážááá¯ááá¯ááºááœááºáááºáááºáá¡ááºááºáá®áá±ážááŸááºážáá»á¬ážáá¡áá¯á¶ážááŒá¯ááá¯ááºáááºá á¥ááá¬á¡á¬ážááŒááºá·:
- ááááºááá¯ááºáá±á¬áá±áá¬ááᯠcache ááŸáááºááŸá¬áž;
- á¡ááŒá±á¬ááºážááŒá¬ážá á¬áá»á¬ážáá±ážááá¯á·ááŒááºážá
- ááŸá¬ááœá±ááŸá¯á¡ááœáŸááºážááœááºážáá¶ááŸá¯áá»á¬ážá
- á á¬áááºážá á áºááŸááºáááºážá¡áá»áá¯á·á
- ...
ááá·áºááœáẠJava á¡ááá®áá±ážááŸááºážáá
áºáá¯ááŸáááŒá®áž Kafka á¡á
á¯á¡áá±ážááᯠá¡áá¯á¶ážááŒá¯ááẠáááá¯á¡ááº/ááŒá
áºááá¯ááºááŒá±áááŸááá«áá áááºážááŸáááá·áº áá¯ááºáá±á¬ááºááá¯ááºááŒá±áááºáž ááŸááá«áááºá
á€áá±á¬ááºážáá«ážááœáẠá¡ááŸá¬ážáá¶ááá¯ááºáááºááŸáá·áº áá»á²á·ááœááºááá¯ááºááŸá¯ááá¯á·ááᯠáá¶á·ááá¯ážáá±ážááá·áº áá±á¬á·ááºáá²á¡ááºáá»ááºáá®áá¬áá»á¬ážá á¡ááŒá¶ááŒá¯áá¬ážáá±á¬ áááá¯áá¬ááá¬ááᯠá€áá±á¬ááºážáá«ážááœáẠááœá±ážááœá±ážáá«áááºá
áá»áááºáááºáááááᬠááœá²á·á ááºážááŸá¯
á¡áá±ážááŒá®ážáá¯á¶ážáááºááá¯áž - áá±áᬠ- á¡ááŒá±á¬ááºážá¡áá²áá»á¬ážááᯠááŒá±áá¬áá¶áááºá¡ááœáẠáá»áœááºá¯ááºááá¯á· ááá¯á¡ááºáááº-
- áá±áá¬á¡áááºážá¡ááŒá
áºá áá¬ážááŸááºáž 5.7á PostgreSQL 9.6+á MongoDB 3.2+ (áá¬ážááŸááºáž XNUMX ááŸá
áááºááá·áº MySQL ááŒá
áºááá¯ááºáááºá
á¡ááŒááºá·á¡áá á¬áááºáž ); - Apache Kafka á¡á á¯á¡áá±áž;
- Kafka Connect á¥ááᬠ(áá¬ážááŸááºáž 1.xá 2.x);
- Debezium áá»áááºáááºáááááá¬ááᯠááŒááºáááºáá¬ážáááºá
áááá¡áá»ááºááŸá
áºáá»ááºá i.e. DBMS ááŸáá·áº Apache Kafka á áááºáááºááŒááºážáá¯ááºáááºážá
ááºááẠáá±á¬ááºážáá«ážá áááºáááºáááºáá»á±á¬áºááœááºáá«áááºá ááá¯á·áá±á¬áºá sandbox ááœááºá¡áá¬á¡á¬ážáá¯á¶ážááá¯á¡áá¯á¶ážáá»ááá¯áá°áá»á¬ážá¡ááœááºá ááá°áá¬áá»á¬ážáá«ááŸááá±á¬ááá¬ážáááºááá¯ááŸá±á¬ááºááŸá¯ááœááºá¡áááºááá·áºáá¯ááºáá¬ážáá±á¬á
áá±á¬ááºáá¯á¶ážá¡áá»ááºááŸá áºáá»ááºááᯠá¡áá±ážá áááºáááºáááºáá±á¬áºááŒáá«áááºá
0. Kafka áá»áááºáááºááŸá¯
á€ááœááºááŸáá·áº áá±á¬ááºáááºáá±á¬ááºážáá«ážááœááºá Debezium developer áá»á¬ážá០ááŒáá·áºáá±áá¬ážáá±á¬ Docker áá¯á¶á áááºá ááºáá¯á¶ááœáẠconfiguration example á¡á¬ážáá¯á¶ážááᯠááœá±ážááœá±ážáá¬ážáááºá áááºážááœáẠááá¯á¡ááºáá±á¬ ááááºá¡ááºááá¯ááºáá»á¬áž (áá»áááºáááºáááááá¬áá»á¬áž) áá«áááºááŒá®áž áááºáááºážáá»ááºááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážááᯠá¡áá¯á¶ážááŒá¯á Kafka Connect áááœá²á·á ááºážáá¯á¶ááᯠáá¶á·ááá¯ážáá±ážáá«áááºá
á¡áááºá áááºááẠConfluent á០Kafka Connect ááá¯á¡áá¯á¶ážááŒá¯ááẠáááºááœááºáá¬ážáá«áá ááá¯á¡ááºáá±á¬áá»áááºáááºáááááá¬áá»á¬ážá ááááºá¡ááºáá»á¬ážááᯠáá®ážááŒá¬ážáááºááŸááºáá¬ážáá±á¬ áááºážááœáŸááºááœáẠááá·áºááœááºážááẠááá¯á¡ááºáááºááŒá
áºáááºá plugin.path
ááá¯á·ááá¯áẠáááºáááºážáá»ááºááŒá±á¬ááºážáá²ááŸá¯áá
áºáá¯ááŸáá
áºááá·áº áááºááŸááºáá«á CLASSPATH
. Kafka Connect worker á¡ááœáẠáááºáááºáá»á¬áž ááŸáá·áº connectors áá»á¬ážááᯠworker launch command ááá¯á· arguments á¡ááŒá
áºááŒááºáááºážááœá¬ážáá±á¬ configuration files ááŸáááá·áº áá¯á¶ážááŒááºáá«áááºá á¡áá±ážá
áááºá¡áá»ááºá¡áááºáá»á¬ážááᯠááŒáá·áºááŸá¯áá«á
áá»áááºáááºáááááá¬áá¬ážááŸááºážááœáẠDebeizum ááá¯ááá·áºááœááºážááŒááºážáá¯ááºáááºážá ááºáá áºáá¯áá¯á¶ážááᯠá¡ááá·áºááŸá áºááá·áºááŒáá·áº áá±á¬ááºááœááºáááºá áá áºáá¯áá»ááºážá á®ááᯠááŒáá·áºáá¡á±á¬ááºá
1. Kafka Connect áá°áá±á¬ááºááᯠá áá áºááá·áºááœááºážááŒááºážá
Apache Kafka á¡á á¯á¡áá±ážááá¯á· áá±áá¬ááᯠááá¯ááºááá¯ááºáá¯ááºááœáŸáá·áºáááºá áááá»áá±á¬ ááá·áºáááºáá±á¬ááºáá»á¬ážááᯠKafka Connect áá°áá±á¬ááºááœáẠáááºááŸááºáá¬ážáááºáá
- á¡á á¯á¡áá±ážááá¯á· áá»áááºáááºáááºá¡ááœáẠááá·áºáááºáá»ááºáá»á¬ážá
- connector á configuration ááá¯ááºááá¯ááºá ááá¯ááºááá¯ááºááááºážáááºážááá·áº á¡ááŒá±á¬ááºážá¡áá¬áá»á¬ážá á¡áááºáá»á¬ážá
- áá»áááºáááºáááááá¬áááºáááºáá±ááá·áºá¡ááœá²á·áá¡ááẠ(ááŒáá·áºáá±áá¯ááºááá¯á¡áá¯á¶ážááŒá¯áá«á)á
ááá±á¬áá»ááºáááá¬ážááẠDocker áá¯ááºáá¯á¶ááẠáááºáááºážáá»ááºááŒá±á¬ááºážááœá²áá»ááºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯á ááœá²á·á ááºážááŸá¯áá¯á¶á á¶ááᯠáá¶á·ááá¯ážáá±ážááẠ- áááºážááẠáá»áœááºá¯ááºááá¯á·á¡áá¯á¶ážááŒá¯ááá·áºá¡áá¬ááŒá áºáááºá ááá¯á·ááŒá±á¬áá·áº áá¯á¶ááᯠáá±á«ááºážáá¯ááºáá¯ááºáá«á
docker pull debezium/connect
áá»áááºáááºáááááá¬ááá¯áááºáááºáááºá¡ááœáẠá¡áááºážáá¯á¶ážáááºáááºážáá»áẠááááºážááŸááºáá»á¬ážá¡á á¯á¡áá±ážááŸá¬ á¡á±á¬ááºáá«á¡ááá¯ááºážááŒá áºáááº-
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
â á¡á á¯á¡ááœá²á·á¡ááœá²á·áááºáá»á¬ážáá á¬áááºážá¡ááŒáá·áºá¡á á¯á¶ááá¯áááŸáááẠKafka á¡á á¯ááá¯ááºáá¬áá¬áá»á¬ážá áááŠážá á¬áááºážá -
OFFSET_STORAGE_TOPIC=connector-offsets
- áááºááŸááá»áááºáááºáááááá¬áááºááŸááá¬áá±áá¬áá»á¬ážááᯠááááºážáááºážááẠáá±á«ááºážá ááºáá áºáá¯á -
CONNECT_STATUS_STORAGE_TOPIC=connector-status
â áá»áááºáááºáááááá¬á á¡ááŒá±á¡áá±ááᯠááááºážáááºážááẠáá±á«ááºážá ááºááŸáá·áº áááºážá áá¯ááºáááºážáá±á¬ááºáá¬áá»á¬ážá -
CONFIG_STORAGE_TOPIC=connector-config
â connector configuration data ááŸáá·áº áááºážá áá¯ááºáááºážáá±á¬ááºáá¬áá»á¬ážááᯠááááºážáááºážááẠá¡ááŒá±á¬ááºážá¡áá¬á -
GROUP_ID=1
â áá»áááºáááºááŸá¯áá¯ááºáááºážááᯠáá¯ááºáá±á¬ááºááá¯ááºááá·áº á¡áá¯ááºááá¬ážá¡á¯ááºá á¯á á¡áá±á¬ááºá¡áá¬ážá ááŒáá·áºáá±á¡áá¯á¶ážááŒá¯áá¬ááœáẠááá¯á¡ááºáá«áááºá (ááŒáá·áºáá±áá¬ážáááº) á á áºá¡á¯ááºá á¯á
áá»áœááºá¯ááºááá¯á·ááẠဠvariable áá»á¬ážááŒáá·áº ááœááºááááºáá¬ááᯠá áááºááá¯ááºáááº-
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
Avro á¡ááŒá±á¬ááºáž ááŸááºáá»ááº
áá°áááºážá¡á¬ážááŒáá·áºá Debezium ááẠsandboxes áá»á¬ážááŸáá·áº data ááá¬áá¡áááºážáááºá¡ááœáẠáááºáá¶ááá¯ááºáá±á¬ JSON áá±á¬áºáááºááœáẠáá±áá¬ááᯠáá±ážáá¬ážáá±á¬áºáááºáž á¡ááœáẠloaded databases áá»á¬ážááœáẠááŒá¿áá¬ááŒá
áºáá¬ááá¯ááºáááºá JSON converter á á¡ááŒá¬ážááœá±ážáá»ááºá
áá¬áá
áºáá¯ááŸá¬ áááºáá±á·áá»áºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯á áá¶áá«ááºá
ááºá
á®áááºááŒá
áºáááºá
Avro ááá¯á¡áá¯á¶ážááŒá¯ááẠáá®ážááŒá¬ážáá
áºáá¯á¡áá¯á¶ážááŒá¯ááẠááá¯á¡ááºáááºá
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
Avro á¡áá¯á¶ážááŒá¯ááŒááºážááŸáá·áº áááºážá¡ááœáẠááŸááºáá¯á¶áááºááŒááºážá á¥áºá á¥áº á¡áá±ážá áááºá¡áá»ááºá¡áááºáá»á¬ážááẠá€áá±á¬ááºážáá«ážá á¡ááá¯ááºážá¡áá¬ááẠáá»á±á¬áºááœááºáá±áá«ááẠ- ááá¯ááá¯ááŸááºážáááºážá á±áááºá¡ááœááºá áá»áœááºá¯ááºááá¯á·ááẠJSON ááᯠá¡áá¯á¶ážááŒá¯áá«áááºá
2. áá»áááºáááºáááááá¬ááᯠááá¯ááºááá¯ááºááŒááºáááºááŒááºážá
ááᯠáááºááẠá¡áááºážá¡ááŒá áºááŸáá±áá¬ááá¯áááºááá·áº connector ááá¯ááºááá¯ááºáááœá²á·á ááºážáá¯á¶ááá¯á· ááá¯ááºááá¯ááºááœá¬ážááá¯ááºáááºá
DBMS ááŸá áºáá¯á¡ááœáẠáá»áááºáááºáááááá¬áá»á¬áž á¥ááá¬ááᯠááŒáá·áºááŒáá«á áá¯á·á áá»áœááºá¯ááºááœáẠá¡ááœá±á·á¡ááŒá¯á¶ááŸáááŒá®áž ááœá²ááŒá¬ážááŸá¯áá»á¬ážááŸáááẠ(á¡áá±ážá¡ááœáŸá¬ážááŒá áºáá±á¬áºáááºáž á¡áá»áá¯á·ááá á¹á áá»á¬ážááœáẠáááá¬áááºááŸá¬ážáááº)á
ááœá²á·á ááºážááŸá¯á¡á¬áž JSON áááºá¹áá±áááœáẠáá±á¬áºááŒáá¬ážááŒá®áž POST áá±á¬ááºážááá¯ááŸá¯ááᯠá¡áá¯á¶ážááŒá¯á Kafka Connect ááá¯á· á¡ááºáá¯ááºáá¯ááºáá¬ážáááºá
á.áá PostgreSQL
PostgreSQL á¡ááœáẠááá°áá¬áá»áááºáááºáááááᬠááœá²á·á ááºážááŸá¯áá¯á¶á á¶-
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
á€áááºáááºááŸá¯ááŒá®ážáá±á¬áẠconnector ááááºáááºááŸá¯áááá¬ááááºá¡áá±á¬áºáá±ážááá¯ážááŸááºážáááº-
- áááá¡ááŒááẠá
áááºáá±á¬á¡áá«á áááºážááẠááœá²á·á
ááºážááŸá¯áá¯á¶á
á¶ááœáẠáááºááŸááºáá¬ážááá·áº áá±áá¬áá±á·á
áºááá¯á· áá»áááºáááºááŒá®áž áá¯ááºááœáẠá
áááºáááºá áááŠážáá»áŸááºáá
áºááŒááºá¡ááŒá±á¡áá±á¡á á¡áá¯á¶ážááŒá¯á áááŸááá±á¬ áááŠážáá±áá¬á¡á
á¯ááᯠKafka ááá¯á· áá±ážááá¯á·ááŒááºážá
SELECT * FROM table_name
. - á áááºááŒááºážááŒá®ážáá«á PostgreSQL WAL ááá¯ááºáá»á¬ážá០á¡ááŒá±á¬ááºážá¡áá²áá»á¬ážááᯠáááºááẠáá»áááºáááºáááááá¬ááẠáá¯ááºááá¯á·ááœá¬ážáá«áááºá
á¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ ááœá±ážáá»ááºá áá¬áá»á¬ážá¡ááŒá±á¬ááºáž-
-
name
- á¡á±á¬ááºááœááºáá±á¬áºááŒáá¬ážáá±á¬ configuration ááá¯á¡áá¯á¶ážááŒá¯áá¬ážááá·áº connector áá¡áááºá á¡áá¬áááºááœááºá á€á¡áááºááᯠKafka Connect REST API ááŸáá áºááá·áº á¡ááŒá±á¡áá±ááᯠááŒáá·áºááŸá¯ááŒááºáž/ááŒááºáááºá áááºááŒááºáž/á¡ááºááááºáá¯ááºááŒááºáž) áá»áááºáááºáááááá¬ááŸáá·áº áá¯ááºáá±á¬ááºááẠá€á¡áááºááᯠá¡áá¯á¶ážááŒá¯áá«áááºá -
connector.class
â configured connector ááŸá¡áá¯á¶ážááŒá¯ááá·áº DBMS connector class; -
plugin.name
â WAL ááá¯ááºáá»á¬ážá០áá±áá¬áá»á¬ážááᯠáá¯áá¹áááá¯ááºáá±ážááŒááºážá¡ááœáẠááááºá¡ááºá¡áááºá á០ááœá±ážáá»ááºáááŸáááá¯ááºáá«áááºáwal2json
,decoderbuffs
Оpgoutput
. áááááŸá áºáá¯ááẠDBMS ááœáẠááá·áºáá»á±á¬áºáá±á¬ extension áá»á¬ážááᯠáááºáááºááẠááá¯á¡ááºáááºápgoutput
PostgreSQL áá¬ážááŸááºáž 10 ááŸáá·áºá¡áááºá¡ááœáẠá¡ááá¯áá±á¬ááºážááŒááºááŸááºááŸá¯áá»á¬áž áááá¯á¡ááºáá«á -
database.*
â áá±áá¬áá±á·á áºááá¯á· áá»áááºáááºáááºá¡ááœáẠááœá±ážáá»ááºá áá¬áá»á¬áždatabase.server.name
â Kafka á¡á á¯á¡áá±ážááœáẠáá±á«ááºážá ááºá¡áááºááᯠááœá²á·á ááºážááẠá¡áá¯á¶ážááŒá¯áá±á¬ PostgreSQL instance á¡áááºá -
table.include.list
- áá»áœááºá¯ááºááá¯á·ááẠááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážááᯠááŒá±áá¬áá¶ááá¯áá±á¬ ááá¬ážáá»á¬ážá á¬áááºážá áá¯á¶á á¶ááŒáá·áºáááºááŸááºáá¬ážáááºáschema.table_name
; áá²á· ááœá²áá¯á¶ážááá¯á·áááá«áá°ážátable.exclude.list
; -
heartbeat.interval.ms
- áá»áááºáááºáááááá¬ááẠá¡áá°ážá¡ááŒá±á¬ááºážá¡áá¬áá áºáá¯ááá¯á· ááŸáá¯á¶ážáá¯ááºáá¶áááºáá±á·áá»áºáá»á¬áž áá±ážááá¯á·ááá·áº ááŒá¬ážáá¬á (áá®áá®á áá¹ááá·áºá¡ááœááºáž)á -
heartbeat.action.query
â ááŸáá¯á¶ážáá¯ááºáá¶áááºáá±á·ááºá»áá áºáá¯á á®ááᯠáá±ážááá¯á·ááá·áºá¡áá«ááœáẠáá¯ááºáá±á¬ááºááá·áº áá±á¬ááºážááá¯áá»áẠ(ááœá±ážáá»ááºááŸá¯ áá¬ážááŸááºáž 1.1 ááœáẠáá«ááŸááááº); -
slot.name
- áá»áááºáááºáááááá¬ááŸá¡áá¯á¶ážááŒá¯ááá·áº áá¯á¶áá°á¡áá±á«ááºáá¡áááºá publication.name
- á¡áááºáá¯á¶ááŸáááºáá¯ááºáá±ááŒááºáž áá»áááºáááºáá°á¡áá¯á¶ážááŒá¯ááá·áº PostgreSQL ááœááºá áááŸááá»áŸáẠDebezium áááºáá®ážááẠááŒáá¯ážá á¬ážáá«áááºá áá»áááºáááºááŸá¯ááŒá¯áá¯ááºáá¬ážááá·áº á¡áá¯á¶ážááŒá¯áá°ááẠá€áá¯ááºáá±á¬ááºáá»ááºá¡ááœáẠáá¯á¶áá±á¬ááºáá±á¬á¡ááœáá·áºá¡áá±ážáá»á¬áž áááŸááá«áá áá»áááºáááºáááááá¬ááẠá¡ááŸá¬ážá¡ááœááºážáá áºáá¯ááŒáá·áº áááºá á²ááœá¬ážáááºááŒá áºáááºá-
transforms
áá áºááŸááºáá±á«ááºážá ááºá á¡áááºááᯠáááºááá¯á·ááŒá±á¬ááºážáá²ááááºááᯠáááááá»áá» áá¯á¶ážááŒááºáááº--
transforms.AddPrefix.type
áá»áœááºá¯ááºááá¯á·ááẠáá¯á¶ááŸááºá¡áá¯á¶ážá¡ááŸá¯ááºážáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯áááºááᯠááœáŸááºááŒáááºá -
transforms.AddPrefix.regex
- áá áºááŸááºáá±á«ááºážá ááºáá¡áááºááᯠááŒááºáááºáááºááŸááºáá±ážááá·áº áá»ááºááŸá¬áá¯á¶ážáá áºáá¯á -
transforms.AddPrefix.replacement
- áá»áœááºá¯ááºááá¯á·ááẠááá¯ááºááá¯ááºá¡áááá¹áá«ááºááœáá·áºááá¯áá»ááºááŒá áºáááºá
-
ááŸáá¯á¶ážáá¯ááºááŒááºážááŸáá·áº ááŒá±á¬ááºážáá²ááŒááºážááá¯ááºáᬠáá±á¬ááºáááºá¡ááŒá±á¬ááºážá¡áá¬áá»á¬áž
áá¯á¶ááŸááºá¡á¬ážááŒáá·áºá áá»áááºáááºáááááá¬ááẠáááááŒá¯áá¬ážáá±á¬ ááœá±áá±ážááœá±áá°áá
áºáá¯á
á®á¡ááœáẠKafka ááá¯á· áá±áá¬áá±ážááá¯á·ááŒá®áž áááºážá LSN (Log Sequence Number) ááᯠáááºáá±á¬ááºááŸá¯áá±á«ááºážá
ááºááœáẠááŸááºáááºážáááºáá¬ážáááºá offset
. ááá¯á·áá±á¬áº áá»áááºáááºáááááá¬ááẠáá±áá¬áá±á·á
áºáá
áºáá¯áá¯á¶ážááᯠááááºááá¯ááºáá²á áááºážáááá¬ážáá»á¬ážá áá
áºá
áááºáá
áºááá¯ááºážáᬠ(áá±áá¬á¡ááºááááºáá»á¬áž áááŒá¬áááááŒá
áºáá±á«áºáá«á) áááºááá¯á·ááŒá
áºáááºáááºážá
- áá»áááºáááºáááááá¬ááẠWAL ááá¯ááºáá»á¬ážááᯠáááºáááºááŒá áºááŒá®áž áááºážááᯠá á±á¬áá·áºááŒáá·áºáá±ááá·áº ááá¬ážáá»á¬ážáᶠááœáŸá²ááŒá±á¬ááºážáá±ážááá·áº áááºááá·áºááœá±áá±ážáá»á±ááŸá¯ááá¯áá»áŸ áááœá±á·áá«á
- ááá¯á·ááŒá±á¬áá·áºá áááºážááẠáá±á«ááºážá áẠááá¯á·ááá¯áẠáá°ážáá°ááŒááºážá¡ááá¯ááºááœááºááŒá áºá á± áááºážááááºááŸáá¡áá±á¡áá¬ážááᯠá¡ááºááááºáá¯ááºáááºááá¯ááºáá«á
- áááºážááẠáá áºááºáá±á«áºááœáẠááááºážáááºážáá¬ážááá·áº WAL ááá¯ááºáá»á¬ážááᯠááŒá áºáá±á«áºá á±ááŒá®áž disk áá±áá¬ááœááºáá¯ááºááœá¬ážááœááºááŸááááºá
ááŒá®ážáá±á¬á· áá«á ááœá±ážáá»ááºá
áá¬ááœá± áááºáááºáá¬áá±á¬ááºáááºá heartbeat.interval.ms
О heartbeat.action.query
. á€ááœá±ážáá»ááºá
áá¬áá»á¬ážááᯠá¡ááœá²ááá¯ááºá¡áá¯á¶ážááŒá¯ááŒááºážááŒáá·áº ááŸáá¯á¶ážáá¯ááºáá¶áááºáá±á·áá»áºáá±ážááá¯á·ááá¯ááºáž áá®ážááŒá¬ážááá¬ážáá
áºáá¯ááœáẠáá±áá¬ááŒá±á¬ááºážáá²ááẠáá±á¬ááºážááá¯ááŸá¯ááᯠáá¯ááºáá±á¬ááºááá¯ááºá
á±áá«áááºá ááá¯á·ááŒá±á¬áá·áºá áá»áááºáááºáááááá¬ááẠáááºááŸááááºááŸááá±ááá·áº LSN (áá¯á¶áá°ááœá¬ážááŸá¯á¡ááá¯ááºááœááº) á¡áááºáááŒááºááœááºážáá¶áá¬ážáááºá áááºážááẠDBMS á¡á¬áž áááá¯á¡ááºáá±á¬á·áá±á¬ WAL ááá¯ááºáá»á¬ážááᯠáááºááŸá¬ážááá¯ááºá
á±áá«áááºá ááœá±ážáá»ááºá
áá¬áá»á¬áž áááºááá¯á·á¡áá¯ááºáá¯ááºáááºááᯠáááºááá¯ááá¯áá±á·áá¬ááá¯ááºáá«áááºá
á¡áá®ážáááºá¡á¬áá¯á¶á
áá¯ááºááá·áºáá±á¬ áá±á¬ááºáááºááœá±ážáá»ááºá
áá¬áá
áºáá¯ááŒá
áºáááºá transforms
. á¡ááŸá¡ááá²á· á¡áááºááŒá±áá±ááá·áº...
áá°áááºážá¡á¬ážááŒáá·áºá Debezium ááẠá¡á±á¬ááºáá«á¡áááºáá±ážááŒááºážáá°áá«áááᯠá¡áá¯á¶ážááŒá¯á á¡ááŒá±á¬ááºážá¡áá¬áá»á¬ážááᯠáááºáá®ážáááº- serverName.schemaName.tableName
. áá«á á¡ááŒá²áááºáž á¡áááºááŒá±ááŸá¬ ááá¯ááºáá«áá°ážá ááœá±ážáá»ááºááŸá¯áá»á¬áž transforms
áááá»áá±á¬á¡áááºáá
áºáá¯ááŸáá·áº áá±á«ááºážá
ááºáá
áºáá¯ááá¯á· áááºážááŒá±á¬ááºážááŒá±á¬ááºážáááºááá¯á¡ááºááá·áº ááá¬ážáá»á¬ážá
á¬áááºážááᯠáááºááŸááºááẠáá¯á¶ááŸááºá¡áá¯á¶ážá¡ááŸá¯ááºážáá»á¬ážááᯠáááºá¡áá¯á¶ážááŒá¯ááá¯ááºáááºá
áá»áœááºá¯ááºááá¯á·áááœá²á·á
ááºážáá¯á¶ááœááºáá»á±ážáá°ážáááºáá«áááºá transforms
á¡á±á¬ááºáá«ááá¯á· ááŒá
áºáá±á«áºáááº- á
á±á¬áá·áºááŒáá·áºáá¬ážáá±á¬ áá±áá¬áá±á·á
áºá០CDC ááŒá
áºáááºáá»á¬ážá¡á¬ážáá¯á¶ážááẠá¡áááºááŸáá·áº áá±á«ááºážá
ááºáá
áºáá¯ááá¯á· áá±á¬ááºááœá¬ážáááºááŒá
áºáááºá data.cdc.dbname
. ááá¯ááºáá«á (á€áááºáááºáá»á¬ážááá«áá²) Debezium ááẠááá¬ážáá
áºáá¯á
á®á¡ááœáẠáá±á«ááºážá
ááºáá
áºáá¯á
á®ááᯠáá¯á¶áá±á¡á¬ážááŒáá·áº áááºáá®ážáááº- pg-dev.public.<table_name>
.
áá»áááºáááºáááááᬠááá·áºáááºáá»ááºáá»á¬áž
PostgreSQL á¡ááœáẠconnector configuration ááá±á¬áºááŒáá»ááºááá¯áááá¯á¶ážáá»á¯ááºáááºá áááºážááá¯ááºáá±á¬ááºááŸá¯áá¡á±á¬ááºáá«á¡ááºá¹áá«áááºáá»á¬áž/ááá·áºáááºáá»ááºáá»á¬ážá¡ááŒá±á¬ááºážááŒá±á¬ááá·áºáááº-
- PostgreSQL á¡ááœáẠáá»áááºáááºáááááá¬á áá¯ááºáá±á¬ááºááá¯ááºá áœááºážááẠáá¯áá¹áááá¯ááºááœá²ááŒááºážá ááá±á¬ááá¬ážáá±á«áºááœáẠáá°áááºáááºá ááá¯á·ááŒá±á¬áá·áº áá° áá±áá¬áá±á·á áºááœá²á·á ááºážáá¯á¶ááᯠááŒá±á¬ááºážáá²ááẠáá±á¬ááºážááá¯áá»ááºáá»á¬ážááᯠááŒá±áá¬áá¶ááá¬ážáá«á (DDL) - ááá¯á·ááŒá±á¬áá·áº á€á¡áá»ááºá¡áááºááẠá¡ááŒá±á¬ááºážá¡áá¬áá»á¬ážááœáẠááŸááááºááá¯ááºáá«á
- áá¯á¶áá°á¡áá±á«ááºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯áá¬ážáá±á¬ááŒá±á¬áá·áº áá»áááºáááºáááááá¬ááᯠáá»áááºáááºááá¯ááºáááºá áᬠáŠážáá±á¬áẠDBMS á á¶ááá°áá¬ááá¯á·
- áá±áá¬áá±á·á áºááá¯á·áá»áááºáááºáááááá¬ááŸáá»áááºáááºááá·áºá¡áá¯á¶ážááŒá¯áá°ááœááºáááºáááºáá¬á¡ááœáá·áºá¡áá±ážáá»á¬ážááŸááá«áá ááááá áááºáá®ááœááºáááºááá¯ááºááá¯ááºáááºáá®ážááŒá®ážáá±áá¬áá±á·á áºááá¯á·ááœáŸáá·áºáááºáááºááá¯áá«áááºá
ááœá²á·á ááºážááŸá¯áá¯á¶á á¶ááᯠá¡áá¯á¶ážááŒá¯ááŒááºážá
ááá¯á·ááŒá±á¬áá·áºá áá»áœááºá¯ááºááá¯á·á configuration ááᯠconnector ááœáẠáááºááŒáá«á áá¯á·á
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
áá±á«ááºážáá¯áẠá¡á±á¬ááºááŒááºááŒá®áž áá»áááºáááºáááááᬠá áááºááŒá±á¬ááºáž áá»áœááºá¯ááºááá¯á· á á áºáá±ážáá«áááºá
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
áá±á¬ááºážáááº- áááºážááᯠá áá áºááá·áºááœááºážááŒá®áž á¡áá¯á¶ážááŒá¯ááá¯ááºáá«ááŒá®á ááᯠáá»áœááºá¯ááºááá¯á·ááẠá á¬ážáá¯á¶ážáá°áá áºáŠážá¡ááŒá Ạáááºáá±á¬ááºááŒá®áž Kafka ááá¯á· áá»áááºáááºááá¯ááºááŒáá«á áá¯á·á ááá¯á·áá±á¬áẠáá»áœááºá¯ááºááá¯á·ááẠááá¬ážááœáẠááá·áºááœááºážááŸá¯ááᯠáá±á«ááºážááá·áºáᬠááŒá±á¬ááºážáá²áá«áááºá
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
áá»áœááºá¯ááºááá¯á·á áá±á«ááºážá ááºááœáẠá¡á±á¬ááºáá«á¡ááá¯ááºáž áá±á¬áºááŒáá«áááºá
áá»áœááºá¯ááºááá¯á·áááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážááŒáá·áº á¡ááœááºááŸááºáá»á¬ážáá±á¬ JSON
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
ááŒá áºáááºááŸá áºáá¯á áá¯á¶ážááœááºá ááŸááºáááºážáá»á¬ážááœáẠááŒá±á¬ááºážáá²áá²á·áá±á¬ ááŸááºáááºážááá±á¬á· (PK) ááŸáá·áº ááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážá á¡ááŸá áºáá¬á- ááŸááºáááºážááẠáááºááá·áºá¡áá¬ááŸáá·áº áá±á¬ááºááá¯ááºážááœáẠááŒá áºáá¬áá²á·áááºá
- áááŒá
áºáááºá¡ááœááº
INSERT
: áááºááá¯áž (before
) áá®áá»áŸáááºánull
ááŸáá·áº ááŒá®ážáá±á¬áẠ- ááá·áºááœááºážáá¬ážáá±á¬ áá»ááºážá - áááŒá
áºáááºá¡ááœááº
UPDATE
: ááŸá¬payload.before
áá»ááºážáááááºá¡ááŒá±á¡áá±ááá¯ááŒááá¬ážááŒá®ážápayload.after
- á¡ááŒá±á¬ááºážá¡áá²áá¡ááŸá áºáá¬áááŸáá·áºá¡áá°á¡áá áºá
2.2 MongoDB
á€áá»áááºáááºáááááá¬ááẠáááºá DBMS node á oplog á០á¡áá»ááºá¡áááºáá»á¬ážááᯠáááºááŸá¯ááŒááºáž á ᶠMongoDB á¡áá¯áá°ááŸá¯ ááá¹ááá¬ážááᯠá¡áá¯á¶ážááŒá¯áááºá
PgSQL á¡ááœáẠáá±á¬áºááŒááŒá®ážáá¬áž áá»áááºáááºáááááá¬ááŸáá·áº áááºáá°áááºá á€áá±áá¬ááœááºáááºážá áááá¡á ááœááºá áá°ááá±áá¬áá»áŸááºáá áºááŒááºááá¯ááºáá°ááŒá®ážáá±á¬áẠáá»áááºáááºáááááá¬ááẠoplog á á¬áááºááŒááºážáá¯ááºááá¯á· ááŒá±á¬ááºážááœá¬ážáááºá
ááœá²á·á ááºážááŸá¯á¥ááá¬-
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
áááºááœá±á·ááŒááºáááá·áºá¡ááá¯ááºážá á€áá±áá¬ááœáẠááááºááá°áá¬ááŸáá·áº ááŸáá¯ááºážááŸááºáá«á ááœá±ážáá»ááºá áá¬á¡áá áºáá»á¬áž áááŸááá±á¬á·áá±á¬áºáááºáž áá±áá¬áá±á·á áºááá¯á· áá»áááºáááºáááºááŸáá·áº áááºážááá¯á·áááŸá±á·áááºááœá²áá»á¬ážá¡ááœáẠáá¬áááºááŸáááá·áº ááœá±ážáá»ááºá áá¬á¡áá±á¡ááœááºááá¯áᬠáá»áŸá±á¬á·áá»áá¬ážáááºá
setting áá»á¬ážááᯠtransforms
á€áá
áºááŒáááºááœáẠáááºážááá¯á·ááẠá¡á±á¬ááºáá«á¡ááá¯ááºážáá¯ááºáá±á¬ááºáááº- áááºážááá¯á·ááẠáá
áºááŸááºáá±á«ááºážá
ááºá á¡áááºááᯠschema á០ááŒá±á¬ááºážáá²áá«áááºá <server_name>.<db_name>.<collection_name>
в data.cdc.mongo_<db_name>
.
á¡ááŸá¬ážáá¶ááá¯ááºáááº
áá»áœááºá¯ááºááá¯á·áá±ááºááœáẠá¡ááŸá¬ážáá¶ááá¯ááºáááºááŸáááŸá¯ááŸáá·áº ááŒáá·áºáá¬ážáá±á¬áááŸáááá¯ááºááŸá¯ááŒá¿áá¬ááẠááááºáááẠááá¯ááá¯ááŒááºážáááºááẠ- á¡áá°ážáááŒáá·áº áá»áœááºá¯ááºááá¯á·ááẠáá±áá¬ááŸáá·áº ááœá±áá±ážááœá±áá°áá»á¬ážá¡ááŒá±á¬ááºážááŒá±á¬áá±áá»áááºááœááºá áá±áá¬ááŒá±á¬ááºážáá²ááŸá¯áá»á¬ážááᯠááŒá±áá¬áá¶ááŒááºáž á€ááŒá¿áá¬ááœáẠáá±ážáááºáá¬ážááŒááºážáááŸááá«á áááá¬áá¡á áááºá¡áá¬á ááŸá¬ážááœá¬ážááá¯ááºááá²á ááá á¹á áá áºáá¯á á®ááŸá¬ Debezium ááŸá¬ áá¬ááœá±ááŒá áºááá²ááá¯áá¬ááᯠááŒáá·áºááŒáá¡á±á¬ááºá
áááºáá¯ááºááẠááœá±ážáá»ááºá áᬠáá¯á¶ážáá¯ááŸááááºá
- Kafka Connect áá¡á±á¬ááºááŒááºáá«á. Connect ááᯠááŒáá·áºáá±áá¯ááºááœáẠá¡áá¯ááºáá¯ááºááẠá á®á ááºáááºááŸááºáá¬ážáá«áá áááºážááẠáá°áá®áá±á¬ group.id ááᯠáááºááŸááºááẠá¡áá¯ááºááá¬ážáá»á¬ážá áœá¬ ááá¯á¡ááºáááºá ááá¯á·áá±á¬áẠáááºážááá¯á·á¡áááºááŸáá áºáᯠáá»ááºááœááºáá«áá áá»áááºáááºáááááá¬ááᯠá¡ááŒá¬ážá¡áá¯ááºááá¬ážáá áºáŠážááœáẠááŒááºáááºá áááºáááºááŒá áºááŒá®áž Kafka ááŸá áá±á«ááºážá ááºáá« áá±á¬ááºáá¯á¶ážáááºááŸááºáá¬ážááá·áºá¡áá±á¡áá¬ážá០áááºáááºáááºááŸá¯áááºááŒá áºáááºá
- Kafka á¡á á¯á¡ááœá²á·ááŸáá·áº áá»áááºáááºááŸá¯ áá¯á¶ážááŸá¯á¶ážááŒááºážá. áá»áááºáááºáááááá¬ááẠKafka ááá¯á·áá±ážááá¯á·ááẠáá»ááºááœááºááá·áº á¡áá±á¡áá¬ážááœáẠá á¬áááºááŒááºážááᯠáááºááá·áºáááºááŒá áºááŒá®áž ááŒáá¯ážáááºážááŸá¯ áá¡á±á¬ááºááŒááºááá»ááºáž áááºážááᯠá¡áá«á¡á¬ážáá»á±á¬áºá áœá¬ ááŒááºáááºáá±ážááá¯á·ááẠááŒáá¯ážá á¬ážáááºááŒá áºáááºá
- áá±áá¬á¡áááºážá¡ááŒá
ẠááááŸáááá¯ááºáá«á. áá»áááºáááºáááááá¬ááẠááŒááºáááºáááºááŸááºáá¬ážááá·áºá¡ááá¯ááºáž á¡áááºážá¡ááŒá
áºááá¯á· ááŒááºáááºáá»áááºáááºááẠááŒáá¯ážáááºážáááºááŒá
áºáááºá áá¯á¶áá±ááẠ16 ááŒáááºá¡áá¯á¶ážááŒá¯áá¬ážáááºá
exponential backoff . áá ááŒáááºááŒá±á¬áẠááŒáá¯ážáááºážááŸá¯ áá¡á±á¬ááºááŒááºááŒá®ážáá±á¬ááºá á¡áá¯ááºá¡ááŒá Ạá¡ááŸááºá¡áá¬ážááŒá¯áá«áááºá áá»ááºááœáẠKafka Connect REST á¡ááºáá¬áá±á·á áºááŸáá áºááá·áº áááºážááᯠáááºááá¯ááºááá¯áẠááŒááºáááºá áááºááẠááá¯á¡ááºáááºááŒá áºáááºá- áááŒá áºáááºá¡ááœáẠPostgreSQL áá±áá¬ááœá± áá¯á¶ážááŸá¯á¶ážááŸá¬ ááá¯ááºáá°ážá áá¯á¶áá°á¡áá±á«ááºáá»á¬ážááᯠá¡áá¯á¶ážááŒá¯ááŒááºážááŒáá·áº áá»áááºáááºáááááá¬á០ááááºááá±ážáá±á¬ WAL ááá¯ááºáá»á¬ážááᯠáá»ááºááŒááºážá០áá¬ážáá®ážáá«áááºá á€ááá á¹á ááœááºá á¡ááŒáœá±á á±á·á¡ááœáẠá¡á¬ážáááºážáá»ááºáá áºáá¯áááºáž ááŸááááº- connector ááŸáá·áº DBMS á¡ááŒá¬áž ááœááºáááºáá»áááºáááºááŸá¯ááẠá¡áá»áááºá¡áá±á¬áºááŒá¬ ááŒááºáá±á¬ááºááœá¬ážáá«áá disk space áá¯ááºáá¯á¶ážááœá¬ážááá·áº ááŒá áºááá¯ááºáá»á±ááŸáááŒá®áž áááºážááẠáá»áá¯á·ááœááºážáá»ááºáá®ááá¯á· áŠážáááºááœá¬ážááá¯ááºáááºá DBMS áá áºáá¯áá¯á¶ážá
- áááŒá áºáááºá¡ááœáẠá MySQL áá»áááºáááºááŸá¯ááᯠááŒááºáááºáááá°áá® binlog ááá¯ááºáá»á¬ážááᯠDBMS ááá¯ááºááá¯ááºá ááŸáá·áºááá¯ááºáááºá áááºážááẠáá»áááºáááºáááááá¬á¡á¬áž áá¡á±á¬ááºááŒááºááá·áºá¡ááŒá±á¡áá±ááá¯á· áá±á¬ááºááœá¬ážá á±ááŒá®áž áá¯á¶ááŸááºáááºáááºááŸá¯ááᯠááŒááºáááºááá°áááºá¡ááœáẠbinlog áá»á¬ážá០áááºáááºáááºááŸá¯ááẠáááŠážáá»áŸááºáá áºááŒááºáá¯ááºááœáẠááŒááºáááºá áááºááẠááá¯á¡ááºáááºááŒá áºáááºá
- á¡áá±á«áº MongoDB. ááŸááºáááºážááœááºáá±á¬áºááŒáá¬ážáááº- áá±á¬á·ááº/oplog ááá¯ááºáá»á¬ážááᯠáá»ááºááá¯ááºááŒááºážááŒá
áºááŒá®áž áá»áááºáááºáááááá¬ááẠDBMS áá»á¬ážá¡á¬ážáá¯á¶ážá¡ááœáẠá¡áá°áá°áááºááŒá
áºááŒá±á¬ááºáž á
á¬ááœááºá
á¬áááºážááœááºáá±á¬áºááŒáá¬ážááẠááá¯ááá¯áááºááŸá¬ áá»áááºáááºáááááá¬ááẠá¡ááŒá±á¡áá±ááá¯á· áá±á¬ááºááœá¬ážáááºááŒá
áºáááºá áá»ááºááœáẠáá¯ááºááœáẠááŒááºáááºá
áááºááẠááá¯á¡ááºáááºááŒá
áºáááºá áááŠážáá»áŸááºáá
áºááŒááº.
ááá¯á·áá±á¬áº ááŒáœááºážáá»áẠááŸááá«áááºá áá»áááºáááºáááááá¬á¡á¬áž á¡áá»áááºá¡áá±á¬áºááŒá¬ áá»áááºáááºááŸá¯ ááŒááºáá±á¬ááºááœá¬ážáá«á (ááá¯á·ááá¯áẠMongoDB á á¶ááá°áá¬ááá¯á· ááá±á¬ááºááŸáááá¯ááºáá«)á á€á¡áá»áááºá¡ááœááºáž oplog ááẠááŸáá·áºáááºááœá¬ážáááºá ááá¯á·áá±á¬áẠáá»áááºáááºááŸá¯ááᯠááŒááºáááºááá°áá±á¬á¡áá«á áá»áááºáááºáááááá¬ááẠááááááá¯ááºáá±á¬ á¡áá±á¡áá¬ážá០áá±áá¬ááᯠáááºááŒáááºá áœá¬ áááºáááºáááºááŸá¯ááá¯ááºáááºááŒá áºáááºá ááá¯á·ááŒá±á¬áá·áº Kafka ááŸá áá±áá¬á¡áá»áá¯á· ááá¯áẠááá¯ááºáááá·áºáááºá
áá±á¬ááºáá»ááº
Debezium ááẠCDC á
áá
áºáá»á¬ážááŸáá·áº áááºáááºá áá»áœááºá¯ááºá ááááá¯á¶áž á¡ááœá±á·á¡ááŒá¯á¶ááŒá
áºááŒá®áž á¡áá¯á¶ážá
á¯á¶ á¡ááœááºáá±á¬ááºážááœááºáá«áááºá ááá±á¬áá»ááºááẠá¡ááá DBMS áá»á¬ážá¡ááœáẠáá¶á·ááá¯ážááŸá¯á ááœá²á·á
ááºážááŸá¯ááœááºáá°ááŸá¯á á¡á
á¯ááá¯ááºá¡ááŒá¯á¶ááá¯áẠáá¶á·ááá¯ážááŸá¯ááŸáá·áº áááºááŒáœáá±á¬á¡ááá¯ááºá¡áááºážá¡ááœáẠáá¶á·ááá¯ážááŸá¯ááŒáá·áº á¡á±á¬ááºááŒááºáá²á·áááºá áááºááœá±á·á
áááºáááºá
á¬ážáá²á·áá°ááœá±á¡ááœáẠáááºážááœáŸááºáá»ááºááœá±ááᯠáááºááŒáá·áºááá¯á· á¡ááŒá¶ááŒá¯áá»ááºáá«áááºá
Kafka Connect á¡ááœáẠJDBC áá»áááºáááºáááááá¬ááŸáá·áº ááŸáá¯ááºážááŸááºáá«áá Debezium á á¡áááá¡á¬ážáá¬áá»ááºááŸá¬ áá±áá¬ááᯠlatency á¡áááºážáá¯á¶ážááŒáá·áº áááºáá¶áááŸáá á±ááá¯ááºááá·áº DBMS ááŸááºáááºážáá»á¬ážá០á¡ááŒá±á¬ááºážá¡áá²áá»á¬ážááᯠáááºááá¯ááºááŒááºážááŒá áºáááºá JDBC Connector (Kafka Connect ááŸ) ááẠáááºááŸááºáá¬ážáá±á¬ á¡áá»áááºáá¬ááá áºáá¯ááœáẠá á±á¬áá·áºááŒáá·áºáá¬ážáá±á¬ ááá¬ážááᯠáá±ážááŒááºážááŒá®áž (áá°áá®áá±á¬á¡ááŒá±á¬ááºážááŒáá»ááºááŒá±á¬áá·áº) áá±áá¬ááᯠáá»ááºááá¯ááºáá±á¬á¡áá«ááœáẠáááºáá±á·áá»áºáá»á¬áž ááá¯ááºáá±ážáá« (ááááºááŸááá±á¬áá±áá¬ááᯠáááºáááºáá²á·ááá¯á· áá±ážááŒááºážááá¯ááºáááºáááºážá)
á¡áá¬ážáá°ááŒá¿áá¬áá»á¬ážááá¯ááŒá±ááŸááºážáááºá áááºáááºá¡á±á¬ááºáá«ááŒá±ááŸááºážáááºážáá»á¬áž (Debezium á¡ááŒááº) ááá¯á¡á¬áá¯á¶á áá¯ááºááá¯ááºáááºá
-
JDBC Connector Kafka Connect; - MySQL á¡ááœááºáᬠááŒá±ááŸááºážáá»ááºáá»á¬ážá áœá¬-
-
Oracle GoldenGate áá«áá±ááá·áº áá«á áá¯á¶ážáááœá²ááŒá¬ážáá²á· "á¡áá±ážáá»áááºá¡áá»áá¯ážá¡á á¬áž" ááŒá áºáá«áááºá
PS
áá»áœááºá¯ááºááá¯á·áááá±á¬á·ááºááœááºáááºážáááºáá«
- «
Kubernetes ááŸá Kafka á¡á á¯á¡áá±ážá¡ááœáẠááá·áºáá»á±á¬áºáá±á¬á¡ááœááºá¡á á¬ážááᯠáááºááŸááºááŒááºážá "; - «
áá»áœááºá¯ááºááá¯á·á SRE áá±á·á ááºááá០áááºááœá±á·áá»áá±á¬ áá¬ááºáááºážáá»á¬ážá á¡ááá¯ááºáž 2 "; - «
Kubernetesá áá»áœááºá¯ááºááá¯á·áááœá±ážáá»ááºááŸá¯áá»á¬ážááŸáá·áº á¡ááœá±á·á¡ááŒá¯á¶áá»á¬ážá¡ááœáẠPostgreSQL áá±á¬áºááŒáá»ááºáá»á¬ážá á¡áá»ááºážáá»á¯áẠ"á
source: www.habr.com