Představujeme Debezium - CDC pro Apache Kafka

Představujeme Debezium - CDC pro Apache Kafka

Při své práci se často setkávám s novými technickými řešeními / softwarovými produkty, o kterých je na rusky mluvícím internetu poměrně málo informací. Tímto článkem se pokusím jednu takovou mezeru zaplnit příkladem ze své nedávné praxe, kdy jsem potřeboval nastavit odesílání událostí CDC ze dvou oblíbených DBMS (PostgreSQL a MongoDB) do Kafka clusteru pomocí Debezium. Doufám, že tento přehledový článek, který se objevil jako výsledek odvedené práce, bude užitečný pro ostatní.

Co je Debezium a CDC obecně?

Debezium - Zástupce kategorie softwaru CDC (Zachyťte změnu dat), přesněji se jedná o sadu konektorů pro různé DBMS, které jsou kompatibilní s frameworkem Apache Kafka Connect.

To open source projekt, licencováno pod licencí Apache v2.0 a sponzorováno společností Red Hat. Vývoj probíhá od roku 2016 a v současné době poskytuje oficiální podporu pro tyto DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Existují také konektory pro Cassandru a Oracle, ale momentálně jsou ve stavu „early access“ a nové verze nezaručují zpětnou kompatibilitu.

Porovnáme-li CDC s tradičním přístupem (kdy aplikace čte data přímo z DBMS), pak mezi jeho hlavní výhody patří implementace streamování změn dat na úrovni řádků s nízkou latencí, vysokou spolehlivostí a dostupností. Posledních dvou bodů je dosaženo použitím clusteru Kafka jako úložiště událostí CDC.

Mezi výhody patří také to, že pro ukládání událostí je použit jeden model, takže se finální aplikace nemusí starat o nuance provozu různých DBMS.

A konečně, použití zprostředkovatele zpráv otevírá prostor pro horizontální škálování aplikací, které sledují změny v datech. Zároveň je minimalizován dopad na zdroj dat, protože data nejsou přijímána přímo z DBMS, ale z clusteru Kafka.

O architektuře Debezium

Použití Debezium spočívá v tomto jednoduchém schématu:

DBMS (jako zdroj dat) → konektor v Kafka Connect → Apache Kafka → spotřebitel

Jako ilustraci uvedu schéma z webu projektu:

Představujeme Debezium - CDC pro Apache Kafka

Toto schéma se mi však moc nelíbí, protože se zdá, že je možný pouze konektor dřezu.

Ve skutečnosti je situace jiná: naplnění vašeho Data Lake (poslední odkaz na obrázku výše) není jediný způsob, jak používat Debezium. Události odeslané do Apache Kafka mohou vaše aplikace použít k řešení různých situací. Například:

  • odstranění nepodstatných dat z mezipaměti;
  • zasílání upozornění;
  • aktualizace indexu vyhledávání;
  • nějaký druh protokolů auditu;
  • ...

V případě, že máte Java aplikaci a není potřeba/možnost používat Kafka cluster, je zde také možnost propracovat vestavěný konektor. Zjevnou výhodou je, že s ním můžete odmítnout další infrastrukturu (ve formě konektoru a Kafky). Toto řešení je však od verze 1.1 zastaralé a již se nedoporučuje používat (v budoucích verzích může být odstraněno).

Tento článek pojednává o architektuře doporučené vývojáři, která poskytuje odolnost proti chybám a škálovatelnost.

Konfigurace konektoru

Abychom mohli začít sledovat změny v nejdůležitější hodnotě – datech – potřebujeme:

  1. zdroj dat, kterým může být MySQL od verze 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (Úplný seznam);
  2. Cluster Apache Kafka
  3. instance Kafka Connect (verze 1.x, 2.x);
  4. nakonfigurovaný konektor Debezium.

Zapracujte na prvních dvou bodech, tzn. proces instalace DBMS a Apache Kafka jsou nad rámec článku. Pro ty, kteří chtějí vše nasadit v sandboxu, je však připravený v oficiálním repozitáři s příklady docker-compose.yaml.

Podrobněji se zaměříme na poslední dva body.

0. Kafka Connect

Zde a dále v článku jsou všechny příklady konfigurace zvažovány v kontextu obrazu Docker distribuovaného vývojáři Debezium. Obsahuje všechny potřebné soubory pluginů (konektory) a poskytuje konfiguraci Kafka Connect pomocí proměnných prostředí.

