Ipinapakilala ang Debezium - CDC para sa Apache Kafka

Ipinapakilala ang Debezium - CDC para sa Apache Kafka

Sa aking trabaho, madalas akong nakatagpo ng mga bagong teknikal na solusyon / produkto ng software, impormasyon tungkol sa kung saan ay medyo mahirap makuha sa Internet na nagsasalita ng Ruso. Sa artikulong ito, susubukan kong punan ang isang ganoong puwang ng isang halimbawa mula sa aking kamakailang pagsasanay, noong kailangan kong i-set up ang pagpapadala ng mga kaganapan sa CDC mula sa dalawang sikat na DBMS (PostgreSQL at MongoDB) sa isang Kafka cluster gamit ang Debezium. Umaasa ako na ang artikulo sa pagsusuri na ito, na lumitaw bilang isang resulta ng gawaing ginawa, ay magiging kapaki-pakinabang sa iba.

Ano ang Debezium at CDC sa pangkalahatan?

Debezium - Kinatawan ng kategorya ng software ng CDC (Kunin ang pagbabago ng data), o mas tiyak, isa itong hanay ng mga konektor para sa iba't ibang DBMS na katugma sa balangkas ng Apache Kafka Connect.

Ito open source na proyekto, lisensyado sa ilalim ng Apache License v2.0 at itinataguyod ng Red Hat. Ang pag-unlad ay isinasagawa mula noong 2016 at sa ngayon ay nagbibigay ito ng opisyal na suporta para sa sumusunod na DBMS: MySQL, PostgreSQL, MongoDB, SQL Server. Mayroon ding mga konektor para sa Cassandra at Oracle, ngunit sa ngayon ay nasa "maagang pag-access" ang mga ito, at hindi ginagarantiyahan ng mga bagong release ang backward compatibility.

Kung ihahambing natin ang CDC sa tradisyunal na diskarte (kapag ang application ay direktang nagbabasa ng data mula sa DBMS), kung gayon ang mga pangunahing bentahe nito ay kinabibilangan ng pagpapatupad ng data change streaming sa row level na may mababang latency, mataas na pagiging maaasahan at availability. Ang huling dalawang puntos ay nakakamit sa pamamagitan ng paggamit ng isang Kafka cluster bilang isang repositoryo para sa mga kaganapan sa CDC.

Gayundin, ang mga pakinabang ay kinabibilangan ng katotohanan na ang isang solong modelo ay ginagamit upang mag-imbak ng mga kaganapan, kaya ang panghuling aplikasyon ay hindi kailangang mag-alala tungkol sa mga nuances ng pagpapatakbo ng iba't ibang DBMS.

Sa wakas, ang paggamit ng isang message broker ay nagbubukas ng saklaw para sa mga scale-out na application na sumusubaybay sa mga pagbabago sa data. Kasabay nito, ang epekto sa pinagmumulan ng data ay pinaliit, dahil ang data ay natanggap hindi direkta mula sa DBMS, ngunit mula sa Kafka cluster.

Tungkol sa arkitektura ng Debezium

Ang paggamit ng Debezium ay bumaba sa simpleng pamamaraan na ito:

DBMS (bilang data source) β†’ connector sa Kafka Connect β†’ Apache Kafka β†’ consumer

Bilang isang paglalarawan, magbibigay ako ng isang diagram mula sa website ng proyekto:

Ipinapakilala ang Debezium - CDC para sa Apache Kafka

Gayunpaman, hindi ko talaga gusto ang pamamaraang ito, dahil tila isang konektor ng lababo lamang ang posible.

Sa totoo lang, iba ang sitwasyon: punan ang iyong Data Lake (huling link sa diagram sa itaas) ay hindi lamang ang paraan upang gamitin ang Debezium. Maaaring gamitin ng iyong mga application ang mga kaganapang ipinadala sa Apache Kafka upang malutas ang iba't ibang sitwasyon. Halimbawa:

  • pag-alis ng hindi nauugnay na data mula sa cache;
  • pagpapadala ng mga abiso;
  • mga update sa index ng paghahanap;
  • ilang uri ng mga audit log;
  • ...

