Знаёмства з Debezium - CDC для Apache Kafka

Знаёмства з Debezium - CDC для Apache Kafka

У сваёй працы я часта сутыкаюся з новымі тэхнічнымі рашэннямі/праграмнымі прадуктамі, інфармацыі аб якіх у рускамоўным інтэрнэце даволі мала. Гэтым артыкулам пастараюся папоўніць адзін такі прабел прыкладам са сваёй нядаўняй практыкі, калі запатрабавалася наладзіць адпраўку CDC-падзей з двух папулярных СКБД (PostgreSQL і MongoDB) у кластар Kafka пры дапамозе Debezium. Спадзяюся, гэты аглядны артыкул, які з'явіўся па выніках праведзенай працы, апынецца карысным і іншым.

Што за Debezium і ўвогуле CDC?

Debezium - Прадстаўнік катэгорыі праграмнага забеспячэння CDC (Capture Data Change), а калі дакладней - гэта набор канектараў для розных СКБД, сумяшчальных з фрэймворкам Apache Kafka Connect.

Гэта Open Source-праект, які выкарыстоўвае ліцэнзію Apache License v2.0 і фундаваны кампаніяй Red Hat. Распрацоўка вядзецца з 2016 года і на дадзены момант у ім прадстаўлена афіцыйная падтрымка наступных СКБД: MySQL, PostgreSQL, MongoDB, SQL Server. Таксама існуюць канектары для Cassandra і Oracle, але на дадзены момант яны знаходзяцца ў статусе "ранняга доступу", а новыя рэлізы не гарантуюць зваротнай сумяшчальнасці.

Калі параўноўваць CDC з традыцыйным падыходам (калі прыкладанне чытае дадзеныя з СКБД напрамую), то да яго галоўных пераваг адносяць рэалізацыю стрымінгу змены дадзеных на ўзроўні радкоў з нізкай затрымкай, высокай надзейнасцю і даступнасцю. Апошнія два пункты дасягаюцца дзякуючы выкарыстанню кластара Kafka у якасці сховішча CDC-падзей.

Таксама да добрых якасцяў можна аднесці той факт, што для захоўвання падзей выкарыстоўваецца адзіная мадэль, таму канчатковаму з дадаткам не прыйдзецца турбавацца аб нюансах эксплуатацыі розных СКБД.

Нарэшце, дзякуючы выкарыстанню брокера паведамленняў адкрываецца абшар для гарызантальнага маштабавання прыкладанняў, якія адсочваюць змены ў дадзеных. Пры гэтым уплыў на крыніцу дадзеных зводзіцца да мінімуму, паколькі атрыманне дадзеных адбываецца не наўпрост з СКБД, а з кластара Kafka.

Аб архітэктуры Debezium

Выкарыстанне Debezium зводзіцца да такой простай схемы:

СКБД (як крыніца дадзеных) → канектар у Kafka Connect → Apache Kafka → кансьюмер

У якасці ілюстрацыі прывяду схему з сайта праекту:

Знаёмства з Debezium - CDC для Apache Kafka

Аднак гэтая схема мне не вельмі падабаецца, паколькі складаецца ўражанне, што магчыма толькі выкарыстанне sink-канектара.

У рэчаіснасці ж сітуацыя адрозніваецца: напаўненне вашага Data Lake (апошняе звяно на схеме вышэй) - Гэта не адзіны спосаб прымянення Debezium. Падзеі, адпраўленыя ў Apache Kafka, могуць быць выкарыстоўвацца вашымі праграмамі для вырашэння розных сітуацый. Напрыклад:

  • выдаленне неактуальных дадзеных з кэша;
  • адпраўка апавяшчэнняў;
  • абнаўлення пошукавых індэксаў;
  • нейкае падабенства логаў аўдыту;
  • ...

У выпадку, калі ў вас дадатак на Java і няма неабходнасці/магчымасці выкарыстоўваць кластар Kafka, існуе таксама магчымасць працы праз embedded-канектар. Відавочны плюс у тым, што з ім можна адмовіцца ад дадатковай інфраструктуры (у выглядзе канектара і Kafka). Аднак гэтае рашэнне абвешчана састарэлым (deprecated) з версіі 1.1 і больш не рэкамендуецца да выкарыстання (у будучых рэлізах яго падтрымку могуць прыбраць).

