Redis Stream - таны мессежийн системийн найдвартай байдал, өргөтгөх чадвар

Redis Stream - таны мессежийн системийн найдвартай байдал, өргөтгөх чадвар

Redis Stream нь Redis-д 5.0 хувилбараар нэвтрүүлсэн шинэ хийсвэр өгөгдлийн төрөл юм
Үзэл баримтлалын хувьд Redis Stream бол оруулга нэмэх боломжтой жагсаалт юм. Оруулга бүр өвөрмөц танигчтай. Анхдагч байдлаар, ID нь автоматаар үүсгэгддэг бөгөөд цагийн тэмдэг агуулсан байдаг. Тиймээс, Unix-ийн "tail -f" команд нь лог файлыг уншиж, шинэ өгөгдөл хүлээж байхад царцдаг шиг цаг хугацааны явцад та бичлэгийн хүрээг асууж, эсвэл урсгалд орж ирэхэд шинэ өгөгдөл хүлээн авах боломжтой. Олон "tail -f" процессууд хоорондоо зөрчилдөхгүйгээр файлыг нэгэн зэрэг уншиж чаддаг шиг олон үйлчлүүлэгч нэгэн зэрэг урсгалыг сонсож чадна гэдгийг анхаарна уу.

Шинэ төрлийн өгөгдлийн бүх ашиг тусыг ойлгохын тулд Redis Stream-ийн функцийг хэсэгчлэн хуулбарладаг Redis-ийн удаан хугацааны бүтцийг харцгаая.

Redis PUB/SUB

Redis Pub/Sub бол таны түлхүүр утгын дэлгүүрт аль хэдийн суулгасан энгийн мессежийн систем юм. Гэсэн хэдий ч энгийн байдал нь үнэтэй байдаг:

  • Хэрэв нийтлэгч ямар нэг шалтгаанаар бүтэлгүйтвэл тэр бүх захиалагчаа алддаг
  • Нийтлэгч нь бүх захиалагчдынхаа хаягийг яг таг мэдэх ёстой
  • Өгөгдлийг боловсруулснаас хурдан нийтлэх юм бол нийтлэгч захиалагчдаа ажлаа хэт ачаалж болзошгүй
  • Зурвас нийтлэгдсэний дараа хэчнээн захиалагчдад хүргэгдсэн, тэд энэ мессежийг хэр хурдан боловсруулж чадсанаас үл хамааран нийтлэгчийн буферээс устгагдах болно.
  • Бүх захиалагчид нэгэн зэрэг мессеж хүлээн авах болно. Захиалагчид өөрсдөө ижил мессежийг боловсруулах дарааллыг хооронд нь ямар нэгэн байдлаар тохиролцох ёстой.
  • Захиалагч мессежийг амжилттай боловсруулсныг баталгаажуулах ямар ч суурилуулсан механизм байдаггүй. Хэрэв захиалагч мессеж хүлээн аваад боловсруулах явцад гацсан бол нийтлэгч энэ тухай мэдэхгүй.

Redis жагсаалт

Redis List нь унших командыг хаахыг дэмждэг өгөгдлийн бүтэц юм. Та жагсаалтын эхнээс эсвэл төгсгөлөөс мессеж нэмж уншиж болно. Энэ бүтцэд үндэслэн та тараагдсан системдээ сайн стек эсвэл дараалал үүсгэж болох бөгөөд ихэнх тохиолдолд энэ нь хангалттай байх болно. Redis Pub/Sub-аас гол ялгаа:

  • Мессежийг нэг үйлчлүүлэгчид хүргэдэг. Уншихыг хориглосон анхны үйлчлүүлэгч эхлээд өгөгдлийг хүлээн авна.
  • Клинт мессеж бүрийн унших ажиллагааг өөрөө эхлүүлэх ёстой. Жагсаалт нь үйлчлүүлэгчдийн талаар юу ч мэдэхгүй.
  • Мессежийг хэн нэгэн унших эсвэл устгах хүртэл хадгалагдана. Хэрэв та Redis серверийг диск рүү өгөгдөл дамжуулахаар тохируулсан бол системийн найдвартай байдал эрс нэмэгддэг.

Stream-ийн танилцуулга

Дамжуулалтад оруулга нэмж байна

