Predstavljamo Debezium - CDC za Apache Kafka

Predstavljamo Debezium - CDC za Apache Kafka

Pri svojem delu pogosto naletim na nove tehnične rešitve / programske izdelke, o katerih je na rusko govorečem internetu precej malo informacij. S tem člankom bom poskušal zapolniti eno takšno vrzel s primerom iz svoje nedavne prakse, ko sem moral nastaviti pošiljanje dogodkov CDC iz dveh priljubljenih DBMS (PostgreSQL in MongoDB) v gručo Kafka z uporabo Debeziuma. Upam, da bo ta pregledni članek, ki je nastal kot rezultat opravljenega dela, koristen tudi drugim.

Kaj je Debezium in CDC na splošno?

Debezij - Predstavnik kategorije programske opreme CDC (Zajemite spremembo podatkov), ali natančneje, gre za nabor konektorjev za različne DBMS-je, ki so združljivi z ogrodjem Apache Kafka Connect.

To odprtokodni projekt, licenciran pod licenco Apache v2.0 in sponzoriran s strani Red Hat. Razvoj poteka od leta 2016 in trenutno zagotavlja uradno podporo za naslednje DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Obstajajo tudi konektorji za Cassandra in Oracle, vendar so trenutno v statusu "zgodnjega dostopa" in nove izdaje ne zagotavljajo združljivosti za nazaj.

Če CDC primerjamo s tradicionalnim pristopom (ko aplikacija neposredno bere podatke iz DBMS), potem njegove glavne prednosti vključujejo implementacijo pretakanja sprememb podatkov na ravni vrstice z nizko zakasnitvijo, visoko zanesljivostjo in razpoložljivostjo. Zadnji dve točki sta doseženi z uporabo gruče Kafka kot repozitorija za dogodke CDC.

Med prednostmi je tudi dejstvo, da se za shranjevanje dogodkov uporablja en sam model, tako da končni aplikaciji ni treba skrbeti za nianse delovanja različnih DBMS.

Nazadnje uporaba posrednika sporočil odpira prostor za horizontalno skaliranje aplikacij, ki sledijo spremembam podatkov. Hkrati je vpliv na vir podatkov čim manjši, saj podatki niso prejeti neposredno iz DBMS, temveč iz gruče Kafka.

O arhitekturi Debezium

Uporaba Debeziuma se zmanjša na to preprosto shemo:

DBMS (kot vir podatkov) → konektor v Kafka Connect → Apache Kafka → potrošnik

Kot ilustracijo bom podal diagram s spletne strani projekta:

Predstavljamo Debezium - CDC za Apache Kafka

Vendar mi ta shema ni ravno všeč, ker se zdi, da je možen samo priključek za pomivalno korito.

V resnici je situacija drugačna: polnjenje podatkovnega jezera (zadnja povezava v zgornjem diagramu) ni edini način uporabe zdravila Debezium. Dogodke, poslane Apache Kafki, lahko vaše aplikacije uporabijo za reševanje različnih situacij. Na primer:

  • odstranitev nepomembnih podatkov iz predpomnilnika;
  • pošiljanje obvestil;
  • posodobitve iskalnega indeksa;
  • neke vrste revizijski dnevniki;
  • ...

V primeru, da imate aplikacijo Java in ni potrebe/možnosti uporabe gruče Kafka, obstaja tudi možnost dela prek vgrajen priključek. Očiten plus je, da z njim lahko zavrnete dodatno infrastrukturo (v obliki konektorja in Kafke). Vendar je bila ta rešitev od različice 1.1 opuščena in ni več priporočljiva za uporabo (morda bo odstranjena v prihodnjih izdajah).

Ta članek bo razpravljal o arhitekturi, ki jo priporočajo razvijalci, ki zagotavlja odpornost na napake in razširljivost.

Konfiguracija priključka

Da lahko začnemo slediti spremembam najpomembnejše vrednosti - podatka - potrebujemo:

  1. vir podatkov, ki je lahko MySQL od različice 5.7 naprej, PostgreSQL 9.6+, MongoDB 3.2+ (Celoten seznam);
  2. Grozd Apache Kafka
  3. Primerek Kafka Connect (različice 1.x, 2.x);
  4. konfiguriran konektor Debezium.

Delajte na prvih dveh točkah, tj. postopek namestitve DBMS in Apache Kafka presega obseg članka. Vendar pa je za tiste, ki želijo vse namestiti v peskovnik, v uradnem repozitoriju že pripravljen s primeri docker-compose.yaml.

Zadnji dve točki se bomo podrobneje posvetili.

0. Kafka Connect

