Apache Kafka үчүн Debezium - CDC менен тааныштыруу

Apache Kafka үчүн Debezium - CDC менен тааныштыруу

Жумушумда мен жаңы техникалык чечимдерди/программалык продуктыларды көп жолуктурам, алар жөнүндө маалымат орус тилдүү интернетте аз. Бул макаланын жардамы менен мен мындай боштукту Debezium аркылуу Кафка кластерине эки популярдуу DBMSден (PostgreSQL жана MongoDB) жөнөтүүнү конфигурациялоо керек болгондо, акыркы практикамдын мисалы менен толтурууга аракет кылам. Аткарылган иштин натыйжасында пайда болгон бул кароо макала башкаларга пайдалуу болот деп ишенем.

Дебезиум жана CDC деген эмне?

Debezium — CDC программалык камсыздоо категориясынын өкүлү (Маалыматты өзгөртүүнү тартуу), же тагыраак айтканда, бул Apache Kafka Connect алкагына шайкеш келген ар кандай DBMS үчүн туташтыргычтардын жыйындысы.

бул Ачык булак долбоору, Apache License v2.0 астында лицензияланган жана Red Hat тарабынан каржыланган. Иштеп чыгуу 2016-жылдан бери жүрүп жатат жана азыркы учурда ал төмөнкү DBMS үчүн расмий колдоо көрсөтөт: MySQL, PostgreSQL, MongoDB, SQL Server. Кассандра жана Oracle үчүн туташтыргычтар да бар, бирок учурда алар "эрте жеткиликтүү" статусунда жана жаңы чыгарылыштар артка шайкештикке кепилдик бербейт.

Эгерде биз CDCди салттуу ыкма менен салыштырсак (тиркеме МББдан маалыматтарды түздөн-түз окуганда), анын негизги артыкчылыктары аз күтүү, жогорку ишенимдүүлүк жана жеткиликтүүлүк менен катар деңгээлинде маалыматтарды өзгөртүү агымын ишке ашырууну камтыйт. Акыркы эки пунктка Кафка кластерин CDC окуялары үчүн репозиторий катары колдонуу аркылуу жетишилет.

Дагы бир артыкчылыгы - бул окуяларды сактоо үчүн бир моделдин колдонулушу, ошондуктан акыркы тиркеме ар кандай DBMSлерди иштетүүнүн нюанстары жөнүндө тынчсыздануунун кереги жок.

Акыр-аягы, билдирүү брокерин колдонуу маалыматтардагы өзгөрүүлөрдү көзөмөлдөгөн тиркемелерге горизонталдуу масштабга мүмкүнчүлүк берет. Ошол эле учурда маалымат булагына тийгизген таасири минимумга түшүрүлөт, анткени маалыматтар түздөн-түз МББден эмес, Кафка кластеринен алынат.

Debezium архитектурасы жөнүндө

Debezium колдонуу бул жөнөкөй схемага келет:

DBMS (маалымат булагы катары) → Kafka Connect ичиндеги туташтыргыч → Apache Kafka → керектөөчү

Мисал катары бул жерде долбоордун веб-сайтынан диаграмма келтирилген:

Apache Kafka үчүн Debezium - CDC менен тааныштыруу

Бирок, мен бул схеманы жактырбайм, анткени раковина туташтыргычын гана колдонууга болот окшойт.

Чындыгында, абал башкача: Сиздин Дата көлүңүздү толтуруу (жогорку диаграммадагы акыркы шилтеме) Бул Debezium колдонуунун жалгыз жолу эмес. Apache Кафкага жөнөтүлгөн окуялар ар кандай кырдаалдарды чечүү үчүн колдонмолоруңуз тарабынан колдонулушу мүмкүн. Мисалы:

  • кэштен тиешеси жок маалыматтарды алып салуу;
  • билдирүүлөрдү жөнөтүү;
  • издөө индексинин жаңыртуулары;
  • кандайдыр бир аудит журналдары;
  • ...

