Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid" Hei Khabro-beboere! Denne boken passer for alle utviklere som ønsker å forstå trådbehandling. Å forstå distribuert programmering vil hjelpe deg bedre å forstå Kafka og Kafka Streams. Det ville være fint å vite selve Kafka-rammen, men dette er ikke nødvendig: ​​Jeg vil fortelle deg alt du trenger. Erfarne Kafka-utviklere og nybegynnere vil lære hvordan man lager interessante strømbehandlingsapplikasjoner ved å bruke Kafka Streams-biblioteket i denne boken. Middels og avanserte Java-utviklere som allerede er kjent med konsepter som serialisering vil lære å bruke ferdighetene sine til å lage Kafka Streams-applikasjoner. Bokens kildekode er skrevet i Java 8 og gjør betydelig bruk av Java 8 lambda-uttrykkssyntaks, så det vil være nyttig å vite hvordan man jobber med lambda-funksjoner (selv i et annet programmeringsspråk).

Utdrag. 5.3. Aggregering og vindusoperasjoner

I denne delen går vi videre for å utforske de mest lovende delene av Kafka Streams. Så langt har vi dekket følgende aspekter ved Kafka Streams:

  • lage en behandlingstopologi;
  • bruk av tilstand i strømmeapplikasjoner;
  • utføre datastrømtilkoblinger;
  • forskjeller mellom hendelsesstrømmer (KStream) og oppdateringsstrømmer (KTable).

I de følgende eksemplene vil vi bringe alle disse elementene sammen. Du vil også lære om vindu, en annen flott funksjon i strømmeapplikasjoner. Vårt første eksempel vil være en enkel aggregering.

5.3.1. Aggregering av lagersalg etter industrisektor

Aggregering og gruppering er viktige verktøy når du arbeider med strømming av data. Undersøkelse av individuelle poster etter hvert som de mottas er ofte utilstrekkelig. For å trekke ut tilleggsinformasjon fra data, er det nødvendig å gruppere og kombinere dem.

I dette eksemplet tar du på deg kostymet til en daghandler som trenger å spore salgsvolumet til aksjer til selskaper i flere bransjer. Helt konkret er du interessert i de fem selskapene med størst aksjesalg i hver bransje.

Slik aggregering vil kreve følgende flere trinn for å oversette dataene til ønsket form (snakker i generelle termer).

  1. Lag en emnebasert kilde som publiserer rå informasjon om aksjehandel. Vi må kartlegge et objekt av typen StockTransaction til et objekt av typen ShareVolume. Poenget er at StockTransaction-objektet inneholder salgsmetadata, men vi trenger kun data om antall aksjer som selges.
  2. Grupper ShareVolum-data etter aksjesymbol. Når du er gruppert etter symbol, kan du kollapse disse dataene til delsummer av lagersalgsvolumer. Det er verdt å merke seg at KStream.groupBy-metoden returnerer en forekomst av typen KGroupedStream. Og du kan få en KTable-forekomst ved å kalle KGroupedStream.reduce-metoden ytterligere.

Hva er KGroupedStream-grensesnittet

Metodene KStream.groupBy og KStream.groupByKey returnerer en forekomst av KGroupedStream. KGroupedStream er en mellomrepresentasjon av en strøm av hendelser etter gruppering etter nøkler. Den er ikke i det hele tatt ment for direkte arbeid med den. I stedet brukes KGroupedStream til aggregeringsoperasjoner, som alltid resulterer i en KT-tabell. Og siden resultatet av aggregeringsoperasjoner er en KTable og de bruker en statlig butikk, er det mulig at ikke alle oppdateringer som et resultat sendes lenger ned i rørledningen.

KTable.groupBy-metoden returnerer en lignende KGroupedTable - en mellomrepresentasjon av strømmen av oppdateringer, omgruppert etter nøkkel.

