Debezium - Apache Kafkarako CDC aurkezten

Debezium - Apache Kafkarako CDC aurkezten

Nire lanean, sarritan aurkitzen naiz soluzio/software-produktu tekniko berriak, eta horri buruzko informazioa nahiko urria da errusierazko Interneten. Artikulu honekin halako hutsune bat betetzen saiatuko naiz nire azken praktikako adibide batekin, Debezium erabiliz CDC gertaerak bi DBMS ezagunetatik (PostgreSQL eta MongoDB) Kafka kluster batera bidaltzea konfiguratu behar nuenean. Egindako lanaren ondorioz agertzen den berrikuspen-artikulu hau besteentzat baliagarria izatea espero dut.

Zer da Debezium eta CDC orokorrean?

Debezium β€” CDC software kategoriako ordezkaria (Harrapatu datuen aldaketa), edo zehatzago esanda, Apache Kafka Connect esparruarekin bateragarriak diren hainbat DBMSren konektore multzoa da.

It Kode irekiko proiektua, Apache License v2.0 lizentziapean eta Red Hat-ek babestuta. Garapena 2016az geroztik etengabea da eta gaur egun DBMS hauetarako laguntza ofiziala eskaintzen du: MySQL, PostgreSQL, MongoDB, SQL Server. Cassandra eta Oraclerako konektoreak ere badaude, baina momentuz "sarbide goiztiarra" egoeran daude, eta bertsio berriek ez dute atzerako bateragarritasuna bermatzen.

CDC ikuspegi tradizionalarekin alderatzen badugu (aplikazioak DBMSko datuak zuzenean irakurtzen dituenean), bere abantaila nagusiak errenkada mailan datu-aldaketaren streaming-a ezartzea latentzia baxuarekin, fidagarritasun handiarekin eta erabilgarritasunarekin. Azken bi puntuak Kafka kluster bat CDC gertaeren biltegi gisa erabiliz lortzen dira.

Beste abantaila bat gertaerak gordetzeko eredu bakarra erabiltzen dela da, beraz, amaierako aplikazioak ez du kezkatu behar DBMS ezberdinen funtzionamenduaren Γ±abarduraz.

Azkenik, mezu-artekari bat erabiltzeak datuen aldaketak kontrolatzen dituzten aplikazioei horizontalki eskalatzeko aukera ematen die. Aldi berean, datu-iturburuan eragina gutxitzen da, datuak ez baitira zuzenean DBMStik lortzen, Kafka klusterretik baizik.

Debezium arkitekturari buruz

Debezium erabiltzea eskema sinple honetara dator:

DBMS (datu iturri gisa) β†’ Kafka Connect-en konektorea β†’ Apache Kafka β†’ kontsumitzailea

Irudi gisa, hona hemen proiektuaren webguneko diagrama bat:

Debezium - Apache Kafkarako CDC aurkezten

Hala ere, ez zait oso gustatzen eskema hau, badirudi konketa-konektorea soilik erabiltzea posible dela.

Egia esan, egoera bestelakoa da: zure Data Lake betetzea (Goiko diagramako azken esteka) Hau ez da Debezium erabiltzeko modu bakarra. Apache Kafka-ra bidalitako gertaerak zure aplikazioek hainbat egoera kudeatzeko erabil ditzakete. Adibidez:

  • garrantzirik gabeko datuak cachetik kentzea;
  • jakinarazpenak bidaltzea;
  • bilaketa-indizearen eguneraketak;
  • nolabaiteko ikuskaritza erregistroak;
  • ...

Java aplikazio bat baduzu eta Kafka kluster bat erabiltzeko behar/aukerarik ez badago, lan egiteko aukera ere badago. konektore txertatua. Abantaila nabaria da azpiegitura gehigarrien beharra ezabatzen duela (konektore eta Kafka moduan). Dena den, irtenbide hau zaharkituta dago 1.1 bertsioaz geroztik eta jada ez da gomendagarria erabiltzeko (baliteke etorkizuneko bertsioetan ezabatu egingo den laguntza).

Artikulu honetan garatzaileek gomendatutako arkitektura eztabaidatuko da, akatsen tolerantzia eta eskalagarritasuna eskaintzen dituena.

Konektorearen konfigurazioa

