Fampidirana Debezium - CDC ho an'i Apache Kafka

Fampidirana Debezium - CDC ho an'i Apache Kafka

Amin'ny asako, matetika aho no mahita vahaolana ara-teknika/vokatra rindrambaiko, vaovao momba izay zara raha misy amin'ny Internet amin'ny teny Rosiana. Amin'ity lahatsoratra ity dia hiezaka aho hameno ny banga toy izany amin'ny ohatra iray avy amin'ny fanaoko vao haingana, rehefa nila nanamboatra ny fandefasana hetsika CDC avy amin'ny DBMS malaza roa (PostgreSQL sy MongoDB) aho mankany amin'ny cluster Kafka mampiasa Debezium. Manantena aho fa ity lahatsoratra famerenana ity, izay miseho ho vokatry ny asa vita, dia hahasoa ny hafa.

Inona no atao hoe Debezium sy CDC amin'ny ankapobeny?

Debesium - solontenan'ny sokajy rindrambaiko CDC (Makà fanovana angona), na ny marimarina kokoa, dia andiana mpampitohy ho an'ny DBMS isan-karazany mifanaraka amin'ny rafitra Apache Kafka Connect.

izany Tetikasa Open Source, nahazo alalana tamin'ny Apache License v2.0 ary tohanan'ny Red Hat. Ny fampandrosoana dia mitohy hatramin'ny 2016 ary amin'izao fotoana izao dia manome fanohanana ofisialy ireto DBMS manaraka ireto: MySQL, PostgreSQL, MongoDB, SQL Server. Misy ihany koa ny mpampitohy an'i Cassandra sy Oracle, fa amin'izao fotoana izao dia ao anatin'ny toeran'ny "fidirana aloha" izy ireo, ary ny famoahana vaovao dia tsy miantoka ny fifanarahana miverina.

Raha ampitahaintsika ny CDC amin'ny fomba nentim-paharazana (rehefa mamaky mivantana ny angon-drakitra avy amin'ny DBMS ny fampiharana), ny tombony lehibe indrindra dia ny fampiharana ny fikorianan'ny angon-drakitra mivantana amin'ny ambaratonga andalana miaraka amin'ny latency ambany, azo itokisana ambony ary misy. Ny teboka roa farany dia tratra amin'ny fampiasana cluster Kafka ho tahiry ho an'ny hetsika CDC.

Ny tombony iray hafa dia ny hoe modely tokana no ampiasaina amin'ny fitahirizana hetsika, ka ny fampiharana farany dia tsy mila manahy momba ny nuances amin'ny fampiasana DBMS samihafa.

Farany, ny fampiasana broker hafatra dia ahafahan'ny rindranasa manara-maso ny fiovan'ny angon-drakitra mba hanitatra mitsivalana. Mandritra izany fotoana izany dia mihena ny fiantraikan'ny loharanom-baovao, satria ny angon-drakitra dia tsy azo mivantana avy amin'ny DBMS, fa avy amin'ny cluster Kafka.

Momba ny maritrano Debezium

Ny fampiasana Debezium dia tonga amin'ity tetika tsotra ity:

DBMS (ho loharanom-baovao) → mpampitohy ao amin'ny Kafka Connect → Apache Kafka → mpanjifa

Ho fanoharana, ity misy kisary avy amin'ny tranokalan'ny tetikasa:

Fampidirana Debezium - CDC ho an'i Apache Kafka

Na izany aza, tsy dia tiako loatra ity tetika ity, satria toa ny fampiasana connecteur sink ihany no azo atao.