Kung sakaling mayroon kang Java application at hindi na kailangan/posibilidad na gumamit ng Kafka cluster, mayroon ding posibilidad na magtrabaho naka-embed na connector. Ang halatang plus ay na kasama nito maaari mong tanggihan ang karagdagang imprastraktura (sa anyo ng isang connector at Kafka). Gayunpaman, ang solusyon na ito ay hindi na ginagamit mula noong bersyon 1.1 at hindi na inirerekomenda para sa paggamit (maaari itong alisin sa mga susunod na release).

Tatalakayin ng artikulong ito ang arkitektura na inirerekomenda ng mga developer, na nagbibigay ng fault tolerance at scalability.

Configuration ng connector

Upang simulan ang pagsubaybay sa mga pagbabago sa pinakamahalagang halaga - data - kailangan namin:

  1. data source, na maaaring MySQL simula sa bersyon 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (buong listahan);
  2. Kumpol ng Apache Kafka
  3. Halimbawa ng Kafka Connect (mga bersyon 1.x, 2.x);
  4. naka-configure na Debezium connector.

Magtrabaho sa unang dalawang punto, i.e. ang proseso ng pag-install ng DBMS at Apache Kafka ay lampas sa saklaw ng artikulo. Gayunpaman, para sa mga gustong i-deploy ang lahat sa isang sandbox, mayroong isang handa sa opisyal na imbakan na may mga halimbawa docker-compose.yaml.

Tutuon tayo sa huling dalawang punto nang mas detalyado.

0. Kafka Connect

Dito at sa ibang pagkakataon sa artikulo, ang lahat ng mga halimbawa ng pagsasaayos ay isinasaalang-alang sa konteksto ng imahe ng Docker na ipinamahagi ng mga developer ng Debezium. Naglalaman ito ng lahat ng kinakailangang mga file ng plugin (mga konektor) at nagbibigay ng configuration ng Kafka Connect gamit ang mga variable ng kapaligiran.

Kung balak mong gamitin ang Kafka Connect mula sa Confluent, kakailanganin mong idagdag ang mga plugin ng mga kinakailangang konektor mismo sa direktoryo na tinukoy sa plugin.path o itinakda sa pamamagitan ng isang variable ng kapaligiran CLASSPATH. Ang mga setting para sa manggagawa ng Kafka Connect at mga konektor ay tinukoy sa pamamagitan ng mga file ng pagsasaayos na ipinasa bilang mga argumento sa command ng pagsisimula ng manggagawa. Para sa mga detalye tingnan dokumentasyon.

Ang buong proseso ng pag-set up ng Debeizum sa bersyon ng connector ay isinasagawa sa dalawang yugto. Isaalang-alang natin ang bawat isa sa kanila:

1. Pagse-set up ng framework ng Kafka Connect

Upang mag-stream ng data sa isang Apache Kafka cluster, itinakda ang mga partikular na parameter sa framework ng Kafka Connect, gaya ng:

  • mga setting ng koneksyon sa kumpol,
  • mga pangalan ng mga paksa kung saan maiimbak ang configuration ng connector mismo,
  • ang pangalan ng pangkat kung saan tumatakbo ang connector (sa kaso ng paggamit ng distributed mode).

Ang opisyal na imahe ng Docker ng proyekto ay sumusuporta sa pagsasaayos gamit ang mga variable ng kapaligiran - ito ang aming gagamitin. Kaya't i-download natin ang larawan:

docker pull debezium/connect

