แนะนำ Debezium - CDC สำหรับ Apache Kafka

แนะนำ Debezium - CDC สำหรับ Apache Kafka

ในงานของฉัน ฉันมักจะพบกับโซลูชันทางเทคนิค/ผลิตภัณฑ์ซอฟต์แวร์ใหม่ๆ ซึ่งมีข้อมูลค่อนข้างน้อยในอินเทอร์เน็ตภาษารัสเซีย ในบทความนี้ ฉันจะพยายามเติมช่องว่างด้วยตัวอย่างจากแนวทางปฏิบัติล่าสุดของฉัน เมื่อฉันต้องกำหนดค่าการส่งเหตุการณ์ CDC จาก DBMS ยอดนิยมสองตัว (PostgreSQL และ MongoDB) ไปยังคลัสเตอร์ Kafka โดยใช้ Debezium ฉันหวังว่าบทความทบทวนนี้ซึ่งปรากฏเป็นผลมาจากงานที่ทำเสร็จแล้วจะเป็นประโยชน์ต่อผู้อื่น

Debezium และ CDC โดยทั่วไปคืออะไร?

ดีเบเซียม — ตัวแทนของหมวดหมู่ซอฟต์แวร์ CDC (บันทึกการเปลี่ยนแปลงข้อมูล) หรือให้เจาะจงกว่านั้นคือชุดตัวเชื่อมต่อสำหรับ DBMS ต่างๆ ที่เข้ากันได้กับเฟรมเวิร์ก Apache Kafka Connect

มัน โครงการโอเพ่นซอร์ส ได้รับอนุญาตภายใต้ Apache License v2.0 และสนับสนุนโดย Red Hat การพัฒนาดำเนินไปอย่างต่อเนื่องตั้งแต่ปี 2016 และปัจจุบันให้การสนับสนุนอย่างเป็นทางการสำหรับ DBMS ต่อไปนี้: MySQL, PostgreSQL, MongoDB, SQL Server นอกจากนี้ยังมีตัวเชื่อมต่อสำหรับ Cassandra และ Oracle แต่ในขณะนี้ พวกเขาอยู่ในสถานะ "เข้าถึงก่อน" และรุ่นใหม่ไม่รับประกันความเข้ากันได้แบบย้อนหลัง

หากเราเปรียบเทียบ CDC กับวิธีการแบบดั้งเดิม (เมื่อแอปพลิเคชันอ่านข้อมูลจาก DBMS โดยตรง) ข้อได้เปรียบหลักๆ ก็คือการนำการเปลี่ยนแปลงข้อมูลไปใช้ในระดับแถวที่มีความหน่วงต่ำ ความน่าเชื่อถือสูง และความพร้อมใช้งาน สองจุดสุดท้ายทำได้โดยใช้คลัสเตอร์ Kafka เป็นที่เก็บข้อมูลสำหรับเหตุการณ์ CDC

ข้อดีอีกประการหนึ่งคือความจริงที่ว่ามีการใช้โมเดลเดียวในการจัดเก็บเหตุการณ์ ดังนั้นแอปพลิเคชันปลายทางจึงไม่ต้องกังวลกับความแตกต่างในการใช้งาน DBMS ที่แตกต่างกัน

สุดท้ายนี้ การใช้ตัวกลางส่งข้อความช่วยให้แอปพลิเคชันที่ติดตามการเปลี่ยนแปลงของข้อมูลขยายขนาดในแนวนอนได้ ในขณะเดียวกัน ผลกระทบต่อแหล่งข้อมูลก็ลดลง เนื่องจากข้อมูลไม่ได้มาจาก DBMS โดยตรง แต่มาจากคลัสเตอร์ Kafka

เกี่ยวกับสถาปัตยกรรม Debezium

การใช้ Debezium มีรูปแบบที่เรียบง่ายดังนี้:

DBMS (เป็นแหล่งข้อมูล) → ตัวเชื่อมต่อใน Kafka Connect → Apache Kafka → Consumer

