Vi introducerer Debezium - CDC for Apache Kafka

Vi introducerer Debezium - CDC for Apache Kafka

I mit arbejde støder jeg ofte på nye tekniske løsninger/softwareprodukter, hvorom der er ret sparsom information på det russisktalende internet. Med denne artikel vil jeg forsøge at udfylde et sådant hul med et eksempel fra min seneste praksis, da jeg skulle konfigurere at sende CDC-begivenheder fra to populære DBMS'er (PostgreSQL og MongoDB) til en Kafka-klynge ved hjælp af Debezium. Jeg håber, at denne anmeldelsesartikel, som dukkede op som et resultat af det udførte arbejde, vil være nyttig for andre.

Hvad er Debezium og CDC generelt?

Debezium - Repræsentant for CDC-softwarekategorien (Optag dataændring), eller mere præcist, det er et sæt stik til forskellige DBMS'er, der er kompatible med Apache Kafka Connect-rammeværket.

Det open source-projekt, licenseret under Apache-licensen v2.0 og sponsoreret af Red Hat. Udviklingen har været i gang siden 2016 og i øjeblikket yder den officiel support til følgende DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Der er også stik til Cassandra og Oracle, men de er i øjeblikket i "early access"-status, og nye udgivelser garanterer ikke bagudkompatibilitet.

Hvis vi sammenligner CDC med den traditionelle tilgang (når applikationen læser data fra DBMS direkte), så inkluderer dens vigtigste fordele implementeringen af ​​dataændringsstreaming på rækkeniveau med lav latenstid, høj pålidelighed og tilgængelighed. De sidste to punkter opnås ved at bruge en Kafka-klynge som et lager for CDC-begivenheder.

Fordelene inkluderer også det faktum, at en enkelt model bruges til at gemme begivenheder, så den endelige applikation ikke behøver at bekymre sig om nuancerne ved at betjene forskellige DBMS.

Endelig åbner brug af en meddelelsesmægler mulighed for horisontal skalering af applikationer, der sporer ændringer i data. Samtidig minimeres påvirkningen af ​​datakilden, da data ikke modtages direkte fra DBMS, men fra Kafka-klyngen.

Om Debezium-arkitekturen

Brug af Debezium kommer ned til denne enkle ordning:

DBMS (som datakilde) → stik i Kafka Connect → Apache Kafka → forbruger

Som illustration vil jeg give et diagram fra projektets hjemmeside:

Vi introducerer Debezium - CDC for Apache Kafka

Jeg kan dog ikke rigtig lide denne ordning, fordi det ser ud til, at kun en vask-stik er mulig.

I virkeligheden er situationen anderledes: at udfylde din Data Lake (sidste link i diagrammet ovenfor) er ikke den eneste måde at bruge Debezium på. Begivenheder sendt til Apache Kafka kan bruges af dine applikationer til at håndtere forskellige situationer. For eksempel:

  • fjernelse af irrelevante data fra cachen;
  • afsendelse af meddelelser;
  • søgeindeksopdateringer;
  • en slags revisionslogs;
  • ...

Hvis du har en Java-applikation og der ikke er behov/mulighed for at bruge en Kafka-klynge, er der også mulighed for at arbejde igennem indbygget stik. Det åbenlyse plus er, at du med det kan nægte yderligere infrastruktur (i form af en forbindelse og Kafka). Denne løsning er dog blevet forældet siden version 1.1 og anbefales ikke længere til brug (den kan blive fjernet i fremtidige udgivelser).

Denne artikel vil diskutere arkitekturen anbefalet af udviklere, som giver fejltolerance og skalerbarhed.

Konnektorkonfiguration

For at begynde at spore ændringer i den vigtigste værdi - data - har vi brug for:

  1. datakilde, som kan være MySQL fra version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (komplet liste);
  2. Apache Kafka klynge
  3. Kafka Connect-instans (version 1.x, 2.x);
  4. konfigureret Debezium-stik.

Arbejd med de to første punkter, dvs. processen med at installere en DBMS og Apache Kafka er uden for artiklens rammer. Men for dem, der ønsker at implementere alt i en sandkasse, er der en færdiglavet en i det officielle lager med eksempler docker-compose.yaml.

Vi vil fokusere mere på de to sidste punkter.

0. Kafka Connect

Her og senere i artiklen betragtes alle konfigurationseksempler i sammenhæng med Docker-billedet distribueret af Debezium-udviklerne. Den indeholder alle de nødvendige plugin-filer (forbindelser) og giver Kafka Connect-konfiguration ved hjælp af miljøvariabler.

