
In my work, I often come across new technical solutions and software products about which there is very little information on the Russian-language Internet. With this article, I will try to fill one such gap using an example from my recent practice, when I needed to set up the delivery of CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview, which grew out of that work, will be useful to others.
What are Debezium and CDC in general?
Debezium is a representative of the CDC (change data capture) software category. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.
The project is licensed under the Apache License v2.0 and sponsored by Red Hat. It has been in development since 2016 and currently provides official support for the following DBMSs: MySQL, PostgreSQL, MongoDB, and SQL Server. There are also connectors for Cassandra and Oracle, but at the moment they have "early access" status, and new releases do not guarantee backward compatibility.
Compared with the traditional approach (where the application reads data directly from the DBMS), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability, and availability. The last two are achieved by using a Kafka cluster as the repository for CDC events.
Another advantage is that a single model is used to store events, so the end application does not have to worry about the nuances of working with different DBMSs.
Finally, using a message broker lets the applications that track data changes scale out horizontally. At the same time, the impact on the data source is kept to a minimum, because data is read from the Kafka cluster rather than directly from the DBMS.
About the Debezium architecture
Using Debezium comes down to this simple scheme:
DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer
As an illustration, here is a diagram from the project website:

However, I do not really like this diagram, because it gives the impression that only a sink connector can be used.
In reality, the situation is different: filling your data lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be used by your applications to handle various situations. For example:
- removing stale data from a cache;
- sending notifications;
- updating search indexes;
- keeping some kind of audit log;
- ...
If you have a Java application and there is no need or no possibility to use a Kafka cluster, there is also the option of running the connector embedded in the application. The obvious advantage is that no additional infrastructure (in the form of a connector and Kafka) is needed. However, this solution has been deprecated since version 1.1 and is no longer recommended (support may be removed in future releases).
This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.
Connector configuration
To start tracking changes in data, the most important value of all, we need:
- a data source: MySQL version 5.7 or later, PostgreSQL 9.6 or later, MongoDB 3.2 or later;
- an Apache Kafka cluster;
- a Kafka Connect instance (versions 1.x, 2.x);
- a configured Debezium connector.
The first two items, that is, installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples contains a ready-made setup.
We will look at the last two items in more detail.
0. Kafka Connect
Here and throughout the rest of the article, all configuration examples are given in the context of the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and allows Kafka Connect to be configured through environment variables.
If you intend to use Kafka Connect from Confluent, you will need to add the plugins for the required connectors yourself to the directory specified in plugin.path or set via the CLASSPATH environment variable. The settings for the Kafka Connect worker and the connectors are defined in configuration files that are passed as arguments to the worker start command. See the documentation for details.
The whole process of setting up Debezium in its connector variant takes two steps. Let's look at each of them.
1. Setting up the Kafka Connect framework
To stream data to an Apache Kafka cluster, the Kafka Connect framework needs certain parameters, such as:
- the parameters for connecting to the cluster;
- the names of the topics in which the connector configuration itself is stored;
- the name of the group in which the connector runs (if distributed mode is used).
The project's official Docker image supports configuration through environment variables, and that is what we will use. First, download the image:
docker pull debezium/connect
The minimum set of environment variables required to run the connector is as follows:
- BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092: the initial list of Kafka cluster servers, used to obtain the full list of cluster members;
- OFFSET_STORAGE_TOPIC=connector-offsets: the topic for storing the positions at which the connector currently stands;
- CONNECT_STATUS_STORAGE_TOPIC=connector-status: the topic for storing the status of the connector and its tasks;
- CONFIG_STORAGE_TOPIC=connector-config: the topic for storing the configuration data of the connector and its tasks;
- GROUP_ID=1: the identifier of the group of workers on which connector tasks can run; required in distributed mode.
Start the container with these variables:
docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
A note about Avro
By default, Debezium writes data in JSON format. That is acceptable for sandboxes and small amounts of data, but it can become a problem for heavily loaded databases. As an alternative to the JSON converter, messages can be serialized with Avro into a binary format, which reduces the load on the I/O subsystem of Apache Kafka.
To use Avro, you need to deploy a separate schema registry (for storing schemas). The converter variables then look like this:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
The details of using Avro and setting up its registry are beyond the scope of this article. From here on, for clarity, we will use JSON.
2. Setting up the connector itself
Now we can move on to the configuration of the connector itself, which will read data from the source.
Let's look at connectors for two DBMSs, PostgreSQL and MongoDB. I have experience with both, and there are differences between them (small, but in some cases significant!).
The configuration is written in JSON notation and uploaded to Kafka Connect with a POST request.
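As a rough illustration of that upload step, the Python sketch below builds the POST request for the Kafka Connect REST API using only the standard library. The base URL `http://localhost:8083`, the connector name, and the helper name `build_create_request` are assumptions chosen for illustration. The request is only constructed here; sending it requires a running Kafka Connect worker.

```python
import json
import urllib.request


def build_create_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


# Hypothetical, heavily shortened connector config used only for illustration.
request = build_create_request(
    "http://localhost:8083",
    "example-connector",
    {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
)

# To actually submit it against a running worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping the request construction separate from the network call makes it easy to inspect the JSON body before anything is sent to the worker.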
2.1. PostgreSQL
An example connector configuration for PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
After this setup, the connector works in a fairly simple way:
- On first start, it connects to the database specified in the configuration and runs in initial snapshot mode: the initial data set, obtained with what is essentially SELECT * FROM table_name, is sent to Kafka.
- Once initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.
About the options used:
- name: the name of the connector to which the configuration described below applies. Later on, this name is used to work with the connector (that is, to view its status, restart it, or update its configuration) through the Kafka Connect REST API;
- connector.class: the DBMS connector class that the configured connector will use;
- plugin.name: the name of the plugin for logical decoding of data from the WAL files. The available choices are wal2json, decoderbufs, and pgoutput. The first two require installing the corresponding extensions in the DBMS, while pgoutput needs no additional steps on PostgreSQL version 10 and later;
- database.*: options for connecting to the database, where database.server.name is the PostgreSQL instance name used to form the topic names in the Kafka cluster;
- table.include.list: the list of tables whose changes we want to track, specified in the schema.table_name format. It cannot be used together with table.exclude.list;
- heartbeat.interval.ms: the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
- heartbeat.action.query: a query that is executed each time a heartbeat message is sent (this option appeared in version 1.1);
- slot.name: the name of the replication slot used by the connector;
- publication.name: the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user that establishes the connection does not have sufficient privileges for this action, the connector exits with an error;
- transforms: determines exactly how the name of the target topic is changed:
  - transforms.AddPrefix.type: indicates that regular expressions will be used;
  - transforms.AddPrefix.regex: the mask by which the name of the target topic is redefined;
  - transforms.AddPrefix.replacement: what the name is redefined to.
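To make the two heartbeat options from the list above a little more concrete, here is a small Python sketch that adds them to a connector config as a pair. The helper name `with_heartbeat`, the `debezium_heartbeat` table, and the query text are all hypothetical. Such a table would have to exist in the source database, and the interval is simply the value from the example config.

```python
def with_heartbeat(config, interval_ms, action_query):
    """Return a copy of a connector config with both heartbeat options set."""
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    updated = dict(config)  # leave the caller's dict untouched
    # The example config in this article uses string values, so do the same.
    updated["heartbeat.interval.ms"] = str(interval_ms)
    updated["heartbeat.action.query"] = action_query
    return updated


# Shortened base config; only the slot name is taken from the article.
base_config = {"slot.name": "dbname_debezium"}

# Hypothetical heartbeat table and query, shown only as an illustration.
config = with_heartbeat(
    base_config,
    5000,
    "INSERT INTO debezium_heartbeat (ts) VALUES (now())",
)
print(config["heartbeat.interval.ms"])  # 5000
```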
More about heartbeat and transforms
By default, the connector sends data to Kafka for every committed transaction and writes its LSN (Log Sequence Number) to the offset service topic. But what happens if the connector is configured to read not the whole database but only some of its tables (in which data is updated infrequently)?
- The connector will read the WAL files and will not find any committed transactions for the tables it monitors.
- Therefore, it will not update its current position either in the topic or in the replication slot.
- As a result, WAL files will be retained on disk, and the disk may run out of space.
This is where the heartbeat.interval.ms and heartbeat.action.query options come to the rescue. Used together, they make it possible to run a data-modifying query against a separate table every time a heartbeat message is sent. As a result, the LSN at which the connector currently stands (in the replication slot) is updated constantly, which lets the DBMS delete WAL files that are no longer needed. For more details on how these options work, see the documentation.
Another option that deserves closer attention is transforms, although it is more about convenience and aesthetics...
By default, Debezium creates topics using the following naming policy: serverName.schemaName.tableName. This is not always convenient. With the transforms options and regular expressions, you can define a list of tables whose events should be routed to a topic with a specific name.
In our configuration, thanks to transforms, the following happens: all CDC events from the monitored database go to a topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a topic for each table, of the form pg-dev.public.<table_name>.
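A renaming rule like this can be checked offline before it is deployed. The Python sketch below imitates how RegexRouter is generally understood to behave: the regex must match the whole topic name, and only then is the replacement applied. It is an approximation written for illustration, not the Kafka Connect implementation, and `route_topic` is a made-up helper name.

```python
import re


def route_topic(topic, regex, replacement):
    """Approximate RegexRouter: rename only if the regex matches the whole topic."""
    if re.fullmatch(regex, topic) is None:
        return topic  # non-matching topics keep their original name
    # Java-style group references ($1) become Python-style ones (\g<1>).
    python_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return re.sub(regex, python_replacement, topic, count=1)


# Regex and replacement taken from the connector config in this article.
routed = route_topic("pg-dev.public.customers", "pg-dev.public.(.*)", "data.cdc.dbname")
print(routed)  # data.cdc.dbname

# Hypothetical stricter rule with escaped dots and a group reference.
per_schema = route_topic(
    "pg-dev.public.customers", r"pg-dev\.([^.]+)\.([^.]+)", "data.cdc.$1"
)
print(per_schema)  # data.cdc.public
```

Note that the dots in the article's pattern are unescaped, so they match any character. That is harmless here, but worth keeping in mind when writing stricter rules.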
Connector limitations
To conclude the description of the connector configuration for PostgreSQL, it is worth mentioning the following features and limitations of how it works:
- The PostgreSQL connector relies on the concept of logical decoding. Therefore, it does not track requests that change the database structure (DDL), and this data will not appear in the topics.
- Because replication slots are used, the connector can only connect to the primary DBMS instance.
- If the user that the connector uses to connect to the database has read-only privileges, you will need to manually create the replication slot and the publication in the database before the first start.
Applying the configuration
Now let's load our configuration into the connector:
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
Check that the upload succeeded and the connector has started:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
Great: the connector is set up and ready to use. Now let's pretend to be a consumer and connect to Kafka, and then add and modify a record in the table:
$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
In our topic, this shows up as follows:
A very long JSON with our changes:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
In both cases, a record consists of the key (PK) of the changed row and the change itself: what the row looked like before and what it became after.
- For INSERT: the previous value (before) is null, and after holds the inserted row.
- For UPDATE: payload.before shows the previous state of the row, and payload.after shows the new state, which contains the change.
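A consumer usually only needs the payload part of such a message. As a minimal sketch, assuming the JSON envelope shown above (with schema and payload keys) and a made-up helper name `summarize_change`, the following Python function reports the operation and which columns changed:

```python
import json

# "c" and "u" appear in the events above; "d" (delete) and "r" (snapshot read)
# are further Debezium operation codes that this article does not show.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "read"}


def summarize_change(raw_value):
    """Return the operation name and the sorted list of columns that differ."""
    payload = json.loads(raw_value)["payload"]
    before = payload.get("before") or {}  # null for inserts
    after = payload.get("after") or {}
    changed = sorted(
        column
        for column in set(before) | set(after)
        if before.get(column) != after.get(column)
    )
    return {"operation": OPERATIONS.get(payload["op"], payload["op"]), "changed": changed}


# Trimmed-down version of the UPDATE event from the article (schema omitted).
update_event = json.dumps({
    "schema": {},
    "payload": {
        "before": {"id": 1005, "first_name": "foo", "last_name": "bar", "email": "foo@bar.com"},
        "after": {"id": 1005, "first_name": "egg", "last_name": "bar", "email": "foo@bar.com"},
        "op": "u",
    },
})
print(summarize_change(update_event))
# {'operation': 'update', 'changed': ['first_name']}
```

For an insert, before is null, so every column of the new row is reported as changed.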
2.2 MongoDB
This connector uses the standard MongoDB replication mechanism and reads information from the oplog of the primary DBMS node.
As with the PostgreSQL connector described above, a snapshot of the initial data is taken on first start, after which the connector switches to oplog reading mode.
Configuration example:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
As you can see, there are no new options compared to the previous example. There are simply fewer options responsible for connecting to the database, and their prefix is different.
This time, the transforms settings turn the target topic name from the <server_name>.<db_name>.<collection_name> scheme into data.cdc.mongo_<db_name>.
Fault tolerance
Fault tolerance and high availability matter more today than ever, especially when it comes to data and transactions, and data change tracking is no exception. Let's look at what can go wrong in principle and what happens to Debezium in each case.
There are three failure scenarios:
- Kafka Connect failure. If Connect is configured to run in distributed mode, multiple workers must be set to the same group.id. Then, if one of them fails, the connector is restarted on another worker and continues reading from the last committed position in the Kafka topic.
- Loss of connectivity with the Kafka cluster. The connector simply stops reading at the position it failed to send to Kafka and periodically retries sending until it succeeds.
- Data source unavailable. The connector tries to reconnect to the source according to its configuration. The default is 16 attempts. After the 16th failed attempt, the task is marked as failed and has to be restarted manually through the Kafka Connect REST interface.
  - In the case of PostgreSQL, no data is lost, because replication slots prevent the deletion of WAL files that the connector has not yet read. There is a downside, however: if network connectivity between the connector and the DBMS is interrupted for a long time, the disk may run out of space, which can bring down the entire DBMS.
  - In the case of MySQL, binlog files can be rotated by the DBMS itself before connectivity is restored. This puts the connector into the failed state, and to restore normal operation it has to be restarted in initial snapshot mode before it can continue reading from the binlogs.
  - As for MongoDB, the documentation says that the connector's behavior when log/oplog files have been deleted and the connector cannot resume from the position where it stopped is the same for all DBMSs: the connector goes into the failed state and has to be restarted in initial snapshot mode.
However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then once the connection is restored the connector will quietly continue reading from the first available position, so some of the data will never reach Kafka.
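Since a failed task has to be restarted by hand, it can help to script that step. The sketch below is a hedged illustration: it takes a status document in the shape returned by the status endpoint shown earlier and builds the restart URLs for tasks in the FAILED state. The `failed_task_restart_urls` helper and the sample status are hypothetical, and the `/connectors/{name}/tasks/{id}/restart` path reflects the Kafka Connect REST API as I understand it, so check it against the documentation for your version.

```python
def failed_task_restart_urls(base_url, status):
    """Return restart URLs (to be called with POST) for every FAILED task."""
    name = status["name"]
    return [
        f"{base_url.rstrip('/')}/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


# Hypothetical status document, modelled on the status output shown earlier.
sample_status = {
    "name": "pg-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.24.0.5:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "172.24.0.5:8083"}],
    "type": "source",
}
print(failed_task_restart_urls("http://localhost:8083", sample_status))
# ['http://localhost:8083/connectors/pg-connector/tasks/0/restart']
```

The function only computes URLs. Issuing the POST requests is left to the caller, so the logic can be tried out without a running worker.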
Conclusion
Debezium was my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the major DBMSs, ease of configuration, clustering support, and active community. For those interested in hands-on practice, I recommend reading the guides.
Compared with the JDBC connector for Kafka Connect, the main advantage of Debezium is that changes are read from the DBMS logs, so data arrives with minimal delay. The JDBC connector (from Kafka Connect) queries the monitored table at a fixed interval and, for the same reason, does not generate messages when data is deleted (how do you query data that no longer exists?).
To solve similar problems, you can also look at the following solutions (in addition to Debezium):
- several MySQL-specific solutions:
- , although that one is in an entirely different "weight class".
Source: habr.com
