ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

يا هبر!

نذكركم بمتابعة الكتاب عن كافكا لقد قمنا بنشر عمل مثير للاهتمام بنفس القدر حول المكتبة كافكا تيارات API.

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

في الوقت الحالي، يتعلم المجتمع حدود هذه الأداة القوية. لذلك، تم نشر مقال مؤخرًا، ونود أن نقدم لكم ترجمته. من تجربته الخاصة، يخبرنا المؤلف كيفية تحويل Kafka Streams إلى مخزن بيانات موزع. استمتع بالقراءة!

مكتبة أباتشي كافكا تيارات تستخدم في جميع أنحاء العالم في المؤسسات لمعالجة التدفق الموزع أعلى Apache Kafka. أحد الجوانب التي لا تحظى بالتقدير في إطار العمل هذا هو أنه يسمح لك بتخزين الحالة المحلية المنتجة بناءً على معالجة الخيط.

سأخبرك في هذه المقالة كيف تمكنت شركتنا من استغلال هذه الفرصة بشكل مربح عند تطوير منتج لأمن التطبيقات السحابية. باستخدام Kafka Streams، أنشأنا خدمات دقيقة مشتركة للحالة، كل منها بمثابة مصدر متسامح مع الأخطاء ومتوفر للغاية للمعلومات الموثوقة حول حالة الكائنات في النظام. بالنسبة لنا، تعد هذه خطوة للأمام من حيث الموثوقية وسهولة الدعم.

إذا كنت مهتمًا بنهج بديل يسمح لك باستخدام قاعدة بيانات مركزية واحدة لدعم الحالة الرسمية لكائناتك، فاقرأه، وسيكون مثيرًا للاهتمام...

لماذا اعتقدنا أن الوقت قد حان لتغيير طريقة عملنا مع الحالة المشتركة

كنا بحاجة إلى الحفاظ على حالة العناصر المختلفة بناءً على تقارير الوكيل (على سبيل المثال: هل كان الموقع يتعرض للهجوم)؟ قبل الترحيل إلى Kafka Streams، اعتمدنا غالبًا على قاعدة بيانات مركزية واحدة (+ واجهة برمجة تطبيقات الخدمة) لإدارة الحالة. هذا النهج له عيوبه: حالات مكثفة للتاريخ يصبح الحفاظ على الاتساق والتزامن تحديًا حقيقيًا. قد تصبح قاعدة البيانات عنق الزجاجة أو ينتهي بها الأمر حالة السباق وتعاني من عدم القدرة على التنبؤ.

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 1: سيناريو الدولة المنقسمة النموذجي الذي شوهد قبل الانتقال إلى
Kafka وKafka Streams: يقوم الوكلاء بتوصيل وجهات نظرهم عبر واجهة برمجة التطبيقات (API)، ويتم حساب الحالة المحدثة من خلال قاعدة بيانات مركزية

تعرف على Kafka Streams، مما يجعل من السهل إنشاء خدمات دقيقة مشتركة للحالة

منذ حوالي عام، قررنا إلقاء نظرة فاحصة على سيناريوهات الحالة المشتركة لدينا لمعالجة هذه المشكلات. قررنا على الفور تجربة Kafka Streams - فنحن نعرف مدى قابليتها للتطوير وتوافرها بدرجة كبيرة وقدرتها على تحمل الأخطاء، ووظائف البث الغنية التي تتمتع بها (التحولات، بما في ذلك التحولات ذات الحالة). هذا ما نحتاجه بالضبط، ناهيك عن مدى نضج وموثوقية نظام المراسلة في كافكا.

تم بناء كل من الخدمات الصغيرة ذات الحالة التي أنشأناها على قمة مثيل Kafka Streams بهيكل بسيط إلى حد ما. يتكون من 1) مصدر 2) معالج مزود بمخزن قيمة مفتاح ثابت 3) حوض:

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 2: الهيكل الافتراضي لمثيلات البث الخاصة بنا للخدمات الصغيرة ذات الحالة. لاحظ أن هناك أيضًا مستودعًا هنا يحتوي على بيانات تعريف التخطيط.

