Vă prezentăm Debezium - CDC pentru Apache Kafka

Vă prezentăm Debezium - CDC pentru Apache Kafka

În munca mea, întâlnesc adesea noi soluții tehnice / produse software, informații despre care sunt destul de puține pe internetul vorbitor de limbă rusă. Cu acest articol, voi încerca să umple un astfel de gol cu ​​un exemplu din practica mea recentă, când trebuia să configurez trimiterea de evenimente CDC de la două SGBD-uri populare (PostgreSQL și MongoDB) către un cluster Kafka folosind Debezium. Sper ca acest articol de recenzie, care a aparut ca urmare a muncii depuse, sa fie de folos altora.

Ce este Debezium și CDC în general?

Debezium - Reprezentant al categoriei de software CDC (Capturați modificarea datelor), sau mai exact, este un set de conectori pentru diferite SGBD-uri care sunt compatibile cu framework-ul Apache Kafka Connect.

Aceasta proiect open source, licențiat sub licența Apache v2.0 și sponsorizat de Red Hat. Dezvoltarea este în curs din 2016 și în prezent oferă suport oficial pentru următoarele SGBD: MySQL, PostgreSQL, MongoDB, SQL Server. Există și conectori pentru Cassandra și Oracle, dar în acest moment sunt în starea de „acces timpuriu”, iar noile versiuni nu garantează compatibilitatea cu versiunea anterioară.

Dacă comparăm CDC cu abordarea tradițională (când aplicația citește datele direct din DBMS), atunci principalele sale avantaje includ implementarea fluxului de modificare a datelor la nivel de rând cu latență scăzută, fiabilitate și disponibilitate ridicate. Ultimele două puncte sunt obținute prin utilizarea unui cluster Kafka ca depozit pentru evenimente CDC.

De asemenea, avantajele includ și faptul că un singur model este utilizat pentru stocarea evenimentelor, astfel încât aplicația finală nu trebuie să-și facă griji cu privire la nuanțele operarii diferitelor SGBD.

În cele din urmă, utilizarea unui broker de mesaje deschide spațiu pentru scalarea orizontală a aplicațiilor care urmăresc modificările datelor. În același timp, impactul asupra sursei de date este minimizat, deoarece datele sunt primite nu direct de la SGBD, ci de la clusterul Kafka.

Despre arhitectura Debezium

Utilizarea Debezium se reduce la această schemă simplă:

DBMS (ca sursă de date) → conector în Kafka Connect → Apache Kafka → consumer

Ca o ilustrare, voi oferi o diagramă de pe site-ul proiectului:

Vă prezentăm Debezium - CDC pentru Apache Kafka

Cu toate acestea, nu prea îmi place această schemă, deoarece se pare că este posibil doar un conector pentru chiuvetă.

În realitate, situația este diferită: umplerea Data Lake (ultimul link din diagrama de mai sus) nu este singura modalitate de a utiliza Debezium. Evenimentele trimise către Apache Kafka pot fi folosite de aplicațiile dumneavoastră pentru a face față diferitelor situații. De exemplu:

  • eliminarea datelor irelevante din cache;
  • trimiterea notificărilor;
  • actualizări ale indexului de căutare;
  • un fel de jurnal de audit;
  • ...

În cazul în care aveți o aplicație Java și nu este nevoie/posibilitatea de a utiliza un cluster Kafka, există și posibilitatea de a lucra prin conector încorporat. Plusul evident este că cu el puteți refuza infrastructura suplimentară (sub formă de conector și Kafka). Cu toate acestea, această soluție a fost depreciată de la versiunea 1.1 și nu mai este recomandată pentru utilizare (poate fi eliminată în versiunile viitoare).

Acest articol va discuta despre arhitectura recomandată de dezvoltatori, care oferă toleranță la erori și scalabilitate.

Configurarea conectorului

Pentru a începe să urmărim modificările celei mai importante valori - date - avem nevoie de:

  1. sursa de date, care poate fi MySQL începând cu versiunea 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lista completă);
  2. cluster Apache Kafka
  3. Instanță Kafka Connect (versiunile 1.x, 2.x);
  4. conector Debezium configurat.

Lucrați la primele două puncte, adică procesul de instalare a unui SGBD și Apache Kafka depășește domeniul de aplicare al articolului. Cu toate acestea, pentru cei care doresc să implementeze totul într-un sandbox, există unul gata făcut în depozitul oficial cu exemple docker-compose.yaml.

Ne vom concentra mai detaliat asupra ultimelor două puncte.

0. Kafka Connect

