Bemutatkozik a Debezium – CDC Apache Kafka számára

Bemutatkozik a Debezium – CDC Apache Kafka számára

Munkám során gyakran találkozom új műszaki megoldásokkal/szoftvertermékekkel, amelyekről az orosz nyelvű interneten meglehetősen kevés információ található. Ezzel a cikkel egy ilyen hiányosságot próbálok kitölteni egy példával a közelmúltbeli gyakorlatomból, amikor két népszerű DBMS-ből (PostgreSQL és MongoDB) kellett CDC-események küldését konfigurálnom egy Kafka-fürtbe Debezium segítségével. Remélem, hogy ez az áttekintő cikk, amely az elvégzett munka eredményeként jelenik meg, mások számára is hasznos lesz.

Mi az a Debezium és általában a CDC?

Debezium — a CDC szoftverkategória képviselője (Adatmódosítás rögzítése), pontosabban az Apache Kafka Connect keretrendszerrel kompatibilis különféle DBMS-ekhez való csatlakozókészlet.

Ezt Nyílt forráskódú projekt, az Apache License v2.0 licenc alatt, és a Red Hat támogatásával. A fejlesztés 2016 óta folyik, és jelenleg a következő DBMS-ekhez nyújt hivatalos támogatást: MySQL, PostgreSQL, MongoDB, SQL Server. A Cassandra és az Oracle számára is vannak csatlakozók, de ezek jelenleg „korai hozzáférés” státuszban vannak, és az új kiadások nem garantálják a visszamenőleges kompatibilitást.

Ha összehasonlítjuk a CDC-t a hagyományos megközelítéssel (amikor az alkalmazás közvetlenül a DBMS-ből olvas ki adatokat), akkor fő előnyei közé tartozik az adatváltozási streaming megvalósítása sorszinten alacsony késleltetéssel, nagy megbízhatósággal és rendelkezésre állással. Az utolsó két pontot úgy érjük el, hogy egy Kafka-fürtöt használunk a CDC események tárházaként.

További előnye, hogy egyetlen modellt használnak az események tárolására, így a végalkalmazásnak nem kell aggódnia a különböző DBMS-ek működésének árnyalatai miatt.

Végül az üzenetközvetítő használatával az adatok változásait figyelő alkalmazások vízszintesen méretezhetők. Ugyanakkor az adatforrásra gyakorolt ​​hatás minimális, mivel az adatokat nem közvetlenül a DBMS-ből, hanem a Kafka-fürtből szerzik be.

A Debezium építészetről

A Debezium használata ehhez az egyszerű sémához vezet:

DBMS (adatforrásként) → csatlakozó a Kafka Connectben → Apache Kafka → fogyasztó

Illusztrációként itt van egy diagram a projekt weboldaláról:

Bemutatkozik a Debezium – CDC Apache Kafka számára

Ez a séma azonban nem igazán tetszik, mert úgy tűnik, hogy csak a mosogató csatlakozója lehetséges.

A valóságban a helyzet más: kitölti a Data Lake-et (utolsó link a fenti ábrán) Nem ez az egyetlen módja a Debezium használatának. Az Apache Kafkának küldött eseményeket az alkalmazásai különféle helyzetek kezelésére használhatják. Például:

  • irreleváns adatok eltávolítása a gyorsítótárból;
  • értesítések küldése;
  • keresési index frissítések;
  • valamilyen ellenőrzési napló;
  • ...

Abban az esetben, ha van Java-alkalmazása, és nincs szükség/lehetőség Kafka-fürt használatára, lehetőség van arra is, hogy beágyazott csatlakozó. Nyilvánvaló előnye, hogy nincs szükség további infrastruktúrára (csatlakozó és Kafka formájában). Ez a megoldás azonban az 1.1-es verzió óta elavult, és már nem javasolt a használata (a jövőbeni kiadásokban előfordulhat, hogy a támogatása megszűnik).

Ez a cikk a fejlesztők által javasolt architektúrát tárgyalja, amely hibatűrést és méretezhetőséget biztosít.

Csatlakozó konfigurációja

