Predstavljamo Debezium - CDC za Apache Kafku

Predstavljamo Debezium - CDC za Apache Kafku

U svom radu često nailazim na nova tehnička rješenja / softverske proizvode, o kojima je prilično malo informacija na ruskom govornom području. Ovim ću člankom pokušati popuniti jednu takvu prazninu primjerom iz svoje nedavne prakse, kada sam trebao postaviti slanje CDC događaja iz dva popularna DBMS-a (PostgreSQL i MongoDB) u Kafka klaster koristeći Debezium. Nadam se da će ovaj pregledni članak, koji je nastao kao rezultat obavljenog rada, biti koristan i drugima.

Što je uopće Debezium i CDC?

Debezij - Predstavnik kategorije CDC softver (Snimite promjenu podataka), točnije, to je skup konektora za različite DBMS-ove koji su kompatibilni s okvirom Apache Kafka Connect.

ovo projekt otvorenog koda, licenciran pod licencom Apache v2.0 i sponzoriran od strane Red Hata. Razvoj je u tijeku od 2016. godine i trenutno pruža službenu podršku za sljedeće DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Također postoje konektori za Cassandru i Oracle, ali oni su trenutačno u statusu "ranog pristupa", a nova izdanja ne jamče kompatibilnost s prethodnim verzijama.

Ako CDC usporedimo s tradicionalnim pristupom (kada aplikacija izravno čita podatke iz DBMS-a), tada njegove glavne prednosti uključuju implementaciju protoka promjene podataka na razini retka uz nisku latenciju, visoku pouzdanost i dostupnost. Posljednje dvije točke postižu se korištenjem Kafka klastera kao repozitorija za CDC događaje.

Također, prednosti uključuju činjenicu da se jedan model koristi za pohranu događaja, tako da konačna aplikacija ne mora brinuti o nijansama rada različitih DBMS-a.

Konačno, korištenje brokera poruka otvara prostor za horizontalno skaliranje aplikacija koje prate promjene u podacima. U isto vrijeme, utjecaj na izvor podataka je minimiziran, budući da se podaci ne primaju izravno iz DBMS-a, već iz Kafka klastera.

O Debezium arhitekturi

Korištenje Debeziuma svodi se na ovu jednostavnu shemu:

DBMS (kao izvor podataka) → konektor u Kafka Connect → Apache Kafka → potrošač

Kao ilustraciju, dat ću dijagram s web stranice projekta:

Predstavljamo Debezium - CDC za Apache Kafku

Međutim, ova shema mi se baš ne sviđa, jer se čini da je moguć samo priključak za sudoper.

U stvarnosti je situacija drugačija: punjenje vašeg podatkovnog jezera (posljednji link u gornjem dijagramu) nije jedini način korištenja Debeziuma. Događaje poslane Apache Kafki vaše aplikacije mogu koristiti za rješavanje raznih situacija. Na primjer:

  • uklanjanje nevažnih podataka iz predmemorije;
  • slanje obavijesti;
  • ažuriranje indeksa pretraživanja;
  • neka vrsta revizijskih dnevnika;
  • ...

U slučaju da imate Java aplikaciju i nema potrebe/mogućnosti korištenja Kafka klastera, također postoji mogućnost rada preko ugrađeni konektor. Očigledni plus je da s njim možete odbiti dodatnu infrastrukturu (u obliku konektora i Kafke). Međutim, ovo rješenje je zastarjelo od verzije 1.1 i više se ne preporučuje za upotrebu (možda će biti uklonjeno u budućim izdanjima).

U ovom će se članku raspravljati o arhitekturi koju preporučuju programeri, a koja pruža toleranciju na greške i skalabilnost.

Konfiguracija konektora

Kako bismo počeli pratiti promjene u najvažnijoj vrijednosti - podatku - potrebno nam je:

  1. izvor podataka, koji može biti MySQL počevši od verzije 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (Potpuni popis);
  2. Apache Kafka klaster
  3. Kafka Connect instanca (verzije 1.x, 2.x);
  4. konfiguriran Debezium konektor.

Poradite na prve dvije točke, tj. postupak instaliranja DBMS-a i Apache Kafka izvan je okvira ovog članka. Međutim, za one koji žele sve implementirati u pješčanik, postoji gotov u službenom repozitoriju s primjerima docker-compose.yaml.

Usredotočit ćemo se na posljednje dvije točke detaljnije.

0. Kafka Connect

Ovdje i kasnije u članku, svi primjeri konfiguracije razmatraju se u kontekstu Docker slike koju distribuiraju Debezium programeri. Sadrži sve potrebne datoteke dodataka (konektore) i pruža Kafka Connect konfiguraciju pomoću varijabli okruženja.

