Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde" Hej Khabro beboere! Denne bog er velegnet til enhver udvikler, der ønsker at forstå trådbehandling. At forstå distribueret programmering vil hjælpe dig med bedre at forstå Kafka og Kafka Streams. Det ville være rart at kende selve Kafka-rammen, men dette er ikke nødvendigt: ​​Jeg vil fortælle dig alt, hvad du har brug for. Både erfarne Kafka-udviklere og nybegyndere vil lære, hvordan man skaber interessante streambehandlingsapplikationer ved hjælp af Kafka Streams-biblioteket i denne bog. Mellemliggende og avancerede Java-udviklere, der allerede er fortrolige med begreber som serialisering, vil lære at anvende deres færdigheder til at skabe Kafka Streams-applikationer. Bogens kildekode er skrevet i Java 8 og gør betydelig brug af Java 8 lambda-udtrykssyntaks, så det vil være praktisk at vide, hvordan man arbejder med lambda-funktioner (selv i et andet programmeringssprog).

Uddrag. 5.3. Aggregering og vinduesoperationer

I dette afsnit går vi videre med at udforske de mest lovende dele af Kafka Streams. Indtil videre har vi dækket følgende aspekter af Kafka Streams:

  • skabe en behandlingstopologi;
  • brug af tilstand i streaming-applikationer;
  • udførelse af datastrømforbindelser;
  • forskelle mellem hændelsesstrømme (KStream) og opdateringsstrømme (KTable).

I de følgende eksempler vil vi samle alle disse elementer. Du vil også lære om windowing, en anden fantastisk funktion ved streaming-applikationer. Vores første eksempel vil være en simpel aggregering.

5.3.1. Aggregering af lagersalg efter industrisektor

Aggregation og gruppering er vitale værktøjer, når du arbejder med streaming af data. Gennemgang af individuelle optegnelser, efterhånden som de modtages, er ofte utilstrækkelig. For at udtrække yderligere information fra data er det nødvendigt at gruppere og kombinere dem.

I dette eksempel tager du kostumet på af en daghandler, der skal spore salgsmængden af ​​aktier i virksomheder i flere brancher. Helt konkret er du interesseret i de fem virksomheder med det største aktiesalg i hver branche.

En sådan aggregering vil kræve de følgende adskillige trin for at oversætte dataene til den ønskede form (i generelle vendinger).

  1. Opret en emnebaseret kilde, der udgiver rå aktiehandelsinformation. Vi bliver nødt til at knytte et objekt af typen StockTransaction til et objekt af typen ShareVolume. Pointen er, at StockTransaction-objektet indeholder salgsmetadata, men vi behøver kun data om antallet af aktier, der sælges.
  2. Grupper ShareVolume-data efter aktiesymbol. Når du først er grupperet efter symbol, kan du kollapse disse data til subtotaler af lagersalgsmængder. Det er værd at bemærke, at KStream.groupBy-metoden returnerer en instans af typen KGroupedStream. Og du kan få en KTable-instans ved yderligere at kalde KGroupedStream.reduce-metoden.

Hvad er KGroupedStream-grænsefladen

Metoderne KStream.groupBy og KStream.groupByKey returnerer en forekomst af KGroupedStream. KGroupedStream er en mellemrepræsentation af en strøm af begivenheder efter gruppering efter nøgler. Det er slet ikke beregnet til direkte arbejde med det. I stedet bruges KGroupedStream til aggregeringsoperationer, som altid resulterer i en KTable. Og da resultatet af aggregeringsoperationer er en KTable, og de bruger en tilstandsbutik, er det muligt, at ikke alle opdateringer som følge heraf sendes længere ned i pipelinen.

KTable.groupBy-metoden returnerer en lignende KGroupedTable - en mellemrepræsentation af strømmen af ​​opdateringer, omgrupperet efter nøgle.