Эгер сизде Java тиркемеси бар болсо жана Кафка кластерин колдонуунун кереги жок/мүмкүнчүлүгү жок болсо, бул аркылуу иштөө мүмкүнчүлүгү да бар. камтылган туташтыргыч. Айкын артыкчылыгы - бул кошумча инфраструктурага муктаждыкты жок кылат (конектор жана Кафка түрүндө). Бирок, бул чечим 1.1 версиясынан бери эскирген жана мындан ары колдонууга сунушталбайт (аны колдоо келечектеги чыгарылыштарда алынып салынышы мүмкүн).

Бул макалада иштеп чыгуучулар тарабынан сунуш кылынган архитектура талкууланат, ал каталарга чыдамдуулукту жана масштабдуулукту камсыз кылат.

Туташтыргычтын конфигурациясы

Эң маанилүү маанидеги - маалыматтардагы өзгөрүүлөргө байкоо жүргүзүү үчүн бизге төмөнкүлөр керек:

  1. маалымат булагы, 5.7 версиясынан баштап MySQL болушу мүмкүн, PostgreSQL 9.6+, MongoDB 3.2+ (толук тизмеси);
  2. Apache Kafka кластери;
  3. Kafka Connect инстанциясы (1.x, 2.x версиялары);
  4. конфигурацияланган Debezium туташтыргычы.

Алгачкы эки пункт боюнча иштөө, б.а. DBMS жана Apache Kafka орнотуу процесси макаланын алкагына кирбейт. Бирок, бардыгын кумдук кутуга жайгаштырууну каалагандар үчүн, мисалдар менен расмий репозиторийде даяр docker-compose.yaml.

Акыркы эки пунктка кененирээк токтолобуз.

0. Kafka Connect

Бул жерде жана андан ары макалада конфигурациянын бардык мисалдары Debezium иштеп чыгуучулары тараткан Docker сүрөтүнүн контекстинде талкууланат. Ал бардык керектүү плагин файлдарын (туташтыргычтарды) камтыйт жана чөйрө өзгөрмөлөрүнүн жардамы менен Kafka Connect конфигурациясын камсыз кылат.

Эгер сиз Confluentтен Kafka Connect колдонууну кааласаңыз, анда көрсөтүлгөн каталогго керектүү туташтыргычтардын плагиндерин өз алдынча кошушуңуз керек болот. plugin.path же чөйрө өзгөрмө аркылуу орнотулат CLASSPATH. Kafka Connect жумушчусу жана туташтыргычы үчүн орнотуулар жумушчу ишке киргизүү буйругуна аргумент катары берилген конфигурация файлдары аркылуу аныкталат. Көбүрөөк маалымат алуу үчүн, караңыз документтер.

Туташтыргыч версиясында Debeizum орнотуу процесси эки этапта ишке ашырылат. Келгиле, алардын ар бирин карап көрөлү:

1. Kafka Connect алкагын орнотуу

Apache Kafka кластерине берилиштерди өткөрүү үчүн, Kafka Connect алкагында белгилүү параметрлер орнотулган, мисалы:

  • кластерге туташуу үчүн параметрлер,
  • туташтыргычтын конфигурациясынын өзү түз сактала турган темалардын аталыштары,
  • туташтыргыч иштеп жаткан топтун аталышы (эгер бөлүштүрүлгөн режим колдонулса).

Долбоордун расмий Docker сүрөтү чөйрө өзгөрмөлөрүнүн жардамы менен конфигурацияны колдойт - бул биз колдоно турган нерсе. Ошентип, сүрөттү жүктөп:

docker pull debezium/connect