Ako namjeravate koristiti Kafka Connect iz Confluenta, morat ćete sami dodati dodatke potrebnih konektora u direktorij naveden u plugin.path ili postaviti preko varijable okoline CLASSPATH. Postavke za Kafka Connect worker i konektore definirane su kroz konfiguracijske datoteke koje se prosljeđuju kao argumenti naredbi worker start. Za detalje vidi dokumentacija.

Cijeli proces postavljanja Debeizuma u verziji konektora odvija se u dvije faze. Razmotrimo svaki od njih:

1. Postavljanje okvira Kafka Connect

Za strujanje podataka u Apache Kafka klaster, određeni parametri postavljaju se u okviru Kafka Connect, kao što su:

  • postavke veze klastera,
  • imena tema u kojima će biti pohranjena konfiguracija samog konektora,
  • naziv grupe u kojoj se konektor izvodi (u slučaju korištenja distribuiranog načina rada).

Službena Docker slika projekta podržava konfiguraciju pomoću varijabli okruženja - to je ono što ćemo koristiti. Pa preuzmimo sliku:

docker pull debezium/connect

Minimalni skup varijabli okruženja potrebnih za pokretanje konektora je sljedeći:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - početni popis poslužitelja Kafka klastera za dobivanje kompletnog popisa članova klastera;
  • OFFSET_STORAGE_TOPIC=connector-offsets — tema za pohranjivanje pozicija na kojima se konektor trenutno nalazi;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - tema za pohranu statusa konektora i njegovih zadataka;
  • CONFIG_STORAGE_TOPIC=connector-config - tema za pohranu podataka o konfiguraciji konektora i njegovih zadataka;
  • GROUP_ID=1 — identifikator grupe radnika na kojima se može izvršiti zadatak konektora; potreban pri korištenju distribuiranih (distribuirano) režim.

Spremnik pokrećemo ovim varijablama:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Napomena o Avru

Prema zadanim postavkama, Debezium zapisuje podatke u JSON formatu, što je prihvatljivo za sandboxe i male količine podataka, ali može biti problem u jako opterećenim bazama podataka. Alternativa JSON pretvaraču je serijalizacija poruka pomoću Avro u binarni format, što smanjuje opterećenje I/O podsustava u Apache Kafka.

Da biste koristili Avro, trebate implementirati zaseban shema-registar (za pohranjivanje shema). Varijable za pretvarač će izgledati ovako:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Pojedinosti o korištenju Avra ​​i postavljanju registra za njega izvan su opsega članka - nadalje, radi jasnoće, koristit ćemo JSON.

2. Postavljanje samog priključka

Sada možete ići izravno na konfiguraciju samog konektora, koji će čitati podatke iz izvora.

Pogledajmo primjer konektora za dva DBMS-a: PostgreSQL i MongoDB, za koje imam iskustva i za koje postoje razlike (doduše male, ali u nekim slučajevima značajne!).

Konfiguracija je opisana u JSON notaciji i prenesena na Kafka Connect pomoću POST zahtjeva.

2.1. PostgreSQL

Primjer konfiguracije konektora za PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Princip rada konektora nakon ove konfiguracije je prilično jednostavan:

  • Pri prvom pokretanju povezuje se s bazom podataka navedenom u konfiguraciji i pokreće se u načinu rada početni snimak, šaljući Kafki početni skup primljenih podataka s uvjetom SELECT * FROM table_name.
  • Nakon završene inicijalizacije, konektor ulazi u mod čitanja promjena iz PostgreSQL WAL datoteka.

O korištenim opcijama:

  • name — naziv konektora za koji se koristi dolje opisana konfiguracija; ubuduće se ovo ime koristi za rad s konektorom (tj. pregled statusa / ponovno pokretanje / ažuriranje konfiguracije) putem Kafka Connect REST API-ja;
  • connector.class — klasu DBMS konektora koju će koristiti konfigurirani konektor;
  • plugin.name je naziv dodatka za logičko dekodiranje podataka iz WAL datoteka. Dostupno za odabir wal2json, decoderbuffs и pgoutput. Prva dva zahtijevaju instalaciju odgovarajućih proširenja u DBMS, i pgoutput za PostgreSQL verziju 10 i više ne zahtijeva dodatne manipulacije;
  • database.* — opcije za povezivanje s bazom podataka, gdje database.server.name - ime PostgreSQL instance koja se koristi za formiranje naziva teme u Kafka klasteru;
  • table.include.list - popis tablica u kojima želimo pratiti promjene; dati u formatu schema.table_name; ne može se koristiti zajedno s table.exclude.list;
  • heartbeat.interval.ms — interval (u milisekundama) s kojim konektor šalje otkucajne poruke posebnoj temi;
  • heartbeat.action.query - zahtjev koji će se izvršiti prilikom slanja svake otkucajne poruke (opcija se pojavila od verzije 1.1);
  • slot.name — naziv utora za replikaciju koji će koristiti konektor;
  • publication.name - Ime Publikacija u PostgreSQL koji konektor koristi. U slučaju da ne postoji, Debezium će ga pokušati stvoriti. Ako korisnik pod kojim je uspostavljena veza nema dovoljno prava za ovu radnju, konektor će izaći s pogreškom;
  • transforms određuje kako točno promijeniti naziv ciljane teme:
    • transforms.AddPrefix.type označava da ćemo koristiti regularne izraze;
    • transforms.AddPrefix.regex — maska ​​kojom se redefinira naziv ciljane teme;
    • transforms.AddPrefix.replacement - izravno ono što redefiniramo.

