Nintroduċu Debezium - CDC għal Apache Kafka

Nintroduċu Debezium - CDC għal Apache Kafka

Fix-xogħol tiegħi, ħafna drabi niltaqa 'ma' soluzzjonijiet tekniċi / prodotti ta 'softwer ġodda, li informazzjoni dwarhom hija pjuttost skarsa fuq l-Internet li jitkellem ir-Russu. B'dan l-artikolu, ser nipprova nimla vojt bħal dan b'eżempju mill-prattika reċenti tiegħi, meta kelli nwaqqaf nibgħat avvenimenti CDC minn żewġ DBMSs popolari (PostgreSQL u MongoDB) għal cluster Kafka bl-użu ta 'Debezium. Nittama li dan l-artiklu ta’ reviżjoni, li deher bħala riżultat tax-xogħol li sar, ikun utli għal ħaddieħor.

X'inhu Debezium u CDC b'mod ġenerali?

Debezium - Rappreżentant tal-kategorija tas-softwer CDC (Qbid tad-data bidla), jew b'mod aktar preċiż, huwa sett ta 'konnetturi għal diversi DBMSs li huma kompatibbli mal-qafas Apache Kafka Connect.

Hija proġett open source, liċenzjat taħt l-Apache License v2.0 u sponsorjat minn Red Hat. L-iżvilupp ilu għaddej mill-2016 u bħalissa jipprovdi appoġġ uffiċjali għad-DBMS li ġejjin: MySQL, PostgreSQL, MongoDB, SQL Server. Hemm ukoll konnetturi għal Cassandra u Oracle, iżda bħalissa jinsabu fi stat ta '"aċċess bikri", u ħarġiet ġodda ma jiggarantixxux kompatibilità b'lura.

Jekk inqabblu CDC mal-approċċ tradizzjonali (meta l-applikazzjoni taqra d-dejta mid-DBMS direttament), allura l-vantaġġi ewlenin tagħha jinkludu l-implimentazzjoni ta 'streaming tal-bidla tad-dejta fil-livell ta' ringiela b'latenza baxxa, affidabilità għolja u disponibbiltà. L-aħħar żewġ punti jinkisbu billi jintuża cluster Kafka bħala repożitorju għal avvenimenti CDC.

Ukoll, il-vantaġġi jinkludu l-fatt li mudell wieħed jintuża biex jaħżen l-avvenimenti, għalhekk l-applikazzjoni finali m'għandhiex għalfejn tinkwieta dwar l-sfumaturi tat-tħaddim ta 'DBMS differenti.

Fl-aħħarnett, l-użu ta 'sensar tal-messaġġi jiftaħ skop għal skalar orizzontali ta' applikazzjonijiet li jsegwu l-bidliet fid-dejta. Fl-istess ħin, l-impatt fuq is-sors tad-dejta huwa minimizzat, peress li d-dejta tasal mhux direttament mid-DBMS, iżda mill-cluster Kafka.

Dwar l-arkitettura tad-Debezium

L-użu ta' Debezium niżel għal din l-iskema sempliċi:

DBMS (bħala sors tad-dejta) → konnettur f'Kafka Connect → Apache Kafka → konsumatur

Bħala illustrazzjoni, se nagħti dijagramma mill-websajt tal-proġett:

Nintroduċu Debezium - CDC għal Apache Kafka

Madankollu, ma tantx jogħġobni din l-iskema, għax jidher li huwa possibbli biss konnettur tas-sink.

Fir-realtà, is-sitwazzjoni hija differenti: timla l-Lag tad-Data tiegħek (l-aħħar link fid-dijagramma ta’ hawn fuq) mhuwiex l-uniku mod kif tuża Debezium. Avvenimenti mibgħuta lil Apache Kafka jistgħu jintużaw mill-applikazzjonijiet tiegħek biex jittrattaw diversi sitwazzjonijiet. Pereżempju:

  • tneħħija ta' data irrilevanti mill-cache;
  • jibgħat notifiki;
  • aġġornamenti tal-indiċi tat-tiftix;
  • xi tip ta' reġistru tal-verifika;
  • ...

