اپاچی کافکا کے لیے ڈیبیزیم - سی ڈی سی کا تعارف

اپاچی کافکا کے لیے ڈیبیزیم - سی ڈی سی کا تعارف

میرے کام میں، میں اکثر نئے تکنیکی حل/سافٹ ویئر پروڈکٹس دیکھتا ہوں، جن کے بارے میں معلومات روسی بولنے والے انٹرنیٹ پر بہت کم ہیں۔ اس مضمون کے ساتھ، میں اپنی حالیہ مشق کی ایک مثال کے ساتھ ایسے ہی ایک خلا کو پُر کرنے کی کوشش کروں گا، جب مجھے ڈیبیزیم کا استعمال کرتے ہوئے دو مشہور DBMSs (PostgreSQL اور MongoDB) سے کافکا کلسٹر میں CDC ایونٹس بھیجنے کی ضرورت تھی۔ مجھے امید ہے کہ یہ جائزہ مضمون، جو کام کیے جانے کے نتیجے میں شائع ہوا، دوسروں کے لیے مفید ثابت ہوگا۔

ڈیبیزیم اور سی ڈی سی عام طور پر کیا ہے؟

ڈیبیزیم - سی ڈی سی سافٹ ویئر زمرہ کا نمائندہ (ڈیٹا کی تبدیلی پر قبضہ کریں۔)، یا زیادہ واضح طور پر، یہ مختلف DBMSs کے لیے کنیکٹرز کا ایک سیٹ ہے جو Apache Kafka Connect فریم ورک کے ساتھ مطابقت رکھتا ہے۔

یہ اوپن سورس پروجیکٹ، Apache License v2.0 کے تحت لائسنس یافتہ اور Red Hat کے ذریعے سپانسر شدہ۔ ترقی 2016 سے جاری ہے اور اس وقت یہ درج ذیل DBMS کے لیے باضابطہ تعاون فراہم کرتا ہے: MySQL, PostgreSQL, MongoDB, SQL Server۔ Cassandra اور Oracle کے لیے بھی کنیکٹر موجود ہیں، لیکن وہ فی الحال "ابتدائی رسائی" کی حالت میں ہیں، اور نئی ریلیز پسماندہ مطابقت کی ضمانت نہیں دیتی ہیں۔

اگر ہم سی ڈی سی کا روایتی نقطہ نظر سے موازنہ کریں (جب ایپلی کیشن براہ راست DBMS سے ڈیٹا پڑھتی ہے)، تو اس کے اہم فوائد میں کم تاخیر، اعلی وشوسنییتا اور دستیابی کے ساتھ قطار کی سطح پر ڈیٹا چینج اسٹریمنگ کا نفاذ شامل ہے۔ آخری دو نکات کافکا کلسٹر کو CDC ایونٹس کے ذخیرے کے طور پر استعمال کرکے حاصل کیے جاتے ہیں۔

اس کے علاوہ، فوائد میں یہ حقیقت بھی شامل ہے کہ واقعات کو ذخیرہ کرنے کے لیے ایک ہی ماڈل کا استعمال کیا جاتا ہے، اس لیے حتمی درخواست کو مختلف DBMS کو چلانے کی باریکیوں کے بارے میں فکر کرنے کی ضرورت نہیں ہے۔

آخر میں، ایک میسج بروکر کا استعمال ان ایپلی کیشنز کو اجازت دیتا ہے جو ڈیٹا میں تبدیلیوں کو مانیٹر کر کے افقی طور پر اسکیل آؤٹ کر سکیں۔ ایک ہی وقت میں، ڈیٹا سورس پر اثر کو کم کیا جاتا ہے، کیونکہ ڈیٹا براہ راست DBMS سے نہیں، بلکہ کافکا کلسٹر سے حاصل کیا جاتا ہے۔

ڈیبیزیم فن تعمیر کے بارے میں

ڈیبیزیم کا استعمال اس سادہ اسکیم پر آتا ہے:

DBMS (بطور ڈیٹا ماخذ) → کافکا کنیکٹ میں کنیکٹر → اپاچی کافکا → صارف

ایک مثال کے طور پر، میں پروجیکٹ کی ویب سائٹ سے ایک خاکہ پیش کروں گا:

اپاچی کافکا کے لیے ڈیبیزیم - سی ڈی سی کا تعارف

تاہم، مجھے یہ اسکیم واقعی پسند نہیں ہے، کیونکہ ایسا لگتا ہے کہ صرف ایک سنک کنیکٹر ہی ممکن ہے۔

حقیقت میں، صورتحال مختلف ہے: اپنی ڈیٹا لیک کو بھرنا (اوپر دیے گئے خاکے میں آخری لنک) Debezium استعمال کرنے کا واحد طریقہ نہیں ہے۔ اپاچی کافکا کو بھیجے گئے ایونٹس کو آپ کی ایپلی کیشنز مختلف حالات سے نمٹنے کے لیے استعمال کر سکتی ہیں۔ مثال کے طور پر:

  • کیشے سے غیر متعلقہ ڈیٹا کو ہٹانا؛
  • اطلاعات بھیجنا؛
  • تلاش انڈیکس اپ ڈیٹس؛
  • کچھ قسم کے آڈٹ لاگز؛
  • ...

اگر آپ کے پاس جاوا ایپلی کیشن ہے اور کافکا کلسٹر استعمال کرنے کی کوئی ضرورت/امکان نہیں ہے، تو اس کے ذریعے کام کرنے کا بھی امکان ہے۔ سرایت کنیکٹر. واضح فائدہ یہ ہے کہ یہ اضافی بنیادی ڈھانچے کی ضرورت کو ختم کرتا ہے (ایک کنیکٹر اور کافکا کی شکل میں)۔ تاہم، اس حل کو ورژن 1.1 سے فرسودہ کر دیا گیا ہے اور اب اسے استعمال کرنے کی سفارش نہیں کی گئی ہے (اس کے لیے سپورٹ مستقبل کی ریلیز میں ہٹائی جا سکتی ہے)۔

یہ مضمون ڈویلپرز کے ذریعہ تجویز کردہ فن تعمیر پر تبادلہ خیال کرے گا، جو غلطی کی رواداری اور توسیع پذیری فراہم کرتا ہے۔

کنیکٹر کی ترتیب

سب سے اہم قدر - ڈیٹا - میں تبدیلیوں کا سراغ لگانا شروع کرنے کے لیے ہمیں ضرورت ہے:

  1. ڈیٹا سورس، جو ورژن 5.7، PostgreSQL 9.6+، MongoDB 3.2+ سے شروع ہونے والا MySQL ہوسکتا ہے (مکمل فہرست);
  2. اپاچی کافکا کلسٹر
  3. کافکا کنیکٹ مثال (ورژن 1.x، 2.x)؛
  4. ڈیبیزیم کنیکٹر کو ترتیب دیا گیا۔

پہلے دو نکات پر کام کریں، یعنی ڈی بی ایم ایس اور اپاچی کافکا کو انسٹال کرنے کا عمل مضمون کے دائرہ کار سے باہر ہے۔ تاہم، ان لوگوں کے لیے جو ہر چیز کو سینڈ باکس میں لگانا چاہتے ہیں، مثالوں کے ساتھ سرکاری ذخیرہ میں ایک ریڈی میڈ موجود ہے۔ docker-compose.yaml.

ہم آخری دو نکات پر مزید تفصیل سے توجہ دیں گے۔

0. کافکا کنیکٹ

یہاں اور بعد میں مضمون میں، تمام کنفیگریشن مثالوں کو ڈیبیزیم ڈویلپرز کے ذریعہ تقسیم کردہ ڈوکر امیج کے تناظر میں سمجھا جاتا ہے۔ اس میں تمام ضروری پلگ ان فائلیں (کنیکٹرز) ہیں اور ماحولیاتی متغیرات کا استعمال کرتے ہوئے کافکا کنیکٹ کنفیگریشن فراہم کرتا ہے۔

