Redis Stream - xabar almashish tizimlarining ishonchliligi va kengayishi

Redis Stream - xabar almashish tizimlarining ishonchliligi va kengayishi

Redis Stream - bu Redis-da 5.0 versiyasi bilan taqdim etilgan yangi mavhum ma'lumotlar turi
Kontseptual ravishda, Redis Stream - bu ro'yxat bo'lib, unga yozuvlar qo'shishingiz mumkin. Har bir yozuv o'ziga xos identifikatorga ega. Odatiy bo'lib, ID avtomatik ravishda yaratiladi va vaqt tamg'asini o'z ichiga oladi. Shunday qilib, siz vaqt o'tishi bilan yozuvlar diapazonini so'rashingiz yoki oqimga kelganda yangi ma'lumotlarni olishingiz mumkin, xuddi Unix "tail -f" buyrug'i jurnal faylini o'qiydi va yangi ma'lumotlarni kutayotganda qotib qoladi. E'tibor bering, bir nechta mijozlar bir vaqtning o'zida ipni tinglashlari mumkin, xuddi ko'plab "tail -f" jarayonlari bir vaqtning o'zida bir-biriga zid bo'lmagan holda faylni o'qiy oladi.

Yangi ma'lumotlar turining barcha afzalliklarini tushunish uchun keling, Redis Stream funksiyasini qisman takrorlaydigan uzoq vaqtdan beri mavjud bo'lgan Redis tuzilmalarini qisqacha ko'rib chiqaylik.

Redis PUB/SUB

Redis Pub/Sub - bu kalit-qiymat do'koningizga allaqachon o'rnatilgan oddiy xabar almashish tizimi. Biroq, soddalik qimmatga tushadi:

  • Agar noshir biron sababga ko'ra muvaffaqiyatsizlikka uchrasa, u barcha obunachilarni yo'qotadi
  • Nashriyot o'zining barcha obunachilarining aniq manzilini bilishi kerak
  • Agar ma'lumotlar qayta ishlanganidan tezroq chop etilsa, nashriyot o'z obunachilarini ish bilan ortiqcha yuklashi mumkin
  • Xabar nashr etilgandan so'ng darhol nashriyot buferidan o'chiriladi, u qancha obunachiga etkazilganligi va ular ushbu xabarni qanchalik tez qayta ishlashga muvaffaq bo'lganidan qat'iy nazar.
  • Barcha abonentlar xabarni bir vaqtning o'zida oladilar. Abonentlarning o'zlari qandaydir tarzda bir xil xabarni qayta ishlash tartibini kelishib olishlari kerak.
  • Abonent xabarni muvaffaqiyatli qayta ishlaganligini tasdiqlash uchun o'rnatilgan mexanizm yo'q. Agar abonent xabarni qabul qilsa va ishlov berish paytida ishdan chiqsa, nashriyot bu haqda bilmaydi.

Redis ro'yxati

Redis List - bu o'qish buyruqlarini bloklashni qo'llab-quvvatlaydigan ma'lumotlar tuzilmasi. Siz xabarlarni roʻyxatning boshidan yoki oxiridan qoʻshishingiz va oʻqishingiz mumkin. Ushbu tuzilishga asoslanib, siz taqsimlangan tizimingiz uchun yaxshi stek yoki navbat yaratishingiz mumkin va aksariyat hollarda bu etarli bo'ladi. Redis Pub/Sub-dan asosiy farqlar:

  • Xabar bitta mijozga yetkaziladi. O'qish bloklangan birinchi mijoz ma'lumotlarni birinchi bo'lib oladi.
  • Klint har bir xabar uchun o'qish operatsiyasini o'zi boshlashi kerak. Ro'yxat mijozlar haqida hech narsa bilmaydi.
  • Xabarlar kimdir ularni o'qimaguncha yoki aniq o'chirmaguncha saqlanadi. Agar siz Redis serverini diskka ma'lumotlarni o'chirish uchun sozlasangiz, tizimning ishonchliligi keskin oshadi.

