เปเบ™เบฐเบ™เปเบฒ Debezium - CDC เบชเปเบฒเบฅเบฑเบš Apache Kafka

เปเบ™เบฐเบ™เปเบฒ Debezium - CDC เบชเปเบฒเบฅเบฑเบš Apache Kafka

เปƒเบ™เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ‚เบญเบ‡เบ‚เป‰เบญเบ, เบ‚เป‰เบญเบเบกเบฑเบเบˆเบฐเบžเบปเบšเบงเบดเบ—เบตเปเบเป‰เป„เบ‚เบ”เป‰เบฒเบ™เบงเบดเบŠเบฒเบเบฒเบ™เปƒเบซเบกเปˆ / เบœเบฐเบฅเบดเบ”เบ•เบฐเบžเบฑเบ™เบŠเบญเบšเปเบง, เบ‚เปเป‰เบกเบนเบ™เบเปˆเบฝเบงเบเบฑเบšเบชเบดเปˆเบ‡เบ—เบตเปˆเบšเปเปˆเบ„เปˆเบญเบเบกเบตเบขเบนเปˆเปƒเบ™เบญเบดเบ™เป€เบ•เบตเป€เบ™เบฑเบ”เบ—เบตเปˆเป€เบงเบปเป‰เบฒเบžเบฒเบชเบฒเบฅเบฑเบ”เป€เบŠเบ. เบ”เป‰เบงเบเบšเบปเบ”เบ„เบงเบฒเบกเบ™เบตเป‰, เบ‚เป‰เบญเบเบˆเบฐเบžเบฐเบเบฒเบเบฒเบกเบ•เบทเปˆเบกเบ‚เปเป‰เบกเบนเบ™เปƒเบชเปˆเบŠเปˆเบญเบ‡เบซเบงเปˆเบฒเบ‡เบ”เบฑเปˆเบ‡เบเปˆเบฒเบงเบ”เป‰เบงเบเบ•เบปเบงเบขเปˆเบฒเบ‡เบˆเบฒเบเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒเบ‚เบญเบ‡เบ‚เป‰เบญเบ, เป€เบกเบทเปˆเบญเบ‚เป‰เบญเบเบ•เป‰เบญเบ‡เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบเบฒเบ™เบชเบปเปˆเบ‡เป€เบซเบ”เบเบฒเบ™ CDC เบˆเบฒเบเบชเบญเบ‡ DBMSs เบ—เบตเปˆเบ™เบดเบเบปเบก (PostgreSQL เปเบฅเบฐ MongoDB) เป„เบ›เบซเบฒเบเบธเปˆเบก Kafka เป‚เบ”เบเปƒเบŠเป‰ Debezium. เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบซเบงเบฑเบ‡เบงเปˆเบฒเบšเบปเบ”เบ„เบงเบฒเบกเบ—เบปเบšเบ—เบงเบ™เบ™เบตเป‰, เป€เบŠเบดเปˆเบ‡เบ›เบฐเบเบปเบ”เบงเปˆเบฒเป€เบ›เบฑเบ™เบœเบปเบ™เบกเบฒเบˆเบฒเบเบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ—เบตเปˆเป€เบฎเบฑเบ”, เบˆเบฐเป€เบ›เบฑเบ™เบ›เบฐเป‚เบซเบเบ”เบ•เปเปˆเบ„เบปเบ™เบญเบทเปˆเบ™.

Debezium เปเบฅเบฐ CDC เป‚เบ”เบเบ—เบปเปˆเบงเป„เบ›เปเบกเปˆเบ™เบซเบเบฑเบ‡?

เป€เบ”เป€เบšเบŠเบฝเบก - เบœเบนเป‰เบ•เบฒเบ‡เบซเบ™เป‰เบฒเบ‚เบญเบ‡เบ›เบฐเป€เบžเบ”เบŠเบญเบšเปเบง CDC (เบšเบฑเบ™เบ—เบถเบเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบ‚เปเป‰เบกเบนเบ™), เบซเบผเบทเบซเบผเบฒเบเบเบงเปˆเบฒเบ—เบตเปˆเบŠเบฑเบ”เป€เบˆเบ™, เบกเบฑเบ™เปเบกเปˆเบ™เบŠเบธเบ”เบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบš DBMSs เบ•เปˆเบฒเบ‡เป†เบ—เบตเปˆเป€เบซเบกเบฒเบฐเบชเบปเบกเบเบฑเบšเบเบญเบš Apache Kafka Connect.

เบ™เบตเป‰ เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เปเบซเบผเปˆเบ‡โ€‹เป€เบ›เบตเบ”โ€‹, เป„เบ”เป‰เบฎเบฑเบšเบญเบฐเบ™เบธเบเบฒเบ”เบžเบฒเบเปƒเบ•เป‰ Apache License v2.0 เปเบฅเบฐเป„เบ”เป‰เบฎเบฑเบšเบเบฒเบ™เบชเบฐเปœเบฑเบšเบชเบฐเปœเบนเบ™เป‚เบ”เบ Red Hat. เบเบฒเบ™เบžเบฑเบ”เบ—เบฐเบ™เบฒเป„เบ”เป‰เบ”เปเบฒเป€เบ™เบตเบ™เบกเบฒเบ•เบฑเป‰เบ‡เปเบ•เปˆเบ›เบต 2016 เปเบฅเบฐเปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบกเบฑเบ™เบชเบฐเบซเบ™เบญเบ‡เบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบขเปˆเบฒเบ‡เป€เบ›เบฑเบ™เบ—เบฒเบ‡เบเบฒเบ™เบชเปเบฒเบฅเบฑเบš DBMS เบ•เปเปˆเป„เบ›เบ™เบตเป‰: MySQL, PostgreSQL, MongoDB, SQL Server. เบเบฑเบ‡เบกเบตเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบš Cassandra เปเบฅเบฐ Oracle, เปเบ•เปˆเบžเบงเบเป€เบ‚เบปเบฒเบขเบนเปˆเปƒเบ™เบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™ "เป€เบ‚เบปเป‰เบฒเป€เบ–เบดเบ‡เป„เบง", เปเบฅเบฐเบเบฒเบ™เบญเบญเบเปƒเบซเบกเปˆเบšเปเปˆเป„เบ”เป‰เบฎเบฑเบšเบ›เบฐเบเบฑเบ™เบ„เบงเบฒเบกเป€เบ‚เบปเป‰เบฒเบเบฑเบ™เป„เบ”เป‰เปƒเบ™เบ”เป‰เบฒเบ™เบซเบฅเบฑเบ‡.

เบ–เป‰เบฒเบžเบงเบเป€เบฎเบปเบฒเบ›เบฝเบšเบ—เบฝเบš CDC เบเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เปเบšเบšเบ”เบฑเป‰เบ‡เป€เบ”เบตเบก (เป€เบกเบทเปˆเบญเปเบญเบฑเบšเบžเบฅเบดเป€เบ„เบŠเบฑเบ™เบญเปˆเบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบ DBMS เป‚เบ”เบเบเบปเบ‡), เบ›เบฐเป‚เบซเบเบ”เบ•เบปเป‰เบ™เบ•เปเบ‚เบญเบ‡เบกเบฑเบ™เบ›เบฐเบเบญเบšเบกเบตเบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบกเบนเบ™เปƒเบ™เบฅเบฐเบ”เบฑเบšเปเบ–เบงเบ—เบตเปˆเบกเบต latency เบ•เปˆเปเบฒ, เบ„เบงเบฒเบกเบซเบ™เป‰เบฒเป€เบŠเบทเปˆเบญเบ–เบทเบชเบนเบ‡เปเบฅเบฐเบ„เบงเบฒเบกเบžเป‰เบญเบก. เบชเบญเบ‡เบˆเบธเบ”เบชเบธเบ”เบ—เป‰เบฒเบเปเบกเปˆเบ™เบšเบฑเบ™เบฅเบธเป„เบ”เป‰เป‚เบ”เบเบเบฒเบ™เปƒเบŠเป‰เบเบธเปˆเบก Kafka เป€เบ›เบฑเบ™เบšเปˆเบญเบ™เป€เบเบฑเบšเบกเป‰เบฝเบ™เบ‚เบญเบ‡เป€เบซเบ”เบเบฒเบ™ CDC.

เบ™เบญเบเบˆเบฒเบเบ™เบตเป‰, เบ‚เปเป‰เบ”เบตเบฅเบงเบกเป€เบ–เบดเบ‡เบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบตเปˆเบงเปˆเบฒเบฎเบนเบšเปเบšเบšเบ”เบฝเบงเบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป€เบžเบทเปˆเบญเป€เบเบฑเบšเบฎเบฑเบเบชเบฒเป€เบซเบ”เบเบฒเบ™, เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบเบชเบธเบ”เบ—เป‰เบฒเบเบšเปเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เบเบฑเบ‡เบงเบปเบ™เบเปˆเบฝเบงเบเบฑเบš nuances เบ‚เบญเบ‡เบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบ‡เบฒเบ™เบ—เบตเปˆเปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™ DBMS.

เบชเบธเบ”เบ—เป‰เบฒเบ, เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ™เบฒเบเบซเบ™เป‰เบฒเบ‚เปเป‰เบ„เบงเบฒเบกเป€เบ›เบตเบ”เบ‚เบญเบšเป€เบ‚เบ”เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ‚เบฐเบซเบเบฒเบเบญเบญเบเบ•เบฒเบกเบฅเบงเบ‡เบ™เบญเบ™เบ‚เบญเบ‡เบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบเบ—เบตเปˆเบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบ‚เปเป‰เบกเบนเบ™. เปƒเบ™เบ‚เบฐเบ™เบฐเบ”เบฝเบงเบเบฑเบ™, เบœเบปเบ™เบเบฐเบ—เบปเบšเบ•เปเปˆเปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เปเบกเปˆเบ™เบซเบ™เป‰เบญเบเบ—เบตเปˆเบชเบธเบ”, เป€เบ™เบทเปˆเบญเบ‡เบˆเบฒเบเบงเปˆเบฒเบ‚เปเป‰เบกเบนเบ™เบšเปเปˆเป„เบ”เป‰เบฎเบฑเบšเป‚เบ”เบเบเบปเบ‡เบˆเบฒเบ DBMS, เปเบ•เปˆเบˆเบฒเบเบเบธเปˆเบก Kafka.

เบเปˆเบฝเบงเบเบฑเบšเบชเบฐเบ–เบฒเบ›เบฑเบ”เบ•เบฐเบเบฐเบเปเบฒ Debezium

เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰ Debezium เบกเบฒเบฅเบปเบ‡เปƒเบ™เป‚เบ„เบ‡เบเบฒเบ™เบ‡เปˆเบฒเบเบ”เบฒเบเบ™เบตเป‰:

DBMS (เป€เบ›เบฑเบ™เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™) โ†’ เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปƒเบ™ Kafka Connect โ†’ Apache Kafka โ†’ เบœเบนเป‰เบšเปเบฅเบดเป‚เบžเบ

