Redis Stream - قابلیت اطمینان و مقیاس پذیری سیستم های پیام رسانی شما

Redis Stream - قابلیت اطمینان و مقیاس پذیری سیستم های پیام رسانی شما

Redis Stream یک نوع داده انتزاعی جدید است که در Redis با نسخه 5.0 معرفی شده است
از نظر مفهومی، ردیس استریم فهرستی است که می‌توانید ورودی‌ها را به آن اضافه کنید. هر ورودی دارای یک شناسه منحصر به فرد است. به‌طور پیش‌فرض، شناسه به‌طور خودکار تولید می‌شود و دارای مهر زمانی است. بنابراین، می‌توانید محدوده‌هایی از رکوردها را در طول زمان پرس و جو کنید، یا داده‌های جدیدی را با رسیدن به جریان دریافت کنید، دقیقاً مانند دستور «tail -f» یونیکس که یک فایل گزارش را می‌خواند و در حین انتظار برای داده‌های جدید ثابت می‌شود. توجه داشته باشید که چندین مشتری می توانند همزمان به یک رشته گوش دهند، همانطور که بسیاری از فرآیندهای "tail -f" می توانند یک فایل را به طور همزمان بدون تضاد با یکدیگر بخوانند.

برای درک تمام مزایای نوع داده جدید، بیایید نگاهی گذرا به ساختارهای قدیمی Redis بیندازیم که تا حدی عملکرد Redis Stream را تکرار می کنند.

Redis PUB/SUB

Redis Pub/Sub یک سیستم پیام رسانی ساده است که قبلاً در فروشگاه ارزش کلیدی شما تعبیه شده است. با این حال، سادگی قیمت دارد:

  • اگر ناشر به دلایلی شکست بخورد، تمام مشترکین خود را از دست می دهد
  • ناشر باید آدرس دقیق همه مشترکین خود را بداند
  • اگر داده‌ها سریع‌تر از پردازش داده‌ها منتشر شوند، ممکن است یک ناشر کار زیادی برای مشترکان خود ایجاد کند
  • پیام بلافاصله پس از انتشار از بافر ناشر حذف می شود، صرف نظر از اینکه به چند مشترک تحویل داده شده است و با چه سرعتی قادر به پردازش این پیام بوده اند.
  • همه مشترکین پیام را به طور همزمان دریافت خواهند کرد. خود مشترکین باید به نحوی بین خود در مورد ترتیب پردازش همان پیام به توافق برسند.
  • هیچ مکانیزم داخلی برای تأیید اینکه یک مشترک با موفقیت یک پیام را پردازش کرده است وجود ندارد. اگر مشترکی پیامی دریافت کند و در حین پردازش از کار بیفتد، ناشر از آن اطلاعی نخواهد داشت.

لیست ردیس

Redis List یک ساختار داده است که از مسدود کردن دستورات خواندن پشتیبانی می کند. می توانید پیام ها را از ابتدا یا انتهای لیست اضافه کرده و بخوانید. بر اساس این ساختار، می توانید یک پشته یا صف خوب برای سیستم توزیع شده خود ایجاد کنید و در بیشتر موارد این کافی خواهد بود. تفاوت های اصلی با Redis Pub/Sub:

  • پیام به یک مشتری تحویل داده می شود. اولین کلاینت خوانده شده مسدود شده ابتدا داده ها را دریافت می کند.
  • کلینت باید عملیات خواندن هر پیام را خودش آغاز کند. لیست چیزی در مورد مشتریان نمی داند.
  • پیام ها ذخیره می شوند تا زمانی که شخصی آنها را بخواند یا به صراحت آنها را حذف کند. اگر سرور Redis را به گونه ای پیکربندی کنید که داده ها را روی دیسک تخلیه کند، قابلیت اطمینان سیستم به طور چشمگیری افزایش می یابد.

مقدمه ای بر استریم

افزودن ورودی به جریان

تیم XADD یک ورودی جدید به جریان اضافه می کند. رکورد فقط یک رشته نیست، بلکه از یک یا چند جفت کلید-مقدار تشکیل شده است. بنابراین، هر ورودی از قبل ساختار یافته است و شبیه ساختار یک فایل CSV است.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

