Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk" Hallo Habrites! Hierdie boek is geskik vir enige ontwikkelaar wat streaming wil verstaan. Om verspreide programmering te verstaan, sal jou help om Kafka en Kafka Streams beter te verstaan. Dit sal lekker wees om die Kafka-raamwerk self te ken, maar dit is nie nodig nie: ek sal jou alles vertel wat jy nodig het. Ervare Kafka-ontwikkelaars sowel as beginners sal met hierdie boek leer hoe om interessante stroomtoepassings te bou deur die Kafka Streams-biblioteek te gebruik. Intermediêre tot gevorderde Java-ontwikkelaars, wat reeds vertroud is met konsepte soos serialisering, sal leer hoe om hul vaardighede toe te pas om Kafka Streams-toepassings te bou. Die boek se bronkode is in Java 8 geskryf en maak baie gebruik van die Java 8 lambda-uitdrukkingsintaksis, so om te weet hoe om met lambda-funksies te werk (selfs in 'n ander programmeertaal) sal handig te pas kom.

Uittreksel. 5.3. Aggregasie en venster bedrywighede

In hierdie afdeling gaan ons verder om die mees belowende dele van Kafka Streams te verken. Tot dusver het ons die volgende aspekte van Kafka Streams gedek:

  • skepping van verwerkingstopologie;
  • gebruik van staat in stroomtoepassings;
  • die uitvoering van datastroomverbindings;
  • verskille tussen gebeurtenisstrome (KStream) en opdateringstrome (KTable).

In die volgende voorbeelde sal ons al hierdie elemente bymekaar bring. Jy sal ook leer oor windowing, nog 'n wonderlike kenmerk van stroomtoepassings. Ons eerste voorbeeld sal 'n eenvoudige samevoeging wees.

5.3.1. Samevoeging van aandeelverkope volgens industrie

Aggregasie en groepering is noodsaaklike instrumente wanneer daar met stroomdata gewerk word. Ondersoek van individuele rekords soos dit ontvang word, is dikwels onvoldoende. Om bykomende inligting uit data te onttrek, is dit nodig om dit te groepeer en te kombineer.

In hierdie voorbeeld sal jy die kostuum van 'n daghandelaar aantrek wat die verkoopsvolume van aandele van maatskappye in verskeie bedrywe moet dop. Spesifiek, jy stel belang in die vyf maatskappye met die grootste aandeelverkope in elke bedryf.

Sodanige samevoeging sal die volgende verskeie stappe vereis om die data in die verlangde vorm te vertaal (in algemene terme praat).

  1. Skep 'n onderwerp-gebaseerde bron wat rou voorraad handel inligting publiseer. Ons sal 'n voorwerp van tipe StockTransaction moet karteer na 'n voorwerp van tipe ShareVolume. Die punt is dat die StockTransaction-voorwerp verkoopsmetadata bevat, maar ons benodig net data oor die aantal aandele wat verkoop word.
  2. Groepeer ShareVolume-data volgens voorraadsimbool. Sodra gegroepeer volgens simbool, kan jy hierdie data invou in subtotale van voorraadverkope volumes. Dit is opmerklik dat die KStream.groupBy-metode 'n instansie van tipe KGroupedStream terugstuur. En jy kan 'n KTable-instansie kry deur die KGroupedStream.reduce-metode verder te noem.

Wat is die KGroupedStream-koppelvlak

Die KStream.groupBy- en KStream.groupByKey-metodes gee 'n KGroupedStream-instansie terug. KGroupedStream is 'n intermediêre voorstelling van die stroom gebeurtenisse na groepering volgens sleutels. Dit is glad nie ontwerp om direk met hom te werk nie. In plaas daarvan word KGroupedStream gebruik vir samevoegingsbewerkings wat altyd 'n KTable tot gevolg het. En aangesien die resultaat van die samevoegingsoperasies 'n KTable is en hulle 'n staatswinkel gebruik, is dit moontlik dat nie alle opdaterings in die resultaat verder in die pyplyn gestuur word nie.

Die KTable.groupBy-metode gee 'n soortgelyke KGroupedTable terug - 'n intermediêre voorstelling van die stroom opdaterings, hergroepeer deur sleutel.

