Presentazione di Debezium - CDC per Apache Kafka

Presentazione di Debezium - CDC per Apache Kafka

Nel mio lavoro mi imbatto spesso in nuove soluzioni tecniche / prodotti software, le cui informazioni sono piuttosto scarse sull'Internet di lingua russa. Con questo articolo, proverò a colmare una di queste lacune con un esempio tratto dalla mia pratica recente, quando avevo bisogno di impostare l'invio di eventi CDC da due popolari DBMS (PostgreSQL e MongoDB) a un cluster Kafka utilizzando Debezium. Spero che questo articolo di recensione, apparso come risultato del lavoro svolto, sia utile ad altri.

Cos'è Debezium e CDC in generale?

Debezio - Rappresentante della categoria software CDC (Cattura la modifica dei dati), o più precisamente si tratta di un insieme di connettori per vari DBMS compatibili con il framework Apache Kafka Connect.

Essa progetto open source, concesso in licenza con la licenza Apache v2.0 e sponsorizzato da Red Hat. Lo sviluppo è in corso dal 2016 e al momento fornisce supporto ufficiale per i seguenti DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Esistono anche connettori per Cassandra e Oracle, ma sono attualmente in stato di "accesso anticipato" e le nuove versioni non garantiscono la compatibilità con le versioni precedenti.

Se confrontiamo CDC con l'approccio tradizionale (quando l'applicazione legge direttamente i dati dal DBMS), i suoi principali vantaggi includono l'implementazione dello streaming di modifica dei dati a livello di riga con bassa latenza, elevata affidabilità e disponibilità. Gli ultimi due punti vengono raggiunti utilizzando un cluster Kafka come repository per gli eventi CDC.

Inoltre, i vantaggi includono il fatto che per memorizzare gli eventi viene utilizzato un unico modello, quindi l'applicazione finale non deve preoccuparsi delle sfumature legate al funzionamento di diversi DBMS.

Infine, l'utilizzo di un broker di messaggi apre la possibilità di un ridimensionamento orizzontale delle applicazioni che tengono traccia delle modifiche nei dati. Allo stesso tempo, l'impatto sull'origine dati è ridotto al minimo, poiché i dati non vengono ricevuti direttamente dal DBMS, ma dal cluster Kafka.

Informazioni sull'architettura Debezium

L'uso di Debezium si riduce a questo semplice schema:

DBMS (come origine dati) → connettore in Kafka Connect → Apache Kafka → consumatore

A titolo illustrativo, fornirò un diagramma dal sito web del progetto:

Presentazione di Debezium - CDC per Apache Kafka

Tuttavia, questo schema non mi piace molto, perché sembra che sia possibile solo un connettore per il lavandino.

In realtà la situazione è diversa: riempire il proprio Data Lake (ultimo collegamento nel diagramma sopra) non è l'unico modo per utilizzare Debezium. Gli eventi inviati ad Apache Kafka possono essere utilizzati dalle tue applicazioni per risolvere varie situazioni. Per esempio:

  • rimozione di dati irrilevanti dalla cache;
  • invio di notifiche;
  • aggiornamenti dell'indice di ricerca;
  • una sorta di registri di controllo;
  • ...

Nel caso in cui si disponga di un'applicazione Java e non vi sia necessità/possibilità di utilizzare un cluster Kafka, esiste anche la possibilità di lavorare tramite connettore incorporato. Il vantaggio ovvio è che con esso puoi rifiutare un'infrastruttura aggiuntiva (sotto forma di connettore e Kafka). Tuttavia, questa soluzione è stata deprecata a partire dalla versione 1.1 e non se ne consiglia più l'uso (potrebbe essere rimossa nelle versioni future).

Questo articolo discuterà l'architettura consigliata dagli sviluppatori, che fornisce tolleranza agli errori e scalabilità.

Configurazione del connettore

Per iniziare a monitorare i cambiamenti nel valore più importante, ovvero i dati, abbiamo bisogno di:

  1. sorgente dati, che può essere MySQL a partire dalla versione 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (elenco completo);
  2. Cluster Apache Kafka
  3. Istanza Kafka Connect (versioni 1.x, 2.x);
  4. connettore Debezium configurato.

Lavora sui primi due punti, cioè il processo di installazione di un DBMS e Apache Kafka vanno oltre lo scopo dell'articolo. Tuttavia, per coloro che desiderano distribuire tutto in un sandbox, nel repository ufficiale ce n'è uno già pronto con esempi docker-compose.yaml.

Ci soffermeremo più nel dettaglio sugli ultimi due punti.

0. Kafka Connect

Qui e più avanti nell'articolo tutti gli esempi di configurazione vengono discussi nel contesto dell'immagine Docker distribuita dagli sviluppatori Debezium. Contiene tutti i file plug-in necessari (connettori) e fornisce la configurazione di Kafka Connect utilizzando variabili di ambiente.