баг XADD урсгалд шинэ оруулга нэмнэ. Бичлэг нь зүгээр нэг мөр биш, нэг буюу хэд хэдэн түлхүүр-утга хосоос бүрдэнэ. Тиймээс оруулга бүр нь аль хэдийн бүтэцлэгдсэн бөгөөд CSV файлын бүтэцтэй төстэй.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Дээрх жишээнд бид "mystream" гэсэн нэртэй (түлхүүр) хоёр талбарыг нэмж оруулав: "мэдрэгч-id" ба "температур" нь "1234" ба "19.8" гэсэн утгатай. Хоёр дахь аргументын хувьд тушаал нь оруулгад хуваарилагдах танигчийг авдаг - энэ танигч нь урсгал дахь оруулга бүрийг өвөрмөц байдлаар тодорхойлдог. Гэсэн хэдий ч, бид Redis-аас бидэнд шинэ ID үүсгэхийг хүсч байгаа тул энэ тохиолдолд бид * тэнцсэн. Шинэ ID бүр нэмэгдэх болно. Тиймээс шинэ оруулга бүр өмнөх оруулгуудтай харьцуулахад илүү өндөр танигчтай байх болно.

Тодорхойлогч формат

Оруулсан ID-г тушаалаар буцаасан XADD, хоёр хэсгээс бүрдэнэ:

{millisecondsTime}-{sequenceNumber}

миллисекунд цаг — Unix цаг миллисекундээр (Redis серверийн цаг). Гэсэн хэдий ч, хэрэв одоогийн цаг нь өмнөх бичлэгийн цагтай ижил эсвэл бага байвал өмнөх бичлэгийн цагийг ашиглана. Тиймээс, хэрэв серверийн цаг хугацаа буцвал шинэ танигч нь өсөлтийн шинж чанарыг хадгалсаар байх болно.

дарааллын дугаар ижил миллисекундэд үүсгэсэн бичлэгүүдэд ашигладаг. дарааллын дугаар өмнөх оруулгатай харьцуулахад 1-ээр нэмэгдэнэ. Учир нь дарааллын дугаар нь 64 бит хэмжээтэй бол практик дээр нэг миллисекундэд үүсгэж болох бичлэгийн тоог хязгаарлаж болохгүй.

Ийм таних хэлбэр нь эхлээд харахад хачирхалтай санагдаж магадгүй юм. Найдваргүй уншигч цаг яагаад тодорхойлогчийн нэг хэсэг вэ гэж гайхаж магадгүй юм. Шалтгаан нь Redis урсгал нь ID-аар хүрээний асуулгыг дэмждэг. Тодорхойлогч нь бичлэгийг үүсгэсэн цагтай холбоотой байдаг тул энэ нь цаг хугацааны хязгаарыг хайх боломжтой болгодог. Бид тушаалыг харахдаа тодорхой жишээг авч үзэх болно XRANGE.

Хэрэв ямар нэг шалтгааны улмаас хэрэглэгч өөрийн танигчийг зааж өгөх шаардлагатай бол, жишээлбэл, ямар нэгэн гадаад системтэй холбоотой бол бид үүнийг команд руу дамжуулж болно. XADD доор үзүүлсэн шиг *-ийн оронд:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Энэ тохиолдолд та ID өсөлтийг өөрөө хянах ёстой гэдгийг анхаарна уу. Бидний жишээнд хамгийн бага танигч нь "0-1" тул тушаал нь "0-1"-тэй тэнцүү буюу түүнээс бага өөр танигчийг хүлээн авахгүй.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Дамжуулалт бүрт бичлэгийн тоо

Энэ командыг ашиглан урсгал дахь бичлэгийн тоог авах боломжтой XLEN. Бидний жишээнд энэ тушаал нь дараах утгыг буцаана.

> XLEN somestream
(integer) 2

Хүрээний асуулга - XRANGE болон XREVRANGE

