Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete" Hej alla Khabrobor! Den här boken är lämplig för alla utvecklare som vill förstå trådbearbetning. Att förstå distribuerad programmering hjälper dig att bättre förstå Kafka och Kafka Streams. Det skulle vara trevligt att känna till själva Kafka-ramverket, men detta är inte nödvändigt: jag kommer att berätta allt du behöver. Både erfarna Kafka-utvecklare och nybörjare kommer att lära sig hur man skapar intressanta strömbehandlingsprogram med Kafka Streams-biblioteket i den här boken. Mellanliggande och avancerade Java-utvecklare som redan är bekanta med begrepp som serialisering kommer att lära sig att tillämpa sina färdigheter för att skapa Kafka Streams-applikationer. Bokens källkod är skriven i Java 8 och använder sig av Java 8 lambda-uttryckssyntax, så att veta hur man arbetar med lambda-funktioner (även i ett annat programmeringsspråk) kommer att vara praktiskt.

Utdrag. 5.3. Aggregering och fönsteroperationer

I det här avsnittet går vi vidare för att utforska de mest lovande delarna av Kafka Streams. Hittills har vi täckt följande aspekter av Kafka Streams:

  • skapa en bearbetningstopologi;
  • använda tillstånd i streamingapplikationer;
  • utföra dataströmsanslutningar;
  • skillnader mellan händelseströmmar (KStream) och uppdateringsströmmar (KTable).

I följande exempel kommer vi att sammanföra alla dessa element. Du kommer också att lära dig om fönster, en annan fantastisk funktion i streamingapplikationer. Vårt första exempel kommer att vara en enkel aggregering.

5.3.1. Aggregering av lagerförsäljning per industrisektor

Aggregation och gruppering är viktiga verktyg när du arbetar med strömmande data. Granskning av enskilda journaler när de tas emot är ofta otillräcklig. För att extrahera ytterligare information från data är det nödvändigt att gruppera och kombinera dem.

I det här exemplet tar du på dig kostymen till en daghandlare som behöver spåra försäljningsvolymen för aktier i företag i flera branscher. Konkret är du intresserad av de fem bolagen med störst aktieförsäljning i varje bransch.

Sådan aggregering kommer att kräva följande flera steg för att översätta data till önskad form (allmänna termer).

  1. Skapa en ämnesbaserad källa som publicerar rå information om aktiehandel. Vi måste mappa ett objekt av typen StockTransaction till ett objekt av typen ShareVolume. Poängen är att StockTransaction-objektet innehåller försäljningsmetadata, men vi behöver bara data om antalet aktier som säljs.
  2. Gruppera ShareVolume-data efter aktiesymbol. När du har grupperat efter symbol kan du kollapsa dessa data till delsummor av lagerförsäljningsvolymer. Det är värt att notera att metoden KStream.groupBy returnerar en instans av typen KGroupedStream. Och du kan få en KTable-instans genom att ytterligare anropa metoden KGroupedStream.reduce.

Vad är KGroupedStream-gränssnittet

Metoderna KStream.groupBy och KStream.groupByKey returnerar en instans av KGroupedStream. KGroupedStream är en mellanliggande representation av en ström av händelser efter gruppering efter nycklar. Den är inte alls avsedd för direkt arbete med den. Istället används KGroupedStream för aggregeringsoperationer, som alltid resulterar i en KTable. Och eftersom resultatet av aggregeringsoperationer är en KT-tabell och de använder en tillståndsbutik, är det möjligt att inte alla uppdateringar som ett resultat skickas längre ner i pipelinen.

Metoden KTable.groupBy returnerar en liknande KGroupedTable - en mellanliggande representation av strömmen av uppdateringar, omgrupperad efter nyckel.

