Муаррифии Debezium - CDC барои Apache Kafka

Муаррифии Debezium - CDC барои Apache Kafka

Дар кори худ ман аксар вақт бо қарорҳои нави техникӣ / маҳсулоти нармафзор дучор мешавам, ки маълумот дар бораи онҳо дар Интернети русзабон хеле кам аст. Бо ин мақола, ман кӯшиш мекунам, ки як чунин холигоҳро бо мисоли таҷрибаи охирини худ пур кунам, вақте ки ба ман лозим буд, ки фиристодани рӯйдодҳои CDC аз ду DBMS маъмул (PostgreSQL ва MongoDB) ба кластери Кафка бо истифода аз Debezium насб кунам. Умедворам, ки ин мақолаи баррасии, ки дар натиҷаи кори анҷом дода шудааст, барои дигарон муфид хоҳад буд.

Debezium ва CDC умуман чист?

Дебезиум - Намояндаи категорияи нармафзори CDC (Тағироти маълумотро сабт кунед), ё аниқтараш, он маҷмӯи пайвасткунакҳо барои DBMS-ҳои гуногун мебошад, ки бо чаҳорчӯбаи Apache Kafka Connect мувофиқанд.

ин лоиҳаи кушодаасос, таҳти иҷозатномаи Apache License v2.0 ва сарпарастии Red Hat. Таҳия аз соли 2016 идома дорад ва дар айни замон он дастгирии расмии DBMS-ро пешниҳод мекунад: MySQL, PostgreSQL, MongoDB, SQL Server. Инчунин барои Кассандра ва Oracle пайвасткунакҳо мавҷуданд, аммо онҳо дар айни замон дар ҳолати "дастрасии барвақт" қарор доранд ва релизҳои нав мутобиқати ақибро кафолат намедиҳанд.

Агар мо CDC-ро бо равиши анъанавӣ муқоиса кунем (вақте ки барнома маълумотро аз DBMS мустақиман мехонад), пас бартариҳои асосии он татбиқи ҷараёни тағирёбии маълумот дар сатҳи сатр бо таъхири паст, эътимоднокии баланд ва дастрасиро дар бар мегиранд. Ду нуқтаи охир тавассути истифодаи кластери Кафка ҳамчун анбори рӯйдодҳои CDC ба даст оварда мешаванд.

Инчунин, бартариҳо аз он иборатанд, ки як модели ягона барои нигоҳ доштани рӯйдодҳо истифода мешавад, бинобар ин барномаи ниҳоӣ набояд дар бораи нозукиҳои корбарии DBMS гуногун хавотир шавад.

Ниҳоят, истифодаи брокери паёмӣ барои миқёси уфуқӣ барномаҳоеро, ки тағиротро дар маълумот пайгирӣ мекунанд, мекушояд. Дар айни замон, таъсир ба манбаи маълумот кам карда мешавад, зеро маълумот на мустақиман аз DBMS, балки аз кластери Кафка гирифта мешавад.

Дар бораи меъмории Debezium

Истифодаи Debezium ба ин нақшаи оддӣ меояд:

DBMS (ҳамчун манбаи маълумот) → пайвасткунак дар Kafka Connect → Apache Kafka → истеъмолкунанда

Ҳамчун мисол, ман диаграммаеро аз вебсайти лоиҳа медиҳам:

Муаррифии Debezium - CDC барои Apache Kafka

Бо вуҷуди ин, ман ин схемаро дар ҳақиқат дӯст намедорам, зеро ба назар чунин мерасад, ки танҳо як пайвасткунаки танӯр имконпазир аст.

Дар асл, вазъият дигар аст: пур кардани кӯли маълумотии шумо (пайванди охирин дар диаграммаи боло) ягона роҳи истифодаи Debezium нест. Чорабиниҳои ба Apache Kafka фиристодашуда метавонанд аз ҷониби барномаҳои шумо барои ҳалли ҳолатҳои гуногун истифода шаванд. Барои намуна:

  • тоза кардани маълумоти номатлуб аз кэш;
  • фиристодани огоҳиномаҳо;
  • навсозии индекси ҷустуҷӯ;
  • як навъ гузоришҳои аудит;
  • ...

