کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ" سلام بر اهالی خبر! این کتاب برای هر توسعه دهنده ای که می خواهد پردازش نخ را درک کند مناسب است. درک برنامه نویسی توزیع شده به شما کمک می کند تا کافکا و جریان های کافکا را بهتر درک کنید. خوب است که خود چارچوب کافکا را بشناسید، اما این ضروری نیست: من هر آنچه را که نیاز دارید به شما خواهم گفت. توسعه دهندگان با تجربه کافکا و تازه کارها به طور یکسان یاد می گیرند که چگونه برنامه های پردازش جریانی جالب را با استفاده از کتابخانه Kafka Streams در این کتاب ایجاد کنند. توسعه دهندگان متوسط ​​و پیشرفته جاوا که قبلاً با مفاهیمی مانند سریال سازی آشنا هستند، یاد خواهند گرفت که مهارت های خود را برای ایجاد برنامه های Kafka Streams به کار گیرند. کد منبع کتاب به زبان جاوا 8 نوشته شده است و از نحو بیان لامبدا جاوا 8 استفاده قابل توجهی می کند، بنابراین دانستن نحوه کار با توابع لامبدا (حتی در یک زبان برنامه نویسی دیگر) مفید خواهد بود.

گزیده. 5.3. عملیات تجمیع و پنجره سازی

در این بخش، به بررسی امیدوارکننده‌ترین بخش‌های کافکا استریمز می‌پردازیم. تاکنون جنبه‌های زیر از جریان‌های کافکا را پوشش داده‌ایم:

  • ایجاد یک توپولوژی پردازش؛
  • استفاده از حالت در برنامه های پخش جریانی؛
  • انجام اتصالات جریان داده؛
  • تفاوت بین جریان رویداد (KStream) و جریان به روز رسانی (KTable).

در مثال های زیر تمام این عناصر را با هم می آوریم. همچنین با پنجره، یکی دیگر از ویژگی های عالی برنامه های پخش جریانی آشنا خواهید شد. اولین مثال ما یک تجمیع ساده خواهد بود.

5.3.1. تجمیع فروش سهام بر اساس بخش صنعت

هنگام کار با داده های جریانی، تجمیع و گروه بندی ابزارهای حیاتی هستند. بررسی سوابق فردی به هنگام دریافت آنها اغلب کافی نیست. برای استخراج اطلاعات اضافی از داده ها، گروه بندی و ترکیب آنها ضروری است.

در این مثال، شما لباس یک معامله گر روزانه را می پوشید که باید حجم فروش سهام شرکت ها را در چندین صنعت دنبال کند. به طور خاص، شما به پنج شرکت با بیشترین فروش سهام در هر صنعت علاقه دارید.

چنین تجمیع‌سازی به چندین مرحله زیر نیاز دارد تا داده‌ها را به شکل دلخواه ترجمه کند (به صورت کلی صحبت کنیم).

  1. یک منبع موضوعی ایجاد کنید که اطلاعات خام معاملات سهام را منتشر کند. ما باید یک شی از نوع StockTransaction را به یک شی از نوع ShareVolume نگاشت کنیم. نکته این است که شی StockTransaction حاوی ابرداده های فروش است، اما ما فقط به داده هایی در مورد تعداد سهام فروخته شده نیاز داریم.
  2. داده های ShareVolume را بر اساس نماد سهام گروه بندی کنید. پس از گروه بندی بر اساس نماد، می توانید این داده ها را در مجموع فرعی حجم فروش سهام جمع کنید. شایان ذکر است که روش KStream.groupBy نمونه ای از نوع KGroupedStream را برمی گرداند. و می توانید با فراخوانی بیشتر متد KGroupedStream.reduce یک نمونه KTable دریافت کنید.

رابط KGroupedStream چیست؟

متدهای KStream.groupBy و KStream.groupByKey یک نمونه از KGroupedStream را برمی گرداند. KGroupedStream یک نمایش میانی از جریانی از رویدادها پس از گروه بندی بر اساس کلیدها است. اصلاً برای کار مستقیم با آن در نظر گرفته نشده است. در عوض، KGroupedStream برای عملیات تجمیع استفاده می شود که همیشه منجر به KTable می شود. و از آنجایی که نتیجه عملیات تجمیع یک KTable است و آنها از یک فروشگاه حالت استفاده می کنند، ممکن است همه به روز رسانی ها در نتیجه به پایین خط لوله ارسال نشوند.

متد KTable.groupBy یک KGroupedTable مشابه را برمی‌گرداند - یک نمایش متوسط ​​از جریان به‌روزرسانی‌ها، که توسط کلید دوباره گروه‌بندی می‌شود.

بیایید کمی استراحت کنیم و به شکل. 5.9، که نشان می دهد ما به چه چیزی دست یافته ایم. این توپولوژی از قبل باید برای شما بسیار آشنا باشد.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
بیایید اکنون به کد این توپولوژی نگاه کنیم (این کد را می توان در فایل src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java یافت) (فهرست 5.2).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
کد داده شده با مختصر بودن و حجم زیادی از اقدامات انجام شده در چندین خط متمایز می شود. ممکن است در پارامتر اول متد builder.stream متوجه چیز جدیدی شوید: مقداری از نوع enum AutoOffsetReset.EARLIEST (همچنین LATEST وجود دارد)، که با استفاده از روش Consumed.withOffsetResetPolicy تنظیم شده است. این نوع شمارش را می توان برای تعیین استراتژی بازنشانی افست برای هر KStream یا KTable استفاده کرد و بر گزینه تنظیم مجدد offset از پیکربندی اولویت دارد.

