Redis Stream - mesajlaşma sistemlərinizin etibarlılığı və miqyası

Redis Stream - mesajlaşma sistemlərinizin etibarlılığı və miqyası

Redis Stream 5.0 versiyası ilə Redis-də təqdim edilən yeni mücərrəd məlumat növüdür
Konseptual olaraq, Redis Stream qeydlər əlavə edə biləcəyiniz Siyahıdır. Hər bir girişin unikal identifikatoru var. Varsayılan olaraq, ID avtomatik olaraq yaradılır və vaxt möhürü daxildir. Buna görə də, siz vaxt keçdikcə qeydlər diapazonunu sorğulaya və ya axına daxil olan kimi yeni məlumatları qəbul edə bilərsiniz, necə ki, Unix "tail -f" əmri jurnal faylını oxuyur və yeni məlumatları gözləyərkən donur. Nəzərə alın ki, bir çox müştəri eyni vaxtda mövzunu dinləyə bilər, necə ki, bir çox "tail -f" prosesləri bir-biri ilə ziddiyyət təşkil etmədən faylı eyni vaxtda oxuya bilər.

Yeni məlumat növünün bütün üstünlüklərini başa düşmək üçün gəlin Redis Stream-in funksionallığını qismən təkrarlayan çoxdan mövcud olan Redis strukturlarına qısaca nəzər salaq.

Redis PUB/SUB

Redis Pub/Sub artıq əsas dəyər mağazanıza daxil edilmiş sadə mesajlaşma sistemidir. Bununla belə, sadəlik baha başa gəlir:

  • Nəşriyyatçı nədənsə uğursuz olarsa, o, bütün abunəçilərini itirir
  • Nəşriyyatçı bütün abunəçilərinin dəqiq ünvanını bilməlidir
  • Məlumat emal olunduğundan daha sürətli dərc olunarsa, naşir öz abunəçilərini işlə həddən artıq yükləyə bilər
  • Mesajın neçə abunəçiyə çatdırılmasından və bu mesajı nə qədər tez emal edə bilməsindən asılı olmayaraq, mesaj dərc edildikdən dərhal sonra naşirin buferindən silinir.
  • Bütün abunəçilər mesajı eyni vaxtda alacaqlar. Abunəçilər özləri eyni mesajı emal etmək qaydasında birtəhər razılaşmalıdırlar.
  • Abunəçinin mesajı uğurla emal etdiyini təsdiqləyən daxili mexanizm yoxdur. Abunəçi bir mesaj alırsa və emal zamanı qəzaya uğrayarsa, nəşriyyat bu barədə bilməyəcək.

Redis siyahısı

Redis List oxu əmrlərinin bloklanmasını dəstəkləyən məlumat strukturudur. Siz siyahının əvvəlindən və ya sonundan mesajlar əlavə edə və oxuya bilərsiniz. Bu quruluşa əsaslanaraq, paylanmış sisteminiz üçün yaxşı bir yığın və ya növbə yarada bilərsiniz və əksər hallarda bu kifayət edəcəkdir. Redis Pub/Sub-dan əsas fərqlər:

  • Mesaj bir müştəriyə çatdırılır. Oxunması bloklanmış ilk müştəri ilk olaraq məlumatları alacaq.
  • Klint hər mesaj üçün oxu əməliyyatını özü başlamalıdır. Siyahı müştərilər haqqında heç nə bilmir.
  • Mesajlar kimsə onları oxuyana və ya açıq şəkildə silənə qədər saxlanılır. Redis serverini məlumatları diskə silmək üçün konfiqurasiya etsəniz, sistemin etibarlılığı kəskin şəkildə artır.

Stream-a giriş

Axına giriş əlavə edilir

Komanda XADD axına yeni giriş əlavə edir. Qeyd sadəcə sətir deyil, bir və ya bir neçə açar-dəyər cütündən ibarətdir. Beləliklə, hər bir giriş artıq strukturlaşdırılmışdır və CSV faylının strukturuna bənzəyir.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Yuxarıdakı nümunədə "mystream" adı (açarı) olan axına iki sahə əlavə edirik: müvafiq olaraq "1234" və "19.8" dəyərləri olan "sensor-id" və "temperatur". İkinci arqument olaraq, komanda girişə təyin ediləcək identifikatoru götürür - bu identifikator axındakı hər bir girişi unikal şəkildə müəyyənləşdirir. Bununla belə, bu halda biz * keçdik, çünki Redis-in bizim üçün yeni identifikator yaratmasını istəyirik. Hər yeni ID artırılacaq. Buna görə də, hər bir yeni giriş əvvəlki girişlərə nisbətən daha yüksək identifikatora malik olacaq.