Агар шумо барномаи Java дошта бошед ва зарурат/имконияти истифодаи кластери Кафка вуҷуд надошта бошад, инчунин имкони кор кардан тавассути пайвасткунаки дарунсохт. Бартарии возеҳ дар он аст, ки бо он шумо метавонед инфрасохтори иловагиро рад кунед (дар шакли пайвасткунак ва Кафка). Аммо, ин ҳалли аз версияи 1.1 бекор карда шудааст ва дигар барои истифода тавсия дода намешавад (он мумкин аст дар нашрҳои оянда хориҷ карда шаванд).

Ин мақола меъмории тавсиякардаи таҳиягаронро баррасӣ хоҳад кард, ки таҳаммулпазирӣ ва миқёспазириро таъмин мекунад.

Конфигуратсияи пайвасткунак

Барои оғоз кардани пайгирии тағирот дар арзиши муҳимтарин - маълумот ба мо лозим аст:

  1. манбаи маълумот, ки метавонад MySQL аз версияи 5.7, PostgreSQL 9.6+, MongoDB 3.2+ бошад (рӯйхати пурраи);
  2. Кластери Apache Kafka
  3. Мисоли Кафка Connect (версияҳои 1.x, 2.x);
  4. конфигуратсияи Debezium.

Аз рӯи ду нуқтаи аввал кор кунед, яъне. раванди насб кардани DBMS ва Apache Kafka аз доираи мақола берун аст. Бо вуҷуди ин, барои онҳое, ки мехоҳанд ҳама чизро дар қуттии қум ҷойгир кунанд, дар анбори расмӣ дорои намунаҳои тайёр мавҷуд аст. docker-compose.yaml.

Мо ба ду нуктаи охир муфассалтар тамаркуз мекунем.

0. Connect Kafka

Дар ин ҷо ва баъдтар дар мақола, ҳама намунаҳои конфигуратсия дар контексти тасвири Docker, ки аз ҷониби таҳиягарони Debezium паҳн карда шудаанд, баррасӣ карда мешаванд. Он дорои ҳамаи файлҳои плагини зарурӣ (пайвасткунакҳо) ва конфигуратсияи Кафка Connect бо истифода аз тағирёбандаҳои муҳити зистро таъмин мекунад.

Агар шумо ният доред, ки Kafka Connect-ро аз Confluent истифода баред, ба шумо лозим меояд, ки плагинҳои пайвасткунакҳои заруриро худатон ба директорияи дар зер нишондодашуда илова кунед. plugin.path ё тавассути тағирёбандаи муҳити зист муқаррар карда мешавад CLASSPATH. Танзимоти коргар ва пайвасткунакҳои Кафка тавассути файлҳои конфигуратсия муайян карда мешаванд, ки ҳамчун далел ба фармони оғози коргар интиқол дода мешаванд. Барои тафсилот нигаред хуччатхо.

Тамоми раванди насб кардани Debeizum дар версияи пайвасткунанда дар ду марҳила сурат мегирад. Биёед ҳар яки онҳоро баррасӣ кунем:

1. Танзими чаҳорчӯбаи Connect Kafka

Барои интиқоли маълумот ба кластери Apache Kafka, параметрҳои мушаххас дар чаҳорчӯбаи Kafka Connect муқаррар карда мешаванд, ба монанди:

  • танзимоти пайвасти кластер,
  • номҳои мавзӯъҳое, ки дар онҳо конфигуратсияи худи пайвасткунак нигоҳ дошта мешавад,
  • номи гурӯҳе, ки дар он пайвасткунак кор мекунад (дар ҳолати истифодаи реҷаи тақсимшуда).

Тасвири расмии Docker лоиҳа конфигуратсияро бо истифода аз тағирёбандаҳои муҳити зист дастгирӣ мекунад - ин чизест, ки мо истифода хоҳем кард. Пас биёед тасвирро зеркашӣ кунем:

docker pull debezium/connect

Ҳадди ақали маҷмӯи тағирёбандаҳои муҳити зист, ки барои иҷро кардани пайвасткунак лозим аст, чунин аст:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - рӯйхати ибтидоии серверҳои кластери Кафка барои гирифтани рӯйхати пурраи аъзоёни кластер;
  • OFFSET_STORAGE_TOPIC=connector-offsets — мавзӯъ барои нигоҳ доштани мавқеъҳо, ки дар айни замон пайвасткунанда ҷойгир аст;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - мавзӯъ барои нигоҳ доштани ҳолати пайвасткунак ва вазифаҳои он;
  • CONFIG_STORAGE_TOPIC=connector-config - мавзӯъ барои нигоҳ доштани маълумоти конфигуратсияи пайвасткунак ва вазифаҳои он;
  • GROUP_ID=1 — идентификатори гурухи коргароне, ки аз руи онхо супориши пайвасткуниро ичро кардан мумкин аст; ҳангоми истифодаи тақсимшуда талаб карда мешавад (тақсим карда мешавад) режим.

