A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához" Sziasztok Khabro lakosai! Ez a könyv minden fejlesztő számára alkalmas, aki meg akarja érteni a szálfeldolgozást. Az elosztott programozás megértése segít jobban megérteni a Kafkát és a Kafka-folyamokat. Jó lenne tudni magát a Kafka keretet, de ez nem szükséges: mindent elmondok, amire szüksége van. A tapasztalt Kafka-fejlesztők és a kezdők egyaránt megtanulják, hogyan hozhatnak létre érdekes adatfolyam-feldolgozó alkalmazásokat a Kafka Streams könyvtár használatával ebben a könyvben. A középhaladó és haladó Java-fejlesztők, akik már ismerik az olyan fogalmakat, mint a szerializálás, megtanulják alkalmazni készségeiket Kafka Streams alkalmazások létrehozására. A könyv forráskódja Java 8 nyelven íródott, és jelentős mértékben alkalmazza a Java 8 lambda kifejezés szintaxisát, így jól jön a lambda függvények kezelésének ismerete (akár más programozási nyelven is).

Kivonat. 5.3. Aggregációs és ablakos műveletek

Ebben a részben a Kafka Streams legígéretesebb részeit vizsgáljuk meg. Eddig a Kafka Streams következő vonatkozásaival foglalkoztunk:

  • feldolgozási topológia létrehozása;
  • állapot használata streaming alkalmazásokban;
  • adatfolyam-kapcsolatok végrehajtása;
  • különbségek az eseményfolyamok (KStream) és a frissítési adatfolyamok (KTable) között.

A következő példákban ezeket az elemeket egyesítjük. Megismerheti az ablakozást is, amely a streaming alkalmazások másik nagyszerű funkciója. Az első példánk egy egyszerű összesítés lesz.

5.3.1. A részvényeladások összesítése iparági szektoronként

Az összesítés és a csoportosítás létfontosságú eszközök a streaming adatokkal való munka során. Az egyes iratok beérkezésükkori vizsgálata gyakran nem elegendő. Az adatokból további információk kinyeréséhez csoportosítani és kombinálni kell azokat.

Ebben a példában egy napi kereskedő jelmezét ölti magára, akinek nyomon kell követnie több iparágban működő vállalatok részvényeinek értékesítési volumenét. Konkrétan az az öt vállalat érdekli, amelyek a legnagyobb részesedéssel rendelkeznek az egyes iparágakban.

Az ilyen összesítéshez a következő több lépésre van szükség ahhoz, hogy az adatokat a kívánt formára fordítsa (általános kifejezésekkel).

  1. Hozzon létre egy témaalapú forrást, amely nyers tőzsdei kereskedési információkat tesz közzé. Egy StockTransaction típusú objektumot kell leképeznünk egy ShareVolume típusú objektumra. A lényeg az, hogy a StockTransaction objektum értékesítési metaadatokat tartalmaz, de csak az eladott részvények számáról van szükségünk adatokra.
  2. Csoportosítsa a ShareVolume adatokat részvényjelek szerint. A szimbólumok szerinti csoportosítás után ezeket az adatokat összecsukhatja a készletértékesítési mennyiségek részösszegeivé. Érdemes megjegyezni, hogy a KStream.groupBy metódus egy KGroupedStream típusú példányt ad vissza. A KTable példányt pedig a KGroupedStream.reduce metódus további meghívásával kaphatja meg.

Mi az a KGroupedStream felület?

A KStream.groupBy és KStream.groupByKey metódusok a KGroupedStream egy példányát adják vissza. A KGroupedStream egy eseményfolyam köztes reprezentációja a kulcsok szerinti csoportosítás után. Egyáltalán nem arra való, hogy közvetlenül dolgozzon vele. Ehelyett a KGroupedStream használható az összesítési műveletekhez, amelyek mindig KT-táblát eredményeznek. És mivel az összesítési műveletek eredménye egy KTable, és állapottárolót használnak, előfordulhat, hogy nem minden frissítés kerül elküldésre a folyamatban.

A KTable.groupBy metódus egy hasonló KGroupedTable-t ad vissza – a frissítések folyamának közbenső reprezentációját, kulcs szerint átcsoportosítva.

