It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime" Hallo, ynwenners fan Khabro! Dit boek is geskikt foar elke ûntwikkelder dy't threadferwurking wol begripe. Begryp fan ferdielde programmearring sil jo helpe om Kafka en Kafka Streams better te begripen. It soe moai wêze om it Kafka-kader sels te kennen, mar dit is net nedich: ik sil jo alles fertelle wat jo nedich binne. Erfarne Kafka-ûntwikkelders en novices sille leare hoe't jo ynteressante streamferwurkingsapplikaasjes kinne meitsje mei de Kafka Streams-bibleteek yn dit boek. Intermediate en avansearre Java-ûntwikkelders dy't al bekend binne mei begripen lykas serialisaasje sille leare har feardigens oan te passen om Kafka Streams-applikaasjes te meitsjen. De boarnekoade fan it boek is skreaun yn Java 8 en makket signifikant gebrûk fan Java 8 lambda-ekspresjesyntaksis, dus witten hoe't jo wurkje mei lambda-funksjes (sels yn in oare programmeartaal) sil handich wêze.

Úttreksel. 5.3. Aggregaasje- en finsteroperaasjes

Yn dizze seksje sille wy trochgean om de meast tasizzende dielen fan Kafka Streams te ferkennen. Oant no hawwe wy de folgjende aspekten fan Kafka Streams behannele:

  • it meitsjen fan in ferwurkingstopology;
  • gebrûk fan steat yn streamingapplikaasjes;
  • it útfieren fan gegevensstreamferbiningen;
  • ferskillen tusken evenemintstreamen (KStream) en updatestreamen (KTable).

Yn 'e folgjende foarbylden sille wy al dizze eleminten byinoar bringe. Jo sille ek leare oer finstering, in oare geweldige funksje fan streamingapplikaasjes. Us earste foarbyld sil in ienfâldige aggregaasje wêze.

5.3.1. Aggregaasje fan stock ferkeap troch yndustry sektor

Aggregaasje en groepearring binne fitale ark by it wurkjen mei streaminggegevens. Undersyk fan yndividuele records sa't se wurde ûntfongen is faak net genôch. Om ekstra ynformaasje út gegevens te heljen, is it nedich om se te groepearjen en te kombinearjen.

Yn dit foarbyld sille jo it kostúm oanmeitsje fan in deihanneler dy't it ferkeapvolumint fan oandielen fan bedriuwen yn ferskate yndustry moat folgje. Spesifyk binne jo ynteressearre yn 'e fiif bedriuwen mei de grutste oandielferkeap yn elke sektor.

Sa'n aggregaasje sil de folgjende ferskate stappen fereaskje om de gegevens oer te setten yn 'e winske foarm (yn algemiene termen).

  1. Meitsje in ûnderwerp-basearre boarne dy't rauwe stock trading ynformaasje publisearret. Wy sille in objekt fan it type StockTransaction moatte mapje oan in objekt fan it type ShareVolume. It punt is dat it StockTransaction-objekt ferkeapmetadata befettet, mar wy hawwe allinich gegevens nedich oer it oantal oandielen dat ferkocht wurdt.
  2. Groep ShareVolume gegevens troch stock symboal. Sadree't groepearre troch symboal, kinne jo ynstoart dizze gegevens yn subtotalen fan stock ferkeap folume. It is de muoite wurdich op te merken dat de metoade KStream.groupBy in eksimplaar fan it type KGroupedStream weromjout. En jo kinne in KTable-eksimplaar krije troch fierder de metoade KGroupedStream.reduce te neamen.

Wat is de KGroupedStream-ynterface

De metoaden KStream.groupBy en KStream.groupByKey jouwe in eksimplaar fan KGroupedStream werom. KGroupedStream is in tuskenlizzende fertsjintwurdiging fan in stream fan eveneminten nei groepearring troch kaaien. It is hielendal net bedoeld foar direkte wurk mei. Ynstee dêrfan wurdt KGroupedStream brûkt foar aggregaasje operaasjes, dy't altyd resultearje yn in KTable. En sûnt it resultaat fan aggregaasje operaasjes is in KTable en se brûke in steat winkel, is it mooglik dat net alle updates as gefolch wurde ferstjoerd fierder de pipeline.

De metoade KTable.groupBy jout in soartgelikense KGroupedTable werom - in tuskenlizzende fertsjintwurdiging fan de stream fan updates, hergroepearre troch kaai.

