Predstavujeme Debezium - CDC pre Apache Kafka

Predstavujeme Debezium - CDC pre Apache Kafka

Pri svojej práci sa často stretávam s novými technickými riešeniami/softvérovými produktmi, o ktorých je na ruskojazyčnom internete pomerne málo informácií. Týmto článkom sa pokúsim vyplniť jednu takúto medzeru príkladom z mojej nedávnej praxe, keď som potreboval nakonfigurovať odosielanie udalostí CDC z dvoch populárnych DBMS (PostgreSQL a MongoDB) do klastra Kafka pomocou Debezium. Dúfam, že tento prehľadný článok, ktorý sa objaví ako výsledok vykonanej práce, bude užitočný pre ostatných.

Čo je to Debezium a CDC vo všeobecnosti?

Debezium — zástupca kategórie softvéru CDC (Zaznamenať zmenu údajov), presnejšie ide o sadu konektorov pre rôzne DBMS kompatibilné s frameworkom Apache Kafka Connect.

To Open Source projekt, licencovaný pod licenciou Apache v2.0 a sponzorovaný spoločnosťou Red Hat. Vývoj prebieha od roku 2016 av súčasnosti poskytuje oficiálnu podporu pre nasledujúce DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Existujú tiež konektory pre Cassandra a Oracle, ale momentálne sú v stave „predbežného prístupu“ a nové vydania nezaručujú spätnú kompatibilitu.

Ak porovnáme CDC s tradičným prístupom (keď aplikácia načítava dáta priamo z DBMS), medzi jeho hlavné výhody patrí implementácia streamovania zmeny dát na úrovni riadkov s nízkou latenciou, vysokou spoľahlivosťou a dostupnosťou. Posledné dva body sa dosiahnu použitím klastra Kafka ako úložiska pre udalosti CDC.

Ďalšou výhodou je fakt, že na ukladanie udalostí je použitý jeden model, takže koncová aplikácia sa nemusí obávať nuansy prevádzky rôznych DBMS.

Nakoniec, použitie sprostredkovateľa správ umožňuje aplikáciám, ktoré monitorujú zmeny v údajoch, horizontálne škálovať. Zároveň je minimalizovaný dopad na zdroj dát, keďže dáta nie sú získavané priamo z DBMS, ale z Kafkovho klastra.

O architektúre Debezium

Použitie Debezium vychádza z tejto jednoduchej schémy:

DBMS (ako zdroj dát) → konektor v Kafka Connect → Apache Kafka → spotrebiteľ

Ako ilustráciu uvádzame diagram z webovej stránky projektu:

Predstavujeme Debezium - CDC pre Apache Kafka

Táto schéma sa mi však veľmi nepáči, pretože sa zdá, že je možné použiť iba konektor umývadla.

V skutočnosti je situácia iná: naplnenie dátového jazera (posledný odkaz na obrázku vyššie) Toto nie je jediný spôsob použitia Debezium. Udalosti odoslané do Apache Kafka môžu byť použité vašimi aplikáciami na riešenie rôznych situácií. Napríklad:

  • odstránenie irelevantných údajov z vyrovnávacej pamäte;
  • zasielanie upozornení;
  • aktualizácie indexu vyhľadávania;
  • nejaký druh denníkov auditu;
  • ...

V prípade, že máte Java aplikáciu a nie je potrebné/možnosť použiť klaster Kafka, je tu aj možnosť práce cez vstavaný konektor. Zjavnou výhodou je, že eliminuje potrebu ďalšej infraštruktúry (vo forme konektora a Kafka). Toto riešenie je však od verzie 1.1 zastarané a už sa neodporúča používať (podpora preň môže byť v budúcich vydaniach odstránená).

Tento článok sa bude zaoberať architektúrou odporúčanou vývojármi, ktorá poskytuje odolnosť voči chybám a škálovateľnosť.

Konfigurácia konektora

Aby sme mohli začať sledovať zmeny v najdôležitejšej hodnote – dátach – potrebujeme:

  1. zdroj údajov, ktorým môže byť MySQL od verzie 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (úplný zoznam);
  2. klaster Apache Kafka;
  3. Inštancia Kafka Connect (verzie 1.x, 2.x);
  4. nakonfigurovaný konektor Debezium.

Zapracujte na prvých dvoch bodoch, t.j. Proces inštalácie DBMS a Apache Kafka je nad rámec článku. Pre tých, ktorí však chcú všetko nasadiť do sandboxu, má pripravené oficiálne úložisko s príkladmi docker-compose.yaml.

Posledným dvom bodom sa budeme venovať podrobnejšie.

0. Kafka Connect

Tu a ďalej v článku sú všetky príklady konfigurácie diskutované v kontexte obrazu Docker distribuovaného vývojármi Debezium. Obsahuje všetky potrebné súbory pluginov (konektory) a poskytuje konfiguráciu Kafka Connect pomocou premenných prostredia.