Kom ons neem 'n kort pouse en kyk na Fig. 5.9, wat wys wat ons bereik het. Hierdie topologie behoort reeds vir jou baie bekend te wees.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Kom ons kyk nou na die kode vir hierdie topologie (dit kan gevind word in die lêer src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lysing 5.2).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Bogenoemde kode is opvallend vir sy beknoptheid en groot volume aksies wat in verskeie reëls uitgevoer word. Jy sal dalk iets nuuts in die eerste parameter van die builder.stream-metode opmerk: die waarde van die AutoOffsetReset.EARLIEST opgesomde tipe (daar is ook LATEST), wat met die Consumed.withOffsetResetPolicy-metode gestel word. Hierdie opgesomde tipe kan gebruik word om 'n offset-terugstellingstrategie vir elke KStream of KTable te spesifiseer, en geniet voorkeur bo die offset-terugstellinginstelling vanaf die konfigurasie.

GroupByKey en GroupBy

Die KStream-koppelvlak het twee metodes om rekords te groepeer: GroupByKey en GroupBy. Albei gee 'n KGroupedTable terug, so jy wonder dalk wat die verskil tussen hulle is en wanneer om watter een te gebruik?

Die GroupByKey-metode word gebruik wanneer die sleutels in die KStream reeds nie leeg is nie. En die belangrikste is dat die vlag "vereis herverdeling" nooit gestel is nie.

Die GroupBy-metode neem aan dat jy die groeperingsleutels verander het, dus is die herverdelingsvlag op waar gestel. Om aansluitings, samevoegings, ens. na die GroupBy-metode uit te voer, sal outomatiese herpartisionering tot gevolg hê.
Opsomming: Jy moet GroupByKey gebruik, nie GroupBy nie, waar moontlik.

Dit is duidelik wat die mapValues ​​en groupBy-metodes doen, so kom ons kyk na die som()-metode (gevind in src/main/java/bbejeck/model/ShareVolume.java) (Lysing 5.3).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Die ShareVolume.sum-metode gee die subtotaal van die aandeelverkoopvolume terug, en die resultaat van die hele berekeningsketting is 'n KTable-objek . Nou verstaan ​​jy watter rol KTable speel. Wanneer ShareVolume-voorwerpe aankom, word die nuutste bygewerkte opdatering in die ooreenstemmende KTable-voorwerp gestoor. Dit is belangrik om te onthou dat alle opdaterings in die vorige shareVolumeKTable weerspieël word, maar nie almal word verder gestuur nie.

Ons gebruik dan hierdie KTabel om saam te voeg (volgens die aantal aandele wat verhandel word) om by die vyf maatskappye uit te kom met die hoogste volumes aandele wat in elke bedryf verhandel word. Ons optrede in hierdie geval sal soortgelyk wees aan dié vir die eerste samevoeging.

  1. Voer 'n ander groepDeur-operasie uit om individuele ShareVolume-objekte volgens industrie te groepeer.
  2. Begin om ShareVolume-voorwerpe op te som. Hierdie keer is die samevoegingsvoorwerp 'n vaste-grootte prioriteitsry. In hierdie vaste-grootte tou word slegs die vyf maatskappye met die grootste hoeveelhede aandele verkoop behou.
  3. Karteer die toue van die vorige paragraaf na 'n stringwaarde en gee die top vyf mees verhandelde aandele volgens nommer volgens bedryf.
  4. Skryf die resultate in stringvorm by die onderwerp neer.

In Fig. Figuur 5.10 toon die datavloei-topologiegrafiek. Soos u kan sien, is die tweede rondte van verwerking redelik eenvoudig.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Noudat ons 'n duidelike begrip van die struktuur van hierdie tweede rondte van verwerking het, kan ons na die bronkode daarvan wend (jy sal dit vind in die lêer src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lysing 5.4) .

Hierdie inisialiseerder bevat 'n vasteWou-veranderlike. Dit is 'n pasgemaakte voorwerp wat 'n adapter vir java.util.TreeSet is wat gebruik word om die top N resultate op te spoor in dalende volgorde van aandele wat verhandel word.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Jy het reeds die groupBy- en mapValues-oproepe gesien, so ons sal nie daarop ingaan nie (ons roep die KTable.toStream-metode omdat die KTable.print-metode afgekeur is). Maar jy het nog nie die KTable-weergawe van aggregate() gesien nie, so ons sal 'n bietjie tyd spandeer om dit te bespreek.

Soos u onthou, is KTable anders deurdat rekords met dieselfde sleutel as opdaterings beskou word. KTable vervang die ou inskrywing met die nuwe een. Aggregasie werk op 'n soortgelyke manier: die jongste rekords met dieselfde sleutel word saamgevoeg. Wanneer 'n inskrywing aankom, word dit by 'n instansie van die FixedSizePriorityQueue-klas gevoeg deur 'n adder te gebruik (die tweede parameter in die oproep na die saamgevoegde metode), maar as 'n ander inskrywing reeds bestaan ​​met dieselfde sleutel, dan word die ou inskrywing verwyder met die aftrekker (die derde parameter in die oproep na die totale metode).

