“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar" Salom, Khabro aholisi! Ushbu kitob ipni qayta ishlashni tushunishni istagan har qanday dasturchi uchun javob beradi. Tarqalgan dasturlashni tushunish Kafka va Kafka oqimlarini yaxshiroq tushunishga yordam beradi. Kafka ramkasining o'zini bilish yaxshi bo'lardi, lekin bu shart emas: men sizga kerak bo'lgan hamma narsani aytib beraman. Tajribali Kafka ishlab chiquvchilari ham, yangi boshlanuvchilar ham ushbu kitobda Kafka Streams kutubxonasidan foydalangan holda qiziqarli oqimlarni qayta ishlash ilovalarini yaratishni o'rganadilar. Serializatsiya kabi tushunchalar bilan tanish bo'lgan o'rta va ilg'or Java dasturchilari Kafka Streams ilovalarini yaratishda o'z ko'nikmalarini qo'llashni o'rganadilar. Kitobning manba kodi Java 8 da yozilgan va Java 8 lambda ifodasi sintaksisidan sezilarli darajada foydalanadi, shuning uchun lambda funksiyalari bilan ishlashni bilish (hatto boshqa dasturlash tilida ham) foydali bo'ladi.

Ko'chirma. 5.3. Birlashtirish va oynalash operatsiyalari

Ushbu bo'limda biz Kafka oqimlarining eng istiqbolli qismlarini o'rganishga harakat qilamiz. Hozircha biz Kafka oqimlarining quyidagi jihatlarini ko‘rib chiqdik:

  • ishlov berish topologiyasini yaratish;
  • oqimli ilovalarda davlatdan foydalanish;
  • ma'lumotlar oqimi ulanishlarini amalga oshirish;
  • hodisa oqimlari (KStream) va yangilanish oqimlari (KTable) o'rtasidagi farqlar.

Quyidagi misollarda biz ushbu elementlarning barchasini birlashtiramiz. Oqimli ilovalarning yana bir ajoyib xususiyati bo‘lgan oynalash haqida ham bilib olasiz. Bizning birinchi misolimiz oddiy yig'ish bo'ladi.

5.3.1. Sanoat tarmoqlari bo'yicha aktsiyalarni sotish yig'indisi

Birlashtirish va guruhlash oqimli ma'lumotlar bilan ishlashda muhim vositadir. Qabul qilingan shaxsiy yozuvlarni tekshirish ko'pincha etarli emas. Ma'lumotlardan qo'shimcha ma'lumot olish uchun ularni guruhlash va birlashtirish kerak.

Ushbu misolda siz bir nechta sohalardagi kompaniyalar aktsiyalarini sotish hajmini kuzatishi kerak bo'lgan kunlik treyderning kostyumini kiyasiz. Xususan, siz har bir sohada eng katta ulush sotiladigan beshta kompaniyaga qiziqasiz.

Bunday yig'ish ma'lumotlarni kerakli shaklga (umumiy so'z bilan aytganda) tarjima qilish uchun quyidagi bir necha bosqichlarni talab qiladi.

  1. Xom birja savdosi ma'lumotlarini nashr etadigan mavzuga asoslangan manba yarating. StockTransaction turidagi ob'ektni ShareVolume tipidagi ob'ektga solishtirishimiz kerak bo'ladi. Gap shundaki, StockTransaction ob'ekti savdo metama'lumotlarini o'z ichiga oladi, ammo bizga faqat sotilayotgan aksiyalar soni haqidagi ma'lumotlar kerak.
  2. ShareHajm ma'lumotlarini birja belgisi bo'yicha guruhlang. Belgilar bo'yicha guruhlangandan so'ng, siz ushbu ma'lumotlarni aktsiyalarni sotish hajmining oraliq jamilariga yig'ishingiz mumkin. Shuni ta'kidlash kerakki, KStream.groupBy usuli KGroupedStream tipidagi misolni qaytaradi. KGroupedStream.reduce usuliga qo'ng'iroq qilib, KTable misolini olishingiz mumkin.

KGroupedStream interfeysi nima

KStream.groupBy va KStream.groupByKey usullari KGroupedStream misolini qaytaradi. KGroupedStream - bu tugmalar bo'yicha guruhlangandan keyin voqealar oqimining oraliq tasviri. U bilan bevosita ishlash uchun umuman mo'ljallanmagan. Buning o'rniga KGroupedStream yig'ish operatsiyalari uchun ishlatiladi, bu har doim KTablega olib keladi. Va yig'ish operatsiyalari natijasi KTable bo'lgani va ular davlat do'konidan foydalanganligi sababli, natijada barcha yangilanishlar quvur liniyasiga yuborilmasligi mumkin.

KTable.groupBy usuli shunga o'xshash KGroupedTable-ni qaytaradi - kalit bo'yicha qayta guruhlangan yangilanishlar oqimining oraliq tasviri.

Keling, qisqa tanaffus qilib, rasmga qaraylik. 5.9, bu biz erishgan narsalarni ko'rsatadi. Ushbu topologiya sizga allaqachon tanish bo'lishi kerak.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Keling, ushbu topologiyaning kodini ko'rib chiqaylik (uni src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java faylida topish mumkin) (5.2 ro'yxat).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Berilgan kod o'zining qisqaligi va bir nechta satrlarda bajariladigan katta hajmdagi harakatlar bilan ajralib turadi. Siz builder.stream usulining birinchi parametrida yangi narsani sezishingiz mumkin: Consumed.withOffsetResetPolicy usuli yordamida o'rnatilgan AutoOffsetReset.EARLIEST (shuningdek, LATEST ham bor) enum turidagi qiymat. Ushbu sanab turi har bir KStream yoki KTable uchun ofsetni tiklash strategiyasini belgilash uchun ishlatilishi mumkin va konfiguratsiyadagi ofsetni tiklash opsiyasidan ustun turadi.

