Ви го претставуваме Debezium - CDC за Apache Kafka

Ви го претставуваме Debezium - CDC за Apache Kafka

Во мојата работа, често наидувам на нови технички решенија/софтверски производи, информации за кои се прилично ретки на интернетот на руски јазик. Со оваа статија ќе се обидам да пополнам една таква празнина со пример од мојата неодамнешна практика, кога требаше да го конфигурирам испраќањето CDC настани од два популарни DBMS (PostgreSQL и MongoDB) до кластерот на Кафка користејќи Debezium. Се надевам дека овој напис за преглед, кој се појавува како резултат на сработеното, ќе биде корисен за другите.

Што е Debezium и CDC воопшто?

Дебезиум — претставник на категоријата софтвер ЦДЦ (Снимајте промена на податоци), или поточно, тоа е збир на конектори за различни DBMS компатибилни со рамката Apache Kafka Connect.

Тоа Проект со отворен код, лиценцирана под лиценцата Apache v2.0 и спонзорирана од Red Hat. Развојот е во тек од 2016 година и моментално обезбедува официјална поддршка за следните DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Има и конектори за Cassandra и Oracle, но во моментот тие се во статус „ран пристап“, а новите изданија не гарантираат компатибилност наназад.

Ако го споредиме CDC со традиционалниот пристап (кога апликацијата директно ги чита податоците од DBMS), нејзините главни предности вклучуваат имплементација на пренос на промени на податоци на ниво на ред со мала латентност, висока доверливост и достапност. Последните две точки се постигнуваат со користење на кластерот Кафка како складиште за ЦДЦ настани.

Друга предност е фактот што еден модел се користи за складирање на настани, така што крајната апликација не мора да се грижи за нијансите на работење со различни DBMS.

Конечно, користењето на брокер за пораки им овозможува на апликациите што ги следат промените во податоците да се намалат хоризонтално. Во исто време, влијанието врз изворот на податоци е минимизирано, бидејќи податоците не се добиваат директно од DBMS, туку од кластерот Кафка.

За архитектурата на Дебезиум

Користењето на Debezium се сведува на оваа едноставна шема:

DBMS (како извор на податоци) → конектор во Kafka Connect → Apache Kafka → потрошувач

Како илустрација, еве дијаграм од веб-страницата на проектот:

Ви го претставуваме Debezium - CDC за Apache Kafka

Сепак, не ми се допаѓа оваа шема, бидејќи се чини дека е можна само употреба на конектор за мијалник.

Во реалноста, ситуацијата е поинаква: пополнување на вашето Езеро на податоци (последна врска на дијаграмот погоре) Ова не е единствениот начин да се користи Debezium. Настаните испратени до Apache Kafka може да се користат од вашите апликации за справување со различни ситуации. На пример:

  • отстранување на ирелевантни податоци од кешот;
  • испраќање известувања;
  • ажурирања на индекси за пребарување;
  • некој вид логови за ревизија;
  • ...

Во случај да имате Java апликација и нема потреба/можност да користите кластер Кафка, постои можност и за работа преку вграден конектор. Очигледната предност е што ја елиминира потребата од дополнителна инфраструктура (во форма на конектор и Кафка). Сепак, ова решение е застарено од верзијата 1.1 и повеќе не се препорачува за употреба (поддршката за него може да биде отстранета во идните изданија).

Оваа статија ќе разговара за архитектурата препорачана од програмерите, која обезбедува толеранција на грешки и приспособливост.

Конфигурација на конектор

За да започнеме да ги следиме промените во најважната вредност - податоците - ни треба:

  1. извор на податоци, кој може да биде MySQL почнувајќи од верзијата 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (целосната листа);
  2. Кластерот Апачи Кафка;
  3. Кафка Конект пример (верзии 1.x, 2.x);
  4. конфигуриран Debezium конектор.

Работете на првите две точки, т.е. Процесот на инсталација на DBMS и Apache Kafka е надвор од опсегот на статијата. Сепак, за оние кои сакаат да распоредат сè во песокот, официјалното складиште со примери има готов docker-compose.yaml.

Ќе се задржиме подетално на последните две точки.

0. Кафка Конект

Овде и понатаму во статијата, сите примери за конфигурација се дискутирани во контекст на сликата на Docker дистрибуирана од развивачите на Debezium. Ги содржи сите потребни приклучни датотеки (конектори) и обезбедува конфигурација на Kafka Connect користејќи променливи на околината.

