Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“ Sveiki, Khabro gyventojai! Ši knyga tinka bet kuriam kūrėjui, norinčiam suprasti gijų apdorojimą. Paskirstyto programavimo supratimas padės geriau suprasti Kafka ir Kafka srautus. Būtų malonu žinoti patį Kafka karkasą, bet tai nėra būtina: aš jums pasakysiu viską, ko jums reikia. Patyrę Kafka kūrėjai ir naujokai išmoks sukurti įdomias srautų apdorojimo programas naudodami Kafka Streams biblioteką šioje knygoje. Vidutinio lygio ir pažengę Java kūrėjai, jau susipažinę su tokiomis sąvokomis kaip serializacija, išmoks pritaikyti savo įgūdžius kurdami Kafka Streams programas. Knygos šaltinio kodas parašytas Java 8 kalba ir gerokai panaudota Java 8 lambda išraiškos sintaksė, todėl žinoti, kaip dirbti su lambda funkcijomis (net ir kita programavimo kalba), pravers.

Ištrauka. 5.3. Agregavimo ir langų operacijos

Šiame skyriuje tyrinėsime perspektyviausias Kafka Streams dalis. Iki šiol aptarėme šiuos Kafka Streams aspektus:

  • apdorojimo topologijos kūrimas;
  • būsenos naudojimas srautinio perdavimo programose;
  • duomenų srautų jungčių vykdymas;
  • skirtumai tarp įvykių srautų (KStream) ir atnaujinimo srautų (KTable).

Tolesniuose pavyzdžiuose sujungsime visus šiuos elementus. Taip pat sužinosite apie langų kūrimą – kitą puikią srautinio perdavimo programų funkciją. Pirmasis mūsų pavyzdys bus paprastas agregavimas.

5.3.1. Parduotų atsargų apibendrinimas pagal pramonės sektorius

Apibendrinimas ir grupavimas yra gyvybiškai svarbūs įrankiai dirbant su srautinio perdavimo duomenimis. Atskirų įrašų ištyrimo, kai jie gaunami, dažnai nepakanka. Norint išgauti papildomą informaciją iš duomenų, būtina juos sugrupuoti ir sujungti.

Šiame pavyzdyje apsivilksite dienos prekybininko, kuriam reikia sekti kelių pramonės šakų įmonių akcijų pardavimo apimtis, kostiumą. Tiksliau, jus domina penkios įmonės, kurių pardavimas yra didžiausias kiekvienoje pramonės šakoje.

Norint išversti duomenis į norimą formą (kalbant bendrais bruožais), reikės atlikti kelis šiuos veiksmus.

  1. Sukurkite temą pagrįstą šaltinį, kuriame skelbiama neapdorota akcijų prekybos informacija. Turėsime susieti StockTransaction tipo objektą su ShareVolume tipo objektu. Esmė ta, kad StockTransaction objekte yra pardavimo metaduomenys, bet mums reikia duomenų tik apie parduodamų akcijų skaičių.
  2. Grupuokite ShareVolume duomenis pagal akcijų simbolį. Sugrupavus pagal simbolį šiuos duomenis galite sutraukti į atsargų pardavimo apimties tarpines sumas. Verta paminėti, kad KStream.groupBy metodas grąžina KGroupedStream tipo egzempliorių. Ir jūs galite gauti KTable egzempliorių toliau iškvietę KGroupedStream.reduce metodą.

Kas yra KGroupedStream sąsaja

KStream.groupBy ir KStream.groupByKey metodai grąžina KGroupedStream egzempliorių. KGroupedStream yra tarpinis įvykių srauto atvaizdas po grupavimo pagal raktus. Jis visai neskirtas tiesioginiam darbui su juo. Vietoj to, KGroupedStream naudojama agregavimo operacijoms, kurių rezultatas visada yra KTable. Kadangi agregavimo operacijų rezultatas yra KT lentelė ir jie naudoja būsenos saugyklą, gali būti, kad ne visi atnaujinimai bus siunčiami toliau.

Metodas KTable.groupBy grąžina panašią KGroupedTable – tarpinį atnaujinimų srauto atvaizdą, pergrupuotą pagal raktą.

