คาฟคากลายเป็นความจริงได้อย่างไร

คาฟคากลายเป็นความจริงได้อย่างไร

เฮ้ ฮับ!

ฉันทำงานในทีม Tinkoff ซึ่งกำลังพัฒนาศูนย์การแจ้งเตือนของตัวเอง ส่วนใหญ่ฉันพัฒนาใน Java โดยใช้ Spring boot และแก้ไขปัญหาทางเทคนิคต่างๆ ที่เกิดขึ้นในโปรเจ็กต์

ไมโครเซอร์วิสส่วนใหญ่ของเราสื่อสารกันแบบอะซิงโครนัสผ่านนายหน้าข้อความ ก่อนหน้านี้ เราใช้ IBM MQ เป็นนายหน้า ซึ่งไม่สามารถรับมือกับโหลดได้อีกต่อไป แต่ในขณะเดียวกันก็มีการรับประกันการส่งมอบในระดับสูง

เพื่อเป็นการทดแทน เราได้เสนอ Apache Kafka ซึ่งมีศักยภาพในการปรับขนาดสูง แต่น่าเสียดายที่ต้องใช้วิธีการกำหนดค่าแบบเฉพาะบุคคลสำหรับสถานการณ์ที่แตกต่างกัน นอกจากนี้ กลไกการจัดส่งอย่างน้อยหนึ่งครั้งที่ทำงานใน Kafka โดยค่าเริ่มต้นไม่อนุญาตให้รักษาระดับความสอดคล้องที่ต้องการไว้นอกกรอบ ต่อไป ฉันจะแบ่งปันประสบการณ์ของเราในการกำหนดค่า Kafka โดยเฉพาะอย่างยิ่ง ฉันจะบอกคุณถึงวิธีการกำหนดค่าและใช้งานด้วยการส่งมอบครั้งเดียวกัน

รับประกันการจัดส่ง และอื่นๆ อีกมากมาย

การตั้งค่าที่กล่าวถึงด้านล่างจะช่วยป้องกันปัญหาหลายประการกับการตั้งค่าการเชื่อมต่อเริ่มต้น แต่ก่อนอื่น ฉันอยากจะให้ความสนใจกับพารามิเตอร์ตัวหนึ่งที่จะช่วยอำนวยความสะดวกในการแก้ไขข้อบกพร่องที่เป็นไปได้

สิ่งนี้จะช่วยได้ ลูกค้า.id สำหรับผู้ผลิตและผู้บริโภค เมื่อมองแวบแรก คุณสามารถใช้ชื่อแอปพลิเคชันเป็นค่าได้ และโดยส่วนใหญ่แล้วจะได้ผล แม้ว่าสถานการณ์เมื่อแอปพลิเคชันใช้ผู้บริโภคหลายรายและคุณให้ client.id เดียวกันแก่พวกเขา ผลลัพธ์ที่ได้คือคำเตือนต่อไปนี้:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

หากคุณต้องการใช้ JMX ในแอปพลิเคชันกับ Kafka นี่อาจเป็นปัญหาได้ ในกรณีนี้ วิธีที่ดีที่สุดคือใช้ชื่อแอปพลิเคชันผสมกัน เช่น ชื่อหัวข้อเป็นค่า client.id ผลลัพธ์ของการกำหนดค่าของเราสามารถดูได้ในเอาต์พุตคำสั่ง คาฟคา-กลุ่มผู้บริโภค จากสาธารณูปโภคจาก Confluent:

คาฟคากลายเป็นความจริงได้อย่างไร

ตอนนี้เรามาดูสถานการณ์สมมติในการรับประกันการส่งข้อความ Kafka Producer มีพารามิเตอร์ แอ๊คซึ่งช่วยให้คุณกำหนดค่าได้หลังจากจำนวนการรับทราบที่ผู้นำคลัสเตอร์จำเป็นต้องพิจารณาว่าข้อความที่เขียนสำเร็จ พารามิเตอร์นี้สามารถรับค่าต่อไปนี้:

  • 0 — รับทราบจะไม่ได้รับการพิจารณา
  • 1 คือพารามิเตอร์ดีฟอลต์ จำเป็นต้องมีเพียง 1 เรพลิกาเท่านั้นจึงจะรับทราบ
  • −1 - จำเป็นต้องมีการตอบรับจากแบบจำลองที่ซิงโครไนซ์ทั้งหมด (การตั้งค่าคลัสเตอร์ min.insync.replica).

จากค่าที่แสดงไว้ชัดเจนว่า acks เท่ากับ −1 ให้การรับประกันที่รัดกุมที่สุดว่าข้อความจะไม่สูญหาย