GroupByKey و GroupBy

رابط KStream دو روش برای گروه بندی رکوردها دارد: GroupByKey و GroupBy. هر دو یک KGroupedTable را برمی‌گردانند، بنابراین ممکن است از خود بپرسید که تفاوت بین آنها چیست و چه زمانی باید از کدام یک استفاده کرد؟

روش GroupByKey زمانی استفاده می شود که کلیدهای KStream از قبل خالی باشند. و مهمتر از همه، پرچم "نیاز به پارتیشن بندی مجدد" هرگز تنظیم نشد.

متد GroupBy فرض می کند که کلیدهای گروه بندی را تغییر داده اید، بنابراین پرچم پارتیشن مجدد روی true تنظیم می شود. انجام Join ها، Aggregation ها و غیره بعد از متد GroupBy منجر به پارتیشن بندی مجدد خودکار می شود.
خلاصه: در صورت امکان، باید از GroupByKey به جای GroupBy استفاده کنید.

مشخص است که متدهای mapValues ​​و groupBy چه می‌کنند، بنابراین بیایید نگاهی به متد sum() بیاندازیم (که در src/main/java/bbejeck/model/ShareVolume.java یافت می‌شود) (فهرست 5.3).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
روش ShareVolume.sum مجموع در حال اجرا حجم فروش سهام را برمی گرداند و نتیجه کل زنجیره محاسبات یک شی KTable است. . اکنون نقش KTable را درک می کنید. هنگامی که اشیاء ShareVolume می رسند، شی KTable مربوطه آخرین به روز رسانی فعلی را ذخیره می کند. مهم است که به یاد داشته باشید که همه به‌روزرسانی‌ها در shareVolumeKTable قبلی منعکس شده‌اند، اما همه بیشتر ارسال نمی‌شوند.

سپس از این KTable برای تجمیع (بر اساس تعداد سهام معامله شده) استفاده می کنیم تا به پنج شرکتی که بیشترین حجم سهام معامله شده در هر صنعت را دارند برسیم. اقدامات ما در این مورد مشابه اقدامات مربوط به اولین تجمع خواهد بود.

  1. یک عملیات groupBy دیگر برای گروه بندی اشیاء ShareVolume جداگانه بر اساس صنعت انجام دهید.
  2. خلاصه کردن اشیاء ShareVolume را شروع کنید. این بار شی تجمع یک صف اولویت با اندازه ثابت است. در این صف با اندازه ثابت، تنها پنج شرکتی که بیشترین مقدار سهام فروخته شده را دارند، حفظ می‌شوند.
  3. صف‌های پاراگراف قبلی را به یک مقدار رشته ترسیم کنید و پنج سهام برتر را بر اساس تعداد بر اساس صنعت برگردانید.
  4. نتایج را به صورت رشته ای در موضوع بنویسید.

در شکل شکل 5.10 نمودار توپولوژی جریان داده را نشان می دهد. همانطور که می بینید، مرحله دوم پردازش بسیار ساده است.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
اکنون که درک روشنی از ساختار این دور دوم پردازش داریم، می‌توانیم به کد منبع آن بپردازیم (آن را در فایل src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java خواهید یافت) (فهرست 5.4) .

این اولیه شامل یک متغیر fixedQueue است. این یک شی سفارشی است که یک آداپتور برای java.util.TreeSet است که برای ردیابی نتایج N برتر به ترتیب نزولی سهام معامله شده استفاده می شود.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
قبلاً تماس‌های groupBy و mapValues ​​را دیده‌اید، بنابراین به آن‌ها نمی‌پردازیم (متد KTable.toStream را می‌خوانیم زیرا روش KTable.print منسوخ شده است). اما شما هنوز نسخه KTable aggregate() را ندیده‌اید، بنابراین ما زمان کمی را صرف بحث در مورد آن خواهیم کرد.

همانطور که به یاد دارید، چیزی که KTable را متمایز می کند این است که رکوردهایی با کلیدهای یکسان به روز رسانی محسوب می شوند. KTable ورودی قدیمی را با ورودی جدید جایگزین می کند. تجمیع به روشی مشابه اتفاق می افتد: آخرین رکوردها با کلید یکسان جمع می شوند. هنگامی که رکوردی وارد می شود، با استفاده از جمع کننده (پارامتر دوم در فراخوانی متد انبوه) به نمونه کلاس FixedSizePriorityQueue اضافه می شود، اما اگر رکورد دیگری از قبل با همان کلید وجود داشته باشد، رکورد قدیمی با استفاده از یک تفریق کننده حذف می شود (پارامتر سوم در فراخوانی متد مجموع).

همه اینها به این معنی است که تجمیع کننده ما، FixedSizePriorityQueue، همه مقادیر را با یک کلید جمع نمی کند، بلکه مجموع متحرکی از مقادیر N نوع سهام که بیشترین معامله را دارند، ذخیره می کند. هر ورودی ورودی شامل تعداد کل سهام فروخته شده تا کنون است. KTable اطلاعاتی در مورد سهام شرکت‌هایی که در حال حاضر بیشترین معامله را دارند به شما می‌دهد، بدون نیاز به تجمیع چرخشی هر به‌روزرسانی.

