Vi presenterar Debezium - CDC för Apache Kafka

Vi presenterar Debezium - CDC för Apache Kafka

I mitt arbete stöter jag ofta på nya tekniska lösningar/mjukvaruprodukter vars information är ganska knapphändig på det ryskspråkiga Internet. Med den här artikeln kommer jag att försöka fylla en sådan lucka med ett exempel från min senaste praxis, när jag behövde konfigurera att skicka CDC-händelser från två populära DBMS:er (PostgreSQL och MongoDB) till ett Kafka-kluster med Debezium. Jag hoppas att denna recensionsartikel, som dyker upp som ett resultat av det utförda arbetet, kommer att vara användbar för andra.

Vad är Debezium och CDC i allmänhet?

Debezium — representant för CDC-programvarukategorin (Fånga dataändring), eller mer exakt, det är en uppsättning kontakter för olika DBMS:er som är kompatibla med Apache Kafka Connect-ramverket.

Den Open Source-projekt, licensierad under Apache-licensen v2.0 och sponsrad av Red Hat. Utveckling har pågått sedan 2016 och för närvarande ger den officiellt stöd för följande DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Det finns också kontakter för Cassandra och Oracle, men för tillfället är de i "tidig tillgång"-status, och nya utgåvor garanterar inte bakåtkompatibilitet.

Om vi ​​jämför CDC med det traditionella tillvägagångssättet (när applikationen läser data från DBMS direkt), inkluderar dess främsta fördelar implementeringen av dataändringsströmning på radnivå med låg latens, hög tillförlitlighet och tillgänglighet. De två sista punkterna uppnås genom att använda ett Kafka-kluster som ett arkiv för CDC-händelser.

En annan fördel är det faktum att en enda modell används för att lagra händelser, så slutapplikationen behöver inte oroa sig för nyanserna av att använda olika DBMS.

Slutligen, genom att använda en meddelandemäklare kan applikationer som övervakar förändringar i data skala ut horisontellt. Samtidigt minimeras påverkan på datakällan, eftersom data inte hämtas direkt från DBMS, utan från Kafka-klustret.

Om Debezium-arkitekturen

Att använda Debezium kommer ner till detta enkla schema:

DBMS (som datakälla) → anslutning i Kafka Connect → Apache Kafka → konsument

Som en illustration är här ett diagram från projektets webbplats:

Vi presenterar Debezium - CDC för Apache Kafka

Men jag gillar inte riktigt det här schemat, eftersom det verkar som att endast användningen av en diskbänkskontakt är möjlig.

I verkligheten är situationen annorlunda: fylla din Data Lake (sista länken i diagrammet ovan) Detta är inte det enda sättet att använda Debezium. Händelser som skickas till Apache Kafka kan användas av dina applikationer för att hantera en mängd olika situationer. Till exempel:

  • ta bort irrelevant data från cachen;
  • skicka meddelanden;
  • sökindexuppdateringar;
  • någon form av revisionsloggar;
  • .

Om du har en Java-applikation och det inte finns något behov/möjlighet att använda ett Kafka-kluster finns det även möjlighet att arbeta igenom inbäddad kontakt. Den uppenbara fördelen är att det eliminerar behovet av ytterligare infrastruktur (i form av en kontakt och Kafka). Den här lösningen har dock blivit utfasad sedan version 1.1 och rekommenderas inte längre för användning (stöd för den kan tas bort i framtida utgåvor).

Den här artikeln kommer att diskutera arkitekturen som rekommenderas av utvecklare, som ger feltolerans och skalbarhet.

Anslutningskonfiguration

För att börja spåra förändringar i det viktigaste värdet - data - behöver vi:

  1. datakälla, som kan vara MySQL från och med version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (hela listan);
  2. Apache Kafka-kluster;
  3. Kafka Connect-instans (version 1.x, 2.x);
  4. konfigurerad Debezium-kontakt.

Arbeta med de två första punkterna, d.v.s. Installationsprocessen för DBMS och Apache Kafka ligger utanför artikelns omfattning. Men för de som vill distribuera allt i sandlådan har det officiella förrådet med exempel en färdig docker-compose.yaml.

Vi kommer att uppehålla oss mer i detalj vid de två sista punkterna.

0. Kafka Connect

Här och längre fram i artikeln diskuteras alla konfigurationsexempel i samband med Docker-bilden som distribueras av Debezium-utvecklarna. Den innehåller alla nödvändiga plugin-filer (anslutningar) och tillhandahåller konfiguration av Kafka Connect med hjälp av miljövariabler.