Låt oss ta en kort paus och titta på Fig. 5.9, som visar vad vi har uppnått. Denna topologi borde redan vara mycket bekant för dig.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Låt oss nu titta på koden för denna topologi (den kan hittas i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Den givna koden kännetecknas av dess korthet och den stora mängden åtgärder som utförs på flera rader. Du kanske märker något nytt i den första parametern i builder.stream-metoden: ett värde av enum-typen AutoOffsetReset.EARLIEST (det finns också en LATEST), inställd med metoden Consumed.withOffsetResetPolicy. Denna uppräkningstyp kan användas för att specificera en offsetåterställningsstrategi för varje KStream eller KTable och har företräde framför offsetåterställningsalternativet från konfigurationen.

GroupByKey och GroupBy

KStream-gränssnittet har två metoder för att gruppera poster: GroupByKey och GroupBy. Båda returnerar en KGroupedTable, så du kanske undrar vad skillnaden är mellan dem och när du ska använda vilken?

GroupByKey-metoden används när nycklarna i KStream redan är tomma. Och viktigast av allt, flaggan "kräver ompartitionering" ställdes aldrig in.

GroupBy-metoden förutsätter att du har ändrat grupperingsnycklarna, så ompartitionsflaggan är satt till true. Att utföra kopplingar, aggregering, etc. efter GroupBy-metoden kommer att resultera i automatisk ompartitionering.
Sammanfattning: När det är möjligt bör du använda GroupByKey istället för GroupBy.

Det är tydligt vad mapValues- och groupBy-metoderna gör, så låt oss ta en titt på sum()-metoden (finns i src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Metoden ShareVolume.sum returnerar den löpande summan av lagerförsäljningsvolymen, och resultatet av hela beräkningskedjan är ett KTable-objekt . Nu förstår du vilken roll KTable spelar. När ShareVolume-objekt anländer lagrar motsvarande KTable-objekt den senaste aktuella uppdateringen. Det är viktigt att komma ihåg att alla uppdateringar återspeglas i den tidigare shareVolumeKTable, men att alla inte skickas vidare.

Vi använder sedan denna KT-tabell för att aggregera (efter antal omsatta aktier) för att komma fram till de fem företagen med de högsta volymerna av aktier som omsätts i varje bransch. Våra åtgärder i det här fallet kommer att likna dem för den första sammanställningen.

  1. Utför en annan groupBy-operation för att gruppera enskilda ShareVolume-objekt efter bransch.
  2. Börja sammanfatta ShareVolume-objekt. Den här gången är aggregeringsobjektet en prioritetskö med fast storlek. I denna kö med fast storlek behålls endast de fem bolagen med de största mängderna sålda aktier.
  3. Kartlägg köerna från föregående stycke till ett strängvärde och returnera de fem mest omsatta aktierna efter antal efter bransch.
  4. Skriv resultaten i strängform till ämnet.

I fig. Figur 5.10 visar dataflödestopologigrafen. Som du kan se är den andra omgången av bearbetning ganska enkel.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Nu när vi har en klar förståelse för strukturen för denna andra bearbetningsomgång kan vi vända oss till dess källkod (du hittar den i filen src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Denna initialiserare innehåller en fixQueue-variabel. Detta är ett anpassat objekt som är en adapter för java.util.TreeSet som används för att spåra de översta N-resultaten i fallande ordning för aktier som handlas.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Du har redan sett groupBy- och mapValues-anropen, så vi går inte in på dem (vi anropar KTable.toStream-metoden eftersom KTable.print-metoden är utfasad). Men du har inte sett KTable-versionen av aggregate() än, så vi ska ägna lite tid åt att diskutera det.

Som ni minns är det som gör KTable annorlunda att poster med samma nycklar anses vara uppdateringar. KTable ersätter den gamla posten med en ny. Aggregering sker på liknande sätt: de senaste posterna med samma nyckel aggregeras. När en post anländer läggs den till i klassinstansen FixedSizePriorityQueue med hjälp av en adderare (andra parametern i det samlade metodanropet), men om en annan post redan finns med samma nyckel, tas den gamla posten bort med en subtraktor (tredje parametern i aggregerade metodanropet).

Allt detta betyder att vår aggregator, FixedSizePriorityQueue, inte aggregerar alla värden med en nyckel, utan lagrar en rörlig summa av kvantiteterna av de N mest omsatta typerna av aktier. Varje inkommande post innehåller det totala antalet sålda aktier hittills. KTable ger dig information om vilka företags aktier som för närvarande är mest omsatta, utan att det krävs rullande aggregering av varje uppdatering.

Vi lärde oss att göra två viktiga saker:

  • gruppera värden i KTable med en gemensam nyckel;
  • utföra användbara operationer såsom sammanställning och aggregering av dessa grupperade värden.

Att veta hur man utför dessa operationer är viktigt för att förstå innebörden av data som rör sig genom en Kafka Streams-applikation och förstå vilken information den bär.

Vi har också samlat några av de nyckelbegrepp som diskuterats tidigare i den här boken. I kapitel 4 diskuterade vi hur feltolerant, lokal stat är viktig för en streamingapplikation. Det första exemplet i det här kapitlet visade varför lokal stat är så viktig – det ger dig möjlighet att hålla reda på vilken information du redan har sett. Lokal åtkomst undviker nätverksförseningar, vilket gör applikationen mer prestanda och felbeständig.

När du utför en sammanställning eller aggregering måste du ange namnet på tillståndsbutiken. Rollup- och aggregeringsoperationerna returnerar en KTable-instans, och KTable använder tillståndslagring för att ersätta gamla resultat med nya. Som du har sett skickas inte alla uppdateringar i pipelinen, och detta är viktigt eftersom aggregeringsoperationer är utformade för att producera sammanfattande information. Om du inte tillämpar lokal tillstånd kommer KTable att vidarebefordra alla aggregerings- och samlade resultat.

Därefter ska vi titta på att utföra operationer såsom aggregering inom en viss tidsperiod - så kallade fönsteroperationer.

5.3.2. Fönsteroperationer

I föregående avsnitt introducerade vi glidfalsning och aggregering. Applikationen genomförde en kontinuerlig upprullning av aktieförsäljning följt av aggregering av de fem mest omsatta aktierna på börsen.

Ibland är sådan kontinuerlig sammanställning och upprullning av resultat nödvändig. Och ibland behöver du endast utföra operationer under en viss tidsperiod. Beräkna till exempel hur många bytestransaktioner som gjorts med aktier i ett visst företag under de senaste 10 minuterna. Eller hur många användare som klickade på en ny reklambanner under de senaste 15 minuterna. Ett program kan utföra sådana operationer flera gånger, men med resultat som endast gäller för specificerade tidsperioder (tidsfönster).

Räknar bytestransaktioner av köpare

I nästa exempel kommer vi att spåra aktietransaktioner mellan flera handlare – antingen stora organisationer eller smarta enskilda finansiärer.

Det finns två möjliga orsaker till denna spårning. En av dem är behovet av att veta vad marknadsledare köper/säljer. Om dessa stora aktörer och sofistikerade investerare ser möjligheter är det vettigt att följa deras strategi. Det andra skälet är önskan att upptäcka eventuella tecken på illegal insiderhandel. För att göra detta måste du analysera sambandet mellan stora försäljningstoppar och viktiga pressmeddelanden.

Sådan spårning består av följande steg:

  • skapa en ström för läsning från ämnet aktietransaktioner;
  • gruppera inkommande poster efter köpar-ID och aktiesymbol. Att anropa groupBy-metoden returnerar en instans av klassen KGroupedStream;
  • Metoden KGroupedStream.windowedBy returnerar en dataström som är begränsad till ett tidsfönster, vilket tillåter aggregering i fönster. Beroende på fönstertyp returneras antingen en TimeWindowedKStream eller en SessionWindowedKStream;
  • transaktionsräkning för aggregeringsoperationen. Det fönsterförsedda dataflödet bestämmer huruvida en specifik post tas med i denna räkning;
  • skriva resultat till ett ämne eller mata ut dem till konsolen under utveckling.

Topologin för denna applikation är enkel, men en tydlig bild av den skulle vara till hjälp. Låt oss ta en titt på fig. 5.11.

Därefter ska vi titta på funktionaliteten för fönsteroperationer och motsvarande kod.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"

Fönstertyper

Det finns tre typer av fönster i Kafka Streams:

  • sessionsmässig;
  • ”tumling” (tumbling);
  • glida/hoppa.

Vilken du ska välja beror på dina affärsbehov. Tumling- och hoppfönster är tidsbegränsade, medan sessionsfönster begränsas av användaraktivitet – varaktigheten av sessionen/sessionerna bestäms enbart av hur aktiv användaren är. Det viktigaste att komma ihåg är att alla fönstertyper är baserade på datum/tidsstämpeln för posterna, inte systemtiden.

Därefter implementerar vi vår topologi med var och en av fönstertyperna. Den fullständiga koden kommer endast att ges i det första exemplet; för andra typer av fönster kommer ingenting att ändras förutom typen av fönsteroperation.

Sessionsfönster

Sessionsfönster skiljer sig mycket från alla andra typer av fönster. De begränsas inte så mycket av tid som av användarens aktivitet (eller aktiviteten hos den enhet som du vill spåra). Sessionsfönster avgränsas av perioder av inaktivitet.

Figur 5.12 illustrerar konceptet med sessionsfönster. Den mindre sessionen kommer att slås samman med sessionen till vänster. Och sessionen till höger kommer att vara separat eftersom den följer en lång period av inaktivitet. Sessionsfönster är baserade på användaraktivitet, men använd datum/tidsstämplar från poster för att avgöra vilken session posten tillhör.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"

Använda sessionsfönster för att spåra aktietransaktioner

Låt oss använda sessionsfönster för att fånga information om utbytestransaktioner. Implementeringen av sessionsfönster visas i Listing 5.5 (som finns i src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Du har redan sett de flesta operationerna i denna topologi, så det finns ingen anledning att titta på dem igen här. Men här finns också flera nya inslag som vi nu ska diskutera.

Varje groupBy-operation utför vanligtvis någon form av aggregeringsoperation (aggregation, sammanställning eller räkning). Du kan utföra antingen kumulativ aggregering med en löpande summa, eller fönstersammanställning, som tar hänsyn till poster inom ett angivet tidsfönster.

Koden i Listing 5.5 räknar antalet transaktioner inom sessionsfönster. I fig. 5.13 Dessa åtgärder analyseras steg för steg.

Genom att anropa windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) skapar vi ett sessionsfönster med ett inaktivitetsintervall på 20 sekunder och ett persistensintervall på 15 minuter. Ett inaktivt intervall på 20 sekunder betyder att applikationen kommer att inkludera alla poster som anländer inom 20 sekunder efter slutet eller början av den aktuella sessionen till den aktuella (aktiva) sessionen.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Därefter anger vi vilken aggregeringsoperation som måste utföras i sessionsfönstret - i det här fallet, räkna. Om en inkommande post faller utanför inaktivitetsfönstret (vardera sidan av datum-/tidsstämpeln), skapar programmet en ny session. Lagringsintervall innebär att en session upprätthålls under en viss tid och tillåter sena data som sträcker sig utöver sessionens inaktivitetsperiod men som fortfarande kan bifogas. Dessutom motsvarar början och slutet av den nya sessionen som är resultatet av sammanslagningen det tidigaste och senaste datum-/tidsstämpeln.

Låt oss titta på några poster från räknemetoden för att se hur sessioner fungerar (tabell 5.1).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
När poster anländer letar vi efter befintliga sessioner med samma nyckel, en sluttid som är mindre än aktuellt datum/tidsstämpel - inaktivitetsintervall och en starttid som är större än aktuellt datum/tidsstämpel + inaktivitetsintervall. Med hänsyn till detta, fyra poster från tabellen. 5.1 slås samman till en enda session enligt följande.

1. Post 1 kommer först, så starttiden är lika med sluttiden och är 00:00:00.

2. Därefter kommer ingång 2, och vi letar efter pass som slutar tidigast 23:59:55 och startar senast 00:00:35. Vi hittar rekord 1 och kombinerar pass 1 och 2. Vi tar starttiden för pass 1 (tidigare) och sluttiden för pass 2 (senare), så att vår nya session börjar kl 00:00:00 och slutar kl 00: 00:15.

3. Rekord 3 anländer, vi letar efter sessioner mellan 00:00:30 och 00:01:10 och hittar inga. Lägg till en andra session för nyckeln 123-345-654,FFBE, som börjar och slutar kl. 00:00:50.

4. Rekord 4 anländer och vi söker pass mellan 23:59:45 och 00:00:25. Den här gången hittas både session 1 och 2. Alla tre sessioner kombineras till en, med en starttid på 00:00:00 och en sluttid på 00:00:15.

Från det som beskrivs i det här avsnittet är det värt att komma ihåg följande viktiga nyanser:

  • sessioner är inte fönster med fast storlek. Längden på en session bestäms av aktiviteten inom en given tidsperiod;
  • Datum/tidsstämplarna i data bestämmer om händelsen faller inom en befintlig session eller under en viloperiod.

Härnäst kommer vi att diskutera nästa typ av fönster - "tumlande" fönster.

"Trumlande" fönster

Tumbling windows fångar händelser som faller inom en viss tidsperiod. Föreställ dig att du behöver fånga alla aktietransaktioner för ett visst företag var 20:e sekund, så att du samlar alla händelser under den tidsperioden. I slutet av 20-sekundersintervallet rullar fönstret över och flyttas till ett nytt 20-sekunders observationsintervall. Figur 5.14 illustrerar denna situation.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Som du kan se är alla händelser som tagits emot under de senaste 20 sekunderna med i fönstret. I slutet av denna tidsperiod skapas ett nytt fönster.

Lista 5.6 visar kod som visar användningen av tumblingsfönster för att fånga aktietransaktioner var 20:e sekund (finns i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Med denna lilla ändring av metodanropet TimeWindows.of kan du använda ett tumblingsfönster. Det här exemplet anropar inte till()-metoden, så standardretentionsintervallet på 24 timmar kommer att användas.

Äntligen är det dags att gå vidare till det sista av fönsteralternativen - "hoppande" fönster.

Skjutbara ("hoppande") fönster

Skjut-/hoppfönster liknar tumlande fönster, men med en liten skillnad. Skjutfönster vänta inte till slutet av tidsintervallet innan du skapar ett nytt fönster för att bearbeta senaste händelser. De startar nya beräkningar efter ett vänteintervall som är kortare än fönstrets varaktighet.

För att illustrera skillnaderna mellan tumlande och hoppande fönster, låt oss återgå till exemplet med att räkna börstransaktioner. Vårt mål är fortfarande att räkna antalet transaktioner, men vi vill inte vänta hela tiden innan vi uppdaterar räknaren. Istället kommer vi att uppdatera räknaren med kortare intervall. Till exempel kommer vi fortfarande att räkna antalet transaktioner var 20:e sekund, men uppdatera räknaren var 5:e sekund, som visas i Fig. 5.15. I det här fallet får vi tre resultatfönster med överlappande data.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Lista 5.7 visar koden för att definiera glidfönster (finns i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Ett tumblingsfönster kan konverteras till ett hoppfönster genom att lägga till ett anrop till advanceBy()-metoden. I exemplet som visas är sparintervallet 15 minuter.

Du såg i det här avsnittet hur man begränsar aggregeringsresultat till tidsfönster. I synnerhet vill jag att du kommer ihåg följande tre saker från det här avsnittet:

  • storleken på sessionsfönster begränsas inte av tidsperiod, utan av användaraktivitet;
  • "tumlande" fönster ger en översikt över händelser inom en given tidsperiod;
  • Varaktigheten för hoppfönster är fast, men de uppdateras ofta och kan innehålla överlappande poster i alla fönster.

Därefter kommer vi att lära oss hur man konverterar en KTable tillbaka till en KSream för en anslutning.

5.3.3. Ansluta KStream- och KTable-objekt

I kapitel 4 diskuterade vi att koppla två KStream-objekt. Nu måste vi lära oss hur man kopplar ihop KTable och KStream. Detta kan behövas av följande enkla anledning. KStream är en ström av poster, och KTable är en ström av postuppdateringar, men ibland kanske du vill lägga till ytterligare sammanhang till postströmmen med uppdateringar från KTable.

Låt oss ta data om antalet börstransaktioner och kombinera dem med börsnyheter för relevanta branscher. Här är vad du behöver göra för att uppnå detta med den kod du redan har.

  1. Konvertera ett KTable-objekt med data om antalet aktietransaktioner till en KStream, följt av att ersätta nyckeln med nyckeln som indikerar industrisektorn som motsvarar denna aktiesymbol.
  2. Skapa ett KTable-objekt som läser data från ett ämne med börsnyheter. Denna nya KT-tabell kommer att kategoriseras efter industrisektor.
  3. Anslut nyhetsuppdateringar med information om antalet börstransaktioner per bransch.

Låt oss nu se hur man genomför denna handlingsplan.

Konvertera KTable till KSream

För att konvertera KTable till KStream måste du göra följande.

  1. Anropa KTable.toStream()-metoden.
  2. Genom att anropa metoden KStream.map, ersätt nyckeln med industrinamnet och hämta sedan TransactionSummary-objektet från Windowed-instansen.

Vi kommer att koppla samman dessa operationer enligt följande (koden kan hittas i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Eftersom vi utför en KStream.map-operation, partitioneras den returnerade KStream-instansen automatiskt om när den används i en anslutning.

Vi har slutfört konverteringsprocessen, därefter måste vi skapa ett KTable-objekt för att läsa aktienyheter.

Skapande av KTable för aktienyheter

Lyckligtvis tar att skapa ett KTable-objekt bara en rad kod (koden finns i src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Det är värt att notera att inga Serde-objekt behöver anges, eftersom strängen Serdes används i inställningarna. Dessutom, genom att använda den FÖRSTA uppräkningen, fylls tabellen med poster i början.

Nu kan vi gå vidare till det sista steget - anslutning.

Koppla nyhetsuppdateringar med transaktionsräkningsdata

Att skapa en anslutning är inte svårt. Vi kommer att använda en vänsterkoppling om det inte finns några lagernyheter för den relevanta branschen (den nödvändiga koden finns i filen src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Denna leftJoin-operatör är ganska enkel. Till skillnad från joins i kapitel 4, används inte JoinWindow-metoden eftersom när man utför en KStream-KTable-join, finns det bara en post i KTablen för varje nyckel. En sådan anslutning är inte begränsad i tid: posten finns antingen i KT-tabellen eller saknas. Huvudslutsatsen: med hjälp av KTable-objekt kan du berika KStream med mindre ofta uppdaterade referensdata.

Nu ska vi titta på ett mer effektivt sätt att berika evenemang från KStream.

5.3.4. GlobalKTable-objekt

Som du kan se finns det ett behov av att berika händelseströmmar eller lägga till sammanhang till dem. I kapitel 4 såg du kopplingarna mellan två KStream-objekt, och i föregående avsnitt såg du kopplingen mellan en KStream och en KTable. I alla dessa fall är det nödvändigt att partitionera om dataströmmen när nycklarna mappas till en ny typ eller ett nytt värde. Ibland görs ompartitionering explicit, och ibland gör Kafka Streams det automatiskt. Ompartitionering är nödvändig eftersom nycklarna har ändrats och posterna måste hamna i nya sektioner, annars blir anslutningen omöjlig (detta diskuterades i kapitel 4, i avsnittet "Ompartitionering av data" i underavsnitt 4.2.4).

Ompartitionering har en kostnad

Ompartitionering kräver kostnader - ytterligare resurskostnader för att skapa mellanliggande ämnen, lagring av dubbletter av data i ett annat ämne; det innebär också ökad latens på grund av att skriva och läsa från detta ämne. Dessutom, om du behöver sammanfoga över mer än en aspekt eller dimension, måste du koppla samman sammanfogningarna, kartlägga posterna med nya nycklar och köra ompartitioneringsprocessen igen.

Ansluter till mindre datauppsättningar

I vissa fall är volymen referensdata som ska anslutas relativt liten, så fullständiga kopior av den kan enkelt passa lokalt på varje nod. För situationer som denna tillhandahåller Kafka Streams GlobalKTable-klassen.

GlobalKTable-instanser är unika eftersom applikationen replikerar all data till var och en av noderna. Och eftersom all data finns på varje nod, finns det inget behov av att partitionera händelseströmmen med referensdatanyckel så att den är tillgänglig för alla partitioner. Du kan också göra nyckellösa kopplingar med hjälp av GlobalKTable-objekt. Låt oss gå tillbaka till ett av de tidigare exemplen för att demonstrera denna funktion.

Ansluter KStream-objekt till GlobalKTable-objekt

I underavsnitt 5.3.2 utförde vi fönsteraggregation av bytestransaktioner av köpare. Resultaten av denna sammanställning såg ut ungefär så här:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Även om dessa resultat tjänade syftet, skulle det ha varit mer användbart om kundens namn och fullständiga företagsnamn också hade visats. För att lägga till kundnamn och företagsnamn kan du göra normala joins, men du måste göra två nyckelmappningar och ompartitionering. Med GlobalKTable kan du undvika kostnaderna för sådana operationer.

För att göra detta använder vi objektet countStream från Listing 5.11 (motsvarande kod finns i src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) och kopplar det till två GlobalKTable-objekt.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Vi har redan diskuterat det här tidigare, så jag ska inte upprepa det. Men jag noterar att koden i toStream().map-funktionen är abstraherad till ett funktionsobjekt istället för ett inlinet lambda-uttryck för läsbarhetens skull.

Nästa steg är att deklarera två instanser av GlobalKTable (koden som visas kan hittas i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"

Observera att ämnesnamn beskrivs med hjälp av uppräknade typer.

Nu när vi har alla komponenter klara återstår bara att skriva koden för anslutningen (som finns i filen src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
Även om det finns två kopplingar i den här koden, är de kedjade eftersom inget av deras resultat används separat. Resultaten visas i slutet av hela operationen.

När du kör ovanstående joinoperation får du resultat som detta:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Kärnan har inte förändrats, men dessa resultat ser mer tydliga ut.

Om du räknar ner till kapitel 4 har du redan sett flera typer av anslutningar i aktion. De är listade i tabellen. 5.2. Den här tabellen återspeglar anslutningsmöjligheterna från och med version 1.0.0 av Kafka Streams; Något kan ändras i framtida utgåvor.

Boken "Kafka Streams in Action. Applikationer och mikrotjänster för realtidsarbete"
För att avsluta saker och ting, låt oss sammanfatta grunderna: du kan ansluta händelseströmmar (KStream) och uppdatera strömmar (KTable) med hjälp av lokala tillstånd. Alternativt, om storleken på referensdata inte är för stor, kan du använda GlobalKTable-objektet. GlobalKTables replikerar alla partitioner till varje Kafka Streams applikationsnod, vilket säkerställer att all data är tillgänglig oavsett vilken partition nyckeln motsvarar.

Därefter kommer vi att se Kafka Streams-funktionen, tack vare vilken vi kan observera tillståndsförändringar utan att konsumera data från ett Kafka-ämne.

5.3.5. Frågabart tillstånd

Vi har redan utfört flera operationer som involverar tillstånd och matar alltid ut resultaten till konsolen (för utvecklingsändamål) eller skriver dem till ett ämne (för produktionsändamål). När du skriver resultat till ett ämne måste du använda en Kafka-konsument för att se dem.

Att läsa data från dessa ämnen kan betraktas som en typ av materialiserade åsikter. För våra syften kan vi använda definitionen av en materialiserad vy från Wikipedia: "... ett fysiskt databasobjekt som innehåller resultaten av en fråga. Det kan till exempel vara en lokal kopia av fjärrdata, eller en delmängd av raderna och/eller kolumnerna i en tabell eller sammanfogningsresultat, eller en sammanfattningstabell som erhållits genom aggregering” (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams låter dig också köra interaktiva frågor på statliga butiker, så att du direkt kan läsa dessa materialiserade vyer. Det är viktigt att notera att frågan till tillståndsarkivet är en skrivskyddad operation. Detta säkerställer att du inte behöver oroa dig för att oavsiktligt göra tillstånd inkonsekvent medan din applikation bearbetar data.

Möjligheten att direkt fråga tillståndsbutiker är viktig. Det betyder att du kan skapa instrumentpanelsapplikationer utan att först behöva hämta data från Kafka-konsumenten. Det ökar också applikationens effektivitet, på grund av att det inte finns något behov av att skriva data igen:

  • tack vare informationens lokalisering kan de snabbt nås;
  • dubbelarbete av data elimineras, eftersom det inte skrivs till extern lagring.

Det viktigaste jag vill att du ska komma ihåg är att du kan fråga tillstånd direkt från din ansökan. De möjligheter detta ger dig kan inte överskattas. Istället för att konsumera data från Kafka och lagra poster i en databas för applikationen, kan du fråga tillståndsbutiker med samma resultat. Direkta förfrågningar till statliga butiker betyder mindre kod (ingen konsument) och mindre programvara (inget behov av en databastabell för att lagra resultaten).

Vi har täckt en hel del av marken i det här kapitlet, så vi lämnar vår diskussion om interaktiva frågor mot statliga butiker för nu. Men oroa dig inte: i kapitel 9 skapar vi en enkel instrumentpanelapplikation med interaktiva frågor. Den kommer att använda några av exemplen från detta och tidigare kapitel för att demonstrera interaktiva frågor och hur du kan lägga till dem i Kafka Streams-applikationer.

Sammanfattning

  • KStream-objekt representerar strömmar av händelser, jämförbara med inlägg i en databas. KTable-objekt representerar uppdateringsströmmar, mer som uppdateringar av en databas. Storleken på KTable-objektet växer inte, gamla rekord ersätts med nya.
  • KTable-objekt krävs för aggregeringsoperationer.
  • Med hjälp av fönsteroperationer kan du dela upp aggregerad data i tidssegment.
  • Tack vare GlobalKTable-objekt kan du komma åt referensdata var som helst i applikationen, oavsett partitionering.
  • Anslutningar mellan KStream-, KTable- och GlobalKTable-objekt är möjliga.

Hittills har vi fokuserat på att bygga Kafka Streams-applikationer med KStream DSL på hög nivå. Även om tillvägagångssättet på hög nivå låter dig skapa snygga och koncisa program, är det en avvägning att använda den. Att arbeta med DSL KStream innebär att göra din kod kortare genom att minska graden av kontroll. I nästa kapitel kommer vi att titta på lågnivåhanterarnodens API och prova andra avvägningar. Programmen kommer att vara längre än de var tidigare, men vi kommer att kunna skapa nästan vilken hanterarnod som helst som vi kan behöva.

→ Mer information om boken finns på förlagets webbplats

→ För Habrozhiteli 25% rabatt med kupong - Kafka strömmar

→ Vid betalning för pappersversionen av boken skickas en elektronisk bok via e-post.

Källa: will.com

Lägg en kommentar