Pristatome Debezium – CDC, skirtą Apache Kafka

Pristatome Debezium – CDC, skirtą Apache Kafka

Savo darbe dažnai susiduriu su naujais techniniais sprendimais / programinės įrangos produktais, apie kuriuos informacijos rusakalbiame internete yra gana mažai. Šiuo straipsniu pabandysiu užpildyti vieną tokią spragą pavyzdžiu iš savo naujausios praktikos, kai reikėjo nustatyti CDC įvykių siuntimą iš dviejų populiarių DBVS (PostgreSQL ir MongoDB) į Kafka klasterį naudojant Debezium. Tikiuosi, kad šis apžvalginis straipsnis, pasirodęs atlikto darbo rezultatas, bus naudingas kitiems.

Kas yra Debezium ir CDC apskritai?

Debezium - CDC programinės įrangos kategorijos atstovas (Užfiksuoti duomenų pasikeitimą), tiksliau, tai yra įvairių DBVS jungčių rinkinys, suderinamas su Apache Kafka Connect sistema.

Jis atvirojo kodo projektas, licencijuota pagal Apache License v2.0 ir remiama Red Hat. Kuriama nuo 2016 m. ir šiuo metu ji teikia oficialų palaikymą šioms DBVS: MySQL, PostgreSQL, MongoDB, SQL Server. Taip pat yra „Cassandra“ ir „Oracle“ jungčių, tačiau šiuo metu jos yra „ankstyvosios prieigos“ būsenoje, o nauji leidimai negarantuoja atgalinio suderinamumo.

Jei palyginsime CDC su tradiciniu metodu (kai programa nuskaito duomenis tiesiogiai iš DBVS), pagrindiniai jo pranašumai yra duomenų pasikeitimo srautinio perdavimo eilutės lygiu įgyvendinimas su maža delsa, dideliu patikimumu ir prieinamumu. Paskutiniai du taškai pasiekiami naudojant Kafka klasterį kaip CDC įvykių saugyklą.

Be to, privalumai yra tai, kad įvykiams saugoti naudojamas vienas modelis, todėl galutinei programai nereikia jaudintis dėl skirtingų DBVS veikimo niuansų.

Galiausiai, naudojant pranešimų tarpininką, atveriama galimybė horizontaliai keisti duomenų pokyčius sekančias programas. Tuo pačiu metu poveikis duomenų šaltiniui yra minimalus, nes duomenys gaunami ne tiesiogiai iš DBVS, o iš Kafka klasterio.

Apie Debeziumo architektūrą

Debezium naudojimas susideda iš šios paprastos schemos:

DBVS (kaip duomenų šaltinis) → Kafka Connect jungtis → Apache Kafka → vartotojas

Kaip iliustraciją pateiksiu schemą iš projekto svetainės:

Pristatome Debezium – CDC, skirtą Apache Kafka

Tačiau ši schema man nelabai patinka, nes atrodo, kad galima tik kriauklės jungtis.

Iš tikrųjų situacija yra kitokia: užpildykite savo duomenų ežerą (paskutinė nuoroda aukščiau esančioje diagramoje) nėra vienintelis Debezium naudojimo būdas. Į „Apache Kafka“ išsiųstus įvykius jūsų programos gali naudoti įvairioms situacijoms išspręsti. Pavyzdžiui:

  • nereikšmingų duomenų pašalinimas iš talpyklos;
  • pranešimų siuntimas;
  • paieškos rodyklės atnaujinimai;
  • tam tikri audito žurnalai;
  • ...

Jei turite Java programą ir nėra poreikio / galimybės naudoti Kafka klasterio, taip pat yra galimybė dirbti su įdėta jungtis. Akivaizdus pliusas yra tas, kad su juo galite atsisakyti papildomos infrastruktūros (jungties ir „Kafka“ pavidalu). Tačiau šis sprendimas buvo nebenaudojamas nuo 1.1 versijos ir neberekomenduojamas naudoti (jis gali būti pašalintas būsimuose leidimuose).

Šiame straipsnyje bus aptarta kūrėjų rekomenduojama architektūra, užtikrinanti atsparumą gedimams ir mastelio keitimą.

Jungties konfigūracija

