Prezante Debezium - CDC pou Apache Kafka

Prezante Debezium - CDC pou Apache Kafka

Nan travay mwen, mwen souvan rankontre nouvo solisyon teknik / pwodwi lojisyèl, enfòmasyon sou ki se pito ra sou entènèt la Ris-lang. Avèk atik sa a mwen pral eseye ranpli yon espas sa yo ak yon egzanp nan pratik mwen resan, lè mwen te bezwen konfigirasyon voye evènman CDC soti nan de DBMS popilè (PostgreSQL ak MongoDB) nan yon gwoup Kafka lè l sèvi avèk Debezium. Mwen espere ke atik revizyon sa a, ki parèt kòm rezilta nan travay la fè, pral itil lòt moun.

Ki sa Debezium ak CDC an jeneral?

Debezium — reprezantan kategori lojisyèl CDC (Capture Done Chanjman), oswa plis jisteman, li se yon seri konektè pou divès kalite DBMS konpatib ak kad Apache Kafka Connect la.

Li Pwojè Open Source, ki gen lisans anba Apache License v2.0 ak patwone pa Red Hat. Devlopman ap kontinye depi 2016 e kounye a li bay sipò ofisyèl pou DBMS sa yo: MySQL, PostgreSQL, MongoDB, SQL Server. Genyen tou konektè pou Cassandra ak Oracle, men nan moman sa a yo nan estati "aksè bonè", ak nouvo degaje pa garanti konpatibilite bak.

Si nou konpare CDC ak apwòch tradisyonèl la (lè aplikasyon an li done ki soti nan DBMS dirèkteman), avantaj prensipal li yo enkli aplikasyon chanjman done difizyon nan nivo ranje ak latansi ki ba, segondè fyab ak disponiblite. De dènye pwen yo reyalize lè w itilize yon gwoup Kafka kòm yon depo pou evènman CDC.

Yon lòt avantaj se lefèt ke yo itilize yon modèl sèl pou estoke evènman yo, kidonk aplikasyon an fen pa bezwen enkyete sou nuans yo nan opere diferan DBMS.

Finalman, lè l sèvi avèk yon koutye mesaj pèmèt aplikasyon ki kontwole chanjman nan done yo echèl orizontal. An menm tan an, enpak la sou sous done yo minimize, depi done yo jwenn pa dirèkteman nan DBMS la, men nan gwoup la Kafka.

Konsènan achitekti Debezium la

Sèvi ak Debezium vini nan konplo senp sa a:

DBMS (kòm yon sous done) → konektè nan Kafka Connect → Apache Kafka → konsomatè

Kòm yon ilistrasyon, isit la se yon dyagram nan sit entènèt pwojè a:

Prezante Debezium - CDC pou Apache Kafka

Sepandan, mwen pa reyèlman renmen konplo sa a, paske li sanble ke se sèlman itilize nan yon konektè koule ki posib.

An reyalite, sitiyasyon an diferan: ranpli Data Lake ou a (dènye lyen nan dyagram ki anwo a) se pa sèl fason pou itilize Debezium. Evènman voye bay Apache Kafka ka itilize pa aplikasyon w yo pou fè fas ak divès sitiyasyon. Pa egzanp:

  • retire done ki pa enpòtan nan kachèt la;
  • voye notifikasyon;
  • rechèch mizajou endèks;
  • kèk kalite mòso bwa odit;
  • ...

Nan ka ou gen yon aplikasyon Java epi pa gen okenn bezwen / posiblite pou itilize yon gwoup Kafka, gen tou posiblite pou travay nan entegre-konektè. Avantaj evidan an se ke li elimine bezwen an pou enfrastrikti adisyonèl (nan fòm lan nan yon konektè ak Kafka). Sepandan, solisyon sa a te depreke depi vèsyon 1.1 epi li pa rekòmande pou itilize ankò (sipò pou li ka retire nan degaje fiti).

Atik sa a pral diskite sou achitekti rekòmande pa devlopè yo, ki bay tolerans fay ak évolutivité.