در مثال بالا، دو فیلد با نام (کلید) "mystream" به جریان اضافه می کنیم: "sensor-id" و "temperature" به ترتیب با مقادیر "1234" و "19.8". به عنوان آرگومان دوم، فرمان یک شناسه می گیرد که به ورودی اختصاص داده می شود - این شناسه به طور منحصر به فرد هر ورودی در جریان را شناسایی می کند. با این حال، در این مورد ما * را پاس کردیم زیرا می خواهیم Redis یک ID جدید برای ما ایجاد کند. هر شناسه جدید افزایش می یابد. بنابراین، هر ورودی جدید نسبت به ورودی های قبلی دارای شناسه بالاتری خواهد بود.

فرمت شناسه

شناسه ورودی که توسط دستور برگردانده شده است XADD، از دو بخش تشکیل شده است:

{millisecondsTime}-{sequenceNumber}

millisecondsTime - زمان یونیکس در میلی ثانیه (زمان سرور Redis). با این حال، اگر زمان فعلی یکسان یا کمتر از زمان ضبط قبلی باشد، از مهر زمانی ضبط قبلی استفاده می شود. بنابراین، اگر زمان سرور به گذشته برگردد، شناسه جدید همچنان ویژگی افزایشی را حفظ خواهد کرد.

شماره ترتیب برای رکوردهای ایجاد شده در همان میلی ثانیه استفاده می شود. شماره ترتیب نسبت به ورودی قبلی 1 افزایش می یابد. از آنجا که شماره ترتیب اندازه 64 بیت است، پس در عمل نباید با محدودیتی در تعداد رکوردهایی که می توان در عرض یک میلی ثانیه تولید کرد مواجه شوید.

قالب چنین شناسه هایی ممکن است در نگاه اول عجیب به نظر برسد. یک خواننده بی اعتماد ممکن است تعجب کند که چرا زمان بخشی از شناسه است. دلیل آن این است که استریم های Redis از پرس و جوهای محدوده توسط ID پشتیبانی می کنند. از آنجایی که شناسه با زمان ایجاد رکورد مرتبط است، این امکان جستجو در محدوده زمانی را فراهم می کند. وقتی به دستور نگاه می کنیم به یک مثال خاص نگاه می کنیم XRANGE.

اگر به دلایلی کاربر نیاز به تعیین شناسه خود داشته باشد که مثلاً با سیستم خارجی مرتبط است، می توانیم آن را به دستور ارسال کنیم. XADD به جای * مانند شکل زیر:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

لطفاً توجه داشته باشید که در این صورت باید خودتان بر افزایش شناسه نظارت داشته باشید. در مثال ما، حداقل شناسه "0-1" است، بنابراین این دستور شناسه دیگری را که مساوی یا کمتر از "0-1" باشد، نمی پذیرد.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

تعداد رکوردها در هر جریان

به سادگی با استفاده از دستور می توان تعداد رکوردهای یک استریم را بدست آورد XLEN. برای مثال ما، این دستور مقدار زیر را برمی گرداند:

> XLEN somestream
(integer) 2

جستارهای محدوده - XRANGE و XREVRANGE

برای درخواست داده بر اساس محدوده، باید دو شناسه - ابتدا و انتهای محدوده را مشخص کنیم. محدوده برگشتی شامل تمام عناصر، از جمله مرزها می شود. همچنین دو شناسه ویژه "-" و "+" وجود دارد که به ترتیب به معنای کوچکترین (اولین رکورد) و بزرگترین (آخرین رکورد) شناسه در جریان است. مثال زیر تمام ورودی‌های جریان را فهرست می‌کند.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

هر رکورد برگشتی آرایه ای از دو عنصر است: یک شناسه و یک لیست از جفت های کلید-مقدار. قبلاً گفتیم که شناسه های رکورد مربوط به زمان هستند. بنابراین، ما می توانیم محدوده ای از یک بازه زمانی خاص را درخواست کنیم. با این حال، ما می توانیم در درخواست نه شناسه کامل، بلکه فقط زمان یونیکس را مشخص کنیم و قسمت مربوط به شماره ترتیب. قسمت حذف شده از شناسه به طور خودکار در ابتدای محدوده صفر و در انتهای محدوده به حداکثر مقدار ممکن تنظیم می شود. در زیر مثالی از نحوه درخواست محدوده دو میلی ثانیه ای آورده شده است.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

