Redis Stream - ความน่าเชื่อถือและความสามารถในการปรับขนาดของระบบส่งข้อความของคุณ

Redis Stream - ความน่าเชื่อถือและความสามารถในการปรับขนาดของระบบส่งข้อความของคุณ

Redis Stream เป็นประเภทข้อมูลนามธรรมใหม่ที่เปิดตัวใน Redis เวอร์ชัน 5.0
ตามแนวคิดแล้ว Redis Stream คือรายการที่คุณสามารถเพิ่มรายการได้ แต่ละรายการมีตัวระบุที่ไม่ซ้ำกัน ตามค่าเริ่มต้น รหัสจะถูกสร้างขึ้นโดยอัตโนมัติและมีการประทับเวลาด้วย ดังนั้น คุณจึงสามารถสืบค้นช่วงของบันทึกเมื่อเวลาผ่านไป หรือรับข้อมูลใหม่เมื่อมาถึงสตรีมได้ เช่นเดียวกับคำสั่ง Unix "tail -f" ที่อ่านไฟล์บันทึกและค้างขณะรอข้อมูลใหม่ โปรดทราบว่าไคลเอนต์หลายตัวสามารถฟังเธรดในเวลาเดียวกันได้ เช่นเดียวกับที่กระบวนการ "tail -f" จำนวนมากสามารถอ่านไฟล์พร้อมกันได้โดยไม่ขัดแย้งกัน

เพื่อให้เข้าใจถึงประโยชน์ทั้งหมดของประเภทข้อมูลใหม่ เรามาดูโครงสร้าง Redis ที่มีอยู่ยาวนานซึ่งจำลองฟังก์ชันการทำงานของ Redis Stream บางส่วนกัน

เผยแพร่ PUB/SUB

Redis Pub/Sub เป็นระบบส่งข้อความธรรมดาที่สร้างไว้ในที่เก็บคีย์-ค่าของคุณแล้ว อย่างไรก็ตาม ความเรียบง่ายต้องแลกมาด้วย:

  • หากผู้จัดพิมพ์ล้มเหลวด้วยเหตุผลบางประการ เขาจะสูญเสียสมาชิกทั้งหมด
  • ผู้จัดพิมพ์จำเป็นต้องทราบที่อยู่ที่แน่นอนของสมาชิกทั้งหมด
  • ผู้จัดพิมพ์อาจทำให้สมาชิกมีงานมากเกินไปหากข้อมูลถูกเผยแพร่เร็วกว่าที่ประมวลผล
  • ข้อความจะถูกลบออกจากบัฟเฟอร์ของผู้จัดพิมพ์ทันทีหลังจากการเผยแพร่ โดยไม่คำนึงถึงจำนวนสมาชิกที่ถูกส่งไปยังสมาชิก และพวกเขาสามารถประมวลผลข้อความนี้ได้เร็วแค่ไหน
  • สมาชิกทุกคนจะได้รับข้อความพร้อมกัน สมาชิกจะต้องตกลงกันเองเกี่ยวกับลำดับการประมวลผลข้อความเดียวกัน
  • ไม่มีกลไกในตัวในการยืนยันว่าสมาชิกได้ประมวลผลข้อความสำเร็จแล้ว หากสมาชิกได้รับข้อความและขัดข้องระหว่างการประมวลผล ผู้เผยแพร่จะไม่ทราบเรื่องนี้

รายการ Redis

Redis List เป็นโครงสร้างข้อมูลที่รองรับการบล็อกคำสั่งการอ่าน คุณสามารถเพิ่มและอ่านข้อความจากจุดเริ่มต้นหรือจุดสิ้นสุดของรายการได้ ตามโครงสร้างนี้ คุณสามารถสร้างสแต็กหรือคิวที่ดีสำหรับระบบแบบกระจายของคุณได้ และในกรณีส่วนใหญ่ แค่นี้ก็เพียงพอแล้ว ความแตกต่างหลักจาก Redis Pub/Sub:

  • ข้อความถูกส่งไปยังลูกค้ารายหนึ่ง ไคลเอนต์ที่ถูกบล็อกการอ่านตัวแรกจะได้รับข้อมูลก่อน
  • คลินท์ต้องเริ่มการดำเนินการอ่านสำหรับแต่ละข้อความด้วยตัวเอง รายการไม่รู้อะไรเกี่ยวกับลูกค้า
  • ข้อความจะถูกจัดเก็บจนกว่าจะมีคนอ่านหรือลบข้อความเหล่านั้นอย่างชัดเจน หากคุณกำหนดค่าเซิร์ฟเวอร์ Redis ให้ล้างข้อมูลลงดิสก์ ความน่าเชื่อถือของระบบจะเพิ่มขึ้นอย่างมาก

ข้อมูลเบื้องต้นเกี่ยวกับสตรีม

การเพิ่มรายการในสตรีม

