Redis Stream - الموثوقية وقابلية التوسع لأنظمة المراسلة الخاصة بك

Redis Stream - الموثوقية وقابلية التوسع لأنظمة المراسلة الخاصة بك

Redis Stream هو نوع بيانات تجريدي جديد تم تقديمه في Redis بالإصدار 5.0
من الناحية النظرية، Redis Stream عبارة عن قائمة يمكنك إضافة إدخالات إليها. كل إدخال له معرف فريد. افتراضيًا، يتم إنشاء المعرف تلقائيًا ويتضمن طابعًا زمنيًا. لذلك، يمكنك الاستعلام عن نطاقات السجلات بمرور الوقت، أو تلقي بيانات جديدة عند وصولها إلى الدفق، تمامًا مثلما يقرأ أمر Unix "tail -f" ملف سجل ويتجمد أثناء انتظار البيانات الجديدة. لاحظ أنه يمكن لعدة عملاء الاستماع إلى سلسلة رسائل في نفس الوقت، تمامًا كما يمكن للعديد من عمليات "tail -f" قراءة ملف في وقت واحد دون التعارض مع بعضها البعض.

لفهم جميع فوائد نوع البيانات الجديد، دعونا نلقي نظرة سريعة على هياكل Redis الموجودة منذ فترة طويلة والتي تحاكي وظائف Redis Stream جزئيًا.

ريديس حانة/SUB

Redis Pub/Sub هو نظام مراسلة بسيط مدمج بالفعل في متجر القيمة الأساسية الخاص بك. ومع ذلك، فإن البساطة تأتي بتكلفة:

  • إذا فشل الناشر لسبب ما، فإنه يفقد جميع المشتركين فيه
  • يحتاج الناشر إلى معرفة العنوان الدقيق لجميع المشتركين فيه
  • قد يقوم الناشر بإثقال كاهل مشتركيه بالعمل إذا تم نشر البيانات بشكل أسرع من معالجتها
  • يتم حذف الرسالة من المخزن المؤقت للناشر فور نشرها، بغض النظر عن عدد المشتركين الذين تم تسليمها إليهم ومدى سرعة معالجة هذه الرسالة.
  • سيتلقى جميع المشتركين الرسالة في نفس الوقت. يجب على المشتركين أنفسهم أن يتفقوا بطريقة أو بأخرى فيما بينهم على ترتيب معالجة نفس الرسالة.
  • لا توجد آلية مدمجة للتأكد من أن المشترك قد نجح في معالجة الرسالة. إذا تلقى المشترك رسالة وتعطل أثناء المعالجة، فلن يعلم الناشر بذلك.

قائمة ريديس

قائمة Redis هي بنية بيانات تدعم حظر أوامر القراءة. يمكنك إضافة وقراءة الرسائل من بداية القائمة أو نهايتها. بناءً على هذا الهيكل، يمكنك إنشاء مكدس أو قائمة انتظار جيدة لنظامك الموزع، وسيكون هذا كافيًا في معظم الحالات. الاختلافات الرئيسية عن Redis Pub/Sub:

  • يتم تسليم الرسالة إلى عميل واحد. سيتلقى العميل الأول المحظور للقراءة البيانات أولاً.
  • يجب أن يبدأ كلينت عملية القراءة لكل رسالة بنفسه. القائمة لا تعرف شيئًا عن العملاء.
  • يتم تخزين الرسائل حتى يقرأها شخص ما أو يحذفها بشكل صريح. إذا قمت بتكوين خادم Redis لتدفق البيانات إلى القرص، فستزيد موثوقية النظام بشكل كبير.

مقدمة إلى تيار

إضافة إدخال إلى دفق

فريق XADD يضيف إدخالاً جديدًا إلى الدفق. السجل ليس مجرد سلسلة، فهو يتكون من زوج واحد أو أكثر من أزواج المفاتيح والقيمة. وبالتالي، فإن كل إدخال منظم بالفعل ويشبه بنية ملف CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

