Apache Kafka үшін Debezium - CDC-пен таныстыру

Apache Kafka үшін Debezium - CDC-пен таныстыру

Менің жұмысымда мен жаңа техникалық шешімдерді/бағдарламалық өнімдерді жиі кездестіремін, олар туралы ақпарат орыс тілді Интернетте өте аз. Осы мақалада мен екі танымал ДҚБЖ (PostgreSQL және MongoDB) CDC оқиғаларын Debezium көмегімен Кафка кластеріне жіберуді конфигурациялау қажет болғанда, соңғы тәжірибемдегі мысалмен осындай бір олқылықты толтыруға тырысамын. Атқарылған жұмыстардың нәтижесінде пайда болған бұл шолу мақаласы басқаларға пайдалы болады деп сенемін.

Дебезиум және жалпы CDC дегеніміз не?

Дебезиум — CDC бағдарламалық жасақтама санатының өкілі (Деректерді өзгертуді түсіру) немесе дәлірек айтқанда, бұл Apache Kafka Connect құрылымымен үйлесімді әртүрлі ДҚБЖ үшін қосқыштар жиынтығы.

осы Ашық бастапқы жоба, Apache License v2.0 нұсқасы бойынша лицензияланған және Red Hat демеушісі. Әзірлеу 2016 жылдан бері жалғасуда және қазіргі уақытта ол келесі ДҚБЖ-ға ресми қолдау көрсетеді: MySQL, PostgreSQL, MongoDB, SQL Server. Сондай-ақ Cassandra және Oracle үшін қосқыштар бар, бірақ қазіргі уақытта олар «ерте қол жетімділік» күйінде және жаңа шығарылымдар кері үйлесімділікке кепілдік бермейді.

Егер CDC-ті дәстүрлі тәсілмен салыстыратын болсақ (қолданба ДҚБЖ-дан деректерді тікелей оқығанда), оның негізгі артықшылығы аз кідіріспен, жоғары сенімділікпен және қолжетімділікпен қатар деңгейінде деректерді өзгерту ағынын жүзеге асыруды қамтиды. Соңғы екі нүктеге CDC оқиғалары үшін репозиторий ретінде Кафка кластерін пайдалану арқылы қол жеткізіледі.

Тағы бір артықшылығы - оқиғаларды сақтау үшін бір үлгіні пайдалану фактісі, сондықтан соңғы қолданба әртүрлі ДҚБЖ жұмысының нюанстары туралы алаңдамайды.

Соңында, хабар брокерін пайдалану деректердегі өзгерістерді бақылайтын қолданбаларға көлденең масштабтауға мүмкіндік береді. Сонымен бірге деректер көзіне әсер ету барынша азайтылады, өйткені деректер ДҚБЖ-дан тікелей емес, Кафка кластерінен алынады.

Debezium архитектурасы туралы

Debezium пайдалану осы қарапайым схемаға түседі:

ДҚБЖ (деректер көзі ретінде) → Kafka Connect ішіндегі қосқыш → Apache Kafka → тұтынушы

Мысал ретінде жоба веб-сайтындағы диаграмма:

Apache Kafka үшін Debezium - CDC-пен таныстыру

Дегенмен, маған бұл схема ұнамайды, өйткені тек раковина қосқышын пайдалану мүмкін сияқты.

Іс жүзінде жағдай басқаша: деректер көлін толтыру (жоғарыдағы диаграммадағы соңғы сілтеме) Бұл Debezium пайдаланудың жалғыз жолы емес. Apache Кафкаға жіберілген оқиғаларды қолданбаларыңыз әртүрлі жағдайларды өңдеу үшін пайдалана алады. Мысалы:

  • кэштен маңызды емес деректерді жою;
  • хабарламаларды жіберу;
  • іздеу индексінің жаңартулары;
  • аудит журналдарының кейбір түрлері;
  • ...

Егер сізде Java қолданбасы болса және Кафка кластерін пайдаланудың қажеті/мүмкіндігі болмаса, сонымен бірге жұмыс істеу мүмкіндігі бар. кірістірілген қосқыш. Айқын артықшылығы - бұл қосымша инфрақұрылымның қажеттілігін жояды (коннектор және Кафка түрінде). Дегенмен, бұл шешім 1.1 нұсқасынан бері ескірген және енді пайдалануға ұсынылмайды (оны қолдау болашақ шығарылымдарда жойылуы мүмкін).