ทีม XADD เพิ่มรายการใหม่ให้กับสตรีม เรกคอร์ดไม่ได้เป็นเพียงสตริงเท่านั้น แต่ยังประกอบด้วยคู่คีย์-ค่าอย่างน้อยหนึ่งคู่ ดังนั้นแต่ละรายการจึงมีโครงสร้างอยู่แล้วและคล้ายกับโครงสร้างของไฟล์ CSV

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

ในตัวอย่างด้านบน เราเพิ่มสองฟิลด์ลงในสตรีมด้วยชื่อ (คีย์) “mystream”: “sensor-id” และ “อุณหภูมิ” ด้วยค่า “1234” และ “19.8” ตามลำดับ ในฐานะอาร์กิวเมนต์ที่สอง คำสั่งจะใช้ตัวระบุที่จะกำหนดให้กับรายการ - ตัวระบุนี้จะระบุแต่ละรายการในสตรีมโดยไม่ซ้ำกัน อย่างไรก็ตาม ในกรณีนี้ เราผ่าน * เพราะเราต้องการให้ Redis สร้าง ID ใหม่ให้เรา แต่ละ ID ใหม่จะเพิ่มขึ้น ดังนั้นแต่ละรายการใหม่จะมีตัวระบุที่สูงกว่าเมื่อเทียบกับรายการก่อนหน้า

รูปแบบตัวระบุ

ID รายการที่ส่งคืนโดยคำสั่ง XADDประกอบด้วยสองส่วน:

{millisecondsTime}-{sequenceNumber}

มิลลิวินาทีเวลา — เวลา Unix เป็นมิลลิวินาที (เวลาเซิร์ฟเวอร์ Redis) อย่างไรก็ตาม หากเวลาปัจจุบันเท่ากันหรือน้อยกว่าเวลาของการบันทึกครั้งก่อน ระบบจะใช้การประทับเวลาของการบันทึกครั้งก่อน ดังนั้น หากเวลาของเซิร์ฟเวอร์ย้อนเวลากลับไป ตัวระบุใหม่จะยังคงรักษาคุณสมบัติส่วนเพิ่มไว้

ลำดับหมายเลข ใช้สำหรับบันทึกที่สร้างในมิลลิวินาทีเดียวกัน ลำดับหมายเลข จะเพิ่มขึ้น 1 เมื่อเทียบกับรายการก่อนหน้า เพราะว่า ลำดับหมายเลข มีขนาด 64 บิต ในทางปฏิบัติ คุณไม่ควรมีจำนวนเร็กคอร์ดที่สามารถสร้างได้ภายในหนึ่งมิลลิวินาที

รูปแบบของตัวระบุดังกล่าวอาจดูแปลกเมื่อมองแวบแรก ผู้อ่านที่ไม่ไว้วางใจอาจสงสัยว่าเหตุใดเวลาจึงเป็นส่วนหนึ่งของตัวระบุ เหตุผลก็คือสตรีม Redis รองรับการสืบค้นช่วงตาม ID เนื่องจากตัวระบุเชื่อมโยงกับเวลาที่สร้างบันทึก จึงทำให้สามารถสืบค้นช่วงเวลาได้ เราจะดูตัวอย่างที่เฉพาะเจาะจงเมื่อเราดูที่คำสั่ง เอ็กซ์เรนจ์.

หากผู้ใช้จำเป็นต้องระบุตัวระบุของตนเองด้วยเหตุผลบางประการ เช่น เชื่อมโยงกับระบบภายนอกบางระบบ เราก็สามารถส่งผ่านไปยังคำสั่งได้ XADD แทน * ดังแสดงด้านล่าง:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

โปรดทราบว่าในกรณีนี้ คุณต้องตรวจสอบการเพิ่ม ID ด้วยตนเอง ในตัวอย่างของเรา ตัวระบุขั้นต่ำคือ "0-1" ดังนั้นคำสั่งจะไม่ยอมรับตัวระบุอื่นที่เท่ากับหรือน้อยกว่า "0-1"

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

จำนวนบันทึกต่อสตรีม

คุณสามารถรับจำนวนเรคคอร์ดในสตรีมได้ง่ายๆ โดยใช้คำสั่ง เอ็กซ์เลน. สำหรับตัวอย่างของเรา คำสั่งนี้จะคืนค่าต่อไปนี้:

> XLEN somestream
(integer) 2

ข้อความค้นหาช่วง - XRANGE และ XREVRANGE

หากต้องการขอข้อมูลตามช่วง เราจำเป็นต้องระบุตัวระบุสองตัว - จุดเริ่มต้นและจุดสิ้นสุดของช่วง ช่วงที่ส่งคืนจะรวมองค์ประกอบทั้งหมด รวมถึงขอบเขตด้วย นอกจากนี้ยังมีตัวระบุพิเศษสองตัว “-” และ “+” ตามลำดับ ซึ่งหมายถึงตัวระบุที่เล็กที่สุด (บันทึกแรก) และใหญ่ที่สุด (บันทึกสุดท้าย) ในสตรีม ตัวอย่างด้านล่างนี้จะแสดงรายการสตรีมทั้งหมด

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

