Presentazione di Debezium - CDC per Apache Kafka

Presentazione di Debezium - CDC per Apache Kafka

In u mo travagliu, sò spessu scontru novi suluzioni tecniche / prudutti di software, infurmazione nantu à quale hè piuttostu scarsa in Internet di lingua russa. Cù questu articulu, pruvaraghju à cumpiendu una tale lacuna cù un esempiu da a mo pratica recente, quandu avia bisognu di stabilisce l'inviu di l'avvenimenti CDC da dui DBMS populari (PostgreSQL è MongoDB) à un cluster Kafka cù Debezium. Spergu chì questu articulu di rivista, chì hè apparsu per u risultatu di u travagliu fattu, serà utile à l'altri.

Chì ghjè Debezium è CDC in generale?

Debezium - Rappresentante di a categuria di software CDC (Capture cambiamenti di dati), o più precisamente, hè un inseme di connettori per diversi DBMS chì sò cumpatibili cù u framework Apache Kafka Connect.

issu prughjettu open source, licenziatu sottu a Licenza Apache v2.0 è sponsorizatu da Red Hat. U sviluppu hè in corso da 2016 è à u mumentu furnisce supportu ufficiale per i seguenti DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Ci sò ancu connettori per Cassandra è Oracle, ma sò attualmente in u statu di "accessu anticipatu", è e novi versioni ùn guarantisci micca a cumpatibilità retroattiva.

Se paragunemu CDC cù l'approcciu tradiziunale (quandu l'applicazione leghje direttamente i dati da u DBMS), allora i so vantaghji principali includenu l'implementazione di u cambiamentu di dati in streaming à u livellu di fila cù bassa latenza, alta affidabilità è dispunibilità. L'ultimi dui punti sò ottenuti utilizendu un cluster Kafka cum'è repository per l'avvenimenti CDC.

Inoltre, i vantaghji includenu u fattu chì un unicu mudellu hè utilizatu per almacenà l'avvenimenti, cusì l'applicazione finale ùn deve micca preoccupatu di i sfumaturi di uperà diversi DBMS.

Infine, l'usu di un broker di messagiu apre u scopu per a scala horizontale di l'applicazioni chì seguitanu i cambiamenti di dati. À u listessu tempu, l'impattu nantu à a fonte di dati hè minimizatu, postu chì i dati ùn sò micca ricevuti direttamente da u DBMS, ma da u cluster Kafka.

À propositu di l'architettura Debezium

Utilizà Debezium si riduce à questu schema simplice:

DBMS (cum'è fonte di dati) → connettore in Kafka Connect → Apache Kafka → cunsumadore

Comu illustrazione, daraghju un diagramma da u situ web di u prugettu:

Presentazione di Debezium - CDC per Apache Kafka

In ogni casu, ùn mi piace micca veramente stu schema, perchè pare chì solu un connettore di lavabo hè pussibule.

In realità, a situazione hè diversa: riempia u vostru Data Lake (ultimu ligame in u diagramma sopra) Ùn hè micca l'unicu modu per aduprà Debezium. Avvenimenti mandati à Apache Kafka ponu esse utilizati da e vostre applicazioni per trattà cù diverse situazioni. Per esempiu:

  • eliminazione di dati irrilevanti da a cache;
  • invià notificazioni;
  • l'aghjurnamenti di l'indici di ricerca;
  • qualchì tipu di logs di auditu;
  • ...

In casu chì avete una applicazione Java è ùn ci hè micca bisognu / pussibilità di utilizà un cluster Kafka, ci hè ancu a pussibilità di travaglià. cunnessu integratu. U plus evidenti hè chì cun ellu pudete ricusà infrastruttura supplementaria (in a forma di un connector è Kafka). Tuttavia, sta suluzione hè stata deprecata da a versione 1.1 è ùn hè più cunsigliatu per l'usu (pò esse sguassatu in versioni future).

Questu articulu discuterà l'architettura cunsigliata da i sviluppatori, chì furnisce a tolleranza di difetti è scalabilità.

Cunfigurazione di u cunnessu

Per cumincià à seguità i cambiamenti in u valore più impurtante - dati - avemu bisognu:

  1. fonte di dati, chì pò esse MySQL à partesi da a versione 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lista completa);
  2. cluster Apache Kafka
  3. Kafka Connect instance (versioni 1.x, 2.x);
  4. cunfiguratu cunnessu Debezium.