في المثال أعلاه، نضيف حقلين إلى الدفق بالاسم (المفتاح) "mystream": "sensor-id" و"درجة الحرارة" بالقيمتين "1234" و"19.8" على التوالي. كالوسيطة الثانية، يأخذ الأمر معرفًا سيتم تعيينه للإدخال - يحدد هذا المعرف بشكل فريد كل إدخال في الدفق. ومع ذلك، في هذه الحالة مررنا * لأننا نريد أن يقوم Redis بإنشاء معرف جديد لنا. سوف يزيد كل معرف جديد. لذلك، سيكون لكل إدخال جديد معرف أعلى مقارنة بالإدخالات السابقة.

تنسيق المعرف

معرف الإدخال الذي تم إرجاعه بواسطة الأمر XADD، يتكون من جزأين:

{millisecondsTime}-{sequenceNumber}

ميلي ثانية الوقت - وقت Unix بالمللي ثانية (وقت خادم Redis). ومع ذلك، إذا كان الوقت الحالي هو نفس وقت التسجيل السابق أو أقل منه، فسيتم استخدام الطابع الزمني للتسجيل السابق. لذلك، إذا عاد وقت الخادم بالزمن إلى الوراء، فسيظل المعرف الجديد يحتفظ بخاصية الزيادة.

رقم التسلسل يستخدم للسجلات التي تم إنشاؤها في نفس المللي ثانية. رقم التسلسل سيتم زيادته بمقدار 1 بالنسبة للإدخال السابق. بسبب ال رقم التسلسل يبلغ حجمه 64 بت، فمن الناحية العملية يجب ألا تواجه حدًا لعدد السجلات التي يمكن إنشاؤها خلال مللي ثانية واحدة.

قد يبدو تنسيق هذه المعرفات غريبًا للوهلة الأولى. قد يتساءل القارئ الذي لا يثق به عن سبب كون الوقت جزءًا من المعرف. والسبب هو أن تدفقات Redis تدعم استعلامات النطاق حسب المعرف. وبما أن المعرف مرتبط بالوقت الذي تم فيه إنشاء السجل، فإن هذا يجعل من الممكن الاستعلام عن النطاقات الزمنية. سننظر إلى مثال محدد عندما ننظر إلى الأمر نطاق واسع.

إذا احتاج المستخدم لسبب ما إلى تحديد المعرف الخاص به، والذي، على سبيل المثال، يرتبط ببعض الأنظمة الخارجية، فيمكننا تمريره إلى الأمر XADD بدلاً من * كما هو موضح أدناه:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

يرجى ملاحظة أنه في هذه الحالة يجب عليك مراقبة زيادة المعرف بنفسك. في مثالنا، الحد الأدنى للمعرف هو "0-1"، لذلك لن يقبل الأمر معرفًا آخر يساوي أو أقل من "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

عدد السجلات لكل دفق

من الممكن الحصول على عدد السجلات في الدفق ببساطة باستخدام الأمر XLEN. على سبيل المثال، سيعيد هذا الأمر القيمة التالية:

> XLEN somestream
(integer) 2

استعلامات النطاق - XRANGE وXREVRANGE

لطلب البيانات حسب النطاق، نحتاج إلى تحديد معرفين - بداية النطاق ونهايته. سيتضمن النطاق الذي تم إرجاعه جميع العناصر، بما في ذلك الحدود. هناك أيضًا معرفان خاصان "-" و"+"، ويعنيان على التوالي أصغر معرف (السجل الأول) وأكبر معرف (السجل الأخير) في الدفق. المثال أدناه سوف يسرد كافة إدخالات الدفق.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

كل سجل تم إرجاعه عبارة عن مصفوفة من عنصرين: معرف وقائمة بأزواج قيمة المفتاح. لقد قلنا بالفعل أن معرفات السجل مرتبطة بالوقت. ولذلك، يمكننا أن نطلب مجموعة من فترة زمنية محددة. ومع ذلك، يمكننا أن نحدد في الطلب ليس المعرف الكامل، ولكن فقط وقت يونكس، مع حذف الجزء المتعلق بـ رقم التسلسل. سيتم تعيين الجزء المحذوف من المعرف تلقائيًا على الصفر في بداية النطاق وإلى أقصى قيمة ممكنة في نهاية النطاق. فيما يلي مثال لكيفية طلب نطاق من ميلي ثانية.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

