Introductie van Debezium - CDC voor Apache Kafka

Introductie van Debezium - CDC voor Apache Kafka

In mijn werk kom ik vaak nieuwe technische oplossingen / softwareproducten tegen, waarover informatie vrij schaars is op het Russisch sprekende internet. Met dit artikel zal ik proberen zo'n leemte op te vullen met een voorbeeld uit mijn recente praktijk, toen ik het verzenden van CDC-gebeurtenissen van twee populaire DBMS'en (PostgreSQL en MongoDB) naar een Kafka-cluster moest instellen met behulp van Debezium. Ik hoop dat dit overzichtsartikel, dat verscheen als resultaat van het verrichte werk, nuttig zal zijn voor anderen.

Wat is Debezium en CDC in het algemeen?

Debezium - Vertegenwoordiger van de CDC-softwarecategorie (Leg verandering van gegevens vast), of beter gezegd, het is een set connectoren voor verschillende DBMS'en die compatibel zijn met het Apache Kafka Connect-framework.

Het open source-project, gelicentieerd onder de Apache-licentie v2.0 en gesponsord door Red Hat. De ontwikkeling is aan de gang sinds 2016 en biedt momenteel officiële ondersteuning voor de volgende DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Er zijn ook connectoren voor Cassandra en Oracle, maar deze bevinden zich momenteel in de status "vroege toegang" en nieuwe releases garanderen geen achterwaartse compatibiliteit.

Als we CDC vergelijken met de traditionele benadering (wanneer de applicatie gegevens rechtstreeks uit het DBMS leest), dan zijn de belangrijkste voordelen de implementatie van gegevenswijzigingsstreaming op rijniveau met lage latentie, hoge betrouwbaarheid en beschikbaarheid. De laatste twee punten worden bereikt door een Kafka-cluster te gebruiken als opslagplaats voor CDC-gebeurtenissen.

De voordelen omvatten ook het feit dat een enkel model wordt gebruikt om gebeurtenissen op te slaan, zodat de uiteindelijke toepassing zich geen zorgen hoeft te maken over de nuances van het werken met verschillende DBMS.

Ten slotte biedt het gebruik van een berichtenmakelaar ruimte voor horizontale schaling van applicaties die wijzigingen in gegevens volgen. Tegelijkertijd wordt de impact op de databron geminimaliseerd, aangezien data niet rechtstreeks vanuit het DBMS, maar vanuit het Kafka-cluster wordt ontvangen.

Over de Debezium-architectuur

Het gebruik van Debezium komt neer op dit eenvoudige schema:

DBMS (als gegevensbron) → connector in Kafka Connect → Apache Kafka → consument

Ter illustratie geef ik een schema van de projectwebsite:

Introductie van Debezium - CDC voor Apache Kafka

Ik vind dit schema echter niet echt leuk, omdat het lijkt alsof alleen een gootsteenconnector mogelijk is.

In werkelijkheid is de situatie anders: het vullen van uw Data Lake (laatste link in het diagram hierboven) is niet de enige manier om Debezium te gebruiken. Gebeurtenissen die naar Apache Kafka worden verzonden, kunnen door uw toepassingen worden gebruikt om met verschillende situaties om te gaan. Bijvoorbeeld:

  • verwijdering van irrelevante gegevens uit de cache;
  • het versturen van notificaties;
  • zoekindexupdates;
  • een soort auditlogboeken;
  • ...

Indien u een Java applicatie heeft en er geen noodzaak/mogelijkheid is om een ​​Kafka cluster te gebruiken, is er ook de mogelijkheid om via ingebouwde connector. Het voor de hand liggende pluspunt is dat je hiermee extra infrastructuur (in de vorm van een connector en Kafka) kunt weigeren. Deze oplossing is echter verouderd sinds versie 1.1 en wordt niet langer aanbevolen voor gebruik (deze oplossing kan in toekomstige releases worden verwijderd).