Aici și mai târziu în articol, toate exemplele de configurare sunt luate în considerare în contextul imaginii Docker distribuită de dezvoltatorii Debezium. Conține toate fișierele plugin necesare (conectori) și oferă configurarea Kafka Connect folosind variabile de mediu.

Dacă intenționați să utilizați Kafka Connect de la Confluent, va trebui să adăugați dvs. pluginurile conectorilor necesari în directorul specificat în plugin.path sau setat printr-o variabilă de mediu CLASSPATH. Setările pentru lucrătorul Kafka Connect și conectorii sunt definite prin fișierele de configurare care sunt transmise ca argumente comenzii de pornire a lucrătorului. Pentru detalii, vezi documentație.

Întregul proces de configurare a Debeizum în versiunea conector se desfășoară în două etape. Să luăm în considerare fiecare dintre ele:

1. Configurarea cadrului Kafka Connect

Pentru a transmite date către clusterul Apache Kafka, parametrii specifici sunt setați în cadrul Kafka Connect, cum ar fi:

  • setări de conectare la cluster,
  • numele subiectelor în care va fi stocată configurația conectorului în sine,
  • numele grupului în care rulează conectorul (în cazul utilizării modului distribuit).

Imaginea oficială Docker a proiectului acceptă configurarea folosind variabile de mediu - aceasta este ceea ce vom folosi. Deci haideți să descarcăm imaginea:

docker pull debezium/connect

Setul minim de variabile de mediu necesare pentru a rula conectorul este următorul:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — lista inițială a serverelor de cluster Kafka pentru a obține o listă completă a membrilor clusterului;
  • OFFSET_STORAGE_TOPIC=connector-offsets — un subiect pentru stocarea pozițiilor în care se află în prezent conectorul;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - un subiect pentru stocarea stării conectorului și a sarcinilor acestuia;
  • CONFIG_STORAGE_TOPIC=connector-config - un subiect pentru stocarea datelor de configurare a conectorilor și sarcinile acestuia;
  • GROUP_ID=1 — identificatorul grupului de lucrători asupra căruia se poate executa sarcina de conectare; necesar atunci când se utilizează distribuit (distribuit) regim.

Începem containerul cu aceste variabile:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Notă despre Avro

În mod implicit, Debezium scrie datele în format JSON, care este acceptabil pentru sandbox-uri și cantități mici de date, dar poate fi o problemă în bazele de date foarte încărcate. O alternativă la convertorul JSON este serializarea mesajelor folosind Avro într-un format binar, care reduce sarcina pe subsistemul I/O din Apache Kafka.

Pentru a utiliza Avro, trebuie să implementați un separat schema-registru (pentru stocarea schemelor). Variabilele pentru convertor vor arăta astfel:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detaliile despre utilizarea Avro și configurarea registrului pentru acesta depășesc domeniul de aplicare al acestui articol - mai departe, pentru claritate, vom folosi JSON.

2. Configurarea conectorului în sine

Acum puteți merge direct la configurația conectorului în sine, care va citi datele din sursă.

Să ne uităm la exemplul de conectori pentru două DBMS: PostgreSQL și MongoDB, pentru care am experiență și pentru care există diferențe (deși mici, dar în unele cazuri semnificative!).

Configurația este descrisă în notație JSON și încărcată în Kafka Connect folosind o solicitare POST.

2.1. PostgreSQL

Exemplu de configurare a conectorului pentru PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Principiul de funcționare a conectorului după această configurație este destul de simplu:

  • Când este lansat pentru prima dată, se conectează la baza de date specificată în configurație și pornește în modul instantaneu inițial, trimițând lui Kafka setul inițial de date primit cu condiționalul SELECT * FROM table_name.
  • După finalizarea inițializării, conectorul intră în modul de citire a modificărilor din fișierele WAL PostgreSQL.