اگر آپ Confluent سے کافکا کنیکٹ استعمال کرنے کا ارادہ رکھتے ہیں، تو آپ کو ضروری کنیکٹرز کے پلگ ان کو خود اس ڈائرکٹری میں شامل کرنے کی ضرورت ہوگی۔ plugin.path یا ماحولیاتی متغیر کے ذریعے سیٹ کریں۔ CLASSPATH. کافکا کنیکٹ ورکر اور کنیکٹرز کی سیٹنگز کو کنفیگریشن فائلوں کے ذریعے بیان کیا جاتا ہے جو ورکر اسٹارٹ کمانڈ کو آرگیومنٹ کے طور پر بھیجی جاتی ہیں۔ تفصیلات کے لیے دیکھیں دستاویزات.

کنیکٹر ورژن میں ڈیبیزم قائم کرنے کا پورا عمل دو مراحل میں کیا جاتا ہے۔ آئیے ان میں سے ہر ایک پر غور کریں:

1. کافکا کنیکٹ فریم ورک ترتیب دینا

اپاچی کافکا کلسٹر میں ڈیٹا کو سٹریم کرنے کے لیے، کافکا کنیکٹ فریم ورک میں مخصوص پیرامیٹرز سیٹ کیے گئے ہیں، جیسے:

  • کلسٹر کنکشن کی ترتیبات،
  • عنوانات کے نام جن میں کنیکٹر کی ترتیب خود محفوظ کی جائے گی،
  • گروپ کا نام جس میں کنیکٹر چل رہا ہے (تقسیم شدہ موڈ استعمال کرنے کی صورت میں)۔

پروجیکٹ کی آفیشل ڈوکر امیج ماحولیاتی متغیرات کا استعمال کرتے ہوئے کنفیگریشن کی حمایت کرتی ہے - یہ وہی ہے جو ہم استعمال کریں گے۔ تو آئیے اس تصویر کو ڈاؤن لوڈ کرتے ہیں:

docker pull debezium/connect

کنیکٹر کو چلانے کے لیے درکار ماحولیاتی متغیرات کا کم از کم سیٹ حسب ذیل ہے:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - کلسٹر اراکین کی مکمل فہرست حاصل کرنے کے لیے کافکا کلسٹر سرورز کی ابتدائی فہرست؛
  • OFFSET_STORAGE_TOPIC=connector-offsets - جگہوں کو ذخیرہ کرنے کے لیے ایک موضوع جہاں کنیکٹر فی الحال موجود ہے۔
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - کنیکٹر کی حیثیت اور اس کے کاموں کو ذخیرہ کرنے کے لئے ایک موضوع؛
  • CONFIG_STORAGE_TOPIC=connector-config - کنیکٹر کنفیگریشن ڈیٹا اور اس کے کاموں کو ذخیرہ کرنے کے لیے ایک موضوع؛
  • GROUP_ID=1 - کارکنوں کے گروپ کا شناخت کنندہ جس پر کنیکٹر کے کام کو انجام دیا جاسکتا ہے۔ تقسیم کا استعمال کرتے وقت ضروری ہے (تقسیم) حکومت.

ہم ان متغیرات کے ساتھ کنٹینر لانچ کرتے ہیں:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

ایورو کے بارے میں نوٹ

پہلے سے طے شدہ طور پر، ڈیبیزیم ڈیٹا کو JSON فارمیٹ میں لکھتا ہے، جو سینڈ باکسز اور ڈیٹا کی کم مقدار کے لیے قابل قبول ہے، لیکن بھاری بھرکم ڈیٹا بیس میں ایک مسئلہ ہو سکتا ہے۔ JSON کنورٹر کا متبادل پیغامات کو سیریلائز کرنا ہے۔ یورو ایک بائنری فارمیٹ میں، جو اپاچی کافکا میں I/O سب سسٹم پر بوجھ کو کم کرتا ہے۔

