
ในงานของฉัน ฉันมักจะพบกับโซลูชันทางเทคนิค/ผลิตภัณฑ์ซอฟต์แวร์ใหม่ๆ ซึ่งมีข้อมูลค่อนข้างน้อยในอินเทอร์เน็ตภาษารัสเซีย ในบทความนี้ ฉันจะพยายามเติมช่องว่างด้วยตัวอย่างจากแนวทางปฏิบัติล่าสุดของฉัน เมื่อฉันต้องกำหนดค่าการส่งเหตุการณ์ CDC จาก DBMS ยอดนิยมสองตัว (PostgreSQL และ MongoDB) ไปยังคลัสเตอร์ Kafka โดยใช้ Debezium ฉันหวังว่าบทความทบทวนนี้ซึ่งปรากฏเป็นผลมาจากงานที่ทำเสร็จแล้วจะเป็นประโยชน์ต่อผู้อื่น
Debezium และ CDC โดยทั่วไปคืออะไร?
— ตัวแทนของหมวดหมู่ซอฟต์แวร์ CDC () หรือให้เจาะจงกว่านั้นคือชุดตัวเชื่อมต่อสำหรับ DBMS ต่างๆ ที่เข้ากันได้กับเฟรมเวิร์ก Apache Kafka Connect
มัน ได้รับอนุญาตภายใต้ Apache License v2.0 และสนับสนุนโดย Red Hat การพัฒนาดำเนินไปอย่างต่อเนื่องตั้งแต่ปี 2016 และปัจจุบันให้การสนับสนุนอย่างเป็นทางการสำหรับ DBMS ต่อไปนี้: MySQL, PostgreSQL, MongoDB, SQL Server นอกจากนี้ยังมีตัวเชื่อมต่อสำหรับ Cassandra และ Oracle แต่ในขณะนี้ พวกเขาอยู่ในสถานะ "เข้าถึงก่อน" และรุ่นใหม่ไม่รับประกันความเข้ากันได้แบบย้อนหลัง
หากเราเปรียบเทียบ CDC กับวิธีการแบบดั้งเดิม (เมื่อแอปพลิเคชันอ่านข้อมูลจาก DBMS โดยตรง) ข้อได้เปรียบหลักๆ ก็คือการนำการเปลี่ยนแปลงข้อมูลไปใช้ในระดับแถวที่มีความหน่วงต่ำ ความน่าเชื่อถือสูง และความพร้อมใช้งาน สองจุดสุดท้ายทำได้โดยใช้คลัสเตอร์ Kafka เป็นที่เก็บข้อมูลสำหรับเหตุการณ์ CDC
ข้อดีอีกประการหนึ่งคือความจริงที่ว่ามีการใช้โมเดลเดียวในการจัดเก็บเหตุการณ์ ดังนั้นแอปพลิเคชันปลายทางจึงไม่ต้องกังวลกับความแตกต่างในการใช้งาน DBMS ที่แตกต่างกัน
สุดท้ายนี้ การใช้ตัวกลางส่งข้อความช่วยให้แอปพลิเคชันที่ติดตามการเปลี่ยนแปลงของข้อมูลขยายขนาดในแนวนอนได้ ในขณะเดียวกัน ผลกระทบต่อแหล่งข้อมูลก็ลดลง เนื่องจากข้อมูลไม่ได้มาจาก DBMS โดยตรง แต่มาจากคลัสเตอร์ Kafka
เกี่ยวกับสถาปัตยกรรม Debezium
การใช้ Debezium มีรูปแบบที่เรียบง่ายดังนี้:
DBMS (เป็นแหล่งข้อมูล) → ตัวเชื่อมต่อใน Kafka Connect → Apache Kafka → Consumer
นี่คือแผนภาพจากเว็บไซต์โครงการเพื่อเป็นตัวอย่าง:

อย่างไรก็ตามฉันไม่ชอบรูปแบบนี้มากนักเพราะดูเหมือนว่าจะสามารถใช้ตัวเชื่อมต่ออ่างล้างจานได้เท่านั้น
ในความเป็นจริง สถานการณ์แตกต่างออกไป: การเติมเต็ม Data Lake ของคุณ (ลิงค์สุดท้ายในแผนภาพด้านบน) นี่ไม่ใช่วิธีเดียวที่จะใช้ Debezium แอปพลิเคชันของคุณสามารถใช้เหตุการณ์ที่ส่งไปยัง Apache Kafka เพื่อจัดการกับสถานการณ์ต่างๆ ได้ ตัวอย่างเช่น:
- ลบข้อมูลที่ไม่เกี่ยวข้องออกจากแคช
- การส่งการแจ้งเตือน
- อัพเดตดัชนีการค้นหา
- บันทึกการตรวจสอบบางประเภท
- ...
ในกรณีที่คุณมีแอปพลิเคชัน Java และไม่จำเป็นต้องใช้/เป็นไปได้ที่จะใช้คลัสเตอร์ Kafka ก็มีความเป็นไปได้ที่จะทำงานผ่าน . ข้อได้เปรียบที่ชัดเจนคือไม่จำเป็นต้องใช้โครงสร้างพื้นฐานเพิ่มเติม (ในรูปแบบของตัวเชื่อมต่อและคาฟคา) อย่างไรก็ตาม โซลูชันนี้เลิกใช้แล้วตั้งแต่เวอร์ชัน 1.1 และไม่แนะนำให้ใช้อีกต่อไป (การสนับสนุนอาจถูกลบออกในรุ่นต่อๆ ไป)
บทความนี้จะกล่าวถึงสถาปัตยกรรมที่แนะนำโดยนักพัฒนา ซึ่งให้ความทนทานต่อข้อผิดพลาดและความสามารถในการปรับขนาด
การกำหนดค่าตัวเชื่อมต่อ
เพื่อเริ่มติดตามการเปลี่ยนแปลงในค่าที่สำคัญที่สุด - ข้อมูล - เราต้องการ:
- แหล่งข้อมูลซึ่งสามารถเป็น MySQL ได้ตั้งแต่เวอร์ชัน 5.7, PostgreSQL 9.6+, MongoDB 3.2+ ();
- คลัสเตอร์ Apache Kafka;
- อินสแตนซ์ Kafka Connect (เวอร์ชัน 1.x, 2.x);
- กำหนดค่าตัวเชื่อมต่อ Debezium แล้ว
ทำงานในสองประเด็นแรกคือ กระบวนการติดตั้ง DBMS และ Apache Kafka อยู่นอกเหนือขอบเขตของบทความ อย่างไรก็ตาม สำหรับผู้ที่ต้องการปรับใช้ทุกอย่างในแซนด์บ็อกซ์ พื้นที่เก็บข้อมูลอย่างเป็นทางการพร้อมตัวอย่างจะมีแบบสำเร็จรูป .
เราจะกล่าวถึงรายละเอียดเพิ่มเติมในสองประเด็นสุดท้าย
0. คาฟคาคอนเน็ค
ที่นี่และเพิ่มเติมในบทความ ตัวอย่างการกำหนดค่าทั้งหมดจะกล่าวถึงในบริบทของอิมเมจ Docker ที่เผยแพร่โดยนักพัฒนา Debezium ประกอบด้วยไฟล์ปลั๊กอินที่จำเป็นทั้งหมด (ตัวเชื่อมต่อ) และให้การกำหนดค่า Kafka Connect โดยใช้ตัวแปรสภาพแวดล้อม
หากคุณต้องการใช้ Kafka Connect จาก Confluent คุณจะต้องเพิ่มปลั๊กอินของตัวเชื่อมต่อที่จำเป็นลงในไดเร็กทอรีที่ระบุใน plugin.path หรือตั้งค่าผ่านตัวแปรสภาพแวดล้อม CLASSPATH. การตั้งค่าสำหรับผู้ปฏิบัติงาน Kafka Connect และตัวเชื่อมต่อถูกกำหนดผ่านไฟล์การกำหนดค่าที่ถูกส่งเป็นอาร์กิวเมนต์ไปยังคำสั่งเรียกใช้งานของผู้ปฏิบัติงาน สำหรับรายละเอียดเพิ่มเติม โปรดดู .
กระบวนการทั้งหมดของการตั้งค่า Debeizum ในเวอร์ชันตัวเชื่อมต่อนั้นดำเนินการในสองขั้นตอน ลองดูแต่ละรายการ:
1. การตั้งค่าเฟรมเวิร์ก Kafka Connect
หากต้องการสตรีมข้อมูลไปยังคลัสเตอร์ Apache Kafka พารามิเตอร์เฉพาะจะถูกตั้งค่าในเฟรมเวิร์ก Kafka Connect เช่น:
- พารามิเตอร์สำหรับการเชื่อมต่อกับคลัสเตอร์
- ชื่อของหัวข้อที่จะจัดเก็บการกำหนดค่าของตัวเชื่อมต่อโดยตรง
- ชื่อของกลุ่มที่ตัวเชื่อมต่อทำงานอยู่ (หากใช้โหมดกระจาย)
อิมเมจ Docker อย่างเป็นทางการของโปรเจ็กต์รองรับการกำหนดค่าโดยใช้ตัวแปรสภาพแวดล้อม - นี่คือสิ่งที่เราจะใช้ ดังนั้นให้ดาวน์โหลดภาพ:
docker pull debezium/connectชุดตัวแปรสภาพแวดล้อมขั้นต่ำที่จำเป็นในการเรียกใช้ตัวเชื่อมต่อมีดังนี้:
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092— รายการเริ่มต้นของเซิร์ฟเวอร์คลัสเตอร์ Kafka เพื่อรับรายชื่อสมาชิกคลัสเตอร์ทั้งหมด -
OFFSET_STORAGE_TOPIC=connector-offsets- หัวข้อสำหรับจัดเก็บตำแหน่งที่ตัวเชื่อมต่ออยู่ในปัจจุบัน -
CONNECT_STATUS_STORAGE_TOPIC=connector-status— หัวข้อสำหรับจัดเก็บสถานะของตัวเชื่อมต่อและงาน -
CONFIG_STORAGE_TOPIC=connector-config— หัวข้อสำหรับการจัดเก็บข้อมูลการกำหนดค่าตัวเชื่อมต่อและงาน -
GROUP_ID=1— ตัวระบุของกลุ่มคนงานที่สามารถดำเนินการงานตัวเชื่อมต่อได้ จำเป็นเมื่อใช้แบบกระจาย (กระจาย) ระบอบการปกครอง
เราเปิดตัวคอนเทนเนอร์ด้วยตัวแปรเหล่านี้:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2หมายเหตุเกี่ยวกับรว์
ตามค่าเริ่มต้น Debezium จะเขียนข้อมูลในรูปแบบ JSON ซึ่งเป็นที่ยอมรับสำหรับแซนด์บ็อกซ์และข้อมูลจำนวนเล็กน้อย แต่อาจกลายเป็นปัญหาในฐานข้อมูลที่มีการโหลดสูง อีกทางเลือกหนึ่งสำหรับตัวแปลง JSON คือการทำให้ข้อความเป็นอนุกรมโดยใช้ เป็นรูปแบบไบนารี่ ซึ่งจะช่วยลดภาระบนระบบย่อย I/O ใน Apache Kafka
หากต้องการใช้ Avro คุณต้องปรับใช้แยกต่างหาก (สำหรับจัดเก็บไดอะแกรม) ตัวแปรสำหรับตัวแปลงจะมีลักษณะดังนี้:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverterรายละเอียดเกี่ยวกับการใช้ Avro และการตั้งค่ารีจิสทรีนั้นอยู่นอกเหนือขอบเขตของบทความนี้ - เราจะใช้ JSON เพิ่มเติมเพื่อความชัดเจน
2. การกำหนดค่าตัวเชื่อมต่อเอง
ตอนนี้คุณสามารถไปที่การกำหนดค่าของตัวเชื่อมต่อได้โดยตรงซึ่งจะอ่านข้อมูลจากแหล่งที่มา
ลองดูตัวอย่างตัวเชื่อมต่อสำหรับ DBMS สองตัว: PostgreSQL และ MongoDB ซึ่งฉันมีประสบการณ์และมีความแตกต่าง (แม้ว่าจะเล็กน้อย แต่ในบางกรณีก็สำคัญ!)
การกำหนดค่าอธิบายไว้ในรูปแบบ JSON และอัปโหลดไปยัง Kafka Connect โดยใช้คำขอ POST
2.1. PostgreSQL
ตัวอย่างการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}หลักการทำงานของตัวเชื่อมต่อหลังจากการตั้งค่านี้ค่อนข้างง่าย:
- เมื่อเปิดตัวครั้งแรกจะเชื่อมต่อกับฐานข้อมูลที่ระบุในการกำหนดค่าและเริ่มในโหมด สแนปชอตเริ่มต้นโดยส่งชุดข้อมูลเริ่มต้นที่ได้รับโดยใช้เงื่อนไขไปยัง Kafka
SELECT * FROM table_name. - หลังจากการกำหนดค่าเริ่มต้นเสร็จสมบูรณ์ ตัวเชื่อมต่อจะเข้าสู่โหมดเพื่ออ่านการเปลี่ยนแปลงจากไฟล์ PostgreSQL WAL
เกี่ยวกับตัวเลือกที่ใช้:
-
name- ชื่อของขั้วต่อที่ใช้โครงร่างตามที่อธิบายด้านล่าง ในอนาคต ชื่อนี้จะใช้เพื่อทำงานกับตัวเชื่อมต่อ (เช่น ดูสถานะ/รีสตาร์ท/อัปเดตการกำหนดค่า) ผ่านทาง Kafka Connect REST API -
connector.class— คลาสตัวเชื่อมต่อ DBMS ที่จะใช้โดยตัวเชื่อมต่อที่กำหนดค่าไว้ -
plugin.name— ชื่อของปลั๊กอินสำหรับการถอดรหัสข้อมูลจากไฟล์ WAL แบบลอจิคัล มีให้เลือกwal2json,decoderbuffsиpgoutput. สองรายการแรกจำเป็นต้องติดตั้งส่วนขยายที่เหมาะสมใน DBMS และpgoutputสำหรับ PostgreSQL เวอร์ชัน 10 และสูงกว่านั้นไม่ต้องการการปรับแต่งเพิ่มเติม -
database.*— ตัวเลือกสำหรับการเชื่อมต่อกับฐานข้อมูลที่ไหนdatabase.server.name— ชื่ออินสแตนซ์ PostgreSQL ใช้เพื่อสร้างชื่อหัวข้อในคลัสเตอร์ Kafka -
table.include.list— รายการตารางที่เราต้องการติดตามการเปลี่ยนแปลง ระบุไว้ในรูปแบบschema.table_name; ไม่สามารถใช้ร่วมกับtable.exclude.list; -
heartbeat.interval.ms— ช่วงเวลา (เป็นมิลลิวินาที) ที่ตัวเชื่อมต่อส่งข้อความฮาร์ทบีทไปยังหัวข้อพิเศษ -
heartbeat.action.query— คำขอที่จะดำเนินการเมื่อส่งข้อความฮาร์ทบีทแต่ละข้อความ (ตัวเลือกปรากฏในเวอร์ชัน 1.1) -
slot.name— ชื่อของสล็อตการจำลองที่จะใช้โดยตัวเชื่อมต่อ publication.name- ชื่อ ใน PostgreSQL ซึ่งตัวเชื่อมต่อใช้ หากไม่มีอยู่ Debezium จะพยายามสร้างมันขึ้นมา หากผู้ใช้ที่ทำการเชื่อมต่อไม่มีสิทธิ์เพียงพอสำหรับการดำเนินการนี้ ตัวเชื่อมต่อจะสิ้นสุดลงพร้อมกับข้อผิดพลาด-
transformsกำหนดวิธีการเปลี่ยนชื่อหัวข้อเป้าหมายอย่างชัดเจน:-
transforms.AddPrefix.typeบ่งชี้ว่าเราจะใช้นิพจน์ทั่วไป -
transforms.AddPrefix.regex— หน้ากากที่กำหนดชื่อของหัวข้อเป้าหมายใหม่ -
transforms.AddPrefix.replacement- โดยตรงสิ่งที่เรากำลังกำหนดใหม่
-
ข้อมูลเพิ่มเติมเกี่ยวกับการเต้นของหัวใจและการเปลี่ยนแปลง
ตามค่าเริ่มต้น ตัวเชื่อมต่อจะส่งข้อมูลไปยัง Kafka สำหรับธุรกรรมที่คอมมิตแต่ละรายการ และ LSN (หมายเลขลำดับบันทึก) จะถูกบันทึกไว้ในหัวข้อบริการ offset. แต่จะเกิดอะไรขึ้นหากตัวเชื่อมต่อได้รับการกำหนดค่าให้อ่านไม่ใช่ฐานข้อมูลทั้งหมด แต่เป็นเพียงส่วนหนึ่งของตาราง (ซึ่งการอัพเดตข้อมูลไม่ได้เกิดขึ้นบ่อยครั้ง)
- ตัวเชื่อมต่อจะอ่านไฟล์ WAL และจะไม่ตรวจพบการทำธุรกรรมใดๆ ที่กระทำกับตารางที่ตัวเชื่อมต่อกำลังตรวจสอบอยู่
- ดังนั้น จะไม่ปรับปรุงตำแหน่งปัจจุบันในหัวข้อหรือในช่องการจำลองแบบ
- ซึ่งจะส่งผลให้ไฟล์ WAL ถูกเก็บไว้ในดิสก์และมีแนวโน้มว่าพื้นที่ดิสก์จะหมด
และนี่คือจุดที่ทางเลือกต่างๆ มาช่วยเหลือ heartbeat.interval.ms и heartbeat.action.query. การใช้ตัวเลือกเหล่านี้เป็นคู่ทำให้สามารถดำเนินการร้องขอเพื่อเปลี่ยนแปลงข้อมูลในตารางที่แยกจากกันทุกครั้งที่ส่งข้อความฮาร์ทบีท ดังนั้น LSN ที่ตัวเชื่อมต่ออยู่ในปัจจุบัน (ในช่องการจำลอง) จะได้รับการอัปเดตอย่างต่อเนื่อง ซึ่งช่วยให้ DBMS สามารถลบไฟล์ WAL ที่ไม่จำเป็นอีกต่อไปได้ คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับวิธีการทำงานของตัวเลือกต่างๆ .
อีกทางเลือกหนึ่งที่ควรค่าแก่การเอาใจใส่อย่างใกล้ชิดคือ transforms. แม้จะเน้นความสะดวกสบายและความสวยงามมากกว่า...
ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อโดยใช้นโยบายการตั้งชื่อต่อไปนี้: serverName.schemaName.tableName. มันอาจจะไม่สะดวกเสมอไป ตัวเลือก transforms คุณสามารถใช้นิพจน์ทั่วไปเพื่อกำหนดรายการตาราง เหตุการณ์ที่ต้องกำหนดเส้นทางไปยังหัวข้อที่มีชื่อเฉพาะ
ในการกำหนดค่าของเราขอขอบคุณ transforms สิ่งต่อไปนี้เกิดขึ้น: เหตุการณ์ CDC ทั้งหมดจากฐานข้อมูลที่ได้รับการตรวจสอบจะไปที่หัวข้อที่มีชื่อ data.cdc.dbname. มิฉะนั้น (หากไม่มีการตั้งค่าเหล่านี้) ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อสำหรับแต่ละตาราง เช่น: pg-dev.public.<table_name>.
ข้อจำกัดของตัวเชื่อมต่อ
เพื่อสรุปคำอธิบายการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL เราควรพูดถึงคุณสมบัติ/ข้อจำกัดของการดำเนินการต่อไปนี้:
- ฟังก์ชันการทำงานของตัวเชื่อมต่อสำหรับ PostgreSQL ขึ้นอยู่กับแนวคิดของการถอดรหัสแบบลอจิคัล ดังนั้นเขา ไม่ติดตามคำขอเปลี่ยนโครงสร้างฐานข้อมูล (DDL) - ดังนั้นข้อมูลนี้จะไม่อยู่ในหัวข้อ
- เนื่องจากมีการใช้สล็อตการจำลอง จึงสามารถเชื่อมต่อตัวเชื่อมต่อได้ เท่านั้น ไปยังอินสแตนซ์ DBMS ชั้นนำ
- หากผู้ใช้ที่ตัวเชื่อมต่อเชื่อมต่อกับฐานข้อมูลมีสิทธิ์แบบอ่านอย่างเดียว ก่อนการเปิดตัวครั้งแรก คุณจะต้องสร้างสล็อตการจำลองด้วยตนเองและเผยแพร่ไปยังฐานข้อมูล
การใช้การกำหนดค่า
ดังนั้น มาโหลดการกำหนดค่าของเราลงในตัวเชื่อมต่อกัน:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.jsonเราตรวจสอบว่าการดาวน์โหลดสำเร็จและตัวเชื่อมต่อเริ่มต้นแล้ว:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}เยี่ยมมาก: ตั้งค่าแล้วและพร้อมใช้งาน ตอนนี้ เรามาปลอมตัวเป็นผู้บริโภคและเชื่อมต่อกับ Kafka หลังจากนั้นเราจะเพิ่มและเปลี่ยนแปลงรายการในตาราง:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1ในหัวข้อของเราจะแสดงดังต่อไปนี้:
JSON ที่ยาวมากกับการเปลี่ยนแปลงของเรา
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}ในทั้งสองกรณี บันทึกจะประกอบด้วยคีย์ (PK) ของบันทึกที่มีการเปลี่ยนแปลง และสาระสำคัญของการเปลี่ยนแปลง: บันทึกนี้เกิดขึ้นก่อนอะไรและเกิดอะไรขึ้นหลังจากนั้น
- ในกรณีของ
INSERT: ค่าก่อน (before) เท่ากับnullและหลัง - บรรทัดที่ถูกแทรก - ในกรณีของ
UPDATE: ที่payload.beforeสถานะก่อนหน้าของบรรทัดจะปรากฏขึ้น และในpayload.after— ใหม่พร้อมแก่นแท้ของการเปลี่ยนแปลง
2.2 มอนโกดีบี
ตัวเชื่อมต่อนี้ใช้กลไกการจำลอง MongoDB มาตรฐาน โดยอ่านข้อมูลจาก oplog ของโหนด DBMS หลัก
เช่นเดียวกับตัวเชื่อมต่อที่อธิบายไว้แล้วสำหรับ PgSQL ที่นี่เช่นกัน เมื่อเริ่มต้นครั้งแรก สแนปชอตข้อมูลหลักจะถูกถ่าย หลังจากนั้นตัวเชื่อมต่อจะสลับไปที่โหมดการอ่าน oplog
ตัวอย่างการกำหนดค่า:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}อย่างที่คุณเห็น ไม่มีตัวเลือกใหม่ที่นี่เมื่อเทียบกับตัวอย่างก่อนหน้านี้ แต่มีเพียงจำนวนตัวเลือกที่รับผิดชอบในการเชื่อมต่อกับฐานข้อมูลและคำนำหน้าเท่านั้นที่ลดลง
การตั้งค่า transforms คราวนี้พวกเขาทำสิ่งต่อไปนี้: พวกเขาเปลี่ยนชื่อหัวข้อเป้าหมายจากสคีมา <server_name>.<db_name>.<collection_name> в data.cdc.mongo_<db_name>.
ความอดทนต่อความผิดพลาด
ปัญหาของการยอมรับข้อผิดพลาดและความพร้อมใช้งานสูงในยุคของเรานั้นรุนแรงมากขึ้นกว่าเดิม โดยเฉพาะอย่างยิ่งเมื่อเราพูดถึงข้อมูลและธุรกรรม และการติดตามการเปลี่ยนแปลงของข้อมูลไม่ได้แยกออกจากปัญหานี้ มาดูกันว่าหลักการอาจผิดพลาดอะไรได้บ้าง และจะเกิดอะไรขึ้นกับ Debezium ในแต่ละกรณี
มีสามตัวเลือกในการยกเลิก:
- Kafka Connect ล้มเหลว. หาก Connect ได้รับการกำหนดค่าให้ทำงานในโหมดกระจาย จะต้องมีผู้ปฏิบัติงานหลายคนเพื่อตั้งค่า group.id เดียวกัน จากนั้น หากหนึ่งในนั้นล้มเหลว ตัวเชื่อมต่อจะรีสตาร์ทกับพนักงานคนอื่นและอ่านต่อจากตำแหน่งที่คอมมิตครั้งล่าสุดในหัวข้อใน Kafka
- ขาดการเชื่อมต่อกับคลัสเตอร์ Kafka. ตัวเชื่อมต่อจะหยุดอ่านที่ตำแหน่งที่ไม่สามารถส่งไปยัง Kafka ได้ และจะพยายามส่งอีกครั้งเป็นระยะๆ จนกว่าความพยายามจะสำเร็จ
- แหล่งข้อมูลไม่พร้อมใช้งาน. ตัวเชื่อมต่อจะพยายามเชื่อมต่อกับแหล่งที่มาอีกครั้งตามที่กำหนดค่าไว้ ค่าเริ่มต้นคือพยายามใช้ 16 ครั้ง . หลังจากพยายามไม่สำเร็จครั้งที่ 16 งานจะถูกทำเครื่องหมายเป็น ล้มเหลว และคุณจะต้องรีสตาร์ทด้วยตนเองผ่านอินเทอร์เฟซ Kafka Connect REST
- ในกรณีของ PostgreSQL ข้อมูลจะไม่สูญหายเพราะว่า การใช้สล็อตการจำลองจะป้องกันไม่ให้คุณลบไฟล์ WAL ที่ตัวเชื่อมต่อไม่ได้อ่าน ในกรณีนี้ เหรียญก็มีข้อเสียเช่นกัน: หากการเชื่อมต่อเครือข่ายระหว่างตัวเชื่อมต่อและ DBMS ถูกรบกวนเป็นเวลานาน มีความเป็นไปได้ที่พื้นที่ดิสก์จะหมด และอาจนำไปสู่ความล้มเหลว DBMS ทั้งหมด
- ในกรณีของ MySQL ไฟล์ binlog สามารถหมุนได้โดย DBMS เองก่อนที่การเชื่อมต่อจะถูกกู้คืน ซึ่งจะทำให้ตัวเชื่อมต่อเข้าสู่สถานะล้มเหลว และเพื่อคืนค่าการทำงานปกติ คุณจะต้องรีสตาร์ทในโหมดสแน็ปช็อตเริ่มต้นเพื่ออ่านต่อจาก Binlog
- เกี่ยวกับ MongoDB. เอกสารระบุสถานะ: ลักษณะการทำงานของตัวเชื่อมต่อในกรณีที่ไฟล์บันทึก/oplog ถูกลบ และตัวเชื่อมต่อไม่สามารถอ่านต่อจากตำแหน่งที่ค้างไว้ได้จะเหมือนกันสำหรับ DBMS ทั้งหมด หมายความว่าตัวเชื่อมต่อจะเข้าสู่สถานะ ล้มเหลว และจะต้องรีสตาร์ทในโหมด สแนปชอตเริ่มต้น.
อย่างไรก็ตาม มีข้อยกเว้นอยู่ หากตัวเชื่อมต่อถูกตัดการเชื่อมต่อเป็นเวลานาน (หรือไม่สามารถเข้าถึงอินสแตนซ์ MongoDB) และ oplog ผ่านการหมุนเวียนในช่วงเวลานี้ จากนั้นเมื่อการเชื่อมต่อได้รับการกู้คืน ตัวเชื่อมต่อจะยังคงอ่านข้อมูลจากตำแหน่งแรกที่มีอยู่อย่างใจเย็น ซึ่งเป็นสาเหตุที่ข้อมูลบางส่วนในคาฟคา ไม่ จะตี
ข้อสรุป
Debezium เป็นประสบการณ์ครั้งแรกของฉันกับระบบ CDC และโดยรวมแล้วเป็นเชิงบวกมาก โปรเจ็กต์ได้รับชัยชนะด้วยการสนับสนุน DBMS หลัก ความง่ายในการกำหนดค่า การสนับสนุนการทำคลัสเตอร์ และชุมชนที่กระตือรือร้น สำหรับผู้ที่สนใจการปฏิบัติจริง แนะนำให้อ่านคำแนะนำสำหรับ и .
เมื่อเปรียบเทียบกับตัวเชื่อมต่อ JDBC สำหรับ Kafka Connect ข้อได้เปรียบหลักของ Debezium คือการอ่านการเปลี่ยนแปลงจากบันทึก DBMS ซึ่งช่วยให้สามารถรับข้อมูลโดยมีเวลาแฝงน้อยที่สุด ตัวเชื่อมต่อ JDBC (จาก Kafka Connect) สืบค้นตารางที่ได้รับการตรวจสอบในช่วงเวลาที่กำหนดและ (ด้วยเหตุผลเดียวกัน) จะไม่สร้างข้อความเมื่อข้อมูลถูกลบ (คุณจะสืบค้นข้อมูลที่ไม่มีอยู่ได้อย่างไร)
เพื่อแก้ไขปัญหาที่คล้ายกัน คุณสามารถใส่ใจกับวิธีแก้ไขปัญหาต่อไปนี้ (นอกเหนือจาก Debezium):
- โซลูชั่นหลายอย่างสำหรับ MySQL เท่านั้น:
- แต่นี่คือ "หมวดหมู่น้ำหนัก" ที่แตกต่างไปจากเดิมอย่างสิ้นเชิง
PS
อ่านเพิ่มเติมในบล็อกของเรา:
- «";
- «";
- «'
ที่มา: will.com