Ако имате намера да користите Kafka Connect од Confluent, ќе треба самостојно да ги додадете приклучоците од потребните конектори во директориумот наведен во plugin.path или поставете преку променлива на околината CLASSPATH. Поставките за работникот и конекторите на Kafka Connect се одредуваат преку конфигурациските датотеки што се пренесуваат како аргументи до командата за стартување на работникот. За повеќе детали, видете документација.

Целиот процес на поставување на Debeizum во верзијата на конекторот се изведува во две фази. Ајде да погледнеме во секој од нив:

1. Поставување на рамката на Kafka Connect

За пренос на податоци во кластерот Apache Kafka, специфични параметри се поставени во рамката Kafka Connect, како што се:

  • параметри за поврзување со кластерот,
  • имиња на теми во кои директно ќе се зачува конфигурацијата на самиот конектор,
  • името на групата во која работи конекторот (ако се користи дистрибуиран режим).

Официјалната слика на Docker на проектот ја поддржува конфигурацијата користејќи променливи на околината - ова е она што ќе го користиме. Значи, преземете ја сликата:

docker pull debezium/connect

Минималниот сет на променливи на животната средина потребни за да се вклучи конекторот е како што следува:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — почетна листа на сервери на кластерот Кафка за да се добие комплетна листа на членови на кластерот;
  • OFFSET_STORAGE_TOPIC=connector-offsets — тема за складирање позиции каде што моментално се наоѓа конекторот;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — тема за зачувување на статусот на конекторот и неговите задачи;
  • CONFIG_STORAGE_TOPIC=connector-config — тема за складирање на податоци за конфигурацијата на конекторот и неговите задачи;
  • GROUP_ID=1 — идентификатор на групата работници на кои може да се изврши задачата за поврзување; неопходни при користење на дистрибуирани (дистрибуирано) режим.

Го лансираме контејнерот со овие променливи:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Белешка за Авро

Стандардно, Debezium запишува податоци во JSON формат, што е прифатливо за песок и мали количини на податоци, но може да стане проблем во многу оптоварени бази на податоци. Алтернатива на конверторот JSON е серијализирање на пораките користејќи Avro во бинарен формат, што го намалува оптоварувањето на подсистемот В/И во Апачи Кафка.

За да користите Avro, треба да распоредите посебно шема-регистар (за складирање на дијаграми). Променливите за конверторот ќе изгледаат вака:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Деталите за користење на Avro и поставување на регистарот за него се надвор од опсегот на овој напис - понатаму, за јасност, ќе користиме JSON.

2. Конфигурирање на самиот конектор

Сега можете директно да отидете на конфигурацијата на самиот конектор, кој ќе ги чита податоците од изворот.

Да го погледнеме примерот на конектори за два DBMS: PostgreSQL и MongoDB, во кои имам искуство и во кои има разлики (иако мали, но во некои случаи значајни!).

Конфигурацијата е опишана во нотација JSON и е поставена на Kafka Connect со помош на барање POST.

2.1. PostgreSQL

Пример за конфигурација на конектор за PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Принципот на работа на конекторот по ова поставување е прилично едноставен:

  • Кога се стартува за прв пат, се поврзува со базата на податоци наведена во конфигурацијата и започнува во режим почетна слика, испраќајќи му го на Кафка почетниот сет на податоци добиени со користење на условното SELECT * FROM table_name.
  • Откако ќе заврши иницијализацијата, конекторот влегува во режим за читање на промените од датотеките PostgreSQL WAL.