لدينا إدخال واحد فقط في هذا النطاق، ولكن في مجموعات البيانات الحقيقية يمكن أن تكون النتيجة التي يتم إرجاعها ضخمة. لهذا السبب نطاق واسع يدعم خيار الكونت. من خلال تحديد الكمية، يمكننا ببساطة الحصول على سجلات N الأولى. إذا أردنا الحصول على سجلات N التالية (ترقيم الصفحات)، فيمكننا استخدام آخر معرف تم استلامه، وزيادته رقم التسلسل من واحد واسأل مرة أخرى. دعونا ننظر إلى هذا في المثال التالي. نبدأ بإضافة 10 عناصر مع XADD (بافتراض أن mystream كان مليئًا بالفعل بـ 10 عناصر). لبدء التكرار بالحصول على عنصرين لكل أمر، نبدأ بالنطاق الكامل ولكن بـ COUNT يساوي 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

لمواصلة التكرار مع العنصرين التاليين، نحتاج إلى تحديد آخر معرف تم استلامه، أي 1519073279157-0، وإضافة 1 إلى رقم التسلسل.
يمكن الآن استخدام المعرف الناتج، في هذه الحالة 1519073279157-1، كبداية جديدة لوسيطة النطاق للمكالمة التالية نطاق واسع:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

وما إلى ذلك وهلم جرا. لأن التعقيد نطاق واسع هو O(log(N)) للبحث ثم O(M) لإرجاع عناصر M، ثم تكون كل خطوة تكرار سريعة. وهكذا باستخدام نطاق واسع يمكن تكرار التدفقات بكفاءة.

فريق XREVRANGE هو ما يعادل نطاق واسع، لكنه يُرجع العناصر بترتيب عكسي:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

يرجى ملاحظة أن الأمر XREVRANGE يأخذ وسيطات النطاق تبدأ وتتوقف بترتيب عكسي.

قراءة الإدخالات الجديدة باستخدام XREAD

غالبًا ما تنشأ مهمة الاشتراك في البث وتلقي الرسائل الجديدة فقط. قد يبدو هذا المفهوم مشابهًا لـ Redis Pub/Sub أو حظر قائمة Redis، ولكن هناك اختلافات جوهرية في كيفية استخدام Redis Stream:

  1. يتم تسليم كل رسالة جديدة إلى كل مشترك بشكل افتراضي. يختلف هذا السلوك عن قائمة Redis المحظورة، حيث سيتم قراءة الرسالة الجديدة بواسطة مشترك واحد فقط.
  2. بينما في Redis Pub/Sub يتم نسيان جميع الرسائل وعدم استمرارها أبدًا، في Stream يتم الاحتفاظ بجميع الرسائل إلى أجل غير مسمى (ما لم يتسبب العميل صراحةً في الحذف).
  3. يسمح لك Redis Stream بتمييز الوصول إلى الرسائل ضمن تدفق واحد. يمكن لمشترك معين رؤية سجل رسائله الشخصية فقط.

يمكنك الاشتراك في موضوع وتلقي رسائل جديدة باستخدام الأمر قراءة. انها أكثر تعقيدا قليلا من نطاق واسع، لذلك سنبدأ بالأمثلة الأبسط أولاً.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

يوضح المثال أعلاه نموذجًا غير محظور قراءة. لاحظ أن خيار COUNT اختياري. في الواقع، خيار الأمر الوحيد المطلوب هو خيار STREAMS، الذي يحدد قائمة التدفقات مع الحد الأقصى للمعرف المقابل. لقد كتبنا "STREAMS mystream 0" - نريد تلقي جميع سجلات دفق mystream بمعرف أكبر من "0-0". كما ترون من المثال، يقوم الأمر بإرجاع اسم الموضوع لأنه يمكننا الاشتراك في عدة سلاسل رسائل في نفس الوقت. يمكننا أن نكتب، على سبيل المثال، "STREAMS mystreamotherstream 0 0". يرجى ملاحظة أنه بعد خيار التدفقات، نحتاج أولاً إلى تقديم أسماء جميع التدفقات المطلوبة وبعد ذلك فقط قائمة المعرفات.

في هذا النموذج البسيط، لا يقوم الأمر بأي شيء خاص مقارنة بـ نطاق واسع. ومع ذلك، الشيء المثير للاهتمام هو أنه يمكننا أن نتحول بسهولة قراءة إلى أمر الحظر، مع تحديد وسيطة BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