Avro استعمال کرنے کے لیے، آپ کو الگ سے تعینات کرنا ہوگا۔ سکیما رجسٹری (اسکیموں کو ذخیرہ کرنے کے لیے)۔ کنورٹر کے متغیرات اس طرح نظر آئیں گے:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro کے استعمال اور اس کے لیے رجسٹری قائم کرنے کی تفصیلات مضمون کے دائرہ کار سے باہر ہیں - مزید وضاحت کے لیے، ہم JSON استعمال کریں گے۔

2. کنیکٹر خود ترتیب دینا

اب آپ براہ راست کنیکٹر کی کنفیگریشن پر جا سکتے ہیں، جو ماخذ سے ڈیٹا پڑھے گا۔

آئیے دو DBMS کے کنیکٹرز کی مثال دیکھیں: PostgreSQL اور MongoDB، جس کے لیے مجھے تجربہ ہے اور جن کے لیے اختلافات ہیں (اگرچہ چھوٹے، لیکن کچھ معاملات میں اہم!)

ترتیب کو JSON نوٹیشن میں بیان کیا گیا ہے اور POST درخواست کا استعمال کرتے ہوئے کافکا کنیکٹ پر اپ لوڈ کیا گیا ہے۔

2.1. پوسٹگری ایس کیو ایل

PostgreSQL کے لیے کنیکٹر کنفیگریشن کی مثال:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

اس سیٹ اپ کے بعد کنیکٹر کے آپریشن کا اصول بہت آسان ہے:

  • پہلی شروعات میں، یہ کنفیگریشن میں بیان کردہ ڈیٹا بیس سے جڑتا ہے اور موڈ میں شروع ہوتا ہے۔ ابتدائی سنیپ شاٹ، مشروط کے ساتھ موصول ہونے والے ڈیٹا کا ابتدائی سیٹ کافکا کو بھیجنا SELECT * FROM table_name.
  • ابتداء مکمل ہونے کے بعد، کنیکٹر PostgreSQL WAL فائلوں سے تبدیلیوں کو پڑھنے کے موڈ میں داخل ہوتا ہے۔

استعمال شدہ اختیارات کے بارے میں:

  • name - کنیکٹر کا نام جس کے لیے ذیل میں بیان کردہ کنفیگریشن استعمال کی گئی ہے۔ مستقبل میں، یہ نام کافکا کنیکٹ REST API کے ذریعے کنیکٹر کے ساتھ کام کرنے کے لیے استعمال کیا جاتا ہے (یعنی اسٹیٹس دیکھیں/دوبارہ شروع کریں/کنفیگریشن کو اپ ڈیٹ کریں)؛
  • connector.class — DBMS کنیکٹر کلاس جو کنفیگرڈ کنیکٹر کے ذریعے استعمال کیا جائے گا۔
  • plugin.name - WAL فائلوں سے ڈیٹا کی منطقی ضابطہ کشائی کے لیے پلگ ان کا نام۔ منتخب کرنے کے لیے دستیاب ہے۔ wal2json, decoderbuffs и pgoutput. پہلے دو کو DBMS میں مناسب ایکسٹینشن کی تنصیب کی ضرورت ہوتی ہے، اور pgoutput PostgreSQL ورژن 10 اور اس سے زیادہ کے لیے اضافی ہیرا پھیری کی ضرورت نہیں ہے۔
  • database.* - ڈیٹا بیس سے جڑنے کے اختیارات، جہاں database.server.name — PostgreSQL مثال کا نام کافکا کلسٹر میں موضوع کا نام بنانے کے لیے استعمال کیا جاتا ہے۔
  • table.include.list - جدولوں کی ایک فہرست جس میں ہم تبدیلیوں کو ٹریک کرنا چاہتے ہیں۔ فارمیٹ میں دیا گیا ہے۔ schema.table_name; کے ساتھ استعمال نہیں کیا جا سکتا table.exclude.list;
  • heartbeat.interval.ms - وقفہ (ملی سیکنڈ میں) جس کے ساتھ کنیکٹر دل کی دھڑکن کے پیغامات کسی خاص موضوع پر بھیجتا ہے۔
  • heartbeat.action.query - ایک درخواست جو ہر دل کی دھڑکن کا پیغام بھیجتے وقت عمل میں لائی جائے گی (آپشن ورژن 1.1 سے ظاہر ہوا ہے)؛
  • slot.name - نقل کی سلاٹ کا نام جو کنیکٹر کے ذریعہ استعمال کیا جائے گا؛
  • publication.name - نام اشاعت PostgreSQL میں جو کنیکٹر استعمال کرتا ہے۔ اگر یہ موجود نہیں ہے تو، Debezium اسے بنانے کی کوشش کرے گا۔ اگر صارف جس کے تحت کنکشن بنایا گیا ہے اس کے پاس اس کارروائی کے لیے کافی حقوق نہیں ہیں، کنیکٹر غلطی کے ساتھ باہر نکل جائے گا۔
  • transforms ہدف کے عنوان کا نام تبدیل کرنے کا طریقہ بالکل طے کرتا ہے:
    • transforms.AddPrefix.type اشارہ کرتا ہے کہ ہم ریگولر ایکسپریشن استعمال کریں گے۔
    • transforms.AddPrefix.regex - ماسک جس کے ذریعے ہدف کے عنوان کا نام دوبارہ بیان کیا گیا ہے۔
    • transforms.AddPrefix.replacement - براہ راست جس کی ہم دوبارہ تعریف کرتے ہیں۔