Мо контейнерро бо ин тағирёбандаҳо оғоз мекунем:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Эзоҳ дар бораи Avro

Бо нобаёнӣ, Debezium маълумотро дар формати JSON менависад, ки барои қуттиҳои қуттиҳо ва миқдори ками додаҳо қобили қабул аст, аммо метавонад дар пойгоҳи додаҳои пурбор мушкилот бошад. Як алтернативаи табдилдиҳандаи JSON ин силсила кардани паёмҳо мебошад Avro ба формати бинарӣ, ки сарбориро ба зерсистемаи I/O дар Apache Kafka коҳиш медиҳад.

Барои истифодаи Avro, шумо бояд алоҳида ҷойгир кунед схема-реестр (барои нигоҳ доштани схемаҳо). Тағирёбандаҳои конвертер чунин хоҳанд буд:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Тафсилот дар бораи истифодаи Avro ва таъсиси феҳрист барои он аз доираи мақола берун нестанд - минбаъд, барои равшанӣ, мо JSON-ро истифода хоҳем бурд.

2. Танзими худи пайвасткунак

Акнун шумо метавонед мустақиман ба конфигуратсияи худи пайвасткунанда гузаред, ки маълумотро аз манбаъ мехонад.

Биёед мисоли пайвасткунакҳоро барои ду DBMS бубинем: PostgreSQL ва MongoDB, ки ман таҷриба дорам ва барои онҳо фарқиятҳо мавҷуданд (ҳарчанд хурд, аммо дар баъзе ҳолатҳо назаррас!).

Конфигуратсия дар қайди JSON тавсиф шудааст ва бо истифода аз дархости POST ба Kafka Connect бор карда мешавад.

2.1. PostgreSQL

Намунаи конфигуратсияи пайвасткунак барои PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Принсипи кори пайвасткунак пас аз ин конфигуратсия хеле содда аст:

  • Дар оғози аввал, он ба базаи дар конфигуратсия нишондодашуда пайваст мешавад ва дар реҷа оғоз меёбад акси ибтидоӣ, ба Кафка фиристодани маҷмӯи ибтидоии маълумоти гирифташуда бо шартӣ SELECT * FROM table_name.
  • Пас аз ба итмом расидани оғозёбӣ, пайвасткунак ба ҳолати хондани тағирот аз файлҳои PostgreSQL WAL дохил мешавад.

Дар бораи имконоти истифодашуда:

  • name — номи пайвасткунак, ки барои он конфигуратсияи дар зер тавсифшуда истифода мешавад; баъдтар, ин ном барои кор бо пайвасткунанда истифода мешавад (яъне дидани ҳолат / бозоғозӣ / навсозии конфигуратсия) тавассути Kafka Connect REST API;
  • connector.class — синфи пайвасткунаки DBMS, ки аз ҷониби пайвасткунаки танзимшуда истифода мешавад;
  • plugin.name номи плагин барои рамзкушоии мантиқии маълумот аз файлҳои WAL мебошад. Барои интихоб кардан дастрас аст wal2json, decoderbuffs и pgoutput. Дуи аввал насби васеъшавии мувофиқро дар DBMS талаб мекунанд ва pgoutput барои PostgreSQL версияи 10 ва навтар, коркарди иловагӣ талаб намекунад;
  • database.* — имконоти пайвастшавӣ ба базаи маълумот, ки дар куҷо database.server.name - номи мисоли PostgreSQL, ки барои ташкили номи мавзӯъ дар кластери Кафка истифода мешавад;
  • table.include.list - рӯйхати ҷадвалҳое, ки мо мехоҳем тағиротро пайгирӣ кунем; дар формат дода шудааст schema.table_name; якҷоя истифода бурдан мумкин нест table.exclude.list;
  • heartbeat.interval.ms — фосила (бо милли сония), ки бо он пайвасткунак паёмҳои тапиши дилро ба мавзӯи махсус мефиристад;
  • heartbeat.action.query - дархосте, ки ҳангоми фиристодани ҳар як паёми тапиши дил иҷро мешавад (ин вариант аз версияи 1.1 пайдо шудааст);
  • slot.name — номи слотҳои такрорӣ, ки аз ҷониби пайвасткунанда истифода мешавад;
  • publication.name - Ном нашрияҳо дар PostgreSQL, ки пайвасткунак истифода мебарад. Дар сурати мавҷуд набудани он, Debezium кӯшиш мекунад, ки онро эҷод кунад. Агар корбаре, ки дар зери он пайвастшавӣ анҷом дода мешавад, барои ин амал ҳуқуқи кофӣ надошта бошад, пайвасткунак бо хатогӣ хориҷ мешавад;
  • transforms муайян мекунад, ки чӣ тавр ба таври дақиқ тағир додани номи мавзӯи мақсаднок:
    • transforms.AddPrefix.type нишон медиҳад, ки мо ибораҳои муқаррариро истифода хоҳем бурд;
    • transforms.AddPrefix.regex — ниқоб, ки бо он номи мавзӯи мавриди ҳадаф аз нав муайян карда мешавад;
    • transforms.AddPrefix.replacement - бевосита он чизеро, ки мо аз нав муайян мекунем.