ما یاد گرفتیم که دو کار مهم را انجام دهیم:

  • گروه بندی مقادیر در KTable توسط یک کلید مشترک؛
  • عملیات مفیدی مانند جمع آوری و تجمیع را روی این مقادیر گروه بندی شده انجام دهید.

دانستن نحوه انجام این عملیات برای درک معنای داده‌هایی که از طریق برنامه Kafka Streams جابجا می‌شوند و درک اینکه چه اطلاعاتی با خود حمل می‌کند، مهم است.

ما همچنین برخی از مفاهیم کلیدی که قبلاً در این کتاب مورد بحث قرار گرفت را گرد هم آورده ایم. در فصل 4، در مورد اینکه وضعیت محلی و تحمل‌پذیر خطا برای یک برنامه استریم اهمیت دارد، بحث کردیم. اولین مثال در این فصل نشان داد که چرا ایالت محلی بسیار مهم است - به شما امکان می دهد اطلاعاتی را که قبلاً دیده اید پیگیری کنید. دسترسی محلی از تاخیرهای شبکه جلوگیری می کند و باعث می شود برنامه کاربردی تر و مقاوم تر در برابر خطا باشد.

هنگام انجام هر عملیات جمع‌آوری یا تجمیع، باید نام فروشگاه State را مشخص کنید. عملیات rollup و aggregation یک نمونه KTable را برمی گرداند و KTable از ذخیره سازی حالت برای جایگزینی نتایج قدیمی با نتایج جدید استفاده می کند. همانطور که دیدید، همه به‌روزرسانی‌ها در خط لوله ارسال نمی‌شوند، و این مهم است زیرا عملیات تجمیع برای تولید اطلاعات خلاصه طراحی شده‌اند. اگر حالت محلی را اعمال نکنید، KTable همه نتایج تجمیع و جمع‌آوری را فوروارد می‌کند.

در مرحله بعد، ما به انجام عملیاتی مانند تجمیع در یک بازه زمانی خاص خواهیم پرداخت - به اصطلاح عملیات پنجره.

5.3.2. عملیات پنجره

در قسمت قبل به معرفی کانولوشن و تجمع کشویی پرداختیم. این برنامه یک جمع‌آوری مداوم از حجم فروش سهام را انجام داد و به دنبال آن پنج سهام پرمعامله در بورس را تجمیع کرد.

گاهی اوقات چنین تجمیع و جمع آوری مداوم نتایج ضروری است. و گاهی اوقات شما نیاز به انجام عملیات فقط در یک دوره زمانی معین دارید. به عنوان مثال، محاسبه کنید که در 10 دقیقه گذشته چند معامله مبادله ای با سهام یک شرکت خاص انجام شده است. یا چند کاربر در 15 دقیقه گذشته روی یک بنر تبلیغاتی جدید کلیک کرده اند. یک برنامه ممکن است چندین بار چنین عملیاتی را انجام دهد، اما نتایجی که فقط برای دوره های زمانی مشخص (پنجره های زمانی) اعمال می شود.

شمارش معاملات مبادله ای توسط خریدار

در مثال بعدی، تراکنش‌های سهام را در چندین معامله‌گر-چه سازمان‌های بزرگ یا سرمایه‌گذاران فردی هوشمند- پیگیری می‌کنیم.

دو دلیل احتمالی برای این ردیابی وجود دارد. یکی از آنها نیاز به دانستن اینکه رهبران بازار چه چیزی را خرید/فروش می کنند است. اگر این بازیگران بزرگ و سرمایه گذاران پیچیده فرصت را ببینند، منطقی است که استراتژی آنها را دنبال کنند. دلیل دوم تمایل به مشاهده هرگونه نشانه احتمالی تجارت غیرقانونی خودی است. برای انجام این کار، باید همبستگی جهش های فروش بزرگ را با بیانیه های مطبوعاتی مهم تجزیه و تحلیل کنید.

چنین ردیابی شامل مراحل زیر است:

  • ایجاد یک جریان برای خواندن از مبحث معاملات سهام؛
  • گروه بندی رکوردهای دریافتی بر اساس شناسه خریدار و نماد سهام. فراخوانی متد groupBy نمونه ای از کلاس KGroupedStream را برمی گرداند.
  • متد KGroupedStream.windowedBy یک جریان داده محدود به یک پنجره زمانی را برمی‌گرداند که امکان تجمیع پنجره‌ها را فراهم می‌کند. بسته به نوع پنجره، یک TimeWindowedKStream یا یک SessionWindowedKStream برگردانده می شود.
  • تعداد تراکنش برای عملیات تجمیع جریان داده های پنجره ای تعیین می کند که آیا یک رکورد خاص در این شمارش در نظر گرفته شده است یا خیر.
  • نوشتن نتایج برای یک موضوع یا خروجی آنها در کنسول در طول توسعه.

توپولوژی این برنامه ساده است، اما یک تصویر واضح از آن می تواند مفید باشد. بیایید نگاهی به شکل 5.11.

در مرحله بعد، عملکرد عملیات پنجره و کد مربوطه را بررسی خواهیم کرد.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"