แต่ละระเบียนที่ส่งคืนคืออาร์เรย์ของสององค์ประกอบ: ตัวระบุและรายการคู่คีย์-ค่า เราได้กล่าวไปแล้วว่าตัวระบุเรกคอร์ดเกี่ยวข้องกับเวลา ดังนั้นเราจึงสามารถขอช่วงระยะเวลาที่กำหนดได้ อย่างไรก็ตาม เราสามารถระบุในคำขอได้ ไม่ใช่ตัวระบุแบบเต็ม แต่ระบุเฉพาะเวลา Unix เท่านั้น โดยละเว้นส่วนที่เกี่ยวข้องกับ ลำดับหมายเลข. ส่วนที่ละเว้นของตัวระบุจะถูกตั้งค่าเป็นศูนย์โดยอัตโนมัติที่จุดเริ่มต้นของช่วงและเป็นค่าสูงสุดที่เป็นไปได้ที่จุดสิ้นสุดของช่วง ด้านล่างนี้คือตัวอย่างวิธีที่คุณสามารถขอช่วงสองมิลลิวินาทีได้

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

เรามีเพียงรายการเดียวในช่วงนี้ อย่างไรก็ตาม ในชุดข้อมูลจริง ผลลัพธ์ที่ส่งคืนอาจมีขนาดใหญ่มาก สำหรับเหตุผลนี้ เอ็กซ์เรนจ์ รองรับตัวเลือก COUNT โดยการระบุปริมาณ เราสามารถรับระเบียน N แรกได้ หากเราต้องการรับ N records ถัดไป (การแบ่งหน้า) เราสามารถใช้ ID ที่ได้รับล่าสุด เพิ่มได้ ลำดับหมายเลข ทีละคนแล้วถามอีกครั้ง ลองดูตัวอย่างต่อไปนี้ เราเริ่มเพิ่มองค์ประกอบ 10 รายการด้วย XADD (สมมติว่า mystream เต็มไปด้วยองค์ประกอบ 10 รายการแล้ว) ในการเริ่มวนซ้ำโดยรับ 2 องค์ประกอบต่อคำสั่ง เราจะเริ่มต้นด้วยช่วงเต็ม แต่มี COUNT เท่ากับ 2

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

หากต้องการวนซ้ำสององค์ประกอบถัดไป เราต้องเลือก ID ล่าสุดที่ได้รับ เช่น 1519073279157-0 และเพิ่ม 1 ไปที่ ลำดับหมายเลข.
ID ผลลัพธ์ ในกรณีนี้คือ 1519073279157-1 สามารถใช้เป็นอาร์กิวเมนต์เริ่มต้นใหม่สำหรับการโทรครั้งถัดไป เอ็กซ์เรนจ์:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

และอื่นๆ เพราะมีความซับซ้อน เอ็กซ์เรนจ์ คือ O(log(N)) เพื่อค้นหา จากนั้น O(M) เพื่อส่งคืนองค์ประกอบ M จากนั้นแต่ละขั้นตอนการวนซ้ำจะรวดเร็ว ดังนั้นการใช้ เอ็กซ์เรนจ์ สตรีมสามารถวนซ้ำได้อย่างมีประสิทธิภาพ

ทีม เอ็กซ์เรฟเรนจ์ มีค่าเท่ากัน เอ็กซ์เรนจ์แต่ส่งคืนองค์ประกอบในลำดับย้อนกลับ:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

โปรดทราบว่าคำสั่ง เอ็กซ์เรฟเรนจ์ รับช่วงอาร์กิวเมนต์เริ่มต้นและหยุดในลำดับย้อนกลับ

การอ่านรายการใหม่โดยใช้ XREAD

บ่อยครั้งที่งานเกิดจากการสมัครรับสตรีมและรับเฉพาะข้อความใหม่ แนวคิดนี้อาจดูคล้ายกับ Redis Pub/Sub หรือการบล็อกรายการ Redis แต่วิธีใช้ Redis Stream มีความแตกต่างพื้นฐานดังนี้

  1. แต่ละข้อความใหม่จะถูกส่งไปยังสมาชิกทุกคนตามค่าเริ่มต้น ลักษณะการทำงานนี้แตกต่างจากการบล็อกรายการ Redis ซึ่งสมาชิกเพียงคนเดียวจะอ่านข้อความใหม่ได้
  2. ขณะที่อยู่ใน Redis Pub/Sub ข้อความทั้งหมดจะถูกลืมและไม่มีวันคงอยู่ แต่ใน Stream ข้อความทั้งหมดจะถูกเก็บไว้อย่างไม่มีกำหนด (เว้นแต่ไคลเอ็นต์จะทำให้เกิดการลบอย่างชัดเจน)
  3. Redis Stream ช่วยให้คุณแยกความแตกต่างในการเข้าถึงข้อความภายในสตรีมเดียว สมาชิกบางรายสามารถดูได้เฉพาะประวัติข้อความส่วนตัวเท่านั้น

