تقديم Debezium - CDC لأباتشي كافكا

تقديم Debezium - CDC لأباتشي كافكا

في عملي ، غالبًا ما أجد حلولًا تقنية / منتجات برمجية جديدة ، والمعلومات المتعلقة بها نادرة نوعًا ما على الإنترنت الناطق بالروسية. من خلال هذه المقالة ، سأحاول ملء واحدة من هذه الفجوة بمثال من ممارستي الأخيرة ، عندما احتجت إلى إعداد إرسال أحداث CDC من اثنين من أنظمة DBMS الشائعة (PostgreSQL و MongoDB) إلى مجموعة كافكا باستخدام Debezium. آمل أن تكون مقالة المراجعة هذه ، التي ظهرت نتيجة للعمل المنجز ، مفيدة للآخرين.

ما هو Debezium و CDC بشكل عام؟

ديبيزيوم - ممثل عن فئة برامج CDC (التقاط تغيير البيانات) ، أو بتعبير أدق ، إنها مجموعة من الموصلات لأنظمة DBMS مختلفة متوافقة مع إطار عمل Apache Kafka Connect.

هذا مشروع مفتوح المصدر مرخصة بموجب ترخيص Apache v2.0 وبرعاية Red Hat. تم التطوير منذ عام 2016 وفي الوقت الحالي يوفر الدعم الرسمي لنظام إدارة قواعد البيانات التالي: MySQL و PostgreSQL و MongoDB و SQL Server. هناك أيضًا موصلات لـ Cassandra و Oracle ، لكنها حاليًا في حالة "الوصول المبكر" ، والإصدارات الجديدة لا تضمن التوافق مع الإصدارات السابقة.

إذا قارنا CDC بالنهج التقليدي (عندما يقرأ التطبيق البيانات من DBMS مباشرة) ، فإن مزاياه الرئيسية تشمل تنفيذ دفق تغيير البيانات على مستوى الصف مع زمن انتقال منخفض وموثوقية عالية وتوافر. يتم تحقيق النقطتين الأخيرتين باستخدام مجموعة كافكا كمستودع لأحداث مركز السيطرة على الأمراض.

تشمل المزايا أيضًا حقيقة استخدام نموذج واحد لتخزين الأحداث ، لذلك لا داعي للقلق من التطبيق النهائي بشأن الفروق الدقيقة لتشغيل نظم إدارة قواعد البيانات المختلفة.

أخيرًا ، يؤدي استخدام وسيط الرسائل إلى فتح مجال للتحجيم الأفقي للتطبيقات التي تتعقب التغييرات في البيانات. في الوقت نفسه ، يتم تقليل التأثير على مصدر البيانات ، نظرًا لأن البيانات لا يتم تلقيها مباشرة من DBMS ، ولكن من مجموعة كافكا.

حول عمارة ديبيزيوم

يعود استخدام Debezium إلى هذا المخطط البسيط:

DBMS (كمصدر بيانات) - موصل في Kafka Connect -> Apache Kafka -> المستهلك

كتوضيح ، سأقدم رسمًا تخطيطيًا من موقع المشروع على الويب:

تقديم Debezium - CDC لأباتشي كافكا

ومع ذلك ، لا أحب هذا المخطط حقًا ، لأنه يبدو أن موصل الحوض فقط ممكن.

في الواقع ، الوضع مختلف: ملء بحيرة البيانات الخاصة بك (الرابط الأخير في الرسم البياني أعلاه) ليست الطريقة الوحيدة لاستخدام ديبيزيوم. يمكن استخدام الأحداث المرسلة إلى Apache Kafka بواسطة تطبيقاتك لحل المواقف المختلفة. على سبيل المثال:

  • إزالة البيانات غير ذات الصلة من ذاكرة التخزين المؤقت ؛
  • إرسال الإخطارات
  • تحديثات فهرس البحث ؛
  • نوع من سجلات التدقيق ؛
  • ...

في حال كان لديك تطبيق Java ولا توجد حاجة / إمكانية لاستخدام مجموعة كافكا ، فهناك أيضًا إمكانية للعمل من خلاله موصل مضمن. الإضافة الواضحة هي أنه يمكنك من خلالها رفض البنية التحتية الإضافية (في شكل موصل وكافكا). ومع ذلك ، فقد تم إهمال هذا الحل منذ الإصدار 1.1 ولم يعد موصى به للاستخدام (يمكن إزالته في الإصدارات المستقبلية).