Бұл мақалада ақауларға төзімділік пен ауқымдылықты қамтамасыз ететін әзірлеушілер ұсынған архитектура талқыланады.

Қосқыш конфигурациясы

Ең маңызды мәндегі - деректердегі өзгерістерді бақылауды бастау үшін бізге қажет:

  1. деректер көзі, ол 5.7 нұсқасынан бастап MySQL, PostgreSQL 9.6+, MongoDB 3.2+ (толық тізім);
  2. Apache Кафка кластері;
  3. Kafka Connect данасы (1.x, 2.x нұсқалары);
  4. конфигурацияланған Debezium қосқышы.

Алғашқы екі нүкте бойынша жұмыс жасаңыз, яғни. ДҚБЖ және Apache Kafka орнату процесі мақаланың ауқымынан тыс. Дегенмен, барлығын құмсалғышта орналастырғысы келетіндер үшін мысалдары бар ресми репозиторийде дайын. docker-compose.yaml.

Соңғы екі тармаққа толығырақ тоқталамыз.

0. Kafka Connect

Мұнда және одан әрі мақалада барлық конфигурация мысалдары Debezium әзірлеушілері таратқан Docker кескінінің контекстінде талқыланады. Ол барлық қажетті плагин файлдарын (қосқыштарды) қамтиды және орта айнымалы мәндерін пайдаланып Kafka Connect конфигурациясын қамтамасыз етеді.

Егер сіз Confluent-тен Kafka Connect қолданбасын пайдаланғыңыз келсе, онда көрсетілген каталогқа қажетті қосқыштардың плагиндерін дербес қосуыңыз қажет. plugin.path немесе орта айнымалысы арқылы орнатылады CLASSPATH. Kafka Connect жұмысшысы мен қосқыштарына арналған параметрлер жұмысшыны іске қосу пәрменіне дәлел ретінде жіберілетін конфигурация файлдары арқылы анықталады. Қосымша мәліметтер алу үшін қараңыз құжаттама.

Debeizum коннектор нұсқасында орнатудың бүкіл процесі екі кезеңде жүзеге асырылады. Олардың әрқайсысын қарастырайық:

1. Kafka Connect құрылымын орнату

Деректерді Apache Kafka кластеріне ағынмен жіберу үшін Kafka Connect жүйесінде арнайы параметрлер орнатылады, мысалы:

  • кластерге қосылу параметрлері,
  • қосқыштың конфигурациясының өзі тікелей сақталатын тақырыптардың атаулары,
  • қосқыш жұмыс істейтін топтың атауы (үлестірілген режим пайдаланылса).

Жобаның ресми Docker кескіні қоршаған ортаның айнымалы мәндерін қолданатын конфигурацияны қолдайды - бұл біз қолданатын нәрсе. Сонымен, суретті жүктеп алыңыз:

docker pull debezium/connect

Қосқышты іске қосу үшін қажетті орта айнымалыларының ең аз жинағы келесідей:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — кластер мүшелерінің толық тізімін алу үшін Кафка кластері серверлерінің бастапқы тізімі;
  • OFFSET_STORAGE_TOPIC=connector-offsets — қосқыш қазіргі уақытта орналасқан позицияларды сақтауға арналған тақырып;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — қосқыштың күйін және оның тапсырмаларын сақтауға арналған тақырып;
  • CONFIG_STORAGE_TOPIC=connector-config — қосқыш конфигурациясының деректерін және оның тапсырмаларын сақтауға арналған тақырып;
  • GROUP_ID=1 — қосқыш тапсырмасын орындауға болатын жұмысшылар тобының идентификаторы; таратылған пайдалану кезінде қажет (таратылған) режим.

Біз контейнерді келесі айнымалылармен іске қосамыз:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Авро туралы ескерту

Әдепкі бойынша, Debezium деректерді JSON пішімінде жазады, бұл құм жәшіктер мен деректердің аз көлемі үшін қолайлы, бірақ жоғары жүктелген дерекқорларда мәселе болуы мүмкін. JSON түрлендіргішіне балама ретінде хабарларды сериялау пайдаланады Avro екілік пішімге ауыстырыңыз, бұл Apache Kafka жүйесіндегі енгізу/шығару ішкі жүйесіне жүктемені азайтады.

Avro пайдалану үшін сізге бөлек орналастыру қажет схема-тізілім (диаграммаларды сақтау үшін). Түрлендіргіштің айнымалылары келесідей болады:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro пайдалану және оның тізілімін орнату туралы мәліметтер осы мақаланың ауқымынан тыс - әрі қарай түсінікті болу үшін біз JSON қолданамыз.

