Debezium متعارف ڪرايو - CDC Apache Kafka لاءِ

Debezium متعارف ڪرايو - CDC Apache Kafka لاءِ

منهنجي ڪم ۾، آئون اڪثر نوان ٽيڪنيڪل حل / سافٽ ويئر پراڊڪٽس ۾ ايندا آهن، جن بابت ڄاڻ روسي ڳالهائيندڙ انٽرنيٽ تي تمام گهٽ آهي. هن آرٽيڪل سان، مان هڪ اهڙي خال کي ڀرڻ جي ڪوشش ڪندس مثال سان منهنجي تازي مشق مان، جڏهن مون کي سيٽ ڪرڻ جي ضرورت هئي سي ڊي سي واقعن کي موڪلڻ لاءِ ٻن مشهور DBMSs (PostgreSQL ۽ MongoDB) کان ڪافڪا ڪلستر ڏانهن ڊيبيزيم استعمال ڪندي. مون کي اميد آهي ته هي جائزو آرٽيڪل، جيڪو ڪم جي نتيجي ۾ ظاهر ٿيو، ٻين لاء ڪارائتو ٿيندو.

عام طور تي ڊيبيزيم ۽ سي ڊي سي ڇا آهي؟

ڊيبيزيم - سي ڊي سي سافٽ ويئر جي درجي جو نمائندو (ڊيٽا جي تبديلي کي پڪڙيو)، يا وڌيڪ واضح طور تي، اهو مختلف DBMSs لاءِ ڪنيڪٽرن جو هڪ سيٽ آهي جيڪي Apache Kafka Connect فريم ورڪ سان مطابقت رکن ٿا.

هن اوپن سورس پروجيڪٽ، Apache License v2.0 تحت لائسنس يافته ۽ Red Hat پاران اسپانسر ٿيل. ترقي 2016 کان جاري آهي ۽ هن وقت اها هيٺين DBMS لاءِ سرڪاري مدد فراهم ڪري ٿي: MySQL، PostgreSQL، MongoDB، SQL Server. Cassandra ۽ Oracle لاءِ به رابطا آهن، پر اهي هن وقت ”ابتدائي پهچ“ جي حالت ۾ آهن، ۽ نوان رليز پٺتي پيل مطابقت جي ضمانت نٿا ڏين.

جيڪڏهن اسان روايتي طريقي سان سي ڊي سي جو مقابلو ڪريون ٿا (جڏهن اپليڪيشن سڌو سنئون ڊي بي ايم ايس کان ڊيٽا پڙهي ٿي)، پوء ان جي مکيه فائدن ۾ شامل آهي ڊيٽا جي تبديلي جي عمل کي لاڳو ڪرڻ واري سطح تي گهٽ ويڪرائي، اعلي اعتبار ۽ دستيابي سان. آخري ٻه پوائنٽ حاصل ڪيا ويا آهن ڪافڪا ڪلستر استعمال ڪندي سي ڊي سي واقعن لاءِ مخزن جي طور تي.

انهي سان گڏ، فائدن ۾ حقيقت اها آهي ته هڪ واحد ماڊل واقعن کي ذخيرو ڪرڻ لاء استعمال ڪيو ويندو آهي، تنهنڪري حتمي ايپليڪيشن مختلف ڊي بي ايم ايس کي هلائڻ جي نونسن بابت پريشان ٿيڻ جي ضرورت ناهي.

آخرڪار، هڪ پيغام بروکر استعمال ڪندي ايپليڪيشنن جي افقي اسڪيلنگ لاء گنجائش پيدا ڪري ٿي جيڪا ڊيٽا ۾ تبديلين کي ٽريڪ ڪري ٿي. ساڳئي وقت، ڊيٽا جي ماخذ تي اثر گھٽجي ويو آهي، ڇاڪاڻ ته ڊيٽا سڌو سنئون ڊي بي ايم ايس کان نه، پر ڪافڪا ڪلستر کان حاصل ڪئي وئي آهي.

Debezium فن تعمير جي باري ۾

Debezium استعمال ڪندي هن سادي منصوبي تي اچي ٿو:

DBMS (ڊيٽا ماخذ جي طور تي) → ڪافڪا ڪنيڪٽ ۾ ڪنيڪٽر → Apache Kafka → صارف

مثال طور، مان پروجيڪٽ ويب سائيٽ مان هڪ خاڪو ڏيندس:

Debezium متعارف ڪرايو - CDC Apache Kafka لاءِ

بهرحال، مون کي هن اسڪيم کي پسند نه ڪيو، ڇاڪاڻ ته اهو لڳي ٿو ته صرف هڪ سنڪ کنیکٹر ممڪن آهي.

حقيقت ۾، صورتحال مختلف آهي: توهان جي ڊيٽا ڍنڍ کي ڀرڻ (مٿي ڏنل ڊراگرام ۾ آخري لنڪ) Debezium استعمال ڪرڻ جو واحد طريقو ناهي. Apache Kafka ڏانهن موڪليل واقعا توهان جي ايپليڪيشنن طرفان مختلف حالتن کي حل ڪرڻ لاءِ استعمال ڪري سگھجن ٿا. مثال طور:

  • ڪيش مان غير لاڳاپيل ڊيٽا کي ختم ڪرڻ؛
  • اطلاع موڪلڻ؛
  • ڳولا انڊيڪس اپڊيٽ؛
  • ڪجهه قسم جا آڊٽ لاگز؛
  • ...

جيڪڏهن توهان وٽ جاوا ايپليڪيشن آهي ۽ ڪافڪا ڪلستر استعمال ڪرڻ جي ڪا ضرورت/امڪان ناهي، اتي پڻ ڪم ڪرڻ جو امڪان آهي شامل ٿيل ڪنيڪٽر. واضح پلس اهو آهي ته ان سان توهان اضافي زيربناء کي رد ڪري سگهو ٿا (هڪ کنیکٹر ۽ ڪافڪا جي صورت ۾). بهرحال، هي حل ورزن 1.1 کان ختم ڪيو ويو آهي ۽ هاڻي استعمال لاءِ سفارش نه ڪئي وئي آهي (اهو مستقبل جي رليز ۾ هٽائي سگهجي ٿو).

اهو آرٽيڪل ڊولپرز پاران تجويز ڪيل فن تعمير تي بحث ڪندو، جيڪو غلطي رواداري ۽ اسڪالبل مهيا ڪري ٿو.

ڪنيڪٽر جي جوڙجڪ

سڀ کان اهم قدر ۾ تبديلين کي ٽريڪ ڪرڻ شروع ڪرڻ لاء - ڊيٽا - اسان کي ضرورت آهي:

  1. ڊيٽا جو ذريعو، جيڪو MySQL ٿي سگھي ٿو ورجن 5.7 کان شروع ٿي، PostgreSQL 9.6+، MongoDB 3.2+ (مڪمل فهرست);
  2. Apache Kafka ڪلستر
  3. ڪافڪا ڪنيڪٽ مثال (ورزن 1.x، 2.x)؛
  4. ترتيب ڏنل Debezium connector.

پهرين ٻن نقطن تي ڪم ڪريو، يعني. DBMS ۽ Apache Kafka انسٽال ڪرڻ جو عمل آرٽيڪل جي دائري کان ٻاهر آهي. تنهن هوندي، انهن لاء جيڪي هر شي کي سينڊ باڪس ۾ لڳائڻ چاهيندا آهن، مثالن سان گڏ سرڪاري مخزن ۾ هڪ تيار ڪيل آهي. docker-compose.yaml.

اسان وڌيڪ تفصيل سان آخري ٻن پوائنٽن تي ڌيان ڏينداسين.

0. ڪافڪا ڪنيڪٽ

هتي ۽ بعد ۾ آرٽيڪل ۾، سڀني ترتيبن جا مثال سمجهيا وڃن ٿا ڊڪر تصوير جي حوالي سان ڊيبيزيم ڊولپرز پاران ورهايل. اهو سڀ ضروري پلگ ان فائلن تي مشتمل آهي (ڪنيڪٽرز) ۽ مهيا ڪري ٿو Kafka Connect ترتيب ڏيڻ ماحوليات جي متغير استعمال ڪندي.