A legfontosabb érték - adatok - változásainak nyomon követéséhez szükségünk van:

  1. adatforrás, amely lehet MySQL 5.7-től kezdve, PostgreSQL 9.6+, MongoDB 3.2+ (teljes lista);
  2. Apache Kafka klaszter;
  3. Kafka Connect példány (1.x, 2.x verziók);
  4. konfigurált Debezium csatlakozó.

Az első két ponton dolgozz, pl. A DBMS és az Apache Kafka telepítési folyamata túlmutat a cikk keretein. Aki azonban mindent a homokozóban szeretne telepíteni, annak a példákkal ellátott hivatalos adattárban van egy kész docker-compose.yaml.

Az utolsó két ponttal még részletesebben foglalkozunk.

0. Kafka Connect

Itt és a cikk további részében az összes konfigurációs példát a Debezium fejlesztői által terjesztett Docker-kép kontextusában tárgyaljuk. Tartalmazza az összes szükséges beépülő fájlt (csatlakozót), és biztosítja a Kafka Connect konfigurációját környezeti változók használatával.

Ha a Kafka Connectet a Confluentből kívánja használni, akkor önállóan hozzá kell adnia a szükséges csatlakozók beépülő moduljait a pontban megadott könyvtárhoz. plugin.path vagy környezeti változón keresztül állítsa be CLASSPATH. A Kafka Connect-munkavégző és az összekötők beállításait konfigurációs fájlok határozzák meg, amelyek argumentumként kerülnek átadásra a dolgozó indító parancsának. További részletekért lásd dokumentáció.

A Debeizum beállításának teljes folyamata a csatlakozó változatban két lépésben történik. Nézzük mindegyiket:

1. A Kafka Connect keretrendszer beállítása

Az Apache Kafka-fürtbe történő adatfolyamhoz a Kafka Connect keretrendszerben meghatározott paraméterek vannak beállítva, például:

  • a klaszterhez való csatlakozás paraméterei,
  • azoknak a témáknak a neve, amelyekben magának az összekötőnek a konfigurációja közvetlenül lesz tárolva,
  • annak a csoportnak a neve, amelyben az összekötő fut (ha elosztott módot használnak).

A projekt hivatalos Docker-képe támogatja a környezeti változók használatával történő konfigurációt – ezt fogjuk használni. Szóval, töltsd le a képet:

docker pull debezium/connect

Az összekötő futtatásához szükséges környezeti változók minimális készlete a következő:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — a Kafka-fürtkiszolgálók kezdeti listája a fürttagok teljes listájának beszerzéséhez;
  • OFFSET_STORAGE_TOPIC=connector-offsets — egy témakör azon pozíciók tárolására, ahol a csatlakozó jelenleg található;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — téma a csatlakozó állapotának és feladatainak tárolására;
  • CONFIG_STORAGE_TOPIC=connector-config — a csatlakozó konfigurációs adatok tárolásának témaköre és feladatai;
  • GROUP_ID=1 — a dolgozók azon csoportjának azonosítója, amelyen az összekötő feladat végrehajtható; elosztott használat esetén szükséges (megosztott) rezsim.

A következő változókkal indítjuk el a tárolót:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Megjegyzés az Avro-ról

Alapértelmezés szerint a Debezium JSON formátumban írja az adatokat, ami elfogadható homokozókhoz és kis mennyiségű adathoz, de problémát jelenthet a nagy terhelésű adatbázisokban. A JSON-konverter alternatívája az üzenetek sorozatosítása Avro bináris formátumba, ami csökkenti az Apache Kafka I/O alrendszerének terhelését.

Az Avro használatához külön telepíteni kell séma-nyilvántartás (diagramok tárolására). A konverter változói így fognak kinézni:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Az Avro használatának és a rendszerleíró adatbázis beállításának részletei ebben a cikkben nem terjednek ki – a továbbiakban az egyértelműség kedvéért JSON-t fogunk használni.

2. Magának a csatlakozónak a konfigurálása

Most közvetlenül az összekötő konfigurációjához léphet, amely a forrásból olvassa be az adatokat.