นี่คือแผนภาพจากเว็บไซต์โครงการเพื่อเป็นตัวอย่าง:

แนะนำ Debezium - CDC สำหรับ Apache Kafka

อย่างไรก็ตามฉันไม่ชอบรูปแบบนี้มากนักเพราะดูเหมือนว่าจะสามารถใช้ตัวเชื่อมต่ออ่างล้างจานได้เท่านั้น

ในความเป็นจริง สถานการณ์แตกต่างออกไป: การเติมเต็ม Data Lake ของคุณ (ลิงค์สุดท้ายในแผนภาพด้านบน) นี่ไม่ใช่วิธีเดียวที่จะใช้ Debezium แอปพลิเคชันของคุณสามารถใช้เหตุการณ์ที่ส่งไปยัง Apache Kafka เพื่อจัดการกับสถานการณ์ต่างๆ ได้ ตัวอย่างเช่น:

  • ลบข้อมูลที่ไม่เกี่ยวข้องออกจากแคช
  • การส่งการแจ้งเตือน
  • อัพเดตดัชนีการค้นหา
  • บันทึกการตรวจสอบบางประเภท
  • ...

ในกรณีที่คุณมีแอปพลิเคชัน Java และไม่จำเป็นต้องใช้/เป็นไปได้ที่จะใช้คลัสเตอร์ Kafka ก็มีความเป็นไปได้ที่จะทำงานผ่าน ตัวเชื่อมต่อแบบฝัง. ข้อได้เปรียบที่ชัดเจนคือไม่จำเป็นต้องใช้โครงสร้างพื้นฐานเพิ่มเติม (ในรูปแบบของตัวเชื่อมต่อและคาฟคา) อย่างไรก็ตาม โซลูชันนี้เลิกใช้แล้วตั้งแต่เวอร์ชัน 1.1 และไม่แนะนำให้ใช้อีกต่อไป (การสนับสนุนอาจถูกลบออกในรุ่นต่อๆ ไป)

บทความนี้จะกล่าวถึงสถาปัตยกรรมที่แนะนำโดยนักพัฒนา ซึ่งให้ความทนทานต่อข้อผิดพลาดและความสามารถในการปรับขนาด

การกำหนดค่าตัวเชื่อมต่อ

เพื่อเริ่มติดตามการเปลี่ยนแปลงในค่าที่สำคัญที่สุด - ข้อมูล - เราต้องการ:

  1. แหล่งข้อมูลซึ่งสามารถเป็น MySQL ได้ตั้งแต่เวอร์ชัน 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (รายการเต็มรูปแบบ);
  2. คลัสเตอร์ Apache Kafka;
  3. อินสแตนซ์ Kafka Connect (เวอร์ชัน 1.x, 2.x);
  4. กำหนดค่าตัวเชื่อมต่อ Debezium แล้ว

ทำงานในสองประเด็นแรกคือ กระบวนการติดตั้ง DBMS และ Apache Kafka อยู่นอกเหนือขอบเขตของบทความ อย่างไรก็ตาม สำหรับผู้ที่ต้องการปรับใช้ทุกอย่างในแซนด์บ็อกซ์ พื้นที่เก็บข้อมูลอย่างเป็นทางการพร้อมตัวอย่างจะมีแบบสำเร็จรูป นักเทียบท่า-compose.yaml.

เราจะกล่าวถึงรายละเอียดเพิ่มเติมในสองประเด็นสุดท้าย

0. คาฟคาคอนเน็ค

ที่นี่และเพิ่มเติมในบทความ ตัวอย่างการกำหนดค่าทั้งหมดจะกล่าวถึงในบริบทของอิมเมจ Docker ที่เผยแพร่โดยนักพัฒนา Debezium ประกอบด้วยไฟล์ปลั๊กอินที่จำเป็นทั้งหมด (ตัวเชื่อมต่อ) และให้การกำหนดค่า Kafka Connect โดยใช้ตัวแปรสภาพแวดล้อม