ستناقش هذه المقالة البنية التي أوصى بها المطورون ، والتي توفر التسامح مع الأخطاء وقابلية التوسع.

تكوين الموصل

من أجل البدء في تتبع التغييرات في أهم قيمة - البيانات - نحتاج إلى:

  1. مصدر البيانات ، والذي يمكن أن يكون MySQL بدءًا من الإصدار 5.7 و PostgreSQL 9.6+ و MongoDB 3.2+ (القائمة الكاملة);
  2. كتلة أباتشي كافكا
  3. مثال كافكا كونيكت (الإصدارات 1.x ، 2.x) ؛
  4. تكوين موصل Debezium.

اعمل على النقطتين الأوليين ، أي عملية تثبيت DBMS و Apache Kafka خارج نطاق المقال. ومع ذلك ، بالنسبة لأولئك الذين يرغبون في نشر كل شيء في وضع الحماية ، هناك واحد جاهز في المستودع الرسمي مع أمثلة عامل الميناء يؤلف.

سنركز على النقطتين الأخيرتين بمزيد من التفصيل.

0. كافكا كونيكت

هنا وفي وقت لاحق من المقالة ، يتم النظر في جميع أمثلة التكوين في سياق صورة Docker التي وزعها مطورو Debezium. يحتوي على جميع ملفات الملحقات الضرورية (الموصلات) ويوفر تكوين Kafka Connect باستخدام متغيرات البيئة.

إذا كنت تنوي استخدام Kafka Connect من Confluent ، فستحتاج إلى إضافة المكونات الإضافية للموصلات الضرورية بنفسك إلى الدليل المحدد في plugin.path أو عبر متغير البيئة CLASSPATH. يتم تحديد إعدادات عامل وموصلات Kafka Connect من خلال ملفات التكوين التي يتم تمريرها كوسيطات لأمر العامل start. لمزيد من التفاصيل انظر توثيق.

يتم تنفيذ العملية الكاملة لإعداد Debeizum في إصدار الموصل على مرحلتين. دعونا نفكر في كل منهم:

1. إنشاء إطار عمل كافكا كونيكت

لدفق البيانات إلى كتلة Apache Kafka ، يتم تعيين معلمات محددة في إطار عمل Kafka Connect ، مثل:

  • إعدادات اتصال الكتلة ،
  • أسماء الموضوعات التي سيتم فيها تخزين تكوين الموصل نفسه ،
  • اسم المجموعة التي يعمل بها الموصل (في حالة استخدام الوضع الموزع).

تدعم صورة Docker الرسمية للمشروع التكوين باستخدام متغيرات البيئة - وهذا ما سنستخدمه. لنقم بتنزيل الصورة:

docker pull debezium/connect

الحد الأدنى لمجموعة متغيرات البيئة المطلوبة لتشغيل الموصل كما يلي:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - القائمة الأولية لخوادم مجموعة كافكا للحصول على قائمة كاملة بأعضاء الكتلة ؛
  • OFFSET_STORAGE_TOPIC=connector-offsets - موضوع لتخزين المواضع حيث يوجد الموصل حاليًا ؛
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - موضوع لتخزين حالة الموصل ومهامه ؛
  • CONFIG_STORAGE_TOPIC=connector-config - موضوع لتخزين بيانات تكوين الموصل ومهامه ؛
  • GROUP_ID=1 - معرّف مجموعة العمال التي يمكن تنفيذ مهمة الرابط عليها ؛ مطلوب عند استخدام الموزعة (وزعت) النظام الحاكم.

نبدأ الحاوية بهذه المتغيرات:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

ملاحظة حول Avro

بشكل افتراضي ، يكتب Debezium البيانات بتنسيق JSON ، وهو أمر مقبول لصناديق الحماية وكميات صغيرة من البيانات ، ولكن يمكن أن يمثل مشكلة في قواعد البيانات المحملة بكثافة. بديل لمحول JSON هو إجراء تسلسل للرسائل باستخدام ملفات أفرو إلى تنسيق ثنائي ، مما يقلل الحمل على نظام الإدخال / الإخراج الفرعي في Apache Kafka.

لاستخدام Avro ، تحتاج إلى نشر ملف مخطط التسجيل (لتخزين المخططات). ستبدو متغيرات المحول كما يلي:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

تفاصيل استخدام Avro وإعداد سجل لها خارج نطاق المقالة - علاوة على ذلك ، للتوضيح ، سنستخدم JSON.

2. إعداد الموصل نفسه