У дадзеным артыкуле будзе разглядацца рэкамендуемая распрацоўшчыкамі архітэктура, якая забяспечвае адмоваўстойлівасць і магчымасць маштабавання.

Канфігурацыя канектара

Для таго, каб пачаць адсочваць змены самай галоўнай каштоўнасці - дадзеных, - нам спатрэбяцца:

  1. крыніца дадзеных, якім можа з'яўляцца MySQL пачынальна з версіі 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (поўны спіс);
  2. кластар Apache Kafka;
  3. інстанс Kafka Connect (версіі 1.x, 2.x);
  4. сканфігураваны канектар Debezium.

Працы па першых двух пунктах, г.зн. працэс усталёўкі СКБД і Apache Kafka, выходзяць за рамкі артыкула. Аднак для тых, хто хоча разгарнуць усё ў пясочніцы, у афіцыйным рэпазітары з прыкладамі ёсць гатовы docker-compose.yaml.

Мы ж спынімся падрабязней на двух апошніх пунктах.

0. Kafka Connect

Тут і далей у артыкуле ўсе прыклады канфігурацыі разглядаюцца ў кантэксце Docker-выявы, які распаўсюджваецца распрацоўнікамі Debezium. Ён змяшчае ўсе неабходныя файлы убудоў (канектары) і прадугледжвае канфігурацыю Kafka Connect пры дапамозе зменных асяроддзі.

У выпадку, калі мяркуецца выкарыстанне Kafka Connect ад Confluent, запатрабуецца самастойна дадаць убудовы неабходных канектараў у дырэкторыю, паказаную ў plugin.path ці задаваную праз зменную асяроддзі CLASSPATH. Налады воркера Kafka Connect і канектараў вызначаюцца праз канфігурацыйныя файлы, якія перадаюцца аргументамі да каманды запуску воркера. Больш падрабязна гл. у дакументацыі.

Увесь працэс па наладзе Debeizum у варыянце з канектарам ажыццяўляецца ў два этапы. Разгледзім кожны з іх:

1. Настройка фрэймворка Kafka Connect

Для стрымінгу дадзеных у кластар Apache Kafka у фрэймворку Kafka Connect задаюцца спецыфічныя параметры, такія як:

  • параметры падлучэння да кластара,
  • назвы топікаў, у якіх будзе захоўвацца непасрэдна канфігурацыя самога канектара,
  • імя групы, у якой запушчаны канектар (у выпадку выкарыстання distributed-рэжыму).

Афіцыйны Docker-выява праекту падтрымлівае канфігурацыю пры дапамозе зменных асяроддзі — гэтым і скарыстаемся. Такім чынам, спампоўваем выяву:

docker pull debezium/connect

Мінімальны набор зменных асяроддзі, неабходны для запуску канектара, выглядае наступным чынам:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - Пачатковы спіс сервераў кластара Kafka для атрымання поўнага спісу членаў кластара;
  • OFFSET_STORAGE_TOPIC=connector-offsets - топік для захоўвання пазіцый, на якіх на дадзены момант знаходзіцца канектар;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - топік для захоўвання статусу канектара і яго заданняў;
  • CONFIG_STORAGE_TOPIC=connector-config - топік для захоўвання дадзеных канфігурацыі канектара і яго заданняў;
  • GROUP_ID=1 - ідэнтыфікатар групы воркераў, на якіх можа выконвацца заданне канектара; неабходны пры выкарыстанні размеркаванага (distributed) рэжыму.

Запускаем кантэйнер з гэтымі зменнымі:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Заўвага пра Avro

Па змаўчанні Debezium піша дадзеныя ў фармаце JSON, што прымальна для пясочніц і невялікіх аб'ёмаў дадзеных, але можа стаць праблемай у высоканагружаных базах. Альтэрнатывай JSON-канвертару з'яўляецца серыялізацыя паведамленняў пры дапамозе Avro у бінарны фармат, што дазваляе зменшыць нагрузку на падсістэму I/O у Apache Kafka.

Для выкарыстання Avro патрабуецца разгарнуць асобны schema-registry (Для захоўвання схем). Зменныя для канвертара будуць выглядаць наступным чынам:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Дэталі па выкарыстанні Avro і наладзе registry за яго выходзяць за рамкі артыкула – далей для нагляднасці мы будзе выкарыстоўваць JSON.