Tukaj in kasneje v članku so vsi primeri konfiguracije obravnavani v kontekstu slike Docker, ki jo distribuirajo razvijalci Debeziuma. Vsebuje vse potrebne datoteke vtičnikov (konektorje) in zagotavlja konfiguracijo Kafka Connect z uporabo spremenljivk okolja.

Če nameravate uporabljati Kafka Connect iz Confluenta, boste morali sami dodati vtičnike potrebnih konektorjev v imenik, naveden v plugin.path ali nastavite prek spremenljivke okolja CLASSPATH. Nastavitve za delavca Kafka Connect in konektorje so definirane s konfiguracijskimi datotekami, ki so posredovane kot argumenti ukazu za zagon delavca. Za podrobnosti glej dokumentacijo.

Celoten postopek nastavitve Debeizuma v konektorski različici poteka v dveh fazah. Razmislimo o vsakem od njih:

1. Nastavitev ogrodja Kafka Connect

Za pretakanje podatkov v gručo Apache Kafka so v ogrodju Kafka Connect nastavljeni posebni parametri, kot so:

  • nastavitve povezave gruče,
  • imena tem, v katerih bo shranjena konfiguracija samega priključka,
  • ime skupine, v kateri se izvaja konektor (v primeru uporabe porazdeljenega načina).

Uradna Dockerjeva slika projekta podpira konfiguracijo z uporabo spremenljivk okolja - to bomo uporabili. Torej prenesimo sliko:

docker pull debezium/connect

Najmanjši nabor spremenljivk okolja, potrebnih za zagon konektorja, je naslednji:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - začetni seznam strežnikov gruče Kafka, da dobite popoln seznam članov gruče;
  • OFFSET_STORAGE_TOPIC=connector-offsets — temo za shranjevanje položajev, kjer se konektor trenutno nahaja;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - temo za shranjevanje statusa konektorja in njegovih nalog;
  • CONFIG_STORAGE_TOPIC=connector-config - temo za shranjevanje konfiguracijskih podatkov konektorja in njegovih nalog;
  • GROUP_ID=1 — identifikator skupine delavcev, na kateri je mogoče izvesti konektorsko nalogo; potrebno pri uporabi porazdeljenih (razdeljeno) režim.

Vsebnik zaženemo s temi spremenljivkami:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Opomba o Avru

Debezium privzeto zapisuje podatke v formatu JSON, ki je sprejemljiv za peskovnike in majhne količine podatkov, vendar lahko predstavlja težavo v zelo obremenjenih zbirkah podatkov. Alternativa pretvorniku JSON je serializacija sporočil z uporabo Avro v binarni format, kar zmanjša obremenitev V/I podsistema v Apache Kafka.

Če želite uporabljati Avro, morate namestiti ločeno shema-register (za shranjevanje shem). Spremenljivke za pretvornik bodo videti takole:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Podrobnosti o uporabi Avro in nastavitvi registra zanj presegajo obseg članka - poleg tega bomo zaradi jasnosti uporabili JSON.

2. Nastavitev samega priključka

Zdaj lahko greste neposredno na konfiguracijo samega priključka, ki bo prebral podatke iz vira.

Poglejmo primer konektorjev za dva DBMS: PostgreSQL in MongoDB, za katera imam izkušnje in za katere obstajajo razlike (čeprav majhne, ​​a v nekaterih primerih pomembne!).

Konfiguracija je opisana v zapisu JSON in naložena v Kafka Connect z uporabo zahteve POST.

2.1. PostgreSQL

Primer konfiguracije konektorja za PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Načelo delovanja konektorja po tej konfiguraciji je precej preprosto:

  • Ob prvem zagonu se poveže z bazo podatkov, določeno v konfiguraciji, in zažene v načinu začetni posnetek, ki Kafki pošlje začetni niz podatkov, prejetih s pogojnikom SELECT * FROM table_name.
  • Po končani inicializaciji konektor preide v način branja sprememb iz datotek PostgreSQL WAL.