جيڪڏهن توهان Confluent کان ڪافڪا ڪنيڪٽ استعمال ڪرڻ جو ارادو رکو ٿا، ته توهان کي گهربل ڪنيڪٽرن جا پلگ ان پاڻ ۾ شامل ڪرڻا پوندا. plugin.path يا هڪ ماحولياتي متغير ذريعي سيٽ ڪريو CLASSPATH. ڪافڪا ڪنيڪٽ ورڪر ۽ ڪنيڪٽرن لاءِ سيٽنگون ترتيب ڏنل فائلن جي ذريعي بيان ڪيون ويون آهن جيڪي ڪم ڪندڙ شروعاتي ڪمانڊ ڏانهن دليلن جي طور تي منظور ڪيون ويون آهن. تفصيل لاءِ ڏسو دستاويز.

Connector ورزن ۾ Debeizum قائم ڪرڻ جو سڄو عمل ٻن مرحلن ۾ ڪيو ويندو آهي. اچو ته انهن مان هر هڪ تي غور ڪريو:

1. ڪافڪا ڪنيڪٽ فريم ورڪ کي ترتيب ڏيڻ

Apache Kafka ڪلستر تي ڊيٽا کي وهڪرو ڪرڻ لاء، مخصوص پيٽرولر ڪافڪا ڪنيڪٽ فريم ورڪ ۾ مقرر ڪيا ويا آهن، جهڙوڪ:

  • ڪلستر ڪنيڪشن سيٽنگون،
  • عنوانن جا نالا جن ۾ ڪنيڪٽر جي ترتيب پاڻ کي محفوظ ڪئي ويندي،
  • گروپ جو نالو جنهن ۾ ڪنيڪٽر هلي رهيو آهي (تقسيم ٿيل موڊ استعمال ڪرڻ جي صورت ۾).

پروجيڪٽ جي سرڪاري ڊاکر تصوير ماحول جي متغيرن کي استعمال ڪندي ترتيب ڏيڻ جي حمايت ڪري ٿي - اھو اھو آھي جيڪو اسان استعمال ڪنداسين. پوء اچو ته تصوير ڊائون لوڊ ڪريو:

docker pull debezium/connect

ڪنيڪٽر کي هلائڻ لاءِ گهربل ماحوليات جي گھٽ ۾ گھٽ سيٽ ھيٺ ڏنل آھي:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 - ڪلستر ميمبرن جي مڪمل فهرست حاصل ڪرڻ لاء ڪافڪا ڪلستر سرور جي شروعاتي فهرست؛
  • OFFSET_STORAGE_TOPIC=connector-offsets - جڳهه کي محفوظ ڪرڻ لاء هڪ موضوع جتي ڪنيڪٽر هن وقت واقع آهي؛
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status - کنیکٹر ۽ ان جي ڪمن جي حيثيت کي محفوظ ڪرڻ لاء هڪ موضوع؛
  • CONFIG_STORAGE_TOPIC=connector-config - ڪنيڪٽر جي ترتيب واري ڊيٽا ۽ ان جي ڪمن کي محفوظ ڪرڻ لاءِ هڪ موضوع؛
  • GROUP_ID=1 - ڪارڪنن جي گروپ جي سڃاڻپ ڪندڙ جنهن تي ڪنيڪٽر جو ڪم سرانجام ڏئي سگهجي ٿو؛ ضرورت آهي جڏهن تقسيم استعمال ڪندي (ورهايل) راڄ

اسان انهن متغيرن سان ڪنٽينر شروع ڪريون ٿا:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

Avro بابت نوٽ

ڊفالٽ طور، Debezium JSON فارميٽ ۾ ڊيٽا لکندو آهي، جيڪو سينڊ باڪسز ۽ ڊيٽا جي ننڍي مقدار لاءِ قابل قبول آهي، پر وڏي لوڊ ٿيل ڊيٽابيس ۾ مسئلو ٿي سگهي ٿو. JSON ڪنورٽر جو متبادل آھي پيغامن کي سيريل ڪرڻ لاءِ استعمال ڪندي Avro بائنري فارميٽ ۾، جيڪو اپاچي ڪافڪا ۾ I/O سب سسٽم تي لوڊ گھٽائي ٿو.