หากคุณต้องการใช้ Kafka Connect จาก Confluent คุณจะต้องเพิ่มปลั๊กอินของตัวเชื่อมต่อที่จำเป็นลงในไดเร็กทอรีที่ระบุใน plugin.path หรือตั้งค่าผ่านตัวแปรสภาพแวดล้อม CLASSPATH. การตั้งค่าสำหรับผู้ปฏิบัติงาน Kafka Connect และตัวเชื่อมต่อถูกกำหนดผ่านไฟล์การกำหนดค่าที่ถูกส่งเป็นอาร์กิวเมนต์ไปยังคำสั่งเรียกใช้งานของผู้ปฏิบัติงาน สำหรับรายละเอียดเพิ่มเติม โปรดดู เอกสาร.

กระบวนการทั้งหมดของการตั้งค่า Debeizum ในเวอร์ชันตัวเชื่อมต่อนั้นดำเนินการในสองขั้นตอน ลองดูแต่ละรายการ:

1. การตั้งค่าเฟรมเวิร์ก Kafka Connect

หากต้องการสตรีมข้อมูลไปยังคลัสเตอร์ Apache Kafka พารามิเตอร์เฉพาะจะถูกตั้งค่าในเฟรมเวิร์ก Kafka Connect เช่น:

  • พารามิเตอร์สำหรับการเชื่อมต่อกับคลัสเตอร์
  • ชื่อของหัวข้อที่จะจัดเก็บการกำหนดค่าของตัวเชื่อมต่อโดยตรง
  • ชื่อของกลุ่มที่ตัวเชื่อมต่อทำงานอยู่ (หากใช้โหมดกระจาย)

อิมเมจ Docker อย่างเป็นทางการของโปรเจ็กต์รองรับการกำหนดค่าโดยใช้ตัวแปรสภาพแวดล้อม - นี่คือสิ่งที่เราจะใช้ ดังนั้นให้ดาวน์โหลดภาพ:

docker pull debezium/connect

ชุดตัวแปรสภาพแวดล้อมขั้นต่ำที่จำเป็นในการเรียกใช้ตัวเชื่อมต่อมีดังนี้:

  • BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092 — รายการเริ่มต้นของเซิร์ฟเวอร์คลัสเตอร์ Kafka เพื่อรับรายชื่อสมาชิกคลัสเตอร์ทั้งหมด
  • OFFSET_STORAGE_TOPIC=connector-offsets - หัวข้อสำหรับจัดเก็บตำแหน่งที่ตัวเชื่อมต่ออยู่ในปัจจุบัน
  • CONNECT_STATUS_STORAGE_TOPIC=connector-status — หัวข้อสำหรับจัดเก็บสถานะของตัวเชื่อมต่อและงาน
  • CONFIG_STORAGE_TOPIC=connector-config — หัวข้อสำหรับการจัดเก็บข้อมูลการกำหนดค่าตัวเชื่อมต่อและงาน
  • GROUP_ID=1 — ตัวระบุของกลุ่มคนงานที่สามารถดำเนินการงานตัวเชื่อมต่อได้ จำเป็นเมื่อใช้แบบกระจาย (กระจาย) ระบอบการปกครอง

เราเปิดตัวคอนเทนเนอร์ด้วยตัวแปรเหล่านี้:

docker run 
  -e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092' 
  -e GROUP_ID=1 
  -e CONFIG_STORAGE_TOPIC=my_connect_configs 
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets 
  -e STATUS_STORAGE_TOPIC=my_connect_statuses  debezium/connect:1.2

หมายเหตุเกี่ยวกับรว์

ตามค่าเริ่มต้น Debezium จะเขียนข้อมูลในรูปแบบ JSON ซึ่งเป็นที่ยอมรับสำหรับแซนด์บ็อกซ์และข้อมูลจำนวนเล็กน้อย แต่อาจกลายเป็นปัญหาในฐานข้อมูลที่มีการโหลดสูง อีกทางเลือกหนึ่งสำหรับตัวแปลง JSON คือการทำให้ข้อความเป็นอนุกรมโดยใช้ รว์ เป็นรูปแบบไบนารี่ ซึ่งจะช่วยลดภาระบนระบบย่อย I/O ใน Apache Kafka