Om du tänker använda Kafka Connect från Confluent måste du självständigt lägga till plugins för de nödvändiga anslutningarna till katalogen som anges i plugin.path eller ställs in via en miljövariabel CLASSPATH. Inställningar för Kafka Connect-arbetaren och anslutningarna bestäms genom konfigurationsfiler som skickas som argument till arbetarstartkommandot. För mer information, se dokumentation.

Hela processen med att installera Debeizum i kopplingsversionen utförs i två steg. Låt oss titta på var och en av dem:

1. Konfigurera Kafka Connect-ramverket

För att strömma data till Apache Kafka-klustret ställs specifika parametrar in i Kafka Connect-ramverket, såsom:

  • parametrar för att ansluta till klustret,
  • namn på ämnen där konfigurationen av själva kontakten kommer att lagras direkt,
  • namnet på gruppen där anslutningen körs (om distribuerat läge används).

Den officiella Docker-bilden av projektet stöder konfigurationen med hjälp av miljövariabler - det här är vad vi kommer att använda. Så ladda ner bilden:

docker pull debezium/connect

Den minsta uppsättningen miljövariabler som krävs för att köra anslutningen är följande:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — Inledande lista över Kafka-klusterservrar för att få en komplett lista över klustermedlemmar;
  • OFFSET_STORAGE_TOPIC=connector-offsets — ett ämne för lagring av positioner där anslutningen för närvarande finns.
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — Ämne för lagring av anslutningens status och dess uppgifter;
  • CONFIG_STORAGE_TOPIC=connector-config — Ämne för lagring av anslutningskonfigurationsdata och dess uppgifter;
  • GROUP_ID=1 — Identifierare för den grupp av arbetare på vilken kopplingsuppgiften kan utföras. nödvändig vid användning av distribuerad (distribuerad) läge.

Vi startar behållaren med dessa variabler:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Anmärkning om Avro

Som standard skriver Debezium data i JSON-format, vilket är acceptabelt för sandlådor och små mängder data, men kan bli ett problem i mycket belastade databaser. Ett alternativ till en JSON-omvandlare är att serialisera meddelanden med hjälp av Avro till ett binärt format, vilket minskar belastningen på I/O-undersystemet i Apache Kafka.

För att använda Avro måste du distribuera en separat schema-registret (för att lagra diagram). Variablerna för omvandlaren kommer att se ut så här:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detaljer om hur du använder Avro och ställer in registret för det ligger utanför ramen för den här artikeln - längre fram, för tydlighetens skull, kommer vi att använda JSON.

2. Konfigurera själva kontakten

Nu kan du gå direkt till konfigurationen av själva kontakten, som kommer att läsa data från källan.

Låt oss titta på exemplet med anslutningar för två DBMS:er: PostgreSQL och MongoDB, som jag har erfarenhet av och där det finns skillnader (om än små, men i vissa fall betydande!).

Konfigurationen beskrivs i JSON-notation och laddas upp till Kafka Connect med en POST-begäran.

2.1. PostgreSQL

Exempel på anslutningskonfiguration för PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Funktionsprincipen för kontakten efter denna installation är ganska enkel:

  • När den startas för första gången ansluter den till databasen som anges i konfigurationen och startar i läge första ögonblicksbilden, skickar till Kafka den initiala uppsättningen av data som erhållits med den villkorliga SELECT * FROM table_name.
  • Efter att initieringen är klar går kontakten in i läge för att läsa ändringar från PostgreSQL WAL-filer.