انواع پنجره

در کافکا استریم سه نوع پنجره وجود دارد:

  • جلسه ای
  • غلت زدن (غلت زدن);
  • سر خوردن/پریدن

کدام یک را انتخاب کنید به نیازهای کسب و کار شما بستگی دارد. پنجره‌های چرخشی و پرش محدود به زمان هستند، در حالی که پنجره‌های جلسه با فعالیت کاربر محدود می‌شوند—مدت جلسه(ها) صرفاً بر اساس میزان فعال بودن کاربر تعیین می‌شود. نکته اصلی که باید به خاطر داشته باشید این است که همه انواع پنجره ها بر اساس مهر تاریخ/زمان ورودی ها هستند، نه زمان سیستم.

سپس، توپولوژی خود را با هر یک از انواع پنجره ها پیاده سازی می کنیم. کد کامل فقط در مثال اول داده خواهد شد؛ برای انواع دیگر ویندوز هیچ چیز به جز نوع عملکرد پنجره تغییر نخواهد کرد.

پنجره های جلسه

پنجره های Session بسیار متفاوت از سایر انواع ویندوز هستند. آنها نه به اندازه فعالیت کاربر (یا فعالیت نهادی که می خواهید ردیابی کنید) محدود به زمان هستند. پنجره های جلسه با دوره های عدم فعالیت محدود می شوند.

شکل 5.12 مفهوم پنجره های جلسه را نشان می دهد. جلسه کوچکتر با جلسه سمت چپ خود ادغام می شود. و جلسه سمت راست جدا خواهد بود زیرا به دنبال یک دوره طولانی عدم فعالیت است. پنجره‌های جلسه بر اساس فعالیت کاربر است، اما از مهرهای تاریخ/زمان ورودی‌ها برای تعیین اینکه ورودی متعلق به کدام جلسه است استفاده می‌کند.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"

استفاده از پنجره های جلسه برای پیگیری معاملات سهام

بیایید از پنجره های جلسه برای گرفتن اطلاعات در مورد تراکنش های مبادله استفاده کنیم. اجرای پنجره های جلسه در لیست 5.5 نشان داده شده است (که در src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java قابل مشاهده است).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
شما قبلاً اکثر عملیات های این توپولوژی را دیده اید، بنابراین نیازی به بررسی مجدد آنها در اینجا نیست. اما چندین عنصر جدید نیز در اینجا وجود دارد که اکنون به آنها خواهیم پرداخت.

هر عملیات groupBy معمولاً نوعی عملیات تجمیع (تجمیع، جمع‌آوری یا شمارش) را انجام می‌دهد. شما می‌توانید تجمیع تجمعی را با مجموع در حال اجرا انجام دهید، یا تجمیع پنجره، که رکوردها را در یک پنجره زمانی مشخص در نظر می‌گیرد.

کد موجود در فهرست 5.5 تعداد تراکنش‌ها را در پنجره‌های جلسه شمارش می‌کند. در شکل 5.13 این اقدامات گام به گام تحلیل می شوند.

با فراخوانی windowedBy(SessionWindows.with(twentySeconds).until(پانزده دقیقه)) یک پنجره جلسه با فاصله زمانی عدم فعالیت 20 ثانیه و فاصله ماندگاری 15 دقیقه ایجاد می کنیم. فاصله زمانی بیکار 20 ثانیه به این معنی است که برنامه هر ورودی را که ظرف 20 ثانیه پس از پایان یا شروع جلسه فعلی وارد جلسه فعلی (فعال) می شود را شامل می شود.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
در مرحله بعد، ما مشخص می کنیم که کدام عملیات تجمع باید در پنجره جلسه انجام شود - در این مورد، شمارش کنید. اگر ورودی ورودی خارج از پنجره عدم فعالیت باشد (هر طرف مهر تاریخ/زمان)، برنامه یک جلسه جدید ایجاد می کند. فاصله نگهداری به معنای حفظ یک جلسه برای مدت زمان معینی است و به داده‌های دیرهنگام اجازه می‌دهد که فراتر از دوره عدم فعالیت جلسه است اما همچنان می‌توان آن را پیوست کرد. علاوه بر این، شروع و پایان جلسه جدید حاصل از ادغام با اولین و آخرین مهر تاریخ/زمان مطابقت دارد.

بیایید به چند ورودی از روش شمارش نگاه کنیم تا ببینیم جلسات چگونه کار می کنند (جدول 5.1).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
وقتی رکوردها می رسند، با همان کلید به دنبال جلسات موجود می گردیم، زمان پایان کمتر از مهر تاریخ/زمان فعلی - فاصله زمانی عدم فعالیت، و زمان شروع بیشتر از مهر تاریخ/زمان فعلی + فاصله عدم فعالیت. با در نظر گرفتن این، چهار ورودی از جدول. 5.1 به صورت زیر در یک جلسه ادغام می شوند.

1. رکورد 1 ابتدا می رسد، بنابراین زمان شروع برابر با زمان پایان است و 00:00:00 است.