F'każ li jkollok applikazzjoni Java u m'hemmx bżonn/possibbiltà li tuża cluster Kafka, hemm ukoll il-possibbiltà li taħdem permezz konnettur inkorporat. Il-vantaġġ ovvju huwa li magħha tista 'tirrifjuta infrastruttura addizzjonali (fil-forma ta' konnettur u Kafka). Madankollu, din is-soluzzjoni ġiet deprecata mill-verżjoni 1.1 u m'għadhiex rakkomandata għall-użu (tista 'titneħħa f'rilaxxi futuri).

Dan l-artikolu ser jiddiskuti l-arkitettura rakkomandata mill-iżviluppaturi, li tipprovdi tolleranza għall-ħsarat u skalabbiltà.

Konfigurazzjoni tal-konnettur

Sabiex nibdew insegwu l-bidliet fl-iktar valur importanti - data - neħtieġu:

  1. sors tad-dejta, li jista' jkun MySQL li jibda mill-verżjoni 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lista sħiħa);
  2. cluster Apache Kafka
  3. Kafka Connect istanza (verżjonijiet 1.x, 2.x);
  4. konnettur Debezium konfigurat.

Ħidma fuq l-ewwel żewġ punti, i.e. il-proċess ta 'installazzjoni ta' DBMS u Apache Kafka huma lil hinn mill-ambitu tal-artikolu. Madankollu, għal dawk li jridu jużaw kollox f'sandbox, hemm waħda lesta fir-repożitorju uffiċjali b'eżempji docker-compose.yaml.

Se niffukaw fuq l-aħħar żewġ punti f'aktar dettall.

0. Kafka Qabbad

Hawnhekk u aktar tard fl-artiklu, l-eżempji ta 'konfigurazzjoni kollha huma kkunsidrati fil-kuntest tal-immaġni Docker mqassma mill-iżviluppaturi Debezium. Fiha l-fajls kollha tal-plugin (konnetturi) meħtieġa u tipprovdi konfigurazzjoni Kafka Connect billi tuża varjabbli ambjentali.

Jekk għandek il-ħsieb li tuża Kafka Connect minn Confluent, ikollok bżonn iżżid il-plugins tal-konnetturi meħtieġa lilek innifsek fid-direttorju speċifikat f' plugin.path jew issettjat permezz ta' varjabbli ambjentali CLASSPATH. Is-settings għall-ħaddiem Kafka Connect u l-konnetturi huma definiti permezz ta 'fajls ta' konfigurazzjoni li huma mgħoddija bħala argumenti lill-kmand tal-bidu tal-ħaddiem. Għad-dettalji ara dokumentazzjoni.

Il-proċess kollu tat-twaqqif ta' Debeizum fil-verżjoni tal-konnettur jitwettaq f'żewġ stadji. Ejja nikkunsidraw kull wieħed minnhom:

1. It-twaqqif tal-qafas Kafka Connect