Хүрээгээр өгөгдөл хүсэхийн тулд бид мужийн эхлэл ба төгсгөл гэсэн хоёр танигчийг зааж өгөх хэрэгтэй. Буцаагдсан мужид бүх элементүүд, түүний дотор хил хязгаар орно. Мөн урсгал дахь хамгийн жижиг (эхний бичлэг) болон хамгийн том (сүүлийн бичлэг) тодорхойлогч гэсэн утгатай "-" ба "+" гэсэн хоёр тусгай танигч байдаг. Доорх жишээнд бүх дамжуулалтын оруулгуудыг жагсаах болно.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Буцаагдсан бичлэг бүр нь танигч ба түлхүүр-утга хосын жагсаалт гэсэн хоёр элементийн массив юм. Бичлэгийн тодорхойлогч нь цаг хугацаатай холбоотой гэдгийг бид аль хэдийн хэлсэн. Тиймээс бид тодорхой хугацааны туршид хүсэлт гаргаж болно. Гэсэн хэдий ч бид хүсэлтэд бүрэн танигч биш, зөвхөн Unix-ийн цагийг зааж өгөх боломжтой бөгөөд үүнд хамаарах хэсгийг орхиж болно. дарааллын дугаар. Тодорхойлогчийн орхигдсон хэсэг нь мужын эхэнд автоматаар тэг болж, мужийн төгсгөлд боломжит хамгийн дээд утгад тохируулагдана. Та хоёр миллисекундын мужийг хэрхэн хүсэх жишээг доор харуулав.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Бидэнд энэ мужид зөвхөн нэг оруулга байгаа боловч бодит өгөгдлийн багцад буцаж ирсэн үр дүн нь асар их байж болно. Энэ шалтгааны улмаас XRANGE COUNT сонголтыг дэмждэг. Тоо хэмжээг зааж өгснөөр бид эхний N бичлэгийг авч болно. Хэрэв бид дараагийн N бичлэг (хуудас) авах шаардлагатай бол бид хамгийн сүүлд хүлээн авсан ID-г ашиглаж болно, үүнийг нэмэгдүүлээрэй дарааллын дугаар нэг нэгээр нь дахин асуу. Үүнийг дараах жишээн дээр харцгаая. Бид 10 элемент нэмж эхэлдэг XADD (mystream аль хэдийн 10 элементээр дүүрсэн гэж үзвэл). Нэг команд бүрт 2 элемент авч давталтыг эхлүүлэхийн тулд бид бүтэн мужаас эхэлнэ, гэхдээ COUNT нь 2-той тэнцүү байна.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Дараагийн хоёр элементтэй давталтыг үргэлжлүүлэхийн тулд бид хамгийн сүүлд хүлээн авсан ID буюу 1519073279157-0-г сонгоод 1-ийг нэмэх хэрэгтэй. дарааллын дугаар.
Үүссэн ID, энэ тохиолдолд 1519073279157-1-ийг дараагийн дуудлагын мужын шинэ эхлэл болгон ашиглаж болно. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

гэх мэт. Учир нь нарийн төвөгтэй байдал XRANGE хайлт хийхэд O(log(N)), дараа нь M элементийг буцаах O(M) байвал давталтын алхам бүр хурдан болно. Тиймээс ашиглах XRANGE урсгалуудыг үр дүнтэй давтаж болно.

баг XREVRANGE тэнцүү байна XRANGE, гэхдээ урвуу дарааллаар элементүүдийг буцаана:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

тушаал гэдгийг анхаарна уу XREVRANGE мужын аргументуудыг урвуу дарааллаар эхлүүлж, зогсооно.

XREAD ашиглан шинэ оруулгуудыг уншиж байна

Ихэнхдээ урсгалд бүртгүүлэх, зөвхөн шинэ мессеж хүлээн авах даалгавар үүсдэг. Энэ ойлголт нь Redis Pub/Sub эсвэл Redis List-ийг хаахтай төстэй мэт санагдаж болох ч Redis Stream-г хэрхэн ашиглахад үндсэн ялгаа бий:

  1. Шинэ мессеж бүрийг анхдагч байдлаар захиалагч бүрт хүргэдэг. Энэ үйлдэл нь шинэ мессежийг зөвхөн нэг захиалагч унших боломжтой Redis List-аас ялгаатай.
  2. Redis Pub/Sub-д бүх мессежүүд мартагдаж, хэзээ ч хадгалагдахгүй байхад Stream-д бүх мессежүүд тодорхойгүй хугацаагаар хадгалагддаг (үйлчлүүлэгч нь устгахыг тодорхой шалтгаангүй бол).
  3. Redis Stream нь нэг урсгал доторх мессежүүдэд хандах хандалтыг ялгах боломжийг танд олгоно. Тодорхой захиалагч зөвхөн өөрийн хувийн мессежийн түүхийг харах боломжтой.

