Tutvustame Debeziumi – CDC Apache Kafka jaoks

Tutvustame Debeziumi – CDC Apache Kafka jaoks

Oma töös puutun sageli kokku uute tehniliste lahendustega/tarkvaratoodetega, mille kohta on venekeelses internetis info üsna napp. Selle artikliga püüan täita ühe sellise lünga näitega oma hiljutisest praktikast, kui mul oli vaja konfigureerida CDC sündmuste saatmine kahest populaarsest DBMS-ist (PostgreSQL ja MongoDB) Debeziumi abil Kafka klastrisse. Loodan, et see ülevaateartikkel, mis tehtud töö tulemusena ilmub, on teistele kasulik.

Mis on Debezium ja CDC üldiselt?

Debezium — CDC tarkvara kategooria esindaja (Jäädvusta andmete muudatus), või täpsemalt, see on erinevate DBMS-ide pistikute komplekt, mis ühildub Apache Kafka Connecti raamistikuga.

see avatud lähtekoodiga projekt, litsentsitud Apache License v2.0 alusel ja sponsoriks Red Hat. Arendus on kestnud alates 2016. aastast ja praegu pakub see ametlikku tuge järgmistele DBMS-idele: MySQL, PostgreSQL, MongoDB, SQL Server. Pistikud on ka Cassandra ja Oracle jaoks, kuid hetkel on need "varajase juurdepääsu" olekus ja uued väljalasked ei taga tagasiühilduvust.

Kui võrrelda CDC-d traditsioonilise lähenemisega (kui rakendus loeb andmeid otse DBMS-ist), on selle peamisteks eelisteks andmete muutmise voogesituse rakendamine reatasemel madala latentsuse, kõrge töökindluse ja kättesaadavusega. Kaks viimast punkti saavutatakse Kafka klastri kasutamisega CDC sündmuste hoidlana.

Teine eelis on asjaolu, et sündmuste salvestamiseks kasutatakse ühte mudelit, nii et lõpprakendus ei pea muretsema erinevate DBMS-ide töötamise nüansside pärast.

Lõpuks avab sõnumivahendaja kasutamine andmemuudatusi jälgivate rakenduste horisontaalseks skaleerimiseks. Samal ajal on mõju andmeallikale minimaalne, kuna andmeid ei saada otse DBMS-ist, vaid Kafka klastrist.

Debeziumi arhitektuurist

Debeziumi kasutamine taandub sellele lihtsale skeemile:

DBMS (andmeallikana) → pistik Kafka Connectis → Apache Kafka → tarbija

Näitena on siin diagramm projekti veebisaidilt:

Tutvustame Debeziumi – CDC Apache Kafka jaoks

See skeem mulle aga väga ei meeldi, sest tundub, et ainult valamupistiku kasutamine on võimalik.

Tegelikkuses on olukord erinev: täitke oma Data Lake (viimane link ülaloleval diagrammil) ei ole ainus viis Debeziumi kasutamiseks. Apache Kafkale saadetud sündmusi saavad teie rakendused kasutada mitmesuguste olukordade lahendamiseks. Näiteks:

  • ebaoluliste andmete eemaldamine vahemälust;
  • teadete saatmine;
  • otsi indeksi värskendusi;
  • mingid auditi logid;
  • ...

Kui teil on Java rakendus ja puudub vajadus/võimalus Kafka klastri kasutamiseks, on ka võimalus läbi töötada sisseehitatud pistik. Ilmne eelis on see, et see välistab vajaduse täiendava infrastruktuuri järele (pistiku ja Kafka näol). Kuid see lahendus on alates versioonist 1.1 aegunud ja seda ei soovitata enam kasutada (tulevastes versioonides võidakse selle tugi eemaldada).

Selles artiklis käsitletakse arendajate soovitatud arhitektuuri, mis tagab tõrketaluvuse ja mastaapsuse.

Pistiku konfiguratsioon

