Predstavljamo Debezium - CDC za Apache Kafku

Predstavljamo Debezium - CDC za Apache Kafku

U svom radu često nailazim na nova tehnička rješenja/softverske proizvode, o kojima su informacije prilično oskudne na internetu na ruskom govornom području. U ovom članku pokušat ću popuniti jednu takvu prazninu primjerom iz moje nedavne prakse, kada sam trebao podesiti slanje CDC događaja iz dva popularna DBMS-a (PostgreSQL i MongoDB) u Kafka klaster koristeći Debezium. Nadam se da će ovaj pregledni članak, koji se pojavio kao rezultat obavljenog posla, biti koristan drugima.

Šta je Debezium i CDC općenito?

Debezijum - Predstavnik kategorije CDC softvera (Snimite promjenu podataka), tačnije, to je skup konektora za različite DBMS-ove koji su kompatibilni sa Apache Kafka Connect framework-om.

ovo projekat otvorenog koda, licenciran pod Apache License v2.0 i sponzoriran od strane Red Hata. Razvoj je u toku od 2016. godine i trenutno pruža zvaničnu podršku za sledeće DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Postoje i konektori za Cassandru i Oracle, ali oni su trenutno u statusu "ranog pristupa", a nova izdanja ne garantuju kompatibilnost unatrag.

Ako uporedimo CDC sa tradicionalnim pristupom (kada aplikacija direktno čita podatke iz DBMS-a), onda njegove glavne prednosti uključuju implementaciju striminga promjene podataka na nivou reda s niskim kašnjenjem, visokom pouzdanošću i dostupnošću. Posljednje dvije tačke se postižu korištenjem Kafka klastera kao spremišta za CDC događaje.

Takođe, prednosti uključuju činjenicu da se za pohranjivanje događaja koristi jedan model, tako da konačna aplikacija ne mora da brine o nijansama rada različitih DBMS-a.

Konačno, korištenje posrednika poruka otvara prostor za horizontalno skaliranje aplikacija koje prate promjene u podacima. U isto vrijeme, utjecaj na izvor podataka je minimiziran, jer se podaci ne primaju direktno iz DBMS-a, već iz Kafka klastera.

O Debezium arhitekturi

Korištenje Debeziuma se svodi na ovu jednostavnu shemu:

DBMS (kao izvor podataka) → konektor u Kafka Connect → Apache Kafka → potrošač

Kao ilustraciju, dat ću dijagram sa web stranice projekta:

Predstavljamo Debezium - CDC za Apache Kafku

Međutim, ova shema mi se baš i ne sviđa, jer se čini da je moguć samo konektor za sudoper.

U stvarnosti, situacija je drugačija: punjenje vašeg Data Lakea (zadnja veza na dijagramu iznad) nije jedini način da se koristi Debezium. Vaše aplikacije mogu koristiti događaje poslane Apache Kafki za rješavanje različitih situacija. Na primjer:

  • uklanjanje irelevantnih podataka iz keša;
  • slanje obavještenja;
  • ažuriranja indeksa pretraživanja;
  • neka vrsta evidencije revizije;
  • ...

U slučaju da imate Java aplikaciju i nema potrebe/mogućnosti korištenja Kafka klastera, postoji i mogućnost rada kroz embedded konektor. Očigledan plus je što s njim možete odbiti dodatnu infrastrukturu (u obliku konektora i Kafke). Međutim, ovo rješenje je zastarjelo od verzije 1.1 i više se ne preporučuje za korištenje (može biti uklonjeno u budućim izdanjima).

Ovaj članak će raspravljati o arhitekturi koju preporučuju programeri, koja pruža toleranciju grešaka i skalabilnost.

Konfiguracija konektora

Da bismo započeli praćenje promjena najvažnije vrijednosti – podataka – potrebno nam je:

  1. izvor podataka, koji može biti MySQL počevši od verzije 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (potpuni popis);
  2. Apache Kafka klaster
  3. Kafka Connect instanca (verzije 1.x, 2.x);
  4. konfigurisani Debezium konektor.

Radite na prve dvije tačke, tj. proces instaliranja DBMS-a i Apache Kafka su izvan okvira ovog članka. Međutim, za one koji žele sve postaviti u sandbox, postoji gotovi u službenom spremištu s primjerima docker-compose.yaml.

Fokusiraćemo se na posljednje dvije tačke detaljnije.

0. Kafka Connect

Ovdje i kasnije u članku, svi primjeri konfiguracije razmatraju se u kontekstu Docker slike koju distribuiraju Debezium programeri. Sadrži sve potrebne dodatke (konektore) i pruža Kafka Connect konfiguraciju koristeći varijable okruženja.

