D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht" Moien, Khabro Awunner! Dëst Buch ass gëeegent fir all Entwéckler deen d'Threadveraarbechtung wëll verstoen. Verdeelt Programméierung verstoen hëlleft Iech besser Kafka a Kafka Streams ze verstoen. Et wier flott de Kafka Kader selwer ze kennen, awer dëst ass net néideg: Ech soen Iech alles wat Dir braucht. Erlieft Kafka Entwéckler an Ufänger léiere wéi interessant Streamveraarbechtungsapplikatioune mat der Kafka Streams Bibliothéik an dësem Buch erstallt ginn. Zwëschen an fortgeschratt Java Entwéckler scho vertraut mat Konzepter wéi Serialiséierung léieren hir Fäegkeeten anzesetzen fir Kafka Streams Uwendungen ze kreéieren. De Quellcode vum Buch ass am Java 8 geschriwwen a mécht bedeitend Notzung vun der Java 8 Lambda Ausdrock Syntax, also wësse wéi een mat Lambda Funktiounen schafft (och an enger anerer Programméiersprooch) wäert praktesch kommen.

Auszuch. 5.3. Aggregatioun an Fënsteren Operatiounen

An dëser Sektioun wäerte mir weidergoen fir déi villverspriechendst Deeler vu Kafka Streams ze entdecken. Bis elo hu mir déi folgend Aspekter vu Kafka Streams ofgedeckt:

  • eng Veraarbechtungstopologie erstellen;
  • benotzt Staat an Streaming Uwendungen;
  • Leeschtung vun Daten Stream Verbindungen;
  • Ënnerscheeder tëscht Event Streams (KStream) an Update Streams (KTable).

An de folgende Beispiller wäerte mir all dës Elementer zesumme bréngen. Dir léiert och iwwer Windowsing, eng aner super Feature vu Streaming Uwendungen. Eist éischt Beispill wäert eng einfach Aggregatioun sinn.

5.3.1. Aggregatioun vun Stock Verkaf vun Industrie Secteur

Aggregatioun a Gruppéierung si vital Tools wann Dir mat Streaming Daten schafft. D'Untersuchung vun eenzelne Rekorder wéi se opgeholl ginn ass dacks net genuch. Fir zousätzlech Informatioun aus Daten ze extrahieren, ass et néideg ze gruppéieren an ze kombinéieren.

An dësem Beispill setzt Dir de Kostüm vun engem Dag Händler un, deen de Verkafsvolumen vun Aktien vu Firmen a verschiddenen Industrien verfollege muss. Speziell sidd Dir interesséiert an de fënnef Firmen mat de gréissten Aktieverkaaf an all Industrie.

Esou Aggregatioun erfuerdert déi folgend e puer Schrëtt fir d'Donnéeën an déi gewënscht Form ze iwwersetzen (am allgemenge Begrëffer).

  1. Erstellt eng Thema-baséiert Quell déi raw Aktiehandelsinformatioun publizéiert. Mir mussen en Objet vum Typ StockTransaction op en Objet vum Typ ShareVolume mapen. De Punkt ass datt den StockTransaction Objet Verkafsmetadaten enthält, awer mir brauchen nëmmen Daten iwwer d'Zuel vun den Aktien déi verkaf ginn.
  2. Group ShareVolume Daten duerch Aktie Symbol. Eemol op Symbol gruppéiert, kënnt Dir dës Donnéeën an Subtotalen vun Aktieverkaafsvolumen zesummeklappen. Et ass derwäert ze notéieren datt d'KStream.groupBy Method eng Instanz vum Typ KGroupedStream zréckkënnt. An Dir kënnt eng KTable Instanz kréien andeems Dir d'KGroupedStream.reduce Method weider rufft.

Wat ass de KGroupedStream Interface

D'KStream.groupBy an KStream.groupByKey Methoden ginn eng Instanz vu KGroupedStream zréck. KGroupedStream ass eng Zwëschenvertriedung vun engem Stroum vun Eventer no der Gruppéierung vu Schlësselen. Et ass guer net geduecht fir direkt mat him ze schaffen. Amplaz gëtt KGroupedStream fir Aggregatiounsoperatiounen benotzt, déi ëmmer zu engem KTable resultéieren. A well d'Resultat vun den Aggregatiounsoperatiounen e KTable ass a si benotzen e Staatsgeschäft, ass et méiglech datt net all Updates als Resultat méi wäit an der Pipeline geschéckt ginn.

D'KTable.groupBy Method gëtt eng ähnlech KGroupedTable zréck - eng Zwëschenvertriedung vum Stream vun Updates, regruppéiert duerch Schlëssel.