Ang minimum na hanay ng mga variable ng kapaligiran na kinakailangan upang patakbuhin ang connector ay ang mga sumusunod:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - paunang listahan ng mga server ng cluster ng Kafka upang makakuha ng kumpletong listahan ng mga miyembro ng cluster;
  • OFFSET_STORAGE_TOPIC=connector-offsets β€” isang paksa para sa pag-iimbak ng mga posisyon kung saan kasalukuyang matatagpuan ang connector;
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - isang paksa para sa pag-iimbak ng katayuan ng connector at mga gawain nito;
  • CONFIG_STORAGE_TOPIC=connector-config - isang paksa para sa pag-iimbak ng data ng configuration ng connector at mga gawain nito;
  • GROUP_ID=1 β€” identifier ng pangkat ng mga manggagawa kung saan maaaring maisagawa ang gawain ng connector; kinakailangan kapag gumagamit ng ipinamamahagi (ibinahagi) rehimen

Sinisimulan namin ang lalagyan gamit ang mga variable na ito:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Paalala tungkol sa Avro

Bilang default, ang Debezium ay nagsusulat ng data sa JSON na format, na katanggap-tanggap para sa mga sandbox at maliit na halaga ng data, ngunit maaaring maging problema sa mga database na maraming load. Ang isang alternatibo sa JSON converter ay ang pag-serialize ng mga mensahe gamit euro sa isang binary na format, na binabawasan ang pagkarga sa I / O subsystem sa Apache Kafka.

Upang magamit ang Avro, kailangan mong mag-deploy ng hiwalay schema-registry (para sa pag-iimbak ng mga schema). Ang mga variable para sa converter ay magiging ganito:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Ang mga detalye sa paggamit ng Avro at pag-set up ng isang registry para dito ay lampas sa saklaw ng artikulo - higit pa, para sa kalinawan, gagamitin namin ang JSON.

2. Pagse-set up ng connector mismo

Ngayon ay maaari kang direktang pumunta sa pagsasaayos ng connector mismo, na magbabasa ng data mula sa pinagmulan.

Tingnan natin ang halimbawa ng mga konektor para sa dalawang DBMS: PostgreSQL at MongoDB, kung saan mayroon akong karanasan at kung saan may mga pagkakaiba (kahit maliit, ngunit sa ilang mga kaso ay makabuluhan!).

Inilalarawan ang configuration sa JSON notation at na-upload sa Kafka Connect gamit ang isang POST request.

2.1. PostgreSQL

Halimbawang configuration ng connector para sa PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

Ang prinsipyo ng pagpapatakbo ng connector pagkatapos ng pagsasaayos na ito ay medyo simple:

  • Sa unang pagsisimula, kumokonekta ito sa database na tinukoy sa pagsasaayos at magsisimula sa mode paunang snapshot, na nagpapadala sa Kafka ng paunang hanay ng data na natanggap na may kondisyon SELECT * FROM table_name.
  • Matapos makumpleto ang pagsisimula, papasok ang connector sa mode ng pagbabasa ng mga pagbabago mula sa mga PostgreSQL WAL file.

