د اپاچي کافکا لپاره دبیزیم - CDC معرفي کول

د اپاچي کافکا لپاره دبیزیم - CDC معرفي کول

زما په کار کې، زه ډیری وختونه د نوي تخنیکي حلونو / سافټویر محصولاتو سره مخ کیږم، چې په اړه یې معلومات د روسیې په انټرنیټ کې خورا کم دي. د دې مقالې سره ، زه به هڅه وکړم چې زما د وروستي تمرین څخه د مثال سره ورته یوه تشه ډکه کړم ، کله چې ما اړتیا درلوده د دوه مشهور DBMSs (PostgreSQL او MongoDB) څخه د کافکا کلستر ته د Debezium په کارولو سره د CDC پیښو لیږلو ترتیب کړم. زه امید لرم چې د دې بیاکتنې مقاله، چې د ترسره شوي کار په پایله کې راښکاره شوه، د نورو لپاره به ګټور وي.

Debezium او CDC په عموم کې څه شی دی؟

ډیبیزیم - د CDC سافټویر کټګورۍ استازی (د معلوماتو بدلون نیول)، یا په دقیق ډول، دا د مختلفو DBMSs لپاره د نښلونکو سیټ دی چې د اپاچی کافکا کنیکټ چوکاټ سره مطابقت لري.

د دې د خلاصې سرچینې پروژه، د اپاچی لایسنس v2.0 لاندې جواز شوی او د Red Hat لخوا سپانسر شوی. پراختیا د 2016 راهیسې روانه ده او دا مهال دا د لاندې DBMS لپاره رسمي ملاتړ چمتو کوي: MySQL، PostgreSQL، MongoDB، SQL Server. د کاسندرا او اوریکل لپاره هم نښلونکي شتون لري ، مګر دا اوس د "لومړني لاسرسي" حالت کې دي ، او نوي ریلیزونه د شاته مطابقت تضمین نه کوي.

که موږ CDC د دودیز چلند سره پرتله کړو (کله چې غوښتنلیک مستقیم د DBMS څخه ډیټا لوستل کیږي) ، نو د دې اصلي ګټو کې د ټیټ ځنډ ، لوړ اعتبار او شتون سره د قطار په کچه د ډیټا بدلون سټرینګ پلي کول شامل دي. وروستي دوه ټکي د CDC پیښو لپاره د ذخیره کولو په توګه د کافکا کلستر په کارولو سره ترلاسه کیږي.

همچنان ، په ګټو کې دا حقیقت شامل دی چې یو واحد ماډل د پیښو ذخیره کولو لپاره کارول کیږي ، نو وروستی غوښتنلیک اړتیا نلري د مختلف DBMS عملیات کولو باریکیو په اړه اندیښنه ولري.

په نهایت کې ، د پیغام بروکر کارول د غوښتنلیکونو افقی اندازه کولو لپاره فرصت خلاصوي چې په ډیټا کې بدلونونه تعقیبوي. په ورته وخت کې، د معلوماتو سرچینې باندې اغیزه کمه شوې، ځکه چې ډاټا په مستقیم ډول د DBMS څخه نه، مګر د کافکا کلستر څخه ترلاسه کیږي.

د Debezium معمارۍ په اړه

د Debezium کارول دې ساده سکیم ته راځي:

DBMS (د معلوماتو سرچینې په توګه) → په کافکا نښلونکی کې نښلونکی

د مثال په توګه، زه به د پروژې ویب پاڼې څخه یو انځور ورکړم:

د اپاچي کافکا لپاره دبیزیم - CDC معرفي کول

په هرصورت، زه واقعیا دا سکیم نه خوښوم، ځکه چې داسې ښکاري چې یوازې د سنک نښلونکی ممکن دی.

په واقعیت کې، وضعیت توپیر لري: ستاسو د ډیټا لیک ډکول (په پورتني انځور کې وروستی لینک) د Debezium کارولو یوازینۍ لار نه ده. اپاچی کافکا ته لیږل شوي پیښې ستاسو د غوښتنلیکونو لخوا د مختلف حالتونو سره معامله کولو لپاره کارول کیدی شي. د مثال په ډول:

  • له زیرمې څخه د غیر اړونده معلوماتو لرې کول؛
  • د خبرتیا لیږل؛
  • د لټون شاخص تازه کول؛
  • د پلټنې ځینې ډولونه؛
  • ...