Balio garrantzitsuenean - datuak - aldaketen jarraipena egiten hasteko, behar dugu:

  1. datu-iturria, MySQL izan daiteke 5.7 bertsiotik hasita, PostgreSQL 9.6+, MongoDB 3.2+ (zerrenda osoa);
  2. Apache Kafka klusterra;
  3. Kafka Connect instantzia (1.x, 2.x bertsioak);
  4. konfiguratutako Debezium konektorea.

Lehenengo bi puntuak landu, hau da. DBMS eta Apache Kafkaren instalazio prozesua artikuluaren esparrutik kanpo dago. Hala ere, sandbox-ean dena zabaldu nahi dutenentzat, adibideekin biltegi ofizialak prest dago. docker-compose.yaml.

Azken bi puntuetan sakonduko dugu.

0. Kafka Konektatu

Hemen eta gehiago artikuluan, konfigurazio adibide guztiak Debezium garatzaileek banatutako Docker irudiaren testuinguruan eztabaidatzen dira. Beharrezko plugin-fitxategi guztiak (konektoreak) ditu eta Kafka Connect-en konfigurazioa eskaintzen du ingurune-aldagaiak erabiliz.

Kafka Connect Confluent-etik erabiltzeko asmoa baduzu, independentean gehitu beharko dituzu beharrezko konektoreen pluginak atalean zehaztutako direktorioan. plugin.path edo ingurune-aldagai baten bidez ezarri CLASSPATH. Kafka Connect langilearen eta konektoreen ezarpenak langilea abiarazteko komandoari argumentu gisa pasatzen diren konfigurazio fitxategien bidez zehazten dira. Xehetasun gehiagorako, ikus dokumentazioa.

Debeizum konektore bertsioan konfiguratzeko prozesu osoa bi fasetan egiten da. Ikus ditzagun horietako bakoitza:

1. Kafka Connect esparrua konfiguratzea

Apache Kafka klusterera datuak igortzeko, parametro zehatzak ezartzen dira Kafka Connect esparruan, hala nola:

  • clusterra konektatzeko parametroak,
  • konektorearen beraren konfigurazioa zuzenean gordeko den gaien izenak,
  • konektorea exekutatzen ari den taldearen izena (modu banatua erabiltzen bada).

Proiektuaren Docker-en irudi ofizialak ingurune-aldagaiak erabiliz konfigurazioa onartzen du - hau da erabiliko duguna. Beraz, deskargatu irudia:

docker pull debezium/connect

Konektorea exekutatzeko beharrezkoa den ingurune-aldagaien gutxieneko multzoa honako hau da:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 β€” Kafka klusterreko zerbitzarien hasierako zerrenda klusterreko kideen zerrenda osoa lortzeko;
  • OFFSET_STORAGE_TOPIC=connector-offsets β€” konektorea gaur egun dagoen lekuak gordetzeko gaia;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status β€” konektorearen egoera eta bere zereginak gordetzeko gaia;
  • CONFIG_STORAGE_TOPIC=connector-config β€” konektoreen konfigurazio datuak eta bere zereginak gordetzeko gaia;
  • GROUP_ID=1 β€” konektore-ataza exekutatu daitekeen langile-taldearen identifikatzailea; beharrezkoak banatuta erabiltzean (banatua) erregimena.

Aldagai hauekin edukiontzia abiarazten dugu:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro-ri buruzko oharra

Lehenespenez, Debezium-ek JSON formatuan idazten ditu datuak, sandbox eta datu kopuru txikietarako onargarria dena, baina oso kargatutako datu-baseetan arazo bihur daiteke. JSON bihurgailuaren alternatiba mezuak erabiliz serializatzea da avro formatu bitar batean, Apache Kafka-ko I/O azpisistemaren karga murrizten duena.

Avro erabiltzeko, bereizi bat zabaldu behar duzu eskema-erregistroa (diagramak gordetzeko). Bihurgailuaren aldagaiak honelakoak izango dira:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro erabiltzeko eta horretarako erregistroa konfiguratzeko xehetasunak artikulu honen esparrutik kanpo daude - aurrerago, argitasunerako, JSON erabiliko dugu.

2. Konektorea bera konfiguratzea