Kõige olulisema väärtuse – andmete – muutuste jälgimise alustamiseks vajame:

  1. andmeallikas, milleks võib olla MySQL alates versioonist 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (täielik loetelu);
  2. Apache Kafka klaster
  3. Kafka Connecti eksemplar (versioonid 1.x, 2.x);
  4. konfigureeritud Debeziumi pistik.

Töötage kahe esimese punktiga, s.o. DBMS-i ja Apache Kafka installimise protsess ei kuulu artikli ulatusse. Kuid neile, kes soovivad kõike liivakastis juurutada, on ametlikus hoidlas näidetega valmis üks. docker-compose.yaml.

Peatume üksikasjalikumalt kahel viimasel punktil.

0. Kafka Connect

Siin ja edaspidi artiklis käsitletakse kõiki konfiguratsiooninäiteid Debeziumi arendajate poolt levitatud Dockeri pildi kontekstis. See sisaldab kõiki vajalikke pluginafaile (pistikuid) ja pakub keskkonnamuutujate abil Kafka Connecti konfigureerimist.

Kui kavatsete kasutada Confluenti Kafka Connecti, peate iseseisvalt lisama vajalike pistikute pistikprogrammid jaotises määratud kataloogi. plugin.path või määrata keskkonnamuutuja kaudu CLASSPATH. Kafka Connecti töötaja ja konnektorite sätted määratakse konfiguratsioonifailide kaudu, mis edastatakse argumentidena töötaja käivituskäsule. Üksikasju vt dokumentatsioon.

Kogu Debeizumi seadistamise protsess pistiku versioonis toimub kahes etapis. Vaatame igaüks neist:

1. Kafka Connecti raamistiku seadistamine

Andmete voogesitamiseks Apache Kafka klastrisse määratakse Kafka Connecti raamistikus konkreetsed parameetrid, näiteks:

  • klastriga ühendamise parameetrid,
  • teemade nimed, kuhu konnektori enda konfiguratsioon otse salvestatakse,
  • grupi nimi, milles konnektor töötab (jaotatud režiimi kasutamise korral).

Projekti ametlik Dockeri pilt toetab konfigureerimist keskkonnamuutujate abil - seda me kasutame. Laadime siis pildi alla:

docker pull debezium/connect

Konnektori käitamiseks vajalik minimaalne keskkonnamuutujate komplekt on järgmine.

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - Kafka klastri serverite esialgne loend, et saada täielik klastri liikmete loend;
  • OFFSET_STORAGE_TOPIC=connector-offsets — teema positsioonide salvestamiseks, kus konnektor hetkel asub;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - teema konnektori oleku ja selle ülesannete salvestamiseks;
  • CONFIG_STORAGE_TOPIC=connector-config - konnektori konfiguratsiooniandmete ja selle ülesannete salvestamise teema;
  • GROUP_ID=1 — selle töötajate rühma identifikaator, kellel saab ühendustoimingut täita; nõutav hajutatud kasutamisel (levitatud) režiim.

Alustame konteinerit järgmiste muutujatega:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Märkus Avro kohta

Vaikimisi kirjutab Debezium andmed JSON-vormingus, mis on vastuvõetav liivakastide ja väikeste andmemahtude jaoks, kuid võib olla probleem tugevalt koormatud andmebaasides. JSON-muunduri alternatiiviks on sõnumite järjestamine kasutades Avro binaarvormingusse, mis vähendab Apache Kafka I/O alamsüsteemi koormust.

Avro kasutamiseks peate juurutama eraldi skeem-register (skeemide salvestamiseks). Konverteri muutujad näevad välja järgmised:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Üksikasjad Avro kasutamise ja selle jaoks registri seadistamise kohta ei kuulu selle artikli ulatusse – edaspidi kasutame selguse huvides JSON-i.

2. Pistiku enda seadistamine

Nüüd saate minna otse konnektori enda konfiguratsiooni juurde, mis loeb andmeid allikast.