Se intendi utilizzare Kafka Connect di Confluent, dovrai aggiungere tu stesso i plugin dei connettori necessari alla directory specificata in plugin.path o impostato tramite una variabile d'ambiente CLASSPATH. Le impostazioni per il lavoratore Kafka Connect e i connettori vengono determinate tramite file di configurazione passati come argomenti al comando di avvio del lavoratore. Per i dettagli vedere documentazione.

L'intero processo di configurazione di Debeizum nella versione connettore viene eseguito in due fasi. Diamo un'occhiata a ciascuno di essi:

1. Configurazione del framework Kafka Connect

Per trasmettere i dati a un cluster Apache Kafka, vengono impostati parametri specifici nel framework Kafka Connect, come ad esempio:

  • parametri per la connessione al cluster,
  • nomi degli argomenti in cui verrà memorizzata direttamente la configurazione del connettore stesso,
  • il nome del gruppo in cui è in esecuzione il connettore (se viene utilizzata la modalità distribuita).

L'immagine Docker ufficiale del progetto supporta la configurazione utilizzando variabili di ambiente: questo è ciò che useremo. Quindi, scarica l'immagine:

docker pull debezium/connect

Il set minimo di variabili di ambiente richieste per eseguire il connettore è il seguente:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - elenco iniziale dei server del cluster Kafka per ottenere un elenco completo dei membri del cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets — un argomento per memorizzare le posizioni in cui si trova attualmente il connettore;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un argomento per memorizzare lo stato del connettore e le sue attività;
  • CONFIG_STORAGE_TOPIC=connector-config — argomento per la memorizzazione dei dati di configurazione del connettore e le sue attività;
  • GROUP_ID=1 — identificativo del gruppo di lavoratori su cui può essere eseguita l'attività di connettore; richiesto quando si utilizza distribuito (distribuito) modalità.

Iniziamo il contenitore con queste variabili:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota su Avro

Per impostazione predefinita, Debezium scrive i dati in formato JSON, che è accettabile per sandbox e piccole quantità di dati, ma può diventare un problema in database con carichi elevati. Un'alternativa a un convertitore JSON consiste nel serializzare i messaggi utilizzando Avro in un formato binario, che riduce il carico sul sottosistema I/O in Apache Kafka.

Per utilizzare Avro è necessario distribuire un file separato registro degli schemi (per la memorizzazione di schemi). Le variabili per il convertitore saranno simili a queste:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

I dettagli sull'utilizzo di Avro e sulla configurazione del relativo registro vanno oltre lo scopo dell'articolo: inoltre, per chiarezza, utilizzeremo JSON.

2. Configurazione del connettore stesso

Ora puoi andare direttamente alla configurazione del connettore stesso, che leggerà i dati dalla sorgente.

Consideriamo l'esempio dei connettori per due DBMS: PostgreSQL e MongoDB, in cui ho esperienza e in cui ci sono differenze (seppur piccole, ma in alcuni casi significative!).

La configurazione è descritta nella notazione JSON e caricata su Kafka Connect utilizzando una richiesta POST.

2.1.PostgreSQL

Esempio di configurazione del connettore per PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Il principio di funzionamento del connettore dopo questa configurazione è abbastanza semplice:

  • Al primo avvio si collega al database specificato nella configurazione e si avvia in modalità istantanea iniziale, inviando a Kafka il set iniziale di dati ricevuti con il condizionale SELECT * FROM table_name.
  • Al termine dell'inizializzazione, il connettore entra in modalità per leggere le modifiche dai file WAL PostgreSQL.