Orain zuzenean joan zaitezke konektorearen beraren konfiguraziora, iturriko datuak irakurriko dituena.

Ikus ditzagun bi DBMSren konektoreen adibidea: PostgreSQL eta MongoDB, esperientzia dudana eta desberdintasunak (txikiak izan arren, kasu batzuetan esanguratsuak!).

Konfigurazioa JSON idazkeran deskribatzen da eta Kafka Connect-era kargatzen da POST eskaera baten bidez.

2.1. PostgreSQL

PostgreSQL-rako konektorearen konfigurazio adibidea:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Konfigurazio honen ondoren konektorearen funtzionamendu-printzipioa nahiko erraza da:

  • Lehen aldiz abiarazten denean, konfigurazioan zehaztutako datu-basera konektatzen da eta moduan hasten da hasierako argazkia, baldintzazkoa erabiliz lortutako hasierako datu multzoa Kafkari bidaliz SELECT * FROM table_name.
  • Hasieratzea amaitu ondoren, konektorea modura sartzen da PostgreSQL WAL fitxategietako aldaketak irakurtzeko.

Erabilitako aukerei buruz:

  • name β€” behean deskribatutako konfigurazioa erabiltzen den konektorearen izena; etorkizunean, izen hau konektorearekin lan egiteko erabiltzen da (hau da, egoera ikusi/berrabiarazi/eguneratu konfigurazioa) Kafka Connect REST APIaren bidez;
  • connector.class β€” Konfiguratutako konektoreak erabiliko duen DBMS konektore-klasea;
  • plugin.name β€” WAL fitxategietako datuen deskodetze logikorako pluginaren izena. Aukeran eskuragarri wal2json, decoderbuffs ΠΈ pgoutput. Lehenengo biek DBMSn luzapen egokiak instalatzea eskatzen dute, eta pgoutput PostgreSQL 10 bertsiorako eta berriagoak ez ditu manipulazio gehigarririk behar;
  • database.* β€” datu-basera konektatzeko aukerak, non database.server.name β€” Kafka klusterrean gaiaren izena osatzeko PostgreSQL instantziaren izena;
  • table.include.list β€” aldaketen jarraipena egin nahi dugun taulen zerrenda; formatuan zehaztuta schema.table_name; ezin da batera erabili table.exclude.list;
  • heartbeat.interval.ms β€” tartea (milisegundotan) konektoreak bidaltzen dituen taupadak gai berezi bati;
  • heartbeat.action.query β€” Bihotz-taupadak mezu bakoitza bidaltzean gauzatuko den eskaera (1.1 bertsioan agertu zen aukera);
  • slot.name β€” konektoreak erabiliko duen erreplikazio-zirrikituaren izena;
  • publication.name - Izena Argitalpena Konektoreak erabiltzen duen PostgreSQL-n. Existitzen ez bada, Debezium saiatuko da sortzen. Konexioa egiten den erabiltzaileak ekintza honetarako eskubide nahikorik ez badu, konektorea akats batekin amaituko da;
  • transforms xede-gaiaren izena zehazki nola aldatu zehazten du:
    • transforms.AddPrefix.type adiera erregularrak erabiliko ditugula adierazten du;
    • transforms.AddPrefix.regex β€” xede-gaiaren izena birdefinitzen duen maskara;
    • transforms.AddPrefix.replacement - zuzenean birdefinitzen ari garena.

Bihotz-taupadei eta transformazioei buruz gehiago

Lehenespenez, konektoreak konprometitutako transakzio bakoitzeko datuak bidaltzen dizkio Kafkari, eta bere LSN (Log Sequence Number) zerbitzu-gaian erregistratzen da. offset. Baina zer gertatzen da konektorea datu-base osoa ez irakurtzeko konfiguratuta badago, bere taulen zati bat baino ez (datuen eguneraketak ez diren maiz gertatzen)?

  • Konektoreak WAL fitxategiak irakurriko ditu eta ez du detektatuko transakzio-konpromisorik kontrolatzen ari den tauletan.
  • Hori dela eta, ez du egungo posizioa eguneratuko ez gaian ez erreplika-zerbitzuan.
  • Honek, WAL fitxategiak diskoan gordetzea eragingo du eta ziurrenik diskoko lekurik gabe geratuko da.