في هذا النهج الجديد، يقوم الوكلاء بتأليف الرسائل التي يتم تغذيتها في الموضوع المصدر، ويتلقى المستهلكون - على سبيل المثال، خدمة إعلام البريد - الحالة المشتركة المحسوبة من خلال الحوض (موضوع الإخراج).

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 3: مثال جديد لتدفق المهام لسيناريو يتضمن خدمات صغيرة مشتركة: 1) يقوم الوكيل بإنشاء رسالة تصل إلى موضوع مصدر كافكا؛ 2) تقوم خدمة صغيرة ذات حالة مشتركة (باستخدام Kafka Streams) بمعالجتها وكتابة الحالة المحسوبة في موضوع Kafka النهائي؛ وبعد ذلك 3) يقبل المستهلكون الحالة الجديدة

مهلا، هذا المتجر ذو القيمة الرئيسية المدمج مفيد جدًا في الواقع!

كما ذكرنا سابقًا، تحتوي طوبولوجيا الحالة المشتركة لدينا على مخزن قيمة مفتاحية. لقد وجدنا عدة خيارات لاستخدامه، واثنان منها موضحان أدناه.

الخيار رقم 1: استخدم مخزن القيمة الرئيسية لإجراء العمليات الحسابية

احتوى مخزن القيمة الرئيسية الأول لدينا على البيانات المساعدة التي نحتاجها لإجراء العمليات الحسابية. على سبيل المثال، في بعض الحالات، تم تحديد الدولة المشتركة على أساس مبدأ "أغلبية الأصوات". يمكن أن يحتوي المستودع على أحدث تقارير الوكيل حول حالة بعض الكائنات. وبعد ذلك، عندما نتلقى تقريرًا جديدًا من وكيل أو آخر، يمكننا حفظه واسترجاع التقارير من جميع الوكلاء الآخرين حول حالة نفس الكائن من التخزين، وتكرار العملية الحسابية.
يوضح الشكل 4 أدناه كيف قمنا بتعريض مخزن المفتاح/القيمة لطريقة معالجة المعالج بحيث يمكن معالجة الرسالة الجديدة بعد ذلك.

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

المثال التوضيحي 4: نفتح الوصول إلى مخزن القيمة الرئيسية لطريقة معالجة المعالج (بعد ذلك، يجب على كل برنامج نصي يعمل مع الحالة المشتركة تنفيذ الطريقة doProcess)

الخيار رقم 2: إنشاء واجهة برمجة تطبيقات CRUD أعلى تدفقات كافكا

بعد أن أنشأنا تدفق المهام الأساسي لدينا، بدأنا في محاولة كتابة واجهة برمجة تطبيقات RESTful CRUD لخدماتنا المصغرة الخاصة بالحالة المشتركة. أردنا أن نكون قادرين على استرداد حالة بعض الكائنات أو جميعها، بالإضافة إلى تعيين حالة الكائن أو إزالتها (مفيد لدعم الواجهة الخلفية).

لدعم جميع واجهات برمجة تطبيقات Get State، كلما احتجنا إلى إعادة حساب الحالة أثناء المعالجة، قمنا بتخزينها في مخزن قيمة مفتاح مدمج لفترة طويلة. في هذه الحالة، يصبح من السهل جدًا تنفيذ واجهة برمجة التطبيقات هذه باستخدام مثيل واحد من Kafka Streams، كما هو موضح في القائمة أدناه:

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 5: استخدام مخزن القيمة الرئيسية المدمج للحصول على الحالة المحسوبة مسبقًا للكائن

من السهل أيضًا تنفيذ تحديث حالة الكائن عبر واجهة برمجة التطبيقات. في الأساس، كل ما عليك فعله هو إنشاء منتج كافكا واستخدامه لإنشاء سجل يحتوي على الحالة الجديدة. وهذا يضمن أن جميع الرسائل التي يتم إنشاؤها من خلال واجهة برمجة التطبيقات ستتم معالجتها بنفس الطريقة التي يتم بها معالجة تلك الرسائل المستلمة من المنتجين الآخرين (مثل الوكلاء).

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 6: يمكنك ضبط حالة الكائن باستخدام منتج كافكا