หากต้องการใช้ Avro คุณต้องปรับใช้แยกต่างหาก สคีมารีจิสทรี (สำหรับจัดเก็บไดอะแกรม) ตัวแปรสำหรับตัวแปลงจะมีลักษณะดังนี้:

name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER   
value: io.confluent.connect.avro.AvroConverter

รายละเอียดเกี่ยวกับการใช้ Avro และการตั้งค่ารีจิสทรีนั้นอยู่นอกเหนือขอบเขตของบทความนี้ - เราจะใช้ JSON เพิ่มเติมเพื่อความชัดเจน

2. การกำหนดค่าตัวเชื่อมต่อเอง

ตอนนี้คุณสามารถไปที่การกำหนดค่าของตัวเชื่อมต่อได้โดยตรงซึ่งจะอ่านข้อมูลจากแหล่งที่มา

ลองดูตัวอย่างตัวเชื่อมต่อสำหรับ DBMS สองตัว: PostgreSQL และ MongoDB ซึ่งฉันมีประสบการณ์และมีความแตกต่าง (แม้ว่าจะเล็กน้อย แต่ในบางกรณีก็สำคัญ!)

การกำหนดค่าอธิบายไว้ในรูปแบบ JSON และอัปโหลดไปยัง Kafka Connect โดยใช้คำขอ POST

2.1. PostgreSQL

ตัวอย่างการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL:

{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "definitelynotpassword",
    "database.dbname" : "dbname",
    "database.server.name": "pg-dev",
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
    "transforms": "AddPrefix",
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex": "pg-dev.public.(.*)",
    "transforms.AddPrefix.replacement": "data.cdc.dbname"
  }
}

หลักการทำงานของตัวเชื่อมต่อหลังจากการตั้งค่านี้ค่อนข้างง่าย:

  • เมื่อเปิดตัวครั้งแรกจะเชื่อมต่อกับฐานข้อมูลที่ระบุในการกำหนดค่าและเริ่มในโหมด สแนปชอตเริ่มต้นโดยส่งชุดข้อมูลเริ่มต้นที่ได้รับโดยใช้เงื่อนไขไปยัง Kafka SELECT * FROM table_name.
  • หลังจากการกำหนดค่าเริ่มต้นเสร็จสมบูรณ์ ตัวเชื่อมต่อจะเข้าสู่โหมดเพื่ออ่านการเปลี่ยนแปลงจากไฟล์ PostgreSQL WAL