Litte wy in koarte pauze nimme en nei Fig. 5.9, dat lit sjen wat wy hawwe berikt. Dizze topology soe jo al tige bekend wêze moatte.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Litte wy no nei de koade foar dizze topology sjen (dy kin fûn wurde yn it bestân src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
De opjûne koade wurdt ûnderskieden troch syn koarteheid en it grutte folume fan aksjes útfierd yn ferskate rigels. Jo kinne wat nijs fernimme yn 'e earste parameter fan' e builder.stream-metoade: in wearde fan 'e enum-type AutoOffsetReset.EARLIEST (der is ek in LATEST), ynsteld mei de metoade Consumed.withOffsetResetPolicy. Dit opsommingstype kin brûkt wurde om in offset-resetstrategy oan te jaan foar elke KStream of KTable en hat foarrang boppe de offset-reset-opsje út de konfiguraasje.

GroupByKey en GroupBy

De KStream-ynterface hat twa metoaden foar groepearjen fan records: GroupByKey en GroupBy. Beide jouwe in KGroupedTable werom, dus jo freegje jo miskien ôf wat it ferskil is tusken har en wannear't jo hokker brûke moatte?

De metoade GroupByKey wurdt brûkt as de kaaien yn 'e KStream al net leech binne. En it wichtichste, de flagge "fereasket opnij partitionearjen" waard nea ynsteld.

De GroupBy metoade giet derfan út dat jo hawwe feroare de groep kaaien, sadat de repartition flagge is ynsteld op wier. It útfieren fan joins, aggregaasjes, ensfh. nei de GroupBy-metoade sil resultearje yn automatyske opnij partitionearring.
Gearfetting: As it mooglik is, moatte jo GroupByKey brûke ynstee fan GroupBy.

It is dúdlik wat de mapValues ​​en groupBy-metoaden dogge, dus litte wy ris nei de som () metoade sjen (fûn yn src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
De metoade ShareVolume.sum jout it rinnende totaal werom fan it folume fan stockferkeap, en it resultaat fan 'e hiele keatling fan berekkeningen is in KTable-objekt . No begripe jo de rol KTable spilet. As ShareVolume-objekten oankomme, bewarret it oerienkommende KTable-objekt de lêste aktuele fernijing. It is wichtich om te ûnthâlden dat alle updates wurde wjerspegele yn de foarige shareVolumeKTable, mar net allegearre wurde ferstjoerd fierder.

Wy brûke dan dizze KTable om te aggregearjen (troch oantal ferhannele oandielen) om te kommen ta de fiif bedriuwen mei de heechste folumes fan oandielen dy't yn elke yndustry wurde ferhannele. Us aksjes yn dit gefal sille fergelykber wêze mei dy foar de earste aggregaasje.

  1. Utfiere in oare groepBy operaasje om yndividuele ShareVolume-objekten te groepearjen troch yndustry.
  2. Begjin gearfetsje ShareVolume-objekten. Dizze kear is it aggregaasjeobjekt in prioriteitswachtrige fan fêste grutte. Yn dizze wachtrige fan fêste grutte wurde allinich de fiif bedriuwen mei de grutste hoemannichten ferkochte oandielen behâlden.
  3. Map de wachtrijen út 'e foarige paragraaf oan in tekenrige wearde en werom de top fiif meast ferhannele oandielen troch nûmer troch yndustry.
  4. Skriuw de resultaten yn tekenrige foarm nei it ûnderwerp.

Yn Fig. Figure 5.10 toant de gegevensstreamtopologygrafyk. Sa't jo sjen kinne, de twadde ronde fan ferwurkjen is frij simpel.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
No't wy in dúdlik begryp hawwe fan 'e struktuer fan dizze twadde ferwurkingsronde, kinne wy ​​​​omgean nei de boarnekoade (jo sille it fine yn it bestân src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Dizze initializer befettet in fêste Queue-fariabele. Dit is in oanpaste foarwerp dat is in adapter foar java.util.TreeSet dat wurdt brûkt om te folgjen de top N resultaten yn ôfnimmende folchoarder fan oandielen ferhannele.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Jo hawwe de groupBy- en mapValues-oanroppen al sjoen, dus wy sille net yngean op dy (wy neame de KTable.toStream-metoade om't de KTable.print-metoade is ôfkard). Mar jo hawwe de KTable-ferzje fan aggregate() noch net sjoen, dus wy sille in bytsje tiid besteegje oan it besprekken dat.

Sa't jo ûnthâlde, wat makket KTable oars is dat records mei deselde kaaien wurde beskôge updates. KTable ferfangt de âlde yngong mei in nije. Aggregaasje komt op in fergelykbere manier foar: de lêste records mei deselde kaai wurde aggregearre. As in record oankomt, wurdt it tafoege oan it FixedSizePriorityQueue-klasseeksimplaar mei in adder (twadde parameter yn 'e aggregearre metoadeoprop), mar as in oar record al bestiet mei deselde kaai, dan wurdt it âlde record fuortsmiten mei in subtractor (tredde parameter yn de aggregearre metoade oprop).

Dit alles betsjut dat ús aggregator, FixedSizePriorityQueue, net alle wearden mei ien kaai aggregearret, mar in bewegende som opslacht fan 'e hoemannichten fan 'e N meast ferhannele soarten oandielen. Elke ynkommende yngong befettet it totale oantal oandielen oant no ta ferkocht. KTable sil jo ynformaasje jaan oer hokker oandielen fan bedriuwen op it stuit it meast ferhannele binne, sûnder rôljende aggregaasje fan elke fernijing te fereaskje.

Wy learden twa wichtige dingen te dwaan:

  • groep wearden yn KTable troch in mienskiplike kaai;
  • útfiere nuttige operaasjes lykas rollup en aggregaasje op dizze groepearre wearden.

Wisten hoe't jo dizze operaasjes útfiere, is wichtich om de betsjutting te begripen fan 'e gegevens dy't troch in Kafka Streams-applikaasje ferpleatse en te begripen hokker ynformaasje it draacht.

Wy hawwe ek guon fan 'e kaaibegripen byinoar brocht dy't earder yn dit boek besprutsen binne. Yn haadstik 4 hawwe wy besprutsen hoe fouttolerant, lokale steat wichtich is foar in streamingapplikaasje. It earste foarbyld yn dit haadstik liet sjen wêrom't lokale steat sa wichtich is - it jout jo de mooglikheid om by te hâlden hokker ynformaasje jo al sjoen hawwe. Lokale tagong foarkomt netwurkfertragingen, wêrtroch de applikaasje performanter en flaterbestindich is.

By it útfieren fan in oprol- of aggregaasjeoperaasje moatte jo de namme fan 'e steatwinkel opjaan. De rollup- en aggregaasje-operaasjes jouwe in KTable-eksimplaar werom, en de KTable brûkt steatopslach om âlde resultaten te ferfangen troch nije. Lykas jo hawwe sjoen, wurde net alle updates troch de pipeline stjoerd, en dit is wichtich om't aggregaasjeoperaasjes binne ûntworpen om gearfettingynformaasje te produsearjen. As jo ​​de lokale steat net tapasse, sil KTable alle aggregaasje- en oprolresultaten trochstjoere.

Folgjende sille wy sjen nei it útfieren fan operaasjes lykas aggregaasje binnen in spesifike tiidperioade - saneamde finsteroperaasjes.

5.3.2. Finster operaasjes

Yn 'e foarige paragraaf yntrodusearre wy sliding convolution en aggregaasje. De applikaasje útfierd in trochgeande roll-up fan stock ferkeap folume, folge troch aggregaasje fan de fiif meast ferhannele oandielen op 'e útwikseling.

Soms is sa'n trochgeande aggregaasje en oprol fan resultaten nedich. En soms moatte jo operaasjes allinich oer in bepaalde perioade útfiere. Berekkenje bygelyks hoefolle wikseltransaksjes binne makke mei oandielen fan in bepaald bedriuw yn 'e lêste 10 minuten. Of hoefolle brûkers yn 'e lêste 15 minuten op in nije advertinsjebanner klikke. In applikaasje kin sokke operaasjes meardere kearen útfiere, mar mei resultaten dy't allinich jilde foar bepaalde perioaden (tiidfinsters).

Counting útwikseling transaksjes troch keaper

Yn it folgjende foarbyld sille wy oandieltransaksjes folgje oer meardere hannelers - itsij grutte organisaasjes as tûke yndividuele finansiers.

D'r binne twa mooglike redenen foar dizze tracking. Ien fan har is de needsaak om te witten wat merklieders keapje/ferkeapje. As dizze grutte spilers en ferfine ynvestearders kâns sjogge, makket it sin om har strategy te folgjen. De twadde reden is de winsk om alle mooglike tekens fan yllegale ynsiderhannel te spotten. Om dit te dwaan moatte jo de korrelaasje analysearje fan grutte ferkeappunten mei wichtige parseberjochten.

Sokke tracking bestiet út de folgjende stappen:

  • it meitsjen fan in stream foar it lêzen fan it ûnderwerp fan aksjetransaksjes;
  • groepearjen ynkommende records troch keaper ID en stock symboal. It oproppen fan de metoade groupBy jout in eksimplaar fan de klasse KGroupedStream werom;
  • De metoade KGroupedStream.windowedBy jout in gegevensstream werom dy't beheind is ta in tiidfinster, wêrtroch finsteraggregaasje mooglik is. Ofhinklik fan it finstertype wurdt of in TimeWindowedKStream of in SessionWindowedKStream weromjûn;
  • transaksje telle foar de aggregaasje operaasje. De windowed gegevensstream bepaalt oft in bepaald rekord wurdt rekken holden yn dizze telling;
  • skriuw resultaten nei in ûnderwerp of útfiere se nei de konsole tidens ûntwikkeling.

De topology fan dizze applikaasje is ienfâldich, mar in dúdlik byld derfan soe nuttich wêze. Litte wy ris nei Fig. 5.11.

Folgjende sille wy sjen nei de funksjonaliteit fan finster operaasjes en de byhearrende koade.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"

Finstertypen

D'r binne trije soarten finsters yn Kafka Streams:

  • sessional;
  • "tumbling";
  • sliding / hopping.

Hokker te kiezen hinget ôf fan jo bedriuweasken. Tumbling- en springfinsters binne tiidbeheind, wylst sesjefinsters wurde beheind troch brûkersaktiviteit - de doer fan 'e sesje(s) wurdt allinich bepaald troch hoe aktyf de brûker is. It wichtichste ding om te ûnthâlden is dat alle finstertypen binne basearre op de datum-/tiidstempels fan 'e yngongen, net de systeemtiid.

Dêrnei implementearje wy ús topology mei elk fan 'e finstertypen. De folsleine koade sil allinich yn it earste foarbyld jûn wurde foar oare soarten finsters sil neat feroarje, útsein it type finsteroperaasje.

Sesje finsters

Sesje finsters binne hiel oars as alle oare soarten finsters. Se wurde net sa folle beheind troch tiid as troch de aktiviteit fan 'e brûker (of de aktiviteit fan' e entiteit dy't jo wolle folgje). Sesjefinsters wurde beheine troch perioaden fan ynaktiviteit.

Ofbylding 5.12 yllustrearret it konsept fan sesjefinsters. De lytsere sesje sil fusearje mei de sesje oan de linkerkant. En de sesje oan 'e rjochterkant sil apart wêze om't it folget op in lange perioade fan ynaktiviteit. Sesje finsters binne basearre op brûker aktiviteit, mar brûk datum / tiid stimpels út yngongen foar in bepale hokker sesje de yngong heart ta.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"

Sesjefinsters brûke om stocktransaksjes te folgjen

Litte wy sesjefinsters brûke om ynformaasje oer útwikselingstransaksjes te fangen. De ymplemintaasje fan sesjefinsters wurdt werjûn yn Listing 5.5 (dy't te finen is yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Jo hawwe de measte operaasjes al sjoen yn dizze topology, dus it is net nedich om se hjir nochris nei te sjen. Mar d'r binne hjir ek ferskate nije eleminten, dy't wy no sille beprate.

Elke groupBy-operaasje fiert typysk in soarte fan aggregaasjeoperaasje (aggregaasje, rollup of tellen). Jo kinne of kumulative aggregaasje útfiere mei in rinnend totaal, of finsteraggregaasje, dy't rekken hâldt mei records binnen in spesifisearre tiidfinster.

De koade yn Listing 5.5 telt it oantal transaksjes binnen sesjefinsters. Yn Fig. 5.13 dizze aksjes wurde analysearre stap foar stap.

Troch windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) op te roppen, meitsje wy in sesjefinster mei in ynaktiviteit-ynterval fan 20 sekonden en in persistinsje-ynterval fan 15 minuten. In ynaktive ynterval fan 20 sekonden betsjut dat de applikaasje elke yngong sil befetsje dy't binnen 20 sekonden nei it ein of begjin fan 'e aktuele sesje komt yn 'e aktuele (aktive) sesje.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Folgjende spesifisearje wy hokker aggregaasjeoperaasje moat wurde útfierd yn it sesjefinster - yn dit gefal, telle. As in ynkommende yngong bûten it ynaktiviteitsfinster falt (wererkant fan it datum-/tiidstempel), makket de applikaasje in nije sesje. Behâldynterval betsjut in sesje foar in bepaalde tiid te behâlden en soarget foar lette gegevens dy't útwreidzje dan de ynaktiviteitsperioade fan 'e sesje, mar noch kinne wurde taheakke. Derneist, it begjin en ein fan 'e nije sesje dy't ûntstiet út de fúzje oerienkomt mei de ierste en lêste datum / tiid stimpel.

Litte wy nei in pear yngongen sjen fan 'e telmetoade om te sjen hoe't sesjes wurkje (tabel 5.1).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
As records oankomme, sykje wy nei besteande sesjes mei deselde kaai, in eintiid minder as de hjoeddeiske datum / tiid stimpel - ynaktiviteit ynterval, en in begjintiid grutter as de hjoeddeiske datum / tiid stimpel + ynaktiviteit ynterval. Troch dit yn rekken brocht, fjouwer yngongen út tabel. 5.1 wurde fusearre yn ien sesje as folget.

1. Record 1 komt earst oan, dus de starttiid is lyk oan de eintiid en is 00:00:00.

2. Folgjende komt yngong 2, en wy sykje sesjes dy't einigje net earder as 23:59:55 en begjinne net letter as 00:00:35. Wy fine rekord 1 en kombinearje sesjes 1 en 2. Wy nimme de starttiid fan sesje 1 (earder) en de eintiid fan sesje 2 (letter), sadat ús nije sesje begjint om 00:00:00 en einiget om 00:00 oere: 15:XNUMX.

3. Record 3 komt, wy sykje sesjes tusken 00:00:30 en 00:01:10 en fine gjin. Foegje in twadde sesje ta foar de kaai 123-345-654,FFBE, begjinnend en einigje om 00:00:50.

4. Record 4 komt en wy sykje sesjes tusken 23:59:45 en 00:00:25. Dizze kear binne beide sesjes 1 en 2 fûn Alle trije sesjes wurde kombinearre yn ien, mei in starttiid fan 00:00:00 en in eintiid fan 00:00:15.

Ut wat wurdt beskreaun yn dizze paragraaf, is it wurdich ûnthâlden de folgjende wichtige nuânses:

  • sesjes binne gjin finsters fan fêste grutte. De doer fan in sesje wurdt bepaald troch de aktiviteit binnen in opjûne perioade;
  • De datum-/tiidstempels yn 'e gegevens bepale oft it evenemint falt binnen in besteande sesje of yn in idle perioade.

Folgjende sille wy it folgjende type finster besprekke - "tumbling" finsters.

"Tumbling" finsters

Tumbling finsters fange eveneminten dy't binnen in bepaalde perioade falle. Stel jo foar dat jo alle oandieltransaksjes fan in bepaald bedriuw elke 20 sekonden moatte fange, sadat jo alle eveneminten yn dy perioade sammelje. Oan 'e ein fan it ynterval fan 20 sekonden rôlet it finster oer en ferpleatst nei in nij observaasje-ynterval fan 20 sekonden. Figuer 5.14 yllustrearret dizze situaasje.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Sa't jo sjen kinne, binne alle barrens ûntfongen yn 'e lêste 20 sekonden opnommen yn it finster. Oan 'e ein fan dizze perioade wurdt in nij finster makke.

Listing 5.6 lit koade sjen dy't it gebrûk fan tumbling-finsters toant om stocktransaksjes elke 20 sekonden te fangen (fûn yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Mei dizze lytse feroaring oan 'e TimeWindows.of metoade oprop, kinne jo gebrûk meitsje fan in tumbling finster. Dit foarbyld neamt de metoade oant () net, sadat it standert retensjonsinterval fan 24 oeren brûkt wurdt.

Uteinlik is it tiid om troch te gean nei de lêste fan 'e finsteropsjes - "hopping" finsters.

Sliding ("springende") finsters

Sliding / hopping finsters binne fergelykber mei tumbling finsters, mar mei in lyts ferskil. Sliding finsters wachtsje net oant it ein fan it tiidynterval foardat jo in nij finster meitsje om resinte eveneminten te ferwurkjen. Se begjinne nije berekkeningen nei in wachtsjen ynterval minder as it finster doer.

Om de ferskillen tusken tumbling en springende finsters te yllustrearjen, litte wy weromgean nei it foarbyld fan it tellen fan beurstransaksjes. Us doel is noch om it oantal transaksjes te tellen, mar wy wolle net de hiele tiid wachtsje foardat wy de teller bywurkje. Ynstee, wy sille bywurkje de teller mei koartere yntervallen. Wy sille bygelyks noch elke 20 sekonden it oantal transaksjes telle, mar de teller elke 5 sekonden bywurkje, lykas werjûn yn Fig. 5.15. Yn dit gefal einigje wy mei trije resultaatfinsters mei oerlappende gegevens.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Listing 5.7 toant de koade foar it definiearjen fan glidefinsters (fûn yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
In tumbling finster kin wurde omboud ta in hopping finster troch in oprop ta de advanceBy () metoade. Yn it werjûn foarbyld is it besparringsinterval 15 minuten.

Jo seagen yn dizze seksje hoe't jo aggregaasjeresultaten kinne beheine ta tiidfinsters. Benammen wol ik dat jo de folgjende trije dingen út dizze seksje ûnthâlde:

  • de grutte fan sesjefinsters wurdt net beheind troch tiidperioade, mar troch brûkersaktiviteit;
  • "tumbling" finsters jouwe in oersjoch fan eveneminten binnen in opjûne perioade;
  • De doer fan springende finsters is fêst, mar se wurde faak bywurke en kinne oerlappende yngongen yn alle finsters befetsje.

Folgjende sille wy leare hoe't jo in KTable weromsette nei in KStream foar in ferbining.

5.3.3. KStream en KTable-objekten ferbine

Yn haadstik 4 besprutsen wy it ferbinen fan twa KStream-objekten. No moatte wy leare hoe't jo KTable en KStream ferbine kinne. Dit kin nedich wêze foar de folgjende ienfâldige reden. KStream is in stream fan records, en KTable is in stream fan record updates, mar soms kinne jo ekstra kontekst taheakje wolle oan de record stream mei updates fan de KTable.

Litte wy gegevens nimme oer it oantal beurstransaksjes en kombinearje se mei beursnijs foar de relevante yndustry. Hjir is wat jo moatte dwaan om dit te berikken jûn de koade dy't jo al hawwe.

  1. Konvertearje in KTable-objekt mei gegevens oer it oantal stocktransaksjes yn in KStream, folge troch it ferfangen fan de kaai mei de kaai dy't de yndustrysektor oanjout dy't oerienkomt mei dit stocksymboal.
  2. Meitsje in KTable-objekt dat gegevens lêst fan in ûnderwerp mei beursnijs. Dizze nije KTable sil wurde kategorisearre troch yndustry sektor.
  3. Ferbine nijsupdates mei ynformaasje oer it oantal beurstransaksjes per sektorsektor.

Litte wy no sjen hoe't jo dit aksjeplan útfiere.

Konvertearje KTable nei KSream

Om KTable te konvertearjen nei KStream moatte jo it folgjende dwaan.

  1. Rop de metoade KTable.toStream() op.
  2. Troch de KStream.map-metoade oan te roppen, ferfange de kaai mei de yndustrynamme, en helje dan it TransactionSummary-objekt út it Windowed-eksimplaar.

Wy sille dizze operaasjes as folgjend keatling meitsje (de koade is te finen yn it bestân src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Om't wy in KStream.map-operaasje útfiere, wurdt de weromjûne KStream-eksimplaar automatysk opnij partitionearre as it wurdt brûkt yn in ferbining.

Wy hawwe it konverzjeproses foltôge, folgjende moatte wy in KTable-objekt meitsje foar it lêzen fan stocknijs.

Skepping fan KTable foar stock nijs

Gelokkich nimt it meitsjen fan in KTable-objekt mar ien rigel koade (de koade kin fûn wurde yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
It is de muoite wurdich op te merken dat gjin Serde-objekten moatte wurde oantsjutte, om't string Serdes wurde brûkt yn 'e ynstellings. Ek troch it brûken fan de EARLIEST enumeraasje, wurdt de tabel oan it begjin fol mei records.

No kinne wy ​​​​gean nei de lêste stap - ferbining.

Ferbine nijsupdates mei transaksje telle gegevens

In ferbining meitsje is net dreech. Wy sille gebrûk meitsje fan in links join yn it gefal dat der gjin stock nijs foar de oanbelangjende yndustry (de nedige koade is te finen yn de triem src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Dizze leftJoin-operator is frij simpel. Oars as de joins yn haadstik 4, wurdt de JoinWindow-metoade net brûkt, om't by it útfieren fan in KStream-KTable join, d'r mar ien yngong is yn 'e KTable foar elke kaai. Sa'n ferbining is net beheind yn 'e tiid: it rekord is of yn' e KTable of ôfwêzich. De wichtichste konklúzje: mei KTable-objekten kinne jo KStream ferrykje mei minder faak bywurke referinsjegegevens.

No sille wy sjen nei in effisjintere manier om eveneminten fan KStream te ferrykjen.

5.3.4. GlobalKTable objekten

Lykas jo kinne sjen, is d'r needsaak om evenemintestreamen te ferrykjen of kontekst oan te foegjen. Yn haadstik 4 seagen jo de ferbiningen tusken twa KStream-objekten, en yn 'e foarige paragraaf seagen jo de ferbining tusken in KStream en in KTable. Yn al dizze gefallen is it nedich om de gegevensstream opnij te dielen by it yn kaart bringen fan de kaaien nei in nij type of wearde. Soms wurdt repartitioning eksplisyt dien, en soms docht Kafka Streams it automatysk. Re-partitioning is nedich om't de kaaien binne feroare en de records moatte einigje yn nije seksjes, oars sil de ferbining ûnmooglik wêze (dit waard besprutsen yn haadstik 4, yn 'e paragraaf "Re-partitioning data" yn subparagraaf 4.2.4).

Re-partitioning hat in kosten

Re-partitioning fereasket kosten - ekstra boarnekosten foar it meitsjen fan tuskenlizzende ûnderwerpen, opslaan fan dûbele gegevens yn in oar ûnderwerp; it betsjut ek ferhege latency troch skriuwen en lêzen fan dit ûnderwerp. Derneist, as jo moatte meidwaan oer mear dan ien aspekt of dimensje, moatte jo de joins ketting meitsje, de records yn kaart bringe mei nije kaaien, en it opnij partitioneringsproses opnij útfiere.

Ferbine mei lytsere datasets

Yn guon gefallen is it folume fan referinsjegegevens te ferbinen relatyf lyts, sadat folsleine kopyen dêrfan maklik lokaal op elke knooppunt passe kinne. Foar situaasjes lykas dizze leveret Kafka Streams de GlobalKTable-klasse.

GlobalKTable-eksimplaren binne unyk om't de applikaasje alle gegevens nei elk fan 'e knopen replikearret. En om't alle gegevens oanwêzich binne op elke knooppunt, is d'r gjin needsaak om de evenemintstream te dielen troch referinsjegegevenskaai, sadat it beskikber is foar alle partysjes. Jo kinne ek keyless joins meitsje mei help fan GlobalKTable-objekten. Litte wy weromgean nei ien fan 'e foarige foarbylden om dizze funksje te demonstrearjen.

KStream-objekten ferbine mei GlobalKTable-objekten

Yn subseksje 5.3.2 hawwe wy finsteraggregaasje fan wikseltransaksjes útfierd troch keapers. De resultaten fan dizze aggregaasje seagen der sa út:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Hoewol dizze resultaten it doel tsjinne, soe it nuttich west hawwe as de namme fan 'e klant en folsleine bedriuwsnamme ek werjûn wiene. Om de klantnamme en bedriuwsnamme ta te foegjen, kinne jo normale joins dwaan, mar jo moatte twa kaaimappings dwaan en opnij partitionearje. Mei GlobalKTable kinne jo de kosten fan sokke operaasjes foarkomme.

Om dit te dwaan, sille wy it countStream-objekt fan Listing 5.11 brûke (de oerienkommende koade is te finen yn src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) en ferbine it mei twa GlobalKTable-objekten.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Wy hawwe dit al earder besprutsen, dus ik sil it net werhelje. Mar ik merk op dat de koade yn de toStream ().map funksje wurdt abstrahearre yn in funksje foarwerp ynstee fan in ynline lambda útdrukking foar de lêsberens.

De folgjende stap is om twa eksimplaren fan GlobalKTable te ferklearjen (de werjûn koade kin fûn wurde yn it bestân src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"

Tink derom dat ûnderwerpnammen beskreaun wurde mei opnomde typen.

No't wy alle komponinten klear hawwe, bliuwt it allinich om de koade foar de ferbining te skriuwen (dy't te finen is yn it bestân src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Hoewol binne d'r twa joins yn dizze koade, binne se keatling, om't gjin fan har resultaten apart wurdt brûkt. De resultaten wurde werjûn oan 'e ein fan' e hiele operaasje.

As jo ​​​​de boppesteande join-operaasje útfiere, krije jo resultaten lykas dit:

{customer='Barney, Smith' company="Exxon", transactions= 17}

De essinsje is net feroare, mar dizze resultaten sjogge dúdliker.

As jo ​​ôftelle nei haadstik 4, hawwe jo al ferskate soarten ferbinings yn aksje sjoen. Se wurde neamd yn tabel. 5.2. Dizze tabel wjerspegelet de ferbiningsmooglikheden as fan ferzje 1.0.0 fan Kafka Streams; Der kin wat feroarje yn takomstige releases.

It boek "Kafka Streams in Action. Applikaasjes en mikrotsjinsten foar wurk yn realtime"
Om dingen yn te pakken, litte wy de basis opnij opnimme: jo kinne evenemintstreamen (KStream) ferbine en streamen (KTable) bywurkje mei de lokale steat. As alternatyf, as de grutte fan de referinsjegegevens net te grut is, kinne jo it GlobalKTable-objekt brûke. GlobalKTables replikearje alle partysjes nei elke Kafka Streams-applikaasjeknooppunt, en soargje derfoar dat alle gegevens beskikber binne nettsjinsteande hokker partysje de kaai oerienkomt.

Folgjende sille wy de Kafka Streams-funksje sjen, wêrtroch't wy steatferoarings kinne observearje sûnder gegevens te konsumearjen fan in Kafka-ûnderwerp.

5.3.5. Queryable steat

Wy hawwe al ferskate operaasjes útfierd mei steat en altyd útfiere de resultaten nei de konsole (foar ûntwikkeling doelen) of skriuwe se nei in ûnderwerp (foar produksje doelen). By it skriuwen fan resultaten nei in ûnderwerp, moatte jo in Kafka-konsumint brûke om se te besjen.

It lêzen fan gegevens fan dizze ûnderwerpen kin wurde beskôge as in soarte fan materialisearre werjeften. Foar ús doelen kinne wy ​​​​de definysje fan in materialisearre werjefte fan Wikipedia brûke: "... in fysike databankobjekt mei de resultaten fan in query. It kin bygelyks in lokale kopy wêze fan gegevens op ôfstân, of in subset fan 'e rigen en/of kolommen fan in tabel of gearfoegjen fan resultaten, of in gearfettingtabel krigen troch aggregaasje" (https://en.wikipedia.org/wiki) /Materialized_view).

Kafka Streams kinne jo ek ynteraktive fragen útfiere op steatwinkels, wêrtroch jo dizze materialisearre werjeften direkt kinne lêze. It is wichtich om te merken dat de fraach nei de steat winkel is in lês-allinne operaasje. Dit soarget derfoar dat jo gjin soargen hoege te meitsjen oer it per ongelok it meitsjen fan steat ynkonsistint wylst jo applikaasje gegevens ferwurket.

De mooglikheid om direkt query steat winkels is wichtich. Dit betsjut dat jo dashboardapplikaasjes kinne oanmeitsje sûnder earst gegevens fan 'e Kafka-konsumint op te heljen. It fergruttet ek de effisjinsje fan 'e applikaasje, fanwege it feit dat d'r gjin ferlet is om gegevens opnij te skriuwen:

  • troch de lokaasje fan 'e gegevens kinne se fluch tagong wurde;
  • duplikaasje fan gegevens wurdt eliminearre, sûnt it is net skreaun nei eksterne opslach.

It wichtichste ding dat ik wol dat jo ûnthâlde is dat jo steat direkt kinne oanfreegje fanút jo applikaasje. De kânsen dy't dit jo jout, kinne net oerskatte wurde. Ynstee fan gegevens fan Kafka te konsumearjen en records op te slaan yn in databank foar de applikaasje, kinne jo steatwinkels opfreegje mei itselde resultaat. Direkte fragen nei steatwinkels betsjutte minder koade (gjin konsumint) en minder software (gjin ferlet fan in databanktabel om de resultaten op te slaan).

Wy hawwe yn dit haadstik nochal wat grûn bedutsen, dus wy litte ús diskusje oer ynteraktive fragen tsjin steatswinkels foar no litte. Mar meitsje jo gjin soargen: yn haadstik 9 meitsje wy in ienfâldige dashboardapplikaasje mei ynteraktive fragen. It sil guon fan 'e foarbylden fan dizze en foarige haadstikken brûke om ynteraktive fragen te demonstrearjen en hoe't jo se kinne tafoegje oan Kafka Streams-applikaasjes.

Gearfetting

  • KSream-objekten fertsjintwurdigje streamen fan eveneminten, te fergelykjen mei ynfoegingen yn in databank. KTable-objekten fertsjintwurdigje updatestreamen, mear as fernijings nei in databank. De grutte fan it KTable-objekt groeit net, âlde records wurde ferfongen troch nije.
  • KTable objekten binne nedich foar aggregaasje operaasjes.
  • Mei it brûken fan finsteroperaasjes kinne jo aggregearre gegevens splitse yn tiidbakken.
  • Mei tank oan GlobalKTable-objekten kinne jo tagong krije ta referinsjegegevens oeral yn 'e applikaasje, nettsjinsteande partitionearring.
  • Ferbinings tusken KStream, KTable en GlobalKTable objekten binne mooglik.

Oant no hawwe wy ús rjochte op it bouwen fan Kafka Streams-applikaasjes mei de KStream DSL op hege nivo. Hoewol de oanpak op hege nivo jo makket om kreaze en beknopte programma's te meitsjen, fertsjinwurdiget it gebrûk fan in trade-off. Wurkje mei DSL KStream betsjut it fergrutsjen fan de beknoptheid fan jo koade troch it ferminderjen fan de mjitte fan kontrôle. Yn it folgjende haadstik sille wy sjen nei de API foar hannelsknooppunt op leech nivo en oare ôfwikselingen besykje. De programma's sille langer wêze as earder, mar wy sille hast elke handlerknooppunt kinne oanmeitsje dy't wy miskien nedich binne.

→ Mear details oer it boek is te finen op útjouwer syn webside

→ Foar Habrozhiteli 25% koarting mei coupon - Kafka Streams

→ By betelling foar de papieren ferzje fan it boek sil in elektroanysk boek per e-post ferstjoerd wurde.

Boarne: www.habr.com

Add a comment