Hvis du har til hensigt at bruge Kafka Connect fra Confluent, skal du selv tilføje plugins til de nødvendige stik til den mappe, der er angivet i plugin.path eller indstilles via en miljøvariabel CLASSPATH. Indstillingerne for Kafka Connect-arbejderen og -forbindelserne er defineret gennem konfigurationsfiler, der sendes som argumenter til arbejderens startkommando. For detaljer se dokumentation.

Hele processen med at opsætte Debeizum i konnektorversionen udføres i to trin. Lad os overveje hver af dem:

1. Opsætning af Kafka Connect-rammeværket

For at streame data til en Apache Kafka-klynge indstilles specifikke parametre i Kafka Connect-rammeværket, såsom:

  • klyngeforbindelsesindstillinger,
  • navne på emner, hvor konfigurationen af ​​selve stikket vil blive gemt,
  • navnet på den gruppe, som stikket kører i (i tilfælde af brug af distribueret tilstand).

Det officielle Docker-billede af projektet understøtter konfiguration ved hjælp af miljøvariabler - det er det, vi vil bruge. Så lad os downloade billedet:

docker pull debezium/connect

Det mindste sæt miljøvariabler, der kræves for at køre forbindelsen, er som følger:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - indledende liste over Kafka-klyngeservere for at få en komplet liste over klyngemedlemmer;
  • OFFSET_STORAGE_TOPIC=connector-offsets — et emne til lagring af positioner, hvor stikket er placeret i øjeblikket;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - et emne til lagring af forbindelsens status og dets opgaver;
  • CONFIG_STORAGE_TOPIC=connector-config - et emne til lagring af konnektorkonfigurationsdata og dets opgaver;
  • GROUP_ID=1 — identifikator for den gruppe af arbejdere, på hvilken forbindelsesopgaven kan udføres; påkrævet ved brug af distribueret (fordelt) regime.

Vi starter beholderen med disse variable:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Bemærk om Avro

Som standard skriver Debezium data i JSON-format, hvilket er acceptabelt for sandkasser og små mængder data, men kan være et problem i tungt belastede databaser. Et alternativ til JSON-konverteren er at serialisere meddelelser vha Avro til et binært format, som reducerer belastningen på I/O-undersystemet i Apache Kafka.

For at bruge Avro skal du implementere en separat skema-registrering (til lagring af skemaer). Variablerne for konverteren ser således ud:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detaljer om brug af Avro og opsætning af et register til det ligger uden for artiklens omfang - yderligere, for klarhedens skyld, vil vi bruge JSON.

2. Opsætning af selve stikket

Nu kan du gå direkte til konfigurationen af ​​selve stikket, som vil læse data fra kilden.

Lad os se på eksemplet med stik til to DBMS: PostgreSQL og MongoDB, som jeg har erfaring med, og som der er forskelle på (omend små, men i nogle tilfælde betydelige!).

Konfigurationen er beskrevet i JSON-notation og uploadet til Kafka Connect ved hjælp af en POST-anmodning.

2.1. PostgreSQL

Eksempel på konnektorkonfiguration til PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Princippet for betjening af stikket efter denne konfiguration er ret simpelt:

  • Ved første start forbinder den til databasen angivet i konfigurationen og starter i tilstanden indledende øjebliksbillede, sender til Kafka det indledende sæt af data modtaget med den betingede SELECT * FROM table_name.
  • Efter initialiseringen er fuldført, går stikket i tilstanden til at læse ændringer fra PostgreSQL WAL-filer.

Om de anvendte muligheder:

  • name — navnet på det stik, som den nedenfor beskrevne konfiguration anvendes til; i fremtiden bruges dette navn til at arbejde med forbindelsen (dvs. se status / genstart / opdater konfigurationen) gennem Kafka Connect REST API;
  • connector.class — DBMS-forbindelsesklassen, der vil blive brugt af den konfigurerede forbindelse;
  • plugin.name er navnet på plugin'et til logisk afkodning af data fra WAL-filer. Fås at vælge imellem wal2json, decoderbuffs и pgoutput. De to første kræver installation af de relevante udvidelser i DBMS, og pgoutput for PostgreSQL version 10 og højere kræver ikke yderligere manipulationer;
  • database.* — muligheder for at oprette forbindelse til databasen, hvor database.server.name - navnet på den PostgreSQL-instans, der blev brugt til at danne navnet på emnet i Kafka-klyngen;
  • table.include.list - en liste over tabeller, hvor vi ønsker at spore ændringer; givet i formatet schema.table_name; kan ikke bruges sammen med table.exclude.list;
  • heartbeat.interval.ms — interval (i millisekunder), hvormed stikket sender hjerteslagsmeddelelser til et særligt emne;
  • heartbeat.action.query - en anmodning, der vil blive udført ved afsendelse af hver hjerteslagsmeddelelse (indstillingen er dukket op siden version 1.1);
  • slot.name — navnet på den replikeringsplads, der vil blive brugt af connectoren;
  • publication.name - Navn Offentliggørelse i PostgreSQL, som connectoren bruger. Hvis det ikke eksisterer, vil Debezium forsøge at oprette det. Hvis brugeren, under hvilken forbindelsen oprettes, ikke har nok rettigheder til denne handling, afsluttes forbindelsen med en fejl;
  • transforms bestemmer præcis, hvordan navnet på målemnet skal ændres:
    • transforms.AddPrefix.type angiver, at vi vil bruge regulære udtryk;
    • transforms.AddPrefix.regex — maske, hvorved navnet på målemnet omdefineres;
    • transforms.AddPrefix.replacement - direkte hvad vi omdefinerer.