GroupByKey va GroupBy

KStream interfeysida yozuvlarni guruhlashning ikkita usuli mavjud: GroupByKey va GroupBy. Ikkalasi ham KGroupedTable-ni qaytaradi, shuning uchun ular orasidagi farq nima va qaysi birini qachon ishlatish kerakligi haqida savol tug'ilishi mumkin.

GroupByKey usuli KStream-dagi kalitlar allaqachon bo'sh bo'lmaganda qo'llaniladi. Va eng muhimi, "qayta bo'linishni talab qiladi" bayrog'i hech qachon o'rnatilmagan.

GroupBy usuli siz guruhlash kalitlarini o'zgartirgan deb hisoblaydi, shuning uchun qayta bo'lim bayrog'i rost deb o'rnatiladi. GroupBy usulidan so'ng birlashma, yig'ish va hokazolarni bajarish avtomatik ravishda qayta bo'linishga olib keladi.
Xulosa: Iloji bo'lsa, GroupBy o'rniga GroupByKey dan foydalaning.

mapValues ​​va groupBy usullari nima qilishi aniq, shuning uchun keling, sum() usulini ko'rib chiqamiz (src/main/java/bbejeck/model/ShareVolume.java) (5.3 ro'yxati).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
ShareVolume.sum usuli aktsiyalarni sotish hajmining amaldagi jamini qaytaradi va butun hisob-kitoblar zanjiri natijasi KTable ob'ektidir. . Endi siz KTable qanday rol o'ynashini tushunasiz. ShareVolume ob'ektlari kelganda, tegishli KTable ob'ekti so'nggi joriy yangilanishni saqlaydi. Shuni esda tutish kerakki, barcha yangilanishlar oldingi ShareVolumeKTable-da aks ettirilgan, ammo barchasi boshqa yuborilmaydi.