Avro استعمال ڪرڻ لاء، توهان کي الڳ ڪرڻ جي ضرورت آهي اسڪيما-رجسٽري (اسڪيمن کي محفوظ ڪرڻ لاءِ). ڪنورٽر لاءِ متغير هن طرح نظر ايندا:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

Avro استعمال ڪرڻ ۽ ان لاءِ رجسٽري قائم ڪرڻ جا تفصيل مضمون جي دائري کان ٻاهر آهن - وڌيڪ وضاحت لاءِ، اسان JSON استعمال ڪنداسين.

2. پاڻ کي کنیکٹر قائم ڪرڻ

ھاڻي توھان سڌو سنئون ڪنيڪٽر جي ٺاھ جوڙ ڏانھن وڃو، جيڪو ذريعو مان ڊيٽا پڙھندو.

اچو ته ڏسو ڪنيڪٽرن جا مثال ٻن DBMS لاءِ: PostgreSQL ۽ MongoDB، جن لاءِ مون وٽ تجربو آهي ۽ جن لاءِ اختلاف آهن (جيتوڻيڪ ننڍو، پر ڪجهه حالتن ۾ اهم!).

تشڪيل JSON نوٽيشن ۾ بيان ڪئي وئي آهي ۽ پوسٽ درخواست استعمال ڪندي ڪافڪا ڪنيڪٽ تي اپ لوڊ ڪئي وئي آهي.

2.1. PostgreSQL

PostgreSQL لاءِ ڪنيڪٽر جي تشڪيل جو مثال:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

هن ٺاھ جوڙ کان پوء connector جي آپريشن جو اصول ڪافي سادو آهي:

  • پهرين شروعات ۾، اهو ترتيب ۾ بيان ڪيل ڊيٽابيس سان ڳنڍيندو آهي ۽ موڊ ۾ شروع ٿئي ٿو شروعاتي تصوير, ڪافڪا ڏانهن موڪلڻ شرطن سان حاصل ڪيل ڊيٽا جي شروعاتي سيٽ SELECT * FROM table_name.
  • شروعاتي مڪمل ٿيڻ کان پوء، کنیکٹر PostgreSQL WAL فائلن مان تبديلين پڙهڻ جي موڊ ۾ داخل ٿئي ٿو.

استعمال ٿيل اختيارن بابت:

  • name - ڪنيڪٽر جو نالو جنهن لاءِ هيٺ بيان ڪيل تشڪيل استعمال ڪئي وئي آهي؛ مستقبل ۾، هي نالو ڪفڪا ڪنيڪٽ REST API ذريعي ڪنيڪٽر سان ڪم ڪرڻ لاءِ استعمال ڪيو ويندو آهي (يعني اسٽيٽس ڏسو / ٻيهر شروع ڪريو / ترتيب تازه ڪريو)؛
  • connector.class - ڊي بي ايم ايس ڪنيڪٽر ڪلاس جيڪو ترتيب ڏنل ڪنيڪٽر پاران استعمال ڪيو ويندو؛
  • plugin.name WAL فائلن مان ڊيٽا جي منطقي ڊيڪوڊنگ لاءِ پلگ ان جو نالو آهي. مان چونڊڻ لاءِ دستياب آهي wal2json, decoderbuffs и pgoutput. پهرين ٻن کي DBMS ۾ مناسب ايڪسٽينشن جي انسٽاليشن جي ضرورت آهي، ۽ pgoutput PostgreSQL ورجن 10 ۽ ان کان وڌيڪ لاءِ اضافي ٺاھڻ جي ضرورت نه آھي.
  • database.* - ڊيٽابيس سان ڳنڍڻ جا اختيار، جتي database.server.name PostgreSQL مثال جو نالو ڪافڪا ڪلستر ۾ موضوع جو نالو ٺاهڻ لاءِ استعمال ڪيو ويو؛
  • table.include.list - جدولن جي هڪ فهرست جنهن ۾ اسان تبديلين کي ٽريڪ ڪرڻ چاهيون ٿا؛ فارميٽ ۾ ڏنل آهي schema.table_name؛ سان گڏ استعمال نه ٿو ڪري سگھجي table.exclude.list;
  • heartbeat.interval.ms - وقفو (ملي سيڪنڊن ۾) جنهن سان ڪنيڪٽر هڪ خاص موضوع تي دل جي ڌڙڪن جا پيغام موڪلي ٿو؛
  • heartbeat.action.query - هڪ درخواست جنهن تي عمل ڪيو ويندو جڏهن هر دل جي ڌڙڪڻ واري پيغام موڪليندي (آپشن 1.1 ورزن کان ظاهر ٿيو آهي)؛
  • slot.name - نقل واري سلاٽ جو نالو جيڪو کنیکٹر پاران استعمال ڪيو ويندو؛
  • publication.name - نالو اشاعت PostgreSQL ۾ جيڪو ڪنيڪٽر استعمال ڪري ٿو. صورت ۾ اهو موجود ناهي، Debezium ان کي ٺاهڻ جي ڪوشش ڪندو. جيڪڏهن صارف جنهن جي تحت ڪنيڪشن ٺاهيو ويو آهي، هن عمل لاء ڪافي حق نه آهن، ڪنيڪٽر هڪ غلطي سان نڪرندو؛
  • transforms اهو طئي ڪري ٿو ته ڪيئن صحيح طور تي ٽارگيٽ موضوع جو نالو تبديل ڪرڻ:
    • transforms.AddPrefix.type اشارو ڪري ٿو ته اسان باقاعده اظهار استعمال ڪنداسين؛
    • transforms.AddPrefix.regex - ماسڪ جنهن جي ذريعي ٽارگيٽ موضوع جو نالو ٻيهر بيان ڪيو ويو آهي؛
    • transforms.AddPrefix.replacement - سڌو سنئون جيڪو اسان ٻيهر بيان ڪيو آهي.