Туташтыргычты иштетүү үчүн зарыл болгон чөйрө өзгөрмөлөрүнүн минималдуу топтому төмөнкүдөй:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — кластердин мүчөлөрүнүн толук тизмесин алуу үчүн Кафка кластердик серверлеринин баштапкы тизмеси;
  • OFFSET_STORAGE_TOPIC=connector-offsets — туташтыргыч учурда жайгашкан позицияларды сактоо үчүн тема;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — туташтыргычтын статусун жана анын милдеттерин сактоо темасы;
  • CONFIG_STORAGE_TOPIC=connector-config — туташтыргычтын конфигурациясынын маалыматтарын жана анын милдеттерин сактоо темасы;
  • GROUP_ID=1 — бириктирүүчү тапшырма аткарыла турган жумушчулардын тобунун идентификатору; бөлүштүрүлгөн колдонууда зарыл (таратылган) режим.

Бул өзгөрмөлөр менен контейнерди ишке киргизебиз:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro жөнүндө эскертүү

Демейки боюнча, Debezium маалыматтарды JSON форматында жазат, бул кумдуктар жана аз көлөмдөгү маалыматтар үчүн алгылыктуу, бирок өтө жүктөлгөн маалымат базаларында көйгөй болуп калышы мүмкүн. JSON конвертерине альтернатива катары билдирүүлөрдү сериялаштыруу болуп саналат Avro Apache Kafkaдагы киргизүү/чыгаруу подсистемасына жүктөөнү азайтуучу бинардык форматка.

Avro колдонуу үчүн өзүнчө жайгаштыруу керек схема-реестр (диаграммаларды сактоо үчүн). Конвертер үчүн өзгөрмөлөр төмөнкүдөй болот:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avroну колдонуу жана анын реестрин орнотуу боюнча чоо-жайы бул макаланын алкагына кирбейт - мындан ары да түшүнүктүү болуу үчүн биз JSON колдонобуз.

2. Туташтыргычтын өзүн конфигурациялоо

Эми сиз түз туташтыргычтын конфигурациясына өтө аласыз, ал булактан маалыматтарды окуйт.

Келгиле, эки DBMS үчүн туташтыргычтардын мисалын карап көрөлү: PostgreSQL жана MongoDB, аларда менде тажрыйба бар жана айырмачылыктар бар (кичине болсо да, бирок кээ бир учурларда олуттуу!).

Конфигурация JSON нотациясында сүрөттөлөт жана POST сурамынын жардамы менен Kafka Connect'ке жүктөлөт.

2.1. PostgreSQL

PostgreSQL үчүн туташтыргыч конфигурациясынын мисалы:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Бул орнотуудан кийин туташтыргычтын иштөө принциби абдан жөнөкөй:

  • Биринчи жолу ишке киргизилгенде, ал конфигурацияда көрсөтүлгөн маалымат базасына туташып, режимде иштей баштайт баштапкы сүрөт, шарттуу колдонуу менен алынган маалыматтардын баштапкы топтомун Кафкага жөнөтүү SELECT * FROM table_name.
  • Инициализация аяктагандан кийин туташтыргыч PostgreSQL WAL файлдарынан өзгөртүүлөрдү окуу үчүн режимге өтөт.