Vaatame näidet kahe DBMS-i konnektoritest: PostgreSQL ja MongoDB, mille osas mul on kogemusi ja mille osas on erinevusi (olgugi, et väikesed, kuid mõnel juhul märkimisväärsed!).

Konfiguratsiooni kirjeldatakse JSON-i tähistuses ja laaditakse POST-päringu abil üles Kafka Connecti.

2.1. PostgreSQL

PostgreSQL-i konnektori konfiguratsiooni näide:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Pistiku tööpõhimõte pärast seda seadistust on üsna lihtne:

  • Esmakordsel käivitamisel loob see ühenduse konfiguratsioonis määratud andmebaasiga ja käivitub režiimis esialgne pilt, saates Kafkale tingimuslause abil saadud esialgse andmekogumi SELECT * FROM table_name.
  • Pärast lähtestamise lõpetamist siseneb konnektor PostgreSQL WAL-failide muudatuste lugemise režiimi.

Kasutatud valikute kohta:

  • name — selle pistiku nimi, mille jaoks kasutatakse allpool kirjeldatud konfiguratsiooni; edaspidi kasutatakse seda nime konnektoriga töötamiseks (st oleku vaatamiseks/taaskäivitamiseks/konfiguratsiooni värskendamiseks) Kafka Connect REST API kaudu;
  • connector.class — DBMS-i konnektoriklass, mida konfigureeritud konnektor kasutab;
  • plugin.name on WAL-failidest andmete loogilise dekodeerimise plugina nimi. Saadaval, mille vahel valida wal2json, decoderbuffs и pgoutput. Esimesed kaks nõuavad DBMS-i vastavate laienduste installimist ja pgoutput PostgreSQL-i versioon 10 ja uuemad ei vaja täiendavaid manipuleerimisi;
  • database.* — andmebaasiga ühenduse loomise võimalused, kus database.server.name — PostgreSQL-i eksemplari nimi, mida kasutatakse teema nime moodustamiseks Kafka klastris;
  • table.include.list — tabelite loend, milles soovime muudatusi jälgida; vormingus määratud schema.table_name; ei saa kasutada koos table.exclude.list;
  • heartbeat.interval.ms — intervall (millisekundites), millega konnektor saadab südamelöögiteateid spetsiaalsele teemale;
  • heartbeat.action.query — päring, mis täidetakse iga südamelöögiteate saatmisel (valik ilmus versioonis 1.1);
  • slot.name — konnektori poolt kasutatava replikatsioonipesa nimi;
  • publication.name - Nimi Avaldamine PostgreSQL-is, mida konnektor kasutab. Kui seda pole, proovib Debezium seda luua. Kui kasutajal, kelle all ühendus luuakse, ei ole selle toimingu jaoks piisavalt õigusi, katkeb konnektor veaga;
  • transforms määrab, kuidas täpselt sihtteema nime muuta:
    • transforms.AddPrefix.type näitab, et kasutame regulaaravaldisi;
    • transforms.AddPrefix.regex — mask, mis määratleb uuesti sihtteema nime;
    • transforms.AddPrefix.replacement - otseselt see, mida me ümber määratleme.

Rohkem südamelöökide ja transformatsioonide kohta

Vaikimisi saadab konnektor andmed Kafkale iga sooritatud tehingu kohta ja selle LSN (logi järjekorranumber) salvestatakse teenuse teemasse offset. Aga mis juhtub, kui konnektor on konfigureeritud lugema mitte kogu andmebaasi, vaid ainult osa selle tabeleid (milles andmeid ei värskendata sageli)?

  • Konnektor loeb WAL-faile ega tuvasta neis jälgitavates tabelites tehingute sooritamist.
  • Seetõttu ei värskenda see oma praegust asukohta ei teemas ega replikatsioonipesas.
  • See omakorda toob kaasa WAL-failide plaadil hoidmise ja tõenäoliselt saab kettaruum otsa.