ما فقط یک ورودی در این محدوده داریم، با این حال در مجموعه داده های واقعی، نتیجه بازگشتی می تواند بسیار زیاد باشد. به این دلیل XRANGE از گزینه COUNT پشتیبانی می کند. با تعیین کمیت، می توانیم به سادگی اولین N رکورد را بدست آوریم. در صورت نیاز به دریافت N رکورد بعدی (صفحه بندی)، می توانیم از آخرین شناسه دریافتی استفاده کنیم، آن را افزایش دهیم شماره ترتیب یکی و دوباره بپرس بیایید در مثال زیر به این موضوع نگاه کنیم. ما شروع به اضافه کردن 10 عنصر با XADD (با فرض اینکه mystream قبلاً با 10 عنصر پر شده بود). برای شروع تکرار با دریافت 2 عنصر در هر دستور، با محدوده کامل اما با COUNT برابر با 2 شروع می کنیم.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

برای ادامه تکرار با دو عنصر بعدی، باید آخرین شناسه دریافتی، یعنی 1519073279157-0 را انتخاب کرده و 1 را به آن اضافه کنیم. شماره ترتیب.
شناسه حاصل، در این مورد 1519073279157-1، اکنون می تواند به عنوان آرگومان شروع جدید محدوده برای فراخوانی بعدی استفاده شود. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

و غیره. چون پیچیدگی XRANGE برای جستجو O(log(N)) و سپس O(M) برای برگرداندن عناصر M است، سپس هر مرحله تکرار سریع است. بنابراین، با استفاده از XRANGE جریان ها را می توان به طور موثر تکرار کرد.

تیم XREVRANGE معادل است XRANGE، اما عناصر را به ترتیب معکوس برمی گرداند:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

لطفا توجه داشته باشید که دستور XREVRANGE آرگومان های محدوده شروع و توقف را به ترتیب معکوس می گیرد.

خواندن ورودی های جدید با استفاده از XREAD

اغلب وظیفه مشترک شدن در یک جریان و دریافت فقط پیام های جدید است. این مفهوم ممکن است شبیه Redis Pub/Sub یا مسدود کردن Redis List به نظر برسد، اما تفاوت‌های اساسی در نحوه استفاده از Redis Stream وجود دارد:

  1. هر پیام جدید به طور پیش فرض به هر مشترک تحویل داده می شود. این رفتار با مسدود کردن لیست Redis متفاوت است، جایی که یک پیام جدید فقط توسط یک مشترک خوانده می شود.
  2. در حالی که در Redis Pub/Sub همه پیام‌ها فراموش می‌شوند و هرگز باقی نمی‌مانند، در Stream همه پیام‌ها به‌طور نامحدود حفظ می‌شوند (مگر اینکه مشتری صراحتاً باعث حذف شود).
  3. Redis Stream به شما امکان می دهد دسترسی به پیام ها را در یک جریان متمایز کنید. یک مشترک خاص فقط می تواند تاریخچه پیام شخصی خود را ببیند.

با استفاده از دستور می توانید در یک رشته مشترک شوید و پیام های جدید دریافت کنید XREAD. کمی پیچیده تر از آن است XRANGE، بنابراین ابتدا با مثال های ساده تر شروع می کنیم.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

مثال بالا یک فرم غیر مسدود کننده را نشان می دهد XREAD. توجه داشته باشید که گزینه COUNT اختیاری است. در واقع تنها گزینه فرمان مورد نیاز گزینه STREAMS است که لیستی از جریان ها را به همراه حداکثر شناسه مربوطه مشخص می کند. ما نوشتیم "STREAMS mystream 0" - می خواهیم تمام رکوردهای جریان mystream را با شناسه بزرگتر از "0-0" دریافت کنیم. همانطور که از مثال می بینید، دستور نام رشته را برمی گرداند زیرا می توانیم همزمان در چندین رشته مشترک شویم. برای مثال می‌توانیم بنویسیم "STREAMS mystream otherstream 0 0". لطفاً توجه داشته باشید که پس از گزینه STREAMS ابتدا باید نام تمام جریان‌های مورد نیاز و تنها پس از آن فهرستی از شناسه‌ها ارائه شود.

در این شکل ساده دستور در مقایسه با آن کار خاصی انجام نمی دهد XRANGE. با این حال، نکته جالب این است که ما می توانیم به راحتی بچرخیم XREAD به یک دستور مسدود کننده، با مشخص کردن آرگومان BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