เปƒเบ™เบ–เบฒเบ™เบฐเป€เบ›เบฑเบ™เบ•เบปเบงเบขเปˆเบฒเบ‡, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเบˆเบฐเปƒเบซเป‰เปเบœเบ™เบงเบฒเบ”เบˆเบฒเบเป€เบงเบฑเบšเป„เบŠเบ—เปŒเบ‚เบญเบ‡เป‚เบ„เบ‡เบเบฒเบ™:

เปเบ™เบฐเบ™เปเบฒ Debezium - CDC เบชเปเบฒเบฅเบฑเบš Apache Kafka

เบขเปˆเบฒเบ‡เปƒเบ”เบเปเปˆเบ•เบฒเบก, เบ‚เป‰เบญเบเบšเปเปˆเบกเบฑเบเป‚เบ„เบ‡เบเบฒเบ™เบ™เบตเป‰, เป€เบžเบฒเบฐเบงเปˆเบฒเบกเบฑเบ™เป€เบšเบดเปˆเบ‡เบ„เบทเบงเปˆเบฒเบกเบตเบžเบฝเบ‡เปเบ•เปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบšเปˆเบญเบ™เบซเบฅเบปเป‰เบกเบˆเบปเบกเป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™.

เปƒเบ™เบ„เบงเบฒเบกเป€เบ›เบฑเบ™เบˆเบดเบ‡, เบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เปเบกเปˆเบ™เปเบ•เบเบ•เปˆเบฒเบ‡เบเบฑเบ™: เบเบฒเบ™เบ•เบทเปˆเบกเบ‚เปเป‰เบกเบนเบ™ Lake เบ‚เบญเบ‡เบ—เปˆเบฒเบ™ (เบฅเบดเป‰เบ‡เบชเบธเบ”เบ—เป‰เบฒเบเปƒเบ™เปเบœเบ™เบงเบฒเบ”เบ‚เป‰เบฒเบ‡เป€เบ—เบดเบ‡) เบšเปเปˆเปเบกเปˆเบ™เบงเบดเบ—เบตเบ”เบฝเบงเบ—เบตเปˆเบˆเบฐเปƒเบŠเป‰ Debezium. เป€เบซเบ”เบเบฒเบ™เบ—เบตเปˆเบ–เบทเบเบชเบปเปˆเบ‡เป„เบ›เบซเบฒ Apache Kafka เบชเบฒเบกเบฒเบ”เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป‚เบ”เบเบ„เปเบฒเบฎเป‰เบญเบ‡เบชเบฐเบซเบกเบฑเบเบ‚เบญเบ‡เบ—เปˆเบฒเบ™เป€เบžเบทเปˆเบญเบˆเบฑเบ”เบเบฒเบ™เบเบฑเบšเบชเบฐเบ–เบฒเบ™เบฐเบเบฒเบ™เบ•เปˆเบฒเบ‡เป†. เบเบปเบโ€‹เบ•เบปเบงโ€‹เบขเปˆเบฒเบ‡:

  • เบเบฒเบ™เบเปเบฒเบˆเบฑเบ”เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบšเปเปˆเบเปˆเบฝเบงเบ‚เป‰เบญเบ‡เบญเบญเบเบˆเบฒเบ cache;
  • เบเบฒเบ™โ€‹เบชเบปเปˆเบ‡โ€‹เปเบˆเป‰เบ‡โ€‹เบเบฒเบ™โ€‹;
  • เบเบฒเบ™เบ›เบฑเบšเบ›เบธเบ‡เบ”เบฑเบ”เบชเบฐเบ™เบตเบ„เบปเป‰เบ™เบซเบฒ;
  • เบšเบฒเบ‡เบ›เบฐเป€เบžเบ”เบ‚เบญเบ‡เบšเบฑเบ™เบ—เบถเบเบเบฒเบ™เบเบงเบ”เบชเบญเบš;
  • ...

เปƒเบ™โ€‹เบเปโ€‹เบฅเบฐโ€‹เบ™เบตโ€‹เบ—เบตเปˆโ€‹เบ—เปˆเบฒเบ™โ€‹เบกเบตโ€‹เบ„เปเบฒโ€‹เบฎเป‰เบญเบ‡โ€‹เบชเบฐโ€‹เบซเบกเบฑเบ Java เปเบฅเบฐโ€‹เบšเปเปˆโ€‹เบกเบตโ€‹เบ„เบงเบฒเบกโ€‹เบ•เป‰เบญเบ‡โ€‹เบเบฒเบ™ / เบ„เบงเบฒเบกโ€‹เป€เบ›เบฑเบ™โ€‹เป„เบ›โ€‹เป„เบ”เป‰โ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹เบเบธเปˆเบก Kafkaโ€‹, เบกเบฑเบ™โ€‹เบเบฑเบ‡โ€‹เบกเบตโ€‹เบ„เบงเบฒเบกโ€‹เป€เบ›เบฑเบ™โ€‹เป„เบ›โ€‹เป„เบ”เป‰โ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เป€เบฎเบฑเบ”โ€‹เบงเบฝเบโ€‹เป‚เบ”เบโ€‹เบœเปˆเบฒเบ™โ€‹เบเบฒเบ™โ€‹. เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบ‡. เบšเบงเบเบ—เบตเปˆเบŠเบฑเบ”เป€เบˆเบ™เปเบกเปˆเบ™เบงเปˆเบฒเบเบฑเบšเบกเบฑเบ™เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบ›เบฐเบ•เบดเป€เบชเบ”เป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบžเบทเป‰เบ™เบ–เบฒเบ™เป€เบžเบตเปˆเบกเป€เบ•เบตเบก (เปƒเบ™เบฎเบนเบšเปเบšเบšเบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบฅเบฐ Kafka). เปเบ™เบงเปƒเบ”เบเปเปˆเบ•เบฒเบก, เบเบฒเบ™เปเบเป‰เป„เบ‚เบ™เบตเป‰เป„เบ”เป‰เบ–เบทเบเบเบปเบเป€เบฅเบตเบเบ•เบฑเป‰เบ‡เปเบ•เปˆเป€เบงเบตเบŠเบฑเบ™ 1.1 เปเบฅเบฐเบšเปเปˆเบ–เบทเบเปเบ™เบฐเบ™เบณเปƒเบซเป‰เปƒเบŠเป‰เบญเบตเบเบ•เปเปˆเป„เบ› (เบกเบฑเบ™เบญเบฒเบ”เบˆเบฐเบ–เบทเบเบฅเบถเบšเบญเบญเบเปƒเบ™เบฅเบธเป‰เบ™เปƒเบ™เบญเบฐเบ™เบฒเบ„เบปเบ”).

เบšเบปเบ”โ€‹เบ„เบงเบฒเบกโ€‹เบ™เบตเป‰โ€‹เบˆเบฐโ€‹เบ›เบถเบโ€‹เบชเบฒโ€‹เบซเบฒโ€‹เบฅเบทโ€‹เบชเบฐโ€‹เบ–เบฒโ€‹เบ›เบฑเบ”โ€‹เบ•เบฐโ€‹เบ—เบตเปˆโ€‹เปเบ™เบฐโ€‹เบ™เปเบฒโ€‹เป‚เบ”เบโ€‹เบœเบนเป‰โ€‹เบžเบฑเบ”โ€‹เบ—เบฐโ€‹เบ™เบฒโ€‹, เบ—เบตเปˆโ€‹เปƒเบซเป‰โ€‹เบ„เบงเบฒเบกโ€‹เบ—เบปเบ™โ€‹เบ—เบฒเบ™โ€‹เบ„เบงเบฒเบกโ€‹เบœเบดเบ”โ€‹เบžเบฒเบ”โ€‹เปเบฅเบฐโ€‹เบเบฒเบ™โ€‹เบ‚เบฐโ€‹เบซเบเบฒเบโ€‹เบ•เบปเบงโ€‹เป„เบ”เป‰โ€‹.

เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ

เป€เบžเบทเปˆเบญเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบเบฒเบ™เบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เปƒเบ™เบกเบนเบ™เบ„เปˆเบฒเบ—เบตเปˆเบชเปเบฒเบ„เบฑเบ™เบ—เบตเปˆเบชเบธเบ” - เบ‚เปเป‰เบกเบนเบ™ - เบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™:

  1. เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™, เป€เบŠเบดเปˆเบ‡เบชเบฒเบกเบฒเบ”เป€เบ›เบฑเบ™ MySQL เป€เบฅเบตเปˆเบกเบˆเบฒเบเป€เบงเบตเบŠเบฑเบ™ 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (เบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ„เบปเบšเบ–เป‰เบงเบ™);
  2. เบเบธเปˆเบก Apache Kafka
  3. Kafka Connect instance (เป€เบงเบตเบŠเบฑเปˆเบ™ 1.x, 2.x);
  4. เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ Debezium.

เป€เบฎเบฑเบ”เบงเบฝเบเบชเบญเบ‡เบˆเบธเบ”เบ—เปเบฒเบญเบดเบ”, i.e. เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡ DBMS เปเบฅเบฐ Apache Kafka เปเบกเปˆเบ™เป€เบเบตเบ™เบ‚เบญเบšเป€เบ‚เบ”เบ‚เบญเบ‡เบšเบปเบ”เบ„เบงเบฒเบก. เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เบชเปเบฒเบฅเบฑเบšเบœเบนเป‰เบ—เบตเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เบ—เบตเปˆเบˆเบฐเบ™เปเบฒเปƒเบŠเป‰เบ—เบธเบเบชเบดเปˆเบ‡เบ—เบธเบเบขเปˆเบฒเบ‡เปƒเบ™ sandbox, เบกเบตเบซเบ™เบถเปˆเบ‡เบ—เบตเปˆเบเบฝเบกเบžเป‰เบญเบกเปƒเบ™ repository เบขเปˆเบฒเบ‡เป€เบ›เบฑเบ™เบ—เบฒเบ‡เบเบฒเบ™เบ—เบตเปˆเบกเบตเบ•เบปเบงเบขเปˆเบฒเบ‡. docker-compose.yaml.

เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเบชเบธเบกเปƒเบชเปˆเบชเบญเบ‡เบˆเบธเบ”เบชเบธเบ”เบ—เป‰เบฒเบเปƒเบ™เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เป€เบžเบตเปˆเบกเป€เบ•เบตเบก.

0. Kafka Connect