За користените опции:

  • name — името на конекторот за кој се користи конфигурацијата опишана подолу; во иднина, ова име се користи за работа со конекторот (т.е. прегледување на статусот/рестартирање/ажурирање на конфигурацијата) преку Kafka Connect REST API;
  • connector.class — Класа на конектор DBMS што ќе ја користи конфигурираниот конектор;
  • plugin.name — името на приклучокот за логично декодирање на податоци од WAL-датотеките. Достапно за избор wal2json, decoderbuffs и pgoutput. Првите две бараат инсталирање на соодветни екстензии во DBMS, и pgoutput за PostgreSQL верзија 10 и повисока не бара дополнителни манипулации;
  • database.* — опции за поврзување со базата на податоци, каде database.server.name — Име на примерот на PostgreSQL што се користи за формирање на името на темата во кластерот Кафка;
  • table.include.list — список на табели во кои сакаме да ги следиме промените; наведено во форматот schema.table_name; не може да се користи заедно со table.exclude.list;
  • heartbeat.interval.ms — интервал (во милисекунди) со кој конекторот испраќа пораки за чукање на срцето на посебна тема;
  • heartbeat.action.query — барање што ќе се изврши при испраќање на секоја порака за отчукување на срцето (опцијата се појави во верзијата 1.1);
  • slot.name — името на отворот за репликација што ќе го користи конекторот;
  • publication.name - Име Објавување во PostgreSQL, што го користи конекторот. Ако не постои, Debezium ќе се обиде да го создаде. Ако корисникот под кој е направена врската нема доволно права за оваа акција, конекторот ќе прекине со грешка;
  • transforms одредува точно како да го смените името на целната тема:
    • transforms.AddPrefix.type покажува дека ќе користиме регуларни изрази;
    • transforms.AddPrefix.regex — маска што го редефинира името на целната тема;
    • transforms.AddPrefix.replacement - директно она што го редефинираме.

Повеќе за отчукувањата на срцето и трансформациите

Стандардно, конекторот испраќа податоци до Кафка за секоја извршена трансакција, а неговиот LSN (Log Sequence Number) се запишува во темата за услугата offset. Но, што се случува ако конекторот е конфигуриран да ја чита не целата база на податоци, туку само дел од нејзините табели (во кои ажурирањата на податоците не се случуваат често)?

  • Конекторот ќе ги чита WAL-датотеките и нема да открие никакви обврски за трансакции на табелите што ги следи.
  • Затоа, нема да ја ажурира својата моментална позиција ниту во темата ниту во отворот за репликација.
  • Ова, пак, ќе резултира со WAL-датотеките да се чуваат на дискот и најверојатно да снема простор на дискот.

И тука опциите доаѓаат на помош. heartbeat.interval.ms и heartbeat.action.query. Користењето на овие опции во парови овозможува да се изврши барање за промена на податоците во посебна табела секој пат кога се испраќа порака за отчукување на срцето. Така, LSN на кој моментално се наоѓа конекторот (во отворот за репликација) постојано се ажурира. Ова им овозможува на DBMS да ги отстрани WAL датотеките што повеќе не се потребни. Можете да дознаете повеќе за тоа како функционираат опциите документација.

Друга опција достојна за поголемо внимание е transforms. Иако повеќе се работи за практичноста и убавината...

Стандардно, Debezium создава теми користејќи ја следнава политика за именување: serverName.schemaName.tableName. Ова можеби не е секогаш погодно. Опции transforms Можете да користите регуларни изрази за да дефинирате листа на табели, настани од кои треба да се пренасочат до тема со одредено име.

Во нашата конфигурација, благодариме transforms се случува следново: сите CDC настани од набљудуваната база на податоци ќе одат на тема со име data.cdc.dbname. Во спротивно (без овие поставки), Debezium стандардно би креирал тема за секоја табела како: pg-dev.public.<table_name>.

Ограничувања на конектори

За да го заклучиме описот на конфигурацијата на конекторот за PostgreSQL, вреди да се зборува за следните карактеристики/ограничувања на неговата работа:

  1. Функционалноста на конекторот за PostgreSQL се потпира на концептот на логичко декодирање. Затоа тој не ги следи барањата за промена на структурата на базата на податоци (DDL) - соодветно, овие податоци нема да бидат во темите.
  2. Бидејќи се користат слотови за репликација, можно е поврзување на конектор само до водечкиот пример за DBMS.
  3. Ако корисникот под кој конекторот се поврзува со базата на податоци има права само за читање, тогаш пред првото стартување ќе треба рачно да креирате слот за репликација и да го објавите во базата на податоци.

Примена на конфигурацијата

Значи, ајде да ја вчитаме нашата конфигурација во конекторот:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Проверуваме дали преземањето е успешно и дека конекторот започна:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Одлично: поставен е и подготвен за работа. Сега да се преправаме дека сме потрошувач и да се поврземе со Кафка, по што ќе додадеме и промениме запис во табелата:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Во нашата тема ќе биде прикажано на следниов начин:

Многу долг JSON со нашите промени

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Во двата случаи, записите се состојат од клучот (PK) на записот што е променет, и самата суштина на промените: што бил записот пред и што станал после.

  • Во случај на INSERT: вредност пред (before) еднакви null, а потоа - линијата што беше вметната.
  • Во случај на UPDATE: на payload.before се прикажува претходната состојба на линијата и во payload.after — ново со суштината на промените.