Та уг командыг ашиглан хэлхээнд бүртгүүлж, шинэ мессеж хүлээн авах боломжтой XREAD. Энэ нь арай илүү төвөгтэй юм XRANGE, тиймээс бид эхлээд энгийн жишээнүүдээс эхэлнэ.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Дээрх жишээ нь блоклохгүй маягтыг харуулж байна XREAD. COUNT сонголт нь сонголттой гэдгийг анхаарна уу. Үнэн хэрэгтээ шаардлагатай цорын ганц тушаалын сонголт бол STREAMS сонголт бөгөөд урсгалын жагсаалтыг харгалзах хамгийн их танигчийн хамт зааж өгдөг. Бид "STREAMS mystream 0" гэж бичсэн - бид "0-0"-ээс их танигчтай mystream урсгалын бүх бичлэгийг хүлээн авахыг хүсч байна. Жишээнээс харахад бид олон хэлхээнд нэгэн зэрэг бүртгүүлэх боломжтой тул команд нь хэлхээний нэрийг буцаадаг. Бид жишээ нь "STREAMS mystream otherstream 0 0" гэж бичиж болно. STREAMS сонголтын дараа бид эхлээд шаардлагатай бүх урсгалуудын нэрийг өгч, дараа нь тодорхойлогчдын жагсаалтыг өгөх хэрэгтэй гэдгийг анхаарна уу.

Энэ энгийн хэлбэрээр команд нь харьцуулахад ямар ч онцгой зүйл хийдэггүй XRANGE. Гэсэн хэдий ч сонирхолтой зүйл бол бид амархан эргэх боломжтой юм XREAD Блоклох аргументыг зааж блоклох команд руу:

> XREAD BLOCK 0 STREAMS mystream $

Дээрх жишээнд шинэ БЛОК сонголтыг 0 миллисекундын хугацаатай зааж өгсөн (энэ нь тодорхойгүй хугацаагаар хүлээх гэсэн үг). Түүнээс гадна mystream урсгалын ердийн танигчийг дамжуулахын оронд $ тусгай танигчийг дамжуулсан. Энэ тусгай танигч нь үүнийг илэрхийлдэг XREAD mystream дахь хамгийн их танигчийг танигч болгон ашиглах ёстой. Тиймээс бид сонсож эхэлсэн цагаасаа эхлэн зөвхөн шинэ мессеж хүлээн авах болно. Энэ нь зарим талаараа Unix-ийн "tail -f" командтай төстэй юм.

BLOCK сонголтыг ашиглахдаа бид заавал $ гэсэн тусгай танигчийг ашиглах шаардлагагүй гэдгийг анхаарна уу. Бид урсгалд байгаа ямар ч танигчийг ашиглаж болно. Хэрэв баг манай хүсэлтийг блоклохгүйгээр шууд үйлчилж чадвал үүнийг хийх болно, үгүй ​​бол блоклох болно.

Блоклох XREAD Мөн олон хэлхээг нэгэн зэрэг сонсох боломжтой, та зөвхөн нэрийг нь зааж өгөхөд л хангалттай. Энэ тохиолдолд тушаал нь өгөгдөл хүлээн авсан эхний урсгалын бичлэгийг буцаана. Өгөгдсөн хэлхээнд блоклогдсон эхний захиалагч эхлээд өгөгдлийг хүлээн авах болно.

Хэрэглэгчийн бүлгүүд

Тодорхой ажлуудад бид захиалагчийн нэг урсгал доторх мессежүүдэд хандах хандалтыг хязгаарлахыг хүсч байна. Энэ нь ашигтай байж болох жишээ бол хэлхээнээс өөр өөр мессеж хүлээн авах ажилчидтай мессежийн дараалал байж, мессеж боловсруулалтыг өргөжүүлэх боломжийг олгодог.