Keyin biz ushbu KTable-dan har bir sohada eng ko'p sotilgan aksiyalar hajmiga ega bo'lgan beshta kompaniyaga kirish uchun (sotilgan aksiyalar soni bo'yicha) foydalanamiz. Bu holatda bizning harakatlarimiz birinchi yig'ish uchun harakatlarga o'xshash bo'ladi.

  1. Ayrim ShareVolume obyektlarini sanoat bo‘yicha guruhlash uchun boshqa groupBy operatsiyasini bajaring.
  2. ShareVolume obyektlarini umumlashtirishni boshlang. Bu safar yig'ish obyekti qat'iy o'lchamdagi ustuvor navbatdir. Ushbu qat'iy o'lchamdagi navbatda, eng ko'p sotilgan aktsiyalari bo'lgan beshta kompaniyagina saqlanib qoladi.
  3. Oldingi paragrafdagi navbatlarni satr qiymatiga moslang va sanoat bo'yicha eng ko'p sotiladigan beshta qimmatli qog'ozlarni qaytaring.
  4. Natijalarni mavzuga satr shaklida yozing.

Shaklda. 5.10-rasmda ma'lumotlar oqimi topologiyasi grafigi ko'rsatilgan. Ko'rib turganingizdek, ishlov berishning ikkinchi bosqichi juda oddiy.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Endi biz ushbu qayta ishlashning ikkinchi bosqichining tuzilishini aniq tushunganimizdan so'ng, biz uning manba kodiga murojaat qilishimiz mumkin (siz uni src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java faylida topasiz) (5.4-listing) .

Ushbu ishga tushirgichda fixedQueue o'zgaruvchisi mavjud. Bu java.util.TreeSet uchun adapter bo'lgan maxsus ob'ekt bo'lib, u sotilgan aktsiyalarning kamayish tartibida eng yaxshi N natijalarini kuzatish uchun ishlatiladi.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Siz allaqachon groupBy va mapValues ​​qo'ng'iroqlarini ko'rgansiz, shuning uchun biz ularga kirmaymiz (biz KTable.toStream usulini chaqiramiz, chunki KTable.print usuli eskirgan). Lekin siz hali aggregate() ning KTable versiyasini ko‘rmagansiz, shuning uchun biz buni muhokama qilishga biroz vaqt ajratamiz.

Esingizda bo'lsa, KTable-ning farqi shundaki, bir xil kalitlarga ega yozuvlar yangilanish hisoblanadi. KTable eski yozuvni yangisi bilan almashtiradi. Birlashtirish xuddi shunday tarzda sodir bo'ladi: bir xil kalitga ega so'nggi yozuvlar jamlanadi. Yozuv kelganda, u FixedSizePriorityQueue klassi misoliga qo'shuvchi yordamida qo'shiladi (jamlangan usul chaqiruvidagi ikkinchi parametr), lekin xuddi shu kalit bilan boshqa yozuv allaqachon mavjud bo'lsa, eski yozuv ayiruvchi (uchinchi parametrda) yordamida o'chiriladi. agregat usuli chaqiruvi).

Bu shuni anglatadiki, bizning FixedSizePriorityQueue agregatorimiz barcha qiymatlarni bitta kalit bilan jamlamaydi, balki N eng ko'p sotiladigan aksiyalar miqdorining harakatlanuvchi summasini saqlaydi. Har bir kiruvchi yozuv hozirgacha sotilgan aksiyalarning umumiy sonini o'z ichiga oladi. KTable hozirda qaysi kompaniyalarning aksiyalari eng ko'p sotilayotgani haqida ma'lumot beradi, bunda har bir yangilanishni birlashtirishni talab qilmaydi.

Biz ikkita muhim narsani qilishni o'rgandik:

  • KTabledagi qiymatlarni umumiy kalit orqali guruhlash;
  • ushbu guruhlangan qiymatlar bo'yicha yig'ish va yig'ish kabi foydali operatsiyalarni bajaring.

Ushbu operatsiyalarni qanday bajarishni bilish, Kafka Streams ilovasi orqali harakatlanadigan ma'lumotlarning ma'nosini tushunish va u qanday ma'lumotlarni olib borishini tushunish uchun muhimdir.

Shuningdek, biz ushbu kitobda avval muhokama qilingan asosiy tushunchalarni birlashtirdik. 4-bobda biz oqimli dastur uchun nosozliklarga chidamli, mahalliy holat qanchalik muhimligi haqida gapirdik. Ushbu bobdagi birinchi misol mahalliy davlat nima uchun bunchalik muhimligini ko'rsatdi - bu sizga qanday ma'lumotlarni ko'rganingizni kuzatib borish imkoniyatini beradi. Mahalliy kirish tarmoqdagi kechikishlardan qochadi, bu esa dasturni yanada samarali va xatolarga chidamli qiladi.

Har qanday yig'ish yoki yig'ish operatsiyasini bajarishda siz davlat do'konining nomini ko'rsatishingiz kerak. Birlashtirish va yig'ish operatsiyalari KTable namunasini qaytaradi va KTable eski natijalarni yangilari bilan almashtirish uchun davlat xotirasidan foydalanadi. Ko'rib turganingizdek, barcha yangilanishlar ham bir vaqtning o'zida yuborilmaydi va bu muhim, chunki yig'ish operatsiyalari umumiy ma'lumotni ishlab chiqarish uchun mo'ljallangan. Agar siz mahalliy holatni qo'llamasangiz, KTable barcha yig'ish va yig'ish natijalarini yo'naltiradi.

Keyinchalik, ma'lum bir vaqt oralig'ida yig'ish kabi operatsiyalarni bajarishni ko'rib chiqamiz - oynalash operatsiyalari.

5.3.2. Oyna operatsiyalari

Oldingi bo'limda biz toymasin konvolyutsiya va agregatsiya bilan tanishdik. Ilova birjada eng ko'p sotiladigan beshta aktsiyalarni jamlashdan so'ng aktsiyalarni sotishni doimiy ravishda to'plashni amalga oshirdi.

Ba'zan bunday doimiy yig'ish va natijalarni yig'ish kerak bo'ladi. Va ba'zida operatsiyalarni faqat ma'lum vaqt oralig'ida bajarish kerak. Misol uchun, so'nggi 10 daqiqada ma'lum bir kompaniyaning aktsiyalari bilan qancha ayirboshlash operatsiyalari amalga oshirilganligini hisoblang. Yoki oxirgi 15 daqiqada qancha foydalanuvchi yangi reklama banneriga bosgan. Ilova bunday operatsiyalarni bir necha marta bajarishi mumkin, ammo natijalar faqat ma'lum vaqt oralig'ida (vaqt oynalari) amal qiladi.

Xaridor tomonidan ayirboshlash operatsiyalarini hisoblash

Keyingi misolda biz bir nechta treyderlar - yirik tashkilotlar yoki aqlli individual moliyachilar bo'yicha birja operatsiyalarini kuzatib boramiz.

Ushbu kuzatuvning ikkita mumkin bo'lgan sababi bor. Ulardan biri bozor rahbarlari nimani sotib / sotayotganini bilish zarurati. Agar bu yirik o'yinchilar va murakkab investorlar imkoniyatni ko'rsalar, ularning strategiyasiga amal qilish mantiqan. Ikkinchi sabab - noqonuniy insayder savdosining mumkin bo'lgan belgilarini aniqlash istagi. Buni amalga oshirish uchun siz yirik savdo ko'rsatkichlarining muhim press-relizlar bilan bog'liqligini tahlil qilishingiz kerak bo'ladi.

Bunday kuzatuv quyidagi bosqichlardan iborat:

  • birja operatsiyalari mavzusini o'qish uchun oqim yaratish;
  • kiruvchi yozuvlarni xaridor identifikatori va aktsiya belgisi bo'yicha guruhlash. groupBy usulini chaqirish KGroupedStream sinfining namunasini qaytaradi;
  • KGroupedStream.windowedBy usuli vaqt oynasi bilan cheklangan ma'lumotlar oqimini qaytaradi, bu esa oynali yig'ish imkonini beradi. Oyna turiga qarab TimeWindowedKStream yoki SessionWindowedKStream qaytariladi;
  • yig'ish operatsiyasi uchun tranzaktsiyalar soni. Oynali ma'lumotlar oqimi ushbu hisobda ma'lum bir yozuvning hisobga olinishini aniqlaydi;
  • mavzuga natijalar yozish yoki ishlab chiqish jarayonida ularni konsolga chiqarish.

Ushbu ilovaning topologiyasi oddiy, ammo uning aniq tasviri foydali bo'ladi. Keling, rasmni ko'rib chiqaylik. 5.11.

Keyinchalik, oyna operatsiyalarining funksionalligini va tegishli kodni ko'rib chiqamiz.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"

Deraza turlari

Kafka oqimlarida uchta turdagi derazalar mavjud:

  • sessiya;
  • "yiqilib tushish";
  • sirpanish/sakrash.

Qaysi birini tanlash sizning biznesingiz talablariga bog'liq. O'chirish va sakrash oynalari vaqt bilan cheklangan, seans oynalari esa foydalanuvchi faoliyati bilan cheklangan - sessiya(lar)ning davomiyligi faqat foydalanuvchi qanchalik faol ekanligiga qarab belgilanadi. Esda tutish kerak bo'lgan asosiy narsa shundaki, barcha oyna turlari tizim vaqtiga emas, balki yozuvlarning sana/vaqt belgilariga asoslangan.

Keyinchalik, biz har bir oyna turi bilan topologiyamizni amalga oshiramiz. To'liq kod faqat birinchi misolda beriladi, boshqa turdagi oynalar uchun oynaning ishlash turidan tashqari hech narsa o'zgarmaydi.

Seans oynalari

Sessiya oynalari boshqa barcha turdagi oynalardan juda farq qiladi. Ular vaqt bilan emas, balki foydalanuvchi faoliyati (yoki siz kuzatmoqchi bo'lgan ob'ekt faoliyati) bilan cheklangan. Seans oynalari harakatsizlik davrlari bilan chegaralanadi.

5.12-rasmda seans oynalari tushunchasi tasvirlangan. Kichikroq seans o'zining chap tomonidagi sessiya bilan birlashadi. Va o'ngdagi sessiya alohida bo'ladi, chunki u uzoq vaqt davomida harakatsizlikni kuzatib boradi. Seans oynalari foydalanuvchi faoliyatiga asoslanadi, lekin yozuv qaysi seansga tegishli ekanligini aniqlash uchun yozuvlardagi sana/vaqt belgilaridan foydalaning.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"

Birja operatsiyalarini kuzatish uchun sessiya oynalaridan foydalanish

Birja operatsiyalari haqida ma'lumot olish uchun sessiya oynalaridan foydalanamiz. Seans oynalarini amalga oshirish Listing 5.5 da ko'rsatilgan (uni src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java da topish mumkin).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Siz ushbu topologiyadagi operatsiyalarning aksariyatini allaqachon ko'rgansiz, shuning uchun ularni bu erda qayta ko'rib chiqishning hojati yo'q. Ammo bu erda bir nechta yangi elementlar ham bor, biz ularni hozir muhokama qilamiz.

Har qanday groupBy operatsiyasi odatda qandaydir yig'ish operatsiyalarini (jamlash, yig'ish yoki hisoblash) amalga oshiradi. Ishlayotgan jami bilan jamlangan yig'ishni yoki belgilangan vaqt oynasidagi yozuvlarni hisobga oladigan oynalarni yig'ishni amalga oshirishingiz mumkin.