Eta hemen aukerak erreskatatu egiten dira. heartbeat.interval.ms ΠΈ heartbeat.action.query. Aukera hauek binaka erabiliz gero, bihotz-taupada-mezu bat bidaltzen den bakoitzean datuak taula bereizi batean aldatzeko eskaera egiteko aukera ematen du. Horrela, konektorea (erreplikatzeko zirrikituan) dagoen LSN-a etengabe eguneratzen da. Horri esker, DBMSak beharrezkoak ez diren WAL fitxategiak ken ditzake. Aukeren funtzionamenduari buruz gehiago jakin dezakezu dokumentazioa.

Arreta handiagoz merezi duen beste aukera bat da transforms. Erosotasunari eta edertasunari buruz gehiago bada ere...

Lehenespenez, Debezium-ek izendapen-politika hau erabiliz sortzen ditu gaiak: serverName.schemaName.tableName. Hau ez da beti komenigarria izango. Aukerak transforms Adierazpen erregularrak erabil ditzakezu taulen zerrenda definitzeko, eta bertatik izen zehatz bat duen gai batera bideratu behar diren gertaerak.

Gure konfigurazioan eskerrik asko transforms honako hau gertatzen da: monitorizatutako datu-baseko CDC gertaera guztiak izena duen gai batera joango dira data.cdc.dbname. Bestela (ezarpen hauek gabe), Debezium-ek modu lehenetsian taula bakoitzerako gai bat sortuko luke: pg-dev.public.<table_name>.

Konektoreen mugak

PostgreSQL-ren konektorearen konfigurazioaren deskribapena amaitzeko, merezi du bere funtzionamenduaren ezaugarri/muga hauei buruz hitz egitea:

  1. PostgreSQL-ren konektorearen funtzionalitatea deskodetze logikoaren kontzeptuan oinarritzen da. Horregatik zuen ez ditu datu-basearen egitura aldatzeko eskaeren jarraipena egiten (DDL) - horren arabera, datu hauek ez dira gaietan egongo.
  2. Erreplikatzeko zirrikituak erabiltzen direnez, konektore bat konektatzea posible da bakarrik DBMS instantzia nagusira.
  3. Konektorea datu-basera konektatzen den erabiltzaileak irakurtzeko soilik eskubideak baditu, lehenengo abiarazi baino lehen eskuz sortu beharko duzu erreplika-zirrikitu bat eta datu-basean argitaratu.

Konfigurazioa aplikatzea

Beraz, karga dezagun gure konfigurazioa konektorean:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Deskarga arrakastatsua izan dela eta konektorea hasi dela egiaztatzen dugu:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Bikaina: konfiguratuta dago eta prest dago. Orain kontsumitzaile bat garela itxuratu eta Kafka-ra konektatu, ondoren taulako sarrera bat gehitu eta aldatuko dugu:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Gure gaian honela agertuko da:

JSON oso luzea gure aldaketekin

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Bi kasuetan, erregistroak aldatutako erregistroaren gakoa (PK) eta aldaketen funtsa bera osatzen dute: erregistroa zer zen aurretik eta zer bihurtu zen ondoren.

  • Kasu horretan INSERT: aurreko balioa (before) berdin null, eta ondoren - txertatu zen lerroa.
  • Kasu horretan UPDATE: at payload.before lerroaren aurreko egoera bistaratzen da, eta barruan payload.after β€” aldaketen funtsa berria.

2.2 MongoDB

Konektore honek MongoDB erreplikazio-mekanismo estandarra erabiltzen du, DBMS nodo nagusiaren oplog-aren informazioa irakurtzen.

Dagoeneko deskribatutako PgSQL-rako konektorearen antzera, hemen ere, lehen hasieran, lehen datuen argazkia hartzen da, eta ondoren konektorea oplog irakurketa modura aldatzen da.

Konfigurazio adibidea:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Ikus dezakezunez, hemen ez dago aukera berririk aurreko adibidearekin alderatuta, baina datu-basera konektatzeko ardura duten aukerak eta haien aurrizkiak soilik murriztu dira.

Ezarpenak transforms oraingoan honako hau egiten dute: xede-gaiaren izena eskematik eraldatzen dute <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

akatsen tolerantzia