Loosst eis eng kuerz Paus maachen a kucken op Fig. 5.9, wat weist wat mir erreecht hunn. Dës Topologie sollt Iech scho ganz vertraut sinn.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Loosst eis elo de Code fir dës Topologie kucken (et kann an der Datei fonnt ginn src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
De gegebene Code ënnerscheet sech duerch seng Kuerzegkeet an de grousse Volumen vun Aktiounen, déi a verschiddene Linnen ausgefouert goufen. Dir kënnt eppes Neies am éischte Parameter vun der Builder.stream Method bemierken: e Wäert vun der Enum-Typ AutoOffsetReset.EARLIEST (et gëtt och e LATEST), gesat mat der Method Consumed.withOffsetResetPolicy. Dësen Opzielungstyp kann benotzt ginn fir eng Offset-Reset-Strategie fir all KStream oder KTable ze spezifizéieren an huet Virrang iwwer d'Offset-Reset-Optioun vun der Konfiguratioun.

GroupByKey an GroupBy

D'KStream Interface huet zwou Methoden fir records ze gruppéieren: GroupByKey a GroupBy. Béid ginn e KGroupedTable zréck, sou datt Dir Iech vläicht froe wat den Ënnerscheed tëscht hinnen ass a wéini een ze benotzen?

D'GroupByKey Method gëtt benotzt wann d'Schlësselen am KStream scho net eidel sinn. A virun allem ass de Fändel "re-partitioning erfuerdert" ni gesat.

D'GroupBy-Methode gëtt ugeholl datt Dir d'Gruppéierungsschlëssel geännert hutt, sou datt de Repartitiounsfändel op richteg gesat gëtt. Ausféierung vun Joints, Aggregatiounen, etc.
Resumé: Wann ëmmer méiglech, sollt Dir GroupByKey benotzen anstatt GroupBy.

Et ass kloer wat d'mapValues ​​an groupBy Methoden maachen, also loosst eis d'Summ () Method kucken (fonnt an src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
D'ShareVolume.sum Method gëtt de lafenden Total vum Aktieverkaafsvolumen zréck, an d'Resultat vun der ganzer Kette vu Berechnungen ass e KTable Objet . Elo verstitt Dir d'Roll KTable spillt. Wann ShareVolume Objete ukommen, späichert de entspriechende KTable-Objet déi lescht aktuell Aktualiséierung. Et ass wichteg ze erënneren datt all Updates am virege ShareVolumeKTable reflektéiert ginn, awer net all gi weider geschéckt.

Mir benotzen dann dëse KTable fir ze aggregéieren (no Unzuel vun den gehandelte Aktien) fir bei de fënnef Firmen ze kommen mat den héchste Volumen vun Aktien an all Industrie gehandelt. Eis Handlungen an dësem Fall wäerten ähnlech sinn wéi déi fir déi éischt Aggregatioun.

  1. Maacht eng aner GroupBy Operatioun fir eenzel ShareVolume Objekter no Industrie ze gruppéieren.
  2. Start ShareVolume Objekter ze resuméieren. Dës Kéier ass den Aggregatiounsobjekt eng fix Gréisst Prioritéit Schlaang. An dëser fixer Gréisst Schlaang sinn nëmmen déi fënnef Firmen mat de gréisste Quantitéiten un verkaafte Aktien zréckbehalen.
  3. Kaart d'Schlaangen aus dem viregte Paragraf op e Stringwäert a gitt déi Top fënnef am meeschte gehandelte Aktien no Zuel no Industrie zréck.
  4. Schreift d'Resultater a Stringform zum Thema.

An Fig. Figur 5.10 weist d'Date Flux Topologie Graf. Wéi Dir kënnt gesinn, ass déi zweet Ronn vun der Veraarbechtung ganz einfach.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Elo datt mir e kloert Verständnis vun der Struktur vun dëser zweeter Veraarbechtungsronn hunn, kënne mir op säi Quellcode wenden (Dir fannt et an der Datei src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Dësen Initialisator enthält eng fix Queue Variabel. Dëst ass e personaliséierten Objet deen en Adapter fir java.util.TreeSet ass, dee benotzt gëtt fir déi Top N Resultater an ofstigend Uerdnung vun den gehandelte Aktien ze verfolgen.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Dir hutt d'groupBy a mapValues ​​Uruff scho gesinn, also wäerte mir net an déi goen (mir ruffen d'KTable.toStream Method well d'KTable.print Method ofgeschaaft ass). Awer Dir hutt d'KTable Versioun vun aggregate () nach net gesinn, also wäerte mir e bëssen Zäit verbréngen doriwwer ze diskutéieren.

Wéi Dir Iech erënnert, wat KTable anescht mécht, ass datt records mat de selwechte Schlësselen als Updates ugesi ginn. KTable ersetzt déi al Entrée mat engem neien. Aggregatioun geschitt op eng ähnlech Manéier: déi lescht records mat deemselwechte Schlëssel ginn aggregéiert. Wann e Rekord ukomm ass, gëtt en an d'FixedSizePriorityQueue Klasse Instanz mat Hëllef vun engem Adder bäigefüügt (zweet Parameter am aggregéierte Method Uruff), awer wann en anere Rekord scho mam selwechte Schlëssel existéiert, da gëtt den alen Rekord mat engem Subtraktor geläscht (drëtte Parameter an den aggregéierte Method Opruff).

Dëst alles bedeit datt eisen Aggregator, FixedSizePriorityQueue, net all Wäerter mat engem Schlëssel aggregéiert, mee späichert eng bewegend Zomm vun de Quantitéite vun den N am meeschte gehandelten Aarte vu Aktien. All erakommen Entrée enthält d'total Unzuel vun Aktien bis elo verkaaft. KTable gëtt Iech Informatioun iwwer wéi eng Firmen Aktien am Moment am meeschte gehandelt sinn, ouni Rolling Aggregatioun vun all Update ze erfuerderen.

Mir hunn geléiert zwee wichteg Saachen ze maachen:

  • Grupp Wäerter am KTable mat engem gemeinsame Schlëssel;
  • Leeschtunge nëtzlech Operatiounen wéi Rollup an Aggregatioun op dës gruppéiert Wäerter.

Wësse wéi een dës Operatiounen ausféiert ass wichteg fir d'Bedeitung vun den Donnéeën ze verstoen, déi duerch eng Kafka Streams Applikatioun beweegen an ze verstoen wéi eng Informatioun et bréngt.

Mir hunn och e puer vun de Schlësselkonzepter zesummebruecht, déi virdru an dësem Buch diskutéiert goufen. Am Kapitel 4 hu mir diskutéiert wéi Feeler-tolerant, lokal Staat wichteg ass fir eng Streaming Applikatioun. Dat éischt Beispill an dësem Kapitel huet bewisen firwat de lokale Staat sou wichteg ass - et erlaabt Iech ze verfollegen wéi eng Informatioun Dir scho gesinn hutt. Lokalen Zougang vermeit Netzwierkverzögerungen, wat d'Applikatioun méi performant a Feelerbeständeg mécht.

Wann Dir all Rollup oder Aggregatiounsoperatioun ausféiert, musst Dir den Numm vum Staatsgeschäft uginn. D'Rollup- an Aggregatiounsoperatioune ginn eng KTable Instanz zréck, an de KTable benotzt Staatsspeicherung fir al Resultater mat neien ze ersetzen. Wéi Dir gesi hutt, ginn net all Updates an der Pipeline erof geschéckt, an dëst ass wichteg well Aggregatiounsoperatioune sinn entwéckelt fir Resuméinformatioun ze produzéieren. Wann Dir de lokale Staat net applizéiert, wäert KTable all Aggregatiouns- a Rollup Resultater weiderginn.

Als nächst wäerte mir Operatiounen ausféieren wéi Aggregatioun bannent enger spezifescher Zäit - sougenannte Fënsteroperatiounen.

5.3.2. Fënster Operatiounen

An der viregter Sektioun hu mir d'Rutschkonvolutioun an d'Aggregatioun agefouert. D'Applikatioun huet eng kontinuéierlech Roll-up vum Aktieverkaafsvolumen gemaach, gefollegt vun der Aggregatioun vun de fënnef meescht gehandelte Aktien um Austausch.

Heiansdo ass sou eng kontinuéierlech Aggregatioun a Roll-up vun de Resultater néideg. An heiansdo musst Dir Operatiounen nëmmen iwwer eng bestëmmten Zäit maachen. Zum Beispill, berechent wéi vill Austauschtransaktiounen mat Aktien vun enger bestëmmter Firma an de leschten 10 Minutten gemaach goufen. Oder wéi vill Benotzer an de leschten 15 Minutten op en neie Reklammbanner geklickt hunn. Eng Applikatioun kann esou Operatiounen e puer Mol ausféieren, awer mat Resultater déi nëmmen op spezifizéierte Perioden (Zäitfenster) gëllen.

Den Austauschtransaktioune vum Keefer zielen

Am nächste Beispill verfollege mir Aktietransaktiounen iwwer verschidde Händler - entweder grouss Organisatiounen oder intelligent individuell Finanzéierer.

Et ginn zwee méiglech Grënn fir dës Tracking. Ee vun hinnen ass de Besoin ze wëssen wat Maartleit kafen / verkafen. Wann dës grouss Spiller an sophistikéiert Investisseuren Chance gesinn, mécht et Sënn hir Strategie ze verfollegen. Den zweete Grond ass de Wonsch all méiglech Unzeeche vun illegalen Insiderhandel ze gesinn. Fir dëst ze maachen, musst Dir d'Korrelatioun vu grousse Verkafsspikes mat wichtege Pressematdeelungen analyséieren.

Esou Tracking besteet aus de folgende Schrëtt:

  • e Stroum erstellen fir aus dem Thema Aktietransaktiounen ze liesen;
  • gruppéiere Entréeën records duerch Keefer ID an Aktie Symbol. D'Method groupBy ruffen gëtt eng Instanz vun der KGroupedStream Klass zréck;
  • D'KGroupedStream.windowedBy Method gëtt en Datestroum zréck, limitéiert op eng Zäitfenster, déi d'Fënstere Aggregatioun erlaabt. Ofhängeg vun der Fënstertyp gëtt entweder e TimeWindowedKStream oder e SessionWindowedKStream zréckginn;
  • Transaktiounszuel fir d'Aggregatiounsoperatioun. De windowed Daten Flux bestëmmt ob e bestëmmte Rekord an dësem Grof geholl gëtt;
  • Schreift d'Resultater op en Thema oder gitt se op d'Konsole wärend der Entwécklung.

D'Topologie vun dëser Applikatioun ass einfach, awer e kloert Bild dovun wier hëllefräich. Loosst eis e Bléck op Fig. 5.11.

Als nächst wäerte mir d'Funktionalitéit vun de Fënsteroperatiounen an den entspriechende Code kucken.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"

Fënster Zorte

Et ginn dräi Aarte vu Fënsteren a Kafka Streams:

  • Sëtzung;
  • "trommelen";
  • rutschen / sprangen.

Wéi ee fir ze wielen hänkt vun Äre Geschäftsbedéngungen of. Tumbling- a Sprangfenster sinn Zäitbegrenzt, während Sessiounsfenster duerch Benotzeraktivitéit limitéiert sinn - d'Dauer vun der Sessioun(en) gëtt eleng festgeluecht wéi aktiv de Benotzer ass. Den Haapt Saach ze erënneren ass, datt all Fënster Zorte baséiert op den Datum / Zäit Timberen vun Entréen, net der System Zäit.

Als nächst implementéiere mir eis Topologie mat jiddereng vun de Fënstertypen. De komplette Code gëtt nëmmen am éischte Beispill uginn, fir aner Aarte vu Fënsteren ännert sech näischt ausser der Aart vun der Fënsteroperatioun.

Sessioun Fënsteren

Sessiounsfenster si ganz anescht wéi all aner Aarte vu Fënsteren. Si sinn net sou vill vun der Zäit limitéiert wéi duerch d'Aktivitéit vum Benotzer (oder d'Aktivitéit vun der Entitéit déi Dir verfollege wëllt). Sessiounsfenster sinn duerch Perioden vun Inaktivitéit ofgrenzt.

Figur 5.12 illustréiert d'Konzept vu Sessiounsfenster. Déi méi kleng Sessioun fusionéiert mat der Sessioun lénks. An d'Sessioun op der rietser Säit wäert getrennt sinn, well et no enger laanger Period vun Inaktivitéit ass. Sessiounsfenster baséieren op Benotzeraktivitéit, awer benotzt Datum / Zäitstempel vun Entréen fir ze bestëmmen wéi eng Sessioun den Entrée gehéiert.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"

Benotzt Sessiounsfenster fir Aktietransaktiounen ze verfolgen

Loosst eis Sessiounsfenster benotze fir Informatioun iwwer Austauschtransaktiounen z'erfëllen. D'Ëmsetze vu Sessiounsfenster gëtt an der Listing 5.5 gewisen (wat an src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java fonnt ka ginn).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Dir hutt schonn déi meescht Operatiounen an dëser Topologie gesinn, also ass et net néideg fir se hei nach eng Kéier ze kucken. Mä et sinn och e puer nei Elementer hei, déi mer elo diskutéieren.

All GroupBy Operatioun mécht typesch eng Aart Aggregatiounsoperatioun aus (Aggregatioun, Rollup oder Zielen). Dir kënnt entweder kumulativ Aggregatioun mat engem lafenden Total maachen, oder Fënsteraggregatioun, déi Rekorder bannent enger spezifizéierter Zäitfenster berücksichtegt.

De Code am Listing 5.5 zielt d'Zuel vun den Transaktiounen bannent Sessiounsfenster. An Fig. 5.13 dës Aktiounen ginn Schrëtt fir Schrëtt analyséiert.

Andeems mir windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) uruffen, kreéiere mir eng Sessiounsfenster mat engem Inaktivitéitsintervall vun 20 Sekonnen an engem Persistenzintervall vu 15 Minutten. En Idle Intervall vun 20 Sekonnen bedeit datt d'Applikatioun all Entrée enthält déi bannent 20 Sekonnen vum Enn oder dem Ufank vun der aktueller Sessioun an déi aktuell (aktiv) Sessioun ukommt.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Als nächst spezifizéiere mir wéi eng Aggregatiounsoperatioun an der Sessiounsfenster muss gemaach ginn - an dësem Fall zielen. Wann en erakommen Entrée ausserhalb vun der Inaktivitéitsfenster fällt (eng Säit vum Datum/Zäitstempel), erstellt d'Applikatioun eng nei Sessioun. Retentiounsintervall bedeit eng Sessioun fir eng gewëssen Zäit z'erhalen an erlaabt spéit Donnéeën, déi iwwer d'Inaktivitéitsperiod vun der Sessioun erausstinn, awer nach ëmmer befestegt kënne ginn. Zousätzlech entsprécht den Ufank an Enn vun der neier Sessioun, déi aus der Fusioun entstinn, dem fréisten a leschten Datum / Zäitstempel.

