์ ๋ ์
๋ฌด๋ฅผ ์ํํ๋ฉด์ ์๋ก์ด ๊ธฐ์ ์๋ฃจ์
/์ํํธ์จ์ด ์ ํ์ ์์ฃผ ์ ํ๋๋ฐ, ์ด์ ๋ํ ์ ๋ณด๋ ๋ฌ์์์ด ์ธํฐ๋ท์์๋ ๋ค์ ๋ถ์กฑํฉ๋๋ค. ์ด ๊ธฐ์ฌ์์๋ Debezium์ ์ฌ์ฉํ์ฌ ๋๋ฆฌ ์ฌ์ฉ๋๋ ๋ DBMS(PostgreSQL ๋ฐ MongoDB)์์ Kafka ํด๋ฌ์คํฐ๋ก CDC ์ด๋ฒคํธ๋ฅผ ์ ์กํ๋๋ก ๊ตฌ์ฑํด์ผ ํ๋ ์ต๊ทผ ์ค์ต์ ์๋ฅผ ์ฌ์ฉํ์ฌ ๊ทธ๋ฌํ ๊ณต๋ฐฑ ์ค ํ๋๋ฅผ ๋ฉ์ฐ๋ ค๊ณ ํฉ๋๋ค. ์ํ๋ ์์
์ ๊ฒฐ๊ณผ๋ก ๋ํ๋๋ ์ด ๋ฆฌ๋ทฐ ๊ธฐ์ฌ๊ฐ ๋ค๋ฅธ ์ฌ๋๋ค์๊ฒ ๋์์ด ๋๊ธฐ๋ฅผ ๋ฐ๋๋๋ค.
์ผ๋ฐ์ ์ผ๋ก Debezium๊ณผ CDC๋ ๋ฌด์์ ๋๊น?
๊ทธ๊ฒ
CDC๋ฅผ ๊ธฐ์กด ์ ๊ทผ ๋ฐฉ์(์ ํ๋ฆฌ์ผ์ด์ ์ด DBMS์์ ์ง์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋ ๊ฒฝ์ฐ)๊ณผ ๋น๊ตํ๋ฉด ์ฃผ์ ์ด์ ์ ๋ฎ์ ๋๊ธฐ ์๊ฐ, ๋์ ์์ ์ฑ ๋ฐ ๊ฐ์ฉ์ฑ์ผ๋ก ํ ์์ค์์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์คํธ๋ฆฌ๋ฐ์ ๊ตฌํํ๋ค๋ ๊ฒ์ ๋๋ค. ๋ง์ง๋ง ๋ ๊ฐ์ง ์ฌํญ์ Kafka ํด๋ฌ์คํฐ๋ฅผ CDC ์ด๋ฒคํธ์ฉ ์ ์ฅ์๋ก ์ฌ์ฉํ์ฌ ๋ฌ์ฑ๋ฉ๋๋ค.
๋ ๋ค๋ฅธ ์ฅ์ ์ ๋จ์ผ ๋ชจ๋ธ์ ์ฌ์ฉํ์ฌ ์ด๋ฒคํธ๋ฅผ ์ ์ฅํ๋ฏ๋ก ์ต์ข ์ ํ๋ฆฌ์ผ์ด์ ์ด ์๋ก ๋ค๋ฅธ DBMS๋ฅผ ์ด์ํ๋ ๋ฐ ๋ฐ๋ฅธ ๋ฏธ๋ฌํ ์ฐจ์ด์ ๋ํด ๊ฑฑ์ ํ ํ์๊ฐ ์๋ค๋ ์ ์ ๋๋ค.
๋ง์ง๋ง์ผ๋ก ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ฌํญ์ ๋ชจ๋ํฐ๋งํ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ด ์ํ์ผ๋ก ํ์ฅ๋ ์ ์์ต๋๋ค. ๋์์ ๋ฐ์ดํฐ๋ฅผ DBMS์์ ์ง์ ๊ฐ์ ธ์ค์ง ์๊ณ Kafka ํด๋ฌ์คํฐ์์ ๊ฐ์ ธ์ค๋ฏ๋ก ๋ฐ์ดํฐ ์์ค์ ๋ฏธ์น๋ ์ํฅ์ด ์ต์ํ๋ฉ๋๋ค.
Debezium ์ํคํ ์ฒ ์ ๋ณด
Debezium์ ์ฌ์ฉํ๋ ๊ฒ์ ๋ค์๊ณผ ๊ฐ์ ๊ฐ๋จํ ๊ณํ์ผ๋ก ๊ท๊ฒฐ๋ฉ๋๋ค.
DBMS(๋ฐ์ดํฐ ์์ค) โ Kafka Connect์ ์ปค๋ฅํฐ โ Apache Kafka โ ์๋น์
์๋ฅผ ๋ค์ด, ๋ค์์ ํ๋ก์ ํธ ์น์ฌ์ดํธ์ ๋ค์ด์ด๊ทธ๋จ์ ๋๋ค.
ํ์ง๋ง ์ ๋ ์ด ๋ฐฉ์์ด ๋ณ๋ก ๋ง์์ ๋ค์ง ์์ต๋๋ค. ์๋ํ๋ฉด ์ฑํฌ ์ปค๋ฅํฐ๋ง์ ์ฌ์ฉํ๋ ๊ฒ์ด ๊ฐ๋ฅํ ๊ฒ ๊ฐ๊ธฐ ๋๋ฌธ์
๋๋ค.
์ค์ ๋ก๋ ์ํฉ์ด ๋ค๋ฆ ๋๋ค. ๋ฐ์ดํฐ ๋ ์ดํฌ๋ฅผ ์ฑ์ฐ๋ ๊ฒ์ ๋๋ค. (์ ๋ค์ด์ด๊ทธ๋จ์ ๋ง์ง๋ง ๋งํฌ) ์ด๊ฒ์ด Debezium์ ์ฌ์ฉํ๋ ์ ์ผํ ๋ฐฉ๋ฒ์ ์๋๋๋ค. Apache Kafka๋ก ์ ์ก๋ ์ด๋ฒคํธ๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ค์ํ ์ํฉ์ ์ฒ๋ฆฌํ๋ ๋ฐ ์ฌ์ฉ๋ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด:
- ์บ์์์ ๊ด๋ จ ์๋ ๋ฐ์ดํฐ๋ฅผ ์ ๊ฑฐํฉ๋๋ค.
- ์๋ฆผ ๋ณด๋ด๊ธฐ;
- ๊ฒ์ ์์ธ ์ ๋ฐ์ดํธ;
- ์ผ์ข ์ ๊ฐ์ฌ ๋ก๊ทธ;
- ...
Java ์ ํ๋ฆฌ์ผ์ด์
์ด ์๊ณ Kafka ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ ํ์/๊ฐ๋ฅ์ฑ์ด ์๋ ๊ฒฝ์ฐ ๋ค์์ ํตํด ์์
ํ ์๋ ์์ต๋๋ค.
์ด ๋ฌธ์์์๋ ๋ด๊ฒฐํจ์ฑ๊ณผ ํ์ฅ์ฑ์ ์ ๊ณตํ๋ ๊ฐ๋ฐ์๊ฐ ๊ถ์ฅํ๋ ์ํคํ ์ฒ์ ๋ํด ์ค๋ช ํฉ๋๋ค.
์ปค๋ฅํฐ ๊ตฌ์ฑ
๊ฐ์ฅ ์ค์ํ ๊ฐ์น์ธ ๋ฐ์ดํฐ์ ๋ณ๊ฒฝ ์ฌํญ์ ์ถ์ ํ๋ ค๋ฉด ๋ค์์ด ํ์ํฉ๋๋ค.
- ๋ฒ์ 5.7๋ถํฐ MySQL, PostgreSQL 9.6+, MongoDB 3.2+(
์ ์ฒด ๋ชฉ๋ก ); - ์ํ์น ์นดํ์นด ํด๋ฌ์คํฐ;
- Kafka Connect ์ธ์คํด์ค(๋ฒ์ 1.x, 2.x)
- Debezium ์ปค๋ฅํฐ๋ฅผ ๊ตฌ์ฑํ์ต๋๋ค.
์ฒ์ ๋ ์ ์ ๋ํด ์์
ํ์ญ์์ค. DBMS ๋ฐ Apache Kafka์ ์ค์น ํ๋ก์ธ์ค๋ ์ด ๊ธฐ์ฌ์ ๋ฒ์๋ฅผ ๋ฒ์ด๋ฉ๋๋ค. ๊ทธ๋ฌ๋ ์๋๋ฐ์ค์ ๋ชจ๋ ๊ฒ์ ๋ฐฐํฌํ๋ ค๋ ์ฌ๋๋ค์ ์ํด ์์ ๊ฐ ํฌํจ๋ ๊ณต์ ์ ์ฅ์์ ๋ฏธ๋ฆฌ ๋ง๋ค์ด์ง
๋ง์ง๋ง ๋ ๊ฐ์ง ์ฌํญ์ ๋ํด ๋ ์์ธํ ์ค๋ช ํ๊ฒ ์ต๋๋ค.
0. ์นดํ์นด ์ปค๋ฅํธ
์ด ๋ฌธ์์ ์ถ๊ฐ ๋ฌธ์์์๋ ๋ชจ๋ ๊ตฌ์ฑ ์์ ๊ฐ Debezium ๊ฐ๋ฐ์๊ฐ ๋ฐฐํฌํ Docker ์ด๋ฏธ์ง์ ๋งฅ๋ฝ์์ ๋ ผ์๋ฉ๋๋ค. ์ฌ๊ธฐ์๋ ํ์ํ ๋ชจ๋ ํ๋ฌ๊ทธ์ธ ํ์ผ(์ปค๋ฅํฐ)์ด ํฌํจ๋์ด ์์ผ๋ฉฐ ํ๊ฒฝ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ Kafka Connect ๊ตฌ์ฑ์ ์ ๊ณตํฉ๋๋ค.
Confluent์์ Kafka Connect๋ฅผ ์ฌ์ฉํ๋ ค๋ ๊ฒฝ์ฐ ํ์ํ ์ปค๋ฅํฐ์ ํ๋ฌ๊ทธ์ธ์ ๋ค์์ ์ง์ ๋ ๋๋ ํฐ๋ฆฌ์ ๋
๋ฆฝ์ ์ผ๋ก ์ถ๊ฐํด์ผ ํฉ๋๋ค. plugin.path
๋๋ ํ๊ฒฝ ๋ณ์๋ฅผ ํตํด ์ค์ CLASSPATH
. Kafka Connect ์์
์ ๋ฐ ์ปค๋ฅํฐ์ ๋ํ ์ค์ ์ ์์
์ ์์ ๋ช
๋ น์ ์ธ์๋ก ์ ๋ฌ๋๋ ๊ตฌ์ฑ ํ์ผ์ ํตํด ๊ฒฐ์ ๋ฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ๋ค์์ ์ฐธ์กฐํ์ธ์.
์ปค๋ฅํฐ ๋ฒ์ ์์ Debeizum์ ์ค์ ํ๋ ์ ์ฒด ํ๋ก์ธ์ค๋ ๋ ๋จ๊ณ๋ก ์ํ๋ฉ๋๋ค. ๊ฐ๊ฐ์ ์ดํด๋ณด๊ฒ ์ต๋๋ค.
1. Kafka Connect ํ๋ ์์ํฌ ์ค์
๋ฐ์ดํฐ๋ฅผ Apache Kafka ํด๋ฌ์คํฐ๋ก ์คํธ๋ฆฌ๋ฐํ๊ธฐ ์ํด ๋ค์๊ณผ ๊ฐ์ ํน์ ๋งค๊ฐ๋ณ์๊ฐ Kafka Connect ํ๋ ์์ํฌ์ ์ค์ ๋ฉ๋๋ค.
- ํด๋ฌ์คํฐ ์ฐ๊ฒฐ์ ์ํ ๋งค๊ฐ๋ณ์
- ์ปค๋ฅํฐ ์์ฒด์ ๊ตฌ์ฑ์ด ์ง์ ์ ์ฅ๋ ์ฃผ์ ์ ์ด๋ฆ,
- ์ปค๋ฅํฐ๊ฐ ์คํ ์ค์ธ ๊ทธ๋ฃน์ ์ด๋ฆ(๋ถ์ฐ ๋ชจ๋๊ฐ ์ฌ์ฉ๋๋ ๊ฒฝ์ฐ)
ํ๋ก์ ํธ์ ๊ณต์ Docker ์ด๋ฏธ์ง๋ ํ๊ฒฝ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ ๊ตฌ์ฑ์ ์ง์ํฉ๋๋ค. ์ด๊ฒ์ด ์ฐ๋ฆฌ๊ฐ ์ฌ์ฉํ ๊ฒ์ ๋๋ค. ๋ฐ๋ผ์ ์ด๋ฏธ์ง๋ฅผ ๋ค์ด๋ก๋ํ์ญ์์ค.
docker pull debezium/connect
์ปค๋ฅํฐ๋ฅผ ์คํํ๋ ๋ฐ ํ์ํ ์ต์ ํ๊ฒฝ ๋ณ์ ์ธํธ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
โ ์ ์ฒด ํด๋ฌ์คํฐ ๊ตฌ์ฑ์ ๋ชฉ๋ก์ ์ป๊ธฐ ์ํ Kafka ํด๋ฌ์คํฐ ์๋ฒ์ ์ด๊ธฐ ๋ชฉ๋ก -
OFFSET_STORAGE_TOPIC=connector-offsets
- ์ปค๋ฅํฐ๊ฐ ํ์ฌ ์์นํ ์์น๋ฅผ ์ ์ฅํ๊ธฐ ์ํ ์ฃผ์ ; -
CONNECT_STATUS_STORAGE_TOPIC=connector-status
โ ์ปค๋ฅํฐ ๋ฐ ํด๋น ์์ ์ ์ํ๋ฅผ ์ ์ฅํ๊ธฐ ์ํ ์ฃผ์ ์ ๋๋ค. -
CONFIG_STORAGE_TOPIC=connector-config
โ ์ปค๋ฅํฐ ๊ตฌ์ฑ ๋ฐ์ดํฐ ๋ฐ ํด๋น ์์ ์ ์ ์ฅํ๋ ์ฃผ์ ์ ๋๋ค. -
GROUP_ID=1
โ ์ปค๋ฅํฐ ์์ ์ด ์คํ๋ ์ ์๋ ์์ ์ ๊ทธ๋ฃน์ ์๋ณ์์ ๋๋ค. ๋ถ์ฐ ์ฌ์ฉ์ ํ์ํ (๋ถ์ฐ) ๋ชจ๋.
๋ค์ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ ์ปจํ ์ด๋๋ฅผ ์์ํฉ๋๋ค.
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
Avro์ ๋ํ ์ฐธ๊ณ ์ฌํญ
๊ธฐ๋ณธ์ ์ผ๋ก Debezium์ JSON ํ์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์์ฑํฉ๋๋ค. ์ด๋ ์๋๋ฐ์ค ๋ฐ ์๋์ ๋ฐ์ดํฐ์๋ ํ์ฉ๋์ง๋ง ๋ก๋๊ฐ ๋ง์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์๋ ๋ฌธ์ ๊ฐ ๋ ์ ์์ต๋๋ค. JSON ๋ณํ๊ธฐ์ ๋์์ ๋ค์์ ์ฌ์ฉํ์ฌ ๋ฉ์์ง๋ฅผ ์ง๋ ฌํํ๋ ๊ฒ์
๋๋ค.
Avro๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด ๋ณ๋์ ๋ฐฐํฌ๊ฐ ํ์ํฉ๋๋ค.
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
Avro ์ฌ์ฉ ๋ฐ ๋ ์ง์คํธ๋ฆฌ ์ค์ ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ด ๋ฌธ์์ ๋ฒ์๋ฅผ ๋ฒ์ด๋ฉ๋๋ค. ๋ ๋ช ํํ๊ฒ ์ค๋ช ํ๊ธฐ ์ํด JSON์ ์ฌ์ฉํ๊ฒ ์ต๋๋ค.
2. ์ปค๋ฅํฐ ์์ฒด ๊ตฌ์ฑ
์ด์ ์์ค์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋ ์ปค๋ฅํฐ ์์ฒด์ ๊ตฌ์ฑ์ผ๋ก ์ง์ ์ด๋ํ ์ ์์ต๋๋ค.
๋ DBMS์ธ PostgreSQL๊ณผ MongoDB์ ๋ํ ์ปค๋ฅํฐ์ ์๋ฅผ ์ดํด๋ณด๊ฒ ์ต๋๋ค. ์ฌ๊ธฐ์๋ ์ ๊ฐ ๊ฒฝํ์ด ์๊ณ ์ฐจ์ด์ ์ด ์์ต๋๋ค(์์ง๋ง ์ด๋ค ๊ฒฝ์ฐ์๋ ์ค์ํฉ๋๋ค!).
๊ตฌ์ฑ์ JSON ํ๊ธฐ๋ฒ์ผ๋ก ์ค๋ช ๋๋ฉฐ POST ์์ฒญ์ ์ฌ์ฉํ์ฌ Kafka Connect์ ์ ๋ก๋๋ฉ๋๋ค.
2.1. ํฌ์คํธ๊ทธ๋ SQL
PostgreSQL์ฉ ์ปค๋ฅํฐ ๊ตฌ์ฑ ์:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
์ด ์ค์ ํ ์ปค๋ฅํฐ ์๋ ์๋ฆฌ๋ ๋งค์ฐ ๊ฐ๋จํฉ๋๋ค.
- ์ฒ์ ์คํ ์ ๊ตฌ์ฑ์ ์ง์ ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐํ์ฌ ๋ชจ๋๋ก ์์๋ฉ๋๋ค. ์ด๊ธฐ ์ค๋
์ท, ์กฐ๊ฑด์ ์ฌ์ฉํ์ฌ ์ป์ ์ด๊ธฐ ๋ฐ์ดํฐ ์ธํธ๋ฅผ Kafka๋ก ๋ณด๋
๋๋ค.
SELECT * FROM table_name
. - ์ด๊ธฐํ๊ฐ ์๋ฃ๋๋ฉด ์ปค๋ฅํฐ๋ PostgreSQL WAL ํ์ผ์ ๋ณ๊ฒฝ ์ฌํญ์ ์ฝ๋ ๋ชจ๋๋ก ์ ํ๋ฉ๋๋ค.
์ฌ์ฉ๋ ์ต์ ์ ๋ณด:
-
name
โ ์๋ ์ค๋ช ๋ ๊ตฌ์ฑ์ด ์ฌ์ฉ๋๋ ์ปค๋ฅํฐ์ ์ด๋ฆ ์์ผ๋ก ์ด ์ด๋ฆ์ Kafka Connect REST API๋ฅผ ํตํด ์ปค๋ฅํฐ ์์ (์ฆ, ์ํ ๋ณด๊ธฐ/๋ค์ ์์/๊ตฌ์ฑ ์ ๋ฐ์ดํธ)์ ์ฌ์ฉ๋ฉ๋๋ค. -
connector.class
โ ๊ตฌ์ฑ๋ ์ปค๋ฅํฐ์์ ์ฌ์ฉํ DBMS ์ปค๋ฅํฐ ํด๋์ค์ ๋๋ค. -
plugin.name
โ WAL ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ๋ ผ๋ฆฌ์ ์ผ๋ก ๋์ฝ๋ฉํ๊ธฐ ์ํ ํ๋ฌ๊ทธ์ธ์ ์ด๋ฆ์ ๋๋ค. ์ ํ ๊ฐ๋ฅwal2json
,decoderbuffs
ะธpgoutput
. ์ฒ์ ๋ ๊ฐ๋ DBMS์ ์ ์ ํ ํ์ฅ์ ์ค์นํด์ผ ํฉ๋๋ค.pgoutput
PostgreSQL ๋ฒ์ 10 ์ด์์ ๊ฒฝ์ฐ ์ถ๊ฐ ์กฐ์์ด ํ์ํ์ง ์์ต๋๋ค. -
database.*
โ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ต์ . ์ฌ๊ธฐ์database.server.name
โ Kafka ํด๋ฌ์คํฐ์์ ์ฃผ์ ์ด๋ฆ์ ๊ตฌ์ฑํ๋ ๋ฐ ์ฌ์ฉ๋๋ PostgreSQL ์ธ์คํด์ค ์ด๋ฆ์ ๋๋ค. -
table.include.list
โ ๋ณ๊ฒฝ ์ฌํญ์ ์ถ์ ํ๋ ค๋ ํ ์ด๋ธ ๋ชฉ๋ก ํ์์ผ๋ก ์ง์ schema.table_name
; ์ ํจ๊ป ์ฌ์ฉํ ์ ์์ต๋๋ค.table.exclude.list
; -
heartbeat.interval.ms
โ ์ปค๋ฅํฐ๊ฐ ํํธ๋นํธ ๋ฉ์์ง๋ฅผ ํน์ ์ฃผ์ ๋ก ๋ณด๋ด๋ ๊ฐ๊ฒฉ(๋ฐ๋ฆฌ์ด)์ ๋๋ค. -
heartbeat.action.query
โ ๊ฐ ํํธ๋นํธ ๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ์คํ๋ ์์ฒญ(์ต์ ์ ๋ฒ์ 1.1์ ๋ํ๋จ) -
slot.name
โ ์ปค๋ฅํฐ์์ ์ฌ์ฉํ ๋ณต์ ์ฌ๋กฏ์ ์ด๋ฆ์ ๋๋ค. publication.name
- ์ด๋ฆ์ถํ ์ปค๋ฅํฐ๊ฐ ์ฌ์ฉํ๋ PostgreSQL์์. ๋ง์ฝ ์กด์ฌํ์ง ์๋๋ค๋ฉด, Debezium์ ๊ทธ๊ฒ์ ์์ฑํ๋ ค๊ณ ์๋ํ ๊ฒ์ ๋๋ค. ์ฐ๊ฒฐ๋ ์ฌ์ฉ์์๊ฒ ์ด ์์ ์ ๋ํ ์ถฉ๋ถํ ๊ถํ์ด ์์ผ๋ฉด ์ปค๋ฅํฐ๊ฐ ์ค๋ฅ์ ํจ๊ป ์ข ๋ฃ๋ฉ๋๋ค.-
transforms
๋์ ์ฃผ์ ์ ์ด๋ฆ์ ๋ณ๊ฒฝํ๋ ๋ฐฉ๋ฒ์ ์ ํํ๊ฒ ๊ฒฐ์ ํฉ๋๋ค.-
transforms.AddPrefix.type
์ ๊ท์์ ์ฌ์ฉํ ๊ฒ์์ ๋ํ๋ ๋๋ค. -
transforms.AddPrefix.regex
โ ๋์ ์ฃผ์ ์ ์ด๋ฆ์ ์ฌ์ ์ํ๋ ๋ง์คํฌ -
transforms.AddPrefix.replacement
- ์ฐ๋ฆฌ๊ฐ ์ฌ์ ์ํ๋ ๊ฒ์ ์ง์ ์ ์ผ๋ก.
-
ํํธ๋นํธ ๋ฐ ๋ณํ์ ๋ํด ์์ธํ ์์๋ณด๊ธฐ
๊ธฐ๋ณธ์ ์ผ๋ก ์ปค๋ฅํฐ๋ ์ปค๋ฐ๋ ๊ฐ ํธ๋์ญ์
์ ๋ํด Kafka์ ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด๊ณ ํด๋น LSN(๋ก๊ทธ ์ํ์ค ๋ฒํธ)์ด ์๋น์ค ํญ๋ชฉ์ ๊ธฐ๋ก๋ฉ๋๋ค. offset
. ๊ทธ๋ฌ๋ ์ปค๋ฅํฐ๊ฐ ์ ์ฒด ๋ฐ์ดํฐ๋ฒ ์ด์ค๊ฐ ์๋๋ผ ํด๋น ํ
์ด๋ธ์ ์ผ๋ถ(๋ฐ์ดํฐ ์
๋ฐ์ดํธ๊ฐ ์์ฃผ ๋ฐ์ํ์ง ์์)๋ง ์ฝ๋๋ก ๊ตฌ์ฑ๋๋ฉด ์ด๋ป๊ฒ ๋ ๊น์?
- ์ปค๋ฅํฐ๋ WAL ํ์ผ์ ์ฝ๊ณ ๋ชจ๋ํฐ๋ง ์ค์ธ ํ ์ด๋ธ์ ๋ํ ํธ๋์ญ์ ์ปค๋ฐ์ ๊ฐ์งํ์ง ์์ต๋๋ค.
- ๋ฐ๋ผ์ ์ฃผ์ ๋ ๋ณต์ ์ฌ๋กฏ์์ ํ์ฌ ์์น๋ฅผ ์ ๋ฐ์ดํธํ์ง ์์ต๋๋ค.
- ๊ทธ๋ฌ๋ฉด WAL ํ์ผ์ด ๋์คํฌ์ ๋ณด๊ด๋์ด ๋์คํฌ ๊ณต๊ฐ์ด ๋ถ์กฑํด์ง ์ ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ์ด๊ฒ์ด ๋ฐ๋ก ์ต์
์ด ๊ตฌ์ถ๋๋ ๊ณณ์
๋๋ค. heartbeat.interval.ms
ะธ heartbeat.action.query
. ์ด๋ฌํ ์ต์
์ ์์ผ๋ก ์ฌ์ฉํ๋ฉด ํํธ๋นํธ ๋ฉ์์ง๊ฐ ์ ์ก๋ ๋๋ง๋ค ๋ณ๋์ ํ
์ด๋ธ์์ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์์ฒญ์ ์ํํ ์ ์์ต๋๋ค. ๋ฐ๋ผ์ ์ปค๋ฅํฐ๊ฐ ํ์ฌ ์์นํ(๋ณต์ ์ฌ๋กฏ์ ์๋) LSN์ ์ง์์ ์ผ๋ก ์
๋ฐ์ดํธ๋ฉ๋๋ค. ์ด๋ฅผ ํตํด DBMS๋ ๋ ์ด์ ํ์ํ์ง ์์ WAL ํ์ผ์ ์ ๊ฑฐํ ์ ์์ต๋๋ค. ์ต์
์ด ์๋ํ๋ ๋ฐฉ์์ ๋ํด ์์ธํ ์์๋ณผ ์ ์์ต๋๋ค.
์ฃผ์ ๊น๊ฒ ์ฃผ๋ชฉํ ๋งํ ๋ ๋ค๋ฅธ ์ต์
์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. transforms
. ํธ๋ฆฌํจ๊ณผ ์๋ฆ๋ค์์ด ๋ ์ค์ํ์ง๋ง...
๊ธฐ๋ณธ์ ์ผ๋ก Debezium์ ๋ค์ ๋ช
๋ช
์ ์ฑ
์ ์ฌ์ฉํ์ฌ ์ฃผ์ ๋ฅผ ์์ฑํฉ๋๋ค. serverName.schemaName.tableName
. ์ด๋ ํญ์ ํธ๋ฆฌํ์ง ์์ ์ ์์ต๋๋ค. ์ต์
transforms
์ ๊ท์์ ์ฌ์ฉํ์ฌ ํน์ ์ด๋ฆ์ ๊ฐ์ง ์ฃผ์ ๋ก ๋ผ์ฐํ
ํด์ผ ํ๋ ์ด๋ฒคํธ์ธ ํ
์ด๋ธ ๋ชฉ๋ก์ ์ ์ํ ์ ์์ต๋๋ค.
์ฐ๋ฆฌ ๊ตฌ์ฑ์์๋ ๊ฐ์ฌํฉ๋๋ค. transforms
๋ค์๊ณผ ๊ฐ์ ์ผ์ด ๋ฐ์ํฉ๋๋ค. ๋ชจ๋ํฐ๋ง๋๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ชจ๋ CDC ์ด๋ฒคํธ๋ ๋ค์ ์ด๋ฆ์ ์ฃผ์ ๋ก ์ด๋ํฉ๋๋ค. data.cdc.dbname
. ๊ทธ๋ ์ง ์์ผ๋ฉด (์ด ์ค์ ์์ด) Debezium์ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ค์๊ณผ ๊ฐ์ ๊ฐ ํ
์ด๋ธ์ ๋ํ ์ฃผ์ ๋ฅผ ์์ฑํฉ๋๋ค. pg-dev.public.<table_name>
.
์ปค๋ฅํฐ ์ ํ
PostgreSQL์ฉ ์ปค๋ฅํฐ ๊ตฌ์ฑ์ ๋ํ ์ค๋ช ์ ๋ง๋ฌด๋ฆฌํ๋ ค๋ฉด ํด๋น ์์ ์ ๋ค์ ๊ธฐ๋ฅ/์ ํ ์ฌํญ์ ๋ํด ์ด์ผ๊ธฐํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
- PostgreSQL์ฉ ์ปค๋ฅํฐ์ ๊ธฐ๋ฅ์ ๋ ผ๋ฆฌ์ ๋์ฝ๋ฉ ๊ฐ๋ ์ ์์กดํฉ๋๋ค. ๊ทธ๋ฌ๋ฏ๋ก ๊ทธ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ๊ตฌ์กฐ ๋ณ๊ฒฝ ์์ฒญ์ ์ถ์ ํ์ง ์์ต๋๋ค. (DDL) - ๋ฐ๋ผ์ ์ด ๋ฐ์ดํฐ๋ ์ฃผ์ ์ ํฌํจ๋์ง ์์ต๋๋ค.
- ๋ณต์ ์ฌ๋กฏ์ ์ฌ์ฉํ๋ฏ๋ก ์ปค๋ฅํฐ ์ฐ๊ฒฐ์ด ๊ฐ๋ฅํฉ๋๋ค. ๋ง ์ฃผ์ DBMS ์ธ์คํด์ค์.
- ์ปค๋ฅํฐ๊ฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ฐ๊ฒฐ๋๋ ์ฌ์ฉ์์๊ฒ ์ฝ๊ธฐ ์ ์ฉ ๊ถํ์ด ์๋ ๊ฒฝ์ฐ ์ฒ์ ์์ํ๊ธฐ ์ ์ ๋ณต์ ์ฌ๋กฏ์ ์๋์ผ๋ก ์์ฑํ๊ณ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ฒ์ํด์ผ ํฉ๋๋ค.
๊ตฌ์ฑ ์ ์ฉ
์ด์ ๊ตฌ์ฑ์ ์ปค๋ฅํฐ์ ๋ก๋ํด ๋ณด๊ฒ ์ต๋๋ค.
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
๋ค์ด๋ก๋๊ฐ ์ฑ๊ณตํ๊ณ ์ปค๋ฅํฐ๊ฐ ์์๋์๋์ง ํ์ธํฉ๋๋ค.
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
์ข์ต๋๋ค. ์ค์ ์ด ์๋ฃ๋์์ผ๋ฉฐ ์ฌ์ฉํ ์ค๋น๊ฐ ๋์์ต๋๋ค. ์ด์ ์๋น์์ธ ์ฒํ๊ณ Kafka์ ์ฐ๊ฒฐํ ํ ํ ์ด๋ธ์ ํญ๋ชฉ์ ์ถ๊ฐํ๊ณ ๋ณ๊ฒฝํ๊ฒ ์ต๋๋ค.
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
์ฐ๋ฆฌ ์ฃผ์ ์์๋ ๋ค์๊ณผ ๊ฐ์ด ํ์๋ฉ๋๋ค.
๋ณ๊ฒฝ ์ฌํญ์ด ํฌํจ๋ ๋งค์ฐ ๊ธด JSON
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
๋ ๊ฒฝ์ฐ ๋ชจ๋ ๋ ์ฝ๋๋ ๋ณ๊ฒฝ๋ ๋ ์ฝ๋์ ํค(PK)์ ๋ณ๊ฒฝ์ ๋ณธ์ง, ์ฆ ๋ ์ฝ๋๊ฐ ์ด์ ์ ๋ฌด์์ด์๋์ง, ์ดํ์ ๋ฌด์์ด ๋์๋์ง๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.
- ์ ๊ฒฝ์ฐ
INSERT
: ์ด์ ๊ฐ(before
) ๊ฐ์null
, ๊ทธ๋ฆฌ๊ณ ๊ทธ ์ดํ - ์ฝ์ ๋ ๋ผ์ธ. - ์ ๊ฒฝ์ฐ
UPDATE
:์์payload.before
๋ผ์ธ์ ์ด์ ์ํ๊ฐ ํ์๋๊ณ ,payload.after
โ ๋ณํ์ ๋ณธ์ง์ ์ง๋ ์๋ก์ด ๊ฒ.
2.2 ๋ชฝ๊ณ DB
์ด ์ปค๋ฅํฐ๋ ํ์ค MongoDB ๋ณต์ ๋ฉ์ปค๋์ฆ์ ์ฌ์ฉํ์ฌ ๊ธฐ๋ณธ DBMS ๋ ธ๋์ oplog์์ ์ ๋ณด๋ฅผ ์ฝ์ต๋๋ค.
์ด๋ฏธ ์ค๋ช ํ PgSQL์ฉ ์ปค๋ฅํฐ์ ์ ์ฌํ๊ฒ ์ฌ๊ธฐ์์๋ ์ฒ์ ์์ํ ๋ ๊ธฐ๋ณธ ๋ฐ์ดํฐ ์ค๋ ์ท์ด ์์ฑ๋๊ณ ๊ทธ ํ ์ปค๋ฅํฐ๊ฐ oplog ์ฝ๊ธฐ ๋ชจ๋๋ก ์ ํ๋ฉ๋๋ค.
๊ตฌ์ฑ ์ :
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
๋ณด์๋ค์ํผ ์ด์ ์์ ๋น๊ตํ์ฌ ์ฌ๊ธฐ์๋ ์๋ก์ด ์ต์ ์ด ์์ง๋ง ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ๋ด๋นํ๋ ์ต์ ์์ ์ ๋์ฌ๋ง ์ค์์ต๋๋ค.
์ค์ transforms
์ด๋ฒ์๋ ๋ค์์ ์ํํฉ๋๋ค. ์คํค๋ง์์ ๋์ ์ฃผ์ ์ ์ด๋ฆ์ ๋ณํํฉ๋๋ค. <server_name>.<db_name>.<collection_name>
ะฒ data.cdc.mongo_<db_name>
.
๊ฒฐํจ ํ์ฉ
์ฐ๋ฆฌ ์๋์ ๋ด๊ฒฐํจ์ฑ ๋ฐ ๊ณ ๊ฐ์ฉ์ฑ ๋ฌธ์ ๋ ๊ทธ ์ด๋ ๋๋ณด๋ค ์ฌ๊ฐํฉ๋๋ค. ํนํ ๋ฐ์ดํฐ์ ํธ๋์ญ์ ์ ๊ดํด ์ด์ผ๊ธฐํ ๋ ๋ฐ์ดํฐ ๋ณ๊ฒฝ ์ฌํญ์ ์ถ์ ํ๋ ๊ฒ์ด ์ด ๋ฌธ์ ์์ ์ ์ธ๋์ง ์์ต๋๋ค. ์์น์ ์ผ๋ก ๋ฌด์์ด ์๋ชป๋ ์ ์๋์ง, ๊ฐ ๊ฒฝ์ฐ์ Debezium์ ์ด๋ค ์ผ์ด ์ผ์ด๋ ์ง ์ดํด๋ณด๊ฒ ์ต๋๋ค.
์ตํธ์์ ์ต์ ์๋ ์ธ ๊ฐ์ง๊ฐ ์์ต๋๋ค.
- ์นดํ์นด ์ฐ๊ฒฐ ์คํจ. Connect๊ฐ ๋ถ์ฐ ๋ชจ๋์์ ์๋ํ๋๋ก ๊ตฌ์ฑ๋ ๊ฒฝ์ฐ ๋์ผํ group.id๋ฅผ ์ค์ ํ๋ ค๋ฉด ์ฌ๋ฌ ์์ ์๊ฐ ํ์ํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ ๊ทธ ์ค ํ๋๊ฐ ์คํจํ๋ฉด ์ปค๋ฅํฐ๊ฐ ๋ค๋ฅธ ์์ ์์์ ๋ค์ ์์๋๊ณ Kafka ์ฃผ์ ์์ ๋ง์ง๋ง์ผ๋ก ์ปค๋ฐ๋ ์์น๋ถํฐ ๊ณ์ ์ฝ์ต๋๋ค.
- Kafka ํด๋ฌ์คํฐ์์ ์ฐ๊ฒฐ ๋๊น. ์ปค๋ฅํฐ๋ ๋จ์ํ Kafka๋ก ์ ์ก์ ์คํจํ ์์น์์ ์ฝ๊ธฐ๋ฅผ ์ค์งํ๊ณ ์๋๊ฐ ์ฑ๊ณตํ ๋๊น์ง ์ฃผ๊ธฐ์ ์ผ๋ก ์ฌ์ ์ก์ ์๋ํฉ๋๋ค.
- ๋ฐ์ดํฐ ์์ค๋ฅผ ์ฌ์ฉํ ์ ์์. ์ปค๋ฅํฐ๋ ๊ตฌ์ฑ๋ ๋๋ก ์์ค์ ๋ค์ ์ฐ๊ฒฐ์ ์๋ํฉ๋๋ค. ๊ธฐ๋ณธ๊ฐ์ ๋ค์์ ์ฌ์ฉํ์ฌ 16๋ฒ ์๋ํ๋ ๊ฒ์
๋๋ค.
์ง์ ๋ฐฑ์คํ . 16๋ฒ์งธ ์๋์ ์คํจํ๋ฉด ํด๋น ์์ ์ ๋ค์๊ณผ ๊ฐ์ด ํ์๋ฉ๋๋ค. ์คํจํ Kafka Connect REST ์ธํฐํ์ด์ค๋ฅผ ํตํด ์๋์ผ๋ก ๋ค์ ์์ํด์ผ ํฉ๋๋ค.- ์ ๊ฒฝ์ฐ PostgreSQL ๋ฐ์ดํฐ๋ ์์ค๋์ง ์์ต๋๋ค. ์๋ํ๋ฉด ๋ณต์ ์ฌ๋กฏ์ ์ฌ์ฉํ๋ฉด ์ปค๋ฅํฐ๊ฐ ์ฝ์ง ์๋ WAL ํ์ผ์ ์ญ์ ํ๋ ๊ฒ์ ๋ฐฉ์งํ ์ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ ์ฝ์ธ์ ๋จ์ ๋ ์์ต๋๋ค. ์ปค๋ฅํฐ์ DBMS ์ฌ์ด์ ๋คํธ์ํฌ ์ฐ๊ฒฐ์ด ์ฅ๊ธฐ๊ฐ ์ค๋จ๋๋ฉด ๋์คํฌ ๊ณต๊ฐ์ด ๋ถ์กฑํด์ง ๊ฐ๋ฅ์ฑ์ด ์๊ณ ์ด๋ก ์ธํด ์ฅ์ ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค. ์ ์ฒด DBMS.
- ์ ๊ฒฝ์ฐ MySQL์ binlog ํ์ผ์ ์ฐ๊ฒฐ์ด ๋ณต์๋๊ธฐ ์ ์ DBMS ์์ฒด์ ์ํด ํ์ ๋ ์ ์์ต๋๋ค. ์ด๋ก ์ธํด ์ปค๋ฅํฐ๊ฐ ์คํจ ์ํ๋ก ์ ํ๋๊ณ , ์ ์ ์๋์ ๋ณต์ํ๋ ค๋ฉด ์ด๊ธฐ ์ค๋ ์ท ๋ชจ๋์์ ๋ค์ ์์ํ์ฌ binlog์์ ๊ณ์ ์ฝ์ด์ผ ํฉ๋๋ค.
- ์ฝ MongoDB์. ์ค๋ช
์์ ๋ฐ๋ฅด๋ฉด ๋ก๊ทธ/oplog ํ์ผ์ด ์ญ์ ๋๊ณ ์ปค๋ฅํฐ๊ฐ ์ค๋จ๋ ์์น์์ ๊ณ์ ์ฝ์ ์ ์๋ ๊ฒฝ์ฐ ์ปค๋ฅํฐ์ ๋์์ ๋ชจ๋ DBMS์ ๋ํด ๋์ผํฉ๋๋ค. ์ด๋ ์ปค๋ฅํฐ๊ฐ ์ํ๋ก ์ ํ๋จ์ ์๋ฏธํฉ๋๋ค. ์คํจํ ๋ชจ๋์์ ๋ค์ ์์ํด์ผ ํฉ๋๋ค. ์ด๊ธฐ ์ค๋
์ท.
๊ทธ๋ฌ๋ ์์ธ๊ฐ ์์ต๋๋ค. ์ปค๋ฅํฐ ์ฐ๊ฒฐ์ด ์ค๋ซ๋์ ๋์ด์ง๊ณ (๋๋ MongoDB ์ธ์คํด์ค์ ๋๋ฌํ ์ ์์) ์ด ์๊ฐ ๋์ oplog๊ฐ ํ์ ํ ๊ฒฝ์ฐ ์ฐ๊ฒฐ์ด ๋ณต์๋๋ฉด ์ปค๋ฅํฐ๋ ์ฌ์ฉ ๊ฐ๋ฅํ ์ฒซ ๋ฒ์งธ ์์น์์ ์นจ์ฐฉํ๊ฒ ๊ณ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ต๋๋ค. ์ด๊ฒ์ด ๋ฐ๋ก Kafka์ ์ผ๋ถ ๋ฐ์ดํฐ๊ฐ ์๋ ์น ๊ฒ์ด๋ค.
๊ฒฐ๋ก
Debezium์ CDC ์์คํ
์ ๋ํ ๋์ ์ฒซ ๊ฒฝํ์ด๋ฉฐ ์ ๋ฐ์ ์ผ๋ก ๋งค์ฐ ๊ธ์ ์ ์
๋๋ค. ์ด ํ๋ก์ ํธ๋ ์ฃผ์ DBMS ์ง์, ๊ตฌ์ฑ ์ฉ์ด์ฑ, ํด๋ฌ์คํฐ๋ง ์ง์ ๋ฐ ํ๋ฐํ ์ปค๋ฎค๋ํฐ๋ก ์ฑ๊ณตํ์ต๋๋ค. ์ค๋ฌด์ ๊ด์ฌ์ด ์๋ ๋ถ๋ค์ ๊ฐ์ด๋๋ฅผ ์ฝ์ด๋ณด์๊ธธ ๊ถํฉ๋๋ค.
Kafka Connect์ฉ JDBC ์ปค๋ฅํฐ์ ๋น๊ตํ ๋ Debezium์ ๊ฐ์ฅ ํฐ ์ฅ์ ์ DBMS ๋ก๊ทธ์์ ๋ณ๊ฒฝ ์ฌํญ์ ์ฝ์ด ์ต์ํ์ ๋๊ธฐ ์๊ฐ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์์ ํ ์ ์๋ค๋ ๊ฒ์ ๋๋ค. Kafka Connect์ JDBC ์ปค๋ฅํฐ๋ ๊ณ ์ ๋ ๊ฐ๊ฒฉ์ผ๋ก ๋ชจ๋ํฐ๋ง๋๋ ํ ์ด๋ธ์ ์ฟผ๋ฆฌํ๊ณ (๊ฐ์ ์ด์ ๋ก) ๋ฐ์ดํฐ๊ฐ ์ญ์ ๋ ๋ ๋ฉ์์ง๋ฅผ ์์ฑํ์ง ์์ต๋๋ค(์กด์ฌํ์ง ์๋ ๋ฐ์ดํฐ๋ฅผ ์ด๋ป๊ฒ ์ฟผ๋ฆฌํ ์ ์์ต๋๊น?).
์ ์ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ ค๋ฉด Debezium ์ธ์ ๋ค์ ์๋ฃจ์ ์ ์ฃผ์๋ฅผ ๊ธฐ์ธ์ผ ์ ์์ต๋๋ค.
-
JDBC ์ปค๋ฅํฐ Kafka ์ฐ๊ฒฐ; - MySQL ์ ์ฉ์ ์ฌ๋ฌ ์๋ฃจ์ :
-
์ค๋ผํด ๊ณจ๋ ๊ฒ์ดํธ , ๊ทธ๋ฌ๋ ์ด๊ฒ์ ์์ ํ ๋ค๋ฅธ "์ฒด์ค ๋ฒ์ฃผ"์ ๋๋ค.
PS
๋ธ๋ก๊ทธ์์๋ ์ฝ์ด๋ณด์ธ์.
- ยซ
Kubernetes์์ Kafka ํด๋ฌ์คํฐ์ ์ ํฉํ ํฌ๊ธฐ ๊ฒฐ์ "; - ยซ
SRE ์ผ์์ํ์ ์ค์ฉ์ ์ธ ์ด์ผ๊ธฐ. 2 ๋ถ "; - ยซ
Kubernetes์ฉ PostgreSQL ๋ฌธ์ ๋ํ ๊ฐ๋ตํ ๊ฐ์, ์ฐ๋ฆฌ์ ์ ํ ๋ฐ ๊ฒฝํ ".
์ถ์ฒ : habr.com