İdentifikator formatı

Komanda tərəfindən qaytarılan giriş identifikatoru XADD, iki hissədən ibarətdir:

{millisecondsTime}-{sequenceNumber}

millisaniyə vaxt — Unix vaxtı millisaniyələrlə (Redis server vaxtı). Bununla belə, cari vaxt əvvəlki qeydin vaxtı ilə eyni və ya ondan azdırsa, əvvəlki qeydin vaxt möhürü istifadə olunur. Buna görə də, əgər server vaxtı geriyə gedirsə, yeni identifikator yenə də artım xüsusiyyətini saxlayacaq.

ardıcıllıq nömrəsi eyni millisaniyədə yaradılmış qeydlər üçün istifadə olunur. ardıcıllıq nömrəsi əvvəlki girişə nisbətən 1 artacaq. Çünki ardıcıllıq nömrəsi ölçüsü 64 bitdir, onda praktikada bir millisaniyə ərzində yaradıla bilən qeydlərin sayına məhdudiyyət qoymamalısınız.

Belə identifikatorların formatı ilk baxışdan qəribə görünə bilər. Etibarsız oxucu vaxtın niyə identifikatorun bir hissəsi olduğunu düşünə bilər. Bunun səbəbi Redis axınlarının ID ilə diapazon sorğularını dəstəkləməsidir. İdentifikator qeydin yaradıldığı vaxtla əlaqəli olduğundan, bu, vaxt diapazonlarını sorğulamağa imkan verir. Əmrə baxdığımız zaman konkret nümunəyə baxacağıq XRANGE.

Əgər nədənsə istifadəçi, məsələn, hansısa xarici sistemlə əlaqəli olan öz identifikatorunu təyin etməlidirsə, onda biz onu əmrə ötürə bilərik. XADD aşağıda göstərildiyi kimi * əvəzinə:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Nəzərə alın ki, bu halda siz şəxsiyyət vəsiqəsi artımına özünüz nəzarət etməlisiniz. Bizim nümunəmizdə minimum identifikator "0-1"dir, buna görə də komanda "0-1"-ə bərabər və ya ondan kiçik olan başqa identifikatoru qəbul etməyəcək.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Hər axın üzrə qeydlərin sayı

Sadəcə əmrdən istifadə etməklə axındakı qeydlərin sayını əldə etmək mümkündür XLEN. Bizim nümunəmiz üçün bu əmr aşağıdakı dəyəri qaytaracaq:

> XLEN somestream
(integer) 2

Aralıq sorğuları - XRANGE və XREVRANGE