คุณสามารถสมัครสมาชิกเธรดและรับข้อความใหม่โดยใช้คำสั่ง XREAD. มันซับซ้อนกว่าเล็กน้อย เอ็กซ์เรนจ์ดังนั้นเราจะเริ่มต้นด้วยตัวอย่างที่ง่ายกว่าก่อน

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

ตัวอย่างด้านบนแสดงแบบฟอร์มที่ไม่ปิดกั้น XREAD. โปรดทราบว่าตัวเลือก COUNT เป็นทางเลือก ในความเป็นจริง ตัวเลือกคำสั่งที่จำเป็นเพียงอย่างเดียวคือตัวเลือก STREAMS ซึ่งระบุรายการสตรีมพร้อมกับตัวระบุสูงสุดที่สอดคล้องกัน เราเขียนว่า "STREAMS mystream 0" - เราต้องการรับบันทึกทั้งหมดของสตรีม mystream ที่มีตัวระบุมากกว่า "0-0" ดังที่คุณเห็นจากตัวอย่าง คำสั่งส่งคืนชื่อของเธรดเนื่องจากเราสามารถสมัครสมาชิกหลายเธรดพร้อมกันได้ เราสามารถเขียนได้ เช่น "STREAMS mystream otherstream 0 0" โปรดทราบว่าหลังจากตัวเลือก STREAMS เราจำเป็นต้องระบุชื่อของสตรีมที่จำเป็นทั้งหมดก่อน จากนั้นจึงระบุเฉพาะรายการตัวระบุ

ในรูปแบบง่ายๆ นี้ คำสั่งไม่ได้ทำอะไรพิเศษเมื่อเปรียบเทียบกับ เอ็กซ์เรนจ์. แต่ที่น่าสนใจคือเราสามารถเลี้ยวได้ง่าย XREAD ไปยังคำสั่งการบล็อก โดยระบุอาร์กิวเมนต์ BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

ในตัวอย่างข้างต้น ตัวเลือก BLOCK ใหม่จะถูกระบุโดยมีการหมดเวลาเป็น 0 มิลลิวินาที (ซึ่งหมายถึงการรออย่างไม่มีกำหนด) ยิ่งไปกว่านั้น แทนที่จะส่งตัวระบุปกติสำหรับสตรีม mystream ตัวระบุพิเศษ $ ถูกส่งผ่าน ตัวระบุพิเศษนี้หมายความว่า XREAD ต้องใช้ตัวระบุสูงสุดใน mystream เป็นตัวระบุ ดังนั้นเราจะได้รับข้อความใหม่ตั้งแต่วินาทีที่เราเริ่มฟังเท่านั้น ในบางแง่จะคล้ายกับคำสั่ง "tail -f" ของ Unix

โปรดทราบว่าเมื่อใช้ตัวเลือก BLOCK เราไม่จำเป็นต้องใช้ตัวระบุพิเศษ $ เราสามารถใช้ตัวระบุที่มีอยู่ในสตรีมได้ หากทีมงานสามารถให้บริการตามคำขอของเราได้ทันทีโดยไม่ต้องบล็อคก็จะทำการบล็อค ไม่เช่นนั้นก็จะบล็อค

การปิดกั้น XREAD สามารถฟังหลายเธรดพร้อมกันได้ คุณเพียงแค่ต้องระบุชื่อเธรดเหล่านั้น ในกรณีนี้ คำสั่งจะส่งคืนบันทึกของสตรีมแรกที่ได้รับข้อมูล สมาชิกคนแรกที่ถูกบล็อกสำหรับเธรดที่กำหนดจะได้รับข้อมูลก่อน

กลุ่มผู้บริโภค

ในงานบางอย่าง เราต้องการจำกัดการเข้าถึงข้อความของสมาชิกภายในเธรดเดียว ตัวอย่างที่อาจเป็นประโยชน์คือคิวข้อความที่มีผู้ปฏิบัติงานซึ่งจะได้รับข้อความที่แตกต่างจากเธรด ทำให้การประมวลผลข้อความสามารถปรับขนาดได้

หากเราจินตนาการว่าเรามีสมาชิกสามคน C1, C2, C3 และเธรดที่มีข้อความ 1, 2, 3, 4, 5, 6, 7 ข้อความจะถูกให้บริการตามแผนภาพด้านล่าง:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