Mere om hjerteslag og transformationer

Som standard sender forbindelsen data til Kafka for hver forpligtet transaktion og skriver sin LSN (Log Sequence Number) til serviceemnet offset. Men hvad sker der, hvis forbindelsen er konfigureret til ikke at læse hele databasen, men kun en del af dens tabeller (hvor data opdateres sjældent)?

  • Connectoren læser WAL-filer og registrerer ikke transaktionsbekræftelser i dem til de tabeller, den overvåger.
  • Derfor opdaterer den ikke sin nuværende position, hverken i emnet eller i replikeringspladsen.
  • Dette vil igen medføre, at WAL-filerne "sætter sig fast" på disken og vil sandsynligvis løbe tør for diskplads.

Og her kommer mulighederne til undsætning. heartbeat.interval.ms и heartbeat.action.query. Brug af disse muligheder i par gør det muligt at udføre en anmodning om at ændre data i en separat tabel, hver gang en hjerteslagsmeddelelse sendes. Således opdateres det LSN, som konnektoren i øjeblikket er placeret på (i replikationssloten), konstant. Dette gør det muligt for DBMS at fjerne WAL-filer, der ikke længere er nødvendige. For mere information om, hvordan muligheder fungerer, se dokumentation.

En anden mulighed, der fortjener nærmere opmærksomhed, er transforms. Selvom det handler mere om bekvemmelighed og skønhed ...

Som standard opretter Debezium emner ved hjælp af følgende navnepolitik: serverName.schemaName.tableName. Dette er måske ikke altid praktisk. Muligheder transforms ved hjælp af regulære udtryk kan du definere en liste over tabeller, hvis begivenheder skal omdirigeres til et emne med et bestemt navn.

I vores konfiguration takket være transforms følgende sker: alle CDC-hændelser fra den sporede database vil gå til emnet med navnet data.cdc.dbname. Ellers (uden disse indstillinger) ville Debezium som standard oprette et emne for hver tabel i formularen: pg-dev.public.<table_name>.

Forbindelsesbegrænsninger

I slutningen af ​​beskrivelsen af ​​stikkonfigurationen til PostgreSQL er det værd at tale om følgende funktioner / begrænsninger af dets arbejde:

  1. Connector-funktionaliteten til PostgreSQL er afhængig af konceptet logisk afkodning. Derfor han sporer ikke anmodninger om at ændre strukturen i databasen (DDL) - derfor vil disse data ikke være i emnerne.
  2. Da der bruges replikationsslots, er forbindelsen af ​​stikket mulig kun til master DBMS-instansen.
  3. Hvis brugeren, under hvilken forbindelsen forbinder til databasen, har skrivebeskyttede rettigheder, skal du inden den første lancering manuelt oprette en replikeringsplads og publicere til databasen.

Anvendelse af en konfiguration

Så lad os indlæse vores konfiguration i stikket:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Vi kontrollerer, at overførslen var vellykket, og at forbindelsen startede:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Fantastisk: det er sat op og klar til at gå. Lad os nu foregive at være en forbruger og oprette forbindelse til Kafka, hvorefter vi tilføjer og ændrer en post i tabellen:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

I vores emne vil dette blive vist som følger:

Meget lang JSON med vores ændringer

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

I begge tilfælde består posterne af nøglen (PK) af den post, der blev ændret, og selve essensen af ​​ændringerne: hvad posten var før, og hvad den blev til efter.

  • I tilfælde af INSERT: værdi før (before) lige med nullefterfulgt af strengen, der blev indsat.
  • I tilfælde af UPDATE: kl payload.before den forrige tilstand af rækken vises, og i payload.after - nyt med essensen af ​​forandring.