دل کی دھڑکن اور تبدیلیوں کے بارے میں مزید

پہلے سے طے شدہ طور پر، کنیکٹر کافکا کو ہر کمٹڈ ٹرانزیکشن کے لیے ڈیٹا بھیجتا ہے، اور سروس کے موضوع پر اپنا LSN (لاگ سیکوینس نمبر) لکھتا ہے۔ offset. لیکن کیا ہوتا ہے اگر کنیکٹر پورے ڈیٹا بیس کو نہیں، بلکہ اس کے ٹیبلز کا صرف ایک حصہ پڑھنے کے لیے ترتیب دیا گیا ہے (جس میں ڈیٹا کو کبھی کبھار اپ ڈیٹ کیا جاتا ہے)؟

  • کنیکٹر WAL فائلوں کو پڑھے گا اور ان میں لین دین کی کمٹ کا پتہ نہیں لگائے گا ان میزوں پر جو اس کی نگرانی کرتا ہے۔
  • لہذا، یہ اپنی موجودہ پوزیشن کو یا تو موضوع میں یا نقل کی سلاٹ میں اپ ڈیٹ نہیں کرے گا۔
  • اس کے نتیجے میں، WAL فائلیں ڈسک پر "اٹک" جائیں گی اور ممکنہ طور پر ڈسک کی جگہ ختم ہو جائے گی۔

اور یہاں آپشنز ریسکیو کے لیے آتے ہیں۔ heartbeat.interval.ms и heartbeat.action.query. ان اختیارات کو جوڑوں میں استعمال کرنے سے ہر بار جب دل کی دھڑکن کا پیغام بھیجا جاتا ہے تو علیحدہ جدول میں ڈیٹا کو تبدیل کرنے کی درخواست پر عمل کرنا ممکن ہو جاتا ہے۔ اس طرح، LSN جس پر کنیکٹر فی الحال واقع ہے (نقل کی سلاٹ میں) مسلسل اپ ڈیٹ ہوتا رہتا ہے۔ یہ DBMS کو WAL فائلوں کو ہٹانے کی اجازت دیتا ہے جن کی مزید ضرورت نہیں ہے۔ اختیارات کیسے کام کرتے ہیں اس بارے میں مزید معلومات کے لیے، دیکھیں دستاویزات.