O uporabljenih možnostih:

  • name — ime konektorja, za katerega se uporablja spodaj opisana konfiguracija; v prihodnje se to ime uporablja za delo s konektorjem (tj. ogled stanja/ponovni zagon/posodabljanje konfiguracije) prek Kafka Connect REST API;
  • connector.class — razred konektorja DBMS, ki ga bo uporabljal konfigurirani konektor;
  • plugin.name je ime dodatka za logično dekodiranje podatkov iz datotek WAL. Na voljo na izbiro wal2json, decoderbuffs и pgoutput. Prva dva zahtevata namestitev ustreznih razširitev v DBMS in pgoutput za PostgreSQL različice 10 in višje ne zahteva dodatnih manipulacij;
  • database.* — možnosti povezovanja z bazo podatkov, kjer database.server.name - ime primerka PostgreSQL, uporabljenega za oblikovanje imena teme v gruči Kafka;
  • table.include.list - seznam tabel, v katerih želimo slediti spremembam; podan v obliki schema.table_name; ni mogoče uporabljati skupaj z table.exclude.list;
  • heartbeat.interval.ms — interval (v milisekundah), s katerim konektor pošilja sporočila srčnega utripa posebni temi;
  • heartbeat.action.query - zahteva, ki se bo izvršila ob pošiljanju vsakega srčnega utripa (možnost se je pojavila od različice 1.1);
  • slot.name — ime reže za podvajanje, ki jo bo uporabljal konektor;
  • publication.name - Ime publikacije v PostgreSQL, ki ga uporablja priključek. V primeru, da ne obstaja, ga bo Debezium poskušal ustvariti. Če uporabnik, pod katerim je vzpostavljena povezava, nima dovolj pravic za to dejanje, se konektor zapre z napako;
  • transforms določa, kako natančno spremeniti ime ciljne teme:
    • transforms.AddPrefix.type nakazuje, da bomo uporabili regularne izraze;
    • transforms.AddPrefix.regex — maska, s katero se na novo definira ime ciljne teme;
    • transforms.AddPrefix.replacement - neposredno tisto, kar redefiniramo.

Več o srčnem utripu in transformacijah

Privzeto konektor pošlje podatke Kafki za vsako odobreno transakcijo in zapiše svoj LSN (zaporedna številka dnevnika) v temo storitve offset. Toda kaj se zgodi, če je konektor konfiguriran tako, da ne bere celotne baze podatkov, ampak samo del njenih tabel (v katerih se podatki redko posodabljajo)?

  • Spojnik bo prebral datoteke WAL in v njih ne bo zaznal predaje transakcij v tabele, ki jih nadzira.
  • Zato ne bo posodobil svojega trenutnega položaja niti v temi niti v reži za podvajanje.
  • To pa bo povzročilo, da bodo datoteke WAL "obtičale" na disku in bo verjetno zmanjkalo prostora na disku.

In tukaj na pomoč pridejo možnosti. heartbeat.interval.ms и heartbeat.action.query. Uporaba teh možnosti v parih omogoča izvedbo zahteve za spremembo podatkov v ločeni tabeli vsakič, ko je poslano sporočilo srčnega utripa. Tako se LSN, na katerem se trenutno nahaja priključek (v reži za replikacijo), nenehno posodablja. To DBMS omogoča, da odstrani datoteke WAL, ki niso več potrebne. Za več informacij o delovanju možnosti glejte dokumentacijo.

Druga možnost, ki si zasluži večjo pozornost, je transforms. Čeprav gre bolj za udobje in lepoto ...

Debezium privzeto ustvari teme z naslednjim pravilnikom o poimenovanju: serverName.schemaName.tableName. To morda ni vedno priročno. Opcije transforms z uporabo regularnih izrazov lahko definirate seznam tabel, katerih dogodke je treba usmeriti v temo z določenim imenom.

V naši konfiguraciji zahvaljujoč transforms zgodi se naslednje: vsi dogodki CDC iz sledene baze podatkov bodo šli v temo z imenom data.cdc.dbname. V nasprotnem primeru (brez teh nastavitev) bi Debezium privzeto ustvaril temo za vsako tabelo obrazca: pg-dev.public.<table_name>.

Omejitve priključka

Na koncu opisa konfiguracije konektorja za PostgreSQL je vredno govoriti o naslednjih funkcijah / omejitvah njegovega dela:

  1. Funkcionalnost konektorja za PostgreSQL temelji na konceptu logičnega dekodiranja. Zato on ne sledi zahtevam za spremembo strukture baze podatkov (DDL) - v skladu s tem ti podatki ne bodo v temah.
  2. Ker se uporabljajo replikacijske reže, je povezava konektorja možna Samo na glavno instanco DBMS.
  3. Če ima uporabnik, pod katerim se konektor povezuje z bazo podatkov, pravice samo za branje, boste morali pred prvim zagonom ročno ustvariti replikacijsko režo in objaviti v bazi podatkov.

Uporaba konfiguracije

Torej naložimo našo konfiguracijo v konektor:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Preverimo, ali je bil prenos uspešen in se je konektor začel:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Odlično: nastavljeno je in pripravljeno za uporabo. Sedaj pa se pretvarjajmo, da smo potrošnik in se povežemo s Kafko, nato dodamo in spremenimo vnos v tabeli:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

V naši temi bo to prikazano na naslednji način:

Zelo dolg JSON z našimi spremembami

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

V obeh primerih sta zapisa sestavljena iz ključa (PK) zapisa, ki je bil spremenjen, in samega bistva sprememb: kaj je zapis bil prej in kaj je postal pozneje.

  • V primeru INSERT: vrednost pred (before) je enako nullsledi niz, ki je bil vstavljen.
  • V primeru UPDATE: ob payload.before se prikaže prejšnje stanje vrstice in v payload.after - novo z bistvom spremembe.