يمكنك الآن الانتقال مباشرة إلى تكوين الموصل نفسه ، والذي سيقرأ البيانات من المصدر.

دعنا نلقي نظرة على مثال الموصلات لاثنين من DBMS: PostgreSQL و MongoDB ، والتي لدي خبرة بها والتي توجد بها اختلافات (وإن كانت صغيرة ، ولكنها مهمة في بعض الحالات!).

تم وصف التكوين في تدوين JSON وتحميله إلى Kafka Connect باستخدام طلب POST.

2.1 PostgreSQL

مثال على ضبط الموصل لـ PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

مبدأ تشغيل الموصل بعد هذا التكوين بسيط للغاية:

  • في البداية ، يتصل بقاعدة البيانات المحددة في التكوين ويبدأ في الوضع لقطة أولية، يرسل إلى كافكا مجموعة البيانات الأولية المتلقاة مع الشرط SELECT * FROM table_name.
  • بعد اكتمال التهيئة ، يدخل الموصل في وضع قراءة التغييرات من ملفات PostgreSQL WAL.

حول الخيارات المستخدمة:

  • name - اسم الموصل الذي تم استخدام التكوين الموضح أدناه له ؛ في المستقبل ، يتم استخدام هذا الاسم للعمل مع الموصل (أي عرض الحالة / إعادة التشغيل / تحديث التكوين) من خلال واجهة برمجة تطبيقات Kafka Connect REST ؛
  • connector.class - فئة موصل DBMS التي سيتم استخدامها بواسطة الموصل الذي تم تكوينه ؛
  • plugin.name هو اسم البرنامج المساعد للفك المنطقي للبيانات من ملفات WAL. متاح للاختيار من بينها wal2json, decoderbuffs и pgoutput. يتطلب الأولان تثبيت الامتدادات المناسبة في نظام إدارة قواعد البيانات ، و pgoutput بالنسبة للإصدار 10 من PostgreSQL والإصدارات الأحدث ، لا يتطلب الأمر معالجات إضافية ؛
  • database.* - خيارات الاتصال بقاعدة البيانات ، أين database.server.name - اسم مثيل PostgreSQL المستخدم لتشكيل اسم الموضوع في مجموعة كافكا ؛
  • table.include.list - قائمة بالجداول التي نريد تتبع التغييرات فيها ؛ في الشكل schema.table_name؛ لا يمكن استخدامها مع table.exclude.list;
  • heartbeat.interval.ms - الفاصل الزمني (بالمللي ثانية) الذي يرسل بواسطته الموصل رسائل نبضات القلب إلى موضوع خاص ؛
  • heartbeat.action.query - طلب سيتم تنفيذه عند إرسال كل رسالة نبضات (ظهر الخيار منذ الإصدار 1.1) ؛
  • slot.name - اسم فتحة النسخ التي سيستخدمها الموصل ؛
  • publication.name - اسم منشور في PostgreSQL التي يستخدمها الموصل. في حالة عدم وجوده ، سيحاول Debezium إنشائه. إذا لم يكن لدى المستخدم الذي تم الاتصال بموجبه حقوق كافية لهذا الإجراء ، فسيتم إنهاء الموصل مع وجود خطأ ؛
  • transforms يحدد بالضبط كيفية تغيير اسم الموضوع الهدف:
    • transforms.AddPrefix.type يشير إلى أننا سنستخدم التعبيرات النمطية ؛
    • transforms.AddPrefix.regex - قناع يتم من خلاله إعادة تعريف اسم الموضوع المستهدف ؛
    • transforms.AddPrefix.replacement - مباشرة ما نعيد تعريفه.

المزيد عن ضربات القلب والتحولات

بشكل افتراضي ، يرسل الموصل البيانات إلى كافكا لكل معاملة ملتزمة ، ويكتب LSN (رقم تسلسل السجل) الخاص به إلى موضوع الخدمة offset. ولكن ماذا يحدث إذا تم تكوين الموصل ليس لقراءة قاعدة البيانات بأكملها ، ولكن فقط جزء من جداولها (حيث يتم تحديث البيانات بشكل غير متكرر)؟

  • سيقرأ الموصل ملفات WAL ولن يكتشف تنفيذ المعاملات فيها إلى الجداول التي يراقبها.
  • لذلك ، لن يقوم بتحديث موقعه الحالي سواء في الموضوع أو في خانة النسخ المتماثل.
  • سيؤدي هذا بدوره إلى "تعليق" ملفات WAL على القرص ومن المحتمل أن تنفد مساحة القرص.