Колдонулган опциялар жөнүндө:

  • name — төмөндө сүрөттөлгөн конфигурация колдонулган туташтыргычтын аталышы; келечекте, бул аталыш Kafka Connect REST API аркылуу туташтыргыч менен иштөө үчүн колдонулат (б.а. статусту көрүү/кайра баштоо/конфигурацияны жаңыртуу);
  • connector.class — конфигурацияланган туташтыргыч тарабынан колдонула турган DBMS туташтыргыч классы;
  • plugin.name — WAL файлдарынан маалыматтарды логикалык декоддоо үчүн плагиндин аталышы. Тандоо үчүн жеткиликтүү wal2json, decoderbuffs и pgoutput. Биринчи эки DBMS тиешелүү кеңейтүүлөрдү орнотууну талап кылат, жана pgoutput PostgreSQL 10 жана андан жогорку версиясы үчүн кошумча манипуляцияларды талап кылбайт;
  • database.* — маалымат базасына кошулуу варианттары, кайда database.server.name — Кафка кластеринде теманын аталышын түзүү үчүн колдонулган PostgreSQL инстанциясынын аталышы;
  • table.include.list — биз өзгөрүүлөргө көз салгыбыз келген таблицалардын тизмеси; форматта көрсөтүлгөн schema.table_name; менен бирге колдонууга болбойт table.exclude.list;
  • heartbeat.interval.ms — интервал (миллисекунд менен), анын жардамы менен конектор жүрөктүн согушу жөнүндө билдирүүлөрдү атайын темага жөнөтөт;
  • heartbeat.action.query — ар бир жүрөктүн кагышын билдирүүнү жөнөтүүдө аткарыла турган суроо-талап (опция 1.1 версиясында пайда болгон);
  • slot.name — туташтыргыч тарабынан колдонула турган репликация слотунун аталышы;
  • publication.name - Аты чыгаруу туташтыргыч колдонгон PostgreSQLде. Эгерде ал жок болсо, Debezium аны түзүүгө аракет кылат. Эгер туташуу жасалган колдонуучу бул аракетке жетиштүү укуктарга ээ болбосо, туташтыргыч ката менен ишин токтотот;
  • transforms максаттуу теманын атын кантип өзгөртүүнү так аныктайт:
    • transforms.AddPrefix.type туруктуу сөз айкаштарын колдоно турганыбызды көрсөтөт;
    • transforms.AddPrefix.regex — тыа хаһаайыстыбатын атын тематын атын сиргэ үөрэтии;
    • transforms.AddPrefix.replacement - түздөн-түз биз эмнени кайра аныктап жатабыз.

Жүрөктүн согушу жана өзгөрүү жөнүндө көбүрөөк

Демейки боюнча, туташтыргыч Кафкага ар бир жасалган транзакция үчүн маалыматтарды жөнөтөт жана анын LSN (Log ырааттуулугу) кызмат темасында жазылат. offset. Ал эми туташтыргыч бүт маалымат базасын эмес, анын таблицаларынын бир бөлүгүн гана окуй тургандай конфигурацияланса эмне болот (маалымат жаңыртуулары көп кездешпейт)?

  • Туташтыргыч WAL файлдарын окуйт жана ал көзөмөлдөгөн таблицаларга эч кандай транзакция милдеттенмелерин аныктабайт.
  • Ошондуктан, ал темадагы же репликация слотундагы учурдагы абалын жаңыртпайт.
  • Бул, өз кезегинде, WAL файлдары дискте сакталып калышына жана дисктеги орундун түгөнүп калышына алып келет.

Бул жерде варианттар жардамга келет. heartbeat.interval.ms и heartbeat.action.query. Бул опцияларды жупта колдонуу жүрөктүн согушу кабары жөнөтүлгөн сайын өзүнчө таблицадагы маалыматтарды өзгөртүү өтүнүчүн аткарууга мүмкүндүк берет. Ошентип, учурда туташтыргыч жайгашкан LSN (репликация слотунда) дайыма жаңыланып турат. Бул DBMSге кереги жок WAL файлдарын алып салууга мүмкүндүк берет. Опциялар кантип иштеши жөнүндө көбүрөөк биле аласыз документтер.

Жакшыраак көңүл бурууга татыктуу дагы бир вариант transforms. Бул ыңгайлуулук жана сулуулук жөнүндө көбүрөөк болсо да...

Демейки боюнча, Debezium төмөнкү ат коюу саясатын колдонуу менен темаларды түзөт: serverName.schemaName.tableName. Бул дайыма эле ыңгайлуу боло бербейт. Параметрлер transforms Кадимки сөз айкаштарын белгилүү бир аталыштагы темага багыттоо керек болгон таблицалардын, окуялардын тизмесин аныктоо үчүн колдоно аласыз.

Биздин конфигурацияда рахмат transforms төмөнкүдөй болот: мониторингге алынган маалымат базасынан бардык CDC окуялар аты менен темага өтөт data.cdc.dbname. Болбосо (бул орнотууларсыз), Debezium демейки боюнча ар бир таблица үчүн теманы түзөт: pg-dev.public.<table_name>.