Despre opțiunile utilizate:

  • name — numele conectorului pentru care se utilizează configurația descrisă mai jos; în viitor, acest nume este folosit pentru a lucra cu conectorul (adică, vizualiza starea/repornirea/actualizați configurația) prin intermediul API-ului REST Kafka Connect;
  • connector.class — clasa de conector DBMS care va fi utilizată de conectorul configurat;
  • plugin.name este numele pluginului pentru decodarea logică a datelor din fișierele WAL. Disponibil pentru a alege wal2json, decoderbuffs и pgoutput. Primele două necesită instalarea extensiilor corespunzătoare în SGBD și pgoutput pentru PostgreSQL versiunea 10 și superioară nu necesită manipulări suplimentare;
  • database.* — opțiuni de conectare la baza de date, unde database.server.name — Numele instanței PostgreSQL folosit pentru a forma numele subiectului în clusterul Kafka;
  • table.include.list - o listă de tabele în care dorim să urmărim modificările; dat în format schema.table_name; nu poate fi folosit împreună cu table.exclude.list;
  • heartbeat.interval.ms — interval (în milisecunde) cu care conectorul trimite mesaje de bătăi inimii către un subiect special;
  • heartbeat.action.query - o solicitare care va fi executata la trimiterea fiecarui mesaj heartbeat (optiunea a aparut inca de la versiunea 1.1);
  • slot.name — numele slotului de replicare care va fi utilizat de conector;
  • publication.name - Nume publicare în PostgreSQL pe care îl folosește conectorul. În cazul în care nu există, Debezium va încerca să o creeze. Dacă utilizatorul sub care se realizează conexiunea nu are suficiente drepturi pentru această acțiune, conectorul va ieși cu o eroare;
  • transforms stabilește cum exact să schimbi numele subiectului țintă:
    • transforms.AddPrefix.type indică faptul că vom folosi expresii regulate;
    • transforms.AddPrefix.regex — mască prin care este redefinit denumirea subiectului țintă;
    • transforms.AddPrefix.replacement - direct ceea ce redefinim.

Mai multe despre bătăile inimii și transformări

În mod implicit, conectorul trimite date către Kafka pentru fiecare tranzacție angajată, iar LSN (Numărul de secvență de jurnal) este înregistrat în subiectul de serviciu offset. Dar ce se întâmplă dacă conectorul este configurat să citească nu întreaga bază de date, ci doar o parte din tabelele acesteia (în care actualizările de date nu au loc frecvent)?

  • Conectorul va citi fișierele WAL și nu va detecta nicio tranzacție comisă în tabelele pe care le monitorizează.
  • Prin urmare, nu își va actualiza poziția actuală nici în subiect, nici în slotul de replicare.
  • Acest lucru, la rândul său, va face ca fișierele WAL să fie „blocate” pe disc și, probabil, vor rămâne fără spațiu pe disc.

Și aici opțiunile vin în ajutor. heartbeat.interval.ms и heartbeat.action.query. Utilizarea acestor opțiuni în perechi face posibilă executarea unei cereri de modificare a datelor într-un tabel separat de fiecare dată când este trimis un mesaj de bătăi inimii. Astfel, LSN-ul pe care se află în prezent conectorul (în slotul de replicare) este actualizat constant. Acest lucru permite DBMS-ului să elimine fișierele WAL care nu mai sunt necesare. Pentru mai multe informații despre cum funcționează opțiunile, consultați documentație.

O altă opțiune care merită o atenție mai atentă este transforms. Deși este mai mult despre comoditate și frumusețe...

În mod implicit, Debezium creează subiecte folosind următoarea politică de denumire: serverName.schemaName.tableName. Acest lucru poate să nu fie întotdeauna convenabil. Opțiuni transforms folosind expresii regulate, puteți defini o listă de tabele ale căror evenimente trebuie direcționate către un subiect cu un nume specific.

În configurația noastră, mulțumesc transforms se întâmplă următoarele: toate evenimentele CDC din baza de date urmărită vor merge la subiectul cu numele data.cdc.dbname. În caz contrar (fără aceste setări), Debezium ar crea în mod implicit un subiect pentru fiecare tabel al formularului: pg-dev.public.<table_name>.

Limitări ale conectorului

La sfârșitul descrierii configurației conectorului pentru PostgreSQL, merită să vorbim despre următoarele caracteristici / limitări ale activității sale:

  1. Funcționalitatea conectorului pentru PostgreSQL se bazează pe conceptul de decodare logică. Prin urmare el nu urmărește cererile de modificare a structurii bazei de date (DDL) - în consecință, aceste date nu vor fi în subiecte.
  2. Deoarece sunt utilizate sloturi de replicare, conectarea conectorului este posibilă numai către instanța principală DBMS.
  3. Dacă utilizatorul sub care se conectează conectorul la baza de date are drepturi de numai citire, atunci înainte de prima lansare, va trebui să creați manual un slot de replicare și să publicați în baza de date.

Aplicarea unei configurații

Deci, să încărcăm configurația noastră în conector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Verificăm dacă descărcarea a avut succes și conectorul a început:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Grozav: este configurat și gata de funcționare. Acum să ne prefacem că suntem un consumator și să ne conectăm la Kafka, după care vom adăuga și vom schimba o intrare în tabel:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

În subiectul nostru, acesta va fi afișat după cum urmează:

JSON foarte lung cu modificările noastre

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