เพื่อให้บรรลุผลนี้ Redis Stream ใช้แนวคิดที่เรียกว่า Consumer Group แนวคิดนี้คล้ายกับสมาชิกหลอกซึ่งรับข้อมูลจากสตรีม แต่จริงๆ แล้วให้บริการโดยสมาชิกหลายรายภายในกลุ่ม โดยให้การรับประกันบางประการ:

  1. แต่ละข้อความจะถูกส่งไปยังสมาชิกที่แตกต่างกันภายในกลุ่ม
  2. ภายในกลุ่ม สมาชิกจะถูกระบุด้วยชื่อของพวกเขา ซึ่งเป็นสตริงที่คำนึงถึงตัวพิมพ์เล็กและตัวพิมพ์ใหญ่ หากสมาชิกออกจากกลุ่มชั่วคราว เขาสามารถกลับเข้าสู่กลุ่มได้โดยใช้ชื่อเฉพาะของตนเอง
  3. กลุ่มผู้บริโภคทุกกลุ่มปฏิบัติตามแนวคิด "ข้อความแรกที่ยังไม่ได้อ่าน" เมื่อสมาชิกร้องขอข้อความใหม่ จะสามารถรับได้เฉพาะข้อความที่ไม่เคยส่งถึงสมาชิกภายในกลุ่มมาก่อนเท่านั้น
  4. มีคำสั่งให้ยืนยันอย่างชัดเจนว่าข้อความได้รับการประมวลผลโดยสมาชิกเรียบร้อยแล้ว จนกว่าจะมีการเรียกคำสั่งนี้ ข้อความที่ร้องขอจะยังคงอยู่ในสถานะ "รอดำเนินการ"
  5. ภายในกลุ่มผู้บริโภค สมาชิกแต่ละรายสามารถขอประวัติข้อความที่ส่งถึงเขา แต่ยังไม่ได้ดำเนินการ (อยู่ในสถานะ "รอดำเนินการ")

ในแง่หนึ่ง สถานะของกลุ่มสามารถแสดงได้ดังนี้:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

ถึงเวลาทำความคุ้นเคยกับคำสั่งหลักสำหรับ Consumer Group แล้ว ได้แก่:

  • เอ็กซ์กรุ๊ป ใช้เพื่อสร้าง ทำลาย และจัดการกลุ่ม
  • XREADGROUP ใช้ในการอ่านสตรีมผ่านกลุ่ม
  • เอ็กซ์แซ็ก - คำสั่งนี้อนุญาตให้ผู้สมัครสมาชิกทำเครื่องหมายข้อความว่าประมวลผลสำเร็จแล้ว

การสร้างกลุ่มผู้บริโภค

สมมติว่า mystream มีอยู่แล้ว จากนั้นคำสั่งสร้างกลุ่มจะมีลักษณะดังนี้:

> XGROUP CREATE mystream mygroup $
OK

เมื่อสร้างกลุ่ม เราต้องส่งตัวระบุ โดยเริ่มจากกลุ่มที่จะได้รับข้อความ หากเราเพียงต้องการรับข้อความใหม่ทั้งหมด เราก็สามารถใช้ตัวระบุพิเศษ $ (ดังตัวอย่างด้านบน) หากคุณระบุ 0 แทนตัวระบุพิเศษ ข้อความทั้งหมดในเธรดจะพร้อมใช้งานสำหรับกลุ่ม

เมื่อสร้างกลุ่มแล้ว เราก็สามารถเริ่มอ่านข้อความได้ทันทีโดยใช้คำสั่ง XREADGROUP. คำสั่งนี้คล้ายกันมากกับ XREAD และรองรับตัวเลือก BLOCK เสริม อย่างไรก็ตาม มีตัวเลือก GROUP ที่จำเป็นซึ่งจะต้องระบุด้วยสองอาร์กิวเมนต์เสมอ: ชื่อกลุ่มและชื่อสมาชิก นอกจากนี้ยังรองรับตัวเลือก COUNT อีกด้วย

ก่อนจะอ่านกระทู้ ขอฝากข้อความไว้ดังนี้

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

ตอนนี้เรามาลองอ่านสตรีมนี้ผ่านกลุ่ม:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

คำสั่งดังกล่าวอ่านคำต่อคำดังนี้:

“ฉัน สมาชิก Alice ซึ่งเป็นสมาชิกของ mygroup ต้องการอ่านข้อความหนึ่งจาก mystream ที่ไม่เคยส่งถึงใครมาก่อน”

แต่ละครั้งที่สมาชิกดำเนินการกับกลุ่ม สมาชิกจะต้องระบุชื่อของตน โดยระบุตัวตนภายในกลุ่มโดยไม่ซ้ำกัน มีรายละเอียดที่สำคัญอีกประการหนึ่งในคำสั่งด้านบน - ตัวระบุพิเศษ ">" ตัวระบุพิเศษนี้จะกรองข้อความ เหลือเฉพาะข้อความที่ไม่เคยส่งมาก่อน