Ako namjeravate koristiti Kafka Connect iz Confluenta, morat ćete sami dodati dodatke potrebnih konektora u direktorij naveden u plugin.path ili postaviti preko varijable okruženja CLASSPATH. Postavke za Kafka Connect radnika i konektore definiraju se kroz konfiguracijske datoteke koje se prosljeđuju kao argumenti naredbi za pokretanje radnika. Za detalje pogledajte dokumentaciju.

Cijeli proces postavljanja Debeizuma u verziji konektora odvija se u dvije faze. Razmotrimo svaki od njih:

1. Postavljanje okvira Kafka Connect

Za prijenos podataka u Apache Kafka klaster, određeni parametri se postavljaju u okvir Kafka Connect, kao što su:

  • postavke klaster veze,
  • nazivi tema u kojima će biti pohranjena konfiguracija samog konektora,
  • naziv grupe u kojoj se izvodi konektor (u slučaju korištenja distribuiranog načina).

Službena Docker slika projekta podržava konfiguraciju pomoću varijabli okruženja - to je ono što ćemo koristiti. Pa da skinemo sliku:

docker pull debezium/connect

Minimalni skup varijabli okruženja potrebnih za pokretanje konektora je sljedeći:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - inicijalna lista Kafka servera klastera za dobijanje kompletne liste članova klastera;
  • OFFSET_STORAGE_TOPIC=connector-offsets — tema za pohranjivanje pozicija na kojima se konektor trenutno nalazi;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - tema za pohranjivanje statusa konektora i njegovih zadataka;
  • CONFIG_STORAGE_TOPIC=connector-config - tema za pohranjivanje podataka o konfiguraciji konektora i njegovih zadataka;
  • GROUP_ID=1 — identifikator grupe radnika na kojoj se može izvršiti zadatak konektora; potrebno kada se koristi distribuirano (distribuirano) režima.

Pokrećemo kontejner sa ovim varijablama:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Napomena o Avru

Podrazumevano, Debezium zapisuje podatke u JSON formatu, što je prihvatljivo za sandboxove i male količine podataka, ali može biti problem u jako opterećenim bazama podataka. Alternativa JSON pretvaraču je serijalizacija poruka pomoću Avro u binarni format, što smanjuje opterećenje I/O podsistema u Apache Kafki.

Da biste koristili Avro, morate primijeniti poseban schema-registry (za pohranjivanje shema). Varijable za pretvarač će izgledati ovako:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detalji o korištenju Avro-a i postavljanju registra za njega su izvan okvira ovog članka - dalje, radi jasnoće, koristit ćemo JSON.

2. Postavljanje samog konektora

Sada možete ići direktno na konfiguraciju samog konektora, koji će čitati podatke iz izvora.

Pogledajmo primjer konektora za dva DBMS-a: PostgreSQL i MongoDB, za koje imam iskustva i za koje postoje razlike (iako male, ali u nekim slučajevima značajne!).

Konfiguracija je opisana u JSON notaciji i otpremljena u Kafka Connect pomoću POST zahtjeva.

2.1. PostgreSQL

Primjer konfiguracije konektora za PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Princip rada konektora nakon ove konfiguracije je prilično jednostavan:

  • Prilikom prvog pokretanja, povezuje se s bazom podataka specificiranom u konfiguraciji i pokreće se u načinu rada početni snimak, šaljući Kafki početni skup podataka primljenih sa uslovom SELECT * FROM table_name.
  • Nakon što je inicijalizacija završena, konektor ulazi u način čitanja promjena iz PostgreSQL WAL datoteka.

O korištenim opcijama:

  • name — naziv konektora za koji se koristi dolje opisana konfiguracija; u budućnosti se ovo ime koristi za rad sa konektorom (tj. pregled statusa / ponovno pokretanje / ažuriranje konfiguracije) preko Kafka Connect REST API-ja;
  • connector.class — klasu DBMS konektora koju će koristiti konfigurirani konektor;
  • plugin.name je naziv dodatka za logičko dekodiranje podataka iz WAL datoteka. Dostupan za odabir wal2json, decoderbuffs и pgoutput. Prva dva zahtijevaju instalaciju odgovarajućih ekstenzija u DBMS, i pgoutput za PostgreSQL verziju 10 i noviju ne zahtijevaju dodatne manipulacije;
  • database.* — opcije za povezivanje sa bazom podataka, gde database.server.name - naziv PostgreSQL instance koja se koristi za formiranje naziva teme u Kafka klasteru;
  • table.include.list - listu tabela u kojima želimo pratiti promjene; dato u formatu schema.table_name; ne može se koristiti zajedno sa table.exclude.list;
  • heartbeat.interval.ms — interval (u milisekundama) u kojem konektor šalje poruke otkucaja srca na posebnu temu;
  • heartbeat.action.query - zahtjev koji će se izvršavati prilikom slanja svake poruke otkucaja srca (opcija se pojavljuje od verzije 1.1);
  • slot.name — naziv slota za replikaciju koji će koristiti konektor;
  • publication.name - ime publikacije u PostgreSQL-u koji koristi konektor. U slučaju da ne postoji, Debezium će pokušati da ga kreira. Ako korisnik pod kojim se uspostavlja veza nema dovoljno prava za ovu radnju, konektor će izaći s greškom;
  • transforms određuje kako tačno promijeniti naziv ciljne teme:
    • transforms.AddPrefix.type označava da ćemo koristiti regularne izraze;
    • transforms.AddPrefix.regex — maska ​​kojom se redefinira naziv ciljne teme;
    • transforms.AddPrefix.replacement - direktno ono što redefinišemo.