في المثال أعلاه، تم تحديد خيار حظر جديد بمهلة قدرها 0 مللي ثانية (وهذا يعني الانتظار إلى أجل غير مسمى). علاوة على ذلك، بدلاً من تمرير المعرف المعتاد للتيار mystream، تم تمرير معرف خاص $. هذا المعرف الخاص يعني ذلك قراءة يجب استخدام الحد الأقصى للمعرف في mystream كمعرف. لذلك لن نتلقى سوى الرسائل الجديدة بدءًا من اللحظة التي بدأنا فيها الاستماع. يشبه هذا في بعض النواحي أمر Unix "tail -f".

لاحظ أنه عند استخدام خيار BLOCK لا نحتاج بالضرورة إلى استخدام المعرف الخاص $. يمكننا استخدام أي معرف موجود في الدفق. إذا تمكن الفريق من تلبية طلبنا على الفور دون حظر، فسوف يفعل ذلك، وإلا فسيتم حظره.

الحظر قراءة يمكنك أيضًا الاستماع إلى عدة سلاسل رسائل في وقت واحد، ما عليك سوى تحديد أسمائها. في هذه الحالة، سيقوم الأمر بإرجاع سجل الدفق الأول الذي استقبل البيانات. المشترك الأول الذي تم حظره لموضوع معين سيتلقى البيانات أولاً.

جماعات المستهلكين

في بعض المهام، نريد تقييد وصول المشتركين إلى الرسائل ضمن سلسلة رسائل واحدة. أحد الأمثلة التي يمكن أن يكون فيها ذلك مفيدًا هو قائمة انتظار الرسائل التي تحتوي على عاملين سيتلقون رسائل مختلفة من سلسلة رسائل، مما يسمح بتوسيع نطاق معالجة الرسائل.

إذا تخيلنا أن لدينا ثلاثة مشتركين C1، C2، C3 وخيط يحتوي على الرسائل 1، 2، 3، 4، 5، 6، 7، فسيتم تقديم الرسائل كما في الرسم البياني أدناه:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

لتحقيق هذا التأثير، يستخدم Redis Stream مفهومًا يسمى مجموعة المستهلكين. يشبه هذا المفهوم المشترك الزائف، الذي يتلقى البيانات من التدفق، ولكن يتم خدمته فعليًا بواسطة مشتركين متعددين داخل المجموعة، مما يوفر ضمانات معينة:

  1. يتم تسليم كل رسالة إلى مشترك مختلف داخل المجموعة.
  2. داخل المجموعة، يتم تعريف المشتركين بأسمائهم، وهي سلسلة حساسة لحالة الأحرف. إذا انسحب أحد المشتركين مؤقتًا من المجموعة، فيمكن استعادته إلى المجموعة باستخدام اسمه الفريد.
  3. تتبع كل مجموعة مستهلكين مفهوم "الرسالة غير المقروءة الأولى". عندما يطلب أحد المشتركين رسائل جديدة، يمكنه فقط استقبال الرسائل التي لم يتم تسليمها مسبقًا لأي مشترك داخل المجموعة.
  4. يوجد أمر للتأكيد بشكل صريح على أن الرسالة قد تمت معالجتها بنجاح بواسطة المشترك. وإلى أن يتم استدعاء هذا الأمر، ستظل الرسالة المطلوبة في حالة "معلقة".
  5. ضمن مجموعة المستهلكين، يمكن لكل مشترك طلب سجل الرسائل التي تم تسليمها له، ولكن لم تتم معالجتها بعد (في الحالة "معلقة")

بمعنى ما، يمكن التعبير عن حالة المجموعة على النحو التالي:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

حان الوقت الآن للتعرف على الأوامر الرئيسية لمجموعة المستهلكين، وهي:

  • XGROUP تستخدم لإنشاء وتدمير وإدارة المجموعات
  • XREADGROUP تستخدم لقراءة الدفق من خلال المجموعة
  • XACK - يسمح هذا الأمر للمشترك بوضع علامة على الرسالة على أنها تمت معالجتها بنجاح

إنشاء مجموعة المستهلكين

لنفترض أن mystream موجود بالفعل. بعد ذلك سيبدو أمر إنشاء المجموعة كما يلي:

> XGROUP CREATE mystream mygroup $
OK