Туташтыргычтын чектөөлөрү

PostgreSQL үчүн туташтыргычтын конфигурациясынын сүрөттөлүшүн бүтүрүү үчүн, анын иштөөсүнүн төмөнкү өзгөчөлүктөрү/чектөөлөрү жөнүндө сөз кылуу зарыл:

  1. PostgreSQL үчүн туташтыргычтын функционалдуулугу логикалык декоддоо концепциясына таянат. Ошондуктан ал маалымат базасынын түзүмүн өзгөртүү боюнча суроо-талаптарга көз салбайт (DDL) - ылайык, бул маалыматтар темаларда болбойт.
  2. Репликация уячалары колдонулгандыктан, туташтыргычты туташтыруу мүмкүн гана алдыңкы DBMS инстанциясына.
  3. Эгерде туташтыргыч базага кошулган колдонуучу окуу үчүн гана укуктарга ээ болсо, анда биринчи ишке киргизүүгө чейин кол менен репликация слотун түзүп, маалымат базасына жарыялоо керек болот.

Конфигурацияны колдонуу

Ошентип, конфигурациябызды туташтыргычка жүктөйбүз:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Жүктөө ийгиликтүү болгонун жана туташтыргыч иштей баштаганын текшеребиз:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Улуу: ал орнотулду жана барууга даяр. Эми керектөөчү болуп көрүнөлү жана Кафкага кошулалы, андан кийин биз таблицадагы жазууну кошуп, өзгөртөбүз:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Биздин темада ал төмөнкүдөй көрсөтүлөт:

Биздин өзгөртүүлөр менен абдан узун JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Эки учурда тең жазуулар өзгөртүлгөн жазуунун ачкычынан (PK) жана өзгөртүүлөрдүн түпкү маңызынан турат: мурда эмне болгон жана кийин эмне болуп калган.

  • учурда INSERT: мурунку маани (before) барабар null, жана кийин - киргизилген сап.
  • учурда UPDATE:-жылы payload.before саптын мурунку абалы көрсөтүлөт жана ичинде payload.after — езгеруулердун мацызы менен жаны.

2.2 MongoDB

Бул туташтыргыч негизги DBMS түйүнүнүн оплогунан маалыматты окуп, стандарттуу MongoDB репликация механизмин колдонот.

PgSQL үчүн мурунтан эле сүрөттөлгөн туташтыргычка окшоп, бул жерде да биринчи баштоодо, баштапкы маалымат сүрөтү алынат, андан кийин туташтыргыч oplog окуу режимине өтөт.

Конфигурациянын мисалы:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Көрүнүп тургандай, бул жерде мурунку мисалга салыштырмалуу жаңы варианттар жок, бирок маалымат базасына туташуу үчүн жооптуу варианттардын жана алардын префикстеринин саны гана кыскарган.

орнотуулары transforms бул жолу алар төмөнкүлөрдү аткарышат: алар схемадан максаттуу теманын атын өзгөртүшөт <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

катага сабырдуулук

Биздин убакта каталарга чыдамдуулук жана жогорку жеткиликтүүлүк маселеси болуп көрбөгөндөй курч турат - айрыкча, биз маалыматтар жана транзакциялар жөнүндө сөз болгондо жана маалыматтардын өзгөрүшүнө көз салуу бул маселеде четте калбайт. Келгиле, принципиалдуу түрдө эмне туура эмес болушу мүмкүн экенин жана ар бир учурда Дебезиум менен эмне болорун карап көрөлү.