Streamga kirish

Oqimga yozuv qo‘shish

komanda XADD oqimga yangi yozuv qo'shadi. Yozuv shunchaki satr emas, u bir yoki bir nechta kalit-qiymat juftliklaridan iborat. Shunday qilib, har bir yozuv allaqachon tuzilgan va CSV faylining tuzilishiga o'xshaydi.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Yuqoridagi misolda biz oqimga "mystream" (kalit) nomi bilan ikkita maydon qo'shamiz: mos ravishda "1234" va "19.8" qiymatlari bilan "sensor-id" va "harorat". Ikkinchi argument sifatida buyruq yozuvga tayinlanadigan identifikatorni oladi - bu identifikator oqimdagi har bir yozuvni noyob tarzda aniqlaydi. Biroq, bu holatda biz * ga o'tdik, chunki Redis biz uchun yangi identifikator yaratishini xohlaymiz. Har bir yangi ID ortadi. Shuning uchun, har bir yangi yozuv oldingi yozuvlarga nisbatan yuqoriroq identifikatorga ega bo'ladi.

Identifikator formati

Buyruq tomonidan qaytarilgan kirish identifikatori XADD, ikki qismdan iborat:

{millisecondsTime}-{sequenceNumber}

millisekundlar vaqt — Unix vaqti millisekundlarda (Redis server vaqti). Biroq, agar joriy vaqt oldingi yozuv vaqti bilan bir xil yoki undan kamroq bo'lsa, avvalgi yozuvning vaqt tamg'asi ishlatiladi. Shuning uchun, agar server vaqti o'z vaqtida orqaga qaytsa, yangi identifikator baribir o'sish xususiyatini saqlab qoladi.

ketma-ketlik raqami bir xil millisekundda yaratilgan yozuvlar uchun ishlatiladi. ketma-ketlik raqami oldingi yozuvga nisbatan 1 ga oshiriladi. Chunki ketma-ketlik raqami hajmi 64 bit bo'lsa, amalda siz bir millisekund ichida yaratilishi mumkin bo'lgan yozuvlar soni bo'yicha cheklovga duch kelmasligingiz kerak.

Bunday identifikatorlarning formati bir qarashda g'alati tuyulishi mumkin. Ishonchsiz o'quvchi nima uchun vaqt identifikatorning bir qismi ekanligiga hayron bo'lishi mumkin. Buning sababi shundaki, Redis oqimlari ID bo'yicha diapazon so'rovlarini qo'llab-quvvatlaydi. Identifikator yozuv yaratilgan vaqt bilan bog'langanligi sababli, bu vaqt diapazonlarini so'rash imkonini beradi. Biz buyruqni ko'rib chiqqach, aniq misolni ko'rib chiqamiz XRANGE.

Agar biron sababga ko'ra foydalanuvchi, masalan, qandaydir tashqi tizim bilan bog'langan o'z identifikatorini ko'rsatishi kerak bo'lsa, biz uni buyruqqa o'tkazishimiz mumkin. XADD quyida ko'rsatilganidek * o'rniga:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

E'tibor bering, bu holda siz ID o'sishini o'zingiz kuzatishingiz kerak. Bizning misolimizda minimal identifikator "0-1" dir, shuning uchun buyruq "0-1" ga teng yoki undan kichik bo'lgan boshqa identifikatorni qabul qilmaydi.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Har bir oqim uchun yozuvlar soni

Oqimdagi yozuvlar sonini oddiygina buyruq yordamida olish mumkin XLEN. Bizning misolimiz uchun bu buyruq quyidagi qiymatni qaytaradi:

> XLEN somestream
(integer) 2

Diapazon so'rovlari - XRANGE va XREVRANGE