Diapazon üzrə məlumat tələb etmək üçün biz iki identifikator göstərməliyik - diapazonun başlanğıcı və sonu. Qaytarılan aralığa sərhədlər daxil olmaqla bütün elementlər daxil olacaq. Həmçinin iki xüsusi identifikator “-” və “+” var, müvafiq olaraq axındakı ən kiçik (ilk qeyd) və ən böyük (son qeyd) identifikatoru deməkdir. Aşağıdakı nümunə bütün axın girişlərini sadalayacaq.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Hər bir qaytarılmış qeyd iki elementdən ibarət massivdir: identifikator və açar-dəyər cütlərinin siyahısı. Artıq qeyd etmişdik ki, qeyd identifikatorları zamanla bağlıdır. Buna görə də, biz müəyyən bir zaman aralığını tələb edə bilərik. Bununla belə, sorğuda tam identifikatoru deyil, yalnız Unix vaxtını qeyd edə bilərik. ardıcıllıq nömrəsi. İdentifikatorun buraxılmış hissəsi avtomatik olaraq diapazonun əvvəlində sıfıra və diapazonun sonunda mümkün olan maksimum dəyərə təyin ediləcək. Aşağıda iki millisaniyəlik diapazonu necə tələb edə biləcəyiniz nümunəsi verilmişdir.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Bu diapazonda yalnız bir girişimiz var, lakin real məlumat dəstlərində qaytarılan nəticə böyük ola bilər. Bu səbəbdən XRANGE COUNT seçimini dəstəkləyir. Kəmiyyəti qeyd etməklə, sadəcə olaraq ilk N qeydi əldə edə bilərik. Növbəti N qeydləri (səhifələşdirmə) əldə etmək lazımdırsa, son alınan ID-dən istifadə edə bilərik, onu artıra bilərik ardıcıllıq nömrəsi bir dəfə və yenidən soruşun. Buna aşağıdakı misalda baxaq. ilə 10 element əlavə etməyə başlayırıq XADD (fərz edək ki, mystream artıq 10 elementlə doldurulub). Hər komanda üçün 2 element əldə edən iterasiyaya başlamaq üçün biz tam diapazondan başlayırıq, lakin 2-yə bərabər COUNT ilə.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Növbəti iki elementlə təkrarlamağa davam etmək üçün biz alınan son identifikatoru seçməliyik, yəni 1519073279157-0 və üzərinə 1 əlavə etməliyik. ardıcıllıq nömrəsi.
Nəticədə identifikator, bu halda 1519073279157-1, indi növbəti zəng üçün diapazonun yeni başlanğıcı arqumenti kimi istifadə edilə bilər. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Və s. Çünki mürəkkəblik XRANGE axtarış etmək üçün O(log(N)), sonra isə M elementi qaytarmaq üçün O(M) olarsa, hər bir iterasiya addımı sürətlidir. Beləliklə, istifadə XRANGE axınlar səmərəli şəkildə təkrarlana bilər.

Komanda XREVRANGE ekvivalentdir XRANGE, lakin elementləri tərs qaydada qaytarır:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Qeyd edək ki, əmr XREVRANGE aralıq arqumentlərini tərs qaydada başlayır və dayandırır.

XREAD istifadə edərək yeni qeydlərin oxunması

Çox vaxt vəzifə axına abunə olmaq və yalnız yeni mesajlar almaqdan yaranır. Bu konsepsiya Redis Pub/Sub və ya Redis List-i bloklayan kimi görünə bilər, lakin Redis Stream-dən necə istifadə olunacağına dair əsas fərqlər var:

  1. Hər bir yeni mesaj standart olaraq hər bir abunəçiyə çatdırılır. Bu davranış bloklayan Redis Siyahısından fərqlidir, burada yeni mesaj yalnız bir abunəçi tərəfindən oxunacaqdır.
  2. Redis Pub/Sub-da bütün mesajlar unudulub və heç vaxt saxlanmadığı halda, Stream-də bütün mesajlar qeyri-müəyyən müddətə saxlanılır (müştəri açıq şəkildə silinməyə səbəb olmadıqda).
  3. Redis Stream sizə bir axın daxilində mesajlara girişi fərqləndirməyə imkan verir. Müəyyən bir abunəçi yalnız şəxsi mesaj tarixçəsini görə bilər.

Komandadan istifadə edərək mövzuya abunə ola və yeni mesajlar ala bilərsiniz XOXUYUN. Bir az daha mürəkkəbdir XRANGE, buna görə də əvvəlcə daha sadə nümunələrlə başlayacağıq.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Yuxarıdakı nümunə bloklanmayan bir formanı göstərir XOXUYUN. Qeyd edək ki, COUNT seçimi isteğe bağlıdır. Əslində, tələb olunan yeganə əmr seçimi müvafiq maksimum identifikatorla birlikdə axınların siyahısını müəyyən edən STREAMS seçimidir. Biz “STREAMS mystream 0” yazdıq – biz “0-0”dan böyük identifikatorla mystream axınının bütün qeydlərini almaq istəyirik. Nümunədən göründüyü kimi, biz eyni anda bir neçə mövzuya abunə ola bildiyimiz üçün əmr mövzunun adını qaytarır. Məsələn, "STREAMS mystream otherstream 0 0" yaza bilərik. Nəzərə alın ki, STREAMS seçimindən sonra əvvəlcə bütün tələb olunan axınların adlarını və yalnız sonra identifikatorların siyahısını təqdim etməliyik.

Bu sadə formada komanda ilə müqayisədə xüsusi bir şey yoxdur XRANGE. Ancaq maraqlısı odur ki, biz asanlıqla dönə bilirik XOXUYUN BLOCK arqumentini göstərərək bloklama əmrinə:

> XREAD BLOCK 0 STREAMS mystream $

Yuxarıdakı misalda 0 millisaniyəlik fasilə ilə yeni BLOCK seçimi göstərilmişdir (bu, qeyri-müəyyən müddətə gözləmək deməkdir). Üstəlik, axın mystream üçün adi identifikatoru ötürmək əvəzinə, xüsusi $ identifikatoru keçdi. Bu xüsusi identifikator o deməkdir ki XOXUYUN identifikator kimi mystream-də maksimum identifikatordan istifadə etməlidir. Beləliklə, biz yalnız dinləməyə başladığımız andan etibarən yeni mesajlar alacağıq. Bu, müəyyən mənada Unix-in "tail -f" əmrinə bənzəyir.

Qeyd edək ki, BLOCK seçimindən istifadə edərkən $ xüsusi identifikatorundan istifadə etmək məcburiyyətində deyilik. Biz axında mövcud olan istənilən identifikatordan istifadə edə bilərik. Əgər komanda bloklamadan sorğumuza dərhal xidmət göstərə bilsə, bunu edəcək, əks halda bloklayacaq.

Bloklama XOXUYUN eyni zamanda birdən çox mövzuya qulaq asa bilərsiniz, sadəcə adlarını qeyd etməlisiniz. Bu halda, komanda məlumatı qəbul edən ilk axının qeydini qaytaracaq. Verilən başlıq üçün bloklanan ilk abunəçi ilk olaraq məlumatları alacaq.

İstehlakçı Qrupları

Müəyyən tapşırıqlarda abunəçinin bir başlıq daxilində mesajlara girişini məhdudlaşdırmaq istəyirik. Bunun faydalı ola biləcəyi bir nümunə, mesajın işlənməsini miqyaslandırmağa imkan verən mövzudan fərqli mesajlar alacaq işçilərlə mesaj növbəsidir.

Üç C1, C2, C3 abunəçimiz və 1, 2, 3, 4, 5, 6, 7 mesajlarını ehtiva edən bir başlığımız olduğunu təsəvvür etsək, mesajlar aşağıdakı diaqramdakı kimi təqdim ediləcək:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Bu effektə nail olmaq üçün Redis Stream Consumer Group adlı konsepsiyadan istifadə edir. Bu konsepsiya bir axından məlumat alan, lakin faktiki olaraq müəyyən zəmanətlər verən qrup daxilində birdən çox abunəçi tərəfindən xidmət göstərən psevdo abunəçiyə bənzəyir:

  1. Hər bir mesaj qrup daxilində fərqli abunəçiyə çatdırılır.
  2. Qrup daxilində abunəçilər hərf-həssas sətir olan adları ilə müəyyən edilir. Abunəçi müvəqqəti olaraq qrupdan çıxarsa, o, öz unikal adından istifadə edərək qrupa bərpa oluna bilər.
  3. Hər bir İstehlakçı Qrupu “ilk oxunmamış mesaj” konsepsiyasına əməl edir. Abunəçi yeni mesajlar tələb etdikdə, o, yalnız əvvəllər qrup daxilində heç bir abunəçiyə çatdırılmamış mesajları qəbul edə bilər.
  4. Mesajın abunəçi tərəfindən uğurla işləndiyini açıq şəkildə təsdiqləmək üçün bir əmr var. Bu əmr çağırılana qədər tələb olunan mesaj "gözləyən" statusunda qalacaq.
  5. İstehlakçılar Qrupu daxilində hər bir abunəçi ona çatdırılmış, lakin hələ də işlənməmiş ("gözləyən" statusunda) mesajların tarixçəsini tələb edə bilər.

Müəyyən mənada qrupun vəziyyətini belə ifadə etmək olar:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

İndi İstehlakçı Qrupu üçün əsas əmrlərlə tanış olmaq vaxtıdır, yəni:

  • XGROUP qruplar yaratmaq, məhv etmək və idarə etmək üçün istifadə olunur
  • XREADGROUP qrup vasitəsilə axını oxumaq üçün istifadə olunur
  • XACK - bu əmr abunəçiyə mesajı uğurla işlənmiş kimi qeyd etməyə imkan verir

İstehlakçı Qrupunun yaradılması