Бештар дар бораи тапиши дил ва тағирот

Бо нобаёнӣ, пайвасткунак барои ҳар як транзаксияи анҷомдодашуда маълумотро ба Кафка мефиристад ва LSN-и худро (Рақами пайдарпаии гузориш) ба мавзӯи хидмат менависад offset. Аммо чӣ мешавад, агар пайвасткунак барои хондани на тамоми пойгоҳи додаҳо, балки танҳо як қисми ҷадвалҳои он танзим карда шавад (дар он маълумотҳо кам нав карда мешаванд)?

  • Пайвасткунак файлҳои WAL-ро мехонад ва содироти транзаксияро дар ҷадвалҳое, ки назорат мекунад, муайян намекунад.
  • Аз ин рӯ, он мавқеи кунунии худро на дар мавзӯъ ва на дар ковокии такрорӣ нав намекунад.
  • Ин дар навбати худ боиси он мегардад, ки файлҳои WAL дар диск "часпида" шаванд ва эҳтимолан фазои диск тамом шавад.

Ва дар ин ҷо вариантҳо ба наҷот меоянд. heartbeat.interval.ms и heartbeat.action.query. Истифодаи ин вариантҳо дар ҷуфт имкон медиҳад, ки дархост барои тағир додани маълумот дар ҷадвали алоҳида ҳар дафъае, ки паёми тапиши дил фиристода мешавад, иҷро карда шавад. Ҳамин тариқ, LSN, ки пайвасткунак дар айни замон ҷойгир аст (дар ковокии реплика) пайваста нав карда мешавад. Ин ба DBMS имкон медиҳад, ки файлҳои WAL-ро, ки дигар лозим нестанд, хориҷ кунад. Барои маълумоти бештар дар бораи чӣ гуна кор кардани вариантҳо, нигаред хуччатхо.

Варианти дигаре, ки сазовори таваҷҷӯҳи бештар аст transforms. Гарчанде ки ин бештар дар бораи роҳат ва зебоӣ аст ...

Бо нобаёнӣ, Debezium мавзӯъҳоро бо истифода аз сиёсати номгузории зерин эҷод мекунад: serverName.schemaName.tableName. Ин метавонад на ҳамеша қулай бошад. Имконот transforms бо истифода аз ибораҳои муқаррарӣ, шумо метавонед рӯйхати ҷадвалҳоеро муайян кунед, ки рӯйдодҳои онҳо бояд ба мавзӯъ бо номи мушаххас равона карда шаванд.

Дар конфигуратсияи мо ба шарофати transforms ҳодисаҳои зерин рух медиҳанд: ҳама рӯйдодҳои CDC аз пойгоҳи додаҳо ба мавзӯъ бо ном мераванд data.cdc.dbname. Дар акси ҳол (бе ин танзимот), Debezium ба таври нобаёнӣ барои ҳар як ҷадвали форма мавзӯъ эҷод мекунад: pg-dev.public.<table_name>.