Listing 5.5dagi kod sessiya oynalaridagi tranzaktsiyalar sonini hisoblaydi. Shaklda. 5.13 Ushbu harakatlar bosqichma-bosqich tahlil qilinadi.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) ga qo'ng'iroq qilish orqali biz 20 soniya harakatsizlik oralig'i va 15 daqiqa davomiylik oralig'i bilan sessiya oynasini yaratamiz. 20 sekundlik bo'sh vaqt oralig'i ilova joriy seans tugaganidan yoki boshlanganidan keyin 20 soniya ichida kelgan har qanday yozuvni joriy (faol) seansga kiritishini bildiradi.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Keyinchalik, biz seans oynasida qanday yig'ish operatsiyasini bajarish kerakligini aniqlaymiz - bu holda, hisoblash. Agar kiruvchi yozuv harakatsizlik oynasidan tashqariga tushsa (sana/vaqt tamg'asining har ikki tomoni), dastur yangi seans yaratadi. Saqlash oralig'i sessiyani ma'lum vaqt davomida ushlab turishni anglatadi va sessiyaning harakatsizlik davridan tashqariga cho'zilgan, lekin hali ham biriktirilishi mumkin bo'lgan kech ma'lumotlarga imkon beradi. Bundan tashqari, birlashish natijasida paydo bo'lgan yangi seansning boshlanishi va oxiri eng erta va oxirgi sana/vaqt tamg'asiga to'g'ri keladi.

Seanslar qanday ishlashini ko'rish uchun hisoblash usulidan bir nechta yozuvlarni ko'rib chiqamiz (5.1-jadval).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Yozuvlar kelganda, biz bir xil kalit bilan mavjud sessiyalarni qidiramiz, tugash vaqti joriy sana/vaqt belgisidan kamroq - harakatsizlik oralig'i va joriy sana/vaqt tamg'asi + harakatsizlik oralig'idan kattaroq boshlanish vaqti. Buni hisobga olgan holda, jadvaldan to'rtta yozuv. 5.1 quyidagi tarzda bitta sessiyaga birlashtiriladi.

1. Yozuv 1 birinchi bo'lib keladi, shuning uchun boshlanish vaqti tugash vaqtiga teng va 00:00:00.