در مثال بالا، یک گزینه BLOCK جدید با بازه زمانی 0 میلی ثانیه مشخص شده است (این به معنای انتظار نامحدود است). علاوه بر این، به جای ارسال شناسه معمول برای جریان mystream، یک شناسه ویژه $ ارسال شد. این شناسه ویژه به این معنی است XREAD باید از حداکثر شناسه در mystream به عنوان شناسه استفاده کند. بنابراین ما فقط از لحظه ای که گوش دادن را شروع کرده ایم، پیام های جدید دریافت خواهیم کرد. از برخی جهات این شبیه به فرمان یونیکس "tail -f" است.

توجه داشته باشید که هنگام استفاده از گزینه BLOCK لزوماً نیازی به استفاده از شناسه ویژه $ نیست. ما می توانیم از هر شناسه موجود در جریان استفاده کنیم. اگر تیم بتواند درخواست ما را بلافاصله بدون مسدود کردن سرویس دهد، این کار را انجام می دهد، در غیر این صورت مسدود می شود.

مسدود کردن XREAD همچنین می تواند به چندین رشته به طور همزمان گوش دهد، فقط باید نام آنها را مشخص کنید. در این حالت، دستور یک رکورد از اولین جریانی که داده را دریافت کرده است، برمی گرداند. اولین مشترکی که برای یک جریان مشخص مسدود شده است ابتدا داده ها را دریافت می کند.

گروه های مصرف کننده

در وظایف خاصی، ما می خواهیم دسترسی مشترک به پیام های یک رشته را محدود کنیم. مثالی که می‌تواند مفید باشد، صف پیام با کارگران است که پیام‌های متفاوتی را از یک رشته دریافت می‌کنند و به پردازش پیام اجازه می‌دهند مقیاس شوند.

اگر تصور کنیم که سه مشترک C1، C2، C3 و یک رشته حاوی پیام های 1، 2، 3، 4، 5، 6، 7 داریم، پیام ها مانند نمودار زیر ارائه می شوند:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

برای دستیابی به این اثر، Redis Stream از مفهومی به نام Consumer Group استفاده می کند. این مفهوم شبیه به شبه مشترک است که داده‌ها را از یک جریان دریافت می‌کند، اما در واقع توسط چندین مشترک در یک گروه ارائه می‌شود و تضمین‌های خاصی را ارائه می‌دهد:

  1. هر پیام به مشترک دیگری در گروه تحویل داده می شود.
  2. در یک گروه، مشترکین با نامشان شناسایی می شوند که یک رشته حساس به حروف بزرگ و کوچک است. اگر مشترکی به طور موقت از گروه خارج شود، می تواند با استفاده از نام منحصر به فرد خود به گروه بازگردانده شود.
  3. هر گروه مصرف کننده از مفهوم "اولین پیام خوانده نشده" پیروی می کند. هنگامی که یک مشترک درخواست پیام های جدید می کند، فقط می تواند پیام هایی را دریافت کند که قبلاً هرگز به هیچ مشترکی در گروه ارسال نشده است.
  4. دستوری وجود دارد که صریحاً تأیید می کند که پیام با موفقیت توسط مشترک پردازش شده است. تا زمانی که این دستور فراخوانی نشود، پیام درخواستی در وضعیت "در انتظار" باقی می ماند.
  5. در گروه مصرف کننده، هر مشترک می تواند تاریخچه ای از پیام هایی را که به او تحویل داده شده است، اما هنوز پردازش نشده است (در وضعیت "در انتظار") درخواست کند.

به یک معنا، وضعیت گروه را می توان به صورت زیر بیان کرد:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

اکنون زمان آن است که با دستورات اصلی گروه مصرف کننده آشنا شوید، یعنی:

  • XGROUP برای ایجاد، تخریب و مدیریت گروه ها استفاده می شود
  • XREADGROUP برای خواندن جریان از طریق گروه استفاده می شود
  • XACK - این دستور به مشترک اجازه می دهد پیام را به عنوان پردازش موفقیت آمیز علامت گذاری کند

ایجاد گروه مصرف کننده

بیایید فرض کنیم که mystream از قبل وجود دارد. سپس دستور ایجاد گروه به شکل زیر خواهد بود:

> XGROUP CREATE mystream mygroup $
OK