Nézzük meg a példát két DBMS: PostgreSQL és MongoDB csatlakozóira, amelyekben van tapasztalatom, és amelyekben vannak eltérések (bár kicsik, de bizonyos esetekben jelentősek!).

A konfiguráció leírása JSON-jelöléssel történik, és egy POST-kérés segítségével feltölthető a Kafka Connectbe.

2.1. PostgreSQL

Példa csatlakozókonfiguráció a PostgreSQL-hez:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

A csatlakozó működési elve a beállítás után meglehetősen egyszerű:

  • Az első indításkor csatlakozik a konfigurációban megadott adatbázishoz, és módban indul kezdeti pillanatfelvétel, elküldi Kafkának a feltételes feltétel használatával kapott kezdeti adathalmazt SELECT * FROM table_name.
  • Az inicializálás befejezése után az összekötő módba lép, hogy beolvassa a PostgreSQL WAL-fájlok változásait.

A használt opciókról:

  • name — annak a csatlakozónak a neve, amelyhez az alábbiakban ismertetett konfigurációt használják; a jövőben ezt a nevet használjuk a csatlakozási szoftverrel való együttműködésre (azaz állapot megtekintésére/újraindításra/konfiguráció frissítésére) a Kafka Connect REST API-n keresztül;
  • connector.class — A konfigurált összekötő által használt DBMS-összekötő osztály;
  • plugin.name — a WAL-fájlokból származó adatok logikai dekódolásához szükséges beépülő modul neve. Választható wal2json, decoderbuffs и pgoutput. Az első kettő megköveteli a megfelelő bővítmények telepítését a DBMS-ben, és pgoutput a PostgreSQL 10-es és újabb verzióihoz nincs szükség további manipulációkra;
  • database.* — az adatbázishoz való csatlakozás lehetőségei, hol database.server.name — PostgreSQL-példánynév, amely a téma nevének kialakítására szolgál a Kafka-fürtben;
  • table.include.list — azoknak a tábláknak a listája, amelyekben nyomon szeretnénk követni a változásokat; formátumban meghatározott schema.table_name; nem használható együtt table.exclude.list;
  • heartbeat.interval.ms — intervallum (ezredmásodpercben), amellyel a csatlakozó szívverési üzeneteket küld egy speciális témának;
  • heartbeat.action.query — egy kérés, amely minden szívverés üzenet elküldésekor végrehajtásra kerül (az opció az 1.1-es verzióban jelent meg);
  • slot.name — az összekötő által használt replikációs slot neve;
  • publication.name - Név Publikáció PostgreSQL-ben, amelyet az összekötő használ. Ha nem létezik, a Debezium megpróbálja létrehozni. Ha a felhasználó, aki alatt a kapcsolat létrejön, nem rendelkezik elegendő jogosultsággal ehhez a művelethez, az összekötő hibával leáll;
  • transforms pontosan meghatározza, hogyan kell megváltoztatni a cél téma nevét:
    • transforms.AddPrefix.type azt jelzi, hogy reguláris kifejezéseket fogunk használni;
    • transforms.AddPrefix.regex — egy maszk, amely újradefiniálja a cél téma nevét;
    • transforms.AddPrefix.replacement - közvetlenül, amit újradefiniálunk.

Bővebben a szívverésről és az átalakulásokról

Alapértelmezés szerint az összekötő minden végrehajtott tranzakcióhoz adatokat küld a Kafkának, és annak LSN-je (naplósorozatszáma) rögzítésre kerül a szolgáltatás témakörben. offset. De mi történik, ha az összekötő úgy van beállítva, hogy ne a teljes adatbázist olvassa, hanem csak a tábláinak egy részét (amelyekben az adatfrissítések nem gyakran fordulnak elő)?

  • Az összekötő olvasni fogja a WAL fájlokat, és nem észlel semmilyen tranzakció véglegesítést az általa figyelt táblákban.
  • Ezért nem frissíti jelenlegi pozícióját sem a témakörben, sem a replikációs résben.
  • Ez viszont azt eredményezi, hogy a WAL-fájlok a lemezen maradnak, és valószínűleg elfogy a lemezterület.