Više o otkucajima srca i transformacijama

Prema zadanim postavkama, konektor šalje podatke Kafki za svaku predanu transakciju i zapisuje svoj LSN (redni broj dnevnika) u temu usluge offset. Ali što se događa ako je konektor konfiguriran za čitanje ne cijele baze podataka, već samo dijela njezinih tablica (u kojima se podaci rijetko ažuriraju)?

  • Konektor će čitati WAL datoteke i neće otkriti predaje transakcija u njima tablicama koje nadzire.
  • Stoga neće ažurirati svoju trenutnu poziciju ni u temi ni u utoru za replikaciju.
  • To će zauzvrat uzrokovati da WAL datoteke "zaglave" na disku i da će vjerojatno ostati bez prostora na disku.

I ovdje opcije dolaze u pomoć. heartbeat.interval.ms и heartbeat.action.query. Korištenje ovih opcija u paru omogućuje izvršavanje zahtjeva za promjenom podataka u zasebnoj tablici svaki put kada se pošalje poruka otkucaja srca. Dakle, LSN na kojem se konektor trenutno nalazi (u utoru za replikaciju) stalno se ažurira. To omogućuje DBMS-u da ukloni WAL datoteke koje više nisu potrebne. Za više informacija o tome kako opcije funkcioniraju, pogledajte dokumentacija.

Još jedna opcija koja zaslužuje veću pozornost je transforms. Iako se više radi o praktičnosti i ljepoti...

Prema zadanim postavkama, Debezium stvara teme koristeći sljedeća pravila imenovanja: serverName.schemaName.tableName. Ovo možda nije uvijek zgodno. Mogućnosti transforms koristeći regularne izraze, možete definirati popis tablica čiji se događaji trebaju preusmjeriti na temu s određenim nazivom.

U našoj konfiguraciji zahvaljujući transforms događa se sljedeće: svi CDC događaji iz praćene baze podataka ići će u temu s imenom data.cdc.dbname. U suprotnom (bez ovih postavki), Debezium bi prema zadanim postavkama stvorio temu za svaku tablicu obrasca: pg-dev.public.<table_name>.

Ograničenja konektora

Na kraju opisa konfiguracije konektora za PostgreSQL, vrijedi govoriti o sljedećim značajkama / ograničenjima njegovog rada:

  1. Funkcionalnost konektora za PostgreSQL oslanja se na koncept logičkog dekodiranja. Stoga on ne prati zahtjeve za promjenom strukture baze podataka (DDL) - sukladno tome, ovi podaci neće biti u temama.
  2. Budući da se koriste utori za replikaciju, povezivanje konektora je moguće samo glavnoj instanci DBMS-a.
  3. Ako korisnik pod kojim se konektor povezuje s bazom podataka ima prava samo za čitanje, tada ćete prije prvog pokretanja morati ručno stvoriti utor za replikaciju i objaviti u bazi podataka.

Primjena konfiguracije

Dakle, učitajmo našu konfiguraciju u konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Provjeravamo je li preuzimanje uspješno i konektor pokrenut:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Sjajno: postavljeno je i spremno za rad. Sada se pretvarajmo da smo potrošač i spojimo se na Kafku, nakon čega dodamo i promijenimo unos u tablici:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

U našoj temi to će biti prikazano na sljedeći način:

Vrlo dugačak JSON s našim izmjenama

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

U oba slučaja, zapisi se sastoje od ključa (PK) zapisa koji je promijenjen, te same suštine promjena: što je zapis bio prije i što je postao poslije.

  • U slučaju INSERT: vrijednost prije (before) jednako nullnakon čega slijedi niz koji je umetnut.
  • U slučaju UPDATE: v payload.before prikazuje se prethodno stanje retka, a in payload.after - novo sa suštinom promjene.