2.2 MongoDB

Denne konnektor bruger standard MongoDB-replikeringsmekanismen, der læser information fra oploggen for den primære DBMS-knude.

I lighed med det allerede beskrevne stik til PgSQL, tages også her ved første start det primære data-snapshot, hvorefter stikket skifter til oplog-læsningstilstand.

Konfigurationseksempel:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Som du kan se, er der ingen nye muligheder sammenlignet med det forrige eksempel, men kun antallet af muligheder, der er ansvarlige for at oprette forbindelse til databasen og deres præfikser, er blevet reduceret.

Indstillinger transforms denne gang gør de følgende: Drej navnet på målemnet fra skemaet <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

fejltolerance

Spørgsmålet om fejltolerance og høj tilgængelighed i vores tid er mere akut end nogensinde - især når vi taler om data og transaktioner, og dataændringssporing er ikke på sidelinjen i denne sag. Lad os se på, hvad der principielt kan gå galt, og hvad der vil ske med Debezium i hvert enkelt tilfælde.

Der er tre fravalgsmuligheder:

  1. Kafka Connect-fejl. Hvis Connect er konfigureret til at fungere i distribueret tilstand, kræver dette, at flere arbejdere indstiller det samme group.id. Så, hvis en af ​​dem fejler, genstartes forbindelsen på den anden arbejder og fortsætter med at læse fra den sidste forpligtede position i emnet i Kafka.
  2. Tab af forbindelse med Kafka-klyngen. Konnektoren vil simpelthen stoppe med at læse på den position, den ikke kunne sende til Kafka og med jævne mellemrum forsøge at sende den igen, indtil forsøget lykkes.
  3. Datakilden er ikke tilgængelig. Tilslutningen vil forsøge at genoprette forbindelsen til kilden i henhold til konfigurationen. Standardindstillingen er 16 forsøg eksponentiel tilbageslag. Efter det 16. mislykkede forsøg vil opgaven blive markeret som mislykkedes og det skal genstartes manuelt via Kafka Connect REST-grænsefladen.
    • I tilfælde af PostgreSQL data vil ikke gå tabt, pga brug af replikeringspladser forhindrer sletning af WAL-filer, der ikke læses af stikket. I dette tilfælde er der en ulempe: Hvis netværksforbindelsen mellem stikket og DBMS er afbrudt i lang tid, er der en chance for, at diskpladsen løber tør, og det kan føre til fejl i hele DBMS.
    • I tilfælde af MySQL binlog-filer kan roteres af selve DBMS, før forbindelsen genoprettes. Dette vil få stikket til at gå i fejltilstand, og det bliver nødt til at genstarte i initial snapshot-tilstand for at fortsætte med at læse fra binlogs for at genoprette normal drift.
    • Про MongoDB. Dokumentationen siger: Connectorens adfærd i tilfælde af at log/oplog-filerne er blevet slettet, og connectoren ikke kan fortsætte med at læse fra den position, hvor den slap, er den samme for alle DBMS. Det ligger i, at stikket vil gå i tilstanden mislykkedes og vil kræve en genstart i tilstanden indledende øjebliksbillede.

      Der er dog undtagelser. Hvis stikket var i en afbrudt tilstand i lang tid (eller ikke kunne nå MongoDB-forekomsten), og oplog blev roteret i løbet af denne tid, så vil stikket roligt fortsætte med at læse data fra den første tilgængelige position, når forbindelsen er genoprettet , hvorfor nogle af dataene i Kafka nej vil ramme.

Konklusion

Debezium er min første erfaring med CDC-systemer og har generelt været meget positiv. Projektet bestikkede støtten fra det primære DBMS, nem konfiguration, støtte til klyngedannelse og et aktivt fællesskab. Til praksisinteresserede anbefaler jeg, at du læser vejledningerne vedr Kafka Connect и Debezium.

Sammenlignet med JDBC-stikket til Kafka Connect er den største fordel ved Debezium, at ændringer læses fra DBMS-logfilerne, hvilket gør det muligt at modtage data med minimal forsinkelse. JDBC Connector (leveret af Kafka Connect) forespørger den sporede tabel med et fast interval og genererer (af samme årsag) ikke beskeder, når data slettes (hvordan kan du forespørge efter data, der ikke er der?).

For at løse lignende problemer kan du være opmærksom på følgende løsninger (ud over Debezium):

PS

Læs også på vores blog:

Kilde: www.habr.com

Tilføj en kommentar