És itt a lehetőségek segítenek. heartbeat.interval.ms и heartbeat.action.query. Ezen opciók páros használata lehetővé teszi, hogy egy külön táblázatban lévő adatok módosítására vonatkozó kérést hajtsanak végre minden szívverési üzenet elküldésekor. Így az LSN, amelyen a csatlakozó jelenleg található (a replikációs slotban), folyamatosan frissül. Ez lehetővé teszi a DBMS számára, hogy eltávolítsa a már nem szükséges WAL-fájlokat. Többet megtudhat az opciók működéséről dokumentáció.

Egy másik lehetőség, amelyre érdemes odafigyelni transforms. Bár ez inkább a kényelemről és a szépségről szól...

Alapértelmezés szerint a Debezium témaköröket a következő elnevezési szabályzat használatával hoz létre: serverName.schemaName.tableName. Ez nem mindig kényelmes. Lehetőségek transforms A reguláris kifejezések segítségével megadhatja azoknak a tábláknak a listáját, amelyekből az eseményeket egy adott nevű témakörhöz kell irányítani.

Konfigurációnkban köszönjük transforms a következő történik: a megfigyelt adatbázisból származó összes CDC esemény a névvel rendelkező témába kerül data.cdc.dbname. Ellenkező esetben (e beállítások nélkül) a Debezium alapértelmezés szerint minden táblához létrehoz egy témát, például: pg-dev.public.<table_name>.

Csatlakozók korlátozásai

A PostgreSQL csatlakozási konfigurációjának leírásának befejezéseként érdemes beszélni a következő jellemzőiről/működési korlátairól:

  1. A PostgreSQL csatlakozójának funkcionalitása a logikai dekódolás koncepcióján alapul. Ezért ő nem követi az adatbázis-struktúra megváltoztatására irányuló kéréseket (DDL) - ennek megfelelően ezek az adatok nem lesznek a topikban.
  2. Mivel replikációs bővítőhelyeket használnak, lehetséges a csatlakozó csatlakoztatása csak a vezető DBMS-példányhoz.
  3. Ha a felhasználó, aki alatt az összekötő csatlakozik az adatbázishoz, csak olvasási jogokkal rendelkezik, akkor az első indítás előtt manuálisan létre kell hoznia egy replikációs helyet, és közzé kell tennie az adatbázisban.

A konfiguráció alkalmazása

Tehát töltsük be a konfigurációnkat a csatlakozóba:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Ellenőrizzük, hogy a letöltés sikeres volt-e, és a csatlakozó elindult:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Remek: be van állítva és használatra kész. Most tegyünk úgy, mintha fogyasztók lennénk, és csatlakozzunk Kafkához, majd hozzáadunk és módosítunk egy bejegyzést a táblázatban:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Témánkban a következőképpen fog megjelenni:

Nagyon hosszú JSON a változtatásainkkal

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

A rekordok mindkét esetben a módosított rekord kulcsából (PK) és a változtatások lényegéből állnak: mi volt a rekord előtt és mi lett azután.

  • Abban az esetben, ha INSERT: érték előtt (before) egyenlő null, és utána - a beszúrt sor.
  • Abban az esetben, ha UPDATE: nál nél payload.before a sor előző állapota jelenik meg, és in payload.after — új a változtatások lényegével.

2.2 MongoDB

Ez az összekötő a szabványos MongoDB replikációs mechanizmust használja, és információkat olvas be az elsődleges DBMS-csomópont műveletnaplójából.

A PgSQL-hez már leírt csatlakozóhoz hasonlóan itt is az első indításkor készül az elsődleges adatok pillanatképe, amely után a csatlakozó oplog olvasási módba kapcsol.

Konfigurációs példa:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Mint látható, az előző példához képest itt nincsenek új lehetőségek, csak az adatbázishoz való kapcsolódásért felelős opciók és előtagjaik száma csökkent.

beállítások transforms ezúttal a következőket teszik: átalakítják a céltéma nevét a sémából <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

hibatűrés