په هغه صورت کې چې تاسو د جاوا غوښتنلیک لرئ او د کافکا کلستر کارولو ته اړتیا/امکان شتون نلري، د کار کولو امکان هم شتون لري. سرایت شوی نښلونکی. ښکاره پلس دا دی چې د دې سره تاسو کولی شئ اضافي زیربنا (د نښلونکي او کافکا په بڼه) رد کړئ. په هرصورت، دا حل د 1.1 نسخه راهیسې له مینځه وړل شوی او نور د کارولو لپاره سپارښتنه نه کیږي (دا ممکن په راتلونکو خپرونو کې لیرې شي).

دا مقاله به د پراختیا کونکو لخوا وړاندیز شوي جوړښت په اړه بحث وکړي ، کوم چې د خطا زغم او توزیع چمتو کوي.

د نښلونکي ترتیب

په خورا مهم ارزښت کې د بدلونونو تعقیب پیل کولو لپاره - ډیټا - موږ اړتیا لرو:

  1. د معلوماتو سرچینه، کوم چې کیدای شي MySQL وي چې د 5.7 نسخه څخه پیل کیږي، PostgreSQL 9.6+، MongoDB 3.2+ (بشپړ لیست);
  2. د اپاچی کافکا کلستر
  3. د کافکا ارتباط مثال (نسخه 1.x، 2.x)؛
  4. د Debezium نښلونکی ترتیب شوی.

په لومړیو دوو ټکو کار وکړئ، یعنی د DBMS او اپاچی کافکا نصبولو پروسه د مقالې له دائرې څخه بهر ده. په هرصورت، د هغو کسانو لپاره چې غواړي هرڅه په شګو بکس کې ځای په ځای کړي، د مثالونو سره په رسمي ذخیره کې یو چمتو شوی دی. docker-compose.yaml.

موږ به په وروستي دوه ټکو باندې په ډیر تفصیل سره تمرکز وکړو.

0. کافکا نښلول

دلته او وروسته په مقاله کې ، د ترتیب ټول مثالونه د ډیبیزیم پراختیا کونکو لخوا توزیع شوي د ډاکر عکس په شرایطو کې په پام کې نیول شوي. دا ټول اړین پلگ ان فایلونه (وصل کونکي) لري او د چاپیریال متغیرونو په کارولو سره د کافکا کنیکټ ترتیب چمتو کوي.

که تاسو د کنفلوینټ څخه د کافکا کنیکټ کارولو اراده لرئ ، نو تاسو به اړتیا ولرئ د اړینو نښلونکو پلگ انونه پخپله په لارښود کې مشخص شوي ته اضافه کړئ. plugin.path یا د چاپیریال متغیر له لارې تنظیم کړئ CLASSPATH. د کافکا کنیکټ کارګر او نښلونکو لپاره تنظیمات د تشکیلاتو فایلونو له لارې تعریف شوي چې د کارګر پیل کمانډ ته د دلیلونو په توګه لیږدول کیږي. د جزیاتو لپاره وګورئ اسناد.

د نښلونکي نسخه کې د Debeizum تنظیم کولو ټوله پروسه په دوه مرحلو کې ترسره کیږي. راځئ چې هر یو یې په پام کې ونیسو:

1. د کافکا ارتباط چوکاټ ترتیب کول

د اپاچي کافکا کلستر ته د ډیټا جریان کولو لپاره ، د کافکا کنیکټ چوکاټ کې ځانګړي پیرامیټونه تنظیم شوي ، لکه:

  • د کلستر پیوستون تنظیمات،
  • د موضوعاتو نومونه په کوم کې چې د نښلونکي ترتیب به پخپله ذخیره شي،
  • د ګروپ نوم په کوم کې چې نښلونکی روان دی (د توزیع شوي حالت کارولو په صورت کې).

د پروژې رسمي ډاکر عکس د چاپیریال متغیرونو په کارولو سره ترتیب کولو ملاتړ کوي - دا هغه څه دي چې موږ به یې وکاروو. نو راځئ چې انځور ډاونلوډ کړو:

docker pull debezium/connect