Tutaq ki, mystream artıq mövcuddur. Sonra qrup yaratma əmri belə görünəcək:

> XGROUP CREATE mystream mygroup $
OK

Qrup yaradarkən, qrup mesajları alacağı identifikatoru keçməliyik. Əgər biz sadəcə olaraq bütün yeni mesajları almaq istəyiriksə, o zaman xüsusi $ identifikatorundan istifadə edə bilərik (yuxarıdakı nümunəmizdə olduğu kimi). Xüsusi identifikator əvəzinə 0 göstərsəniz, mövzudakı bütün mesajlar qrup üçün əlçatan olacaq.

Qrup yaradıldığından dərhal əmrdən istifadə edərək mesajları oxumağa başlaya bilərik XREADGROUP. Bu əmr çox oxşardır XOXUYUN və isteğe bağlı BLOCK seçimini dəstəkləyir. Bununla belə, həmişə iki arqumentlə göstərilməli olan tələb olunan QRUP seçimi var: qrup adı və abunəçi adı. COUNT seçimi də dəstəklənir.

Mövzunu oxumazdan əvvəl ora bəzi mesajlar qoyaq:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

İndi qrup vasitəsilə bu axını oxumağa çalışaq:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Yuxarıdakı əmr hərfi olaraq aşağıdakı kimi oxunur:

"Mən, abunəçi Alice, mənim qrupunun üzvü, mənim axınımdan əvvəllər heç kimə çatdırılmamış bir mesaj oxumaq istəyirəm."

Abunəçi hər dəfə qrup üzərində əməliyyat həyata keçirdikdə, o, qrup daxilində özünü unikal şəkildə tanıyaraq, adını təqdim etməlidir. Yuxarıdakı əmrdə daha bir çox vacib detal var - xüsusi identifikator ">". Bu xüsusi identifikator mesajları süzür və yalnız əvvəllər heç vaxt çatdırılmamış mesajları buraxır.

Həmçinin, xüsusi hallarda siz 0 və ya hər hansı digər etibarlı identifikator kimi real identifikator təyin edə bilərsiniz. Bu vəziyyətdə əmr XREADGROUP göstərilən abunəçiyə (Alice) çatdırılmış, lakin əmrdən istifadə edərək hələ təsdiqlənməmiş "gözləyən" statuslu mesajların tarixçəsini sizə qaytaracaq XACK.

Bu davranışı seçim olmadan dərhal ID 0 göstərərək yoxlaya bilərik COUNT. Sadəcə bir gözlənilən mesajı, yəni alma mesajını görəcəyik:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Bununla belə, mesajın uğurla emal edildiyini təsdiqləsək, o, artıq göstərilməyəcək:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

İndi bir şey oxumaq növbəsi Bobdadır:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Qrupun üzvü Bob iki mesajdan çox olmamasını istədi. Komanda yalnız ">" xüsusi identifikatoruna görə çatdırılmamış mesajları bildirir. Gördüyünüz kimi, "alma" mesajı artıq Aliceyə çatdırıldığı üçün göstərilməyəcək, buna görə Bob "portağal" və "çiyələk" alır.

Beləliklə, Alice, Bob və qrupun hər hansı digər abunəçisi eyni yayımdan fərqli mesajları oxuya bilər. Onlar həmçinin işlənməmiş mesajların tarixçəsini oxuya və ya mesajları işlənmiş kimi qeyd edə bilərlər.

Nəzərə almaq lazım olan bir neçə şey var:

  • Abunəçi mesajı əmr hesab edən kimi XREADGROUP, bu mesaj “gözləyən” vəziyyətinə keçir və həmin xüsusi abunəçiyə təyin edilir. Digər qrup abunəçiləri bu mesajı oxuya bilməyəcəklər.
  • Abunəçilər ilk qeyd edildikdən sonra avtomatik olaraq yaradılır, onları açıq şəkildə yaratmağa ehtiyac yoxdur.
  • Ilə XREADGROUP siz eyni vaxtda bir neçə fərqli mövzudan gələn mesajları oxuya bilərsiniz, lakin bunun işləməsi üçün əvvəlcə hər mövzu üçün eyni adda qruplar yaratmalısınız. XGROUP

Uğursuzluğun Bərpası