عند إنشاء مجموعة، يجب علينا تمرير معرف، بدءًا من المجموعة التي ستتلقى الرسائل. إذا أردنا فقط تلقي جميع الرسائل الجديدة، فيمكننا استخدام المعرف الخاص $ (كما في مثالنا أعلاه). إذا قمت بتحديد 0 بدلاً من معرف خاص، فستكون جميع الرسائل الموجودة في سلسلة الرسائل متاحة للمجموعة.

الآن وبعد إنشاء المجموعة، يمكننا البدء فورًا في قراءة الرسائل باستخدام الأمر XREADGROUP. هذا الأمر مشابه جدًا لـ قراءة ويدعم خيار BLOCK الاختياري. ومع ذلك، هناك خيار GROUP مطلوب والذي يجب تحديده دائمًا باستخدام وسيطتين: اسم المجموعة واسم المشترك. خيار COUNT مدعوم أيضًا.

قبل قراءة الموضوع دعونا نضع بعض الرسائل هناك:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

الآن دعونا نحاول قراءة هذا الدفق من خلال المجموعة:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

يقرأ الأمر أعلاه حرفيًا كما يلي:

"أنا، المشترك أليس، عضو في مجموعة mygroup، أريد قراءة رسالة واحدة من mystream لم يتم تسليمها إلى أي شخص من قبل."

في كل مرة يقوم فيها المشترك بإجراء عملية على مجموعة، يجب عليه تقديم اسمه، مما يؤدي إلى تعريف نفسه بشكل فريد داخل المجموعة. هناك تفصيل آخر مهم جدًا في الأمر أعلاه - المعرف الخاص ">". يقوم هذا المعرف الخاص بتصفية الرسائل، مع ترك فقط تلك التي لم يتم تسليمها من قبل.

أيضًا، في حالات خاصة، يمكنك تحديد معرف حقيقي مثل 0 أو أي معرف صالح آخر. في هذه الحالة الأمر XREADGROUP سيعيد لك سجل الرسائل ذات الحالة "معلقة" والتي تم تسليمها إلى المشترك المحدد (أليس) ولكن لم يتم الإقرار بها بعد باستخدام الأمر XACK.

يمكننا اختبار هذا السلوك عن طريق تحديد المعرف 0 على الفور، دون هذا الخيار بالإحصاء. سنرى ببساطة رسالة واحدة معلقة، وهي رسالة التفاحة:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

ومع ذلك، إذا أكدنا أن الرسالة تمت معالجتها بنجاح، فلن يتم عرضها بعد الآن:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

الآن حان دور بوب لقراءة شيء ما:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

بوب، عضو في مجموعتي، طلب ما لا يزيد عن رسالتين. يقوم الأمر فقط بالإبلاغ عن الرسائل التي لم يتم تسليمها بسبب المعرف الخاص ">". كما ترون، لن يتم عرض الرسالة "تفاحة" حيث تم تسليمها بالفعل إلى أليس، لذلك يتلقى بوب "برتقالي" و"فراولة".

بهذه الطريقة، يمكن لأليس وبوب وأي مشترك آخر في المجموعة قراءة رسائل مختلفة من نفس الدفق. ويمكنهم أيضًا قراءة سجل الرسائل التي لم تتم معالجتها أو وضع علامة على الرسائل على أنها تمت معالجتها.

هناك بعض الأشياء التي يجب وضعها في الاعتبار:

  • بمجرد أن يعتبر المشترك أن الرسالة أمر XREADGROUP، تنتقل هذه الرسالة إلى الحالة "معلقة" ويتم تخصيصها لهذا المشترك المحدد. لن يتمكن المشتركون الآخرون في المجموعة من قراءة هذه الرسالة.
  • يتم إنشاء المشتركين تلقائيًا عند الإشارة لأول مرة، وليست هناك حاجة إلى إنشائهم بشكل صريح.
  • استخدام XREADGROUP يمكنك قراءة الرسائل من عدة سلاسل رسائل مختلفة في نفس الوقت، ولكن لكي ينجح هذا، عليك أولاً إنشاء مجموعات بنفس الاسم لكل سلسلة رسائل باستخدام XGROUP

استرداد الفشل