Lad os tage en kort pause og se på Fig. 5.9, som viser, hvad vi har opnået. Denne topologi burde allerede være meget velkendt for dig.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Lad os nu se på koden for denne topologi (den kan findes i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Den givne kode er kendetegnet ved dens korthed og den store mængde handlinger, der udføres i flere linjer. Du bemærker muligvis noget nyt i den første parameter i builder.stream-metoden: en værdi af enum-typen AutoOffsetReset.EARLIEST (der er også en LATEST), indstillet ved hjælp af Consumed.withOffsetResetPolicy-metoden. Denne opregningstype kan bruges til at specificere en offset-nulstillingsstrategi for hver KStream eller KTable og har forrang over offset-nulstillingsmuligheden fra konfigurationen.

GroupByKey og GroupBy

KStream-grænsefladen har to metoder til at gruppere poster: GroupByKey og GroupBy. Begge returnerer en KGroupedTable, så du undrer dig måske over, hvad forskellen er mellem dem, og hvornår du skal bruge hvilken?

GroupByKey-metoden bruges, når nøglerne i KStream allerede er ikke tomme. Og vigtigst af alt blev flaget "kræver ompartitionering" aldrig sat.

GroupBy-metoden antager, at du har ændret grupperingsnøglerne, så ompartitionsflaget er sat til sand. Udførelse af joinforbindelser, sammenlægninger osv. efter GroupBy-metoden vil resultere i automatisk genpartitionering.
Resumé: Når det er muligt, bør du bruge GroupByKey i stedet for GroupBy.

Det er tydeligt, hvad mapValues ​​og groupBy-metoderne gør, så lad os tage et kig på sum()-metoden (findes i src/main/java/bbejeck/model/ShareVolume.java) (liste 5.3).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
ShareVolume.sum metoden returnerer den løbende total af lagersalgsvolumen, og resultatet af hele kæden af ​​beregninger er et KTable-objekt . Nu forstår du, hvilken rolle KTable spiller. Når ShareVolume-objekter ankommer, gemmer det tilsvarende KTable-objekt den seneste aktuelle opdatering. Det er vigtigt at huske, at alle opdateringer afspejles i den tidligere shareVolumeKTable, men ikke alle sendes videre.

Vi bruger derefter denne KT-tabel til at aggregere (efter antal handlede aktier) for at nå frem til de fem virksomheder med de højeste mængder af aktier, der handles i hver branche. Vores handlinger i dette tilfælde vil ligne dem for den første sammenlægning.

  1. Udfør en anden groupBy-handling for at gruppere individuelle ShareVolume-objekter efter branche.
  2. Begynd at opsummere ShareVolume-objekter. Denne gang er aggregeringsobjektet en prioritetskø med fast størrelse. I denne kø med fast størrelse er det kun de fem selskaber, der har de største mængder solgte aktier, der beholdes.
  3. Kortlæg køerne fra det foregående afsnit til en strengværdi og returner de fem mest handlede aktier efter antal efter branche.
  4. Skriv resultaterne i strengform til emnet.

I fig. Figur 5.10 viser dataflowtopologigrafen. Som du kan se, er anden behandlingsrunde ret enkel.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Nu hvor vi har en klar forståelse af strukturen af ​​denne anden behandlingsrunde, kan vi vende os til dens kildekode (du finder den i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Denne initializer indeholder en fixedQueue-variabel. Dette er et brugerdefineret objekt, der er en adapter til java.util.TreeSet, der bruges til at spore de øverste N resultater i faldende rækkefølge af handlede aktier.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Du har allerede set groupBy- og mapValues-kaldene, så vi vil ikke gå ind i dem (vi kalder KTable.toStream-metoden, fordi KTable.print-metoden er forældet). Men du har ikke set KTable-versionen af ​​aggregate() endnu, så vi vil bruge lidt tid på at diskutere det.

Som du husker, er det, der gør KTable anderledes, at poster med de samme nøgler betragtes som opdateringer. KTable erstatter den gamle post med en ny. Aggregering sker på lignende måde: de seneste poster med samme nøgle aggregeres. Når en post ankommer, tilføjes den til FixedSizePriorityQueue-klasseinstansen ved hjælp af en adder (anden parameter i det samlede metodekald), men hvis der allerede findes en anden post med den samme nøgle, fjernes den gamle post ved hjælp af en subtraktor (tredje parameter i det samlede metodekald).

Alt dette betyder, at vores aggregator, FixedSizePriorityQueue, ikke samler alle værdier med én nøgle, men gemmer en bevægende sum af mængderne af de N mest handlede typer af aktier. Hver indgående post indeholder det samlede antal solgte aktier indtil videre. KTable vil give dig information om, hvilke selskabers aktier der i øjeblikket er mest handlet, uden at kræve rullende aggregering af hver opdatering.

Vi lærte at gøre to vigtige ting:

  • grupper værdier i KTable med en fælles nøgle;
  • udføre nyttige operationer såsom rollup og aggregering på disse grupperede værdier.

At vide, hvordan man udfører disse operationer er vigtigt for at forstå betydningen af ​​de data, der bevæger sig gennem en Kafka Streams-applikation og forstå, hvilken information den bærer.

Vi har også samlet nogle af de nøglebegreber, der er diskuteret tidligere i denne bog. I kapitel 4 diskuterede vi, hvor fejltolerant, lokal stat er vigtig for en streamingapplikation. Det første eksempel i dette kapitel viste, hvorfor lokal stat er så vigtig – det giver dig mulighed for at holde styr på, hvilke oplysninger du allerede har set. Lokal adgang undgår netværksforsinkelser, hvilket gør applikationen mere effektiv og fejlbestandig.

Når du udfører en opsamlings- eller aggregeringsoperation, skal du angive navnet på statens lager. Rollup- og aggregeringsoperationerne returnerer en KTable-instans, og KTable'en bruger tilstandslagring til at erstatte gamle resultater med nye. Som du har set, sendes ikke alle opdateringer ned ad pipelinen, og dette er vigtigt, fordi aggregeringsoperationer er designet til at producere sammenfattende information. Hvis du ikke anvender lokal stat, videresender KTable alle sammenlægnings- og oprulningsresultater.

Dernæst vil vi se på udførelse af operationer såsom aggregering inden for en bestemt tidsperiode - såkaldte vinduesoperationer.

5.3.2. Vinduesbetjening

I det foregående afsnit introducerede vi glidende foldning og aggregering. Applikationen udførte en løbende oprulning af lagersalgsvolumen efterfulgt af sammenlægning af de fem mest handlede aktier på børsen.

Nogle gange er en sådan kontinuerlig sammenlægning og oprulning af resultater nødvendig. Og nogle gange skal du kun udføre operationer over en given periode. Beregn for eksempel, hvor mange byttetransaktioner, der blev foretaget med aktier i et bestemt selskab i de sidste 10 minutter. Eller hvor mange brugere der har klikket på et nyt reklamebanner inden for de sidste 15 minutter. Et program kan udføre sådanne handlinger flere gange, men med resultater, der kun gælder for specificerede tidsperioder (tidsvinduer).

Optælling af byttetransaktioner af køber

I det næste eksempel sporer vi aktietransaktioner på tværs af flere handlende – enten store organisationer eller smarte individuelle finansfolk.

Der er to mulige årsager til denne sporing. En af dem er behovet for at vide, hvad markedsledere køber/sælger. Hvis disse store spillere og sofistikerede investorer ser muligheder, giver det mening at følge deres strategi. Den anden grund er ønsket om at få øje på eventuelle tegn på ulovlig insiderhandel. For at gøre dette skal du analysere sammenhængen mellem store salgsstigninger og vigtige pressemeddelelser.

Sådan sporing består af følgende trin:

  • oprettelse af en strøm til læsning fra emnet om aktietransaktioner;
  • gruppering af indgående poster efter køber-id og aktiesymbol. Kaldning af groupBy-metoden returnerer en forekomst af KGroupedStream-klassen;
  • Metoden KGroupedStream.windowedBy returnerer en datastrøm begrænset til et tidsvindue, som tillader aggregering i vinduer. Afhængigt af vinduestypen returneres enten en TimeWindowedKStream eller en SessionWindowedKStream;
  • transaktionsantal for aggregeringsoperationen. Det vinduesbaserede dataflow bestemmer, om der tages hensyn til en bestemt post i denne optælling;
  • skrive resultater til et emne eller udlæse dem til konsollen under udvikling.

Topologien af ​​denne applikation er enkel, men et klart billede af det ville være nyttigt. Lad os tage et kig på fig. 5.11.

Dernæst vil vi se på funktionaliteten af ​​vinduesoperationer og den tilsvarende kode.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"

Vinduestyper

Der er tre typer vinduer i Kafka Streams:

  • sessionsbestemt;
  • "tumbling";
  • glide/hoppe.

Hvilken man skal vælge afhænger af dine forretningskrav. Tumbling og springvinduer er tidsbegrænsede, mens sessionsvinduer er begrænset af brugeraktivitet - varigheden af ​​sessionen/sessionerne bestemmes udelukkende af, hvor aktiv brugeren er. Det vigtigste at huske er, at alle vinduestyper er baseret på dato/tidsstempler for indtastningerne, ikke systemtiden.

Dernæst implementerer vi vores topologi med hver af vinduestyperne. Den komplette kode vil kun blive givet i det første eksempel; for andre typer vinduer vil intet ændre sig, undtagen typen af ​​vinduesdrift.

Sessionsvinduer

Sessionsvinduer er meget forskellige fra alle andre typer vinduer. De begrænses ikke så meget af tid som af brugerens aktivitet (eller aktiviteten af ​​den enhed, du gerne vil spore). Sessionsvinduer er afgrænset af perioder med inaktivitet.

Figur 5.12 illustrerer konceptet med sessionsvinduer. Den mindre session vil smelte sammen med sessionen til venstre. Og sessionen til højre vil være adskilt, fordi den følger en lang periode med inaktivitet. Sessionsvinduer er baseret på brugeraktivitet, men brug dato/tidsstempler fra poster til at bestemme, hvilken session posten tilhører.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"

Brug af sessionsvinduer til at spore aktietransaktioner

Lad os bruge sessionsvinduer til at fange information om udvekslingstransaktioner. Implementeringen af ​​sessionsvinduer er vist i Listing 5.5 (som kan findes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Du har allerede set de fleste af operationerne i denne topologi, så der er ingen grund til at se på dem igen her. Men der er også flere nye elementer her, som vi nu vil diskutere.

Enhver groupBy-operation udfører typisk en form for aggregeringsoperation (sammenlægning, oprulning eller optælling). Du kan udføre enten kumulativ aggregering med en løbende total eller vinduesaggregering, som tager højde for poster inden for et specificeret tidsvindue.

Koden i Listing 5.5 tæller antallet af transaktioner inden for sessionsvinduer. I fig. 5.13 disse handlinger analyseres trin for trin.

Ved at kalde windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) opretter vi et sessionsvindue med et inaktivitetsinterval på 20 sekunder og et persistensinterval på 15 minutter. Et inaktivt interval på 20 sekunder betyder, at applikationen vil inkludere enhver post, der ankommer inden for 20 sekunder efter afslutningen eller starten af ​​den aktuelle session til den aktuelle (aktive) session.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Dernæst specificerer vi, hvilken aggregeringsoperation der skal udføres i sessionsvinduet - i dette tilfælde tæller. Hvis en indgående post falder uden for inaktivitetsvinduet (på begge sider af dato/tidsstemplet), opretter applikationen en ny session. Opbevaringsinterval betyder at opretholde en session i et vist tidsrum og giver mulighed for sene data, der strækker sig ud over sessionens inaktivitetsperiode, men som stadig kan vedhæftes. Derudover svarer starten og slutningen af ​​den nye session, der er resultatet af fletningen, til det tidligste og seneste dato/tidsstempel.

Lad os se på nogle få poster fra tællemetoden for at se, hvordan sessioner fungerer (tabel 5.1).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Når registreringer ankommer, søger vi efter eksisterende sessioner med den samme nøgle, et sluttidspunkt, der er mindre end det aktuelle dato/tidsstempel - inaktivitetsinterval, og et starttidspunkt, der er større end det aktuelle dato/tidsstempel + inaktivitetsinterval. Med dette i betragtning, fire poster fra tabellen. 5.1 slås sammen til en enkelt session som følger.

1. Record 1 ankommer først, så starttidspunktet er lig med sluttidspunktet og er 00:00:00.

2. Dernæst ankommer post 2, og vi leder efter sessioner, der slutter tidligst 23:59:55 og starter senest 00:00:35. Vi finder rekord 1 og kombinerer session 1 og 2. Vi tager starttidspunktet for session 1 (tidligere) og sluttidspunktet for session 2 (senere), således at vores nye session starter kl. 00:00:00 og slutter kl. 00: 00:15.

3. Rekord 3 ankommer, vi leder efter sessioner mellem 00:00:30 og 00:01:10 og finder ingen. Tilføj en anden session for nøglen 123-345-654,FFBE, der starter og slutter kl. 00:00:50.

4. Rekord 4 ankommer, og vi leder efter sessioner mellem 23:59:45 og 00:00:25. Denne gang findes både session 1 og 2. Alle tre sessioner er samlet til én, med et starttidspunkt på 00:00:00 og et sluttidspunkt på 00:00:15.

Fra det, der er beskrevet i dette afsnit, er det værd at huske følgende vigtige nuancer:

  • sessioner er ikke vinduer med fast størrelse. Varigheden af ​​en session bestemmes af aktiviteten inden for en given tidsperiode;
  • Dato/tidsstemplerne i dataene bestemmer, om hændelsen falder inden for en eksisterende session eller i en inaktiv periode.

Dernæst vil vi diskutere den næste type vindue - "tumbling" vinduer.

"Tumbling" vinduer

Tumbling windows fanger begivenheder, der falder inden for et bestemt tidsrum. Forestil dig, at du skal fange alle aktietransaktioner i en bestemt virksomhed hvert 20. sekund, så du samler alle begivenhederne i denne periode. Ved slutningen af ​​20 sekunders interval ruller vinduet om og flytter til et nyt 20 sekunders observationsinterval. Figur 5.14 illustrerer denne situation.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Som du kan se, er alle hændelser modtaget inden for de sidste 20 sekunder inkluderet i vinduet. Ved afslutningen af ​​denne periode oprettes et nyt vindue.

Liste 5.6 viser kode, der demonstrerer brugen af ​​tumbling-vinduer til at fange aktietransaktioner hvert 20. sekund (findes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Med denne lille ændring af TimeWindows.of-metodekaldet kan du bruge et tumbling-vindue. Dette eksempel kalder ikke indtil()-metoden, så standardopbevaringsintervallet på 24 timer vil blive brugt.

Endelig er det tid til at gå videre til den sidste af vinduesmulighederne - "hoppende" vinduer.

Skydende ("hoppende") vinduer

Skyde-/hoppevinduer ligner tumlevinduer, men med en lille forskel. Skydevinduer vent ikke til slutningen af ​​tidsintervallet, før du opretter et nyt vindue til at behandle seneste hændelser. De starter nye beregninger efter et venteinterval, der er mindre end vinduesvarigheden.

For at illustrere forskellene mellem tumbling og springvinduer, lad os vende tilbage til eksemplet med at tælle børstransaktioner. Vores mål er stadig at tælle antallet af transaktioner, men vi ønsker ikke at vente hele tiden, før vi opdaterer tælleren. I stedet vil vi opdatere tælleren med kortere intervaller. For eksempel vil vi stadig tælle antallet af transaktioner hvert 20. sekund, men opdatere tælleren hvert 5. sekund, som vist i fig. 5.15. I dette tilfælde ender vi med tre resultatvinduer med overlappende data.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Liste 5.7 viser koden til at definere glidende vinduer (findes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Et tumbling-vindue kan konverteres til et hop-vindue ved at tilføje et kald til advanceBy()-metoden. I det viste eksempel er spareintervallet 15 minutter.

Du så i dette afsnit, hvordan man begrænser aggregeringsresultater til tidsvinduer. Især vil jeg have dig til at huske følgende tre ting fra dette afsnit:

  • Størrelsen af ​​sessionsvinduer er ikke begrænset af tidsperiode, men af ​​brugeraktivitet;
  • "tumbling" vinduer giver et overblik over begivenheder inden for en given tidsperiode;
  • Varigheden af ​​springvinduer er fast, men de opdateres hyppigt og kan indeholde overlappende poster i alle vinduer.

Dernæst vil vi lære, hvordan man konverterer en KTable tilbage til en KSream for en forbindelse.

5.3.3. Tilslutning af KStream- og KTable-objekter

I kapitel 4 diskuterede vi at forbinde to KStream-objekter. Nu skal vi lære at forbinde KTable og KStream. Dette kan være nødvendigt af følgende simple årsag. KStream er en strøm af poster, og KTable er en strøm af rekordopdateringer, men nogle gange vil du måske tilføje yderligere kontekst til rekordstrømmen ved at bruge opdateringer fra KTable.

Lad os tage data om antallet af børstransaktioner og kombinere dem med børsnyheder for de relevante brancher. Her er, hvad du skal gøre for at opnå dette givet den kode, du allerede har.

  1. Konverter et KTable-objekt med data om antallet af aktietransaktioner til en KStream, efterfulgt af udskiftning af nøglen med nøglen, der angiver den industrisektor, der svarer til dette aktiesymbol.
  2. Opret et KTable-objekt, der læser data fra et emne med børsnyheder. Denne nye KTable vil blive kategoriseret efter industrisektor.
  3. Forbind nyhedsopdateringer med information om antallet af børstransaktioner efter branche.

Lad os nu se, hvordan denne handlingsplan implementeres.

Konverter KTable til KSream

For at konvertere KTable til KStream skal du gøre følgende.

  1. Kald KTable.toStream() metoden.
  2. Ved at kalde KStream.map-metoden skal du erstatte nøglen med industrinavnet og derefter hente TransactionSummary-objektet fra Windowed-forekomsten.

Vi kæder disse operationer sammen som følger (koden kan findes i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Fordi vi udfører en KStream.map-operation, bliver den returnerede KStream-instans automatisk genpartitioneret, når den bruges i en forbindelse.

Vi har afsluttet konverteringsprocessen, derefter skal vi oprette et KTable-objekt til at læse aktienyheder.

Oprettelse af KTable til aktienyheder

Heldigvis kræver det kun én linje kode at oprette et KTable-objekt (koden kan findes i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Det er værd at bemærke, at ingen Serde-objekter skal angives, da streng Serdes bruges i indstillingerne. Ved at bruge den TIDLIGSTE opregning er tabellen også fyldt med poster helt i begyndelsen.

Nu kan vi gå videre til det sidste trin - forbindelse.

Forbindelse af nyhedsopdateringer med transaktionsoptællingsdata

Det er ikke svært at skabe en forbindelse. Vi vil bruge en venstre join i tilfælde af, at der ikke er aktienyheder for den relevante branche (den nødvendige kode kan findes i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Denne leftJoin-operator er ret enkel. I modsætning til joins i kapitel 4, bruges JoinWindow-metoden ikke, fordi når man udfører en KStream-KTable-join, er der kun én indgang i KTablen for hver nøgle. En sådan forbindelse er ikke begrænset i tid: posten er enten i KT-tabellen eller fraværende. Hovedkonklusionen: ved at bruge KTable-objekter kan du berige KStream med sjældnere opdaterede referencedata.

Nu vil vi se på en mere effektiv måde at berige begivenheder fra KStream på.

5.3.4. GlobalKTable-objekter

Som du kan se, er der behov for at berige begivenhedsstreams eller tilføje kontekst til dem. I kapitel 4 så du forbindelserne mellem to KStream-objekter, og i det foregående afsnit så du forbindelsen mellem en KStream og en KTable. I alle disse tilfælde er det nødvendigt at ompartitionere datastrømmen, når nøglerne tilknyttes en ny type eller værdi. Nogle gange udføres ompartitionering eksplicit, og nogle gange gør Kafka Streams det automatisk. Ompartitionering er nødvendig, fordi nøglerne er ændret, og posterne skal ende i nye sektioner, ellers vil forbindelsen være umulig (dette blev diskuteret i kapitel 4, i afsnittet "Ompartitionering af data" i underafsnit 4.2.4).

Genopdeling har en omkostning

Ompartitionering kræver omkostninger - yderligere ressourceomkostninger til oprettelse af mellemliggende emner, lagring af duplikerede data i et andet emne; det betyder også øget latenstid på grund af skrivning og læsning fra dette emne. Derudover, hvis du har brug for at forbinde på tværs af mere end ét aspekt eller dimension, skal du kæde sammenkædningerne, kortlægge posterne med nye nøgler og køre ompartitioneringsprocessen igen.

Tilslutning til mindre datasæt

I nogle tilfælde er mængden af ​​referencedata, der skal tilsluttes, relativt lille, så komplette kopier af dem kan nemt passe lokalt på hver node. Til situationer som denne tilbyder Kafka Streams GlobalKTable-klassen.

GlobalKTable-instanser er unikke, fordi applikationen replikerer alle data til hver af noderne. Og da alle data er til stede på hver node, er der ikke behov for at partitionere hændelsesstrømmen efter referencedatanøgle, så den er tilgængelig for alle partitioner. Du kan også lave nøglefri joins ved hjælp af GlobalKTable-objekter. Lad os gå tilbage til et af de foregående eksempler for at demonstrere denne funktion.

Forbindelse af KStream-objekter til GlobalKTable-objekter

I underafsnit 5.3.2 udførte vi vinduesaggregering af købers byttetransaktioner. Resultaterne af denne sammenlægning så nogenlunde således ud:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Selvom disse resultater tjente formålet, ville det have været mere nyttigt, hvis kundens navn og fulde firmanavn også var blevet vist. For at tilføje kundenavnet og firmanavnet kan du lave normale joinforbindelser, men du skal lave to nøgletilknytninger og ompartitionere. Med GlobalKTable kan du undgå omkostningerne ved sådanne operationer.

For at gøre dette bruger vi countStream-objektet fra Listing 5.11 (den tilsvarende kode kan findes i src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) og forbinder det til to GlobalKTable-objekter.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Vi har allerede diskuteret dette før, så jeg vil ikke gentage det. Men jeg bemærker, at koden i toStream().map-funktionen er abstraheret til et funktionsobjekt i stedet for et inline lambda-udtryk af hensyn til læsbarheden.

Det næste trin er at erklære to forekomster af GlobalKTable (den viste kode kan findes i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"

Bemærk venligst, at emnenavne er beskrevet ved hjælp af opregnede typer.

Nu hvor vi har alle komponenterne klar, er der kun tilbage at skrive koden til forbindelsen (som kan findes i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
Selvom der er to joins i denne kode, er de kædet sammen, fordi ingen af ​​deres resultater bruges separat. Resultaterne vises i slutningen af ​​hele operationen.

Når du kører ovenstående join-operation, får du resultater som dette:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Essensen har ikke ændret sig, men disse resultater ser mere klare ud.

Hvis du tæller ned til kapitel 4, har du allerede set flere typer forbindelser i aktion. De er anført i tabellen. 5.2. Denne tabel afspejler tilslutningsmulighederne fra version 1.0.0 af Kafka Streams; Noget kan ændre sig i fremtidige udgivelser.

Bogen "Kafka-strømme i aktion. Applikationer og mikrotjenester til realtidsarbejde"
For at afslutte tingene, lad os opsummere det grundlæggende: du kan forbinde begivenhedsstreams (KStream) og opdatere streams (KTable) ved hjælp af lokal stat. Alternativt, hvis størrelsen af ​​referencedataene ikke er for store, kan du bruge GlobalKTable-objektet. GlobalKTables replikerer alle partitioner til hver Kafka Streams-applikationsknude, hvilket sikrer, at alle data er tilgængelige, uanset hvilken partition nøglen svarer til.

Dernæst vil vi se Kafka Streams-funktionen, takket være hvilken vi kan observere tilstandsændringer uden at forbruge data fra et Kafka-emne.

5.3.5. Forespørgselstilstand

Vi har allerede udført adskillige operationer, der involverer tilstand og udsender altid resultaterne til konsollen (til udviklingsformål) eller skriver dem til et emne (til produktionsformål). Når du skriver resultater til et emne, skal du bruge en Kafka-forbruger til at se dem.

At læse data fra disse emner kan betragtes som en type materialiserede synspunkter. Til vores formål kan vi bruge definitionen af ​​en materialiseret visning fra Wikipedia: "...et fysisk databaseobjekt, der indeholder resultaterne af en forespørgsel. Det kunne f.eks. være en lokal kopi af fjerndata eller en undergruppe af rækkerne og/eller kolonnerne i en tabel eller sammenføjningsresultater eller en oversigtstabel opnået gennem aggregering" (https://en.wikipedia.org/wiki /Materialiseret_visning).

Kafka Streams giver dig også mulighed for at køre interaktive forespørgsler på statslige butikker, så du direkte kan læse disse materialiserede synspunkter. Det er vigtigt at bemærke, at forespørgslen til statens lager er en skrivebeskyttet handling. Dette sikrer, at du ikke behøver at bekymre dig om ved et uheld at gøre tilstanden inkonsekvent, mens din applikation behandler data.

Evnen til direkte at forespørge i statens butikker er vigtig. Det betyder, at du kan oprette dashboard-applikationer uden først at skulle hente data fra Kafka-forbrugeren. Det øger også effektiviteten af ​​applikationen, på grund af det faktum, at der ikke er behov for at skrive data igen:

  • takket være lokaliteten af ​​dataene kan de hurtigt tilgås;
  • duplikering af data elimineres, da de ikke skrives til eksternt lager.

Det vigtigste, jeg vil have dig til at huske, er, at du direkte kan forespørge om tilstand fra din ansøgning. De muligheder dette giver dig kan ikke overvurderes. I stedet for at forbruge data fra Kafka og gemme poster i en database til applikationen, kan du forespørge på tilstandsbutikker med det samme resultat. Direkte forespørgsler til statsbutikker betyder mindre kode (ingen forbruger) og mindre software (intet behov for en databasetabel til at gemme resultaterne).

Vi har dækket en del af jorden i dette kapitel, så vi lader vores diskussion af interaktive forespørgsler mod statslige butikker ligge indtil videre. Men bare rolig: I kapitel 9 opretter vi en simpel dashboard-applikation med interaktive forespørgsler. Den vil bruge nogle af eksemplerne fra dette og tidligere kapitler til at demonstrere interaktive forespørgsler, og hvordan du kan tilføje dem til Kafka Streams-applikationer.

Resumé

  • KStream-objekter repræsenterer strømme af begivenheder, der kan sammenlignes med indsættelser i en database. KTable-objekter repræsenterer opdateringsstrømme, mere som opdateringer til en database. Størrelsen på KTable-objektet vokser ikke, gamle optegnelser erstattes af nye.
  • KTable-objekter er nødvendige for aggregeringsoperationer.
  • Ved at bruge vinduesoperationer kan du opdele aggregerede data i tidsintervaller.
  • Takket være GlobalKTable-objekter kan du få adgang til referencedata overalt i applikationen, uanset partitionering.
  • Forbindelser mellem KStream-, KTable- og GlobalKTable-objekter er mulige.

Indtil videre har vi fokuseret på at bygge Kafka Streams-applikationer ved hjælp af KStream DSL på højt niveau. Selvom tilgangen på højt niveau giver dig mulighed for at skabe pæne og kortfattede programmer, er brugen af ​​den en afvejning. At arbejde med DSL KStream betyder at gøre din kode mere kortfattet ved at reducere graden af ​​kontrol. I det næste kapitel vil vi se på lav-niveau handler node API og prøve andre afvejninger. Programmerne vil være længere, end de var før, men vi vil være i stand til at oprette næsten enhver håndteringsknude, som vi måtte have brug for.

→ Flere detaljer om bogen kan findes på forlagets hjemmeside

→ For Habrozhiteli 25% rabat ved brug af kupon - Kafka-strømme

→ Ved betaling for papirversionen af ​​bogen fremsendes en elektronisk bog på e-mail.

Kilde: www.habr.com

Tilføj en kommentar