Loosst eis e puer Entréen aus der Zielmethod kucken fir ze kucken wéi Sessiounen funktionnéieren (Tabelle 5.1).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Wann records ukommen, kucke mir no existéierende Sessiounen mam selwechte Schlëssel, eng Ennzäit manner wéi den aktuellen Datum / Zäitstempel - Inaktivitéitsintervall, an eng Startzäit méi grouss wéi den aktuellen Datum / Zäitstempel + Inaktivitéitsintervall. Maachen dëst Rechnung, véier Entréen aus Dësch. 5.1 ginn an eng eenzeg Sessioun fusionéiert wéi follegt.

1. Rekord 1 kënnt als éischt, sou datt d'Startzäit gläich ass mat der Ennzäit an ass 00:00:00.

2. Als nächst kënnt d'Entrée 2, a mir sichen no Sessiounen déi net méi fréi wéi 23:59:55 ophalen an net méi spéit wéi 00:00:35 ufänken. Mir fanne Rekord 1 a kombinéiere Sessiounen 1 an 2. Mir huelen d'Startzäit vun der Sessioun 1 (fréier) an d'Endzäit vun der Sessioun 2 (spéider), sou datt eis nei Sessioun um 00:00:00 ufänkt an um 00:00 ophält: 15:XNUMX.

