หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์" สวัสดีชาวคาโบร! หนังสือเล่มนี้เหมาะสำหรับนักพัฒนาที่ต้องการทำความเข้าใจการประมวลผลเธรด การทำความเข้าใจการเขียนโปรแกรมแบบกระจายจะช่วยให้คุณเข้าใจ Kafka และ Kafka Streams ได้ดีขึ้น คงจะดีถ้าได้รู้จักกรอบการทำงานของ Kafka แต่ไม่จำเป็น: ฉันจะบอกทุกสิ่งที่คุณต้องการ นักพัฒนา Kafka และมือใหม่ที่มีประสบการณ์จะได้เรียนรู้วิธีสร้างแอปพลิเคชันการประมวลผลสตรีมที่น่าสนใจโดยใช้ไลบรารี Kafka Streams ในหนังสือเล่มนี้ นักพัฒนา Java ระดับกลางและขั้นสูงที่คุ้นเคยกับแนวคิดต่างๆ เช่น การทำให้เป็นอนุกรม จะได้เรียนรู้การใช้ทักษะของตนเพื่อสร้างแอปพลิเคชัน Kafka Streams ซอร์สโค้ดของหนังสือเขียนด้วยภาษา Java 8 และใช้ประโยชน์จากไวยากรณ์นิพจน์แลมบ์ดาของ Java 8 อย่างมาก ดังนั้นการรู้วิธีทำงานกับฟังก์ชันแลมบ์ดา (แม้จะเป็นภาษาโปรแกรมอื่นก็ตาม) จะมีประโยชน์มาก

ข้อความที่ตัดตอนมา 5.3. การดำเนินการรวมและหน้าต่าง

ในส่วนนี้ เราจะไปสำรวจส่วนที่มีแนวโน้มมากที่สุดของ Kafka Streams จนถึงตอนนี้ เราได้กล่าวถึงประเด็นต่างๆ ของ Kafka Streams ดังต่อไปนี้แล้ว:

  • การสร้างโทโพโลยีการประมวลผล
  • การใช้สถานะในแอปพลิเคชันสตรีมมิ่ง
  • ทำการเชื่อมต่อสตรีมข้อมูล
  • ความแตกต่างระหว่างสตรีมเหตุการณ์ (KStream) และสตรีมอัปเดต (KTable)

ในตัวอย่างต่อไปนี้ เราจะนำองค์ประกอบทั้งหมดเหล่านี้มารวมกัน นอกจากนี้คุณยังจะได้เรียนรู้เกี่ยวกับหน้าต่างซึ่งเป็นคุณสมบัติที่ยอดเยี่ยมอีกประการหนึ่งของแอปพลิเคชันสตรีมมิ่ง ตัวอย่างแรกของเราจะเป็นการรวมแบบง่ายๆ

5.3.1. การรวมยอดขายหุ้นตามภาคอุตสาหกรรม

การรวมและการจัดกลุ่มเป็นเครื่องมือสำคัญเมื่อทำงานกับข้อมูลการสตรีม การตรวจสอบบันทึกแต่ละรายการเมื่อได้รับมักจะไม่เพียงพอ หากต้องการแยกข้อมูลเพิ่มเติมออกจากข้อมูล จำเป็นต้องจัดกลุ่มและรวมเข้าด้วยกัน

ในตัวอย่างนี้ คุณจะสวมเครื่องแต่งกายของเดย์เทรดเดอร์ที่ต้องการติดตามปริมาณการขายหุ้นของบริษัทในหลายอุตสาหกรรม โดยเฉพาะคุณสนใจบริษัท XNUMX แห่งที่มีส่วนแบ่งการขายมากที่สุดในแต่ละอุตสาหกรรม

การรวมดังกล่าวจะต้องมีหลายขั้นตอนต่อไปนี้เพื่อแปลข้อมูลเป็นรูปแบบที่ต้องการ (พูดในแง่ทั่วไป)

  1. สร้างแหล่งข้อมูลตามหัวข้อที่เผยแพร่ข้อมูลการซื้อขายหุ้นดิบ เราจะต้องแมปวัตถุประเภท StockTransaction กับวัตถุประเภท ShareVolume ประเด็นก็คือออบเจ็กต์ StockTransaction มีข้อมูลเมตาการขาย แต่เราต้องการเพียงข้อมูลเกี่ยวกับจำนวนหุ้นที่ขายไป
  2. จัดกลุ่มข้อมูล ShareVolume ตามสัญลักษณ์หุ้น เมื่อจัดกลุ่มตามสัญลักษณ์แล้ว คุณสามารถยุบข้อมูลนี้เป็นผลรวมย่อยของปริมาณการขายหุ้นได้ เป็นที่น่าสังเกตว่าเมธอด KStream.groupBy ส่งคืนอินสแตนซ์ประเภท KGroupedStream และคุณสามารถรับอินสแตนซ์ KTable ได้โดยการเรียกเมธอด KGroupedStream.reduce เพิ่มเติม

อินเทอร์เฟซ KGgroupedStream คืออะไร

วิธีการ KStream.groupBy และ KStream.groupByKey ส่งคืนอินสแตนซ์ของ KGroupedStream KGroupedStream เป็นตัวแทนระดับกลางของสตรีมของเหตุการณ์หลังจากจัดกลุ่มตามคีย์ มันไม่ได้มีไว้สำหรับการทำงานโดยตรงกับมันเลย แต่ KGroupedStream จะใช้สำหรับการดำเนินการรวม ซึ่งจะส่งผลให้เกิด KTable เสมอ และเนื่องจากผลลัพธ์ของการดำเนินการรวมเป็น KTable และใช้ที่เก็บข้อมูลของรัฐ จึงเป็นไปได้ว่าการอัปเดตบางส่วนอาจไม่ถูกส่งต่อไปในไปป์ไลน์

เมธอด KTable.groupBy ส่งคืน KGroupedTable ที่คล้ายกัน ซึ่งเป็นการแสดงระดับกลางของสตรีมการอัปเดต ซึ่งจัดกลุ่มใหม่ตามคีย์

พักสักหน่อยแล้วดูรูป.. 5.9 ซึ่งแสดงให้เห็นสิ่งที่เราทำได้สำเร็จ โทโพโลยีนี้น่าจะคุ้นเคยกับคุณเป็นอย่างดี

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
ตอนนี้เรามาดูโค้ดสำหรับโทโพโลยีนี้ (สามารถพบได้ในไฟล์ src/main/java/bbejeck/chapter_5/AggregationsAndReduceExample.java) (รายการ 5.2)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
รหัสที่กำหนดมีความโดดเด่นด้วยความกะทัดรัดและการดำเนินการจำนวนมากในหลายบรรทัด คุณอาจสังเกตเห็นสิ่งใหม่ๆ ในพารามิเตอร์แรกของเมธอด builder.stream: ค่าของประเภท enum AutoOffsetReset.EARLIEST (ยังมี LATEST ด้วย) ตั้งค่าโดยใช้เมธอด Consumed.withOffsetResetPolicy ประเภทการแจงนับนี้สามารถใช้เพื่อระบุกลยุทธ์การรีเซ็ตออฟเซ็ตสำหรับ KStream หรือ KTable แต่ละรายการ และมีความสำคัญเหนือกว่าตัวเลือกการรีเซ็ตออฟเซ็ตจากการกำหนดค่า