2. بعد، ورودی 2 می رسد، و ما به دنبال جلساتی می گردیم که زودتر از 23:59:55 پایان نمی یابند و حداکثر تا ساعت 00:00:35 شروع می شوند. ما رکورد 1 را پیدا می کنیم و جلسات 1 و 2 را ترکیب می کنیم. زمان شروع جلسه 1 (قبلتر) و زمان پایان جلسه 2 (بعداً) را می گیریم، به طوری که جلسه جدید ما از ساعت 00:00:00 شروع می شود و در 00 به پایان می رسد: 00:15.

3. رکورد 3 می رسد، ما بین ساعت 00:00:30 تا 00:01:10 به دنبال جلسات می گردیم و هیچ کدام را پیدا نمی کنیم. جلسه دوم را برای کلید 123-345-654، FFBE اضافه کنید که در ساعت 00:00:50 شروع و پایان می یابد.

4. رکورد 4 می رسد و ما به دنبال جلسات بین 23:59:45 و 00:00:25 هستیم. این بار هر دو جلسه 1 و 2 یافت می شوند. هر سه جلسه در یک جلسه با زمان شروع 00:00:00 و زمان پایان 00:00:15 ترکیب می شوند.

از آنچه در این بخش توضیح داده شده است، لازم است نکات مهم زیر را به خاطر بسپارید:

  • جلسات ویندوز با اندازه ثابت نیستند. مدت یک جلسه با فعالیت در یک دوره زمانی معین تعیین می شود.
  • مهرهای تاریخ/زمان در داده ها تعیین می کند که آیا رویداد در یک جلسه موجود است یا در یک دوره بیکار.

در ادامه در مورد نوع بعدی پنجره - پنجره های "غلتشی" بحث خواهیم کرد.

پنجره های "غلت خوردن".

پنجره‌های در حال چرخش رویدادهایی را ثبت می‌کنند که در یک بازه زمانی مشخص رخ می‌دهند. تصور کنید که باید هر 20 ثانیه تمام معاملات سهام یک شرکت خاص را ضبط کنید، بنابراین تمام رویدادهای آن دوره زمانی را جمع آوری می کنید. در پایان بازه 20 ثانیه ای، پنجره می چرخد ​​و به بازه مشاهده 20 ثانیه ای جدید می رود. شکل 5.14 این وضعیت را نشان می دهد.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
همانطور که می بینید، تمام رویدادهای دریافت شده در 20 ثانیه گذشته در پنجره گنجانده شده است. در پایان این بازه زمانی، یک پنجره جدید ایجاد می شود.

فهرست 5.6 کدی را نشان می‌دهد که استفاده از پنجره‌های در حال سقوط را برای ثبت معاملات سهام هر 20 ثانیه نشان می‌دهد (در src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java یافت می‌شود).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
با این تغییر کوچک در فراخوانی متد TimeWindows.of، می توانید از یک پنجره غلتشی استفاده کنید. در این مثال متد () () فراخوانی نمی‌شود، بنابراین از فاصله زمانی پیش‌فرض نگهداری 24 ساعت استفاده می‌شود.

در نهایت، زمان آن رسیده است که به آخرین گزینه پنجره - "پرش" پنجره ها برویم.

پنجره های کشویی ("پرش").

پنجره های کشویی/هپینگ شبیه پنجره های غلتشی هستند اما با کمی تفاوت. پنجره های کشویی قبل از ایجاد یک پنجره جدید برای پردازش رویدادهای اخیر تا پایان بازه زمانی منتظر نمی مانند. آنها محاسبات جدید را پس از فاصله زمانی کمتر از مدت زمان پنجره شروع می کنند.

برای نشان دادن تفاوت بین پنجره های غلتشی و جامپینگ، اجازه دهید به مثال شمارش معاملات بورس بازگردیم. هدف ما هنوز شمارش تعداد تراکنش‌ها است، اما نمی‌خواهیم کل زمان را قبل از به‌روزرسانی پیشخوان منتظر بمانیم. در عوض، شمارنده را در فواصل زمانی کوتاه‌تر به‌روزرسانی می‌کنیم. به عنوان مثال، ما همچنان تعداد تراکنش‌ها را هر 20 ثانیه می‌شماریم، اما همانطور که در شکل نشان داده شده است، شمارنده را هر 5 ثانیه به‌روزرسانی می‌کنیم. 5.15. در این مورد، ما با سه پنجره نتیجه با داده های همپوشانی مواجه می شویم.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
لیست 5.7 کد تعریف پنجره های کشویی را نشان می دهد (که در src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java یافت می شود).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
با افزودن یک فراخوانی به متد advanceBy() می‌توان یک پنجره درحال تبدیل را به یک پنجره پرش تبدیل کرد. در مثال نشان داده شده، فاصله ذخیره 15 دقیقه است.

در این بخش نحوه محدود کردن نتایج تجمع را به پنجره های زمانی مشاهده کردید. به ویژه، می خواهم سه مورد زیر را از این بخش به خاطر بسپارید:

  • اندازه پنجره های جلسه نه با دوره زمانی، بلکه با فعالیت کاربر محدود می شود.
  • پنجره های "غلت" یک نمای کلی از رویدادها در یک دوره زمانی معین ارائه می دهند.
  • مدت زمان پرش پنجره ها ثابت است، اما آنها اغلب به روز می شوند و ممکن است شامل ورودی های همپوشانی در همه پنجره ها باشند.

در مرحله بعد، نحوه تبدیل KTable را به KStream برای اتصال یاد خواهیم گرفت.