Tungkol sa mga opsyon na ginamit:

  • name β€” ang pangalan ng connector kung saan ginagamit ang configuration na inilarawan sa ibaba; sa hinaharap, ang pangalang ito ay ginagamit upang gumana sa connector (ibig sabihin, tingnan ang status / i-restart / i-update ang configuration) sa pamamagitan ng Kafka Connect REST API;
  • connector.class β€” ang klase ng DBMS connector na gagamitin ng naka-configure na connector;
  • plugin.name ay ang pangalan ng plugin para sa lohikal na pag-decode ng data mula sa mga WAL file. Magagamit na mapagpipilian wal2json, decoderbuffs ΠΈ pgoutput. Ang unang dalawa ay nangangailangan ng pag-install ng naaangkop na mga extension sa DBMS, at pgoutput para sa PostgreSQL na bersyon 10 at mas mataas ay hindi nangangailangan ng mga karagdagang manipulasyon;
  • database.* β€” mga opsyon para sa pagkonekta sa database, kung saan database.server.name - ang pangalan ng PostgreSQL instance na ginamit upang mabuo ang pangalan ng paksa sa Kafka cluster;
  • table.include.list - isang listahan ng mga talahanayan kung saan nais naming subaybayan ang mga pagbabago; ibinigay sa format schema.table_name; hindi maaaring gamitin kasama ng table.exclude.list;
  • heartbeat.interval.ms β€” interval (sa millisecond) kung saan ang connector ay nagpapadala ng mga mensahe ng heartbeat sa isang espesyal na paksa;
  • heartbeat.action.query - isang kahilingan na isasagawa kapag ipinapadala ang bawat mensahe ng tibok ng puso (ang opsyon ay lumitaw mula noong bersyon 1.1);
  • slot.name β€” ang pangalan ng replication slot na gagamitin ng connector;
  • publication.name - Pangalan Publikasyon sa PostgreSQL na ginagamit ng connector. Kung sakaling wala ito, susubukan ni Debezium na likhain ito. Kung ang user kung saan ginawa ang koneksyon ay walang sapat na karapatan para sa pagkilos na ito, lalabas ang connector nang may error;
  • transforms tinutukoy kung paano eksaktong baguhin ang pangalan ng target na paksa:
    • transforms.AddPrefix.type ay nagpapahiwatig na gagamit tayo ng mga regular na expression;
    • transforms.AddPrefix.regex β€” mask kung saan ang pangalan ng target na paksa ay muling tinukoy;
    • transforms.AddPrefix.replacement - direkta kung ano ang aming muling tinukoy.

Higit pa tungkol sa tibok ng puso at pagbabago

Bilang default, ang connector ay nagpapadala ng data sa Kafka para sa bawat ginawang transaksyon, at isinusulat ang LSN nito (Log Sequence Number) sa paksa ng serbisyo offset. Ngunit ano ang mangyayari kung ang connector ay na-configure upang basahin hindi ang buong database, ngunit bahagi lamang ng mga talahanayan nito (kung saan ang data ay madalang na na-update)?

  • Ang connector ay magbabasa ng mga WAL file at hindi makakakita ng mga transaksyon na ginagawa sa mga ito sa mga talahanayan na sinusubaybayan nito.
  • Samakatuwid, hindi nito ia-update ang kasalukuyang posisyon nito alinman sa paksa o sa puwang ng pagtitiklop.
  • Ito, sa turn, ay magiging sanhi ng mga WAL file na "naipit" sa disk at malamang na maubusan ng espasyo sa disk.

At narito ang mga pagpipilian upang iligtas. heartbeat.interval.ms ΠΈ heartbeat.action.query. Ang paggamit ng mga opsyong ito nang magkapares ay ginagawang posible na magsagawa ng kahilingan na baguhin ang data sa isang hiwalay na talahanayan sa tuwing may ipapadalang mensahe ng tibok ng puso. Kaya, ang LSN kung saan ang connector ay kasalukuyang matatagpuan (sa replication slot) ay patuloy na ina-update. Nagbibigay-daan ito sa DBMS na alisin ang mga WAL file na hindi na kailangan. Para sa higit pang impormasyon sa kung paano gumagana ang mga opsyon, tingnan ang dokumentasyon.

Ang isa pang pagpipilian na karapat-dapat ng mas malapit na pansin ay transforms. Kahit na ito ay higit pa tungkol sa kaginhawahan at kagandahan ...

Bilang default, gumagawa ang Debezium ng mga paksa gamit ang sumusunod na patakaran sa pagbibigay ng pangalan: serverName.schemaName.tableName. Maaaring hindi ito palaging maginhawa. Mga pagpipilian transforms gamit ang mga regular na expression, maaari mong tukuyin ang isang listahan ng mga talahanayan na ang mga kaganapan ay kailangang iruta sa isang paksa na may partikular na pangalan.

Sa aming pagsasaayos salamat sa transforms mangyayari ang sumusunod: lahat ng mga kaganapan sa CDC mula sa sinusubaybayang database ay mapupunta sa paksang may pangalan data.cdc.dbname. Kung hindi (nang wala ang mga setting na ito), ang Debezium ay sa pamamagitan ng default ay gagawa ng paksa para sa bawat talahanayan ng form: pg-dev.public.<table_name>.