เบ—เบตเปˆเบ™เบตเป‰เปเบฅเบฐเบ•เปเปˆเบกเบฒเปƒเบ™เบšเบปเบ”เบ„เบงเบฒเบก, เบ•เบปเบงเบขเปˆเบฒเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ—เบฑเบ‡เบซเบกเบปเบ”เบ–เบทเบเบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเปƒเบ™เบชเบฐเบžเบฒเบšเบเบฒเบ™เบ‚เบญเบ‡เบฎเบนเบšเบžเบฒเบš Docker เบ—เบตเปˆเปเบˆเบเบขเบฒเบเป‚เบ”เบเบ™เบฑเบเบžเบฑเบ”เบ—เบฐเบ™เบฒ Debezium. เบกเบฑเบ™เบ›เบฐเบเบญเบšเบ”เป‰เบงเบเป„เบŸเบฅเปŒ plugin เบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ—เบฑเบ‡เบซเบกเบปเบ” (เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ) เปเบฅเบฐเบชเบฐเบซเบ™เบญเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ Kafka Connect เป‚เบ”เบเปƒเบŠเป‰เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก.

เบ–เป‰เบฒเบ—เปˆเบฒเบ™เบ•เบฑเป‰เบ‡เปƒเบˆเบˆเบฐเปƒเบŠเป‰ Kafka Connect เบˆเบฒเบ Confluent, เบ—เปˆเบฒเบ™เบˆเบฐเบ•เป‰เบญเบ‡เป€เบžเบตเปˆเบก plugins เบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบ”เป‰เบงเบเบ•เบปเบงเบ—เปˆเบฒเบ™เป€เบญเบ‡เบเบฑเบšเป„เบ”เป€เบฅเบเบฐเบ—เปเบฅเบตเบ—เบตเปˆเบฅเบฐเบšเบธเป„เบงเป‰เปƒเบ™. plugin.path เบซเบผเบทเบ•เบฑเป‰เบ‡เบœเปˆเบฒเบ™เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก CLASSPATH. เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบชเปเบฒเบฅเบฑเบš Kafka Connect worker เปเบฅเบฐ connectors เปเบกเปˆเบ™เบ–เบทเบเบเปเบฒเบ™เบปเบ”เป‚เบ”เบเบœเปˆเบฒเบ™เป„เบŸเบฅเปŒเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ—เบตเปˆเบ–เบทเบเบชเบปเปˆเบ‡เบœเปˆเบฒเบ™เป€เบ›เบฑเบ™เบเบฒเบ™เป‚เบ•เป‰เบ–เบฝเบ‡เบเบฑเบšเบ„เปเบฒเบชเบฑเปˆเบ‡เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ‚เบญเบ‡เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™. เบชเปเบฒเบฅเบฑเบšเบฅเบฒเบเบฅเบฐเบญเบฝเบ”เป€เบšเบดเปˆเบ‡ เป€เบญเบเบฐเบชเบฒเบ™.

เบ‚เบฐเบšเบงเบ™เบเบฒเบ™เบ—เบฑเบ‡เบซเบกเบปเบ”เบ‚เบญเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ Debeizum เปƒเบ™เบชเบฐเบšเบฑเบšเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบกเปˆเบ™เบ”เปเบฒเป€เบ™เบตเบ™เปƒเบ™เบชเบญเบ‡เบ‚เบฑเป‰เบ™เบ•เบญเบ™. เบ‚เปโ€‹เปƒเบซเป‰โ€‹เบžเบดเบˆเบฒเบฅเบฐเบ™เบฒโ€‹เบžเบงเบโ€‹เป€เบ‚เบปเบฒโ€‹เปเบ•เปˆโ€‹เบฅเบฐโ€‹เบ„เบปเบ™:

1. เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบเบญเบš Kafka Connect

เป€เบžเบทเปˆเบญเบ–เปˆเบฒเบเบ—เบญเบ”เบ‚เปเป‰เบกเบนเบ™เป„เบ›เบซเบฒเบเบธเปˆเบก Apache Kafka, เบ•เบปเบงเบเปเบฒเบ™เบปเบ”เบเบฒเบ™เบชเบฐเป€เบžเบฒเบฐเปเบกเปˆเบ™เบ–เบทเบเบเปเบฒเบ™เบปเบ”เป„เบงเป‰เปƒเบ™เบเบญเบš Kafka Connect, เป€เบŠเบฑเปˆเบ™:

  • เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เบ„เปˆเบฒโ€‹เบเบฒเบ™โ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹เบเบธเปˆเบกโ€‹,
  • เบŠเบทเปˆเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ‚เบญเบ‡เบกเบฑเบ™เป€เบญเบ‡เบˆเบฐเบ–เบทเบเป€เบเบฑเบšเป„เบงเป‰,
  • เบŠเบทเปˆเบ‚เบญเบ‡เบเบธเปˆเบกเบ—เบตเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเปเบฒเบฅเบฑเบ‡เปเบฅเปˆเบ™ (เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบฎเบนเบšเปเบšเบšเบเบฒเบ™เปเบˆเบเบขเบฒเบ).

เบฎเบนเบšเบžเบฒเบš Docker เบขเปˆเบฒเบ‡เป€เบ›เบฑเบ™เบ—เบฒเบ‡เบเบฒเบ™เบ‚เบญเบ‡เป‚เบ„เบ‡เบเบฒเบ™เบชเบฐเบซเบ™เบฑเบšเบชเบฐเบซเบ™เบนเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป‚เบ”เบเปƒเบŠเป‰เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบก - เบ™เบตเป‰เปเบกเปˆเบ™เบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰. เบ”เบฑเปˆเบ‡โ€‹เบ™เบฑเป‰เบ™โ€‹เปƒเบซเป‰โ€‹เบ”เบฒเบงโ€‹เบ™โ€‹เปŒโ€‹เป‚เบซเบฅเบ”โ€‹เบฎเบนเบšโ€‹เบžเบฒเบšโ€‹:

docker pull debezium/connect

เบŠเบธเบ”เบ•เบปเบงเปเบ›เบชเบฐเบžเบฒเบšเปเบงเบ”เบฅเป‰เบญเบกเบ‚เบฑเป‰เบ™เบ•เปˆเบณเบ—เบตเปˆเบ•เป‰เบญเบ‡เบเบฒเบ™เป€เบžเบทเปˆเบญเปเบฅเปˆเบ™เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบกเบตเบ”เบฑเปˆเบ‡เบ™เบตเป‰:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - เบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™เบ‚เบญเบ‡เป€เบ„เบทเปˆเบญเบ‡เปเบกเปˆเบ‚เปˆเบฒเบเบ‚เบญเบ‡เบเบธเปˆเบก Kafka เป€เบžเบทเปˆเบญเปƒเบซเป‰เป„เบ”เป‰เบฎเบฑเบšเบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ„เบปเบšเบ–เป‰เบงเบ™เบ‚เบญเบ‡เบชเบฐเบกเบฒเบŠเบดเบเบเบธเปˆเบก;
  • OFFSET_STORAGE_TOPIC=connector-offsets โ€” เบซเบปเบงเบ‚เปเป‰เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒเบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ—เบตเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ•เบฑเป‰เบ‡เบขเบนเปˆเปƒเบ™เบ›เบฐเบˆเบธเบšเบฑเบ™;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - เบซเบปเบงเบ‚เปเป‰เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒเบชเบฐเบ–เบฒเบ™เบฐเบžเบฒเบšเบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบฅเบฐเบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบกเบฑเบ™;
  • CONFIG_STORAGE_TOPIC=connector-config - เบซเบปเบงเบ‚เปเป‰เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒเบ‚เปเป‰เบกเบนเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบฅเบฐเบงเบฝเบเบ‡เบฒเบ™เบ‚เบญเบ‡เบกเบฑเบ™;
  • GROUP_ID=1 โ€” เบ•เบปเบงเบฅเบฐเบšเบธเบเบธเปˆเบกเบ„เบปเบ™เบ‡เบฒเบ™เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบ›เบฐเบ•เบดเบšเบฑเบ”เปœเป‰เบฒเบ—เบตเปˆเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ; เบ•เป‰เบญเบ‡เบเบฒเบ™เปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เปเบˆเบเบขเบฒเบ (เปเบˆเบเบขเบฒเบ) เบฅเบฐเบšเบญเบš.

เบžเบงเบเป€เบฎเบปเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™ container เบ”เป‰เบงเบเบ•เบปเบงเปเบ›เป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

เบซเบกเบฒเบเป€เบซเบ”เบเปˆเบฝเบงเบเบฑเบš Avro

เป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™, Debezium เบ‚เบฝเบ™เบ‚เปเป‰เบกเบนเบ™เปƒเบ™เบฎเบนเบšเปเบšเบš JSON, เป€เบŠเบดเปˆเบ‡เบเบญเบกเบฎเบฑเบšเป„เบ”เป‰เบชเปเบฒเบฅเบฑเบš sandboxes เปเบฅเบฐเบ‚เปเป‰เบกเบนเบ™เบˆเปเบฒเบ™เบงเบ™เบ™เป‰เบญเบเป†, เปเบ•เปˆเบชเบฒเบกเบฒเบ”เป€เบ›เบฑเบ™เบšเบฑเบ™เบซเบฒเปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเป‚เบซเบฅเบ”เบซเบผเบฒเบ. เบ—เบฒเบ‡เป€เบฅเบทเบญเบเบ‚เบญเบ‡เบ•เบปเบงเปเบ›เบ‡ JSON เปเบกเปˆเบ™เป€เบžเบทเปˆเบญ serialize เบ‚เปเป‰เบ„เบงเบฒเบกเป‚เบ”เบเปƒเบŠเป‰ Avro เบเบฑเบšเบฎเบนเบšเปเบšเบšเบ–เบฒเบ™เบชเบญเบ‡, เป€เบŠเบดเปˆเบ‡เบซเบผเบธเบ”เบœเปˆเบญเบ™เบเบฒเบ™เป‚เบซเบผเบ”เปƒเบ™เบฅเบฐเบšเบปเบšเบเปˆเบญเบ I / O เปƒเบ™ Apache Kafka.

เป€เบžเบทเปˆเบญเปƒเบŠเป‰ Avro, เบ—เปˆเบฒเบ™เบˆเปเบฒเป€เบ›เบฑเบ™เบ•เป‰เบญเบ‡เปƒเบŠเป‰เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบ schema-registry (เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เป€เบเบฑเบšเบฎเบฑเบเบชเบฒ schemas). เบ•เบปเบงเปเบ›เบ‚เบญเบ‡เบ•เบปเบงเปเบ›เบ‡เบชเบฑเบ™เบเบฒเบ™เบˆเบฐเบกเบตเบฅเบฑเบเบชเบฐเบ™เบฐเบ™เบตเป‰:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰ Avro เปเบฅเบฐเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบเบฒเบ™เบฅเบปเบ‡เบ—เบฐเบšเบฝเบ™เบชเปเบฒเบฅเบฑเบšเบกเบฑเบ™เป€เบเบตเบ™เบ‚เบญเบšเป€เบ‚เบ”เบ‚เบญเบ‡เบšเบปเบ”เบ„เบงเบฒเบก - เบ•เบทเปˆเบกเบญเบตเบ, เป€เบžเบทเปˆเบญเบ„เบงเบฒเบกเบŠเบฑเบ”เป€เบˆเบ™, เบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰ JSON.

2. เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡

เปƒเบ™เบ›เบฑเบ”เบˆเบธเบšเบฑเบ™เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป„เบ›เป‚เบ”เบเบเบปเบ‡เบเบฑเบšเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡, เป€เบŠเบดเปˆเบ‡เบˆเบฐเบญเปˆเบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเปเบซเบผเปˆเบ‡.

เบ‚เปเปƒเบซเป‰เป€เบšเบดเปˆเบ‡เบ•เบปเบงเบขเปˆเบฒเบ‡เบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบšเบชเบญเบ‡ DBMS: PostgreSQL เปเบฅเบฐ MongoDB, เบ—เบตเปˆเบ‚เป‰เบญเบเบกเบตเบ›เบฐเบชเบปเบšเบเบฒเบ™เปเบฅเบฐเบกเบตเบ„เบงเบฒเบกเปเบ•เบเบ•เปˆเบฒเบ‡ (เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบˆเบฐเบ™เป‰เบญเบ, เปเบ•เปˆเปƒเบ™เบšเบฒเบ‡เบเปเบฅเบฐเบ™เบตเบ—เบตเปˆเบชเปเบฒเบ„เบฑเบ™!).

เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป„เบ”เป‰เบ–เบทเบเบญเบฐเบ—เบดเบšเบฒเบเป„เบงเป‰เปƒเบ™ JSON notation เปเบฅเบฐเบญเบฑเบšเป‚เบซเบฅเบ”เป„เบ›เบเบฑเบ‡ Kafka Connect เป‚เบ”เบเปƒเบŠเป‰เบ„เปเบฒเบฎเป‰เบญเบ‡เบ‚เป POST.

2.1. PostgreSQL

เบ•เบปเบงเบขเปˆเบฒเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบš PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

เบซเบผเบฑเบเบเบฒเบ™เบ‚เบญเบ‡เบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบ‡เบฒเบ™เบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบซเบผเบฑเบ‡เบˆเบฒเบเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ™เบตเป‰เปเบกเปˆเบ™เบ‚เป‰เบญเบ™เบ‚เป‰เบฒเบ‡เบ‡เปˆเบฒเบเบ”เบฒเบ:

  • เปƒเบ™เบ•เบญเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ—เปเบฒเบญเบดเบ”, เบกเบฑเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบฅเบฐเบšเบธเป„เบงเป‰เปƒเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเปเบฅเบฐเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบ™เป‚เบซเบกเบ” เบžเบฒเบšเบ–เปˆเบฒเบเป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™, เบชเบปเปˆเบ‡เบเบฑเบš Kafka เบŠเบธเบ”เบ‚เปเป‰เบกเบนเบ™เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™เบ—เบตเปˆเป„เบ”เป‰เบฎเบฑเบšเบ”เป‰เบงเบเป€เบ‡เบทเปˆเบญเบ™เป„เบ‚ SELECT * FROM table_name.
  • เบซเบผเบฑเบ‡โ€‹เบˆเบฒเบโ€‹เบเบฒเบ™โ€‹เป€เบฅเบตเปˆเบกโ€‹เบ•เบปเป‰เบ™โ€‹เปเบกเปˆเบ™โ€‹เบชเปเบฒโ€‹เป€เบฅเบฑเบ”โ€‹, เบ•เบปเบงโ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹เป€เบ‚เบปเป‰เบฒโ€‹เป„เบ›โ€‹เปƒเบ™โ€‹เบฎเบนเบšโ€‹เปเบšเบšโ€‹เบเบฒเบ™โ€‹เบญเปˆเบฒเบ™โ€‹เบเบฒเบ™โ€‹เบ›เปˆเบฝเบ™โ€‹เปเบ›เบ‡โ€‹เบˆเบฒเบโ€‹เป„เบŸเบฅโ€‹เปŒ PostgreSQL WALโ€‹.

เบเปˆเบฝเบงโ€‹เบเบฑเบšโ€‹เบ—เบฒเบ‡โ€‹เป€เบฅเบทเบญเบโ€‹เบ—เบตเปˆโ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹:

  • name โ€” เบŠเบทเปˆโ€‹เบ‚เบญเบ‡โ€‹เบ•เบปเบงโ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹เบ—เบตเปˆโ€‹เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เบ„เปˆเบฒโ€‹เบญเบฐโ€‹เบ—เบดโ€‹เบšเบฒเบโ€‹เบ‚เป‰เบฒเบ‡โ€‹เบฅเบธเปˆเบกโ€‹เบ™เบตเป‰โ€‹เป„เบ”เป‰โ€‹เบ–เบทเบโ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹; เปƒเบ™เบญเบฐเบ™เบฒเบ„เบปเบ”, เบŠเบทเปˆเบ™เบตเป‰เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป€เบžเบทเปˆเบญเป€เบฎเบฑเบ”เบงเบฝเบเบเบฑเบšเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ (i. e. เป€เบšเบดเปˆเบ‡เบชเบฐเบ–เบฒเบ™เบฐ / restart / เบ›เบฑเบšเบ›เบธเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ) เบœเปˆเบฒเบ™ Kafka Connect REST API;
  • connector.class โ€” เบซเป‰เบญเบ‡เบฎเบฝเบ™เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ DBMS เบ—เบตเปˆเบˆเบฐเปƒเบŠเป‰เป‚เบ”เบเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเบเบณเบ™เบปเบ”เบ„เปˆเบฒ;
  • plugin.name เปเบกเปˆเบ™เบŠเบทเปˆเบ‚เบญเบ‡ plugin เบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ–เบญเบ”เบฅเบฐเบซเบฑเบ”เบขเปˆเบฒเบ‡เบกเบตเป€เบซเบ”เบœเบปเบ™เบ‚เบญเบ‡เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเป„เบŸเบฅเปŒ WAL. เบกเบตเปƒเบซเป‰เป€เบฅเบทเบญเบ wal2json, decoderbuffs ะธ pgoutput. เบชเบญเบ‡เบญเบฑเบ™เบ—เปเบฒเบญเบดเบ”เบฎเบฝเบเบฎเป‰เบญเบ‡เปƒเบซเป‰เบกเบตเบเบฒเบ™เบ•เบดเบ”เบ•เบฑเป‰เบ‡เบชเปˆเบงเบ™เบ‚เบฐเบซเบเบฒเบเบ—เบตเปˆเป€เบซเบกเบฒเบฐเบชเบปเบกเปƒเบ™ DBMS, เปเบฅเบฐ pgoutput เบชเปเบฒเบฅเบฑเบš PostgreSQL เบฎเบธเปˆเบ™ 10 เปเบฅเบฐเบชเบนเบ‡เบเบงเปˆเบฒเบšเปเปˆเบฎเบฝเบเบฎเป‰เบญเบ‡เปƒเบซเป‰เบกเบตเบเบฒเบ™เบซเบกเบนเบ™เปƒเบŠเป‰เป€เบžเบตเปˆเบกเป€เบ•เบตเบก;
  • database.* โ€” เบ—เบฒเบ‡โ€‹เป€เบฅเบทเบญเบโ€‹เบชเปเบฒโ€‹เบฅเบฑเบšโ€‹เบเบฒเบ™โ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹เบเบฑเบšโ€‹เบ–เบฒเบ™โ€‹เบ‚เปเป‰โ€‹เบกเบนเบ™โ€‹, เบšเปˆเบญเบ™โ€‹เบ—เบตเปˆโ€‹ database.server.name - เบŠเบทเปˆเบ‚เบญเบ‡เบ•เบปเบงเบขเปˆเบฒเบ‡ PostgreSQL เบ—เบตเปˆเปƒเบŠเป‰เป€เบžเบทเปˆเบญเบชเป‰เบฒเบ‡เบŠเบทเปˆเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เปƒเบ™เบเบธเปˆเบก Kafka;
  • table.include.list - เบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบ•เป‰เบญเบ‡เบเบฒเบ™เบ—เบตเปˆเบˆเบฐเบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡; เบกเบญเบšเปƒเบซเป‰เปƒเบ™เบฎเบนเบšเปเบšเบš schema.table_name; เบšเปเปˆเบชเบฒเบกเบฒเบ”เปƒเบŠเป‰เบฎเปˆเบงเบกเบเบฑเบ™เบเบฑเบš table.exclude.list;
  • heartbeat.interval.ms โ€” เป„เบฅเบเบฐเบซเปˆเบฒเบ‡ (เป€เบ›เบฑเบ™ milliseconds) เบ—เบตเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเบปเปˆเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเบซเบปเบงเปƒเบˆเป€เบ•เบฑเป‰เบ™เป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบžเบดเป€เบชเบ”;
  • heartbeat.action.query - เบ„เปเบฒโ€‹เบฎเป‰เบญเบ‡โ€‹เบชเบฐโ€‹เบซเบกเบฑเบโ€‹เบ—เบตเปˆโ€‹เบˆเบฐโ€‹เป„เบ”เป‰โ€‹เบฎเบฑเบšโ€‹เบเบฒเบ™โ€‹เบ›เบฐโ€‹เบ•เบดโ€‹เบšเบฑเบ”โ€‹เปƒเบ™โ€‹เป€เบงโ€‹เบฅเบฒโ€‹เบ—เบตเปˆโ€‹เบชเบปเปˆเบ‡โ€‹เบ‚เปเป‰โ€‹เบ„เบงเบฒเบกโ€‹เป€เบ•เบฑเป‰เบ™โ€‹เบ‚เบญเบ‡โ€‹เบซเบปเบงโ€‹เปƒเบˆโ€‹เปเบ•เปˆโ€‹เบฅเบฐโ€‹เบ„เบปเบ™ (เบ—เบฒเบ‡โ€‹เป€เบฅเบทเบญเบโ€‹เบ—เบตเปˆโ€‹เป„เบ”เป‰โ€‹เบ›เบฒโ€‹เบเบปเบ”โ€‹เบ•เบฑเป‰เบ‡โ€‹เปเบ•เปˆโ€‹เบชเบฐโ€‹เบšเบฑเบš 1.1โ€‹)โ€‹;
  • slot.name โ€” เบŠเบทเปˆเบ‚เบญเบ‡เบŠเปˆเบญเบ‡ replication เบ—เบตเปˆเบˆเบฐเบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰เป‚เบ”เบเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ;
  • publication.name - เบŠเบทเปˆ publications เปƒเบ™ PostgreSQL เบ—เบตเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปƒเบŠเป‰. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ—เบตเปˆเบกเบฑเบ™เบšเปเปˆเบกเบต, Debezium เบˆเบฐเบžเบฐเบเบฒเบเบฒเบกเบชเป‰เบฒเบ‡เบกเบฑเบ™. เบ–เป‰เบฒเบœเบนเป‰เปƒเบŠเป‰เบ—เบตเปˆเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ™เบฑเป‰เบ™เบšเปเปˆเบกเบตเบชเบดเบ”เบžเบฝเบ‡เบžเปเบชเปเบฒเบฅเบฑเบšเบเบฒเบ™เบ”เปเบฒเป€เบ™เบตเบ™เบเบฒเบ™เบ™เบตเป‰, เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบญเบญเบเบ”เป‰เบงเบเบ„เบงเบฒเบกเบœเบดเบ”เบžเบฒเบ”;
  • transforms เบเปเบฒเบ™เบปเบ”เบงเบดเบ—เบตเบเบฒเบ™เบ›เปˆเบฝเบ™เบŠเบทเปˆเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเบ—เบตเปˆเปเบ™เปˆเบ™เบญเบ™:
    • transforms.AddPrefix.type เบŠเบตเป‰เบšเบญเบเบงเปˆเบฒเบžเบงเบเป€เบฎเบปเบฒเบˆเบฐเปƒเบŠเป‰เบชเบณเบ™เบงเบ™เบ›เบปเบเบเบฐเบ•เบด;
    • transforms.AddPrefix.regex โ€” เปœเป‰เบฒเบเบฒเบเบ—เบตเปˆเบŠเบทเปˆเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เป€เบ›เบปเป‰เบฒเปเบฒเบเบ–เบทเบเบเบณเบ™เบปเบ”เบ„เบทเบ™เปƒเปเปˆ;
    • transforms.AddPrefix.replacement - เบเบปเบ‡เบเบฑเบšเบชเบดเปˆเบ‡เบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเบเปเบฒเบ™เบปเบ”เบ„เบทเบ™เปƒเบซเบกเปˆ.

เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบเปˆเบฝเบงเบเบฑเบšเบเบฒเบ™เป€เบ•เบฑเป‰เบ™เบ‚เบญเบ‡เบซเบปเบงเปƒเบˆเปเบฅเบฐเบเบฒเบ™เบซเบฑเบ™เบ›เปˆเบฝเบ™

เป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™, เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบชเบปเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™เป„เบ›เบซเบฒ Kafka เบชเปเบฒเบฅเบฑเบšเปเบ•เปˆเบฅเบฐเบ—เบธเบฅเบฐเบเปเบฒเบ—เบตเปˆเบซเบกเบฑเป‰เบ™เบชเบฑเบ™เบเบฒ, เปเบฅเบฐเบ‚เบฝเบ™ LSN เบ‚เบญเบ‡เบกเบฑเบ™ (Log Sequence Number) เปƒเบชเปˆเบซเบปเบงเบ‚เปเป‰เบเบฒเบ™เบšเปเบฅเบดเบเบฒเบ™ offset. เปเบ•เปˆเบˆเบฐเป€เบเบตเบ”เบซเบเบฑเบ‡เบ‚เบถเป‰เบ™เบ–เป‰เบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ–เบทเบเบ•เบฑเป‰เบ‡เบ„เปˆเบฒเปƒเบซเป‰เบญเปˆเบฒเบ™เบšเปเปˆเปเบกเปˆเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบฑเบ‡เปเบปเบ”, เปเบ•เปˆเบกเบตเบžเบฝเบ‡เบชเปˆเบงเบ™เปœเบถเปˆเบ‡เบ‚เบญเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ‚เบญเบ‡เบกเบฑเบ™เป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™ (เบ‚เปเป‰เบกเบนเบ™เปƒเบ”เบ–เบทเบเบญเบฑเบšเป€เบ”เบ”เป€เบฅเบทเป‰เบญเบเป†)?

  • เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบญเปˆเบฒเบ™เป„เบŸเบฅเปŒ WAL เปเบฅเบฐเบšเปเปˆเบเบงเบ”เบžเบปเบšเบเบฒเบ™เป€เบฎเบฑเบ”เบ—เบธเบฅเบฐเบเปเบฒเปƒเบ™เบžเบงเบเบกเบฑเบ™เบเบฑเบšเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบตเปˆเบกเบฑเบ™เบ•เบดเบ”เบ•เบฒเบก.
  • เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, เบกเบฑเบ™เบˆเบฐเบšเปเปˆเบ›เบฑเบšเบ›เบธเบ‡เบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ›เบฐเบˆเบธเบšเบฑเบ™เบ‚เบญเบ‡เบกเบฑเบ™เบšเปเปˆเบงเปˆเบฒเบˆเบฐเบขเบนเปˆเปƒเบ™เบซเบปเบงเบ‚เปเป‰เบซเบผเบทเปƒเบ™เบŠเปˆเบญเบ‡ replication.
  • เบ™เบตเป‰, เปƒเบ™เบ—เบฒเบ‡เบเบฑเบšเบเบฑเบ™, เบˆเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰เป„เบŸเบฅเปŒ WAL "เบ•เบดเบ”" เปƒเบ™เปเบœเปˆเบ™เปเบฅเบฐเบญเบฒเบ”เบˆเบฐเบซเบกเบปเบ”เบžเบทเป‰เบ™เบ—เบตเปˆเบ”เบดเบ”.

เปเบฅเบฐเปƒเบ™เบ—เบตเปˆเบ™เบตเป‰เบ—เบฒเบ‡เป€เบฅเบทเบญเบเบกเบฒเบซเบฒเบเบนเป‰เป„เบž. heartbeat.interval.ms ะธ heartbeat.action.query. เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบ—เบฒเบ‡เป€เบฅเบทเบญเบเป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰เป€เบ›เบฑเบ™เบ„เบนเปˆเป€เบฎเบฑเบ”เปƒเบซเป‰เบกเบฑเบ™เป€เบ›เบฑเบ™เป„เบ›เป„เบ”เป‰เบ—เบตเปˆเบˆเบฐเบ›เบฐเบ•เบดเบšเบฑเบ”เบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบ‚เปเป‰เบกเบนเบ™เปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เปเบเบเบ•เปˆเบฒเบ‡เบซเบฒเบเปƒเบ™เปเบ•เปˆเบฅเบฐเบ„เบฑเป‰เบ‡เบ—เบตเปˆเบ‚เปเป‰เบ„เบงเบฒเบกเบซเบปเบงเปƒเบˆเบ–เบทเบเบชเบปเปˆเบ‡. เบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™, LSN เบ—เบตเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ•เบฑเป‰เบ‡เบขเบนเปˆเปƒเบ™เบ›เบฐเบˆเบธเบšเบฑเบ™ (เบขเบนเปˆเปƒเบ™เบŠเปˆเบญเบ‡ replication) เป„เบ”เป‰เบ–เบทเบเบ›เบฑเบšเบ›เบธเบ‡เบขเปˆเบฒเบ‡เบ•เปเปˆเป€เบ™เบทเปˆเบญเบ‡. เบ™เบตเป‰เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰ DBMS เป€เบญเบปเบฒเป„เบŸเบฅเปŒ WAL เบ—เบตเปˆเบšเปเปˆเบˆเปเบฒเป€เบ›เบฑเบ™เบญเบญเบ. เบชเปเบฒเบฅเบฑเบšเบ‚เปเป‰เบกเบนเบ™เป€เบžเบตเปˆเบกเป€เบ•เบตเบกเบเปˆเบฝเบงเบเบฑเบšเบงเบดเบ—เบตเบเบฒเบ™เบ—เบฒเบ‡เป€เบฅเบทเบญเบเป€เบฎเบฑเบ”เบงเบฝเบ, เป€เบšเบดเปˆเบ‡ เป€เบญเบเบฐเบชเบฒเบ™.

เบ—เบฒเบ‡เป€เบฅเบทเบญเบเบญเบทเปˆเบ™เบ—เบตเปˆเบชเบปเบกเบ„เบงเบ™เป„เบ”เป‰เบฎเบฑเบšเบ„เบงเบฒเบกเบชเบปเบ™เปƒเบˆเบขเปˆเบฒเบ‡เปƒเบเป‰เบŠเบดเบ”เปเบกเปˆเบ™ transforms. เป€เบ–เบดเบ‡เปเบกเปˆเบ™เบงเปˆเบฒเบกเบฑเบ™เบกเบตเบ„เบงเบฒเบกเบชเบฐเบ”เบงเบเบชเบฐเบšเบฒเบเปเบฅเบฐเบ„เบงเบฒเบกเบ‡เบฒเบกเบซเบผเบฒเบเบเบงเปˆเบฒ ...

เป‚เบ”เบเบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™, Debezium เบชเป‰เบฒเบ‡เบซเบปเบงเบ‚เปเป‰เป‚เบ”เบเปƒเบŠเป‰เบ™เบฐเป‚เบเบšเบฒเบเบเบฒเบ™เบ•เบฑเป‰เบ‡เบŠเบทเปˆเบ•เปเปˆเป„เบ›เบ™เบตเป‰: serverName.schemaName.tableName. เบ™เบตเป‰เบญเบฒเบ”เบˆเบฐเบšเปเปˆเบชเบฐเบ”เบงเบเบชเบฐ เป€เปเบต เป„เบ›. เบ—เบฒเบ‡เป€เบฅเบทเบญเบ transforms เบเบฒเบ™เบ™เปเบฒเปƒเบŠเป‰เบเบฒเบ™เบชเบฐเปเบ”เบ‡เบญเบญเบเบ›เบปเบเบเบฐเบ•เบด, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบเปเบฒเบ™เบปเบ”เบšเบฑเบ™เบŠเบตเบฅเบฒเบเบŠเบทเปˆเบ‚เบญเบ‡เบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ—เบตเปˆเป€เบซเบ”เบเบฒเบ™เบ•เป‰เบญเบ‡เบ–เบทเบเบ™เปเบฒเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบกเบตเบŠเบทเปˆเบชเบฐเป€เบžเบฒเบฐ.

เปƒเบ™เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเบ‚เปเบ‚เบญเบšเปƒเบˆเบเบฑเบš transforms เบ•เปเปˆเป„เบ›เบ™เบตเป‰เบˆเบฐเป€เบเบตเบ”เบ‚เบถเป‰เบ™: เป€เบซเบ”เบเบฒเบ™ CDC เบ—เบฑเบ‡เบซเบกเบปเบ”เบˆเบฒเบเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบ–เบทเบเบ•เบดเบ”เบ•เบฒเบกเบˆเบฐเป„เบ›เบซเบฒเบซเบปเบงเบ‚เปเป‰เบ—เบตเปˆเบกเบตเบŠเบทเปˆ data.cdc.dbname. เบ–เป‰เบฒเบšเปเปˆเบ”เบฑเปˆเบ‡เบ™เบฑเป‰เบ™ (เป‚เบ”เบเบšเปเปˆเบกเบตเบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเป€เบซเบผเบปเปˆเบฒเบ™เบตเป‰), Debezium เบˆเบฐเบชเป‰เบฒเบ‡เบซเบปเบงเบ‚เปเป‰เบชเปเบฒเบฅเบฑเบšเปเบ•เปˆเบฅเบฐเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ‚เบญเบ‡เปเบšเบšเบŸเบญเบก: pg-dev.public.<table_name>.