هنگام ایجاد یک گروه، باید یک شناسه ارسال کنیم که از آن گروه پیام ها را دریافت می کند. اگر فقط می‌خواهیم همه پیام‌های جدید را دریافت کنیم، می‌توانیم از شناسه ویژه $ استفاده کنیم (مانند مثال بالا). اگر به جای یک شناسه خاص، 0 را مشخص کنید، تمام پیام‌های موجود در رشته برای گروه در دسترس خواهند بود.

اکنون که گروه ایجاد شده است، می توانیم بلافاصله با استفاده از دستور شروع به خواندن پیام ها کنیم XREADGROUP. این دستور بسیار شبیه به XREAD و از گزینه BLOCK اختیاری پشتیبانی می کند. با این حال، یک گزینه GROUP مورد نیاز وجود دارد که همیشه باید با دو آرگومان مشخص شود: نام گروه و نام مشترک. گزینه COUNT نیز پشتیبانی می شود.

قبل از خواندن تاپیک، اجازه دهید چند پیام را در آنجا قرار دهیم:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

حالا بیایید سعی کنیم این جریان را از طریق گروه بخوانیم:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

دستور بالا کلمه به کلمه به صورت زیر خوانده می شود:

من، مشترک آلیس، عضو mygroup، می‌خواهم یک پیام از mystream را بخوانم که قبلاً هرگز به کسی ارسال نشده است.»

هر بار که مشترکی عملیاتی را بر روی یک گروه انجام می دهد، باید نام خود را ارائه دهد و به طور منحصر به فرد خود را در گروه شناسایی کند. یک جزئیات بسیار مهم دیگر در دستور بالا وجود دارد - شناسه ویژه ">". این شناسه ویژه پیام‌ها را فیلتر می‌کند و تنها پیام‌هایی را باقی می‌گذارد که قبلاً هرگز تحویل داده نشده‌اند.

همچنین در موارد خاص می توانید یک شناسه واقعی مانند 0 یا هر شناسه معتبر دیگری را تعیین کنید. در این مورد فرمان XREADGROUP تاریخچه ای از پیام هایی را با وضعیت "در انتظار" که به مشترک مشخص شده (آلیس) تحویل داده شده اند اما هنوز با استفاده از دستور تایید نشده اند را به شما برمی گرداند. XACK.

ما می‌توانیم این رفتار را با مشخص کردن شناسه 0 بدون این گزینه آزمایش کنیم COUNT. ما به سادگی یک پیام در انتظار را خواهیم دید، یعنی پیام apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

اما اگر تأیید کنیم که پیام با موفقیت پردازش شده است، دیگر نمایش داده نخواهد شد:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

حالا نوبت باب است که چیزی بخواند:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

باب، یکی از اعضای mygroup، بیش از دو پیام نخواست. این فرمان فقط پیام های تحویل نشده را به دلیل شناسه ویژه ">" گزارش می کند. همانطور که می بینید، پیام "سیب" نمایش داده نمی شود زیرا قبلاً به آلیس تحویل داده شده است، بنابراین باب "نارنجی" و "توت فرنگی" را دریافت می کند.

به این ترتیب، آلیس، باب و هر مشترک دیگری در گروه می توانند پیام های متفاوتی را از یک جریان بخوانند. آنها همچنین می توانند تاریخچه پیام های پردازش نشده خود را بخوانند یا پیام ها را به عنوان پردازش شده علامت گذاری کنند.

چند نکته را باید در نظر داشت:

  • به محض اینکه مشترک پیام را یک دستور بداند XREADGROUP، این پیام به حالت "در انتظار" می رود و به آن مشترک خاص اختصاص داده می شود. سایر مشترکین گروه نمی توانند این پیام را بخوانند.
  • مشترکین در اولین ذکر به طور خودکار ایجاد می شوند، نیازی به ایجاد صریح آنها نیست.
  • با XREADGROUP شما می توانید پیام ها را از چندین رشته مختلف به طور همزمان بخوانید، اما برای این کار باید ابتدا گروه هایی با همان نام برای هر رشته ایجاد کنید. XGROUP

بازیابی پس از شکست