د نښلونکي چلولو لپاره د چاپیریال متغیرونو لږترلږه سیټ په لاندې ډول دی:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - د کلستر غړو بشپړ لیست ترلاسه کولو لپاره د کافکا کلستر سرورونو لومړني لیست؛
  • OFFSET_STORAGE_TOPIC=connector-offsets - د پوستونو ذخیره کولو لپاره یوه موضوع چیرې چې نښلونکی اوس مهال موقعیت لري؛
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - د نښلونکي حالت او د هغې دندو ذخیره کولو موضوع؛
  • CONFIG_STORAGE_TOPIC=connector-config - د نښلونکي ترتیب کولو ډیټا او د دې دندو ذخیره کولو موضوع؛
  • GROUP_ID=1 - د کارګرانو د ډلې پیژندونکی په کوم کې چې د نښلونکي دنده اجرا کیدی شي؛ د توزیع کارول اړین دي (توزیع شوی) رژیم

موږ کانټینر د دې متغیرونو سره پیل کوو:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

د Avro په اړه یادونه

په ډیفالټ ، ډیبیزیم د JSON ب formatه کې ډاټا لیکي ، کوم چې د شګو بکسونو او لږ مقدار ډیټا لپاره د منلو وړ دي ، مګر په ډیری بار شوي ډیټابیسونو کې ستونزه کیدی شي. د JSON کنورټر لپاره بدیل د دې په کارولو سره د پیغامونو لړۍ کول دي Avro د بائنری شکل ته، کوم چې په اپاچی کافکا کې د I/O فرعي سیسټم بار کموي.

د Avro کارولو لپاره، تاسو اړتیا لرئ یو جلا ځای پرځای کړئ سکیما-رجسټری (د سکیمونو ذخیره کولو لپاره). د کنورټر لپاره متغیرونه به داسې ښکاري:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

د Avro کارولو او د دې لپاره د راجسټری تنظیم کولو توضیحات د مقالې له ساحې بهر دي - نور ، د وضاحت لپاره ، موږ به JSON وکاروو.

2. پخپله نښلونکی تنظیم کول

اوس تاسو کولی شئ په مستقیم ډول د نښلونکي ترتیب ته لاړ شئ، کوم چې به د سرچینې څخه ډاټا ولولي.

راځئ چې د دوه DBMS لپاره د نښلونکو مثال وګورو: PostgreSQL او MongoDB، د کوم لپاره چې زه تجربه لرم او د کوم لپاره چې توپیرونه شتون لري (که څه هم کوچني، مګر په ځینو مواردو کې مهم دي!).

ترتیب د JSON په یادښت کې بیان شوی او د POST غوښتنې په کارولو سره کافکا کنیکټ ته اپلوډ شوی.

2.1. PostgreSQL

د PostgreSQL لپاره د نښلونکي ترتیب مثال:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

د دې ترتیب وروسته د نښلونکي عملیات اصول خورا ساده دي:

  • په لومړي پیل کې ، دا په ترتیب کې مشخص شوي ډیټابیس سره وصل کیږي او په حالت کې پیل کیږي لومړنی عکسکافکا ته د شرطونو سره د ترلاسه شوي معلوماتو لومړنۍ سیټ لیږل SELECT * FROM table_name.
  • د پیل کولو بشپړیدو وروسته، نښلونکی د PostgreSQL WAL فایلونو څخه د لوستلو بدلونونو حالت ته ننوځي.

