Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken" Hallo, inwoners van Khabro! Dit boek is geschikt voor elke ontwikkelaar die threadverwerking wil begrijpen. Als u gedistribueerde programmering begrijpt, kunt u Kafka en Kafka Streams beter begrijpen. Het zou leuk zijn om het Kafka-framework zelf te kennen, maar dit is niet nodig: ik zal je alles vertellen wat je nodig hebt. Ervaren Kafka-ontwikkelaars en beginners zullen in dit boek leren hoe ze interessante streamverwerkingstoepassingen kunnen maken met behulp van de Kafka Streams-bibliotheek. Gemiddelde en gevorderde Java-ontwikkelaars die al bekend zijn met concepten als serialisatie, zullen leren hun vaardigheden toe te passen om Kafka Streams-applicaties te maken. De broncode van het boek is geschreven in Java 8 en maakt veel gebruik van de Java 8 lambda-expressiesyntaxis, dus weten hoe je met lambda-functies moet werken (zelfs in een andere programmeertaal) zal van pas komen.

Uittreksel. 5.3. Aggregatie- en vensterbewerkingen

In dit gedeelte gaan we verder met het verkennen van de meest veelbelovende delen van Kafka Streams. Tot nu toe hebben we de volgende aspecten van Kafka Streams behandeld:

  • het creëren van een verwerkingstopologie;
  • status gebruiken in streaming-applicaties;
  • het uitvoeren van datastroomverbindingen;
  • verschillen tussen gebeurtenisstromen (KStream) en updatestromen (KTable).

In de volgende voorbeelden brengen we al deze elementen samen. Je leert ook over windowing, een andere geweldige functie van streaming-applicaties. Ons eerste voorbeeld zal een eenvoudige aggregatie zijn.

5.3.1. Aggregatie van de aandelenverkopen per bedrijfstak

Aggregatie en groepering zijn essentiële hulpmiddelen bij het werken met streaminggegevens. Onderzoek van individuele dossiers zoals deze worden ontvangen, is vaak onvoldoende. Om aanvullende informatie uit gegevens te halen, is het noodzakelijk deze te groeperen en combineren.

In dit voorbeeld kleedt u zich aan als een daghandelaar die het verkoopvolume van aandelen van bedrijven in verschillende sectoren moet volgen. Concreet bent u geïnteresseerd in de vijf bedrijven met de grootste aandelenverkoop in elke sector.

Een dergelijke aggregatie vereist de volgende verschillende stappen om de gegevens in de gewenste vorm te vertalen (in algemene termen gesproken).

  1. Creëer een op onderwerpen gebaseerde bron die onbewerkte informatie over aandelenhandel publiceert. We zullen een object van het type StockTransaction moeten toewijzen aan een object van het type ShareVolume. Het punt is dat het StockTransaction-object verkoopmetagegevens bevat, maar we hebben alleen gegevens nodig over het aantal verkochte aandelen.
  2. Groepeer ShareVolume-gegevens op aandelensymbool. Eenmaal gegroepeerd op symbool, kunt u deze gegevens samenvouwen tot subtotalen van de voorraadverkoopvolumes. Het is vermeldenswaard dat de methode KStream.groupBy een exemplaar van het type KGroupedStream retourneert. En u kunt een KTable-instantie verkrijgen door de methode KGroupedStream.reduce verder aan te roepen.

Wat is de KGroupedStream-interface

De methoden KStream.groupBy en KStream.groupByKey retourneren een exemplaar van KGroupedStream. KGroupedStream is een tussenweergave van een stroom gebeurtenissen na groepering op sleutels. Het is helemaal niet bedoeld om er direct mee te werken. In plaats daarvan wordt KGroupedStream gebruikt voor aggregatiebewerkingen, die altijd resulteren in een KTable. En aangezien het resultaat van aggregatiebewerkingen een KTable is en ze een statusopslag gebruiken, is het mogelijk dat niet alle updates als resultaat verder in de pijplijn worden verzonden.

De methode KTable.groupBy retourneert een vergelijkbare KGroupedTable - een tussenweergave van de stroom updates, gehergroepeerd op sleutel.