Хэрэв бид C1, C2, C3 гэсэн гурван захиалагчтай, 1, 2, 3, 4, 5, 6, 7 гэсэн мессежүүдийг агуулсан хэлхээтэй гэж төсөөлвөл доорх диаграмын дагуу мессежүүд үйлчлэх болно.

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Энэ үр дүнд хүрэхийн тулд Redis Stream нь Consumer Group хэмээх ойлголтыг ашигладаг. Энэ үзэл баримтлал нь дамжуулалтаас өгөгдөл хүлээн авдаг псевдо-захиалагчтай төстэй боловч үнэндээ бүлэг доторх олон захиалагчаар үйлчилдэг бөгөөд тодорхой баталгааг өгдөг:

  1. Мессеж бүрийг групп доторх өөр захиалагчдад хүргэдэг.
  2. Бүлэг дотор захиалагчдыг нэрээр нь тодорхойлдог бөгөөд энэ нь том жижиг үсгээр ялгах тэмдэгт мөр юм. Хэрэв захиалагч группээс түр гарвал түүнийг өөрийн өвөрмөц нэрийг ашиглан бүлэгт сэргээж болно.
  3. Хэрэглэгчийн бүлэг бүр "анхны уншаагүй мессеж" гэсэн ойлголтыг баримталдаг. Захиалагч шинэ зурвас илгээх хүсэлт гаргахдаа зөвхөн бүлгийн аль ч захиалагчид өмнө нь хэзээ ч хүргэж байгаагүй мессежийг хүлээн авах боломжтой.
  4. Захиалагч мессежийг амжилттай боловсруулсан гэдгийг тодорхой баталгаажуулах тушаал байдаг. Энэ тушаалыг дуудах хүртэл хүссэн зурвас "хүлээгдэж буй" төлөвт үлдэнэ.
  5. Хэрэглэгчийн бүлэгт захиалагч бүр өөрт нь хүргэгдсэн боловч боловсруулагдаагүй байгаа мессежүүдийн түүхийг хүсэх боломжтой ("хүлээгдэж буй" төлөвт)

Нэг ёсондоо бүлгийн төлөв байдлыг дараах байдлаар илэрхийлж болно.

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Одоо Хэрэглэгчийн бүлгийн үндсэн командуудтай танилцах цаг болжээ, тухайлбал:

  • XGROUP бүлгүүдийг үүсгэх, устгах, удирдахад ашигладаг
  • XREADGROUP бүлгээр дамжуулж уншихад ашигладаг
  • ХАСК - энэ тушаал нь захиалагчийг мессежийг амжилттай боловсруулсан гэж тэмдэглэх боломжийг олгодог

Хэрэглэгчийн бүлэг байгуулах

Mystream аль хэдийн байгаа гэж бодъё. Дараа нь бүлэг үүсгэх тушаал дараах байдлаар харагдах болно.

> XGROUP CREATE mystream mygroup $
OK

Бүлэг үүсгэх үед бид тодорхойлогчийг дамжуулах ёстой бөгөөд үүнээс эхлэн бүлэг мессеж хүлээн авах болно. Хэрэв бид бүх шинэ мессежийг хүлээн авахыг хүсвэл $ тусгай танигчийг ашиглаж болно (дээрх жишээн дээрх шиг). Хэрэв та тусгай танигчийн оронд 0-г зааж өгвөл хэлхээн дэх бүх мессежийг бүлэгт ашиглах боломжтой болно.

Бүлэг үүсгэгдсэн тул бид тушаалыг ашиглан шууд мессеж уншиж эхлэх боломжтой XREADGROUP. Энэ тушаал нь маш төстэй юм XREAD мөн нэмэлт БЛОК сонголтыг дэмждэг. Гэсэн хэдий ч шаардлагатай GROUP сонголт байдаг бөгөөд үүнийг үргэлж хоёр аргументтай зааж өгөх ёстой: бүлгийн нэр болон захиалагчийн нэр. COUNT сонголтыг мөн дэмждэг.

Сэдвийг уншихаасаа өмнө тэнд хэдэн мессеж бичье:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Одоо энэ дамжуулалтыг группээр дамжуулан уншихыг хичээцгээе:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Дээрх тушаалыг дараах байдлаар үгчлэн уншина.

"Би, захиалагч Алис, миний группын гишүүн, өмнө нь хэзээ ч хэнд ч хүргэж байгаагүй mystream-ээс нэг мессеж уншихыг хүсч байна."

Захиалагч нь бүлэгт ямар нэгэн үйлдэл хийх бүрдээ тухайн бүлэгт өөрийгөө таниулах нэрээ зааж өгөх ёстой. Дээрх тушаалд бас нэг чухал мэдээлэл бий - тусгай танигч ">". Энэхүү тусгай танигч нь мессежийг шүүж, зөвхөн өмнө нь хэзээ ч хүргэж байгаагүй мессежүүдийг үлдээдэг.