دل جي ڌڙڪن ۽ تبديلين بابت وڌيڪ

ڊفالٽ طور، ڪنيڪٽر هر ڪم ٿيل ٽرانزيڪشن لاءِ ڪافڪا ڏانهن ڊيٽا موڪلي ٿو، ۽ سروس جي موضوع تي پنهنجو LSN (لاگ تسلسل نمبر) لکي ٿو. offset. پر ڇا ٿيندو جيڪڏهن ڪنيڪٽر مڪمل ڊيٽابيس کي نه پڙهڻ لاءِ ترتيب ڏنو ويو آهي، پر ان جي جدولن جو صرف هڪ حصو (جنهن ۾ ڊيٽا ڪڏهن ڪڏهن اپڊيٽ ڪيو ويندو آهي)؟

  • ڪنيڪٽر WAL فائلن کي پڙهندو ۽ انهن ۾ ٽرانزيڪشن ڪمٽ کي نه ڳوليندو انهن جدولن ڏانهن جيڪو اهو مانيٽر ڪري ٿو.
  • تنهن ڪري، اهو پنهنجي موجوده پوزيشن کي اپڊيٽ نه ڪندو يا ته موضوع ۾ يا نقل واري سلاٽ ۾.
  • اهو، موڙ ۾، سبب ٿيندو WAL فائلن کي ڊسڪ تي "پڪڙجي" ۽ ممڪن طور تي ڊسڪ اسپيس ختم ٿي ويندي.

۽ هتي اختيارات بچاء لاء ايندا. heartbeat.interval.ms и heartbeat.action.query. انهن اختيارن کي جوڑوں ۾ استعمال ڪرڻ ممڪن بڻائي ٿو ته ڊيٽا کي تبديل ڪرڻ جي درخواست تي عمل ڪرڻ هڪ الڳ جدول ۾ هر دفعي دل جي ڌڙڪن جو پيغام موڪليو وڃي. اهڙيء طرح، LSN جنهن تي ڪنيڪٽر هن وقت واقع آهي (نقل جي سلاٽ ۾) مسلسل اپڊيٽ ڪيو ويندو آهي. هي DBMS کي اجازت ڏئي ٿو WAL فائلن کي هٽائڻ لاءِ جيڪي هاڻي گهربل نه آهن. وڌيڪ معلومات لاءِ ته اختيار ڪيئن ڪم ڪن ٿا، ڏسو دستاويز.

