كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي" مرحبًا هابريتس! هذا الكتاب مناسب لأي مطور يريد فهم البث. سيساعدك فهم البرمجة الموزعة على فهم تدفقات كافكا وكافكا بشكل أفضل. سيكون من الجيد معرفة إطار عمل كافكا نفسه، لكن هذا ليس ضروريًا: سأخبرك بكل ما تحتاجه. سيتعلم مطورو Kafka ذوو الخبرة وكذلك المبتدئون كيفية إنشاء تطبيقات دفق مثيرة للاهتمام باستخدام مكتبة Kafka Streams مع هذا الكتاب. سيتعلم مطورو Java من المستوى المتوسط ​​إلى المتقدم، والذين هم على دراية بمفاهيم مثل التسلسل، كيفية تطبيق مهاراتهم لبناء تطبيقات Kafka Streams. الكود المصدري للكتاب مكتوب بلغة Java 8 ويستخدم بشكل كبير بناء جملة تعبير Java 8 lambda، لذا فإن معرفة كيفية العمل مع وظائف lambda (حتى في لغة برمجة مختلفة) ستكون مفيدة.

مقتطفات. 5.3. عمليات التجميع والنافذة

في هذا القسم، سننتقل إلى استكشاف الأجزاء الواعدة في Kafka Streams. لقد قمنا حتى الآن بتغطية الجوانب التالية لتدفقات كافكا:

  • إنشاء طوبولوجيا المعالجة؛
  • استخدام الحالة في تطبيقات التدفق؛
  • أداء اتصالات دفق البيانات؛
  • الاختلافات بين تدفقات الأحداث (KStream) وتدفقات التحديث (KTable).

وفي الأمثلة التالية، سوف نقوم بجمع كل هذه العناصر معًا. ستتعرف أيضًا على عمليات النوافذ، وهي ميزة رائعة أخرى لتطبيقات البث. سيكون مثالنا الأول عبارة عن تجميع بسيط.

5.3.1. تجميع مبيعات الأسهم حسب الصناعة

يعد التجميع والتجميع أدوات حيوية عند العمل مع تدفق البيانات. إن البحث عن السجلات الفردية عند ورودها لا يكفي في كثير من الأحيان. لاستخراج معلومات إضافية من البيانات، يجب تجميعها ودمجها.

في هذا المثال، سترتدي زي المتداول اليومي الذي يحتاج إلى تتبع مبيعات الأسهم في الشركات عبر العديد من الصناعات. على وجه التحديد، أنت مهتم بالشركات الخمس التي لديها أكبر مبيعات للأسهم في كل صناعة.

لمثل هذا التجميع، ستكون هناك حاجة إلى عدة خطوات تالية لترجمة البيانات إلى النموذج المطلوب (بعبارات عامة).

  1. قم بإنشاء مصدر قائم على الموضوع ينشر معلومات تداول الأسهم الأولية. سيتعين علينا تعيين كائن من النوع StockTransaction إلى كائن من النوع ShareVolume. الحقيقة هي أن كائن StockTransaction يحتوي على بيانات تعريف المبيعات، ونحتاج فقط إلى بيانات حول عدد الأسهم المباعة.
  2. قم بتجميع بيانات ShareVolume حسب رموز المشاركة. بمجرد تجميعها حسب الرمز، يمكنك طي هذه البيانات وصولاً إلى الإجماليات الفرعية لحجم مبيعات الأسهم. لاحظ أن الأسلوب KStream.groupBy يقوم بإرجاع مثيل من النوع KGroupedStream. ويمكنك الحصول على مثيل KTable عن طريق استدعاء الأسلوب KGroupedStream.reduce بعد ذلك.

ما هي واجهة KGroupedStream

تقوم الأساليب KStream.groupBy وKStream.groupByKey بإرجاع مثيل KGroupedStream. KGroupedStream هو تمثيل وسيط لتدفق الأحداث بعد تجميعها حسب المفاتيح. ولم يتم تصميمه للعمل مباشرة معه على الإطلاق. بدلاً من ذلك، يتم استخدام KGroupedStream لعمليات التجميع التي تؤدي دائمًا إلى KTable. وبما أن نتيجة عمليات التجميع هي KTable وأنهم يستخدمون مخزن الحالة، فمن الممكن ألا يتم إرسال كافة التحديثات في النتيجة إلى أسفل المسار.

تقوم طريقة KTable.groupBy بإرجاع KGroupedTable مشابه - وهو تمثيل وسيط لدفق التحديثات المعاد تجميعها حسب المفتاح.