Pokud hodláte používat Kafka Connect od společnosti Confluent, budete muset sami přidat pluginy potřebných konektorů do adresáře uvedeného v plugin.path nebo nastavit pomocí proměnné prostředí CLASSPATH. Nastavení pro pracovníka a konektory Kafka Connect jsou definovány prostřednictvím konfiguračních souborů, které jsou předány jako argumenty příkazu worker start. Podrobnosti viz dokumentace.

Celý proces nastavení Debeizum v konektorové verzi probíhá ve dvou fázích. Podívejme se na každou z nich:

1. Nastavení rámce Kafka Connect

Pro streamování dat do clusteru Apache Kafka jsou v rámci Kafka Connect nastaveny specifické parametry, jako například:

  • nastavení připojení clusteru,
  • názvy témat, ve kterých bude uložena konfigurace samotného konektoru,
  • název skupiny, ve které konektor běží (v případě použití distribuovaného režimu).

Oficiální obrázek projektu Docker podporuje konfiguraci pomocí proměnných prostředí – to je to, co budeme používat. Takže si stáhneme obrázek:

docker pull debezium/connect

Minimální sada proměnných prostředí vyžadovaná ke spuštění konektoru je následující:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - počáteční seznam serverů clusteru Kafka pro získání úplného seznamu členů clusteru;
  • OFFSET_STORAGE_TOPIC=connector-offsets — téma pro ukládání pozic, kde se konektor právě nachází;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - téma pro uložení stavu konektoru a jeho úkolů;
  • CONFIG_STORAGE_TOPIC=connector-config - téma pro ukládání konfiguračních dat konektoru a jeho úkolů;
  • GROUP_ID=1 — identifikátor skupiny pracovníků, na kterých lze provést úlohu konektoru; vyžadováno při použití distribuovaných (distribuováno) režim.

Začneme kontejner s těmito proměnnými:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Poznámka o Avro

Ve výchozím nastavení zapisuje Debezium data ve formátu JSON, což je přijatelné pro sandboxy a malé objemy dat, ale v silně zatížených databázích může být problém. Alternativou k převodníku JSON je serializace zpráv pomocí Avro do binárního formátu, což snižuje zatížení I/O subsystému v Apache Kafka.

Chcete-li používat Avro, musíte nasadit samostatný schéma-registr (pro ukládání schémat). Proměnné pro převodník budou vypadat takto:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Podrobnosti o používání Avro a nastavení registru pro něj jsou nad rámec článku – dále pro přehlednost použijeme JSON.

2. Nastavení samotného konektoru

Nyní můžete přejít přímo do konfigurace samotného konektoru, který bude číst data ze zdroje.

Podívejme se na příklad konektorů pro dva DBMS: PostgreSQL a MongoDB, se kterými mám zkušenosti a u kterých existují rozdíly (sice malé, ale v některých případech značné!).

Konfigurace je popsána v notaci JSON a nahrána do Kafka Connect pomocí požadavku POST.

2.1. PostgreSQL

Příklad konfigurace konektoru pro PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Princip fungování konektoru po této konfiguraci je poměrně jednoduchý:

  • Při prvním spuštění se připojí k databázi uvedené v konfiguraci a spustí se v režimu počáteční snímek, odešle Kafkovi počáteční sadu dat přijatých s podmínkou SELECT * FROM table_name.
  • Po dokončení inicializace konektor přejde do režimu čtení změn ze souborů PostgreSQL WAL.