เกี่ยวกับตัวเลือกที่ใช้:

  • name - ชื่อของขั้วต่อที่ใช้โครงร่างตามที่อธิบายด้านล่าง ในอนาคต ชื่อนี้จะใช้เพื่อทำงานกับตัวเชื่อมต่อ (เช่น ดูสถานะ/รีสตาร์ท/อัปเดตการกำหนดค่า) ผ่านทาง Kafka Connect REST API
  • connector.class — คลาสตัวเชื่อมต่อ DBMS ที่จะใช้โดยตัวเชื่อมต่อที่กำหนดค่าไว้
  • plugin.name — ชื่อของปลั๊กอินสำหรับการถอดรหัสข้อมูลจากไฟล์ WAL แบบลอจิคัล มีให้เลือก wal2json, decoderbuffs и pgoutput. สองรายการแรกจำเป็นต้องติดตั้งส่วนขยายที่เหมาะสมใน DBMS และ pgoutput สำหรับ PostgreSQL เวอร์ชัน 10 และสูงกว่านั้นไม่ต้องการการปรับแต่งเพิ่มเติม
  • database.* — ตัวเลือกสำหรับการเชื่อมต่อกับฐานข้อมูลที่ไหน database.server.name — ชื่ออินสแตนซ์ PostgreSQL ใช้เพื่อสร้างชื่อหัวข้อในคลัสเตอร์ Kafka
  • table.include.list — รายการตารางที่เราต้องการติดตามการเปลี่ยนแปลง ระบุไว้ในรูปแบบ schema.table_name; ไม่สามารถใช้ร่วมกับ table.exclude.list;
  • heartbeat.interval.ms — ช่วงเวลา (เป็นมิลลิวินาที) ที่ตัวเชื่อมต่อส่งข้อความฮาร์ทบีทไปยังหัวข้อพิเศษ
  • heartbeat.action.query — คำขอที่จะดำเนินการเมื่อส่งข้อความฮาร์ทบีทแต่ละข้อความ (ตัวเลือกปรากฏในเวอร์ชัน 1.1)
  • slot.name — ชื่อของสล็อตการจำลองที่จะใช้โดยตัวเชื่อมต่อ
  • publication.name - ชื่อ สิ่งพิมพ์ ใน PostgreSQL ซึ่งตัวเชื่อมต่อใช้ หากไม่มีอยู่ Debezium จะพยายามสร้างมันขึ้นมา หากผู้ใช้ที่ทำการเชื่อมต่อไม่มีสิทธิ์เพียงพอสำหรับการดำเนินการนี้ ตัวเชื่อมต่อจะสิ้นสุดลงพร้อมกับข้อผิดพลาด
  • transforms กำหนดวิธีการเปลี่ยนชื่อหัวข้อเป้าหมายอย่างชัดเจน:
    • transforms.AddPrefix.type บ่งชี้ว่าเราจะใช้นิพจน์ทั่วไป
    • transforms.AddPrefix.regex — หน้ากากที่กำหนดชื่อของหัวข้อเป้าหมายใหม่
    • transforms.AddPrefix.replacement - โดยตรงสิ่งที่เรากำลังกำหนดใหม่

ข้อมูลเพิ่มเติมเกี่ยวกับการเต้นของหัวใจและการเปลี่ยนแปลง

ตามค่าเริ่มต้น ตัวเชื่อมต่อจะส่งข้อมูลไปยัง Kafka สำหรับธุรกรรมที่คอมมิตแต่ละรายการ และ LSN (หมายเลขลำดับบันทึก) จะถูกบันทึกไว้ในหัวข้อบริการ offset. แต่จะเกิดอะไรขึ้นหากตัวเชื่อมต่อได้รับการกำหนดค่าให้อ่านไม่ใช่ฐานข้อมูลทั้งหมด แต่เป็นเพียงส่วนหนึ่งของตาราง (ซึ่งการอัพเดตข้อมูลไม่ได้เกิดขึ้นบ่อยครั้ง)

  • ตัวเชื่อมต่อจะอ่านไฟล์ WAL และจะไม่ตรวจพบการทำธุรกรรมใดๆ ที่กระทำกับตารางที่ตัวเชื่อมต่อกำลังตรวจสอบอยู่
  • ดังนั้น จะไม่ปรับปรุงตำแหน่งปัจจุบันในหัวข้อหรือในช่องการจำลองแบบ
  • ซึ่งจะส่งผลให้ไฟล์ WAL ถูกเก็บไว้ในดิสก์และมีแนวโน้มว่าพื้นที่ดิสก์จะหมด

และนี่คือจุดที่ทางเลือกต่างๆ มาช่วยเหลือ heartbeat.interval.ms и heartbeat.action.query. การใช้ตัวเลือกเหล่านี้เป็นคู่ทำให้สามารถดำเนินการร้องขอเพื่อเปลี่ยนแปลงข้อมูลในตารางที่แยกจากกันทุกครั้งที่ส่งข้อความฮาร์ทบีท ดังนั้น LSN ที่ตัวเชื่อมต่ออยู่ในปัจจุบัน (ในช่องการจำลอง) จะได้รับการอัปเดตอย่างต่อเนื่อง ซึ่งช่วยให้ DBMS สามารถลบไฟล์ WAL ที่ไม่จำเป็นอีกต่อไปได้ คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับวิธีการทำงานของตัวเลือกต่างๆ เอกสาร.

