Vi introduserer Debezium - CDC for Apache Kafka

Vi introduserer Debezium - CDC for Apache Kafka

I mitt arbeid kommer jeg ofte over nye tekniske løsninger/programvareprodukter som det er lite informasjon om på det russisktalende Internett. Med denne artikkelen vil jeg prøve å fylle et slikt gap med et eksempel fra min nylige praksis, da jeg trengte å sette opp sending av CDC-hendelser fra to populære DBMS-er (PostgreSQL og MongoDB) til en Kafka-klynge ved hjelp av Debezium. Jeg håper at denne oversiktsartikkelen, som dukket opp som et resultat av arbeidet som er gjort, vil være nyttig for andre.

Hva er Debezium og CDC generelt?

Debezium - Representant for CDC-programvarekategorien (Endring av datafangst), eller mer presist, det er et sett med koblinger for forskjellige DBMS-er som er kompatible med Apache Kafka Connect-rammeverket.

Den åpen kildekode-prosjekt, lisensiert under Apache-lisensen v2.0 og sponset av Red Hat. Utviklingen har vært i gang siden 2016 og for øyeblikket gir den offisiell støtte for følgende DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Det finnes også kontakter for Cassandra og Oracle, men de er for øyeblikket i "early access"-status, og nye utgivelser garanterer ikke bakoverkompatibilitet.

Hvis vi sammenligner CDC med den tradisjonelle tilnærmingen (når applikasjonen leser data fra DBMS direkte), inkluderer hovedfordelene implementeringen av dataendringsstrømming på radnivå med lav latenstid, høy pålitelighet og tilgjengelighet. De to siste punktene oppnås ved å bruke en Kafka-klynge som et depot for CDC-hendelser.

Fordelene inkluderer også det faktum at en enkelt modell brukes til å lagre hendelser, slik at den endelige applikasjonen ikke trenger å bekymre deg for nyansene ved å betjene forskjellige DBMS.

Til slutt, bruk av en meldingsmegler åpner for horisontal skalering av applikasjoner som sporer endringer i data. Samtidig minimeres påvirkningen på datakilden, siden data ikke mottas direkte fra DBMS, men fra Kafka-klyngen.

Om Debezium-arkitekturen

Å bruke Debezium kommer ned til dette enkle opplegget:

DBMS (som datakilde) → kobling i Kafka Connect → Apache Kafka → forbruker

Som en illustrasjon vil jeg gi et diagram fra prosjektets nettside:

Vi introduserer Debezium - CDC for Apache Kafka

Imidlertid liker jeg ikke dette opplegget, fordi det ser ut til at bare en vaskkontakt er mulig.

I virkeligheten er situasjonen annerledes: fylle Data Lake (siste lenke i diagrammet ovenfor) er ikke den eneste måten å bruke Debezium på. Hendelser sendt til Apache Kafka kan brukes av applikasjonene dine til å håndtere ulike situasjoner. For eksempel:

  • fjerning av irrelevante data fra cachen;
  • sende varsler;
  • søkeindeksoppdateringer;
  • en slags revisjonslogger;
  • ...

I tilfelle du har en Java-applikasjon og det ikke er behov/mulighet for å bruke en Kafka-klynge, er det også mulighet for å jobbe gjennom innebygd kontakt. Det åpenbare pluss er at med det kan du nekte ekstra infrastruktur (i form av en kobling og Kafka). Denne løsningen har imidlertid blitt avviklet siden versjon 1.1 og anbefales ikke lenger for bruk (den kan bli fjernet i fremtidige utgivelser).

Denne artikkelen vil diskutere arkitekturen anbefalt av utviklere, som gir feiltoleranse og skalerbarhet.

Koblingskonfigurasjon

For å begynne å spore endringer i den viktigste verdien – data – trenger vi:

  1. datakilde, som kan være MySQL fra og med versjon 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (hele listen);
  2. Apache Kafka-klynge
  3. Kafka Connect-forekomst (versjoner 1.x, 2.x);
  4. konfigurert Debezium-kontakt.

Arbeid med de to første punktene, dvs. prosessen med å installere en DBMS og Apache Kafka er utenfor rammen av artikkelen. Men for de som ønsker å distribuere alt i en sandkasse, er det en ferdig i det offisielle depotet med eksempler docker-compose.yaml.

Vi vil fokusere mer på de to siste punktene.

0. Kafka Connect

Her og senere i artikkelen vurderes alle konfigurasjonseksempler i sammenheng med Docker-bildet distribuert av Debezium-utviklerne. Den inneholder alle nødvendige plugin-filer (koblinger) og gir Kafka Connect-konfigurasjon ved hjelp av miljøvariabler.