Ma'lumotlarni diapazon bo'yicha so'rash uchun biz ikkita identifikatorni ko'rsatishimiz kerak - diapazonning boshi va oxiri. Qaytarilgan diapazon barcha elementlarni, jumladan, chegaralarni o'z ichiga oladi. Shuningdek, ikkita maxsus identifikator “-” va “+” mavjud bo'lib, ular mos ravishda oqimdagi eng kichik (birinchi yozuv) va eng katta (oxirgi yozuv) identifikatorini bildiradi. Quyidagi misolda barcha oqim yozuvlari keltirilgan.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Har bir qaytarilgan yozuv ikki elementdan iborat massivdir: identifikator va kalit-qiymat juftliklari ro'yxati. Biz allaqachon rekord identifikatorlar vaqt bilan bog'liqligini aytdik. Shuning uchun biz ma'lum bir vaqt oralig'ini so'rashimiz mumkin. Biroq, biz so'rovda to'liq identifikatorni emas, balki faqat Unix vaqtini ko'rsatishimiz mumkin, bu bilan bog'liq bo'lgan qismni o'tkazib yuboramiz. ketma-ketlik raqami. Identifikatorning o'tkazib yuborilgan qismi diapazon boshida avtomatik ravishda nolga va diapazon oxirida maksimal mumkin bo'lgan qiymatga o'rnatiladi. Quyida ikki millisekundlik diapazonni qanday so'rashingiz mumkinligiga misol keltirilgan.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Ushbu diapazonda bizda faqat bitta yozuv bor, ammo haqiqiy ma'lumotlar to'plamlarida qaytarilgan natija juda katta bo'lishi mumkin. Shu sababdan XRANGE COUNT variantni qo‘llab-quvvatlaydi. Miqdorni ko'rsatib, biz shunchaki birinchi N yozuvni olishimiz mumkin. Agar keyingi N yozuvlarni (sahifalash) olishimiz kerak bo'lsa, biz oxirgi qabul qilingan identifikatordan foydalanishimiz mumkin, uni oshiring ketma-ketlik raqami bir marta va yana so'rang. Buni quyidagi misolda ko'rib chiqamiz. bilan 10 ta element qo'shishni boshlaymiz XADD (Agar mystream allaqachon 10 ta element bilan to'ldirilgan bo'lsa). Har bir buyruq uchun 2 ta elementni olish uchun iteratsiyani boshlash uchun biz to'liq diapazondan boshlaymiz, lekin COUNT 2 ga teng.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Keyingi ikkita element bilan takrorlashni davom ettirish uchun biz oxirgi qabul qilingan identifikatorni tanlashimiz kerak, ya'ni 1519073279157-0 va 1 ni qo'shishimiz kerak. ketma-ketlik raqami.
Olingan identifikator, bu holda 1519073279157-1, endi keyingi qo'ng'iroq uchun diapazonning yangi boshlanish argumenti sifatida ishlatilishi mumkin. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Va hokazo. Chunki murakkablik XRANGE qidirish uchun O(log(N)) va keyin M elementni qaytarish uchun O(M) bo'lsa, har bir iteratsiya bosqichi tezdir. Shunday qilib, foydalanish XRANGE oqimlarni samarali tarzda takrorlash mumkin.

komanda XREVRANGE ekvivalenti hisoblanadi XRANGE, lekin elementlarni teskari tartibda qaytaradi:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Iltimos, buyruqni unutmang XREVRANGE diapazon argumentlarini teskari tartibda boshlaydi va to'xtatadi.

XREAD yordamida yangi yozuvlarni o'qish

Ko'pincha vazifa oqimga obuna bo'lish va faqat yangi xabarlarni olishdan iborat. Ushbu kontseptsiya Redis Pub/Subga o'xshab ko'rinishi yoki Redis ro'yxatini bloklashi mumkin, ammo Redis Streamdan qanday foydalanishda tub farqlar mavjud:

  1. Har bir yangi xabar sukut bo'yicha har bir abonentga yetkaziladi. Bu xatti-harakat Redis ro'yxatini blokirovka qilishdan farq qiladi, bu erda yangi xabar faqat bitta obunachi tomonidan o'qiladi.
  2. Redis Pub/Sub-da barcha xabarlar unutiladi va hech qachon saqlanib qolmaydi, Stream-da barcha xabarlar cheksiz saqlanadi (agar mijoz aniq o'chirishga sabab bo'lmasa).
  3. Redis Stream sizga bitta oqim ichidagi xabarlarga kirishni farqlash imkonini beradi. Muayyan abonent faqat shaxsiy xabarlar tarixini ko'rishi mumkin.