A hibatűrés és a magas rendelkezésre állás kérdése korunkban minden eddiginél akutabb – különösen, ha adatokról és tranzakciókról beszélünk, és az adatváltozások nyomon követése sem marad el ebben a kérdésben. Nézzük meg, hogy mi ronthat el elvileg és mi lesz a Debeziummal minden esetben.

Három leiratkozási lehetőség van:

  1. Kafka Connect hiba. Ha a Connect úgy van beállítva, hogy elosztott módban működjön, akkor több dolgozónak kell beállítania ugyanazt a group.id-t. Ezután, ha az egyik meghibásodik, a csatlakozó újraindul egy másik dolgozón, és folytatja az olvasást a témában a Kafka témában lévő utolsó lefoglalt pozíciótól.
  2. A Kafka-klaszterrel való kapcsolat elvesztése. A csatlakozó egyszerűen leállítja az olvasást azon a helyen, amelyet nem sikerült elküldeni Kafkának, és rendszeresen megpróbálja újraküldeni, amíg a próbálkozás sikertelen lesz.
  3. Az adatforrás nem elérhető. A csatlakozó megpróbál újra csatlakozni a beállított forráshoz. Az alapértelmezett 16 próbálkozás exponenciális visszalépés. A 16. sikertelen próbálkozás után a feladat jelölésre kerül sikertelen és manuálisan újra kell indítania a Kafka Connect REST interfészen keresztül.
    • Abban az esetben, ha PostgreSQL az adatok nem vesznek el, mert A replikációs helyek használatával megakadályozza, hogy törölje a WAL-fájlokat, amelyeket az összekötő nem olvas. Ebben az esetben az éremnek van egy árnyoldala is: ha a csatlakozó és a DBMS közötti hálózati kapcsolat hosszabb ideig megszakad, fennáll annak a lehetősége, hogy a lemezterület elfogy, és ez a a teljes DBMS.
    • Abban az esetben, ha MySQL A binlog fájlokat maga a DBMS is elforgathatja a kapcsolat visszaállítása előtt. Ez azt eredményezi, hogy a csatlakozó meghibásodott állapotba kerül, és a normál működés visszaállításához újra kell indítania a kezdeti pillanatfelvétel módban a binlogok olvasásának folytatásához.
    • Про MongoDB. A dokumentáció kimondja: az összekötő viselkedése abban az esetben, ha a napló-/oplog-fájlok törlésre kerültek, és az összekötő nem tudja folytatni az olvasást onnan, ahol abbamaradt, ugyanaz minden DBMS-nél. Ez azt jelenti, hogy a csatlakozó állapotba kerül sikertelen és módban újra kell indítani kezdeti pillanatfelvétel.

      Vannak azonban kivételek. Ha a csatlakozó hosszabb ideig le volt bontva (vagy nem tudta elérni a MongoDB példányt), és az oplog ez idő alatt elfordult, akkor a kapcsolat helyreállítása után a csatlakozó nyugodtan folytatja az adatok olvasását az első elérhető pozícióból, ezért a kafkai adatok egy része nincs ütni fog.

Következtetés

A Debezium az első tapasztalatom a CDC rendszerekkel, és összességében nagyon pozitív. A projekt megnyerte a főbb DBMS-ek támogatását, az egyszerű konfigurálást, a klaszterezés támogatását és az aktív közösséget. A gyakorlat iránt érdeklődőknek javaslom, hogy olvassák el az útmutatókat Kafka Connect и Debezium.

A Kafka Connect JDBC-csatlakozójához képest a Debezium fő előnye, hogy a változtatásokat a DBMS-naplókból olvassa ki, ami lehetővé teszi az adatok minimális késleltetéssel történő fogadását. A JDBC Connector (a Kafka Connecttől) meghatározott időközönként lekérdezi a megfigyelt táblát, és (ugyanezért) nem generál üzenetet az adatok törlésekor (hogyan lehet lekérdezni olyan adatokat, amelyek nem léteznek?).

Hasonló problémák megoldásához a következő megoldásokra figyelhet (a Debezium mellett):

PS

Olvassa el blogunkon is:

Forrás: will.com

Hozzászólás