Ak máte v úmysle používať Kafka Connect od spoločnosti Confluent, budete musieť nezávisle pridať pluginy potrebných konektorov do adresára uvedeného v plugin.path alebo nastaviť pomocou premennej prostredia CLASSPATH. Nastavenia pre pracovníka a konektory Kafka Connect sú definované prostredníctvom konfiguračných súborov, ktoré sa odovzdávajú ako argumenty príkazu worker start. Ďalšie podrobnosti nájdete v časti dokumentáciu.

Celý proces nastavenia Debeizum vo verzii konektora prebieha v dvoch fázach. Pozrime sa na každú z nich:

1. Nastavenie rámca Kafka Connect

Na streamovanie údajov do klastra Apache Kafka sú v rámci Kafka Connect nastavené špecifické parametre, ako napríklad:

  • nastavenia pripojenia klastra,
  • názvy tém, v ktorých bude priamo uložená konfigurácia samotného konektora,
  • názov skupiny, v ktorej je spustený konektor (v prípade použitia distribuovaného režimu).

Oficiálny obrázok projektu Docker podporuje konfiguráciu pomocou premenných prostredia - to je to, čo budeme používať. Tak si stiahnite obrázok:

docker pull debezium/connect

Minimálna množina premenných prostredia potrebná na spustenie konektora je nasledovná:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — počiatočný zoznam serverov klastra Kafka na získanie úplného zoznamu členov klastra;
  • OFFSET_STORAGE_TOPIC=connector-offsets — téma pre ukladanie pozícií, kde sa práve nachádza konektor;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — téma na ukladanie stavu konektora a jeho úloh;
  • CONFIG_STORAGE_TOPIC=connector-config — téma na ukladanie konfiguračných údajov konektora a jeho úloh;
  • GROUP_ID=1 — identifikátor skupiny pracovníkov, na ktorých možno vykonať úlohu konektora; potrebné pri použití distribuovaných (distribuované) režimu.

Kontajner začíname týmito premennými:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Poznámka o Avro

V predvolenom nastavení Debezium zapisuje údaje vo formáte JSON, ktorý je prijateľný pre karantény a malé množstvo údajov, ale môže sa stať problémom vo vysoko zaťažených databázach. Alternatívou k prevodníku JSON je serializácia správ pomocou Avro do binárneho formátu, čo znižuje zaťaženie I/O subsystému v Apache Kafka.

Ak chcete použiť Avro, musíte nasadiť samostatný schéma-registra (na ukladanie diagramov). Premenné pre prevodník budú vyzerať takto:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Podrobnosti o používaní Avro a jeho nastavení presahujú rámec tohto článku – ďalej pre prehľadnosť použijeme JSON.

2. Konfigurácia samotného konektora

Teraz môžete prejsť priamo na konfiguráciu samotného konektora, ktorý bude čítať údaje zo zdroja.

Pozrime sa na príklad konektorov pre dva DBMS: PostgreSQL a MongoDB, s ktorými mám skúsenosti a v ktorých sú rozdiely (síce malé, ale v niektorých prípadoch výrazné!).

Konfigurácia je opísaná v zápise JSON a nahraná do Kafka Connect pomocou požiadavky POST.

2.1. PostgreSQL

Príklad konfigurácie konektora pre PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Princíp činnosti konektora po tomto nastavení je pomerne jednoduchý:

  • Pri prvom spustení sa pripojí k databáze špecifikovanej v konfigurácii a spustí sa v režime počiatočná snímka, odoslanie Kafkovi počiatočný súbor údajov získaných pomocou podmieneného SELECT * FROM table_name.
  • Po dokončení inicializácie konektor prejde do režimu na čítanie zmien zo súborov PostgreSQL WAL.