Mga limitasyon ng connector

Sa pagtatapos ng paglalarawan ng pagsasaayos ng connector para sa PostgreSQL, sulit na pag-usapan ang mga sumusunod na tampok / limitasyon ng trabaho nito:

  1. Ang connector functionality para sa PostgreSQL ay umaasa sa konsepto ng logical decoding. Samakatuwid siya hindi sinusubaybayan ang mga kahilingan upang baguhin ang istraktura ng database (DDL) - ayon dito, ang data na ito ay wala sa mga paksa.
  2. Dahil ginagamit ang mga replication slot, posible ang koneksyon ng connector lamang sa master DBMS instance.
  3. Kung ang user kung saan kumokonekta ang connector sa database ay may mga read-only na karapatan, bago ang unang paglunsad, kakailanganin mong manu-manong lumikha ng replication slot at mag-publish sa database.

Paglalapat ng Configuration

Kaya't i-load natin ang ating configuration sa connector:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

Sinusuri namin kung matagumpay ang pag-download at nagsimula ang connector:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

Mahusay: naka-set up na ito at handa nang umalis. Ngayon ay magpanggap tayo bilang isang mamimili at kumonekta sa Kafka, pagkatapos nito ay nagdaragdag at nagpalit tayo ng isang entry sa talahanayan:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

Sa aming paksa, ito ay ipapakita tulad ng sumusunod:

Napakahabang JSON sa aming mga pagbabago

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

Sa parehong mga kaso, ang mga talaan ay binubuo ng susi (PK) ng rekord na binago, at ang pinakadiwa ng mga pagbabago: kung ano ang rekord noon at kung ano ang naging pagkatapos nito.

  • Sa kaso ng INSERT: halaga bago (before) katumbas nullsinundan ng string na ipinasok.
  • Sa kaso ng UPDATE: sa payload.before ang nakaraang estado ng row ay ipinapakita, at sa payload.after - bago na may diwa ng pagbabago.

2.2 MongoDB

Ginagamit ng connector na ito ang karaniwang mekanismo ng pagtitiklop ng MongoDB, na nagbabasa ng impormasyon mula sa oplog ng pangunahing node ng DBMS.

Katulad ng inilarawan nang connector para sa PgSQL, dito rin, sa unang pagsisimula, ang pangunahing snapshot ng data ay kinukuha, pagkatapos ay lumipat ang connector sa oplog reading mode.

Halimbawa ng configuration:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

Tulad ng nakikita mo, walang mga bagong opsyon kumpara sa nakaraang halimbawa, ngunit ang bilang lamang ng mga opsyon na responsable para sa pagkonekta sa database at ang kanilang mga prefix ay nabawasan.

Mga Setting transforms sa pagkakataong ito ay ginagawa nila ang sumusunod: i-turn ang pangalan ng target na paksa mula sa scheme <server_name>.<db_name>.<collection_name> Π² data.cdc.mongo_<db_name>.

pagpaparaya sa kasalanan

Ang isyu ng fault tolerance at mataas na kakayahang magamit sa ating panahon ay mas talamak kaysa dati - lalo na kapag pinag-uusapan natin ang tungkol sa data at mga transaksyon, at ang pagsubaybay sa pagbabago ng data ay wala sa sideline sa bagay na ito. Tingnan natin kung ano ang maaaring magkamali sa prinsipyo at kung ano ang mangyayari kay Debezium sa bawat kaso.