เบ‚เปเป‰เบˆเปเบฒเบเบฑเบ”เบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ

เปƒเบ™เบ•เบญเบ™เบ—เป‰เบฒเบเบ‚เบญเบ‡เบฅเบฒเบเบฅเบฐเบญเบฝเบ”เบ‚เบญเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบš PostgreSQL, เบกเบฑเบ™เป€เบ›เบฑเบ™เบกเบนเบ™เบ„เปˆเบฒเบ—เบตเปˆเบˆเบฐเป€เบงเบปเป‰เบฒเบเปˆเบฝเบงเบเบฑเบšเบฅเบฑเบเบชเบฐเบ™เบฐเบ•เปเปˆเป„เบ›เบ™เบตเป‰ / เบ‚เปเป‰เบˆเปเบฒเบเบฑเบ”เบ‚เบญเบ‡เบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ‚เบญเบ‡เบกเบฑเบ™:

  1. เบซเบ™เป‰เบฒเบ—เบตเปˆเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบชเปเบฒเบฅเบฑเบš PostgreSQL เบญเบตเบ‡เปƒเบชเปˆเปเบ™เบงเบ„เบงเบฒเบกเบ„เบดเบ”เบ‚เบญเบ‡เบเบฒเบ™เบ–เบญเบ”เบฅเบฐเบซเบฑเบ”เบขเปˆเบฒเบ‡เบกเบตเป€เบซเบ”เบœเบปเบ™. เป€เบžเบฒเบฐเบชเบฐเบ™เบฑเป‰เบ™ เบฅเบฒเบง เบšเปเปˆเป„เบ”เป‰เบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบฎเป‰เบญเบ‡เบ‚เปเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เป‚เบ„เบ‡เบชเป‰เบฒเบ‡เบ‚เบญเบ‡เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™ (DDL) - เบ•เบฒเบกเบ„เบงเบฒเบกเป€เบซเบกเบฒเบฐเบชเบปเบก, เบ‚เปเป‰เบกเบนเบ™เบ™เบตเป‰เบˆเบฐเบšเปเปˆเบขเบนเปˆเปƒเบ™เบซเบปเบงเบ‚เปเป‰.
  2. เบ™เบฑเบšเบ•เบฑเป‰เบ‡เปเบ•เปˆเบชเบฐเบฅเบฑเบญเบ”เบ•เบดเบ‡ replication เบ–เบทเบเบ™เปเบฒเปƒเบŠเป‰, เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบกเปˆเบ™เป€เบ›เบฑเบ™เป„เบ›เป„เบ”เป‰ เบžเบฝเบ‡เปเบ•เปˆ เบเบฑเบšเบ•เบปเบงเบขเปˆเบฒเบ‡ DBMS เบ•เบปเป‰เบ™เบชเบฐเบšเบฑเบš.
  3. เบ–เป‰เบฒเบœเบนเป‰เปƒเบŠเป‰เบžเบฒเบเปƒเบ•เป‰เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบกเบตเบชเบดเบ”เบญเปˆเบฒเบ™เป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, เบเปˆเบญเบ™เบ—เบตเปˆเบˆเบฐเป€เบ›เบตเบ”เบ•เบปเบงเบ„เบฑเป‰เบ‡เบ—เปเบฒเบญเบดเบ”, เบ—เปˆเบฒเบ™เบˆเบฐเบ•เป‰เบญเบ‡เบชเป‰เบฒเบ‡เบŠเปˆเบญเบ‡ replication เบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡เปเบฅเบฐเป€เบœเบตเบเปเบœเปˆเปƒเบ™เบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™.

เบเบฒเบ™โ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เบ„เปˆเบฒโ€‹

เบชเบฐเบ™เบฑเป‰เบ™เปƒเบซเป‰เป‚เบซเบผเบ”เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปƒเบชเปˆเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

เบžเบงเบโ€‹เป€เบฎเบปเบฒโ€‹เบเบงเบ”โ€‹เบชเบญเบšโ€‹เบงเปˆเบฒโ€‹เบเบฒเบ™โ€‹เบ”เบฒเบงโ€‹เป‚เบซเบผเบ”โ€‹เบชเปเบฒโ€‹เป€เบฅเบฑเบ”โ€‹เบœเบปเบ™โ€‹เปเบฅเบฐโ€‹เบ•เบปเบงโ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹เป„เบ”เป‰โ€‹เป€เบฅเบตเปˆเบกโ€‹เบ•เบปเป‰เบ™โ€‹:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

เบ”เบตเป€เบฅเบตเบ”: เบกเบฑเบ™เบ•เบฑเป‰เบ‡เปเบฅเป‰เบง เปเบฅเบฐเบžเป‰เบญเบกเบ—เบตเปˆเบˆเบฐเป„เบ›. เบ•เบญเบ™เบ™เบตเป‰เปƒเบซเป‰เบชเบปเบกเบกเบธเบ”เบงเปˆเบฒเป€เบ›เบฑเบ™เบœเบนเป‰เบšเปเบฅเบดเป‚เบžเบเปเบฅเบฐเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบš Kafka, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบžเบงเบเป€เบฎเบปเบฒเป€เบžเบตเปˆเบกเปเบฅเบฐเบ›เปˆเบฝเบ™เบฅเบฒเบเบเบฒเบ™เปƒเบ™เบ•เบฒเบ•เบฐเบฅเบฒเบ‡:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

เปƒเบ™เบซเบปเบงเบ‚เปเป‰เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ, เบ™เบตเป‰เบˆเบฐเบชเบฐเปเบ”เบ‡เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰:

JSON เบเบฒเบงเบซเบผเบฒเบเบเบฑเบšเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

เปƒเบ™เบ—เบฑเบ‡เบชเบญเบ‡เบเปเบฅเบฐเบ™เบต, เบšเบฑเบ™เบ—เบถเบเบ›เบฐเบเบญเบšเบ”เป‰เบงเบเบเบธเบ™เปเบˆ (PK) เบ‚เบญเบ‡เบšเบฑเบ™เบ—เบถเบเบ—เบตเปˆเบกเบตเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡, เปเบฅเบฐเบ„เบงเบฒเบกเบชเปเบฒเบ„เบฑเบ™เบ‚เบญเบ‡เบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡: เบชเบดเปˆเบ‡เบ—เบตเปˆเบšเบฑเบ™เบ—เบถเบเบเปˆเบญเบ™เปเบฅเบฐเบชเบดเปˆเบ‡เบ—เบตเปˆเบกเบฑเบ™เบเบฒเบเป€เบ›เบฑเบ™เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™.

  • เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ INSERT: เบ„เปˆเบฒโ€‹เบเปˆเบญเบ™ (before) เป€เบ—เบปเปˆเบฒเบเบฑเบš nullเบ•เบดเบ”เบ•เบฒเบกเบ”เป‰เบงเบเบชเบฒเบเบ—เบตเปˆเปƒเบชเปˆ.
  • เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ UPDATE: เปƒเบ™ payload.before เบชเบฐเบ–เบฒเบ™เบฐเบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒเบ‚เบญเบ‡เปเบ–เบงเบ–เบทเบเบชเบฐเปเบ”เบ‡, เปเบฅเบฐเปƒเบ™ payload.after - เปƒเบซเบกเปˆโ€‹เบ—เบตเปˆโ€‹เบกเบตโ€‹เป€เบ™เบทเป‰เบญโ€‹เปเบ—เป‰โ€‹เบ‚เบญเบ‡โ€‹เบเบฒเบ™โ€‹เบ›เปˆเบฝเบ™โ€‹เปเบ›เบ‡โ€‹.

2.2 MongoDB

เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ™เบตเป‰เปƒเบŠเป‰เบเบปเบ™เป„เบเบเบฒเบ™เบˆเปเบฒเบฅเบญเบ‡เปเบšเบš MongoDB เบกเบฒเบ”เบ•เบฐเบ–เบฒเบ™, เบญเปˆเบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบ oplog เบ‚เบญเบ‡ node เบซเบผเบฑเบ DBMS.

เบ„เป‰เบฒเบเบ„เบทเบเบฑเบ™เบเบฑเบšเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ—เบตเปˆเป„เบ”เป‰เบญเบฐเบ—เบดเบšเบฒเบเป„เบงเป‰เปเบฅเป‰เบงเบชเปเบฒเบฅเบฑเบš PgSQL, เบ—เบตเปˆเบ™เบตเป‰, เป€เบŠเบฑเปˆเบ™เบ”เบฝเบงเบเบฑเบ™, เปƒเบ™เบ•เบญเบ™เป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เบ—เปเบฒเบญเบดเบ”, เบžเบฒเบšเบ–เปˆเบฒเบเบ‚เปเป‰เบกเบนเบ™เบ•เบปเป‰เบ™เบ•เปเป„เบ”เป‰เบ–เบทเบเบ›เบฐเบ•เบดเบšเบฑเบ”, เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบ›เปˆเบฝเบ™เป€เบ›เบฑเบ™เป‚เบซเบกเบ”เบเบฒเบ™เบญเปˆเบฒเบ™ oplog.

เบ•เบปเบงเบขเปˆเบฒเบ‡เบเบฒเบ™เบ•เบฑเป‰เบ‡เบ„เปˆเบฒ:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

เบ”เบฑเปˆเบ‡เบ—เบตเปˆเบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบซเบฑเบ™เป„เบ”เป‰, เบšเปเปˆเบกเบตเบ—เบฒเบ‡เป€เบฅเบทเบญเบเปƒเบซเบกเปˆเป€เบกเบทเปˆเบญเบ—เบฝเบšเบเบฑเบšเบ•เบปเบงเบขเปˆเบฒเบ‡เบ—เบตเปˆเบœเปˆเบฒเบ™เบกเบฒ, เปเบ•เปˆเบงเปˆเบฒเบžเบฝเบ‡เปเบ•เปˆเบˆเปเบฒเบ™เบงเบ™เบ‚เบญเบ‡เบ—เบฒเบ‡เป€เบฅเบทเบญเบเบ—เบตเปˆเบฎเบฑเบšเบœเบดเบ”เบŠเบญเบšเปƒเบ™เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบ–เบฒเบ™เบ‚เปเป‰เบกเบนเบ™เปเบฅเบฐเบ„เปเบฒเบ™เปเบฒเบซเบ™เป‰เบฒเบ‚เบญเบ‡เป€เบ‚เบปเบฒเป€เบˆเบปเป‰เบฒเป„เบ”เป‰เบ–เบทเบเบซเบผเบธเบ”เบฅเบปเบ‡.