Informazioni sulle opzioni utilizzate:

  • name — il nome del connettore per il quale viene utilizzata la configurazione di seguito descritta; in futuro, questo nome verrà utilizzato per funzionare con il connettore (ovvero visualizzare lo stato/riavviare/aggiornare la configurazione) tramite l'API REST di Kafka Connect;
  • connector.class — la classe del connettore DBMS che verrà utilizzata dal connettore configurato;
  • plugin.name è il nome del plugin per la decodifica logica dei dati dai file WAL. Disponibile tra cui scegliere wal2json, decoderbuffs и pgoutput. I primi due richiedono l'installazione delle apposite estensioni nel DBMS, e pgoutput per PostgreSQL versione 10 e successive non sono necessarie ulteriori manipolazioni;
  • database.* — opzioni per la connessione al database, dove database.server.name — Nome dell'istanza PostgreSQL utilizzato per formare il nome dell'argomento nel cluster Kafka;
  • table.include.list — un elenco di tabelle in cui vogliamo tenere traccia delle modifiche; dato nel formato schema.table_name; non può essere utilizzato insieme a table.exclude.list;
  • heartbeat.interval.ms — intervallo (in millisecondi) con cui il connettore invia messaggi heartbeat a un argomento speciale;
  • heartbeat.action.query - una richiesta che verrà eseguita all'invio di ciascun messaggio heartbeat (l'opzione è presente dalla versione 1.1);
  • slot.name — il nome dello slot di replica che verrà utilizzato dal connettore;
  • publication.name - Nome Pubblicazione in PostgreSQL utilizzato dal connettore. Nel caso in cui non esista, Debezium proverà a crearlo. Se l'utente con cui viene effettuata la connessione non dispone di diritti sufficienti per questa azione, il connettore uscirà con un errore;
  • transforms determina come modificare esattamente il nome dell'argomento di destinazione:
    • transforms.AddPrefix.type indica che utilizzeremo le espressioni regolari;
    • transforms.AddPrefix.regex — una maschera che ridefinisce il nome dell'argomento target;
    • transforms.AddPrefix.replacement - direttamente ciò che stiamo ridefinendo.

Maggiori informazioni sul battito cardiaco e sulle trasformazioni

Per impostazione predefinita, il connettore invia dati a Kafka per ogni transazione impegnata e scrive il suo LSN (Log Sequence Number) nell'argomento del servizio offset. Ma cosa succede se il connettore è configurato per leggere non l'intero database, ma solo parte delle sue tabelle (in cui gli aggiornamenti dei dati non avvengono frequentemente)?

  • Il connettore leggerà i file WAL e non rileverà i commit delle transazioni in essi contenuti nelle tabelle monitorate.
  • Pertanto, non aggiornerà la sua posizione corrente né nell'argomento né nello slot di replica.
  • Ciò, a sua volta, farà sì che i file WAL rimangano "bloccati" sul disco e probabilmente esauriranno lo spazio su disco.

Ed è qui che le opzioni vengono in soccorso. heartbeat.interval.ms и heartbeat.action.query. L'utilizzo di queste opzioni in coppia rende possibile eseguire una richiesta di modifica dei dati in una tabella separata ogni volta che viene inviato un messaggio heartbeat. Pertanto, l'LSN su cui si trova attualmente il connettore (nello slot di replica) viene costantemente aggiornato. Ciò consente al DBMS di rimuovere i file WAL che non sono più necessari. Per ulteriori informazioni su come funzionano le opzioni, vedere documentazione.

Un'altra opzione degna di maggiore attenzione è transforms. Anche se è più una questione di comodità e bellezza...

Per impostazione predefinita, Debezium crea argomenti utilizzando la seguente politica di denominazione: serverName.schemaName.tableName. Questo potrebbe non essere sempre conveniente. Opzioni transforms È possibile utilizzare le espressioni regolari per definire un elenco di tabelle, gli eventi da cui devono essere instradati a un argomento con un nome specifico.

Nella nostra configurazione grazie a transforms accade quanto segue: tutti gli eventi CDC dal database tracciato andranno all'argomento con il nome data.cdc.dbname. Altrimenti (senza queste impostazioni), Debezium creerebbe per impostazione predefinita un argomento per ogni tabella come: pg-dev.public.<table_name>.

Limitazioni del connettore

Per concludere la descrizione della configurazione del connettore per PostgreSQL è opportuno parlare delle seguenti caratteristiche/limitazioni del suo funzionamento:

  1. La funzionalità del connettore per PostgreSQL si basa sul concetto di decodifica logica. Perciò lui non tiene traccia delle richieste di modifica della struttura del database (DDL) - di conseguenza, questi dati non saranno presenti negli argomenti.
  2. Poiché vengono utilizzati gli slot di replica, è possibile la connessione del connettore solo all'istanza del DBMS master.
  3. Se l'utente con cui il connettore si connette al database dispone di diritti di sola lettura, prima del primo avvio sarà necessario creare manualmente uno slot di replica e pubblicare nel database.

Applicazione di una configurazione

Carichiamo quindi la nostra configurazione nel connettore:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Controlliamo che il download sia andato a buon fine e che il connettore sia avviato:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Ottimo: è configurato e pronto per l'uso. Ora fingiamo di essere un consumatore e colleghiamoci a Kafka, dopodiché aggiungiamo e modifichiamo una voce nella tabella:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Nel nostro argomento, questo verrà visualizzato come segue:

JSON molto lungo con le nostre modifiche

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In entrambi i casi, i record sono costituiti dalla chiave (PK) del record che è stato modificato e dall'essenza stessa delle modifiche: cosa era il record prima e cosa è diventato dopo.

  • Nel caso di INSERT: valore prima (before) equivale nullseguito dalla stringa che è stata inserita.
  • Nel caso di UPDATE: in payload.before viene visualizzato lo stato precedente della linea e in payload.after - nuovo con l'essenza del cambiamento.