Konfigirasyon Connector

Yo nan lòd yo kòmanse swiv chanjman nan valè ki pi enpòtan an - done - nou bezwen:

  1. sous done, ki ka MySQL kòmanse nan vèsyon 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lis konplè);
  2. Apache Kafka gwoup;
  3. Kafka Connect egzanp (vèsyon 1.x, 2.x);
  4. konfigirasyon Debezium Connector.

Travay sou de premye pwen yo, i.e. Pwosesis enstalasyon DBMS ak Apache Kafka depase sijè ki abòde lan atik la. Sepandan, pou moun ki vle deplwaye tout bagay nan yon bwat sab, gen yon sèl pare-fè nan repozitwa ofisyèl la ak egzanp. docker-compose.yaml.

Nou pral konsantre sou de dènye pwen yo an plis detay.

0. Kafka Konekte

Isit la ak pi lwen nan atik la, tout egzanp konfigirasyon yo diskite nan kontèks la nan imaj la Docker distribye pa devlopè Debezium yo. Li gen tout fichye Plugin ki nesesè yo (konektè) epi li bay konfigirasyon Kafka Connect lè l sèvi avèk varyab anviwònman an.

Si ou gen entansyon sèvi ak Kafka Connect soti nan Confluent, w ap bezwen ajoute grefon yo nan konektè ki nesesè yo nan anyè ki espesifye nan. plugin.path oswa mete atravè yon varyab anviwònman CLASSPATH. Anviwònman pou travayè Kafka Connect ak konektè yo detèmine atravè fichye konfigirasyon ke yo pase kòm agiman bay lòd lanse travayè a. Pou plis detay, gade dokiman.

Tout pwosesis la nan mete kanpe Debeizum nan vèsyon an Connector se te pote soti nan de etap. Ann gade nan chak nan yo:

1. Mete kanpe fondasyon Kafka Connect la

Pou difize done nan gwoup Apache Kafka, paramèt espesifik yo mete nan kad Kafka Connect, tankou:

  • paramèt pou konekte ak gwoup la,
  • non sijè kote yo pral estoke konfigirasyon konektè nan tèt li dirèkteman,
  • non gwoup la nan ki konektè a ap kouri (si yo itilize mòd distribiye).

Imaj ofisyèl Docker pwojè a sipòte konfigirasyon an lè l sèvi avèk varyab anviwònman - sa a se sa nou pral itilize. Se konsa, telechaje imaj la:

docker pull debezium/connect

Seri minimòm varyab anviwònman ki nesesè pou kouri konektè a se jan sa a:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — premye lis sèvè gwoup Kafka pou jwenn yon lis konplè manm gwoup;
  • OFFSET_STORAGE_TOPIC=connector-offsets — yon sijè pou estoke pozisyon kote konektè a ye kounye a;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - sijè pou estoke estati konektè a ak travay li yo;
  • CONFIG_STORAGE_TOPIC=connector-config — sijè pou estoke done konfigirasyon konektè ak travay li yo;
  • GROUP_ID=1 — idantifyan gwoup travayè yo ki ka egzekite travay konektè a; obligatwa lè w ap itilize distribye (distribiye) rejim.

Nou lanse veso a ak varyab sa yo:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Remak sou Avro

Pa default, Debezium ekri done nan fòma JSON, ki akseptab pou sandboxes ak ti kantite done, men li ka vin yon pwoblèm nan baz done trè chaje. Yon altènativ a yon konvètisè JSON se seri mesaj lè l sèvi avèk Avro nan yon fòma binè, ki diminye chaj la sou subsistèm I/O nan Apache Kafka.

Pou itilize Avro ou bezwen deplwaye yon lòt chema-rejis (pou estoke dyagram). Varyab yo pou konvètisè a pral sanble sa a:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Detay sou lè l sèvi avèk Avro ak konfigirasyon rejis la pou li pi lwen pase sijè ki abòde lan atik sa a - pi lwen, pou klè, nou pral itilize JSON.

2. Konfigirasyon konektè nan tèt li