La oss ta en kort pause og se på fig. 5.9, som viser hva vi har fått til. Denne topologien burde allerede være veldig kjent for deg.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
La oss nå se på koden for denne topologien (den kan finnes i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Den gitte koden utmerker seg ved sin korthet og det store volumet av handlinger utført på flere linjer. Du vil kanskje legge merke til noe nytt i den første parameteren i builder.stream-metoden: en verdi av enum-typen AutoOffsetReset.EARLIEST (det er også en LATEST), satt ved å bruke Consumed.withOffsetResetPolicy-metoden. Denne oppregningstypen kan brukes til å spesifisere en offset-tilbakestillingsstrategi for hver KStream eller KTable og har forrang over offset-tilbakestillingsalternativet fra konfigurasjonen.

GroupByKey og GroupBy

KStream-grensesnittet har to metoder for gruppering av poster: GroupByKey og GroupBy. Begge returnerer en KGroupedTable, så du lurer kanskje på hva forskjellen er mellom dem og når du skal bruke hvilken?

GroupByKey-metoden brukes når nøklene i KStream allerede er tomme. Og viktigst av alt, flagget "krever ompartisjonering" ble aldri satt.

GroupBy-metoden forutsetter at du har endret grupperingsnøklene, slik at ompartisjonsflagget er satt til sant. Å utføre sammenføyninger, aggregeringer osv. etter GroupBy-metoden vil resultere i automatisk re-partisjonering.
Sammendrag: Når det er mulig, bør du bruke GroupByKey i stedet for GroupBy.

Det er tydelig hva mapValues ​​og groupBy-metodene gjør, så la oss ta en titt på sum()-metoden (finnes i src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
ShareVolume.sum-metoden returnerer den løpende summen av lagersalgsvolumet, og resultatet av hele kjeden av beregninger er et KTable-objekt . Nå forstår du rollen KTable spiller. Når ShareVolume-objekter ankommer, lagrer det tilsvarende KTable-objektet den siste gjeldende oppdateringen. Det er viktig å huske at alle oppdateringer gjenspeiles i forrige shareVolumeKTable, men ikke alle sendes videre.

Vi bruker deretter denne KT-tabellen til å aggregere (etter antall omsatte aksjer) for å komme frem til de fem selskapene med høyest omsatt volum av aksjer i hver bransje. Handlingene våre i dette tilfellet vil være lik de for den første aggregeringen.

  1. Utfør en annen groupBy-operasjon for å gruppere individuelle ShareVolume-objekter etter bransje.
  2. Begynn å oppsummere ShareVolume-objekter. Denne gangen er aggregeringsobjektet en prioritetskø med fast størrelse. I denne faststørrelseskøen beholdes kun de fem selskapene med størst antall solgte aksjer.
  3. Kartlegg køene fra forrige avsnitt til en strengverdi og returner de fem mest omsatte aksjene etter nummer etter bransje.
  4. Skriv resultatene i strengform til emnet.

I fig. Figur 5.10 viser dataflyttopologigrafen. Som du kan se, er den andre behandlingsrunden ganske enkel.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Nå som vi har en klar forståelse av strukturen til denne andre behandlingsrunden, kan vi gå til kildekoden (du finner den i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Denne initialisatoren inneholder en fixedQueue-variabel. Dette er et tilpasset objekt som er en adapter for java.util.TreeSet som brukes til å spore de øverste N-resultatene i synkende rekkefølge av aksjer som handles.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Du har allerede sett groupBy- og mapValues-kallene, så vi vil ikke gå inn på disse (vi kaller KTable.toStream-metoden fordi KTable.print-metoden er utdatert). Men du har ikke sett KTable-versjonen av aggregate() ennå, så vi skal bruke litt tid på å diskutere det.

Som du husker, det som gjør KTable annerledes er at poster med de samme nøklene anses som oppdateringer. KTable erstatter den gamle oppføringen med en ny. Aggregering skjer på lignende måte: de siste postene med samme nøkkel aggregeres. Når en post kommer, blir den lagt til FixedSizePriorityQueue-klasseforekomsten ved å bruke en adder (andre parameter i det samlede metodekallet), men hvis en annen post allerede eksisterer med samme nøkkel, fjernes den gamle posten ved hjelp av en subtraktor (tredje parameter i det samlede metodekallet).

Alt dette betyr at aggregatoren vår, FixedSizePriorityQueue, ikke samler alle verdier med én nøkkel, men lagrer en bevegelig sum av mengdene av de N mest omsatte aksjetypene. Hver innkommende oppføring inneholder det totale antallet solgte aksjer så langt. KTable vil gi deg informasjon om hvilke selskapers aksjer som for tiden er mest omsatt, uten å kreve rullende aggregering av hver oppdatering.

Vi lærte å gjøre to viktige ting:

  • grupper verdier i KTable med en felles nøkkel;
  • utføre nyttige operasjoner som sammendrag og aggregering på disse grupperte verdiene.

Å vite hvordan man utfører disse operasjonene er viktig for å forstå betydningen av dataene som beveger seg gjennom en Kafka Streams-applikasjon og forstå hvilken informasjon den inneholder.

Vi har også samlet noen av nøkkelbegrepene diskutert tidligere i denne boken. I kapittel 4 diskuterte vi hvor feiltolerant, lokal stat er viktig for en strømmeapplikasjon. Det første eksemplet i dette kapittelet viste hvorfor lokal stat er så viktig – det gir deg muligheten til å holde styr på hvilken informasjon du allerede har sett. Lokal tilgang unngår nettverksforsinkelser, noe som gjør applikasjonen mer ytelsessterk og feilbestandig.

Når du utfører en samle- eller aggregeringsoperasjon, må du spesifisere navnet på delstatsbutikken. Samle- og aggregeringsoperasjonene returnerer en KTable-forekomst, og KTable bruker tilstandslagring for å erstatte gamle resultater med nye. Som du har sett, sendes ikke alle oppdateringer i rørledningen, og dette er viktig fordi aggregeringsoperasjoner er designet for å produsere sammendragsinformasjon. Hvis du ikke bruker lokal stat, vil KTable videresende alle aggregerings- og sammendragsresultater.

Deretter skal vi se på å utføre operasjoner som aggregering innenfor en bestemt tidsperiode – såkalte vindusoperasjoner.

5.3.2. Vindusoperasjoner

I den forrige delen introduserte vi glidende konvolusjon og aggregering. Applikasjonen utførte en kontinuerlig opprulling av aksjesalgsvolum, etterfulgt av aggregering av de fem mest omsatte aksjene på børsen.

Noen ganger er slik kontinuerlig aggregering og opprulling av resultater nødvendig. Og noen ganger trenger du bare å utføre operasjoner over en gitt tidsperiode. Regn for eksempel ut hvor mange byttetransaksjoner som ble gjort med aksjer i et bestemt selskap i løpet av de siste 10 minuttene. Eller hvor mange brukere som har klikket på et nytt reklamebanner de siste 15 minuttene. En applikasjon kan utføre slike operasjoner flere ganger, men med resultater som bare gjelder for spesifiserte tidsperioder (tidsvinduer).

Telling av byttetransaksjoner av kjøper

I det neste eksemplet vil vi spore aksjetransaksjoner på tvers av flere tradere – enten store organisasjoner eller smarte individuelle finansmenn.

Det er to mulige årsaker til denne sporingen. En av dem er behovet for å vite hva markedsledere kjøper/selger. Hvis disse store aktørene og sofistikerte investorene ser muligheter, er det fornuftig å følge deres strategi. Den andre grunnen er ønsket om å oppdage eventuelle tegn på ulovlig innsidehandel. For å gjøre dette, må du analysere sammenhengen mellom store salgstopper med viktige pressemeldinger.

Slik sporing består av følgende trinn:

  • lage en strøm for lesing fra emnet om aksjetransaksjoner;
  • gruppering av innkommende poster etter kjøper-ID og aksjesymbol. Å kalle groupBy-metoden returnerer en forekomst av KGroupedStream-klassen;
  • Metoden KGroupedStream.windowedBy returnerer en datastrøm begrenset til et tidsvindu, som tillater aggregering i vinduer. Avhengig av vindustypen, returneres enten en TimeWindowedKStream eller en SessionWindowedKStream;
  • transaksjonsantallet for aggregeringsoperasjonen. Vinduet dataflyt bestemmer om en bestemt post tas i betraktning i denne tellingen;
  • skrive resultater til et emne eller sende dem til konsollen under utvikling.

Topologien til denne applikasjonen er enkel, men et klart bilde av det ville være nyttig. La oss ta en titt på fig. 5.11.

Deretter skal vi se på funksjonaliteten til vindusoperasjoner og den tilsvarende koden.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"

Vindustyper

Det er tre typer vinduer i Kafka Streams:

  • økt;
  • «tumbling» (tumbling);
  • skli/hopping.

Hvilken du skal velge avhenger av forretningskravene dine. Tumbling- og hoppvinduer er tidsbegrenset, mens øktvinduer er begrenset av brukeraktivitet - varigheten av økten(e) bestemmes utelukkende av hvor aktiv brukeren er. Det viktigste å huske er at alle vindustyper er basert på dato/tidsstemplene for oppføringene, ikke systemtiden.

Deretter implementerer vi topologien vår med hver av vindustypene. Den fullstendige koden vil bare bli gitt i det første eksemplet; for andre typer vinduer vil ingenting endres bortsett fra typen vindusoperasjon.

Sesjonsvinduer

Sesjonsvinduer er svært forskjellige fra alle andre typer vinduer. De begrenses ikke så mye av tid som av aktiviteten til brukeren (eller aktiviteten til enheten du ønsker å spore). Øktvinduer er avgrenset av perioder med inaktivitet.

Figur 5.12 illustrerer konseptet med øktvinduer. Den mindre økten vil slå seg sammen med økten til venstre. Og økten til høyre vil være separat fordi den følger en lang periode med inaktivitet. Sesjonsvinduer er basert på brukeraktivitet, men bruk dato/tidsstempler fra oppføringer for å bestemme hvilken økt oppføringen tilhører.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"

Bruk av øktvinduer for å spore aksjetransaksjoner

La oss bruke øktvinduer til å fange informasjon om utvekslingstransaksjoner. Implementeringen av øktvinduer er vist i Listing 5.5 (som kan finnes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Du har allerede sett de fleste operasjonene i denne topologien, så det er ikke nødvendig å se på dem igjen her. Men det er også flere nye elementer her, som vi nå skal diskutere.

Enhver groupBy-operasjon utfører vanligvis en slags aggregeringsoperasjon (aggregering, sammendrag eller telling). Du kan utføre enten kumulativ aggregering med en løpende total, eller vindusaggregering, som tar hensyn til poster innenfor et spesifisert tidsvindu.

Koden i oppføring 5.5 teller antall transaksjoner innenfor øktvinduer. I fig. 5.13 disse handlingene analyseres trinnvis.

Ved å kalle windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) lager vi et øktvindu med et inaktivitetsintervall på 20 sekunder og et persistensintervall på 15 minutter. Et inaktivt intervall på 20 sekunder betyr at applikasjonen vil inkludere alle oppføringer som kommer innen 20 sekunder etter slutten eller starten av den gjeldende økten til den gjeldende (aktive) økten.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Deretter spesifiserer vi hvilken aggregeringsoperasjon som må utføres i øktvinduet - i dette tilfellet teller. Hvis en innkommende oppføring faller utenfor inaktivitetsvinduet (hver side av dato-/tidsstempelet), oppretter applikasjonen en ny økt. Oppbevaringsintervall betyr å opprettholde en økt i en viss tid og gir mulighet for sene data som strekker seg utover øktens inaktivitetsperiode, men som fortsatt kan legges ved. I tillegg tilsvarer starten og slutten av den nye økten som følge av sammenslåingen det tidligste og siste dato-/tidsstempelet.

La oss se på noen oppføringer fra tellemetoden for å se hvordan økter fungerer (tabell 5.1).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Når poster kommer, ser vi etter eksisterende økter med samme nøkkel, en sluttid mindre enn gjeldende dato/tidsstempel - inaktivitetsintervall, og et starttidspunkt som er større enn gjeldende dato/tidsstempel + inaktivitetsintervall. Tar dette i betraktning, fire oppføringer fra tabellen. 5.1 slås sammen til en enkelt økt som følger.

1. Post 1 kommer først, så starttiden er lik sluttid og er 00:00:00.

2. Deretter kommer oppføring 2, og vi ser etter økter som slutter tidligst 23:59:55 og starter senest 00:00:35. Vi finner rekord 1 og kombinerer økt 1 og 2. Vi tar starttidspunkt for økt 1 (tidligere) og sluttid for økt 2 (senere), slik at vår nye økt starter kl 00:00:00 og slutter kl 00: 00:15.

3. Rekord 3 kommer, vi ser etter økter mellom 00:00:30 og 00:01:10 og finner ingen. Legg til en ekstra økt for nøkkelen 123-345-654,FFBE, som starter og slutter kl. 00:00:50.

4. Rekord 4 kommer og vi ser etter økter mellom 23:59:45 og 00:00:25. Denne gangen finnes både økt 1 og 2. Alle tre øktene er slått sammen til én, med starttid 00:00:00 og sluttid 00:00:15.

Fra det som er beskrevet i denne delen, er det verdt å huske følgende viktige nyanser:

  • økter er ikke vinduer med fast størrelse. Varigheten av en økt bestemmes av aktiviteten innenfor en gitt tidsperiode;
  • Dato-/tidsstemplene i dataene bestemmer om hendelsen faller innenfor en eksisterende økt eller i en inaktiv periode.

Deretter vil vi diskutere den neste typen vindu - "tumbling" vinduer.

"Tumbling" vinduer

Tumbling-vinduer fanger opp hendelser som faller innenfor en viss tidsperiode. Tenk deg at du trenger å fange opp alle aksjetransaksjonene til et bestemt selskap hvert 20. sekund, slik at du samler alle hendelsene i løpet av den tidsperioden. På slutten av 20-sekundersintervallet ruller vinduet over og flytter seg til et nytt 20-sekunders observasjonsintervall. Figur 5.14 illustrerer denne situasjonen.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Som du kan se, er alle hendelser mottatt i løpet av de siste 20 sekundene inkludert i vinduet. På slutten av denne perioden opprettes et nytt vindu.

Oppføring 5.6 viser kode som demonstrerer bruken av tumbling-vinduer for å fange opp aksjetransaksjoner hvert 20. sekund (finnes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Med denne lille endringen til TimeWindows.of-metodekallet, kan du bruke et tumbling-vindu. Dette eksemplet kaller ikke indtil()-metoden, så standard oppbevaringsintervall på 24 timer vil bli brukt.

Til slutt er det på tide å gå videre til det siste av vindusalternativene – «hopping»-vinduer.

Skyve ("hoppende") vinduer

Skyve-/hoppevinduer ligner på rullevinduer, men med en liten forskjell. Skyvevinduer vent ikke til slutten av tidsintervallet før du oppretter et nytt vindu for å behandle nylige hendelser. De starter nye beregninger etter et venteintervall som er mindre enn vinduets varighet.

For å illustrere forskjellene mellom tumbling- og hoppevinduer, la oss gå tilbake til eksemplet med å telle børstransaksjoner. Målet vårt er fortsatt å telle antall transaksjoner, men vi ønsker ikke å vente hele tiden før vi oppdaterer telleren. I stedet vil vi oppdatere telleren med kortere intervaller. For eksempel vil vi fortsatt telle antall transaksjoner hvert 20. sekund, men oppdatere telleren hvert 5. sekund, som vist i fig. 5.15. I dette tilfellet ender vi opp med tre resultatvinduer med overlappende data.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Oppføring 5.7 viser koden for å definere skyvevinduer (finnes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Et tumbling-vindu kan konverteres til et hoppevindu ved å legge til et kall til advanceBy()-metoden. I det viste eksemplet er lagringsintervallet 15 minutter.

Du så i denne delen hvordan du begrenser aggregeringsresultater til tidsvinduer. Spesielt vil jeg at du skal huske følgende tre ting fra denne delen:

  • Størrelsen på øktvinduene er ikke begrenset av tidsperiode, men av brukeraktivitet;
  • «tumbling»-vinduer gir en oversikt over hendelser innenfor et gitt tidsrom;
  • Varigheten av hoppvinduer er fast, men de oppdateres ofte og kan inneholde overlappende oppføringer i alle vinduer.

Deretter lærer vi hvordan du konverterer en KTable tilbake til en KSream for en tilkobling.

5.3.3. Koble sammen KStream- og KTable-objekter

I kapittel 4 diskuterte vi å koble sammen to KStream-objekter. Nå må vi lære å koble KTable og KStream. Dette kan være nødvendig av følgende enkle grunn. KStream er en strøm av poster, og KTable er en strøm av postoppdateringer, men noen ganger vil du kanskje legge til ekstra kontekst til poststrømmen ved å bruke oppdateringer fra KTable.

La oss ta data om antall børstransaksjoner og kombinere dem med børsnyheter for de aktuelle bransjene. Her er hva du må gjøre for å oppnå dette gitt koden du allerede har.

  1. Konverter et KTable-objekt med data om antall aksjetransaksjoner til en KStream, etterfulgt av å erstatte nøkkelen med nøkkelen som indikerer industrisektoren som tilsvarer dette aksjesymbolet.
  2. Lag et KTable-objekt som leser data fra et emne med børsnyheter. Denne nye KT-tabellen vil bli kategorisert etter industrisektor.
  3. Koble sammen nyhetsoppdateringer med informasjon om antall børstransaksjoner etter bransje.

La oss nå se hvordan du implementerer denne handlingsplanen.

Konverter KTable til KStream

For å konvertere KTable til KStream må du gjøre følgende.

  1. Kall opp KTable.toStream()-metoden.
  2. Ved å kalle KStream.map-metoden, erstatt nøkkelen med industrinavnet, og hent deretter TransactionSummary-objektet fra Windowed-forekomsten.

Vi vil lenke disse operasjonene sammen som følger (koden finnes i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Fordi vi utfører en KStream.map-operasjon, blir den returnerte KStream-forekomsten automatisk re-partisjonert når den brukes i en tilkobling.

Vi har fullført konverteringsprosessen, deretter må vi lage et KTable-objekt for å lese aksjenyheter.

Oppretting av KTable for aksjenyheter

Heldigvis tar det å lage et KTable-objekt bare én linje med kode (koden kan finnes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Det er verdt å merke seg at ingen Serde-objekter må spesifiseres, siden streng Serdes brukes i innstillingene. Dessuten, ved å bruke den TIDLIGSTE oppregningen, fylles tabellen med poster helt i begynnelsen.

Nå kan vi gå videre til det siste trinnet - tilkobling.

Koble nyhetsoppdateringer med data for antall transaksjoner

Å opprette en forbindelse er ikke vanskelig. Vi vil bruke en venstre sammenføyning i tilfelle det ikke er aksjenyheter for den aktuelle bransjen (den nødvendige koden finnes i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Denne leftJoin-operatøren er ganske enkel. I motsetning til sammenføyningene i kapittel 4, brukes ikke JoinWindow-metoden fordi når du utfører en KStream-KTable-sammenføyning, er det bare én oppføring i KT-tabellen for hver nøkkel. En slik forbindelse er ikke begrenset i tid: posten er enten i KT-tabellen eller fraværende. Hovedkonklusjonen: ved å bruke KTable-objekter kan du berike KStream med sjeldnere oppdaterte referansedata.

Nå skal vi se på en mer effektiv måte å berike arrangementer fra KStream.

5.3.4. GlobalKTable-objekter

Som du kan se, er det behov for å berike hendelsesstrømmer eller legge til kontekst til dem. I kapittel 4 så du forbindelsene mellom to KStream-objekter, og i forrige avsnitt så du sammenhengen mellom en KStream og en KTable. I alle disse tilfellene er det nødvendig å partisjonere datastrømmen på nytt når nøklene tilordnes en ny type eller verdi. Noen ganger gjøres ompartisjonering eksplisitt, og noen ganger gjør Kafka Streams det automatisk. Re-partisjonering er nødvendig fordi nøklene er endret og postene må havne i nye seksjoner, ellers vil tilkoblingen være umulig (dette ble diskutert i kapittel 4, i avsnittet "Re-partisjonering av data" i underavsnitt 4.2.4).

Re-partisjonering har en kostnad

Re-partisjonering krever kostnader - ekstra ressurskostnader for å lage mellomliggende emner, lagring av dupliserte data i et annet emne; det betyr også økt ventetid på grunn av skriving og lesing fra dette emnet. I tillegg, hvis du trenger å koble sammen på tvers av mer enn ett aspekt eller dimensjon, må du kjede sammenføyningene, kartlegge postene med nye nøkler og kjøre re-partisjoneringsprosessen på nytt.

Koble til mindre datasett

I noen tilfeller er volumet av referansedata som skal kobles til relativt lite, så komplette kopier av det kan enkelt passe lokalt på hver node. For situasjoner som dette tilbyr Kafka Streams GlobalKTable-klassen.

GlobalKTable-forekomster er unike fordi applikasjonen replikerer alle data til hver av nodene. Og siden alle dataene er tilstede på hver node, er det ikke nødvendig å partisjonere hendelsesstrømmen etter referansedatanøkkel slik at den er tilgjengelig for alle partisjoner. Du kan også lage nøkkelfrie sammenføyninger ved å bruke GlobalKTable-objekter. La oss gå tilbake til et av de forrige eksemplene for å demonstrere denne funksjonen.

Koble KStream-objekter til GlobalKTable-objekter

I underavsnitt 5.3.2 utførte vi vindusaggregering av vekslingstransaksjoner fra kjøpere. Resultatene av denne aggregeringen så omtrent slik ut:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Selv om disse resultatene tjente formålet, ville det vært mer nyttig om kundens navn og fulle firmanavn også hadde blitt vist. For å legge til kundenavnet og firmanavnet kan du gjøre vanlige sammenføyninger, men du må gjøre to nøkkeltilordninger og re-partisjonere. Med GlobalKTable kan du unngå kostnadene ved slike operasjoner.

For å gjøre dette, bruker vi countStream-objektet fra Listing 5.11 (den korresponderende koden finnes i src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) og kobler det til to GlobalKTable-objekter.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Vi har allerede diskutert dette før, så jeg vil ikke gjenta det. Men jeg legger merke til at koden i toStream().map-funksjonen abstraheres til et funksjonsobjekt i stedet for et innebygd lambda-uttrykk for lesbarhetens skyld.

Neste trinn er å erklære to forekomster av GlobalKTable (koden som vises kan finnes i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"

Vær oppmerksom på at emnenavn er beskrevet ved hjelp av oppregnede typer.

Nå som vi har alle komponentene klare, gjenstår det bare å skrive koden for tilkoblingen (som finnes i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
Selv om det er to sammenføyninger i denne koden, er de lenket fordi ingen av resultatene deres brukes separat. Resultatene vises på slutten av hele operasjonen.

Når du kjører sammenføyningsoperasjonen ovenfor, vil du få resultater som dette:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Essensen har ikke endret seg, men disse resultatene ser mer tydelige ut.

Hvis du teller ned til kapittel 4, har du allerede sett flere typer forbindelser i aksjon. De er oppført i tabellen. 5.2. Denne tabellen gjenspeiler tilkoblingsmulighetene fra versjon 1.0.0 av Kafka Streams; Noe kan endre seg i fremtidige utgivelser.

Boken «Kafka-strømmer i aksjon. Applikasjoner og mikrotjenester for sanntidsarbeid"
For å avslutte ting, la oss oppsummere det grunnleggende: du kan koble til hendelsesstrømmer (KStream) og oppdatere strømmer (KTable) ved å bruke lokal stat. Alternativt, hvis størrelsen på referansedataene ikke er for store, kan du bruke GlobalKTable-objektet. GlobalKTables replikerer alle partisjoner til hver Kafka Streams-applikasjonsnode, og sikrer at all data er tilgjengelig uavhengig av hvilken partisjon nøkkelen tilsvarer.

Deretter vil vi se Kafka Streams-funksjonen, takket være hvilken vi kan observere tilstandsendringer uten å konsumere data fra et Kafka-emne.

5.3.5. Spørrebar tilstand

Vi har allerede utført flere operasjoner som involverer tilstand og sender alltid resultatene til konsollen (for utviklingsformål) eller skriver dem til et emne (for produksjonsformål). Når du skriver resultater til et emne, må du bruke en Kafka-forbruker for å se dem.

Å lese data fra disse emnene kan betraktes som en type materialiserte synspunkter. For våre formål kan vi bruke definisjonen av en materialisert visning fra Wikipedia: "...et fysisk databaseobjekt som inneholder resultatene av en spørring. Det kan for eksempel være en lokal kopi av eksterne data, eller et undersett av radene og/eller kolonnene i en tabell eller sammenføyningsresultater, eller en sammendragstabell hentet gjennom aggregering» (https://en.wikipedia.org/wiki /Materialisert_visning).

Kafka Streams lar deg også kjøre interaktive spørringer på statlige butikker, slik at du direkte kan lese disse materialiserte visningene. Det er viktig å merke seg at spørringen til delstatsbutikken er en skrivebeskyttet operasjon. Dette sikrer at du ikke trenger å bekymre deg for ved et uhell å gjøre tilstanden inkonsekvent mens applikasjonen din behandler data.

Evnen til å direkte spørre statlige butikker er viktig. Dette betyr at du kan lage dashbordapplikasjoner uten først å måtte hente data fra Kafka-forbrukeren. Det øker også effektiviteten til applikasjonen, på grunn av det faktum at det ikke er nødvendig å skrive data igjen:

  • takket være lokaliteten til dataene kan de raskt nås;
  • duplisering av data elimineres, siden de ikke skrives til ekstern lagring.

Det viktigste jeg vil at du skal huske er at du kan spørre om tilstand direkte fra søknaden din. Mulighetene dette gir deg kan ikke overvurderes. I stedet for å konsumere data fra Kafka og lagre poster i en database for applikasjonen, kan du spørre etter tilstandslagre med samme resultat. Direkte forespørsler til statlige butikker betyr mindre kode (ingen forbruker) og mindre programvare (ikke behov for en databasetabell for å lagre resultatene).

Vi har dekket ganske mye av bakken i dette kapittelet, så vi lar diskusjonen om interaktive forespørsler mot statlige butikker ligge foreløpig. Men ikke bekymre deg: i kapittel 9 vil vi lage en enkel dashbordapplikasjon med interaktive spørringer. Den vil bruke noen av eksemplene fra dette og tidligere kapitler for å demonstrere interaktive spørringer og hvordan du kan legge dem til Kafka Streams-applikasjoner.

Oppsummering

  • KSream-objekter representerer strømmer av hendelser, sammenlignbare med innsettinger i en database. KTable-objekter representerer oppdateringsstrømmer, mer som oppdateringer til en database. Størrelsen på KTable-objektet vokser ikke, gamle poster erstattes av nye.
  • KTable-objekter kreves for aggregeringsoperasjoner.
  • Ved å bruke vindusoperasjoner kan du dele opp aggregerte data i tidsperioder.
  • Takket være GlobalKTable-objekter kan du få tilgang til referansedata hvor som helst i applikasjonen, uavhengig av partisjonering.
  • Forbindelser mellom KStream-, KTable- og GlobalKTable-objekter er mulig.

Så langt har vi fokusert på å bygge Kafka Streams-applikasjoner ved å bruke KStream DSL på høyt nivå. Selv om tilnærmingen på høyt nivå lar deg lage ryddige og konsise programmer, representerer bruken av den en avveining. Å jobbe med DSL KStream betyr å øke konsisiteten til koden din ved å redusere graden av kontroll. I det neste kapittelet skal vi se på lavnivå-handlernoden API og prøve andre avveininger. Programmene vil være lengre enn de var før, men vi vil være i stand til å lage nesten alle behandlernoder vi måtte trenge.

→ Flere detaljer om boken finner du på forlagets nettside

→ For Habrozhiteli 25% rabatt ved bruk av kupong - Kafka bekker

→ Ved betaling for papirutgaven av boken sendes en elektronisk bok på e-post.

Kilde: www.habr.com

Legg til en kommentar