Түүнчлэн, онцгой тохиолдолд та 0 эсвэл бусад хүчинтэй танигч гэх мэт бодит танигчийг зааж өгч болно. Энэ тохиолдолд тушаал XREADGROUP заасан захиалагчид (Алис) хүргэсэн боловч тушаалыг ашиглан хараахан хүлээн зөвшөөрөөгүй байгаа "хүлээгдэж буй" статустай мессежүүдийн түүхийг танд буцааж өгөх болно. ХАСК.

Бид сонголтгүйгээр ID 0-г нэн даруй зааж өгснөөр энэ үйлдлийг шалгаж болно COUNT. Бид зүгээр л нэг хүлээгдэж буй мессежийг харах болно, өөрөөр хэлбэл алимны мессеж:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Гэсэн хэдий ч, хэрэв бид мессежийг амжилттай боловсруулсан болохыг баталгаажуулвал энэ нь цаашид харагдахгүй болно:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Одоо Боб ямар нэг зүйлийг унших ээлж боллоо:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Mygroup-ийн гишүүн Боб хоёроос илүүгүй мессеж хүссэн. Тус команд нь зөвхөн ">" тусгай танигчийн улмаас хүргэгдээгүй мессежийг мэдээлдэг. Таны харж байгаагаар "алим" гэсэн мессеж нь Алисад аль хэдийн хүргэгдсэн тул харагдахгүй тул Боб "улбар шар", "гүзээлзгэнэ" хүлээн авдаг.

Ингэснээр Алис, Боб болон бүлгийн бусад захиалагч нэг дамжуулалтаас өөр өөр мессежийг унших боломжтой. Тэд мөн боловсруулаагүй мессежүүдийн түүхийг унших эсвэл мессежийг боловсруулсан гэж тэмдэглэх боломжтой.

Хэд хэдэн зүйлийг анхаарах хэрэгтэй:

  • Захиалагч мессежийг тушаал гэж үзсэн даруйд XREADGROUP, энэ мессеж нь "хүлээгдэж буй" төлөвт шилжиж, тухайн захиалагч руу хуваарилагдана. Бусад бүлгийн захиалагчид энэ мессежийг унших боломжгүй.
  • Захиалагчдыг анх дурьдахад автоматаар үүсгэгддэг тул тэдгээрийг тодорхой үүсгэх шаардлагагүй.
  • Тусламжийн тусламжтайгаар XREADGROUP Та хэд хэдэн өөр хэлхээсээс ирсэн мессежийг нэгэн зэрэг уншиж болно, гэхдээ энэ нь ажиллахын тулд эхлээд хэлхээ болгонд ижил нэртэй бүлгүүдийг үүсгэх хэрэгтэй. XGROUP

Амжилтгүй болсоны дараа сэргээх

Захиалагч алдаагаа сэргээж, "хүлээгдэж буй" статустай мессежийн жагсаалтыг дахин унших боломжтой. Гэсэн хэдий ч бодит ертөнцөд захиалагчид эцэст нь бүтэлгүйтэж магадгүй юм. Захиалагч алдаагаа сэргээж чадахгүй бол захиалагчийн гацсан мессежийг яах вэ?
Consumer Group нь мессеж эзэмшигчийг солих шаардлагатай үед яг ийм тохиолдолд ашигладаг функцийг санал болгодог.

Таны хийх ёстой хамгийн эхний зүйл бол командыг дуудах явдал юм XPENDING, "хүлээгдэж буй" статустай бүлгийн бүх мессежийг харуулдаг. Хамгийн энгийн хэлбэрээр командыг зөвхөн хоёр аргументаар дууддаг: урсгалын нэр ба бүлгийн нэр:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Баг бүхэл бүтэн бүлэг болон захиалагч бүрийн боловсруулаагүй мессежийн тоог харуулав. Алисын хүссэн цорын ганц мессежийг баталгаажуулсан тул бидэнд зөвхөн хоёр гайхалтай зурвастай Боб байна ХАСК.