2. Keyin 2-kiritma keladi va biz 23:59:55 dan erta tugamaydigan va 00:00:35 dan kech bo'lmagan sessiyalarni qidiramiz. Biz 1-rekordni topamiz va 1 va 2-sessiyalarni birlashtiramiz. Biz 1-sessiyaning boshlanish vaqtini (avvalgi) va 2-sessiyaning tugash vaqtini (keyinroq) olamiz, shunda bizning yangi sessiyamiz 00:00:00 da boshlanadi va 00 da tugaydi: 00:15.

3. Record 3 keladi, biz 00:00:30 va 00:01:10 oralig'ida sessiyalarni qidiramiz va hech birini topa olmadik. 123:345:654 da boshlanadigan va tugaydigan 00-00-50,FFBE kaliti uchun ikkinchi seansni qo'shing.

4. Record 4 keladi va biz 23:59:45 va 00:00:25 oralig'ida sessiyalarni qidirmoqdamiz. Bu safar ikkala seans 1 va 2 topildi. Uchala seans bittaga birlashtirilib, boshlanish vaqti 00:00:00 va tugash vaqti 00:00:15.

Ushbu bo'limda tasvirlanganlardan quyidagi muhim nuanslarni esga olish kerak:

  • seanslar qattiq o'lchamli oynalar emas. Sessiyaning davomiyligi ma'lum vaqt oralig'idagi faoliyat bilan belgilanadi;
  • Ma'lumotlardagi sana/vaqt belgilari hodisaning mavjud sessiyaga yoki bo'sh vaqtga to'g'ri kelishini aniqlaydi.

Keyin biz keyingi oyna turini - "tumbling" oynalarni muhokama qilamiz.

"Tumbling" oynalari

Oynali oynalar ma'lum vaqt oralig'ida sodir bo'lgan voqealarni yozib oladi. Tasavvur qiling-a, siz har 20 soniyada ma'lum bir kompaniyaning barcha aksiyalarini yozib olishingiz kerak, shuning uchun siz o'sha vaqtdagi barcha voqealarni to'playsiz. 20 sekundlik intervalning oxirida oyna ag'dariladi va yangi 20 soniyali kuzatish oralig'iga o'tadi. 5.14-rasmda bu holat tasvirlangan.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Ko'rib turganingizdek, oxirgi 20 soniya ichida olingan barcha voqealar oynaga kiritilgan. Ushbu davr oxirida yangi oyna yaratiladi.

Listing 5.6 har 20 soniyada birja tranzaktsiyalarini yozib olish uchun aylanma oynalardan foydalanishni ko'rsatadigan kodni ko'rsatadi (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java da topilgan).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
TimeWindows.of usuli chaqiruvidagi bu kichik o'zgarish bilan siz aylana oynasidan foydalanishingiz mumkin. Ushbu misol qadar() usulini chaqirmaydi, shuning uchun 24 soatlik standart saqlash oralig'i ishlatiladi.

Nihoyat, oxirgi oyna variantlariga o'tish vaqti keldi - "sakrash" oynalari.

Sürgülü ("sakrash") oynalar

Sürgülü/sakrab turuvchi oynalar ag'darilgan oynalarga o'xshaydi, lekin biroz farq qiladi. Sürgülü oynalar oxirgi voqealarni qayta ishlash uchun yangi oyna yaratishdan oldin vaqt oralig'ining tugashini kutmaydi. Ular oyna davomiyligidan kamroq kutish oralig'idan keyin yangi hisob-kitoblarni boshlaydilar.

Derazalarni ag'darish va sakrash o'rtasidagi farqni ko'rsatish uchun birja operatsiyalarini hisoblash misoliga qaytaylik. Bizning maqsadimiz hali ham tranzaktsiyalar sonini sanashdir, lekin hisoblagichni yangilashdan oldin butun vaqtni kutishni xohlamaymiz. Buning o'rniga biz hisoblagichni qisqaroq vaqt oralig'ida yangilaymiz. Misol uchun, biz hali ham har 20 soniyada tranzaktsiyalar sonini hisoblaymiz, lekin rasmda ko'rsatilganidek, hisoblagichni har 5 soniyada yangilaymiz. 5.15. Bunday holda, biz bir-biriga mos keladigan ma'lumotlarga ega uchta natija oynasiga ega bo'lamiz.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
5.7 ro'yxati surma oynalarni aniqlash uchun kodni ko'rsatadi (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java da topilgan).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
O'zgaruvchan oynani advanceBy() usuliga qo'ng'iroq qo'shish orqali sakrash oynasiga aylantirish mumkin. Ko'rsatilgan misolda saqlash oralig'i 15 minut.

Siz ushbu bo'limda yig'ish natijalarini vaqt oynalari bilan qanday cheklashni ko'rgansiz. Xususan, ushbu bo'limdan quyidagi uchta narsani eslab qolishingizni istayman:

  • seans oynalarining hajmi vaqt davri bilan emas, balki foydalanuvchi faoliyati bilan cheklangan;
  • "Tiblo" oynalari ma'lum bir vaqt oralig'idagi voqealar haqida umumiy ma'lumot beradi;
  • O'tish oynalarining davomiyligi belgilangan, lekin ular tez-tez yangilanadi va barcha oynalarda bir-biriga o'xshash yozuvlarni o'z ichiga olishi mumkin.

Keyinchalik, ulanish uchun KTable-ni KStream-ga qanday aylantirishni o'rganamiz.

5.3.3. KStream va KTable obyektlarini ulash