Tartsunk egy kis szünetet, és nézzük meg az ábrát. 5.9, ami azt mutatja, hogy mit értünk el. Ennek a topológiának már nagyon ismerősnek kell lennie.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Nézzük most ennek a topológiának a kódját (az src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java fájlban található) (5.2-es lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Az adott kódot a rövidsége és a több sorban végrehajtott műveletek nagy mennyisége jellemzi. A builder.stream metódus első paraméterében újdonságokat vehetünk észre: az AutoOffsetReset.EARLIEST enum típusú értéket (van egy LATEST is), amelyet a Consumed.withOffsetResetPolicy metódussal állítunk be. Ez a felsorolási típus használható eltolás-visszaállítási stratégia megadására minden KStream vagy KTable számára, és elsőbbséget élvez a konfigurációból származó eltolás-visszaállítási opcióval szemben.

GroupByKey és GroupBy

A KStream felületen két módszer található a rekordok csoportosítására: GroupByKey és GroupBy. Mindkettő egy KGroupedTable-t ad vissza, így felmerülhet a kérdés, hogy mi a különbség köztük, és mikor melyiket érdemes használni?

A GroupByKey metódus akkor használatos, ha a KStream kulcsai már nem üresek. És ami a legfontosabb, az „újraparticionálást igényel” jelző soha nem lett beállítva.

A GroupBy metódus feltételezi, hogy módosította a csoportosítási kulcsokat, így az újrapartíciós jelző igaz értékre van állítva. A GroupBy metódus után végzett csatlakozások, aggregációk stb. automatikus újraparticionálást eredményez.
Összegzés: Amikor csak lehetséges, használja a GroupByKey-t a GroupBy helyett.

Világos, hogy mit csinálnak a mapValues ​​és a groupBy metódusok, ezért vessünk egy pillantást a sum() metódusra (az src/main/java/bbejeck/model/ShareVolume.java fájlban található) (5.3-as lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
A ShareVolume.sum metódus a készletértékesítési mennyiség futó összegét adja vissza, és a teljes számítási lánc eredménye egy KTable objektum . Most már megértette a KTable szerepét. Amikor a ShareVolume objektumok megérkeznek, a megfelelő KTable objektum tárolja a legújabb frissítést. Fontos megjegyezni, hogy minden frissítés megjelenik az előző shareVolumeKTable-ban, de nem mindegyiket küldi tovább.

Ezt a KT-táblázatot használjuk az összesítésre (a kereskedett részvények száma alapján), hogy megkapjuk azt az öt vállalatot, amelyek az egyes iparágakban a legnagyobb mennyiségű részvényt forgalmazzák. Cselekvéseink ebben az esetben hasonlóak lesznek az első összesítéshez.

  1. Hajtson végre egy másik groupBy műveletet az egyes ShareVolume objektumok iparág szerinti csoportosításához.
  2. Kezdje el a ShareVolume objektumok összegzését. Ezúttal az aggregációs objektum egy rögzített méretű prioritású sor. Ebben a fix méretű sorban csak az az öt cég marad meg, amelyeknek a legtöbb eladott részvénye van.
  3. Leképezze az előző bekezdésben szereplő sorokat egy karakterlánc értékre, és adja vissza az öt leggyakrabban kereskedett részvényt szám szerint iparágonként.
  4. Az eredményeket szöveges formában írd be a témába!

ábrán. Az 5.10. ábra az adatfolyam-topológia grafikonját mutatja. Amint látja, a feldolgozás második köre meglehetősen egyszerű.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Most, hogy világosan megértettük a feldolgozás második körének felépítését, lapozhatunk a forráskódhoz (az src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java fájlban található) (5.4-es lista) .

Ez az inicializáló egy fixQueue változót tartalmaz. Ez egy egyéni objektum, amely a java.util.TreeSet adaptere, és a kereskedési részvények csökkenő sorrendjében a legjobb N eredmény követésére szolgál.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Már láttad a groupBy és a mapValues ​​hívásokat, ezért ezekre nem térünk ki (a KTable.toStream metódust hívjuk, mert a KTable.print metódus elavult). De még nem láttad az aggregate() KTable verzióját, ezért szánunk egy kis időt ennek megbeszélésére.

Emlékszel, hogy a KTable abban különbözik, hogy az azonos kulcsokkal rendelkező rekordok frissítésnek minősülnek. A KTable lecseréli a régi bejegyzést egy újra. Az összesítés hasonló módon történik: a legutóbbi, azonos kulcsú rekordokat összesítik. Amikor egy rekord megérkezik, hozzáadódik a FixedSizePriorityQueue osztálypéldányhoz egy összeadó segítségével (második paraméter az összesített metódushívásban), de ha már létezik másik rekord ugyanazzal a kulccsal, akkor a régi rekordot kivonóval eltávolítják (harmadik paraméter az összesített metódushívás).

Mindez azt jelenti, hogy a FixedSizePriorityQueue aggregátorunk nem aggregál minden értéket egy kulccsal, hanem az N legtöbb kereskedéssel rendelkező részvény mennyiségének mozgó összegét tárolja. Minden bejövő bejegyzés az eddig eladott részvények teljes számát tartalmazza. A KTable tájékoztatást ad arról, hogy jelenleg mely vállalatok részvényeivel kereskednek a legtöbbet, anélkül, hogy minden frissítést gördülő összesítésre lenne szükség.

Két fontos dolgot tanultunk meg:

  • csoportosítsa az értékeket a KTable-ban egy közös kulccsal;
  • hasznos műveleteket, például összesítést és összesítést hajt végre ezeken a csoportosított értékeken.

E műveletek végrehajtásának ismerete fontos ahhoz, hogy megértsük a Kafka Streams alkalmazáson keresztül mozgó adatok jelentését, és megértsük, milyen információkat hordoznak.

A könyvben korábban tárgyalt kulcsfogalmakat is összegyűjtöttük. A 4. fejezetben megvitattuk, hogy mennyire fontos a hibatűrő, helyi állapot egy streaming alkalmazás számára. A fejezet első példája bemutatta, miért olyan fontos a helyi állam – ez lehetővé teszi, hogy nyomon kövesse a már látott információkat. A helyi hozzáférés elkerüli a hálózati késéseket, így az alkalmazás hatékonyabb és hibaállóbb.

Minden összesítési vagy összesítési művelet végrehajtásakor meg kell adnia az állapottároló nevét. Az összesítési és összesítési műveletek egy KTable példányt adnak vissza, a KTable pedig állapottárat használ a régi eredmények újakra cserélésére. Mint láthatta, nem minden frissítés kerül elküldésre a folyamatban, és ez azért fontos, mert az összesítési műveletek összefoglaló információk előállítására szolgálnak. Ha nem alkalmazza a helyi állapotot, a KTable továbbítja az összes összesítési és összesítési eredményt.

Ezután megvizsgáljuk a műveletek, például az összesítés egy meghatározott időtartamon belüli végrehajtását – úgynevezett ablakműveleteket.

5.3.2. Ablakműveletek

Az előző részben bemutattuk a csúszó konvolúciót és az aggregációt. Az alkalmazás a részvényeladások volumenének folyamatos felgöngyölítését végezte, majd a tőzsde öt legtöbbet forgalmazott részvényének összesítését követte.

Néha szükséges az eredmények ilyen folyamatos összesítése és összesítése. És néha csak egy adott ideig kell műveleteket végrehajtania. Például számítsa ki, hány csereügyletet bonyolítottak le egy adott cég részvényeivel az elmúlt 10 percben. Vagy hány felhasználó kattintott egy új reklámszalagra az elmúlt 15 percben. Egy alkalmazás többször is végrehajthat ilyen műveleteket, de olyan eredményekkel, amelyek csak meghatározott időszakokra vonatkoznak (időablakok).

A csereügyletek számlálása vevő szerint

A következő példában több kereskedő – akár nagy szervezetek, akár okos egyéni finanszírozók – részvénytranzakcióit követjük nyomon.

Ennek a követésnek két oka lehet. Az egyik az, hogy tudni kell, mit vásárolnak/adnak el a piacvezetők. Ha ezek a nagy szereplők és a kifinomult befektetők lehetőséget látnak, akkor érdemes követni stratégiájukat. A második ok az a vágy, hogy észrevegyék az illegális bennfentes kereskedelem minden lehetséges jelét. Ehhez elemeznie kell a nagy eladási ugrások és a fontos sajtóközlemények közötti összefüggést.

Az ilyen nyomon követés a következő lépésekből áll:

  • folyam létrehozása a részvény-tranzakciók témakörből való olvasáshoz;
  • a bejövő rekordok csoportosítása a vevő azonosítója és a készlet szimbóluma szerint. A groupBy metódus meghívása a KGroupedStream osztály egy példányát adja vissza;
  • A KGroupedStream.windowedBy metódus egy időablakra korlátozott adatfolyamot ad vissza, amely lehetővé teszi az ablakos összesítést. Az ablak típusától függően vagy egy TimeWindowedKStream vagy egy SessionWindowedKStream kerül visszaadásra;
  • tranzakciószám az összesítési művelethez. Az ablakos adatfolyam határozza meg, hogy egy adott rekordot figyelembe kell-e venni ebben a számlálásban;
  • eredmények írása egy témába vagy kiadása a konzolra a fejlesztés során.

Az alkalmazás topológiája egyszerű, de hasznos lenne egy világos kép róla. Vessünk egy pillantást a Fig. 5.11.

Ezután az ablakműveletek funkcionalitását és a megfelelő kódot nézzük meg.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"

Ablaktípusok

A Kafka Streamsben három ablaktípus létezik:

  • ülésszakos;
  • "bukdácsoló";
  • csúszás/ugrás.

Hogy melyiket válasszuk, az üzleti igényeitől függ. A bukdácsoló és ugráló ablakok időben korlátozottak, míg a munkamenet-ablakokat a felhasználói tevékenység korlátozza – a munkamenet(ek) időtartamát kizárólag a felhasználó aktivitása határozza meg. A legfontosabb, hogy ne feledje, hogy minden ablaktípus a bejegyzések dátum-/időbélyegzőjén alapul, nem a rendszeridőn.

Ezután az egyes ablaktípusokkal megvalósítjuk a topológiánkat. A teljes kódot csak az első példában adjuk meg, más típusú ablakoknál semmi sem változik, kivéve az ablak működési típusát.

Munkamenet ablakok

A munkamenet ablakai nagyon különböznek az összes többi ablaktípustól. Ezeket nem annyira az idő, mint inkább a felhasználó tevékenysége (vagy a nyomon követni kívánt entitás tevékenysége) korlátozza. A munkamenet ablakait az inaktivitási időszakok határolják.

Az 5.12. ábra szemlélteti a szekcióablak fogalmát. A kisebb munkamenet egyesül a tőle balra lévővel. A jobb oldali munkamenet pedig külön lesz, mert hosszú inaktivitást követ. A munkamenet-ablakok a felhasználói tevékenységen alapulnak, de a bejegyzések dátum-/időbélyegzőivel határozzák meg, hogy a bejegyzés melyik munkamenethez tartozik.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"

Munkamenet ablakok használata a részvénytranzakciók nyomon követésére

Használjunk munkamenet ablakokat a cseretranzakciók információinak rögzítésére. A munkamenet-ablakok megvalósítása az 5.5-ös listában látható (amely az src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java fájlban található).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
A legtöbb műveletet már látta ebben a topológiában, így nem szükséges újra megnézni őket. De van itt több új elem is, amelyeket most tárgyalunk.

Bármely groupBy művelet általában valamilyen összesítési műveletet hajt végre (összevonás, összesítés vagy számlálás). Végezhet kumulatív összesítést futó összesítéssel, vagy ablak-összesítést, amely figyelembe veszi a megadott időablakon belüli rekordokat.

Az 5.5-ös listában található kód a munkamenet-ablakon belüli tranzakciók számát számolja. ábrán. 5.13 Ezeket a tevékenységeket lépésről lépésre elemzik.

A windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) meghívásával egy munkamenet-ablakot hozunk létre 20 másodperces inaktivitási időközzel és 15 perces kitartási időközzel. A 20 másodperces tétlenségi időköz azt jelenti, hogy az alkalmazás minden olyan bejegyzést tartalmaz az aktuális (aktív) munkamenetbe, amely az aktuális munkamenet végétől vagy kezdetétől számított 20 másodpercen belül érkezik.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Ezután megadjuk, hogy a munkamenet ablakban melyik összesítési műveletet kell végrehajtani - ebben az esetben a számlálást. Ha egy bejövő bejegyzés kívül esik az inaktivitási ablakon (a dátum/időbélyegző bármelyik oldalán), az alkalmazás új munkamenetet hoz létre. A megőrzési intervallum azt jelenti, hogy egy munkamenetet egy bizonyos ideig fenntartanak, és lehetővé teszik a késői adatok megjelenítését, amelyek túlnyúlnak a munkamenet inaktivitási időszakán, de még csatolhatók. Ezenkívül az összevonás eredményeként létrejövő új munkamenet kezdete és vége a legkorábbi és legkésőbbi dátum/idő bélyegzőnek felel meg.

Nézzünk meg néhány bejegyzést a számlálási módszerből, hogy lássuk, hogyan működnek a szekciók (5.1. táblázat).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
A rekordok megérkezésekor megkeressük a meglévő munkameneteket ugyanazzal a kulccsal, a befejezési időponttal kisebb, mint az aktuális dátum/időbélyegző – inaktivitási intervallum, és a kezdő időpont nagyobb, mint az aktuális dátum/időbélyeg + inaktivitási intervallum. Ezt figyelembe véve négy bejegyzés a táblázatból. 5.1 egyetlen munkamenetté egyesül az alábbiak szerint.

1. Az 1. rekord érkezik először, így a kezdési időpont megegyezik a befejezési időponttal és 00:00:00.

2. Ezután érkezik a 2. bejegyzés, és olyan foglalkozásokat keresünk, amelyek legkorábban 23:59:55-kor érnek véget és legkésőbb 00:00:35-kor kezdődnek. Megtaláljuk az 1. rekordot, és kombináljuk az 1. és 2. munkamenetet. Az 1. (korábbi) munkamenet kezdési idejét és a 2. (későbbi) munkamenet befejezési idejét vesszük, így az új munkamenetünk 00:00:00-kor kezdődik és 00:00-kor ér véget: 15:XNUMX.

3. Megérkezik a 3. rekord, 00:00:30 és 00:01:10 között keresünk munkameneteket és nem találunk. Adjon hozzá egy második munkamenetet a 123-345-654,FFBE kulcshoz, amely 00:00:50-kor kezdődik és végződik.

4. Megérkezik a 4. rekord és 23:59:45 és 00:00:25 között keresünk foglalkozásokat. Ezúttal az 1. és a 2. munkamenet is megtalálható. Mindhárom munkamenet egybe van vonva, 00:00:00 kezdési időponttal és 00:00:15 befejezési időponttal.

Az ebben a részben leírtak alapján érdemes megjegyezni a következő fontos árnyalatokat:

  • a munkamenetek nem fix méretű ablakok. A foglalkozás időtartamát az adott időtartamon belüli tevékenység határozza meg;
  • Az adatokban lévő dátum/idő bélyegek határozzák meg, hogy az esemény egy meglévő munkamenetbe vagy tétlenségi időszakra esik-e.

Ezután a következő ablaktípusról fogunk beszélni - a „zuhanó” ablakokról.

"Bomló" ablakok

A zuhanó ablakok olyan eseményeket rögzítenek, amelyek egy bizonyos időszakon belülre esnek. Képzelje el, hogy egy adott vállalat összes részvénytranzakcióját 20 másodpercenként rögzítenie kell, így össze kell gyűjtenie az összes eseményt ebben az időszakban. A 20 másodperces intervallum végén az ablak átfordul és egy új 20 másodperces megfigyelési intervallumra lép. Az 5.14. ábra szemlélteti ezt a helyzetet.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Amint láthatja, az utolsó 20 másodpercben fogadott összes esemény megjelenik az ablakban. Ezen időtartam végén egy új ablak jön létre.

Az 5.6-os lista olyan kódot mutat be, amely bemutatja a zuhanó ablakok használatát a részvénytranzakciók 20 másodpercenkénti rögzítésére (az src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fájlban található).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Ezzel a kis változtatással a TimeWindows.of metódushívásban használhatja a bukdácsoló ablakot. Ez a példa nem hívja meg az till() metódust, ezért az alapértelmezett 24 órás megőrzési intervallumot fogja használni.

Végül itt az ideje, hogy továbblépjünk az utolsó ablakbeállításokhoz – az ablakok „ugrálásához”.

Toló ("ugró") ablakok

A toló/ugró ablakok hasonlóak a bukdácsoló ablakokhoz, de kis eltéréssel. A csúszóablakok nem várják meg az időintervallum végét, mielőtt új ablakot hoznak létre a legutóbbi események feldolgozására. Az ablak időtartamánál rövidebb várakozási idő után kezdenek új számításokat.

A zuhanó és ugró ablakok közötti különbségek szemléltetésére térjünk vissza a tőzsdei tranzakciók számlálásának példájához. Továbbra is az a célunk, hogy megszámoljuk a tranzakciók számát, de nem szeretnénk a teljes időt kivárni a számláló frissítésével. Ehelyett rövidebb időközönként frissítjük a számlálót. Például továbbra is 20 másodpercenként számoljuk a tranzakciók számát, de 5 másodpercenként frissítjük a számlálót, amint az ábra mutatja. 5.15. Ebben az esetben három eredményablakot kapunk átfedő adatokkal.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Az 5.7-es lista mutatja a csúszóablakok meghatározásának kódját (az src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fájlban).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
A bukdácsoló ablak átalakítható ugráló ablakmá, ha hozzáadjuk az advanceBy() metódust. A bemutatott példában a mentési intervallum 15 perc.

Ebben a részben láthatta, hogyan korlátozhatja az összesítési eredményeket az időablakra. Különösen szeretném, ha emlékezne a következő három dologra ebből a részből:

  • a munkamenet ablakok méretét nem az időtartam, hanem a felhasználói tevékenység korlátozza;
  • A „zuhanó” ablakok áttekintést adnak az eseményekről egy adott időszakon belül;
  • Az ablakok átugrásának időtartama rögzített, de gyakran frissülnek, és minden ablakban tartalmazhatnak átfedő bejegyzéseket.

Ezután megtanuljuk, hogyan lehet egy KTable-t visszaalakítani KStreammé a kapcsolat létrehozásához.

5.3.3. KStream és KTable objektumok összekapcsolása

A 4. fejezetben két KStream objektum összekapcsolását tárgyaltuk. Most meg kell tanulnunk a KTable és a KStream összekapcsolását. Erre a következő egyszerű ok miatt lehet szükség. A KStream rekordok folyama, a KTable pedig rekordfrissítések folyama, de néha érdemes lehet további kontextust hozzáadni a rekordfolyamhoz a KTable frissítéseivel.

Vegyük a tőzsdei tranzakciók számának adatait, és vonjuk össze az érintett iparágak tőzsdei híreivel. Íme, mit kell tennie ennek eléréséhez, tekintettel a már meglévő kódra.

  1. A készlettranzakciók számának adatait tartalmazó KTable objektumot konvertáljon KStreammé, majd cserélje le a kulcsot a készletszimbólumnak megfelelő iparágat jelző kulccsal.
  2. Hozzon létre egy KTable objektumot, amely tőzsdei híreket tartalmazó témakörből olvas adatokat. Ez az új KTable ipari szektor szerint lesz kategorizálva.
  3. Kapcsolja össze a híreket a tőzsdei tranzakciók számával kapcsolatos információkkal ágazatonként.

Most pedig nézzük meg, hogyan valósítsuk meg ezt a cselekvési tervet.

A KTable konvertálása KSreamté

A KTable KStreammé konvertálásához a következőket kell tennie.

  1. Hívja meg a KTable.toStream() metódust.
  2. A KStream.map metódus meghívásával cserélje ki a kulcsot az iparág nevére, majd kérje le a TransactionSummary objektumot a Windowed példányból.

Ezeket a műveleteket a következőképpen láncoljuk össze (a kód az src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fájlban található) (5.8-as lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Mivel KStream.map műveletet hajtunk végre, a visszaküldött KStream példány automatikusan újraparticionálódik, amikor egy kapcsolatban használják.

Az átalakítási folyamatot befejeztük, ezután létre kell hoznunk egy KTable objektumot a tőzsdei hírek olvasásához.

KTable létrehozása a tőzsdei hírekhez

Szerencsére egy KTable objektum létrehozásához csak egy sor kódra van szükség (a kód az src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java helyen található) (5.9-es lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Érdemes megjegyezni, hogy nem szükséges Serde objektumokat megadni, mivel a beállításokban a Serdes karakterlánc szerepel. Ezenkívül a LEGKORÁBBI felsorolás használatával a táblázat már az elején tele van rekordokkal.

Most áttérhetünk az utolsó lépésre - a kapcsolatra.

A hírfrissítések összekapcsolása a tranzakciószámadatokkal

A kapcsolat létrehozása nem nehéz. Bal oldali csatlakozást használunk arra az esetre, ha nincs készlethír az adott iparághoz (a szükséges kód az src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fájlban található) (5.10. lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Ez a leftJoin operátor meglehetősen egyszerű. A 4. fejezetben található illesztésekkel ellentétben a JoinWindow metódus nem használatos, mivel a KStream-KTable összekapcsolás során minden kulcshoz csak egy bejegyzés található a KTable-ban. Az ilyen kapcsolat időben nincs korlátozva: a rekord vagy a KTtáblában van, vagy hiányzik. A fő következtetés: KTable objektumok használatával a KStream ritkábban frissített referenciaadatokkal gazdagítható.

Most egy hatékonyabb módszert fogunk megvizsgálni a KStream eseményeinek gazdagítására.

5.3.4. GlobalKTable objektumok

Amint látja, szükség van az eseményfolyamok gazdagítására vagy kontextus hozzáadására. A 4. fejezetben láthatta a kapcsolatokat két KStream objektum között, az előző részben pedig egy KStream és egy KTable közötti kapcsolatot. Mindezen esetekben szükség van az adatfolyam újraparticionálására, amikor a kulcsokat egy új típushoz vagy értékhez rendeli hozzá. Néha az újraparticionálás kifejezetten történik, néha pedig a Kafka Streams automatikusan. Az újraparticionálásra azért van szükség, mert a kulcsok megváltoztak, és a rekordoknak új szekciókba kell kerülniük, különben a kapcsolat lehetetlenné válik (erről a 4. fejezetben, a 4.2.4. alfejezetben az „Adatok újraparticionálása” részben volt szó).

Az újraparticionálásnak ára van

Az újraparticionálás költségeket igényel – további erőforrás-költségek a közbenső témák létrehozásához, duplikált adatok másik témakörben való tárolása; az ebből a témából való írás és olvasás miatt megnövekedett késleltetést is jelent. Ezen túlmenően, ha egynél több szempontot vagy dimenziót kell összekapcsolnia, láncolnia kell az összekapcsolásokat, le kell képeznie a rekordokat új kulcsokkal, és újra le kell futtatnia az újraparticionálási folyamatot.

Csatlakozás kisebb adatkészletekhez

Egyes esetekben a csatlakoztatandó referenciaadatok mennyisége viszonylag kicsi, így ezek teljes másolatai könnyen elférnek lokálisan az egyes csomópontokon. Az ehhez hasonló helyzetekre a Kafka Streams biztosítja a GlobalKTable osztályt.

A GlobalKTable példányok egyediek, mert az alkalmazás minden adatot replikál az egyes csomópontokhoz. És mivel az összes adat minden csomóponton megtalálható, nincs szükség az eseményfolyam particionálására referencia-adatkulccsal, hogy az minden partíció számára elérhető legyen. A GlobalKTable objektumok használatával kulcs nélküli összekapcsolásokat is létrehozhat. Térjünk vissza az előző példák egyikéhez a funkció bemutatására.

KStream objektumok összekapcsolása GlobalKTable objektumokkal

Az 5.3.2 alfejezetben elvégeztük a csereügyletek vevők szerinti ablakösszesítését. Az összesítés eredménye valahogy így nézett ki:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Bár ezek az eredmények megfeleltek a célnak, hasznosabb lett volna, ha az ügyfél neve és a teljes cégneve is megjelent volna. Az ügyfél nevének és a cégnévnek a hozzáadásához normál összekapcsolást is végezhet, de két kulcsleképezést és újraparticionálást kell végrehajtania. A GlobalKTable segítségével elkerülheti az ilyen műveletek költségeit.

Ehhez használjuk a countStream objektumot az 5.11-es listából (a megfelelő kód az src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fájlban található), és összekapcsoljuk két GlobalKTable objektummal.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Ezt már megbeszéltük, ezért nem ismétlem meg. De megjegyzem, hogy a toStream().map függvényben lévő kód az olvashatóság kedvéért egy függvényobjektummá van absztrahálva, nem pedig egy soron belüli lambda kifejezést.

A következő lépés a GlobalKTable két példányának deklarálása (a megjelenített kód az src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fájlban található) (5.12. lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"

Kérjük, vegye figyelembe, hogy a témanevek leírása felsorolt ​​típusokkal történik.

Most, hogy minden komponens készen van, már csak meg kell írni a kapcsolat kódját (amely az src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fájlban található) (5.13-as lista).

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
Bár ebben a kódban két összekapcsolás található, ezek láncolva vannak, mivel egyik eredményüket sem használják külön. Az eredmények a teljes művelet végén megjelennek.

A fenti összekapcsolási művelet futtatásakor a következő eredményeket kapja:

{customer='Barney, Smith' company="Exxon", transactions= 17}

A lényeg nem változott, de ezek az eredmények egyértelműbbnek tűnnek.

Ha visszaszámlál a 4. fejezetig, már több típusú kapcsolatot láttál működés közben. táblázatban vannak felsorolva. 5.2. Ez a táblázat a Kafka Streams 1.0.0-s verziójának csatlakozási lehetőségeit tükrözi; Valami változhat a jövőbeli kiadásokban.

A „Kafka Streams in Action. Alkalmazások és mikroszolgáltatások a valós idejű munkához"
A dolgok lezárásaként foglaljuk össze az alapokat: az eseményfolyamokat (KStream) és frissítheti a folyamokat (KTable) a helyi állapot használatával. Alternatív megoldásként, ha a referenciaadatok mérete nem túl nagy, használhatja a GlobalKTable objektumot. A GlobalKTables replikálja az összes partíciót minden Kafka Streams alkalmazáscsomópontra, biztosítva, hogy minden adat elérhető legyen, függetlenül attól, hogy a kulcs melyik partícióhoz tartozik.

Következőben a Kafka Streams funkciót fogjuk látni, aminek köszönhetően egy Kafka-témából származó adatok fogyasztása nélkül figyelhetjük meg az állapotváltozásokat.

5.3.5. Lekérdezhető állapot

Számos állapotot érintő műveletet végrehajtottunk már, és az eredményeket mindig kiadjuk a konzolra (fejlesztési célból), vagy egy témába írjuk (gyártási célból). Amikor eredményeket ír egy témához, egy Kafka-fogyasztót kell használnia azok megtekintéséhez.

Az ezekből a témákból származó adatok olvasása a materializált nézetek egy fajtájának tekinthető. Céljainkra használhatjuk a materializált nézet definícióját a Wikipédiából: „...egy lekérdezés eredményeit tartalmazó fizikai adatbázis-objektum. Ez lehet például távoli adatok helyi másolata, egy táblázat sorainak és/vagy oszlopainak részhalmaza, vagy egyesítheti az eredményeket, vagy egy összesítéssel nyert összefoglaló táblázat” (https://en.wikipedia.org/wiki /Materializált_nézet).

A Kafka Streams interaktív lekérdezések futtatását is lehetővé teszi az állami boltokban, lehetővé téve ezeknek a materializált nézetek közvetlen olvasását. Fontos megjegyezni, hogy az állapottároló lekérdezése csak olvasható művelet. Ez biztosítja, hogy ne kelljen attól tartania, hogy az alkalmazás adatfeldolgozása közben véletlenül inkonzisztens állapotot okoz.

Fontos az állapottárolók közvetlen lekérdezésének képessége. Ez azt jelenti, hogy anélkül is létrehozhat irányítópult-alkalmazásokat, hogy először adatokat kellene lekérnie a Kafka fogyasztótól. Növeli az alkalmazás hatékonyságát is, mivel nincs szükség újbóli adatírásra:

  • az adatok helyének köszönhetően gyorsan elérhetők;
  • Az adatok megkettőzése megszűnik, mivel nem íródnak külső tárolóra.

A legfontosabb dolog, amire emlékezni szeretném, az az, hogy az állapotot közvetlenül az alkalmazásból is lekérdezheti. Az ezzel járó lehetőségeket nem lehet túlbecsülni. Ahelyett, hogy a Kafka-tól származó adatokat fogyasztana, és rekordokat tárolna egy adatbázisban az alkalmazás számára, lekérdezheti az állapottárolókat ugyanazzal az eredménnyel. Az állapottárolókba irányuló közvetlen lekérdezések kevesebb kódot (nincs fogyasztó) és kevesebb szoftvert jelentenek (nincs szükség adatbázistáblára az eredmények tárolásához).

Ebben a fejezetben elég sok területet feldolgoztunk, ezért az állami áruházakkal kapcsolatos interaktív lekérdezések tárgyalását egyelőre hagyjuk. De ne aggódjon: a 9. fejezetben létrehozunk egy egyszerű irányítópult-alkalmazást interaktív lekérdezésekkel. A jelen és az előző fejezetek néhány példáját felhasználva bemutatja az interaktív lekérdezéseket, és bemutatja, hogyan adhatja hozzá őket a Kafka Streams alkalmazásokhoz.

Összegzés

  • A KStream objektumok eseményfolyamokat képviselnek, hasonlóak az adatbázisba történő beillesztésekhez. A KTable objektumok frissítési folyamokat képviselnek, inkább egy adatbázis frissítéséhez. A KTable objektum mérete nem nő, a régi rekordokat újak váltják fel.
  • A KTable objektumok szükségesek az összesítési műveletekhez.
  • Az ablakozási műveletek segítségével az összesített adatokat időzónákra oszthatja.
  • A GlobalKTable objektumoknak köszönhetően az alkalmazásban bárhol elérheti a referenciaadatokat, a particionálástól függetlenül.
  • A KStream, a KTable és a GlobalKTable objektumok közötti kapcsolatok lehetségesek.

Eddig a Kafka Streams alkalmazások magas szintű KStream DSL használatával történő építésére összpontosítottunk. Bár a magas szintű megközelítés lehetővé teszi tiszta és tömör programok létrehozását, használata kompromisszumot jelent. A DSL KStreammel való együttműködés azt jelenti, hogy a vezérlés mértékének csökkentésével növeli a kód tömörségét. A következő fejezetben megvizsgáljuk az alacsony szintű kezelő csomópont API-t, és más kompromisszumokat is kipróbálunk. A programok hosszabbak lesznek, mint korábban voltak, de szinte bármilyen kezelőcsomópontot létrehozhatunk, amire szükségünk lehet.

→ További részletek a könyvről a címen találhatók a kiadó honlapján

→ Habrozhiteli részére 25% kedvezmény kupon felhasználásával - Kafka-folyamok

→ A könyv papíralapú változatának kifizetése után egy elektronikus könyvet küldünk e-mailben.

Forrás: will.com

Hozzászólás