Ja siin tulevad appi valikud. heartbeat.interval.ms и heartbeat.action.query. Nende valikute paarikaupa kasutamine võimaldab iga kord, kui südamelöögiteade saadetakse, esitada eraldi tabelis olevate andmete muutmise päringu. Seega uuendatakse pidevalt LSN-i, millel konnektor praegu asub (replikatsioonipesas). See võimaldab DBMS-il eemaldada WAL-failid, mida enam ei vajata. Lisateabe saamiseks valikute toimimise kohta vt dokumentatsioon.

Teine võimalus, mis väärib põhjalikumat tähelepanu, on transforms. Kuigi see on pigem mugavus ja ilu...

Vaikimisi loob Debezium teemasid järgmiste nimetamisreeglite abil: serverName.schemaName.tableName. See ei pruugi alati mugav olla. Valikud transforms Regulaaravaldiste abil saate määratleda tabelite loendi, millest sündmused tuleb suunata konkreetse nimega teemasse.

Täname meie konfiguratsioonis transforms juhtub järgmine: kõik CDC sündmused jälgitavast andmebaasist lähevad nimega teemasse data.cdc.dbname. Vastasel juhul (ilma nende säteteta) loob Debezium vaikimisi iga tabeli jaoks teema, näiteks: pg-dev.public.<table_name>.

Konnektori piirangud

PostgreSQL-i konnektori konfiguratsiooni kirjelduse lõpetuseks tasub rääkida järgmistest selle toimimise funktsioonidest/piirangutest:

  1. PostgreSQL-i konnektori funktsionaalsus tugineb loogilise dekodeerimise kontseptsioonile. Seetõttu ta ei jälgi andmebaasi struktuuri muutmise taotlusi (DDL) - vastavalt neid andmeid teemadesse ei tule.
  2. Kuna kasutatakse replikatsioonipesasid, on pistiku ühendamine võimalik ainult juhtivale DBMS-i eksemplarile.
  3. Kui kasutajal, kelle all konnektor andmebaasiga ühenduse loob, on kirjutuskaitstud õigused, peate enne esimest käivitamist käsitsi replikatsioonipesa looma ja andmebaasis avaldama.

Konfiguratsiooni rakendamine

Niisiis, laadime oma konfiguratsiooni konnektorisse:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Kontrollime, kas allalaadimine õnnestus ja konnektor käivitub:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Suurepärane: see on seadistatud ja kasutamiseks valmis. Teeskleme nüüd tarbijat ja loome ühenduse Kafkaga, mille järel lisame ja muudame tabelisse kirje:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Meie teemas kuvatakse see järgmiselt:

Väga pikk JSON meie muudatustega

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Mõlemal juhul koosnevad kirjed muudetud kirje võtmest (PK) ja muudatuste olemusest: mis kirje oli enne ja mis sellest sai pärast.

  • Juhul kui INSERT: väärtus enne (before) võrdub nullmillele järgneb sisestatud string.
  • Juhul kui UPDATE: sisse payload.before kuvatakse rea eelmine olek ja sisse payload.after — uus koos muudatuste olemusega.

2.2 MongoDB

See konnektor kasutab standardset MongoDB replikatsioonimehhanismi, lugedes teavet esmase DBMS-i sõlme oplogist.

Sarnaselt juba kirjeldatud PgSQL-i konnektoriga tehakse ka siin esmakordsel käivitamisel esmane andmete hetktõmmis, misjärel lülitub konnektor oplogi lugemisrežiimile.

Konfiguratsiooni näide:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Nagu näete, ei ole siin võrreldes eelmise näitega uusi valikuid, vaid vähendatud on ainult andmebaasiga ühenduse loomise eest vastutavate valikute ja nende eesliidete arvu.

Seaded transforms seekord teevad nad järgmist: keerake skeemist sihtteema nimi <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