تعقيد صغير: يحتوي كافكا على العديد من الأقسام

بعد ذلك، أردنا توزيع حمل المعالجة وتحسين التوفر من خلال توفير مجموعة من الخدمات الصغيرة ذات الحالة المشتركة لكل سيناريو. كان الإعداد سهلاً للغاية: بمجرد تكوين جميع المثيلات للتشغيل تحت نفس معرف التطبيق (ونفس خوادم التمهيد)، تم تنفيذ كل شيء آخر تقريبًا تلقائيًا. لقد حددنا أيضًا أن كل موضوع مصدر سيتكون من عدة أقسام، بحيث يمكن تعيين مجموعة فرعية من هذه الأقسام لكل مثيل.

سأذكر أيضًا أنه من الشائع عمل نسخة احتياطية من مخزن الحالة، بحيث، على سبيل المثال، في حالة الاسترداد بعد الفشل، قم بنقل هذه النسخة إلى مثيل آخر. لكل مخزن حالة في Kafka Streams، يتم إنشاء موضوع منسوخ باستخدام سجل التغيير (الذي يتتبع التحديثات المحلية). وهكذا، فإن كافكا يدعم باستمرار متجر الدولة. لذلك، في حالة فشل مثيل Kafka Streams أو آخر، يمكن استعادة مخزن الحالة بسرعة في مثيل آخر، حيث ستنتقل الأقسام المقابلة. وقد أظهرت اختباراتنا أن هذا يتم في غضون ثوانٍ، حتى لو كان هناك ملايين السجلات في المتجر.

بالانتقال من خدمة صغيرة واحدة ذات حالة مشتركة إلى مجموعة من الخدمات الصغيرة، يصبح تنفيذ Get State API أقل تافهًا. في الوضع الجديد، يحتوي مخزن الحالة لكل خدمة صغيرة على جزء فقط من الصورة الإجمالية (تلك الكائنات التي تم تعيين مفاتيحها إلى قسم معين). كان علينا تحديد المثيل الذي يحتوي على حالة الكائن الذي نحتاجه، وقمنا بذلك بناءً على البيانات التعريفية للسلسلة، كما هو موضح أدناه:

ليس فقط المعالجة: كيف أنشأنا قاعدة بيانات موزعة من Kafka Streams، وما الذي نتج عنها

الشكل 7: باستخدام بيانات تعريف الدفق، نحدد من أي مثيل سيتم الاستعلام عن حالة الكائن المطلوب؛ تم استخدام نهج مماثل مع GET ALL API

النتائج الرئيسية

يمكن أن تكون مخازن الدولة في Kafka Streams بمثابة قاعدة بيانات موزعة فعليًا،

  • تتكرر باستمرار في كافكا
  • يمكن بسهولة إنشاء واجهة برمجة تطبيقات CRUD فوق مثل هذا النظام
  • يعد التعامل مع الأقسام المتعددة أكثر تعقيدًا بعض الشيء
  • من الممكن أيضًا إضافة مخزن حالة واحد أو أكثر إلى طوبولوجيا التدفق لتخزين البيانات المساعدة. يمكن استخدام هذا الخيار من أجل:
  • تخزين البيانات اللازمة على المدى الطويل لإجراء العمليات الحسابية أثناء معالجة التدفق
  • تخزين طويل الأمد للبيانات التي قد تكون مفيدة في المرة التالية التي يتم فيها توفير مثيل البث
  • أكثر بكثير...

هذه المزايا وغيرها تجعل Kafka Streams مناسبة تمامًا للحفاظ على الحالة العالمية في نظام موزع مثل نظامنا. لقد أثبتت Kafka Streams أنها موثوقة جدًا في الإنتاج (لم نفقد أي رسائل تقريبًا منذ نشرها)، ونحن واثقون من أن قدراتها لن تتوقف عند هذا الحد!

المصدر: www.habr.com

إضافة تعليق