อีกทางเลือกหนึ่งที่ควรค่าแก่การเอาใจใส่อย่างใกล้ชิดคือ transforms. แม้จะเน้นความสะดวกสบายและความสวยงามมากกว่า...

ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อโดยใช้นโยบายการตั้งชื่อต่อไปนี้: serverName.schemaName.tableName. มันอาจจะไม่สะดวกเสมอไป ตัวเลือก transforms คุณสามารถใช้นิพจน์ทั่วไปเพื่อกำหนดรายการตาราง เหตุการณ์ที่ต้องกำหนดเส้นทางไปยังหัวข้อที่มีชื่อเฉพาะ

ในการกำหนดค่าของเราขอขอบคุณ transforms สิ่งต่อไปนี้เกิดขึ้น: เหตุการณ์ CDC ทั้งหมดจากฐานข้อมูลที่ได้รับการตรวจสอบจะไปที่หัวข้อที่มีชื่อ data.cdc.dbname. มิฉะนั้น (หากไม่มีการตั้งค่าเหล่านี้) ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อสำหรับแต่ละตาราง เช่น: pg-dev.public.<table_name>.

ข้อจำกัดของตัวเชื่อมต่อ

เพื่อสรุปคำอธิบายการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL เราควรพูดถึงคุณสมบัติ/ข้อจำกัดของการดำเนินการต่อไปนี้:

  1. ฟังก์ชันการทำงานของตัวเชื่อมต่อสำหรับ PostgreSQL ขึ้นอยู่กับแนวคิดของการถอดรหัสแบบลอจิคัล ดังนั้นเขา ไม่ติดตามคำขอเปลี่ยนโครงสร้างฐานข้อมูล (DDL) - ดังนั้นข้อมูลนี้จะไม่อยู่ในหัวข้อ
  2. เนื่องจากมีการใช้สล็อตการจำลอง จึงสามารถเชื่อมต่อตัวเชื่อมต่อได้ เท่านั้น ไปยังอินสแตนซ์ DBMS ชั้นนำ
  3. หากผู้ใช้ที่ตัวเชื่อมต่อเชื่อมต่อกับฐานข้อมูลมีสิทธิ์แบบอ่านอย่างเดียว ก่อนการเปิดตัวครั้งแรก คุณจะต้องสร้างสล็อตการจำลองด้วยตนเองและเผยแพร่ไปยังฐานข้อมูล

การใช้การกำหนดค่า

ดังนั้น มาโหลดการกำหนดค่าของเราลงในตัวเชื่อมต่อกัน:

curl -i -X POST -H "Accept:application/json" 
  -H  "Content-Type:application/json"  http://localhost:8083/connectors/ 
  -d @pg-con.json

เราตรวจสอบว่าการดาวน์โหลดสำเร็จและตัวเชื่อมต่อเริ่มต้นแล้ว:

$ curl -i http://localhost:8083/connectors/pg-connector/status 
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)

{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}

เยี่ยมมาก: ตั้งค่าแล้วและพร้อมใช้งาน ตอนนี้ เรามาปลอมตัวเป็นผู้บริโภคและเชื่อมต่อกับ Kafka หลังจากนั้นเราจะเพิ่มและเปลี่ยนแปลงรายการในตาราง:

$ kafka/bin/kafka-console-consumer.sh 
  --bootstrap-server kafka:9092 
  --from-beginning 
  --property print.key=true 
  --topic data.cdc.dbname

postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1

ในหัวข้อของเราจะแสดงดังต่อไปนี้:

JSON ที่ยาวมากกับการเปลี่ยนแปลงของเรา

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}