Om alternativen som används:

  • name — Namnet på kontakten för vilken den konfiguration som beskrivs nedan används. i framtiden kommer detta namn att användas för att arbeta med anslutningen (dvs visa status/starta om/uppdatera konfigurationen) via Kafka Connect REST API;
  • connector.class — DBMS-anslutningsklass som kommer att användas av den konfigurerade anslutningen;
  • plugin.name — Namnet på plugin-programmet för logisk avkodning av data från WAL-filer. Finns att välja mellan wal2json, decoderbuffs и pgoutput. De två första kräver installation av lämpliga tillägg i DBMS, och pgoutput för PostgreSQL version 10 och högre kräver inga ytterligare manipulationer;
  • database.* — alternativ för att ansluta till databasen, var database.server.name — PostgreSQL-instansnamn som används för att bilda ämnesnamnet i Kafka-klustret;
  • table.include.list — en lista över tabeller där vi vill spåra ändringar; som anges i formatet schema.table_name; kan inte användas tillsammans med table.exclude.list;
  • heartbeat.interval.ms — intervall (i millisekunder) med vilket kontakten skickar hjärtslagsmeddelanden till ett speciellt ämne;
  • heartbeat.action.query — en begäran som kommer att utföras när varje hjärtslagsmeddelande skickas (alternativet dök upp i version 1.1);
  • slot.name — Namnet på den replikeringsplats som kommer att användas av anslutningen.
  • publication.name - Namn Publikation i PostgreSQL, som kontakten använder. Om det inte finns kommer Debezium att försöka skapa det. Om användaren under vilken anslutningen görs inte har tillräckligt med rättigheter för denna åtgärd, kommer anslutningen att avslutas med ett fel;
  • transforms bestämmer exakt hur namnet på målämnet ska ändras:
    • transforms.AddPrefix.type indikerar att vi kommer att använda reguljära uttryck;
    • transforms.AddPrefix.regex — en mask som omdefinierar namnet på målämnet;
    • transforms.AddPrefix.replacement – direkt vad vi omdefinierar.

Mer om hjärtslag och transformationer

Som standard skickar kontakten data till Kafka för varje genomförd transaktion, och dess LSN (Log Sequence Number) registreras i tjänsteämnet offset. Men vad händer om anslutningen är konfigurerad att inte läsa hela databasen, utan bara en del av dess tabeller (där datauppdateringar inte förekommer ofta)?

  • Anslutningen läser WAL-filer och kommer inte att upptäcka några transaktionsbekräftelser till tabellerna som den övervakar.
  • Därför kommer den inte att uppdatera sin nuvarande position vare sig i ämnet eller i replikeringsplatsen.
  • Detta kommer i sin tur att resultera i att WAL-filer hålls på disken och sannolikt tar slut på diskutrymme.

Och det är här alternativen kommer till undsättning. heartbeat.interval.ms и heartbeat.action.query. Att använda dessa alternativ i par gör det möjligt att utföra en begäran om att ändra data i en separat tabell varje gång ett hjärtslagsmeddelande skickas. Således uppdateras det LSN på vilket anslutningen för närvarande är placerad (i replikeringsplatsen) ständigt. Detta gör att DBMS kan ta bort WAL-filer som inte längre behövs. Du kan lära dig mer om hur alternativen fungerar dokumentation.

Ett annat alternativ värt att uppmärksammas är transforms. Även om det handlar mer om bekvämlighet och skönhet...

Som standard skapar Debezium ämnen med hjälp av följande namnpolicy: serverName.schemaName.tableName. Detta kanske inte alltid är bekvämt. alternativ transforms Du kan använda reguljära uttryck för att definiera en lista med tabeller, varifrån händelser måste dirigeras till ett ämne med ett specifikt namn.

Tack i vår konfiguration transforms följande händer: alla CDC-händelser från den övervakade databasen kommer att gå till ett ämne med namnet data.cdc.dbname. Annars (utan dessa inställningar) skulle Debezium som standard skapa ett ämne för varje tabell som: pg-dev.public.<table_name>.

Anslutningsbegränsningar

För att avsluta beskrivningen av anslutningskonfigurationen för PostgreSQL är det värt att prata om följande funktioner/begränsningar för dess funktion:

  1. Funktionaliteten hos kontakten för PostgreSQL bygger på konceptet logisk avkodning. Därför han spårar inte förfrågningar om att ändra databasstrukturen (DDL) - därför kommer denna information inte att finnas i ämnena.
  2. Eftersom replikeringsplatser används är det möjligt att ansluta en kontakt endast till den ledande DBMS-instansen.
  3. Om användaren under vilken anslutningen ansluter till databasen ges skrivskyddade rättigheter, måste du innan den första lanseringen manuellt skapa en replikeringsplats och publicera till databasen.

Använder konfigurationen

Så låt oss ladda vår konfiguration i kontakten:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Vi kontrollerar att nedladdningen lyckades och att kontakten startade:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Bra: den är inställd och redo att användas. Låt oss nu låtsas vara en konsument och ansluta till Kafka, varefter vi kommer att lägga till och ändra en post i tabellen:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

I vårt ämne kommer det att visas enligt följande:

Mycket lång JSON med våra ändringar

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