يمكن للمشترك التعافي من الفشل وإعادة قراءة قائمة رسائله بالحالة "معلقة". ومع ذلك، في العالم الحقيقي، قد يفشل المشتركون في نهاية المطاف. ماذا يحدث لرسائل المشترك العالقة إذا كان المشترك غير قادر على التعافي من الفشل؟
تقدم مجموعة المستهلكين ميزة يتم استخدامها لمثل هذه الحالات فقط - عندما تحتاج إلى تغيير مالك الرسائل.

أول شيء عليك القيام به هو استدعاء الأمر XPENDING، والذي يعرض كافة الرسائل الموجودة في المجموعة بالحالة "معلقة". في أبسط أشكاله، يتم استدعاء الأمر باستخدام وسيطتين فقط: اسم الموضوع واسم المجموعة:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

قام الفريق بعرض عدد الرسائل غير المعالجة للمجموعة بأكملها ولكل مشترك. لدينا فقط بوب لديه رسالتان معلقتان لأنه تم تأكيد الرسالة الوحيدة التي طلبتها أليس XACK.

يمكننا طلب المزيد من المعلومات باستخدام المزيد من الوسائط:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - نطاق المعرفات (يمكنك استخدام "-" و"+")
{count} — عدد محاولات التسليم
{اسم المستهلك} - اسم المجموعة

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

الآن لدينا تفاصيل لكل رسالة: المعرف، واسم المشترك، ووقت الخمول بالمللي ثانية، وأخيرًا عدد محاولات التسليم. لدينا رسالتان من بوب وكانتا في وضع الخمول لمدة 74170458 مللي ثانية، أي حوالي 20 ساعة.

يرجى ملاحظة أنه لا أحد يمنعنا من التحقق من محتوى الرسالة ببساطة عن طريق الاستخدام نطاق واسع.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

علينا فقط تكرار نفس المعرف مرتين في الوسائط. والآن بعد أن أصبح لدينا بعض الأفكار، قد تقرر أليس أنه بعد 20 ساعة من التوقف، ربما لن يتعافى بوب، وقد حان الوقت للاستعلام عن تلك الرسائل واستئناف معالجتها لبوب. لهذا نستخدم الأمر XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

باستخدام هذا الأمر، يمكننا استلام رسالة "أجنبية" لم تتم معالجتها بعد عن طريق تغيير المالك إلى {consumer}. ومع ذلك، يمكننا أيضًا توفير الحد الأدنى لوقت الخمول {min-idle-time}. يساعد هذا في تجنب الموقف الذي يحاول فيه عميلان تغيير مالك نفس الرسائل في وقت واحد:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

سيقوم العميل الأول بإعادة ضبط وقت التوقف عن العمل وزيادة عداد التسليم. لذلك لن يتمكن العميل الثاني من طلب ذلك.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

تمت المطالبة بالرسالة بنجاح بواسطة Alice، التي يمكنها الآن معالجة الرسالة والإقرار بها.

من المثال أعلاه، يمكنك أن ترى أن الطلب الناجح يعيد محتويات الرسالة نفسها. ومع ذلك، هذا ليس ضروريا. يمكن استخدام خيار JUSTID لإرجاع معرفات الرسائل فقط. يعد هذا مفيدًا إذا لم تكن مهتمًا بتفاصيل الرسالة وترغب في زيادة أداء النظام.

عداد التسليم

العداد الذي تراه في الإخراج XPENDING هو عدد مرات تسليم كل رسالة. تتم زيادة هذا العداد بطريقتين: عندما يتم طلب الرسالة بنجاح عبر XCLAIM أو عند استخدام المكالمة XREADGROUP.

من الطبيعي أن يتم تسليم بعض الرسائل عدة مرات. الشيء الرئيسي هو أن جميع الرسائل تتم معالجتها في النهاية. في بعض الأحيان تحدث مشكلات عند معالجة رسالة لأن الرسالة نفسها تالفة، أو أن معالجة الرسالة تؤدي إلى حدوث خطأ في رمز المعالج. في هذه الحالة، قد يتبين أنه لن يتمكن أحد من معالجة هذه الرسالة. وبما أن لدينا عدادًا لمحاولات التسليم، فيمكننا استخدام هذا العداد لاكتشاف مثل هذه المواقف. لذلك، بمجرد أن يصل عدد التسليم إلى العدد الكبير الذي تحدده، فمن الأفضل وضع مثل هذه الرسالة على مؤشر ترابط آخر وإرسال إشعار إلى مسؤول النظام.