ٻيو اختيار جيڪو وڌيڪ ڌيان جي لائق آهي transforms. جيتوڻيڪ اها سهولت ۽ خوبصورتي بابت وڌيڪ آهي ...

ڊفالٽ طور، Debezium هيٺ ڏنل نالو ڏيڻ واري پاليسي استعمال ڪندي عنوان ٺاهي ٿو: serverName.schemaName.tableName. اهو هميشه آسان نه ٿي سگهي. اختيارن transforms باقاعده اظهار کي استعمال ڪندي، توهان جدولن جي هڪ فهرست بيان ڪري سگهو ٿا جن جي واقعن کي مخصوص نالي سان هڪ موضوع ڏانهن وڃڻ جي ضرورت آهي.

اسان جي ترتيب ۾ مهرباني ڪري transforms هيٺيان ٿئي ٿو: ٽريڪ ٿيل ڊيٽابيس مان سڀئي سي ڊي سي واقعا نالي سان موضوع ڏانهن ويندا data.cdc.dbname. ٻي صورت ۾ (انهن سيٽنگن کان سواء)، Debezium ڊفالٽ طور تي فارم جي هر ٽيبل لاء هڪ موضوع ٺاهيندو: pg-dev.public.<table_name>.

ڪنيڪٽر جون حدون

PostgreSQL لاء کنیکٹر جي تشريح جي وضاحت جي آخر ۾، ان جي ڪم جي هيٺين خاصيتن / حدن جي باري ۾ ڳالهائڻ جي قابل آهي:

  1. PostgreSQL لاءِ ڪنيڪٽر ڪارڪردگي منطقي ڊيڪوڊنگ جي تصور تي ڀاڙي ٿو. تنهن ڪري هن ڊيٽابيس جي جوڙجڪ کي تبديل ڪرڻ جي درخواستن کي ٽريڪ نٿو ڪري (DDL) - مطابق، هي ڊيٽا عنوانن ۾ نه هوندي.
  2. جيئن ته نقل واري سلاٽ استعمال ڪيا ويا آهن، ڪنيڪٽر جو ڪنيڪشن ممڪن آهي صرف ماسٽر DBMS مثال ڏانهن.
  3. جيڪڏهن صارف جنهن جي تحت ڪنيڪٽر ڊيٽابيس سان ڳنڍيندو آهي صرف پڙهڻ جا حق آهن، پوء پهرين لانچ کان پهريان، توهان کي دستي طور تي هڪ نقل واري سلاٽ ٺاهڻ ۽ ڊيٽابيس ۾ شايع ڪرڻ جي ضرورت پوندي.

ٺاھ جوڙ لاڳو ڪرڻ

سو اچو ته اسان جي ڪنيگريشن کي ڪنيڪٽر ۾ لوڊ ڪريون:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

اسان چيڪ ڪيو ته ڊائون لوڊ ڪامياب ٿي ويو ۽ ڪنيڪٽر شروع ڪيو:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

عظيم: اهو سيٽ اپ آهي ۽ وڃڻ لاءِ تيار آهي. هاڻي اچو ته هڪ صارف ٿيڻ جو مظاهرو ڪريون ۽ ڪافڪا سان ڳنڍيون، جنهن کان پوء اسان ٽيبل ۾ هڪ داخلا شامل ۽ تبديل ڪريون ٿا:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

اسان جي موضوع ۾، هن هيٺ ڏنل ڏيکاري ويندي:

اسان جي تبديلين سان تمام ڊگهو JSON

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

ٻنهي صورتن ۾، رڪارڊ ۾ تبديل ٿيل رڪارڊ جي ڪنجي (PK) تي مشتمل آهي، ۽ تبديلين جو بلڪل جوهر: رڪارڊ اڳ ڇا هو ۽ پوءِ ڇا ٿيو.

  • جي حالت ۾ INSERT: قدر کان اڳ (before) برابر nullان جي پٺيان لڳل تار.
  • جي حالت ۾ UPDATE: تي payload.before قطار جي پوئين حالت ڏيکاري ٿي، ۽ اندر payload.after - تبديلي جي جوهر سان نئون.