Бид нэмэлт аргумент ашиглан нэмэлт мэдээлэл авах хүсэлт гаргаж болно:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - тодорхойлогчдын хүрээ (та "-" ба "+"-г ашиглаж болно)
{count} - хүргэх оролдлогын тоо
{хэрэглэгчийн нэр} - бүлгийн нэр

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Одоо бид мессеж бүрийн дэлгэрэнгүй мэдээлэлтэй байна: ID, захиалагчийн нэр, миллисекунд дэх сул зогсолт, эцэст нь хүргэх оролдлогын тоо. Бидэнд Бобоос хоёр мессеж ирсэн бөгөөд тэд 74170458 миллисекунд буюу 20 цаг орчим идэвхгүй байсан.

Зүгээр л ашиглан мессежийн агуулгыг шалгахад хэн ч саад болохгүй гэдгийг анхаарна уу XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Аргументуудад бид нэг тодорхойлогчийг хоёр удаа давтах хэрэгтэй. Одоо бидэнд ямар нэгэн санаа байгаа тул Алис 20 цаг завсарласны дараа Боб сэргэхгүй байх, мөн эдгээр мессежүүдийг асууж, тэдгээрийг Бобын хувьд үргэлжлүүлэн боловсруулах цаг болсон гэж үзэж магадгүй юм. Үүний тулд бид командыг ашигладаг XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Энэ командыг ашигласнаар бид эзэмшигчийг {consumer} болгон өөрчилснөөр хараахан боловсруулагдаагүй "гадаадын" мессежийг хүлээн авах боломжтой. Гэсэн хэдий ч бид хамгийн бага сул зогсолтын хугацааг {min-idle-time}-аар хангах боломжтой. Энэ нь хоёр үйлчлүүлэгч ижил мессежийн эзэмшигчийг нэгэн зэрэг солихыг оролдох нөхцөл байдлаас зайлсхийхэд тусалдаг:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Эхний үйлчлүүлэгч сул зогсолтыг дахин тохируулж, хүргэлтийн тоолуурыг нэмэгдүүлнэ. Тиймээс хоёр дахь үйлчлүүлэгч үүнийг хүсэх боломжгүй болно.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Энэ мессежийг Алис амжилттай нэхэмжилсэн бөгөөд одоо мессежийг боловсруулж, хүлээн зөвшөөрөх боломжтой.

Дээрх жишээнээс харахад амжилттай хүсэлт нь мессежийн агуулгыг өөрөө буцааж өгч байгааг харж болно. Гэсэн хэдий ч энэ нь шаардлагагүй юм. JUSTID сонголтыг зөвхөн мессежийн ID-г буцаахад ашиглаж болно. Хэрэв та мессежийн нарийн ширийнийг сонирхохгүй, системийн гүйцэтгэлийг нэмэгдүүлэхийг хүсч байвал энэ нь ашигтай.

Хүргэлтийн тоолуур

Гаралт дээр харж буй тоолуур XPENDING нь мессеж бүрийн хүргэлтийн тоо юм. Ийм тоолуурыг хоёр аргаар нэмэгдүүлнэ: мессежийг дамжуулан амжилттай хүссэн тохиолдолд XCLAIM эсвэл дуудлага ашиглах үед XREADGROUP.

Зарим мессежийг олон удаа хүргэх нь хэвийн үзэгдэл юм. Хамгийн гол нь бүх мессежийг эцэст нь боловсруулдаг. Заримдаа мессеж өөрөө эвдэрсэн эсвэл мессеж боловсруулалт нь зохицуулагчийн кодонд алдаа гаргадаг тул мессежийг боловсруулахад асуудал гардаг. Энэ тохиолдолд хэн ч энэ мессежийг боловсруулах боломжгүй болж магадгүй юм. Бид хүргэх оролдлогын тоолууртай тул ийм нөхцөл байдлыг илрүүлэхийн тулд энэ тоолуурыг ашиглаж болно. Тиймээс, хүргэлтийн тоо таны заасан дээд хэмжээнд хүрмэгц ийм мессежийг өөр хэлхээнд байрлуулж, системийн администраторт мэдэгдэл илгээх нь илүү ухаалаг хэрэг болно.

Thread State