Biex tixxandar dejta lil cluster Apache Kafka, parametri speċifiċi huma stabbiliti fil-qafas Kafka Connect, bħal:

  • settings tal-konnessjoni tal-cluster,
  • ismijiet ta 'suġġetti li fihom il-konfigurazzjoni tal-konnettur innifsu se tkun maħżuna,
  • l-isem tal-grupp li fih ikun qed jaħdem il-konnettur (f'każ li tuża l-mod distribwit).

L-immaġni Docker uffiċjali tal-proġett tappoġġja l-konfigurazzjoni bl-użu ta 'varjabbli ambjentali - dan huwa dak li se nużaw. Mela ejja niżżlu l-immaġni:

docker pull debezium/connect

Is-sett minimu ta' varjabbli ambjentali meħtieġa biex jitħaddem il-konnettur huwa kif ġej:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lista inizjali tas-servers tal-clusters Kafka biex tinkiseb lista kompleta tal-membri tal-clusters;
  • OFFSET_STORAGE_TOPIC=connector-offsets — suġġett għall-ħażna ta' pożizzjonijiet fejn jinsab il-konnettur bħalissa;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - suġġett għall-ħażna tal-istatus tal-konnettur u l-kompiti tiegħu;
  • CONFIG_STORAGE_TOPIC=connector-config - suġġett għall-ħażna tad-dejta tal-konfigurazzjoni tal-konnettur u l-kompiti tagħha;
  • GROUP_ID=1 — identifikatur tal-grupp ta' ħaddiema li fuqu jista' jitwettaq il-kompitu tal-konnettur; meħtieġa meta tuża mqassma (imqassam) reġim.

Nibdew il-kontenitur b'dawn il-varjabbli:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Nota dwar Avro

B'mod awtomatiku, Debezium jikteb dejta f'format JSON, li huwa aċċettabbli għal sandboxes u ammonti żgħar ta 'dejta, iżda jista' jkun problema f'databases mgħobbija ħafna. Alternattiva għall-konvertitur JSON hija li serialize messaġġi bl-użu Avro għal format binarju, li jnaqqas it-tagħbija fuq is-subsistema I / O f'Apache Kafka.

Biex tuża Avro, trid tiskjera separata skema-reġistru (għall-ħażna ta' skemi). Il-varjabbli għall-konvertitur se jidhru bħal dan:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Dettalji dwar l-użu ta 'Avro u t-twaqqif ta' reġistru għalih huma lil hinn mill-ambitu tal-artikolu - barra minn hekk, għaċ-ċarezza, se nużaw JSON.

2. Twaqqif tal-konnettur innifsu

Issa tista 'tmur direttament għall-konfigurazzjoni tal-konnettur innifsu, li se jaqra data mis-sors.

Ejja nħarsu lejn l-eżempju ta 'konnetturi għal żewġ DBMS: PostgreSQL u MongoDB, li għalihom għandi esperjenza u li għalihom hemm differenzi (għalkemm żgħar, iżda f'xi każijiet sinifikanti!).

Il-konfigurazzjoni hija deskritta f'notazzjoni JSON u tittella' fuq Kafka Connect permezz ta' talba POST.

2.1. PostgreSQL

Eżempju ta' konfigurazzjoni tal-konnettur għal PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Il-prinċipju tat-tħaddim tal-konnettur wara din il-konfigurazzjoni huwa pjuttost sempliċi:

  • Fl-ewwel bidu, tikkonnettja mad-database speċifikata fil-konfigurazzjoni u tibda fil-mod ritratt inizjali, tibgħat lil Kafka s-sett inizjali ta' dejta riċevuti bil-kondizzjonali SELECT * FROM table_name.
  • Wara li titlesta l-inizjalizzazzjoni, il-konnettur jidħol fil-mod ta 'qari tal-bidliet mill-fajls PostgreSQL WAL.

Dwar l-għażliet użati:

  • name — l-isem tal-konnettur li għalih tintuża l-konfigurazzjoni deskritta hawn taħt; fil-futur, dan l-isem jintuża biex jaħdem mal-konnettur (jiġifieri tara l-istatus / terġa 'tibda / taġġorna l-konfigurazzjoni) permezz tal-Kafka Connect REST API;
  • connector.class — il-klassi tal-konnettur DBMS li se tintuża mill-konnettur konfigurat;
  • plugin.name huwa l-isem tal-plugin għad-dekodifikazzjoni loġika tad-dejta mill-fajls WAL. Disponibbli minn fejn tagħżel wal2json, decoderbuffs и pgoutput. L-ewwel tnejn jeħtieġu l-installazzjoni tal-estensjonijiet xierqa fid-DBMS, u pgoutput għal PostgreSQL verżjoni 10 u ogħla ma teħtieġx manipulazzjonijiet addizzjonali;
  • database.* — għażliet għall-konnessjoni mad-database, fejn database.server.name - l-isem tal-istanza PostgreSQL użata biex tifforma l-isem tas-suġġett fil-grupp Kafka;
  • table.include.list - lista ta' tabelli li fihom irridu nsegwu l-bidliet; mogħtija fil-format schema.table_name; ma jistax jintuża flimkien ma table.exclude.list;
  • heartbeat.interval.ms — intervall (f'millisekondi) li bih il-konnettur jibgħat messaġġi tat-taħbit tal-qalb għal suġġett speċjali;
  • heartbeat.action.query - talba li se titwettaq meta jintbagħat kull messaġġ ta' taħbit tal-qalb (l-għażla dehret mill-verżjoni 1.1);
  • slot.name — l-isem tas-slot ta' replikazzjoni li se jintuża mill-konnettur;
  • publication.name - Isem Pubblikazzjoni f'PostgreSQL li juża l-konnettur. F’każ li ma teżistix, Debezium jipprova joħloqha. Jekk l-utent li taħtu ssir il-konnessjoni ma jkollux biżżejjed drittijiet għal din l-azzjoni, il-konnettur joħroġ bi żball;
  • transforms jiddetermina kif eżattament jibdel l-isem tas-suġġett fil-mira:
    • transforms.AddPrefix.type jindika li se nużaw espressjonijiet regolari;
    • transforms.AddPrefix.regex — maskra li biha jiġi definit mill-ġdid l-isem tas-suġġett fil-mira;
    • transforms.AddPrefix.replacement - direttament dak li niddefinixxu mill-ġdid.

Aktar dwar it-taħbit tal-qalb u t-trasformazzjonijiet

B'mod awtomatiku, il-konnettur jibgħat dejta lil Kafka għal kull tranżazzjoni impenjata, u jikteb l-LSN tiegħu (Numru tas-Sekwenza tal-Log) fis-suġġett tas-servizz. offset. Imma x'jiġri jekk il-konnettur ikun ikkonfigurat biex jaqra mhux id-database kollha, iżda biss parti mit-tabelli tiegħu (li fihom id-dejta tiġi aġġornata b'mod mhux frekwenti)?

  • Il-konnettur se jaqra l-fajls WAL u ma jiskoprix it-tranżazzjonijiet jikkommetti fihom mat-tabelli li jimmonitorja.
  • Għalhekk, mhux se jaġġorna l-pożizzjoni attwali tiegħu la fis-suġġett u lanqas fl-islott tar-replikazzjoni.
  • Dan, min-naħa tiegħu, jikkawża li l-fajls WAL ikunu "mwaħħlin" fuq id-disk u x'aktarx jispiċċaw l-ispazju tad-diska.

U hawn l-għażliet jiġu salvati. heartbeat.interval.ms и heartbeat.action.query. L-użu ta' dawn l-għażliet f'pari jagħmilha possibbli li titwettaq talba biex tinbidel id-dejta f'tabella separata kull darba li jintbagħat messaġġ ta' taħbit tal-qalb. Għalhekk, l-LSN li fuqu jinsab il-konnettur bħalissa (fis-slot ta 'replikazzjoni) huwa aġġornat b'mod kostanti. Dan jippermetti lid-DBMS ineħħi fajls WAL li m'għadhomx meħtieġa. Għal aktar informazzjoni dwar kif jaħdmu l-għażliet, ara dokumentazzjoni.

Għażla oħra li jistħoqqilha attenzjoni aktar mill-qrib hija transforms. Għalkemm huwa aktar dwar il-konvenjenza u s-sbuħija ...

B'mod awtomatiku, Debezium joħloq suġġetti billi juża l-politika ta' ismijiet li ġejja: serverName.schemaName.tableName. Dan jista 'mhux dejjem ikun konvenjenti. Għażliet transforms billi tuża espressjonijiet regolari, tista 'tiddefinixxi lista ta' tabelli li l-avvenimenti tagħhom jeħtieġ li jiġu mgħoddija għal suġġett b'isem speċifiku.

Fil-konfigurazzjoni tagħna grazzi għal transforms jiġri dan li ġej: l-avvenimenti kollha tas-CDC mid-database tracked se jmorru għas-suġġett bl-isem data.cdc.dbname. Inkella (mingħajr dawn is-settings), Debezium awtomatikament joħloq suġġett għal kull tabella tal-formola: pg-dev.public.<table_name>.

Limitazzjonijiet tal-konnettur

Fl-aħħar tad-deskrizzjoni tal-konfigurazzjoni tal-konnettur għal PostgreSQL, ta 'min jitkellem dwar il-karatteristiċi / limitazzjonijiet li ġejjin tax-xogħol tiegħu:

  1. Il-funzjonalità tal-konnettur għal PostgreSQL tiddependi fuq il-kunċett ta 'dekodifikazzjoni loġika. Għalhekk hu ma jsegwix it-talbiet biex tinbidel l-istruttura tad-database (DDL) - għalhekk, din id-dejta mhux se tkun fis-suġġetti.
  2. Peress li jintużaw slots ta 'replikazzjoni, il-konnessjoni tal-konnettur hija possibbli biss lill-istanza prinċipali tad-DBMS.
  3. Jekk l-utent li taħtu l-konnettur jgħaqqad mad-database għandu drittijiet ta 'qari biss, imbagħad qabel l-ewwel tnedija, ser ikollok bżonn toħloq manwalment slot ta' replikazzjoni u tippubblika fid-database.

Applikazzjoni ta' Konfigurazzjoni

Mela ejja tagħbija l-konfigurazzjoni tagħna fil-konnettur:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Aħna niċċekkjaw li t-tniżżil kien suċċess u l-konnettur beda:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Kbir: huwa stabbilit u lest biex imur. Issa ejja nippretendu li nkunu konsumatur u nqabbdu ma' Kafka, u wara nżidu u nibdlu entrata fit-tabella:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Fis-suġġett tagħna, dan se jintwera kif ġej:

JSON twil ħafna bil-bidliet tagħna

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Fiż-żewġ każijiet, ir-rekords jikkonsistu fiċ-ċavetta (PK) tar-rekord li nbidel, u l-essenza stess tal-bidliet: x'kien ir-rekord qabel u x'sar wara.

  • Fil-każ ta ' INSERT: valur qabel (before) ugwali nullsegwit mis-sekwenza li ddaħħlet.
  • Fil-każ ta ' UPDATE: fi payload.before jintwera l-istat preċedenti tar-ringiela, u ġewwa payload.after - ġdid bl-essenza tal-bidla.

2.2 MongoDB

Dan il-konnettur juża l-mekkaniżmu standard ta 'replikazzjoni MongoDB, jaqra informazzjoni mill-oplog tan-nodu primarju DBMS.

Bl-istess mod għall-konnettur diġà deskritt għal PgSQL, hawnhekk ukoll, fl-ewwel bidu, tittieħed l-istampa tad-dejta primarja, u wara l-konnettur jaqleb għall-mod ta 'qari oplog.

Eżempju ta' konfigurazzjoni:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kif tistgħu taraw, m'hemm l-ebda għażliet ġodda meta mqabbla mal-eżempju preċedenti, iżda n-numru ta 'għażliet responsabbli għall-konnessjoni mad-database u l-prefissi tagħhom biss tnaqqsu.

Settings transforms din id-darba jagħmlu dan li ġej: dawwar l-isem tas-suġġett fil-mira mill-iskema <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolleranza għall-ħsarat

Il-kwistjoni tat-tolleranza tal-ħsarat u d-disponibbiltà għolja fi żmienna hija aktar akuta minn qatt qabel - speċjalment meta nitkellmu dwar id-dejta u t-tranżazzjonijiet, u t-traċċar tal-bidliet tad-dejta mhuwiex fuq il-ġenb f'din il-kwistjoni. Ejja nħarsu lejn x'jista' jmur ħażin fil-prinċipju u x'se jiġri minn Debezium f'kull każ.

Hemm tliet għażliet ta’ esklużjoni:

  1. Kafka Connect falliment. Jekk Connect hija kkonfigurata biex taħdem fil-mod distribwit, dan jeħtieġ ħaddiema multipli biex jistabbilixxu l-istess group.id. Imbagħad, jekk wieħed minnhom ifalli, il-konnettur jerġa 'jinbeda fuq il-ħaddiem l-ieħor u jkompli jaqra mill-aħħar pożizzjoni impenjata fis-suġġett f'Kafka.
  2. Telf ta' konnettività mal-cluster Kafka. Il-konnettur sempliċement se jieqaf jaqra fil-pożizzjoni li naqas li jibgħat lil Kafka u perjodikament jipprova jerġa 'jibgħatha sakemm l-attentat jirnexxi.
  3. Sors tad-dejta mhux disponibbli. Il-konnettur se jipprova jerġa 'jikkonnettja mas-sors skond il-konfigurazzjoni. Id-default huwa 16-il tentattiv bl-użu backoff esponenzjali. Wara s-16-il tentattiv fallut, il-kompitu jiġi mmarkat bħala naqset u jeħtieġ li jerġa' jinbeda manwalment permezz tal-interface Kafka Connect REST.
    • Fil-każ ta ' PostgreSQL data mhux se tintilef, għaliex bl-użu ta 'slots ta' replikazzjoni se jipprevjeni t-tħassir ta 'fajls WAL li ma jinqrawx mill-konnettur. F'dan il-każ, hemm żvantaġġ: jekk il-konnettività tan-netwerk bejn il-konnettur u d-DBMS tkun imfixkla għal żmien twil, hemm ċans li l-ispazju tad-diska jispiċċa, u dan jista 'jwassal għall-falliment tad-DBMS kollu.
    • Fil-każ ta ' MySQL binlog files jistgħu jiġu mdawra mid-DBMS innifsu qabel ma tiġi restawrata l-konnettività. Dan jikkawża li l-konnettur jidħol fl-istat fallut, u jeħtieġ li jerġa 'jibda fil-modalità snapshot inizjali biex ikompli jaqra minn binlogs biex jerġa' jopera normali.
    • fuq MongoDB. Id-dokumentazzjoni tgħid: l-imġieba tal-konnettur f'każ li l-fajls log/oplog ikunu tħassru u l-konnettur ma jistax ikompli jaqra mill-pożizzjoni fejn ħalla huwa l-istess għad-DBMS kollha. Hija tinsab fil-fatt li l-konnettur se jidħol fl-istat naqset u se jeħtieġu bidu mill-ġdid fil-mod ritratt inizjali.

      Madankollu, hemm eċċezzjonijiet. Jekk il-konnettur kien fi stat skonnettjat għal żmien twil (jew ma setax jilħaq l-istanza MongoDB), u oplog kien imdawwar matul dan iż-żmien, allura meta l-konnessjoni tiġi restawrata, il-konnettur bil-kalma jkompli jaqra d-dejta mill-ewwel pożizzjoni disponibbli , u huwa għalhekk li xi dejta f'Kafka ebda se tolqot.

Konklużjoni

Debezium hija l-ewwel esperjenza tiegħi mas-sistemi CDC u b'mod ġenerali kienet pożittiva ħafna. Il-proġett bribed l-appoġġ tad-DBMS prinċipali, faċilità ta 'konfigurazzjoni, appoġġ għall-clustering u komunità attiva. Għal dawk interessati fil-prattika, nirrakkomanda li taqra l-gwidi għal Kafka Qabbad и Debezium.

Meta mqabbel mal-konnettur JDBC għal Kafka Connect, il-vantaġġ ewlieni ta 'Debezium huwa li l-bidliet jinqraw mir-zkuk tad-DBMS, li jippermetti li d-data tiġi riċevuta b'dewmien minimu. Il-Konnettur JDBC (provdut minn Kafka Connect) jistaqsi t-tabella rintraċċata f'intervall fiss u (għall-istess raġuni) ma jiġġenerax messaġġi meta titħassar id-dejta (kif tista' titlob dejta li ma teżistix?).

Biex issolvi problemi simili, tista 'tagħti attenzjoni għas-soluzzjonijiet li ġejjin (minbarra Debezium):

PS

Aqra wkoll fuq il-blog tagħna:

Sors: www.habr.com

Żid kumment