Padarykime trumpą pertraukėlę ir pažiūrėkime į pav. 5.9, kuris parodo, ką pasiekėme. Ši topologija jums jau turėtų būti labai pažįstama.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Dabar pažiūrėkime į šios topologijos kodą (jį galima rasti faile src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (5.2 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Pateiktas kodas išsiskiria trumpumu ir didele kelių eilučių veiksmų apimtimi. Pirmajame builder.stream metodo parametre galite pastebėti ką nors naujo: enum tipo AutoOffsetReset.EARLIEST reikšmę (yra ir LATEST), nustatyta naudojant Consumed.withOffsetResetPolicy metodą. Šis sąrašo tipas gali būti naudojamas norint nurodyti poslinkio nustatymo iš naujo strategiją kiekvienam KStream arba KTable ir turi viršenybę prieš poslinkio nustatymo iš naujo parinktį iš konfigūracijos.

GroupByKey ir GroupBy

KStream sąsaja turi du įrašų grupavimo būdus: GroupByKey ir GroupBy. Abi pateikia KGroupedTable, todėl jums gali kilti klausimas, kuo jie skiriasi ir kada kurią naudoti?

GroupByKey metodas naudojamas, kai KStream raktai jau nėra tušti. Ir, svarbiausia, vėliavėlė „reikalauja pakartotinio skaidymo“ niekada nebuvo nustatyta.

„GroupBy“ metodas daro prielaidą, kad pakeitėte grupavimo raktus, todėl perskirstymo vėliavėlė nustatyta į „true“. Atliekant sujungimus, sujungimus ir t. t. po GroupBy metodo, bus automatiškai perskirstytas skaidymas.
Santrauka: kai tik įmanoma, naudokite GroupByKey, o ne GroupBy.

Aišku, ką daro metodai mapValues ​​ir groupBy, todėl pažvelkime į metodą sum() (rasti src/main/java/bbejeck/model/ShareVolume.java) (5.3 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
ShareVolume.sum metodas grąžina einamąją akcijų pardavimo apimties sumą, o visos skaičiavimų grandinės rezultatas yra KTable objektas . Dabar jūs suprantate, kokį vaidmenį atlieka KTable. Kai gaunami ShareVolume objektai, atitinkamas KTable objektas išsaugo naujausią dabartinį naujinimą. Svarbu atsiminti, kad visi atnaujinimai atsispindi ankstesniame shareVolumeKTable, bet ne visi siunčiami toliau.

Tada naudojame šią KT lentelę, kad apibendrintume (pagal parduodamų akcijų skaičių), kad gautume penkias įmones, kurių kiekvienoje pramonės šakoje parduodamas didžiausias akcijų kiekis. Mūsų veiksmai šiuo atveju bus panašūs į tuos, kurie buvo atlikti pirmą kartą.

  1. Norėdami sugrupuoti atskirus ShareVolume objektus pagal pramonės šaką, atlikite kitą groupBy operaciją.
  2. Pradėkite apibendrinti ShareVolume objektus. Šį kartą agregavimo objektas yra fiksuoto dydžio prioriteto eilė. Šioje fiksuoto dydžio eilėje lieka tik penkios daugiausiai parduotų akcijų parduotos įmonės.
  3. Susiekite ankstesnės pastraipos eiles su eilutės reikšme ir grąžinkite penkias populiariausias akcijas pagal skaičių pagal pramonės šaką.
  4. Įrašykite rezultatus eilutės forma į temą.

Fig. 5.10 paveiksle parodytas duomenų srauto topologijos grafikas. Kaip matote, antrasis apdorojimo etapas yra gana paprastas.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Dabar, kai aiškiai suprantame šio antrojo apdorojimo etapo struktūrą, galime kreiptis į jo šaltinio kodą (jį rasite faile src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (5.4 sąrašas) .

Šiame iniciatoriuje yra „fixQueue“ kintamasis. Tai tinkintas objektas, kuris yra java.util.TreeSet adapteris, naudojamas sekti geriausius N rezultatus mažėjančia prekiaujamų akcijų tvarka.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Jau matėte „groupBy“ ir „mapValues“ iškvietimus, todėl į juos nesigilinsime (kviečiame KTable.toStream metodą, nes KTable.print metodas nebenaudojamas). Tačiau dar nematėte aggregate() KTable versijos, todėl skirsime šiek tiek laiko tai aptardami.

Kaip prisimenate, KTable skiriasi tuo, kad įrašai su tais pačiais raktais laikomi atnaujinimais. KTable pakeičia seną įrašą nauju. Sumavimas vyksta panašiai: sujungiami naujausi įrašai su tuo pačiu raktu. Kai gaunamas įrašas, jis pridedamas prie FixedSizePriorityQueue klasės egzemplioriaus naudojant sumatorių (antrasis parametras agreguoto metodo iškvietime), tačiau jei jau yra kitas įrašas su tuo pačiu raktu, senasis įrašas pašalinamas naudojant atimtuvą (trečias parametras suminio metodo iškvietimas).

Visa tai reiškia, kad mūsų agregatorius FixedSizePriorityQueue nesujungia visų reikšmių vienu raktu, o išsaugo slenkančią N dažniausiai parduodamų akcijų tipų kiekių sumą. Kiekviename gaunamame įraše nurodomas bendras iki šiol parduotų akcijų skaičius. KTable suteiks informacijos apie tai, kurių įmonių akcijomis šiuo metu prekiaujama daugiausiai, nereikalaujant slenkamojo kiekvieno atnaujinimo agregavimo.

Išmokome daryti du svarbius dalykus:

  • sugrupuokite reikšmes KTtable bendru raktu;
  • atlikti naudingas šių sugrupuotų verčių operacijas, pvz., apibendrinimą ir agregavimą.

Žinojimas, kaip atlikti šias operacijas, yra svarbus norint suprasti duomenų, judančių per Kafka Streams programą, prasmę ir suprasti, kokią informaciją jie turi.

Taip pat sujungėme keletą pagrindinių sąvokų, aptartų anksčiau šioje knygoje. 4 skyriuje aptarėme, kaip gedimams atspari vietinė būsena yra svarbi srautinio perdavimo programai. Pirmasis šio skyriaus pavyzdys parodė, kodėl vietinė valstybė yra tokia svarbi – ji leidžia sekti, kokią informaciją jau matėte. Vietinė prieiga leidžia išvengti tinklo vėlavimų, todėl programa tampa našesnė ir atsparesnė klaidoms.

Atlikdami bet kokią apibendrinimo ar agregavimo operaciją, turite nurodyti būsenos parduotuvės pavadinimą. Apibendrinimo ir agregavimo operacijos grąžina KTable egzempliorių, o KTable naudoja būsenos saugyklą, kad pakeistų senus rezultatus naujais. Kaip matėte, ne visi naujinimai siunčiami, ir tai svarbu, nes agregavimo operacijos yra skirtos pateikti suvestinę informaciją. Jei netaikote vietinės valstijos, KTable persiųs visus agregavimo ir apibendrinimo rezultatus.

Toliau apžvelgsime tokių operacijų, kaip agregavimas, atlikimą per tam tikrą laikotarpį – vadinamąsias langų operacijas.

5.3.2. Langų operacijos

Ankstesniame skyriuje pristatėme slankiąją konvoliuciją ir agregaciją. Taikant programą buvo atliktas nuolatinis akcijų pardavimo apibendrinimas, po kurio buvo sujungtos penkios dažniausiai biržoje prekiaujamos akcijos.

Kartais toks nuolatinis rezultatų agregavimas ir apibendrinimas yra būtinas. O kartais operacijas reikia atlikti tik per tam tikrą laikotarpį. Pavyzdžiui, apskaičiuokite, kiek mainų sandorių buvo atlikta su konkrečios įmonės akcijomis per pastarąsias 10 minučių. Arba kiek vartotojų spustelėjo naują reklaminį skydelį per pastarąsias 15 minučių. Programa tokias operacijas gali atlikti kelis kartus, bet su rezultatais, kurie taikomi tik tam tikram laikotarpiui (laiko langams).

Pirkėjo keitimo sandorių skaičiavimas

Kitame pavyzdyje stebėsime akcijų sandorius tarp kelių prekybininkų – didelių organizacijų arba protingų individualių finansininkų.

Yra dvi galimos šio stebėjimo priežastys. Vienas iš jų – būtinybė žinoti, ką rinkos lyderiai perka/parduoda. Jei šie dideli žaidėjai ir patyrę investuotojai mato galimybę, prasminga laikytis jų strategijos. Antroji priežastis – noras pastebėti bet kokius galimus nelegalios prekybos viešai neatskleista informacija požymius. Norėdami tai padaryti, turėsite išanalizuoti didelių pardavimų šuolių ryšį su svarbiais pranešimais spaudai.

Toks stebėjimas susideda iš šių veiksmų:

  • sukurti srautą skaitymui iš akcijų sandorių temos;
  • gaunamų įrašų grupavimas pagal pirkėjo ID ir akcijų simbolį. Metodo groupBy iškvietimas grąžina KGroupedStream klasės egzempliorių;
  • Metodas KGroupedStream.windowedBy grąžina duomenų srautą, apribotą laiko langu, kuris leidžia langų agregaciją. Priklausomai nuo lango tipo, grąžinamas TimeWindowedKStream arba SessionWindowedKStream;
  • sandorių skaičius agregavimo operacijai. Duomenų srautas su langais nustato, ar skaičiuojant atsižvelgiama į konkretų įrašą;
  • rašyti rezultatus į temą arba išvesti juos į konsolę kūrimo metu.

Šios programos topologija paprasta, tačiau aiškus jos vaizdas būtų naudingas. Pažvelkime į pav. 5.11.

Toliau apžvelgsime lango operacijų funkcionalumą ir atitinkamą kodą.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“

Langų tipai

„Kafka Streams“ yra trijų tipų langai:

  • seansinis;
  • „dūžtantis“ (dūžtantis);
  • slydimas / šokinėjimas.

Kurį pasirinkti, priklauso nuo jūsų verslo reikalavimų. Slenkančių ir šokinėjančių langų laikas yra ribotas, o seanso langai yra apriboti naudotojo veikla – seanso (-ų) trukmė nustatoma tik pagal vartotojo aktyvumą. Svarbiausia atsiminti, kad visi langų tipai yra pagrįsti įrašų datos ir laiko žymomis, o ne sistemos laiku.

Toliau įdiegiame savo topologiją su kiekvienu langų tipu. Visas kodas bus pateiktas tik pirmame pavyzdyje kitų tipų langams niekas nepasikeis, išskyrus lango veikimo tipą.

Sesijos langai

Seanso langai labai skiriasi nuo visų kitų langų tipų. Juos riboja ne tiek laikas, kiek vartotojo veikla (arba subjekto, kurį norite sekti, veikla). Seanso langai yra ribojami neveiklumo laikotarpiais.

5.12 pav. parodyta seanso langų samprata. Mažesnė sesija bus sujungta su seansu, esančia jos kairėje. O seansas dešinėje bus atskiras, nes vyksta po ilgo neveiklumo laikotarpio. Seanso langai yra pagrįsti vartotojo veikla, bet naudokite datos / laiko žymes iš įrašų, kad nustatytumėte, kuriai sesijai įrašas priklauso.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“

Seanso langų naudojimas akcijų sandoriams stebėti

Naudokime seansų langus informacijai apie mainų operacijas užfiksuoti. Seansų langų įgyvendinimas parodytas 5.5 sąraše (kurį rasite src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Jūs jau matėte daugumą šios topologijos operacijų, todėl nereikia jų dar kartą čia žiūrėti. Tačiau čia taip pat yra keletas naujų elementų, kuriuos dabar aptarsime.

Bet kuri operacija groupBy paprastai atlieka tam tikrą agregavimo operaciją (sujungimą, apibendrinimą arba skaičiavimą). Galite atlikti kaupiamąjį agregavimą su einamuoju sumumu arba langų agregaciją, kuri atsižvelgia į įrašus per nurodytą laiko langą.

5.5 sąraše esantis kodas skaičiuoja operacijų skaičių seanso languose. Fig. 5.13 šie veiksmai yra analizuojami žingsnis po žingsnio.

Iškvietę windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) sukuriame seanso langą su 20 sekundžių neveiklumo intervalu ir 15 minučių pastovumo intervalu. 20 sekundžių tuščiosios eigos intervalas reiškia, kad programa į dabartinę (aktyvią) seansą įtrauks bet kokį įrašą, gautą per 20 sekundžių nuo dabartinės sesijos pabaigos arba pradžios.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Toliau seanso lange nurodome, kokią agregavimo operaciją reikia atlikti – šiuo atveju suskaičiuokite. Jei gaunamas įrašas nepatenka į neveiklumo langą (bet kurioje datos / laiko žymos pusėje), programa sukuria naują seansą. Saugojimo intervalas reiškia seanso palaikymą tam tikrą laiką ir leidžia pavėluoti duomenis, kurie tęsiasi po seanso neveiklumo laikotarpio, bet vis tiek gali būti pridedami. Be to, naujos sesijos, atsiradusios dėl sujungimo, pradžia ir pabaiga atitinka anksčiausią ir naujausią datos / laiko žymą.

Pažiūrėkime į kelis skaičiavimo metodo įrašus, kad pamatytume, kaip veikia seansai (5.1 lentelė).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Kai gaunami įrašai, ieškome esamų seansų su tuo pačiu raktu, kurių pabaigos laikas yra mažesnis nei dabartinė datos / laiko žyma – neveiklumo intervalas ir pradžios laikas, didesnis nei dabartinė datos / laiko žyma + neveiklumo intervalas. Atsižvelgiant į tai, keturi įrašai iš lentelės. 5.1 yra sujungiami į vieną seansą taip.

1. Pirmas ateina 1 įrašas, todėl pradžios laikas yra lygus pabaigos laikui ir yra 00:00:00.

2. Toliau ateina 2 įrašas, ir mes ieškome seansų, kurie baigiasi ne anksčiau kaip 23:59:55 ir prasideda ne vėliau kaip 00:00:35. Surandame 1 įrašą ir sujungiame 1 ir 2 seansus. Paimame 1 seanso pradžios laiką (anksčiau) ir 2 seanso pabaigos laiką (vėliau), kad mūsų nauja sesija prasidėtų 00:00:00 ir baigtųsi 00:00: 15:XNUMX.

3. Atvyksta 3 įrašas, ieškome seansų nuo 00:00:30 iki 00:01:10 ir nerandame. Pridėkite antrą rakto seansą 123-345-654,FFBE, prasidedantį ir baigiant 00:00:50.

4. Atvyksta 4 įrašas ir mes ieškome seansų nuo 23:59:45 iki 00:00:25. Šį kartą randami ir 1, ir 2 seansai, kurių pradžios laikas yra 00:00:00 ir pabaigos laikas 00:00:15.

Iš to, kas aprašyta šiame skyriuje, verta prisiminti šiuos svarbius niuansus:

  • sesijos nėra fiksuoto dydžio langai. Užsiėmimo trukmę lemia veikla per tam tikrą laikotarpį;
  • Duomenų datos ir laiko žymos nustato, ar įvykis patenka į esamą seansą, ar per neaktyvumo laikotarpį.

Toliau aptarsime kitą langų tipą – „būktančius“ langus.

„Dunkstantys“ langai

Dūžtantys langai fiksuoja įvykius, kurie patenka į tam tikrą laikotarpį. Įsivaizduokite, kad reikia fiksuoti visus tam tikros įmonės akcijų sandorius kas 20 sekundžių, todėl renkate visus įvykius per tą laikotarpį. Pasibaigus 20 sekundžių intervalui, langas apsiverčia ir pereina į naują 20 sekundžių stebėjimo intervalą. Šią situaciją iliustruoja 5.14 pav.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Kaip matote, visi įvykiai, gauti per paskutines 20 sekundžių, yra įtraukti į langą. Pasibaigus šiam laikotarpiui, sukuriamas naujas langas.

5.6 sąraše rodomas kodas, rodantis, kad slenkantys langai naudojami akcijų sandoriams užfiksuoti kas 20 sekundžių (rasta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Atlikę šį nedidelį TimeWindows.of metodo iškvietimo pakeitimą, galite naudoti slenkantį langą. Šis pavyzdys neiškviečia iki() metodo, todėl bus naudojamas numatytasis 24 valandų saugojimo intervalas.

Galiausiai atėjo laikas pereiti prie paskutinės lango parinkčių – langų „šokinėjimo“.

Stumdomi („šokantys“) langai

Stumdomi / šokinėjantys langai yra panašūs į besiverčiančius langus, tačiau šiek tiek skiriasi. Stumdomi langai nelaukia iki laiko intervalo pabaigos prieš sukurdami naują langą naujausiems įvykiams apdoroti. Jie pradeda naujus skaičiavimus po laukimo intervalo, trumpesnio nei lango trukmė.

Norėdami iliustruoti slenkančių ir šokinėjančių langų skirtumus, grįžkime prie biržos sandorių skaičiavimo pavyzdžio. Mūsų tikslas vis dar yra suskaičiuoti operacijų skaičių, bet nenorime laukti viso laiko prieš atnaujindami skaitiklį. Vietoj to skaitiklį atnaujinsime trumpesniais intervalais. Pavyzdžiui, operacijų skaičių vis tiek skaičiuosime kas 20 sekundžių, bet skaitiklį atnaujinsime kas 5 sekundes, kaip parodyta pav. 5.15. Tokiu atveju gauname tris rezultatų langus su persidengiančiais duomenimis.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
5.7 sąraše rodomas slankiojančių langų apibrėžimo kodas (rastas src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Slenkantį langą galima konvertuoti į šuoliuojantį langą, pridedant iškvietimą į advanceBy() metodą. Pateiktame pavyzdyje išsaugojimo intervalas yra 15 minučių.

Šiame skyriuje matėte, kaip apriboti agregavimo rezultatus iki laiko langų. Visų pirma noriu, kad iš šio skyriaus prisimintumėte šiuos tris dalykus:

  • seanso langų dydį riboja ne laikotarpis, o vartotojo veikla;
  • „bunkantys“ langai suteikia įvykių per tam tikrą laikotarpį apžvalgą;
  • Peršokančių langų trukmė yra fiksuota, tačiau jie dažnai atnaujinami ir visuose languose gali būti sutampančių įrašų.

Toliau sužinosime, kaip KT lentelę paversti atgal į KStream ryšiui.

5.3.3. KStream ir KTable objektų sujungimas

4 skyriuje aptarėme dviejų KStream objektų sujungimą. Dabar turime išmokti sujungti KTable ir KStream. To gali prireikti dėl šios paprastos priežasties. KStream yra įrašų srautas, o KTable yra įrašų naujinimų srautas, tačiau kartais gali tekti pridėti papildomo konteksto įrašų srautui naudojant naujinimus iš KTable.

Paimkime duomenis apie biržos sandorių skaičių ir sujungkime juos su biržos naujienomis atitinkamoms pramonės šakoms. Štai ką turite padaryti, kad tai pasiektumėte, atsižvelgiant į jau turimą kodą.

  1. Konvertuokite KTable objektą su duomenimis apie akcijų operacijų skaičių į KStream, tada pakeiskite raktą raktu, nurodančiu pramonės sektorių, atitinkantį šį akcijų simbolį.
  2. Sukurkite KTable objektą, kuris nuskaito duomenis iš temos su biržos naujienomis. Ši nauja KT lentelė bus suskirstyta pagal pramonės sektorių.
  3. Susiekite naujienas su informacija apie biržos sandorių skaičių pagal pramonės sektorių.

Dabar pažiūrėkime, kaip įgyvendinti šį veiksmų planą.

Konvertuoti KTable į KStream

Norėdami konvertuoti KTable į KStream, turite atlikti šiuos veiksmus.

  1. Iškvieskite KTable.toStream() metodą.
  2. Iškviesdami KStream.map metodą, pakeiskite raktą pramonės pavadinimu, tada gaukite objektą TransactionSummary iš langinio egzemplioriaus.

Šias operacijas sujungsime taip (kodą rasite faile src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.8 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Kadangi atliekame KStream.map operaciją, grąžintas KStream egzempliorius yra automatiškai perskirstomas, kai jis naudojamas ryšiui.

Baigėme konvertavimo procesą, tada turime sukurti KTable objektą akcijų naujienoms skaityti.

KTtable kūrimas akcijų naujienoms

Laimei, norint sukurti KTable objektą, reikia tik vienos kodo eilutės (kodą galima rasti src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.9 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Verta paminėti, kad Serde objektų nurodyti nereikia, nes nustatymuose naudojama eilutė Serdes. Be to, naudojant ANKSČIAUSĮ sąrašą, lentelė įrašais užpildoma pačioje pradžioje.

Dabar galime pereiti prie paskutinio žingsnio – ryšio.

Naujienų atnaujinimų susiejimas su operacijų skaičiaus duomenimis

Užmegzti ryšį nėra sunku. Naudosime kairįjį sujungimą, jei nėra atitinkamos pramonės naujienų (būtiną kodą rasite faile src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.10 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Šis leftJoin operatorius yra gana paprastas. Skirtingai nuo sujungimų 4 skyriuje, JoinWindow metodas nenaudojamas, nes atliekant KStream-KTable sujungimą, kiekvienam raktui KTlentelėje yra tik vienas įrašas. Toks ryšys nėra ribojamas laike: įrašas yra KT lentelėje arba jo nėra. Pagrindinė išvada: naudodami KTable objektus galite praturtinti KStream rečiau atnaujinamais informaciniais duomenimis.

Dabar pažvelgsime į efektyvesnį būdą, kaip praturtinti įvykius iš KStream.

5.3.4. GlobalKTable objektai

Kaip matote, reikia praturtinti įvykių srautus arba pridėti prie jų konteksto. 4 skyriuje matėte ryšius tarp dviejų KStream objektų, o ankstesniame skyriuje matėte ryšį tarp KStream ir KTable. Visais šiais atvejais būtina iš naujo skaidyti duomenų srautą, kai raktai susiejami su nauju tipu ar verte. Kartais perskirstymas atliekamas aiškiai, o kartais „Kafka Streams“ tai atlieka automatiškai. Perskirstymas yra būtinas, nes pasikeitė raktai ir įrašai turi atsidurti naujose sekcijose, kitaip ryšys bus neįmanomas (apie tai buvo kalbama 4 skyriuje, 4.2.4 poskyrio skyriuje „Duomenų perskirstymas“).

Pakartotinis skaidymas kainuoja

Perskirstymas reikalauja išlaidų – papildomų išteklių sąnaudų kuriant tarpines temas, saugoti pasikartojančius duomenis kitoje temoje; tai taip pat reiškia padidėjusį delsą dėl šios temos rašymo ir skaitymo. Be to, jei reikia sujungti daugiau nei vieną aspektą ar dimensiją, turite sujungti sujungimus, susieti įrašus su naujais raktais ir dar kartą paleisti skaidymo procesą.

Prisijungimas prie mažesnių duomenų rinkinių

Kai kuriais atvejais jungiamų pamatinių duomenų kiekis yra palyginti mažas, todėl visos jų kopijos gali lengvai tilpti kiekviename mazge. Tokiose situacijose „Kafka Streams“ teikia „GlobalKTable“ klasę.

GlobalKTable egzemplioriai yra unikalūs, nes programa pakartoja visus duomenis kiekviename mazge. Ir kadangi visi duomenys yra kiekviename mazge, nereikia skaidyti įvykių srauto pagal nuorodos duomenų raktą, kad jis būtų prieinamas visiems skaidiniams. Taip pat galite atlikti berakčius sujungimus naudodami GlobalKTable objektus. Norėdami parodyti šią funkciją, grįžkime prie vieno iš ankstesnių pavyzdžių.

KStream objektų prijungimas prie GlobalKTable objektų

5.3.2 poskyryje atlikome pirkėjų mainų operacijų langų agregavimą. Šios agregacijos rezultatai atrodė maždaug taip:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Nors šie rezultatai atitiko tikslą, būtų buvę naudingiau, jei būtų buvę rodomas ir kliento vardas bei visas įmonės pavadinimas. Norėdami pridėti kliento vardą ir įmonės pavadinimą, galite atlikti įprastus sujungimus, tačiau turėsite atlikti du pagrindinius susiejimus ir pertvarkyti. Naudodami GlobalKTable galite išvengti tokių operacijų išlaidų.

Norėdami tai padaryti, naudosime objektą countStream iš 5.11 sąrašo (atitinkamą kodą rasite src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) ir prijungsime jį prie dviejų GlobalKTable objektų.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Apie tai jau diskutavome anksčiau, todėl nekartosiu. Tačiau atkreipiu dėmesį, kad toStream().map funkcijos kodas yra abstrahuotas į funkcijos objektą, o ne į eilutę lambda išraišką, kad būtų lengviau skaityti.

Kitas žingsnis yra paskelbti du GlobalKTable egzempliorius (rodomą kodą galima rasti faile src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (5.12 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“

Atkreipkite dėmesį, kad temų pavadinimai aprašomi naudojant išvardytus tipus.

Dabar, kai visi komponentai yra paruošti, belieka parašyti ryšio kodą (kurį rasite faile src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (5.13 sąrašas).

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Nors šiame kode yra du sujungimai, jie yra sujungti į grandinę, nes nė vienas iš jų rezultatų nenaudojamas atskirai. Rezultatai rodomi visos operacijos pabaigoje.

Vykdydami aukščiau pateiktą sujungimo operaciją gausite tokius rezultatus:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Esmė nepasikeitė, tačiau šie rezultatai atrodo aiškesni.

Jei skaičiuosite iki 4 skyriaus, jau matėte kelių tipų ryšius. Jie išvardyti lentelėje. 5.2. Šioje lentelėje pateikiamos Kafka Streams 1.0.0 versijos ryšio galimybės; Ateities leidimuose kažkas gali pasikeisti.

Knyga „Kafkos srautai veikia. Programos ir mikropaslaugos darbui realiuoju laiku“
Norėdami baigti, apibendrinkite pagrindus: galite prijungti įvykių srautus (KStream) ir atnaujinti srautus (KTable) naudodami vietinę būseną. Arba, jei atskaitos duomenų dydis nėra per didelis, galite naudoti objektą GlobalKTable. „GlobalKTables“ atkartoja visus skaidinius kiekviename „Kafka Streams“ programos mazge, užtikrindama, kad visi duomenys būtų prieinami, nepaisant to, kurį skaidinį atitinka raktas.

Toliau pamatysime Kafka Streams funkciją, kurios dėka galėsime stebėti būsenos pokyčius nenaudodami duomenų iš Kafkos temos.

5.3.5. Užklausa būsena

Jau atlikome keletą operacijų, susijusių su būsena, ir visada išvedame rezultatus į konsolę (kurimo tikslais) arba įrašome į temą (gamybos tikslais). Rašydami rezultatus į temą, norėdami juos peržiūrėti, turite naudoti Kafka vartotoją.

Šių temų duomenų skaitymas gali būti laikomas materializuotų požiūrių tipu. Savo tikslams galime naudoti materializuoto rodinio apibrėžimą iš Vikipedijos: „...fizinis duomenų bazės objektas, kuriame yra užklausos rezultatai. Pavyzdžiui, tai gali būti vietinė nuotolinių duomenų kopija, lentelės eilučių ir (arba) stulpelių poaibis arba rezultatų sujungimas, arba suvestinė lentelė, gauta apibendrinant“ (https://en.wikipedia.org/wiki /Materializuotas_vaizdas).

„Kafka Streams“ taip pat leidžia vykdyti interaktyvias užklausas valstijos parduotuvėse, leidžiančias tiesiogiai skaityti šiuos materializuotus rodinius. Svarbu pažymėti, kad užklausa valstybės parduotuvei yra tik skaitoma operacija. Tai užtikrina, kad jums nereikės jaudintis dėl netyčinio būsenos nenuoseklumo, kai programa apdoroja duomenis.

Galimybė tiesiogiai pateikti užklausą valstybės parduotuvėse yra svarbi. Tai reiškia, kad galite kurti prietaisų skydelio programas iš pradžių negavę duomenų iš Kafka vartotojo. Tai taip pat padidina programos efektyvumą dėl to, kad nereikia dar kartą rašyti duomenų:

  • dėl duomenų buvimo vietos juos galima greitai pasiekti;
  • pašalinamas duomenų dubliavimas, nes jie neįrašomi į išorinę saugyklą.

Svarbiausias dalykas, kurį noriu atsiminti, yra tai, kad galite tiesiogiai pateikti užklausą apie būseną savo programoje. Neįmanoma pervertinti galimybių, kurias tai suteikia. Užuot vartoję duomenis iš Kafka ir saugoję įrašus programos duomenų bazėje, galite pateikti būsenos saugyklų užklausą su tuo pačiu rezultatu. Tiesioginės užklausos į būsenų saugyklas reiškia mažiau kodo (nėra vartotojo) ir mažiau programinės įrangos (nereikia duomenų bazės lentelės rezultatams saugoti).

Šiame skyriuje apžvelgėme nemažai, todėl kol kas paliksime aptarimą apie interaktyvias užklausas prieš valstybines parduotuves. Tačiau nesijaudinkite: 9 skyriuje sukursime paprastą prietaisų skydelio programą su interaktyviomis užklausomis. Jame bus naudojami kai kurie pavyzdžiai iš šio ir ankstesnių skyrių, kad parodytų interaktyvias užklausas ir kaip galite jas įtraukti į Kafka Streams programas.

Santrauka

  • KStream objektai yra įvykių srautai, panašūs į įterpimus į duomenų bazę. KTable objektai yra naujinimų srautai, labiau panašūs į duomenų bazės naujinimus. KTable objekto dydis neauga, seni įrašai keičiami naujais.
  • KTable objektai reikalingi agregavimo operacijoms.
  • Naudodami langų operacijas galite suskirstyti sukauptus duomenis į laiko segmentus.
  • „GlobalKTable“ objektų dėka galite pasiekti atskaitos duomenis bet kurioje programos vietoje, nepaisant skaidymo.
  • Galimi ryšiai tarp KStream, KTable ir GlobalKTable objektų.

Iki šiol daugiausia dėmesio skyrėme Kafka Streams programų kūrimui naudojant aukšto lygio KStream DSL. Nors aukšto lygio metodas leidžia kurti tvarkingas ir glaustas programas, jo naudojimas reiškia kompromisą. Darbas su DSL KStream reiškia padidinti kodo glaustumą sumažinant valdymo laipsnį. Kitame skyriuje apžvelgsime žemo lygio tvarkyklės mazgo API ir išbandysime kitus kompromisus. Programos bus ilgesnės nei buvo anksčiau, tačiau galėsime sukurti beveik bet kokį tvarkyklės mazgą, kurio mums gali prireikti.

→ Daugiau informacijos apie knygą rasite adresu leidėjo svetainė

→ Habrozhiteli 25% nuolaida naudojant kuponą - Kafkos srautai

→ Sumokėjus už popierinę knygos versiją, elektroninė knyga bus atsiųsta el.

Šaltinis: www.habr.com

Добавить комментарий