O použitých možnostiach:

  • name — názov konektora, pre ktorý sa používa konfigurácia opísaná nižšie; v budúcnosti sa tento názov používa na prácu s konektorom (t. j. zobrazenie stavu/reštart/aktualizácia konfigurácie) cez Kafka Connect REST API;
  • connector.class — trieda konektora DBMS, ktorú bude používať nakonfigurovaný konektor;
  • plugin.name je názov pluginu pre logické dekódovanie údajov zo súborov WAL. Dostupné na výber wal2json, decoderbuffs и pgoutput. Prvé dva vyžadujú inštaláciu príslušných rozšírení v DBMS a pgoutput pre PostgreSQL verziu 10 a vyššiu nevyžaduje ďalšie manipulácie;
  • database.* — možnosti pripojenia k databáze, kde database.server.name - názov inštancie PostgreSQL použitej na vytvorenie názvu témy v klastri Kafka;
  • table.include.list — zoznam tabuliek, v ktorých chceme sledovať zmeny; špecifikované vo formáte schema.table_name; nemožno použiť spolu s table.exclude.list;
  • heartbeat.interval.ms — interval (v milisekundách), s ktorým konektor odosiela správy srdcového tepu na špeciálnu tému;
  • heartbeat.action.query — požiadavka, ktorá sa vykoná pri odoslaní každej správy o prehrávaní srdca (možnosť sa objavila vo verzii 1.1);
  • slot.name — názov replikačného slotu, ktorý bude konektor používať;
  • publication.name - názov Uverejnenie v PostgreSQL, ktorý konektor používa. Ak neexistuje, Debezium sa ho pokúsi vytvoriť. Ak používateľ, pod ktorým je pripojenie vytvorené, nemá dostatočné práva na túto akciu, konektor sa ukončí s chybou;
  • transforms presne určuje, ako zmeniť názov cieľovej témy:
    • transforms.AddPrefix.type označuje, že budeme používať regulárne výrazy;
    • transforms.AddPrefix.regex — maska, ktorá predefinuje názov cieľovej témy;
    • transforms.AddPrefix.replacement - priamo to, čo predefinujeme.

Viac o tlkot srdca a premenách

V predvolenom nastavení konektor odosiela Kafkovi údaje pre každú potvrdenú transakciu a jeho LSN (sekvenčné číslo denníka) je zaznamenané v téme služby offset. Čo sa však stane, ak je konektor nakonfigurovaný tak, aby nečítal celú databázu, ale iba časť jej tabuliek (v ktorých sa aktualizácie údajov nevyskytujú často)?

  • Konektor bude čítať súbory WAL a nezistí žiadne potvrdenia transakcií do tabuliek, ktoré monitoruje.
  • Preto nebude aktualizovať svoju aktuálnu pozíciu ani v téme, ani v replikačnom slote.
  • To zase povedie k tomu, že súbory WAL budú držané na disku a pravdepodobne dôjde k nedostatku miesta na disku.

A tu možnosti prichádzajú na záchranu. heartbeat.interval.ms и heartbeat.action.query. Použitie týchto možností v pároch umožňuje vykonať požiadavku na zmenu údajov v samostatnej tabuľke vždy, keď je odoslaná správa o prezúvaní. LSN, na ktorej sa konektor momentálne nachádza (v replikačnom slote), sa teda neustále aktualizuje. To umožňuje DBMS odstrániť súbory WAL, ktoré už nie sú potrebné. Môžete sa dozvedieť viac o tom, ako fungujú možnosti dokumentáciu.

Ďalšou možnosťou, ktorá si zaslúži väčšiu pozornosť, je transforms. Aj keď ide skôr o pohodlie a krásu...

V predvolenom nastavení Debezium vytvára témy pomocou nasledujúcej politiky pomenovania: serverName.schemaName.tableName. To nemusí byť vždy pohodlné. možnosti transforms pomocou regulárnych výrazov môžete definovať zoznam tabuliek, ktorých udalosti je potrebné nasmerovať na tému so špecifickým názvom.

V našej konfigurácii ďakujeme transforms stane sa nasledovné: všetky udalosti CDC z monitorovanej databázy prejdú na tému s názvom data.cdc.dbname. V opačnom prípade (bez týchto nastavení) by Debezium predvolene vytvorilo tému pre každú tabuľku, ako napríklad: pg-dev.public.<table_name>.

Obmedzenia konektorov

Na konci popisu konfigurácie konektora pre PostgreSQL stojí za to hovoriť o nasledujúcich vlastnostiach / obmedzeniach jeho práce:

  1. Funkčnosť konektora pre PostgreSQL sa opiera o koncept logického dekódovania. Preto on nesleduje požiadavky na zmenu štruktúry databázy (DDL) - podľa toho tieto údaje nebudú v témach.
  2. Keďže sa používajú replikačné sloty, je možné pripojiť konektor iba do vedúcej inštancie DBMS.
  3. Ak má používateľ, pod ktorým sa konektor pripája k databáze, práva len na čítanie, pred prvým spustením budete musieť manuálne vytvoriť replikačný slot a publikovať ho do databázy.

Aplikácia konfigurácie

Takže načítajme našu konfiguráciu do konektora:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Skontrolujeme, či bolo sťahovanie úspešné a konektor sa spustil:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Skvelé: je to nastavené a pripravené na použitie. Teraz predstierajme, že sme spotrebiteľ a pripojíme sa ku Kafkovi, potom pridáme a zmeníme záznam v tabuľke:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

V našej téme sa zobrazí takto:

Veľmi dlhý JSON s našimi zmenami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

V oboch prípadoch záznamy pozostávajú z kľúča (PK) záznamu, ktorý bol zmenený, a zo samotnej podstaty zmien: aký bol záznam predtým a čo sa stal potom.

  • V prípade INSERT: hodnota pred (before) sa rovná nullnasleduje reťazec, ktorý bol vložený.
  • V prípade UPDATE: in payload.before zobrazí sa predchádzajúci stav riadku a v payload.after — nové s podstatou zmien.