Gure garaian hutsegite-tolerantziaren eta erabilgarritasun handiaren gaia inoiz baino zorrotzagoa da, batez ere datuei eta transakzioei buruz ari garenean, eta datuen aldaketen jarraipena ez da alde batera uzten arazo honetan. Ikus dezagun printzipioz zer oker egon daitekeen eta kasu bakoitzean Debezium-ekin zer gertatuko den.

Hiru aukera daude baztertzeko:

  1. Kafka Connect porrota. Connect modu banatuan lan egiteko konfiguratuta badago, hainbat langile behar dira group.id bera ezartzea. Ondoren, horietako batek huts egiten badu, konektorea beste langile batean berrabiaraziko da eta Kafkan gaian konprometitutako azken posiziotik irakurtzen jarraituko du.
  2. Kafka klusterrarekiko konektagarritasuna galtzea. Konektoreak irakurtzeari utziko dio Kafkari bidali huts egin duen posizioan, eta aldizka berriro bidaltzen saiatuko da saiakerak arrakasta izan arte.
  3. Datu-iturburua ez dago erabilgarri. Konektorea iturburura berriro konektatzen saiatuko da konfiguratuta dagoen moduan. Lehenetsia 16 saiakera erabiltzen da atzerapen esponentziala. Arrakastarik gabeko 16. saiakera egin ondoren, zeregina honela markatuko da huts egin du eta eskuz berrabiarazi beharko duzu Kafka Connect REST interfazearen bidez.
    • Kasu horretan PostgreSQL datuak ez dira galduko, zeren Erreplikatzeko zirrikituak erabiltzeak konektoreak irakurtzen ez dituen WAL fitxategiak ezabatzea eragotziko du. Kasu honetan, txanponak alde txar bat ere badu: konektorearen eta DBMSren arteko sare-konektibitatea denbora luzez eteten bada, aukera dago diskoko espazioa agortzea, eta horrek hutsegite bat ekar dezake. DBMS osoa.
    • Kasu horretan MySQL binlog fitxategiak DBMSak berak biratu ditzake konektibitatea berreskuratu aurretik. Honek konektorea hutsegite egoerara eramango du, eta funtzionamendu normala berrezartzeko, hasierako argazki moduan berrabiarazi beharko duzu binlog-etatik irakurtzen jarraitzeko.
    • ΠŸΡ€ΠΎ MongoDB. Dokumentazioak zera dio: log/oplog fitxategiak ezabatu direnean eta konektoreak utzitako posiziotik irakurtzen jarraitu ezin badu konektorearen portaera berdina da DBMS guztientzat. Konektorea egoerara joango dela esan nahi du huts egin du eta moduan berrabiarazi beharko du hasierako argazkia.

      Hala ere, badira salbuespenak. Konektorea denbora luzez deskonektatu bazen (edo ezin izan MongoDB instantziara iritsi), eta oplog-ak biraketa egin badu denbora horretan, orduan konexioa berrezartzen denean, konektoreak lasaitasunez jarraituko du eskuragarri dagoen lehen posiziotik datuak irakurtzen, horregatik Kafkan dauden datu batzuk ez jo egingo du.

Ondorioa

Debezium CDC sistemekin nire lehen esperientzia da eta orokorrean oso positiboa da. Proiektuak irabazi zuen DBMS nagusien laguntzarekin, konfigurazio errazarekin, clustering laguntzarekin eta komunitate aktiboarekin. Praktikan interesa dutenei, gidak irakurtzea gomendatzen dizuet Kafka Konektatu ΠΈ Debezium.

Kafka Connect-erako JDBC konektorearekin alderatuta, Debezium-en abantaila nagusia aldaketak DBMS erregistroetatik irakurtzen direla da, eta horri esker datuak latentzia minimoarekin jaso daitezke. JDBC Konektoreak (Kafka Connect-ekoa) monitorizatutako taulak tarte finko batean kontsultatzen ditu eta (arrazoi beragatik) ez du mezurik sortzen datuak ezabatzen direnean (nola kontsulta ditzakezu existitzen ez diren datuak?).

Antzeko arazoak konpontzeko, honako irtenbide hauei errepara diezaiekezu (Debezium-ez gain):

PS

Irakurri ere gure blogean:

Iturria: www.habr.com

Gehitu iruzkin berria