Laten we een korte pauze nemen en naar Fig. 5.9, waaruit blijkt wat we hebben bereikt. Deze topologie zou u al zeer bekend moeten zijn.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Laten we nu naar de code voor deze topologie kijken (deze kan worden gevonden in het bestand src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
De gegeven code onderscheidt zich door zijn beknoptheid en het grote aantal acties dat in verschillende regels wordt uitgevoerd. Mogelijk merkt u iets nieuws op in de eerste parameter van de builder.stream-methode: een waarde van het enum-type AutoOffsetReset.EARLIEST (er is ook een LATEST), ingesteld met behulp van de Consumed.withOffsetResetPolicy-methode. Dit opsommingstype kan worden gebruikt om een ​​offset-resetstrategie te specificeren voor elke KStream of KTable en heeft voorrang op de offset-resetoptie uit de configuratie.

GroupByKey en GroupBy

De KStream-interface heeft twee methoden voor het groeperen van records: GroupByKey en GroupBy. Beide retourneren een KGroupedTable, dus u vraagt ​​zich misschien af ​​wat het verschil tussen beide is en wanneer u welke moet gebruiken?

De GroupByKey-methode wordt gebruikt wanneer de sleutels in KStream al niet leeg zijn. En het allerbelangrijkste: de vlag “requires re-partitioning” is nooit ingesteld.

De GroupBy-methode gaat ervan uit dat u de groeperingssleutels hebt gewijzigd, zodat de herpartitievlag is ingesteld op waar. Het uitvoeren van joins, aggregaties, enz. na de GroupBy-methode zal resulteren in automatische herpartitionering.
Samenvatting: Waar mogelijk moet u GroupByKey gebruiken in plaats van GroupBy.

Het is duidelijk wat de methoden mapValues ​​en groupBy doen, dus laten we eens kijken naar de methode sum() (te vinden in src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
De methode ShareVolume.sum retourneert het lopende totaal van het voorraadverkoopvolume, en het resultaat van de hele reeks berekeningen is een KTable-object . Nu begrijp je de rol die KTable speelt. Wanneer ShareVolume-objecten arriveren, slaat het corresponderende KTable-object de laatste huidige update op. Het is belangrijk om te onthouden dat alle updates worden weerspiegeld in de vorige shareVolumeKTable, maar niet allemaal verder worden verzonden.

Vervolgens gebruiken we deze KTable om te aggregeren (op basis van het aantal verhandelde aandelen) om te komen tot de vijf bedrijven met de hoogste volumes verhandelde aandelen in elke sector. Onze acties in dit geval zullen vergelijkbaar zijn met die voor de eerste aggregatie.

  1. Voer nog een groupBy-bewerking uit om individuele ShareVolume-objecten per branche te groeperen.
  2. Begin met het samenvatten van ShareVolume-objecten. Deze keer is het aggregatieobject een prioriteitswachtrij met een vaste grootte. In deze wachtrij met een vaste omvang worden alleen de vijf bedrijven met de grootste aantallen verkochte aandelen behouden.
  3. Wijs de wachtrijen uit de vorige paragraaf toe aan een tekenreekswaarde en retourneer de top vijf van meest verhandelde aandelen, gerangschikt op nummer per sector.
  4. Schrijf de resultaten in tekenreeksvorm naar het onderwerp.

In afb. Figuur 5.10 toont de datastroomtopologiegrafiek. Zoals u kunt zien, is de tweede verwerkingsronde vrij eenvoudig.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Nu we een duidelijk begrip hebben van de structuur van deze tweede verwerkingsronde, kunnen we ons wenden tot de broncode ervan (je vindt deze in het bestand src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Deze initialisatiefunctie bevat een fixedQueue-variabele. Dit is een aangepast object dat een adapter is voor java.util.TreeSet dat wordt gebruikt om de top N resultaten bij te houden in aflopende volgorde van de verhandelde aandelen.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Je hebt de groupBy- en mapValues-aanroepen al gezien, dus daar gaan we niet op in (we roepen de methode KTable.toStream aan omdat de methode KTable.print verouderd is). Maar je hebt de KTable-versie van aggregate() nog niet gezien, dus we zullen daar wat tijd aan besteden.

Zoals u zich herinnert, is wat KTable anders maakt, dat records met dezelfde sleutels als updates worden beschouwd. KTable vervangt de oude invoer door een nieuwe. Aggregatie vindt op een vergelijkbare manier plaats: de nieuwste records met dezelfde sleutel worden geaggregeerd. Wanneer een record binnenkomt, wordt het toegevoegd aan de klasse-instantie FixedSizePriorityQueue met behulp van een opteller (tweede parameter in de aanroep van de aggregatiemethode), maar als er al een ander record bestaat met dezelfde sleutel, wordt het oude record verwijderd met behulp van een aftrekker (derde parameter in de geaggregeerde methodeaanroep).

Dit alles betekent dat onze aggregator, FixedSizePriorityQueue, niet alle waarden samenvoegt met één sleutel, maar een voortschrijdende som opslaat van de hoeveelheden van de N meest verhandelde soorten aandelen. Elke inkomende inzending bevat het totale aantal tot nu toe verkochte aandelen. KTable geeft u informatie over de aandelen van bedrijven die momenteel het meest worden verhandeld, zonder dat een voortschrijdende aggregatie van elke update nodig is.

We hebben twee belangrijke dingen geleerd:

  • groepeer waarden in KTable met een gemeenschappelijke sleutel;
  • voer nuttige bewerkingen uit, zoals samenvouwen en aggregatie, op deze gegroepeerde waarden.

Weten hoe u deze bewerkingen moet uitvoeren, is belangrijk om de betekenis te begrijpen van de gegevens die door een Kafka Streams-applicatie gaan en om te begrijpen welke informatie deze bevat.

We hebben ook enkele van de belangrijkste concepten die eerder in dit boek zijn besproken, samengebracht. In hoofdstuk 4 hebben we besproken hoe fouttolerante, lokale status belangrijk is voor een streamingtoepassing. Het eerste voorbeeld in dit hoofdstuk laat zien waarom de lokale staat zo belangrijk is: het geeft je de mogelijkheid om bij te houden welke informatie je al hebt gezien. Lokale toegang vermijdt netwerkvertragingen, waardoor de applicatie performanter en foutbestendiger wordt.

Wanneer u een samenvoeg- of aggregatiebewerking uitvoert, moet u de naam van het statusarchief opgeven. De samenvoegings- en aggregatiebewerkingen retourneren een KTable-instantie, en de KTable gebruikt statusopslag om oude resultaten te vervangen door nieuwe. Zoals u heeft gezien, worden niet alle updates in de pijplijn verzonden, en dit is belangrijk omdat aggregatiebewerkingen zijn ontworpen om samenvattende informatie te produceren. Als u de lokale status niet toepast, zal KTable alle aggregatie- en samenvoegingsresultaten doorsturen.

Vervolgens kijken we naar het uitvoeren van bewerkingen zoals aggregatie binnen een specifieke periode, de zogenaamde windowing-bewerkingen.

5.3.2. Vensterbewerkingen

In de vorige sectie hebben we glijdende convolutie en aggregatie geïntroduceerd. De applicatie voerde een continue optelling van het verkoopvolume van de aandelen uit, gevolgd door de aggregatie van de vijf meest verhandelde aandelen op de beurs.

Soms is een dergelijke continue aggregatie en bundeling van resultaten noodzakelijk. En soms hoeft u bewerkingen slechts gedurende een bepaalde periode uit te voeren. Bereken bijvoorbeeld hoeveel ruiltransacties er de afgelopen 10 minuten zijn uitgevoerd met aandelen van een bepaald bedrijf. Of hoeveel gebruikers de afgelopen 15 minuten op een nieuwe reclamebanner hebben geklikt. Een applicatie kan dergelijke bewerkingen meerdere keren uitvoeren, maar met resultaten die alleen van toepassing zijn op bepaalde tijdsperioden (tijdvensters).

Wisseltransacties per koper tellen

In het volgende voorbeeld volgen we aandelentransacties van meerdere handelaars, zowel grote organisaties als slimme individuele financiers.

Er zijn twee mogelijke redenen voor deze tracking. Eén daarvan is de noodzaak om te weten wat marktleiders kopen/verkopen. Als deze grote spelers en geavanceerde investeerders kansen zien, is het logisch om hun strategie te volgen. De tweede reden is de wens om eventuele tekenen van illegale handel met voorkennis op te sporen. Om dit te doen, moet u de correlatie analyseren tussen grote verkooppieken en belangrijke persberichten.

Een dergelijke tracking bestaat uit de volgende stappen:

  • een stream maken om te lezen vanuit het onderwerp aandelentransacties;
  • binnenkomende records groeperen op koper-ID en aandelensymbool. Het aanroepen van de methode groupBy retourneert een exemplaar van de klasse KGroupedStream;
  • De methode KGroupedStream.windowedBy retourneert een gegevensstroom die beperkt is tot een tijdvenster, waardoor aggregatie in vensters mogelijk is. Afhankelijk van het venstertype wordt een TimeWindowedKStream of een SessionWindowedKStream geretourneerd;
  • aantal transacties voor de aggregatiebewerking. De gegevensstroom in vensters bepaalt of bij deze telling rekening wordt gehouden met een bepaald record;
  • resultaten naar een onderwerp schrijven of deze tijdens de ontwikkeling naar de console uitvoeren.

De topologie van deze applicatie is eenvoudig, maar een duidelijk beeld ervan zou nuttig zijn. Laten we eens kijken naar afb. 5.11.

Vervolgens kijken we naar de functionaliteit van vensterbewerkingen en de bijbehorende code.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"

Venstertypes

Er zijn drie soorten vensters in Kafka Streams:

  • sessief;
  • “tuimelen” (tuimelen);
  • glijden/hoppen.

Welke u moet kiezen, hangt af van uw zakelijke vereisten. Tuimel- en springvensters zijn beperkt in de tijd, terwijl sessievensters worden beperkt door gebruikersactiviteit: de duur van de sessie(s) wordt uitsluitend bepaald door hoe actief de gebruiker is. Het belangrijkste om te onthouden is dat alle venstertypen gebaseerd zijn op de datum-/tijdstempels van de items, en niet op de systeemtijd.

Vervolgens implementeren we onze topologie met elk van de venstertypen. De volledige code wordt alleen in het eerste voorbeeld gegeven; voor andere typen vensters verandert er niets behalve het type vensterbewerking.

Sessievensters

Sessievensters verschillen sterk van alle andere typen vensters. Ze worden niet zozeer beperkt door de tijd als wel door de activiteit van de gebruiker (of de activiteit van de entiteit die u wilt volgen). Sessievensters worden begrensd door perioden van inactiviteit.

Figuur 5.12 illustreert het concept van sessievensters. De kleinere sessie wordt samengevoegd met de sessie aan de linkerkant. En de sessie aan de rechterkant zal apart zijn omdat deze volgt op een lange periode van inactiviteit. Sessievensters zijn gebaseerd op gebruikersactiviteit, maar gebruiken datum-/tijdstempels van vermeldingen om te bepalen tot welke sessie de vermelding behoort.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"

Sessievensters gebruiken om aandelentransacties bij te houden

Laten we sessievensters gebruiken om informatie over beurstransacties vast te leggen. De implementatie van sessievensters wordt getoond in Listing 5.5 (te vinden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
U heeft de meeste bewerkingen in deze topologie al gezien, dus het is niet nodig om ze hier nog eens te bekijken. Maar er zijn hier ook verschillende nieuwe elementen, die we nu zullen bespreken.

Elke groupBy-bewerking voert doorgaans een soort aggregatiebewerking uit (aggregatie, samentelling of telling). U kunt cumulatieve aggregatie uitvoeren met een lopend totaal, of vensteraggregatie, waarbij rekening wordt gehouden met records binnen een opgegeven tijdvenster.

De code in Listing 5.5 telt het aantal transacties binnen sessievensters. In afb. 5.13 Deze acties worden stap voor stap geanalyseerd.

Door windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) aan te roepen, creëren we een sessievenster met een inactiviteitsinterval van 20 seconden en een persistentie-interval van 15 minuten. Een inactiviteitsinterval van 20 seconden betekent dat de applicatie alle gegevens die binnen 20 seconden na het einde of begin van de huidige sessie binnenkomen, in de huidige (actieve) sessie zal opnemen.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Vervolgens specificeren we welke aggregatiebewerking moet worden uitgevoerd in het sessievenster - in dit geval tellen we. Als een inkomend item buiten het inactiviteitsvenster valt (beide kanten van de datum-/tijdstempel), maakt de applicatie een nieuwe sessie. Retentie-interval betekent dat een sessie gedurende een bepaalde tijd wordt gehandhaafd en laat gegevens toe die verder reiken dan de inactiviteitsperiode van de sessie, maar nog steeds kunnen worden bijgevoegd. Bovendien komen het begin en einde van de nieuwe sessie die voortvloeit uit de samenvoeging overeen met de vroegste en laatste datum-/tijdstempel.

Laten we een paar items uit de telmethode bekijken om te zien hoe sessies werken (tabel 5.1).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Wanneer er records binnenkomen, zoeken we naar bestaande sessies met dezelfde sleutel, een eindtijd kleiner dan de huidige datum/tijdstempel - inactiviteitsinterval, en een starttijd groter dan de huidige datum/tijdstempel + inactiviteitsinterval. Hiermee rekening houdend, vier gegevens uit de tabel. 5.1 worden als volgt samengevoegd tot één sessie.

1. Record 1 arriveert als eerste, dus de starttijd is gelijk aan de eindtijd en is 00:00:00.

2. Vervolgens arriveert item 2 en zoeken we naar sessies die niet eerder eindigen dan 23:59:55 en niet later beginnen dan 00:00:35. We vinden record 1 en combineren sessies 1 en 2. We nemen de starttijd van sessie 1 (eerder) en de eindtijd van sessie 2 (later), zodat onze nieuwe sessie begint om 00:00:00 en eindigt om 00:00 uur: 15:XNUMX.

3. Record 3 arriveert, we zoeken naar sessies tussen 00:00:30 en 00:01:10 en vinden er geen. Voeg een tweede sessie toe voor de sleutel 123-345-654,FFBE, beginnend en eindigend om 00:00:50.

4. Record 4 arriveert en we zoeken sessies tussen 23:59:45 en 00:00:25. Dit keer worden sessie 1 en 2 gevonden. Alle drie de sessies worden gecombineerd tot één, met een starttijd van 00:00:00 en een eindtijd van 00:00:15.

Van wat in deze sectie wordt beschreven, is het de moeite waard om de volgende belangrijke nuances te onthouden:

  • sessies zijn geen vensters met een vaste grootte. De duur van een sessie wordt bepaald door de activiteit binnen een bepaalde tijdsperiode;
  • De datum-/tijdstempels in de gegevens bepalen of de gebeurtenis binnen een bestaande sessie valt of tijdens een inactieve periode.

Vervolgens bespreken we het volgende type venster: "tuimelende" vensters.

"tuimelende" ramen

Tuimelvensters leggen gebeurtenissen vast die binnen een bepaalde periode vallen. Stel je voor dat je elke 20 seconden alle aandelentransacties van een bepaald bedrijf moet vastleggen, zodat je alle gebeurtenissen gedurende die periode verzamelt. Aan het einde van het interval van 20 seconden draait het venster om en gaat het naar een nieuw observatie-interval van 20 seconden. Figuur 5.14 illustreert deze situatie.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Zoals u kunt zien, worden alle gebeurtenissen die in de afgelopen 20 seconden zijn ontvangen, in het venster opgenomen. Aan het einde van deze periode wordt een nieuw venster aangemaakt.

Listing 5.6 toont code die het gebruik demonstreert van tuimelende vensters om elke 20 seconden aandelentransacties vast te leggen (te vinden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Met deze kleine wijziging in de methodeaanroep TimeWindows.of kunt u een tuimelend venster gebruiken. In dit voorbeeld wordt de methode until() niet aangeroepen, dus wordt het standaard bewaarinterval van 24 uur gebruikt.

Eindelijk is het tijd om verder te gaan naar de laatste vensteropties: "springende" vensters.

Schuiframen ("springende")

Schuif-/springramen zijn vergelijkbaar met kantelramen, maar met een klein verschil. Schuifvensters wachten niet tot het einde van het tijdsinterval voordat ze een nieuw venster maken om recente gebeurtenissen te verwerken. Ze starten nieuwe berekeningen na een wachtinterval dat korter is dan de vensterduur.

Laten we, om de verschillen tussen tuimelende en springende vensters te illustreren, terugkeren naar het voorbeeld van het tellen van beurstransacties. Ons doel is nog steeds om het aantal transacties te tellen, maar we willen niet de hele tijd wachten voordat we de teller bijwerken. In plaats daarvan zullen we de teller met kortere tussenpozen bijwerken. We tellen bijvoorbeeld nog steeds het aantal transacties elke 20 seconden, maar werken de teller elke 5 seconden bij, zoals weergegeven in figuur 5.15. XNUMX. In dit geval krijgen we drie resultaatvensters met overlappende gegevens.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Listing 5.7 toont de code voor het definiëren van schuifvensters (te vinden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Een tuimelvenster kan worden geconverteerd naar een huppelvenster door een aanroep toe te voegen aan de methode AdvanceBy(). In het getoonde voorbeeld bedraagt ​​het spaarinterval 15 minuten.

In deze sectie hebt u gezien hoe u de aggregatieresultaten kunt beperken tot tijdvensters. Ik wil vooral dat je de volgende drie dingen uit dit gedeelte onthoudt:

  • de grootte van sessievensters wordt niet beperkt door de tijdsperiode, maar door gebruikersactiviteit;
  • “tuimelende” vensters bieden een overzicht van gebeurtenissen binnen een bepaalde periode;
  • De duur van het springen tussen vensters ligt vast, maar ze worden regelmatig bijgewerkt en kunnen overlappende vermeldingen in alle vensters bevatten.

Vervolgens leren we hoe u een KTable terug naar een KStream kunt converteren voor een verbinding.

5.3.3. KStream- en KTable-objecten verbinden

In Hoofdstuk 4 hebben we het verbinden van twee KStream-objecten besproken. Nu moeten we leren hoe we KTable en KStream kunnen verbinden. Dit kan om de volgende eenvoudige reden nodig zijn. KStream is een stroom records en KTable is een stroom recordupdates, maar soms wilt u misschien extra context toevoegen aan de recordstroom met behulp van updates van KTable.

Laten we gegevens over het aantal beurstransacties nemen en deze combineren met beursnieuws voor de relevante sectoren. Dit is wat u moet doen om dit te bereiken, gegeven de code die u al heeft.

  1. Converteer een KTable-object met gegevens over het aantal aandelentransacties naar een KStream, gevolgd door het vervangen van de sleutel door de sleutel die de industriesector aangeeft die overeenkomt met dit aandelensymbool.
  2. Maak een KTable-object dat gegevens leest uit een onderwerp met beursnieuws. Deze nieuwe KTable zal worden gecategoriseerd per industriële sector.
  3. Verbind nieuwsupdates met informatie over het aantal beurstransacties per branche.

Laten we nu eens kijken hoe we dit actieplan kunnen implementeren.

Converteer KTable naar KStream

Om KTable naar KStream te converteren moet u het volgende doen.

  1. Roep de methode KTable.toStream() aan.
  2. Door de methode KStream.map aan te roepen, vervangt u de sleutel door de branchenaam en haalt u vervolgens het TransactionSummary-object op uit de Windowed-instantie.

We zullen deze bewerkingen als volgt aan elkaar koppelen (de code kan worden gevonden in het bestand src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Omdat we een KStream.map-bewerking uitvoeren, wordt het geretourneerde KStream-exemplaar automatisch opnieuw gepartitioneerd wanneer het in een verbinding wordt gebruikt.

We hebben het conversieproces voltooid. Vervolgens moeten we een KTable-object maken voor het lezen van beursnieuws.

Creatie van KTable voor beursnieuws

Gelukkig kost het maken van een KTable-object slechts één regel code (de code kan worden gevonden in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Het is vermeldenswaard dat er geen Serde-objecten hoeven te worden opgegeven, omdat string Serdes in de instellingen wordt gebruikt. Door de EARLIEST-opsomming te gebruiken, wordt de tabel bovendien helemaal aan het begin gevuld met records.

Nu kunnen we verder gaan met de laatste stap: verbinding maken.

Nieuwsupdates koppelen aan gegevens over het aantal transacties

Een verbinding tot stand brengen is niet moeilijk. We zullen een left join gebruiken als er geen beursnieuws is voor de betreffende branche (de benodigde code is te vinden in het bestand src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Deze leftJoin-operator is vrij eenvoudig. In tegenstelling tot de joins in Hoofdstuk 4 wordt de methode JoinWindow niet gebruikt omdat er bij het uitvoeren van een KStream-KTable-join slechts één item in KTable is voor elke sleutel. Een dergelijke verbinding is niet beperkt in de tijd: het record bevindt zich in de KTable of is afwezig. De belangrijkste conclusie: door gebruik te maken van KTable-objecten kunt u KStream verrijken met minder vaak bijgewerkte referentiegegevens.

Nu gaan we kijken naar een efficiëntere manier om gebeurtenissen van KStream te verrijken.

5.3.4. GlobalKTable-objecten

Zoals je kunt zien, is er behoefte aan het verrijken van evenementenstromen of het toevoegen van context. In hoofdstuk 4 zag u de verbindingen tussen twee KStream-objecten, en in de vorige sectie zag u de verbinding tussen een KStream en een KTable. In al deze gevallen is het noodzakelijk om de gegevensstroom opnieuw te verdelen bij het toewijzen van de sleutels aan een nieuw type of nieuwe waarde. Soms gebeurt het herpartitioneren expliciet, en soms doet Kafka Streams dit automatisch. Opnieuw partitioneren is nodig omdat de sleutels zijn gewijzigd en de records in nieuwe secties moeten terechtkomen, anders is de verbinding onmogelijk (dit is besproken in hoofdstuk 4, in de paragraaf “Opnieuw partitioneren van gegevens” in paragraaf 4.2.4).

Opnieuw partitioneren brengt kosten met zich mee

Voor het opnieuw partitioneren zijn kosten nodig: extra resourcekosten voor het maken van tussenliggende onderwerpen, het opslaan van dubbele gegevens in een ander onderwerp; het betekent ook een verhoogde latentie als gevolg van het schrijven en lezen van dit onderwerp. Als u bovendien meerdere aspecten of dimensies wilt samenvoegen, moet u de joins aan elkaar koppelen, de records toewijzen met nieuwe sleutels en het herpartitioneringsproces opnieuw uitvoeren.

Verbinding maken met kleinere datasets

In sommige gevallen is het volume aan referentiegegevens dat moet worden verbonden relatief klein, zodat volledige kopieën ervan gemakkelijk lokaal op elk knooppunt kunnen passen. Voor dit soort situaties biedt Kafka Streams de klasse GlobalKTable.

GlobalKTable-instanties zijn uniek omdat de applicatie alle gegevens naar elk van de knooppunten repliceert. En aangezien alle gegevens op elk knooppunt aanwezig zijn, is het niet nodig de gebeurtenissenstroom te verdelen op basis van referentiegegevenssleutels, zodat deze beschikbaar zijn voor alle partities. U kunt ook sleutelloze joins maken met behulp van GlobalKTable-objecten. Laten we teruggaan naar een van de vorige voorbeelden om deze functie te demonstreren.

KStream-objecten verbinden met GlobalKTable-objecten

In paragraaf 5.3.2 hebben we vensteraggregatie van valutatransacties door kopers uitgevoerd. De resultaten van deze aggregatie zagen er ongeveer zo uit:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Hoewel deze resultaten het doel dienden, zou het nuttiger zijn geweest als ook de naam van de klant en de volledige bedrijfsnaam waren weergegeven. Om de klantnaam en de bedrijfsnaam toe te voegen, kunt u normale joins uitvoeren, maar dan moet u twee sleuteltoewijzingen uitvoeren en opnieuw partitioneren. Met GlobalKTable kunt u de kosten van dergelijke operaties vermijden.

Om dit te doen, gebruiken we het object countStream uit Listing 5.11 (de corresponderende code kun je vinden in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) en verbinden dit met twee GlobalKTable-objecten.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
We hebben dit al eerder besproken, dus ik zal het niet herhalen. Maar ik merk op dat de code in de functie toStream().map omwille van de leesbaarheid is geabstraheerd tot een functieobject in plaats van een inline lambda-expressie.

De volgende stap is het declareren van twee exemplaren van GlobalKTable (de weergegeven code kan worden gevonden in het bestand src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lijst 5.12).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"

Houd er rekening mee dat onderwerpnamen worden beschreven met behulp van opgesomde typen.

Nu we alle componenten gereed hebben, hoeft we alleen nog maar de code voor de verbinding te schrijven (die te vinden is in het bestand src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Hoewel deze code twee joins bevat, zijn ze aan elkaar gekoppeld omdat geen van de resultaten afzonderlijk wordt gebruikt. De resultaten worden aan het einde van de gehele operatie weergegeven.

Wanneer u de bovenstaande join-bewerking uitvoert, krijgt u de volgende resultaten:

{customer='Barney, Smith' company="Exxon", transactions= 17}

De essentie is niet veranderd, maar deze resultaten zien er duidelijker uit.

Als je aftelt naar hoofdstuk 4, heb je al verschillende soorten verbindingen in actie gezien. Ze staan ​​vermeld in de tabel. 5.2. Deze tabel weerspiegelt de connectiviteitsmogelijkheden vanaf versie 1.0.0 van Kafka Streams; Er kan iets veranderen in toekomstige releases.

Het boek "Kafka Streams in actie. Applicaties en microservices voor real-time werken"
Laten we, om de zaken af ​​te ronden, de basis samenvatten: u kunt gebeurtenisstromen (KStream) verbinden en stromen bijwerken (KTable) met behulp van de lokale status. Als alternatief kunt u, als de omvang van de referentiegegevens niet te groot is, het GlobalKTable-object gebruiken. GlobalKTables repliceert alle partities naar elk Kafka Streams-applicatieknooppunt, zodat alle gegevens beschikbaar zijn, ongeacht met welke partitie de sleutel overeenkomt.

Vervolgens zullen we de Kafka Streams-functie zien, waardoor we statusveranderingen kunnen observeren zonder gegevens van een Kafka-onderwerp te verbruiken.

5.3.5. Opvraagbare staat

We hebben al verschillende bewerkingen met status uitgevoerd en voeren de resultaten altijd uit naar de console (voor ontwikkelingsdoeleinden) of schrijven ze naar een onderwerp (voor productiedoeleinden). Wanneer u resultaten naar een onderwerp schrijft, moet u een Kafka-consument gebruiken om ze te bekijken.

Het lezen van gegevens over deze onderwerpen kan worden beschouwd als een soort gematerialiseerde weergaven. Voor onze doeleinden kunnen we de definitie van een gematerialiseerde weergave van Wikipedia gebruiken: “...een fysiek databaseobject dat de resultaten van een zoekopdracht bevat. Het kan bijvoorbeeld een lokale kopie zijn van gegevens op afstand, of een subset van de rijen en/of kolommen van een tabel of samenvoegingsresultaten, of een samenvattende tabel verkregen door aggregatie” (https://en.wikipedia.org/wiki /Gematerialiseerde_weergave).

Met Kafka Streams kunt u ook interactieve query's uitvoeren op staatswinkels, zodat u deze gerealiseerde weergaven direct kunt lezen. Het is belangrijk op te merken dat de query naar het statusarchief een alleen-lezen-bewerking is. Dit zorgt ervoor dat u zich geen zorgen hoeft te maken dat de status per ongeluk inconsistent wordt terwijl uw toepassing gegevens verwerkt.

De mogelijkheid om rechtstreeks staatswinkels te bevragen is belangrijk. Dit betekent dat u dashboardapplicaties kunt maken zonder dat u eerst gegevens bij de Kafka-consument hoeft op te halen. Het verhoogt ook de efficiëntie van de applicatie, omdat het niet nodig is om opnieuw gegevens te schrijven:

  • dankzij de locatie van de gegevens zijn ze snel toegankelijk;
  • duplicatie van gegevens wordt geëlimineerd, omdat deze niet naar externe opslag worden geschreven.

Het belangrijkste dat u moet onthouden, is dat u rechtstreeks vanuit uw toepassing de status kunt opvragen. De kansen die dit u biedt, kunnen niet genoeg worden benadrukt. In plaats van gegevens uit Kafka te gebruiken en records in een database voor de toepassing op te slaan, kunt u met hetzelfde resultaat query's uitvoeren op staatsarchieven. Directe zoekopdrachten naar staatswinkels betekenen minder code (geen consument) en minder software (geen databasetabel nodig om de resultaten op te slaan).

We hebben in dit hoofdstuk al heel wat onderwerpen besproken, dus laten we onze bespreking van interactieve zoekopdrachten tegen staatswinkels voorlopig achterwege. Maar maak je geen zorgen: in hoofdstuk 9 maken we een eenvoudige dashboardapplicatie met interactieve queries. Er zullen enkele voorbeelden uit dit en voorgaande hoofdstukken worden gebruikt om interactieve query's te demonstreren en hoe u deze kunt toevoegen aan Kafka Streams-toepassingen.

Beknopt

  • KStream-objecten vertegenwoordigen stromen van gebeurtenissen, vergelijkbaar met invoegingen in een database. KTable-objecten vertegenwoordigen updatestromen, meer zoals updates voor een database. De grootte van het KTable-object groeit niet, oude records worden vervangen door nieuwe.
  • KTable-objecten zijn vereist voor aggregatiebewerkingen.
  • Met behulp van vensterbewerkingen kunt u geaggregeerde gegevens opsplitsen in tijdsbuckets.
  • Dankzij GlobalKTable-objecten heeft u overal in de applicatie toegang tot referentiegegevens, ongeacht de partitie.
  • Verbindingen tussen KStream-, KTable- en GlobalKTable-objecten zijn mogelijk.

Tot nu toe hebben we ons geconcentreerd op het bouwen van Kafka Streams-applicaties met behulp van het hoogwaardige KStream DSL. Hoewel je met de aanpak op hoog niveau nette en beknopte programma's kunt maken, is het gebruik ervan een afweging. Werken met DSL KStream betekent het vergroten van de beknoptheid van uw code door de mate van controle te verminderen. In het volgende hoofdstuk bekijken we de low-level handlernode-API en proberen we andere compromissen. De programma's zullen langer zijn dan voorheen, maar we zullen vrijwel elk handlerknooppunt kunnen creëren dat we nodig hebben.

→ Meer details over het boek zijn te vinden op website van de uitgever

→ Voor Habrozhiteli 25% korting met coupon - Kafka-stromen

→ Bij betaling voor de papieren versie van het boek wordt een elektronisch boek per e-mail verzonden.

Bron: www.habr.com

Voeg een reactie