Dit beteken alles dat ons aggregator, FixedSizePriorityQueue, glad nie alle waardes met een sleutel saamvoeg nie, maar 'n rollende som van N hoeveelhede van die mees verhandelde soorte aandele stoor. Elke inkomende inskrywing bevat die totale aantal aandele wat tot dusver verkoop is. KTable sal jou inligting gee oor watter aandele tans die meeste verkoop, deurlopende samevoeging van elke opdatering is nie nodig nie.

Ons het geleer om twee belangrike dinge te doen:

  • groepeer waardes in KTable volgens hul gemeenskaplike sleutel;
  • voer nuttige bewerkings uit soos oprol en samevoeging op hierdie gegroepeerde waardes.

Om te weet hoe om hierdie bewerkings uit te voer, is belangrik om die betekenis te verstaan ​​van die data wat deur 'n Kafka Streams-toepassing beweeg en om te verstaan ​​watter inligting dit bevat.

Ons het ook van die sleutelbegrippe wat vroeër in hierdie boek bespreek is, bymekaar gebring. In Hoofstuk 4 het ons die belangrikheid van foutverdraagsame, plaaslike staat in 'n stroomtoepassing bespreek. Die eerste voorbeeld in hierdie hoofstuk het gewys hoekom plaaslike staat so belangrik is—dit gee jou die vermoë om tred te hou met watter inligting jy reeds gesien het. Plaaslike toegang vermy netwerkvertragings, wat die toepassing meer doeltreffend en foutverdraagsaam maak.

Wanneer u enige oprol- of samevoegingsbewerking uitvoer, moet u die naam van die staatswinkel spesifiseer. Die vou- en totale bewerkings gee 'n KTable-instansie terug, en die KTable gebruik 'n staatswinkel om ou resultate met nuwes te vervang. Soos u gesien het, word nie alle opdaterings verder in die pyplyn gestuur nie, en dit is belangrik, aangesien samevoegingsbewerkings ontwerp is om opsommende inligting te verkry. As jy nie die plaaslike staat toepas nie, sal KTable al die resultate van samevoeging en vermindering verder stuur.

Vervolgens sal ons kyk na die uitvoer van bewerkings soos samevoeging binne 'n spesifieke tydperk - sogenaamde vensterbewerkings.

5.3.2. Vensterbedrywighede

In die vorige afdeling het ons glykonvolusie en samevoeging bekendgestel. Die toepassing het 'n deurlopende oprol van voorraadverkopevolume uitgevoer, gevolg deur samevoeging van die vyf mees verhandelde aandele op die beurs.

Soms is sulke voortdurende samevoeging en konvolusie van resultate nodig. En soms moet u slegs operasies oor 'n gegewe tydperk uitvoer. Byvoorbeeld, om te bereken hoeveel ruiltransaksies in die laaste 10 minute met die aandele van 'n spesifieke maatskappy gemaak is. Of hoeveel gebruikers die afgelope 15 minute op 'n nuwe banieradvertensie geklik het. 'n Toepassing kan sulke bewerkings herhaaldelik uitvoer, maar met resultate wat slegs verband hou met gespesifiseerde tydintervalle (tydvensters).

Tel ruiltransaksies deur koper

In die volgende voorbeeld sal ons voorraadtransaksies oor verskeie handelaars dop - óf groot organisasies óf slim individuele finansiers.

Daar is twee moontlike redes vir hierdie dop. Een daarvan is die behoefte om te weet wat die markleiers koop/verkoop. As hierdie groot spelers en gesofistikeerde beleggers 'n geleentheid vir hulself sien, maak dit sin om hul strategie te volg. Die tweede rede is die begeerte om enige moontlike tekens van onwettige transaksies met behulp van binne-inligting raak te sien. Om dit te doen, sal jy die korrelasie van groot verkoopspieke met belangrike persvrystellings moet ontleed.

Hierdie opsporing bestaan ​​uit die volgende stappe:

  • skep 'n draad vir lees uit die voorraadtransaksie-onderwerp;
  • groepering van inkomende rekords volgens koper-ID en voorraadsimbool. Die oproep van die groupBy-metode gee 'n instansie van die KGroupedStream-klas terug;
  • Die KGroupedStream.windowedBy-metode gee 'n datastroom terug wat beperk is tot 'n tydvenster, wat venstersamevoeging moontlik maak. Afhangende van die venstertipe, word óf 'n TimeWindowedKStream óf 'n SessionWindowedKStream teruggestuur;
  • transaksietelling vir die samevoegingsoperasie. Die gevensterde datavloei bepaal of 'n bepaalde rekord in hierdie telling in ag geneem word;
  • om resultate vir 'n onderwerp te skryf of dit tydens ontwikkeling na die konsole uit te voer.