Koulye a, ou ka ale dirèkteman nan konfigirasyon an nan konektè nan tèt li, ki pral li done ki soti nan sous la.

Ann gade egzanp konektè pou de DBMS: PostgreSQL ak MongoDB, nan ki mwen gen eksperyans ak nan ki gen diferans (kwake ti, men nan kèk ka enpòtan!).

Konfigirasyon an dekri nan notasyon JSON epi yo telechaje sou Kafka Connect lè l sèvi avèk yon demann POST.

2.1. PostgreSQL

Egzanp konfigirasyon konektè pou PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Prensip la nan operasyon nan konektè a apre konfigirasyon sa a se byen senp:

  • Nan premye kòmansman an, li konekte ak baz done ki espesifye nan konfigirasyon an epi li kòmanse nan mòd la premye snapshot, voye bay Kafka premye seri done yo te resevwa ak kondisyonèl la SELECT * FROM table_name.
  • Apre inisyalizasyon fini, konektè a antre nan mòd pou li chanjman ki soti nan dosye PostgreSQL WAL.

Konsènan opsyon yo itilize:

  • name — non konektè a pou ki konfigirasyon ki dekri anba a yo itilize; nan lavni an, non sa a yo itilize pou travay ak konektè a (sa vle di, wè estati a / rekòmanse / aktyalizasyon konfigirasyon an) atravè Kafka Connect REST API a;
  • connector.class — Klas konektè DBMS ki pral itilize pa konektè konfigirasyon an;
  • plugin.name se non Plugin la pou dekodaj lojik done ki sòti nan dosye WAL. Disponib pou chwazi nan wal2json, decoderbuffs и pgoutput. De premye yo mande pou enstalasyon ekstansyon apwopriye yo nan DBMS la, ak pgoutput pou PostgreSQL vèsyon 10 ak pi wo a pa mande pou manipilasyon adisyonèl;
  • database.* — opsyon pou konekte ak baz done a, kote database.server.name — Non egzanp PostgreSQL yo itilize pou fòme non sijè a nan gwoup Kafka a;
  • table.include.list — yon lis tab kote nou vle swiv chanjman yo; espesifye nan fòma a schema.table_name; pa ka itilize ansanm ak table.exclude.list;
  • heartbeat.interval.ms — entèval (an milisgond) ak ki konektè a voye mesaj batman kè nan yon sijè espesyal;
  • heartbeat.action.query — yon demann ki pral egzekite lè w ap voye chak mesaj batman kè (opsyon ki parèt nan vèsyon 1.1);
  • slot.name — non an nan plas la replikasyon ki pral itilize pa konektè a;
  • publication.name - Non Piblikasyon nan PostgreSQL, ki konektè a itilize. Si li pa egziste, Debezium ap eseye kreye li. Si itilizatè a ki fè koneksyon an pa gen ase dwa pou aksyon sa a, konektè a pral sispann ak yon erè;
  • transforms detèmine egzakteman ki jan yo chanje non an nan sijè a sib:
    • transforms.AddPrefix.type endike ke nou pral itilize ekspresyon regilye;
    • transforms.AddPrefix.regex — yon mask ki redefini non sijè sib la;
    • transforms.AddPrefix.replacement - dirèkteman sa n ap redéfinir.

Plis sou batman kè ak transfòmasyon

Pa default, konektè a voye done bay Kafka pou chak tranzaksyon komèt, epi LSN li (Nimewo Sekans Log) anrejistre nan sijè sèvis la. offset. Men, sa k ap pase si konektè a configuré pou li pa tout baz done a, men sèlman yon pati nan tab li yo (nan ki mizajou done yo pa rive souvan)?

  • Konektè a pral li dosye WAL epi li pa pral detekte okenn tranzaksyon komèt nan tablo li ap kontwole yo.
  • Se poutèt sa, li pa pral mete ajou pozisyon aktyèl li swa nan sijè a oswa nan plas la replikasyon.
  • Sa a, nan vire, pral lakòz dosye yo WAL yo dwe "kole" sou disk epi yo pral gen anpil chans kouri soti nan espas ki gen kapasite.