3. Rekord 3 kënnt, mir sichen no Seancen tëscht 00:00:30 an 00:01:10 a fanne keng. Füügt eng zweet Sessioun fir de Schlëssel 123-345-654,FFBE un, fänkt um 00:00:50 un.

4. Rekord 4 kënnt a mir sichen no Seancen tëscht 23:59:45 an 00:00:25. Dës Kéier sinn déi zwou Sessiounen 1 an 2 fonnt.

Vun deem wat an dëser Rubrik beschriwwe gëtt, ass et derwäert déi folgend wichteg Nuancen ze erënneren:

  • Sessiounen sinn net fixe Gréisst Fënsteren. D'Dauer vun enger Sessioun gëtt vun der Aktivitéit bannent enger bestëmmter Zäit festgeluegt;
  • D'Datum / Zäitstempel an den Donnéeën bestëmmen ob d'Evenement an enger existéierender Sessioun oder während enger Idle Period fällt.

Als nächst wäerte mir déi nächst Aart vu Fënster diskutéieren - "tumbling" Fënsteren.

"Tumbling" Fënsteren

Tumbling Fënsteren erfaassen Eventer déi an enger gewësser Zäit falen. Stellt Iech vir, Dir musst all Aktietransaktioune vun enger bestëmmter Firma all 20 Sekonnen erfaassen, sou datt Dir all d'Evenementer während där Zäit sammelt. Um Enn vum 20 Sekonnen Intervall rullt d'Fënster ëm a geet op en neien 20 Sekonnen Observatiounsintervall. Figur 5.14 illustréiert dës Situatioun.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Wéi Dir gesitt, sinn all Eventer, déi an de leschten 20 Sekonnen kritt goufen, an der Fënster abegraff. Um Enn vun dëser Period gëtt eng nei Fënster erstallt.