2.2 MongoDB

Tento konektor používa štandardný replikačný mechanizmus MongoDB, ktorý číta informácie z oplogu primárneho uzla DBMS.

Podobne ako pri už popísanom konektore pre PgSQL, aj tu sa pri prvom spustení urobí primárna snímka dát, po ktorej sa konektor prepne do režimu čítania oplogu.

Príklad konfigurácie:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Ako vidíte, v porovnaní s predchádzajúcim príkladom tu nie sú žiadne nové možnosti, ale znížil sa iba počet možností zodpovedných za pripojenie k databáze a ich predpony.

Nastavenie transforms tentoraz urobia nasledovné: transformujú názov cieľovej témy zo schémy <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

odolnosť proti chybám

Otázka odolnosti voči chybám a vysokej dostupnosti je v dnešnej dobe naliehavejšia ako kedykoľvek predtým – najmä ak hovoríme o údajoch a transakciách a sledovanie zmien údajov v tejto otázke nestojí bokom. Pozrime sa, čo sa môže v princípe pokaziť a čo sa stane s Debeziom v každom prípade.

Existujú tri možnosti odhlásenia:

  1. Porucha Kafka Connect. Ak je Connect nakonfigurované na prácu v distribuovanom režime, vyžaduje to, aby viacero pracovníkov nastavilo rovnaké group.id. Potom, ak jeden z nich zlyhá, konektor sa reštartuje na inom pracovníkovi a bude pokračovať v čítaní od poslednej potvrdenej pozície v téme v Kafke.
  2. Strata spojenia s Kafkovým klastrom. Konektor jednoducho prestane čítať na pozícii, ktorú sa nepodarilo odoslať Kafkovi, a bude sa pravidelne pokúšať o opätovné odoslanie, kým pokus neuspeje.
  3. Nedostupnosť zdroja údajov. Konektor sa pokúsi znova pripojiť k zdroju podľa konfigurácie. Predvolená hodnota je 16 pokusov exponenciálny ústup. Po 16. neúspešnom pokuse bude úloha označená ako neúspešný a budete ho musieť manuálne reštartovať cez rozhranie Kafka Connect REST.
    • V prípade PostgreSQL údaje sa nestratia, pretože Použitie replikačných slotov vám zabráni vymazať súbory WAL, ktoré konektor neprečíta. V tomto prípade je tu aj nevýhoda mince: ak sa na dlhší čas preruší sieťová konektivita medzi konektorom a DBMS, existuje možnosť, že sa minie miesto na disku, čo môže viesť k poruche celý DBMS.
    • V prípade MySQL binlog súbory môže samotná DBMS otáčať pred obnovením pripojenia. To spôsobí, že konektor prejde do neúspešného stavu a na obnovenie normálnej prevádzky budete musieť reštartovať v režime počiatočnej snímky, aby ste mohli pokračovať v čítaní z binlogov.
    • o MongoDB. V dokumentácii sa uvádza: správanie konektora v prípade, že boli vymazané súbory log/oplog a konektor nemôže pokračovať v čítaní od miesta, kde skončil, je rovnaké pre všetky DBMS. Spočíva v tom, že konektor pôjde do stavu neúspešný a bude vyžadovať reštart v režime počiatočná snímka.

      Existujú však aj výnimky. Ak bol konektor dlhší čas odpojený (alebo sa nemohol dostať k inštancii MongoDB) a oplog počas tejto doby prešiel rotáciou, po obnovení pripojenia bude konektor pokojne pokračovať v čítaní údajov z prvej dostupnej pozície, preto niektoré údaje v Kafkovi nie zasiahne.

Záver

Debezium je moja prvá skúsenosť so systémami CDC a celkovo veľmi pozitívna. Projekt si získal podporu pre hlavné DBMS, jednoduchosť konfigurácie, podporu klastrovania a aktívnu komunitu. Záujemcom o prax odporúčam prečítať si návody pre Kafka Connect и Debezium.

V porovnaní s konektorom JDBC pre Kafka Connect je hlavnou výhodou Debezium to, že zmeny sa čítajú z denníkov DBMS, čo umožňuje príjem údajov s minimálnou latenciou. JDBC Connector (od Kafka Connect) dopytuje monitorovanú tabuľku v pevnom intervale a (z rovnakého dôvodu) negeneruje správy, keď sú dáta vymazané (ako môžete dopytovať dáta, ktoré neexistujú?).

Ak chcete vyriešiť podobné problémy, môžete venovať pozornosť nasledujúcim riešeniam (okrem Debezium):

PS

Prečítajte si aj na našom blogu:

Zdroj: hab.com

Pridať komentár