Hvis du har tenkt å bruke Kafka Connect fra Confluent, må du selv legge til pluginene til de nødvendige koblingene i katalogen som er spesifisert i plugin.path eller settes via en miljøvariabel CLASSPATH. Innstillingene for Kafka Connect-arbeideren og koblingene er definert gjennom konfigurasjonsfiler som sendes som argumenter til arbeiderens startkommando. For detaljer se dokumentasjon.

Hele prosessen med å sette opp Debeizum i koblingsversjonen utføres i to trinn. La oss vurdere hver av dem:

1. Sette opp Kafka Connect-rammeverket

For å strømme data til en Apache Kafka-klynge, angis spesifikke parametere i Kafka Connect-rammeverket, for eksempel:

  • klyngetilkoblingsinnstillinger,
  • navn på emner der konfigurasjonen av selve kontakten vil bli lagret,
  • navnet på gruppen der koblingen kjører (i tilfelle distribuert modus).

Det offisielle Docker-bildet av prosjektet støtter konfigurasjon ved hjelp av miljøvariabler - det er dette vi skal bruke. Så la oss laste ned bildet:

docker pull debezium/connect

Minimumssettet med miljøvariabler som kreves for å kjøre koblingen er som følger:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - innledende liste over Kafka-klyngeservere for å få en komplett liste over klyngemedlemmer;
  • OFFSET_STORAGE_TOPIC=connector-offsets — et emne for lagring av posisjoner der kontakten er plassert;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - et emne for lagring av statusen til koblingen og dens oppgaver;
  • CONFIG_STORAGE_TOPIC=connector-config - et emne for lagring av koblingskonfigurasjonsdata og dets oppgaver;
  • GROUP_ID=1 — identifikator for gruppen av arbeidere som koblingsoppgaven kan utføres på; nødvendig ved bruk av distribuert (distribuert) regime.

Vi starter beholderen med disse variablene:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Merknad om Avro

Som standard skriver Debezium data i JSON-format, som er akseptabelt for sandkasser og små mengder data, men kan være et problem i tungt belastede databaser. Et alternativ til JSON-konverteren er å serialisere meldinger ved hjelp av Avro til et binært format, som reduserer belastningen på I/O-delsystemet i Apache Kafka.

For å bruke Avro, må du distribuere en separat skjema-register (for lagring av skjemaer). Variablene for omformeren vil se slik ut:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detaljer om bruk av Avro og konfigurering av et register for det er utenfor rammen av artikkelen - videre, for klarhetens skyld, vil vi bruke JSON.

2. Sette opp selve kontakten

Nå kan du gå direkte til konfigurasjonen av selve kontakten, som vil lese data fra kilden.

La oss se på eksemplet med koblinger for to DBMS: PostgreSQL og MongoDB, som jeg har erfaring med og som det er forskjeller på (riktignok små, men i noen tilfeller betydelige!).

Konfigurasjonen er beskrevet i JSON-notasjon og lastet opp til Kafka Connect ved hjelp av en POST-forespørsel.

2.1. PostgreSQL

Eksempel på koblingskonfigurasjon for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prinsippet for drift av kontakten etter denne konfigurasjonen er ganske enkelt:

  • Ved første start kobles den til databasen spesifisert i konfigurasjonen og starter i modusen første øyeblikksbilde, sender til Kafka det første settet med data mottatt med den betingede SELECT * FROM table_name.
  • Etter at initialiseringen er fullført, går kontakten inn i modusen for lesing av endringer fra PostgreSQL WAL-filer.