Баш тартуунун үч варианты бар:

  1. Kafka Connect катасы. Эгер Connect бөлүштүрүлгөн режимде иштөөгө конфигурацияланса, бул бир эле group.idди орнотуу үчүн бир нече жумушчуну талап кылат. Андан кийин, алардын бири иштебей калса, туташтыргыч башка жумушчуда кайра иштетилет жана Кафкадагы темадагы акыркы берилген позициядан окууну улантат.
  2. Кафка кластери менен байланышты жоготуу. Туташтыргыч жөн гана Кафкага жөнөтө албаган позицияда окууну токтотот жана аракет ийгиликтүү болмоюнча аны кайра жөнөтүүгө аракет кылат.
  3. Маалымат булагы жеткиликсиз. Туташтыргыч конфигурациялангандай булакка кайра туташууга аракет кылат. Демейки 16 аракет колдонуу болуп саналат экспоненциалдык артка чегинүү. 16-ийгиликсиз аракеттен кийин тапшырма катары белгиленет ишке ашкан жок жана сиз аны Kafka Connect REST интерфейси аркылуу кол менен өчүрүшүңүз керек болот.
    • учурда PostgreSQL маалыматтар жоголбойт, анткени Репликация уячаларын колдонуу туташтыргыч окубаган WAL файлдарын жок кылуудан сактайт. Бул учурда, монетанын терс жагы да бар: туташтыргыч менен DBMS ортосундагы тармак байланышы узак убакытка үзгүлтүккө учураса, диск мейкиндиги түгөнүп калышы мүмкүн жана бул иштебей калышына алып келиши мүмкүн. бүт DBMS.
    • учурда MySQL binlog файлдары туташуу калыбына келтирилгенге чейин DBMS өзү тарабынан айланта алат. Бул туташтыргычтын иштебей калган абалга келишине алып келет жана нормалдуу иштөөнү калыбына келтирүү үчүн, binlogs окууну улантуу үчүн баштапкы снапшот режиминде өчүрүп күйгүзүшүңүз керек болот.
    • боюнча MongoDB. Документте мындай деп айтылат: лог/oplog файлдары жок кылынган жана туташтыргыч токтоп калган позициясынан окууну уланта албаган учурда туташтыргычтын жүрүм-туруму бардык МББдер үчүн бирдей. Бул туташтыргыч мамлекетке кетет дегенди билдирет ишке ашкан жок жана режимде кайра иштетүүнү талап кылат баштапкы сүрөт.

      Бирок, өзгөчөлүктөр бар. Эгерде туташтыргыч узак убакыт бою ажыратылса (же MongoDB инстанциясына жете албаса) жана бул убакыттын ичинде оплог айлануу аркылуу өтүп кетсе, анда туташуу калыбына келтирилгенде, туташтыргыч тынч түрдө биринчи жеткиликтүү абалдан маалыматтарды окууну улантат, ошондуктан Кафкадагы кээ бир маалыматтар жок урат.

жыйынтыктоо

Debezium менин CDC системалары менен болгон биринчи тажрыйбам жана жалпысынан абдан позитивдүү. Долбоор негизги DBMS үчүн колдоосу, конфигурациянын жөнөкөйлүгү, кластердик колдоо жана активдүү коомчулук менен жеңишке жетти. Практикага кызыккандар үчүн колдонмолорду окууну сунуштайм Kafka Connect и Debezium.

Kafka Connect үчүн JDBC туташтыргычына салыштырмалуу, Debezium'дун негизги артыкчылыгы - бул өзгөртүүлөр DBMS журналдарынан окулат, бул маалыматтарды минималдуу кечигүү менен кабыл алууга мүмкүндүк берет. JDBC туташтыргычы (Kafka Connect'тен) көзөмөлдөнгөн таблицаны белгиленген аралыкта сурайт жана (ошол эле себептен улам) маалыматтар жок кылынганда билдирүүлөрдү жаратпайт (жок берилиштерди кантип сураса болот?).

Окшош көйгөйлөрдү чечүү үчүн, сиз төмөнкү чечимдерге көңүл бурсаңыз болот (Debeziumдан тышкары):

PS

Биздин блогдон дагы окуңуз:

Source: www.habr.com

Комментарий кошуу