นอกจากนี้ ในกรณีพิเศษ คุณสามารถระบุตัวระบุจริง เช่น 0 หรือตัวระบุที่ถูกต้องอื่น ๆ ได้ ในกรณีนี้คือคำสั่ง XREADGROUP จะส่งคืนประวัติข้อความที่มีสถานะ "รอดำเนินการ" ที่ส่งไปยังสมาชิกที่ระบุ (อลิซ) แต่ยังไม่ได้รับการยอมรับโดยใช้คำสั่ง เอ็กซ์แซ็ก.

เราสามารถทดสอบพฤติกรรมนี้ได้โดยการระบุ ID 0 ทันทีโดยไม่มีตัวเลือก COUNT. เราจะเห็นข้อความที่ค้างอยู่เพียงข้อความเดียวนั่นคือข้อความ Apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

อย่างไรก็ตาม หากเรายืนยันว่าข้อความได้รับการประมวลผลเรียบร้อยแล้ว ข้อความนั้นจะไม่แสดงอีกต่อไป:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

ตอนนี้ถึงคราวของ Bob ที่จะอ่านอะไรบางอย่าง:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob สมาชิก mygroup ขอข้อความไม่เกินสองข้อความ คำสั่งรายงานเฉพาะข้อความที่ไม่ได้จัดส่งเนื่องจากตัวระบุพิเศษ ">" อย่างที่คุณเห็น ข้อความ "apple" จะไม่แสดงเนื่องจากได้ส่งถึง Alice แล้ว Bob จึงได้รับ "สีส้ม" และ "สตรอเบอร์รี่"

ด้วยวิธีนี้ Alice, Bob และสมาชิกคนอื่นๆ ในกลุ่มสามารถอ่านข้อความต่างๆ จากสตรีมเดียวกันได้ พวกเขายังสามารถอ่านประวัติข้อความที่ยังไม่ได้ประมวลผลหรือทำเครื่องหมายข้อความว่าประมวลผลแล้ว

มีบางสิ่งที่ควรคำนึงถึง:

  • ทันทีที่ผู้สมัครสมาชิกพิจารณาว่าข้อความนั้นเป็นคำสั่ง XREADGROUPข้อความนี้จะเข้าสู่สถานะ "รอดำเนินการ" และถูกกำหนดให้กับสมาชิกรายนั้น สมาชิกกลุ่มอื่นจะไม่สามารถอ่านข้อความนี้ได้
  • สมาชิกจะถูกสร้างขึ้นโดยอัตโนมัติเมื่อมีการพูดถึงครั้งแรก ไม่จำเป็นต้องสร้างสมาชิกอย่างชัดเจน
  • ด้วย XREADGROUP คุณสามารถอ่านข้อความจากหลายๆ เธรดได้พร้อมๆ กัน แต่เพื่อให้ทำงานได้ คุณต้องสร้างกลุ่มที่มีชื่อเดียวกันสำหรับแต่ละเธรดก่อนโดยใช้ เอ็กซ์กรุ๊ป

การกู้คืนหลังจากความล้มเหลว

สมาชิกสามารถกู้คืนจากความล้มเหลวและอ่านรายการข้อความของเขาอีกครั้งโดยมีสถานะ "รอดำเนินการ" อย่างไรก็ตาม ในโลกแห่งความเป็นจริง สมาชิกอาจล้มเหลวในที่สุด จะเกิดอะไรขึ้นกับข้อความที่ค้างของผู้สมัครสมาชิกหากสมาชิกไม่สามารถกู้คืนจากความล้มเหลวได้
Consumer Group เสนอคุณสมบัติที่ใช้สำหรับกรณีดังกล่าว - เมื่อคุณต้องการเปลี่ยนเจ้าของข้อความ

สิ่งแรกที่คุณต้องทำคือเรียกคำสั่ง เอ็กซ์เพนดิ้งซึ่งจะแสดงข้อความทั้งหมดในกลุ่มที่มีสถานะ "รอดำเนินการ" ในรูปแบบที่ง่ายที่สุด คำสั่งถูกเรียกโดยมีเพียงสองอาร์กิวเมนต์: ชื่อเธรดและชื่อกลุ่ม:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

ทีมงานแสดงจำนวนข้อความที่ยังไม่ได้ประมวลผลสำหรับทั้งกลุ่มและสมาชิกแต่ละคน เรามี Bob เพียงสองข้อความที่โดดเด่นเพราะข้อความเดียวที่อลิซร้องขอได้รับการยืนยันแล้ว เอ็กซ์แซ็ก.

เราสามารถขอข้อมูลเพิ่มเติมโดยใช้ข้อโต้แย้งเพิ่มเติม:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - ช่วงของตัวระบุ (คุณสามารถใช้ "-" และ "+")
{count} — จำนวนความพยายามในการจัดส่ง
{ชื่อผู้บริโภค} - ชื่อกลุ่ม

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