د کارول شویو اختیارونو په اړه:

  • name - د نښلونکي نوم چې لاندې تشریح شوي ترتیب کارول کیږي؛ په راتلونکي کې، دا نوم د کافکا کنیکټ REST API له لارې د نښلونکي سره کار کولو لپاره کارول کیږي (د بیلګې په توګه وضعیت وګورئ / بیا پیل کړئ / تشکیلات تازه کړئ)؛
  • connector.class - د DBMS نښلونکي ټولګي چې د ترتیب شوي نښلونکي لخوا کارول کیږي؛
  • plugin.name د WAL فایلونو څخه د ډیټا منطقي کوډ کولو لپاره د پلگ ان نوم دی. د انتخاب کولو لپاره شتون لري wal2json, decoderbuffs и pgoutput. لومړی دوه په DBMS کې د مناسبو توسیعونو نصبولو ته اړتیا لري، او pgoutput د PostgreSQL نسخه 10 او لوړ لپاره اضافي لاسوهنې ته اړتیا نلري؛
  • database.* - د ډیټابیس سره د نښلولو اختیارونه، چیرته database.server.name - د PostgreSQL مثال نوم چې د کافکا کلستر کې د موضوع نوم جوړولو لپاره کارول کیږي؛
  • table.include.list - د جدولونو لیست په کوم کې چې موږ غواړو بدلونونه تعقیب کړو؛ په بڼه ورکړل شوی schema.table_name; سره یوځای نشي کارول کیدی table.exclude.list;
  • heartbeat.interval.ms - وقفه (په ملی ثانیو کې) چې ورسره نښلونکی د زړه ضربان پیغامونه یوې ځانګړې موضوع ته لیږي؛
  • heartbeat.action.query - یوه غوښتنه چې د هر زړه ټکان پیغام لیږلو پرمهال به اجرا شي (اختیار د 1.1 نسخه راهیسې څرګند شوی)؛
  • slot.name - د نقل کولو سلاټ نوم چې د نښلونکي لخوا کارول کیږي؛
  • publication.name - نوم خپرونې په PostgreSQL کې چې نښلونکی یې کاروي. په هغه صورت کې چې دا شتون نلري، ډیبیزیم به هڅه وکړي چې دا رامینځته کړي. که چیرې هغه کارن چې لاندې اړیکه جوړه شوې وي د دې عمل لپاره کافي حقونه نلري، نښلونکی به د خطا سره وځي؛
  • transforms مشخص کوي چې څنګه د هدف موضوع نوم بدل کړئ:
    • transforms.AddPrefix.type اشاره کوي چې موږ به منظم بیانونه وکاروو؛
    • transforms.AddPrefix.regex - ماسک چې په واسطه یې د هدف موضوع نوم بیا تعریف شوی؛
    • transforms.AddPrefix.replacement - مستقیم هغه څه چې موږ یې بیا تعریف کوو.

د زړه ضربان او بدلونونو په اړه نور معلومات

په ډیفالټ ډول، نښلونکی د هرې ژمنې معاملې لپاره کافکا ته معلومات لیږي، او د خدماتو موضوع ته خپل LSN (د ترتیب شمیره) لیکي. offset. مګر څه پیښیږي که چیرې نښلونکی د ټول ډیټابیس لوستلو لپاره تنظیم شوی نه وي ، مګر د دې میزونو یوازې برخه (په کوم کې چې ډاټا په مکرر ډول تازه کیږي)؟

  • نښلونکی به د WAL فایلونه ولولي او د راکړې ورکړې ژمنې به په هغه میزونو کې ونه پیژني چې دا یې څاري.
  • نو ځکه، دا به خپل اوسنی موقعیت یا په موضوع یا د نقل کولو سلاټ کې تازه نه کړي.
  • دا به په بدل کې د WAL فایلونو لامل شي چې په ډیسک کې "بند" شي او احتمال به د ډیسک ځای پای ته ورسیږي.

او دلته اختیارونه د ژغورنې لپاره راځي. heartbeat.interval.ms и heartbeat.action.query. په جوړه کې د دې اختیارونو کارول دا امکان رامینځته کوي چې هرکله چې د زړه ضربان پیغام لیږل کیږي په جلا جدول کې د معلوماتو بدلولو غوښتنه پلي کول. په دې توګه، LSN چې نښلونکی اوس مهال موقعیت لري (د نقل کولو سلاټ کې) په دوامداره توګه تازه کیږي. دا DBMS ته اجازه ورکوي چې د WAL فایلونه لرې کړي چې نور ورته اړتیا نلري. د نورو معلوماتو لپاره چې اختیارونه څنګه کار کوي، وګورئ اسناد.

بل اختیار چې د نږدې پاملرنې مستحق دی transforms. که څه هم دا د اسانتیا او ښکلا په اړه ډیر دی ...

د ډیفالټ په واسطه، ډیبیزیم د لاندې نومونې پالیسۍ په کارولو سره موضوعات رامینځته کوي: serverName.schemaName.tableName. دا ممکن تل اسانه نه وي. اختیارونه transforms د منظم بیانونو په کارولو سره، تاسو کولی شئ د میزونو لیست تعریف کړئ چې پیښې باید د ځانګړي نوم سره یوې موضوع ته واستول شي.

زموږ په ترتیب کې مننه transforms لاندې پیښیږي: د تعقیب شوي ډیټابیس څخه د CDC ټولې پیښې به د نوم سره موضوع ته لاړ شي data.cdc.dbname. که نه نو (د دې ترتیباتو پرته)، ډیبیزیم به په ډیفالټ ډول د فورمې د هر جدول لپاره موضوع رامینځته کړي: pg-dev.public.<table_name>.

