Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam" Sveiki, Khabro iedzīvotāji! Šī grāmata ir piemērota ikvienam izstrādātājam, kurš vēlas izprast pavedienu apstrādi. Izpratne par izplatīto programmēšanu palīdzēs labāk izprast Kafka un Kafka straumes. Būtu jauki uzzināt pašu Kafka ietvaru, taču tas nav nepieciešams: es jums pastāstīšu visu, kas jums nepieciešams. Pieredzējuši Kafka izstrādātāji un iesācēji šajā grāmatā iemācīsies izveidot interesantas straumēšanas lietojumprogrammas, izmantojot Kafka Streams bibliotēku. Vidēja līmeņa un progresīvi Java izstrādātāji, kas jau pārzina tādus jēdzienus kā serializācija, iemācīsies pielietot savas prasmes, lai izveidotu Kafka Streams lietojumprogrammas. Grāmatas avota kods ir rakstīts Java 8 valodā un būtiski izmanto Java 8 lambda izteiksmes sintakse, tāpēc zināšanas, kā strādāt ar lambda funkcijām (pat citā programmēšanas valodā), noderēs.

Izvilkums. 5.3. Apkopošanas un logu operācijas

Šajā sadaļā mēs turpināsim izpētīt daudzsološākās Kafka Streams daļas. Līdz šim esam apskatījuši šādus Kafka straumju aspektus:

  • apstrādes topoloģijas izveide;
  • stāvokļa izmantošana straumēšanas lietojumprogrammās;
  • datu plūsmas savienojumu veikšana;
  • atšķirības starp notikumu straumēm (KStream) un atjaunināšanas straumēm (KTable).

Turpmākajos piemēros mēs apkoposim visus šos elementus. Jūs arī uzzināsit par logiem, kas ir vēl viena lieliska straumēšanas lietojumprogrammu funkcija. Mūsu pirmais piemērs būs vienkāršs apkopojums.

5.3.1. Krājumu realizācijas agregācija pa nozares sektoriem

Apkopošana un grupēšana ir būtiski rīki, strādājot ar straumēšanas datiem. Bieži vien nepietiek ar atsevišķu ierakstu pārbaudi to saņemšanas brīdī. Lai no datiem iegūtu papildu informāciju, tie ir jāgrupē un jāapvieno.

Šajā piemērā jūs uzvilksiet dienas tirgotāja tērpu, kuram ir jāseko vairāku nozaru uzņēmumu akciju pārdošanas apjomam. Konkrēti, jūs interesē pieci uzņēmumi ar lielāko pārdošanas daļu katrā nozarē.

Šādai apkopošanai būs jāveic vairākas šādas darbības, lai datus pārvērstu vēlamajā formā (runājot vispārīgi).

  1. Izveidojiet uz tēmu balstītu avotu, kas publicē neapstrādātu akciju tirdzniecības informāciju. Mums būs jākartē StockTransaction tipa objekts ar ShareVolume tipa objektu. Lieta tāda, ka StockTransaction objekts satur pārdošanas metadatus, bet mums ir nepieciešami tikai dati par pārdoto akciju skaitu.
  2. Grupējiet ShareVolume datus pēc akciju simbola. Kad šie dati ir sagrupēti pēc simbola, varat sakļaut šos datus akciju pārdošanas apjomu starpsummas. Ir vērts atzīmēt, ka KStream.groupBy metode atgriež KGroupedStream tipa gadījumu. Un jūs varat iegūt KTable instanci, tālāk izsaucot metodi KGroupedStream.reduce.

Kas ir KGroupedStream interfeiss

Metodes KStream.groupBy un KStream.groupByKey atgriež KGroupedStream gadījumu. KGroupedStream ir starpposma attēlojums notikumu straumei pēc grupēšanas pēc taustiņiem. Tas nepavisam nav paredzēts tiešam darbam ar to. Tā vietā KGroupedStream tiek izmantots apkopošanas operācijām, kuru rezultātā vienmēr tiek iegūta KTable. Un, tā kā apkopošanas darbību rezultāts ir KTable un tās izmanto stāvokļa krātuvi, iespējams, ka visi atjauninājumi rezultātā netiek nosūtīti tālāk konveijerā.

Metode KTable.groupBy atgriež līdzīgu KGroupedTable — atjauninājumu straumes starpposma attēlojumu, kas pārgrupēts pēc atslēgas.