ตอนนี้เรามีรายละเอียดสำหรับแต่ละข้อความ: ID, ชื่อสมาชิก, เวลาว่างในหน่วยมิลลิวินาที และสุดท้ายคือจำนวนความพยายามในการจัดส่ง เรามีข้อความสองข้อความจาก Bob และข้อความเหล่านั้นไม่ได้ใช้งานเป็นเวลา 74170458 มิลลิวินาที หรือประมาณ 20 ชั่วโมง

โปรดทราบว่าไม่มีใครหยุดเราจากการตรวจสอบว่าเนื้อหาของข้อความนั้นเป็นเพียงการใช้อะไร เอ็กซ์เรนจ์.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

เราแค่ต้องทำซ้ำตัวระบุเดียวกันสองครั้งในอาร์กิวเมนต์ ตอนนี้เรามีความคิดบางอย่างแล้ว Alice อาจตัดสินใจว่าหลังจากหยุดทำงานไปแล้ว 20 ชั่วโมง Bob อาจจะไม่ฟื้นตัว และถึงเวลาที่จะสืบค้นข้อความเหล่านั้นและดำเนินการประมวลผลต่อให้กับ Bob สำหรับสิ่งนี้เราใช้คำสั่ง เอ็กซ์คลาอิม:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

การใช้คำสั่งนี้เราจะได้รับข้อความ "ต่างประเทศ" ที่ยังไม่ได้ประมวลผลโดยการเปลี่ยนเจ้าของเป็น {consumer} อย่างไรก็ตาม เรายังสามารถระบุเวลาว่างขั้นต่ำได้ {min-idle-time} ซึ่งจะช่วยหลีกเลี่ยงสถานการณ์ที่ไคลเอนต์สองรายพยายามเปลี่ยนเจ้าของข้อความเดียวกันพร้อมกัน:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

ลูกค้ารายแรกจะรีเซ็ตเวลาหยุดทำงานและเพิ่มเคาน์เตอร์จัดส่ง ดังนั้นลูกค้ารายที่สองจะไม่สามารถร้องขอได้

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice อ้างสิทธิ์ข้อความสำเร็จแล้ว ซึ่งขณะนี้สามารถประมวลผลข้อความและรับทราบได้แล้ว

จากตัวอย่างข้างต้น คุณจะเห็นว่าคำขอที่สำเร็จส่งคืนเนื้อหาของข้อความนั้นเอง อย่างไรก็ตาม สิ่งนี้ไม่จำเป็น ตัวเลือก JUSTID สามารถใช้เพื่อส่งคืนรหัสข้อความเท่านั้น สิ่งนี้มีประโยชน์หากคุณไม่สนใจรายละเอียดของข้อความและต้องการเพิ่มประสิทธิภาพของระบบ

เคาน์เตอร์จัดส่ง

ตัวนับที่คุณเห็นในเอาต์พุต เอ็กซ์เพนดิ้ง คือจำนวนครั้งที่ส่งข้อความแต่ละข้อความ ตัวนับดังกล่าวจะเพิ่มขึ้นในสองวิธี: เมื่อมีการร้องขอข้อความผ่านทางสำเร็จ เอ็กซ์คลาอิม หรือเมื่อมีการใช้สาย XREADGROUP.

เป็นเรื่องปกติที่ข้อความบางข้อความจะถูกส่งหลายครั้ง สิ่งสำคัญคือข้อความทั้งหมดจะได้รับการประมวลผลในที่สุด บางครั้งปัญหาเกิดขึ้นเมื่อประมวลผลข้อความเนื่องจากตัวข้อความเสียหาย หรือการประมวลผลข้อความทำให้เกิดข้อผิดพลาดในโค้ดตัวจัดการ ในกรณีนี้อาจกลายเป็นว่าไม่มีใครสามารถประมวลผลข้อความนี้ได้ เนื่องจากเรามีตัวนับความพยายามในการจัดส่ง เราจึงสามารถใช้ตัวนับนี้เพื่อตรวจจับสถานการณ์ดังกล่าวได้ ดังนั้น เมื่อจำนวนการจัดส่งถึงจำนวนสูงสุดที่คุณระบุ ก็คงจะดีกว่าถ้าจะฝากข้อความดังกล่าวไว้ในเธรดอื่น และส่งการแจ้งเตือนไปยังผู้ดูแลระบบ

สถานะเธรด

ทีม ซินโฟ ใช้เพื่อขอข้อมูลต่าง ๆ เกี่ยวกับเธรดและกลุ่ม ตัวอย่างเช่น คำสั่งพื้นฐานมีลักษณะดังนี้:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

คำสั่งด้านบนแสดงข้อมูลทั่วไปเกี่ยวกับสตรีมที่ระบุ ตอนนี้เป็นตัวอย่างที่ซับซ้อนกว่าเล็กน้อย:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