د نښلونکي محدودیتونه

د PostgreSQL لپاره د نښلونکي تشکیلاتو توضیحاتو په پای کې ، دا د دې د کار لاندې ځانګړتیاو / محدودیتونو په اړه د خبرو ارزښت لري:

  1. د PostgreSQL لپاره د نښلونکي فعالیت د منطقي کوډ کولو مفهوم باندې تکیه کوي. له همدې امله هغه د ډیټابیس جوړښت بدلولو غوښتنې نه تعقیبوي (DDL) - په وینا، دا معلومات به په موضوعاتو کې نه وي.
  2. څرنګه چې د نقل کولو سلاټونه کارول کیږي، د نښلونکي پیوستون ممکن دی یوازې د ماسټر DBMS مثال ته.
  3. که چیرې هغه کارن چې لاندې نښلونکی ډیټابیس سره وصل وي یوازې د لوستلو حقونه لري ، نو د لومړي لانچ دمخه ، تاسو اړتیا لرئ په لاسي ډول د نقل کولو سلاټ رامینځته کړئ او ډیټابیس ته یې خپور کړئ.

د تشکیلاتو پلي کول

نو راځئ چې خپل تشکیلات په نښلونکي کې پورته کړو:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

موږ ګورو چې ډاونلوډ بریالی و او نښلونکی پیل شو:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

عالي: دا تنظیم شوی او د تګ لپاره چمتو دی. اوس راځئ چې د مصرف کونکي په توګه ویاړو او له کافکا سره اړیکه ونیسو، وروسته له دې چې موږ په جدول کې یو ننوتل اضافه او بدل کړو:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

زموږ په موضوع کې، دا به په لاندې ډول ښکاره شي:

زموږ د بدلونونو سره ډیر اوږد JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

په دواړو حالتونو کې، ریکارډونه د هغه ریکارډ کلیدي (PK) لري چې بدل شوي، او د بدلونونو اساس: ریکارډ مخکې څه و او وروسته څه شو.

  • په حالت کې INSERT: مخکې ارزښت (before) مساوي nullوروسته هغه تار چې داخل شوی و.
  • په حالت کې UPDATE: په payload.before د قطار پخوانی حالت ښودل شوی، او دننه payload.after - د بدلون جوهر سره نوی.

2.2 MongoDB

دا نښلونکی د معیاري MongoDB نقل میکانیزم کاروي، د DBMS لومړني نوډ د اپلاګ څخه معلومات لوستل.

په ورته ډول د PgSQL لپاره دمخه بیان شوي نښلونکي ته ، دلته هم په لومړي پیل کې ، د لومړني ډیټا سنیپ شاټ اخیستل کیږي ، وروسته لدې چې نښلونکی د اپلاګ لوستلو حالت ته ځي.

د ترتیب بیلګه:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

لکه څنګه چې تاسو لیدلی شئ، د تیرو مثالونو په پرتله هیڅ نوي اختیارونه شتون نلري، مګر یوازې د ډیټابیس سره د نښلولو لپاره مسؤل انتخابونه او د دوی مخکینۍ کم شوي.

امستنې transforms دا ځل دوی لاندې کار کوي: د سکیم څخه د هدف موضوع نوم بدل کړئ <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

د خطا زغم

زموږ په وخت کې د غلطۍ زغم او لوړ شتون مسله د هرکله څخه خورا شدیده ده - په ځانګړي توګه کله چې موږ د ډیټا او لیږدونو په اړه وغږیږو ، او د معلوماتو بدلون تعقیب پدې مسله کې د پام وړ ندي. راځئ وګورو چې په اصولو کې څه غلط کیدی شي او په هره قضیه کې دبیزیم سره به څه پیښ شي.