Маҳдудиятҳои пайвасткунанда

Дар охири тавсифи конфигуратсияи пайвасткунак барои PostgreSQL, бояд дар бораи хусусиятҳо / маҳдудиятҳои кори он сӯҳбат кунем:

  1. Функсияи пайвасткунак барои PostgreSQL ба консепсияи рамзгузории мантиқӣ такя мекунад. Бинобар ин у дархостҳоро барои тағир додани сохтори базавӣ пайгирӣ намекунад (DDL) - мутаносибан, ин маълумот дар мавзӯъҳо нахоҳад буд.
  2. Азбаски слотҳои такрорӣ истифода мешаванд, пайвасти пайвасткунак имконпазир аст танҳо ба мисоли асосии DBMS.
  3. Агар корбаре, ки дар зери он пайвасткунак ба махзани маълумот пайваст мешавад, ҳуқуқҳои танҳо барои хондан дошта бошад, пас пеш аз оғози аввал ба шумо лозим меояд, ки слотҳои такрориро дастӣ эҷод кунед ва дар пойгоҳи додаҳо интишор кунед.

Татбиқи конфигуратсия

Пас биёед конфигуратсияи худро ба пайвасткунак бор кунем:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Мо тафтиш мекунем, ки зеркашӣ бомуваффақият буд ва пайвасткунак оғоз ёфт:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Аҷоиб: он насб шудааст ва барои рафтан омода аст. Акнун биёед худро истеъмолкунанда вонамуд кунем ва ба Кафка пайваст шавем, ки пас аз он мо воридотро дар ҷадвал илова ва тағир медиҳем:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Дар мавзӯи мо, ин ба таври зерин нишон дода мешавад:

JSON хеле дароз бо тағиротҳои мо

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Дар ҳарду ҳолат, сабтҳо аз калиди (PK) сабти тағирёфта ва моҳияти тағирот иборатанд: сабт қаблан чӣ буд ва пас аз он чӣ шуд.

  • Дар ин ҳолат INSERT: арзиши пеш аз (before) баробар аст nullпас аз он сатри воридшуда.
  • Дар ин ҳолат UPDATE: дар payload.before ҳолати пештараи сатр нишон дода мешавад ва дар payload.after - нав бо моҳияти тағйирот.

2.2 MongoDB

Ин пайвасткунак механизми такрории стандартии MongoDB-ро истифода мебарад, ки маълумотро аз оплоги гиреҳи ибтидоии DBMS мехонад.

Ба ҳамин монанд ба пайвасткунаки қаблан тавсифшуда барои PgSQL, дар ин ҷо низ ҳангоми оғози аввал тасвири ибтидоии маълумот гирифта мешавад, ки пас аз он пайвасткунак ба режими хониши оплог мегузарад.

Намунаи конфигуратсия:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Тавре ки шумо мебинед, дар муқоиса бо мисоли қаблӣ ягон варианти нав вуҷуд надорад, аммо танҳо шумораи вариантҳое, ки барои пайвастшавӣ ба пойгоҳи додаҳо масъуланд ва префиксҳои онҳо кам карда шудаанд.

Танзимотҳо transforms ин дафъа онҳо корҳои зеринро иҷро мекунанд: номи мавзӯи ҳадафро аз схема табдил диҳед <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

таҳаммулпазирии хатогиҳо

Масъалаи таҳаммулпазирӣ ба хатогиҳо ва дастрасии баланд дар замони мо беш аз пеш шадидтар аст - хусусан вақте ки мо дар бораи маълумот ва транзаксияҳо сӯҳбат мекунем ва пайгирии тағирёбии маълумот дар ин масъала дар канор нест. Биёед бубинем, ки чӣ метавонад дар асл хато кунад ва дар ҳар як ҳолат бо Дебезиум чӣ мешавад.