Om alternativene som brukes:

  • name — navnet på kontakten som konfigurasjonen beskrevet nedenfor brukes for; i fremtiden brukes dette navnet til å jobbe med koblingen (dvs. se status / restart / oppdater konfigurasjonen) gjennom Kafka Connect REST API;
  • connector.class — DBMS-koblingsklassen som skal brukes av den konfigurerte koblingen;
  • plugin.name er navnet på plugin for logisk dekoding av data fra WAL-filer. Tilgjengelig å velge mellom wal2json, decoderbuffs и pgoutput. De to første krever installasjon av de riktige utvidelsene i DBMS, og pgoutput for PostgreSQL versjon 10 og høyere krever ikke ytterligere manipulasjoner;
  • database.* — alternativer for å koble til databasen, hvor database.server.name - navnet på PostgreSQL-forekomsten som ble brukt til å danne navnet på emnet i Kafka-klyngen;
  • table.include.list - en liste over tabeller der vi ønsker å spore endringer; gitt i formatet schema.table_name; kan ikke brukes sammen med table.exclude.list;
  • heartbeat.interval.ms — intervall (i millisekunder) som kontakten sender hjerteslagmeldinger med til et spesielt emne;
  • heartbeat.action.query - en forespørsel som vil bli utført når du sender hver hjerteslagmelding (alternativet har dukket opp siden versjon 1.1);
  • slot.name — navnet på replikeringssporet som skal brukes av koblingen;
  • publication.name - Navn publikasjon i PostgreSQL som kontakten bruker. I tilfelle den ikke eksisterer, vil Debezium prøve å lage den. Hvis brukeren som tilkoblingen gjøres under ikke har nok rettigheter for denne handlingen, vil koblingen avsluttes med en feil;
  • transforms bestemmer nøyaktig hvordan navnet på målemnet skal endres:
    • transforms.AddPrefix.type indikerer at vi vil bruke regulære uttrykk;
    • transforms.AddPrefix.regex — maske som navnet på måltemaet omdefineres med;
    • transforms.AddPrefix.replacement - direkte hva vi omdefinerer.

Mer om hjerteslag og transformasjoner

Som standard sender koblingen data til Kafka for hver forpliktet transaksjon, og skriver LSN (Log Sequence Number) til tjenesteemnet offset. Men hva skjer hvis koblingen er konfigurert til å lese ikke hele databasen, men bare en del av tabellene (hvor data oppdateres sjelden)?

  • Koblingen vil lese WAL-filer og ikke oppdage transaksjonsforpliktelser i dem til tabellene den overvåker.
  • Derfor vil den ikke oppdatere sin nåværende posisjon verken i emnet eller i replikeringssporet.
  • Dette vil igjen føre til at WAL-filene blir "fast" på disken og vil sannsynligvis gå tom for diskplass.

Og her kommer alternativer til unnsetning. heartbeat.interval.ms и heartbeat.action.query. Bruk av disse alternativene i par gjør det mulig å utføre en forespørsel om å endre data i en separat tabell hver gang en hjerteslagmelding sendes. Dermed blir LSN-en som kontakten for øyeblikket befinner seg på (i replikeringssporet) kontinuerlig oppdatert. Dette lar DBMS fjerne WAL-filer som ikke lenger er nødvendige. For mer informasjon om hvordan alternativer fungerer, se dokumentasjon.

Et annet alternativ som fortjener nærmere oppmerksomhet er transforms. Selv om det handler mer om bekvemmelighet og skjønnhet ...

Som standard oppretter Debezium emner ved å bruke følgende navnepolicy: serverName.schemaName.tableName. Dette er kanskje ikke alltid praktisk. Alternativer transforms ved hjelp av regulære uttrykk kan du definere en liste over tabeller hvis hendelser må rutes til et emne med et bestemt navn.

I vår konfigurasjon takket være transforms følgende skjer: alle CDC-hendelser fra den sporede databasen vil gå til emnet med navnet data.cdc.dbname. Ellers (uten disse innstillingene), vil Debezium som standard opprette et emne for hver tabell i skjemaet: pg-dev.public.<table_name>.

Koblingsbegrensninger

På slutten av beskrivelsen av koblingskonfigurasjonen for PostgreSQL er det verdt å snakke om følgende funksjoner / begrensninger i arbeidet:

  1. Koblingsfunksjonaliteten for PostgreSQL er avhengig av konseptet logisk dekoding. Derfor han sporer ikke forespørsler om å endre strukturen til databasen (DDL) - følgelig vil disse dataene ikke være i emnene.
  2. Siden replikeringsspor brukes, er tilkobling av kontakten mulig bare til hoved-DBMS-forekomsten.
  3. Hvis brukeren som kobler til databasen under har skrivebeskyttede rettigheter, må du manuelt opprette en replikeringsplass og publisere til databasen før den første lanseringen.

Bruker en konfigurasjon

Så la oss laste inn konfigurasjonen vår i kontakten:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Vi sjekker at nedlastingen var vellykket og at koblingen startet:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Flott: den er satt opp og klar til bruk. La oss nå late som om vi er en forbruker og koble oss til Kafka, hvoretter vi legger til og endrer en oppføring i tabellen:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

I vårt emne vil dette vises som følger:

Veldig lang JSON med endringene våre

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

I begge tilfeller består postene av nøkkelen (PK) til posten som ble endret, og selve essensen av endringene: hva posten var før og hva den ble etter.

  • I tilfelle av INSERT: verdi før (before) er lik nulletterfulgt av strengen som ble satt inn.
  • I tilfelle av UPDATE: i payload.before den forrige tilstanden til raden vises, og i payload.after - nytt med essensen av endring.