Buyruq yordamida siz mavzuga obuna bo'lishingiz va yangi xabarlarni olishingiz mumkin XREAD. Bu biroz murakkabroq XRANGE, shuning uchun biz avval oddiyroq misollardan boshlaymiz.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Yuqoridagi misol bloklanmaydigan shaklni ko'rsatadi XREAD. COUNT varianti ixtiyoriy ekanligini unutmang. Aslida, yagona talab qilinadigan buyruq varianti oqimlar ro'yxatini mos keladigan maksimal identifikator bilan belgilaydigan STREAMS opsiyasidir. Biz "STREAMS mystream 0" deb yozdik - biz "0-0" dan katta identifikator bilan mystream oqimining barcha yozuvlarini olishni xohlaymiz. Misoldan ko'rinib turibdiki, buyruq ip nomini qaytaradi, chunki biz bir vaqtning o'zida bir nechta mavzularga obuna bo'lishimiz mumkin. Masalan, "STREAMS mystream otherstream 0 0" deb yozishimiz mumkin. Esda tutingki, STREAMS opsiyasidan so‘ng avval barcha kerakli oqimlarning nomlarini va shundan keyingina identifikatorlar ro‘yxatini taqdim etishimiz kerak.

Ushbu oddiy shaklda buyruq bilan solishtirganda maxsus hech narsa qilmaydi XRANGE. Biroq, qiziq tomoni shundaki, biz osongina burila olamiz XREAD BLOCK argumentini ko'rsatib, blokirovka qilish buyrug'iga:

> XREAD BLOCK 0 STREAMS mystream $

Yuqoridagi misolda yangi BLOCK opsiyasi 0 millisekundlik kutish vaqti bilan ko'rsatilgan (bu cheksiz kutishni anglatadi). Bundan tashqari, mystream oqimi uchun odatiy identifikatorni o'tkazish o'rniga, maxsus $ identifikatori o'tkazildi. Bu maxsus identifikator shuni anglatadi XREAD identifikator sifatida mystreamdagi maksimal identifikatordan foydalanishi kerak. Shunday qilib, biz faqat tinglashni boshlaganimizdan boshlab yangi xabarlarni olamiz. Qaysidir ma'noda bu Unix "tail -f" buyrug'iga o'xshaydi.

Esda tutingki, BLOCK opsiyasidan foydalanganda biz maxsus $ identifikatoridan foydalanishimiz shart emas. Biz oqimda mavjud bo'lgan har qanday identifikatordan foydalanishimiz mumkin. Agar jamoa bizning so'rovimizni blokirovka qilmasdan darhol xizmat qila olsa, u buni amalga oshiradi, aks holda u bloklaydi.

Bloklash XREAD bir vaqtning o'zida bir nechta mavzularni tinglashi mumkin, shunchaki ularning nomlarini ko'rsatishingiz kerak. Bunday holda, buyruq ma'lumotlarni qabul qilgan birinchi oqimning yozuvini qaytaradi. Berilgan to'plam uchun bloklangan birinchi abonent birinchi navbatda ma'lumotlarni oladi.

Iste'molchilar guruhlari

Muayyan vazifalarda biz abonentning bitta mavzudagi xabarlarga kirishini cheklashni xohlaymiz. Bu foydali bo'lishi mumkin bo'lgan misol, ishchilar bilan xabarlar navbati bo'lib, u ipdan turli xabarlarni qabul qiladi va xabarni qayta ishlashni kengaytirish imkonini beradi.