ในทั้งสองกรณี บันทึกจะประกอบด้วยคีย์ (PK) ของบันทึกที่มีการเปลี่ยนแปลง และสาระสำคัญของการเปลี่ยนแปลง: บันทึกนี้เกิดขึ้นก่อนอะไรและเกิดอะไรขึ้นหลังจากนั้น

  • ในกรณีของ INSERT: ค่าก่อน (before) เท่ากับ nullและหลัง - บรรทัดที่ถูกแทรก
  • ในกรณีของ UPDATE: ที่ payload.before สถานะก่อนหน้าของบรรทัดจะปรากฏขึ้น และใน payload.after — ใหม่พร้อมแก่นแท้ของการเปลี่ยนแปลง

2.2 มอนโกดีบี

ตัวเชื่อมต่อนี้ใช้กลไกการจำลอง MongoDB มาตรฐาน โดยอ่านข้อมูลจาก oplog ของโหนด DBMS หลัก

เช่นเดียวกับตัวเชื่อมต่อที่อธิบายไว้แล้วสำหรับ PgSQL ที่นี่เช่นกัน เมื่อเริ่มต้นครั้งแรก สแนปชอตข้อมูลหลักจะถูกถ่าย หลังจากนั้นตัวเชื่อมต่อจะสลับไปที่โหมดการอ่าน oplog

ตัวอย่างการกำหนดค่า:

{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}

อย่างที่คุณเห็น ไม่มีตัวเลือกใหม่ที่นี่เมื่อเทียบกับตัวอย่างก่อนหน้านี้ แต่มีเพียงจำนวนตัวเลือกที่รับผิดชอบในการเชื่อมต่อกับฐานข้อมูลและคำนำหน้าเท่านั้นที่ลดลง

การตั้งค่า transforms คราวนี้พวกเขาทำสิ่งต่อไปนี้: พวกเขาเปลี่ยนชื่อหัวข้อเป้าหมายจากสคีมา <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.

ความอดทนต่อความผิดพลาด

ปัญหาของการยอมรับข้อผิดพลาดและความพร้อมใช้งานสูงในยุคของเรานั้นรุนแรงมากขึ้นกว่าเดิม โดยเฉพาะอย่างยิ่งเมื่อเราพูดถึงข้อมูลและธุรกรรม และการติดตามการเปลี่ยนแปลงของข้อมูลไม่ได้แยกออกจากปัญหานี้ มาดูกันว่าหลักการอาจผิดพลาดอะไรได้บ้าง และจะเกิดอะไรขึ้นกับ Debezium ในแต่ละกรณี