5.3.3. اتصال اشیاء KStream و KTable

در فصل 4، ما در مورد اتصال دو شیء KStream بحث کردیم. اکنون باید نحوه اتصال KTable و KStream را یاد بگیریم. این ممکن است به دلیل ساده زیر مورد نیاز باشد. KStream جریانی از رکوردها است و KTable جریانی از به‌روزرسانی‌های رکورد است، اما گاهی اوقات ممکن است بخواهید با استفاده از به‌روزرسانی‌های KTable، زمینه اضافی را به جریان رکورد اضافه کنید.

بیایید داده های تعداد معاملات بورس را بگیریم و آنها را با اخبار بورس برای صنایع مربوطه ترکیب کنیم. با توجه به کدی که از قبل دارید، برای رسیدن به این هدف باید چه کاری انجام دهید.

  1. یک شی KTable با داده‌های مربوط به تعداد معاملات سهام را به KStream تبدیل کنید و سپس کلید را با کلیدی که بخش صنعت مربوط به این نماد سهام را نشان می‌دهد جایگزین کنید.
  2. یک شی KTable ایجاد کنید که داده ها را از یک موضوع با اخبار بورس می خواند. این KTable جدید بر اساس بخش صنعت دسته بندی می شود.
  3. به روز رسانی اخبار را با اطلاعات تعداد معاملات بورس اوراق بهادار بر اساس بخش صنعت مرتبط کنید.

حال بیایید ببینیم که چگونه این برنامه عملیاتی را اجرا کنیم.

KTable را به KStream تبدیل کنید

برای تبدیل KTable به KStream باید موارد زیر را انجام دهید.

  1. متد KTable.toStream() را فراخوانی کنید.
  2. با فراخوانی متد KStream.map، نام صنعت را جایگزین کلید کنید و سپس شیء TransactionSummary را از نمونه Windowed بازیابی کنید.

ما این عملیات را به صورت زیر با هم زنجیره می کنیم (کد را می توان در فایل src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java یافت) (فهرست 5.8).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
از آنجایی که ما در حال انجام یک عملیات KStream.map هستیم، نمونه KStream برگشتی هنگامی که در یک اتصال استفاده می‌شود، دوباره به‌طور خودکار پارتیشن بندی می‌شود.

ما فرآیند تبدیل را کامل کرده ایم، سپس باید یک شی KTable برای خواندن اخبار سهام ایجاد کنیم.

ایجاد KTable برای اخبار سهام

خوشبختانه، ایجاد یک شی KTable فقط به یک خط کد نیاز دارد (کد را می توان در src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java یافت) (فهرست 5.9).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
شایان ذکر است که هیچ شیء Serde لازم نیست که مشخص شود، زیرا Serdes رشته ای در تنظیمات استفاده می شود. همچنین با استفاده از EARLIEST enumeration، جدول در همان ابتدا با رکوردها پر می شود.

اکنون می توانیم به مرحله نهایی - اتصال برویم.

اتصال به‌روزرسانی‌های اخبار با داده‌های تعداد تراکنش‌ها

ایجاد ارتباط کار سختی نیست. در صورتی که خبری از سهام برای صنعت مربوطه نباشد (کد لازم را می‌توانید در فایل src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java بیابید) استفاده می‌کنیم (فهرست 5.10).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
این عملگر leftJoin بسیار ساده است. برخلاف اتصال‌های فصل 4، از روش JoinWindow استفاده نمی‌شود، زیرا هنگام اجرای اتصال KStream-KTable، برای هر کلید فقط یک ورودی در KTable وجود دارد. چنین ارتباطی محدود به زمان نیست: رکورد یا در KTable است یا وجود ندارد. نتیجه گیری اصلی: با استفاده از اشیاء KTable می توانید KStream را با داده های مرجع که کمتر به روز می شوند غنی کنید.

اکنون ما به روش کارآمدتری برای غنی‌سازی رویدادها از KStream نگاه خواهیم کرد.

5.3.4. اشیاء GlobalKTable

همانطور که می بینید، نیاز به غنی سازی جریان رویداد یا اضافه کردن زمینه به آنها وجود دارد. در فصل 4 اتصالات بین دو شی KStream و در قسمت قبل ارتباط بین KStream و KTable را مشاهده کردید. در تمام این موارد، هنگام نگاشت کلیدها به نوع یا مقدار جدید، لازم است جریان داده را مجدداً پارتیشن بندی کنید. گاهی اوقات پارتیشن بندی مجدد به صراحت انجام می شود و گاهی اوقات Kafka Streams این کار را به صورت خودکار انجام می دهد. پارتیشن بندی مجدد ضروری است، زیرا کلیدها تغییر کرده اند و رکوردها باید به بخش های جدید ختم شوند، در غیر این صورت اتصال غیرممکن خواهد بود (این مورد در فصل 4، در بخش "پارتیشن بندی مجدد داده ها" در بخش فرعی 4.2.4 مورد بحث قرار گرفت).

پارتیشن بندی مجدد هزینه دارد