баг XINFO thread болон түүний бүлгүүдийн талаар янз бүрийн мэдээлэл хүсэхэд ашигладаг. Жишээлбэл, үндсэн тушаал дараах байдалтай байна.

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Дээрх тушаал нь заасан урсгалын талаарх ерөнхий мэдээллийг харуулдаг. Одоо арай илүү төвөгтэй жишээ:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Дээрх тушаал нь заасан хэлхээний бүх бүлгийн ерөнхий мэдээллийг харуулдаг

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Дээрх тушаал нь заасан урсгал болон бүлгийн бүх захиалагчдын мэдээллийг харуулдаг.
Хэрэв та командын синтаксийг мартсан бол командаасаа тусламж хүснэ үү:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Дамжуулалтын хэмжээ хязгаар

Олон программууд өгөгдлийг урсгал руу үүрд цуглуулахыг хүсдэггүй. Нэг хэлхээнд зөвшөөрөгдсөн хамгийн их мессеж байх нь ихэвчлэн ашигтай байдаг. Бусад тохиолдолд заасан утсанд хүрсэн үед бүх мессежийг хэлхээнээс өөр хадгалалт руу шилжүүлэх нь ашигтай байдаг. Та тушаалын MAXLEN параметрийг ашиглан урсгалын хэмжээг хязгаарлаж болно XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLEN-ийг ашиглах үед хуучин бичлэгүүд заасан уртад хүрэхэд автоматаар устдаг тул урсгал нь тогтмол хэмжээтэй байдаг. Гэсэн хэдий ч, энэ тохиолдолд тайрах нь Redis санах ойд хамгийн үр дүнтэй байдлаар хийгддэггүй. Та нөхцөл байдлыг дараах байдлаар сайжруулж болно.

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Дээрх жишээн дээрх ~ аргумент нь дамжуулалтын уртыг тодорхой утгаар хязгаарлах шаардлагагүй гэсэн үг юм. Бидний жишээнд энэ нь 1000-аас их буюу тэнцүү ямар ч тоо байж болно (жишээлбэл, 1000, 1010 эсвэл 1030). Бид шууд дамжуулалтдаа дор хаяж 1000 бичлэг хадгалахыг хүсч байгаагаа тодорхой зааж өгсөн. Энэ нь Redis дотор санах ойн менежментийг илүү үр дүнтэй болгодог.

Мөн тусдаа баг байдаг XTRIM, энэ нь ижил зүйлийг хийдэг:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Байнгын хадгалалт, хуулбар

Redis Stream-ийг асинхроноор боол зангилаа руу хуулбарлаж, AOF (бүх өгөгдлийн агшин зураг) болон RDB (бүх бичих үйлдлийн бүртгэл) зэрэг файлуудад хадгалдаг. Хэрэглэгчийн бүлгүүдийн төлөвийг хуулбарлахыг мөн дэмждэг. Тиймээс, хэрэв мессеж мастер зангилаа дээр "хүлээгдэж буй" төлөвт байгаа бол боол зангилаанууд дээр энэ мессеж ижил статустай байх болно.

Дамжуулалтаас бие даасан элементүүдийг устгах

Зурвас устгах тусгай тушаал байдаг XDEL. Тушаал нь хэлхээний нэрийг, дараа нь устгах мессежийн ID-г авна.

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Энэ командыг ашиглахдаа та бодит санах ойг шууд гаргахгүй гэдгийг анхаарч үзэх хэрэгтэй.

Тэг урттай урсгал

Урсгалууд болон бусад Redis өгөгдлийн бүтцүүдийн хоорондох ялгаа нь бусад өгөгдлийн бүтцүүд дотор нь элементгүй болсон үед гаж нөлөөний хувьд өгөгдлийн бүтэц өөрөө санах ойноос устах болно. Жишээлбэл, ZREM дуудлага нь сүүлчийн элементийг устгах үед эрэмбэлэгдсэн багц бүрэн устгагдах болно. Үүний оронд утаснууд дотор ямар ч элементгүй байсан ч санах ойд үлдэхийг зөвшөөрдөг.

дүгнэлт

Redis Stream нь мессеж брокер, мессежийн дараалал, нэгдсэн бүртгэл, түүхийг хадгалах чатын системийг бий болгоход тохиромжтой.

Би нэг удаа хэлсэнчлэн Никлаус Вирт, програмууд нь алгоритмууд болон өгөгдлийн бүтэц бөгөөд Redis аль хэдийн танд хоёуланг нь өгдөг.

Эх сурвалж: www.habr.com

сэтгэгдэл нэмэх