I båda fallen består posterna av nyckeln (PK) för posten som ändrades, och själva kärnan i ändringarna: vad posten var före och vad den blev efter.

  • I fallet med INSERT: värde före (before) är lika med null, och efter - raden som infogades.
  • I fallet med UPDATE: kl payload.before radens tidigare tillstånd visas, och in payload.after — ny med essensen av förändringar.

2.2 MongoDB

Den här anslutningen använder MongoDB-replikeringsmekanismen som standard och läser information från oploggen för den primära DBMS-noden.

I likhet med den redan beskrivna kontakten för PgSQL, även här, vid första starten, tas den primära dataögonblicksbilden, varefter kontakten växlar till oplog-läsläge.

Konfigurationsexempel:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Som du kan se finns det inga nya alternativ här jämfört med föregående exempel, utan bara antalet alternativ som ansvarar för att ansluta till databasen och deras prefix har minskat.

Inställningar transforms den här gången gör de följande: de transformerar namnet på målämnet från schemat <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

feltolerans

Frågan om feltolerans och hög tillgänglighet i vår tid är mer akut än någonsin – speciellt när vi pratar om data och transaktioner, och spårning av dataförändringar står inte åt sidan i denna fråga. Låt oss titta på vad som i princip kan gå fel och vad som kommer att hända med Debezium i varje enskilt fall.

Det finns tre alternativ för att välja bort:

  1. Kafka Connect-fel. Om Connect är konfigurerat för att fungera i distribuerat läge, kräver detta att flera arbetare ställer in samma group.id. Sedan, om en av dem misslyckas, kommer anslutningen att startas om på en annan arbetare och fortsätta att läsa från den senast engagerade positionen i ämnet i Kafka.
  2. Förlust av anslutning till Kafka-klustret. Kontakten kommer helt enkelt att sluta läsa vid den position som inte kunde skickas till Kafka, och kommer med jämna mellanrum att försöka skicka den igen tills försöket lyckas.
  3. Datakälla inte tillgänglig. Anslutningen kommer att försöka återansluta till källan som konfigurerad. Standard är 16 försök att använda exponentiell backoff. Efter det 16:e misslyckade försöket kommer uppgiften att markeras som misslyckades och du måste starta om den manuellt via Kafka Connect REST-gränssnittet.
    • I fallet med PostgreSQL data kommer inte att gå förlorade, eftersom Genom att använda replikeringsplatser kommer du att förhindra att du raderar WAL-filer som inte läses av kontakten. I det här fallet finns det också en nackdel med myntet: om nätverksanslutningen mellan kontakten och DBMS störs under en längre tid, finns det en möjlighet att diskutrymmet tar slut, och detta kan leda till ett fel på hela DBMS.
    • I fallet med MySQL binlog-filer kan roteras av DBMS själv innan anslutningen återställs. Detta kommer att göra att kontakten går in i det misslyckade tillståndet, och för att återställa normal drift måste du starta om i initialt ögonblicksbildläge för att fortsätta läsa från binloggar.
    • Про MongoDB. Dokumentationen anger: beteendet hos anslutaren i händelse av att logg-/oplogfiler har raderats och anslutaren inte kan fortsätta läsa från den position där den slutade är detsamma för alla DBMS. Det betyder att kontakten kommer att gå in i tillståndet misslyckades och kommer att kräva omstart i läge första ögonblicksbilden.

      Det finns dock undantag. Om kontakten var frånkopplad under en lång tid (eller inte kunde nå MongoDB-instansen) och oploggen roterades under denna tid, när anslutningen återställs, kommer kontakten lugnt att fortsätta att läsa data från den första tillgängliga positionen, vilket är anledningen till att en del av uppgifterna i Kafka ingen kommer att slå.

Slutsats

Debezium är min första erfarenhet av CDC-system och överlag mycket positiv. Projektet vann över med sitt stöd för stora DBMS, enkel konfiguration, klustringsstöd och aktiv gemenskap. För den som är intresserad av praktik rekommenderar jag att du läser guiderna för Kafka Connect и Debezium.

Jämfört med JDBC-kontakten för Kafka Connect är den största fördelen med Debezium att ändringar läses från DBMS-loggarna, vilket gör att data kan tas emot med minimal latens. JDBC Connector (från Kafka Connect) frågar den övervakade tabellen med ett fast intervall och (av samma anledning) genererar inga meddelanden när data raderas (hur kan du fråga data som inte finns?).

För att lösa liknande problem kan du vara uppmärksam på följande lösningar (utöver Debezium):

PS

Läs även på vår blogg:

Källa: will.com

Lägg en kommentar