2.2 مونگو ڊي بي

هي ڪنيڪٽر معياري MongoDB نقل ڪرڻ واري ميڪانيزم کي استعمال ڪري ٿو، DBMS پرائمري نوڊ جي oplog کان معلومات پڙهڻ.

ساڳي طرح PgSQL لاءِ اڳ ۾ ئي بيان ڪيل ڪنيڪٽر، هتي، پڻ، پهرين شروعات ۾، بنيادي ڊيٽا سنيپ شاٽ ورتو وڃي ٿو، جنهن کان پوءِ ڪنيڪٽر اوپلاگ ريڊنگ موڊ تي سوئچ ڪري ٿو.

تشڪيل مثال:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

جئين توهان ڏسي سگهو ٿا، پوئين مثال جي مقابلي ۾ ڪو به نئون آپشن ناهي، پر صرف ڊيٽابيس سان ڳنڍڻ لاء ذميوار اختيارن جو تعداد ۽ انهن جي اڳڀرائي کي گهٽايو ويو آهي.

سيٽنگون transforms هن ڀيري اهي هيٺيان ڪندا آهن: اسڪيم مان ٽارگيٽ موضوع جو نالو ڦيرايو <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

غلطي رواداري

اسان جي وقت ۾ غلطي رواداري ۽ اعلي دستيابي جو مسئلو هميشه کان وڌيڪ شديد آهي - خاص طور تي جڏهن اسان ڊيٽا ۽ ٽرانزيڪشن بابت ڳالهايون ٿا، ۽ ڊيٽا جي تبديلي جي ٽريڪنگ هن معاملي ۾ پاسي تي نه آهي. اچو ته ڏسو ته ڇا اصول ۾ غلط ٿي سگهي ٿو ۽ هر صورت ۾ ڊيبيزيم کي ڇا ٿيندو.

اتي ٽي آپٽ آئوٽ آپشن آھن:

  1. Kafka Connect ناڪامي. جيڪڏهن ڪنيڪٽ کي ورهايل موڊ ۾ ڪم ڪرڻ لاءِ ترتيب ڏنو ويو آهي، ان لاءِ ڪيترن ئي ڪارڪنن کي ساڳئي group.id سيٽ ڪرڻ جي ضرورت آهي. پوء، جيڪڏهن انهن مان هڪ ناڪام ٿئي ٿي، ڪنيڪٽر ٻئي ڪم ڪندڙ تي ٻيهر شروع ڪيو ويندو ۽ ڪافڪا ۾ موضوع ۾ آخري عزم واري پوزيشن کان پڙهڻ جاري رکو.
  2. ڪافڪا ڪلستر سان رابطي جو نقصان. ڪنيڪٽر صرف ان پوزيشن تي پڙهڻ بند ڪندو جيڪو ڪافڪا ڏانهن موڪلڻ ۾ ناڪام ٿيو ۽ وقتي طور تي ان کي ٻيهر موڪلڻ جي ڪوشش ڪندو جيستائين ڪوشش ڪامياب نه ٿئي.
  3. ڊيٽا جو ذريعو دستياب ناهي. ڪنيڪٽر ترتيب جي مطابق ذريعن سان ٻيهر ڳنڍڻ جي ڪوشش ڪندو. ڊفالٽ آهي 16 ڪوششون استعمال ڪندي تجزياتي واپسي. 16 هين ناڪام ڪوشش کان پوء، ڪم کي نشان لڳايو ويندو ناڪام ٿي ويو ۽ ان کي دستي طور تي ٻيهر شروع ڪرڻ جي ضرورت پوندي Kafka Connect REST انٽرفيس ذريعي.
    • جي حالت ۾ PostgreSQL ڊيٽا ضايع نه ٿيندي، ڇاڪاڻ ته replication سلاٽ استعمال ڪرڻ سان WAL فائلن کي ڊليٽ ٿيڻ کان روڪيو ويندو جيڪو ڪنيڪٽر طرفان نه پڙهيو ويو آهي. هن معاملي ۾، اتي هڪ downside آهي: جيڪڏهن connector ۽ DBMS جي وچ ۾ نيٽ ورڪ رابطي هڪ ڊگهي وقت تائين بيٺو آهي، اتي هڪ موقعو آهي ته ڊسڪ جي جاء ختم ٿي ويندي، ۽ هن سڄي DBMS جي ناڪامي ٿي سگهي ٿي.
    • جي حالت ۾ هن MySQL ڪنيڪشن بحال ٿيڻ کان اڳ binlog فائلن کي ڊي بي ايم ايس پاران گھمائي سگھجي ٿو. اهو ڪنيڪٽر کي ناڪام رياست ۾ وڃڻ جو سبب بڻائيندو، ۽ ان کي شروعاتي سنيپ شاٽ موڊ ۾ ٻيهر شروع ڪرڻ جي ضرورت پوندي ته جيئن عام آپريشن کي بحال ڪرڻ لاءِ binlogs مان پڙهڻ جاري رکو.
    • تي منڊو ڊي. دستاويز چوي ٿو: ڪنيڪٽر جو رويو ان صورت ۾ جڏهن لاگ/اوپلاگ فائلون ڊهي ويون آهن ۽ ڪنيڪٽر ان پوزيشن کان پڙهڻ جاري نٿو رکي سگهي جتي اهو ڇڏي ويو آهي سڀني DBMS لاءِ ساڳيو آهي. اهو حقيقت ۾ ڪوڙ آهي ته کنیکٹر رياست ۾ ويندا ناڪام ٿي ويو ۽ موڊ ۾ ٻيهر شروع ڪرڻ جي ضرورت پوندي شروعاتي تصوير.

      بهرحال، اتي استثنا آهن. جيڪڏهن ڪنيڪٽر هڪ ڊگھي عرصي تائين منقطع حالت ۾ هو (يا مونگو ڊي بي مثال تائين پهچي نه سگهيو)، ۽ اوپلاگ هن وقت ۾ گھمايو ويو، پوء جڏهن ڪنيڪشن بحال ٿيندو، ڪنيڪٽر آرام سان پهرين دستياب پوزيشن کان ڊيٽا پڙهڻ جاري رکندو. ، جنهن ڪري ڪيفڪا ۾ ڪجهه ڊيٽا نه ماريندو.