پارتیشن بندی مجدد نیاز به هزینه دارد - هزینه های منابع اضافی برای ایجاد موضوعات میانی، ذخیره داده های تکراری در موضوع دیگر. همچنین به معنای افزایش تاخیر به دلیل نوشتن و خواندن از این موضوع است. علاوه بر این، اگر نیاز به پیوستن به بیش از یک جنبه یا بعد دارید، باید اتصالات را زنجیره‌ای کنید، رکوردها را با کلیدهای جدید ترسیم کنید و دوباره فرآیند پارتیشن‌بندی را اجرا کنید.

اتصال به مجموعه داده های کوچکتر

در برخی موارد، حجم داده‌های مرجع برای اتصال نسبتاً کم است، بنابراین کپی‌های کامل آن به راحتی می‌توانند به صورت محلی در هر گره قرار بگیرند. برای موقعیت‌هایی مانند این، Kafka Streams کلاس GlobalKTable را ارائه می‌کند.

نمونه های GlobalKTable منحصر به فرد هستند زیرا برنامه تمام داده ها را در هر یک از گره ها تکرار می کند. و از آنجایی که تمام داده ها در هر گره وجود دارد، نیازی به پارتیشن بندی جریان رویداد توسط کلید داده مرجع وجود ندارد تا برای همه پارتیشن ها در دسترس باشد. همچنین می توانید با استفاده از اشیاء GlobalKTable پیوندهای بدون کلید ایجاد کنید. برای نشان دادن این ویژگی به یکی از مثال های قبلی بازگردیم.

اتصال اشیاء KStream به اشیاء GlobalKTable

در زیربخش 5.3.2، تجمیع پنجره معاملات مبادله ای را توسط خریداران انجام دادیم. نتایج این تجمیع چیزی شبیه به این بود:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

در حالی که این نتایج در خدمت هدف بودند، اگر نام مشتری و نام کامل شرکت نیز نمایش داده می شد، مفیدتر بود. برای افزودن نام مشتری و نام شرکت، می‌توانید اتصالات معمولی را انجام دهید، اما باید دو نگاشت کلید و پارتیشن‌بندی مجدد انجام دهید. با GlobalKTable می توانید از هزینه چنین عملیاتی جلوگیری کنید.

برای انجام این کار، از شی countStream از لیست 5.11 استفاده می کنیم (کد مربوطه را می توان در src/main/java/bbejeck/chapter_5/GlobalKTableExample.java یافت) و آن را به دو شی GlobalKTable متصل می کنیم.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
قبلاً در این مورد بحث کرده ایم، بنابراین آن را تکرار نمی کنم. اما توجه داشته باشم که کد موجود در تابع toStream().map برای خوانایی به جای یک عبارت لامبدا درون خطی در یک شی تابع انتزاع می شود.

مرحله بعدی اعلام دو نمونه از GlobalKTable است (کد نشان داده شده را می توان در فایل src/main/java/bbejeck/chapter_5/GlobalKTableExample.java یافت) (فهرست 5.12).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"

لطفاً توجه داشته باشید که نام موضوعات با استفاده از انواع برشماری شرح داده شده است.

اکنون که همه اجزا را آماده کرده ایم، فقط نوشتن کد اتصال (که در فایل src/main/java/bbejeck/chapter_5/GlobalKTableExample.java موجود است) باقی مانده است (فهرست 5.13).

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
اگرچه دو اتصال در این کد وجود دارد، اما آنها زنجیره ای هستند زیرا هیچ یک از نتایج آنها به طور جداگانه استفاده نمی شود. نتایج در پایان کل عملیات نمایش داده می شود.

هنگامی که عملیات Join بالا را اجرا می کنید، نتایجی مانند زیر دریافت خواهید کرد:

{customer='Barney, Smith' company="Exxon", transactions= 17}

ماهیت تغییر نکرده است، اما این نتایج واضح تر به نظر می رسند.

اگر برای فصل 4 شمارش معکوس کنید، قبلاً چندین نوع اتصال را در عمل مشاهده کرده اید. آنها در جدول ذکر شده اند. 5.2. این جدول قابلیت های اتصال را در نسخه 1.0.0 Kafka Streams نشان می دهد. ممکن است چیزی در نسخه های بعدی تغییر کند.

کتاب «جریان های کافکا در عمل. برنامه های کاربردی و میکروسرویس برای کار بلادرنگ"
برای جمع بندی همه چیز، اجازه دهید اصول اولیه را جمع بندی کنیم: می توانید جریان های رویداد (KStream) و به روز رسانی جریان ها (KTable) را با استفاده از حالت محلی متصل کنید. از طرف دیگر، اگر اندازه داده های مرجع خیلی بزرگ نیست، می توانید از شی GlobalKTable استفاده کنید. GlobalKTables همه پارتیشن‌ها را در هر گره برنامه Kafka Streams کپی می‌کند و اطمینان حاصل می‌کند که همه داده‌ها بدون توجه به اینکه کلید مربوط به کدام پارتیشن است، در دسترس هستند.

در ادامه ویژگی Kafka Streams را مشاهده خواهیم کرد که به لطف آن می‌توانیم تغییرات حالت را بدون مصرف داده‌های یک موضوع کافکا مشاهده کنیم.

5.3.5. وضعیت قابل استعلام

ما قبلاً چندین عملیات شامل حالت انجام داده‌ایم و همیشه نتایج را به کنسول (برای اهداف توسعه) خروجی می‌دهیم یا آنها را در یک موضوع (برای اهداف تولید) می‌نویسیم. هنگام نوشتن نتایج برای یک موضوع، باید از یک مصرف کننده کافکا برای مشاهده آنها استفاده کنید.