Raha ny marina, hafa ny toe-javatra: mameno ny Farihy Datanao (Rohy farany amin'ny kisary etsy ambony) Tsy io ihany no fomba fampiasana Debezium. Ny hetsika alefa any amin'ny Apache Kafka dia azon'ny rindranasa ampiasaina hiatrehana toe-javatra samihafa. Ohatra:

  • fanesorana ny angona tsy misy ifandraisany amin'ny cache;
  • fandefasana fampandrenesana;
  • fanavaozana fanondroana fikarohana;
  • karazana log de audit;
  • ...

Raha toa ka manana fampiharana Java ianao ary tsy ilaina / azo atao ny mampiasa cluster Kafka, dia misy ihany koa ny fahafahana miasa embedded-connector. Ny tombony miharihary dia ny hoe afaka mandà fotodrafitrasa fanampiny ianao (amin'ny endrika connector sy Kafka). Na izany aza, ity vahaolana ity dia tsy ampiasaina hatramin'ny dikan-teny 1.1 ary tsy asaina ampiasaina intsony (mety esorina amin'ny famoahana ho avy).

Ity lahatsoratra ity dia hiresaka momba ny maritrano natolotry ny mpamorona, izay manome ny fandeferana sy ny scalability.

Fametrahana mpampitohy

Mba hanombohana ny fanaraha-maso ny fiovana amin'ny sanda manan-danja indrindra - data - dia mila:

  1. loharano angona, izay mety ho MySQL manomboka amin'ny version 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (lisitra feno);
  2. Apache Kafka cluster;
  3. Ohatra Kafka Connect (dikan-teny 1.x, 2.x);
  4. configured Debezium connector.

Miasa amin'ny teboka roa voalohany, i.e. Ny fizotry ny fametrahana ny DBMS sy Apache Kafka dia mihoatra ny faritry ny lahatsoratra. Na izany aza, ho an'ireo izay te hametraka ny zava-drehetra ao anaty boaty fasika, ny tahiry ofisialy misy ohatra dia efa vonona. docker-compose.yaml.

Hiresaka bebe kokoa momba ireo teboka roa farany isika.

0. Kafka Connect

Eto sy any aoriana ao amin'ny lahatsoratra, ny ohatra config rehetra dia raisina amin'ny tontolon'ny sary Docker nozarain'ireo mpamorona Debezium. Izy io dia ahitana ny rakitra plugin rehetra ilaina (mpampifandray) ary manome ny konfigurasi Kafka Connect amin'ny fampiasana ny fari-piainan'ny tontolo iainana.

Raha mikasa ny hampiasa ny Kafka Connect avy amin'ny Confluent ianao, dia mila ampidirinao tsy miankina amin'ny lahatahiry voatondro ao amin'ny lahatahiry ireo plugins an'ny mpampitohy ilaina. plugin.path na apetraka amin'ny alalan'ny fari-piainana manodidina CLASSPATH. Ny fika ho an'ny mpiasa sy ny mpampitohy Kafka Connect dia voafaritra amin'ny alàlan'ny rakitra fanamafisam-peo izay ampitaina ho tohan-kevitra amin'ny baiko fanombohan'ny mpiasa. Raha mila fanazavana fanampiny dia jereo tahirin-kevitra.

Ny dingana manontolo amin'ny fametrahana Debeizum amin'ny dikan-tsarimihetsika dia atao amin'ny dingana roa. Andeha hojerentsika ny tsirairay amin'izy ireo:

1. Fametrahana ny rafitra Kafka Connect

Mba hampandehanana ny angona mankany amin'ny kluster Apache Kafka, dia apetraka ao amin'ny rafitra Kafka Connect ny mason-tsivana manokana, toy ny:

  • paramètre hifandraisana amin'ny cluster,
  • anaran'ny lohahevitra hitehirizana ny fanefena ny mpampitohy,
  • ny anaran'ny vondrona misy ny mpampitohy (raha ny fomba fizarana no ampiasaina).

Ny sarin'ny Docker ofisialin'ny tetikasa dia manohana ny fanamafisana amin'ny fampiasana ny fari-piainan'ny tontolo iainana - izany no hampiasaintsika. Aleo àry alaina ny sary:

docker pull debezium/connect

Toy izao manaraka izao ny fitambaran'ny fari-piainan'ny tontolo iainana ilaina hampandehanana ny mpampitohy:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - lisitra voalohany amin'ireo mpizara kluster Kafka hahazoana lisitra feno amin'ireo mpikambana ao amin'ny kluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets - lohahevitra momba ny fitehirizana toerana misy ny mpampitohy amin'izao fotoana izao;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - lohahevitra momba ny fitahirizana ny satan'ny mpampitohy sy ny asany;
  • CONFIG_STORAGE_TOPIC=connector-config - lohahevitra momba ny fitehirizana angon-drakitra fanamafisana ny mpampitohy sy ny asany;
  • GROUP_ID=1 - famantarana ny vondron'ny mpiasa izay azo tanterahina ny asa mpampitohy; ilaina rehefa mampiasa zaraina (zaraina) fitondrana.

Manomboka ny container miaraka amin'ireto variable ireto izahay:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Fanamarihana momba an'i Avro

Amin'ny alàlan'ny default, Debezium dia manoratra angona amin'ny endrika JSON, izay azo ekena ho an'ny sandboxes sy data kely, saingy mety ho lasa olana amin'ny angon-drakitra feno entana. Ny safidy hafa amin'ny mpanova JSON dia ny fandefasana hafatra amin'ny fampiasana Avro amin'ny endrika binary, izay mampihena ny enta-mavesatra amin'ny subsystem I/O ao amin'ny Apache Kafka.

Raha te hampiasa ny Avro ianao dia mila mametraka misaraka schema-registry (ho fitahirizana kisary). Ny fari-piainana ho an'ny mpanova dia ho toy izao:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Ny antsipirihan'ny fampiasana Avro sy ny fametrahana ny rejisitra ho azy dia mihoatra ny sahan'ity lahatsoratra ity - bebe kokoa, ho fanazavana, hampiasa JSON izahay.

2. Fametrahana ny connector mihitsy

Ankehitriny ianao dia afaka mandeha mivantana amin'ny fanamafisana ny connector mihitsy, izay hamaky ny angona avy amin'ny loharano.

Andeha hojerentsika ny ohatry ny mpampitohy ho an'ny DBMS roa: PostgreSQL sy MongoDB, izay ananako traikefa ary misy fahasamihafana (na dia kely aza, fa amin'ny tranga sasany dia manan-danja!).

Nofaritana tao amin'ny fanamarihana JSON ny fanitsiana ary nampidirina tao amin'ny Kafka Connect mampiasa fangatahana POST.

2.1. PostgreSQL

Ohatra fanefena mpampitohy ho an'ny PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ny fitsipiky ny fiasan'ny connector taorian'ny setup dia tena tsotra:

  • Amin'ny fanombohana voalohany dia mifandray amin'ny angon-drakitra voatondro ao amin'ny fanamafisana ary manomboka amin'ny fomba sary voalohany, mandefa any amin'ny Kafka ny angon-drakitra voalohany azo amin'ny fampiasana ny fepetra SELECT * FROM table_name.
  • Rehefa vita ny fanombohana dia miditra amin'ny fomba famakiana ny fanovana avy amin'ny rakitra PostgreSQL WAL ny mpampitohy.

Momba ny safidy ampiasaina:

  • name - ny anaran'ny mpampitohy izay ampiasaina ny fanamafisam-peo voalaza etsy ambany; amin'ny ho avy, io anarana io dia ampiasaina hiara-miasa amin'ny mpampitohy (izany hoe, jereo ny sata / avereno / fanavaozana ny fanamafisana) amin'ny alàlan'ny Kafka Connect REST API;
  • connector.class - Kilasy mpampitohy DBMS izay hampiasain'ny mpampitohy voarindra;
  • plugin.name - ny anaran'ny plugin ho an'ny decoding lojika ny angona avy amin'ny rakitra WAL. Azo isafidianana wal2json, decoderbuffs и pgoutput. Ny roa voalohany dia mitaky ny fametrahana ny fanitarana mety amin'ny DBMS, ary pgoutput ho an'ny PostgreSQL version 10 sy ambony dia tsy mitaky fanodinkodinana fanampiny;
  • database.* — safidy hifandraisana amin'ny angon-drakitra, aiza database.server.name - Anarana ohatra PostgreSQL ampiasaina hamoronana ny anaran'ny lohahevitra ao amin'ny cluster Kafka;
  • table.include.list - lisitry ny latabatra izay tiantsika hanara-maso ny fiovana; voafaritra amin'ny endrika schema.table_name; tsy azo ampiasaina miaraka table.exclude.list;
  • heartbeat.interval.ms - elanelam-potoana (amin'ny milisegogondra) izay handefasan'ny mpampitohy hafatra momba ny fitempon'ny fo amin'ny lohahevitra manokana;
  • heartbeat.action.query - fangatahana izay hotanterahina rehefa mandefa hafatra fitepon'ny fo tsirairay (miseho amin'ny dikan-teny 1.1 ny safidy);
  • slot.name - ny anaran'ny slot replication izay hampiasain'ny mpampitohy;
  • publication.name - Anarana boky ao amin'ny PostgreSQL, izay ampiasain'ny mpampitohy. Raha tsy misy izany dia hiezaka hamorona azy i Debezium. Raha tsy manana zo ampy amin'ity hetsika ity ny mpampiasa izay anaovana ny fifandraisana, dia hivoaka miaraka amin'ny hadisoana ny mpampitohy;
  • transforms mamaritra tsara ny fomba hanovana ny anaran'ny lohahevitra kendrena:
    • transforms.AddPrefix.type manondro fa hampiasa fomba fiteny mahazatra isika;
    • transforms.AddPrefix.regex - saron-tava izay mamaritra ny anaran'ny lohahevitra kendrena;
    • transforms.AddPrefix.replacement - mivantana izay amaritanay indray.

Bebe kokoa momba ny fitempon'ny fo sy ny fiovana

Amin'ny alàlan'ny default, ny mpampitohy dia mandefa angona any amin'ny Kafka isaky ny fifampiraharahana atao, ary manoratra ny LSN (Log Sequence Number) ao amin'ny lohahevitry ny serivisy. offset. Fa inona no mitranga raha toa ny connector dia natao hamaky tsy ny angon-drakitra manontolo, fa ampahany amin'ny latabatra ihany (izay tsy misy fanavaozana data matetika)?

  • Ny mpampitohy dia hamaky ny rakitra WAL ary tsy hahita ny fifampiraharahana atao amin'ny tabilao arahiny.
  • Noho izany dia tsy hanavao ny toerana misy azy amin'izao fotoana izao na amin'ny lohahevitra na amin'ny slot replication.
  • Izany, ho setrin'izany, dia hahatonga ny rakitra WAL hotazonina amin'ny kapila ary mety ho lany ny habaka kapila.

Ary eto no tonga ny safidy ho famonjena. heartbeat.interval.ms и heartbeat.action.query. Ny fampiasana tsiroaroa ireo safidy ireo dia ahafahana manao fangatahana hanovana angona ao anaty latabatra mitokana isaky ny misy hafatra fitepon'ny fo. Noho izany, ny LSN izay misy ny connector amin'izao fotoana izao (ao amin'ny slot replication) dia havaozina tsy tapaka. Izany dia ahafahan'ny DBMS manala ny rakitra WAL izay tsy ilaina intsony. Afaka mianatra bebe kokoa momba ny fomba fiasan'ny safidy ianao tahirin-kevitra.

Safidy iray hafa mendrika hojerena akaiky dia transforms. Na dia miompana kokoa amin'ny fanamorana sy ny hatsarana aza...

Amin'ny alàlan'ny default, Debezium dia mamorona lohahevitra mampiasa ity politikan'ny anarana manaraka ity: serverName.schemaName.tableName. Mety tsy mety foana izany. FANDIKANA transforms Azonao atao ny mampiasa fomba fiteny mahazatra hamaritana lisitr'ireo tabilao, hetsika izay tokony halehana amin'ny lohahevitra misy anarana manokana.

Ao amin'ny configuration anay misaotra transforms izao no mitranga: ny hetsika CDC rehetra avy amin'ny angon-drakitra voara-maso dia handeha amin'ny lohahevitra iray misy ny anarana data.cdc.dbname. Raha tsy izany (tsy misy ireo fanovana ireo), Debezium dia mamorona lohahevitra ho an'ny latabatra tsirairay toy ny: pg-dev.public.<table_name>.

Fepetran'ny mpampitohy

Mba hamaranana ny famaritana ny fanefena connector ho an'ny PostgreSQL dia ilaina ny miresaka momba ireto endri-javatra manaraka ireto / fetran'ny fampandehanana azy:

  1. Ny fiasan'ny mpampitohy ho an'ny PostgreSQL dia miankina amin'ny foto-kevitry ny decoding lojika. Noho izany izy tsy manara-maso ny fangatahana hanovana ny rafitra angona (DDL) - arak'izany dia tsy ho ao anatin'ireo lohahevitra ireo angona ireo.
  2. Koa satria slots replication no ampiasaina, dia azo atao ny mampifandray connector ihany mankany amin'ny ohatra DBMS voalohany.
  3. Raha nomena zo vakiana fotsiny ny mpampiasa izay ampifandraisin'ny mpampitohy amin'ny angon-drakitra, dia alohan'ny fandefasana voalohany dia mila mamorona slot replication ianao ary mamoaka amin'ny angon-drakitra.

Mampihatra ny configuration

Noho izany, andao hampiditra ny config ao amin'ny connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Manamarina izahay fa nahomby ny fampidinana ary nanomboka ny mpampitohy:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Tsara: efa napetraka ary vonona ny handeha. Andao izao mody ho mpanjifa ary hifandray amin'i Kafka, ary avy eo dia hanampy sy hanova ny fidirana amin'ny latabatra:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Ao amin'ny lohahevitray dia toy izao no hiseho:

JSON lava be miaraka amin'ny fiovanay

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Amin'ireo tranga roa ireo, ny firaketana dia ahitana ny fanalahidin'ny rakitra novaina, ary ny tena fototry ny fiovana: inona ilay rakitsoratra taloha ary inona no lasa taorian'izay.

  • Amin'ny tranga INSERT: sanda taloha (before) mitovy null, ary aorian'izany - ny tsipika nampidirina.
  • Amin'ny tranga UPDATE: ao payload.before ny toetry ny tsipika teo aloha dia aseho, ary ao payload.after - vaovao miaraka amin'ny fototry ny fiovana.

2.2 MongoDB

Ity mpampitohy ity dia mampiasa ny rafitra replication MongoDB mahazatra, mamaky fampahalalana avy amin'ny oplog amin'ny node DBMS voalohany.

Mitovy amin'ilay mpampitohy efa nofaritana ho an'ny PgSQL, eto koa, amin'ny fanombohana voalohany, dia alaina ny snapshot data voalohany, ary avy eo ny mpampitohy dia mifindra amin'ny fomba famakiana oplog.

Ohatra config:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Araka ny hitanao dia tsy misy safidy vaovao eto raha oharina amin'ny ohatra teo aloha, fa ny isan'ny safidy tompon'andraikitra amin'ny fampifandraisana amin'ny angon-drakitra sy ny prefixes ihany no nahena.

Fikirana transforms Amin'ity indray mitoraka ity dia manao izao manaraka izao izy ireo: manova ny anaran'ny lohahevitra kendrena amin'ny tetika <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

fandeferana fahadisoana

Ny olan'ny fandeferana ny fahadisoana sy ny fahazoana avo lenta amin'izao androntsika izao dia mahery kokoa noho ny hatramin'izay - indrindra rehefa miresaka momba ny angon-drakitra sy ny fifampiraharahana isika, ary ny fanaraha-maso ny fiovan'ny angon-drakitra dia tsy eny an-dalana amin'ity raharaha ity. Andeha hojerentsika izay mety ho diso amin'ny fitsipika ary inona no hitranga amin'i Debezium amin'ny tranga tsirairay.

Misy safidy telo hialana:

  1. Tsy fahombiazana ny Kafka Connect. Raha namboarina hiasa amin'ny fomba mizarazara ny Connect, dia mitaky mpiasa maromaro izany mba hametraka groupe.id mitovy. Avy eo, raha toa ka tsy mahomby ny iray amin'izy ireo, dia haverina amin'ny mpiasa iray hafa ny mpampitohy ary hanohy ny famakiana avy amin'ny toerana fanolorana farany amin'ny lohahevitra ao amin'ny Kafka.
  2. Very ny fifandraisana amin'ny cluster Kafka. Hajanony fotsiny ny famakian-teny eo amin'ny toerana tsy nandefasana an'i Kafka ny mpampitohy, ary hiezaka ny handefa azy io tsindraindray mandra-pahombiazan'ilay andrana.
  3. Ny tsy fisian'ny loharanom-baovao. Ny mpampitohy dia hiezaka ny hampifandray indray amin'ny loharano araka ny fanamboarana. Ny default dia andrana 16 ampiasaina fihemorana exponential. Aorian'ny andrana faha-16 tsy nahomby dia homarihina ho toy ny tsy nahomby ary mila averinao amin'ny tanana izany amin'ny alàlan'ny interface Kafka Connect REST.
    • Amin'ny tranga PostgreSQL ny angona tsy ho very, satria Ny fampiasana slot replication dia hisakana ny famafana ny rakitra WAL izay tsy vakian'ny mpampitohy. Amin'ity tranga ity, misy ny lafy ratsiny: raha tapaka ny fifandraisana amin'ny tambajotra eo amin'ny connector sy ny DBMS mandritra ny fotoana maharitra, dia mety ho lany ny habaka kapila, ary mety hitarika ho amin'ny tsy fahombiazan'ny DBMS manontolo izany.
    • Amin'ny tranga MySQL Ny rakitra binlog dia azo odidin'ny DBMS mihitsy alohan'ny hamerenana ny fifandraisana. Izany dia hahatonga ny mpampitohy hiditra amin'ny toe-javatra tsy nahomby, ary mba hamerenana amin'ny laoniny ny fiasa ara-dalàna, dia mila averinao indray amin'ny maodely snapshot voalohany hanohizana ny famakiana ny binlogs.
    • amin'ny MongoDB. Ny antontan-taratasy dia milaza fa ny fitondran-tenan'ny mpampitohy raha toa ka voafafa ny rakitra log/oplog ary ny mpampitohy dia tsy afaka manohy mamaky ny toerana nialany dia mitovy amin'ny DBMS rehetra. Midika izany fa ny connector dia ho any amin'ny fanjakana tsy nahomby ary mila averina amin'ny fomba sary voalohany.

      Na izany aza, misy maningana. Raha tapaka nandritra ny fotoana ela ny connector (na tsy tonga tany amin'ny ohatra MongoDB), ary ny oplog dia nandalo fihodinana nandritra izany fotoana izany, dia rehefa tafaverina ny fifandraisana dia hanohy hamaky ny angona avy amin'ny toerana voalohany misy ny connecteur, izay no mahatonga ny sasany amin'ireo angona ao amin'ny Kafka tsy hamely.

famaranana

Debezium no traikefako voalohany tamin'ny rafitra CDC ary tena tsara amin'ny ankapobeny. Nandresy ny tetikasa tamin'ny fanohanany ireo DBMS lehibe, ny fanamorana ny fanamafisana, ny fanohanana ny clustering, ary ny fiaraha-monina mavitrika. Ho an'ireo liana amin'ny fampiharana dia manoro hevitra anao aho hamaky ny torolàlana momba ny Kafka Connect и Debesium.

Raha ampitahaina amin'ny mpampitohy JDBC ho an'ny Kafka Connect, ny tombony lehibe indrindra amin'ny Debezium dia ny famakiana ny fanovana avy amin'ny logs DBMS, izay ahafahan'ny angon-drakitra azo raisina miaraka amin'ny latency kely indrindra. Ny JDBC Connector (avy amin'ny Kafka Connect) dia manontany ny latabatra voaara-maso amin'ny elanelana voafaritra ary (noho izany antony izany ihany) dia tsy mamoaka hafatra rehefa voafafa ny angona (ahoana no ahafahanao manontany angona tsy misy?).

Mba hamahana olana mitovy amin'izany dia azonao atao ny mandinika ireto vahaolana manaraka ireto (ankoatra ny Debezium):

Sal

Vakio ihany koa ao amin'ny bilaoginay:

Source: www.habr.com

Add a comment