Се варианти радкунӣ вуҷуд дорад:

  1. Нокомии пайвасти Кафка. Агар Пайвастшавӣ барои кор дар реҷаи тақсимшуда танзим шуда бошад, ин аз коргарони сершумор талаб мекунад, ки ҳамон group.id-ро таъин кунанд. Сипас, агар яке аз онҳо ноком шавад, пайвасткунак дар коргари дигар аз нав оғоз карда мешавад ва хонданро аз мавқеи охирини мавзӯи Кафка идома медиҳад.
  2. Аз даст додани пайвастшавӣ бо кластери Кафка. Пайвасткунак хонданро дар мавқеъе, ки ба Кафка фиристода натавонист, қатъ мекунад ва давра ба давра кӯшиш мекунад, ки онро то муваффақ шудан ба кӯшиш дубора ирсол кунад.
  3. Манбаи маълумот дастрас нест. Пайвасткунак кӯшиш мекунад, ки мувофиқи конфигуратсия ба манбаъ дубора пайваст шавад. Пешфарз 16 кӯшиши истифода аст бозгашти экспоненсиалӣ. Пас аз кӯшиши 16-уми ноком, вазифа ҳамчун қайд карда мешавад ноком ва он бояд тавассути интерфейси Kafka Connect REST дастӣ аз нав оғоз карда шавад.
    • Дар ин ҳолат PostgreSQL маълумот гум намешавад, зеро истифодаи слотҳои такрорӣ ҳазфи файлҳои WAL-ро, ки аз ҷониби пайвасткунак хонда нашудаанд, пешгирӣ мекунад. Дар ин ҳолат, як манфии он вуҷуд дорад: агар пайвасти шабакавӣ байни пайвасткунак ва DBMS муддати тӯлонӣ қатъ карда шавад, эҳтимолияти тамом шудани фазои диск вуҷуд дорад ва ин метавонад боиси аз кор баромадани тамоми МБМ гардад.
    • Дар ин ҳолат MySQL Файлҳои binlog метавонанд аз ҷониби худи DBMS пеш аз барқарор шудани пайвастшавӣ гардиш карда шаванд. Ин боиси он мегардад, ки пайвасткунак ба ҳолати ноком меравад ва он бояд дар ҳолати аввалаи аксбардорӣ аз нав оғоз кунад, то хондан аз binlogs барои барқарор кардани кори муқаррарӣ идома диҳад.
    • ба Муғулистон. Ҳуҷҷатҳо мегӯянд, ки рафтори пайвасткунак дар сурати нест карда шудани файлҳои log/oplog ва пайвасткунак наметавонад хонданро аз мавқеъе, ки дар он мондааст, идома диҳад, барои ҳама DBMS якхела аст. Ин дар он аст, ки пайвасткунак ба давлат дохил мешавад ноком ва дар реҷа аз нав оғоз карданро талаб мекунад акси ибтидоӣ.

      Бо вуҷуди ин, истисноҳо вуҷуд доранд. Агар пайвасткунак муддати тӯлонӣ дар ҳолати ҷудошуда қарор дошта бошад (ё ба мисоли MongoDB расида натавонист) ва оплог дар давоми ин муддат гардиш карда бошад, пас вақте ки пайвастшавӣ барқарор карда мешавад, пайвасткунак оромона хондани маълумотро аз мавқеи аввалини дастрас идома медиҳад. , ки чаро баъзе маълумот дар Кафка не мезанад.

хулоса

Debezium аввалин таҷрибаи ман бо системаҳои CDC аст ва дар маҷмӯъ хеле мусбат буд. Лоиҳа барои дастгирии МДМ асосӣ, осонии конфигуратсия, дастгирии кластерсозӣ ва ҷомеаи фаъол ришва дод. Барои онҳое, ки ба амалия таваҷҷӯҳ доранд, ман тавсия медиҳам, ки дастурҳоро хонед Кафка пайваст и Дебезиум.

Дар муқоиса бо пайвасткунаки JDBC барои Kafka Connect, бартарии асосии Debezium дар он аст, ки тағиротҳо аз гузоришҳои DBMS хонда мешаванд, ки имкон медиҳад маълумот бо таъхири ҳадди ақал қабул карда шавад. Пайвасткунаки JDBC (аз ҷониби Кафка Connect пешниҳод шудааст) ҷадвали пайгиришударо дар фосилаи муқарраршуда дархост мекунад ва (бо ҳамин сабаб) ҳангоми нест кардани маълумот паёмҳо тавлид намекунад (чӣ гуна шумо метавонед маълумотеро, ки дар он ҷо мавҷуд нестанд, пурсед?).

Барои ҳалли мушкилоти шабеҳ, шумо метавонед ба ҳалли зерин диққат диҳед (ғайр аз Debezium):

PS

Инчунин дар блоги мо хонед:

Манбаъ: will.com

Илова Эзоҳ