2. Коннектордың өзін конфигурациялау

Енді сіз дереккөзден деректерді оқитын қосқыштың конфигурациясына тікелей өте аласыз.

Екі ДҚБЖ үшін қосқыштар мысалын қарастырайық: PostgreSQL және MongoDB, оларда менің тәжірибем бар және айырмашылықтары бар (аз болса да, бірақ кейбір жағдайларда маңызды!).

Конфигурация JSON белгілеуінде сипатталған және POST сұрауы арқылы Kafka Connect қызметіне жүктеп салынған.

2.1. PostgreSQL

PostgreSQL үшін қосқыш конфигурациясының мысалы:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Бұл орнатудан кейін қосқыштың жұмыс принципі өте қарапайым:

  • Бірінші рет іске қосылған кезде ол конфигурацияда көрсетілген дерекқорға қосылып, режимде іске қосылады бастапқы сурет, шартты көмегімен алынған деректердің бастапқы жинағын Кафкаға жіберу SELECT * FROM table_name.
  • Баптандыру аяқталғаннан кейін қосқыш PostgreSQL WAL файлдарындағы өзгерістерді оқу режиміне өтеді.

Қолданылатын опциялар туралы:

  • name — төменде сипатталған конфигурация қолданылатын қосқыштың аты; болашақта бұл атау Kafka Connect REST API арқылы қосқышпен жұмыс істеу үшін пайдаланылады (яғни, күйді көру/қайта іске қосу/конфигурацияны жаңарту);
  • connector.class — конфигурацияланған қосқыш пайдаланатын ДҚБЖ қосқыш класы;
  • plugin.name — WAL файлдарынан деректерді логикалық декодтауға арналған плагин атауы. Таңдау үшін қол жетімді wal2json, decoderbuffs и pgoutput. Алғашқы екеуі ДҚБЖ тиісті кеңейтімдерді орнатуды талап етеді және pgoutput PostgreSQL 10 және одан жоғары нұсқасы үшін қосымша манипуляцияларды қажет етпейді;
  • database.* — дерекқорға қосылу опциялары, мұнда database.server.name — Кафка кластерінде тақырып атауын қалыптастыру үшін пайдаланылатын PostgreSQL дана атауы;
  • table.include.list — өзгерістерді қадағалағымыз келетін кестелер тізімі; форматта көрсетілген schema.table_name; бірге қолдануға болмайды table.exclude.list;
  • heartbeat.interval.ms — коннектор арнайы тақырыпқа жүрек соғуы туралы хабарламаларды жіберетін интервал (миллисекундпен);
  • heartbeat.action.query — әрбір жүрек соғуы туралы хабарламаны жіберу кезінде орындалатын сұраныс (опция 1.1 нұсқасында пайда болды);
  • slot.name — қосқыш пайдаланатын репликация слотының атауы;
  • publication.name - Аты басылымдар қосқыш пайдаланатын PostgreSQL ішінде. Егер ол жоқ болса, Debezium оны жасауға тырысады. Егер қосылым орындалатын пайдаланушының бұл әрекетке құқықтары жеткіліксіз болса, қосқыш қатемен жұмысын тоқтатады;
  • transforms мақсатты тақырыптың атын өзгерту жолын дәл анықтайды:
    • transforms.AddPrefix.type тұрақты тіркестерді қолданатынымызды көрсетеді;
    • transforms.AddPrefix.regex — мақсатты тақырыптың атын қайта анықтайтын маска;
    • transforms.AddPrefix.replacement - тікелей біз қайта анықтайтын нәрсе.

Жүрек соғуы және түрлендірулер туралы толығырақ

Әдепкі бойынша қосқыш әрбір жасалған транзакция үшін деректерді Кафкаға жібереді және оның LSN (Журнал реттік нөмірі) қызмет тақырыбына жазылады. offset. Бірақ қосқыш толық дерекқорды емес, оның кестелерінің бір бөлігін ғана оқуға теңшелсе не болады (деректер жаңартулары жиі болмайды)?

  • Коннектор WAL файлдарын оқиды және ол бақылап отырған кестелерге ешқандай транзакцияны анықтамайды.
  • Сондықтан ол тақырыптағы немесе репликация ұяшығындағы ағымдағы орнын жаңартпайды.
  • Бұл, өз кезегінде, WAL файлдарының дискіде сақталуына және дискілік кеңістіктің таусылуына әкеледі.