2.2 MongoDB

Denne koblingen bruker standard MongoDB-replikeringsmekanisme, og leser informasjon fra oploggen til DBMS-primærnoden.

I likhet med den allerede beskrevne koblingen for PgSQL, blir også her, ved første start, tatt det primære data-snapshotet, hvoretter koblingen bytter til oplog-lesemodus.

Konfigurasjonseksempel:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Som du kan se, er det ingen nye alternativer sammenlignet med forrige eksempel, men bare antallet alternativer som er ansvarlige for å koble til databasen og deres prefikser er redusert.

Innstillinger transforms denne gangen gjør de følgende: slå av navnet på måltemaet fra ordningen <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

feiltoleranse

Spørsmålet om feiltoleranse og høy tilgjengelighet i vår tid er mer akutt enn noen gang – spesielt når vi snakker om data og transaksjoner, og dataendringssporing er ikke på sidelinjen i denne saken. La oss se på hva som kan gå galt i prinsippet og hva som vil skje med Debezium i hvert enkelt tilfelle.

Det er tre alternativer for å velge bort:

  1. Kafka Connect-feil. Hvis Connect er konfigurert til å fungere i distribuert modus, krever dette at flere arbeidere setter samme group.id. Deretter, hvis en av dem mislykkes, vil kontakten startes på nytt på den andre arbeideren og fortsette å lese fra den siste engasjerte posisjonen i emnet i Kafka.
  2. Tap av tilkobling med Kafka-klyngen. Koblingen vil ganske enkelt slutte å lese på posisjonen den ikke klarte å sende til Kafka og med jevne mellomrom prøve å sende den på nytt til forsøket lykkes.
  3. Datakilde utilgjengelig. Koblingen vil prøve å koble til kilden på nytt i henhold til konfigurasjonen. Standard er 16 forsøk med bruk eksponentiell tilbakeslag. Etter det 16. mislykkede forsøket vil oppgaven bli merket som mislyktes og den må startes på nytt manuelt via Kafka Connect REST-grensesnittet.
    • I tilfelle av PostgreSQL data vil ikke gå tapt, fordi bruk av replikeringsspor vil forhindre sletting av WAL-filer som ikke leses av kontakten. I dette tilfellet er det en ulempe: hvis nettverkstilkoblingen mellom kontakten og DBMS er avbrutt i lang tid, er det en sjanse for at diskplassen går tom, og dette kan føre til svikt i hele DBMS.
    • I tilfelle av MySQL binlog-filer kan roteres av DBMS selv før tilkoblingen gjenopprettes. Dette vil føre til at kontakten går inn i feiltilstand, og den må startes på nytt i innledende øyeblikksbildemodus for å fortsette å lese fra binlogs for å gjenopprette normal drift.
    • Про MongoDB. Dokumentasjonen sier: atferden til koblingen i tilfelle logg-/oplog-filene er slettet og koblingen ikke kan fortsette å lese fra posisjonen der den slapp, er den samme for alle DBMS. Det ligger i det faktum at kontakten vil gå inn i staten mislyktes og vil kreve en omstart i modusen første øyeblikksbilde.

      Det finnes imidlertid unntak. Hvis kontakten var i frakoblet tilstand i lang tid (eller ikke kunne nå MongoDB-forekomsten), og oplog ble rotert i løpet av denne tiden, vil kontakten rolig fortsette å lese data fra den første tilgjengelige posisjonen når forbindelsen er gjenopprettet , som er grunnen til at noen av dataene i Kafka no vil treffe.

Konklusjon

Debezium er min første erfaring med CDC-systemer og har vært veldig positiv totalt sett. Prosjektet bestakk støtten til hoved-DBMS, enkel konfigurasjon, støtte for clustering og et aktivt fellesskap. For de som er interessert i praksis anbefaler jeg at du leser veiledningene for Kafka Connect и Debezium.

Sammenlignet med JDBC-kontakten for Kafka Connect, er hovedfordelen med Debezium at endringer leses fra DBMS-loggene, noe som gjør at data kan mottas med minimal forsinkelse. JDBC Connector (levert av Kafka Connect) spør etter den sporede tabellen med et fast intervall og (av samme grunn) genererer ikke meldinger når data slettes (hvordan kan du spørre etter data som ikke er der?).

For å løse lignende problemer, kan du ta hensyn til følgende løsninger (i tillegg til Debezium):

PS

Les også på bloggen vår:

Kilde: www.habr.com

Legg til en kommentar