Agar bizda uchta obunachi C1, C2, C3 va 1, 2, 3, 4, 5, 6, 7 xabarlarini o'z ichiga olgan mavzu borligini tasavvur qilsak, xabarlar quyidagi diagrammadagi kabi taqdim etiladi:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Ushbu effektga erishish uchun Redis Stream Consumer Group deb nomlangan kontseptsiyadan foydalanadi. Ushbu kontseptsiya oqimdan ma'lumotlarni oladigan, lekin aslida bir guruh ichidagi bir nechta abonentlar tomonidan xizmat ko'rsatadigan soxta obunachiga o'xshaydi va ma'lum kafolatlarni beradi:

  1. Har bir xabar guruh ichidagi boshqa abonentga yetkaziladi.
  2. Guruh ichida abonentlar katta-kichik harflar qatori bo'lgan nomlari bilan aniqlanadi. Agar abonent guruhdan vaqtincha chiqib ketsa, u o'zining noyob nomidan foydalanib guruhga qayta tiklanishi mumkin.
  3. Har bir iste'molchi guruhi "birinchi o'qilmagan xabar" tushunchasiga amal qiladi. Abonent yangi xabarlarni so'raganda, u faqat guruh ichidagi birorta abonentga ilgari hech qachon yetkazilmagan xabarlarni qabul qilishi mumkin.
  4. Xabarning abonent tomonidan muvaffaqiyatli qayta ishlanganligini aniq tasdiqlash buyrug'i mavjud. Ushbu buyruq chaqirilmaguncha, so'ralgan xabar "kutishda" holatida qoladi.
  5. Iste'molchilar guruhida har bir abonent unga etkazilgan, lekin hali qayta ishlanmagan ("kutilayotgan" holatida) xabarlar tarixini so'rashi mumkin.

Qaysidir ma'noda guruhning holatini quyidagicha ifodalash mumkin:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Endi iste'molchilar guruhi uchun asosiy buyruqlar bilan tanishish vaqti keldi, xususan:

  • XGROUP guruhlarni yaratish, yo'q qilish va boshqarish uchun ishlatiladi
  • XREADGROUP guruh orqali oqim o'qish uchun ishlatiladi
  • XACK - bu buyruq abonentga xabarni muvaffaqiyatli qayta ishlangan deb belgilash imkonini beradi

Iste'molchilar guruhini yaratish

Aytaylik, mystream allaqachon mavjud. Keyin guruh yaratish buyrug'i quyidagicha ko'rinadi:

> XGROUP CREATE mystream mygroup $
OK

Guruh yaratishda biz identifikatorni o'tkazishimiz kerak, undan boshlab guruh xabarlarni oladi. Agar biz barcha yangi xabarlarni olishni istasak, u holda maxsus $ identifikatoridan foydalanishimiz mumkin (yuqoridagi misolimizda bo'lgani kabi). Agar siz maxsus identifikator o'rniga 0 ni belgilasangiz, mavzudagi barcha xabarlar guruh uchun mavjud bo'ladi.

Guruh yaratilgan bo'lsa, biz darhol buyruq yordamida xabarlarni o'qishni boshlashimiz mumkin XREADGROUP. Bu buyruq juda o'xshash XREAD va ixtiyoriy BLOCK opsiyasini qo'llab-quvvatlaydi. Biroq, har doim ikkita argument bilan ko'rsatilishi kerak bo'lgan GROUP opsiyasi mavjud: guruh nomi va obunachi nomi. COUNT opsiyasi ham qo‘llab-quvvatlanadi.

Mavzuni o'qishdan oldin, keling, u erga bir nechta xabarlarni joylashtiramiz:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Keling, ushbu oqimni guruh orqali o'qishga harakat qilaylik:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Yuqoridagi buyruq so'zma-so'z quyidagicha o'qiladi:

"Men, abonent Elis, mening guruhim a'zosi, mystreamdan ilgari hech kimga yetkazilmagan bitta xabarni o'qishni xohlayman."