وهنا تأتي الخيارات للإنقاذ. heartbeat.interval.ms и heartbeat.action.query. يتيح استخدام هذه الخيارات في أزواج تنفيذ طلب لتغيير البيانات في جدول منفصل في كل مرة يتم فيها إرسال رسالة نبضات القلب. وبالتالي ، يتم تحديث LSN الذي يوجد عليه الموصل حاليًا (في فتحة النسخ المتماثل) باستمرار. يسمح ذلك لنظام إدارة قواعد البيانات (DBMS) بإزالة ملفات WAL التي لم تعد مطلوبة. لمزيد من المعلومات حول كيفية عمل الخيارات ، راجع توثيق.

الخيار الآخر الذي يستحق اهتمامًا أكبر هو transforms. على الرغم من أن الأمر يتعلق أكثر بالراحة والجمال ...

بشكل افتراضي ، يقوم Debezium بإنشاء موضوعات باستخدام سياسة التسمية التالية: serverName.schemaName.tableName. قد لا يكون هذا مناسبًا دائمًا. خيارات transforms باستخدام التعبيرات العادية ، يمكنك تحديد قائمة الجداول التي يجب توجيه الأحداث الخاصة بها إلى موضوع باسم معين.

في تكويننا بفضل transforms يحدث ما يلي: ستنتقل جميع أحداث CDC من قاعدة البيانات المتعقبة إلى الموضوع بالاسم data.cdc.dbname. خلاف ذلك (بدون هذه الإعدادات) ، سيقوم Debezium افتراضيًا بإنشاء موضوع لكل جدول في النموذج: pg-dev.public.<table_name>.

قيود الموصل

في نهاية وصف تكوين الموصل لـ PostgreSQL ، يجدر الحديث عن الميزات / القيود التالية لعمله:

  1. تعتمد وظيفة الموصل في PostgreSQL على مفهوم فك التشفير المنطقي. لذلك هو لا تتبع الطلبات لتغيير هيكل قاعدة البيانات (DDL) - وفقًا لذلك ، لن تكون هذه البيانات في الموضوعات.
  2. نظرًا لاستخدام فتحات النسخ المتماثل ، يمكن توصيل الموصل فقط إلى مثيل DBMS الرئيسي.
  3. إذا كان لدى المستخدم الذي يتصل بموجبه الموصل بقاعدة البيانات حقوق للقراءة فقط ، فقبل التشغيل الأول ، ستحتاج إلى إنشاء فتحة نسخ متماثل يدويًا ونشرها في قاعدة البيانات.

تطبيق التكوين

لذلك دعونا نحمل التكوين الخاص بنا في الموصل:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

نتحقق من نجاح التنزيل وأن الموصل بدأ:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

رائع: تم إعداده وجاهز للانطلاق. الآن دعونا نتظاهر بأننا مستهلكون ونتصل بكافكا ، وبعد ذلك نضيف ونغير إدخالاً في الجدول:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

في موضوعنا سيتم عرض هذا على النحو التالي:

JSON طويلة جدًا مع التغييرات التي أجريناها

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

في كلتا الحالتين ، تتكون السجلات من مفتاح (PK) للسجل الذي تم تغييره ، وجوهر التغييرات: ما كان السجل من قبل وما أصبح بعده.

  • في حالة INSERT: القيمة قبل (before) يساوي nullمتبوعًا بالسلسلة التي تم إدخالها.
  • في حالة UPDATE: في payload.before يتم عرض الحالة السابقة للصف ، وفي payload.after - جديد مع جوهر التغيير.

2.2 مونغو دي بي

يستخدم هذا الموصل آلية النسخ المتماثل MongoDB القياسية ، وقراءة المعلومات من سجل عقدة DBMS الأساسية.

على غرار الموصل الموصوف بالفعل لـ PgSQL ، هنا أيضًا ، في البداية ، يتم أخذ لقطة البيانات الأولية ، وبعد ذلك ينتقل الموصل إلى وضع قراءة oplog.

مثال على التكوين:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

كما ترى ، لا توجد خيارات جديدة مقارنة بالمثال السابق ، ولكن تم فقط تقليل عدد الخيارات المسؤولة عن الاتصال بقاعدة البيانات والبادئات الخاصة بها.

إعدادات transforms هذه المرة يفعلون ما يلي: تحويل اسم الموضوع الهدف من المخطط <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

التسامح مع الخطأ