คำสั่งด้านบนจะแสดงข้อมูลทั่วไปสำหรับทุกกลุ่มของเธรดที่ระบุ

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

คำสั่งด้านบนจะแสดงข้อมูลสำหรับสมาชิกทั้งหมดของสตรีมและกลุ่มที่ระบุ
หากคุณลืมไวยากรณ์คำสั่ง เพียงขอความช่วยเหลือจากคำสั่ง:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

ขีดจำกัดขนาดสตรีม

แอปพลิเคชันจำนวนมากไม่ต้องการรวบรวมข้อมูลลงในสตรีมตลอดไป การมีจำนวนข้อความสูงสุดที่อนุญาตต่อเธรดมักเป็นประโยชน์ ในกรณีอื่นๆ จะเป็นประโยชน์ในการย้ายข้อความทั้งหมดจากเธรดไปยังที่จัดเก็บถาวรอื่นเมื่อถึงขนาดเธรดที่ระบุ คุณสามารถจำกัดขนาดของสตรีมได้โดยใช้พารามิเตอร์ MAXLEN ในคำสั่ง XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

เมื่อใช้ MAXLEN บันทึกเก่าจะถูกลบโดยอัตโนมัติเมื่อถึงความยาวที่กำหนด ดังนั้นสตรีมจึงมีขนาดคงที่ อย่างไรก็ตาม การตัดในกรณีนี้ไม่ได้เกิดขึ้นในวิธีที่มีประสิทธิภาพมากที่สุดในหน่วยความจำ Redis คุณสามารถปรับปรุงสถานการณ์ได้ดังนี้:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

อาร์กิวเมนต์ ~ ในตัวอย่างข้างต้นหมายความว่าเราไม่จำเป็นต้องจำกัดความยาวของสตรีมให้เป็นค่าเฉพาะ ในตัวอย่างของเรา นี่อาจเป็นตัวเลขใดๆ ที่มากกว่าหรือเท่ากับ 1000 (เช่น 1000, 1010 หรือ 1030) เราเพิ่งระบุไว้อย่างชัดเจนว่าเราต้องการให้สตรีมของเราจัดเก็บบันทึกอย่างน้อย 1000 รายการ ทำให้การจัดการหน่วยความจำภายใน Redis มีประสิทธิภาพมากขึ้น

มีแยกทีมด้วย เอ็กซ์ตริมซึ่งทำสิ่งเดียวกัน:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

การจัดเก็บและการจำลองแบบถาวร

Redis Stream ถูกจำลองแบบอะซิงโครนัสไปยังโหนดทาสและบันทึกลงในไฟล์เช่น AOF (สแนปช็อตของข้อมูลทั้งหมด) และ RDB (บันทึกของการดำเนินการเขียนทั้งหมด) นอกจากนี้ยังรองรับการจำลองสถานะกลุ่มผู้บริโภคด้วย ดังนั้น หากข้อความอยู่ในสถานะ "รอดำเนินการ" บนโหนดหลัก ดังนั้นบนโหนดทาส ข้อความนี้จะมีสถานะเดียวกัน

การลบองค์ประกอบแต่ละรายการออกจากสตรีม

มีคำสั่งพิเศษให้ลบข้อความ เอ็กซ์เดล. คำสั่งได้รับชื่อของเธรดตามด้วย ID ข้อความที่ต้องการลบ:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

เมื่อใช้คำสั่งนี้คุณต้องคำนึงว่าหน่วยความจำจริงจะไม่ถูกปล่อยออกมาทันที

สตรีมที่มีความยาวเป็นศูนย์

ความแตกต่างระหว่างสตรีมและโครงสร้างข้อมูล Redis อื่นๆ ก็คือ เมื่อโครงสร้างข้อมูลอื่นๆ ไม่มีองค์ประกอบอยู่ภายในอีกต่อไป โครงสร้างข้อมูลเองก็จะถูกลบออกจากหน่วยความจำ ซึ่งเป็นผลข้างเคียง ตัวอย่างเช่น ชุดที่เรียงลำดับจะถูกลบออกทั้งหมดเมื่อการเรียก ZREM ลบองค์ประกอบสุดท้าย เธรดจะได้รับอนุญาตให้ยังคงอยู่ในหน่วยความจำแทนแม้ว่าจะไม่มีองค์ประกอบใดๆ อยู่ภายในก็ตาม

ข้อสรุป

Redis Stream เหมาะอย่างยิ่งสำหรับการสร้างตัวรับส่งข้อความ คิวข้อความ การบันทึกแบบรวมศูนย์ และระบบแชทเพื่อเก็บประวัติ

อย่างที่ฉันเคยกล่าวไว้ นิเคลาส์ เวิร์ธโปรแกรมคืออัลกอริธึมบวกโครงสร้างข้อมูล และ Redis ก็ให้ทั้งสองอย่างแก่คุณแล้ว

ที่มา: will.com

เพิ่มความคิดเห็น