Introducing Debezium: CDC for Apache Kafka

In my work, I often come across new technical solutions and software products that are poorly covered on the Russian-speaking Internet. With this article, I will try to fill one such gap with an example from my recent practice, when I needed to set up the delivery of CDC events from two popular DBMSs (PostgreSQL and MongoDB) to a Kafka cluster using Debezium. I hope this overview, which grew out of that work, will be useful to others.

What are Debezium and CDC?

Debezium is a representative of the CDC (Change Data Capture) software category. More precisely, it is a set of connectors for various DBMSs that are compatible with the Apache Kafka Connect framework.

It is an open source project licensed under the Apache License v2.0 and sponsored by Red Hat. Development has been under way since 2016, and at the moment it officially supports the following DBMSs: MySQL, PostgreSQL, MongoDB, SQL Server. There are also connectors for Cassandra and Oracle, but for now they are in "early access" status, and new releases do not guarantee backward compatibility.

Compared with the traditional approach (where the application reads data from the DBMS directly), the main advantages of CDC are row-level streaming of data changes with low latency, high reliability, and high availability. The last two are achieved by using a Kafka cluster as the storage for CDC events.

Another advantage is that a single model is used to store events, so the consuming application does not have to worry about the nuances of working with different DBMSs.

Finally, using a message broker makes it possible to scale horizontally the applications that track data changes. At the same time, the impact on the data source is minimized, because the data is obtained not directly from the DBMS but from the Kafka cluster.

About the Debezium architecture

Using Debezium comes down to this simple scheme:

DBMS (as the data source) → connector in Kafka Connect → Apache Kafka → consumer

As an illustration, here is a diagram from the project website:

[Image: Debezium architecture diagram from the project website]

However, I do not really like this diagram, because it gives the impression that only a sink connector is possible.

In reality, the situation is different: filling your Data Lake (the last link in the diagram above) is not the only way to use Debezium. Events sent to Apache Kafka can be used by your applications to handle various situations. For example:

  • removing stale data from a cache;
  • sending notifications;
  • updating search indexes;
  • keeping some kind of audit log;
  • ...
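
As a rough illustration of the first item, the sketch below shows how an application might drop stale cache entries when it receives a Debezium change event. It assumes the event has already been read from Kafka and parsed into a dict with the payload.op, payload.before and payload.after fields shown later in this article. The in-memory cache dict and the handle_change_event function are hypothetical names used only for this example.

```python
from typing import Any, Dict, Optional

# Hypothetical in-process cache keyed by (table, primary key).
cache: Dict[tuple, Dict[str, Any]] = {}


def handle_change_event(event: Dict[str, Any]) -> Optional[tuple]:
    """Evict the cache entry for the row touched by a Debezium event.

    Returns the evicted cache key, or None if nothing had to be evicted.
    """
    payload = event["payload"]
    # "u" = update, "d" = delete; a create ("c") cannot leave stale data behind.
    if payload["op"] not in ("u", "d"):
        return None
    # Prefer the previous row state; fall back to the new one if it is absent.
    row = payload["before"] or payload["after"]
    key = (payload["source"]["table"], row["id"])
    cache.pop(key, None)
    return key


# Usage: a cached customer row becomes stale after an UPDATE event.
cache[("customers", 1005)] = {"id": 1005, "first_name": "foo"}
update_event = {
    "payload": {
        "op": "u",
        "before": {"id": 1005, "first_name": "foo"},
        "after": {"id": 1005, "first_name": "egg"},
        "source": {"table": "customers"},
    }
}
evicted = handle_change_event(update_event)
```

Evicting rather than updating the entry keeps the handler idempotent, so replaying the same event after a consumer restart is harmless.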

If you have a Java application and there is no need or possibility to use a Kafka cluster, you can also work through the embedded connector. The obvious advantage is that you can do without additional infrastructure (the connector and Kafka). However, this solution has been deprecated since version 1.1 and is no longer recommended (it may be removed in future releases).

This article covers the architecture recommended by the developers, which provides fault tolerance and scalability.

Connector configuration

To start tracking changes in the most important asset, the data, we need:

  1. a data source, which can be MySQL starting from version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (full list);
  2. an Apache Kafka cluster;
  3. a Kafka Connect instance (versions 1.x, 2.x);
  4. a configured Debezium connector.

The first two points, i.e. installing the DBMS and Apache Kafka, are beyond the scope of this article. However, for those who want to deploy everything in a sandbox, the official repository with examples contains a ready-made docker-compose.yaml.

We will look at the last two points in more detail.

0. Kafka Connect

Here and later in the article, all configuration examples are considered in the context of the Docker image distributed by the Debezium developers. It contains all the necessary plugin files (connectors) and allows Kafka Connect to be configured through environment variables.

If you intend to use Kafka Connect from Confluent, you will need to add the plugins of the required connectors yourself to the directory specified in plugin.path or set via the CLASSPATH environment variable. Settings for the Kafka Connect worker and connectors are defined in configuration files that are passed as arguments to the worker start command. For details, see the documentation.

The whole process of setting up Debezium in the connector variant is carried out in two stages. Let's consider each of them:

1. Setting up the Kafka Connect framework

To stream data to an Apache Kafka cluster, certain parameters are set in the Kafka Connect framework, such as:

  • cluster connection settings,
  • names of the topics in which the configuration of the connector itself will be stored,
  • the name of the group in which the connector runs (when distributed mode is used).

The official Docker image of the project supports configuration through environment variables, and this is what we will use. So let's download the image:

docker pull debezium/connect

The minimum set of environment variables required to run the connector is as follows:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - the initial list of Kafka cluster servers used to obtain the full list of cluster members;
  • OFFSET_STORAGE_TOPIC=connector-offsets - the topic for storing the positions the connector is currently at;
  • STATUS_STORAGE_TOPIC=connector-status - the topic for storing the status of the connector and its tasks;
  • CONFIG_STORAGE_TOPIC=connector-config - the topic for storing the configuration data of the connector and its tasks;
  • GROUP_ID=1 - the identifier of the group of workers on which the connector task can run; required when distributed mode is used.

We start the container with these variables:

docker run \
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  debezium/connect:1.2
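
To make the mapping less opaque, here is a small sketch of how the variables above correspond to Kafka Connect worker properties. The property names on the right are standard distributed-worker settings; the to_worker_properties helper is a hypothetical name for this example and is not the image's actual entrypoint logic.

```python
# Environment variables from the example above and the Kafka Connect
# distributed-worker properties they correspond to.
ENV_TO_PROPERTY = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "GROUP_ID": "group.id",
    "CONFIG_STORAGE_TOPIC": "config.storage.topic",
    "OFFSET_STORAGE_TOPIC": "offset.storage.topic",
    "STATUS_STORAGE_TOPIC": "status.storage.topic",
}


def to_worker_properties(env):
    """Render the known variables as lines of a .properties file."""
    missing = [name for name in ENV_TO_PROPERTY if name not in env]
    if missing:
        raise ValueError(f"missing variables: {missing}")
    return "\n".join(
        f"{prop}={env[name]}" for name, prop in ENV_TO_PROPERTY.items()
    )


example_env = {
    "BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "GROUP_ID": "1",
    "CONFIG_STORAGE_TOPIC": "my_connect_configs",
    "OFFSET_STORAGE_TOPIC": "my_connect_offsets",
    "STATUS_STORAGE_TOPIC": "my_connect_statuses",
}
properties = to_worker_properties(example_env)
print(properties)
```

This is also roughly what you would write by hand in the worker configuration file when running Kafka Connect outside of this image.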

A note about Avro

By default, Debezium writes data in JSON format, which is acceptable for sandboxes and small amounts of data but can become a problem for heavily loaded databases. An alternative to the JSON converter is to serialize messages with Avro into a binary format, which reduces the load on the I/O subsystem in Apache Kafka.

To use Avro, you need to deploy a separate schema-registry (for storing schemas). The variables for the converter will look like this:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

The details of using Avro and setting up a registry for it are beyond the scope of this article. From here on, for clarity, we will use JSON.

2. Setting up the connector itself

Now we can move on to the configuration of the connector itself, which will read data from the source.

Let's look at connectors for two DBMSs, PostgreSQL and MongoDB, which I have experience with and between which there are differences (small, but in some cases significant!).

The configuration is described in JSON notation and uploaded to Kafka Connect with a POST request.

2.1. PostgreSQL

Example connector configuration for PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

The way the connector works once configured is quite simple:

  • At the first start, it connects to the database specified in the configuration and starts in initial snapshot mode, sending to Kafka the initial set of data obtained with a notional SELECT * FROM table_name.
  • Once the initialization is complete, the connector switches to reading changes from the PostgreSQL WAL files.

About the options used:

  • name - the name of the connector to which the configuration described below applies; later this name is used to work with the connector (i.e. view its status / restart it / update its configuration) through the Kafka Connect REST API;
  • connector.class - the DBMS connector class that the configured connector will use;
  • plugin.name - the name of the plugin for logical decoding of data from the WAL files. The available choices are wal2json, decoderbufs and pgoutput. The first two require installing the corresponding extensions in the DBMS, while pgoutput on PostgreSQL version 10 and higher requires no additional steps;
  • database.* - options for connecting to the database, where database.server.name is the name of the PostgreSQL instance used to form the topic name in the Kafka cluster;
  • table.include.list - the list of tables whose changes we want to track; given in the format schema.table_name; cannot be used together with table.exclude.list;
  • heartbeat.interval.ms - the interval (in milliseconds) at which the connector sends heartbeat messages to a special topic;
  • heartbeat.action.query - a query that will be executed each time a heartbeat message is sent (the option appeared in version 1.1);
  • slot.name - the name of the replication slot that the connector will use;
  • publication.name - the name of the publication in PostgreSQL that the connector uses. If it does not exist, Debezium will try to create it. If the user under which the connection is made does not have sufficient rights for this, the connector will exit with an error;
  • transforms determines how exactly to change the name of the target topic:
    • transforms.AddPrefix.type indicates that we will use regular expressions;
    • transforms.AddPrefix.regex - the mask by which the name of the target topic is redefined;
    • transforms.AddPrefix.replacement - what exactly we redefine it to.

More about heartbeat and transforms

By default, the connector sends data to Kafka for each committed transaction and writes its LSN (Log Sequence Number) to the service topic offset. But what happens if the connector is configured to read not the entire database but only a subset of its tables (in which data is updated infrequently)?

  • The connector will read the WAL files and not find in them any committed transactions for the tables it monitors.
  • Therefore, it will not update its current position either in the topic or in the replication slot.
  • This, in turn, will cause the WAL files to be "held" on disk, and disk space is likely to run out.

This is where the options heartbeat.interval.ms and heartbeat.action.query come to the rescue. Using these options as a pair makes it possible to execute a data-changing query against a separate table each time a heartbeat message is sent. As a result, the LSN at which the connector is currently positioned (in the replication slot) is updated regularly. This allows the DBMS to remove WAL files that are no longer needed. For more information on how these options work, see the documentation.
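
A minimal sketch of what this pairing could look like in the connector configuration is given below. The table public.debezium_heartbeat and its columns are assumptions made for illustration (the table would have to be created beforehand and be writable by the connector's user); the two option names are the ones discussed above.

```python
import json

# Options to merge into the "config" section of the connector definition.
heartbeat_options = {
    # Emit a heartbeat every 5 seconds, as in the example configuration.
    "heartbeat.interval.ms": "5000",
    # Hypothetical single-row table: every heartbeat rewrites its timestamp,
    # so the database regularly produces a change even when the tracked
    # tables are idle.
    "heartbeat.action.query": (
        "INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) "
        "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts"
    ),
}

connector = {"name": "pg-connector", "config": {"slot.name": "dbname_debezium"}}
connector["config"].update(heartbeat_options)
print(json.dumps(connector, indent=2))
```

An upsert into a single row is used here so that the heartbeat table does not grow over time; a plain INSERT would work too, at the cost of periodic cleanup.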

Another option that deserves closer attention is transforms, although it is more about convenience and aesthetics...

By default, Debezium creates topics using the following naming policy: serverName.schemaName.tableName. This is not always convenient. With the transforms options and regular expressions, you can define a list of tables whose events should be routed to a topic with a specific name.

In our configuration, thanks to transforms, the following happens: all CDC events from the tracked database go to the topic named data.cdc.dbname. Otherwise (without these settings), Debezium would by default create a topic for each table, of the form pg-dev.public.<table_name>.
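
The renaming can be approximated in a few lines of Python. This is a sketch of the behaviour as I understand it (rename only when the whole topic name matches the pattern), not the transform's actual source code; the regex_router function and the sample topic names are introduced here for illustration.

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename only when the whole topic matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics keep their original name
    # Java-style "$1" group references become Python-style "\g<1>".
    template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(template)


REGEX = "pg-dev.public.(.*)"
REPLACEMENT = "data.cdc.dbname"

for name in ("pg-dev.public.customers", "pg-dev.public.orders", "some.other.topic"):
    print(name, "->", regex_router(name, REGEX, REPLACEMENT))
```

Both table topics collapse into data.cdc.dbname, while the unrelated topic is left untouched. Note that the dots in the pattern are unescaped and therefore match any character, which is harmless here but worth keeping in mind for stricter routing rules.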

Connector limitations

To conclude the description of the connector configuration for PostgreSQL, it is worth mentioning the following features and limitations of its operation:

  1. The PostgreSQL connector relies on the concept of logical decoding. Therefore, it does not track requests that change the database structure (DDL), and this data will not appear in the topics.
  2. Since replication slots are used, the connector can be connected only to the master DBMS instance.
  3. If the user under which the connector connects to the database has read-only rights, then before the first start you will need to create the replication slot and the publication in the database manually.
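
For the third point, the statements a privileged user would run might look like the sketch below. It only builds the SQL text and does not connect to anything. The slot and publication names are taken from the example configuration above, while FOR ALL TABLES is an assumption made for this example (a narrower table list may be more appropriate), and bootstrap_statements is a hypothetical helper name.

```python
def bootstrap_statements(slot_name: str, publication_name: str,
                         plugin: str = "pgoutput") -> list:
    """Return SQL that pre-creates the replication slot and the publication.

    The names are expected to be trusted constants from your own config,
    since they are interpolated into the SQL text as-is.
    """
    return [
        # Built-in PostgreSQL function that creates a logical replication slot.
        f"SELECT pg_create_logical_replication_slot('{slot_name}', '{plugin}');",
        # Publications exist in PostgreSQL 10 and later.
        f"CREATE PUBLICATION {publication_name} FOR ALL TABLES;",
    ]


statements = bootstrap_statements("dbname_debezium", "dbname_publication")
for statement in statements:
    print(statement)
```

With both objects in place, the connector can reuse them through slot.name and publication.name instead of trying to create them itself.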

Applying the configuration

So, let's upload our configuration to the connector:

curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d @pg-con.json
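
If you prefer to do the same from a script, a rough Python equivalent using only the standard library could look like this. It builds the request without sending it; the build_create_request helper is a hypothetical name, and the shortened config stands in for the full JSON shown earlier.

```python
import json
import urllib.request

# Same endpoint as in the curl call above.
CONNECT_URL = "http://localhost:8083/connectors/"


def build_create_request(connector: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        method="POST",
        headers={
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
    )


request = build_create_request({
    "name": "pg-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        # ... the remaining options from the configuration above go here
    },
})
# To actually send it: urllib.request.urlopen(request)
```

Keeping the request construction separate from the network call makes the payload easy to inspect or test before it reaches Kafka Connect.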

We check that the upload was successful and the connector has started:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
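
A status response like the one above is easy to check automatically. The sketch below treats the connector as healthy only when the connector itself and every task report RUNNING; the is_healthy helper is a hypothetical name, and the response body is copied from the output above.

```python
import json

status_body = (
    '{"name":"pg-connector",'
    '"connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},'
    '"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],'
    '"type":"source"}'
)


def is_healthy(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status["tasks"]]
    return all(state == "RUNNING" for state in states)


healthy = is_healthy(json.loads(status_body))
print("healthy" if healthy else "needs attention")
```

Checking the tasks separately matters because a connector can stay in the RUNNING state while one of its tasks has already failed.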

Great: it is configured and ready to go. Now let's pretend to be a consumer and connect to Kafka, and then add and change a record in the table:

$ kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

In our topic, this is displayed as follows:

Very long JSON with our changes

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

In both cases, the records consist of the key (PK) of the row that was changed and the essence of the change itself: what the row was before and what it became after.

  • In the case of INSERT: the previous value (before) is null, and after contains the row that was inserted.
  • In the case of UPDATE: payload.before shows the previous state of the row, and payload.after shows the new state with the essence of the change.
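
The before/after pair makes it straightforward for a consumer to work out what exactly changed. Below is a small sketch of such a comparison; the changed_fields helper is a hypothetical name, and the payloads are trimmed versions of the events shown above (the email column is left out for brevity).

```python
def changed_fields(payload: dict) -> dict:
    """Map each changed column to an (old, new) tuple."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


insert_payload = {
    "op": "c",
    "before": None,
    "after": {"id": 1005, "first_name": "foo", "last_name": "bar"},
}
update_payload = {
    "op": "u",
    "before": {"id": 1005, "first_name": "foo", "last_name": "bar"},
    "after": {"id": 1005, "first_name": "egg", "last_name": "bar"},
}

insert_diff = changed_fields(insert_payload)
update_diff = changed_fields(update_payload)
print(update_diff)
```

For the INSERT every column is reported as changed from None, while for the UPDATE only first_name appears in the result.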

2.2 MongoDB

This connector uses the standard MongoDB replication mechanism, reading information from the oplog of the DBMS primary node.

As with the PgSQL connector described above, at the first start an initial data snapshot is taken, after which the connector switches to oplog reading mode.

Configuration example:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

As you can see, there are no new options compared to the previous example; only the number of options responsible for connecting to the database has been reduced, and their prefixes differ.

This time, the transforms settings do the following: they turn the name of the target topic from the scheme <server_name>.<db_name>.<collection_name> into data.cdc.mongo_<db_name>.
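
To see the effect of the capture group, the sketch below applies the same pattern to a few topic names and groups them by the resulting topic. The collection names (users, orders, events) are made up for the example, and target_topic is a hypothetical helper that only imitates the renaming.

```python
import re
from collections import defaultdict

# Pattern copied from the connector configuration above.
PATTERN = re.compile(r"mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)")


def target_topic(source_topic: str) -> str:
    """Keep only the database name ($1) in the resulting topic name."""
    match = PATTERN.fullmatch(source_topic)
    if match is None:
        return source_topic
    return f"data.cdc.mongo_{match.group(1)}"


routed = defaultdict(list)
for name in ("mongo.db_1.users", "mongo.db_1.orders", "mongo.db_2.events"):
    routed[target_topic(name)].append(name)

for topic, sources in routed.items():
    print(topic, "<-", sources)
```

All collections of one database end up in a single topic, so a consumer that needs to tell them apart has to look at the source information inside each event.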

Fault tolerance

The issue of fault tolerance and high availability is more acute today than ever, especially when it comes to data and transactions, and data change tracking is no exception. Let's look at what can go wrong in principle and what will happen to Debezium in each case.

There are three failure scenarios:

  1. Kafka Connect failure. If Connect is configured to work in distributed mode, this requires several workers to be set up with the same group.id. Then, if one of them fails, the connector will be restarted on another worker and will continue reading from the last committed position in the Kafka topic.
  2. Loss of connectivity with the Kafka cluster. The connector will simply stop reading at the position it failed to send to Kafka and will periodically try to resend it until an attempt succeeds.
  3. Data source unavailable. The connector will try to reconnect to the source according to the configuration. The default is 16 attempts using exponential backoff. After the 16th failed attempt, the task will be marked as failed and will have to be restarted manually through the Kafka Connect REST interface.
    • In the case of PostgreSQL, data will not be lost, because the use of replication slots prevents the removal of WAL files that the connector has not yet read. There is a downside, however: if network connectivity between the connector and the DBMS is disrupted for a long time, disk space may run out, and this can lead to the failure of the entire DBMS.
    • In the case of MySQL, the binlog files can be rotated by the DBMS itself before connectivity is restored. This will cause the connector to go into the failed state, and it will have to be restarted in initial snapshot mode to continue reading from the binlogs and restore normal operation.
    • About MongoDB. The documentation says that the behaviour of the connector when the log/oplog files have been deleted and the connector cannot continue reading from the position where it left off is the same for all DBMSs. That is, the connector will go into the failed state and will require a restart in initial snapshot mode.

      However, there are exceptions. If the connector was disconnected for a long time (or could not reach the MongoDB instance) and the oplog was rotated during that time, then once the connection is restored, the connector will quietly continue reading data from the first available position, which is why some of the data will not reach Kafka.
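
To give a feel for what "16 attempts with exponential backoff" means in practice, here is a sketch of such a retry schedule. Only the number of attempts comes from the text above; the initial delay of 1 second, the doubling factor and the cap of 120 seconds are assumptions made for illustration, and backoff_delays is a hypothetical helper.

```python
def backoff_delays(max_attempts: int = 16,
                   initial_delay: float = 1.0,
                   max_delay: float = 120.0) -> list:
    """Delays (in seconds) before each reconnection attempt.

    The delay doubles after every attempt until it reaches the cap.
    """
    delays = []
    delay = initial_delay
    for _ in range(max_attempts):
        delays.append(min(delay, max_delay))
        delay *= 2
    return delays


delays = backoff_delays()
print(delays)
print("total wait before giving up, seconds:", sum(delays))
```

Under these assumptions the connector would give up after roughly twenty minutes, which is a useful number to compare against how long your source is realistically allowed to be unavailable.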

Conclusion

Debezium is my first experience with CDC systems, and overall it has been very positive. The project won me over with its support for the main DBMSs, ease of configuration, clustering support and an active community. For those interested in hands-on practice, I recommend reading the guides for Kafka Connect and Debezium.

Compared to the JDBC connector for Kafka Connect, the main advantage of Debezium is that changes are read from the DBMS logs, which allows data to be received with minimal delay. The JDBC Connector (provided with Kafka Connect) queries the tracked table at a fixed interval and (for the same reason) does not generate messages when data is deleted (how can you query data that is not there?).
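
The point about deletions can be demonstrated with a deliberately simplified model of query-based polling. This is not the JDBC connector's actual code: it uses an in-memory SQLite table and a hypothetical poll_new_rows function that selects rows with an id greater than the last one seen.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, first_name TEXT)")


def poll_new_rows(connection, last_seen_id):
    """One polling round: fetch rows that appeared after the last seen id."""
    cursor = connection.execute(
        "SELECT id, first_name FROM customers WHERE id > ? ORDER BY id",
        (last_seen_id,),
    )
    return cursor.fetchall()


conn.execute("INSERT INTO customers VALUES (1005, 'foo')")
first_poll = poll_new_rows(conn, 0)       # the inserted row is picked up

conn.execute("DELETE FROM customers WHERE id = 1005")
second_poll = poll_new_rows(conn, 1005)   # the deletion produces no rows at all

print(first_poll, second_poll)
```

A log-based approach sees the DELETE as an event in its own right, whereas the polling query simply finds nothing to return.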

To solve similar problems, you can also look at the following solutions (in addition to Debezium):

Source: www.habr.com
