Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho" Kumusta, mga residente ng Khabro! Ang aklat na ito ay angkop para sa sinumang developer na gustong maunawaan ang pagproseso ng thread. Ang pag-unawa sa distributed programming ay makakatulong sa iyong mas maunawaan ang Kafka at Kafka Stream. Masarap malaman ang mismong balangkas ng Kafka, ngunit hindi ito kinakailangan: Sasabihin ko sa iyo ang lahat ng kailangan mo. Matututo ang mga may karanasang developer at baguhan ng Kafka kung paano lumikha ng mga kawili-wiling application sa pagpoproseso ng stream gamit ang library ng Kafka Stream sa aklat na ito. Ang mga intermediate at advanced na developer ng Java na pamilyar na sa mga konsepto tulad ng serialization ay matututong ilapat ang kanilang mga kasanayan upang lumikha ng mga application ng Kafka Streams. Ang source code ng libro ay nakasulat sa Java 8 at gumagawa ng makabuluhang paggamit ng Java 8 lambda expression syntax, kaya ang pag-alam kung paano magtrabaho sa mga function ng lambda (kahit sa ibang programming language) ay magiging kapaki-pakinabang.

Sipi. 5.3. Pagsasama-sama at pagpapatakbo ng windowing

Sa seksyong ito, magpapatuloy tayo upang tuklasin ang mga pinaka-promising na bahagi ng Kafka Streams. Sa ngayon ay sakop na namin ang mga sumusunod na aspeto ng Kafka Streams:

  • paglikha ng isang processing topology;
  • paggamit ng estado sa streaming application;
  • pagsasagawa ng mga koneksyon sa stream ng data;
  • mga pagkakaiba sa pagitan ng mga stream ng kaganapan (KStream) at mga stream ng update (KTable).

Sa mga sumusunod na halimbawa, pagsasama-samahin natin ang lahat ng mga elementong ito. Matututuhan mo rin ang tungkol sa windowing, isa pang magandang feature ng streaming application. Ang aming unang halimbawa ay magiging isang simpleng pagsasama-sama.

5.3.1. Pagsasama-sama ng mga benta ng stock ayon sa sektor ng industriya

Ang pagsasama-sama at pagpapangkat ay mahahalagang tool kapag nagtatrabaho sa streaming data. Ang pagsusuri sa mga indibidwal na rekord habang tinatanggap ang mga ito ay kadalasang hindi sapat. Upang kunin ang karagdagang impormasyon mula sa data, kinakailangan na pangkatin at pagsamahin ang mga ito.

Sa halimbawang ito, isusuot mo ang costume ng isang day trader na kailangang subaybayan ang dami ng benta ng mga stock ng mga kumpanya sa ilang industriya. Sa partikular, interesado ka sa limang kumpanya na may pinakamalaking share sales sa bawat industriya.

Ang nasabing pagsasama-sama ay mangangailangan ng sumusunod na ilang hakbang upang isalin ang data sa nais na anyo (na nagsasalita sa mga pangkalahatang termino).

  1. Gumawa ng source na batay sa paksa na nag-publish ng hilaw na impormasyon ng stock trading. Kakailanganin nating imapa ang isang object ng uri ng StockTransaction sa isang object ng uri ng ShareVolume. Ang punto ay naglalaman ang object ng StockTransaction ng metadata ng mga benta, ngunit kailangan lang namin ng data tungkol sa bilang ng mga share na ibinebenta.
  2. Igrupo ang data ng ShareVolume ayon sa simbolo ng stock. Kapag nakapangkat na ayon sa simbolo, maaari mong i-collapse ang data na ito sa mga subtotal ng dami ng mga benta ng stock. Kapansin-pansin na ang paraan ng KStream.groupBy ay nagbabalik ng isang halimbawa ng uri na KGroupedStream. At maaari kang makakuha ng KTable na instance sa pamamagitan ng karagdagang pagtawag sa KGroupedStream.reduce na paraan.

Ano ang interface ng KGroupedStream

Ang mga pamamaraan ng KStream.groupBy at KStream.groupByKey ay nagbabalik ng isang instance ng KGroupedStream. Ang KGroupedStream ay isang intermediate na representasyon ng isang stream ng mga kaganapan pagkatapos ng pagpapangkat ayon sa mga key. Hindi ito inilaan para sa direktang trabaho dito. Sa halip, ginagamit ang KGroupedStream para sa mga pagpapatakbo ng pagsasama-sama, na palaging nagreresulta sa isang KTable. At dahil ang resulta ng mga pagpapatakbo ng pagsasama-sama ay isang KTable at gumagamit sila ng isang tindahan ng estado, posibleng hindi lahat ng mga pag-update bilang resulta ay ipinadala sa ibaba ng pipeline.

Ang paraan ng KTable.groupBy ay nagbabalik ng katulad na KGroupedTable - isang intermediate na representasyon ng stream ng mga update, na muling pinagsama-sama ng key.