U travagliu nantu à i primi dui punti, i.e. u prucessu di stallà un DBMS è Apache Kafka sò fora di u scopu di l'articulu. Tuttavia, per quelli chì volenu implementà tuttu in un sandbox, ci hè un ready-made in u repositoriu ufficiale cù esempi. docker-compose.yaml.

Fighjemu nantu à l'ultimi dui punti in più detail.

0. Kafka Cunnette

Quì è dopu in l'articulu, tutti l'esempii di cunfigurazione sò cunsiderati in u cuntestu di l'imaghjini Docker distribuitu da i sviluppatori Debezium. Contene tutti i fugliali plugin necessarii (connettori) è furnisce a cunfigurazione di Kafka Connect utilizendu variabili di l'ambiente.

Se avete intenzione di utilizà Kafka Connect da Confluent, avete bisognu di aghjunghje i plugins di i connettori necessarii stessu à u repertoriu specificatu in plugin.path o stabilitu via una variabile d'ambiente CLASSPATH. I paràmetri per u travagliu Kafka Connect è i connettori sò definiti per i schedarii di cunfigurazione chì sò passati cum'è argumenti à u cumandamentu di l'iniziu di u travagliu. Per i dettagli, vede ducumentazione.

Tuttu u prucessu di stallà Debeizum in a versione di cunnessu hè realizatu in duie tappe. Pensemu à ognunu di elli:

1. Stallà u framework Kafka Connect

Per trasmette dati à un cluster Apache Kafka, paràmetri specifichi sò stabiliti in u framework Kafka Connect, cum'è:

  • paràmetri di cunnessione di cluster,
  • nomi di temi in quale a cunfigurazione di u cunnessu stessu serà guardatu,
  • u nome di u gruppu in quale u connettore hè in esecuzione (in casu di utilizà u modu distribuitu).

L'imaghjini ufficiali di Docker di u prugettu sustene a cunfigurazione cù variabili di l'ambiente - questu hè ciò chì useremu. Allora scarichemu l'imaghjini:

docker pull debezium/connect

U settore minimu di variabili ambientali necessarii per eseguisce u connettore hè u seguitu:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista iniziale di i servitori di cluster Kafka per ottene una lista completa di i membri di u cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets - un tema per almacenà e pusizioni induve u connettore hè attualmente situatu;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un tema per almacenà u statutu di u connettore è e so funzioni;
  • CONFIG_STORAGE_TOPIC=connector-config - un tema per almacenà e dati di cunfigurazione di u connettore è e so funzioni;
  • GROUP_ID=1 - identificatore di u gruppu di travagliadori nantu à quale u compitu di cunnessione pò esse eseguitu; necessariu quandu si usa distribuitu (distribuitu) regime.

Cuminciamu u cuntinuu cù sti variàbili:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota nantu à Avro

Per automaticamente, Debezium scrive dati in formatu JSON, chì hè accettatu per sandboxes è picculi quantità di dati, ma pò esse un prublema in basa di dati assai caricati. Un'alternativa à u cunvertitore JSON hè di serializà i missaghji usendu Avro à un furmatu binariu, chì riduce a carica nantu à u subsistema I / O in Apache Kafka.

Per utilizà Avro, avete bisognu di implementà un separatu schema-registru (per almacenà schemi). I variàbili per u cunvertitore pareranu cusì:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

I dettagli nantu à l'usu di Avro è a creazione di un registru per questu sò fora di u scopu di l'articulu - in più, per a chiarezza, useremu JSON.

2. Stabbilimentu di u cunnessu stessu

Avà pudete andà direttamente à a cunfigurazione di u cunnessu stessu, chì leghje e dati da a fonte.