2.2 MongoDB

Ta konektor uporablja standardni mehanizem podvajanja MongoDB, pri čemer bere informacije iz oploga primarnega vozlišča DBMS.

Podobno kot pri že opisanem konektorju za PgSQL se tudi tukaj ob prvem zagonu naredi primarni podatkovni posnetek, po katerem konektor preklopi v način branja oplog.

Primer konfiguracije:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kot lahko vidite, v primerjavi s prejšnjim primerom ni novih možnosti, ampak se je zmanjšalo le število možnosti, odgovornih za povezovanje z bazo podatkov, in njihovih predpon.

Nastavitve transforms tokrat naredijo naslednje: obrnejo ime ciljne teme iz sheme <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

toleranca napak

Vprašanje tolerance napak in visoke razpoložljivosti je v našem času bolj pereče kot kdaj koli prej - še posebej, ko govorimo o podatkih in transakcijah, in sledenje spremembam podatkov v tej zadevi ni na stranskem tiru. Poglejmo, kaj gre načeloma lahko narobe in kaj se bo zgodilo z Debeziumom v posameznem primeru.

Na voljo so tri možnosti za zavrnitev:

  1. Kafka Connect napaka. Če je Connect konfiguriran za delo v porazdeljenem načinu, mora več delavcev nastaviti isti group.id. Nato, če eden od njih odpove, se konektor znova zažene na drugem delavcu in nadaljuje z branjem od zadnjega dodeljenega položaja v temi v Kafki.
  2. Izguba povezljivosti z grozdom Kafka. Konektor bo preprosto nehal brati na mestu, ki ga ni uspel poslati Kafki, in ga občasno poskusil znova poslati, dokler poskus ne uspe.
  3. Vir podatkov ni na voljo. Konektor se bo poskušal znova povezati z virom v skladu s konfiguracijo. Privzeto je 16 poskusov uporabe eksponentni povratek. Po 16. neuspelem poskusu bo naloga označena kot ni in ga bo treba ročno znova zagnati prek vmesnika Kafka Connect REST.
    • V primeru PostgreSQL podatki ne bodo izgubljeni, saj uporaba rež za podvajanje bo preprečila brisanje datotek WAL, ki jih konektor ne prebere. V tem primeru obstaja slaba stran: če je omrežna povezljivost med konektorjem in DBMS motena dlje časa, obstaja možnost, da zmanjka prostora na disku, kar lahko privede do okvare celotnega DBMS.
    • V primeru MySQL binlog datoteke lahko vrti sam DBMS, preden se povezljivost obnovi. To bo povzročilo, da bo konektor prešel v neuspešno stanje in se bo moral znova zagnati v načinu začetnega posnetka, da bo nadaljeval z branjem iz binlogov in obnovil normalno delovanje.
    • na MongoDB. V dokumentaciji piše: obnašanje konektorja v primeru, da so bile datoteke dnevnika/oplog izbrisane in konektor ne more nadaljevati z branjem od mesta, kjer je končal, je enako za vse DBMS. Leži v tem, da bo konektor prešel v stanje ni in bo zahteval ponovni zagon v načinu začetni posnetek.

      Vendar obstajajo izjeme. Če je bil konektor dolgo časa v prekinjenem stanju (ali ni mogel doseči primerka MongoDB) in je bil oplog med tem časom zavrten, bo konektor, ko je povezava ponovno vzpostavljena, mirno nadaljeval z branjem podatkov s prvega razpoložljivega položaja , zato nekateri podatki v Kafki ne bo udaril.

Zaključek

Debezium je moja prva izkušnja s sistemi CDC in je bila na splošno zelo pozitivna. Projekt je podkupil podporo glavnega DBMS, enostavnost konfiguracije, podporo za združevanje v gruče in aktivno skupnost. Tistim, ki jih zanima praksa, priporočam, da preberete vodnike za Kafka Connect и Debezij.

V primerjavi s konektorjem JDBC za Kafka Connect je glavna prednost Debeziuma ta, da se spremembe berejo iz dnevnikov DBMS, kar omogoča sprejem podatkov z minimalno zamudo. JDBC Connector (zagotavlja Kafka Connect) poizveduje po sledeni tabeli v določenem intervalu in (iz istega razloga) ne ustvari sporočil, ko so podatki izbrisani (kako lahko poizvedujete po podatkih, ki jih ni?).

Za reševanje podobnih težav ste lahko pozorni na naslednje rešitve (poleg Debeziuma):

PS

Preberite tudi na našem blogu:

Vir: www.habr.com

Dodaj komentar