Lè sa a se kote opsyon yo vin sekou. heartbeat.interval.ms и heartbeat.action.query. Sèvi ak opsyon sa yo an pè fè li posib pou egzekite yon demann pou chanje done nan yon tablo separe chak fwa yo voye yon mesaj batman kè. Kidonk, LSN ki konektè a ye kounye a (nan plas replikasyon an) toujou ap mete ajou. Sa a pèmèt DBMS yo retire dosye WAL ki pa nesesè ankò. Pou plis enfòmasyon sou fason opsyon travay, gade dokiman.

Yon lòt opsyon ki merite atansyon pi pre se transforms. Malgre ke li se plis sou konvenyans ak bote ...

Pa default, Debezium kreye sijè lè l sèvi avèk politik non sa yo: serverName.schemaName.tableName. Sa a ka pa toujou pratik. Opsyon transforms Ou ka itilize ekspresyon regilye yo defini yon lis tab, evènman ki soti nan ki bezwen yo dwe dirije nan yon sijè ki gen yon non espesifik.

Nan konfigirasyon nou an mèsi transforms bagay sa yo rive: tout evènman CDC ki soti nan baz done kontwole a pral ale nan yon sijè ki gen non an data.cdc.dbname. Sinon (san paramèt sa yo), Debezium ta pa default kreye yon sijè pou chak tab tankou: pg-dev.public.<table_name>.

Limitasyon Connector

Pou konkli deskripsyon konfigirasyon konektè a pou PostgreSQL, li vo pale sou karakteristik sa yo / limit nan operasyon li yo:

  1. Fonksyonalite konektè a pou PostgreSQL depann sou konsèp dekodaj ki lojik. Se poutèt sa li pa swiv demann pou chanje estrikti baz done a (DDL) - kòmsadwa, done sa yo pa pral nan sijè yo.
  2. Depi fant replikasyon yo itilize, konekte yon konektè posib sèlman nan egzanp dirijan DBMS la.
  3. Si itilizatè a anba ki moun ki konektè a konekte nan baz done a gen dwa pou lekti sèlman, Lè sa a, anvan premye lansman an, ou pral bezwen manyèlman kreye yon plas replikasyon ak pibliye nan baz done a.

Aplike konfigirasyon an

Se konsa, ann chaje konfigirasyon nou an nan konektè a:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Nou tcheke si download la te reyisi ak konektè a te kòmanse:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Gwo: li mete kanpe e li pare pou ale. Koulye a, ann pretann yo se yon konsomatè epi konekte ak Kafka, apre sa nou pral ajoute ak chanje yon antre nan tablo a:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Nan sijè nou an, li pral parèt jan sa a:

Trè long JSON ak chanjman nou yo

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Nan de ka yo, dosye yo konpoze de kle (PK) dosye a ki te chanje a, ak sans nan chanjman yo: sa dosye a te anvan ak sa li te vin apre.

  • Nan ka a INSERT: valè anvan (before) egal null, epi apre - liy lan ki te antre.
  • Nan ka a UPDATE: nan payload.before eta a anvan liy lan parèt, ak nan payload.after - nouvo ak sans nan chanjman.

2.2 MongoDB

Konektè sa a sèvi ak mekanis replikasyon MongoDB estanda, li enfòmasyon ki soti nan oplog prensipal ne DBMS la.

Menm jan ak konektè a deja dekri pou PgSQL, isit la tou, nan premye kòmansman an, yo pran snapshot done prensipal la, apre sa konektè a chanje nan mòd lekti oplog.

Egzanp konfigirasyon:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Kòm ou ka wè, pa gen okenn nouvo opsyon isit la konpare ak egzanp anvan an, men se sèlman kantite opsyon ki responsab pou konekte ak baz done a ak prefiks yo te redwi.

Anviwònman transforms fwa sa a yo fè bagay sa yo: yo transfòme non sijè a sib soti nan chema a <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

tolerans fòt