قریب سے توجہ دینے کے قابل ایک اور اختیار ہے transforms. اگرچہ یہ سہولت اور خوبصورتی کے بارے میں زیادہ ہے ...

پہلے سے طے شدہ طور پر، Debezium مندرجہ ذیل نام کی پالیسی کا استعمال کرتے ہوئے عنوانات تخلیق کرتا ہے: serverName.schemaName.tableName. یہ ہمیشہ آسان نہیں ہوسکتا ہے۔ اختیارات transforms ریگولر ایکسپریشنز کا استعمال کرتے ہوئے، آپ ٹیبلز کی ایک فہرست متعین کر سکتے ہیں جن کے ایونٹس کو کسی مخصوص نام کے ساتھ کسی موضوع کی طرف لے جانے کی ضرورت ہے۔

ہماری ترتیب میں شکریہ transforms مندرجہ ذیل ہوتا ہے: ٹریک کردہ ڈیٹا بیس سے تمام CDC واقعات نام کے ساتھ موضوع پر جائیں گے۔ data.cdc.dbname. بصورت دیگر (ان ترتیبات کے بغیر)، ڈیبیزیم بطور ڈیفالٹ فارم کے ہر ٹیبل کے لیے ایک موضوع بنائے گا: pg-dev.public.<table_name>.

کنیکٹر کی حدود

PostgreSQL کے لیے کنیکٹر کنفیگریشن کی تفصیل کے آخر میں، اس کے کام کی درج ذیل خصوصیات / حدود کے بارے میں بات کرنا قابل قدر ہے۔

  1. PostgreSQL کے لیے کنیکٹر کی فعالیت منطقی ضابطہ کشائی کے تصور پر منحصر ہے۔ اس لیے وہ ڈیٹا بیس کی ساخت کو تبدیل کرنے کی درخواستوں کو ٹریک نہیں کرتا ہے۔ (DDL) - اس کے مطابق، یہ ڈیٹا عنوانات میں نہیں ہوگا۔
  2. چونکہ نقل کے سلاٹ استعمال کیے جاتے ہیں، کنیکٹر کا رابطہ ممکن ہے۔ صرف ماسٹر ڈی بی ایم ایس مثال کے طور پر۔
  3. اگر صارف جس کے تحت کنیکٹر ڈیٹا بیس سے جڑتا ہے اس کے پاس صرف پڑھنے کے حقوق ہیں، تو پھر پہلی لانچ سے پہلے، آپ کو دستی طور پر ایک نقل کی سلاٹ بنانے اور ڈیٹا بیس میں شائع کرنے کی ضرورت ہوگی۔

ترتیب کا اطلاق کرنا

تو آئیے اپنی کنفیگریشن کو کنیکٹر میں لوڈ کرتے ہیں:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

ہم چیک کرتے ہیں کہ ڈاؤن لوڈ کامیاب تھا اور کنیکٹر شروع ہوا:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

بہت اچھا: یہ سیٹ اپ ہے اور جانے کے لیے تیار ہے۔ اب ہم ایک صارف ہونے کا بہانہ کرتے ہیں اور کافکا سے جڑتے ہیں، جس کے بعد ہم ٹیبل میں ایک اندراج شامل اور تبدیل کرتے ہیں:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

ہمارے موضوع میں، یہ مندرجہ ذیل طور پر دکھایا جائے گا:

ہماری تبدیلیوں کے ساتھ بہت طویل JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

دونوں صورتوں میں، ریکارڈز اس ریکارڈ کی کلید (PK) پر مشتمل ہوتے ہیں جسے تبدیل کیا گیا تھا، اور تبدیلیوں کا جوہر: ریکارڈ پہلے کیا تھا اور اس کے بعد کیا ہوا۔

  • کے معاملے میں INSERT: قدر سے پہلے (before) برابر ہے۔ nullاس کے بعد سٹرنگ ڈالی گئی۔
  • کے معاملے میں UPDATE: پر payload.before قطار کی پچھلی حالت ظاہر ہوتی ہے، اور میں payload.after - تبدیلی کے جوہر کے ساتھ نیا۔

