
Жумушумда мен жаңы техникалык чечимдерди/программалык продуктыларды көп жолуктурам, алар жөнүндө маалымат орус тилдүү интернетте аз. Бул макаланын жардамы менен мен мындай боштукту Debezium аркылуу Кафка кластерине эки популярдуу DBMSден (PostgreSQL жана MongoDB) жөнөтүүнү конфигурациялоо керек болгондо, акыркы практикамдын мисалы менен толтурууга аракет кылам. Аткарылган иштин натыйжасында пайда болгон бул кароо макала башкаларга пайдалуу болот деп ишенем.
Дебезиум жана CDC деген эмне?
— CDC программалык камсыздоо категориясынын өкүлү (), же тагыраак айтканда, бул Apache Kafka Connect алкагына шайкеш келген ар кандай DBMS үчүн туташтыргычтардын жыйындысы.
бул Apache License v2.0 астында лицензияланган жана Red Hat тарабынан каржыланган. Иштеп чыгуу 2016-жылдан бери жүрүп жатат жана азыркы учурда ал төмөнкү DBMS үчүн расмий колдоо көрсөтөт: MySQL, PostgreSQL, MongoDB, SQL Server. Кассандра жана Oracle үчүн туташтыргычтар да бар, бирок учурда алар "эрте жеткиликтүү" статусунда жана жаңы чыгарылыштар артка шайкештикке кепилдик бербейт.
Эгерде биз CDCди салттуу ыкма менен салыштырсак (тиркеме МББдан маалыматтарды түздөн-түз окуганда), анын негизги артыкчылыктары аз күтүү, жогорку ишенимдүүлүк жана жеткиликтүүлүк менен катар деңгээлинде маалыматтарды өзгөртүү агымын ишке ашырууну камтыйт. Акыркы эки пунктка Кафка кластерин CDC окуялары үчүн репозиторий катары колдонуу аркылуу жетишилет.
Дагы бир артыкчылыгы - бул окуяларды сактоо үчүн бир моделдин колдонулушу, ошондуктан акыркы тиркеме ар кандай DBMSлерди иштетүүнүн нюанстары жөнүндө тынчсыздануунун кереги жок.
Акыр-аягы, билдирүү брокерин колдонуу маалыматтардагы өзгөрүүлөрдү көзөмөлдөгөн тиркемелерге горизонталдуу масштабга мүмкүнчүлүк берет. Ошол эле учурда маалымат булагына тийгизген таасири минимумга түшүрүлөт, анткени маалыматтар түздөн-түз МББден эмес, Кафка кластеринен алынат.
Debezium архитектурасы жөнүндө
Debezium колдонуу бул жөнөкөй схемага келет:
DBMS (маалымат булагы катары) → Kafka Connect ичиндеги туташтыргыч → Apache Kafka → керектөөчү
Мисал катары бул жерде долбоордун веб-сайтынан диаграмма келтирилген:

Бирок, мен бул схеманы жактырбайм, анткени раковина туташтыргычын гана колдонууга болот окшойт.
Чындыгында, абал башкача: Сиздин Дата көлүңүздү толтуруу (жогорку диаграммадагы акыркы шилтеме) Бул Debezium колдонуунун жалгыз жолу эмес. Apache Кафкага жөнөтүлгөн окуялар ар кандай кырдаалдарды чечүү үчүн колдонмолоруңуз тарабынан колдонулушу мүмкүн. Мисалы:
- кэштен тиешеси жок маалыматтарды алып салуу;
- билдирүүлөрдү жөнөтүү;
- издөө индексинин жаңыртуулары;
- кандайдыр бир аудит журналдары;
- ...
Эгер сизде Java тиркемеси бар болсо жана Кафка кластерин колдонуунун кереги жок/мүмкүнчүлүгү жок болсо, бул аркылуу иштөө мүмкүнчүлүгү да бар. . Айкын артыкчылыгы - бул кошумча инфраструктурага муктаждыкты жок кылат (конектор жана Кафка түрүндө). Бирок, бул чечим 1.1 версиясынан бери эскирген жана мындан ары колдонууга сунушталбайт (аны колдоо келечектеги чыгарылыштарда алынып салынышы мүмкүн).
Бул макалада иштеп чыгуучулар тарабынан сунуш кылынган архитектура талкууланат, ал каталарга чыдамдуулукту жана масштабдуулукту камсыз кылат.
Туташтыргычтын конфигурациясы
Эң маанилүү маанидеги - маалыматтардагы өзгөрүүлөргө байкоо жүргүзүү үчүн бизге төмөнкүлөр керек:
- маалымат булагы, 5.7 версиясынан баштап MySQL болушу мүмкүн, PostgreSQL 9.6+, MongoDB 3.2+ ();
- Apache Kafka кластери;
- Kafka Connect инстанциясы (1.x, 2.x версиялары);
- конфигурацияланган Debezium туташтыргычы.
Алгачкы эки пункт боюнча иштөө, б.а. DBMS жана Apache Kafka орнотуу процесси макаланын алкагына кирбейт. Бирок, бардыгын кумдук кутуга жайгаштырууну каалагандар үчүн, мисалдар менен расмий репозиторийде даяр .
Акыркы эки пунктка кененирээк токтолобуз.
0. Kafka Connect
Бул жерде жана андан ары макалада конфигурациянын бардык мисалдары Debezium иштеп чыгуучулары тараткан Docker сүрөтүнүн контекстинде талкууланат. Ал бардык керектүү плагин файлдарын (туташтыргычтарды) камтыйт жана чөйрө өзгөрмөлөрүнүн жардамы менен Kafka Connect конфигурациясын камсыз кылат.
Эгер сиз Confluentтен Kafka Connect колдонууну кааласаңыз, анда көрсөтүлгөн каталогго керектүү туташтыргычтардын плагиндерин өз алдынча кошушуңуз керек болот. plugin.path же чөйрө өзгөрмө аркылуу орнотулат CLASSPATH. Kafka Connect жумушчусу жана туташтыргычы үчүн орнотуулар жумушчу ишке киргизүү буйругуна аргумент катары берилген конфигурация файлдары аркылуу аныкталат. Көбүрөөк маалымат алуу үчүн, караңыз .
Туташтыргыч версиясында Debeizum орнотуу процесси эки этапта ишке ашырылат. Келгиле, алардын ар бирин карап көрөлү:
1. Kafka Connect алкагын орнотуу
Apache Kafka кластерине берилиштерди өткөрүү үчүн, Kafka Connect алкагында белгилүү параметрлер орнотулган, мисалы:
- кластерге туташуу үчүн параметрлер,
- туташтыргычтын конфигурациясынын өзү түз сактала турган темалардын аталыштары,
- туташтыргыч иштеп жаткан топтун аталышы (эгер бөлүштүрүлгөн режим колдонулса).
Долбоордун расмий Docker сүрөтү чөйрө өзгөрмөлөрүнүн жардамы менен конфигурацияны колдойт - бул биз колдоно турган нерсе. Ошентип, сүрөттү жүктөп:
docker pull debezium/connectТуташтыргычты иштетүү үчүн зарыл болгон чөйрө өзгөрмөлөрүнүн минималдуу топтому төмөнкүдөй:
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092— кластердин мүчөлөрүнүн толук тизмесин алуу үчүн Кафка кластердик серверлеринин баштапкы тизмеси; -
OFFSET_STORAGE_TOPIC=connector-offsets— туташтыргыч учурда жайгашкан позицияларды сактоо үчүн тема; -
CONNECT_STATUS_STORAGE_TOPIC=connector-status— туташтыргычтын статусун жана анын милдеттерин сактоо темасы; -
CONFIG_STORAGE_TOPIC=connector-config— туташтыргычтын конфигурациясынын маалыматтарын жана анын милдеттерин сактоо темасы; -
GROUP_ID=1— бириктирүүчү тапшырма аткарыла турган жумушчулардын тобунун идентификатору; бөлүштүрүлгөн колдонууда зарыл (таратылган) режим.
Бул өзгөрмөлөр менен контейнерди ишке киргизебиз:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2Avro жөнүндө эскертүү
Демейки боюнча, Debezium маалыматтарды JSON форматында жазат, бул кумдуктар жана аз көлөмдөгү маалыматтар үчүн алгылыктуу, бирок өтө жүктөлгөн маалымат базаларында көйгөй болуп калышы мүмкүн. JSON конвертерине альтернатива катары билдирүүлөрдү сериялаштыруу болуп саналат Apache Kafkaдагы киргизүү/чыгаруу подсистемасына жүктөөнү азайтуучу бинардык форматка.
Avro колдонуу үчүн өзүнчө жайгаштыруу керек (диаграммаларды сактоо үчүн). Конвертер үчүн өзгөрмөлөр төмөнкүдөй болот:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverterAvroну колдонуу жана анын реестрин орнотуу боюнча чоо-жайы бул макаланын алкагына кирбейт - мындан ары да түшүнүктүү болуу үчүн биз JSON колдонобуз.
2. Туташтыргычтын өзүн конфигурациялоо
Эми сиз түз туташтыргычтын конфигурациясына өтө аласыз, ал булактан маалыматтарды окуйт.
Келгиле, эки DBMS үчүн туташтыргычтардын мисалын карап көрөлү: PostgreSQL жана MongoDB, аларда менде тажрыйба бар жана айырмачылыктар бар (кичине болсо да, бирок кээ бир учурларда олуттуу!).
Конфигурация JSON нотациясында сүрөттөлөт жана POST сурамынын жардамы менен Kafka Connect'ке жүктөлөт.
2.1. PostgreSQL
PostgreSQL үчүн туташтыргыч конфигурациясынын мисалы:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}Бул орнотуудан кийин туташтыргычтын иштөө принциби абдан жөнөкөй:
- Биринчи жолу ишке киргизилгенде, ал конфигурацияда көрсөтүлгөн маалымат базасына туташып, режимде иштей баштайт баштапкы сүрөт, шарттуу колдонуу менен алынган маалыматтардын баштапкы топтомун Кафкага жөнөтүү
SELECT * FROM table_name. - Инициализация аяктагандан кийин туташтыргыч PostgreSQL WAL файлдарынан өзгөртүүлөрдү окуу үчүн режимге өтөт.
Колдонулган опциялар жөнүндө:
-
name— төмөндө сүрөттөлгөн конфигурация колдонулган туташтыргычтын аталышы; келечекте, бул аталыш Kafka Connect REST API аркылуу туташтыргыч менен иштөө үчүн колдонулат (б.а. статусту көрүү/кайра баштоо/конфигурацияны жаңыртуу); -
connector.class— конфигурацияланган туташтыргыч тарабынан колдонула турган DBMS туташтыргыч классы; -
plugin.name— WAL файлдарынан маалыматтарды логикалык декоддоо үчүн плагиндин аталышы. Тандоо үчүн жеткиликтүүwal2json,decoderbuffsиpgoutput. Биринчи эки DBMS тиешелүү кеңейтүүлөрдү орнотууну талап кылат, жанаpgoutputPostgreSQL 10 жана андан жогорку версиясы үчүн кошумча манипуляцияларды талап кылбайт; -
database.*— маалымат базасына кошулуу варианттары, кайдаdatabase.server.name— Кафка кластеринде теманын аталышын түзүү үчүн колдонулган PostgreSQL инстанциясынын аталышы; -
table.include.list— биз өзгөрүүлөргө көз салгыбыз келген таблицалардын тизмеси; форматта көрсөтүлгөнschema.table_name; менен бирге колдонууга болбойтtable.exclude.list; -
heartbeat.interval.ms— интервал (миллисекунд менен), анын жардамы менен конектор жүрөктүн согушу жөнүндө билдирүүлөрдү атайын темага жөнөтөт; -
heartbeat.action.query— ар бир жүрөктүн кагышын билдирүүнү жөнөтүүдө аткарыла турган суроо-талап (опция 1.1 версиясында пайда болгон); -
slot.name— туташтыргыч тарабынан колдонула турган репликация слотунун аталышы; publication.name- Аты туташтыргыч колдонгон PostgreSQLде. Эгерде ал жок болсо, Debezium аны түзүүгө аракет кылат. Эгер туташуу жасалган колдонуучу бул аракетке жетиштүү укуктарга ээ болбосо, туташтыргыч ката менен ишин токтотот;-
transformsмаксаттуу теманын атын кантип өзгөртүүнү так аныктайт:-
transforms.AddPrefix.typeтуруктуу сөз айкаштарын колдоно турганыбызды көрсөтөт; -
transforms.AddPrefix.regex— тыа хаһаайыстыбатын атын тематын атын сиргэ үөрэтии; -
transforms.AddPrefix.replacement- түздөн-түз биз эмнени кайра аныктап жатабыз.
-
Жүрөктүн согушу жана өзгөрүү жөнүндө көбүрөөк
Демейки боюнча, туташтыргыч Кафкага ар бир жасалган транзакция үчүн маалыматтарды жөнөтөт жана анын LSN (Log ырааттуулугу) кызмат темасында жазылат. offset. Ал эми туташтыргыч бүт маалымат базасын эмес, анын таблицаларынын бир бөлүгүн гана окуй тургандай конфигурацияланса эмне болот (маалымат жаңыртуулары көп кездешпейт)?
- Туташтыргыч WAL файлдарын окуйт жана ал көзөмөлдөгөн таблицаларга эч кандай транзакция милдеттенмелерин аныктабайт.
- Ошондуктан, ал темадагы же репликация слотундагы учурдагы абалын жаңыртпайт.
- Бул, өз кезегинде, WAL файлдары дискте сакталып калышына жана дисктеги орундун түгөнүп калышына алып келет.
Бул жерде варианттар жардамга келет. heartbeat.interval.ms и heartbeat.action.query. Бул опцияларды жупта колдонуу жүрөктүн согушу кабары жөнөтүлгөн сайын өзүнчө таблицадагы маалыматтарды өзгөртүү өтүнүчүн аткарууга мүмкүндүк берет. Ошентип, учурда туташтыргыч жайгашкан LSN (репликация слотунда) дайыма жаңыланып турат. Бул DBMSге кереги жок WAL файлдарын алып салууга мүмкүндүк берет. Опциялар кантип иштеши жөнүндө көбүрөөк биле аласыз .
Жакшыраак көңүл бурууга татыктуу дагы бир вариант transforms. Бул ыңгайлуулук жана сулуулук жөнүндө көбүрөөк болсо да...
Демейки боюнча, Debezium төмөнкү ат коюу саясатын колдонуу менен темаларды түзөт: serverName.schemaName.tableName. Бул дайыма эле ыңгайлуу боло бербейт. Параметрлер transforms Кадимки сөз айкаштарын белгилүү бир аталыштагы темага багыттоо керек болгон таблицалардын, окуялардын тизмесин аныктоо үчүн колдоно аласыз.
Биздин конфигурацияда рахмат transforms төмөнкүдөй болот: мониторингге алынган маалымат базасынан бардык CDC окуялар аты менен темага өтөт data.cdc.dbname. Болбосо (бул орнотууларсыз), Debezium демейки боюнча ар бир таблица үчүн теманы түзөт: pg-dev.public.<table_name>.
Туташтыргычтын чектөөлөрү
PostgreSQL үчүн туташтыргычтын конфигурациясынын сүрөттөлүшүн бүтүрүү үчүн, анын иштөөсүнүн төмөнкү өзгөчөлүктөрү/чектөөлөрү жөнүндө сөз кылуу зарыл:
- PostgreSQL үчүн туташтыргычтын функционалдуулугу логикалык декоддоо концепциясына таянат. Ошондуктан ал маалымат базасынын түзүмүн өзгөртүү боюнча суроо-талаптарга көз салбайт (DDL) - ылайык, бул маалыматтар темаларда болбойт.
- Репликация уячалары колдонулгандыктан, туташтыргычты туташтыруу мүмкүн гана алдыңкы DBMS инстанциясына.
- Эгерде туташтыргыч базага кошулган колдонуучу окуу үчүн гана укуктарга ээ болсо, анда биринчи ишке киргизүүгө чейин кол менен репликация слотун түзүп, маалымат базасына жарыялоо керек болот.
Конфигурацияны колдонуу
Ошентип, конфигурациябызды туташтыргычка жүктөйбүз:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.jsonЖүктөө ийгиликтүү болгонун жана туташтыргыч иштей баштаганын текшеребиз:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}Улуу: ал орнотулду жана барууга даяр. Эми керектөөчү болуп көрүнөлү жана Кафкага кошулалы, андан кийин биз таблицадагы жазууну кошуп, өзгөртөбүз:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1Биздин темада ал төмөнкүдөй көрсөтүлөт:
Биздин өзгөртүүлөр менен абдан узун JSON
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}Эки учурда тең жазуулар өзгөртүлгөн жазуунун ачкычынан (PK) жана өзгөртүүлөрдүн түпкү маңызынан турат: мурда эмне болгон жана кийин эмне болуп калган.
- учурда
INSERT: мурунку маани (before) барабарnull, жана кийин - киргизилген сап. - учурда
UPDATE:-жылыpayload.beforeсаптын мурунку абалы көрсөтүлөт жана ичиндеpayload.after— езгеруулердун мацызы менен жаны.
2.2 MongoDB
Бул туташтыргыч негизги DBMS түйүнүнүн оплогунан маалыматты окуп, стандарттуу MongoDB репликация механизмин колдонот.
PgSQL үчүн мурунтан эле сүрөттөлгөн туташтыргычка окшоп, бул жерде да биринчи баштоодо, баштапкы маалымат сүрөтү алынат, андан кийин туташтыргыч oplog окуу режимине өтөт.
Конфигурациянын мисалы:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}Көрүнүп тургандай, бул жерде мурунку мисалга салыштырмалуу жаңы варианттар жок, бирок маалымат базасына туташуу үчүн жооптуу варианттардын жана алардын префикстеринин саны гана кыскарган.
орнотуулары transforms бул жолу алар төмөнкүлөрдү аткарышат: алар схемадан максаттуу теманын атын өзгөртүшөт <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.
катага сабырдуулук
Биздин убакта каталарга чыдамдуулук жана жогорку жеткиликтүүлүк маселеси болуп көрбөгөндөй курч турат - айрыкча, биз маалыматтар жана транзакциялар жөнүндө сөз болгондо жана маалыматтардын өзгөрүшүнө көз салуу бул маселеде четте калбайт. Келгиле, принципиалдуу түрдө эмне туура эмес болушу мүмкүн экенин жана ар бир учурда Дебезиум менен эмне болорун карап көрөлү.
Баш тартуунун үч варианты бар:
- Kafka Connect катасы. Эгер Connect бөлүштүрүлгөн режимде иштөөгө конфигурацияланса, бул бир эле group.idди орнотуу үчүн бир нече жумушчуну талап кылат. Андан кийин, алардын бири иштебей калса, туташтыргыч башка жумушчуда кайра иштетилет жана Кафкадагы темадагы акыркы берилген позициядан окууну улантат.
- Кафка кластери менен байланышты жоготуу. Туташтыргыч жөн гана Кафкага жөнөтө албаган позицияда окууну токтотот жана аракет ийгиликтүү болмоюнча аны кайра жөнөтүүгө аракет кылат.
- Маалымат булагы жеткиликсиз. Туташтыргыч конфигурациялангандай булакка кайра туташууга аракет кылат. Демейки 16 аракет колдонуу болуп саналат . 16-ийгиликсиз аракеттен кийин тапшырма катары белгиленет ишке ашкан жок жана сиз аны Kafka Connect REST интерфейси аркылуу кол менен өчүрүшүңүз керек болот.
- учурда PostgreSQL маалыматтар жоголбойт, анткени Репликация уячаларын колдонуу туташтыргыч окубаган WAL файлдарын жок кылуудан сактайт. Бул учурда, монетанын терс жагы да бар: туташтыргыч менен DBMS ортосундагы тармак байланышы узак убакытка үзгүлтүккө учураса, диск мейкиндиги түгөнүп калышы мүмкүн жана бул иштебей калышына алып келиши мүмкүн. бүт DBMS.
- учурда MySQL binlog файлдары туташуу калыбына келтирилгенге чейин DBMS өзү тарабынан айланта алат. Бул туташтыргычтын иштебей калган абалга келишине алып келет жана нормалдуу иштөөнү калыбына келтирүү үчүн, binlogs окууну улантуу үчүн баштапкы снапшот режиминде өчүрүп күйгүзүшүңүз керек болот.
- боюнча MongoDB. Документте мындай деп айтылат: лог/oplog файлдары жок кылынган жана туташтыргыч токтоп калган позициясынан окууну уланта албаган учурда туташтыргычтын жүрүм-туруму бардык МББдер үчүн бирдей. Бул туташтыргыч мамлекетке кетет дегенди билдирет ишке ашкан жок жана режимде кайра иштетүүнү талап кылат баштапкы сүрөт.
Бирок, өзгөчөлүктөр бар. Эгерде туташтыргыч узак убакыт бою ажыратылса (же MongoDB инстанциясына жете албаса) жана бул убакыттын ичинде оплог айлануу аркылуу өтүп кетсе, анда туташуу калыбына келтирилгенде, туташтыргыч тынч түрдө биринчи жеткиликтүү абалдан маалыматтарды окууну улантат, ошондуктан Кафкадагы кээ бир маалыматтар жок урат.
жыйынтыктоо
Debezium менин CDC системалары менен болгон биринчи тажрыйбам жана жалпысынан абдан позитивдүү. Долбоор негизги DBMS үчүн колдоосу, конфигурациянын жөнөкөйлүгү, кластердик колдоо жана активдүү коомчулук менен жеңишке жетти. Практикага кызыккандар үчүн колдонмолорду окууну сунуштайм и .
Kafka Connect үчүн JDBC туташтыргычына салыштырмалуу, Debezium'дун негизги артыкчылыгы - бул өзгөртүүлөр DBMS журналдарынан окулат, бул маалыматтарды минималдуу кечигүү менен кабыл алууга мүмкүндүк берет. JDBC туташтыргычы (Kafka Connect'тен) көзөмөлдөнгөн таблицаны белгиленген аралыкта сурайт жана (ошол эле себептен улам) маалыматтар жок кылынганда билдирүүлөрдү жаратпайт (жок берилиштерди кантип сураса болот?).
Окшош көйгөйлөрдү чечүү үчүн, сиз төмөнкү чечимдерге көңүл бурсаңыз болот (Debeziumдан тышкары):
- MySQL үчүн гана бир нече чечимдер:
- , бирок бул таптакыр башка "салмак категориясы".
PS
Биздин блогдон дагы окуңуз:
- «";
- «";
- ««.
Source: www.habr.com