Міне, опциялар құтқаруға келеді. heartbeat.interval.ms и heartbeat.action.query. Бұл опцияларды жұпта пайдалану жүрек соғу хабары жіберілген сайын бөлек кестедегі деректерді өзгертуге сұрауды орындауға мүмкіндік береді. Осылайша, қазіргі уақытта қосқыш орналасқан LSN (репликация ұясында) үнемі жаңартылып отырады. Бұл ДҚБЖ қажет емес WAL файлдарын жоюға мүмкіндік береді. Опциялар қалай жұмыс істейтіні туралы көбірек біле аласыз құжаттама.

Нақтырақ назар аударуға тұрарлық тағы бір нұсқа transforms. Бұл ыңғайлылық пен сұлулық туралы көбірек болса да...

Әдепкі бойынша, Debezium келесі атау саясатын пайдаланып тақырыптар жасайды: serverName.schemaName.tableName. Бұл әрқашан ыңғайлы болмауы мүмкін. Опциялар transforms Белгілі бір атаумен тақырыпқа бағытталу керек оқиғаларды, кестелер тізімін анықтау үшін тұрақты өрнектерді пайдалануға болады.

Біздің конфигурацияда рахмет transforms келесі жағдай орын алады: бақыланатын дерекқордағы барлық CDC оқиғалары аты бар тақырыпқа өтеді data.cdc.dbname. Әйтпесе (бұл параметрлерсіз), Debezium әдепкі бойынша әрбір кесте үшін тақырып жасайды: pg-dev.public.<table_name>.

Қосқыш шектеулері

PostgreSQL үшін қосқыш конфигурациясының сипаттамасын аяқтау үшін оның жұмысының келесі мүмкіндіктері/шектеулері туралы айту керек:

  1. PostgreSQL қосқышының функционалдығы логикалық декодтау тұжырымдамасына негізделген. Сондықтан ол дерекқор құрылымын өзгертуге сұрауларды қадағаламайды (DDL) - сәйкесінше бұл деректер тақырыптарда болмайды.
  2. Репликация слоттары пайдаланылғандықтан, қосқышты қосуға болады тек жетекші ДҚБ данасына.
  3. Егер қосқыш дерекқорға қосылатын пайдаланушының тек оқуға арналған құқықтары болса, бірінші іске қосу алдында репликация ұяшығын қолмен жасап, дерекқорға жариялау қажет болады.

Конфигурацияны қолдану

Сонымен, конфигурациямызды қосқышқа жүктейміз:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Жүктеп алу сәтті болғанын және қосқыштың іске қосылғанын тексереміз:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Тамаша: ол орнатылған және пайдалануға дайын. Енді тұтынушы болып көрініп, Кафкаға қосылайық, содан кейін кестеге жазба қосып, өзгертеміз:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Біздің тақырыпта ол келесідей көрсетіледі:

Өзгерістерімізбен өте ұзақ JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Екі жағдайда да жазбалар өзгертілген жазбаның кілтінен (PK) және өзгерістердің мәнінен тұрады: жазба бұрын болған және кейін не болған.

  • Жағдайда INSERT: алдындағы мән (before) тең null, ал кейін - енгізілген жол.
  • Жағдайда UPDATE: ішінде payload.before жолдың алдыңғы күйі көрсетіледі және ішінде payload.after — өзгерістердің мәнімен жаңа.

2.2 MongoDB

Бұл қосқыш негізгі ДҚБЖ түйінінің оплогынан ақпаратты оқу үшін стандартты MongoDB репликация механизмін пайдаланады.

PgSQL үшін бұрыннан сипатталған қосқышқа ұқсас, мұнда да бірінші іске қосу кезінде бастапқы деректер суреті алынады, содан кейін қосқыш оплогты оқу режиміне ауысады.

Конфигурация мысалы:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Көріп отырғаныңыздай, мұнда алдыңғы мысалмен салыстырғанда жаңа опциялар жоқ, тек дерекқорға қосылуға жауапты опциялардың саны және олардың префикстері қысқартылды.

Параметрлер transforms бұл жолы олар келесі әрекеттерді орындайды: олар схемадан мақсатты тақырыптың атын түрлендіреді <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

ақауларға төзімділік