2.2 مونگو ڈی بی

یہ کنیکٹر معیاری MongoDB ریپلیکیشن میکانزم کا استعمال کرتا ہے، DBMS پرائمری نوڈ کے oplog سے معلومات پڑھتا ہے۔

اسی طرح PgSQL کے لیے پہلے سے بیان کردہ کنیکٹر کی طرح، یہاں بھی، پہلی شروعات میں، بنیادی ڈیٹا سنیپ شاٹ لیا جاتا ہے، جس کے بعد کنیکٹر اوپلاگ ریڈنگ موڈ میں بدل جاتا ہے۔

ترتیب مثال:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

جیسا کہ آپ دیکھ سکتے ہیں، پچھلی مثال کے مقابلے میں کوئی نیا آپشن نہیں ہے، لیکن صرف ڈیٹا بیس سے منسلک ہونے کے لیے ذمہ دار آپشنز کی تعداد اور ان کے سابقوں کو کم کیا گیا ہے۔

ترتیبات transforms اس بار وہ مندرجہ ذیل کام کرتے ہیں: اسکیم سے ہدف والے عنوان کا نام تبدیل کریں۔ <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

غلطی کی رواداری

ہمارے وقت میں غلطی برداشت کرنے اور اعلی دستیابی کا مسئلہ پہلے سے کہیں زیادہ شدید ہے - خاص طور پر جب ہم ڈیٹا اور لین دین کے بارے میں بات کر رہے ہیں، اور ڈیٹا کی تبدیلیوں کو ٹریک کرنا اس مسئلے میں ایک طرف نہیں کھڑا ہے۔ آئیے دیکھتے ہیں کہ اصولی طور پر کیا غلط ہو سکتا ہے اور ہر معاملے میں ڈیبیزیم کا کیا ہو گا۔

