Meet Debezium - CDC for Apache Kafka


In my work I often come across new technical solutions and software products that are barely covered on the Russian-speaking internet. In this article I will try to fill one such gap with an example from my recent practice, when I needed to set up sending CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview, which grew out of that work, will be useful to others.

What are Debezium and CDC in general?

Debezium belongs to the CDC (Change Data Capture) category of software. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project, licensed under the Apache License v2.0 and sponsored by Red Hat. It has been developed since 2016 and currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB and SQL Server. There are also connectors for Cassandra and Oracle, but they are currently in "early access" status, and new releases do not guarantee backward compatibility for them.

Compared with the traditional approach (where the application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability and high availability. The last two are achieved by using a Kafka cluster as the store for CDC events.

Another advantage is that a single model is used to store events, so the end application does not have to deal with the peculiarities of operating different DBMSs.

Finally, using a message broker leaves room for horizontally scaling the applications that track data changes. At the same time, the load on the data source is minimized, because the data is read from the Kafka cluster rather than directly from the DBMS.

About Debezium's architecture

Using Debezium boils down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Figure: Debezium architecture diagram from the project website]

I am not a big fan of this diagram, though, because it suggests that only a sink connector can be used.

In reality, things are different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Your applications can use the events sent to Apache Kafka to handle various situations, for example:

  • removing stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • some kind of audit logs;
  • ...

If you have a Java application and no need (or no way) to use a Kafka cluster, you can also work through the embedded connector. Its obvious advantage is that you can do without extra infrastructure (Kafka Connect and Kafka). However, in version 1.1 the original EmbeddedEngine class was deprecated in favor of the new DebeziumEngine API, and the old class may be removed in future releases.

This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes in the most valuable thing we have, the data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+ or MongoDB 3.2+ (full list in the documentation);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository has a ready-made one with docker-compose.yaml examples.

We will focus on the last two points in more detail.

0. Kafka Connect

Here and throughout the article, all configuration examples assume the Docker image distributed by the Debezium developers. It contains all the required plugin files (connectors) and lets you configure Kafka Connect through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself, either to the directory listed in plugin.path or via the CLASSPATH environment variable. Kafka Connect worker and connector settings are defined in configuration files passed as arguments to the worker start command. See the documentation for details.

Setting up Debezium as a Kafka Connect connector takes two steps. Let's go through each of them:

1. Setting up the Kafka Connect framework

To stream data into an Apache Kafka cluster, a few parameters are set at the Kafka Connect framework level, such as:

  • cluster connection settings,
  • names of the topics where the connector's own configuration will be stored,
  • the name of the group in which the connector runs (when distributed mode is used).

The project's official Docker image supports configuration through environment variables, which is what we will use. So let's pull the image:

docker pull debezium/connect

The minimal set of environment variables required to run the connector is:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers, used to discover the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic storing the positions the connector has currently reached;
  • STATUS_STORAGE_TOPIC=connector-status - the topic storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic storing connector and task configuration data;
  • GROUP_ID=1 - the ID of the group of workers that can run the connector's tasks; required in distributed mode.

We start the container with the following variables:

docker run -p 8083:8083 \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2

A note about Avro

By default, Debezium writes data in JSON format, which is fine for sandboxes and small amounts of data but can become a problem on heavily loaded databases. An alternative to the JSON converter is serializing messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry (to store the schemas). The converter variables then look like this (Kubernetes-style env entries):

- name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
  value: http://kafka-registry-01:8081/
- name: VALUE_CONVERTER
  value: io.confluent.connect.avro.AvroConverter
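The CONNECT_* names above follow a pattern: my understanding is that the debezium/connect image turns every variable with the CONNECT_ prefix into a Kafka Connect worker property by dropping the prefix, lowercasing the rest and replacing underscores with dots. The Python sketch below only reproduces that assumed rule, so you can check which property a variable ends up as; it is not the image's actual entrypoint code.

```python
from typing import Optional


def env_to_worker_property(env_name: str) -> Optional[str]:
    """Map a CONNECT_* environment variable to a Kafka Connect worker property name.

    Assumed rule (an approximation of what the debezium/connect entrypoint does):
    drop the CONNECT_ prefix, lowercase the rest and turn '_' into '.'.
    Variables without the prefix are not covered by this rule, so None is returned.
    """
    prefix = "CONNECT_"
    if not env_name.startswith(prefix):
        return None
    return env_name[len(prefix):].lower().replace("_", ".")


if __name__ == "__main__":
    avro_env = {
        "CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL": "http://kafka-registry-01:8081/",
        "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL": "http://kafka-registry-01:8081/",
    }
    for name, value in avro_env.items():
        # e.g. value.converter.schema.registry.url=http://kafka-registry-01:8081/
        print(f"{env_to_worker_property(name)}={value}")
```

One consequence of such a convention is that a property whose own name contains an underscore cannot be expressed through it.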

Details on using Avro and configuring a registry for it are beyond the scope of this article; for clarity, we will stick with JSON from here on.

2. Configuring the connector itself

Now we can move on to configuring the connector itself, which will read data from the source.

Let's look at connectors for two DBMSs, PostgreSQL and MongoDB, which I have experience with and which differ from each other (only slightly, but in some cases significantly!).

The configuration is written in JSON and uploaded to Kafka Connect with a POST request.

2.1. PostgreSQL

An example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

The way the connector works with this configuration is fairly simple:

  • On its first start, it connects to the database specified in the configuration and runs in initial snapshot mode, sending to Kafka the initial data set obtained with a SELECT * FROM table_name query.
  • Once initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector to which the configuration below applies; later on, this name is used to work with the connector (view its status, restart it, update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The options are wal2json, decoderbufs and pgoutput. The first two require the corresponding extensions to be installed in the DBMS, while pgoutput needs no extra steps on PostgreSQL 10 and later;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance used to build topic names in the Kafka cluster;
  • table.include.list - the list of tables whose changes we want to track, in the format schema.table_name (regular expressions are allowed, as in the example); it cannot be used together with table.exclude.list. In releases before 1.3 the same option was called table.whitelist;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a dedicated topic;
  • heartbeat.action.query - a query executed each time a heartbeat message is sent (this option appeared in version 1.1);
  • slot.name - the name of the replication slot the connector will use;
  • publication.name - the name of the PostgreSQL publication the connector uses. If it does not exist, Debezium tries to create it; if the user the connector connects as lacks the privileges for that, the connector stops with an error;
  • transforms - defines how exactly the target topic name is changed:
    • transforms.AddPrefix.type sets the transformation class, here RegexRouter, which renames topics using regular expressions;
    • transforms.AddPrefix.regex - the pattern by which the target topic name is rewritten;
    • transforms.AddPrefix.replacement - what exactly it is rewritten to.
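A mistake in these keys usually only shows up once the connector is submitted and fails, so it can help to sanity-check the document locally first. The Python sketch below checks a few of the rules just listed: the known plugin.name values, that table.include.list and table.exclude.list are not combined, and that each transforms alias has a type. The REQUIRED_KEYS tuple is my own assumption for this example, not a list taken from Debezium.

```python
# Logical decoding plugins mentioned in the option list above.
KNOWN_DECODING_PLUGINS = {"wal2json", "decoderbufs", "pgoutput"}

# Keys this sketch insists on for the example setup. This is an assumption made
# for illustration, not Debezium's own list of mandatory options.
REQUIRED_KEYS = (
    "connector.class",
    "database.hostname",
    "database.user",
    "database.dbname",
    "database.server.name",
)


def check_pg_connector(doc: dict) -> list:
    """Return human-readable problems found in a connector document (empty list = OK)."""
    problems = []
    if "name" not in doc:
        problems.append("missing top-level 'name'")
    config = doc.get("config", {})
    for key in REQUIRED_KEYS:
        if key not in config:
            problems.append(f"missing '{key}'")
    plugin = config.get("plugin.name")
    if plugin is not None and plugin not in KNOWN_DECODING_PLUGINS:
        problems.append(f"unexpected plugin.name '{plugin}'")
    if "table.include.list" in config and "table.exclude.list" in config:
        problems.append("table.include.list and table.exclude.list cannot be combined")
    # Every alias listed in 'transforms' needs its own 'transforms.<alias>.type'.
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    for alias in aliases:
        if f"transforms.{alias}.type" not in config:
            problems.append(f"transform '{alias}' has no type")
    return problems
```

For the configuration shown above (saved as pg-con.json and loaded with json.load), the function should return an empty list.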

More on heartbeat and transforms

By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the offsets service topic. But what happens if the connector is configured to read not the whole database but only some of its tables, where data changes rarely?

  • The connector reads the WAL files and finds no transaction commits touching the tables it monitors.
  • So it updates its current position neither in the topic nor in the replication slot.
  • This, in turn, keeps the WAL files "stuck" on disk, and the disk is likely to run out of space eventually.
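To spot this situation before the disk fills up, you can watch how much WAL each replication slot is holding back. On PostgreSQL 10+ the pg_replication_slots view and the pg_wal_lsn_diff() function give you that directly; the Python sketch below just shows the arithmetic behind it and converts between the textual LSN form (e.g. 0/2082618) and a plain integer. As far as I can tell, the lsn field in the source block of Debezium events (34088472 in the output further down) is the same position in integer form.

```python
def parse_lsn(text: str) -> int:
    """Turn PostgreSQL's textual LSN 'X/Y' (two hex halves) into one 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def format_lsn(value: int) -> str:
    """Inverse of parse_lsn, using the same '%X/%X' layout PostgreSQL prints."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL between two positions, like pg_wal_lsn_diff(current, restart)."""
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)


# One way to collect the inputs on PostgreSQL 10+ (run it with any SQL client):
SLOT_LAG_QUERY = """
SELECT slot_name,
       pg_current_wal_lsn() AS current_lsn,
       restart_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
FROM pg_replication_slots;
"""
```

Alert thresholds are up to you; a slot whose retained bytes only ever grow is exactly the symptom described above.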

This is where the heartbeat.interval.ms and heartbeat.action.query options help. Used together, they make the connector run a data-modifying query against a separate table every time a heartbeat message is sent. As a result, the LSN the connector is currently at (in the replication slot) keeps moving forward, which lets the DBMS remove WAL files that are no longer needed. See the documentation for more details on how these options work.
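As a sketch of how the two options fit together, the Python helper below adds them to a connector document. The debezium_heartbeat table and the upsert statement are hypothetical: create such a table yourself, and check the heartbeat.action.query section of the Debezium documentation on whether the table must also be covered by your publication and table.include.list for the connector to see its changes.

```python
import copy

# Hypothetical heartbeat table: create it yourself before enabling the query, e.g.
#   CREATE TABLE debezium_heartbeat (id int PRIMARY KEY, ts timestamptz NOT NULL);
HEARTBEAT_QUERY = (
    "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now()) "
    "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
)


def with_heartbeat(connector: dict, interval_ms: int = 5000,
                   query: str = HEARTBEAT_QUERY) -> dict:
    """Return a copy of a connector document with both heartbeat options set."""
    updated = copy.deepcopy(connector)  # leave the caller's dict untouched
    config = updated.setdefault("config", {})
    # The article's configs pass numbers as strings, so keep that convention.
    config["heartbeat.interval.ms"] = str(interval_ms)
    config["heartbeat.action.query"] = query
    return updated
```

Upserting a single row keeps the heartbeat table from growing while still producing a committed change on every heartbeat.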

Another option that deserves a closer look is transforms, although it is more about convenience and aesthetics...

By default, Debezium names topics using the pattern serverName.schemaName.tableName, which is not always convenient. With the transforms options you can use regular expressions to define a set of tables whose events should be routed to a topic with a specific name.

In our configuration, transforms does the following: all CDC events from the tracked database go to a topic named data.cdc.dbname. Without these settings, Debezium would by default create a separate topic for each table, named pg-dev.public.<table_name>.
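RegexRouter is a transformation that ships with Kafka Connect. As I understand it, it renames a record's topic only when the regex matches the whole topic name (Java's Matcher.matches()) and then applies the replacement, where $1, $2 refer to capture groups. The Python sketch below emulates that behavior so you can try patterns before deploying them; Python's and Java's regex dialects agree for simple patterns like these, but not in every corner case.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Mimic RegexRouter: rename a topic only if the regex matches the *whole* name.

    Java-style group references ($1, $2, ...) in the replacement are expanded;
    a group that did not take part in the match expands to an empty string.
    """
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no full match: the record keeps its original topic
    return re.sub(r"\$(\d+)", lambda ref: match.group(int(ref.group(1))) or "", replacement)
```

Note that the unescaped dots in pg-dev.public.(.*) match any character. If you want literal dots, the stricter pattern in the connector JSON would be pg-dev\\.public\\.(.*), with the backslashes doubled because of JSON escaping.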

Connector limitations

To finish describing the PostgreSQL connector configuration, it is worth mentioning the following features and limitations of how it works:

  1. The PostgreSQL connector relies on logical decoding, so it does not track queries that change the database structure (DDL), and that information will not appear in the topics.
  2. Since it uses replication slots, the connector can only connect to the primary DBMS instance.
  3. If the user the connector connects as has read-only privileges, you will need to create the replication slot and the publication in the database manually before the first start.
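For the third point, a privileged user has to create these objects in advance. The Python sketch below derives the statements from the connector's own slot.name, plugin.name and publication.name, so the names cannot drift apart. The optional explicit table list is an assumption of this example; without it the publication is created FOR ALL TABLES, which requires superuser rights.

```python
def manual_setup_sql(config: dict, tables=None) -> list:
    """SQL a privileged user could run once, before the connector's first start.

    Takes slot.name, plugin.name and publication.name from the connector config.
    `tables` is an optional explicit list such as ["public.customers"]; without it
    the publication covers all tables, which requires superuser rights.
    Names are inserted verbatim, so only pass trusted identifiers.
    """
    statements = [
        "SELECT pg_create_logical_replication_slot("
        f"'{config['slot.name']}', '{config['plugin.name']}');"
    ]
    # Publications are only used by the pgoutput plugin.
    if config["plugin.name"] == "pgoutput":
        target = "FOR TABLE " + ", ".join(tables) if tables else "FOR ALL TABLES"
        statements.append(f"CREATE PUBLICATION {config['publication.name']} {target};")
    return statements
```

For the configuration above it produces SELECT pg_create_logical_replication_slot('dbname_debezium', 'pgoutput'); followed by CREATE PUBLICATION dbname_publication FOR ALL TABLES;.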

Applying the configuration

So let's load our configuration into Kafka Connect:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
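If you prefer to script this step, the same request can be made with Python's standard library. The sketch below assumes Kafka Connect's REST API on localhost:8083, as in the curl command; the function names are my own.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint from the curl example


def build_create_request(connector: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Prepare the same POST /connectors/ call that the curl command sends."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def create_connector_from_file(path: str, base_url: str = CONNECT_URL) -> dict:
    """Load e.g. pg-con.json and submit it; returns the parsed JSON response.

    urlopen raises urllib.error.HTTPError on 4xx/5xx responses, e.g. 409 when a
    connector with the same name already exists.
    """
    with open(path, encoding="utf-8") as fh:
        connector = json.load(fh)  # fails early on malformed JSON
    with urllib.request.urlopen(build_create_request(connector, base_url)) as resp:
        return json.loads(resp.read())
```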

Check that the upload succeeded and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
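The status response is easy to check from a script, for example in a deployment health check. The Python sketch below treats the connector as healthy only if the connector and all of its tasks report RUNNING, and lists the IDs of failed tasks, which the Kafka Connect REST API lets you restart individually.

```python
import json


def connector_is_healthy(status: dict) -> bool:
    """True if the connector and all of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


def failed_task_ids(status: dict) -> list:
    """IDs of FAILED tasks; each can be restarted with
    POST /connectors/<name>/tasks/<id>/restart."""
    return [task["id"] for task in status.get("tasks", []) if task.get("state") == "FAILED"]


SAMPLE = ('{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},'
          '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}')

if __name__ == "__main__":
    status = json.loads(SAMPLE)  # the response body shown above
    print(connector_is_healthy(status), failed_task_ids(status))  # True []
```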

Great, it is configured and ready to go. Now let's play the consumer: connect to Kafka, then insert a row into the table and change it:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic this shows up as follows:

A very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, a record consists of the key (PK) of the changed row and the change itself: what the row looked like before and what it became after.

  • For an INSERT: the before value is null, and after contains the inserted row.
  • For an UPDATE: payload.before holds the previous state of the row, and payload.after holds the new state, reflecting the change.
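A consumer usually only cares about the operation and the two row states. The Python sketch below pulls them out of one message value produced by the JSON converter (with the console consumer and print.key=true, that is the part after the key). The op codes are Debezium's: c, u, d and r, the last one marking rows read during the initial snapshot.

```python
import json

# Debezium 'op' codes: c = create, u = update, d = delete, r = read (snapshot).
OP_NAMES = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}


def summarize_change(raw_value: str) -> tuple:
    """Reduce one JSON-converter message value to (operation, before, after)."""
    message = json.loads(raw_value)
    if message is None:
        # A null value is a tombstone, which Debezium emits after a delete by default.
        return ("TOMBSTONE", None, None)
    # With schemas enabled (as in the output above) the envelope is under 'payload';
    # with schemas disabled it is the top-level object itself.
    payload = message.get("payload", message)
    operation = OP_NAMES.get(payload["op"], payload["op"])
    return (operation, payload.get("before"), payload.get("after"))
```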

2.2. MongoDB

This connector uses MongoDB's standard replication mechanism, reading data from the oplog of the DBMS primary node.

As with the PostgreSQL connector described above, the first start takes an initial snapshot of the data, after which the connector switches to reading the oplog.

An example configuration:

{
  "name": "mp-k8s-mongo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "MainRepSet/mongo:27017",
    "mongodb.name": "mongo",
    "mongodb.user": "debezium",
    "mongodb.password": "dbname",
    "database.whitelist": "db_1,db_2",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
    "transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
  }
}

As you can see, apart from the generic Kafka Connect setting tasks.max there are no new options compared to the previous example; there are just fewer options for connecting to the database, and they use the mongodb.* prefix (mongodb.name plays the role of database.server.name).

This time the transforms settings rename the target topic from the pattern <server_name>.<db_name>.<collection_name> to data.cdc.mongo_<db_name>.

Fault tolerance

Fault tolerance and high availability matter more than ever today, especially when it comes to data and transactions, and change data tracking is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect runs in distributed mode, several workers must be configured with the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position stored in the Kafka topic.
  2. Loss of connectivity to the Kafka cluster. The connector simply pauses at the position it failed to send to Kafka and periodically retries sending it until it succeeds.
  3. The data source is unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts with exponential backoff. After the 16th failed attempt, the task is marked as failed and has to be restarted manually through the Kafka Connect REST interface.
    • With PostgreSQL no data is lost, because the replication slot prevents deletion of WAL files the connector has not read yet. The downside is that if network connectivity between the connector and the DBMS is lost for a long time, the disk may run out of space, which can bring down the whole DBMS.
    • With MySQL, the DBMS itself may rotate the binlog files before connectivity is restored. The connector then goes into the failed state, and to restore normal operation it has to be restarted in initial snapshot mode before it can continue reading the binlogs.
    • On MongoDB, the documentation says that the connector's behavior when log/oplog files have been deleted and it cannot continue reading from where it stopped is the same for all DBMSs: the connector goes into the failed state and needs a restart in initial snapshot mode.

      There are exceptions, though. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog rotated during that time, then once the connection is restored the connector will quietly continue reading from the first available position, so some of the data will never reach Kafka.
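To get a feel for what "16 attempts with exponential backoff" means in wall-clock time, here is a small Python sketch. It assumes the delay doubles from 1 second up to a 2-minute cap; those defaults are my reading of the MongoDB connector options connect.backoff.initial.delay.ms, connect.backoff.max.delay.ms and connect.max.attempts, and the real schedule may differ.

```python
def backoff_delays_ms(max_attempts: int = 16, initial_ms: int = 1_000,
                      max_delay_ms: int = 120_000) -> list:
    """Wait before each reconnect attempt: doubling from initial_ms, capped at max_delay_ms.

    The defaults are assumed to mirror the MongoDB connector options
    connect.max.attempts, connect.backoff.initial.delay.ms and
    connect.backoff.max.delay.ms; the connector's real schedule may differ.
    """
    return [min(initial_ms * 2 ** attempt, max_delay_ms) for attempt in range(max_attempts)]


if __name__ == "__main__":
    delays = backoff_delays_ms()
    print(delays[:8])  # [1000, 2000, 4000, 8000, 16000, 32000, 64000, 120000]
    print(f"total wait ~ {sum(delays) / 60_000:.1f} min")  # total wait ~ 20.1 min
```

Under these assumptions the task gives up after roughly 20 minutes of cumulative waiting, so a longer outage of the source means a manual restart.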

Conclusion

Debezium was my first experience with CDC systems, and overall a very positive one. The project won me over with support for the major DBMSs, ease of configuration, cluster support and an active community. If you are interested in putting it into practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared with the JDBC connector for Kafka Connect (developed by Confluent), Debezium's main advantage is that it reads changes from the DBMS logs, which delivers data with minimal latency. The JDBC connector queries the tracked table at a fixed interval and, for the same reason, produces no messages when data is deleted (how can you query data that no longer exists?).
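The difference is easy to see in a toy model. The Python sketch below imitates a polling connector in an "incrementing" mode (rows with an id greater than the last one seen) next to an append-only change log standing in for the WAL. The table and helper names are made up for the illustration; the JDBC connector's timestamp-based modes would additionally catch the update, but still not the delete.

```python
def poll_incrementing(table: dict, last_seen_id: int):
    """Polling in an 'incrementing' style: rows whose id is greater than the last one seen."""
    new_rows = [row for row_id, row in sorted(table.items()) if row_id > last_seen_id]
    next_id = max([last_seen_id] + [row["id"] for row in new_rows])
    return new_rows, next_id


def simulate():
    table = {}       # the tracked table, keyed by primary key
    change_log = []  # stands in for the WAL: every change is appended here

    def apply(op, row_id, row=None):
        if op == "d":
            table.pop(row_id, None)
        else:
            table[row_id] = row
        change_log.append((op, row_id))

    apply("c", 1005, {"id": 1005, "first_name": "foo"})
    first_poll, last_id = poll_incrementing(table, 0)         # picks up the insert
    apply("u", 1005, {"id": 1005, "first_name": "egg"})
    apply("d", 1005)
    second_poll, last_id = poll_incrementing(table, last_id)  # sees neither change
    return first_poll, second_poll, change_log
```

The poller reports only the insert, while the log still holds all three changes, including the delete.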

For similar tasks, you may also want to look at the following solutions besides Debezium:


ืžืงื•ืจ: www.habr.com