GroupByKey และ GroupBy

อินเทอร์เฟซ KStream มีสองวิธีในการจัดกลุ่มบันทึก: GroupByKey และ GroupBy ทั้งสองส่งคืน KGgroupedTable ดังนั้นคุณอาจสงสัยว่าอะไรคือความแตกต่างระหว่างทั้งสองและเมื่อใดจึงควรใช้อันใด

เมธอด GroupByKey ถูกใช้เมื่อคีย์ใน KStream ไม่ว่างเปล่าอยู่แล้ว และที่สำคัญที่สุดคือไม่เคยตั้งค่าสถานะ "ต้องแบ่งพาร์ติชันใหม่"

กระบวนการGroupByวิธีถือว่าคุณได้เปลี่ยนคีย์การจัดกลุ่ม ดังนั้นการตั้งค่าสถานะการแบ่งพาร์ติชันจะถูกตั้งค่าเป็นจริง การดำเนินการรวม การรวมกลุ่ม ฯลฯ หลังจากวิธี GroupBy จะส่งผลให้มีการแบ่งพาร์ติชันใหม่โดยอัตโนมัติ
สรุป: เมื่อใดก็ตามที่เป็นไปได้ คุณควรใช้ GroupByKey แทน GroupBy

ชัดเจนว่าเมธอด mapValues ​​​​และ groupBy ทำหน้าที่อะไร ดังนั้นเรามาดูเมธอด sum() กันดีกว่า (พบได้ใน src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
วิธี ShareVolume.sum ส่งกลับผลรวมของปริมาณการขายสต็อก และผลลัพธ์ของห่วงโซ่การคำนวณทั้งหมดจะเป็นอ็อบเจ็กต์ KTable . ตอนนี้คุณเข้าใจบทบาทของ KTable แล้ว เมื่อออบเจ็กต์ ShareVolume มาถึง อ็อบเจ็กต์ KTable ที่เกี่ยวข้องจะจัดเก็บการอัปเดตล่าสุดในปัจจุบัน สิ่งสำคัญคือต้องจำไว้ว่าการอัปเดตทั้งหมดจะแสดงอยู่ใน shareVolumeKTable ก่อนหน้า แต่ไม่ใช่ทั้งหมดที่จะถูกส่งไปเพิ่มเติม

ต่อไป เมื่อใช้ KTable นี้ เราจะรวบรวม (ตามจำนวนหุ้นที่มีการซื้อขาย) เพื่อเข้าถึงบริษัท XNUMX แห่งที่มีปริมาณการซื้อขายหุ้นสูงสุดในแต่ละอุตสาหกรรม การกระทำของเราในกรณีนี้จะคล้ายกับการดำเนินการสำหรับการรวมกลุ่มครั้งแรก

  1. ดำเนินการ groupBy อื่นเพื่อจัดกลุ่มออบเจ็กต์ ShareVolume แต่ละรายการตามอุตสาหกรรม
  2. เริ่มการสรุปออบเจ็กต์ ShareVolume คราวนี้ออบเจ็กต์การรวมเป็นคิวลำดับความสำคัญที่มีขนาดคงที่ ในคิวที่มีขนาดคงที่นี้ มีเพียงห้าบริษัทที่มีจำนวนหุ้นที่ขายได้มากที่สุดเท่านั้นที่ยังคงอยู่
  3. แมปคิวจากย่อหน้าก่อนหน้าไปยังค่าสตริง และส่งคืนหุ้นที่มีการซื้อขายมากที่สุดห้าอันดับแรกตามจำนวนตามอุตสาหกรรม
  4. เขียนผลลัพธ์ในรูปแบบสตริงให้กับหัวข้อ

ในรูป รูปที่ 5.10 แสดงกราฟโทโพโลยีการไหลของข้อมูล อย่างที่คุณเห็น การประมวลผลรอบที่สองนั้นค่อนข้างง่าย

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
ตอนนี้เรามีความเข้าใจที่ชัดเจนเกี่ยวกับโครงสร้างของการประมวลผลรอบที่สองนี้แล้ว เราก็สามารถหันไปใช้ซอร์สโค้ดของมันได้ (คุณจะพบมันในไฟล์ src/main/java/bbejeck/chapter_5/AggregationsAndReduceExample.java) (Listing 5.4) .

เครื่องมือเริ่มต้นนี้มีตัวแปรแบบคงที่ นี่เป็นอ็อบเจ็กต์แบบกำหนดเองที่เป็นอะแด็ปเตอร์สำหรับ java.util.TreeSet ที่ใช้เพื่อติดตามผลลัพธ์ N อันดับแรกตามลำดับจากมากไปน้อยของการซื้อขายหุ้น

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
คุณเคยเห็นการเรียก groupBy และ mapValues ​​​​แล้ว ดังนั้นเราจะไม่เข้าไปในสิ่งเหล่านั้น (เรากำลังเรียกเมธอด KTable.toStream เนื่องจากเมธอด KTable.print เลิกใช้แล้ว) แต่คุณยังไม่ได้ดูเวอร์ชัน KTable ของการรวม () ดังนั้นเราจะใช้เวลาพูดคุยกันเล็กน้อย

อย่างที่คุณจำได้ สิ่งที่ทำให้ KTable แตกต่างก็คือบันทึกที่มีคีย์เดียวกันนั้นถือเป็นการอัปเดต KTable แทนที่รายการเก่าด้วยรายการใหม่ การรวมเกิดขึ้นในลักษณะเดียวกัน: บันทึกล่าสุดที่มีคีย์เดียวกันจะถูกรวมเข้าด้วยกัน เมื่อบันทึกมาถึง มันจะถูกเพิ่มไปยังอินสแตนซ์คลาส FixSizePriorityQueue โดยใช้ตัวบวก (พารามิเตอร์ตัวที่สองในการเรียกเมธอดรวม) แต่หากมีบันทึกอื่นที่มีคีย์เดียวกันอยู่แล้ว บันทึกเก่าจะถูกลบออกโดยใช้ตัวลบ (พารามิเตอร์ตัวที่สามใน การเรียกวิธีการรวม)

ทั้งหมดนี้หมายความว่าผู้รวบรวมของเรา FixSizePriorityQueue ไม่ได้รวมค่าทั้งหมดด้วยคีย์เดียว แต่เก็บผลรวมที่เคลื่อนไหวของปริมาณของหุ้น N ประเภทที่มีการซื้อขายมากที่สุด แต่ละรายการที่เข้ามาประกอบด้วยจำนวนหุ้นที่ขายไปแล้วทั้งหมด KTable จะให้ข้อมูลเกี่ยวกับหุ้นของบริษัทใดที่มีการซื้อขายมากที่สุดในปัจจุบัน โดยไม่ต้องมีการรวบรวมการอัปเดตแต่ละครั้ง

เราเรียนรู้ที่จะทำสองสิ่งสำคัญ:

  • จัดกลุ่มค่าใน KTable ด้วยคีย์ทั่วไป
  • ดำเนินการที่เป็นประโยชน์ เช่น การสรุปและการรวมกลุ่มกับค่าที่จัดกลุ่มเหล่านี้

การรู้วิธีดำเนินการเหล่านี้เป็นสิ่งสำคัญในการทำความเข้าใจความหมายของข้อมูลที่เคลื่อนย้ายผ่านแอปพลิเคชัน Kafka Streams และทำความเข้าใจว่ามีข้อมูลใดบ้างที่บรรจุอยู่

นอกจากนี้เรายังได้รวบรวมแนวคิดหลักบางส่วนที่กล่าวถึงก่อนหน้านี้ในหนังสือเล่มนี้ไว้ด้วย ในบทที่ 4 เราได้พูดคุยกันว่าสถานะท้องถิ่นที่ทนทานต่อข้อผิดพลาดมีความสำคัญต่อแอปพลิเคชันสตรีมมิ่งอย่างไร ตัวอย่างแรกในบทนี้แสดงให้เห็นว่าเหตุใดรัฐในท้องถิ่นจึงมีความสำคัญ เนื่องจากช่วยให้คุณสามารถติดตามข้อมูลที่คุณเห็นแล้วได้ การเข้าถึงภายในเครื่องช่วยหลีกเลี่ยงความล่าช้าของเครือข่าย ทำให้แอปพลิเคชันมีประสิทธิภาพมากขึ้นและทนทานต่อข้อผิดพลาด

เมื่อดำเนินการยกเลิกหรือการรวม คุณต้องระบุชื่อของที่จัดเก็บสถานะ การดำเนินการรวบรวมและรวบรวมจะส่งคืนอินสแตนซ์ KTable และ KTable ใช้พื้นที่จัดเก็บสถานะเพื่อแทนที่ผลลัพธ์เก่าด้วยผลลัพธ์ใหม่ อย่างที่คุณเห็น ไม่ใช่ว่าการอัปเดตทั้งหมดจะถูกส่งไปยังไปป์ไลน์ และนี่เป็นสิ่งสำคัญเนื่องจากการดำเนินการรวมได้รับการออกแบบเพื่อสร้างข้อมูลสรุป หากคุณไม่ใช้สถานะท้องถิ่น KTable จะส่งต่อผลการรวมและสรุปทั้งหมด

ต่อไป เราจะดูการดำเนินการต่างๆ เช่น การรวมกลุ่มภายในระยะเวลาที่กำหนด ซึ่งเรียกว่าการดำเนินการแบบหน้าต่าง

5.3.2. การทำงานของหน้าต่าง

ในส่วนก่อนหน้านี้ เราได้แนะนำการเลื่อนและการรวมกลุ่ม แอปพลิเคชันดำเนินการขายหุ้นอย่างต่อเนื่อง ตามด้วยการรวมห้าหุ้นที่มีการซื้อขายมากที่สุดในตลาดหลักทรัพย์

บางครั้งจำเป็นต้องมีการรวมกลุ่มและการสะสมผลลัพธ์อย่างต่อเนื่อง และบางครั้งคุณจำเป็นต้องดำเนินการตามระยะเวลาที่กำหนดเท่านั้น ตัวอย่างเช่น คำนวณจำนวนธุรกรรมการแลกเปลี่ยนที่เกิดขึ้นกับหุ้นของบริษัทใดบริษัทหนึ่งในช่วง 10 นาทีที่ผ่านมา หรือจำนวนผู้ใช้ที่คลิกแบนเนอร์โฆษณาใหม่ในช่วง 15 นาทีที่ผ่านมา แอปพลิเคชันอาจดำเนินการดังกล่าวหลายครั้ง แต่ด้วยผลลัพธ์ที่ใช้กับช่วงเวลาที่กำหนดเท่านั้น (กรอบเวลา)

การนับธุรกรรมการแลกเปลี่ยนโดยผู้ซื้อ

ในตัวอย่างถัดไป เราจะติดตามธุรกรรมหุ้นระหว่างเทรดเดอร์หลายราย ไม่ว่าจะเป็นองค์กรขนาดใหญ่หรือนักการเงินรายย่อยที่ชาญฉลาด

มีเหตุผลที่เป็นไปได้สองประการสำหรับการติดตามนี้ หนึ่งในนั้นคือความต้องการทราบว่าผู้นำตลาดกำลังซื้อ/ขายอะไร หากผู้เล่นรายใหญ่และนักลงทุนที่เชี่ยวชาญเหล่านี้มองเห็นโอกาส ก็สมเหตุสมผลที่จะปฏิบัติตามกลยุทธ์ของพวกเขา เหตุผลที่สองคือความปรารถนาที่จะมองเห็นสัญญาณที่เป็นไปได้ของการซื้อขายหลักทรัพย์โดยใช้ข้อมูลภายในที่ผิดกฎหมาย ในการทำเช่นนี้ คุณจะต้องวิเคราะห์ความสัมพันธ์ระหว่างยอดขายที่เพิ่มขึ้นอย่างรวดเร็วกับข่าวประชาสัมพันธ์ที่สำคัญ

การติดตามดังกล่าวประกอบด้วยขั้นตอนต่อไปนี้:

  • สร้างกระแสการอ่านจากหัวข้อหุ้น-ธุรกรรม
  • การจัดกลุ่มบันทึกที่เข้ามาตามรหัสผู้ซื้อและสัญลักษณ์หุ้น การเรียกเมธอดgroupByจะส่งกลับอินสแตนซ์ของคลาส KGroupedStream
  • เมธอด KGroupedStream.windowedBy ส่งกลับสตรีมข้อมูลที่จำกัดไว้ที่กรอบเวลา ซึ่งอนุญาตให้มีการรวมแบบหน้าต่าง ขึ้นอยู่กับชนิดของหน้าต่าง TimeWindowedKStream หรือ SessionWindowedKStream จะถูกส่งกลับ
  • จำนวนธุรกรรมสำหรับการดำเนินการรวม กระแสข้อมูลแบบหน้าต่างจะกำหนดว่าบันทึกเฉพาะเจาะจงถูกนำมาพิจารณาในการนับนี้หรือไม่
  • การเขียนผลลัพธ์ไปยังหัวข้อหรือส่งออกไปยังคอนโซลระหว่างการพัฒนา

โทโพโลยีของแอปพลิเคชันนี้เรียบง่าย แต่การมีภาพที่ชัดเจนจะเป็นประโยชน์ มาดูกันตามรูปครับ. 5.11.

ต่อไปเราจะดูฟังก์ชันการทำงานของการทำงานของหน้าต่างและโค้ดที่เกี่ยวข้อง

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"

ประเภทของหน้าต่าง

Kafka Streams มีหน้าต่างอยู่สามประเภท:

  • เซสชั่น;
  • “ไม้ลอย” (ไม้ลอย);
  • เลื่อน/กระโดด

ตัวเลือกใดขึ้นอยู่กับความต้องการทางธุรกิจของคุณ กรอบเวลาการสะดุดและกระโดดมีการจำกัดเวลา ในขณะที่กรอบเวลาเซสชันจะถูกจำกัดโดยกิจกรรมของผู้ใช้ ระยะเวลาของเซสชันจะพิจารณาจากความเคลื่อนไหวของผู้ใช้เท่านั้น สิ่งสำคัญที่ต้องจำคือ หน้าต่างทุกประเภทจะขึ้นอยู่กับการประทับวันที่/เวลาของรายการ ไม่ใช่เวลาของระบบ

ต่อไป เราจะใช้โทโพโลยีกับหน้าต่างแต่ละประเภท รหัสที่สมบูรณ์จะได้รับในตัวอย่างแรกเท่านั้น สำหรับ windows ประเภทอื่น ๆ จะไม่มีอะไรเปลี่ยนแปลงยกเว้นประเภทของการทำงานของหน้าต่าง

หน้าต่างเซสชัน

หน้าต่างเซสชันแตกต่างจากหน้าต่างประเภทอื่นๆ อย่างมาก สิ่งเหล่านี้ถูกจำกัดตามเวลาไม่มากเท่ากับกิจกรรมของผู้ใช้ (หรือกิจกรรมของหน่วยงานที่คุณต้องการติดตาม) กรอบเวลาเซสชั่นจะถูกคั่นด้วยช่วงเวลาที่ไม่มีการใช้งาน

รูปที่ 5.12 แสดงให้เห็นแนวคิดของหน้าต่างเซสชั่น เซสชันที่เล็กกว่าจะรวมเข้ากับเซสชันทางด้านซ้าย และเซสชันทางด้านขวาจะแยกจากกันเนื่องจากไม่มีการใช้งานเป็นระยะเวลานาน หน้าต่างเซสชันจะขึ้นอยู่กับกิจกรรมของผู้ใช้ แต่ใช้การประทับวันที่/เวลาจากรายการเพื่อกำหนดว่ารายการนั้นเป็นของเซสชันใด

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"

การใช้หน้าต่างเซสชันเพื่อติดตามธุรกรรมหุ้น

ลองใช้หน้าต่างเซสชันเพื่อรวบรวมข้อมูลเกี่ยวกับธุรกรรมการแลกเปลี่ยน การใช้งานหน้าต่างเซสชันจะแสดงอยู่ใน Listing 5.5 (ซึ่งสามารถพบได้ใน src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
คุณได้เห็นการดำเนินการส่วนใหญ่ในโทโพโลยีนี้แล้ว ดังนั้นจึงไม่จำเป็นต้องตรวจสอบการดำเนินการเหล่านี้ที่นี่อีกครั้ง แต่ยังมีองค์ประกอบใหม่หลายประการที่นี่ ซึ่งเราจะพูดถึงในตอนนี้

โดยทั่วไปการดำเนินการ groupBy ใดๆ จะดำเนินการรวมบางประเภท (การรวม การสะสม หรือการนับ) คุณสามารถดำเนินการรวมแบบสะสมด้วยผลรวมที่กำลังดำเนินการ หรือการรวมหน้าต่าง ซึ่งคำนึงถึงบันทึกบัญชีภายในกรอบเวลาที่ระบุ

รหัสในรายการ 5.5 นับจำนวนธุรกรรมภายในหน้าต่างเซสชัน ในรูป 5.13 การกระทำเหล่านี้ได้รับการวิเคราะห์ทีละขั้นตอน

โดยการเรียก windowedBy(SessionWindows.with(twentySeconds).จนถึง(สิบห้านาที)) เราจะสร้างหน้าต่างเซสชันโดยมีช่วงเวลาไม่มีการใช้งาน 20 วินาทีและช่วงเวลาคงอยู่ 15 นาที ช่วงเวลาที่ไม่ได้ใช้งาน 20 วินาทีหมายความว่าแอปพลิเคชันจะรวมรายการใดๆ ที่มาถึงภายใน 20 วินาทีของการสิ้นสุดหรือเริ่มต้นเซสชันปัจจุบันลงในเซสชันปัจจุบัน (ใช้งานอยู่)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
ต่อไป เราระบุการดำเนินการรวมที่ต้องดำเนินการในหน้าต่างเซสชัน - ในกรณีนี้ ให้นับ หากรายการขาเข้าอยู่นอกหน้าต่างการไม่ใช้งาน (ด้านใดด้านหนึ่งของการประทับวันที่/เวลา) แอปพลิเคชันจะสร้างเซสชันใหม่ ช่วงเวลาการเก็บรักษาหมายถึงการรักษาเซสชันเป็นระยะเวลาหนึ่ง และอนุญาตให้มีข้อมูลที่ล่าช้าซึ่งขยายออกไปเกินระยะเวลาที่ไม่มีการใช้งานของเซสชัน แต่ยังคงสามารถแนบได้ นอกจากนี้ การเริ่มต้นและสิ้นสุดของเซสชันใหม่อันเป็นผลมาจากการผสานสอดคล้องกับการประทับวันที่/เวลาที่เร็วที่สุดและล่าสุด

มาดูรายการบางส่วนจากวิธีการนับเพื่อดูว่าเซสชันทำงานอย่างไร (ตาราง 5.1)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
เมื่อบันทึกมาถึง เราจะค้นหาเซสชันที่มีอยู่ด้วยคีย์เดียวกัน เวลาสิ้นสุดน้อยกว่าการประทับวันที่/เวลาปัจจุบัน - ช่วงเวลาไม่มีกิจกรรม และเวลาเริ่มต้นมากกว่าการประทับวันที่/เวลาปัจจุบัน + ช่วงเวลาไม่มีกิจกรรม เมื่อคำนึงถึงสิ่งนี้แล้ว มีสี่รายการจากตาราง 5.1 รวมเป็นเซสชันเดียวดังนี้

1. บันทึก 1 มาถึงก่อน ดังนั้นเวลาเริ่มต้นจึงเท่ากับเวลาสิ้นสุดและเป็น 00:00:00 น.

2. ถัดไป รายการที่ 2 มาถึง และเรามองหาเซสชันที่สิ้นสุดไม่เร็วกว่า 23:59:55 และเริ่มไม่เกิน 00:00:35 น. เราค้นหาบันทึก 1 และรวมเซสชัน 1 และ 2 เราใช้เวลาเริ่มต้นของเซสชัน 1 (ก่อนหน้า) และเวลาสิ้นสุดของเซสชัน 2 (ภายหลัง) เพื่อให้เซสชันใหม่ของเราเริ่มต้นเวลา 00:00:00 น. และสิ้นสุดที่ 00: 00:15.

3. บันทึก 3 มาถึง เราค้นหาเซสชันระหว่าง 00:00:30 น. ถึง 00:01:10 น. และไม่พบเลย เพิ่มเซสชันที่สองสำหรับคีย์ 123-345-654,FFBE เริ่มต้นและสิ้นสุดเวลา 00:00:50 น.

4. บันทึก 4 มาถึงแล้ว และเรากำลังมองหาเซสชันระหว่าง 23:59:45 น. ถึง 00:00:25 น. คราวนี้พบทั้งเซสชัน 1 และ 2 ทั้งสามเซสชันจะรวมกันเป็นหนึ่งเดียว โดยมีเวลาเริ่มต้น 00:00:00 น. และเวลาสิ้นสุด 00:00:15 น.

จากสิ่งที่อธิบายไว้ในส่วนนี้ควรจดจำความแตกต่างที่สำคัญต่อไปนี้:

  • เซสชันไม่ใช่หน้าต่างขนาดคงที่ ระยะเวลาของเซสชั่นจะถูกกำหนดโดยกิจกรรมภายในระยะเวลาที่กำหนด
  • การประทับวันที่/เวลาในข้อมูลจะกำหนดว่าเหตุการณ์นั้นอยู่ในเซสชันที่มีอยู่หรือในช่วงที่ไม่มีการใช้งาน

ต่อไปเราจะพูดถึงหน้าต่างประเภทถัดไป - หน้าต่าง "ไม้ลอย"

หน้าต่าง "ไม้ลอย"

หน้าต่างที่พังทลายจะบันทึกเหตุการณ์ที่เกิดขึ้นภายในช่วงระยะเวลาหนึ่ง ลองนึกภาพว่าคุณต้องบันทึกธุรกรรมหุ้นทั้งหมดของบริษัทหนึ่งทุกๆ 20 วินาที ดังนั้นคุณจึงรวบรวมเหตุการณ์ทั้งหมดในช่วงเวลานั้น เมื่อสิ้นสุดช่วงเวลา 20 วินาที หน้าต่างจะเลื่อนไปและเลื่อนไปยังช่วงเวลาการสังเกต 20 วินาทีใหม่ รูปที่ 5.14 แสดงให้เห็นถึงสถานการณ์นี้

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
อย่างที่คุณเห็น กิจกรรมทั้งหมดที่ได้รับในช่วง 20 วินาทีที่ผ่านมาจะรวมอยู่ในหน้าต่าง เมื่อสิ้นสุดระยะเวลานี้ หน้าต่างใหม่จะถูกสร้างขึ้น

รายการ 5.6 แสดงโค้ดที่สาธิตการใช้หน้าต่าง tumbling เพื่อบันทึกธุรกรรมหุ้นทุกๆ 20 วินาที (พบได้ใน src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
ด้วยการเปลี่ยนแปลงเล็กๆ น้อยๆ ในการเรียกเมธอด TimeWindows.of คุณสามารถใช้หน้าต่างแบบพลิกกลับได้ ตัวอย่างนี้ไม่ได้เรียกใช้เมธอด until() ดังนั้นระบบจะใช้ช่วงเวลาการเก็บรักษาเริ่มต้นที่ 24 ชั่วโมง

ในที่สุดก็ถึงเวลาไปยังตัวเลือกหน้าต่างสุดท้าย - หน้าต่าง "กระโดด"

หน้าต่างบานเลื่อน ("กระโดด")

หน้าต่างบานเลื่อน/กระโดดมีลักษณะคล้ายกับหน้าต่างไม้ลอย แต่มีความแตกต่างกันเล็กน้อย หน้าต่างแบบเลื่อนไม่ต้องรอจนกว่าจะสิ้นสุดช่วงเวลาก่อนที่จะสร้างหน้าต่างใหม่เพื่อประมวลผลเหตุการณ์ล่าสุด โดยจะเริ่มการคำนวณใหม่หลังจากช่วงรอที่น้อยกว่าระยะเวลาของกรอบเวลา

เพื่อแสดงให้เห็นความแตกต่างระหว่างหน้าต่างไม้ลอยและหน้าต่างกระโดด กลับไปที่ตัวอย่างการนับธุรกรรมในตลาดหลักทรัพย์กัน เป้าหมายของเรายังคงนับจำนวนธุรกรรม แต่เราไม่ต้องการรอเวลาทั้งหมดก่อนที่จะอัปเดตตัวนับ แต่เราจะอัปเดตตัวนับในช่วงเวลาที่สั้นลงแทน ตัวอย่างเช่น เราจะยังคงนับจำนวนธุรกรรมทุกๆ 20 วินาที แต่จะอัพเดตตัวนับทุกๆ 5 วินาที ดังแสดงในรูปที่ 5.15 XNUMX. ในกรณีนี้ เราจะได้หน้าต่างผลลัพธ์สามหน้าต่างที่มีข้อมูลที่ทับซ้อนกัน

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
รายการ 5.7 แสดงโค้ดสำหรับกำหนดหน้าต่างแบบเลื่อน (พบใน src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
หน้าต่างไม้ลอยสามารถแปลงเป็นหน้าต่างกระโดดได้โดยการเพิ่มการเรียกไปยังเมธอด advanceBy() ในตัวอย่างที่แสดง ช่วงเวลาการบันทึกคือ 15 นาที

คุณเห็นในส่วนนี้ถึงวิธีจำกัดผลลัพธ์การรวมตามกรอบเวลา โดยเฉพาะอย่างยิ่ง ฉันต้องการให้คุณจำสามสิ่งต่อไปนี้จากส่วนนี้:

  • ขนาดของหน้าต่างเซสชั่นไม่ได้จำกัดตามช่วงเวลา แต่ตามกิจกรรมของผู้ใช้
  • หน้าต่าง "ไม้ลอย" ให้ภาพรวมของเหตุการณ์ภายในระยะเวลาที่กำหนด
  • ระยะเวลาของการข้ามหน้าต่างได้รับการแก้ไขแล้ว แต่จะมีการอัปเดตบ่อยครั้งและอาจมีรายการที่ทับซ้อนกันในทุกหน้าต่าง

ต่อไป เราจะเรียนรู้วิธีแปลง KTable กลับเป็น KStream สำหรับการเชื่อมต่อ

5.3.3. การเชื่อมต่อวัตถุ KStream และ KTable

ในบทที่ 4 เราได้พูดคุยถึงการเชื่อมต่อสองวัตถุ KStream ตอนนี้เราต้องเรียนรู้วิธีเชื่อมต่อ KTable และ KStream อาจจำเป็นด้วยเหตุผลง่ายๆ ดังต่อไปนี้ KStream คือสตรีมของบันทึก และ KTable คือสตรีมของการอัปเดตบันทึก แต่บางครั้งคุณอาจต้องการเพิ่มบริบทเพิ่มเติมให้กับสตรีมบันทึกโดยใช้การอัปเดตจาก KTable

ลองนำข้อมูลเกี่ยวกับจำนวนธุรกรรมในตลาดหลักทรัพย์มารวมกับข่าวสารตลาดหลักทรัพย์สำหรับอุตสาหกรรมที่เกี่ยวข้องกัน นี่คือสิ่งที่คุณต้องทำเพื่อให้บรรลุเป้าหมายนี้โดยพิจารณาจากโค้ดที่คุณมีอยู่แล้ว

  1. แปลงออบเจ็กต์ KTable ด้วยข้อมูลเกี่ยวกับจำนวนธุรกรรมหุ้นเป็น KStream ตามด้วยการแทนที่คีย์ด้วยคีย์ที่ระบุภาคอุตสาหกรรมที่สอดคล้องกับสัญลักษณ์หุ้นนี้
  2. สร้างวัตถุ KTable ที่อ่านข้อมูลจากหัวข้อที่มีข่าวตลาดหลักทรัพย์ KTable ใหม่นี้จะถูกจัดหมวดหมู่ตามภาคอุตสาหกรรม
  3. เชื่อมต่อข่าวสารล่าสุดพร้อมข้อมูลจำนวนธุรกรรมในตลาดหลักทรัพย์แยกตามภาคอุตสาหกรรม

ตอนนี้เรามาดูวิธีการดำเนินการตามแผนปฏิบัติการนี้

แปลง KTable เป็น KStream

ในการแปลง KTable เป็น KStream คุณต้องทำดังต่อไปนี้

  1. เรียกใช้เมธอด KTable.toStream()
  2. โดยการเรียกเมธอด KStream.map แทนที่คีย์ด้วยชื่ออุตสาหกรรม จากนั้นดึงข้อมูลอ็อบเจ็กต์ TransactionSummary จากอินสแตนซ์ Windowed

เราจะเชื่อมโยงการดำเนินการเหล่านี้เข้าด้วยกันดังต่อไปนี้ (โค้ดสามารถพบได้ในไฟล์ src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (รายการ 5.8)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
เนื่องจากเรากำลังดำเนินการ KStream.map อินสแตนซ์ KStream ที่ส่งคืนจึงถูกแบ่งพาร์ติชันใหม่โดยอัตโนมัติเมื่อมีการใช้ในการเชื่อมต่อ

เราเสร็จสิ้นกระบวนการแปลงแล้ว ต่อไปเราต้องสร้างวัตถุ KTable สำหรับอ่านข่าวหุ้น

จัดทำ KTable สำหรับข่าวหุ้น

โชคดีที่การสร้างวัตถุ KTable ใช้โค้ดเพียงบรรทัดเดียว (โค้ดสามารถพบได้ใน src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (รายการ 5.9)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
เป็นที่น่าสังเกตว่าไม่จำเป็นต้องระบุวัตถุ Serde เนื่องจากมีการใช้สตริง Serdes ในการตั้งค่า นอกจากนี้ โดยการใช้การแจงนับ EARLIEST ตารางจะเต็มไปด้วยเรกคอร์ดที่จุดเริ่มต้น

ตอนนี้เราสามารถไปยังขั้นตอนสุดท้ายได้ - การเชื่อมต่อ

เชื่อมโยงการอัพเดตข่าวสารกับข้อมูลการนับธุรกรรม

การสร้างการเชื่อมต่อไม่ใช่เรื่องยาก เราจะใช้การรวมด้านซ้ายในกรณีที่ไม่มีข่าวหุ้นสำหรับอุตสาหกรรมที่เกี่ยวข้อง (รหัสที่จำเป็นสามารถพบได้ในไฟล์ src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (รายการ 5.10)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
ตัวดำเนินการ leftJoin นี้ค่อนข้างง่าย ต่างจากการรวมในบทที่ 4 ตรงที่เมธอด JoinWindow จะไม่ถูกใช้ เนื่องจากเมื่อทำการรวม KStream-KTable จะมีเพียงรายการเดียวใน KTable สำหรับแต่ละคีย์ การเชื่อมต่อดังกล่าวไม่ได้จำกัดเวลา: บันทึกอยู่ใน KTable หรือขาดหายไป ข้อสรุปหลัก: การใช้วัตถุ KTable คุณสามารถเพิ่ม KStream ด้วยข้อมูลอ้างอิงที่อัปเดตไม่บ่อยนัก

ตอนนี้เราจะมาดูวิธีที่มีประสิทธิภาพมากขึ้นในการเพิ่มคุณค่าให้กับกิจกรรมจาก KStream

5.3.4. วัตถุ GlobalKTable

อย่างที่คุณเห็น มีความจำเป็นต้องปรับปรุงสตรีมกิจกรรมหรือเพิ่มบริบทให้กับสตรีมเหล่านั้น ในบทที่ 4 คุณเห็นการเชื่อมต่อระหว่าง KStream object สองตัว และในส่วนก่อนหน้านี้ คุณเห็นการเชื่อมต่อระหว่าง KStream และ KTable ในกรณีเหล่านี้ทั้งหมด จำเป็นต้องแบ่งพาร์ติชันสตรีมข้อมูลอีกครั้งเมื่อแมปคีย์กับประเภทหรือค่าใหม่ บางครั้งการแบ่งพาร์ติชั่นก็ทำอย่างชัดเจน และบางครั้ง Kafka Streams ก็ทำโดยอัตโนมัติ จำเป็นต้องแบ่งพาร์ติชันใหม่เนื่องจากคีย์มีการเปลี่ยนแปลงและบันทึกจะต้องสิ้นสุดในส่วนใหม่ มิฉะนั้นการเชื่อมต่อจะเป็นไปไม่ได้ (สิ่งนี้ถูกกล่าวถึงในบทที่ 4 ในส่วน "การแบ่งพาร์ติชันข้อมูลใหม่" ในส่วนย่อย 4.2.4)

การแบ่งพาร์ติชันใหม่มีค่าใช้จ่าย

การแบ่งพาร์ติชันใหม่ต้องใช้ต้นทุน - ต้นทุนทรัพยากรเพิ่มเติมสำหรับการสร้างหัวข้อระดับกลาง การจัดเก็บข้อมูลที่ซ้ำกันในหัวข้ออื่น นอกจากนี้ยังหมายถึงเวลาแฝงที่เพิ่มขึ้นเนื่องจากการเขียนและการอ่านจากหัวข้อนี้ นอกจากนี้ หากคุณต้องการรวมข้ามมากกว่าหนึ่งลักษณะหรือมิติ คุณต้องโยงการรวม แมปเรกคอร์ดด้วยคีย์ใหม่ และรันกระบวนการแบ่งพาร์ติชันใหม่อีกครั้ง

การเชื่อมต่อกับชุดข้อมูลขนาดเล็ก

ในบางกรณี ปริมาณข้อมูลอ้างอิงที่จะเชื่อมต่อมีขนาดค่อนข้างเล็ก ดังนั้นสำเนาที่สมบูรณ์ของข้อมูลจึงพอดีกับแต่ละโหนดได้อย่างง่ายดาย สำหรับสถานการณ์เช่นนี้ Kafka Streams จัดให้มีคลาส GlobalKTable

อินสแตนซ์ GlobalKTable จะไม่ซ้ำกันเนื่องจากแอปพลิเคชันจะจำลองข้อมูลทั้งหมดไปยังแต่ละโหนด และเนื่องจากข้อมูลทั้งหมดปรากฏบนแต่ละโหนด จึงไม่จำเป็นต้องแบ่งพาร์ติชันสตรีมเหตุการณ์ด้วยคีย์ข้อมูลอ้างอิง เพื่อให้พาร์ติชันทั้งหมดสามารถใช้ได้ คุณยังสามารถสร้างการรวมแบบไม่ใช้คีย์ได้โดยใช้ออบเจ็กต์ GlobalKTable กลับไปที่ตัวอย่างก่อนหน้านี้เพื่อสาธิตคุณลักษณะนี้

การเชื่อมต่อวัตถุ KStream กับวัตถุ GlobalKTable

ในหัวข้อย่อย 5.3.2 เราได้ดำเนินการรวมหน้าต่างของธุรกรรมการแลกเปลี่ยนโดยผู้ซื้อ ผลลัพธ์ของการรวมกลุ่มนี้มีลักษณะดังนี้:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

แม้ว่าผลลัพธ์เหล่านี้จะตอบสนองวัตถุประสงค์ แต่จะมีประโยชน์มากกว่าหากแสดงชื่อลูกค้าและชื่อเต็มของบริษัทด้วย หากต้องการเพิ่มชื่อลูกค้าและชื่อบริษัท คุณสามารถทำการรวมตามปกติได้ แต่คุณจะต้องทำการแมปคีย์สองรายการและแบ่งพาร์ติชันใหม่ ด้วย GlobalKTable คุณสามารถหลีกเลี่ยงค่าใช้จ่ายในการดำเนินการดังกล่าวได้

ในการดำเนินการนี้ เราจะใช้อ็อบเจ็กต์ countStream จากรายการ 5.11 (โค้ดที่เกี่ยวข้องสามารถพบได้ใน src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) และเชื่อมต่อกับอ็อบเจ็กต์ GlobalKTable สองอัน

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
เราเคยคุยกันเรื่องนี้มาก่อนแล้ว ดังนั้นฉันจะไม่พูดซ้ำอีก แต่ฉันสังเกตว่าโค้ดในฟังก์ชัน toStream().map นั้นถูกทำให้เป็นนามธรรมในออบเจ็กต์ฟังก์ชัน แทนที่จะเป็นนิพจน์แลมบ์ดาแบบอินไลน์เพื่อให้สามารถอ่านได้

ขั้นตอนต่อไปคือการประกาศ GlobalKTable สองอินสแตนซ์ (โค้ดที่แสดงอยู่ในไฟล์ src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (รายการ 5.12)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"

โปรดทราบว่าชื่อหัวข้อมีการอธิบายโดยใช้ประเภทที่แจกแจง

ตอนนี้เรามีส่วนประกอบทั้งหมดพร้อมแล้ว ที่เหลือก็แค่เขียนโค้ดสำหรับการเชื่อมต่อ (ซึ่งสามารถพบได้ในไฟล์ src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (รายการ 5.13)

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
แม้ว่าจะมีการรวมสองรายการในโค้ดนี้ แต่ก็ถูกล่ามโซ่เนื่องจากไม่มีการใช้ผลลัพธ์แยกกัน ผลลัพธ์จะแสดงเมื่อสิ้นสุดการดำเนินการทั้งหมด

เมื่อคุณรันการดำเนินการเข้าร่วมข้างต้น คุณจะได้ผลลัพธ์ดังนี้:

{customer='Barney, Smith' company="Exxon", transactions= 17}

สาระสำคัญไม่ได้เปลี่ยนแปลง แต่ผลลัพธ์เหล่านี้ดูชัดเจนยิ่งขึ้น

หากคุณนับถอยหลังสู่บทที่ 4 คุณคงได้เห็นการเชื่อมต่อหลายประเภทแล้ว มีการระบุไว้ในตาราง 5.2. ตารางนี้แสดงถึงความสามารถในการเชื่อมต่อใน Kafka Streams เวอร์ชัน 1.0.0 บางสิ่งอาจมีการเปลี่ยนแปลงในการเปิดตัวในอนาคต

หนังสือ “Kafka Streams in Action” แอปพลิเคชันและไมโครเซอร์วิสเพื่อการทำงานแบบเรียลไทม์"
เพื่อสรุปสิ่งต่างๆ เรามาสรุปข้อมูลพื้นฐานกัน: คุณสามารถเชื่อมต่อสตรีมเหตุการณ์ (KStream) และอัปเดตสตรีม (KTable) โดยใช้สถานะท้องถิ่น หรือหากขนาดของข้อมูลอ้างอิงไม่ใหญ่เกินไป คุณสามารถใช้อ็อบเจ็กต์ GlobalKTable ได้ GlobalKTables จำลองพาร์ติชันทั้งหมดไปยังแต่ละโหนดแอปพลิเคชัน Kafka Streams เพื่อให้มั่นใจว่าข้อมูลทั้งหมดจะพร้อมใช้งาน ไม่ว่าคีย์จะสอดคล้องกับพาร์ติชันใดก็ตาม

ต่อไปเราจะเห็นฟีเจอร์ Kafka Streams ซึ่งช่วยให้เราสังเกตการเปลี่ยนแปลงสถานะโดยไม่ต้องใช้ข้อมูลจากหัวข้อ Kafka

5.3.5. สถานะที่น่าสงสัย

เราได้ดำเนินการหลายอย่างที่เกี่ยวข้องกับสถานะแล้วและส่งออกผลลัพธ์ไปยังคอนโซลเสมอ (เพื่อวัตถุประสงค์ในการพัฒนา) หรือเขียนลงในหัวข้อ (เพื่อวัตถุประสงค์ในการผลิต) เมื่อเขียนผลลัพธ์ในหัวข้อ คุณต้องใช้ Kafka Consumer เพื่อดูผลลัพธ์

การอ่านข้อมูลจากหัวข้อเหล่านี้ถือได้ว่าเป็นมุมมองที่เป็นรูปธรรมประเภทหนึ่ง เพื่อวัตถุประสงค์ของเรา เราสามารถใช้คำจำกัดความของมุมมองที่เป็นรูปธรรมจากวิกิพีเดีย: “...วัตถุฐานข้อมูลทางกายภาพที่มีผลลัพธ์ของการสืบค้น ตัวอย่างเช่น อาจเป็นสำเนาของข้อมูลระยะไกลในเครื่อง หรือชุดย่อยของแถวและ/หรือคอลัมน์ของตารางหรือผลรวม หรือตารางสรุปที่ได้รับจากการรวมกลุ่ม” (https://en.wikipedia.org/wiki /Materialized_view)

Kafka Streams ยังอนุญาตให้คุณเรียกใช้การสืบค้นแบบโต้ตอบในร้านค้าของรัฐ ซึ่งทำให้คุณสามารถอ่านมุมมองที่เป็นรูปธรรมเหล่านี้ได้โดยตรง สิ่งสำคัญคือต้องทราบว่าการสืบค้นไปยังที่จัดเก็บสถานะเป็นการดำเนินการแบบอ่านอย่างเดียว สิ่งนี้ทำให้มั่นใจได้ว่าคุณไม่ต้องกังวลว่าจะทำให้สถานะไม่สอดคล้องกันโดยไม่ตั้งใจในขณะที่แอปพลิเคชันของคุณกำลังประมวลผลข้อมูล

ความสามารถในการสืบค้นร้านค้าของรัฐโดยตรงเป็นสิ่งสำคัญ ซึ่งหมายความว่าคุณสามารถสร้างแอปพลิเคชันแดชบอร์ดได้โดยไม่ต้องดึงข้อมูลจากผู้ใช้ Kafka ก่อน นอกจากนี้ยังเพิ่มประสิทธิภาพของแอปพลิเคชันเนื่องจากไม่จำเป็นต้องเขียนข้อมูลอีกครั้ง:

  • เนื่องจากตำแหน่งของข้อมูลทำให้สามารถเข้าถึงข้อมูลได้อย่างรวดเร็ว
  • ความซ้ำซ้อนของข้อมูลจะหมดไป เนื่องจากไม่ได้เขียนลงที่จัดเก็บข้อมูลภายนอก

สิ่งสำคัญที่ฉันต้องการให้คุณจำไว้คือคุณสามารถค้นหาสถานะได้โดยตรงจากภายในแอปพลิเคชันของคุณ โอกาสที่มอบให้คุณไม่สามารถพูดเกินจริงได้ แทนที่จะใช้ข้อมูลจาก Kafka และจัดเก็บข้อมูลในฐานข้อมูลสำหรับแอปพลิเคชัน คุณสามารถสืบค้นที่จัดเก็บสถานะด้วยผลลัพธ์เดียวกันได้ การสืบค้นโดยตรงไปยังร้านค้าของรัฐหมายถึงโค้ดน้อยลง (ไม่มีผู้บริโภค) และซอฟต์แวร์น้อยลง (ไม่ต้องใช้ตารางฐานข้อมูลเพื่อจัดเก็บผลลัพธ์)

เราได้กล่าวถึงพื้นฐานไปบ้างแล้วในบทนี้ ดังนั้นเราจะทิ้งการสนทนาเกี่ยวกับการสอบถามแบบโต้ตอบกับร้านค้าของรัฐไว้ชั่วคราว แต่อย่ากังวล: ในบทที่ 9 เราจะสร้างแอปพลิเคชันแดชบอร์ดที่เรียบง่ายพร้อมข้อความค้นหาแบบโต้ตอบ โดยจะใช้ตัวอย่างบางส่วนจากบทนี้และบทก่อนหน้าเพื่อสาธิตการสืบค้นแบบโต้ตอบ และวิธีที่คุณสามารถเพิ่มลงในแอปพลิเคชัน Kafka Streams

สรุป

  • ออบเจ็กต์ KStream แสดงถึงสตรีมของเหตุการณ์ เทียบได้กับการแทรกลงในฐานข้อมูล อ็อบเจ็กต์ KTable แสดงถึงกระแสการอัพเดต เช่นเดียวกับการอัพเดตฐานข้อมูล ขนาดของวัตถุ KTable ไม่เพิ่มขึ้น บันทึกเก่าจะถูกแทนที่ด้วยบันทึกใหม่
  • วัตถุ KTable จำเป็นสำหรับการดำเนินการรวม
  • เมื่อใช้การดำเนินการหน้าต่าง คุณสามารถแบ่งข้อมูลที่รวบรวมออกเป็นช่วงเวลาได้
  • ด้วยออบเจ็กต์ GlobalKTable คุณสามารถเข้าถึงข้อมูลอ้างอิงได้ทุกที่ในแอปพลิเคชัน โดยไม่คำนึงถึงการแบ่งพาร์ติชัน
  • การเชื่อมต่อระหว่างวัตถุ KStream, KTable และ GlobalKTable เป็นไปได้

จนถึงตอนนี้ เราได้มุ่งเน้นไปที่การสร้างแอปพลิเคชัน Kafka Streams โดยใช้ KStream DSL ระดับสูง แม้ว่าวิธีการระดับสูงจะช่วยให้คุณสร้างโปรแกรมที่เรียบร้อยและรัดกุม แต่การใช้วิธีนี้ก็ถือเป็นข้อดีอย่างหนึ่ง การทำงานกับ DSL KStream หมายถึงการเพิ่มความกระชับของโค้ดของคุณโดยการลดระดับการควบคุม ในบทถัดไป เราจะดูที่ API โหนดตัวจัดการระดับต่ำ และลองแลกเปลี่ยนอื่นๆ โปรแกรมจะใช้งานได้นานกว่าเมื่อก่อน แต่เราสามารถสร้างโหนดตัวจัดการได้เกือบทุกโหนดที่เราอาจต้องการ

→ ดูรายละเอียดหนังสือเพิ่มเติมได้ที่ เว็บไซต์ของผู้จัดพิมพ์

→ สำหรับ Habrozhiteli ส่วนลด 25% โดยใช้คูปอง - คาฟคาสตรีม

→ เมื่อชำระเงินค่าหนังสือในรูปแบบกระดาษ หนังสืออิเล็กทรอนิกส์จะถูกส่งทางอีเมล

ที่มา: will.com

เพิ่มความคิดเห็น