Settings transforms เป€เบงเบฅเบฒเบ™เบตเป‰เบžเบงเบเป€เบ‚เบปเบฒเป€เบฎเบฑเบ”เบ”เบฑเปˆเบ‡เบ•เปเปˆเป„เบ›เบ™เบตเป‰: เบ›เปˆเบฝเบ™เบŠเบทเปˆเบ‚เบญเบ‡เบซเบปเบงเบ‚เปเป‰เป€เบ›เบปเป‰เบฒเบซเบกเบฒเบเบˆเบฒเบเป‚เบ„เบ‡เบเบฒเบ™ <server_name>.<db_name>.<collection_name> ะฒ data.cdc.mongo_<db_name>.

เบ„เบงเบฒเบกโ€‹เบ—เบปเบ™โ€‹เบ—เบฒเบ™โ€‹เบ„เบงเบฒเบกโ€‹เบœเบดเบ”โ€‹เบžเบฒเบ”โ€‹

เบšเบฑเบ™เบซเบฒเบ„เบงเบฒเบกเบ—เบปเบ™เบ—เบฒเบ™เบ•เปเปˆเบ„เบงเบฒเบกเบœเบดเบ”เปเบฅเบฐเบ„เบงเบฒเบกเบžเป‰เบญเบกเบชเบนเบ‡เปƒเบ™เป€เบงเบฅเบฒเบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒเปเบกเปˆเบ™เบกเบตเบ„เบงเบฒเบกเป€เบ„เบฑเปˆเบ‡เบ•เบถเบ‡เบซเบผเบฒเบเบเปˆเบงเบฒเป€เบเบปเปˆเบฒ - เป‚เบ”เบเบชเบฐเป€เบžเบฒเบฐเปƒเบ™เป€เบงเบฅเบฒเบ—เบตเปˆเบžเบงเบเป€เบฎเบปเบฒเป€เบงเบปเป‰เบฒเบเปˆเบฝเบงเบเบฑเบšเบ‚เปเป‰เบกเบนเบ™เปเบฅเบฐเบเบฒเบ™เป€เบฎเบฑเบ”เบ—เบธเบฅเบฐเบเปเบฒ, เปเบฅเบฐเบเบฒเบ™เบ•เบดเบ”เบ•เบฒเบกเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เบ‚เปเป‰เบกเบนเบ™เบšเปเปˆเป„เบ”เป‰เบขเบนเปˆเบ‚เป‰เบฒเบ‡เปƒเบ™เป€เบฅเบทเปˆเบญเบ‡เบ™เบตเป‰. เปƒเบซเป‰เบžเบดเบˆเบฒเบฅเบฐเบ™เบฒเบชเบดเปˆเบ‡เบ—เบตเปˆเบชเบฒเบกเบฒเบ”เบœเบดเบ”เบžเบฒเบ”เปƒเบ™เบซเบผเบฑเบเบเบฒเบ™เปเบฅเบฐเบชเบดเปˆเบ‡เบ—เบตเปˆเบˆเบฐเป€เบเบตเบ”เบ‚เบถเป‰เบ™เบเบฑเบš Debezium เปƒเบ™เปเบ•เปˆเบฅเบฐเบเปเบฅเบฐเบ™เบต.

เบกเบตเบชเบฒเบกเบ—เบฒเบ‡เป€เบฅเบทเบญเบเปƒเบ™เบเบฒเบ™เบ–เบญเบ™เบ•เบปเบง:

  1. Kafka Connect เบ„เบงเบฒเบกเบฅเบปเป‰เบกเป€เบซเบผเบง. เบ–เป‰เบฒ Connect เบ–เบทเบเบ•เบฑเป‰เบ‡เบ„เปˆเบฒเปƒเบซเป‰เป€เบฎเบฑเบ”เบงเบฝเบเบขเบนเปˆเปƒเบ™เบฎเบนเบšเปเบšเบšเบเบฒเบ™เปเบˆเบเบขเบฒเบ, เบ™เบตเป‰เบ•เป‰เบญเบ‡เบเบฒเบ™เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™เบซเบผเบฒเบเบ„เบปเบ™เป€เบžเบทเปˆเบญเบ•เบฑเป‰เบ‡เบ„เปˆเบฒ group.id เบ”เบฝเบงเบเบฑเบ™. เบซเบผเบฑเบ‡เบˆเบฒเบเบ™เบฑเป‰เบ™, เบ–เป‰เบฒเบซเบ™เบถเปˆเบ‡เบ‚เบญเบ‡เบžเบงเบเป€เบ‚เบปเบฒเบฅเบปเป‰เบกเป€เบซเบฅเบง, เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบ–เบทเบเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบซเบกเปˆเปƒเบ™เบžเบฐเบ™เบฑเบเบ‡เบฒเบ™เบญเบทเปˆเบ™เปเบฅเบฐเบชเบทเบšเบ•เปเปˆเบญเปˆเบฒเบ™เบˆเบฒเบเบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ—เบตเปˆเบซเบกเบฑเป‰เบ™เบชเบฑเบ™เบเบฒเบชเบธเบ”เบ—เป‰เบฒเบเปƒเบ™เบซเบปเบงเบ‚เปเป‰เปƒเบ™ Kafka.
  2. เบเบฒเบ™เบชเบนเบ™เป€เบชเบเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบเบฑเบšเบเบธเปˆเบก Kafka. เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบžเบฝเบ‡เปเบ•เปˆเบˆเบฐเบขเบธเบ”เบเบฒเบ™เบญเปˆเบฒเบ™เบขเบนเปˆเปƒเบ™เบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ—เบตเปˆเบกเบฑเบ™เบฅเบปเป‰เบกเป€เบซเบฅเบงเบ—เบตเปˆเบˆเบฐเบชเบปเปˆเบ‡เป„เบ›เบซเบฒ Kafka เปเบฅเบฐเบžเบฐเบเบฒเบเบฒเบกเบชเบปเปˆเบ‡เบ„เบทเบ™เป€เบ›เบฑเบ™เป„เบฅเบเบฐเบˆเบปเบ™เบเปˆเบงเบฒเบ„เบงเบฒเบกเบžเบฐเบเบฒเบเบฒเบกเบชเปเบฒเป€เบฅเบฑเบ”.
  3. เบšเปเปˆเบกเบตเปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™. เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบžเบฐเบเบฒเบเบฒเบกเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ„เบทเบ™เปƒเปเปˆเบเบฑเบšเปเบซเบผเปˆเบ‡เบ•เบฒเบกเบเบฒเบ™เบเบณเบ™เบปเบ”เบ„เปˆเบฒ. เบ„เปˆเบฒเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปเบกเปˆเบ™ 16 เบžเบฐเบเบฒเบเบฒเบกเปƒเบŠเป‰ backoff exponential. เบซเบผเบฑเบ‡เบˆเบฒเบเบ„เบงเบฒเบกเบžเบฐเบเบฒเบเบฒเบกเบ—เบตเปˆเบฅเบปเป‰เบกเป€เบซเบฅเบงเบ„เบฑเป‰เบ‡เบ—เบต 16, เบงเบฝเบเบ‡เบฒเบ™เบˆเบฐเบ–เบทเบเบซเบกเบฒเบเป€เบ›เบฑเบ™ เบฅเบปเป‰เบกเป€เบซเบฅเบง เปเบฅเบฐเบกเบฑเบ™เบˆเบฐเบ•เป‰เบญเบ‡เบ–เบทเบเป€เบฅเบตเปˆเบกเบ•เบปเป‰เบ™เปƒเบซเบกเปˆเบ”เป‰เบงเบเบ•เบปเบ™เป€เบญเบ‡เป‚เบ”เบเบœเปˆเบฒเบ™เบเบฒเบ™เป‚เบ•เป‰เบ•เบญเบš Kafka Connect REST.
    • เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ PostgreSQL เบ‚เปเป‰เบกเบนเบ™เบˆเบฐเบšเปเปˆเบชเบนเบ™เป€เบชเบ, เป€เบžเบฒเบฐเบงเปˆเบฒ เบเบฒเบ™โ€‹เบ™เปเบฒโ€‹เปƒเบŠเป‰โ€‹เบชเบฐโ€‹เบฅเบฑเบญเบ”โ€‹เบ•เบดเบ‡ replication เบˆเบฐโ€‹เบ›เป‰เบญเบ‡โ€‹เบเบฑเบ™โ€‹เบเบฒเบ™โ€‹เบฅเบถเบšโ€‹เป„เบŸเบฅโ€‹เปŒ WAL เบ—เบตเปˆโ€‹เบšเปเปˆโ€‹เป„เบ”เป‰โ€‹เบญเปˆเบฒเบ™โ€‹เป‚เบ”เบโ€‹เบ•เบปเบงโ€‹เป€เบŠเบทเปˆเบญเบกโ€‹เบ•เปเปˆโ€‹. เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ™เบตเป‰, เบกเบตเบ‚เปเป‰เป€เบชเบเบ„เบท: เบ–เป‰เบฒเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป€เบ„เบทเบญเบ‚เปˆเบฒเบเบฅเบฐเบซเบงเปˆเบฒเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปเบฅเบฐ DBMS เบ–เบทเบเบฅเบปเบšเบเบงเบ™เป€เบ›เบฑเบ™เป€เบงเบฅเบฒเบ”เบปเบ™เบ™เบฒเบ™, เบกเบตเป‚เบญเบเบฒเบ”เบ—เบตเปˆเบžเบทเป‰เบ™เบ—เบตเปˆเบ”เบดเบ”เบˆเบฐเบซเบกเบปเบ”เป„เบ›, เปเบฅเบฐเบ™เบตเป‰เบญเบฒเบ”เบˆเบฐเบ™เปเบฒเป„เบ›เบชเบนเปˆเบ„เบงเบฒเบกเบฅเบปเป‰เบกเป€เบซเบผเบงเบ‚เบญเบ‡ DBMS เบ—เบฑเบ‡เบซเบกเบปเบ”.
    • เปƒเบ™เบเปเบฅเบฐเบ™เบตเบ‚เบญเบ‡ MySQL เป„เบŸเบฅเปŒ binlog เบชเบฒเบกเบฒเบ”เบซเบกเบธเบ™เป„เบ”เป‰เป‚เบ”เบ DBMS เบ•เบปเบงเบกเบฑเบ™เป€เบญเบ‡เบเปˆเบญเบ™เบ—เบตเปˆเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบ–เบทเบเบŸเบทเป‰เบ™เบŸเบน. เบ™เบตเป‰เบˆเบฐเป€เบฎเบฑเบ”เปƒเบซเป‰เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบชเบฐเบ–เบฒเบ™เบฐเบ—เบตเปˆเบฅเบปเป‰เบกเป€เบซเบฅเบง, เปเบฅเบฐเบกเบฑเบ™เบˆเบฐเบ•เป‰เบญเบ‡เบ›เบดเบ”เป€เบ›เบตเบ”เปƒเบซเบกเปˆเปƒเบ™เป‚เบซเบกเบ” snapshot เป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™เป€เบžเบทเปˆเบญเบชเบทเบšเบ•เปเปˆเบเบฒเบ™เบญเปˆเบฒเบ™เบˆเบฒเบ binlogs เป€เบžเบทเปˆเบญเบŸเบทเป‰เบ™เบŸเบนเบเบฒเบ™เป€เบฎเบฑเบ”เบงเบฝเบเบ›เบปเบเบเบฐเบ•เบด.
    • เบเปˆเบฝเบงเบเบฑเบš MongoDB. เป€เบญเบเบฐเบชเบฒเบ™เบเปˆเบฒเบงเบงเปˆเบฒ: เบžเบถเบ”เบ•เบดเบเปเบฒเบ‚เบญเบ‡เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเปƒเบ™เบเปเบฅเบฐเบ™เบตเบ—เบตเปˆเป„เบŸเบฅเปŒ log / oplog เป„เบ”เป‰เบ–เบทเบเบฅเบถเบšเบ–เบดเป‰เบกเปเบฅเบฐเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบšเปเปˆเบชเบฒเบกเบฒเบ”เบชเบทเบšเบ•เปเปˆเบญเปˆเบฒเบ™เบˆเบฒเบเบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ—เบตเปˆเบกเบฑเบ™เบ›เบฐเป„เบงเป‰เปเบกเปˆเบ™เบ„เบทเบเบฑเบ™เบชเปเบฒเบฅเบฑเบš DBMS เบ—เบฑเบ‡เบซเบกเบปเบ”. เบกเบฑเบ™เบขเบนเปˆเปƒเบ™เบ„เบงเบฒเบกเบˆเบดเบ‡เบ—เบตเปˆเบงเปˆเบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเป€เบ‚เบปเป‰เบฒเป„เบ›เปƒเบ™เบฅเบฑเบ” เบฅเบปเป‰เบกเป€เบซเบฅเบง เปเบฅเบฐเบˆเบฐเบ•เป‰เบญเบ‡เบกเบตเบเบฒเบ™เบ›เบดเบ”เป€เบ›เบตเบ”เปƒเปเปˆเปƒเบ™เป‚เปเบ” เบžเบฒเบšเบ–เปˆเบฒเบเป€เบšเบทเป‰เบญเบ‡เบ•เบปเป‰เบ™.

      เบขเปˆเบฒเบ‡เปƒเบ”เบเปเบ•เบฒเบก, เบกเบตเบ‚เปเป‰เบเบปเบเป€เบงเบฑเป‰เบ™. เบ–เป‰เบฒเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบขเบนเปˆเปƒเบ™เบชเบฐเบ–เบฒเบ™เบฐเบ•เบฑเบ”เบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเป€เบ›เบฑเบ™เป€เบงเบฅเบฒเบ”เบปเบ™เบ™เบฒเบ™ (เบซเบผเบทเบšเปเปˆเบชเบฒเบกเบฒเบ”เป€เบ‚เบปเป‰เบฒเบซเบฒเบ•เบปเบงเบขเปˆเบฒเบ‡ MongoDB), เปเบฅเบฐ oplog เป„เบ”เป‰เบ–เบทเบเบซเบกเบธเบ™เปƒเบ™เบŠเปˆเบงเบ‡เป€เบงเบฅเบฒเบ™เบตเป‰, เป€เบกเบทเปˆเบญเบเบฒเบ™เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบ–เบทเบเบŸเบทเป‰เบ™เบŸเบนเบ„เบทเบ™เปƒเบซเบกเปˆ, เบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆเบˆเบฐเบชเบทเบšเบ•เปเปˆเบญเปˆเบฒเบ™เบ‚เปเป‰เบกเบนเบ™เบˆเบฒเบเบ•เปเบฒเปเบซเบ™เปˆเบ‡เบ—เปเบฒเบญเบดเบ”เบ—เบตเปˆเบกเบตเบขเบนเปˆ. , เบ™เบฑเป‰เบ™เปเบกเปˆเบ™เป€เบซเบ”เบœเบปเบ™เบ—เบตเปˆเบšเบฒเบ‡เบ‚เปเป‰เบกเบนเบ™เปƒเบ™ Kafka เบšเปเปˆ เบˆเบฐเบ•เบต.