Norint pradėti stebėti svarbiausios reikšmės – duomenų – pokyčius, mums reikia:

  1. duomenų šaltinis, kuris gali būti MySQL nuo 5.7 versijos, PostgreSQL 9.6+, MongoDB 3.2+ (visas sąrašas);
  2. Apache Kafka klasteris
  3. Kafka Connect egzempliorius (1.x, 2.x versijos);
  4. sukonfigūruota Debezium jungtis.

Dirbkite su pirmaisiais dviem punktais, t.y. DBVS ir „Apache Kafka“ diegimo procesas nepatenka į straipsnio taikymo sritį. Tačiau tiems, kurie nori viską išdėstyti smėlio dėžėje, oficialioje saugykloje yra paruošta vieta su pavyzdžiais docker-compose.yaml.

Mes sutelksime dėmesį į paskutinius du punktus išsamiau.

0. Kafka Connect

Čia ir vėliau straipsnyje visi konfigūracijos pavyzdžiai nagrinėjami Debezium kūrėjų platinamo Docker vaizdo kontekste. Jame yra visi reikalingi papildinių failai (jungtys) ir pateikiama Kafka Connect konfigūracija naudojant aplinkos kintamuosius.

Jei ketinate naudoti „Kafka Connect“ iš „Confluent“, reikiamų jungčių papildinius turėsite įtraukti į katalogą, nurodytą plugin.path arba nustatyti per aplinkos kintamąjį CLASSPATH. Kafka Connect darbuotojo ir jungčių parametrai apibrėžiami per konfigūracijos failus, kurie perduodami kaip argumentai darbuotojo paleidimo komandai. Daugiau informacijos žr dokumentacija.

Visas Debeizum nustatymo jungties versijoje procesas atliekamas dviem etapais. Panagrinėkime kiekvieną iš jų:

1. Kafka Connect sistemos nustatymas

Norint srautiniu būdu perduoti duomenis į „Apache Kafka“ klasterį, „Kafka Connect“ sistemoje nustatomi konkretūs parametrai, pavyzdžiui:

  • klasterio ryšio parametrai,
  • temų, kuriose bus saugoma pačios jungties konfigūracija, pavadinimai,
  • grupės, kurioje veikia jungtis, pavadinimas (jei naudojamas paskirstytas režimas).

Oficialus projekto „Docker“ vaizdas palaiko konfigūraciją naudojant aplinkos kintamuosius – tai mes naudosime. Taigi atsisiunčiame paveikslėlį:

docker pull debezium/connect

Minimalus aplinkos kintamųjų rinkinys, reikalingas jungtis paleisti, yra toks:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - pradinis Kafka klasterio serverių sąrašas, kad gautumėte visą klasterio narių sąrašą;
  • OFFSET_STORAGE_TOPIC=connector-offsets — pozicijų, kuriose šiuo metu yra jungtis, saugojimo tema;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - jungties būsenos ir jos užduočių saugojimo tema;
  • CONFIG_STORAGE_TOPIC=connector-config - jungties konfigūracijos duomenų saugojimo tema ir jos užduotys;
  • GROUP_ID=1 — darbuotojų grupės, kuriai galima atlikti jungties užduotį, identifikatorius; reikalingas naudojant paskirstytą (paskirstytas) režimas.

Mes pradedame konteinerį šiais kintamaisiais:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Pastaba apie Avro

Pagal numatytuosius nustatymus Debezium įrašo duomenis JSON formatu, kuris yra priimtinas smėlio dėžėms ir nedideliems duomenų kiekiams, tačiau gali būti problema labai apkrautose duomenų bazėse. Alternatyva JSON keitikliui yra nuoseklizuoti pranešimus naudojant Avro į dvejetainį formatą, kuris sumažina įvesties / išvesties posistemio apkrovą „Apache Kafka“.

Norėdami naudoti „Avro“, turite įdiegti atskirą schema-registras (schemų saugojimui). Keitiklio kintamieji atrodys taip:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Išsami informacija apie „Avro“ naudojimą ir jo registro nustatymą nepatenka į straipsnio taikymo sritį – toliau, siekiant aiškumo, naudosime JSON.

2. Pačios jungties nustatymas

