
Amin'ny asako, matetika aho no mahita vahaolana ara-teknika/vokatra rindrambaiko, vaovao momba izay zara raha misy amin'ny Internet amin'ny teny Rosiana. Amin'ity lahatsoratra ity dia hiezaka aho hameno ny banga toy izany amin'ny ohatra iray avy amin'ny fanaoko vao haingana, rehefa nila nanamboatra ny fandefasana hetsika CDC avy amin'ny DBMS malaza roa (PostgreSQL sy MongoDB) aho mankany amin'ny cluster Kafka mampiasa Debezium. Manantena aho fa ity lahatsoratra famerenana ity, izay miseho ho vokatry ny asa vita, dia hahasoa ny hafa.
Inona no atao hoe Debezium sy CDC amin'ny ankapobeny?
- solontenan'ny sokajy rindrambaiko CDC (), na ny marimarina kokoa, dia andiana mpampitohy ho an'ny DBMS isan-karazany mifanaraka amin'ny rafitra Apache Kafka Connect.
izany nahazo alalana tamin'ny Apache License v2.0 ary tohanan'ny Red Hat. Ny fampandrosoana dia mitohy hatramin'ny 2016 ary amin'izao fotoana izao dia manome fanohanana ofisialy ireto DBMS manaraka ireto: MySQL, PostgreSQL, MongoDB, SQL Server. Misy ihany koa ny mpampitohy an'i Cassandra sy Oracle, fa amin'izao fotoana izao dia ao anatin'ny toeran'ny "fidirana aloha" izy ireo, ary ny famoahana vaovao dia tsy miantoka ny fifanarahana miverina.
Raha ampitahaintsika ny CDC amin'ny fomba nentim-paharazana (rehefa mamaky mivantana ny angon-drakitra avy amin'ny DBMS ny fampiharana), ny tombony lehibe indrindra dia ny fampiharana ny fikorianan'ny angon-drakitra mivantana amin'ny ambaratonga andalana miaraka amin'ny latency ambany, azo itokisana ambony ary misy. Ny teboka roa farany dia tratra amin'ny fampiasana cluster Kafka ho tahiry ho an'ny hetsika CDC.
Ny tombony iray hafa dia ny hoe modely tokana no ampiasaina amin'ny fitahirizana hetsika, ka ny fampiharana farany dia tsy mila manahy momba ny nuances amin'ny fampiasana DBMS samihafa.
Farany, ny fampiasana broker hafatra dia ahafahan'ny rindranasa manara-maso ny fiovan'ny angon-drakitra mba hanitatra mitsivalana. Mandritra izany fotoana izany dia mihena ny fiantraikan'ny loharanom-baovao, satria ny angon-drakitra dia tsy azo mivantana avy amin'ny DBMS, fa avy amin'ny cluster Kafka.
Momba ny maritrano Debezium
Ny fampiasana Debezium dia tonga amin'ity tetika tsotra ity:
DBMS (ho loharanom-baovao) → mpampitohy ao amin'ny Kafka Connect → Apache Kafka → mpanjifa
Ho fanoharana, ity misy kisary avy amin'ny tranokalan'ny tetikasa:

Na izany aza, tsy dia tiako loatra ity tetika ity, satria toa ny fampiasana connecteur sink ihany no azo atao.
Raha ny marina, hafa ny toe-javatra: mameno ny Farihy Datanao (Rohy farany amin'ny kisary etsy ambony) Tsy io ihany no fomba fampiasana Debezium. Ny hetsika alefa any amin'ny Apache Kafka dia azon'ny rindranasa ampiasaina hiatrehana toe-javatra samihafa. Ohatra:
- fanesorana ny angona tsy misy ifandraisany amin'ny cache;
- fandefasana fampandrenesana;
- fanavaozana fanondroana fikarohana;
- karazana log de audit;
- ...
Raha toa ka manana fampiharana Java ianao ary tsy ilaina / azo atao ny mampiasa cluster Kafka, dia misy ihany koa ny fahafahana miasa . Ny tombony miharihary dia ny hoe afaka mandà fotodrafitrasa fanampiny ianao (amin'ny endrika connector sy Kafka). Na izany aza, ity vahaolana ity dia tsy ampiasaina hatramin'ny dikan-teny 1.1 ary tsy asaina ampiasaina intsony (mety esorina amin'ny famoahana ho avy).
Ity lahatsoratra ity dia hiresaka momba ny maritrano natolotry ny mpamorona, izay manome ny fandeferana sy ny scalability.
Fametrahana mpampitohy
Mba hanombohana ny fanaraha-maso ny fiovana amin'ny sanda manan-danja indrindra - data - dia mila:
- loharano angona, izay mety ho MySQL manomboka amin'ny version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ ();
- Apache Kafka cluster;
- Ohatra Kafka Connect (dikan-teny 1.x, 2.x);
- configured Debezium connector.
Miasa amin'ny teboka roa voalohany, i.e. Ny fizotry ny fametrahana ny DBMS sy Apache Kafka dia mihoatra ny faritry ny lahatsoratra. Na izany aza, ho an'ireo izay te hametraka ny zava-drehetra ao anaty boaty fasika, ny tahiry ofisialy misy ohatra dia efa vonona. .
Hiresaka bebe kokoa momba ireo teboka roa farany isika.
0. Kafka Connect
Eto sy any aoriana ao amin'ny lahatsoratra, ny ohatra config rehetra dia raisina amin'ny tontolon'ny sary Docker nozarain'ireo mpamorona Debezium. Izy io dia ahitana ny rakitra plugin rehetra ilaina (mpampifandray) ary manome ny konfigurasi Kafka Connect amin'ny fampiasana ny fari-piainan'ny tontolo iainana.
Raha mikasa ny hampiasa ny Kafka Connect avy amin'ny Confluent ianao, dia mila ampidirinao tsy miankina amin'ny lahatahiry voatondro ao amin'ny lahatahiry ireo plugins an'ny mpampitohy ilaina. plugin.path na apetraka amin'ny alalan'ny fari-piainana manodidina CLASSPATH. Ny fika ho an'ny mpiasa sy ny mpampitohy Kafka Connect dia voafaritra amin'ny alàlan'ny rakitra fanamafisam-peo izay ampitaina ho tohan-kevitra amin'ny baiko fanombohan'ny mpiasa. Raha mila fanazavana fanampiny dia jereo .
Ny dingana manontolo amin'ny fametrahana Debeizum amin'ny dikan-tsarimihetsika dia atao amin'ny dingana roa. Andeha hojerentsika ny tsirairay amin'izy ireo:
1. Fametrahana ny rafitra Kafka Connect
Mba hampandehanana ny angona mankany amin'ny kluster Apache Kafka, dia apetraka ao amin'ny rafitra Kafka Connect ny mason-tsivana manokana, toy ny:
- paramètre hifandraisana amin'ny cluster,
- anaran'ny lohahevitra hitehirizana ny fanefena ny mpampitohy,
- ny anaran'ny vondrona misy ny mpampitohy (raha ny fomba fizarana no ampiasaina).
Ny sarin'ny Docker ofisialin'ny tetikasa dia manohana ny fanamafisana amin'ny fampiasana ny fari-piainan'ny tontolo iainana - izany no hampiasaintsika. Aleo àry alaina ny sary:
docker pull debezium/connectToy izao manaraka izao ny fitambaran'ny fari-piainan'ny tontolo iainana ilaina hampandehanana ny mpampitohy:
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092- lisitra voalohany amin'ireo mpizara kluster Kafka hahazoana lisitra feno amin'ireo mpikambana ao amin'ny kluster; -
OFFSET_STORAGE_TOPIC=connector-offsets- lohahevitra momba ny fitehirizana toerana misy ny mpampitohy amin'izao fotoana izao; -
CONNECT_STATUS_STORAGE_TOPIC=connector-status- lohahevitra momba ny fitahirizana ny satan'ny mpampitohy sy ny asany; -
CONFIG_STORAGE_TOPIC=connector-config- lohahevitra momba ny fitehirizana angon-drakitra fanamafisana ny mpampitohy sy ny asany; -
GROUP_ID=1- famantarana ny vondron'ny mpiasa izay azo tanterahina ny asa mpampitohy; ilaina rehefa mampiasa zaraina (zaraina) fitondrana.
Manomboka ny container miaraka amin'ireto variable ireto izahay:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2Fanamarihana momba an'i Avro
Amin'ny alàlan'ny default, Debezium dia manoratra angona amin'ny endrika JSON, izay azo ekena ho an'ny sandboxes sy data kely, saingy mety ho lasa olana amin'ny angon-drakitra feno entana. Ny safidy hafa amin'ny mpanova JSON dia ny fandefasana hafatra amin'ny fampiasana amin'ny endrika binary, izay mampihena ny enta-mavesatra amin'ny subsystem I/O ao amin'ny Apache Kafka.
Raha te hampiasa ny Avro ianao dia mila mametraka misaraka (ho fitahirizana kisary). Ny fari-piainana ho an'ny mpanova dia ho toy izao:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverterNy antsipirihan'ny fampiasana Avro sy ny fametrahana ny rejisitra ho azy dia mihoatra ny sahan'ity lahatsoratra ity - bebe kokoa, ho fanazavana, hampiasa JSON izahay.
2. Fametrahana ny connector mihitsy
Ankehitriny ianao dia afaka mandeha mivantana amin'ny fanamafisana ny connector mihitsy, izay hamaky ny angona avy amin'ny loharano.
Andeha hojerentsika ny ohatry ny mpampitohy ho an'ny DBMS roa: PostgreSQL sy MongoDB, izay ananako traikefa ary misy fahasamihafana (na dia kely aza, fa amin'ny tranga sasany dia manan-danja!).
Nofaritana tao amin'ny fanamarihana JSON ny fanitsiana ary nampidirina tao amin'ny Kafka Connect mampiasa fangatahana POST.
2.1. PostgreSQL
Ohatra fanefena mpampitohy ho an'ny PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}Ny fitsipiky ny fiasan'ny connector taorian'ny setup dia tena tsotra:
- Amin'ny fanombohana voalohany dia mifandray amin'ny angon-drakitra voatondro ao amin'ny fanamafisana ary manomboka amin'ny fomba sary voalohany, mandefa any amin'ny Kafka ny angon-drakitra voalohany azo amin'ny fampiasana ny fepetra
SELECT * FROM table_name. - Rehefa vita ny fanombohana dia miditra amin'ny fomba famakiana ny fanovana avy amin'ny rakitra PostgreSQL WAL ny mpampitohy.
Momba ny safidy ampiasaina:
-
name- ny anaran'ny mpampitohy izay ampiasaina ny fanamafisam-peo voalaza etsy ambany; amin'ny ho avy, io anarana io dia ampiasaina hiara-miasa amin'ny mpampitohy (izany hoe, jereo ny sata / avereno / fanavaozana ny fanamafisana) amin'ny alàlan'ny Kafka Connect REST API; -
connector.class- Kilasy mpampitohy DBMS izay hampiasain'ny mpampitohy voarindra; -
plugin.name- ny anaran'ny plugin ho an'ny decoding lojika ny angona avy amin'ny rakitra WAL. Azo isafidiananawal2json,decoderbuffsиpgoutput. Ny roa voalohany dia mitaky ny fametrahana ny fanitarana mety amin'ny DBMS, arypgoutputho an'ny PostgreSQL version 10 sy ambony dia tsy mitaky fanodinkodinana fanampiny; -
database.*— safidy hifandraisana amin'ny angon-drakitra, aizadatabase.server.name- Anarana ohatra PostgreSQL ampiasaina hamoronana ny anaran'ny lohahevitra ao amin'ny cluster Kafka; -
table.include.list- lisitry ny latabatra izay tiantsika hanara-maso ny fiovana; voafaritra amin'ny endrikaschema.table_name; tsy azo ampiasaina miarakatable.exclude.list; -
heartbeat.interval.ms- elanelam-potoana (amin'ny milisegogondra) izay handefasan'ny mpampitohy hafatra momba ny fitempon'ny fo amin'ny lohahevitra manokana; -
heartbeat.action.query- fangatahana izay hotanterahina rehefa mandefa hafatra fitepon'ny fo tsirairay (miseho amin'ny dikan-teny 1.1 ny safidy); -
slot.name- ny anaran'ny slot replication izay hampiasain'ny mpampitohy; publication.name- Anarana ao amin'ny PostgreSQL, izay ampiasain'ny mpampitohy. Raha tsy misy izany dia hiezaka hamorona azy i Debezium. Raha tsy manana zo ampy amin'ity hetsika ity ny mpampiasa izay anaovana ny fifandraisana, dia hivoaka miaraka amin'ny hadisoana ny mpampitohy;-
transformsmamaritra tsara ny fomba hanovana ny anaran'ny lohahevitra kendrena:-
transforms.AddPrefix.typemanondro fa hampiasa fomba fiteny mahazatra isika; -
transforms.AddPrefix.regex- saron-tava izay mamaritra ny anaran'ny lohahevitra kendrena; -
transforms.AddPrefix.replacement- mivantana izay amaritanay indray.
-
Bebe kokoa momba ny fitempon'ny fo sy ny fiovana
Amin'ny alàlan'ny default, ny mpampitohy dia mandefa angona any amin'ny Kafka isaky ny fifampiraharahana atao, ary manoratra ny LSN (Log Sequence Number) ao amin'ny lohahevitry ny serivisy. offset. Fa inona no mitranga raha toa ny connector dia natao hamaky tsy ny angon-drakitra manontolo, fa ampahany amin'ny latabatra ihany (izay tsy misy fanavaozana data matetika)?
- Ny mpampitohy dia hamaky ny rakitra WAL ary tsy hahita ny fifampiraharahana atao amin'ny tabilao arahiny.
- Noho izany dia tsy hanavao ny toerana misy azy amin'izao fotoana izao na amin'ny lohahevitra na amin'ny slot replication.
- Izany, ho setrin'izany, dia hahatonga ny rakitra WAL hotazonina amin'ny kapila ary mety ho lany ny habaka kapila.
Ary eto no tonga ny safidy ho famonjena. heartbeat.interval.ms и heartbeat.action.query. Ny fampiasana tsiroaroa ireo safidy ireo dia ahafahana manao fangatahana hanovana angona ao anaty latabatra mitokana isaky ny misy hafatra fitepon'ny fo. Noho izany, ny LSN izay misy ny connector amin'izao fotoana izao (ao amin'ny slot replication) dia havaozina tsy tapaka. Izany dia ahafahan'ny DBMS manala ny rakitra WAL izay tsy ilaina intsony. Afaka mianatra bebe kokoa momba ny fomba fiasan'ny safidy ianao .
Safidy iray hafa mendrika hojerena akaiky dia transforms. Na dia miompana kokoa amin'ny fanamorana sy ny hatsarana aza...
Amin'ny alàlan'ny default, Debezium dia mamorona lohahevitra mampiasa ity politikan'ny anarana manaraka ity: serverName.schemaName.tableName. Mety tsy mety foana izany. FANDIKANA transforms Azonao atao ny mampiasa fomba fiteny mahazatra hamaritana lisitr'ireo tabilao, hetsika izay tokony halehana amin'ny lohahevitra misy anarana manokana.
Ao amin'ny configuration anay misaotra transforms izao no mitranga: ny hetsika CDC rehetra avy amin'ny angon-drakitra voara-maso dia handeha amin'ny lohahevitra iray misy ny anarana data.cdc.dbname. Raha tsy izany (tsy misy ireo fanovana ireo), Debezium dia mamorona lohahevitra ho an'ny latabatra tsirairay toy ny: pg-dev.public.<table_name>.
Fepetran'ny mpampitohy
Mba hamaranana ny famaritana ny fanefena connector ho an'ny PostgreSQL dia ilaina ny miresaka momba ireto endri-javatra manaraka ireto / fetran'ny fampandehanana azy:
- Ny fiasan'ny mpampitohy ho an'ny PostgreSQL dia miankina amin'ny foto-kevitry ny decoding lojika. Noho izany izy tsy manara-maso ny fangatahana hanovana ny rafitra angona (DDL) - arak'izany dia tsy ho ao anatin'ireo lohahevitra ireo angona ireo.
- Koa satria slots replication no ampiasaina, dia azo atao ny mampifandray connector ihany mankany amin'ny ohatra DBMS voalohany.
- Raha nomena zo vakiana fotsiny ny mpampiasa izay ampifandraisin'ny mpampitohy amin'ny angon-drakitra, dia alohan'ny fandefasana voalohany dia mila mamorona slot replication ianao ary mamoaka amin'ny angon-drakitra.
Mampihatra ny configuration
Noho izany, andao hampiditra ny config ao amin'ny connector:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.jsonManamarina izahay fa nahomby ny fampidinana ary nanomboka ny mpampitohy:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}Tsara: efa napetraka ary vonona ny handeha. Andao izao mody ho mpanjifa ary hifandray amin'i Kafka, ary avy eo dia hanampy sy hanova ny fidirana amin'ny latabatra:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1Ao amin'ny lohahevitray dia toy izao no hiseho:
JSON lava be miaraka amin'ny fiovanay
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}Amin'ireo tranga roa ireo, ny firaketana dia ahitana ny fanalahidin'ny rakitra novaina, ary ny tena fototry ny fiovana: inona ilay rakitsoratra taloha ary inona no lasa taorian'izay.
- Amin'ny tranga
INSERT: sanda taloha (before) mitovynull, ary aorian'izany - ny tsipika nampidirina. - Amin'ny tranga
UPDATE: aopayload.beforeny toetry ny tsipika teo aloha dia aseho, ary aopayload.after- vaovao miaraka amin'ny fototry ny fiovana.
2.2 MongoDB
Ity mpampitohy ity dia mampiasa ny rafitra replication MongoDB mahazatra, mamaky fampahalalana avy amin'ny oplog amin'ny node DBMS voalohany.
Mitovy amin'ilay mpampitohy efa nofaritana ho an'ny PgSQL, eto koa, amin'ny fanombohana voalohany, dia alaina ny snapshot data voalohany, ary avy eo ny mpampitohy dia mifindra amin'ny fomba famakiana oplog.
Ohatra config:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}Araka ny hitanao dia tsy misy safidy vaovao eto raha oharina amin'ny ohatra teo aloha, fa ny isan'ny safidy tompon'andraikitra amin'ny fampifandraisana amin'ny angon-drakitra sy ny prefixes ihany no nahena.
Fikirana transforms Amin'ity indray mitoraka ity dia manao izao manaraka izao izy ireo: manova ny anaran'ny lohahevitra kendrena amin'ny tetika <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.
fandeferana fahadisoana
Ny olan'ny fandeferana ny fahadisoana sy ny fahazoana avo lenta amin'izao androntsika izao dia mahery kokoa noho ny hatramin'izay - indrindra rehefa miresaka momba ny angon-drakitra sy ny fifampiraharahana isika, ary ny fanaraha-maso ny fiovan'ny angon-drakitra dia tsy eny an-dalana amin'ity raharaha ity. Andeha hojerentsika izay mety ho diso amin'ny fitsipika ary inona no hitranga amin'i Debezium amin'ny tranga tsirairay.
Misy safidy telo hialana:
- Tsy fahombiazana ny Kafka Connect. Raha namboarina hiasa amin'ny fomba mizarazara ny Connect, dia mitaky mpiasa maromaro izany mba hametraka groupe.id mitovy. Avy eo, raha toa ka tsy mahomby ny iray amin'izy ireo, dia haverina amin'ny mpiasa iray hafa ny mpampitohy ary hanohy ny famakiana avy amin'ny toerana fanolorana farany amin'ny lohahevitra ao amin'ny Kafka.
- Very ny fifandraisana amin'ny cluster Kafka. Hajanony fotsiny ny famakian-teny eo amin'ny toerana tsy nandefasana an'i Kafka ny mpampitohy, ary hiezaka ny handefa azy io tsindraindray mandra-pahombiazan'ilay andrana.
- Ny tsy fisian'ny loharanom-baovao. Ny mpampitohy dia hiezaka ny hampifandray indray amin'ny loharano araka ny fanamboarana. Ny default dia andrana 16 ampiasaina . Aorian'ny andrana faha-16 tsy nahomby dia homarihina ho toy ny tsy nahomby ary mila averinao amin'ny tanana izany amin'ny alàlan'ny interface Kafka Connect REST.
- Amin'ny tranga PostgreSQL ny angona tsy ho very, satria Ny fampiasana slot replication dia hisakana ny famafana ny rakitra WAL izay tsy vakian'ny mpampitohy. Amin'ity tranga ity, misy ny lafy ratsiny: raha tapaka ny fifandraisana amin'ny tambajotra eo amin'ny connector sy ny DBMS mandritra ny fotoana maharitra, dia mety ho lany ny habaka kapila, ary mety hitarika ho amin'ny tsy fahombiazan'ny DBMS manontolo izany.
- Amin'ny tranga MySQL Ny rakitra binlog dia azo odidin'ny DBMS mihitsy alohan'ny hamerenana ny fifandraisana. Izany dia hahatonga ny mpampitohy hiditra amin'ny toe-javatra tsy nahomby, ary mba hamerenana amin'ny laoniny ny fiasa ara-dalàna, dia mila averinao indray amin'ny maodely snapshot voalohany hanohizana ny famakiana ny binlogs.
- amin'ny MongoDB. Ny antontan-taratasy dia milaza fa ny fitondran-tenan'ny mpampitohy raha toa ka voafafa ny rakitra log/oplog ary ny mpampitohy dia tsy afaka manohy mamaky ny toerana nialany dia mitovy amin'ny DBMS rehetra. Midika izany fa ny connector dia ho any amin'ny fanjakana tsy nahomby ary mila averina amin'ny fomba sary voalohany.
Na izany aza, misy maningana. Raha tapaka nandritra ny fotoana ela ny connector (na tsy tonga tany amin'ny ohatra MongoDB), ary ny oplog dia nandalo fihodinana nandritra izany fotoana izany, dia rehefa tafaverina ny fifandraisana dia hanohy hamaky ny angona avy amin'ny toerana voalohany misy ny connecteur, izay no mahatonga ny sasany amin'ireo angona ao amin'ny Kafka tsy hamely.
famaranana
Debezium no traikefako voalohany tamin'ny rafitra CDC ary tena tsara amin'ny ankapobeny. Nandresy ny tetikasa tamin'ny fanohanany ireo DBMS lehibe, ny fanamorana ny fanamafisana, ny fanohanana ny clustering, ary ny fiaraha-monina mavitrika. Ho an'ireo liana amin'ny fampiharana dia manoro hevitra anao aho hamaky ny torolàlana momba ny и .
Raha ampitahaina amin'ny mpampitohy JDBC ho an'ny Kafka Connect, ny tombony lehibe indrindra amin'ny Debezium dia ny famakiana ny fanovana avy amin'ny logs DBMS, izay ahafahan'ny angon-drakitra azo raisina miaraka amin'ny latency kely indrindra. Ny JDBC Connector (avy amin'ny Kafka Connect) dia manontany ny latabatra voaara-maso amin'ny elanelana voafaritra ary (noho izany antony izany ihany) dia tsy mamoaka hafatra rehefa voafafa ny angona (ahoana no ahafahanao manontany angona tsy misy?).
Mba hamahana olana mitovy amin'izany dia azonao atao ny mandinika ireto vahaolana manaraka ireto (ankoatra ny Debezium):
- Vahaolana maromaro ho an'ny MySQL ihany:
- , fa hafa tanteraka ity “sokajy lanja”.
Sal
Vakio ihany koa ao amin'ny bilaoginay:
- «";
- «";
- «".
Source: www.habr.com