veataluvus

Meie aja tõrketaluvuse ja kõrge kättesaadavuse teema on teravam kui kunagi varem – eriti kui räägime andmetest ja tehingutest ning andmete muutuste jälgimine pole selles küsimuses kõrvaline. Vaatame, mis võib põhimõtteliselt valesti minna ja mis saab igal juhul Debeziumist.

Loobumisvõimalusi on kolm:

  1. Kafka Connecti rike. Kui Connect on konfigureeritud töötama hajutatud režiimis, peab see mitu töötajat määrama sama group.id. Seejärel, kui üks neist ebaõnnestub, taaskäivitatakse konnektor mõnel teisel töötajal ja jätkatakse lugemist Kafka teema viimasest kinnitatud positsioonist.
  2. Ühenduse katkemine Kafka klastriga. Ühendus lihtsalt lõpetab lugemise kohas, mida Kafkale saatmine ebaõnnestus, ja proovib seda perioodiliselt uuesti saata, kuni katse õnnestub.
  3. Andmeallika puudumine. Konnektor proovib konfigureeritud viisil allikaga uuesti ühendust luua. Vaikimisi on 16 katset eksponentsiaalne taganemine. Pärast 16. ebaõnnestunud katset märgitakse ülesanne kui ei ja see tuleb Kafka Connect REST liidese kaudu käsitsi taaskäivitada.
    • Juhul kui PostgreSQL andmed ei lähe kaduma, sest Replikatsioonipesade kasutamine takistab teil kustutamast WAL-faile, mida konnektor ei loe. Sel juhul on mündil ka negatiivne külg: kui võrguühendus pistiku ja DBMS-i vahel on pikemat aega häiritud, on võimalus, et kettaruum saab otsa ja see võib põhjustada kogu DBMS-i.
    • Juhul kui MySQL DBMS saab enne ühenduse taastamist binlogi faile pöörata. Selle tulemusel läheb konnektor ebaõnnestunud olekusse ja normaalse töö taastamiseks peate taaskäivitama esialgses hetktõmmise režiimis, et jätkata binlogide lugemist.
    • edasi MongoDB. Dokumentatsioonis on kirjas: konnektori käitumine juhul, kui logi-/oplog-failid on kustutatud ja konnektor ei saa lugemist jätkata kohast, kus see pooleli jäi, on kõigi DBMS-ide puhul sama. See tähendab, et pistik läheb olekusse ei ja nõuab režiimis taaskäivitamist esialgne pilt.

      Siiski on erandeid. Kui pistik oli pikka aega lahti ühendatud (või ei pääsenud MongoDB eksemplarini) ja oplog läbis selle aja jooksul pöörlemise, siis ühenduse taastamisel jätkab konnektor rahulikult andmete lugemist esimesest saadaolevast positsioonist, mistõttu mõned andmed Kafkas ei tabab.

Järeldus

Debezium on minu esimene kogemus CDC süsteemidega ja üldiselt väga positiivne. Projekt andis altkäemaksu peamise DBMS-i toe, konfigureerimise lihtsuse, klastrite toe ja aktiivse kogukonna. Praktikahuvilistel soovitan tutvuda juhendiga Kafka Connect и Debezium.

Võrreldes Kafka Connecti JDBC-pistikuga, on Debeziumi peamine eelis see, et muudatusi loetakse DBMS-i logidest, mis võimaldab andmeid vastu võtta minimaalse viivitusega. JDBC-konnektor (pakkuja Kafka Connect) esitab jälgitavale tabelile kindla intervalliga päringuid ja (samal põhjusel) ei genereeri andmete kustutamisel sõnumeid (kuidas saate küsida andmeid, mida seal pole?).

Sarnaste probleemide lahendamiseks võite pöörata tähelepanu järgmistele lahendustele (lisaks Debeziumile):

PS

Loe ka meie blogist:

Allikas: www.habr.com

Lisa kommentaar