Die topologie van hierdie toepassing is eenvoudig, maar 'n duidelike prentjie daarvan sal nuttig wees. Kom ons kyk na Fig. 5.11.

Vervolgens kyk ons ​​na die funksionaliteit van vensterbewerkings en die ooreenstemmende kode.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"

Tipes vensters

Daar is drie tipes vensters in Kafka Streams:

  • sessie;
  • "tuimel" (tuimel);
  • gly / "spring" (gly / huppel).

Watter een om te kies hang af van jou besigheidsvereistes. Tuimel- en springvensters is tydsbeperk, terwyl sessievensters deur gebruikeraktiwiteit beperk word—die duur van die sessie(s) word uitsluitlik bepaal deur hoe aktief die gebruiker is. Die belangrikste ding om te onthou is dat alle venstertipes gebaseer is op die datum-/tydstempels van die inskrywings, nie die stelseltyd nie.

Vervolgens implementeer ons ons topologie met elk van die venstertipes. Die volledige kode sal slegs in die eerste voorbeeld gegee word vir ander tipes vensters sal niks verander nie, behalwe die tipe vensterbewerking.

Sessie vensters

Sessievensters verskil baie van alle ander soorte vensters. Hulle word nie soseer deur tyd beperk nie as deur die aktiwiteit van die gebruiker (of die aktiwiteit van die entiteit wat jy graag wil dop). Sessievensters word deur periodes van onaktiwiteit afgebaken.

Figuur 5.12 illustreer die konsep van sessievensters. Die kleiner sessie sal saamsmelt met die sessie aan sy linkerkant. En die sessie aan die regterkant sal apart wees omdat dit 'n lang tydperk van onaktiwiteit volg. Sessievensters is gebaseer op gebruikeraktiwiteit, maar gebruik datum-/tydstempels van inskrywings om te bepaal aan watter sessie die inskrywing behoort.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"

Gebruik sessievensters om ruiltransaksies op te spoor

Kom ons gebruik sessievensters om inligting oor ruiltransaksies vas te lê. Die implementering van sessievensters word in Listing 5.5 gewys (wat gevind kan word in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Jy het reeds die meeste van die bewerkings in hierdie topologie gesien, so dit is nie nodig om weer hierna te kyk nie. Maar daar is ook verskeie nuwe elemente hier, wat ons nou sal bespreek.

Enige groupBy-bewerking voer tipies 'n soort samevoegingsbewerking uit (samevoeging, oprol of tel). U kan óf kumulatiewe samevoeging met 'n lopende totaal uitvoer, of venstersamevoeging, wat rekords binne 'n gespesifiseerde tydvenster in ag neem.

Die kode in Notering 5.5 tel die aantal transaksies binne sessievensters. In Fig. 5.13 hierdie aksies word stap vir stap ontleed.

Deur windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) te roep skep ons 'n sessievenster met 'n onaktiwiteitsinterval van 20 sekondes en 'n volhardingsinterval van 15 minute. 'n Onaktiewe interval van 20 sekondes beteken dat die toepassing enige inskrywing sal insluit wat binne 20 sekondes na die einde of begin van die huidige sessie by die huidige (aktiewe) sessie aankom.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Vervolgens spesifiseer ons watter samevoegingsbewerking om in die sessievenster uit te voer - tel in hierdie geval. As die inkomende skrywe verder gaan as die ledige interval (aan weerskante van die datum/tydstempel), dan skep die toepassing 'n nuwe sessie. 'n Volhardingsinterval beteken om 'n sessie vir 'n sekere tyd lewendig te hou en laat data toe wat verder gaan as die sessie se ledige tydperk, maar steeds aangeheg kan word. Daarbenewens stem die begin en einde van die nuwe sessie as gevolg van die samesmelting ooreen met die vroegste en jongste datum-/tydstempels.

Kom ons kyk na 'n paar inskrywings van die telmetode om te sien hoe sessies werk (Tabel 5.1).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Wanneer rekords aankom, soek ons ​​na bestaande sessies met dieselfde sleutel, 'n eindtyd minder as die huidige datum/tydstempel - onaktiwiteitsinterval, en 'n begintyd groter as die huidige datum/tydstempel + onaktiwiteitsinterval. Met inagneming hiervan, vier inskrywings uit tabel. 5.1 word soos volg in 'n enkele sessie saamgevoeg.