Har safar abonent guruhda operatsiyani amalga oshirganda, u o'z nomini ko'rsatishi kerak, bu esa o'zini guruh ichida identifikatsiyalashi kerak. Yuqoridagi buyruqda yana bir juda muhim tafsilot mavjud - maxsus identifikator ">". Ushbu maxsus identifikator xabarlarni filtrlaydi va faqat ilgari hech qachon yetkazilmaganlarini qoldiradi.

Bundan tashqari, alohida holatlarda siz 0 yoki boshqa har qanday haqiqiy identifikator kabi haqiqiy identifikatorni belgilashingiz mumkin. Bu holda buyruq XREADGROUP ko'rsatilgan abonentga (Alice) etkazilgan, ammo buyruq yordamida hali tasdiqlanmagan "kutishda" holatiga ega xabarlar tarixini sizga qaytaradi. XACK.

Biz ushbu xatti-harakatni tanlovsiz darhol 0 identifikatorini ko'rsatish orqali sinab ko'rishimiz mumkin COUNT. Biz shunchaki kutilayotgan bitta xabarni, ya'ni olma xabarini ko'ramiz:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Biroq, agar biz xabar muvaffaqiyatli qayta ishlanganligini tasdiqlasak, u endi ko'rsatilmaydi:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Endi nimadir o'qish navbati Bobga keldi:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Mygroup a'zosi Bob ikkitadan ko'p bo'lmagan xabar so'radi. Buyruq faqat ">" maxsus identifikatori tufayli yetkazilmagan xabarlar haqida xabar beradi. Ko'rib turganingizdek, "olma" xabari ko'rsatilmaydi, chunki u allaqachon Elisga yetkazilgan, shuning uchun Bob "apelsin" va "qulupnay" ni oladi.

Shunday qilib, Elis, Bob va guruhning boshqa abonentlari bir xil oqimdan turli xabarlarni o'qishlari mumkin. Shuningdek, ular qayta ishlanmagan xabarlar tarixini o'qishlari yoki xabarlarni qayta ishlangan deb belgilashlari mumkin.

Yodda tutish kerak bo'lgan bir nechta narsa bor:

  • Abonent xabarni buyruq deb hisoblashi bilanoq XREADGROUP, bu xabar "kutishda" holatiga o'tadi va o'sha aniq abonentga tayinlanadi. Guruhning boshqa obunachilari bu xabarni o‘qiy olmaydi.
  • Obunachilar birinchi eslatilganda avtomatik ravishda yaratiladi, ularni aniq yaratishga hojat yo'q.
  • Yordamida XREADGROUP siz bir vaqtning o'zida bir nechta turli mavzulardagi xabarlarni o'qishingiz mumkin, ammo buning ishlashi uchun avval har bir mavzu uchun bir xil nomdagi guruhlar yaratishingiz kerak. XGROUP

Muvaffaqiyatsizlikdan keyin tiklanish

Abonent nosozlikdan xalos bo'lishi va "kutishda" holati bilan o'z xabarlari ro'yxatini qayta o'qishi mumkin. Biroq, haqiqiy dunyoda abonentlar oxir-oqibat muvaffaqiyatsiz bo'lishi mumkin. Agar abonent xatolikdan keyin tiklana olmasa, abonentning yopishib qolgan xabarlari bilan nima sodir bo'ladi?
Consumer Group aynan shunday holatlar uchun – xabarlar egasini o‘zgartirish kerak bo‘lganda foydalaniladigan funksiyani taklif etadi.

Siz qilishingiz kerak bo'lgan birinchi narsa - buyruqni chaqirish XPENDING, bu guruhdagi barcha xabarlarni "kutishda" holati bilan ko'rsatadi. Eng oddiy shaklda buyruq faqat ikkita argument bilan chaqiriladi: ip nomi va guruh nomi:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Jamoa butun guruh va har bir abonent uchun qayta ishlanmagan xabarlar sonini ko'rsatdi. Bizda faqat ikkita ajoyib xabar bo'lgan Bob bor, chunki Elis so'ragan yagona xabar tasdiqlandi XACK.

