ในงานของฉัน ฉันมักจะพบกับโซลูชันทางเทคนิค/ผลิตภัณฑ์ซอฟต์แวร์ใหม่ๆ ซึ่งมีข้อมูลค่อนข้างน้อยในอินเทอร์เน็ตภาษารัสเซีย ในบทความนี้ ฉันจะพยายามเติมช่องว่างด้วยตัวอย่างจากแนวทางปฏิบัติล่าสุดของฉัน เมื่อฉันต้องกำหนดค่าการส่งเหตุการณ์ CDC จาก DBMS ยอดนิยมสองตัว (PostgreSQL และ MongoDB) ไปยังคลัสเตอร์ Kafka โดยใช้ Debezium ฉันหวังว่าบทความทบทวนนี้ซึ่งปรากฏเป็นผลมาจากงานที่ทำเสร็จแล้วจะเป็นประโยชน์ต่อผู้อื่น
Debezium และ CDC โดยทั่วไปคืออะไร?
มัน
หากเราเปรียบเทียบ CDC กับวิธีการแบบดั้งเดิม (เมื่อแอปพลิเคชันอ่านข้อมูลจาก DBMS โดยตรง) ข้อได้เปรียบหลักๆ ก็คือการนำการเปลี่ยนแปลงข้อมูลไปใช้ในระดับแถวที่มีความหน่วงต่ำ ความน่าเชื่อถือสูง และความพร้อมใช้งาน สองจุดสุดท้ายทำได้โดยใช้คลัสเตอร์ Kafka เป็นที่เก็บข้อมูลสำหรับเหตุการณ์ CDC
ข้อดีอีกประการหนึ่งคือความจริงที่ว่ามีการใช้โมเดลเดียวในการจัดเก็บเหตุการณ์ ดังนั้นแอปพลิเคชันปลายทางจึงไม่ต้องกังวลกับความแตกต่างในการใช้งาน DBMS ที่แตกต่างกัน
สุดท้ายนี้ การใช้ตัวกลางส่งข้อความช่วยให้แอปพลิเคชันที่ติดตามการเปลี่ยนแปลงของข้อมูลขยายขนาดในแนวนอนได้ ในขณะเดียวกัน ผลกระทบต่อแหล่งข้อมูลก็ลดลง เนื่องจากข้อมูลไม่ได้มาจาก DBMS โดยตรง แต่มาจากคลัสเตอร์ Kafka
เกี่ยวกับสถาปัตยกรรม Debezium
การใช้ Debezium มีรูปแบบที่เรียบง่ายดังนี้:
DBMS (เป็นแหล่งข้อมูล) → ตัวเชื่อมต่อใน Kafka Connect → Apache Kafka → Consumer
นี่คือแผนภาพจากเว็บไซต์โครงการเพื่อเป็นตัวอย่าง:
อย่างไรก็ตามฉันไม่ชอบรูปแบบนี้มากนักเพราะดูเหมือนว่าจะสามารถใช้ตัวเชื่อมต่ออ่างล้างจานได้เท่านั้น
ในความเป็นจริง สถานการณ์แตกต่างออกไป: การเติมเต็ม Data Lake ของคุณ (ลิงค์สุดท้ายในแผนภาพด้านบน) นี่ไม่ใช่วิธีเดียวที่จะใช้ Debezium แอปพลิเคชันของคุณสามารถใช้เหตุการณ์ที่ส่งไปยัง Apache Kafka เพื่อจัดการกับสถานการณ์ต่างๆ ได้ ตัวอย่างเช่น:
- ลบข้อมูลที่ไม่เกี่ยวข้องออกจากแคช
- การส่งการแจ้งเตือน
- อัพเดตดัชนีการค้นหา
- บันทึกการตรวจสอบบางประเภท
- ...
ในกรณีที่คุณมีแอปพลิเคชัน Java และไม่จำเป็นต้องใช้/เป็นไปได้ที่จะใช้คลัสเตอร์ Kafka ก็มีความเป็นไปได้ที่จะทำงานผ่าน
บทความนี้จะกล่าวถึงสถาปัตยกรรมที่แนะนำโดยนักพัฒนา ซึ่งให้ความทนทานต่อข้อผิดพลาดและความสามารถในการปรับขนาด
การกำหนดค่าตัวเชื่อมต่อ
เพื่อเริ่มติดตามการเปลี่ยนแปลงในค่าที่สำคัญที่สุด - ข้อมูล - เราต้องการ:
- แหล่งข้อมูลซึ่งสามารถเป็น MySQL ได้ตั้งแต่เวอร์ชัน 5.7, PostgreSQL 9.6+, MongoDB 3.2+ (
รายการเต็มรูปแบบ ); - คลัสเตอร์ Apache Kafka;
- อินสแตนซ์ Kafka Connect (เวอร์ชัน 1.x, 2.x);
- กำหนดค่าตัวเชื่อมต่อ Debezium แล้ว
ทำงานในสองประเด็นแรกคือ กระบวนการติดตั้ง DBMS และ Apache Kafka อยู่นอกเหนือขอบเขตของบทความ อย่างไรก็ตาม สำหรับผู้ที่ต้องการปรับใช้ทุกอย่างในแซนด์บ็อกซ์ พื้นที่เก็บข้อมูลอย่างเป็นทางการพร้อมตัวอย่างจะมีแบบสำเร็จรูป
เราจะกล่าวถึงรายละเอียดเพิ่มเติมในสองประเด็นสุดท้าย
0. คาฟคาคอนเน็ค
ที่นี่และเพิ่มเติมในบทความ ตัวอย่างการกำหนดค่าทั้งหมดจะกล่าวถึงในบริบทของอิมเมจ Docker ที่เผยแพร่โดยนักพัฒนา Debezium ประกอบด้วยไฟล์ปลั๊กอินที่จำเป็นทั้งหมด (ตัวเชื่อมต่อ) และให้การกำหนดค่า Kafka Connect โดยใช้ตัวแปรสภาพแวดล้อม
หากคุณต้องการใช้ Kafka Connect จาก Confluent คุณจะต้องเพิ่มปลั๊กอินของตัวเชื่อมต่อที่จำเป็นลงในไดเร็กทอรีที่ระบุใน plugin.path
หรือตั้งค่าผ่านตัวแปรสภาพแวดล้อม CLASSPATH
. การตั้งค่าสำหรับผู้ปฏิบัติงาน Kafka Connect และตัวเชื่อมต่อถูกกำหนดผ่านไฟล์การกำหนดค่าที่ถูกส่งเป็นอาร์กิวเมนต์ไปยังคำสั่งเรียกใช้งานของผู้ปฏิบัติงาน สำหรับรายละเอียดเพิ่มเติม โปรดดู
กระบวนการทั้งหมดของการตั้งค่า Debeizum ในเวอร์ชันตัวเชื่อมต่อนั้นดำเนินการในสองขั้นตอน ลองดูแต่ละรายการ:
1. การตั้งค่าเฟรมเวิร์ก Kafka Connect
หากต้องการสตรีมข้อมูลไปยังคลัสเตอร์ Apache Kafka พารามิเตอร์เฉพาะจะถูกตั้งค่าในเฟรมเวิร์ก Kafka Connect เช่น:
- พารามิเตอร์สำหรับการเชื่อมต่อกับคลัสเตอร์
- ชื่อของหัวข้อที่จะจัดเก็บการกำหนดค่าของตัวเชื่อมต่อโดยตรง
- ชื่อของกลุ่มที่ตัวเชื่อมต่อทำงานอยู่ (หากใช้โหมดกระจาย)
อิมเมจ Docker อย่างเป็นทางการของโปรเจ็กต์รองรับการกำหนดค่าโดยใช้ตัวแปรสภาพแวดล้อม - นี่คือสิ่งที่เราจะใช้ ดังนั้นให้ดาวน์โหลดภาพ:
docker pull debezium/connect
ชุดตัวแปรสภาพแวดล้อมขั้นต่ำที่จำเป็นในการเรียกใช้ตัวเชื่อมต่อมีดังนี้:
-
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
— รายการเริ่มต้นของเซิร์ฟเวอร์คลัสเตอร์ Kafka เพื่อรับรายชื่อสมาชิกคลัสเตอร์ทั้งหมด -
OFFSET_STORAGE_TOPIC=connector-offsets
- หัวข้อสำหรับจัดเก็บตำแหน่งที่ตัวเชื่อมต่ออยู่ในปัจจุบัน -
CONNECT_STATUS_STORAGE_TOPIC=connector-status
— หัวข้อสำหรับจัดเก็บสถานะของตัวเชื่อมต่อและงาน -
CONFIG_STORAGE_TOPIC=connector-config
— หัวข้อสำหรับการจัดเก็บข้อมูลการกำหนดค่าตัวเชื่อมต่อและงาน -
GROUP_ID=1
— ตัวระบุของกลุ่มคนงานที่สามารถดำเนินการงานตัวเชื่อมต่อได้ จำเป็นเมื่อใช้แบบกระจาย (กระจาย) ระบอบการปกครอง
เราเปิดตัวคอนเทนเนอร์ด้วยตัวแปรเหล่านี้:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
หมายเหตุเกี่ยวกับรว์
ตามค่าเริ่มต้น Debezium จะเขียนข้อมูลในรูปแบบ JSON ซึ่งเป็นที่ยอมรับสำหรับแซนด์บ็อกซ์และข้อมูลจำนวนเล็กน้อย แต่อาจกลายเป็นปัญหาในฐานข้อมูลที่มีการโหลดสูง อีกทางเลือกหนึ่งสำหรับตัวแปลง JSON คือการทำให้ข้อความเป็นอนุกรมโดยใช้
หากต้องการใช้ Avro คุณต้องปรับใช้แยกต่างหาก
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
รายละเอียดเกี่ยวกับการใช้ Avro และการตั้งค่ารีจิสทรีนั้นอยู่นอกเหนือขอบเขตของบทความนี้ - เราจะใช้ JSON เพิ่มเติมเพื่อความชัดเจน
2. การกำหนดค่าตัวเชื่อมต่อเอง
ตอนนี้คุณสามารถไปที่การกำหนดค่าของตัวเชื่อมต่อได้โดยตรงซึ่งจะอ่านข้อมูลจากแหล่งที่มา
ลองดูตัวอย่างตัวเชื่อมต่อสำหรับ DBMS สองตัว: PostgreSQL และ MongoDB ซึ่งฉันมีประสบการณ์และมีความแตกต่าง (แม้ว่าจะเล็กน้อย แต่ในบางกรณีก็สำคัญ!)
การกำหนดค่าอธิบายไว้ในรูปแบบ JSON และอัปโหลดไปยัง Kafka Connect โดยใช้คำขอ POST
2.1. PostgreSQL
ตัวอย่างการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
หลักการทำงานของตัวเชื่อมต่อหลังจากการตั้งค่านี้ค่อนข้างง่าย:
- เมื่อเปิดตัวครั้งแรกจะเชื่อมต่อกับฐานข้อมูลที่ระบุในการกำหนดค่าและเริ่มในโหมด สแนปชอตเริ่มต้นโดยส่งชุดข้อมูลเริ่มต้นที่ได้รับโดยใช้เงื่อนไขไปยัง Kafka
SELECT * FROM table_name
. - หลังจากการกำหนดค่าเริ่มต้นเสร็จสมบูรณ์ ตัวเชื่อมต่อจะเข้าสู่โหมดเพื่ออ่านการเปลี่ยนแปลงจากไฟล์ PostgreSQL WAL
เกี่ยวกับตัวเลือกที่ใช้:
-
name
- ชื่อของขั้วต่อที่ใช้โครงร่างตามที่อธิบายด้านล่าง ในอนาคต ชื่อนี้จะใช้เพื่อทำงานกับตัวเชื่อมต่อ (เช่น ดูสถานะ/รีสตาร์ท/อัปเดตการกำหนดค่า) ผ่านทาง Kafka Connect REST API -
connector.class
— คลาสตัวเชื่อมต่อ DBMS ที่จะใช้โดยตัวเชื่อมต่อที่กำหนดค่าไว้ -
plugin.name
— ชื่อของปลั๊กอินสำหรับการถอดรหัสข้อมูลจากไฟล์ WAL แบบลอจิคัล มีให้เลือกwal2json
,decoderbuffs
иpgoutput
. สองรายการแรกจำเป็นต้องติดตั้งส่วนขยายที่เหมาะสมใน DBMS และpgoutput
สำหรับ PostgreSQL เวอร์ชัน 10 และสูงกว่านั้นไม่ต้องการการปรับแต่งเพิ่มเติม -
database.*
— ตัวเลือกสำหรับการเชื่อมต่อกับฐานข้อมูลที่ไหนdatabase.server.name
— ชื่ออินสแตนซ์ PostgreSQL ใช้เพื่อสร้างชื่อหัวข้อในคลัสเตอร์ Kafka -
table.include.list
— รายการตารางที่เราต้องการติดตามการเปลี่ยนแปลง ระบุไว้ในรูปแบบschema.table_name
; ไม่สามารถใช้ร่วมกับtable.exclude.list
; -
heartbeat.interval.ms
— ช่วงเวลา (เป็นมิลลิวินาที) ที่ตัวเชื่อมต่อส่งข้อความฮาร์ทบีทไปยังหัวข้อพิเศษ -
heartbeat.action.query
— คำขอที่จะดำเนินการเมื่อส่งข้อความฮาร์ทบีทแต่ละข้อความ (ตัวเลือกปรากฏในเวอร์ชัน 1.1) -
slot.name
— ชื่อของสล็อตการจำลองที่จะใช้โดยตัวเชื่อมต่อ publication.name
- ชื่อสิ่งพิมพ์ ใน PostgreSQL ซึ่งตัวเชื่อมต่อใช้ หากไม่มีอยู่ Debezium จะพยายามสร้างมันขึ้นมา หากผู้ใช้ที่ทำการเชื่อมต่อไม่มีสิทธิ์เพียงพอสำหรับการดำเนินการนี้ ตัวเชื่อมต่อจะสิ้นสุดลงพร้อมกับข้อผิดพลาด-
transforms
กำหนดวิธีการเปลี่ยนชื่อหัวข้อเป้าหมายอย่างชัดเจน:-
transforms.AddPrefix.type
บ่งชี้ว่าเราจะใช้นิพจน์ทั่วไป -
transforms.AddPrefix.regex
— หน้ากากที่กำหนดชื่อของหัวข้อเป้าหมายใหม่ -
transforms.AddPrefix.replacement
- โดยตรงสิ่งที่เรากำลังกำหนดใหม่
-
ข้อมูลเพิ่มเติมเกี่ยวกับการเต้นของหัวใจและการเปลี่ยนแปลง
ตามค่าเริ่มต้น ตัวเชื่อมต่อจะส่งข้อมูลไปยัง Kafka สำหรับธุรกรรมที่คอมมิตแต่ละรายการ และ LSN (หมายเลขลำดับบันทึก) จะถูกบันทึกไว้ในหัวข้อบริการ offset
. แต่จะเกิดอะไรขึ้นหากตัวเชื่อมต่อได้รับการกำหนดค่าให้อ่านไม่ใช่ฐานข้อมูลทั้งหมด แต่เป็นเพียงส่วนหนึ่งของตาราง (ซึ่งการอัพเดตข้อมูลไม่ได้เกิดขึ้นบ่อยครั้ง)
- ตัวเชื่อมต่อจะอ่านไฟล์ WAL และจะไม่ตรวจพบการทำธุรกรรมใดๆ ที่กระทำกับตารางที่ตัวเชื่อมต่อกำลังตรวจสอบอยู่
- ดังนั้น จะไม่ปรับปรุงตำแหน่งปัจจุบันในหัวข้อหรือในช่องการจำลองแบบ
- ซึ่งจะส่งผลให้ไฟล์ WAL ถูกเก็บไว้ในดิสก์และมีแนวโน้มว่าพื้นที่ดิสก์จะหมด
และนี่คือจุดที่ทางเลือกต่างๆ มาช่วยเหลือ heartbeat.interval.ms
и heartbeat.action.query
. การใช้ตัวเลือกเหล่านี้เป็นคู่ทำให้สามารถดำเนินการร้องขอเพื่อเปลี่ยนแปลงข้อมูลในตารางที่แยกจากกันทุกครั้งที่ส่งข้อความฮาร์ทบีท ดังนั้น LSN ที่ตัวเชื่อมต่ออยู่ในปัจจุบัน (ในช่องการจำลอง) จะได้รับการอัปเดตอย่างต่อเนื่อง ซึ่งช่วยให้ DBMS สามารถลบไฟล์ WAL ที่ไม่จำเป็นอีกต่อไปได้ คุณสามารถเรียนรู้เพิ่มเติมเกี่ยวกับวิธีการทำงานของตัวเลือกต่างๆ
อีกทางเลือกหนึ่งที่ควรค่าแก่การเอาใจใส่อย่างใกล้ชิดคือ transforms
. แม้จะเน้นความสะดวกสบายและความสวยงามมากกว่า...
ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อโดยใช้นโยบายการตั้งชื่อต่อไปนี้: serverName.schemaName.tableName
. มันอาจจะไม่สะดวกเสมอไป ตัวเลือก transforms
คุณสามารถใช้นิพจน์ทั่วไปเพื่อกำหนดรายการตาราง เหตุการณ์ที่ต้องกำหนดเส้นทางไปยังหัวข้อที่มีชื่อเฉพาะ
ในการกำหนดค่าของเราขอขอบคุณ transforms
สิ่งต่อไปนี้เกิดขึ้น: เหตุการณ์ CDC ทั้งหมดจากฐานข้อมูลที่ได้รับการตรวจสอบจะไปที่หัวข้อที่มีชื่อ data.cdc.dbname
. มิฉะนั้น (หากไม่มีการตั้งค่าเหล่านี้) ตามค่าเริ่มต้น Debezium จะสร้างหัวข้อสำหรับแต่ละตาราง เช่น: pg-dev.public.<table_name>
.
ข้อจำกัดของตัวเชื่อมต่อ
เพื่อสรุปคำอธิบายการกำหนดค่าตัวเชื่อมต่อสำหรับ PostgreSQL เราควรพูดถึงคุณสมบัติ/ข้อจำกัดของการดำเนินการต่อไปนี้:
- ฟังก์ชันการทำงานของตัวเชื่อมต่อสำหรับ PostgreSQL ขึ้นอยู่กับแนวคิดของการถอดรหัสแบบลอจิคัล ดังนั้นเขา ไม่ติดตามคำขอเปลี่ยนโครงสร้างฐานข้อมูล (DDL) - ดังนั้นข้อมูลนี้จะไม่อยู่ในหัวข้อ
- เนื่องจากมีการใช้สล็อตการจำลอง จึงสามารถเชื่อมต่อตัวเชื่อมต่อได้ เท่านั้น ไปยังอินสแตนซ์ DBMS ชั้นนำ
- หากผู้ใช้ที่ตัวเชื่อมต่อเชื่อมต่อกับฐานข้อมูลมีสิทธิ์แบบอ่านอย่างเดียว ก่อนการเปิดตัวครั้งแรก คุณจะต้องสร้างสล็อตการจำลองด้วยตนเองและเผยแพร่ไปยังฐานข้อมูล
การใช้การกำหนดค่า
ดังนั้น มาโหลดการกำหนดค่าของเราลงในตัวเชื่อมต่อกัน:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
เราตรวจสอบว่าการดาวน์โหลดสำเร็จและตัวเชื่อมต่อเริ่มต้นแล้ว:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
เยี่ยมมาก: ตั้งค่าแล้วและพร้อมใช้งาน ตอนนี้ เรามาปลอมตัวเป็นผู้บริโภคและเชื่อมต่อกับ Kafka หลังจากนั้นเราจะเพิ่มและเปลี่ยนแปลงรายการในตาราง:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', '[email protected]');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
ในหัวข้อของเราจะแสดงดังต่อไปนี้:
JSON ที่ยาวมากกับการเปลี่ยนแปลงของเรา
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"[email protected]"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"[email protected]"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
ในทั้งสองกรณี บันทึกจะประกอบด้วยคีย์ (PK) ของบันทึกที่มีการเปลี่ยนแปลง และสาระสำคัญของการเปลี่ยนแปลง: บันทึกนี้เกิดขึ้นก่อนอะไรและเกิดอะไรขึ้นหลังจากนั้น
- ในกรณีของ
INSERT
: ค่าก่อน (before
) เท่ากับnull
และหลัง - บรรทัดที่ถูกแทรก - ในกรณีของ
UPDATE
: ที่payload.before
สถานะก่อนหน้าของบรรทัดจะปรากฏขึ้น และในpayload.after
— ใหม่พร้อมแก่นแท้ของการเปลี่ยนแปลง
2.2 มอนโกดีบี
ตัวเชื่อมต่อนี้ใช้กลไกการจำลอง MongoDB มาตรฐาน โดยอ่านข้อมูลจาก oplog ของโหนด DBMS หลัก
เช่นเดียวกับตัวเชื่อมต่อที่อธิบายไว้แล้วสำหรับ PgSQL ที่นี่เช่นกัน เมื่อเริ่มต้นครั้งแรก สแนปชอตข้อมูลหลักจะถูกถ่าย หลังจากนั้นตัวเชื่อมต่อจะสลับไปที่โหมดการอ่าน oplog
ตัวอย่างการกำหนดค่า:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
อย่างที่คุณเห็น ไม่มีตัวเลือกใหม่ที่นี่เมื่อเทียบกับตัวอย่างก่อนหน้านี้ แต่มีเพียงจำนวนตัวเลือกที่รับผิดชอบในการเชื่อมต่อกับฐานข้อมูลและคำนำหน้าเท่านั้นที่ลดลง
การตั้งค่า transforms
คราวนี้พวกเขาทำสิ่งต่อไปนี้: พวกเขาเปลี่ยนชื่อหัวข้อเป้าหมายจากสคีมา <server_name>.<db_name>.<collection_name>
в data.cdc.mongo_<db_name>
.
ความอดทนต่อความผิดพลาด
ปัญหาของการยอมรับข้อผิดพลาดและความพร้อมใช้งานสูงในยุคของเรานั้นรุนแรงมากขึ้นกว่าเดิม โดยเฉพาะอย่างยิ่งเมื่อเราพูดถึงข้อมูลและธุรกรรม และการติดตามการเปลี่ยนแปลงของข้อมูลไม่ได้แยกออกจากปัญหานี้ มาดูกันว่าหลักการอาจผิดพลาดอะไรได้บ้าง และจะเกิดอะไรขึ้นกับ Debezium ในแต่ละกรณี
มีสามตัวเลือกในการยกเลิก:
- Kafka Connect ล้มเหลว. หาก Connect ได้รับการกำหนดค่าให้ทำงานในโหมดกระจาย จะต้องมีผู้ปฏิบัติงานหลายคนเพื่อตั้งค่า group.id เดียวกัน จากนั้น หากหนึ่งในนั้นล้มเหลว ตัวเชื่อมต่อจะรีสตาร์ทกับพนักงานคนอื่นและอ่านต่อจากตำแหน่งที่คอมมิตครั้งล่าสุดในหัวข้อใน Kafka
- ขาดการเชื่อมต่อกับคลัสเตอร์ Kafka. ตัวเชื่อมต่อจะหยุดอ่านที่ตำแหน่งที่ไม่สามารถส่งไปยัง Kafka ได้ และจะพยายามส่งอีกครั้งเป็นระยะๆ จนกว่าความพยายามจะสำเร็จ
- แหล่งข้อมูลไม่พร้อมใช้งาน. ตัวเชื่อมต่อจะพยายามเชื่อมต่อกับแหล่งที่มาอีกครั้งตามที่กำหนดค่าไว้ ค่าเริ่มต้นคือพยายามใช้ 16 ครั้ง
การถอยกลับแบบเอ็กซ์โปเนนเชียล . หลังจากพยายามไม่สำเร็จครั้งที่ 16 งานจะถูกทำเครื่องหมายเป็น ล้มเหลว และคุณจะต้องรีสตาร์ทด้วยตนเองผ่านอินเทอร์เฟซ Kafka Connect REST- ในกรณีของ PostgreSQL ข้อมูลจะไม่สูญหายเพราะว่า การใช้สล็อตการจำลองจะป้องกันไม่ให้คุณลบไฟล์ WAL ที่ตัวเชื่อมต่อไม่ได้อ่าน ในกรณีนี้ เหรียญก็มีข้อเสียเช่นกัน: หากการเชื่อมต่อเครือข่ายระหว่างตัวเชื่อมต่อและ DBMS ถูกรบกวนเป็นเวลานาน มีความเป็นไปได้ที่พื้นที่ดิสก์จะหมด และอาจนำไปสู่ความล้มเหลว DBMS ทั้งหมด
- ในกรณีของ MySQL ไฟล์ binlog สามารถหมุนได้โดย DBMS เองก่อนที่การเชื่อมต่อจะถูกกู้คืน ซึ่งจะทำให้ตัวเชื่อมต่อเข้าสู่สถานะล้มเหลว และเพื่อคืนค่าการทำงานปกติ คุณจะต้องรีสตาร์ทในโหมดสแน็ปช็อตเริ่มต้นเพื่ออ่านต่อจาก Binlog
- เกี่ยวกับ MongoDB. เอกสารระบุสถานะ: ลักษณะการทำงานของตัวเชื่อมต่อในกรณีที่ไฟล์บันทึก/oplog ถูกลบ และตัวเชื่อมต่อไม่สามารถอ่านต่อจากตำแหน่งที่ค้างไว้ได้จะเหมือนกันสำหรับ DBMS ทั้งหมด หมายความว่าตัวเชื่อมต่อจะเข้าสู่สถานะ ล้มเหลว และจะต้องรีสตาร์ทในโหมด สแนปชอตเริ่มต้น.
อย่างไรก็ตาม มีข้อยกเว้นอยู่ หากตัวเชื่อมต่อถูกตัดการเชื่อมต่อเป็นเวลานาน (หรือไม่สามารถเข้าถึงอินสแตนซ์ MongoDB) และ oplog ผ่านการหมุนเวียนในช่วงเวลานี้ จากนั้นเมื่อการเชื่อมต่อได้รับการกู้คืน ตัวเชื่อมต่อจะยังคงอ่านข้อมูลจากตำแหน่งแรกที่มีอยู่อย่างใจเย็น ซึ่งเป็นสาเหตุที่ข้อมูลบางส่วนในคาฟคา ไม่ จะตี
ข้อสรุป
Debezium เป็นประสบการณ์ครั้งแรกของฉันกับระบบ CDC และโดยรวมแล้วเป็นเชิงบวกมาก โปรเจ็กต์ได้รับชัยชนะด้วยการสนับสนุน DBMS หลัก ความง่ายในการกำหนดค่า การสนับสนุนการทำคลัสเตอร์ และชุมชนที่กระตือรือร้น สำหรับผู้ที่สนใจการปฏิบัติจริง แนะนำให้อ่านคำแนะนำสำหรับ
เมื่อเปรียบเทียบกับตัวเชื่อมต่อ JDBC สำหรับ Kafka Connect ข้อได้เปรียบหลักของ Debezium คือการอ่านการเปลี่ยนแปลงจากบันทึก DBMS ซึ่งช่วยให้สามารถรับข้อมูลโดยมีเวลาแฝงน้อยที่สุด ตัวเชื่อมต่อ JDBC (จาก Kafka Connect) สืบค้นตารางที่ได้รับการตรวจสอบในช่วงเวลาที่กำหนดและ (ด้วยเหตุผลเดียวกัน) จะไม่สร้างข้อความเมื่อข้อมูลถูกลบ (คุณจะสืบค้นข้อมูลที่ไม่มีอยู่ได้อย่างไร)
เพื่อแก้ไขปัญหาที่คล้ายกัน คุณสามารถใส่ใจกับวิธีแก้ไขปัญหาต่อไปนี้ (นอกเหนือจาก Debezium):
-
ตัวเชื่อมต่อ JDBC Kafka Connect; - โซลูชั่นหลายอย่างสำหรับ MySQL เท่านั้น:
-
ออราเคิล โกลเดนเกต แต่นี่คือ "หมวดหมู่น้ำหนัก" ที่แตกต่างไปจากเดิมอย่างสิ้นเชิง
PS
อ่านเพิ่มเติมในบล็อกของเรา:
- «
การกำหนดขนาดที่เหมาะสมสำหรับคลัสเตอร์ Kafka ใน Kubernetes "; - «
เรื่องราวที่เป็นประโยชน์จากชีวิตประจำวันของ SRE ของเรา ส่วนที่ 2 "; - «
ภาพรวมโดยย่อของคำสั่ง PostgreSQL สำหรับ Kubernetes ทางเลือกและประสบการณ์ของเรา '
ที่มา: will.com