Dabar galite pereiti tiesiai į pačios jungties konfigūraciją, kuri nuskaitys duomenis iš šaltinio.

Pažiūrėkime į dviejų DBVS jungčių pavyzdį: PostgreSQL ir MongoDB, kurių srityje turiu patirties ir yra skirtumų (nors ir nedidelių, bet kai kuriais atvejais reikšmingų!).

Konfigūracija aprašyta JSON žymėjimu ir įkelta į Kafka Connect naudojant POST užklausą.

2.1. PostgreSQL

„PostgreSQL“ jungties konfigūracijos pavyzdys:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Jungties veikimo principas po šios konfigūracijos yra gana paprastas:

  • Pirmą kartą paleidus, jis prisijungia prie konfigūracijoje nurodytos duomenų bazės ir paleidžiamas režimu pradinis momentinis vaizdas, siunčiant Kafkai pradinį duomenų rinkinį, gautą su sąlyga SELECT * FROM table_name.
  • Baigus inicijuoti, jungtis pereina į „PostgreSQL WAL“ failų pakeitimų skaitymo režimą.

Apie naudojamas parinktis:

  • name — jungties, kuriai naudojama toliau aprašyta konfigūracija, pavadinimas; ateityje šis pavadinimas bus naudojamas darbui su jungtimi (t. y. peržiūrėti būseną / paleisti iš naujo / atnaujinti konfigūraciją) per Kafka Connect REST API;
  • connector.class — DBVS jungties klasė, kurią naudos sukonfigūruota jungtis;
  • plugin.name yra įskiepio, skirto loginiam duomenų iš WAL failų dekodavimui, pavadinimas. Galima rinktis wal2json, decoderbuffs и pgoutput. Pirmiesiems dviem reikia įdiegti atitinkamus plėtinius DBVS ir pgoutput PostgreSQL 10 ir naujesnėms versijoms nereikia papildomų manipuliacijų;
  • database.* — prisijungimo prie duomenų bazės galimybės, kur database.server.name - PostgreSQL egzemplioriaus pavadinimas, naudojamas sudarant temos pavadinimą Kafka klasteryje;
  • table.include.list - lentelių, kuriose norime sekti pakeitimus, sąrašas; pateikta forma schema.table_name; negalima naudoti kartu su table.exclude.list;
  • heartbeat.interval.ms — intervalas (milisekundėmis), kuriuo jungtis siunčia širdies plakimo pranešimus į specialią temą;
  • heartbeat.action.query - užklausa, kuri bus vykdoma siunčiant kiekvieną širdies plakimo pranešimą (parinktis atsirado nuo 1.1 versijos);
  • slot.name — replikacijos lizdo, kurį naudos jungtis, pavadinimas;
  • publication.name - Vardas Leidinys PostgreSQL, kurį naudoja jungtis. Jei jo nėra, Debezium bandys jį sukurti. Jei vartotojas, kuriuo užmezgamas ryšys, neturi pakankamai teisių šiam veiksmui, jungtis išeis su klaida;
  • transforms nustato, kaip tiksliai pakeisti tikslinės temos pavadinimą:
    • transforms.AddPrefix.type rodo, kad naudosime reguliariąsias išraiškas;
    • transforms.AddPrefix.regex — kaukė, pagal kurią iš naujo apibrėžiamas tikslinės temos pavadinimas;
    • transforms.AddPrefix.replacement - tiesiogiai ką mes iš naujo apibrėžiame.

Daugiau apie širdies plakimą ir transformacijas

Pagal numatytuosius nustatymus jungtis siunčia duomenis „Kafka“ už kiekvieną įvykdytą operaciją ir įrašo savo LSN (logo sekos numerį) į paslaugų temą offset. Bet kas atsitiks, jei jungtis sukonfigūruota skaityti ne visą duomenų bazę, o tik dalį jos lentelių (kuriose duomenys atnaujinami retai)?

  • Jungtis nuskaitys WAL failus ir neaptiks juose esančių operacijų įsipareigojimų stebimose lentelėse.
  • Todėl jis neatnaujins savo dabartinės padėties nei temoje, nei replikacijos srityje.
  • Dėl to WAL failai „įstrigs“ diske ir greičiausiai jiems pritrūks vietos.