Abunəçi uğursuzluqdan qurtula və "gözləyən" statusu ilə mesajlar siyahısını yenidən oxuya bilər. Bununla belə, real dünyada abunəçilər nəticədə uğursuz ola bilər. Abunəçi uğursuzluqdan qurtula bilmirsə, abunəçinin ilişib qalmış mesajları ilə nə baş verir?
Consumer Group, məhz belə hallar üçün - mesajların sahibini dəyişmək lazım gəldikdə istifadə olunan funksiyanı təklif edir.

Etməli olduğunuz ilk şey əmri çağırmaqdır XPENDING, bu, qrupdakı bütün mesajları “gözləmədədir” statusu ilə göstərir. Ən sadə formada əmr yalnız iki arqumentlə çağırılır: mövzu adı və qrup adı:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Komanda bütün qrup və hər bir abunəçi üçün işlənməmiş mesajların sayını göstərdi. Əlimizdə yalnız iki əlamətdar mesajı olan Bob var, çünki Alicenin tələb etdiyi yeganə mesaj onunla təsdiqlənib XACK.

Daha çox arqumentdən istifadə edərək daha çox məlumat tələb edə bilərik:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - identifikatorlar diapazonu (“-” və “+” istifadə edə bilərsiniz)
{count} — çatdırılma cəhdlərinin sayı
{istehlakçı adı} - qrup adı

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

İndi hər bir mesaj üçün təfərrüatlarımız var: ID, abunəçi adı, millisaniyələrdə boş qalma vaxtı və nəhayət çatdırılma cəhdlərinin sayı. Bobdan iki mesajımız var və onlar 74170458 millisaniyə, təxminən 20 saat boş qalıb.

Nəzərə alın ki, heç kim mesajın məzmununu sadəcə istifadə etməklə yoxlamağımıza mane olmur XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Sadəcə, arqumentlərdə eyni identifikatoru iki dəfə təkrarlamalıyıq. İndi bir fikrimiz var, Alice qərar verə bilər ki, 20 saatlıq fasilədən sonra Bob çox güman ki, sağalmayacaq və bu mesajları sorğulamaq və onları Bob üçün emal etməyə davam etmək vaxtıdır. Bunun üçün əmrdən istifadə edirik XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Bu əmrdən istifadə edərək, sahibini {istehlakçı} olaraq dəyişdirməklə hələ emal olunmamış “xarici” mesaj ala bilərik. Bununla belə, biz minimum boş qalma müddətini də təmin edə bilərik {min-idle-time}. Bu, iki müştərinin eyni mesajların sahibini eyni vaxtda dəyişməyə çalışdığı bir vəziyyətdən qaçmağa kömək edir:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

İlk müştəri dayanma müddətini sıfırlayacaq və çatdırılma sayğacını artıracaq. Beləliklə, ikinci müştəri bunu tələb edə bilməyəcək.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mesajı uğurla qəbul edən Alice artıq mesajı emal edə və onu qəbul edə bilər.

Yuxarıdakı nümunədən görə bilərsiniz ki, uğurlu sorğu mesajın məzmununu qaytarır. Bununla belə, bu lazım deyil. JUSTID seçimi yalnız mesaj identifikatorlarını qaytarmaq üçün istifadə edilə bilər. Mesajın təfərrüatları ilə maraqlanmırsınızsa və sistemin işini artırmaq istəyirsinizsə, bu faydalıdır.

Çatdırılma sayğacı

Çıxışda gördüyünüz sayğac XPENDING hər bir mesajın çatdırılma sayıdır. Belə bir sayğac iki yolla artırılır: mesaj uğurla tələb edildikdə XCLAIM və ya zəng istifadə edildikdə XREADGROUP.

Bəzi mesajların dəfələrlə çatdırılması normaldır. Əsas odur ki, bütün mesajlar nəticədə işlənir. Bəzən mesajın işlənməsi zamanı problemlər yaranır, çünki mesajın özü zədələnir və ya mesajın işlənməsi işləyici kodunda xətaya səbəb olur. Bu halda, heç kimin bu mesajı emal edə bilməyəcəyi ortaya çıxa bilər. Çatdırılma cəhdi sayğacımız olduğundan, biz bu sayğacı belə halları aşkar etmək üçün istifadə edə bilərik. Buna görə də, çatdırılma sayı qeyd etdiyiniz yüksək rəqəmə çatdıqdan sonra, belə bir mesajı başqa bir mövzuya yerləşdirmək və sistem administratoruna bildiriş göndərmək daha ağıllı olardı.