4-bobda biz ikkita KStream obyektini ulashni muhokama qildik. Endi biz KTable va KStreamni qanday ulashni o'rganishimiz kerak. Bu quyidagi oddiy sababga ko'ra kerak bo'lishi mumkin. KStream - bu yozuvlar oqimi va KTable - rekord yangilanishlar oqimi, lekin ba'zida siz KTable-dan yangilanishlar yordamida yozuv oqimiga qo'shimcha kontekstni qo'shishingiz mumkin.

Keling, birja operatsiyalari soni haqidagi ma'lumotlarni olib, ularni tegishli tarmoqlar uchun birja yangiliklari bilan birlashtiramiz. Sizda mavjud kodni hisobga olgan holda bunga erishish uchun nima qilishingiz kerak.

  1. Birja bitimlari soni haqidagi ma'lumotlarga ega KTable ob'ektini KStream ga aylantiring, so'ngra kalitni ushbu aksiya belgisiga mos keladigan sanoat sektorini ko'rsatadigan kalit bilan almashtiring.
  2. Birja yangiliklari bilan mavzu ma'lumotlarini o'qiydigan KTable ob'ektini yarating. Ushbu yangi KTable sanoat sektori bo'yicha tasniflanadi.
  3. Yangiliklar yangilanishlarini sanoat tarmoqlari bo'yicha birja operatsiyalari soni haqidagi ma'lumotlar bilan bog'lang.

Keling, ushbu harakat rejasini qanday amalga oshirishni ko'rib chiqaylik.

KTable-ni KStream-ga aylantiring

KTable-ni KStream-ga aylantirish uchun siz quyidagilarni bajarishingiz kerak.

  1. KTable.toStream() usulini chaqiring.
  2. KStream.map usulini chaqirish orqali kalitni sanoat nomi bilan almashtiring va keyin Windowed misolidan TransactionSummary obyektini oling.