Biz koʻproq argumentlar yordamida qoʻshimcha maʼlumot soʻrashimiz mumkin:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - identifikatorlar diapazoni (siz “-” va “+” dan foydalanishingiz mumkin)
{count} - yetkazib berishga urinishlar soni
{consumer-name} - guruh nomi

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Endi bizda har bir xabar uchun tafsilotlar mavjud: ID, abonent nomi, millisekundlarda bo'sh turish vaqti va nihoyat etkazib berish urinishlari soni. Bizda Bobdan ikkita xabar bor va ular 74170458 millisekund, taxminan 20 soat davomida ishlamay qoldi.

E'tibor bering, hech kim bizga xabar mazmunini shunchaki foydalanish orqali tekshirishimizga to'sqinlik qilmayapti XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Argumentlarda bir xil identifikatorni ikki marta takrorlashimiz kifoya. Endi bizda qandaydir fikr bor, Elis 20 soatlik tanaffusdan keyin Bob tuzalmasligiga qaror qilishi mumkin va bu xabarlarni so'roq qilish va ularni Bob uchun qayta ishlashni davom ettirish vaqti keldi. Buning uchun biz buyruqdan foydalanamiz XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Ushbu buyruq yordamida biz egasini {consumer} ga o'zgartirish orqali hali qayta ishlanmagan "xorijiy" xabarni olishimiz mumkin. Biroq, biz minimal bo'sh vaqtni ham ta'minlashimiz mumkin {min-idle-time}. Bu ikki mijoz bir vaqtning o'zida bir xil xabarlar egasini o'zgartirishga harakat qiladigan vaziyatdan qochishga yordam beradi:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Birinchi mijoz ishlamay qolish vaqtini tiklaydi va etkazib berish hisoblagichini oshiradi. Shunday qilib, ikkinchi mijoz buni talab qila olmaydi.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Xabarni Elis muvaffaqiyatli da'vo qildi, endi u xabarni qayta ishlay oladi va uni tan oladi.

Yuqoridagi misoldan, muvaffaqiyatli so'rov xabarning o'zi mazmunini qaytarishini ko'rishingiz mumkin. Biroq, bu kerak emas. JUSTID opsiyasi faqat xabar identifikatorlarini qaytarish uchun ishlatilishi mumkin. Agar siz xabarning tafsilotlari bilan qiziqmasangiz va tizim ish faoliyatini oshirishni xohlasangiz foydali bo'ladi.

Yetkazib berish hisoblagichi

Chiqishda ko'rgan hisoblagich XPENDING har bir xabarni yetkazib berish soni. Bunday hisoblagich ikki yo'l bilan oshiriladi: xabar orqali muvaffaqiyatli so'ralganda XCLAIM yoki qo'ng'iroq ishlatilganda XREADGROUP.

Ba'zi xabarlar bir necha marta yetkazilishi odatiy holdir. Asosiysi, barcha xabarlar oxir-oqibat qayta ishlanadi. Ba'zida xabarni qayta ishlashda muammolar yuzaga keladi, chunki xabarning o'zi buzilgan yoki xabarni qayta ishlash ishlov beruvchi kodida xatolikka olib keladi. Bunday holda, hech kim ushbu xabarni qayta ishlay olmasligi mumkin. Bizda yetkazib berishga urinish hisoblagichi borligi sababli, biz bunday vaziyatlarni aniqlash uchun ushbu hisoblagichdan foydalanishimiz mumkin. Shuning uchun, etkazib berish soni siz ko'rsatgan yuqori raqamga yetganda, bunday xabarni boshqa mavzuga qo'yish va tizim ma'muriga bildirishnoma yuborish oqilona bo'lar edi.

Mavzu holati