2.2 MongoDB

Questo connettore utilizza il meccanismo di replica MongoDB standard, leggendo le informazioni dall'oplog del nodo primario DBMS.

Analogamente al connettore per PgSQL già descritto, anche qui al primo avvio viene eseguito lo snapshot dei dati primari, dopodiché il connettore passa alla modalità di lettura oplog.

Esempio di configurazione:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Come puoi vedere, non ci sono nuove opzioni rispetto all'esempio precedente, ma è stato ridotto solo il numero di opzioni responsabili della connessione al database e i loro prefissi.

Impostazioni transforms questa volta fanno quanto segue: trasformano il nome dell'argomento di destinazione dallo schema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolleranza ai guasti

Il problema della tolleranza agli errori e dell'elevata disponibilità nel nostro tempo è più acuto che mai, soprattutto quando si parla di dati e transazioni, e il monitoraggio delle modifiche dei dati non è da meno in questo problema. Diamo un'occhiata a cosa può andare storto in linea di principio e cosa accadrà a Debezium in ciascun caso.

Sono disponibili tre opzioni di rinuncia:

  1. Errore di Kafka Connect. Se Connect è configurato per funzionare in modalità distribuita, è necessario che più lavoratori impostino lo stesso group.id. Quindi, se uno di essi fallisce, il connettore verrà riavviato sull'altro lavoratore e continuerà a leggere dall'ultima posizione impegnata nell'argomento in Kafka.
  2. Perdita di connettività con il cluster Kafka. Il connettore smetterà semplicemente di leggere nella posizione di cui non è stato possibile inviare a Kafka e proverà periodicamente a inviarla nuovamente finché il tentativo non avrà esito positivo.
  3. Indisponibilità dell'origine dati. Il connettore tenterà di riconnettersi alla sorgente in base alla configurazione. L'impostazione predefinita è 16 tentativi di utilizzo backoff esponenziale. Dopo il 16° tentativo fallito, l'attività verrà contrassegnata come mancato e dovrà essere riavviato manualmente tramite l'interfaccia REST di Kafka Connect.
    • Nel caso di PostgreSQL i dati non andranno persi, perché l'utilizzo degli slot di replica impedirà l'eliminazione dei file WAL non letti dal connettore. In questo caso c'è uno svantaggio: se la connettività di rete tra il connettore e il DBMS viene interrotta per un lungo periodo, c'è la possibilità che lo spazio su disco si esaurisca e ciò potrebbe portare al guasto dell'intero DBMS.
    • Nel caso di MySQL i file binlog possono essere ruotati dal DBMS stesso prima che la connettività venga ripristinata. Ciò farà sì che il connettore entri nello stato di errore e sarà necessario riavviarlo in modalità snapshot iniziale per continuare a leggere dai binlog e ripristinare il normale funzionamento.
    • Про MongoDB. La documentazione dice: il comportamento del connettore nel caso in cui i file di log/oplog siano stati eliminati e il connettore non possa continuare a leggere dalla posizione in cui si era interrotto è lo stesso per tutti i DBMS. Sta nel fatto che il connettore entrerà nello stato mancato e richiederà un riavvio nella modalità istantanea iniziale.

      Tuttavia, ci sono delle eccezioni. Se il connettore è stato disconnesso per un lungo periodo (o non è riuscito a raggiungere l'istanza MongoDB) e l'oplog ha effettuato la rotazione durante questo periodo, quando la connessione viene ripristinata, il connettore continuerà tranquillamente a leggere i dati dalla prima posizione disponibile, ecco perché alcuni dati si trovano in Kafka no colpirà.

conclusione

Debezium è la mia prima esperienza con i sistemi CDC e nel complesso è molto positiva. Il progetto ha conquistato grazie al supporto dei principali DBMS, alla facilità di configurazione, al supporto del clustering e alla comunità attiva. Per chi è interessato alla pratica, consiglio di leggere le guide di Kafka Connect и Debezio.

Rispetto al connettore JDBC per Kafka Connect, il vantaggio principale di Debezium è che le modifiche vengono lette dai log DBMS, il che consente di ricevere i dati con un ritardo minimo. Il connettore JDBC (fornito da Kafka Connect) interroga la tabella tracciata a un intervallo fisso e (per lo stesso motivo) non genera messaggi quando i dati vengono eliminati (come è possibile eseguire una query per dati che non sono presenti?).

Per risolvere problemi simili, puoi prestare attenzione alle seguenti soluzioni (oltre a Debezium):

PS

Leggi anche sul nostro blog:

Fonte: habr.com

Aggiungi un commento