Dit artikel bespreekt de door ontwikkelaars aanbevolen architectuur, die fouttolerantie en schaalbaarheid biedt.

Connectorconfiguratie

Om te beginnen met het volgen van veranderingen in de belangrijkste waarde - data - hebben we het volgende nodig:

  1. gegevensbron, die MySQL kan zijn vanaf versie 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (volledige lijst);
  2. Apache Kafka-cluster
  3. Kafka Connect-instantie (versies 1.x, 2.x);
  4. geconfigureerde Debezium-connector.

Werk aan de eerste twee punten, d.w.z. het proces van het installeren van een DBMS en Apache Kafka valt buiten het bestek van het artikel. Voor degenen die echter alles in een sandbox willen inzetten, is er een kant-en-klare in de officiële repository met voorbeelden docker-compose.yaml.

Op de laatste twee punten gaan we dieper in.

0. Kafka Connect

Hier en verderop in het artikel worden alle configuratievoorbeelden beschouwd in de context van de Docker-image die door de Debezium-ontwikkelaars wordt gedistribueerd. Het bevat alle benodigde plug-inbestanden (connectors) en biedt Kafka Connect-configuratie met behulp van omgevingsvariabelen.

Als u van plan bent om Kafka Connect van Confluent te gebruiken, moet u zelf de plug-ins van de benodigde connectoren toevoegen aan de map die is opgegeven in plugin.path of ingesteld via een omgevingsvariabele CLASSPATH. De instellingen voor de Kafka Connect-worker en connectors worden gedefinieerd via configuratiebestanden die als argumenten worden doorgegeven aan de worker-startopdracht. Voor details zie documentatie.

Het hele proces van het instellen van Debeizum in de connectorversie wordt in twee fasen uitgevoerd. Laten we elk van hen bekijken:

1. Het opzetten van het Kafka Connect-framework

Om gegevens naar een Apache Kafka-cluster te streamen, worden specifieke parameters ingesteld in het Kafka Connect-framework, zoals:

  • clusterverbindingsinstellingen,
  • namen van onderwerpen waarin de configuratie van de connector zelf wordt opgeslagen,
  • de naam van de groep waarin de connector draait (in het geval van gedistribueerde modus).

De officiële Docker-image van het project ondersteunt configuratie met behulp van omgevingsvariabelen - dit is wat we zullen gebruiken. Dus laten we de afbeelding downloaden:

docker pull debezium/connect

De minimale set omgevingsvariabelen die nodig zijn om de connector uit te voeren, is als volgt:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - eerste lijst met Kafka-clusterservers om een ​​volledige lijst met clusterleden te krijgen;
  • OFFSET_STORAGE_TOPIC=connector-offsets — een onderwerp voor het opslaan van posities waar de connector zich momenteel bevindt;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - een onderwerp voor het opslaan van de status van de connector en zijn taken;
  • CONFIG_STORAGE_TOPIC=connector-config - een onderwerp voor het opslaan van connectorconfiguratiegegevens en zijn taken;
  • GROUP_ID=1 — identificator van de groep werknemers waarop de connectortaak kan worden uitgevoerd; vereist bij gebruik van gedistribueerd (gedistribueerd) modus.

We starten de container met deze variabelen:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Opmerking over Avro

Standaard schrijft Debezium gegevens in JSON-indeling, wat acceptabel is voor sandboxen en kleine hoeveelheden gegevens, maar een probleem kan zijn in zwaarbelaste databases. Een alternatief voor de JSON-converter is om berichten te serialiseren met behulp van euro naar een binair formaat, wat de belasting van het I/O-subsysteem in Apache Kafka vermindert.