أصبحت مسألة التسامح مع الأخطاء والتوافر العالي في عصرنا أكثر حدة من أي وقت مضى - خاصة عندما نتحدث عن البيانات والمعاملات ، ولا يكون تتبع تغيير البيانات على الهامش في هذا الأمر. لنلق نظرة على الخطأ الذي يمكن أن يحدث من حيث المبدأ وما سيحدث لـ Debezium في كل حالة.

هناك ثلاثة خيارات لإلغاء الاشتراك:

  1. فشل كافكا كونيكت. إذا تم تكوين الاتصال للعمل في الوضع الموزع ، فإن هذا يتطلب عدة عمال لتعيين نفس المجموعة. بعد ذلك ، إذا فشل أحدهما ، فسيتم إعادة تشغيل الموصل على العامل الآخر ومتابعة القراءة من آخر موضع تم الالتزام به في الموضوع في كافكا.
  2. فقدان الاتصال مع كتلة كافكا. سيتوقف الرابط ببساطة عن القراءة في الموضع الذي فشل في إرساله إلى كافكا ويحاول بشكل دوري إعادة إرساله حتى تنجح المحاولة.
  3. مصدر البيانات غير متوفر. سيحاول الموصل إعادة الاتصال بالمصدر وفقًا للتكوين. الافتراضي هو 16 محاولة باستخدام تراجع أسي. بعد المحاولة السادسة عشرة الفاشلة ، سيتم وضع علامة على المهمة كـ فشل وسيتعين إعادة تشغيله يدويًا عبر واجهة Kafka Connect REST.
    • في حالة كيو لن تضيع البيانات ، لأن سيؤدي استخدام فتحات النسخ المتماثل إلى منع حذف ملفات WAL التي لا يقرأها الموصل. في هذه الحالة ، هناك جانب سلبي: إذا تعطل اتصال الشبكة بين الموصل ونظام إدارة قواعد البيانات لفترة طويلة ، فهناك احتمال أن تنفد مساحة القرص ، وقد يؤدي ذلك إلى فشل نظام إدارة قواعد البيانات بالكامل.
    • في حالة MySQL يمكن تدوير ملفات binlog بواسطة نظام إدارة قواعد البيانات نفسه قبل استعادة الاتصال. سيؤدي هذا إلى دخول الموصل إلى الحالة الفاشلة ، وسيحتاج إلى إعادة التشغيل في وضع اللقطة الأولي لمتابعة القراءة من binlogs لاستعادة التشغيل العادي.
    • حول MongoDB. تقول الوثائق: إن سلوك الموصل في حالة حذف ملفات السجل / سجل التشغيل ولا يمكن للموصل متابعة القراءة من الموضع الذي توقف عنده هو نفسه بالنسبة لجميع نظم إدارة قواعد البيانات. يكمن في حقيقة أن الموصل سوف يدخل الدولة فشل وسيتطلب إعادة التشغيل في الوضع لقطة أولية.

      ومع ذلك ، هناك استثناءات. إذا كان الموصل في حالة قطع الاتصال لفترة طويلة (أو تعذر الوصول إلى مثيل MongoDB) ، وتم تدوير oplog خلال هذا الوقت ، فعند استعادة الاتصال ، سيستمر الموصل بهدوء في قراءة البيانات من أول موقع متاح ، وهذا هو سبب بعض البيانات في كافكا لا سوف يضرب.

اختتام

Debezium هي تجربتي الأولى مع أنظمة CDC وكانت إيجابية للغاية بشكل عام. قدم المشروع رشوة لدعم نظام إدارة قواعد البيانات الرئيسي ، وسهولة التكوين ، ودعم التجميع والمجتمع النشط. للمهتمين بالممارسة ، أوصي بأن تقرأ أدلة كافكا كونيكت и ديبيزيوم.

بالمقارنة مع موصل JDBC الخاص بـ Kafka Connect ، فإن الميزة الرئيسية لـ Debezium هي قراءة التغييرات من سجلات DBMS ، مما يسمح باستلام البيانات بأقل تأخير. يستعلم موصل JDBC (المقدم من Kafka Connect) عن الجدول المتعقب بفاصل زمني ثابت و (لنفس السبب) لا يُنشئ رسائل عند حذف البيانات (كيف يمكنك الاستعلام عن البيانات غير الموجودة؟).

لحل مشاكل مماثلة ، يمكنك الانتباه إلى الحلول التالية (بالإضافة إلى Debezium):

PS

اقرأ أيضًا على مدونتنا:

المصدر: www.habr.com

إضافة تعليق