2. Настройка самога канектара

Цяпер можна перайсці непасрэдна да канфігурацыі самога канектара, які будзе чытаць дадзеныя з крыніцы.

Разгледзім на прыкладзе канектараў для двух СКБД: PostgreSQL і MongoDB, - па якіх у мяне ёсць вопыт і па якіх маюцца адрозненні (хай і невялікія, але ў некаторых выпадках - істотныя!).

Канфігурацыя апісваецца ў натацыі JSON і загружаецца ў Kafka Connect пры дапамозе POST-запыту.

2.1. PostgreSQL

Прыклад канфігурацыі канектара для PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Прынцып працы канектара пасля такой налады даволі просты:

  • Пры першым запуску ён падлучаецца да базы, указанай у канфігурацыі, і запускаецца ў рэжыме initial snapshot, адпраўляючы ў Kafka пачатковы набор дадзеных, атрыманых з дапамогай умоўнага SELECT * FROM table_name.
  • Пасля таго, як ініцыялізацыя будзе завершана, канектар пераходзіць у рэжым чытання змен з WAL-файлаў PostgreSQL.

Аб выкарыстоўваных опцыях:

  • name - імя канектара, для якога выкарыстоўваецца канфігурацыя, апісаная ніжэй; у далейшым гэтае імя выкарыстоўваецца для працы з канектарам (г.зн. прагляду статуту/перазапуску/абнаўленні канфігурацыі) праз REST API Kafka Connect;
  • connector.class - Клас канектара СКБД, які будзе выкарыстоўвацца канфігуруемым канектарам;
  • plugin.name - назва плагіна для лагічнага дэкадавання дадзеных з WAL-файлаў. На выбар даступныя wal2json, decoderbuffs и pgoutput. Першыя два патрабуюць усталёўкі адпаведных пашырэнняў у СКБД, а pgoutput для PostgreSQL версіі 10 і вышэй не патрабуе дадатковых маніпуляцый;
  • database.* - опцыі для падлучэння да БД, дзе database.server.name - імя інстанса PostgreSQL, якое выкарыстоўваецца для фарміравання імя топіка ў кластары Kafka;
  • table.include.list - спіс табліц, у якіх мы хочам адсочваць змены; задаецца ў фармаце schema.table_name; нельга выкарыстоўваць разам з table.exclude.list;
  • heartbeat.interval.ms - Інтэрвал (у мілісекундах), з якім канектар адпраўляе heartbeat-паведамленні ў спецыяльны топік;
  • heartbeat.action.query - запыт, які будзе выконвацца пры адпраўцы кожнага heartbeat-паведамлення (опцыя з'явілася з версіі 1.1);
  • slot.name - імя слота рэплікацыі, які будзе выкарыстоўвацца канектарам;
  • publication.name - імя публікацыі у PostgreSQL, якую выкарыстоўвае канектар. У выпадку, калі яе не існуе, Debezium паспрабуе яе стварыць. У выпадку, калі ў карыстальніка, пад якім адбываецца падключэнне, недастаткова правоў для гэтага дзеяння - канектар завершыць працу з памылкай;
  • transforms вызначае, як менавіта змяняць назву мэтавага топіка:
    • transforms.AddPrefix.type паказвае, што будзем выкарыстоўваць рэгулярныя выразы;
    • transforms.AddPrefix.regex - маска, па якой перавызначаецца назва мэтавага топіка;
    • transforms.AddPrefix.replacement - непасрэдна тое, на што перавызначаем.

Больш падрабязна пра heartbeat і transforms

Па змаўчанні канектар адпраўляе дадзеныя ў Kafka па кожнай комитнут транзакцыі, а яе LSN (Log Sequence Number) запісвае ў службовы топік offset. Але што адбудзецца, калі канектар наладжаны на чытанне не ўсёй базы цалкам, а толькі часткі яе табліц (у якіх абнаўленне дадзеных адбываецца не часта)?

  • Канектар будзе чытаць WAL-файлы і не выяўляць у іх коміта транзакцый у тыя табліцы, за якімі ён сочыць.
  • Таму ён не будзе абнаўляць сваю бягучую пазіцыю ні ў топіцы, ні ў слоце рэплікацыі.
  • Гэта, у сваю чаргу, прывядзе да "ўтрымання" WAL-файлаў на дыску і верагоднага вычарпання ўсёй дыскавай прасторы.

І тут на дапамогу прыходзяць опцыі. heartbeat.interval.ms и heartbeat.action.query. Выкарыстанне гэтых опцый у пары дае магчымасць кожны раз пры адпраўцы heartbeat-паведамлення выконваць запыт на змяненне даных у асобнай табліцы. Тым самым увесь час актуалізуецца LSN, на якім цяпер знаходзіцца канектар (у слоце рэплікацыі). Гэта дазваляе СКБД выдаліць WAL-файлы, якія больш не патрэбныя. Падрабязней даведацца аб рабоце опцый можна ў дакументацыі.

Іншая опцыя, вартая больш пільнай увагі, - гэта transforms. Хаця яна хутчэй пра зручнасць і прыгажосць…

Па змаўчанні Debezium стварае топікі, кіруючыся наступнай палітыкай наймення: serverName.schemaName.tableName. Гэта не заўсёды можа быць зручна. Опцыямі transforms можна з дапамогай рэгулярных выразаў вызначаць спіс табліц, эвенты з якіх трэба маршрутызаваць у топік з канкрэтнай назвай.

У нашай канфігурацыі дзякуючы transforms адбываецца наступнае: усе CDC-падзеі з адсочванай БД патрапяць у топік з імем data.cdc.dbname. У адваротным выпадку (без гэтых налад) Debezium па змаўчанні бы ствараў па топіку на кожную табліцу выгляду: pg-dev.public.<table_name>.

Абмежаванні канектара

У завяршэнні апісання канфігурацыі канектара для PostgreSQL варта распавесці аб наступных асаблівасцях/абмежаваннях яго працы:

  1. Функцыянал канектара для PostgreSQL належыць на канцэпцыю лагічнага дэкадавання. Таму ён не адсочвае запыты на змену структуры БД (DDL) - адпаведна, у топіках гэтых дадзеных не будзе.
  2. Бо выкарыстоўваюцца слоты рэплікацыі, падлучэнне канектара магчыма толькі да вядучага экзэмпляра СКБД.
  3. Калі карыстачу, пад якім канектар падлучаецца да базы дадзеных, выдадзены правы толькі на чытанне, то перад першым запускам запатрабуецца ўручную стварыць слот рэплікацыі і публікацыю ў БД.

Ужыванне канфігурацыі

Такім чынам, загрузім нашу канфігурацыю ў канектар:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Правяраем, што загрузка прайшла паспяхова і канектар запусціўся:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Выдатна: ён настроены і гатовы да працы. Цяпер прыкінемся кансьюмерам і падлучымся да Kafka, пасля чаго дадамо і зменім запіс у табліцы:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

У нашым топіцы гэта адлюструецца наступным чынам:

Вельмі доўгі JSON з нашымі зменамі

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

У абодвух выпадках запісы складаюцца з ключа (PK) запісы, якая была зменена, і непасрэдна самой сутнасці змен: які быў запіс да і які стаў пасля.

  • У выпадку з INSERT: значэнне да (before) роўна null, а пасля - радок, якая была ўстаўлена.
  • У выпадку з UPDATE: у payload.before адлюстроўваецца папярэдні стан радка, а ў payload.after - Новае з сутнасцю змен.

2.2/XNUMX MongoDB

Гэты канектар выкарыстоўвае стандартны механізм рэплікацыі MongoDB, счытваючы інфармацыю з oplog'а primary-вузла СКБД.

Аналагічна ўжо апісанаму канектар для PgSQL, тут таксама пры першым запуску здымаецца першасны снапшот дадзеных, пасля чаго канектар перамыкаецца на рэжым чытання oplog'а.

Прыклад канфігурацыі:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Як можна заўважыць, тут няма новых опцый у параўнанні з мінулым прыкладам, але скарацілася толькі колькасць опцый, якія адказваюць за падлучэнне да БД і іх прэфіксы.

Налады transforms у гэты раз робяць наступнае: ператвараюць імя мэтавага топіка са схемы <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

адмоваўстойлівасць

Пытанне адмоваўстойлівасці і высокай даступнасці ў наш час стаіць як ніколі востра - асабліва калі мы гаворым пра дадзеныя і транзакцыі, і адсочванне змяненняў дадзеных не стаіць у гэтым пытанні ўбаку. Разгледзім, што ў прынцыпе можа пайсці не так і што будзе адбывацца з Debezium у кожным з выпадкаў.

Ёсць тры варыянты адмовы:

  1. Адмова Kafka Connect. Калі Connect наладжаны на працу ў размеркаваным рэжыме, для гэтага неабходна некалькім воркерам задаць аднолькавы group.id. Тады пры адмове аднаго з іх канектар будзе перазапушчаны на іншым воркеры і працягне чытанне з апошняй камітнутай пазіцыі ў топіцы ў Kafka.
  2. Страта складнасці з Kafka-кластарам. Канектар проста спыніць чытанне на пазіцыі, якую не ўдалося адправіць у Kafka, і будзе перыядычна спрабаваць паўторна адправіць яе, пакуль спроба не завершыцца поспехам.
  3. Недаступнасць крыніцы дадзеных. Канектар будзе рабіць спробы паўторнага падлучэння да крыніцы ў адпаведнасці з канфігурацыяй. Па змаўчанні гэта 16 спроб з выкарыстаннем exponential backoff. Пасля 16-й няўдалай спробы таск будзе пазначаны як не ўдалося і запатрабуецца яго ручны перазапуск праз REST-інтэрфейс Kafka Connect.
    • У выпадку з PostgreSQL дадзеныя не знікнуць, т.я. выкарыстанне слотаў рэплікацыі не дасць выдаліць WAL-файлы, не прачытаныя канектарам. У гэтым выпадку ёсць і адваротны бок медаля: калі на працяглы час будзе парушана сеткавая складнасць паміж канектарам і СКБД, ёсць верагоднасць, што месца на дыску скончыцца, а гэта можа прывесці да адмовы СКБД цалкам.
    • У выпадку з MySQL файлы бінлогаў могуць быць адратаваны самой СКБД раней, чым адновіцца складнасць. Гэта прывядзе да таго, што канектар пяройдзе ў стан failed, а для аднаўлення нармальнага функцыянавання запатрабуецца паўторны запуск у рэжыме initial snapshot для працягу чытання з бінлогаў.
    • Пра MongoDB. Дакументацыя абвяшчае: паводзіны канектара ў выпадку, калі файлы часопісаў/oplog'а былі выдаленыя і канектар не можа працягнуць чытанне з той пазіцыі, дзе спыніўся, аднолькава для ўсіх СКБД. Яно заключаецца ў тым, што канектар пяройдзе ў стан не ўдалося і запатрабуе паўторнага запуску ў рэжыме initial snapshot.

      Аднак бываюць выключэнні. Калі канектар працяглы час знаходзіўся ў адключаным стане (ці не мог дастукацца да асобніка MongoDB), а oplog за гэты час мінуў ратацыю, то пры аднаўленні падлучэння канектар спакойна працягне чытаць дадзеныя з першай даступнай пазіцыі, з-за чаго частка дадзеных у Kafka ня патрапіць.

Заключэнне

Debezium – мой першы досвед працы з CDC-сістэмамі і ў цэлым вельмі дадатны. Праект падкупіў падтрымкай асноўных СКБД, прастатой канфігурацыі, падтрымкай кластарызацыі і актыўнай супольнасцю. Які зацікавіўся практыкай рэкамендую азнаёміцца ​​з гайдамі для Kafka Connect и Debezium.

У параўнанні з JDBC-канектарам для Kafka Connect асноўнай перавагай Debezium з'яўляецца тое, што змены счытваюцца з часопісаў СКБД, што дазваляе атрымліваць дадзеныя з мінімальнай затрымкай. JDBC Connector (з пастаўкі Kafka Connect) робіць запыты да адсочванай табліцы з фіксаваным інтэрвалам і (па гэтай жа прычыне) не генеруе паведамленні пры выдаленні дадзеных (як можна запытаць дадзеныя, якіх няма?).

Для рашэння падобных задач можна звярнуць увагу на наступныя рашэнні (акрамя Debezium):

PS

Чытайце таксама ў нашым блогу:

Крыніца: habr.com

Дадаць каментар