Oplëschtung 5.6 weist Code deen d'Benotzung vun tumblingfenster weist fir Aktietransaktiounen all 20 Sekonnen z'erfaassen (fonnt an src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Mat dëser klenger Ännerung vun der TimeWindows.of Method Opruff, kënnt Dir eng tumbling Fënster benotzen. Dëst Beispill nennt d'Method bis () net, sou datt de Standard Retentiounsintervall vu 24 Stonnen benotzt gëtt.

Endlech ass et Zäit fir op déi lescht vun de Fënsteroptiounen ze goen - "hopping" Fënsteren.

Rutsch ("sprangen") Fënsteren

Schieber / Sprangfenster sinn ähnlech wéi Trommelfenster, awer mat engem klengen Ënnerscheed. Rutschfenster waart net bis zum Enn vum Zäitintervall ier Dir eng nei Fënster erstellt fir rezent Eventer ze veraarbecht. Si fänken nei Berechnungen no engem waarden Intervall manner wéi d'Fënster Dauer.

Fir d'Ënnerscheeder tëscht tumbling a sprangen Fënsteren ze illustréieren, loosse mer zréck op d'Beispill vun der Bourse Transaktiounen zielen. Eist Zil ass nach ëmmer d'Zuel vun den Transaktiounen ze zielen, awer mir wëllen net déi ganz Zäit waarden ier Dir de Comptoir aktualiséiert. Amplaz wäerte mir de Comptoir mat méi kuerzen Intervalle aktualiséieren. Zum Beispill wäerte mir nach ëmmer d'Zuel vun den Transaktiounen all 20 Sekonnen zielen, awer aktualiséieren de Comptoir all 5 Sekonnen, wéi an der Fig. 5.15. An dësem Fall komme mir mat dräi Resultatfenster mat iwwerlappende Donnéeën.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Oplëschtung 5.7 weist de Code fir Schiebefenster ze definéieren (fonnt an src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
A tumbling Fënster kann zu engem sprangen Fënster ëmgerechent ginn andeems en Opruff un der advanceBy () Method. Am gewisen Beispill ass de Spuerintervall 15 Minutten.

Dir hutt an dëser Sektioun gesinn wéi Dir Aggregatiounsresultater op Zäitfenster limitéiere kënnt. Besonnesch wëll ech Iech déi folgend dräi Saachen aus dëser Sektioun erënneren:

  • d'Gréisst vun de Sessiounsfenster ass limitéiert net duerch Zäitperiod, mee duerch Benotzeraktivitéit;
  • "Tumbling" Fënsteren déi en Iwwerbléck vun Evenementer bannent enger bestëmmter Zäit;
  • D'Dauer vu Sprangfenster ass fixéiert, awer si ginn dacks aktualiséiert a kënnen iwwerlappend Entréen an all Fënsteren enthalen.

Als nächst wäerte mir léieren wéi een KTable zréck an eng KStream konvertéiert fir eng Verbindung.

5.3.3. KStream an KTable Objete verbannen

Am Kapitel 4 hu mir diskutéiert zwee KStream Objeten ze verbannen. Elo musse mir léieren wéi een KTable an KStream verbënnt. Dëst kann aus dem folgenden einfache Grond gebraucht ginn. KStream ass e Stream vun Opzeechnungen, an KTable ass e Stream vu Rekordupdates, awer heiansdo wëllt Dir en zousätzleche Kontext zum Rekordstroum addéieren andeems Dir Updates vum KTable benotzt.

Loosst eis Donnéeën iwwer d'Zuel vun de Bourse Transaktiounen huelen a kombinéieren se mat Börsennoriichten fir déi relevant Industrien. Hei ass wat Dir maache musst fir dëst z'erreechen mat dem Code deen Dir scho hutt.

  1. Konvertéiert e KTable-Objet mat Daten iwwer d'Zuel vun den Aktietransaktiounen an e KStream, gefollegt vun der Ersatz vum Schlëssel mam Schlëssel, deen den Industriesektor besot deen dësem Aktiesymbol entsprécht.
  2. Erstellt e KTable-Objet deen Daten aus engem Thema mat Börsennoriichten liest. Dësen neie KTable gëtt no Industriesektor kategoriséiert.
  3. Connect news Aktualiséierungen mat Informatiounen iwwert d'Zuel vun Bourse Transaktiounen vun Industrie Secteur.

Loosst eis elo kucken wéi dësen Aktiounsplang ëmgesat gëtt.

Konvertéiert KTable op KStream

Fir KTable op KStream ze konvertéieren musst Dir déi folgend maachen.

  1. Rufft d'Ktable.toStream() Method un.
  2. Andeems Dir d'KStream.map Method nennt, ersetzt de Schlëssel mam Industrienumm, a recuperéiert dann den TransactionSummary Objet aus der Windowed Instanz.

Mir ketten dës Operatiounen zesummen wéi follegt (de Code kann an der Datei fonnt ginn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Well mir eng KStream.map Operatioun ausféieren, gëtt déi zréckginn KStream Instanz automatesch nei partitionéiert wann se an enger Verbindung benotzt gëtt.

Mir hunn de Konversiounsprozess ofgeschloss, als nächst musse mir e KTable-Objet erstellen fir Aktienoriichten ze liesen.

Kreatioun vun KTable fir Stock Neiegkeeten

Glécklecherweis hëlt e KTable-Objet nëmmen eng Zeil Code (de Code kann an src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fonnt ginn) (Listing 5.9).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Et ass derwäert ze notéieren datt keng Serde-Objete musse spezifizéiert ginn, well String Serdes an den Astellunge benotzt ginn. Och, andeems Dir déi EARLIEST Enumeratioun benotzt, ass den Dësch ganz am Ufank mat Opzeechnunge gefëllt.

Elo kënne mir op de leschte Schrëtt weidergoen - Verbindung.

Neiegkeetupdates mat Transaktiounszueldaten verbannen

Eng Verbindung ze kreéieren ass net schwéier. Mir wäerten e lénksen Join benotzen am Fall wou et keng Aktienoriichten fir déi relevant Industrie gëtt (den néidege Code kann an der Datei fonnt ginn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Dëse leftJoin Bedreiwer ass ganz einfach. Am Géigesaz zu de Join am Kapitel 4 gëtt d'JoinWindow Method net benotzt, well wann Dir e KStream-KTable Join ausféiert, gëtt et nëmmen eng Entrée am KTable fir all Schlëssel. Sou eng Verbindung ass net an der Zäit limitéiert: de Rekord ass entweder am KTable oder fehlt. D'Haaptconclusioun: mat KTable Objekter kënnt Dir KStream mat manner dacks aktualiséiert Referenzdaten beräicheren.

Elo kucke mir e méi effiziente Wee fir Eventer vu KStream ze beräicheren.

5.3.4. GlobalKTable Objete

Wéi Dir kënnt gesinn, ass et e Besoin fir Eventstreamen ze beräicheren oder Kontext derbäi ze ginn. Am Kapitel 4 hutt Dir d'Verbindungen tëscht zwee KStream-Objeten gesinn, an an der viregter Sektioun hutt Dir d'Verbindung tëscht engem KStream an engem KTable gesinn. An all dëse Fäll ass et néideg den Datestroum nei ze partitionéieren wann Dir d'Schlësselen op en neien Typ oder Wäert kartéiert. Heiansdo gëtt d'Repartitionéierung explizit gemaach, an heiansdo mécht Kafka Streams et automatesch. D'Re-Partitionéierung ass néideg, well d'Schlëssel geännert hunn an d'Records mussen an neie Sektiounen ophalen, soss ass d'Verbindung onméiglech (dëst gouf am Kapitel 4 diskutéiert, an der Rubrik "Daten nei Partitionéieren" an der Ënnersektioun 4.2.4).

Re-Partitionéierung huet e Käschte

Re-Partitionéierung erfuerdert Käschten - zousätzlech Ressourcekäschte fir Zwëschenthemen ze kreéieren, Duplikatdaten an engem aneren Thema ze späicheren; et heescht och erhéicht latency wéinst Schreiwen a Liesen vun dësem Thema. Zousätzlech, wann Dir iwwer méi wéi een Aspekt oder Dimensioun matmaache musst, musst Dir d'Joins chainen, d'Records mat neie Schlësselen kartéieren an de Re-Partitionéierungsprozess erëm ausféieren.

Verbindung mat méi klengen Datesätz

A verschiddene Fäll ass de Volume vun de Referenzdaten, déi verbonne sinn, relativ kleng, sou datt komplett Kopien dovun einfach lokal op all Node passen. Fir Situatiounen wéi dës liwwert Kafka Streams d'GlobalKTable Klass.

GlobalKTable Instanzen sinn eenzegaarteg well d'Applikatioun all Daten op jiddereng vun den Noden replizéiert. A well all d'Donnéeën op all Node präsent sinn, ass et net néideg den Eventstroum duerch Referenzdatenschlëssel ze partitionéieren sou datt et fir all Partitionen verfügbar ass. Dir kënnt och Schlëssellos Verbindungen mat GlobalKTable Objekter maachen. Loosst eis op ee vun de fréiere Beispiller zréckgoen fir dës Feature ze demonstréieren.

KStream Objete mat GlobalKTable Objete verbannen

Am Ënnersektioun 5.3.2 hu mir d'Fënsteraggregatioun vun Austauschtransaktioune vu Keefer gemaach. D'Resultater vun dëser Aggregatioun hunn esou ausgesinn:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Wärend dës Resultater den Zweck gedéngt hunn, wier et méi nëtzlech gewiescht wann de Client säin Numm an de ganze Firmennumm och ugewise goufen. Fir de Clientnumm an de Firmennumm derbäizefügen, kënnt Dir normal Bäiträg maachen, awer Dir musst zwee Schlësselmappingen maachen an nei Partitionéieren. Mat GlobalKTable kënnt Dir d'Käschte vun esou Operatiounen vermeiden.

Fir dëst ze maachen, benotze mir den countStream-Objet aus Listing 5.11 (de entspriechende Code kann an src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fonnt ginn) a verbannen et mat zwee GlobalKTable-Objeten.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Mir hunn dat scho virdru diskutéiert, also widderhuelen ech et net. Mee ech feststellen, datt de Code am toStream ().map Funktioun abstrakt an eng Funktioun Objet amplaz vun engem inline Lambda Ausdrock fir d'Liesbarkeet.

De nächste Schrëtt ass zwee Instanzen vu GlobalKTable ze deklaréieren (de gewise Code kann an der Datei fonnt ginn src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"

Notéiert w.e.g. datt Themennimm beschriwwe ginn mat opgezielten Typen.

Elo datt mir all d'Komponente fäerdeg hunn, bleift just de Code fir d'Verbindung ze schreiwen (deen an der Datei src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fonnt ka ginn) (Listing 5.13).

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Och wann et zwee Bäitrëtter an dësem Code sinn, si se geketten well keng vun hire Resultater getrennt benotzt gëtt. D'Resultater ginn um Enn vun der ganzer Operatioun ugewisen.

Wann Dir déi uewe genannte Joint Operatioun ausféiert, kritt Dir Resultater wéi dës:

{customer='Barney, Smith' company="Exxon", transactions= 17}

D'Essenz huet net geännert, awer dës Resultater kucken méi kloer.

Wann Dir op Kapitel 4 zielt, hutt Dir scho verschidden Aarte vu Verbindungen an Aktioun gesinn. Si sinn an der Tabell opgezielt. 5.2. Dës Tabell reflektéiert d'Konnektivitéitsfäegkeeten wéi d'Versioun 1.0.0 vu Kafka Streams; Eppes kann an zukünfteg Verëffentlechungen änneren.

D'Buch "Kafka Streams in Action. Uwendungen a Mikroservicer fir Echtzäitaarbecht"
Fir d'Saachen ofzeschléissen, loosst eis d'Grondlage recapéieren: Dir kënnt Eventstreams (KStream) an Update Streams (KTable) mat Hëllef vum lokale Staat verbannen. Alternativ, wann d'Gréisst vun de Referenzdaten net ze grouss ass, kënnt Dir den GlobalKTable Objet benotzen. GlobalKTables replizéieren all Partitionen op all Kafka Streams Applikatiounsknuet, a garantéiert datt all Daten verfügbar sinn egal wéi eng Partition de Schlëssel entsprécht.

Als nächst wäerte mir d'Kafka Streams Feature gesinn, duerch déi mir Staatsännerunge kënne beobachten ouni Daten aus engem Kafka Thema ze konsuméieren.

5.3.5. Queryable Staat

Mir hu scho verschidde Operatioune gemaach, déi Staat involvéieren an ëmmer d'Resultater op d'Konsole ausginn (fir Entwécklungszwecker) oder se an en Thema schreiwen (fir Produktiounszwecker). Wann Dir Resultater zu engem Thema schreift, musst Dir e Kafka Konsument benotzen fir se ze gesinn.

D'Liesen vun Daten aus dësen Themen kënnen als eng Zort materialiséierter Meenung ugesi ginn. Fir eis Zwecker kënne mir d'Definitioun vun enger materialiséierter Vue aus Wikipedia benotzen: "...e kierperlecht Datebankobjekt mat de Resultater vun enger Ufro. Zum Beispill kann et eng lokal Kopie vu Ferndaten sinn, oder e Subset vun de Reihen an/oder Kolonnen vun enger Tabell oder Bäitrëttsresultater, oder e Resumétabel, deen duerch Aggregatioun kritt gëtt" (https://en.wikipedia.org/wiki) /Materialiséierter Vue).

Kafka Streams erlaabt Iech och interaktiv Ufroen op staatleche Geschäfter ze lafen, wat Iech erlaabt direkt dës materialiséiert Meenungen ze liesen. Et ass wichteg ze bemierken datt d'Ufro un de Staatsgeschäft eng Lies-nëmme Operatioun ass. Dëst garantéiert datt Dir Iech keng Suergen maache musst iwwer zoufälleg Staat inkonsistent ze maachen wärend Är Uwendung Daten veraarbecht.

D'Fäegkeet fir direkt Staatsgeschäfter ze froen ass wichteg. Dëst bedeit datt Dir Dashboard Uwendungen erstellen kënnt ouni éischt Daten vum Kafka Konsument ze sichen. Et erhéicht och d'Effizienz vun der Applikatioun, wéinst der Tatsaach datt et net néideg ass fir Daten erëm ze schreiwen:

  • dank der Uertschaft vun den Donnéeën, si kënne séier zougänglech sinn;
  • Duplikatioun vun Daten gëtt éliminéiert, well se net op extern Späichere geschriwwe ginn.

D'Haaptsaach, déi ech wëll datt Dir Iech drun erënnert ass datt Dir de Staat direkt vun Ärer Applikatioun ufroe kënnt. D'Chancen, déi dëst Iech gëtt, kënnen net iwwerschätzt ginn. Amplaz Daten aus Kafka ze konsuméieren an records an enger Datebank fir d'Applikatioun ze späicheren, kënnt Dir Staatsgeschäfter mat deemselwechte Resultat ufroen. Direkt Ufroen un Staatsgeschäfter bedeite manner Code (kee Konsument) a manner Software (kee Besoin fir eng Datebank Tabelle fir d'Resultater ze späicheren).

Mir hunn zimmlech e bësse Buedem an dësem Kapitel ofgedeckt, sou datt mir eis Diskussioun iwwer interaktive Ufroe géint Staatsgeschäfter fir de Moment verloossen. Awer maach der keng Suergen: am Kapitel 9 kreéiere mir eng einfach Dashboard Applikatioun mat interaktiven Ufroen. Et wäert e puer vun de Beispiller aus dësem a fréiere Kapitelen benotzen fir interaktiv Ufroen ze demonstréieren a wéi Dir se op Kafka Streams Uwendungen addéiere kënnt.

Summary

  • KStream Objekter representéieren Stréim vun Eventer, vergläichbar mat Inserts an eng Datebank. KTable Objete representéieren Update Streams, méi wéi Updates op eng Datebank. D'Gréisst vum KTable Objet wiisst net, al records ginn duerch nei ersat.
  • KTable Objete sinn néideg fir Aggregatioun Operatiounen.
  • Mat Fënsteroperatioune kënnt Dir aggregéiert Daten an Zäiteem opdeelen.
  • Dank GlobalKTable Objekter kënnt Dir Referenzdaten iwwerall an der Applikatioun zougräifen, onofhängeg vun der Partitionéierung.
  • Verbindungen tëscht KStream, KTable an GlobalKTable Objete sinn méiglech.

Bis elo hu mir eis fokusséiert op Kafka Streams Uwendungen ze bauen mat dem héijen Niveau KStream DSL. Och wann d'High-Level Approche Iech erlaabt Iech ordentlech a präzis Programmer ze kreéieren, stellt d'Benotzung e Trade-Off duer. Schafft mat DSL KStream heescht d'Erhéijung vun der Conciseness vun Ärem Code andeems Dir de Grad vun der Kontroll reduzéiert. Am nächste Kapitel wäerte mir den Low-Level Handler Node API kucken a probéieren aner Ofsaz. D'Programmer wäerte méi laang sinn wéi se virdru waren, awer mir kënne bal all Handler Node kreéieren dee mir brauchen.

→ Méi Detailer iwwer d'Buch fannt Dir op Websäit vum Verlag

→ Fir Habrozhiteli 25% Remise mat Coupon - Kafka Streams

→ Beim Bezuelen vun der Pabeierversioun vum Buch gëtt en elektronescht Buch per E-Mail geschéckt.

Source: will.com

Setzt e Commentaire