Біздің уақытымызда ақауларға төзімділік және жоғары қолжетімділік мәселесі бұрынғыдан да өткір болып табылады - әсіресе деректер мен транзакциялар туралы айтатын болсақ, және деректердің өзгеруін қадағалау бұл мәселеде шет қалмайды. Келіңіздер, негізінен ненің дұрыс емес болуы мүмкін екенін және әр жағдайда Дебезиуммен не болатынын қарастырайық.

Бас тартудың үш нұсқасы бар:

  1. Kafka Connect сәтсіздігі. Егер Connect таратылған режимде жұмыс істеу үшін конфигурацияланса, бұл бірнеше жұмысшының бірдей group.id параметрін орнатуын талап етеді. Содан кейін, егер олардың біреуі сәтсіз болса, қосқыш басқа жұмысшыда қайта іске қосылады және Кафкадағы тақырыптағы соңғы орындалған позициядан оқуды жалғастырады.
  2. Кафка кластерімен байланысты жоғалту. Қосқыш жай ғана Кафкаға жібере алмаған позицияда оқуды тоқтатады және әрекет сәтті болғанша оны мезгіл-мезгіл қайта жіберуге тырысады.
  3. Деректер көзінің қолжетімсіздігі. Коннектор конфигурацияланғандай көзге қайта қосылуға әрекет жасайды. Әдепкі - 16 әрекетті пайдалану экспоненциалды кері шегініс. 16-шы сәтсіз әрекеттен кейін тапсырма келесідей белгіленеді Сәтсіз болды және оны Kafka Connect REST интерфейсі арқылы қолмен қайта іске қосу қажет болады.
    • Жағдайда PostgreSQL деректер жоғалмайды, өйткені Репликация слоттарын пайдалану қосқыш оқымайтын WAL файлдарын жоюға жол бермейді. Бұл жағдайда тиынның кемшілігі де бар: қосқыш пен ДҚБЖ арасындағы желілік байланыс ұзақ уақыт бойы үзілсе, дискілік кеңістік таусылып қалуы мүмкін және бұл істен шығуға әкелуі мүмкін. бүкіл ДҚБЖ.
    • Жағдайда MySQL қосылым қалпына келтірілмей тұрып binlog файлдарын ДҚБЖ өзі айналдыра алады. Бұл қосқыштың сәтсіз күйге өтуіне әкеледі және қалыпты жұмысты қалпына келтіру үшін бинлогтардан оқуды жалғастыру үшін бастапқы сурет режимінде қайта іске қосу қажет болады.
    • туралы MongoDB. Құжаттамада былай делінген: журнал/oplog файлдары жойылған және қосқыш тоқтаған жерінен оқуды жалғастыра алмаған жағдайда қосқыштың әрекеті барлық ДҚБЖ үшін бірдей. Бұл қосқыш күйге өтетінін білдіреді Сәтсіз болды және режимде қайта іске қосуды қажет етеді бастапқы сурет.

      Дегенмен, ерекше жағдайлар бар. Егер қосқыш ұзақ уақыт бойы ажыратылса (немесе MongoDB данасына жете алмаса) және осы уақыт ішінде оплог айналудан өтсе, қосылым қалпына келтірілгенде, қосқыш деректерді бірінші қол жетімді позициядан тыныш оқуды жалғастырады, сондықтан Кафкадағы кейбір деректер емес соғады.

қорытынды

Debezium - бұл CDC жүйелерімен алғашқы тәжірибем және жалпы алғанда өте жағымды. Жоба негізгі ДҚБЖ қолдауымен, конфигурацияның қарапайымдылығымен, кластерлік қолдауымен және белсенді қауымдастықпен жеңіске жетті. Практикаға қызығушылық танытқандар үшін нұсқаулықтарды оқуды ұсынамын Кафка қосылу и Дебезиум.

Kafka Connect үшін JDBC қосқышымен салыстырғанда, Debezium бағдарламасының басты артықшылығы - өзгерістер ДҚБЖ журналдарынан оқылады, бұл деректерді ең аз кідіріспен алуға мүмкіндік береді. JDBC қосқышы (Kafka Connect жүйесінен) бақыланатын кестені белгіленген аралықта сұрайды және деректер жойылған кезде (сол себепті) хабарлар жасамайды (болмайтын деректерді қалай сұрауға болады?).

Ұқсас мәселелерді шешу үшін келесі шешімдерге назар аударуға болады (Debezium-дан басқа):

PS

Біздің блогта да оқыңыз:

Ақпарат көзі: www.habr.com

пікір қалдыру