May tatlong opsyon sa pag-opt out:

  1. Nabigo ang Kafka Connect. Kung naka-configure ang Connect upang gumana sa distributed mode, nangangailangan ito ng maraming manggagawa upang itakda ang parehong group.id. Pagkatapos, kung mabigo ang isa sa mga ito, ire-restart ang connector sa ibang manggagawa at magpapatuloy sa pagbabasa mula sa huling nakatalagang posisyon sa paksa sa Kafka.
  2. Nawalan ng koneksyon sa Kafka cluster. Ang connector ay hihinto lamang sa pagbabasa sa posisyon na nabigo itong ipadala sa Kafka at pana-panahong subukang ipadala itong muli hanggang sa magtagumpay ang pagtatangka.
  3. Hindi available ang data source. Susubukan ng connector na muling kumonekta sa pinagmulan ayon sa configuration. Ang default ay 16 na pagtatangka sa paggamit exponential backoff. Pagkatapos ng ika-16 na nabigong pagtatangka, ang gawain ay mamarkahan bilang Nabigo ang at kakailanganin itong manu-manong i-restart sa pamamagitan ng interface ng Kafka Connect REST.
    • Sa kaso ng PostgreSQL hindi mawawala ang data, dahil ang paggamit ng mga replication slot ay mapipigilan ang pagtanggal ng mga WAL file na hindi nabasa ng connector. Sa kasong ito, mayroong isang downside: kung ang pagkakakonekta ng network sa pagitan ng connector at ng DBMS ay nagambala nang mahabang panahon, mayroong isang pagkakataon na ang puwang ng disk ay maubusan, at ito ay maaaring humantong sa pagkabigo ng buong DBMS.
    • Sa kaso ng MySQL Ang mga binlog file ay maaaring paikutin ng DBMS mismo bago maibalik ang pagkakakonekta. Ito ay magiging sanhi ng connector upang mapunta sa nabigong estado, at upang maibalik ang normal na operasyon, kakailanganin itong i-restart sa paunang snapshot mode upang magpatuloy sa pagbabasa mula sa mga binlog.
    • Tungkol sa MongoDB. Sinasabi ng dokumentasyon: ang pag-uugali ng connector kung sakaling ang mga log/oplog file ay natanggal at ang connector ay hindi maaaring magpatuloy sa pagbabasa mula sa posisyon kung saan ito tumigil ay pareho para sa lahat ng DBMS. Ito ay nakasalalay sa katotohanan na ang connector ay pupunta sa estado Nabigo ang at mangangailangan ng pag-restart sa mode paunang snapshot.

      Gayunpaman, may mga pagbubukod. Kung ang connector ay nasa isang disconnected state sa loob ng mahabang panahon (o hindi maabot ang MongoDB instance), at ang oplog ay pinaikot sa panahong ito, at kapag ang koneksyon ay naibalik, ang connector ay kalmadong magpapatuloy sa pagbabasa ng data mula sa unang available na posisyon. , kaya naman ang ilan sa data sa Kafka hindi tatamaan.

Konklusyon

Ang Debezium ang aking unang karanasan sa mga sistema ng CDC at naging napakapositibo sa pangkalahatan. Sinuhulan ng proyekto ang suporta ng pangunahing DBMS, kadalian ng pagsasaayos, suporta para sa clustering at isang aktibong komunidad. Para sa mga interesado sa pagsasanay, inirerekumenda ko na basahin mo ang mga gabay para sa Kafka Connect ΠΈ Debezium.

Kung ikukumpara sa JDBC connector para sa Kafka Connect, ang pangunahing bentahe ng Debezium ay binabasa ang mga pagbabago mula sa mga log ng DBMS, na nagpapahintulot sa data na matanggap nang may kaunting pagkaantala. Ang JDBC Connector (na ibinigay ng Kafka Connect) ay nagtatanong sa sinusubaybayang talahanayan sa isang nakapirming agwat at (para sa parehong dahilan) ay hindi bumubuo ng mga mensahe kapag ang data ay tinanggal (paano ka makakapag-query para sa data na wala doon?).

Upang malutas ang mga katulad na problema, maaari mong bigyang-pansin ang mga sumusunod na solusyon (bilang karagdagan sa Debezium):

PS

Basahin din sa aming blog:

Pinagmulan: www.habr.com

Magdagdag ng komento