Magpahinga muna tayo at tingnan ang Fig. 5.9, na nagpapakita kung ano ang aming nakamit. Ang topology na ito ay dapat na pamilyar sa iyo.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Tingnan natin ngayon ang code para sa topology na ito (matatagpuan ito sa file na src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ang ibinigay na code ay nakikilala sa pamamagitan ng kaiklian nito at ang malaking dami ng mga aksyon na isinagawa sa ilang mga linya. Maaaring may mapansin kang bago sa unang parameter ng paraan ng builder.stream: isang halaga ng uri ng enum na AutoOffsetReset.EARLIEST (mayroon ding LATEST), na itinakda gamit ang paraan ng Consumed.withOffsetResetPolicy. Maaaring gamitin ang uri ng enumeration na ito upang tumukoy ng diskarte sa pag-reset ng offset para sa bawat KStream o KTable at mauuna sa opsyon sa pag-reset ng offset mula sa configuration.

GroupByKey at GroupBy

Ang interface ng KStream ay may dalawang paraan para sa pagpapangkat ng mga tala: GroupByKey at GroupBy. Parehong nagbabalik ng KGroupedTable, kaya maaaring nagtataka ka kung ano ang pagkakaiba sa pagitan nila at kung kailan gagamitin ang alin?

Ginagamit ang paraan ng GroupByKey kapag wala nang laman ang mga susi sa KStream. At higit sa lahat, hindi kailanman naitakda ang flag na "nangangailangan ng muling paghahati".

Ipinapalagay ng paraan ng GroupBy na binago mo ang mga key ng pagpapangkat, kaya ang bandila ng repartition ay nakatakda sa true. Ang pagsasagawa ng mga pagsali, pagsasama-sama, atbp. pagkatapos ng pamamaraang GroupBy ay magreresulta sa awtomatikong muling paghahati.
Buod: Hangga't maaari, dapat mong gamitin ang GroupByKey kaysa sa GroupBy.

Malinaw kung ano ang ginagawa ng mga pamamaraan ng mapValues ​​at groupBy, kaya tingnan natin ang paraan ng sum() (matatagpuan sa src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ibinabalik ng ShareVolume.sum na paraan ang tumatakbong kabuuang dami ng benta ng stock, at ang resulta ng buong hanay ng mga kalkulasyon ay isang KTable na bagay . Ngayon naiintindihan mo na ang papel na ginagampanan ng KTable. Kapag dumating ang mga bagay na ShareVolume, iniimbak ng kaukulang bagay na KTable ang pinakabagong kasalukuyang update. Mahalagang tandaan na ang lahat ng mga update ay makikita sa nakaraang shareVolumeKTable, ngunit hindi lahat ay ipinadala pa.

Susunod, gamit ang KTable na ito, pinagsasama-sama namin (ayon sa bilang ng mga na-trade na pagbabahagi) upang makarating sa limang kumpanyang may pinakamataas na volume ng mga na-trade na pagbabahagi sa bawat industriya. Ang aming mga aksyon sa kasong ito ay magiging katulad ng para sa unang pagsasama-sama.

  1. Magsagawa ng isa pang pagpapatakbo ng groupBy upang pangkatin ang mga indibidwal na ShareVolume object ayon sa industriya.
  2. Simulan ang pagbubuod ng mga bagay ng ShareVolume. Sa pagkakataong ito ang aggregation object ay isang fixed-size na priority queue. Sa fixed-size queue na ito, tanging ang limang kumpanyang may pinakamalaking halaga ng shares na naibenta ang nananatili.
  3. I-map ang mga pila mula sa nakaraang talata sa isang string value at ibalik ang nangungunang limang pinakana-trade na stock ayon sa numero ayon sa industriya.
  4. Isulat ang mga resulta sa string form sa paksa.

Sa Fig. Ipinapakita ng Figure 5.10 ang data flow topology graph. Tulad ng nakikita mo, ang pangalawang pag-ikot ng pagproseso ay medyo simple.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ngayon na mayroon na tayong malinaw na pag-unawa sa istruktura ng ikalawang round ng pagproseso na ito, maaari tayong bumaling sa source code nito (makikita mo ito sa file na src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Naglalaman ang initializer na ito ng fixedQueue variable. Ito ay isang custom na bagay na isang adaptor para sa java.util.TreeSet na ginagamit upang subaybayan ang nangungunang N resulta sa pababang pagkakasunud-sunod ng mga share na nakalakal.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Nakita mo na ang groupBy at mapValues ​​​​mga tawag, kaya hindi na kami pupunta sa mga iyon (tinatawag namin ang KTable.toStream na paraan dahil hindi na ginagamit ang paraan ng KTable.print). Ngunit hindi mo pa nakikita ang KTable na bersyon ng aggregate() kaya maglalaan kami ng kaunting oras sa pagtalakay niyan.

Tulad ng iyong natatandaan, kung bakit naiiba ang KTable ay ang mga talaan na may parehong mga key ay itinuturing na mga update. Pinapalitan ng KTable ang lumang entry ng bago. Nagaganap ang pagsasama-sama sa katulad na paraan: ang mga pinakabagong tala na may parehong key ay pinagsama-sama. Kapag may dumating na record, idinaragdag ito sa FixedSizePriorityQueue class instance gamit ang isang adder (pangalawang parameter sa aggregate method call), ngunit kung mayroon nang ibang record na may parehong key, ang lumang record ay aalisin gamit ang subtractor (third parameter sa ang pinagsama-samang paraan ng tawag).

Nangangahulugan ito na ang aming aggregator, ang FixedSizePriorityQueue, ay hindi pinagsama-sama ang lahat ng mga halaga sa isang susi, ngunit nag-iimbak ng isang gumagalaw na kabuuan ng mga dami ng N pinakanakalakal na uri ng mga stock. Ang bawat papasok na entry ay naglalaman ng kabuuang bilang ng mga share na naibenta sa ngayon. Bibigyan ka ng KTable ng impormasyon tungkol sa kung aling mga kumpanya ang kasalukuyang pinakanakalakal, nang hindi nangangailangan ng rolling aggregation ng bawat update.

Natutunan nating gawin ang dalawang mahahalagang bagay:

  • mga halaga ng pangkat sa KTable ng isang karaniwang key;
  • magsagawa ng mga kapaki-pakinabang na operasyon gaya ng rollup at aggregation sa mga nakagrupong value na ito.

Ang pag-alam kung paano isasagawa ang mga operasyong ito ay mahalaga sa pag-unawa sa kahulugan ng data na lumilipat sa pamamagitan ng isang Kafka Streams application at pag-unawa kung anong impormasyon ang dala nito.

Pinagsama-sama rin namin ang ilan sa mga pangunahing konsepto na tinalakay kanina sa aklat na ito. Sa Kabanata 4, tinalakay namin kung paano mahalaga ang fault-tolerant, lokal na estado para sa isang streaming application. Ang unang halimbawa sa kabanatang ito ay nagpakita kung bakit napakahalaga ng lokal na estadoβ€”nagbibigay ito sa iyo ng kakayahang subaybayan kung anong impormasyon ang nakita mo na. Iniiwasan ng lokal na pag-access ang mga pagkaantala sa network, na ginagawang mas mahusay ang application at lumalaban sa error.

Kapag nagsasagawa ng anumang rollup o aggregation operation, dapat mong tukuyin ang pangalan ng state store. Ang rollup at aggregation operations ay nagbabalik ng isang KTable instance, at ang KTable ay gumagamit ng state storage para palitan ang mga lumang resulta ng mga bago. Tulad ng nakita mo, hindi lahat ng mga update ay ipinadala sa pipeline, at ito ay mahalaga dahil ang mga pagpapatakbo ng pagsasama-sama ay idinisenyo upang makagawa ng buod ng impormasyon. Kung hindi mo ilalapat ang lokal na estado, ipapasa ng KTable ang lahat ng resulta ng pagsasama-sama at rollup.

Susunod, titingnan natin ang pagsasagawa ng mga operasyon tulad ng pagsasama-sama sa loob ng isang partikular na yugto ng panahon - tinatawag na mga pagpapatakbo ng windowing.

5.3.2. Mga operasyon sa bintana

Sa nakaraang seksyon, ipinakilala namin ang sliding convolution at aggregation. Ang application ay nagsagawa ng tuluy-tuloy na roll-up ng dami ng benta ng stock, na sinusundan ng pagsasama-sama ng limang pinakanakalakal na stock sa exchange.

Minsan ang ganitong tuluy-tuloy na pagsasama-sama at roll-up ng mga resulta ay kinakailangan. At kung minsan kailangan mong magsagawa ng mga operasyon sa loob lamang ng isang takdang panahon. Halimbawa, kalkulahin kung gaano karaming mga transaksyon sa palitan ang ginawa sa mga bahagi ng isang partikular na kumpanya sa huling 10 minuto. O kung gaano karaming mga gumagamit ang nag-click sa isang bagong banner ng advertising sa huling 15 minuto. Ang isang application ay maaaring magsagawa ng mga naturang operasyon nang maraming beses, ngunit may mga resulta na nalalapat lamang sa mga tinukoy na yugto ng panahon (mga window ng oras).

Nagbibilang ng mga transaksyon sa palitan ng mamimili

Sa susunod na halimbawa, susubaybayan namin ang mga transaksyon ng stock sa maraming mangangalakalβ€”malalaking organisasyon o matalinong indibidwal na financier.

Mayroong dalawang posibleng dahilan para sa pagsubaybay na ito. Isa na rito ang pangangailangang malaman kung ano ang binibili/binebenta ng mga market leaders. Kung ang mga malalaking manlalaro at mga sopistikadong mamumuhunan ay nakakakita ng pagkakataon, makatuwirang sundin ang kanilang diskarte. Ang pangalawang dahilan ay ang pagnanais na makita ang anumang posibleng senyales ng ilegal na insider trading. Upang gawin ito, kakailanganin mong pag-aralan ang ugnayan ng malalaking pagtaas ng benta na may mahahalagang press release.

Ang nasabing pagsubaybay ay binubuo ng mga sumusunod na hakbang:

  • paggawa ng stream para sa pagbabasa mula sa paksa ng stock-transactions;
  • pagpapangkat ng mga papasok na tala sa pamamagitan ng buyer ID at stock symbol. Ang pagtawag sa groupBy method ay nagbabalik ng isang instance ng klase ng KGroupedStream;
  • Ang paraan ng KGroupedStream.windowedBy ay nagbabalik ng stream ng data na limitado sa isang window ng oras, na nagbibigay-daan sa windowed aggregation. Depende sa uri ng window, ibabalik ang TimeWindowedKStream o SessionWindowedKStream;
  • bilang ng transaksyon para sa pagpapatakbo ng pagsasama-sama. Tinutukoy ng windowed data flow kung ang isang partikular na tala ay isinasaalang-alang sa bilang na ito;
  • pagsulat ng mga resulta sa isang paksa o pag-output ng mga ito sa console sa panahon ng pagbuo.

Ang topology ng application na ito ay simple, ngunit ang isang malinaw na larawan nito ay makakatulong. Tingnan natin ang Fig. 5.11.

Susunod, titingnan natin ang functionality ng mga pagpapatakbo ng window at ang kaukulang code.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"

Mga uri ng bintana

May tatlong uri ng mga bintana sa Kafka Streams:

  • sessional;
  • "tumbling";
  • pag-slide/paglukso.

Alin ang pipiliin ay depende sa iyong mga kinakailangan sa negosyo. Ang mga tumbling at jumping window ay limitado sa oras, habang ang mga session window ay limitado sa pamamagitan ng aktibidad ng userβ€”ang tagal ng (mga) session ay tinutukoy lamang sa kung gaano kaaktibo ang user. Ang pangunahing bagay na dapat tandaan ay ang lahat ng mga uri ng window ay batay sa mga selyo ng petsa/oras ng mga entry, hindi ang oras ng system.

Susunod, ipinatupad namin ang aming topology sa bawat isa sa mga uri ng window. Ang kumpletong code ay ibibigay lamang sa unang halimbawa; para sa iba pang mga uri ng mga bintana walang magbabago maliban sa uri ng pagpapatakbo ng window.

Mga window ng session

Ang mga window ng session ay ibang-iba sa lahat ng iba pang uri ng mga bintana. Nalilimitahan ang mga ito hindi gaanong ayon sa oras kundi sa aktibidad ng user (o sa aktibidad ng entity na gusto mong subaybayan). Ang mga window ng session ay nililimitahan ng mga panahon ng kawalan ng aktibidad.

Ang Figure 5.12 ay naglalarawan ng konsepto ng session window. Ang mas maliit na session ay magsasama sa session sa kaliwa nito. At ang session sa kanan ay magiging hiwalay dahil kasunod ito ng mahabang panahon ng kawalan ng aktibidad. Nakabatay ang mga window ng session sa aktibidad ng user, ngunit gumamit ng mga stamp ng petsa/oras mula sa mga entry upang matukoy kung saang session kabilang ang entry.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"

Paggamit ng mga window ng session upang subaybayan ang mga transaksyon sa stock

Gamitin natin ang mga session window para kumuha ng impormasyon tungkol sa mga transaksyon sa palitan. Ang pagpapatupad ng mga window ng session ay ipinapakita sa Listing 5.5 (na makikita sa src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Nakita mo na ang karamihan sa mga operasyon sa topology na ito, kaya hindi na kailangang suriin muli ang mga ito dito. Ngunit mayroon ding ilang mga bagong elemento dito, na tatalakayin natin ngayon.

Anumang groupBy operation ay karaniwang nagsasagawa ng ilang uri ng aggregation operation (pagsasama-sama, rollup, o pagbibilang). Maaari kang magsagawa ng alinman sa pinagsama-samang pagsasama-sama na may kabuuang tumatakbo, o pagsasama-sama ng window, na isinasaalang-alang ang mga talaan sa loob ng isang tinukoy na palugit ng oras.

Binibilang ng code sa Listing 5.5 ang bilang ng mga transaksyon sa loob ng mga window ng session. Sa Fig. 5.13 ang mga pagkilos na ito ay sinusuri ng hakbang-hakbang.

Sa pamamagitan ng pagtawag sa windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) gumawa kami ng session window na may inactivity interval na 20 segundo at isang persistence interval na 15 minuto. Ang isang idle interval na 20 segundo ay nangangahulugan na ang application ay isasama ang anumang entry na darating sa loob ng 20 segundo ng pagtatapos o pagsisimula ng kasalukuyang session sa kasalukuyang (aktibo) na session.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Susunod, tinutukoy namin kung aling operasyon ng pagsasama-sama ang kailangang isagawa sa window ng session - sa kasong ito, bilangin. Kung ang isang papasok na entry ay nahuhulog sa labas ng inactivity window (alinman sa gilid ng date/time stamp), ang application ay gagawa ng bagong session. Ang agwat ng pagpapanatili ay nangangahulugan ng pagpapanatili ng isang session para sa isang tiyak na tagal ng oras at nagbibigay-daan para sa late data na lumalampas sa panahon ng kawalan ng aktibidad ng session ngunit maaari pa ring i-attach. Bilang karagdagan, ang pagsisimula at pagtatapos ng bagong session na nagreresulta mula sa pagsasanib ay tumutugma sa pinakamaaga at pinakahuling selyo ng petsa/oras.

Tingnan natin ang ilang mga entry mula sa paraan ng pagbilang upang makita kung paano gumagana ang mga session (Talahanayan 5.1).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Kapag dumating ang mga tala, naghahanap kami ng mga umiiral nang session na may parehong key, isang oras ng pagtatapos na mas mababa kaysa sa kasalukuyang stamp ng petsa/oras - pagitan ng kawalan ng aktibidad, at isang oras ng pagsisimula na mas malaki kaysa sa kasalukuyang selyo ng petsa/oras + agwat ng kawalan ng aktibidad. Isinasaalang-alang ito, apat na mga entry mula sa talahanayan. 5.1 ay pinagsama sa isang session gaya ng mga sumusunod.

1. Unang dumating ang record 1, kaya ang oras ng pagsisimula ay katumbas ng oras ng pagtatapos at 00:00:00.

2. Susunod, darating ang entry 2, at hinahanap namin ang mga session na magtatapos nang hindi mas maaga sa 23:59:55 at magsisimula nang hindi lalampas sa 00:00:35. Nahanap namin ang record 1 at pinagsama ang mga session 1 at 2. Kinukuha namin ang oras ng pagsisimula ng session 1 (mas maaga) at ang oras ng pagtatapos ng session 2 (mamaya), upang ang aming bagong session ay magsisimula sa 00:00:00 at magtatapos sa 00: 00:15.

3. Dumating ang record 3, naghahanap kami ng mga session sa pagitan ng 00:00:30 at 00:01:10 at wala kaming mahanap. Magdagdag ng pangalawang session para sa key 123-345-654,FFBE, simula at magtatapos sa 00:00:50.

4. Dumating ang record 4 at naghahanap kami ng mga session sa pagitan ng 23:59:45 at 00:00:25. Sa pagkakataong ito makikita ang parehong session 1 at 2. Ang lahat ng tatlong session ay pinagsama sa isa, na may oras ng pagsisimula na 00:00:00 at isang oras ng pagtatapos na 00:00:15.

Mula sa kung ano ang inilarawan sa seksyong ito, ito ay nagkakahalaga ng pag-alala sa mga sumusunod na mahahalagang nuances:

  • ang mga session ay hindi fixed-size na mga window. Ang tagal ng isang session ay tinutukoy ng aktibidad sa loob ng isang takdang panahon;
  • Tinutukoy ng mga stamp ng petsa/oras sa data kung ang kaganapan ay nasa loob ng isang kasalukuyang session o sa panahon ng idle na panahon.

Susunod na tatalakayin natin ang susunod na uri ng window - "tumbling" na mga bintana.

Mga "tumbling" na bintana

Kinukuha ng mga tumbling window ang mga event na nahuhulog sa loob ng isang partikular na yugto ng panahon. Isipin na kailangan mong makuha ang lahat ng mga transaksyon sa stock ng isang partikular na kumpanya bawat 20 segundo, kaya kinokolekta mo ang lahat ng mga kaganapan sa panahong iyon. Sa pagtatapos ng 20 segundong agwat, ang window ay gumulong at lilipat sa isang bagong 20 segundong agwat ng pagmamasid. Ang Figure 5.14 ay naglalarawan ng sitwasyong ito.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Tulad ng nakikita mo, ang lahat ng mga kaganapan na natanggap sa huling 20 segundo ay kasama sa window. Sa pagtatapos ng panahong ito, isang bagong window ang gagawa.

Ang listahan 5.6 ay nagpapakita ng code na nagpapakita ng paggamit ng mga tumbling window upang makuha ang mga transaksyon sa stock bawat 20 segundo (matatagpuan sa src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Sa maliit na pagbabagong ito sa TimeWindows.of method na tawag, maaari kang gumamit ng tumbling window. Ang halimbawang ito ay hindi tinatawag ang until() na paraan, kaya ang default na pagitan ng pagpapanatili ng 24 na oras ay gagamitin.

Sa wakas, oras na upang lumipat sa huling mga pagpipilian sa window - "hopping" na mga bintana.

Mga sliding ("paglukso") na mga bintana

Ang mga sliding/hopping window ay katulad ng mga tumbling window, ngunit may kaunting pagkakaiba. Ang mga sliding window ay hindi maghihintay hanggang sa katapusan ng agwat ng oras bago gumawa ng bagong window upang iproseso ang mga kamakailang kaganapan. Nagsisimula sila ng mga bagong kalkulasyon pagkatapos ng agwat ng paghihintay na mas mababa kaysa sa tagal ng window.

Upang ilarawan ang mga pagkakaiba sa pagitan ng tumbling at jumping windows, bumalik tayo sa halimbawa ng pagbibilang ng mga transaksyon sa stock exchange. Ang layunin namin ay bilangin pa rin ang bilang ng mga transaksyon, ngunit hindi namin gustong maghintay ng buong tagal bago i-update ang counter. Sa halip, ia-update namin ang counter sa mas maikling pagitan. Halimbawa, bibilangin pa rin namin ang bilang ng mga transaksyon tuwing 20 segundo, ngunit i-update ang counter tuwing 5 segundo, tulad ng ipinapakita sa Fig. 5.15. Sa kasong ito, napupunta kami sa tatlong window ng resulta na may magkakapatong na data.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ipinapakita ng listahan 5.7 ang code para sa pagtukoy ng mga sliding window (matatagpuan sa src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ang isang tumbling window ay maaaring ma-convert sa isang hopping window sa pamamagitan ng pagdaragdag ng isang tawag sa advanceBy() na paraan. Sa halimbawang ipinakita, ang pagitan ng pag-save ay 15 minuto.

Nakita mo sa seksyong ito kung paano limitahan ang mga resulta ng pagsasama-sama sa mga palugit ng oras. Sa partikular, gusto kong tandaan mo ang sumusunod na tatlong bagay mula sa seksyong ito:

  • ang laki ng mga window ng session ay limitado hindi sa tagal ng panahon, ngunit sa pamamagitan ng aktibidad ng user;
  • Ang mga "tumbling" na bintana ay nagbibigay ng pangkalahatang-ideya ng mga kaganapan sa loob ng isang partikular na yugto ng panahon;
  • Ang tagal ng paglukso ng mga bintana ay naayos, ngunit ang mga ito ay madalas na ina-update at maaaring maglaman ng magkakapatong na mga entry sa lahat ng mga bintana.

Susunod, matututunan natin kung paano i-convert ang isang KTable pabalik sa isang KStream para sa isang koneksyon.

5.3.3. Pagkonekta ng KStream at KTable na mga bagay

Sa Kabanata 4, tinalakay namin ang pagkonekta ng dalawang bagay sa KStream. Ngayon kailangan nating matutunan kung paano ikonekta ang KTable at KStream. Maaaring kailanganin ito para sa sumusunod na simpleng dahilan. Ang KStream ay isang stream ng mga record, at ang KTable ay isang stream ng mga record update, ngunit minsan ay maaaring gusto mong magdagdag ng karagdagang konteksto sa record stream gamit ang mga update mula sa KTable.

Kumuha tayo ng data sa bilang ng mga transaksyon sa stock exchange at pagsamahin ang mga ito sa mga balita sa stock exchange para sa mga nauugnay na industriya. Narito ang kailangan mong gawin upang makamit ito dahil sa code na mayroon ka na.

  1. I-convert ang isang KTable object na may data sa bilang ng mga transaksyon sa stock sa isang KStream, na sinusundan ng pagpapalit sa key ng key na nagsasaad ng sektor ng industriya na naaayon sa simbolo ng stock na ito.
  2. Gumawa ng KTable object na nagbabasa ng data mula sa isang paksa na may balita sa stock exchange. Ang bagong KTable na ito ay ikategorya ayon sa sektor ng industriya.
  3. Ikonekta ang mga update ng balita sa impormasyon sa bilang ng mga transaksyon sa stock exchange ayon sa sektor ng industriya.

Ngayon tingnan natin kung paano ipatupad ang action plan na ito.

I-convert ang KTable sa KStream

Upang i-convert ang KTable sa KStream kailangan mong gawin ang sumusunod.

  1. Tawagan ang KTable.toStream() na paraan.
  2. Sa pamamagitan ng pagtawag sa paraan ng KStream.map, palitan ang key ng pangalan ng industriya, at pagkatapos ay kunin ang TransactionSummary object mula sa Windowed instance.

Pagsasama-samahin namin ang mga operasyong ito tulad ng sumusunod (matatagpuan ang code sa file na src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Dahil nagsasagawa kami ng operasyon ng KStream.map, ang ibinalik na instance ng KStream ay awtomatikong nahahati muli kapag ginamit ito sa isang koneksyon.

Nakumpleto na namin ang proseso ng conversion, susunod na kailangan naming lumikha ng KTable object para sa pagbabasa ng stock news.

Paglikha ng KTable para sa stock na balita

Sa kabutihang palad, ang paglikha ng isang KTable object ay tumatagal lamang ng isang linya ng code (ang code ay matatagpuan sa src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Kapansin-pansin na walang mga bagay na Serde ang kinakailangang tukuyin, dahil ginagamit ang string Serdes sa mga setting. Gayundin, sa pamamagitan ng paggamit ng EARLIEST enumeration, ang talahanayan ay puno ng mga tala sa pinakadulo simula.

Ngayon ay maaari na tayong magpatuloy sa huling hakbang - koneksyon.

Pagkonekta ng mga update sa balita gamit ang data ng bilang ng transaksyon

Ang paglikha ng isang koneksyon ay hindi mahirap. Gagamit kami ng kaliwang pagsali kung sakaling walang stock na balita para sa nauugnay na industriya (ang kinakailangang code ay makikita sa file na src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Ang leftJoin operator na ito ay medyo simple. Hindi tulad ng mga pagsali sa Kabanata 4, ang paraan ng JoinWindow ay hindi ginagamit dahil kapag nagsasagawa ng KStream-KTable na pagsali, mayroon lamang isang entry para sa bawat key sa KTable. Ang ganitong koneksyon ay hindi limitado sa oras: ang rekord ay nasa KTable o wala. Ang pangunahing konklusyon: gamit ang mga bagay na KTable maaari mong pagyamanin ang KStream na may hindi gaanong madalas na na-update na data ng sanggunian.

Ngayon ay titingnan natin ang isang mas mahusay na paraan upang pagyamanin ang mga kaganapan mula sa KStream.

5.3.4. GlobalKTable na mga bagay

Gaya ng nakikita mo, kailangang pagyamanin ang mga stream ng kaganapan o magdagdag ng konteksto sa mga ito. Sa Kabanata 4 nakita mo ang mga koneksyon sa pagitan ng dalawang KStream object, at sa nakaraang seksyon nakita mo ang koneksyon sa pagitan ng isang KStream at isang KTable. Sa lahat ng mga kasong ito, kinakailangan na muling hatiin ang stream ng data kapag nagmamapa ng mga susi sa isang bagong uri o halaga. Minsan tahasang ginagawa ang repartitioning, at kung minsan ay awtomatiko itong ginagawa ng Kafka Streams. Ang muling paghahati ay kinakailangan dahil ang mga susi ay nagbago at ang mga talaan ay dapat mapunta sa mga bagong seksyon, kung hindi ang koneksyon ay magiging imposible (ito ay tinalakay sa Kabanata 4, sa seksyong "Muling paghahati ng data" sa subsection 4.2.4).

Ang muling paghahati ay may gastos

Ang muling paghahati ay nangangailangan ng mga gastos - mga karagdagang gastos sa mapagkukunan para sa paglikha ng mga intermediate na paksa, pag-iimbak ng mga duplicate na data sa ibang paksa; nangangahulugan din ito ng tumaas na latency dahil sa pagsusulat at pagbabasa mula sa paksang ito. Bukod pa rito, kung kailangan mong sumali sa higit sa isang aspeto o dimensyon, dapat mong i-chain ang mga pagsali, imapa ang mga talaan gamit ang mga bagong key, at patakbuhin muli ang proseso ng muling paghahati.

Kumokonekta sa mas maliliit na dataset

Sa ilang mga kaso, ang dami ng reference na data na ikokonekta ay medyo maliit, kaya ang kumpletong mga kopya nito ay madaling magkasya nang lokal sa bawat node. Para sa mga sitwasyong tulad nito, ang Kafka Streams ay nagbibigay ng GlobalKTable na klase.

Ang mga globalKTable na pagkakataon ay natatangi dahil ang application ay kinokopya ang lahat ng data sa bawat isa sa mga node. At dahil ang lahat ng data ay naroroon sa bawat node, hindi na kailangang hatiin ang stream ng kaganapan sa pamamagitan ng reference data key upang ito ay magagamit sa lahat ng mga partisyon. Maaari ka ring gumawa ng mga keyless na pagsali gamit ang mga bagay na GlobalKTable. Bumalik tayo sa isa sa mga nakaraang halimbawa para ipakita ang feature na ito.

Pagkonekta ng mga KStream object sa GlobalKTable objects

Sa subsection 5.3.2, nagsagawa kami ng window aggregation ng mga exchange transaction ng mga mamimili. Ang mga resulta ng pagsasama-samang ito ay mukhang ganito:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Bagama't ang mga resultang ito ay nagsilbi sa layunin, ito ay magiging mas kapaki-pakinabang kung ang pangalan ng customer at buong pangalan ng kumpanya ay ipinakita din. Upang idagdag ang pangalan ng customer at pangalan ng kumpanya, maaari kang gumawa ng mga normal na pagsali, ngunit kakailanganin mong gumawa ng dalawang pangunahing pagmamapa at muling paghahati. Sa GlobalKTable maiiwasan mo ang gastos ng mga naturang operasyon.

Upang gawin ito, gagamitin namin ang countStream object mula sa Listing 5.11 (ang kaukulang code ay matatagpuan sa src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) at ikonekta ito sa dalawang GlobalKTable object.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Napag-usapan na natin ito dati, kaya hindi ko na uulitin. Ngunit tandaan ko na ang code sa toStream().map function ay na-abstract sa isang function object sa halip na isang inline na lambda expression para sa kapakanan ng pagiging madaling mabasa.

Ang susunod na hakbang ay ang magdeklara ng dalawang instance ng GlobalKTable (ang ipinapakitang code ay makikita sa file na src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"

Pakitandaan na ang mga pangalan ng paksa ay inilalarawan gamit ang mga enumerated na uri.

Ngayong handa na natin ang lahat ng sangkap, ang natitira na lang ay isulat ang code para sa koneksyon (na makikita sa file na src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Bagama't mayroong dalawang pagsasama sa code na ito, nakakadena ang mga ito dahil wala sa kanilang mga resulta ang ginagamit nang hiwalay. Ang mga resulta ay ipinapakita sa dulo ng buong operasyon.

Kapag pinatakbo mo ang operasyon ng pagsali sa itaas, makakakuha ka ng mga resulta tulad nito:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Ang kakanyahan ay hindi nagbago, ngunit ang mga resultang ito ay mukhang mas malinaw.

Kung magbibilang ka pababa sa Kabanata 4, nakakita ka na ng ilang uri ng mga koneksyon sa pagkilos. Nakalista ang mga ito sa talahanayan. 5.2. Sinasalamin ng talahanayang ito ang mga kakayahan sa pagkakakonekta mula sa bersyon 1.0.0 ng Kafka Streams; Maaaring may magbago sa mga susunod na release.

Ang aklat na "Kafka Streams in Action. Mga application at microservice para sa real-time na trabaho"
Upang tapusin ang lahat, balikan natin ang mga pangunahing kaalaman: maaari mong ikonekta ang mga stream ng kaganapan (KStream) at i-update ang mga stream (KTable) gamit ang lokal na estado. Bilang kahalili, kung ang laki ng reference na data ay hindi masyadong malaki, maaari mong gamitin ang GlobalKTable object. Ginagaya ng GlobalKTables ang lahat ng partition sa bawat node ng application ng Kafka Streams, tinitiyak na available ang lahat ng data anuman ang partition na tumutugma sa key.

Susunod na makikita natin ang tampok na Kafka Streams, salamat sa kung saan maaari nating obserbahan ang mga pagbabago sa estado nang hindi kumukonsumo ng data mula sa isang paksa ng Kafka.

5.3.5. Mapagtatanong na estado

Nagawa na namin ang ilang mga operasyon na kinasasangkutan ng estado at palaging naglalabas ng mga resulta sa console (para sa mga layunin ng pag-unlad) o isulat ang mga ito sa isang paksa (para sa mga layunin ng produksyon). Kapag nagsusulat ng mga resulta sa isang paksa, kailangan mong gumamit ng Kafka consumer upang tingnan ang mga ito.

Ang pagbabasa ng data mula sa mga paksang ito ay maaaring ituring na isang uri ng mga materyal na pananaw. Para sa aming mga layunin, maaari naming gamitin ang kahulugan ng isang materialized view mula sa Wikipedia: "...isang pisikal na database object na naglalaman ng mga resulta ng isang query. Halimbawa, maaaring ito ay isang lokal na kopya ng malayuang data, o isang subset ng mga row at/o column ng isang talahanayan o mga resulta ng pagsasama, o isang talahanayan ng buod na nakuha sa pamamagitan ng pagsasama-sama" (https://en.wikipedia.org/wiki /Materialized_view).

Binibigyang-daan ka rin ng Kafka Streams na magpatakbo ng mga interactive na query sa mga tindahan ng estado, na nagbibigay-daan sa iyong direktang basahin ang mga materyal na view na ito. Mahalagang tandaan na ang query sa state store ay isang read-only na operasyon. Tinitiyak nito na hindi mo kailangang mag-alala tungkol sa aksidenteng paggawa ng state inconsistent habang pinoproseso ng iyong application ang data.

Ang kakayahang direktang mag-query sa mga tindahan ng estado ay mahalaga. Nangangahulugan ito na maaari kang lumikha ng mga application ng dashboard nang hindi kinakailangang kumuha muna ng data mula sa consumer ng Kafka. Pinatataas din nito ang kahusayan ng application, dahil sa katotohanan na hindi na kailangang magsulat muli ng data:

  • salamat sa lokalidad ng data, mabilis silang ma-access;
  • Ang pagdoble ng data ay tinanggal, dahil hindi ito nakasulat sa panlabas na imbakan.

Ang pangunahing bagay na gusto kong tandaan mo ay maaari kang mag-query ng estado nang direkta mula sa loob ng iyong aplikasyon. Ang mga pagkakataong ibinibigay nito sa iyo ay hindi maaaring sobra-sobra. Sa halip na kumonsumo ng data mula sa Kafka at mag-imbak ng mga tala sa isang database para sa application, maaari kang magtanong sa mga tindahan ng estado na may parehong resulta. Ang mga direktang query sa mga tindahan ng estado ay nangangahulugan ng mas kaunting code (walang mamimili) at mas kaunting software (hindi na kailangan ng isang talahanayan ng database upang iimbak ang mga resulta).

Medyo marami na kaming natalakay sa kabanatang ito, kaya iiwan namin ang aming talakayan ng mga interactive na query laban sa mga tindahan ng estado sa ngayon. Ngunit huwag mag-alala: sa Kabanata 9, gagawa kami ng simpleng dashboard application na may mga interactive na query. Gagamitin nito ang ilan sa mga halimbawa mula dito at mga nakaraang kabanata upang ipakita ang mga interactive na query at kung paano mo maidaragdag ang mga ito sa mga application ng Kafka Streams.

Buod

  • Ang mga bagay ng KStream ay kumakatawan sa mga stream ng mga kaganapan, na maihahambing sa mga pagsingit sa isang database. Ang mga KTable na bagay ay kumakatawan sa mga stream ng update, na mas katulad ng mga update sa isang database. Ang laki ng bagay na KTable ay hindi lumalaki, ang mga lumang talaan ay pinalitan ng mga bago.
  • Ang mga KTable na bagay ay kinakailangan para sa mga pagpapatakbo ng pagsasama-sama.
  • Gamit ang mga pagpapatakbo ng windowing, maaari mong hatiin ang pinagsama-samang data sa mga time bucket.
  • Salamat sa mga bagay na GlobalKTable, maa-access mo ang reference na data saanman sa application, anuman ang pagkahati.
  • Ang mga koneksyon sa pagitan ng KStream, KTable at GlobalKTable na mga bagay ay posible.

Sa ngayon, nakatuon kami sa pagbuo ng mga application ng Kafka Stream gamit ang mataas na antas na KStream DSL. Bagama't ang mataas na antas na diskarte ay nagpapahintulot sa iyo na lumikha ng maayos at maigsi na mga programa, ang paggamit nito ay kumakatawan sa isang trade-off. Ang pagtatrabaho sa DSL KStream ay nangangahulugan ng pagtaas ng pagiging maikli ng iyong code sa pamamagitan ng pagbabawas ng antas ng kontrol. Sa susunod na kabanata, titingnan natin ang low-level na handler node API at susubukan ang iba pang mga trade-off. Ang mga programa ay magiging mas mahaba kaysa sa dati, ngunit magagawa naming lumikha ng halos anumang handler node na maaaring kailanganin namin.

β†’ Higit pang mga detalye tungkol sa aklat ay matatagpuan sa website ng publisher

β†’ Para sa Habrozhiteli 25% na diskwento gamit ang kupon - Mga Agos ng Kafka

β†’ Sa pagbabayad para sa papel na bersyon ng libro, isang elektronikong libro ang ipapadala sa pamamagitan ng e-mail.

Pinagmulan: www.habr.com

Magdagdag ng komento