ٿڪل

ڊيبيزيم سي ڊي سي سسٽم سان منهنجو پهريون تجربو آهي ۽ مجموعي طور تي تمام مثبت رهيو آهي. پروجيڪٽ بنيادي ڊي بي ايم ايس جي مدد، ترتيب جي آسانيء، ڪلسترنگ جي حمايت ۽ هڪ سرگرم ڪميونٽي جي مدد ڪئي. انھن لاءِ جيڪي مشق ۾ دلچسپي رکن ٿا، مان سفارش ڪريان ٿو ته توھان پڙھو ھدايتن لاءِ ڪافڪا ڪنيڪٽ и ڊيبيزيم.

ڪافڪا ڪنيڪٽ لاءِ JDBC ڪنيڪٽر جي مقابلي ۾، ڊيبيزيم جو بنيادي فائدو اهو آهي ته تبديليون پڙهيا وڃن ٿا DBMS لاگز مان، جيڪا ڊيٽا کي گهٽ ۾ گهٽ دير سان حاصل ڪرڻ جي اجازت ڏئي ٿي. JDBC ڪنيڪٽر (کافڪا ڪنيڪٽ پاران مهيا ڪيل) ٽريڪ ڪيل ٽيبل کي هڪ مقرر وقفي تي پڇي ٿو ۽ (ساڳئي سبب لاءِ) جڏهن ڊيٽا ڊهي وڃي ٿي ته پيغام پيدا نه ٿئي (توهان ان ڊيٽا لاءِ ڪيئن پڇا ڳاڇا ڪري سگهو ٿا جيڪو اتي ناهي؟).

ساڳئي مسئلن کي حل ڪرڻ لاء، توهان هيٺ ڏنل حلن تي ڌيان ڏئي سگهو ٿا (ڊيبيزيم کان علاوه):

پي ايس

اسان جي بلاگ تي پڻ پڙهو:

جو ذريعو: www.habr.com

تبصرو شامل ڪريو