เบชเบฐเบซเบฅเบธเบš

Debezium เปเบกเปˆเบ™เบ›เบฐเบชเบปเบšเบเบฒเบ™เบ—เปเบฒเบญเบดเบ”เบ‚เบญเบ‡เบ‚เป‰เบญเบเบเบฑเบšเบฅเบฐเบšเบปเบš CDC เปเบฅเบฐเป„เบ”เป‰เบฎเบฑเบšเบœเบปเบ™เบ”เบตเป‚เบ”เบเบฅเบงเบก. เป‚เบ„เบ‡โ€‹เบเบฒเบ™โ€‹เบ”เบฑเปˆเบ‡โ€‹เบเปˆเบฒเบงโ€‹เป„เบ”เป‰โ€‹เปƒเบซเป‰โ€‹เบชเบดเบ™โ€‹เบšเบปเบ™โ€‹เบเบฒเบ™โ€‹เบชเบฐโ€‹เบซเบ™เบฑเบšโ€‹เบชเบฐโ€‹เบซเบ™เบนเบ™โ€‹เบ‚เบญเบ‡ DBMS เบ•เบปเป‰เบ™โ€‹เบ•เปโ€‹, เบ„เบงเบฒเบกโ€‹เบ‡เปˆเบฒเบโ€‹เบ‚เบญเบ‡โ€‹เบเบฒเบ™โ€‹เบ•เบฑเป‰เบ‡โ€‹เบ„เปˆเบฒโ€‹, เบเบฒเบ™โ€‹เบชเบฐโ€‹เบซเบ™เบฑเบšโ€‹เบชเบฐโ€‹เบซเบ™เบนเบ™โ€‹เบเบฒเบ™โ€‹เป€เบ›เบฑเบ™โ€‹เบเบธเปˆเบกโ€‹เปเบฅเบฐโ€‹เบเบฒเบ™โ€‹เป€เบ„เบทเปˆเบญเบ™โ€‹เป„เบซเบงโ€‹เบ‚เบญเบ‡โ€‹เบŠเบธเบกโ€‹เบŠเบปเบ™โ€‹. เบชเปเบฒเบฅเบฑเบšเบœเบนเป‰เบ—เบตเปˆเบชเบปเบ™เปƒเบˆเปƒเบ™เบเบฒเบ™เบ›เบฐเบ•เบดเบšเบฑเบ”, เบ‚เป‰เบฒเบžเบฐเป€เบˆเบปเป‰เบฒเปเบ™เบฐเบ™เปเบฒเปƒเบซเป‰เบ—เปˆเบฒเบ™เบญเปˆเบฒเบ™เบ„เบนเปˆเบกเบทเบชเปเบฒเบฅเบฑเบš Kafka เป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ ะธ เป€เบ”เป€เบšเบŠเบฝเบก.

เป€เบกเบทเปˆเบญเบ›เบฝเบšเบ—เบฝเบšเบเบฑเบšเบ•เบปเบงเป€เบŠเบทเปˆเบญเบกเบ•เปเปˆ JDBC เบชเปเบฒเบฅเบฑเบš Kafka Connect, เบ›เบฐเป‚เบซเบเบ”เบ•เบปเป‰เบ™เบ•เปเบ‚เบญเบ‡ Debezium เปเบกเปˆเบ™เบงเปˆเบฒเบเบฒเบ™เบ›เปˆเบฝเบ™เปเบ›เบ‡เป„เบ”เป‰เบ–เบทเบเบญเปˆเบฒเบ™เบˆเบฒเบเบšเบฑเบ™เบ—เบถเบ DBMS, เป€เบŠเบดเปˆเบ‡เบญเบฐเบ™เบธเบเบฒเบ”เปƒเบซเป‰เป„เบ”เป‰เบฎเบฑเบšเบ‚เปเป‰เบกเบนเบ™เบ”เป‰เบงเบเบ„เบงเบฒเบกเบฅเปˆเบฒเบŠเป‰เบฒเบซเบ™เป‰เบญเบเบ—เบตเปˆเบชเบธเบ”. JDBC Connector (เบชเบฐเบซเบ™เบญเบ‡เปƒเบซเป‰เป‚เบ”เบ Kafka Connect) เบชเบญเบšเบ–เบฒเบกเบ•เบฒเบ•เบฐเบฅเบฒเบ‡เบ•เบดเบ”เบ•เบฒเบกเปƒเบ™เป„เบฅเบเบฐเป€เบงเบฅเบฒเบ„เบปเบ‡เบ—เบตเปˆเปเบฅเบฐ (เบชเปเบฒเบฅเบฑเบšเป€เบซเบ”เบœเบปเบ™เบ”เบฝเบงเบเบฑเบ™) เบšเปเปˆเบชเป‰เบฒเบ‡เบ‚เปเป‰เบ„เบงเบฒเบกเป€เบกเบทเปˆเบญเบ‚เปเป‰เบกเบนเบ™เบ–เบทเบเบฅเบถเบš (เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เบชเบญเบšเบ–เบฒเบกเบ‚เปเป‰เบกเบนเบ™เบ—เบตเปˆเบšเปเปˆเบกเบตเบขเบนเปˆเบšเปˆเบญเบ™เบ™เบฑเป‰เบ™เป„เบ”เป‰เปเบ™เบงเปƒเบ”?).

เป€เบžเบทเปˆเบญเปเบเป‰เป„เบ‚เบšเบฑเบ™เบซเบฒเบ—เบตเปˆเบ„เป‰เบฒเบเบ„เบทเบเบฑเบ™, เบ—เปˆเบฒเบ™เบชเบฒเบกเบฒเบ”เป€เบญเบปเบฒเปƒเบˆเปƒเบชเปˆเบเบฑเบšเบงเบดเบ—เบตเปเบเป‰เป„เบ‚เบ•เปเปˆเป„เบ›เบ™เบตเป‰ (เบ™เบญเบเป€เบซเบ™เบทเบญเบˆเบฒเบ Debezium):

  • JDBC Connector Kafka Connect
  • เบšเบฒเบ‡เบงเบดเบ—เบตเปเบเป‰เป„เบ‚ MySQL เป€เบ—เบปเปˆเบฒเบ™เบฑเป‰เบ™:
  • Oracle Golden Gate, เปเบ•เปˆเบ™เบตเป‰เปเบกเปˆเบ™ "เบ›เบฐเป€เบžเบ”เบ™เป‰เปเบฒเบซเบ™เบฑเบ".

PS

เบญเปˆเบฒเบ™เบเบฑเบ‡เบขเบนเปˆเปƒเบ™ blog เบ‚เบญเบ‡เบžเบงเบเป€เบฎเบปเบฒ:

เปเบซเบผเปˆเบ‡เบ‚เปเป‰เบกเบนเบ™: www.habr.com

เป€เบžเบตเปˆเบกเบ„เบงเบฒเบกเบ„เบดเบ”เป€เบซเบฑเบ™