د وتلو درې اختیارونه شتون لري:

  1. د کافکا نښلول ناکامي. که نښلول په توزیع شوي حالت کې د کار کولو لپاره تنظیم شوی وي، دا ډیری کارګرانو ته اړتیا لري چې ورته group.id تنظیم کړي. بیا، که یو له دوی څخه ناکام شي، نښلونکی به په بل کارګر کې بیا پیل شي او د کافکا په موضوع کې د وروستي ژمن دریځ څخه لوستلو ته دوام ورکړي.
  2. د کافکا کلستر سره د ارتباط له لاسه ورکول. نښلونکی به په ساده ډول په هغه ځای کې لوستل ودروي چې کافکا ته په لیږلو کې پاتې راغلی او په وخت سره هڅه کوي چې دا بیا لیږلو پورې هڅه وکړي تر څو چې هڅه بریالۍ نشي.
  3. د معلوماتو سرچینه شتون نلري. نښلونکی به هڅه وکړي چې د ترتیب سره سم سرچینې سره وصل شي. ډیفالټ د کارولو 16 هڅې دي کفایتي بیک آف. د 16 ناکامې هڅې وروسته، دنده به په نښه شي ناکام شو او دا به د کافکا کنیکټ REST انٹرفیس له لارې په لاسي ډول بیا پیل کولو ته اړتیا ولري.
    • په حالت کې پوسټری ایس ایس ایل ډاټا به له لاسه ورنکړي، ځکه د نقل کولو سلاټونو کارول به د WAL فایلونو له مینځه وړلو مخه ونیسي چې د نښلونکي لخوا نه لوستل کیږي. په دې حالت کې، یو نیمګړتیا شتون لري: که چیرې د نښلونکي او DBMS ترمنځ د شبکې ارتباط د اوږدې مودې لپاره ګډوډ شي، د دې امکان شتون لري چې د ډیسک ځای پای ته ورسیږي، او دا ممکن د ټول DBMS ناکامۍ لامل شي.
    • په حالت کې مای د بنلاګ فایلونه پخپله د DBMS لخوا ګرځیدلی شي مخکې لدې چې ارتباط بحال شي. دا به د دې لامل شي چې نښلونکی ناکام حالت ته لاړ شي، او دا به اړتیا ولري چې په ابتدايي سنیپ شاټ حالت کې بیا پیل شي ترڅو د نورمال عملیاتو بیرته راګرځولو لپاره د بنلاګونو لوستلو ته دوام ورکړي.
    • په MongoDB. اسناد وايي: د نښلونکي چلند په هغه حالت کې چې د لاګ/اپلاګ فایلونه حذف شوي وي او نښلونکی نشي کولی له هغه ځای څخه لوستلو ته دوام ورکړي چیرې چې پریښودل شوی وي د ټولو DBMS لپاره ورته دی. دا په حقیقت کې دی چې نښلونکی به دولت ته لاړ شي ناکام شو او په حالت کې به بیا پیل ته اړتیا ولري لومړنی عکس.

      په هرصورت، استثناوې شتون لري. که نښلونکی د اوږدې مودې لپاره په منحل حالت کې و (یا د MongoDB مثال ته نشي رسیدلی)، او اپلوګ پدې وخت کې وګرځول شو، نو کله چې پیوستون بحال شي، نښلونکی به په آرامۍ سره د لومړي موجود موقعیت څخه ډاټا لوستلو ته دوام ورکړي. له همدې امله په کافکا کې ځینې معلومات نه ووهل شي.

پایلې

ډیبیزیم د CDC سیسټمونو سره زما لومړۍ تجربه ده او په ټولیز ډول خورا مثبته وه. پروژې د اصلي DBMS مالتړ، د ترتیب کولو اسانتیا، د کلستر کولو لپاره مالتړ او د یوې فعالې ټولنې لپاره رشوت ورکړ. د هغو کسانو لپاره چې په تمرین کې لیوالتیا لري، زه وړاندیز کوم چې تاسو لارښوونې ولولئ کافکا نښلول и ډیبیزیم.

د کافکا کنیکټ لپاره د JDBC نښلونکي په پرتله، د Debezium اصلي ګټه دا ده چې بدلونونه د DBMS لاګونو څخه لوستل کیږي، کوم چې د لږ ځنډ سره ډاټا ترلاسه کولو ته اجازه ورکوي. د JDBC نښلونکی (د کافکا کنیکټ لخوا چمتو شوی) د تعقیب شوي میز څخه په ټاکلي وقفه کې پوښتنې کوي او (د ورته دلیل لپاره) پیغامونه نه رامینځته کوي کله چې ډیټا حذف شي (تاسو څنګه کولی شئ د ډیټا لپاره پوښتنه وکړئ چې شتون نلري؟).

د ورته ستونزو د حل لپاره، تاسو کولی شئ لاندې حلونو ته پام وکړئ (د ډیبیزیم سربیره):

PS

زموږ په بلاګ کې هم ولولئ:

سرچینه: www.habr.com

Add a comment