Ir čia į pagalbą ateina variantai. heartbeat.interval.ms и heartbeat.action.query. Naudojant šias parinktis poromis, kiekvieną kartą, kai siunčiamas širdies plakimo pranešimas, galima vykdyti užklausą pakeisti duomenis atskiroje lentelėje. Taigi LSN, kuriame šiuo metu yra jungtis (replikacijos lizde), yra nuolat atnaujinamas. Tai leidžia DBVS pašalinti WAL failus, kurių nebereikia. Norėdami gauti daugiau informacijos apie tai, kaip veikia parinktys, žr dokumentacija.

Kitas variantas, kuris nusipelno atidesnio dėmesio transforms. Nors tai daugiau apie patogumą ir grožį...

Pagal numatytuosius nustatymus Debezium kuria temas naudodama šią pavadinimų politiką: serverName.schemaName.tableName. Tai ne visada gali būti patogu. Galimybės transforms Naudodami reguliariąsias išraiškas galite apibrėžti lentelių, kurių įvykius reikia nukreipti į temą konkrečiu pavadinimu, sąrašą.

Mūsų konfigūracijoje dėka transforms atsitinka taip: visi CDC įvykiai iš stebimos duomenų bazės pateks į temą su pavadinimu data.cdc.dbname. Kitu atveju (be šių nustatymų) Debezium pagal numatytuosius nustatymus sukurtų temą kiekvienai formos lentelei: pg-dev.public.<table_name>.

Jungties apribojimai

„PostgreSQL“ jungties konfigūracijos aprašymo pabaigoje verta pakalbėti apie šias jo darbo ypatybes / apribojimus:

  1. „PostgreSQL“ jungties funkcija priklauso nuo loginio dekodavimo koncepcijos. Todėl jis neseka prašymų pakeisti duomenų bazės struktūrą (DDL) – atitinkamai šių duomenų temose nebus.
  2. Kadangi naudojami replikacijos lizdai, galima prijungti jungtį tik į pagrindinį DBVS egzempliorių.
  3. Jei vartotojas, pagal kurį jungtis jungiasi prie duomenų bazės, turi tik skaitymo teises, prieš pirmą kartą paleidžiant turėsite rankiniu būdu sukurti replikacijos lizdą ir paskelbti duomenų bazėje.

Konfigūracijos taikymas

Taigi įkelkime savo konfigūraciją į jungtį:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Patikriname, ar atsisiuntimas buvo sėkmingas, ir jungtis paleista:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Puiku: jis nustatytas ir paruoštas naudoti. Dabar apsimeskime vartotoju ir prisijunkime prie Kafkos, po to pridedame ir keičiame įrašą lentelėje:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Mūsų temoje tai bus rodoma taip:

Labai ilgas JSON su mūsų pakeitimais

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Abiem atvejais įrašai susideda iš pakeisto įrašo rakto (PK) ir pačios pakeitimų esmės: koks įrašas buvo anksčiau ir kuo tapo po.

  • Tais atvejais, kai INSERT: vertė prieš (before) lygus nullpo kurio seka įterpta eilutė.
  • Tais atvejais, kai UPDATE: į payload.before rodoma ankstesnė eilutės būsena ir į payload.after - nauja su pasikeitimo esme.

2.2 MongoDB

Ši jungtis naudoja standartinį MongoDB replikacijos mechanizmą, nuskaito informaciją iš DBVS pirminio mazgo operacijų žurnalo.

Panašiai kaip jau aprašyta PgSQL jungtis, čia taip pat pirmą kartą paleidžiant daroma pirminių duomenų momentinė nuotrauka, po kurios jungtis persijungia į oplog skaitymo režimą.

Konfigūracijos pavyzdys:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kaip matote, lyginant su ankstesniu pavyzdžiu naujų parinkčių nėra, tačiau sumažintas tik parinkčių, atsakingų už prisijungimą prie duomenų bazės ir jų priešdėlių skaičius.

Nustatymai transforms šį kartą jie daro taip: pasukite tikslinės temos pavadinimą iš schemos <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

atsparumas gedimams