Fighjemu l'esempiu di connettori per dui DBMS: PostgreSQL è MongoDB, per quale aghju sperienza è per quale ci sò diffirenzii (anche chjuchi, ma in certi casi significativu!).

A cunfigurazione hè descritta in notazione JSON è caricata à Kafka Connect utilizendu una dumanda POST.

2.1. PostgreSQL

Esempiu di cunfigurazione di cunnessu per PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

U principiu di funziunamentu di u connector dopu sta cunfigurazione hè abbastanza sèmplice:

  • À u primu principiu, si cunnetta à a basa di dati specificata in a cunfigurazione è principia in u modu snapshot iniziale, mandendu à Kafka l'inseme iniziale di dati ricivutu cù a cundizzioni SELECT * FROM table_name.
  • Dopu chì l'inizializazione hè finita, u connettore entra in u modu di leghje i cambiamenti da i schedarii WAL PostgreSQL.

Circa l'opzioni aduprate:

  • name - u nome di u connettore per quale hè aduprata a cunfigurazione descritta quì sottu; in u futuru, stu nome hè adupratu per travaglià cù u connector (vale à dì vede u statu / riavvia / aghjurnà a cunfigurazione) attraversu l'API REST Kafka Connect;
  • connector.class - a classe di cunnessu DBMS chì serà utilizatu da u connettore cunfiguratu;
  • plugin.name hè u nome di u plugin per a decodificazione logica di dati da i schedari WAL. Disponibile à sceglie wal2json, decoderbuffs и pgoutput. I primi dui necessitanu a stallazione di l'estensioni appropritate in u DBMS, è pgoutput per a versione PostgreSQL 10 è superiore ùn hè micca bisognu di manipulazioni supplementari;
  • database.* - opzioni per cunnette à a basa di dati, induve database.server.name - u nome di l'istanza PostgreSQL utilizatu per furmà u nome di u tema in u cluster Kafka;
  • table.include.list - una lista di tavule in quale vulemu seguità i cambiamenti; datu in u furmatu schema.table_name; ùn pò esse usatu inseme cù table.exclude.list;
  • heartbeat.interval.ms - intervallu (in millisecondi) cù quale u connettore manda messagi di u battutu à un tema speciale;
  • heartbeat.action.query - una dumanda chì serà eseguita quandu invià ogni missaghju di battitu di cori (l'opzione hè apparsa da a versione 1.1);
  • slot.name - u nome di u slot di replicazione chì serà utilizatu da u connettore;
  • publication.name - Nome publicazioni in PostgreSQL chì u connettore usa. In casu ùn esiste micca, Debezium pruverà à creà. Se l'utilizatore sottu à quale a cunnessione hè fatta ùn hà micca abbastanza diritti per questa azione, u connettore esce cun un errore;
  • transforms determina esattamente cumu cambià u nome di u tema di destinazione:
    • transforms.AddPrefix.type indica chì avemu aduprà espressioni regulare;
    • transforms.AddPrefix.regex - maschera da quale u nome di u tema di destinazione hè ridefinitu;
    • transforms.AddPrefix.replacement - direttamente ciò chì avemu ridefinitu.

Più nantu à u battitu di u core è e trasformazioni

Per automaticamente, u connettore manda dati à Kafka per ogni transazzione impegnata, è scrive u so LSN (Log Sequence Number) à u tema di serviziu. offset. Ma chì succede se u connettore hè cunfiguratu per leghje micca tutta a basa di dati, ma solu una parte di e so tavule (in quale a data hè aghjurnata pocu frequente)?

  • U connettore leghje i fugliali WAL è ùn rileva micca transazzione cumminciate in elli à e tavule chì monitoreghja.
  • Dunque, ùn aghjurnà micca a so pusizione attuale nè in u tema nè in u slot di replicazione.
  • Questu, à u turnu, pruvucarà i schedari WAL per esse "stuck" in u discu è prubabilmente escerà u spaziu di discu.

E quì l'opzioni venenu in salvezza. heartbeat.interval.ms и heartbeat.action.query. L'usu di queste opzioni in coppie permette di eseguisce una dumanda di cambià dati in una tavola separata ogni volta chì un missaghju di battitu di cori hè mandatu. Cusì, u LSN nantu à quale u connettore hè attualmente situatu (in u slot di replicazione) hè constantemente aghjurnatu. Questu permette à u DBMS di sguassà i schedari WAL chì ùn sò più necessarii. Per più infurmazione nantu à cumu funziona l'opzioni, vede ducumentazione.

Un'altra opzione chì merita più attenzione hè transforms. Ancu s'ellu si tratta più di comodità è bellezza ...

Per automaticamente, Debezium crea temi utilizendu a seguente pulitica di nomi: serverName.schemaName.tableName. Questu pò micca sempre esse convenientu. Opzioni transforms usendu l'espressioni regulare, pudete definisce una lista di tavule chì l'avvenimenti anu da esse instradati à un tema cun un nome specificu.

In a nostra cunfigurazione grazia à transforms succede u seguente: tutti l'avvenimenti CDC da a basa di dati tracciati andaranu à u tema cù u nome data.cdc.dbname. Altrimenti (senza sti paràmetri), Debezium crea per automaticamente un tema per ogni tavula di a forma: pg-dev.public.<table_name>.

Limitazioni di u cunnessu

À a fine di a descrizzione di a cunfigurazione di u connector per PostgreSQL, vale a pena parlà di e seguenti caratteristiche / limitazioni di u so travagliu:

  1. A funziunalità di u connector per PostgreSQL si basa in u cuncettu di decodificazione logica. Dunque ellu ùn traccia micca e dumande per cambià a struttura di a basa di dati (DDL) - per quessa, sta dati ùn saranu micca in i temi.
  2. Siccomu i slot di replicazione sò usati, a cunnessione di u connettore hè pussibule solu à l'istanza DBMS maestru.
  3. Se l'utilizatore sottu quale u cunnessu si cunnetta à a basa di dati hà diritti di sola lettura, allora prima di u primu lanciu, avete bisognu di creà manualmente un slot di replicazione è publicà à a basa di dati.

Applicà una cunfigurazione

Allora carchemu a nostra cunfigurazione in u connettore:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Avemu verificatu chì a scaricamentu hè successu è u connettore hà iniziatu:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Grande: hè stallatu è pronta per andà. Avà fingemu d'esse un cunsumadore è cunnetta cù Kafka, dopu chì aghjunghje è cambià una voce in a tavula:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In u nostru tema, questu serà mostratu cusì:

JSON assai longu cù i nostri cambiamenti

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In i dui casi, i registri sò custituiti da a chjave (PK) di u record chì hè stata cambiata, è l'essenza stessa di i cambiamenti: ciò chì u record era prima è ciò chì hè diventatu dopu.

  • In u casu di INSERT: valore prima (before) uguali nullseguita da a stringa chì hè stata inserita.
  • In u casu di UPDATE: à payload.before u statu precedente di a fila hè visualizatu, è in payload.after - novu cù l'essenza di u cambiamentu.

2.2 MongoDB

Stu connettore usa u mecanismu di replicazione standard di MongoDB, leghjendu l'infurmazioni da l'oplog di u nodu primariu DBMS.

In modu simile à u connettore digià descrittu per PgSQL, ancu quì, à u primu iniziu, l'istantanea di dati primariu hè presa, dopu chì u connettore cambia à u modu di lettura oplog.

Esempiu di cunfigurazione:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Comu pudete vede, ùn ci sò micca novi opzioni cumparatu cù l'esempiu precedente, ma solu u numeru di l'opzioni rispunsevuli di cunnette à a basa di dati è i so prefissi hè stata ridutta.

Settings transforms sta volta facenu i seguenti: turnà u nome di u tema di destinazione da u schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolleranza à i difetti

U prublema di a tolleranza di difetti è l'alta dispunibilità in u nostru tempu hè più agutu chè mai - soprattuttu quandu parlemu di dati è transazzione, è u seguimentu di u cambiamentu di dati ùn hè micca in latu in questa materia. Fighjemu ciò chì pò sbaglià in principiu è ciò chì succede à Debezium in ogni casu.

Ci sò trè opzioni di opt-out:

  1. Kafka Connect fallimentu. Se Connect hè cunfiguratu per travaglià in modu distribuitu, questu richiede parechji travagliadori per stabilisce u stessu group.id. Allora, se unu di elli falla, u connettore serà riavviatu nantu à l'altru travagliadore è cuntinueghja a leghje da l'ultima pusizioni impegnata in u tema in Kafka.
  2. Perdita di cunnessione cù u cluster Kafka. U connettore smetterà solu di leghje à a pusizione chì hà fiascatu di mandà à Kafka è pruvate periodicamente di rinviallu finu à chì u tentativu riesce.
  3. A fonte di dati ùn hè micca dispunibule. U connettore pruverà à ricunniscendu à a fonte secondu a cunfigurazione. U predefinitu hè 16 tentativi di usu backoff esponenziale. Dopu à u 16th tentativu fallutu, u compitu serà marcatu cum'è hà fiascatu è deve esse riavviatu manualmente via l'interfaccia REST Kafka Connect.
    • In u casu di PostgreSQL dati ùn sarà persu, perchè L'usu di slot di replicazione impedisce l'eliminazione di i fugliali WAL chì ùn sò micca letti da u connettore. In questu casu, ci hè un inconveniente: se a cunnessione di a rete trà u cunnessu è u DBMS hè disturbata per un bellu pezzu, ci hè una chance chì u spaziu di discu si esce, è questu pò purtà à u fallimentu di tuttu u DBMS.
    • In u casu di MySQL i schedari binlog ponu esse rotati da u DBMS stessu prima chì a cunnessione hè restaurata. Questu pruvucarà u connettore per andà in u statu fallutu, è hà da esse riavviatu in u modu di snapshot iniziale per cuntinuà a leghje da i binlogs per restaurà u funziunamentu normale.
    • nantu MongoDB. A documentazione dice: u cumpurtamentu di u connector in casu chì i schedarii di log / oplog sò stati sguassati è u connector ùn pò micca cuntinuà à leghje da a pusizione induve l'abbandunò hè u listessu per tutti i DBMS. Si trova in u fattu chì u connector andarà in u statu hà fiascatu è averà bisognu di un riavviu in u modu snapshot iniziale.

      Tuttavia, ci sò eccezzioni. Se u connettore era in un statu disconnected per un bellu pezzu (o ùn pudia micca ghjunghje à l'istanza MongoDB), è oplog hè stata rotata durante stu tempu, allora quandu a cunnessione hè restaurata, u connettore cuntinuerà tranquillamente à leghje e dati da a prima pusizioni dispunibule. , chì hè per quessa alcuni di i dati in Kafka ùn batterà.

cunchiusioni

Debezium hè a mo prima sperienza cù i sistemi CDC è hè statu assai pusitivu in generale. U prughjettu bribed u sustegnu di u DBMS principale, facilità di cunfigurazione, supportu per clustering è una cumunità attiva. Per quelli chì anu interessatu in a pratica, vi cunsigliu di leghje e guide per Kafka Connect и Debezium.

Comparatu à u connector JDBC per Kafka Connect, u vantaghju principale di Debezium hè chì i cambiamenti sò leghjiti da i logs DBMS, chì permette à e dati per esse ricivutu cù ritardu minimu. U Connettore JDBC (furnitu da Kafka Connect) interrogà a tavula tracciata à un intervallu fissu è (per a listessa ragione) ùn genera micca missaghji quandu i dati sò sguassati (cumu pudete dumandà a dati chì ùn sò micca quì?).

Per risolve prublemi simili, pudete attentu à e seguenti suluzioni (in più di Debezium):

PS

Leghjite puru nant'à u nostru blog:

Source: www.habr.com

Add a comment