آپٹ آؤٹ کے تین اختیارات ہیں:

  1. کافکا کنیکٹ کی ناکامی۔. اگر کنیکٹ کو تقسیم شدہ موڈ میں کام کرنے کے لیے کنفیگر کیا گیا ہے، تو اس کے لیے ایک سے زیادہ کارکنوں کو ایک ہی group.id سیٹ کرنے کی ضرورت ہے۔ پھر، اگر ان میں سے ایک ناکام ہو جاتا ہے، تو کنیکٹر دوسرے کارکن پر دوبارہ شروع ہو جائے گا اور کافکا میں موضوع میں آخری عہد کی پوزیشن سے پڑھنا جاری رکھیں گے۔
  2. کافکا کلسٹر کے ساتھ رابطے کا نقصان. کنیکٹر صرف اس پوزیشن پر پڑھنا بند کر دے گا جو کافکا کو بھیجنے میں ناکام رہا، اور وقتاً فوقتاً اسے دوبارہ بھیجنے کی کوشش کرے گا جب تک کہ کوشش کامیاب نہ ہو جائے۔
  3. ڈیٹا کا ذریعہ دستیاب نہیں ہے۔. کنیکٹر ترتیب کے مطابق ماخذ سے دوبارہ جڑنے کی کوشش کرے گا۔ پہلے سے طے شدہ استعمال کرنے کی 16 کوششیں ہیں۔ کفایتی واپسی. 16ویں ناکام کوشش کے بعد، کام کو بطور نشان زد کیا جائے گا۔ ناکام اور اسے کافکا کنیکٹ REST انٹرفیس کے ذریعے دستی طور پر دوبارہ شروع کرنے کی ضرورت ہوگی۔
    • کے معاملے میں PostgreSQL کی ڈیٹا ضائع نہیں ہوگا، کیونکہ نقل کی سلاٹ کا استعمال کنیکٹر کے ذریعہ نہ پڑھی گئی WAL فائلوں کو حذف ہونے سے روکے گا۔ اس صورت میں، ایک منفی پہلو ہے: اگر کنیکٹر اور DBMS کے درمیان نیٹ ورک کنیکٹیویٹی طویل عرصے تک منقطع رہتی ہے، تو امکان ہے کہ ڈسک کی جگہ ختم ہو جائے، اور یہ پورے DBMS کی ناکامی کا باعث بن سکتا ہے۔
    • کے معاملے میں MySQL کنیکٹوٹی بحال ہونے سے پہلے binlog فائلوں کو خود DBMS کے ذریعے گھمایا جا سکتا ہے۔ اس کی وجہ سے کنیکٹر ناکام حالت میں چلا جائے گا، اور معمول کے آپریشن کو بحال کرنے کے لیے، آپ کو بِن لاگز سے پڑھنا جاری رکھنے کے لیے ابتدائی سنیپ شاٹ موڈ میں دوبارہ شروع کرنے کی ضرورت ہوگی۔
    • کے بارے میں منگو ڈی بی. دستاویزات کہتی ہیں: لاگ/اوپلاگ فائلوں کو حذف کرنے کی صورت میں کنیکٹر کا رویہ اور کنیکٹر اس پوزیشن سے پڑھنا جاری نہیں رکھ سکتا جہاں سے اسے چھوڑا گیا تھا، تمام DBMS کے لیے یکساں ہے۔ یہ اس حقیقت میں مضمر ہے کہ کنیکٹر ریاست میں جائے گا۔ ناکام اور موڈ میں دوبارہ شروع کرنے کی ضرورت ہوگی۔ ابتدائی سنیپ شاٹ.

      تاہم، مستثنیات ہیں. اگر کنیکٹر ایک طویل عرصے سے منقطع حالت میں تھا (یا MongoDB مثال تک نہیں پہنچ سکتا تھا)، اور اس وقت کے دوران oplog کو گھمایا گیا تھا، پھر جب کنکشن بحال ہو جائے گا، کنیکٹر سکون سے پہلی دستیاب پوزیشن سے ڈیٹا پڑھنا جاری رکھے گا۔ ، یہی وجہ ہے کہ کافکا میں کچھ ڈیٹا کوئی مارے گا.

حاصل يہ ہوا

ڈیبیزیم سی ڈی سی سسٹمز کے ساتھ میرا پہلا تجربہ ہے اور مجموعی طور پر بہت مثبت رہا ہے۔ پروجیکٹ نے مرکزی DBMS کی مدد، ترتیب میں آسانی، کلسٹرنگ کے لیے تعاون اور ایک فعال کمیونٹی کو رشوت دی۔ پریکٹس میں دلچسپی رکھنے والوں کے لیے، میں تجویز کرتا ہوں کہ آپ گائیڈز پڑھیں کافکا کنیکٹ и ڈیبیزیم.

کافکا کنیکٹ کے JDBC کنیکٹر کے مقابلے میں، Debezium کا بنیادی فائدہ یہ ہے کہ تبدیلیاں DBMS لاگز سے پڑھی جاتی ہیں، جس سے ڈیٹا کو کم سے کم تاخیر سے موصول کیا جا سکتا ہے۔ JDBC کنیکٹر (کافکا کنیکٹ کی طرف سے فراہم کردہ) ایک مقررہ وقفہ پر ٹریک شدہ ٹیبل سے استفسار کرتا ہے اور (اسی وجہ سے) ڈیٹا کو حذف ہونے پر پیغامات نہیں بناتا (آپ اس ڈیٹا کے لیے کیسے استفسار کر سکتے ہیں جو وہاں نہیں ہے؟)۔

اسی طرح کے مسائل کو حل کرنے کے لیے، آپ مندرجہ ذیل حل پر توجہ دے سکتے ہیں (ڈیبیزیم کے علاوہ):

PS

ہمارے بلاگ پر بھی پڑھیں:

ماخذ: www.habr.com

نیا تبصرہ شامل کریں