În ambele cazuri, înregistrările constau din cheia (PK) a înregistrării care a fost schimbată și însăși esența modificărilor: ce a fost înregistrarea înainte și ce a devenit după.

  • În cazul INSERT: valoarea înainte de (before) egal nullurmat de șirul care a fost introdus.
  • În cazul UPDATE: în payload.before este afișată starea anterioară a rândului și în payload.after - nou cu esența schimbării.

2.2 MongoDB

Acest conector folosește mecanismul standard de replicare MongoDB, citind informații din oplog-ul nodului primar DBMS.

În mod similar cu conectorul deja descris pentru PgSQL, și aici, la prima pornire, este luată instantaneul de date primar, după care conectorul trece în modul de citire oplog.

Exemplu de configurare:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

După cum puteți vedea, nu există opțiuni noi în comparație cu exemplul anterior, ci doar numărul de opțiuni responsabile pentru conectarea la baza de date și prefixele acestora a fost redus.

Setări transforms de data aceasta fac următoarele: întoarceți numele subiectului țintă din schemă <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

toleranta la greseli

Problema toleranței la erori și a disponibilității ridicate în vremurile noastre este mai acută ca niciodată - mai ales când vorbim despre date și tranzacții, iar urmărirea modificărilor datelor nu stă deoparte în această problemă. Să ne uităm la ce poate merge prost în principiu și ce se va întâmpla cu Debezium în fiecare caz.

Există trei opțiuni de renunțare:

  1. Eroare Kafka Connect. Dacă Connect este configurat să funcționeze în modul distribuit, aceasta necesită ca mai mulți lucrători să seteze același group.id. Apoi, dacă unul dintre ele eșuează, conectorul va fi repornit pe celălalt lucrător și va continua să citească din ultima poziție comisă în subiectul din Kafka.
  2. Pierderea conectivității cu clusterul Kafka. Conectorul va opri pur și simplu să citească în poziția pe care nu a reușit să o trimită către Kafka și va încerca periodic să-l retrimită până când încercarea reușește.
  3. Sursa de date indisponibilă. Conectorul va încerca să se reconecteze la sursă conform configurației. Valoarea implicită este de 16 încercări de utilizare retragere exponențială. După a 16-a încercare eșuată, sarcina va fi marcată ca a eșuat și va trebui să fie repornit manual prin interfața REST Kafka Connect.
    • În cazul PostgreSQL datele nu se vor pierde, deoarece Utilizarea sloturilor de replicare vă va împiedica să ștergeți fișierele WAL care nu sunt citite de conector. În acest caz, există și un dezavantaj al monedei: dacă conectivitatea la rețea dintre conector și DBMS este întreruptă pentru o lungă perioadă de timp, există posibilitatea ca spațiul pe disc să se epuizeze, iar acest lucru poate duce la o defecțiune a întregul SGBD.
    • În cazul MySQL fișierele binlog pot fi rotite de către DBMS înainte ca conectivitatea să fie restabilită. Acest lucru va face ca conectorul să intre în starea eșuată și va trebui să repornească în modul instantaneu inițial pentru a continua citirea din binlog-uri pentru a restabili funcționarea normală.
    • despre MongoDB. Documentația spune: comportamentul conectorului în cazul în care fișierele log/oplog au fost șterse și conectorul nu poate continua să citească din poziția în care a rămas este același pentru toate DBMS. Constă în faptul că conectorul va intra în stare a eșuat și va necesita o repornire în modul instantaneu inițial.

      Cu toate acestea, există și excepții. Dacă conectorul a fost într-o stare deconectată pentru o perioadă lungă de timp (sau nu a putut ajunge la instanța MongoDB) și oplog a fost rotit în acest timp, atunci când conexiunea este restabilită, conectorul va continua să citească cu calm datele din prima poziție disponibilă , motiv pentru care unele dintre datele din Kafka nu va lovi.

Concluzie

Debezium este prima mea experiență cu sistemele CDC și a fost în general foarte pozitivă. Proiectul a câștigat prin suportul pentru SGBD-uri majore, ușurința de configurare, suport pentru clustering și comunitate activă. Pentru cei interesați de practică, vă recomand să citiți ghidurile pentru Kafka Connect и Debezium.

În comparație cu conectorul JDBC pentru Kafka Connect, principalul avantaj al Debezium este că modificările sunt citite din jurnalele DBMS, ceea ce permite ca datele să fie primite cu întârziere minimă. Conectorul JDBC (furnizat de Kafka Connect) interogează tabelul urmărit la un interval fix și (din același motiv) nu generează mesaje atunci când datele sunt șterse (cum puteți interoga datele care nu există?).

Pentru a rezolva probleme similare, puteți acorda atenție următoarelor soluții (pe lângă Debezium):

PS

Citește și pe blogul nostru:

Sursa: www.habr.com

Adauga un comentariu