1. Rekord 1 kom eerste, so die begintyd is gelyk aan die eindtyd en is 00:00:00.

2. Vervolgens kom inskrywing 2, en ons soek sessies wat nie vroeër as 23:59:55 eindig nie en nie later as 00:00:35 begin nie. Ons vind rekord 1 en kombineer sessies 1 en 2. Ons neem die begintyd van sessie 1 (vroeër) en die eindtyd van sessie 2 (later), sodat ons nuwe sessie om 00:00:00 begin en om 00 eindig: 00:15.

3. Rekord 3 kom, ons soek sessies tussen 00:00:30 en 00:01:10 en kry geen. Voeg 'n tweede sessie by vir die sleutel 123-345-654,FFBE, wat om 00:00:50 begin en eindig.

4. Rekord 4 arriveer en ons soek sessies tussen 23:59:45 en 00:00:25. Hierdie keer word beide sessies 1 en 2 gevind. Al drie sessies word in een gekombineer, met 'n begintyd van 00:00:00 en 'n eindtyd van 00:00:15.

Hier is 'n paar belangrike dinge om in gedagte te hou uit hierdie afdeling:

  • sessies is nie vensters van vaste grootte nie. Die duur van 'n sessie word bepaal deur die aktiwiteit binne 'n gegewe tydperk;
  • Die datum-/tydstempels in die data bepaal of die gebeurtenis binne 'n bestaande sessie of gedurende 'n ledige tydperk val.

Vervolgens sal ons die volgende tipe venster bespreek - "tuimel" vensters.

"Tuimelende" vensters

Tuimelende vensters vang gebeurtenisse vas wat binne 'n sekere tydperk val. Stel jou voor dat jy elke 20 sekondes al die voorraadtransaksies van 'n sekere maatskappy moet vaslê, sodat jy al die gebeure gedurende daardie tydperk versamel. Aan die einde van die 20-sekonde-interval rol die venster om en skuif na 'n nuwe 20-sekonde-waarnemingsinterval. Figuur 5.14 illustreer hierdie situasie.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Soos u kan sien, is alle gebeurtenisse wat in die laaste 20 sekondes ontvang is in die venster ingesluit. Aan die einde van hierdie tydperk word 'n nuwe venster geskep.