Gedimų tolerancijos ir didelio prieinamumo problema mūsų laikais yra kaip niekad opi – ypač kai kalbame apie duomenis ir operacijas, o duomenų pasikeitimo stebėjimas šiuo klausimu nėra nuošalyje. Pažiūrėkime, kas iš principo gali suklysti ir kas atsitiks su Debezium kiekvienu atveju.

Yra trys atsisakymo parinktys:

  1. Kafka Connect gedimas. Jei „Connect“ sukonfigūruotas veikti paskirstytu režimu, keli darbuotojai turi nustatyti tą patį group.id. Tada, jei vienas iš jų sugenda, jungtis bus paleista iš naujo kitam darbuotojui ir bus tęsiamas skaitymas nuo paskutinės nustatytos temos pozicijos Kafka.
  2. Ryšio su Kafka grupe praradimas. Jungtis tiesiog nustos skaityti toje padėtyje, kurios nepavyko išsiųsti Kafkai, ir periodiškai bandys ją išsiųsti iš naujo, kol bandymas bus sėkmingas.
  3. Duomenų šaltinis nepasiekiamas. Jungtis bandys vėl prisijungti prie šaltinio pagal konfigūraciją. Numatytasis yra 16 bandymų naudojant eksponentinis atsitraukimas. Po 16 nesėkmingo bandymo užduotis bus pažymėta kaip nepavyko ir jį reikės rankiniu būdu iš naujo paleisti per Kafka Connect REST sąsają.
    • Tais atvejais, kai PostgreSQL duomenys nebus prarasti, nes naudojant replikacijos lizdus bus išvengta WAL failų, kurių jungtis neskaito, ištrynimo. Šiuo atveju yra ir minusas: jei tinklo ryšys tarp jungties ir DBVS yra sutrikęs ilgą laiką, yra tikimybė, kad diske pritrūks vietos, o tai gali lemti visos DBVS gedimą.
    • Tais atvejais, kai MySQL binlog failus gali pasukti pati DBVS prieš atkuriant ryšį. Dėl to jungtis pereis į nesėkmingą būseną ir ją reikės iš naujo paleisti pradinio momentinio fotografavimo režimu, kad būtų galima toliau skaityti iš binlogų ir atkurti normalų veikimą.
    • apie MongoDB. Dokumentacijoje rašoma: jungties elgsena, jei žurnalo / oplog failai buvo ištrinti ir jungtis negali tęsti skaitymo nuo tos vietos, kurioje jis buvo baigtas, yra vienoda visoms DBVS. Tai slypi tame, kad jungtis pateks į būseną nepavyko ir reikės iš naujo paleisti režimu pradinis momentinis vaizdas.

      Tačiau yra išimčių. Jei jungtis ilgą laiką buvo atjungta (arba negalėjo pasiekti MongoDB egzemplioriaus) ir per tą laiką buvo pasuktas oplogas, tada, kai ryšys bus atkurtas, jungtis ramiai toliau skaitys duomenis iš pirmosios galimos padėties. , todėl kai kurie Kafkos duomenys ne pataikys.

išvada

Debezium yra mano pirmoji patirtis su CDC sistemomis ir apskritai buvo labai teigiama. Projektas papirko pagrindinių DBVS palaikymą, konfigūravimo paprastumą, grupavimo palaikymą ir aktyvią bendruomenę. Tiems, kurie domisi praktika, rekomenduoju perskaityti vadovus Kafka Connect и Debezium.

Palyginti su JDBC jungtimi, skirta Kafka Connect, pagrindinis Debezium pranašumas yra tas, kad pakeitimai nuskaitomi iš DBMS žurnalų, o tai leidžia gauti duomenis su minimaliu delsimu. JDBC jungtis (pateikiama „Kafka Connect“) fiksuotu intervalu pateikia užklausą stebimoje lentelėje ir (dėl tos pačios priežasties) negeneruoja pranešimų, kai duomenys ištrinami (kaip galite pateikti užklausą dėl duomenų, kurių nėra?).

Norėdami išspręsti panašias problemas, galite atkreipti dėmesį į šiuos sprendimus (be Debezium):

PS

Taip pat skaitykite mūsų tinklaraštyje:

Šaltinis: www.habr.com

Добавить комментарий