Pwoblèm tolerans fòt ak disponiblite segondè nan epòk nou an pi grav pase tout tan - sitou lè n ap pale de done ak tranzaksyon yo, epi swiv chanjman done yo pa kanpe sou kote nan pwoblèm sa a. Ann gade sa ki ka ale mal nan prensip ak sa ki pral rive Debezium nan chak ka.

Gen twa opsyon pou retire li:

  1. Kafka Connect echèk. Si Connect configuré pou travay nan mòd distribiye, sa mande plizyè travayè pou mete menm group.id. Lè sa a, si youn nan yo echwe, konektè a pral rekòmanse sou yon lòt travayè epi kontinye li nan dènye pozisyon ki te angaje nan sijè a nan Kafka.
  2. Pèt koneksyon ak gwoup Kafka. Konektè a pral tou senpleman sispann lekti nan pozisyon an ki echwe pou pou voye bay Kafka, epi li pral detanzantan eseye renvoyer li jiskaske tantativ la reyisi.
  3. Sous done pa disponib. Konektè a pral eseye rekonekte ak sous la jan konfigirasyon. Defo a se 16 tantativ lè l sèvi avèk bak eksponansyèl. Apre 16yèm tantativ la san siksè, travay la pral make kòm echwe epi w ap bezwen rekòmanse li manyèlman atravè koòdone Kafka Connect REST la.
    • Nan ka a Postgrèskl done yo pa pral pèdi, paske Sèvi ak fant replikasyon yo ap anpeche w efase fichye WAL ki konektè a pa li. Nan ka sa a, gen tou yon dezavantaj nan pyès monnen an: si koneksyon rezo a ant konektè a ak DBMS deranje pou yon tan long, gen yon posibilite ke espas ki la ki gen kapasite pral fini, e sa ka mennen nan yon echèk nan. DBMS an antye.
    • Nan ka a Miskl dosye binlog yo ka vire toutotou pa DBMS nan tèt li anvan koneksyon yo retabli. Sa a pral lakòz konektè a ale nan eta a echwe, epi retabli operasyon nòmal, w ap bezwen rekòmanse nan mòd snapshot inisyal kontinye li nan binlogs.
    • sou MongoDB. Dokimantasyon an di: konpòtman an nan konektè a nan evènman an ki dosye log/oplog yo te efase epi konektè a pa ka kontinye li nan pozisyon kote li te kite a se menm bagay la pou tout DBMSs. Sa vle di ke konektè a pral antre nan eta a echwe epi yo pral mande pou rekòmanse nan mòd premye snapshot.

      Sepandan, gen eksepsyon. Si konektè a te dekonekte pou yon tan long (oswa pa t 'kapab rive jwenn egzanp MongoDB la), ak oplog la te ale nan wotasyon pandan tan sa a, Lè sa a, lè koneksyon an retabli, konektè a ap kontinye ak kalm li done ki soti nan premye pozisyon ki disponib la, ki se poukisa kèk nan done yo nan Kafka pa gen okenn pral frape.

Konklizyon

Debezium se premye eksperyans mwen ak sistèm CDC e an jeneral trè pozitif. Pwojè a te genyen sipò li pou gwo DBMS yo, fasilite nan konfigirasyon, sipò gwoupman, ak kominote aktif. Pou moun ki enterese nan pratik, mwen rekòmande ke ou li gid yo pou Kafka Konekte и Debezium.

Konpare ak konektè JDBC pou Kafka Connect, avantaj prensipal Debezium se ke chanjman yo li nan mòso bwa DBMS yo, ki pèmèt done yo dwe resevwa ak yon latansi minimòm. JDBC Connector (ki soti nan Kafka Connect) mande tab la kontwole nan yon entèval fiks epi (pou menm rezon an) pa jenere mesaj lè done yo efase (ki jan ou ka rechèch done ki pa egziste?).

Pou rezoud pwoblèm menm jan an, ou ka peye atansyon sou solisyon sa yo (anplis Debezium):

PS

Li tou sou blog nou an:

Sous: www.habr.com

Add nouvo kòmantè