อย่างที่เราทราบกันดีว่าระบบแบบกระจายนั้นไม่น่าเชื่อถือ เพื่อป้องกันข้อผิดพลาดชั่วคราว Kafka Producer ได้จัดเตรียมทางเลือกไว้ ลองใหม่ซึ่งช่วยให้คุณกำหนดจำนวนครั้งในการส่งซ้ำภายในได้ การจัดส่ง.timeout.ms. เนื่องจากพารามิเตอร์การลองใหม่มีค่าเริ่มต้นเป็น Integer.MAX_VALUE (2147483647) จำนวนการลองข้อความใหม่จึงสามารถปรับได้โดยการเปลี่ยนเฉพาะ delivery.timeout.ms

เรากำลังก้าวไปสู่การส่งมอบครั้งเดียว

การตั้งค่าที่ระบุไว้ช่วยให้ผู้ผลิตของเราสามารถส่งข้อความที่มีการรับประกันในระดับสูง ตอนนี้เรามาพูดถึงวิธีการตรวจสอบให้แน่ใจว่ามีการเขียนข้อความเพียงสำเนาเดียวในหัวข้อ Kafka ในกรณีที่ง่ายที่สุด ในการดำเนินการนี้ คุณต้องตั้งค่าพารามิเตอร์บน Producer เปิดใช้งาน idempotence เป็นจริง Idempotency รับประกันว่ามีเพียงข้อความเดียวเท่านั้นที่ถูกเขียนไปยังพาร์ติชันเฉพาะของหัวข้อเดียว เงื่อนไขเบื้องต้นสำหรับการเปิดใช้งาน idempotency คือค่าต่างๆ acks = ทั้งหมด ลองอีกครั้ง > 0, max.in.flight.requests.per.connection ≤ 5. หากนักพัฒนาไม่ได้ระบุพารามิเตอร์เหล่านี้ ค่าข้างต้นจะถูกตั้งค่าโดยอัตโนมัติ

เมื่อมีการกำหนดค่า idempotency จำเป็นต้องตรวจสอบให้แน่ใจว่าข้อความเดียวกันจะจบลงในพาร์ติชันเดียวกันทุกครั้ง ซึ่งสามารถทำได้โดยการตั้งค่าคีย์ partitioner.class และพารามิเตอร์เป็น Producer เริ่มจากกุญแจกันก่อน จะต้องเหมือนกันสำหรับการส่งแต่ละครั้ง สามารถทำได้ง่ายๆ โดยใช้รหัสธุรกิจจากโพสต์ต้นฉบับ พารามิเตอร์ partitioner.class มีค่าเริ่มต้น - พาร์ติชั่นเริ่มต้น. ด้วยกลยุทธ์การแบ่งพาร์ติชันนี้ ตามค่าเริ่มต้นเราจะดำเนินการดังนี้:

  • หากมีการระบุพาร์ติชันอย่างชัดเจนเมื่อส่งข้อความแสดงว่าเราใช้พาร์ติชันนั้น
  • หากไม่ได้ระบุพาร์ติชัน แต่มีการระบุคีย์ ให้เลือกพาร์ติชันตามแฮชของคีย์
  • หากไม่ได้ระบุพาร์ติชันและคีย์ ให้เลือกพาร์ติชันทีละรายการ (round-robin)

นอกจากนี้การใช้คีย์และการส่ง idempotent พร้อมพารามิเตอร์ max.in.flight.requests.per.connection = 1 ช่วยให้คุณประมวลผลข้อความกับผู้บริโภคได้อย่างคล่องตัว นอกจากนี้ โปรดจำไว้ว่าหากมีการกำหนดค่าการควบคุมการเข้าถึงบนคลัสเตอร์ของคุณ คุณจะต้องมีสิทธิ์ในการเขียนหัวข้อโดยสมบูรณ์

หากจู่ๆ คุณขาดความสามารถในการส่ง idempotent ด้วยคีย์ หรือตรรกะในฝั่ง Producer จำเป็นต้องรักษาความสอดคล้องของข้อมูลระหว่างพาร์ติชันต่างๆ ธุรกรรมต่างๆ จะช่วยได้ นอกจากนี้ เมื่อใช้ธุรกรรมลูกโซ่ คุณสามารถซิงโครไนซ์บันทึกใน Kafka ได้ตามเงื่อนไข เช่น กับบันทึกในฐานข้อมูล หากต้องการเปิดใช้งานการส่งธุรกรรมไปยัง Producer จะต้องกำหนด idempotent และตั้งค่าเพิ่มเติม ธุรกรรม.id. หากคลัสเตอร์ Kafka ของคุณมีการกำหนดค่าการควบคุมการเข้าถึง บันทึกธุรกรรม เช่น บันทึก idempotent จะต้องมีสิทธิ์ในการเขียน ซึ่งสามารถให้สิทธิ์ได้โดยมาสก์โดยใช้ค่าที่จัดเก็บไว้ใน Transactional.id