มีสามตัวเลือกในการยกเลิก:

  1. Kafka Connect ล้มเหลว. หาก Connect ได้รับการกำหนดค่าให้ทำงานในโหมดกระจาย จะต้องมีผู้ปฏิบัติงานหลายคนเพื่อตั้งค่า group.id เดียวกัน จากนั้น หากหนึ่งในนั้นล้มเหลว ตัวเชื่อมต่อจะรีสตาร์ทกับพนักงานคนอื่นและอ่านต่อจากตำแหน่งที่คอมมิตครั้งล่าสุดในหัวข้อใน Kafka
  2. ขาดการเชื่อมต่อกับคลัสเตอร์ Kafka. ตัวเชื่อมต่อจะหยุดอ่านที่ตำแหน่งที่ไม่สามารถส่งไปยัง Kafka ได้ และจะพยายามส่งอีกครั้งเป็นระยะๆ จนกว่าความพยายามจะสำเร็จ
  3. แหล่งข้อมูลไม่พร้อมใช้งาน. ตัวเชื่อมต่อจะพยายามเชื่อมต่อกับแหล่งที่มาอีกครั้งตามที่กำหนดค่าไว้ ค่าเริ่มต้นคือพยายามใช้ 16 ครั้ง การถอยกลับแบบเอ็กซ์โปเนนเชียล. หลังจากพยายามไม่สำเร็จครั้งที่ 16 งานจะถูกทำเครื่องหมายเป็น ล้มเหลว และคุณจะต้องรีสตาร์ทด้วยตนเองผ่านอินเทอร์เฟซ Kafka Connect REST
    • ในกรณีของ PostgreSQL ข้อมูลจะไม่สูญหายเพราะว่า การใช้สล็อตการจำลองจะป้องกันไม่ให้คุณลบไฟล์ WAL ที่ตัวเชื่อมต่อไม่ได้อ่าน ในกรณีนี้ เหรียญก็มีข้อเสียเช่นกัน: หากการเชื่อมต่อเครือข่ายระหว่างตัวเชื่อมต่อและ DBMS ถูกรบกวนเป็นเวลานาน มีความเป็นไปได้ที่พื้นที่ดิสก์จะหมด และอาจนำไปสู่ความล้มเหลว DBMS ทั้งหมด
    • ในกรณีของ MySQL ไฟล์ binlog สามารถหมุนได้โดย DBMS เองก่อนที่การเชื่อมต่อจะถูกกู้คืน ซึ่งจะทำให้ตัวเชื่อมต่อเข้าสู่สถานะล้มเหลว และเพื่อคืนค่าการทำงานปกติ คุณจะต้องรีสตาร์ทในโหมดสแน็ปช็อตเริ่มต้นเพื่ออ่านต่อจาก Binlog
    • เกี่ยวกับ MongoDB. เอกสารระบุสถานะ: ลักษณะการทำงานของตัวเชื่อมต่อในกรณีที่ไฟล์บันทึก/oplog ถูกลบ และตัวเชื่อมต่อไม่สามารถอ่านต่อจากตำแหน่งที่ค้างไว้ได้จะเหมือนกันสำหรับ DBMS ทั้งหมด หมายความว่าตัวเชื่อมต่อจะเข้าสู่สถานะ ล้มเหลว และจะต้องรีสตาร์ทในโหมด สแนปชอตเริ่มต้น.

      อย่างไรก็ตาม มีข้อยกเว้นอยู่ หากตัวเชื่อมต่อถูกตัดการเชื่อมต่อเป็นเวลานาน (หรือไม่สามารถเข้าถึงอินสแตนซ์ MongoDB) และ oplog ผ่านการหมุนเวียนในช่วงเวลานี้ จากนั้นเมื่อการเชื่อมต่อได้รับการกู้คืน ตัวเชื่อมต่อจะยังคงอ่านข้อมูลจากตำแหน่งแรกที่มีอยู่อย่างใจเย็น ซึ่งเป็นสาเหตุที่ข้อมูลบางส่วนในคาฟคา ไม่ จะตี

ข้อสรุป

Debezium เป็นประสบการณ์ครั้งแรกของฉันกับระบบ CDC และโดยรวมแล้วเป็นเชิงบวกมาก โปรเจ็กต์ได้รับชัยชนะด้วยการสนับสนุน DBMS หลัก ความง่ายในการกำหนดค่า การสนับสนุนการทำคลัสเตอร์ และชุมชนที่กระตือรือร้น สำหรับผู้ที่สนใจการปฏิบัติจริง แนะนำให้อ่านคำแนะนำสำหรับ คาฟคาคอนเน็ค и ดีเบเซียม.

เมื่อเปรียบเทียบกับตัวเชื่อมต่อ JDBC สำหรับ Kafka Connect ข้อได้เปรียบหลักของ Debezium คือการอ่านการเปลี่ยนแปลงจากบันทึก DBMS ซึ่งช่วยให้สามารถรับข้อมูลโดยมีเวลาแฝงน้อยที่สุด ตัวเชื่อมต่อ JDBC (จาก Kafka Connect) สืบค้นตารางที่ได้รับการตรวจสอบในช่วงเวลาที่กำหนดและ (ด้วยเหตุผลเดียวกัน) จะไม่สร้างข้อความเมื่อข้อมูลถูกลบ (คุณจะสืบค้นข้อมูลที่ไม่มีอยู่ได้อย่างไร)

เพื่อแก้ไขปัญหาที่คล้ายกัน คุณสามารถใส่ใจกับวิธีแก้ไขปัญหาต่อไปนี้ (นอกเหนือจาก Debezium):

PS

อ่านเพิ่มเติมในบล็อกของเรา:

ที่มา: will.com

เพิ่มความคิดเห็น