حالة الموضوع

فريق XINFO يستخدم لطلب معلومات متنوعة حول الموضوع ومجموعاته. على سبيل المثال، يبدو الأمر الأساسي كما يلي:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

يعرض الأمر أعلاه معلومات عامة حول الدفق المحدد. الآن مثال أكثر تعقيدًا قليلاً:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

يعرض الأمر أعلاه معلومات عامة لجميع مجموعات الموضوع المحدد

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

يعرض الأمر أعلاه معلومات لجميع المشتركين في الدفق والمجموعة المحددين.
إذا نسيت بناء جملة الأمر، فما عليك سوى طلب المساعدة من الأمر نفسه:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

حد حجم الدفق

لا ترغب العديد من التطبيقات في جمع البيانات في دفق إلى الأبد. غالبًا ما يكون من المفيد تحديد الحد الأقصى لعدد الرسائل المسموح بها لكل سلسلة رسائل. وفي حالات أخرى، يكون من المفيد نقل كافة الرسائل من سلسلة رسائل إلى مخزن دائم آخر عند الوصول إلى حجم سلسلة المحادثات المحدد. يمكنك تحديد حجم الدفق باستخدام المعلمة MAXLEN في الأمر XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

عند استخدام MAXLEN، يتم حذف السجلات القديمة تلقائيًا عندما تصل إلى طول محدد، وبالتالي يكون للتدفق حجم ثابت. ومع ذلك، لا يتم التقليم في هذه الحالة بالطريقة الأكثر فعالية في ذاكرة Redis. يمكنك تحسين الوضع على النحو التالي:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

الوسيطة ~ في المثال أعلاه تعني أننا لا نحتاج بالضرورة إلى تحديد طول الدفق بقيمة معينة. في مثالنا، يمكن أن يكون هذا أي رقم أكبر من أو يساوي 1000 (على سبيل المثال، 1000، 1010، أو 1030). لقد حددنا بوضوح أننا نريد أن يقوم التدفق الخاص بنا بتخزين 1000 سجل على الأقل. وهذا يجعل إدارة الذاكرة أكثر كفاءة داخل Redis.

هناك أيضًا فريق منفصل اكستريم، والذي يفعل نفس الشيء:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

التخزين المستمر والنسخ المتماثل

يتم نسخ Redis Stream بشكل غير متزامن إلى العقد التابعة وحفظه في ملفات مثل AOF (لقطة لجميع البيانات) وRDB (سجل جميع عمليات الكتابة). يتم أيضًا دعم النسخ المتماثل لحالة مجموعات المستهلكين. لذلك، إذا كانت الرسالة في حالة "معلقة" على العقدة الرئيسية، فستكون لهذه الرسالة نفس الحالة على العقد التابعة.

إزالة العناصر الفردية من الدفق

هناك أمر خاص لحذف الرسائل اكسديل. يحصل الأمر على اسم سلسلة الرسائل متبوعة بمعرفات الرسالة المراد حذفها:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

عند استخدام هذا الأمر، عليك أن تأخذ في الاعتبار أن الذاكرة الفعلية لن يتم تحريرها على الفور.

تيارات بطول صفر

الفرق بين التدفقات وهياكل بيانات Redis الأخرى هو أنه عندما لا تحتوي هياكل البيانات الأخرى على عناصر بداخلها، كأثر جانبي، ستتم إزالة بنية البيانات نفسها من الذاكرة. لذلك، على سبيل المثال، ستتم إزالة المجموعة التي تم فرزها بالكامل عندما يقوم استدعاء ZREM بإزالة العنصر الأخير. بدلاً من ذلك، يُسمح للخيوط بالبقاء في الذاكرة حتى بدون وجود أي عناصر بداخلها.

اختتام

يعد Redis Stream مثاليًا لإنشاء وسطاء الرسائل وقوائم انتظار الرسائل والتسجيل الموحد وأنظمة الدردشة لحفظ التاريخ.

كما قلت ذات مرة نيكلاوس ويرثالبرامج عبارة عن خوارزميات بالإضافة إلى هياكل البيانات، ويمنحك Redis كليهما بالفعل.

المصدر: www.habr.com

إضافة تعليق