مشترک می تواند از شکست بهبود یابد و لیست پیام های خود را با وضعیت "در انتظار" دوباره بخواند. با این حال، در دنیای واقعی، مشترکین ممکن است در نهایت شکست بخورند. اگر مشترک نتواند پس از خرابی بهبود یابد، چه اتفاقی برای پیام های گیر کرده مشترک می افتد؟
Consumer Group یک ویژگی را ارائه می دهد که برای چنین مواردی استفاده می شود - زمانی که باید صاحب پیام ها را تغییر دهید.

اولین کاری که باید انجام دهید فراخوانی فرمان است در حال انتظار، که تمام پیام های گروه را با وضعیت "در انتظار" نمایش می دهد. در ساده‌ترین شکل، فرمان تنها با دو آرگومان فراخوانی می‌شود: نام رشته و نام گروه:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

تیم تعداد پیام های پردازش نشده را برای کل گروه و برای هر مشترک نمایش داد. ما فقط باب را با دو پیام برجسته داریم زیرا تنها پیامی که آلیس درخواست کرده بود با آن تأیید شد XACK.

ما می توانیم با استفاده از آرگومان های بیشتر اطلاعات بیشتری درخواست کنیم:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - محدوده شناسه‌ها (می‌توانید از «-» و «+» استفاده کنید)
{count} - تعداد تلاش‌های تحویل
{consumer-name} - نام گروه

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

اکنون ما جزئیات هر پیام را داریم: شناسه، نام مشترک، زمان بیکاری بر حسب میلی ثانیه و در نهایت تعداد تلاش های ارسال. ما دو پیام از باب داریم و آنها به مدت 74170458 میلی ثانیه، حدود 20 ساعت، بیکار بوده اند.

لطفاً توجه داشته باشید که هیچ کس ما را از بررسی محتوای پیام صرفاً با استفاده از آن باز نمی دارد XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

فقط باید همان شناسه را دو بار در آرگومان ها تکرار کنیم. اکنون که ایده‌ای داریم، آلیس ممکن است تصمیم بگیرد که پس از 20 ساعت توقف، باب احتمالاً بهبود نمی‌یابد، و زمان آن رسیده است که آن پیام‌ها را پرس و جو کرده و پردازش آنها را برای باب از سر بگیرد. برای این کار از دستور استفاده می کنیم XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

با استفاده از این دستور، با تغییر مالک به {consumer}، می‌توانیم یک پیام "خارجی" دریافت کنیم که هنوز پردازش نشده است. با این حال، می‌توانیم حداقل زمان بیکاری {min-idle-time} را نیز ارائه کنیم. این به جلوگیری از موقعیتی کمک می کند که در آن دو مشتری سعی می کنند به طور همزمان صاحب پیام های مشابه را تغییر دهند:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

اولین مشتری زمان توقف را بازنشانی می کند و شمارنده تحویل را افزایش می دهد. بنابراین مشتری دوم نمی تواند آن را درخواست کند.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

این پیام با موفقیت توسط آلیس ادعا شد، که اکنون می تواند پیام را پردازش کرده و آن را تأیید کند.

از مثال بالا می بینید که یک درخواست موفق محتویات خود پیام را برمی گرداند. با این حال، این ضروری نیست. گزینه JUSTID فقط برای بازگرداندن شناسه پیام ها قابل استفاده است. اگر علاقه ای به جزئیات پیام ندارید و می خواهید عملکرد سیستم را افزایش دهید، این کار مفید است.

پیشخوان تحویل

شمارنده ای که در خروجی می بینید در حال انتظار تعداد تحویل هر پیام است. چنین شمارنده ای به دو صورت افزایش می یابد: زمانی که یک پیام با موفقیت از طریق درخواست شود XCLAIM یا زمانی که از تماس استفاده می شود XREADGROUP.

طبیعی است که برخی از پیام ها چندین بار ارسال شوند. نکته اصلی این است که همه پیام ها در نهایت پردازش می شوند. گاهی اوقات هنگام پردازش یک پیام مشکلاتی رخ می دهد زیرا خود پیام خراب است یا پردازش پیام باعث ایجاد خطا در کد کنترل کننده می شود. در این صورت، ممکن است معلوم شود که هیچ کس نمی تواند این پیام را پردازش کند. از آنجایی که ما یک شمارنده تلاش برای تحویل داریم، می‌توانیم از این شمارنده برای شناسایی چنین موقعیت‌هایی استفاده کنیم. بنابراین، هنگامی که تعداد تحویل به عدد بالایی که شما مشخص کرده‌اید رسید، احتمالاً عاقلانه‌تر خواهد بود که چنین پیامی را در یک تاپیک دیگر قرار دهید و یک اعلان برای مدیر سیستم ارسال کنید.