لنأخذ استراحة قصيرة وننظر إلى الشكل. 5.9، وهو ما يوضح ما حققناه. يجب أن تكون هذه الطوبولوجيا مألوفة لك الآن.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
دعونا الآن نلقي نظرة على كود هذه الطوبولوجيا (يمكن العثور عليها في src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (القائمة 5.2).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
يتميز الكود أعلاه بإيجازه وحجم الإجراءات الكبير الذي يتم تنفيذه في عدة أسطر. قد تلاحظ شيئًا جديدًا في المعلمة الأولى للأسلوب builder.stream: قيمة النوع المعدود AutoOffsetReset.EARLIEST (يوجد أيضًا LATEST)، والذي تم تعيينه باستخدام الأسلوب Consumed.withOffsetResetPolicy. يمكن استخدام هذا النوع المذكور لتحديد استراتيجية إعادة تعيين الإزاحة لكل KStream أو KTable، وله الأسبقية على إعداد إعادة تعيين الإزاحة من التكوين.

GroupByKey وGroupBy

تحتوي واجهة KStream على طريقتين لتجميع السجلات: GroupByKey وGroupBy. كلاهما يُرجعان KGroupedTable، لذا قد تتساءل ما الفرق بينهما ومتى تستخدم أي منهما؟

يتم استخدام طريقة GroupByKey عندما تكون المفاتيح الموجودة في KStream غير فارغة بالفعل. والأهم من ذلك، أنه لم يتم تعيين علامة "يتطلب إعادة التقسيم" مطلقًا.

تفترض طريقة GroupBy أنك قمت بتغيير مفاتيح التجميع، لذلك يتم تعيين علامة إعادة التقسيم على "صحيح". عمليات الانضمام والتجمعات وما إلى ذلك بعد أن تقوم طريقة GroupBy بإعادة التقسيم تلقائيًا.
ملخص: يجب عليك استخدام GroupByKey، وليس GroupBy، كلما أمكن ذلك.

ما تفعله طريقتا MapValues ​​وgroupBy واضح، لذلك دعونا نلقي نظرة على طريقة sum() (الموجودة في src/main/java/bbejeck/model/ShareVolume.java) (القائمة 5.3).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
تقوم طريقة ShareVolume.sum بإرجاع الإجمالي الفرعي لحجم مبيعات الأسهم، وتكون نتيجة سلسلة الحساب بأكملها عبارة عن كائن KTable . الآن أنت تفهم الدور الذي يلعبه KTable. عند وصول كائنات ShareVolume، يتم تخزين آخر تحديث في كائن KTable المقابل. من المهم أن تتذكر أن كافة التحديثات تنعكس في shareVolumeKTable السابق، ولكن لا يتم إرسالها جميعًا.

نستخدم بعد ذلك KTable للتجميع (حسب عدد الأسهم المتداولة) للحصول على الشركات الخمس ذات أكبر حجم من مبيعات الأسهم في كل صناعة. ستكون أفعالنا في هذه الحالة مشابهة للإجراءات التي تمت أثناء التجميع الأول.

  1. قم بإجراء عملية groupBy أخرى لتجميع كائنات ShareVolume الفردية حسب الصناعة.
  2. انتقل إلى جمع كائنات ShareVolume. هذه المرة كائن التجميع هو قائمة انتظار ذات أولوية ذات حجم ثابت. تبقى فقط الشركات الخمس ذات الأسهم الأكثر بيعًا في قائمة الانتظار ذات الحجم الثابت.
  3. قم بتعيين قوائم الانتظار من العنصر السابق إلى قيمة سلسلة وإرجاع الأسهم الخمسة الأكثر مبيعًا حسب الصناعة حسب الصناعة.
  4. كتابة النتائج في شكل سلسلة إلى الموضوع.

على الشكل. يوضح الشكل 5.10 رسمًا بيانيًا لطوبولوجيا تدفق البيانات. كما ترون، الجولة الثانية من المعالجة بسيطة للغاية.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
الآن، بعد أن فهمنا بوضوح بنية دائرة المعالجة الثانية هذه، يمكننا الرجوع إلى الكود المصدري الخاص بها (ستجده في الملف src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (القائمة 5.4).

يحتوي هذا المُهيئ على متغير قائمة الانتظار الثابتة. هذا هو كائن محول مخصص لـ java.util.TreeSet الذي يتم استخدامه لتتبع أعلى نتائج N بترتيب تنازلي لعدد الأسهم المباعة.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
لقد رأيت بالفعل استدعاءات إلى groupBy وmapValues، لذا لن نتناولها بالتفصيل (نحن نستدعي الأسلوب KTable.toStream لأن الأسلوب KTable.print مهمل). لكنك لم تشاهد إصدار KTable من التجميع () بعد، لذلك سنقضي بعض الوقت في مناقشته.

كما تتذكر، يختلف KTable في أن السجلات التي لها نفس المفتاح تعتبر تحديثات. يستبدل KTable الإدخال القديم بالإدخال الجديد. يعمل التجميع بطريقة مماثلة: يتم تجميع أحدث السجلات بنفس المفتاح. عند وصول إدخال، تتم إضافته إلى مثيل فئة FixedSizePriorityQueue باستخدام مُضيف (المعلمة الثانية في استدعاء الأسلوب التجميعي)، ولكن إذا كان هناك إدخال آخر موجود بالفعل بنفس المفتاح، فسيتم إزالة الإدخال القديم باستخدام الطرح (المعلمة الثالثة في استدعاء الطريقة التجميعية).

كل هذا يعني أن مجمعنا، FixedSizePriorityQueue، لا يجمع كل القيم بمفتاح واحد على الإطلاق، ولكنه يخزن مجموعًا متجددًا من الكميات N من أنواع الأسهم الأكثر تداولًا. يحتوي كل إدخال وارد على إجمالي عدد الأسهم المباعة حتى الآن. سوف يوفر لك KTable معلومات حول الأسهم الأكثر مبيعًا حاليًا، ولا يلزم تجميع كل تحديث بشكل متجدد.

لقد تعلمنا أن نفعل شيئين مهمين:

  • تجميع القيم في KTable حسب مفتاحها المشترك؛
  • إجراء عمليات مفيدة على هذه القيم المجمعة، مثل الالتواء والتجميع.

تعد معرفة كيفية إجراء هذه العمليات أمرًا مهمًا لفهم معنى البيانات التي تنتقل عبر تطبيق Kafka Streams وفهم المعلومات التي تنقلها.

لقد جمعنا أيضًا بعض المفاهيم الأساسية التي تمت مناقشتها سابقًا في هذا الكتاب. في الفصل الرابع، ناقشنا أهمية الحالة المحلية المتسامحة مع الأخطاء في تطبيق التدفق. يوضح المثال الأول في هذا الفصل أهمية الحالة المحلية، فهي تمنحك القدرة على تتبع المعلومات التي اطلعت عليها بالفعل. يؤدي الوصول المحلي إلى تجنب تأخيرات الشبكة، مما يجعل التطبيق أكثر أداءً وأكثر تحملاً للأخطاء.

عند إجراء أي عملية تجميع أو تجميع، يجب عليك تحديد اسم مخزن الحالة. تُرجع عمليات الطي والتجميع مثيل KTable، ويستخدم KTable مخزن الحالة لاستبدال النتائج القديمة بنتائج جديدة. كما رأيت، لا يتم إرسال كافة التحديثات بشكل أكبر، وهذا أمر مهم، نظرًا لأن عمليات التجميع مصممة للحصول على معلومات موجزة. إذا لم تقم بتطبيق الحالة المحلية، فسوف يرسل KTable جميع نتائج التجميع والتخفيض بشكل أكبر.

بعد ذلك، سننظر في تنفيذ عمليات مثل التجميع خلال فترة زمنية محددة - ما يسمى بعمليات النافذة (عمليات النافذة).

5.3.2. عمليات النافذة

في القسم السابق، تعرفنا على الالتفاف والتجميع "الانزلاقي". يقوم التطبيق باستمرار بزيادة حجم مبيعات الأسهم، يليه تجميع الأسهم الخمسة الأكثر مبيعًا في البورصة.

في بعض الأحيان يكون هذا التجميع المستمر وتجميع النتائج ضروريًا. وفي بعض الأحيان تحتاج إلى تنفيذ العمليات فقط خلال فترة زمنية معينة. على سبيل المثال، لحساب عدد عمليات التبادل التي تمت مع أسهم شركة معينة في آخر 10 دقائق. أو عدد المستخدمين الذين نقروا على إعلان بانر جديد في آخر 15 دقيقة. يمكن للتطبيق إجراء مثل هذه العمليات بشكل متكرر، ولكن مع نتائج مرتبطة فقط بفترات زمنية محددة (النوافذ الزمنية).

عد معاملات الصرف من قبل المشتري

في المثال التالي، سنقوم بتتبع معاملات البورصة للعديد من المتداولين - إما المؤسسات الكبيرة أو الممولين الأذكياء.

هناك سببان محتملان لهذا التتبع. أحدها هو الحاجة إلى معرفة ما يشتريه/يبيعه قادة السوق. إذا رأى هؤلاء اللاعبون الكبار والمستثمرون المتطورون فرصة لأنفسهم، فمن المنطقي اتباع استراتيجيتهم. السبب الثاني هو الرغبة في ملاحظة أي علامات محتملة للمعاملات غير القانونية باستخدام المعلومات الداخلية. للقيام بذلك، ستحتاج إلى تحليل العلاقة بين الارتفاعات الكبيرة في المبيعات والبيانات الصحفية المهمة.

يتكون هذا التتبع من الخطوات التالية:

  • إنشاء موضوع للقراءة من موضوع معاملات الأسهم؛
  • تجميع السجلات الواردة حسب معرف المشتري ورمز السهم. يؤدي استدعاء الأسلوب groupBy إلى إرجاع مثيل لفئة KGroupedStream؛
  • تقوم طريقة KGroupedStream.windowedBy بإرجاع دفق بيانات محاط بنافذة زمنية، مما يسمح بتجميع الإطارات. اعتمادًا على نوع النافذة، يتم إرجاع إما TimeWindowedKStream أو SessionWindowedKStream؛
  • عدد المعاملات لعملية التجميع. يحدد تدفق بيانات النافذة ما إذا كان سيتم تضمين سجل معين في هذا العدد؛
  • كتابة النتائج إلى موضوع ما أو إخراجها إلى وحدة التحكم في وقت التصميم.

طوبولوجيا هذا التطبيق بسيطة، ولكن صورته المرئية لا تؤذي. دعونا ننظر إلى الشكل. 5.11.

بعد ذلك، سننظر في وظيفة عمليات النافذة والرمز المقابل.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"

أنواع النوافذ

هناك ثلاثة أنواع من النوافذ في Kafka Streams:

  • حصة؛
  • "تراجع" (تراجع)؛
  • انزلاق / "القفز" (انزلاق / قفز).

أي واحد للاختيار يعتمد على متطلبات العمل. تكون النوافذ المتقلبة والقفزية محدودة بالوقت، في حين أن نوافذ الجلسة مقيدة بالوقت، مع تحديد مدة الجلسة (الجلسات) فقط من خلال مدى نشاط المستخدم. الشيء الرئيسي الذي يجب تذكره هو أن جميع أنواع النوافذ تعتمد على طوابع التاريخ/الوقت للإدخالات، وليس على وقت النظام.

بعد ذلك، نقوم بتنفيذ الهيكل الخاص بنا مع كل نوع من أنواع النوافذ. سيتم عرض الكود الكامل في المثال الأول فقط، ولن يتغير شيء بالنسبة لأنواع النوافذ الأخرى، باستثناء نوع تشغيل النافذة.

نوافذ الجلسة

تختلف نوافذ الجلسة كثيرًا عن جميع أنواع النوافذ الأخرى. فهي محدودة ليس بالوقت بقدر ما تكون محدودة بنشاط المستخدم (أو نشاط الكيان الذي ترغب في تتبعه). يتم تحديد نوافذ الجلسة بفترات عدم النشاط.

يوضح الشكل 5.12 مفهوم نوافذ الجلسة. سيتم دمج الجلسة الأصغر مع الجلسة الموجودة على يسارها. وستكون الجلسة على اليمين منفصلة لأنها تتبع فترة طويلة من عدم النشاط. تعتمد نوافذ الجلسة على إجراءات المستخدم، ولكن استخدم طوابع التاريخ/الوقت من الإدخالات لتحديد الجلسة التي ينتمي إليها الإدخال.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"

استخدام نوافذ الجلسة لتتبع معاملات الصرف

دعونا نستخدم نوافذ الجلسة لالتقاط معلومات حول معاملات التبادل. يظهر تنفيذ نوافذ الجلسة في القائمة 5.5 (والتي يمكن العثور عليها في src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
لقد رأيت بالفعل معظم عمليات هذه الطوبولوجيا، لذا ليست هناك حاجة لمراجعتها هنا مرة أخرى. ولكن هناك أيضًا بعض العناصر الجديدة هنا، والتي سنناقشها الآن.

عادةً ما تقوم أي عملية groupBy بتنفيذ نوع من عمليات التجميع (التجميع أو القيمة المحتسبة أو العد). يمكنك إجراء إما تجميع تراكمي إجمالي قيد التشغيل أو تجميع في إطارات يأخذ في الاعتبار السجلات ضمن نافذة زمنية معينة.

يقوم الكود الموجود في القائمة 5.5 بحساب عدد المعاملات داخل نوافذ الجلسة. على الشكل. 5.13 يتم تحليل هذه الإجراءات خطوة بخطوة.

من خلال استدعاء windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) نقوم بإنشاء نافذة جلسة بفاصل سكون مدته 20 ثانية وفاصل حفظ مدته 15 دقيقة. تعني فترة الخمول البالغة 20 ثانية أن التطبيق سيتضمن أي إدخال يصل خلال 20 ثانية من نهاية أو بداية الجلسة الحالية في الجلسة (النشيطة) الحالية.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
بعد ذلك، نحدد عملية التجميع التي سيتم إجراؤها في نافذة الجلسة - في هذه الحالة، قم بالعد. إذا تجاوزت الكتابة الواردة الفاصل الزمني للخمول (على جانبي طابع التاريخ/الوقت)، فسيقوم التطبيق بإنشاء جلسة عمل جديدة. تعني فترة الاستمرارية إبقاء الجلسة نشطة لفترة معينة من الوقت وتسمح بالبيانات المتأخرة التي تتجاوز فترة خمول الجلسة ولكن لا يزال من الممكن إرفاقها. بالإضافة إلى ذلك، تتوافق بداية ونهاية الجلسة الجديدة الناتجة عن الدمج مع طوابع التاريخ/الوقت الأقدم والأحدث.

دعونا نلقي نظرة على بعض الإدخالات من طريقة العد لنرى كيفية عمل الجلسات (الجدول 5.1).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
عند وصول السجلات، نبحث عن الجلسات الموجودة بالفعل بنفس المفتاح، ووقت الانتهاء أقل من طابع التاريخ/الوقت الحالي - الفاصل الزمني للخمول، ووقت البدء أكبر من طابع التاريخ/الوقت الحالي + الفاصل الزمني للخمول. مع أخذ هذا في الاعتبار، أربعة إدخالات من الجدول. 5.1 الدمج في جلسة واحدة على النحو التالي.

1. يصل السجل 1 أولاً، وبالتالي فإن وقت البدء يساوي وقت الانتهاء وهو 00:00:00.

2. يأتي بعد ذلك السجل 2، ونبحث عن الجلسات التي تنتهي في موعد لا يتجاوز 23:59:55 وتبدأ في موعد لا يتجاوز 00:00:35. نجد السجل 1 وندمج الجلستين 1 و 2. نأخذ وقت بدء الجلسة 1 (سابقًا) ووقت نهاية الجلسة 2 (لاحقًا)، لذا تبدأ جلستنا الجديدة عند 00:00:00 وتنتهي عند 00:00 :15.

3. وصول السجل 3، نبحث عن الجلسات بين 00:00:30 و00:01:10 ولم نجد شيئًا. نضيف جلسة ثانية للمفتاح 123-345-654، FFBE، تبدأ وتنتهي عند الساعة 00:00:50.

4. وصول السجل 4 ونحن نبحث عن جلسات بين الساعة 23:59:45 و00:00:25. هذه المرة تم العثور على كلتا الجلستين - 1 و2. تم دمج الجلسات الثلاث في جلسة واحدة، مع وقت البدء 00:00:00 ووقت الانتهاء 00:00:15.

إليك بعض الأمور المهمة التي يجب مراعاتها من هذا القسم:

  • الجلسات ليست نوافذ ذات حجم ثابت. يتم تحديد مدة الجلسة حسب النشاط خلال فترة زمنية معينة؛
  • تحدد طوابع التاريخ/الوقت الموجودة في البيانات ما إذا كان الحدث يقع ضمن جلسة موجودة أم في فترة خمول.

بعد ذلك، سنناقش النوع التالي من النوافذ - النوافذ "المتساقطة".

نوافذ "متدلية".

تلتقط النوافذ "المتداعية" الأحداث التي تقع خلال فترة زمنية معينة. تخيل أنك بحاجة إلى التقاط جميع معاملات الصرف لشركة معينة كل 20 ثانية، بحيث تقوم بجمع كل الأحداث لهذه الفترة الزمنية. في نهاية فترة المراقبة البالغة 20 ثانية، "تهبط" النافذة وتتحول إلى فترة مراقبة جديدة مدتها 20 ثانية. يوضح الشكل 5.14 هذا الموقف.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
كما ترون، يتم تضمين كافة الأحداث التي تم تلقيها في آخر 20 ثانية في النافذة. بعد هذه الفترة الزمنية، يتم إنشاء نافذة جديدة.

تعرض القائمة 5.6 الكود الذي يوضح استخدام النوافذ المتدلية لالتقاط معاملات التبادل كل 20 ثانية (موجود في src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
مع هذا التغيير الطفيف في استدعاء الأسلوب TimeWindows.of، يمكن استخدام نافذة متأرجحة. في هذا المثال، لا يوجد استدعاء للأسلوب until()، لذلك سيتم استخدام فترة الحفظ الافتراضية البالغة 24 ساعة.

أخيرًا، حان الوقت للانتقال إلى آخر خيارات النافذة، والقفز بين النوافذ.

النوافذ المنزلقة ("القفز").

النوافذ المنزلقة/القافزة تشبه النوافذ المتدلية، ولكن مع اختلاف طفيف. لا تنتظر النوافذ المنزلقة مرور فترة زمنية قبل إنشاء نافذة جديدة للتعامل مع الأحداث الأخيرة. يبدأون حسابات جديدة بعد فترة زمنية أقل من مدة النافذة.

لتوضيح الاختلافات بين النوافذ "المتقلبة" و"القفز"، دعنا نعود إلى مثال حساب معاملات الأسهم. لا يزال هدفنا هو حساب عدد المعاملات، لكننا لا نريد الانتظار طوال الوقت قبل تحديث العداد. وبدلاً من ذلك، سنقوم بتحديث العداد على فترات زمنية أقصر. على سبيل المثال، سنظل نحسب عدد المعاملات كل 20 ثانية، ولكن نقوم بتحديث العداد كل 5 ثوانٍ، كما هو موضح في الشكل. 5.15. في هذه الحالة، لدينا ثلاث نوافذ نتائج تحتوي على بيانات متداخلة.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
تعرض القائمة 5.7 رمز إعداد النوافذ المنزلقة (الموجود في src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
يمكن تحويل النافذة المتداعية إلى نافذة مرتدّة عن طريق إضافة استدعاء للتابع AdvancedBy()‎. في المثال الموضح، الفاصل الزمني للحفظ هو 15 دقيقة.

لقد رأيت في هذا القسم كيفية قصر نتائج التجميع على النوافذ الزمنية. وعلى وجه الخصوص، أود منك أن تتذكر الأشياء الثلاثة التالية من هذا القسم:

  • لا يقتصر حجم نوافذ الجلسة على فترة زمنية، بل على نشاط المستخدم؛
  • تعطي النوافذ "المتعثرة" فكرة عن الأحداث خلال فترة زمنية معينة؛
  • تم إصلاح مدة النوافذ "المتنقلة"، ولكن يتم تحديثها بشكل متكرر وقد تحتوي على إدخالات متداخلة في كافة النوافذ.

بعد ذلك، سنتعلم كيفية تحويل KTable مرة أخرى إلى KStream للاتصال.

5.3.3. ربط كائنات KStream وKTable

في الفصل الرابع، ناقشنا ربط كائنين KStream. الآن علينا أن نتعلم كيفية ربط KTable وKStream. قد يكون هذا ضروريًا للسبب البسيط التالي. KStream هو دفق سجل وKTable هو دفق تحديث سجل، ولكن في بعض الأحيان قد يكون من الضروري إضافة سياق إضافي إلى دفق السجل مع التحديثات من KTable.

لنأخذ بيانات حول عدد معاملات التبادل ونجمعها مع أخبار التبادل الخاصة بالصناعات ذات الصلة. إليك ما يتعين عليك القيام به لتحقيق ذلك، نظرًا للكود الموجود لديك بالفعل.

  1. قم بتحويل كائن KTable الذي يحتوي على بيانات حول عدد معاملات التبادل إلى KStream، ثم استبدل المفتاح بمفتاح يشير إلى الصناعة المقابلة لرمز السهم المحدد.
  2. قم بإنشاء كائن KTable الذي يقرأ البيانات من موضوع أخبار التبادل. سيتم تصنيف KTable الجديد هذا حسب الصناعة.
  3. ربط تحديثات الأخبار بمعلومات حول عدد معاملات التبادل حسب الصناعة.

الآن دعونا نرى كيفية تنفيذ خطة العمل هذه.

تحويل KTable إلى KStream

لتحويل KTable إلى KStream عليك القيام بما يلي.

  1. استدعاء الأسلوب KTable.toStream().
  2. عن طريق استدعاء أسلوب KStream.map، استبدل المفتاح باسم الصناعة، ثم قم باسترداد كائن TransactionSummary من مثيل Windowed.

سنقوم بتسلسل هذه العمليات على النحو التالي (يمكن العثور على الكود في src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (القائمة 5.8).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
نظرًا لأننا نقوم بتنفيذ عملية KStream.map، فإن إعادة التقسيم لمثيل KStream الذي تم إرجاعه تتم تلقائيًا عند استخدامه في الاتصال.

لقد أكملنا عملية التحويل، وبعد ذلك نحتاج إلى إنشاء كائن KTable لقراءة أخبار الأسهم.

إنشاء KTable لأخبار الأسهم

لحسن الحظ، يكفي سطر واحد من التعليمات البرمجية لإنشاء كائن KTable (يمكن العثور على هذا الرمز في src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (القائمة 5.9).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
تجدر الإشارة إلى أنه لا توجد حاجة إلى كائنات Serde، حيث يتم استخدام سلسلة Serde في الإعدادات. وأيضًا، بفضل استخدام التعداد المبكر، يمتلئ الجدول بالسجلات في البداية.

الآن يمكننا أن ننتقل إلى الخطوة الأخيرة - الاتصال.

ربط تحديثات الأخبار ببيانات عدد المعاملات

إنشاء اتصال ليس بالأمر الصعب. سنستخدم صلة يسارية في حالة عدم وجود أخبار مخزون للصناعة ذات الصلة (يمكنك العثور على الكود في src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (القائمة 5.10-XNUMX).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
مشغل leftJoin هذا بسيط جدًا. على عكس الصلات الموجودة في الفصل 4، لا يتم استخدام طريقة JoinWindow لأنه عند إجراء صلة KStream-KTable، يوجد إدخال واحد فقط في KTable لكل مفتاح. مثل هذا الاتصال غير محدود بالوقت: الإدخال إما موجود في KTable أو غير موجود. الوجبات الرئيسية: باستخدام كائنات KTable، يمكنك إثراء KStream ببيانات مرجعية أقل تحديثًا.

والآن سننظر في طريقة أكثر فعالية لإثراء الأحداث من KStream.

5.3.4. كائنات GlobalKTable

كما تفهم، هناك حاجة لإثراء تدفقات الأحداث أو إضافة سياق إليها. في الفصل الرابع، رأيت الاتصال بين كائنين KStream، وفي القسم السابق، رأيت الاتصال بين KStream وKTable. في كل هذه الحالات، من الضروري إعادة تقسيم تدفق البيانات عند تعيين المفاتيح إلى نوع أو قيمة جديدة. في بعض الأحيان تتم إعادة التقسيم بشكل صريح، وأحيانًا تقوم Kafka Streams بذلك تلقائيًا. تعد إعادة التقسيم ضرورية لأن المفاتيح قد تغيرت ويجب أن تنتهي السجلات في الأقسام الجديدة وإلا فلن يكون الانضمام ممكنًا (تمت مناقشة هذا في الفصل 4، "إعادة تقسيم البيانات" في القسم 4).

إعادة التقسيم لها ثمن

تأتي إعادة التقسيم بتكلفة - تكاليف موارد إضافية لإنشاء موضوعات وسيطة، وتخزين البيانات المكررة في موضوع آخر؛ ويعني أيضًا زيادة زمن الوصول بسبب الكتابة في هذا الموضوع والقراءة منه. بالإضافة إلى ذلك، إذا كنت تريد الانضمام إلى أكثر من جانب أو بُعد واحد، فستحتاج إلى ربط السلسلة وتعيين السجلات بمفاتيح جديدة وتشغيل عملية إعادة التقسيم مرة أخرى.

الاتصال بمجموعات البيانات الأصغر

في بعض الحالات، تكون كمية البيانات المرجعية التي تم التخطيط للاتصال بها صغيرة نسبيًا، بحيث يمكن احتواء النسخ الكاملة منها محليًا على كل عقدة. في مثل هذه المواقف، توفر Kafka Streams فئة GlobalKTable.

تعتبر مثيلات GlobalKTable فريدة من نوعها لأن التطبيق يقوم بنسخ جميع البيانات إلى كل عقدة. وبما أن كل عقدة تحتوي على جميع البيانات، ليست هناك حاجة لتقسيم تدفق الحدث بواسطة مفتاح البيانات المرجعية بحيث يكون متاحًا لجميع الأقسام. يمكنك أيضًا إجراء اتصالات بدون مفتاح باستخدام كائنات GlobalKTable. ولنرجع إلى أحد الأمثلة السابقة لتوضيح هذا الاحتمال.

توصيل كائنات KStream بكائنات GlobalKTable

في القسم الفرعي 5.3.2، أجرينا تجميعًا ضمن إطار لمعاملات التبادل من قبل المشترين. تبدو نتائج هذا التجميع كما يلي:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

على الرغم من أن هذه النتائج تخدم الغرض المقصود، إلا أنه سيكون أكثر ملاءمة إذا تم أيضًا عرض اسم العميل واسم الشركة بالكامل. لإضافة اسم عميل واسم شركة، يمكنك إجراء عمليات ربط عادية، ولكن ستحتاج إلى إجراء تعيينين رئيسيين وإعادة التقسيم. مع GlobalKTable، يمكنك تجنب تكلفة مثل هذه العمليات.

للقيام بذلك، سنستخدم كائن countStream من القائمة 5.11-5 (يمكن العثور على الكود المقابل في src/main/java/bbejeck/chapter_XNUMX/GlobalKTableExample.java) ونقوم بتوصيله بكائنين GlobalKTable.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
لقد ناقشنا هذا من قبل، لذلك لن أكرر نفسي. لكن لاحظ أن التعليمات البرمجية الموجودة في الدالة toStream().map تم تلخيصها في كائن دالة لسهولة القراءة بدلاً من تعبير lambda المضمّن.

الخطوة التالية هي الإعلان عن مثيلين لـ GlobalKTable (يمكن العثور على هذا الرمز في src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (القائمة 5.12-XNUMX).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"

لاحظ أن أسماء المواضيع موصوفة باستخدام الأنواع المذكورة.

الآن بعد أن جهزنا جميع المكونات، الشيء الوحيد المتبقي هو كتابة رمز الاتصال (الذي يمكن العثور عليه في src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (القائمة 5.13-XNUMX).

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
على الرغم من وجود صلتين في هذا الكود، إلا أنه تم تنظيمهما كسلسلة لأنه لم يتم استخدام أي من نتائجهما بشكل منفصل. يتم عرض النتائج في نهاية العملية بأكملها.

عند تشغيل عملية الانضمام أعلاه، سوف تحصل على نتائج مثل هذا:

{customer='Barney, Smith' company="Exxon", transactions= 17}

لم يتغير الجوهر، لكن هذه النتائج تبدو أكثر قابلية للفهم.

إذا قمت بحساب الفصل 4، فقد رأيت بالفعل عدة أنواع من عمليات الانضمام قيد التنفيذ. وهي مدرجة في الجدول. 5.2. يعكس هذا الجدول الاتصال اعتبارًا من الإصدار 1.0.0 من Kafka Streams؛ قد تتغير الأمور في الإصدارات المستقبلية.

كتاب "تيارات كافكا في العمل. التطبيقات والخدمات الدقيقة للعمل في الوقت الفعلي"
في الختام، اسمحوا لي أن أذكرك بالشيء الرئيسي: يمكنك توصيل تدفقات الأحداث (KStream) وتحديث التدفقات (KTable) باستخدام الحالة المحلية. بالإضافة إلى ذلك، إذا لم يكن حجم البيانات المرجعية كبيرًا جدًا، فيمكنك استخدام كائن GlobalKTable. يقوم GlobalKTable بنسخ جميع الأقسام إلى كل عقد من عقد تطبيق Kafka Streams، وبالتالي ضمان توفر جميع البيانات، بغض النظر عن القسم الذي يتوافق معه المفتاح.

بعد ذلك، سنرى ميزة Kafka Streams، والتي تسمح لك بمراقبة تغييرات الحالة دون استهلاك البيانات من موضوع Kafka.

5.3.5. الحالة المطلوبة

لقد قمنا ببعض العمليات ذات الحالة من قبل وقمنا دائمًا بتسجيل النتائج إلى وحدة التحكم (لأغراض التطوير) أو بتسجيلها في موضوع (لأغراض الإنتاج). عند كتابة النتائج لموضوع ما، عليك استخدام مستهلك كافكا لعرضها.

ويمكن اعتبار قراءة البيانات من هذه المواضيع نوعا من وجهات النظر المحققة. بالنسبة لمهامنا، يمكننا استخدام تعريف العرض المادي من ويكيبيديا: "... كائن قاعدة بيانات فعلية يحتوي على نتائج الاستعلام. على سبيل المثال، يمكن أن تكون نسخة محلية من البيانات البعيدة، أو مجموعة فرعية من الصفوف و/أو الأعمدة في جدول أو نتيجة ربط، أو جدول محوري تم الحصول عليه عن طريق التجميع" (https://en.wikipedia.org/wiki / عرض مادي).

تسمح Kafka Streams أيضًا باستعلامات تفاعلية ضد المتاجر الحكومية، مما يجعل من الممكن قراءة هذه العروض المتحققة مباشرة. من المهم ملاحظة أن الطلب إلى مخزن الحالة هو عملية للقراءة فقط. وهذا يضمن أنه لا داعي للقلق بشأن جعل الحالة غير متسقة عن طريق الخطأ أثناء معالجة التطبيق للبيانات.

تعد القدرة على الاستعلام عن مخازن الحالة مباشرة أمرًا مهمًا. هذا يعني أنه يمكنك إنشاء تطبيقات لوحة المعلومات دون الحاجة إلى الحصول أولاً على البيانات من مستهلك كافكا. كما أنه يزيد من كفاءة التطبيق، وذلك لأنه ليس من الضروري كتابة البيانات مرة أخرى:

  • ونظرًا لموقع البيانات، يمكن الوصول إليها بسرعة؛
  • يتم التخلص من ازدواجية البيانات، حيث لا يتم كتابتها على وحدة التخزين الخارجية.

الشيء الرئيسي الذي أود منك أن تتذكره هو أنه يمكنك الاستعلام عن الحالة مباشرة من التطبيق. لا يمكن المبالغة في تقدير الإمكانيات التي يوفرها لك هذا. بدلاً من استهلاك البيانات من كافكا وتخزين السجلات في قاعدة بيانات التطبيق، يمكنك الاستعلام عن مخازن الحالة بنفس النتيجة. الاستعلام عن مخازن الحالة مباشرة يعني تعليمات برمجية أقل (لا يوجد مستهلك) وبرامج أقل (لا حاجة لجدول قاعدة بيانات لتخزين النتائج).

لقد قمنا بتغطية الكثير من المعلومات في هذا الفصل، لذا سنتوقف عن مناقشتنا للاستعلامات التفاعلية في المتاجر الحكومية للحظة. لكن لا تقلق: في الفصل التاسع، سنقوم بإنشاء تطبيق لوحة تحكم بسيط يحتوي على استعلامات تفاعلية. وسوف يستخدم بعض الأمثلة في هذا الفصل والفصول السابقة لتوضيح الاستعلامات التفاعلية وكيفية إضافتها إلى تطبيقات Kafka Streams.

ملخص

  • تمثل كائنات KStream تدفقات من الأحداث، يمكن مقارنتها بإدراجات قاعدة البيانات. تمثل كائنات KTable تدفقات التحديث، وهي أشبه بالتحديثات في قاعدة البيانات. لا ينمو حجم كائن KTable، ويتم استبدال السجلات القديمة بسجلات جديدة.
  • كائنات KTable مطلوبة لعمليات التجميع.
  • يتيح لك Windowing تقسيم البيانات المجمعة إلى صناديق زمنية.
  • بفضل كائنات GlobalKTable، يمكن الوصول إلى البيانات المرجعية في أي مكان في التطبيق، بغض النظر عن التقسيم.
  • الاتصالات بين كائنات KStream وKTable وGlobalKTable ممكنة.

لقد ركزنا حتى الآن على بناء تطبيقات Kafka Streams باستخدام DSL KStream عالي المستوى. على الرغم من أن النهج عالي المستوى يسمح لك بإنشاء برامج أنيقة وموجزة، إلا أن استخدامه يمثل حلاً وسطًا معينًا. إن العمل مع DSL KStream يعني جعل التعليمات البرمجية أكثر إيجازًا على حساب تحكم أقل. في الفصل التالي، سنلقي نظرة على واجهة برمجة تطبيقات عقدة المعالج ذات المستوى المنخفض ونجرب مقايضات أخرى. ستصبح البرامج أطول مما كانت عليه حتى الآن، لكننا سنكون قادرين على إنشاء أي عقدة معالج قد نحتاجها تقريبًا.

→ مزيد من التفاصيل حول الكتاب يمكن العثور عليها في موقع الناشر

→ بالنسبة لـ Khabrozhiteli خصم 25% على الكوبون - كافكا تيارات

→ عند دفع ثمن النسخة الورقية من الكتاب، يتم إرسال كتاب إلكتروني إلى البريد الإلكتروني.

المصدر: www.habr.com

إضافة تعليق