Više o otkucajima srca i transformacijama

Po defaultu, konektor šalje podatke Kafki za svaku izvršenu transakciju i upisuje svoj LSN (Log Sequence Number) u temu usluge offset. Ali šta se dešava ako je konektor konfigurisan da čita ne celu bazu podataka, već samo deo njenih tabela (u kojima se podaci retko ažuriraju)?

  • Konektor će čitati WAL datoteke i neće otkriti urezivanje transakcija u njima na tablicama koje prati.
  • Stoga, neće ažurirati svoju trenutnu poziciju ni u temi ni u slotu za replikaciju.
  • Ovo će zauzvrat uzrokovati da WAL datoteke budu "zaglavljene" na disku i vjerovatno će ostati bez prostora na disku.

I tu opcije dolaze u pomoć. heartbeat.interval.ms и heartbeat.action.query. Korištenje ovih opcija u parovima omogućava da se izvrši zahtjev za promjenom podataka u zasebnoj tabeli svaki put kada se pošalje srčana poruka. Dakle, LSN na kojem se konektor trenutno nalazi (u slotu za replikaciju) se stalno ažurira. Ovo omogućava DBMS-u da ukloni WAL fajlove koji više nisu potrebni. Za više informacija o tome kako opcije funkcioniraju, pogledajte dokumentaciju.

Još jedna opcija koja zaslužuje veću pažnju je transforms. Iako je više o udobnosti i ljepoti...

Prema zadanim postavkama, Debezium kreira teme koristeći sljedeću politiku imenovanja: serverName.schemaName.tableName. Ovo možda nije uvijek zgodno. Opcije transforms koristeći regularne izraze, možete definirati listu tabela čiji se događaji trebaju usmjeriti na temu sa određenim imenom.

U našoj konfiguraciji zahvaljujući transforms događa se sljedeće: svi CDC događaji iz praćene baze podataka će ići na temu s imenom data.cdc.dbname. Inače (bez ovih postavki), Debezium bi po defaultu kreirao temu za svaku tabelu obrasca: pg-dev.public.<table_name>.

Ograničenja konektora

Na kraju opisa konfiguracije konektora za PostgreSQL, vrijedi govoriti o sljedećim karakteristikama / ograničenjima njegovog rada:

  1. Funkcionalnost konektora za PostgreSQL se oslanja na koncept logičkog dekodiranja. Stoga on ne prati zahtjeve za promjenu strukture baze podataka (DDL) - prema tome, ovi podaci neće biti u temama.
  2. Pošto se koriste slotovi za replikaciju, povezivanje konektora je moguće samo na glavnu instancu DBMS-a.
  3. Ako korisnik pod kojim se konektor povezuje na bazu podataka ima prava samo za čitanje, tada ćete prije prvog pokretanja morati ručno kreirati slot za replikaciju i objaviti u bazi podataka.

Primjena konfiguracije

Dakle, učitajmo našu konfiguraciju u konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Provjeravamo da li je preuzimanje bilo uspješno i da je konektor pokrenut:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Odlično: postavljen je i spreman za rad. Sada se pretvarajmo da smo potrošač i spojimo se na Kafku, nakon čega dodamo i promijenimo unos u tabeli:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

U našoj temi to će biti prikazano na sljedeći način:

Vrlo dugačak JSON sa našim promjenama

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

U oba slučaja, zapisi se sastoje od ključa (PK) zapisa koji je promijenjen i same suštine promjena: šta je zapis bio prije i šta je postao poslije.

  • U slučaju INSERT: vrijednost prije (before) jednako nullnakon čega slijedi niz koji je umetnut.
  • U slučaju UPDATE: u payload.before prikazano je prethodno stanje reda i u payload.after - novo sa suštinom promjene.