Lys 5.6 toon kode wat die gebruik van tuimelvensters demonstreer om voorraadtransaksies elke 20 sekondes vas te vang (gevind in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Met hierdie klein verandering aan die TimeWindows.of metode oproep, kan jy 'n tuimelvenster gebruik. Hierdie voorbeeld noem nie die tot() metode nie, dus sal die verstek retensie-interval van 24 uur gebruik word.

Uiteindelik is dit tyd om aan te gaan na die laaste van die venster-opsies - "hopping" vensters.

Glyende ("springende") vensters

Skuif-/springvensters is soortgelyk aan tuimelvensters, maar met 'n effense verskil. Skuifvensters wag nie tot die einde van die tydinterval voordat 'n nuwe venster geskep word om onlangse gebeure te verwerk nie. Hulle begin nuwe berekeninge na 'n waginterval minder as die vensterduur.

Om die verskille tussen "tuimel" en "spring" vensters te illustreer, kom ons gaan terug na die voorbeeld van die tel van voorraadtransaksies. Ons doelwit is steeds om die aantal transaksies te tel, maar ons wil nie heeltyd wag voordat ons die teller opdateer nie. In plaas daarvan sal ons die teller met korter tussenposes opdateer. Ons sal byvoorbeeld steeds die aantal transaksies elke 20 sekondes tel, maar die teller elke 5 sekondes opdateer, soos in Fig. 5.15. In hierdie geval het ons drie resultaatvensters met oorvleuelende data.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Lys 5.7 toon die kode vir die definisie van skuifvensters (gevind in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
'n Tuimelvenster kan omgeskakel word na 'n springvenster deur 'n oproep by die advanceBy()-metode by te voeg. In die voorbeeld wat gewys word, is die spaarinterval 15 minute.

Jy het in hierdie afdeling gesien hoe om samevoegingsresultate tot tydvensters te beperk. In die besonder wil ek hê jy moet die volgende drie dinge uit hierdie afdeling onthou:

  • die grootte van sessievensters word nie beperk deur tydperk nie, maar deur gebruikeraktiwiteit;
  • "Tumble" vensters gee 'n idee van gebeure binne 'n gegewe tydperk;
  • Die tydsduur van springvensters is vas, maar hulle word gereeld opgedateer en kan oorvleuelende inskrywings in alle vensters bevat.

Vervolgens sal ons leer hoe om 'n KTable terug te skakel na 'n KSream vir 'n verbinding.

5.3.3. Verbind KStream en KTable-voorwerpe

In Hoofstuk 4 het ons die koppeling van twee KStream-voorwerpe bespreek. Nou moet ons leer hoe om KTable en KStream te verbind. Dit kan nodig wees om die volgende eenvoudige rede. KStream is 'n stroom rekords, en KTable is 'n stroom rekordopdaterings, maar soms wil jy dalk addisionele konteks by die rekordstroom voeg deur opdaterings vanaf die KTable te gebruik.

Kom ons neem data oor die aantal aandelebeurstransaksies en kombineer dit met aandelebeursnuus vir die betrokke bedrywe. Hier is wat jy moet doen om dit te bereik gegewe die kode wat jy reeds het.

  1. Omskep 'n KTable-voorwerp met data oor die aantal voorraadtransaksies in 'n KStream, gevolg deur die sleutel te vervang met die sleutel wat die industriesektor aandui wat met hierdie voorraadsimbool ooreenstem.
  2. Skep 'n KTable-voorwerp wat data van 'n onderwerp met aandelebeursnuus lees. Hierdie nuwe KTable sal volgens nywerheidsektor gekategoriseer word.
  3. Koppel nuusopdaterings met inligting oor die aantal aandelebeurstransaksies per bedryfsektor.

Kom ons kyk nou hoe om hierdie aksieplan te implementeer.

Skakel KTable om na KStream

Om KTable na KStream om te skakel, moet jy die volgende doen.

  1. Roep die KTable.toStream() metode.
  2. Deur die KStream.map-metode te roep, vervang die sleutel met die industrienaam, en haal dan die TransactionSummary-voorwerp uit die Windowed-instansie.

Ons sal hierdie bewerkings soos volg aanmekaar ketting (die kode kan gevind word in die lêer src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lysing 5.8).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Omdat ons 'n KStream.map-bewerking uitvoer, word die teruggekeerde KStream-instansie outomaties herpartisioneer wanneer dit in 'n verbinding gebruik word.

Ons het die omskakelingsproses voltooi, volgende moet ons 'n KTable-objek skep om voorraadnuus te lees.

Skep 'n KTable vir voorraadnuus

Gelukkig neem die skep van 'n KTable-objek net een reël kode (die kode kan gevind word in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lysing 5.9).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Dit is opmerklik dat geen Serde-voorwerpe vereis word om gespesifiseer te word nie, aangesien string Serdes in die instellings gebruik word. Ook, deur die VROOGSTE opsomming te gebruik, word die tabel heel aan die begin met rekords gevul.

Nou kan ons aanbeweeg na die laaste stap – die verbinding.

Koppel nuusopdaterings met transaksietellingdata

Dit is nie moeilik om 'n verbinding te skep nie. Ons sal 'n linkeraansluiting gebruik indien daar geen voorraadnuus vir die betrokke bedryf is nie (die nodige kode kan gevind word in die lêer src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lysing 5.10).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Hierdie leftJoin-operateur is redelik eenvoudig. Anders as die koppelings in Hoofstuk 4, word die JoinWindow-metode nie gebruik nie, want wanneer 'n KStream-KTable-koppeling gemaak word, is daar net een inskrywing in die KTable vir elke sleutel. So 'n verband is nie in tyd beperk nie: die inskrywing bestaan ​​óf in die KTable óf dit bestaan ​​nie. Sleutel wegneemetes: Deur KTable-voorwerpe te gebruik, kan jy 'n KStream verryk met minder gereeld bygewerkte verwysingsdata.

Nou sal ons kyk na 'n meer doeltreffende manier om geleenthede vanaf KStream te verryk.

5.3.4. GlobalKTable voorwerpe

Soos u verstaan, is daar 'n behoefte om die gebeurtenisstrome te verryk of konteks daarby te voeg. In Hoofstuk 4 het jy die verbinding van twee KStream-voorwerpe gesien, en in die vorige afdeling het jy die verbinding van KStream en KTable gesien. In al hierdie gevalle is dit nodig om die datastroom te herpartisioneer wanneer sleutels na 'n nuwe tipe of waarde gekarteer word. Soms word herpartisionering eksplisiet gedoen, en soms doen Kafka Streams dit outomaties. Die herpartisionering is nodig omdat die sleutels verander het en die rekords in die nuwe partisies moet beland of die koppeling sal nie moontlik wees nie (dit is bespreek in Hoofstuk 4, "Data Herpartisionering" in Afdeling 4.2.4).

Herpartisionering het 'n prys

Herpartisionering kom teen 'n koste - bykomende hulpbronkoste vir die skep van intermediêre onderwerpe, die stoor van duplikaatdata in 'n ander onderwerp; dit beteken ook verhoogde latensie as gevolg van skryf na en lees van daardie onderwerp. Ook, as jy op meer as een aspek of dimensie wil aansluit, moet jy kettingverbinding maak, rekords met nuwe sleutels karteer en die herverdelingsproses weer laat loop.

Koppel aan kleiner datastelle

In sommige gevalle is die volume verwysingsdata wat gekoppel moet word relatief klein, dus kan volledige kopieë daarvan maklik plaaslik op elke nodus pas. Vir situasies soos hierdie bied Kafka Streams die GlobalKTable-klas.

GlobalKTable-gevalle is uniek omdat die toepassing alle data na elk van die nodusse repliseer. En aangesien elkeen van die nodusse al die data het, is dit nie nodig om die gebeurtenisstroom deur die verwysingsdatasleutel te partisieer sodat dit vir alle partisies beskikbaar is nie. U kan ook sleutellose verbindings uitvoer deur GlobalKTable-voorwerpe te gebruik. Kom ons gaan terug na een van die vorige voorbeelde om hierdie moontlikheid te demonstreer.

Koppel KStream-voorwerpe aan GlobalKTable-voorwerpe

In subafdeling 5.3.2 het ons venstersamevoeging van ruiltransaksies deur kopers uitgevoer. Die resultate van hierdie samevoeging het iets soos volg gelyk:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Alhoewel hierdie resultate die doel gedien het, sou dit nuttiger gewees het as die kliënt se naam en volle maatskappynaam ook vertoon is. Om die kliëntnaam en maatskappynaam by te voeg, kan jy normale aansluitings doen, maar jy sal twee sleutelkaarte en herpartisionering moet doen. Met GlobalKTable kan jy die koste van sulke bedrywighede vermy.

Om dit te doen, sal ons die countStream-voorwerp van Listing 5.11 gebruik (die ooreenstemmende kode kan gevind word in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) en dit aan twee GlobalKTable-objekte koppel.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Ons het dit al voorheen bespreek, so ek sal dit nie herhaal nie. Maar ek let daarop dat die kode in die toStream().map-funksie in 'n funksie-objek geabstraheer word in plaas van 'n inlyn-lambda-uitdrukking ter wille van leesbaarheid.

Die volgende stap is om twee gevalle van GlobalKTable te verklaar (die kode wat gewys word kan gevind word in die lêer src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lysing 5.12).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"

Neem asseblief kennis dat onderwerpname beskryf word deur gebruik te maak van opgesomde tipes.

Noudat ons al die komponente gereed het, is al wat oorbly om die kode vir die verbinding te skryf (wat gevind kan word in die lêer src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lysing 5.13).

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Alhoewel daar twee verbindings in hierdie kode is, is hulle vasgeketting omdat nie een van hul resultate afsonderlik gebruik word nie. Die resultate word aan die einde van die hele operasie vertoon.

Wanneer jy die bogenoemde aansluit-operasie uitvoer, sal jy resultate soos hierdie kry:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Die essensie het nie verander nie, maar hierdie resultate lyk meer verstaanbaar.

As jy aftel na Hoofstuk 4, het jy reeds verskeie tipes verbindings in aksie gesien. Hulle word in die tabel gelys. 5.2. Hierdie tabel weerspieël die verbindingsvermoëns vanaf weergawe 1.0.0 van Kafka Streams; Iets kan in toekomstige vrystellings verander.

Die boek "Kafka-strome in aksie. Toepassings en mikrodienste vir intydse werk"
Ten slotte, laat ek jou herinner aan die belangrikste ding: jy kan gebeurtenisstrome (KStream) en strome (KTable) bywerk deur die plaaslike staat te koppel. Daarbenewens, as die grootte van die verwysingsdata nie te groot is nie, kan jy die GlobalKTable-objek gebruik. GlobalKTable repliseer alle partisies na elk van die nodusse van die Kafka Streams-toepassing, om sodoende te verseker dat alle data beskikbaar is, ongeag met watter partisie die sleutel ooreenstem.

Vervolgens sal ons die Kafka Streams-funksie sien, waardeur ons toestandsveranderinge kan waarneem sonder om data van 'n Kafka-onderwerp te verbruik.

5.3.5. Versoekbare staat

Ons het reeds verskeie bewerkings uitgevoer wat staat behels en voer altyd die resultate na die konsole uit (vir ontwikkelingsdoeleindes) of skryf dit na 'n onderwerp (vir produksiedoeleindes). Wanneer u resultate vir 'n onderwerp skryf, moet u 'n Kafka-verbruiker gebruik om dit te sien.

Die lees van data uit hierdie onderwerpe kan as 'n soort gematerialiseerde sienings beskou word. Vir ons take kan ons die definisie van 'n gematerialiseerde siening van Wikipedia gebruik: "... 'n fisiese databasisobjek wat die resultate van 'n navraag bevat. Dit kan byvoorbeeld 'n plaaslike kopie van afgeleë data wees, of 'n subset van die rye en/of kolomme van 'n tabel of aansluitingsresultaat, of 'n spiltabel wat deur samevoeging verkry is” (https://en.wikipedia.org/wiki) /Materialized_view).

Kafka Streams laat jou ook toe om interaktiewe navrae op staatswinkels uit te voer, sodat jy hierdie gematerialiseerde sienings direk kan lees. Dit is belangrik om daarop te let dat die navraag na die staatswinkel 'n leesalleen-bewerking is. Dit verseker dat jy nie hoef te bekommer dat jy per ongeluk staat inkonsekwent maak terwyl jou toepassing data verwerk nie.

Die vermoë om direk navraag te doen oor staatswinkels is belangrik. Dit beteken dat jy dashboard-toepassings kan skep sonder om eers data van die Kafka-verbruiker te hoef te gaan haal. Dit verhoog ook die doeltreffendheid van die toepassing, as gevolg van die feit dat dit nie nodig is om data weer te skryf nie:

  • as gevolg van die ligging van die data, kan hulle vinnig verkry word;
  • duplisering van data word uitgeskakel, aangesien dit nie na eksterne berging geskryf word nie.

Die belangrikste ding wat ek wil hê jy moet onthou, is dat jy die staat direk vanaf die aansoek kan navraag doen. Die moontlikhede wat dit vir jou bied, kan nie oorskat word nie. In plaas daarvan om data van Kafka af te verbruik en rekords in die databasis vir die toepassing te stoor, kan u die staatwinkels navraag doen met dieselfde resultaat. Om direk na staatwinkels te vra, beteken minder kode (geen verbruiker) en minder sagteware (geen behoefte aan 'n databasistabel om resultate te stoor nie).

Ons het baie inligting in hierdie hoofstuk behandel, so ons sal ons bespreking van interaktiewe navrae oor staatswinkels vir 'n oomblik stop. Maar moenie bekommerd wees nie: in Hoofstuk 9 sal ons 'n eenvoudige dashboard-toepassing met interaktiewe navrae skep. Dit sal sommige van die voorbeelde in hierdie en vorige hoofstukke gebruik om interaktiewe navrae te demonstreer en hoe om dit by Kafka Streams-toepassings te voeg.

Opsomming

  • KSream-voorwerpe verteenwoordig strome van gebeurtenisse, vergelykbaar met invoegings in 'n databasis. KTable-objekte verteenwoordig opdateringsstrome, meer soos opdaterings aan 'n databasis. Die grootte van die KTable-voorwerp groei nie, ou rekords word deur nuwes vervang.
  • KTable-objekte word benodig vir samevoegingsbewerkings.
  • Venster laat jou toe om saamgestelde data in tydbakke af te breek.
  • Danksy GlobalKTable-voorwerpe kan u toegang tot verwysingsdata oral in die toepassing verkry, ongeag die partisie.
  • Verbindings tussen KStream, KTable en GlobalKTable voorwerpe is moontlik.

Tot dusver het ons daarop gefokus om Kafka Streams-toepassings te bou deur die hoëvlak DSL KStream te gebruik. Alhoewel die hoëvlakbenadering jou toelaat om netjiese en bondige programme te skep, verteenwoordig die gebruik daarvan 'n sekere kompromie. Werk met DSL KStream beteken om kode meer bondig te maak ten koste van minder beheer. In die volgende hoofstuk kyk ons ​​na die lae-vlak hanteerder node API en probeer ander afwegings. Programme sal langer word as wat dit tot dusver was, maar ons sal byna enige hanteerdernodus kan skep wat ons nodig mag hê.

→ Meer besonderhede oor die boek kan gevind word by uitgewer se webwerf

→ Vir Khabrozhiteli, 'n 25% afslag op die koepon - Kafka-strome

→ By betaling vir die papierweergawe van die boek sal 'n elektroniese boek per e-pos gestuur word.

Bron: will.com

Voeg 'n opmerking