خواندن داده‌ها از این موضوعات را می‌توان نوعی دیدگاه تحقق‌یافته در نظر گرفت. برای اهداف خود، می‌توانیم از تعریف نمای واقعی از ویکی‌پدیا استفاده کنیم: «...یک شی پایگاه داده فیزیکی حاوی نتایج یک پرس و جو. برای مثال، می‌تواند یک کپی محلی از داده‌های راه دور، یا زیرمجموعه‌ای از ردیف‌ها و/یا ستون‌های یک جدول یا نتایج پیوسته، یا یک جدول خلاصه به‌دست‌آمده از طریق تجمیع باشد.» (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams همچنین به شما امکان می‌دهد پرس‌وجوهای تعاملی را در فروشگاه‌های دولتی اجرا کنید و به شما امکان می‌دهد مستقیماً این دیدگاه‌های تحقق‌یافته را بخوانید. توجه به این نکته مهم است که پرس و جو به فروشگاه state یک عملیات فقط خواندنی است. این تضمین می‌کند که در زمانی که برنامه شما در حال پردازش داده‌ها است، نگران ناسازگاری تصادفی وضعیت نباشید.

توانایی پرس و جو مستقیم از فروشگاه های حالت مهم است. این بدان معناست که شما می توانید برنامه های داشبورد را بدون نیاز به واکشی داده ها از مصرف کننده کافکا ایجاد کنید. همچنین با توجه به اینکه نیازی به نوشتن مجدد داده ها نیست، کارایی برنامه را افزایش می دهد:

  • به لطف محلی بودن داده ها، می توان به سرعت به آنها دسترسی داشت.
  • تکرار داده ها حذف می شود، زیرا در حافظه خارجی نوشته نشده است.

نکته اصلی که می‌خواهم به خاطر بسپارید این است که می‌توانید مستقیماً وضعیت را از داخل برنامه خود استعلام کنید. فرصت هایی که این به شما می دهد را نمی توان اغراق کرد. به جای مصرف داده‌های کافکا و ذخیره رکوردها در پایگاه داده برای برنامه، می‌توانید فروشگاه‌های حالت را با همان نتیجه جستجو کنید. پرس و جوهای مستقیم به فروشگاه های ایالتی به معنای کد کمتر (بدون مصرف کننده) و نرم افزار کمتر (بدون نیاز به جدول پایگاه داده برای ذخیره نتایج) است.

ما در این فصل مقدار زیادی از زمینه را پوشش داده ایم، بنابراین بحث خود را در مورد پرس و جوهای تعاملی در برابر فروشگاه های دولتی فعلاً ترک می کنیم. اما نگران نباشید: در فصل 9، ما یک برنامه داشبورد ساده با پرس و جوهای تعاملی ایجاد خواهیم کرد. از برخی از مثال‌های این فصل و فصل‌های قبلی برای نشان دادن پرسش‌های تعاملی و نحوه اضافه کردن آنها به برنامه‌های Kafka Streams استفاده می‌کند.

خلاصه

  • اشیاء KStream جریان‌هایی از رویدادها را نشان می‌دهند که با درج‌های موجود در پایگاه داده قابل مقایسه هستند. اشیاء KTable نشان دهنده جریان های به روز رسانی هستند، بیشتر شبیه به روز رسانی های یک پایگاه داده. اندازه شی KTable بزرگ نمی شود، رکوردهای قدیمی با موارد جدید جایگزین می شوند.
  • اشیاء KTable برای عملیات تجمیع مورد نیاز هستند.
  • با استفاده از عملیات پنجره، می‌توانید داده‌های جمع‌آوری شده را به سطل‌های زمانی تقسیم کنید.
  • به لطف اشیاء GlobalKTable، بدون در نظر گرفتن پارتیشن بندی، می توانید به داده های مرجع در هر نقطه از برنامه دسترسی داشته باشید.
  • اتصالات بین اشیاء KStream، KTable و GlobalKTable امکان پذیر است.

تا کنون، ما بر روی ساخت برنامه های Kafka Streams با استفاده از KStream DSL سطح بالا تمرکز کرده ایم. اگرچه رویکرد سطح بالا به شما امکان می دهد برنامه های منظم و مختصر ایجاد کنید، استفاده از آن نشان دهنده یک معامله است. کار با DSL KStream به معنای افزایش مختصر کد شما با کاهش درجه کنترل است. در فصل بعدی، API گره کنترل کننده سطح پایین را بررسی خواهیم کرد و سایر مبادلات را امتحان خواهیم کرد. برنامه ها طولانی تر از قبل خواهند بود، اما ما می توانیم تقریباً هر گره کنترل کننده ای را که ممکن است نیاز داشته باشیم ایجاد کنیم.

← جزییات بیشتر در مورد کتاب را می توانید در این آدرس بیابید وب سایت ناشر

→ برای Habrozhiteli 25٪ تخفیف با استفاده از کوپن - جریان های کافکا

← پس از پرداخت نسخه کاغذی کتاب، کتاب الکترونیکی از طریق ایمیل ارسال می شود.

منبع: www.habr.com

اضافه کردن نظر