وضعیت موضوع

تیم XINFO برای درخواست اطلاعات مختلف در مورد یک موضوع و گروه های آن استفاده می شود. به عنوان مثال، یک دستور پایه به صورت زیر است:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

دستور بالا اطلاعات کلی در مورد جریان مشخص شده را نمایش می دهد. حالا یک مثال کمی پیچیده تر:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

دستور بالا اطلاعات کلی برای تمام گروه های رشته مشخص شده را نمایش می دهد

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

دستور بالا اطلاعاتی را برای همه مشترکین جریان و گروه مشخص شده نمایش می دهد.
اگر دستور دستور را فراموش کردید، فقط از خود فرمان کمک بخواهید:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

محدودیت اندازه جریان

بسیاری از برنامه ها نمی خواهند برای همیشه داده ها را در یک جریان جمع آوری کنند. معمولاً داشتن حداکثر تعداد پیام مجاز در هر رشته مفید است. در موارد دیگر، انتقال همه پیام‌ها از یک رشته به یک فروشگاه دائمی دیگر زمانی که به اندازه رشته مشخص شده رسید، مفید است. می توانید اندازه یک جریان را با استفاده از پارامتر MAXLEN در دستور محدود کنید XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

هنگام استفاده از MAXLEN، رکوردهای قدیمی زمانی که به طول مشخصی می رسند به طور خودکار حذف می شوند، بنابراین جریان اندازه ثابتی دارد. با این حال، هرس در این مورد به کارآمدترین روش در حافظه Redis رخ نمی دهد. شما می توانید وضعیت را به شرح زیر بهبود بخشید:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

آرگومان ~ در مثال بالا به این معنی است که ما لزوماً نیازی به محدود کردن طول جریان به یک مقدار خاص نداریم. در مثال ما، این می تواند هر عددی بزرگتر یا مساوی 1000 باشد (مثلاً 1000، 1010 یا 1030). ما فقط به صراحت مشخص کردیم که می خواهیم جریان ما حداقل 1000 رکورد ذخیره کند. این امر مدیریت حافظه را در داخل Redis بسیار کارآمدتر می کند.

همچنین یک تیم جداگانه وجود دارد XTRIM، که همین کار را انجام می دهد:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

ذخیره سازی و تکرار مداوم

Redis Stream به طور ناهمزمان به گره‌های برده تکثیر می‌شود و در فایل‌هایی مانند AOF (عکس فوری از همه داده‌ها) و RDB (ورود از همه عملیات نوشتن) ذخیره می‌شود. تکرار وضعیت گروه های مصرف کننده نیز پشتیبانی می شود. بنابراین، اگر پیامی در گره اصلی در وضعیت "در انتظار" باشد، در گره های برده این پیام همان وضعیت را خواهد داشت.

حذف عناصر منفرد از یک جریان

دستور خاصی برای حذف پیام ها وجود دارد XDEL. این فرمان نام رشته و به دنبال آن شناسه های پیامی که باید حذف شوند را دریافت می کند:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

هنگام استفاده از این دستور، باید در نظر داشته باشید که حافظه واقعی بلافاصله آزاد نمی شود.

جریان های با طول صفر

تفاوت بین استریم ها و سایر ساختارهای داده Redis در این است که وقتی دیگر ساختارهای داده دیگر عناصری در خود ندارند، به عنوان یک اثر جانبی، ساختار داده خود از حافظه حذف می شود. بنابراین، برای مثال، هنگامی که فراخوانی ZREM آخرین عنصر را حذف می کند، مجموعه مرتب شده به طور کامل حذف می شود. در عوض، رشته‌ها اجازه دارند حتی بدون داشتن هیچ عنصری در حافظه باقی بمانند.

نتیجه

Redis Stream برای ایجاد دلال‌های پیام، صف‌های پیام، ورود به سیستم یکپارچه و سیستم‌های چت تاریخ‌نگاری ایده‌آل است.

همانطور که یک بار گفتم نیکلاوس ویرث، برنامه ها الگوریتم هایی به اضافه ساختار داده هستند و Redis در حال حاضر هر دو را به شما می دهد.

منبع: www.habr.com

اضافه کردن نظر