O použitých možnostech:

  • name — název konektoru, pro který se používá níže popsaná konfigurace; v budoucnu se tento název používá pro práci s konektorem (tj. zobrazení stavu / restart / aktualizace konfigurace) prostřednictvím Kafka Connect REST API;
  • connector.class — třída konektoru DBMS, kterou bude konfigurovaný konektor používat;
  • plugin.name je název pluginu pro logické dekódování dat ze souborů WAL. K dispozici na výběr wal2json, decoderbuffs и pgoutput. První dva vyžadují instalaci příslušných rozšíření v DBMS a pgoutput pro PostgreSQL verze 10 a vyšší nevyžaduje další manipulace;
  • database.* — možnosti připojení k databázi, kde database.server.name - název instance PostgreSQL použité k vytvoření názvu tématu v clusteru Kafka;
  • table.include.list - seznam tabulek, ve kterých chceme sledovat změny; uvedeno ve formátu schema.table_name; nelze použít společně s table.exclude.list;
  • heartbeat.interval.ms — interval (v milisekundách), s nímž konektor odesílá zprávy prezenčního signálu na speciální téma;
  • heartbeat.action.query - požadavek, který bude proveden při odeslání každé zprávy o srdečním tepu (možnost se objevuje od verze 1.1);
  • slot.name — název replikačního slotu, který bude konektor používat;
  • publication.name - Název Uveřejnění v PostgreSQL, který konektor používá. V případě, že neexistuje, Debezium se jej pokusí vytvořit. Pokud uživatel, pod kterým je připojení vytvořeno, nemá dostatečná práva pro tuto akci, konektor se ukončí s chybou;
  • transforms určuje, jak přesně změnit název cílového tématu:
    • transforms.AddPrefix.type označuje, že budeme používat regulární výrazy;
    • transforms.AddPrefix.regex — maska, pomocí které je předefinován název cílového tématu;
    • transforms.AddPrefix.replacement - přímo to, co nově definujeme.

Více o tepu a proměnách

Ve výchozím nastavení konektor odesílá data do Kafky pro každou potvrzenou transakci a zapisuje své LSN (sekvenční číslo protokolu) do tématu služby. offset. Co se ale stane, pokud je konektor nakonfigurován tak, aby nečetl celou databázi, ale pouze část jejích tabulek (ve kterých se data aktualizují jen zřídka)?

  • Konektor bude číst soubory WAL a nezjistí v nich potvrzení transakcí do tabulek, které sleduje.
  • Proto nebude aktualizovat svou aktuální pozici v tématu ani v replikačním slotu.
  • To zase způsobí, že se soubory WAL „zaseknou“ na disku a pravděpodobně jim dojde místo na disku.

A zde na záchranu přicházejí možnosti. heartbeat.interval.ms и heartbeat.action.query. Použití těchto možností ve dvojicích umožňuje provést požadavek na změnu dat v samostatné tabulce pokaždé, když je odeslána prezenční zpráva. LSN, na kterém je konektor aktuálně umístěn (v replikačním slotu), je tedy neustále aktualizován. To umožňuje DBMS odstranit soubory WAL, které již nejsou potřeba. Další informace o tom, jak možnosti fungují, viz dokumentace.

Další možností, která si zaslouží bližší pozornost, je transforms. I když jde spíše o pohodlí a krásu...

Ve výchozím nastavení vytváří Debezium témata pomocí následujících zásad pojmenování: serverName.schemaName.tableName. To nemusí být vždy pohodlné. Možnosti transforms pomocí regulárních výrazů můžete definovat seznam tabulek, jejichž události je třeba směrovat na téma se specifickým názvem.

V naší konfiguraci díky transforms stane se následující: všechny události CDC ze sledované databáze přejdou na téma s názvem data.cdc.dbname. Jinak (bez těchto nastavení) by Debezium ve výchozím nastavení vytvořilo téma pro každou tabulku formuláře: pg-dev.public.<table_name>.

Omezení konektoru

Na konci popisu konfigurace konektoru pro PostgreSQL stojí za to mluvit o následujících funkcích / omezeních jeho práce:

  1. Funkce konektoru pro PostgreSQL spoléhá na koncept logického dekódování. Proto on nesleduje požadavky na změnu struktury databáze (DDL) - podle toho tyto údaje nebudou v tématech.
  2. Protože jsou použity replikační sloty, je možné připojení konektoru pouze do hlavní instance DBMS.
  3. Pokud má uživatel, pod kterým se konektor připojuje k databázi, práva pouze pro čtení, pak před prvním spuštěním budete muset ručně vytvořit replikační slot a publikovat do databáze.

Použití konfigurace

Nahrajeme tedy naši konfiguraci do konektoru:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Zkontrolujeme, že stahování proběhlo úspěšně a konektor se spustil:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Skvělé: je to nastaveno a připraveno k použití. Nyní předstírejme, že jsme spotřebitel a připojíme se ke Kafkovi, poté přidáme a změníme záznam v tabulce:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

V našem tématu se to zobrazí takto:

Velmi dlouhý JSON s našimi změnami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

V obou případech se záznamy skládají z klíče (PK) záznamu, který byl změněn, a ze samotné podstaty změn: co byl záznam předtím a čím se stal poté.

  • V případě INSERT: hodnota před (before) rovná se nullnásledovaný řetězcem, který byl vložen.
  • V případě UPDATE: na payload.before zobrazí se předchozí stav řádku a v payload.after - nový s podstatou změny.