2.2 MongoDB

Ovaj konektor koristi standardni mehanizam replikacije MongoDB, čitajući informacije iz oploga primarnog čvora DBMS.

Slično već opisanom konektoru za PgSQL, i ovdje se pri prvom pokretanju pravi primarni snimak podataka, nakon čega se konektor prebacuje u oplog mod čitanja.

Primjer konfiguracije:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kao što vidite, nema novih opcija u odnosu na prethodni primjer, već je samo smanjen broj opcija odgovornih za povezivanje s bazom podataka i njihovih prefiksa.

Postavke transforms ovaj put rade sljedeće: okreću naziv ciljne teme iz šeme <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerancije grešaka

Pitanje tolerancije grešaka i visoke dostupnosti u naše vrijeme je akutnije nego ikad - posebno kada govorimo o podacima i transakcijama, a praćenje promjena podataka nije po strani po ovom pitanju. Pogledajmo šta u principu može poći po zlu i šta će se desiti sa Debeziumom u svakom slučaju.

Postoje tri opcije isključivanja:

  1. Kafka Connect neuspjeh. Ako je Connect konfiguriran da radi u distribuiranom načinu rada, to zahtijeva da više radnika postavi istu grupu.id. Zatim, ako jedan od njih pokvari, konektor će se ponovo pokrenuti na drugom workeru i nastaviti čitanje od posljednje urezane pozicije u temi u Kafki.
  2. Gubitak veze sa Kafka klasterom. Konektor će jednostavno prestati čitati na poziciji koju nije uspio poslati Kafki i povremeno će ga pokušati ponovo poslati dok pokušaj ne uspije.
  3. Izvor podataka nedostupan. Konektor će pokušati da se ponovo poveže sa izvorom u skladu sa konfiguracijom. Zadana vrijednost je 16 pokušaja korištenja eksponencijalni backoff. Nakon 16. neuspjelog pokušaja, zadatak će biti označen kao Nije uspeo i moraće se ručno ponovo pokrenuti preko Kafka Connect REST interfejsa.
    • U slučaju PostgreSQL podaci neće biti izgubljeni, jer korištenje slotova za replikaciju će spriječiti brisanje WAL datoteka koje konektor ne čita. U ovom slučaju postoji loša strana: ako je mrežna veza između konektora i DBMS-a poremećena na duže vrijeme, postoji šansa da će ponestati prostora na disku, a to može dovesti do kvara cijelog DBMS-a.
    • U slučaju MySQL binlog datoteke mogu biti rotirane od strane samog DBMS-a prije nego što se konekcija vrati. Ovo će dovesti do toga da konektor pređe u neuspješno stanje i morat će se ponovo pokrenuti u početnom režimu snimanja da bi nastavio čitanje iz binologa da bi se vratio normalan rad.
    • na MongoDB. Dokumentacija kaže: ponašanje konektora u slučaju da su log/oplog fajlovi izbrisani i konektor ne može da nastavi čitanje sa pozicije na kojoj je stao je isto za sve DBMS. Leži u činjenici da će konektor ući u stanje Nije uspeo i zahtijevat će ponovno pokretanje u načinu rada početni snimak.

      Međutim, postoje izuzeci. Ako je konektor bio u isključenom stanju duže vrijeme (ili nije mogao doći do MongoDB instance), a oplog je rotiran za to vrijeme, onda kada se veza uspostavi, konektor će mirno nastaviti čitati podatke sa prve dostupne pozicije , zbog čega su neki podaci u Kafki ne će pogoditi.

zaključak

Debezium je moje prvo iskustvo sa CDC sistemima i općenito je vrlo pozitivno. Projekat je podmitio podršku glavnog DBMS-a, lakoću konfiguracije, podršku za klasterisanje i aktivnu zajednicu. Za one koje zanima praksa, preporučujem da pročitaju vodiče za Kafka Connect и Debezijum.

U poređenju sa JDBC konektorom za Kafka Connect, glavna prednost Debeziuma je ta što se promene čitaju iz DBMS dnevnika, što omogućava prijem podataka sa minimalnim kašnjenjem. JDBC konektor (koji obezbeđuje Kafka Connect) ispituje praćenu tabelu u fiksnom intervalu i (iz istog razloga) ne generiše poruke kada se podaci obrišu (kako možete tražiti podatke kojih nema?).

Za rješavanje sličnih problema možete obratiti pažnju na sljedeća rješenja (pored Debeziuma):

PS

Pročitajte i na našem blogu:

izvor: www.habr.com

Dodajte komentar