2.2 MongoDB

Овој конектор го користи стандардниот механизам за репликација MongoDB, читајќи информации од оплогот на примарниот јазол DBMS.

Слично на веќе опишаниот конектор за PgSQL, и овде, при првиот почеток, се слика примарната снимка на податоци, по што конекторот се префрла во режим на оплог за читање.

Пример за конфигурација:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Како што можете да видите, овде нема нови опции во споредба со претходниот пример, туку само бројот на опции одговорни за поврзување со базата на податоци и нивните префикси е намален.

Подесувања transforms овој пат го прават следново: го трансформираат името на целната тема од шемата <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

толеранција на грешки

Прашањето за толеранција на грешки и висока достапност во нашево време е поакутно од кога било - особено кога зборуваме за податоци и трансакции, а следењето на промените на податоците не стои настрана во ова прашање. Ајде да погледнеме што може да тргне наопаку во принцип и што ќе се случи со Debezium во секој случај.

Постојат три опции за откажување:

  1. Неуспех на Kafka Connect. Ако Connect е конфигуриран да работи во дистрибуиран режим, ова бара повеќе работници да ја постават истата group.id. Потоа, ако некој од нив не успее, конекторот ќе се рестартира на друг работник и ќе продолжи да чита од последната посветена позиција во темата во Кафка.
  2. Губење на поврзаноста со кластерот Кафка. Конекторот едноставно ќе престане да чита на позицијата што не успеа да ја испрати до Кафка и периодично ќе се обидува повторно да ја испрати додека обидот не успее.
  3. Недостапност на изворот на податоци. Конекторот ќе се обиде повторно да се поврзе со изворот како што е конфигуриран. Стандардно е користење на 16 обиди експоненцијално повлекување. По 16. неуспешен обид, задачата ќе биде означена како не успеа и ќе треба рачно да го рестартирате преку интерфејсот Kafka Connect REST.
    • Во случај на PostgreSQL податоците нема да бидат изгубени, бидејќи Користењето слотови за репликација ќе ве спречи да ги избришете датотеките WAL што не се читаат од конекторот. Во овој случај, има и негативна страна на паричката: ако мрежното поврзување помеѓу конекторот и DBMS е нарушено подолго време, постои можност просторот на дискот да истече, а тоа може да доведе до дефект на целиот DBMS.
    • Во случај на MySQL, binlog-датотеките може да се ротираат од самиот DBMS пред да се врати поврзувањето. Ова ќе предизвика конекторот да премине во неуспешна состојба, а за да ја вратите нормалната работа, ќе треба да се рестартирате во режимот на почетна слика за да продолжите да читате од бинлогови.
    • на MongoDB. Во документацијата е наведено: однесувањето на конекторот во случај да се избришат датотеките за log/oplog и конекторот да не може да продолжи да чита од позицијата каде што застанал е исто за сите DBMS. Тоа значи дека конекторот ќе влезе во состојбата не успеа и ќе бара рестартирање во режим почетна слика.

      Сепак, постојат исклучоци. Ако конекторот беше исклучен долго време (или не можеше да стигне до примерокот на MongoDB), а оплогот помина низ ротација во ова време, тогаш кога врската ќе се врати, конекторот мирно ќе продолжи да чита податоци од првата достапна позиција, поради што некои од податоците во Кафка Нема ќе удри.

Заклучок

Debezium е моето прво искуство со ЦДЦ системи и севкупно многу позитивно. Проектот победи со неговата поддршка за главните DBMS, леснотијата на конфигурација, поддршката за кластерирање и активната заедница. За оние кои се заинтересирани за пракса, препорачувам да ги прочитаат водичите за Кафка Конект и Дебезиум.

Во споредба со JDBC конекторот за Kafka Connect, главната предност на Debezium е што промените се читаат од дневниците на DBMS, што овозможува да се примаат податоците со минимална латентност. JDBC Connector (од Kafka Connect) ја бара следената табела на фиксен интервал и (од истата причина) не генерира пораки кога податоците се бришат (како може да побарате податоци што не постојат?).

За да решите слични проблеми, можете да обрнете внимание на следните решенија (покрај Дебезиум):

PS

Прочитајте и на нашиот блог:

Извор: www.habr.com

Додадете коментар