Biz bu operatsiyalarni quyidagi tarzda birlashtiramiz (kodni src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java faylida topish mumkin) (5.8 ro'yxat).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Biz KStream.map operatsiyasini bajarayotganimiz sababli, qaytarilgan KStream namunasi ulanishda foydalanilganda avtomatik ravishda qayta bo'linadi.

Biz konvertatsiya jarayonini yakunladik, keyin birja yangiliklarini o'qish uchun KTable ob'ektini yaratishimiz kerak.

Birja yangiliklari uchun KTable yaratish

Yaxshiyamki, KTable obyektini yaratish faqat bitta kod qatorini oladi (kodni src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java da topish mumkin) (5.9 ro'yxati).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Shuni ta'kidlash kerakki, hech qanday Serde ob'ektini ko'rsatish talab qilinmaydi, chunki sozlamalarda Serdes qatoridan foydalaniladi. Bundan tashqari, EARLIEST ro'yxatidan foydalangan holda, jadval boshida yozuvlar bilan to'ldiriladi.

Endi biz oxirgi bosqichga o'tishimiz mumkin - ulanish.

Yangiliklar yangilanishlarini tranzaktsiyalar soni ma'lumotlari bilan ulash

Ulanishni yaratish qiyin emas. Tegishli sanoat uchun birja yangiliklari bo'lmagan taqdirda chapga qo'shilishdan foydalanamiz (kerakli kodni src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java faylida topish mumkin) (5.10 ro'yxati).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Bu leftJoin operatori juda oddiy. 4-bobdagi birlashmalardan farqli o'laroq, JoinWindow usuli qo'llanilmaydi, chunki KStream-KTable ulanishini amalga oshirishda har bir kalit uchun KTableda faqat bitta yozuv mavjud. Bunday ulanish vaqt bilan cheklanmaydi: yozuv KTableda yoki yo'q. Asosiy xulosa: KTable ob'ektlari yordamida siz KStream-ni kamroq yangilanadigan ma'lumotnoma ma'lumotlari bilan boyitishingiz mumkin.

Endi biz KStream-dan voqealarni boyitishning yanada samarali usulini ko'rib chiqamiz.

5.3.4. GlobalKTable ob'ektlari

Ko'rib turganingizdek, voqea oqimlarini boyitish yoki ularga kontekst qo'shish kerak. 4-bobda siz ikkita KStream ob'ekti orasidagi bog'lanishni ko'rdingiz va oldingi bo'limda KStream va KTable o'rtasidagi aloqani ko'rdingiz. Ushbu holatlarning barchasida kalitlarni yangi tur yoki qiymatga solishtirishda ma'lumotlar oqimini qayta bo'lish kerak. Ba'zan qayta qismlarga ajratish aniq amalga oshiriladi, ba'zan esa Kafka oqimlari buni avtomatik ravishda amalga oshiradi. Qayta qismlarga ajratish zarur, chunki kalitlar o'zgargan va yozuvlar yangi bo'limlarda tugashi kerak, aks holda ulanish imkonsiz bo'ladi (bu 4-bobda, 4.2.4-kichik bo'limdagi "Ma'lumotlarni qayta bo'lish" bo'limida muhokama qilingan).

Qayta qismlarga ajratish qimmatga tushadi

Qayta qismlarga ajratish xarajatlarni talab qiladi - oraliq mavzularni yaratish, boshqa mavzuda takroriy ma'lumotlarni saqlash uchun qo'shimcha resurs xarajatlari; shuningdek, ushbu mavzuni yozish va o'qish tufayli kechikishning kuchayganligini anglatadi. Bunga qo'shimcha ravishda, agar siz bir nechta jihat yoki o'lchov bo'ylab qo'shilishingiz kerak bo'lsa, siz birlashmalarni zanjirlashingiz, yozuvlarni yangi kalitlar bilan taqqoslashingiz va qayta qismlarga ajratish jarayonini qayta ishga tushirishingiz kerak.

Kichikroq ma'lumotlar to'plamlariga ulanish

Ba'zi hollarda ulanish uchun mos yozuvlar ma'lumotlarining hajmi nisbatan kichik, shuning uchun uning to'liq nusxalari har bir tugunga osongina joylashishi mumkin. Bu kabi holatlar uchun Kafka Streams GlobalKTable sinfini taqdim etadi.

GlobalKTable misollari noyobdir, chunki dastur barcha ma'lumotlarni tugunlarning har biriga takrorlaydi. Va barcha ma'lumotlar har bir tugunda mavjud bo'lganligi sababli, voqealar oqimini barcha bo'limlar uchun mavjud bo'lishi uchun mos yozuvlar ma'lumotlar kaliti bo'yicha bo'lishning hojati yo'q. GlobalKTable obyektlari yordamida kalitsiz ulanishlarni ham amalga oshirishingiz mumkin. Ushbu xususiyatni ko'rsatish uchun oldingi misollardan biriga qaytaylik.

KStream obyektlarini GlobalKTable obyektlariga ulash

5.3.2-kichik bo'limda biz xaridorlar tomonidan ayirboshlash operatsiyalarini oyna yig'ishini amalga oshirdik. Ushbu yig'ish natijalari quyidagicha ko'rindi:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Ushbu natijalar maqsadga xizmat qilgan bo'lsa-da, mijozning ismi va to'liq kompaniya nomi ham ko'rsatilsa, foydaliroq bo'lar edi. Mijoz nomini va kompaniya nomini qo'shish uchun siz oddiy birlashmalarni amalga oshirishingiz mumkin, lekin ikkita asosiy xaritalash va qayta bo'limlarni bajarishingiz kerak bo'ladi. GlobalKTable yordamida siz bunday operatsiyalarning narxidan qochishingiz mumkin.

Buning uchun 5.11 Listingdagi countStream obyektidan foydalanamiz (tegishli kodni src/main/java/bbejeck/chapter_5/GlobalKTableExample.java da topish mumkin) va uni ikkita GlobalKTable obyektiga ulaymiz.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Biz buni allaqachon muhokama qilganmiz, shuning uchun men buni takrorlamayman. Lekin shuni ta'kidlaymanki, toStream().map funksiyasidagi kod o'qilishi uchun inline lambda ifodasi o'rniga funksiya ob'ektiga abstrakt qilingan.

Keyingi qadam GlobalKTablening ikkita nusxasini e'lon qilishdir (ko'rsatilgan kodni src/main/java/bbejeck/chapter_5/GlobalKTableExample.java faylida topish mumkin) (5.12 ro'yxati).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"

E'tibor bering, mavzu nomlari sanab o'tilgan turlar yordamida tavsiflanadi.

Endi bizda barcha komponentlar tayyor, qolgani ulanish kodini yozish (uni src/main/java/bbejeck/chapter_5/GlobalKTableExample.java faylida topish mumkin) (5.13 roʻyxati).

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Ushbu kodda ikkita birlashma mavjud bo'lsa-da, ular zanjirlangan, chunki ularning natijalaridan hech biri alohida ishlatilmaydi. Natijalar butun operatsiya oxirida ko'rsatiladi.

Yuqoridagi qo'shilish operatsiyasini bajarganingizda, quyidagi natijalarga erishasiz:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Mohiyat o'zgarmadi, ammo bu natijalar aniqroq ko'rinadi.

Agar siz 4-bobgacha hisoblasangiz, siz allaqachon amalda bir nechta ulanish turlarini ko'rgansiz. Ular jadvalda keltirilgan. 5.2. Ushbu jadval Kafka oqimlarining 1.0.0 versiyasidagi ulanish imkoniyatlarini aks ettiradi; Kelgusi nashrlarda biror narsa o'zgarishi mumkin.

“Kafka oqimlari harakatda. Real vaqt rejimida ishlash uchun ilovalar va mikroservislar"
Xulosa qilish uchun, keling, asosiy narsalarni takrorlaymiz: mahalliy holatdan foydalanib, voqea oqimlarini (KStream) ulashingiz va oqimlarni yangilashingiz (KTable) mumkin. Shu bilan bir qatorda, agar mos yozuvlar ma'lumotlarining hajmi juda katta bo'lmasa, GlobalKTable ob'ektidan foydalanishingiz mumkin. GlobalKTables barcha bo'limlarni har bir Kafka Streams ilova tuguniga takrorlaydi va bu kalit qaysi bo'limga mos kelishidan qat'iy nazar barcha ma'lumotlar mavjudligini ta'minlaydi.

Keyinchalik biz Kafka oqimlari xususiyatini ko'ramiz, buning yordamida biz Kafka mavzusidagi ma'lumotlarni iste'mol qilmasdan holat o'zgarishlarini kuzatishimiz mumkin.

5.3.5. So'raladigan holat

Biz allaqachon davlat bilan bog'liq bir nechta operatsiyalarni bajarganmiz va har doim natijalarni konsolga chiqaramiz (ishlab chiqish uchun) yoki ularni mavzuga yozamiz (ishlab chiqarish maqsadlari uchun). Mavzuga natijalar yozishda ularni ko'rish uchun Kafka iste'molchisidan foydalanish kerak.

Ushbu mavzulardagi ma'lumotlarni o'qishni moddiylashtirilgan qarashlar turi deb hisoblash mumkin. Bizning maqsadlarimiz uchun biz Vikipediyadagi materiallashtirilgan ko'rinishning ta'rifidan foydalanishimiz mumkin: “...so'rov natijalarini o'z ichiga olgan jismoniy ma'lumotlar bazasi ob'ekti. Masalan, bu masofaviy ma'lumotlarning mahalliy nusxasi yoki jadval yoki qo'shilish natijalari satrlari va/yoki ustunlarining quyi to'plami yoki yig'ish orqali olingan yig'ma jadval bo'lishi mumkin" (https://en.wikipedia.org/wiki) /Materiallashtirilgan_ko'rinish).

Kafka Streams shuningdek, davlat do'konlarida interaktiv so'rovlarni bajarishga imkon beradi, bu esa ushbu moddiylashtirilgan ko'rinishlarni bevosita o'qish imkonini beradi. Shuni ta'kidlash kerakki, davlat do'koniga so'rov faqat o'qish uchun mo'ljallangan. Bu sizning ilovangiz ma'lumotlarni qayta ishlash vaqtida tasodifiy holatni nomuvofiq qilishdan tashvishlanishingizga hojat yo'qligini ta'minlaydi.

Davlat do'konlarini to'g'ridan-to'g'ri so'rash qobiliyati muhim ahamiyatga ega. Bu shuni anglatadiki, siz avval Kafka iste'molchisidan ma'lumotlarni olmasdan asboblar paneli ilovalarini yaratishingiz mumkin. Bundan tashqari, ma'lumotlarni qayta yozishning hojati yo'qligi sababli dastur samaradorligini oshiradi:

  • ma'lumotlarning joylashuvi tufayli ularga tezda kirish mumkin;
  • ma'lumotlarning takrorlanishi yo'q qilinadi, chunki ular tashqi xotiraga yozilmaydi.

Eslab qolishingiz kerak bo'lgan asosiy narsa shundaki, siz to'g'ridan-to'g'ri arizangizdan holatni so'rashingiz mumkin. Bu sizga beradigan imkoniyatlarni ortiqcha baholab bo'lmaydi. Kafka ma'lumotlarini iste'mol qilish va dastur uchun ma'lumotlar bazasida yozuvlarni saqlash o'rniga, xuddi shu natija bilan davlat do'konlarini so'rashingiz mumkin. Davlat do'konlariga to'g'ridan-to'g'ri so'rovlar kamroq kod (iste'molchi yo'q) va kamroq dasturiy ta'minot (natijalarni saqlash uchun ma'lumotlar bazasi jadvaliga ehtiyoj yo'q) degan ma'noni anglatadi.

Biz ushbu bobda juda ko'p narsalarni ko'rib chiqdik, shuning uchun biz hozircha davlat do'konlariga qarshi interaktiv so'rovlar muhokamasini qoldiramiz. Lekin tashvishlanmang: 9-bobda biz interaktiv so‘rovlar bilan oddiy boshqaruv paneli ilovasini yaratamiz. Bu interfaol so'rovlar va ularni Kafka Streams ilovalariga qanday qo'shish mumkinligini ko'rsatish uchun ushbu va oldingi boblardagi ba'zi misollardan foydalanadi.

Xulosa

  • KStream ob'ektlari ma'lumotlar bazasiga qo'shimchalar bilan taqqoslanadigan voqealar oqimlarini ifodalaydi. KTable ob'ektlari ma'lumotlar bazasini yangilash kabi yangilanish oqimlarini ifodalaydi. KTable ob'ektining hajmi o'smaydi, eski yozuvlar yangilari bilan almashtiriladi.
  • KTable ob'ektlari yig'ish operatsiyalari uchun talab qilinadi.
  • Derazalash operatsiyalari yordamida siz jamlangan ma'lumotlarni vaqt paqirlariga bo'lishingiz mumkin.
  • GlobalKTable ob'ektlari tufayli siz bo'linishdan qat'i nazar, ilovaning istalgan joyidan ma'lumotnoma ma'lumotlariga kirishingiz mumkin.
  • KStream, KTable va GlobalKTable ob'ektlari o'rtasida ulanishlar mumkin.

Hozircha biz yuqori darajadagi KStream DSL yordamida Kafka Streams ilovalarini yaratishga e'tibor qaratdik. Yuqori darajadagi yondashuv sizga aniq va ixcham dasturlarni yaratishga imkon bersa-da, undan foydalanish savdoni anglatadi. DSL KStream bilan ishlash nazorat darajasini pasaytirish orqali kodingizning ixchamligini oshirishni anglatadi. Keyingi bobda biz past darajadagi ishlov beruvchi tugun API-ni ko'rib chiqamiz va boshqa o'zaro kelishuvlarni sinab ko'ramiz. Dasturlar avvalgisiga qaraganda uzunroq bo'ladi, lekin biz kerak bo'lishi mumkin bo'lgan deyarli har qanday ishlov beruvchi tugunini yaratishimiz mumkin.

→ Kitob haqida batafsil ma'lumotni quyidagi manzilda topishingiz mumkin nashriyot sayti

→ Habrozhiteli uchun kupondan foydalangan holda 25% chegirma - Kafka oqimlari

→ Kitobning qog‘oz versiyasi uchun to‘lov amalga oshirilgandan so‘ng elektron kitob elektron pochta orqali yuboriladi.

Manba: www.habr.com

a Izoh qo'shish