komanda XINFO ip va uning guruhlari haqida turli ma'lumotlarni so'rash uchun ishlatiladi. Masalan, asosiy buyruq quyidagicha ko'rinadi:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Yuqoridagi buyruq belgilangan oqim haqida umumiy ma'lumotni ko'rsatadi. Endi biroz murakkabroq misol:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Yuqoridagi buyruq belgilangan ipning barcha guruhlari uchun umumiy ma'lumotlarni ko'rsatadi

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Yuqoridagi buyruq belgilangan oqim va guruhning barcha abonentlari uchun ma'lumotlarni ko'rsatadi.
Agar buyruq sintaksisini unutib qo'ysangiz, buyruqning o'zidan yordam so'rang:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Oqim hajmi chegarasi

Ko'pgina ilovalar ma'lumotlarni oqimga abadiy yig'ishni xohlamaydi. Ko'pincha har bir mavzu uchun ruxsat etilgan xabarlarning maksimal soniga ega bo'lish foydalidir. Boshqa hollarda, belgilangan ip hajmiga erishilganda, barcha xabarlarni ipdan boshqa doimiy do'konga ko'chirish foydali bo'ladi. Buyruqdagi MAXLEN parametri yordamida oqim hajmini cheklashingiz mumkin XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLEN dan foydalanganda, eski yozuvlar belgilangan uzunlikka yetganda avtomatik ravishda o'chiriladi, shuning uchun oqim doimiy o'lchamga ega. Biroq, bu holatda kesish Redis xotirasida eng samarali tarzda amalga oshirilmaydi. Siz vaziyatni quyidagicha yaxshilashingiz mumkin:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Yuqoridagi misoldagi ~ argumenti biz oqim uzunligini ma'lum bir qiymat bilan cheklashimiz shart emasligini anglatadi. Bizning misolimizda bu 1000 dan katta yoki unga teng har qanday raqam bo'lishi mumkin (masalan, 1000, 1010 yoki 1030). Biz shunchaki strimimiz kamida 1000 ta yozuvni saqlashini xohlayotganimizni aniq belgilab oldik. Bu Redis ichida xotira boshqaruvini ancha samarali qiladi.

Bundan tashqari, alohida jamoa mavjud XTRIM, xuddi shu narsani bajaradi:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Doimiy saqlash va takrorlash

Redis Stream asinxron ravishda tobe tugunlarga takrorlanadi va AOF (barcha ma'lumotlarning surati) va RDB (barcha yozish operatsiyalari jurnali) kabi fayllarga saqlanadi. Iste'molchi guruhlari holatini takrorlash ham qo'llab-quvvatlanadi. Shuning uchun, agar xabar asosiy tugunda "kutishda" holatida bo'lsa, u holda tobe tugunlarda bu xabar bir xil maqomga ega bo'ladi.

Oqimdan alohida elementlarni olib tashlash

Xabarlarni o'chirish uchun maxsus buyruq mavjud XDEL. Buyruq o'chirilishi kerak bo'lgan xabar identifikatorlari bilan birga mavzu nomini oladi:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Ushbu buyruqdan foydalanganda, haqiqiy xotira darhol bo'shatilmasligini hisobga olishingiz kerak.

Nol uzunlikdagi oqimlar

Oqimlar va boshqa Redis ma'lumotlar tuzilmalari o'rtasidagi farq shundaki, agar boshqa ma'lumotlar tuzilmalari tarkibida elementlar bo'lmasa, yon ta'sir sifatida ma'lumotlar strukturasining o'zi xotiradan o'chiriladi. Shunday qilib, masalan, ZREM chaqiruvi oxirgi elementni olib tashlaganda tartiblangan to'plam butunlay o'chiriladi. Buning o'rniga, iplar ichida hech qanday element bo'lmasa ham xotirada qolishiga ruxsat beriladi.

xulosa

Redis Stream xabarlar brokerlari, xabarlar navbatlari, birlashtirilgan jurnallar va tarixni saqlaydigan chat tizimlarini yaratish uchun ideal.

Bir marta aytganimdek Niklaus Wirth, dasturlar algoritmlar va ma'lumotlar tuzilmalaridir va Redis sizga ikkalasini ham beradi.

Manba: www.habr.com

a Izoh qo'shish