อย่างเป็นทางการ สตริงใดๆ เช่น ชื่อแอปพลิเคชัน สามารถใช้เป็นตัวระบุธุรกรรมได้ แต่ถ้าคุณเปิดแอปพลิเคชันเดียวกันหลายอินสแตนซ์ด้วย Transactional.id เดียวกัน อินสแตนซ์ที่เปิดใช้งานครั้งแรกจะหยุดทำงานโดยมีข้อผิดพลาด เนื่องจาก Kafka จะพิจารณาว่าเป็นกระบวนการซอมบี้

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

เพื่อแก้ไขปัญหานี้ เราเพิ่มส่วนต่อท้ายให้กับชื่อแอปพลิเคชันในรูปแบบของชื่อโฮสต์ ซึ่งเราได้รับจากตัวแปรสภาพแวดล้อม

ผู้ผลิตได้รับการกำหนดค่าแล้ว แต่ธุรกรรมใน Kafka จะควบคุมเฉพาะขอบเขตของข้อความเท่านั้น ไม่ว่าสถานะธุรกรรมจะเป็นอย่างไร ข้อความจะไปที่หัวข้อทันที แต่มีคุณลักษณะของระบบเพิ่มเติม

เพื่อป้องกันไม่ให้ผู้บริโภคอ่านข้อความดังกล่าวล่วงหน้า จำเป็นต้องตั้งค่าพารามิเตอร์ การแยกระดับ เพื่อ read_commit ค่า ผู้บริโภคดังกล่าวจะสามารถอ่านข้อความที่ไม่ใช่ธุรกรรมได้เหมือนเมื่อก่อน และข้อความธุรกรรมหลังจากยืนยันแล้วเท่านั้น
หากคุณได้ตั้งค่าทั้งหมดที่แสดงไว้ก่อนหน้านี้ แสดงว่าคุณได้กำหนดค่าการจัดส่งครั้งเดียวทุกประการ ยินดีด้วย!

แต่มีความแตกต่างอีกอย่างหนึ่ง Transactional.id ซึ่งเรากำหนดค่าไว้ข้างต้น จริงๆ แล้วเป็นคำนำหน้าธุรกรรม ในตัวจัดการธุรกรรม หมายเลขลำดับจะถูกเพิ่มเข้าไป ตัวระบุที่ได้รับจะออกให้กับ ธุรกรรม.id.expiration.msซึ่งได้รับการกำหนดค่าบนคลัสเตอร์ Kafka และมีค่าเริ่มต้นเป็น “7 วัน” หากในช่วงเวลานี้แอปพลิเคชันไม่ได้รับข้อความใด ๆ เมื่อคุณลองส่งธุรกรรมครั้งถัดไป คุณจะได้รับ InvalidPidMappingException. ผู้ประสานงานธุรกรรมจะออกหมายเลขลำดับใหม่สำหรับธุรกรรมถัดไป อย่างไรก็ตาม ข้อความอาจหายไปถ้า InvalidPidMappingException ไม่ได้รับการจัดการอย่างถูกต้อง

แทนผลรวม

อย่างที่คุณเห็นการส่งข้อความถึงคาฟคาเพียงอย่างเดียวนั้นไม่เพียงพอ คุณต้องเลือกพารามิเตอร์รวมกันและเตรียมพร้อมที่จะทำการเปลี่ยนแปลงอย่างรวดเร็ว ในบทความนี้ ฉันพยายามแสดงรายละเอียดการตั้งค่าการจัดส่งเพียงครั้งเดียว และอธิบายปัญหาหลายประการเกี่ยวกับการกำหนดค่า client.id และ Transactional.id ที่เราพบ ด้านล่างนี้คือข้อมูลสรุปของการตั้งค่าผู้ผลิตและผู้บริโภค

ผู้ผลิต:

  1. acks = ทั้งหมด
  2. ลองใหม่ > 0
  3. Enable.idempotence = จริง
  4. max.in.flight.requests.per.connection ≤ 5 (1 สำหรับการส่งตามลำดับ)
  5. Transactional.id = ${ชื่อแอปพลิเคชัน}-${ชื่อโฮสต์}

ผู้บริโภค:

  1. Isolation.level = read_commission

เพื่อลดข้อผิดพลาดในการใช้งานในอนาคต เราได้สร้าง wrapper ของเราเองเหนือการกำหนดค่าสปริง โดยที่ค่าสำหรับพารามิเตอร์บางรายการในรายการได้รับการตั้งค่าไว้แล้ว

ต่อไปนี้เป็นเอกสารสองสามอย่างสำหรับการศึกษาด้วยตนเอง:

ที่มา: will.com

เพิ่มความคิดเห็น