2.2 MongoDB

Ovaj konektor koristi standardni MongoDB mehanizam replikacije, čitajući informacije iz oplog primarnog čvora DBMS-a.

Slično kao i kod već opisanog konektora za PgSQL, i ovdje se pri prvom pokretanju radi primarna snimka podataka, nakon čega konektor prelazi u mod čitanja oplog-a.

Primjer konfiguracije:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kao što vidite, nema novih opcija u odnosu na prethodni primjer, već je samo smanjen broj opcija zaduženih za povezivanje s bazom podataka i njihovih prefiksa.

postavke transforms ovaj put čine sljedeće: skreću naziv ciljane teme iz sheme <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerancija kvarova

Pitanje tolerancije na pogreške i visoke dostupnosti u naše je vrijeme akutnije nego ikad - pogotovo kada govorimo o podacima i transakcijama, a praćenje promjena podataka u ovom pitanju nije po strani. Pogledajmo što u načelu može poći po zlu i što će se dogoditi s Debeziumom u svakom slučaju.

Postoje tri opcije isključivanja:

  1. Kafka Connect kvar. Ako je Connect konfiguriran za rad u distribuiranom načinu rada, to zahtijeva da više radnika postavi isti group.id. Zatim, ako jedan od njih ne uspije, konektor će se ponovno pokrenuti na drugom radniku i nastaviti s čitanjem od posljednje angažirane pozicije u temi u Kafki.
  2. Gubitak povezanosti s Kafka klasterom. Konektor će jednostavno prestati čitati na poziciji koju nije uspio poslati Kafki i povremeno će ga pokušati ponovno poslati dok pokušaj ne uspije.
  3. Izvor podataka nedostupan. Priključak će se pokušati ponovno spojiti na izvor u skladu s konfiguracijom. Zadano je 16 pokušaja korištenja eksponencijalni odmak. Nakon 16. neuspjelog pokušaja, zadatak će biti označen kao Neuspjela i trebat će ga ručno ponovno pokrenuti putem Kafka Connect REST sučelja.
    • U slučaju PostgreSQL podaci neće biti izgubljeni, jer korištenje utora za replikaciju spriječit će brisanje WAL datoteka koje konektor ne čita. U ovom slučaju postoji loša strana: ako je mrežna povezanost između konektora i DBMS-a prekinuta dulje vrijeme, postoji mogućnost da će ponestati prostora na disku, a to može dovesti do kvara cijelog DBMS-a.
    • U slučaju MySQL binlog datoteke može rotirati sam DBMS prije ponovnog uspostavljanja veze. To će dovesti do toga da konektor prijeđe u neuspješno stanje, a da bi se vratio normalan rad, morat će se ponovno pokrenuti u početnom načinu snimanja kako bi nastavio čitati iz binlogova.
    • na MongoDB. Dokumentacija kaže: ponašanje konektora u slučaju da su log/oplog datoteke obrisane i konektor ne može nastaviti čitati od mjesta gdje je stao isto je za sve DBMS. Leži u činjenici da će konektor otići u stanje Neuspjela i zahtijevat će ponovno pokretanje u načinu rada početni snimak.

      Međutim, postoje iznimke. Ako je konektor bio u nepovezanom stanju dulje vrijeme (ili nije mogao doći do MongoDB instance), a oplog je tijekom tog vremena rotiran, tada će, kada se veza uspostavi, konektor mirno nastaviti čitati podatke s prve dostupne pozicije , zbog čega neki od podataka kod Kafke ne će pogoditi.

Zaključak

Debezium je moje prvo iskustvo s CDC sustavima i općenito je vrlo pozitivno. Projekt je podmitio podršku glavnog DBMS-a, jednostavnost konfiguracije, podršku klasteriranja i aktivnu zajednicu. Za one koje zanima praksa, preporučujem da pročitaju vodiče za Kafka Connect и Debezij.

U usporedbi s JDBC konektorom za Kafka Connect, glavna prednost Debeziuma je to što se promjene čitaju iz DBMS zapisa, što omogućuje primanje podataka s minimalnim kašnjenjem. JDBC konektor (omogućen od strane Kafka Connect) postavlja upite praćenoj tablici u fiksnom intervalu i (iz istog razloga) ne generira poruke kada se podaci izbrišu (kako možete tražiti podatke kojih nema?).

Za rješavanje sličnih problema možete obratiti pozornost na sljedeća rješenja (uz Debezium):

PS

Pročitajte i na našem blogu:

Izvor: www.habr.com

Dodajte komentar