Mövzu vəziyyəti

Komanda XİNFO mövzu və onun qrupları haqqında müxtəlif məlumatları tələb etmək üçün istifadə olunur. Məsələn, əsas əmr belə görünür:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Yuxarıdakı əmr göstərilən axın haqqında ümumi məlumatları göstərir. İndi bir az daha mürəkkəb bir nümunə:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Yuxarıdakı əmr göstərilən mövzunun bütün qrupları üçün ümumi məlumatları göstərir

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Yuxarıdakı əmr göstərilən axın və qrupun bütün abunəçiləri üçün məlumatları göstərir.
Əmr sintaksisini unutsanız, kömək üçün əmrin özündən soruşun:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Yayım Ölçüsü Limiti

Bir çox proqram verilənləri həmişəlik axına toplamaq istəmir. Hər mövzuda icazə verilən maksimum mesaj sayının olması çox vaxt faydalıdır. Digər hallarda, göstərilən mövzu ölçüsünə çatdıqda bütün mesajları bir başlıqdan başqa bir davamlı mağazaya köçürmək faydalıdır. Komandada MAXLEN parametrindən istifadə edərək axının ölçüsünü məhdudlaşdıra bilərsiniz XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLEN istifadə edərkən, müəyyən uzunluğa çatdıqda köhnə qeydlər avtomatik olaraq silinir, buna görə də axın sabit ölçüyə malikdir. Lakin bu halda budama Redis yaddaşında ən effektiv şəkildə baş vermir. Vəziyyəti aşağıdakı kimi yaxşılaşdıra bilərsiniz:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Yuxarıdakı misaldakı ~ arqumenti o deməkdir ki, axın uzunluğunu müəyyən bir dəyərlə məhdudlaşdırmağa ehtiyacımız yoxdur. Bizim nümunəmizdə bu, 1000-dən böyük və ya ona bərabər istənilən rəqəm ola bilər (məsələn, 1000, 1010 və ya 1030). Biz sadəcə olaraq açıq şəkildə qeyd etdik ki, axınımızın ən azı 1000 qeyd saxlamasını istəyirik. Bu, Redis daxilində yaddaşın idarə edilməsini daha səmərəli edir.

Ayrı bir komanda da var XTRIM, eyni şeyi edir:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Davamlı saxlama və təkrarlama

Redis Stream asinxron şəkildə qul qovşaqlarına təkrarlanır və AOF (bütün məlumatların snapshot) və RDB (bütün yazma əməliyyatlarının jurnalı) kimi fayllarda saxlanılır. İstehlakçı Qrupları vəziyyətinin təkrarlanması da dəstəklənir. Buna görə də, əgər mesaj əsas qovşaqda “gözləyən” statusundadırsa, kölə qovşaqlarda bu mesaj eyni statusa malik olacaq.

Axından fərdi elementlərin çıxarılması

Mesajları silmək üçün xüsusi bir əmr var XDEL. Komanda silinəcək mesaj identifikatorlarının ardınca başlığın adını alır:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Bu əmrdən istifadə edərkən faktiki yaddaşın dərhal buraxılmayacağını nəzərə almaq lazımdır.

Sıfır uzunluqlu axınlar

Axınlar və digər Redis məlumat strukturları arasındakı fərq ondan ibarətdir ki, digər məlumat strukturlarında artıq elementlər olmadıqda, yan təsir olaraq, məlumat strukturunun özü yaddaşdan silinəcəkdir. Beləliklə, məsələn, ZREM çağırışı sonuncu elementi sildikdə çeşidlənmiş dəst tamamilə silinəcəkdir. Bunun əvəzinə, iplərin yaddaşda heç bir element olmadan belə qalmasına icazə verilir.

Nəticə

Redis Stream mesaj brokerləri, mesaj növbələri, vahid giriş və tarix saxlayan söhbət sistemləri yaratmaq üçün idealdır.

Bir dəfə dediyim kimi Niklaus Wirth, proqramlar alqoritmlər və məlumat strukturlarıdır və Redis artıq sizə hər ikisini verir.

Mənbə: www.habr.com

Добавить комментарий