2.2 MongoDB

Tento konektor používá standardní replikační mechanismus MongoDB, který čte informace z oplogu primárního uzlu DBMS.

Podobně jako u již popsaného konektoru pro PgSQL, i zde se při prvním spuštění pořídí primární datový snímek, po kterém se konektor přepne do režimu čtení oplogu.

Příklad konfigurace:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Jak vidíte, oproti předchozímu příkladu zde nejsou žádné nové možnosti, pouze se snížil počet možností zodpovědných za připojení k databázi a jejich prefixů.

Nastavení transforms tentokrát udělají následující: otočte název cílového tématu ze schématu <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

odolnost proti chybám

Otázka odolnosti proti chybám a vysoké dostupnosti je v naší době naléhavější než kdy jindy – zvláště když mluvíme o datech a transakcích a sledování změn dat není v této věci na okraji zájmu. Podívejme se na to, co se v principu může pokazit a co se v každém případě stane s Debeziem.

Existují tři možnosti odhlášení:

  1. Selhání Kafka Connect. Pokud je Connect nakonfigurován pro práci v distribuovaném režimu, vyžaduje to, aby více pracovníků nastavilo stejné group.id. Pokud pak jeden z nich selže, konektor se restartuje na druhém pracovníkovi a pokračuje ve čtení od poslední potvrzené pozice v tématu v Kafkovi.
  2. Ztráta konektivity s Kafkovým clusterem. Konektor jednoduše přestane číst na pozici, kterou se nepodařilo odeslat do Kafky, a pravidelně se jej bude pokoušet znovu odeslat, dokud pokus neuspěje.
  3. Zdroj dat není dostupný. Konektor se pokusí znovu připojit ke zdroji podle konfigurace. Výchozí hodnota je 16 pokusů exponenciální ústup. Po 16. neúspěšném pokusu bude úkol označen jako neúspěšný a bude nutné jej ručně restartovat přes rozhraní Kafka Connect REST.
    • V případě PostgreSQL data se neztratí, protože použití replikačních slotů zabrání odstranění souborů WAL, které konektor nepřečte. V tomto případě je zde nevýhoda: pokud je na delší dobu narušena síťová konektivita mezi konektorem a DBMS, existuje šance, že dojde místo na disku, a to může vést k selhání celého DBMS.
    • V případě MySQL binlog soubory mohou být otočeny samotným DBMS před obnovením připojení. To způsobí, že konektor přejde do stavu selhání a bude nutné jej restartovat v režimu počátečního snímku, aby bylo možné pokračovat ve čtení z binlogů a obnovit normální provoz.
    • O MongoDB. Dokumentace říká: Chování konektoru v případě, že byly smazány soubory log/oplog a konektor nemůže pokračovat ve čtení od místa, kde skončil, je stejné pro všechny DBMS. Spočívá v tom, že konektor půjde do stavu neúspěšný a bude vyžadovat restart v režimu počáteční snímek.

      Existují však výjimky. Pokud byl konektor dlouhou dobu v odpojeném stavu (nebo se nemohl dostat k instanci MongoDB) a oplog se během této doby otočil, po obnovení připojení bude konektor klidně pokračovat ve čtení dat z první dostupné pozice , proto některá data v Kafkovi ne zasáhne.

Závěr

Debezium je moje první zkušenost se systémy CDC a celkově byla velmi pozitivní. Projekt podplatil podporu hlavních DBMS, snadnost konfigurace, podporu clusteringu a aktivní komunitu. Zájemcům o praxi doporučuji přečíst si příručky pro Kafka Connect и Debezium.

Ve srovnání s konektorem JDBC pro Kafka Connect je hlavní výhodou Debezium to, že změny jsou načítány z protokolů DBMS, což umožňuje příjem dat s minimálním zpožděním. JDBC Connector (poskytovaný Kafka Connect) se dotazuje na sledovanou tabulku v pevném intervalu a (ze stejného důvodu) negeneruje zprávy, když jsou data smazána (jak můžete hledat data, která tam nejsou?).

Chcete-li vyřešit podobné problémy, můžete věnovat pozornost následujícím řešením (kromě Debezium):

PS

Přečtěte si také na našem blogu:

Zdroj: www.habr.com

Přidat komentář