Om Avro te gebruiken, moet u een apart schema-registratie (voor het opslaan van schema's). De variabelen voor de converter zien er als volgt uit:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Details over het gebruik van Avro en het opzetten van een register ervoor vallen buiten het bestek van het artikel - verder zullen we voor de duidelijkheid JSON gebruiken.

2. De connector zelf instellen

Nu kunt u rechtstreeks naar de configuratie van de connector zelf gaan, die gegevens van de bron zal lezen.

Laten we eens kijken naar het voorbeeld van connectoren voor twee DBMS: PostgreSQL en MongoDB, waarmee ik ervaring heb en waarvoor er verschillen zijn (zij het klein, maar in sommige gevallen significant!).

De configuratie wordt beschreven in JSON-notatie en geüpload naar Kafka Connect met behulp van een POST-verzoek.

2.1. PostgreSQL

Voorbeeld connectorconfiguratie voor PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Het werkingsprincipe van de connector na deze configuratie is vrij eenvoudig:

  • Bij de eerste start maakt het verbinding met de database die is opgegeven in de configuratie en start het in de modus eerste momentopname, het verzenden naar Kafka van de eerste set gegevens die is ontvangen met de voorwaardelijke SELECT * FROM table_name.
  • Nadat de initialisatie is voltooid, gaat de connector naar de modus voor het lezen van wijzigingen van PostgreSQL WAL-bestanden.

Over de gebruikte opties:

  • name — de naam van de connector waarvoor de hieronder beschreven configuratie wordt gebruikt; later wordt deze naam gebruikt om met de connector te werken (d.w.z. bekijk de status / herstart / update de configuratie) via de Kafka Connect REST API;
  • connector.class — de DBMS-connectorklasse die door de geconfigureerde connector zal worden gebruikt;
  • plugin.name — de naam van de plug-in voor logische decodering van gegevens uit WAL-bestanden. Beschikbaar om uit te kiezen wal2json, decoderbuffs и pgoutput. De eerste twee vereisen de installatie van de juiste extensies in het DBMS, en pgoutput voor PostgreSQL versie 10 en hoger zijn geen aanvullende manipulaties vereist;
  • database.* — opties voor verbinding met de database, waar database.server.name - de naam van de PostgreSQL-instantie die is gebruikt om de naam van het onderwerp in het Kafka-cluster te vormen;
  • table.include.list - een lijst met tabellen waarin we wijzigingen willen bijhouden; gegeven in het formaat schema.table_name; kan niet samen met worden gebruikt table.exclude.list;
  • heartbeat.interval.ms — interval (in milliseconden) waarmee de connector hartslagberichten naar een speciaal onderwerp stuurt;
  • heartbeat.action.query - een verzoek dat wordt uitgevoerd bij het verzenden van elk hartslagbericht (de optie is verschenen sinds versie 1.1);
  • slot.name — de naam van het replicatieslot dat door de connector zal worden gebruikt;
  • publication.name - Naam Uitgave in PostgreSQL die de connector gebruikt. Als het niet bestaat, zal Debezium proberen het te maken. Als de gebruiker waaronder de verbinding tot stand is gebracht niet voldoende rechten heeft voor deze actie, wordt de connector afgesloten met een foutmelding;
  • transforms bepaalt hoe de naam van het doelonderwerp precies moet worden gewijzigd:
    • transforms.AddPrefix.type geeft aan dat we reguliere expressies zullen gebruiken;
    • transforms.AddPrefix.regex — masker waarmee de naam van het doelonderwerp opnieuw wordt gedefinieerd;
    • transforms.AddPrefix.replacement - direct wat we herdefiniëren.

Meer over hartslag en transformaties

De connector stuurt standaard gegevens naar Kafka voor elke vastgelegde transactie en schrijft zijn LSN (Log Sequence Number) naar het serviceonderwerp offset. Maar wat gebeurt er als de connector is geconfigureerd om niet de hele database te lezen, maar slechts een deel van de tabellen (waarin gegevens zelden worden bijgewerkt)?

  • De connector leest WAL-bestanden en detecteert geen transactiecommits in de tabellen die worden gecontroleerd.
  • Daarom zal het zijn huidige positie in het onderwerp of het replicatieslot niet bijwerken.
  • Dit zal er op zijn beurt voor zorgen dat de WAL-bestanden op schijf "vast komen te zitten" en dat er waarschijnlijk geen schijfruimte meer beschikbaar is.

En hier komen opties te hulp. heartbeat.interval.ms и heartbeat.action.query. Door deze opties in paren te gebruiken, is het mogelijk om een ​​verzoek uit te voeren om gegevens in een aparte tabel te wijzigen telkens wanneer een heartbeat-bericht wordt verzonden. Het LSN waarop de connector zich momenteel bevindt (in het replicatieslot) wordt dus voortdurend bijgewerkt. Hierdoor kan het DBMS WAL-bestanden verwijderen die niet langer nodig zijn. Zie voor meer informatie over hoe opties werken documentatie.

Een andere optie die meer aandacht verdient, is transforms. Al gaat het meer om gemak en schoonheid...

Debezium maakt standaard onderwerpen met het volgende naamgevingsbeleid: serverName.schemaName.tableName. Dit is misschien niet altijd handig. Opties transforms met behulp van reguliere expressies kunt u een lijst met tabellen definiëren waarvan de gebeurtenissen moeten worden doorgestuurd naar een onderwerp met een specifieke naam.

In onze configuratie dankzij transforms het volgende gebeurt: alle CDC-gebeurtenissen uit de bijgehouden database gaan naar het onderwerp met de naam data.cdc.dbname. Anders (zonder deze instellingen) zou Debezium standaard een onderwerp aanmaken voor elke tabel van het formulier: pg-dev.public.<table_name>.

Connectorbeperkingen

Aan het einde van de beschrijving van de connectorconfiguratie voor PostgreSQL is het de moeite waard om te praten over de volgende functies / beperkingen van zijn werk:

  1. De connectorfunctionaliteit voor PostgreSQL is gebaseerd op het concept van logische decodering. Daarom hij houdt geen verzoeken bij om de structuur van de database te wijzigen (DDL) - dienovereenkomstig zullen deze gegevens niet in de onderwerpen staan.
  2. Omdat replicatieslots worden gebruikt, is de aansluiting van de connector mogelijk alleen naar de hoofdinstantie van het DBMS.
  3. Als de gebruiker waaronder de connector verbinding maakt met de database alleen-lezenrechten heeft, moet u vóór de eerste keer opstarten handmatig een replicatieslot maken en naar de database publiceren.

Een configuratie toepassen

Dus laten we onze configuratie in de connector laden:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

We controleren of de download is gelukt en de connector is gestart:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Geweldig: het is opgezet en klaar voor gebruik. Laten we nu doen alsof we een consument zijn en verbinding maken met Kafka, waarna we een invoer in de tabel toevoegen en wijzigen:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In ons topic wordt dit als volgt weergegeven:

Zeer lange JSON met onze wijzigingen

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In beide gevallen bestaan ​​de records uit de sleutel (PK) van het record dat is gewijzigd, en de essentie van de wijzigingen: wat het record ervoor was en wat het daarna werd.

  • In het geval van INSERT: waarde voor (before) gelijk nullgevolgd door de string die is ingevoegd.
  • In het geval van UPDATE: Bij payload.before de vorige status van de rij wordt weergegeven, en in payload.after - nieuw met de essentie van verandering.

2.2 MongoDB

Deze connector gebruikt het standaard MongoDB-replicatiemechanisme en leest informatie uit de oplog van het primaire DBMS-knooppunt.

Net als bij de reeds beschreven connector voor PgSQL wordt ook hier bij de eerste start het momentopname van de primaire gegevens gemaakt, waarna de connector overschakelt naar de oplog-leesmodus.

Configuratievoorbeeld:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Zoals u kunt zien, zijn er geen nieuwe opties in vergelijking met het vorige voorbeeld, maar alleen het aantal opties dat verantwoordelijk is voor verbinding met de database en hun voorvoegsels is verminderd.

Instellingen transforms deze keer doen ze het volgende: de naam van het doelonderwerp uit het schema halen <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

fout tolerantie

De kwestie van fouttolerantie en hoge beschikbaarheid is in onze tijd acuter dan ooit - vooral als we het hebben over gegevens en transacties, en het bijhouden van gegevenswijzigingen staat hierbij niet aan de zijlijn. Laten we eens kijken wat er in principe mis kan gaan en wat er in elk geval met Debezium zal gebeuren.

Er zijn drie opt-out mogelijkheden:

  1. Kafka Connect-fout. Als Connect is geconfigureerd om in gedistribueerde modus te werken, moeten meerdere medewerkers dezelfde group.id instellen. Als een van hen faalt, wordt de connector opnieuw gestart op de andere werker en wordt verder gelezen vanaf de laatste vastgelegde positie in het onderwerp in Kafka.
  2. Verlies van connectiviteit met Kafka-cluster. De connector stopt gewoon met lezen op de positie die niet naar Kafka kon worden verzonden en probeert het periodiek opnieuw te verzenden totdat de poging slaagt.
  3. Gegevensbron niet beschikbaar. De connector probeert opnieuw verbinding te maken met de bron volgens de configuratie. De standaardwaarde is 16 pogingen met exponentiële uitstel. Na de 16e mislukte poging wordt de taak gemarkeerd als mislukt en het moet handmatig opnieuw worden opgestart via de Kafka Connect REST-interface.
    • In het geval van PostgreSQL gegevens gaan niet verloren, omdat het gebruik van replicatieslots voorkomt dat WAL-bestanden worden verwijderd die niet door de connector worden gelezen. In dit geval is er een keerzijde: als de netwerkconnectiviteit tussen de connector en het DBMS lange tijd wordt onderbroken, bestaat de kans dat de schijfruimte opraakt en dit kan leiden tot het uitvallen van het gehele DBMS.
    • In het geval van MySQL binlog-bestanden kunnen door het DBMS zelf worden geroteerd voordat de verbinding wordt hersteld. Hierdoor gaat de connector in de mislukte status en moet deze opnieuw worden opgestart in de initiële snapshot-modus om door te gaan met lezen van binlogs om de normale werking te herstellen.
    • Про MongoDB. De documentatie zegt: het gedrag van de connector in het geval dat de log-/oplogbestanden zijn verwijderd en de connector niet verder kan lezen vanaf de positie waar hij was gebleven, is hetzelfde voor alle DBMS. Het ligt in het feit dat de connector in de staat zal gaan mislukt en vereist een herstart in de modus eerste momentopname.

      Er zijn echter uitzonderingen. Als de connector lange tijd niet was aangesloten (of de MongoDB-instantie niet kon bereiken) en de oplog gedurende deze tijd werd geroteerd, zal de connector, wanneer de verbinding is hersteld, rustig doorgaan met het lezen van gegevens vanaf de eerst beschikbare positie , vandaar dat sommige gegevens in Kafka geen zal raken.

Conclusie

Debezium is mijn eerste ervaring met CDC-systemen en was over het algemeen zeer positief. Het project kocht de ondersteuning van het belangrijkste DBMS, configuratiegemak, ondersteuning voor clustering en een actieve gemeenschap om. Voor degenen die geïnteresseerd zijn in de praktijk, raad ik u aan de handleidingen voor te lezen Kafka Connect и Debezium.

Vergeleken met de JDBC-connector voor Kafka Connect is het belangrijkste voordeel van Debezium dat wijzigingen worden gelezen uit de DBMS-logboeken, waardoor gegevens met minimale vertraging kunnen worden ontvangen. De JDBC-connector (geleverd door Kafka Connect) bevraagt ​​de bijgehouden tabel met een vast interval en genereert (om dezelfde reden) geen berichten wanneer gegevens worden verwijderd (hoe kunt u gegevens opvragen die er niet zijn?).

Om vergelijkbare problemen op te lossen, kunt u (naast Debezium) letten op de volgende oplossingen:

PS

Lees ook op onze blog:

Bron: www.habr.com

Voeg een reactie