Paņemsim nelielu pārtraukumu un apskatīsim att. 5.9, kas parāda, ko esam sasnieguši. Šai topoloģijai jums jau vajadzētu būt ļoti pazīstamai.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Tagad apskatīsim šīs topoloģijas kodu (to var atrast failā src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (5.2. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Dotais kods izceļas ar īsumu un lielo darbību apjomu vairākās rindās. Metodes builder.stream pirmajā parametrā var pamanīt ko jaunu: enum tipa AutoOffsetReset.EARLIEST vērtību (ir arī LATEST), kas iestatīta, izmantojot Consumed.withOffsetResetPolicy metodi. Šo uzskaitīšanas veidu var izmantot, lai norādītu nobīdes atiestatīšanas stratēģiju katram KStream vai KTable, un tam ir prioritāte pār nobīdes atiestatīšanas opciju no konfigurācijas.

GroupByKey un GroupBy

KStream saskarnē ir divas ierakstu grupēšanas metodes: GroupByKey un GroupBy. Abi atgriež KGroupedTable, tāpēc jums varētu rasties jautājums, kāda ir atšķirība starp tām un kad kuru izmantot?

Metode GroupByKey tiek izmantota, ja KStream atslēgas jau nav tukšas. Un vissvarīgākais ir tas, ka karodziņš “nepieciešama atkārtota sadalīšana” nekad netika iestatīts.

Metode GroupBy pieņem, ka esat mainījis grupēšanas atslēgas, tāpēc pārdalīšanas karodziņš ir iestatīts uz True. Veicot savienojumus, apkopojumus utt. pēc GroupBy metodes, tiks veikta automātiska atkārtota sadalīšana.
Kopsavilkums: kad vien iespējams, izmantojiet GroupByKey, nevis GroupBy.

Ir skaidrs, ko dara metodes mapValues ​​un groupBy, tāpēc apskatīsim metodi sum() (atrodams src/main/java/bbejeck/model/ShareVolume.java) (5.3. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Metode ShareVolume.sum atgriež krājumu pārdošanas apjoma tekošo kopsummu, un visas aprēķinu ķēdes rezultāts ir KTable objekts . Tagad jūs saprotat, kādu lomu spēlē KTable. Kad tiek saņemti ShareVolume objekti, atbilstošais KTable objekts saglabā jaunāko pašreizējo atjauninājumu. Ir svarīgi atcerēties, ka visi atjauninājumi ir atspoguļoti iepriekšējā shareVolumeKTable, taču ne visi tiek nosūtīti tālāk.

Pēc tam mēs izmantojam šo KT tabulu, lai apkopotu (pēc tirgoto akciju skaita), lai iegūtu piecus uzņēmumus ar vislielāko tirgoto akciju apjomu katrā nozarē. Mūsu darbības šajā gadījumā būs līdzīgas tām, kas veiktas pirmajā apkopojumā.

  1. Veiciet citu grupuBy darbību, lai grupētu atsevišķus ShareVolume objektus pēc nozares.
  2. Sāciet apkopot ShareVolume objektus. Šoreiz apkopošanas objekts ir noteikta izmēra prioritātes rinda. Šajā fiksētā lieluma rindā tiek saglabāti tikai pieci uzņēmumi ar lielāko pārdoto akciju apjomu.
  3. Kartējiet rindas no iepriekšējās rindkopas ar virknes vērtību un atgrieziet piecas visvairāk tirgotās akcijas pēc skaita pēc nozares.
  4. Ierakstiet rezultātus virknes formā tēmai.

Attēlā 5.10. attēlā parādīts datu plūsmas topoloģijas grafiks. Kā redzat, otrā apstrādes kārta ir pavisam vienkārša.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Tagad, kad mums ir skaidra izpratne par šīs otrās apstrādes kārtas struktūru, varam pievērsties tā pirmkodam (to atradīsit failā src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (5.4. saraksts) .

Šis inicializētājs satur mainīgo FixQueue. Šis ir pielāgots objekts, kas ir java.util.TreeSet adapteris, kas tiek izmantots, lai izsekotu N labākos rezultātus tirgoto akciju dilstošā secībā.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Jūs jau esat redzējis zvanus groupBy un mapValues, tāpēc mēs tajos neiedziļināsimies (mēs izsaucam KTable.toStream metodi, jo KTable.print metode ir novecojusi). Bet jūs vēl neesat redzējis aggregate() KTable versiju, tāpēc mēs veltīsim nedaudz laika, lai to apspriestu.

Kā jūs atceraties, KTable atšķiras ar to, ka ieraksti ar vienādām atslēgām tiek uzskatīti par atjauninājumiem. KTable aizstāj veco ierakstu ar jaunu. Apkopošana notiek līdzīgi: tiek apkopoti jaunākie ieraksti ar vienu un to pašu atslēgu. Kad tiek saņemts ieraksts, tas tiek pievienots FixedSizePriorityQueue klases instancei, izmantojot summatoru (otrais parametrs apkopotās metodes izsaukumā), bet, ja jau pastāv cits ieraksts ar to pašu atslēgu, tad vecais ieraksts tiek noņemts, izmantojot atņemtāju (trešais parametrs apkopotās metodes izsaukums).

Tas viss nozīmē, ka mūsu apkopotājs FixedSizePriorityQueue neapkopo visas vērtības ar vienu atslēgu, bet uzglabā mainīgu N visvairāk tirgoto akciju veidu daudzumu summu. Katrs ienākošais ieraksts satur kopējo līdz šim pārdoto akciju skaitu. KTable sniegs jums informāciju par to, kuru uzņēmumu akcijas pašlaik ir visvairāk tirgotas, neprasot katra atjauninājuma slīdošo apkopošanu.

Mēs iemācījāmies darīt divas svarīgas lietas:

  • grupēt vērtības KTtable ar kopēju atslēgu;
  • veikt noderīgas darbības, piemēram, apkopošanu un apkopošanu ar šīm grupētajām vērtībām.

Ir svarīgi zināt, kā veikt šīs darbības, lai izprastu Kafka Streams lietojumprogrammā pārvietoto datu nozīmi un saprastu, kādu informāciju tie nes.

Mēs esam arī apkopojuši dažus no galvenajiem jēdzieniem, kas tika apspriesti iepriekš šajā grāmatā. 4. nodaļā mēs apspriedām, cik straumēšanas lietojumprogrammai ir svarīgs vietējais stāvoklis pret kļūmēm. Pirmais piemērs šajā nodaļā parādīja, kāpēc vietējais štats ir tik svarīgs — tas sniedz jums iespēju izsekot informācijai, kuru jau esat redzējis. Vietējā piekļuve novērš tīkla aizkavi, padarot lietojumprogrammu efektīvāku un izturīgāku pret kļūdām.

Veicot jebkuru apkopošanas vai apkopošanas darbību, jānorāda štata veikala nosaukums. Apkopošanas un apkopošanas darbības atgriež KTable instanci, un KTable izmanto stāvokļa krātuvi, lai aizstātu vecos rezultātus ar jauniem. Kā redzējāt, ne visi atjauninājumi tiek nosūtīti, un tas ir svarīgi, jo apkopošanas darbības ir paredzētas kopsavilkuma informācijas iegūšanai. Ja nelietojat vietējo štatu, KTable pārsūtīs visus apkopojuma un apkopojuma rezultātus.

Tālāk mēs aplūkosim tādu darbību veikšanu kā apkopošana noteiktā laika periodā - tā sauktās logu darbības.

5.3.2. Logu operācijas

Iepriekšējā sadaļā mēs ieviesām slīdošo konvolūciju un apkopošanu. Lietojumprogramma veica nepārtrauktu akciju pārdošanas apjoma apkopojumu, kam sekoja piecu biržā visvairāk tirgoto akciju apkopošana.

Dažreiz šāda nepārtraukta rezultātu apkopošana un apkopošana ir nepieciešama. Un dažreiz jums ir jāveic darbības tikai noteiktā laika periodā. Piemēram, aprēķiniet, cik maiņas darījumu ar konkrēta uzņēmuma akcijām veikti pēdējo 10 minūšu laikā. Vai arī cik lietotāju pēdējo 15 minūšu laikā noklikšķināja uz jauna reklāmas reklāmkaroga. Lietojumprogramma var veikt šādas darbības vairākas reizes, bet ar rezultātiem, kas attiecas tikai uz noteiktiem laika periodiem (laika logi).

Pircēja maiņas darījumu uzskaite

Nākamajā piemērā mēs izsekosim akciju darījumus starp vairākiem tirgotājiem — vai nu lielām organizācijām, vai gudriem individuāliem finansētājiem.

Šai izsekošanai ir divi iespējamie iemesli. Viens no tiem ir nepieciešamība zināt, ko tirgus līderi pērk/pārdod. Ja šie lielie spēlētāji un izsmalcinātie investori redz iespēju, ir lietderīgi sekot viņu stratēģijai. Otrs iemesls ir vēlme pamanīt jebkādas iespējamās nelegālās iekšējās informācijas tirdzniecības pazīmes. Lai to izdarītu, jums būs jāanalizē lielo pārdošanas pieaugumu korelācija ar svarīgiem preses paziņojumiem.

Šāda izsekošana sastāv no šādām darbībām:

  • straumes izveidošana lasīšanai no temata akciju darījumi;
  • ienākošo ierakstu grupēšana pēc pircēja ID un akciju simbola. Izsaucot metodi groupBy, tiek atgriezts KGroupedStream klases gadījums;
  • Metode KGroupedStream.windowedBy atgriež datu straumi, kas ierobežota ar laika logu, kas pieļauj logu apkopošanu. Atkarībā no loga veida tiek atgriezts TimeWindowedKStream vai SessionWindowedKStream;
  • darījumu skaits apkopošanas operācijai. Logu datu plūsma nosaka, vai konkrētais ieraksts tiek ņemts vērā šajā skaitā;
  • rakstot rezultātus tēmai vai izvadot tos konsolē izstrādes laikā.

Šīs lietojumprogrammas topoloģija ir vienkārša, taču noderētu skaidrs priekšstats par to. Apskatīsim att. 5.11.

Tālāk mēs apskatīsim logu darbību funkcionalitāti un atbilstošo kodu.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"

Logu veidi

Pakalpojumā Kafka Streams ir trīs veidu logi:

  • sesijas;
  • “kļūšana”;
  • slīdēšana/lēkšana.

Kuru izvēlēties, ir atkarīgs no jūsu biznesa prasībām. Slēpšanas un lēkšanas logi ir ierobežoti laika ziņā, savukārt sesijas logus ierobežo lietotāja aktivitātes — sesijas(-u) ilgums tiek noteikts tikai pēc lietotāja aktivitātes. Galvenais, kas jāatceras, ir tas, ka visi logu veidi ir balstīti uz ierakstu datuma/laika zīmogiem, nevis sistēmas laiku.

Tālāk mēs ieviešam savu topoloģiju ar katru logu tipu. Pilns kods tiks dots tikai pirmajā piemērā, citiem logu veidiem nekas nemainīsies, izņemot loga darbības veidu.

Sesiju logi

Sesijas logi ļoti atšķiras no visiem citiem logu veidiem. Tos ierobežo ne tik daudz laika, cik lietotāja aktivitātes (vai tās entītijas darbības, kuru vēlaties izsekot). Sesiju logus ierobežo neaktivitātes periodi.

5.12. attēlā ir parādīts sesijas logu jēdziens. Mazākā sesija tiks apvienota ar sesiju pa kreisi. Un sesija labajā pusē būs atsevišķa, jo tā seko ilgam dīkstāves periodam. Sesijas logi ir balstīti uz lietotāja aktivitātēm, taču izmantojiet datuma/laika zīmogus no ierakstiem, lai noteiktu, kurai sesijai ieraksts pieder.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"

Sesiju logu izmantošana akciju darījumu izsekošanai

Izmantosim sesiju logus, lai tvertu informāciju par apmaiņas darījumiem. Sesiju logu ieviešana ir parādīta sarakstā 5.5 (kuru var atrast src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Jūs jau esat redzējis lielāko daļu operāciju šajā topoloģijā, tāpēc šeit nav nepieciešams tās vēlreiz apskatīt. Bet šeit ir arī vairāki jauni elementi, kurus mēs tagad apspriedīsim.

Jebkura groupBy darbība parasti veic noteikta veida apkopošanas darbību (apkopošanu, apkopošanu vai skaitīšanu). Varat veikt vai nu kumulatīvo apkopošanu ar tekošu kopsummu, vai logu apkopošanu, kas ņem vērā ierakstus noteiktā laika logā.

Kods sarakstā 5.5 uzskaita transakciju skaitu sesijas logos. Attēlā 5.13. šīs darbības tiek analizētas soli pa solim.

Izsaucot windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)), mēs izveidojam sesijas logu ar neaktivitātes intervālu 20 sekundes un noturības intervālu 15 minūtes. 20 sekunžu dīkstāves intervāls nozīmē, ka lietojumprogramma pašreizējā (aktīvajā) sesijā iekļaus jebkuru ierakstu, kas tiek saņemts 20 sekunžu laikā pēc pašreizējās sesijas beigām vai sākuma.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Tālāk mēs norādām, kura apkopošanas darbība ir jāveic sesijas logā - šajā gadījumā skaitiet. Ja ienākošais ieraksts atrodas ārpus neaktivitātes loga (abās pusēs datuma/laika zīmoga), lietojumprogramma izveido jaunu sesiju. Saglabāšanas intervāls nozīmē sesijas saglabāšanu noteiktu laiku un ļauj saņemt novēlotus datus, kas pārsniedz sesijas neaktivitātes periodu, taču tos joprojām var pievienot. Turklāt sapludināšanas rezultātā izveidotās jaunās sesijas sākums un beigas atbilst agrākajam un jaunākajam datuma/laika zīmogam.

Apskatīsim dažus ierakstus no skaitīšanas metodes, lai redzētu, kā darbojas sesijas (5.1. tabula).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Kad tiek saņemti ieraksti, mēs meklējam esošās sesijas ar to pašu atslēgu, beigu laiku, kas ir mazāks par pašreizējo datuma/laika zīmogu — neaktivitātes intervāls, un sākuma laiku, kas ir lielāks par pašreizējo datuma/laika zīmogu + neaktivitātes intervālu. Ņemot to vērā, četri ieraksti no tabulas. 5.1 tiek apvienoti vienā sesijā šādi.

1. 1. ieraksts ir pirmais, tāpēc sākuma laiks ir vienāds ar beigu laiku un ir 00:00:00.

2. Tālāk pienāk 2. ieraksts, un mēs meklējam sesijas, kas beidzas ne agrāk kā 23:59:55 un sākas ne vēlāk kā 00:00:35. Mēs atrodam 1. ierakstu un apvienojam 1. un 2. sesiju. Mēs ņemam 1. sesijas sākuma laiku (agrāk) un 2. sesijas beigu laiku (vēlāk), lai mūsu jaunā sesija sākas 00:00:00 un beidzas 00: 00:15.

3. Pienāk 3. ieraksts, mēs meklējam sesijas laikā no 00:00:30 līdz 00:01:10 un neatrodam nevienu. Pievienojiet otru sesiju atslēgai 123-345-654,FFBE, kas sākas un beidzas plkst. 00:00:50.

4. Pienāk 4. ieraksts, un mēs meklējam sesijas laikā no 23:59:45 līdz 00:00:25. Šoreiz ir atrastas gan 1., gan 2. sesijas. Visas trīs sesijas ir apvienotas vienā, ar sākuma laiku 00:00:00 un beigu laiku 00:00:15.

No šajā sadaļā aprakstītā ir vērts atcerēties šādas svarīgas nianses:

  • sesijas nav noteikta izmēra logi. Sesijas ilgumu nosaka aktivitāte noteiktā laika periodā;
  • Datuma/laika spiedogi datos nosaka, vai notikums ietilpst esošajā sesijā vai dīkstāves periodā.

Tālāk mēs apspriedīsim nākamo logu veidu - “krītošos” logus.

"Kūkstoši" logi

Klīstošie logi fiksē notikumus, kas ietilpst noteiktā laika periodā. Iedomājieties, ka jums ik pēc 20 sekundēm ir jātver visas konkrēta uzņēmuma akciju transakcijas, lai jūs apkopotu visus notikumus šajā laika periodā. 20 sekunžu intervāla beigās logs apgriežas un pāriet uz jaunu 20 sekunžu novērošanas intervālu. 5.14. attēls ilustrē šo situāciju.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Kā redzat, visi notikumi, kas saņemti pēdējo 20 sekunžu laikā, tiek iekļauti logā. Šī laika perioda beigās tiek izveidots jauns logs.

5.6. sarakstā ir parādīts kods, kas parāda mainīgu logu izmantošanu, lai tvertu akciju darījumus ik pēc 20 sekundēm (atrodams src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Izmantojot šīs nelielas izmaiņas TimeWindows.of metodes izsaukumā, varat izmantot krītošu logu. Šajā piemērā netiek izsaukta līdz() metode, tāpēc tiks izmantots noklusējuma saglabāšanas intervāls 24 stundas.

Beidzot ir pienācis laiks pāriet uz pēdējo no loga opcijām - logu "lēciena".

Bīdāmie ("lecošie") logi

Bīdāmie logi ir līdzīgi krītošiem logiem, taču ar nelielu atšķirību. Bīdāmie logi negaida līdz laika intervāla beigām, pirms tiek izveidots jauns logs, lai apstrādātu nesenos notikumus. Viņi sāk jaunus aprēķinus pēc gaidīšanas intervāla, kas ir mazāks par loga ilgumu.

Lai ilustrētu atšķirības starp krītošiem un lecošiem logiem, atgriezīsimies pie biržas darījumu skaitīšanas piemēra. Mūsu mērķis joprojām ir saskaitīt darījumu skaitu, taču mēs nevēlamies gaidīt visu laiku pirms skaitītāja atjaunināšanas. Tā vietā mēs atjaunināsim skaitītāju ar īsākiem intervāliem. Piemēram, mēs joprojām uzskaitīsim darījumu skaitu ik pēc 20 sekundēm, bet atjaunināsim skaitītāju ik pēc 5 sekundēm, kā parādīts attēlā. 5.15. Šajā gadījumā mēs iegūstam trīs rezultātu logus ar datiem, kas pārklājas.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Sarakstā 5.7 ir parādīts kods bīdāmo logu definēšanai (atrodams src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Kļūstošu logu var pārveidot par lēciena logu, pievienojot izsaukumu advanceBy() metodei. Parādītajā piemērā saglabāšanas intervāls ir 15 minūtes.

Šajā sadaļā jūs redzējāt, kā ierobežot apkopošanas rezultātus līdz laika logiem. Jo īpaši es vēlos, lai jūs no šīs sadaļas atceraties šādas trīs lietas:

  • sesiju logu lielumu ierobežo nevis laika periods, bet gan lietotāja aktivitāte;
  • “klīstošie” logi sniedz pārskatu par notikumiem noteiktā laika periodā;
  • Pārlēkšanas logu ilgums ir fiksēts, taču tie tiek bieži atjaunināti un var ietvert ierakstus, kas pārklājas visos logos.

Tālāk mēs uzzināsim, kā pārveidot KTable atpakaļ par KStream savienojuma izveidei.

5.3.3. KStream un KTable objektu savienošana

4. nodaļā mēs apspriedām divu KStream objektu savienošanu. Tagad mums ir jāiemācās savienot KTable un KStream. Tas var būt nepieciešams šāda vienkārša iemesla dēļ. KStream ir ierakstu straume, un KTable ir ierakstu atjauninājumu straume, taču dažreiz jūs varat pievienot papildu kontekstu ierakstu straumei, izmantojot atjauninājumus no KTable.

Ņemsim datus par biržas darījumu skaitu un apvienosim ar biržas jaunumiem attiecīgajām nozarēm. Lūk, kas jums jādara, lai to sasniegtu, ņemot vērā jums jau esošo kodu.

  1. Pārvērtiet KTable objektu ar datiem par akciju darījumu skaitu par KStream, kam seko atslēgas aizstāšana ar atslēgu, kas norāda šim akciju simbolam atbilstošo nozares sektoru.
  2. Izveidojiet KTable objektu, kas nolasa datus no tēmas ar biržas ziņām. Šis jaunais KTable tiks iedalīts kategorijās pēc nozares.
  3. Savienojiet jaunumus ar informāciju par biržas darījumu skaitu pa nozarēm.

Tagad redzēsim, kā īstenot šo rīcības plānu.

Konvertējiet KTable par KStream

Lai pārvērstu KTable par KStream, jums ir jāveic šādas darbības.

  1. Izsauciet KTable.toStream() metodi.
  2. Izsaucot metodi KStream.map, nomainiet atslēgu ar nozares nosaukumu un pēc tam izgūstiet objektu TransactionSummary no logu instances.

Mēs šīs darbības sasaistīsim kopā šādi (kodu var atrast failā src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.8. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Tā kā mēs veicam KStream.map darbību, atgrieztā KStream instance tiek automātiski atkārtoti sadalīta, kad tā tiek izmantota savienojumā.

Esam pabeiguši konvertēšanas procesu, tālāk jāizveido KTable objekts akciju ziņu lasīšanai.

KTable izveide akciju jaunumiem

Par laimi, lai izveidotu KTable objektu, ir nepieciešama tikai viena koda rindiņa (kodu var atrast src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.9. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Ir vērts atzīmēt, ka Serde objekti nav jānorāda, jo iestatījumos tiek izmantota virkne Serdes. Tāpat, izmantojot ARĪGĀKO uzskaitījumu, tabula tiek aizpildīta ar ierakstiem jau pašā sākumā.

Tagad mēs varam pāriet uz pēdējo posmu - savienojumu.

Ziņu atjauninājumu savienošana ar darījumu skaita datiem

Izveidot savienojumu nav grūti. Mēs izmantosim kreiso savienojumu gadījumā, ja attiecīgajā nozarē nebūs akciju jaunumu (nepieciešamo kodu var atrast failā src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (5.10. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Šis leftJoin operators ir diezgan vienkāršs. Atšķirībā no savienojumiem 4. nodaļā, JoinWindow metode netiek izmantota, jo, veicot KStream-KTable savienošanu, katrai atslēgai KTtable ir tikai viens ieraksts. Šāds savienojums nav ierobežots laikā: ieraksts ir vai nu KT tabulā, vai arī nav. Galvenais secinājums: izmantojot KTable objektus, jūs varat bagātināt KStream ar retāk atjauninātiem atsauces datiem.

Tagad mēs apskatīsim efektīvāku veidu, kā bagātināt notikumus no KStream.

5.3.4. GlobalKTable objekti

Kā redzat, ir nepieciešams bagātināt notikumu straumes vai pievienot tām kontekstu. 4. nodaļā jūs redzējāt savienojumus starp diviem KStream objektiem, un iepriekšējā sadaļā jūs redzējāt savienojumu starp KStream un KTable. Visos šajos gadījumos ir nepieciešams atkārtoti sadalīt datu straumi, kartējot atslēgas ar jaunu veidu vai vērtību. Dažreiz atkārtota sadalīšana tiek veikta tieši, un dažreiz Kafka Streams to veic automātiski. Pārdalīšana ir nepieciešama, jo ir mainījušās atslēgas un ierakstiem jānonāk jaunās sadaļās, pretējā gadījumā savienojums nebūs iespējams (par to tika runāts 4.nodaļā, sadaļā “Datu pārdalījumi” 4.2.4.apakšnodaļā).

Atkārtotai sadalīšanai ir jāmaksā

Pārdalīšana prasa izmaksas - papildu resursu izmaksas starptēmu veidošanai, dublēto datu glabāšanai citā tēmā; tas nozīmē arī palielinātu latentumu sakarā ar rakstīšanu un lasīšanu no šīs tēmas. Turklāt, ja jums ir nepieciešams savienot vairāk nekā vienu aspektu vai dimensiju, jums ir jāsavieno savienojumi, jāsakartē ieraksti ar jaunām atslēgām un vēlreiz jāpalaiž atkārtotas sadalīšanas process.

Savienojuma izveide ar mazākām datu kopām

Dažos gadījumos savienojamo atsauces datu apjoms ir salīdzinoši neliels, tāpēc pilnīgas to kopijas var viegli ievietot lokāli katrā mezglā. Šādās situācijās Kafka Streams nodrošina GlobalKTable klasi.

GlobalKTable gadījumi ir unikāli, jo lietojumprogramma replicē visus datus katrā mezglā. Un tā kā visi dati atrodas katrā mezglā, notikumu straume nav jāsadala ar atsauces datu atslēgu, lai tā būtu pieejama visiem nodalījumiem. Varat arī izveidot bezatslēgas savienojumus, izmantojot GlobalKTable objektus. Atgriezīsimies pie viena no iepriekšējiem piemēriem, lai parādītu šo funkciju.

KStream objektu savienošana ar GlobalKTable objektiem

5.3.2.apakšsadaļā veicām pircēju maiņas darījumu logu agregāciju. Šīs apkopošanas rezultāti izskatījās apmēram šādi:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Lai gan šie rezultāti kalpoja mērķim, būtu bijis lietderīgāk, ja būtu parādīts arī klienta vārds un pilns uzņēmuma nosaukums. Lai pievienotu klienta vārdu un uzņēmuma nosaukumu, varat veikt parastos savienojumus, taču jums būs jāveic divas atslēgas kartēšanas un atkārtota sadalīšana. Izmantojot GlobalKTable, jūs varat izvairīties no šādu darbību izmaksām.

Lai to izdarītu, mēs izmantosim countStream objektu no saraksta 5.11 (attiecīgo kodu var atrast src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) un savienosim to ar diviem GlobalKTable objektiem.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Mēs to jau esam apsprieduši iepriekš, tāpēc es to neatkārtošu. Bet es atzīmēju, ka kods funkcijā toStream().map lasāmības labad tiek abstrahēts funkcijas objektā, nevis iekļautā lambda izteiksmē.

Nākamais solis ir deklarēt divus GlobalKTable gadījumus (parādītais kods ir atrodams failā src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (5.12. saraksts).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"

Lūdzu, ņemiet vērā, ka tēmu nosaukumi ir aprakstīti, izmantojot uzskaitītos veidus.

Tagad, kad visi komponenti ir gatavi, atliek tikai uzrakstīt savienojuma kodu (to var atrast failā src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Saraksts 5.13).

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Lai gan šajā kodā ir divi savienojumi, tie ir savienoti ķēdē, jo neviens no to rezultātiem netiek izmantots atsevišķi. Rezultāti tiek parādīti visas darbības beigās.

Veicot iepriekš minēto pievienošanas darbību, jūs iegūsit šādus rezultātus:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Būtība nav mainījusies, taču šie rezultāti izskatās skaidrāki.

Ja paskaitāt atpakaļ līdz 4. nodaļai, jūs jau esat redzējis vairāku veidu savienojumus darbībā. Tie ir norādīti tabulā. 5.2. Šī tabula atspoguļo savienojamības iespējas Kafka Streams 1.0.0 versijā; Nākamajos laidienos kaut kas var mainīties.

Grāmata “Kafkas straumes darbībā. Lietojumprogrammas un mikropakalpojumi reāllaika darbam"
Lai pabeigtu lietas, atkārtosim pamatus: jūs varat savienot notikumu straumes (KStream) un atjaunināt straumes (KTable), izmantojot vietējo stāvokli. Alternatīvi, ja atsauces datu izmērs nav pārāk liels, varat izmantot GlobalKTable objektu. GlobalKTables atkārto visus nodalījumus katrā Kafka Streams lietojumprogrammas mezglā, nodrošinot, ka visi dati ir pieejami neatkarīgi no tā, kuram nodalījumam atbilst atslēga.

Tālāk mēs redzēsim Kafka Streams funkciju, pateicoties kurai mēs varam novērot stāvokļa izmaiņas, neizmantojot datus no Kafka tēmas.

5.3.5. Pieprasāms stāvoklis

Mēs jau esam veikuši vairākas darbības, kas saistītas ar stāvokli, un vienmēr izvadām rezultātus uz konsoli (izstrādes nolūkos) vai ierakstām tos tēmā (ražošanas nolūkos). Rakstot rezultātus tēmai, to apskatīšanai ir jāizmanto Kafka patērētājs.

Datu lasīšanu par šīm tēmām var uzskatīt par materializētu uzskatu veidu. Mūsu vajadzībām mēs varam izmantot materializēta skata definīciju no Wikipedia: “...fizisks datu bāzes objekts, kas satur vaicājuma rezultātus. Piemēram, tā varētu būt attālo datu lokāla kopija vai tabulas rindu un/vai kolonnu apakškopa vai apvienot rezultātus, vai kopsavilkuma tabula, kas iegūta, izmantojot apkopošanu” (https://en.wikipedia.org/wiki /Materializēts_skats).

Kafka Streams arī ļauj palaist interaktīvus vaicājumus valsts veikalos, ļaujot tieši lasīt šos materializētos skatus. Ir svarīgi atzīmēt, ka vaicājums valsts veikalam ir tikai lasāma darbība. Tas nodrošina, ka jums nav jāuztraucas par nejaušu stāvokļa nekonsekvenci, kamēr jūsu lietojumprogramma apstrādā datus.

Svarīga ir iespēja tieši meklēt štata veikalus. Tas nozīmē, ka varat izveidot informācijas paneļa lietojumprogrammas, vispirms neiegūstot datus no Kafka patērētāja. Tas arī palielina lietojumprogrammas efektivitāti, jo nav nepieciešams atkārtoti rakstīt datus:

  • pateicoties datu atrašanās vietai, tiem var ātri piekļūt;
  • tiek novērsta datu dublēšanās, jo tie netiek ierakstīti ārējā atmiņā.

Galvenais, ko es vēlos, lai jūs atceraties, ir tas, ka varat tieši vaicāt stāvokli savā lietojumprogrammā. Iespējas, ko tas jums sniedz, nevar pārvērtēt. Tā vietā, lai patērētu datus no Kafka un saglabātu ierakstus lietojumprogrammas datu bāzē, varat veikt vaicājumus par statusa veikaliem ar tādu pašu rezultātu. Tiešie vaicājumi štata veikaliem nozīmē mazāk koda (nav patērētāja) un mazāk programmatūras (nav nepieciešama datu bāzes tabula, lai saglabātu rezultātus).

Šajā nodaļā mēs esam aplūkojuši diezgan daudz jautājumu, tāpēc pagaidām atstāsim diskusiju par interaktīvajiem vaicājumiem pret valsts veikaliem. Bet neuztraucieties: 9. nodaļā mēs izveidosim vienkāršu informācijas paneļa lietojumprogrammu ar interaktīviem vaicājumiem. Tajā tiks izmantoti daži piemēri no šīs un iepriekšējām nodaļām, lai parādītu interaktīvos vaicājumus un to, kā tos pievienot Kafka Streams lietojumprogrammām.

Kopsavilkums

  • KStream objekti attēlo notikumu straumes, kas ir salīdzināmas ar ievietojumiem datu bāzē. KTable objekti attēlo atjaunināšanas straumes, vairāk kā datu bāzes atjauninājumus. KTable objekta izmērs nepalielinās, vecie ieraksti tiek aizstāti ar jauniem.
  • KTable objekti ir nepieciešami apkopošanas darbībām.
  • Izmantojot logu operācijas, varat sadalīt apkopotos datus laika segmentos.
  • Pateicoties GlobalKTable objektiem, jūs varat piekļūt atsauces datiem jebkurā lietojumprogrammā neatkarīgi no sadalīšanas.
  • Ir iespējami savienojumi starp KStream, KTable un GlobalKTable objektiem.

Līdz šim mēs esam koncentrējušies uz Kafka Streams lietojumprogrammu izveidi, izmantojot augsta līmeņa KStream DSL. Lai gan augsta līmeņa pieeja ļauj izveidot glītas un kodolīgas programmas, tās izmantošana ir kompromiss. Darbs ar DSL KStream nozīmē palielināt koda kodolīgumu, samazinot kontroles pakāpi. Nākamajā nodaļā apskatīsim zema līmeņa apdarinātāja mezgla API un izmēģināsim citus kompromisus. Programmas būs garākas nekā iepriekš, taču mēs varēsim izveidot gandrīz jebkuru apstrādātāja mezglu, kas mums varētu būt nepieciešams.

→ Sīkāku informāciju par grāmatu var atrast vietnē izdevēja vietne

→ Habrozhiteli 25% atlaide, izmantojot kuponu - Kafkas straumes

→ Apmaksājot grāmatas papīra versiju, pa e-pastu tiks nosūtīta elektroniskā grāmata.

Avots: www.habr.com

Pievieno komentāru