Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase" Dobrý deň, obyvatelia Khabra! Táto kniha je vhodná pre každého vývojára, ktorý chce porozumieť spracovaniu vlákien. Pochopenie distribuovaného programovania vám pomôže lepšie pochopiť Kafka a Kafka Streams. Bolo by pekné poznať samotný rámec Kafka, ale to nie je potrebné: ​​Poviem vám všetko, čo potrebujete. Skúsení vývojári Kafka a nováčikovia sa v tejto knihe naučia vytvárať zaujímavé aplikácie na spracovanie streamov pomocou knižnice Kafka Streams. Stredne pokročilí a pokročilí vývojári Java, ktorí sú už oboznámení s konceptmi, ako je serializácia, sa naučia uplatniť svoje zručnosti pri vytváraní aplikácií Kafka Streams. Zdrojový kód knihy je napísaný v jazyku Java 8 a výrazne využíva syntax výrazov lambda Java 8, takže vedieť pracovať s funkciami lambda (aj v inom programovacom jazyku) príde vhod.

Úryvok. 5.3. Operácie agregácie a okien

V tejto časti prejdeme k preskúmaniu najsľubnejších častí Kafkových prúdov. Doteraz sme sa zaoberali nasledujúcimi aspektmi Kafkových streamov:

  • vytvorenie topológie spracovania;
  • používanie stavu v streamovacích aplikáciách;
  • vykonávanie pripojení toku údajov;
  • rozdiely medzi tokmi udalostí (KStream) a aktualizačnými tokmi (KTable).

V nasledujúcich príkladoch všetky tieto prvky spojíme. Dozviete sa tiež o okne, ďalšej skvelej funkcii streamovacích aplikácií. Náš prvý príklad bude jednoduchá agregácia.

5.3.1. Agregácia predaja akcií podľa sektora priemyslu

Agregácia a zoskupovanie sú životne dôležité nástroje pri práci so streamovanými údajmi. Skúmanie jednotlivých záznamov pri ich prijatí je často nedostatočné. Na extrahovanie ďalších informácií z údajov je potrebné ich zoskupiť a spojiť.

V tomto príklade si oblečiete kostým denného obchodníka, ktorý potrebuje sledovať objem predaja akcií spoločností v niekoľkých odvetviach. Konkrétne vás zaujíma päť spoločností s najväčším podielom predaja v každom odvetví.

Takáto agregácia si bude vyžadovať niekoľko nasledujúcich krokov na preloženie údajov do požadovanej formy (všeobecne povedané).

  1. Vytvorte tematický zdroj, ktorý publikuje nespracované informácie o obchodovaní s akciami. Budeme musieť namapovať objekt typu StockTransaction na objekt typu ShareVolume. Ide o to, že objekt StockTransaction obsahuje predajné metadáta, no potrebujeme len údaje o počte predaných akcií.
  2. Zoskupiť údaje ShareVolume podľa symbolu akcií. Po zoskupení podľa symbolu môžete tieto údaje zbaliť do medzisúčtov objemov predaja zásob. Stojí za zmienku, že metóda KStream.groupBy vracia inštanciu typu KGroupedStream. Inštanciu KTable môžete získať ďalším volaním metódy KGroupedStream.reduce.

Čo je rozhranie KGroupedStream

Metódy KStream.groupBy a KStream.groupByKey vrátia inštanciu KGroupedStream. KGroupedStream je prechodná reprezentácia prúdu udalostí po zoskupení podľa kľúčov. Na priamu prácu s ním nie je vôbec určený. Namiesto toho sa KGroupedStream používa na agregačné operácie, ktorých výsledkom je vždy KTable. A keďže výsledkom agregačných operácií je tabuľka KT a používajú stavové úložisko, je možné, že nie všetky aktualizácie sa v dôsledku toho posielajú ďalej.

Metóda KTable.groupBy vracia podobnú KGroupedTable - prechodnú reprezentáciu prúdu aktualizácií, preskupenú podľa kľúča.

Dáme si krátku prestávku a pozrieme sa na Obr. 5.9, ktorá ukazuje, čo sme dosiahli. Táto topológia by vám už mala byť veľmi známa.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Pozrime sa teraz na kód pre túto topológiu (možno ho nájsť v súbore src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (výpis 5.2).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Daný kód sa vyznačuje svojou stručnosťou a veľkým objemom úkonov vykonávaných vo viacerých riadkoch. V prvom parametri metódy builder.stream si môžete všimnúť niečo nové: hodnotu typu enum AutoOffsetReset.EARLIEST (existuje aj LATEST), nastavenú pomocou metódy Consumed.withOffsetResetPolicy. Tento typ enumerácie možno použiť na určenie stratégie resetovania posunu pre každý KStream alebo KTable a má prednosť pred možnosťou resetovania posunu z konfigurácie.

GroupByKey a GroupBy

Rozhranie KStream má dve metódy na zoskupovanie záznamov: GroupByKey a GroupBy. Obe vrátia KGroupedTable, takže by vás mohlo zaujímať, aký je medzi nimi rozdiel a kedy použiť ktorý z nich?

Metóda GroupByKey sa používa, keď kľúče v KStream už nie sú prázdne. A čo je najdôležitejšie, príznak „vyžaduje opätovné rozdelenie“ nebol nikdy nastavený.

Metóda GroupBy predpokladá, že ste zmenili kľúče zoskupenia, takže príznak prerozdelenie je nastavený na hodnotu true. Vykonanie spojení, agregácií atď. po metóde GroupBy bude mať za následok automatické opätovné rozdelenie.
Zhrnutie: Kedykoľvek je to možné, mali by ste použiť GroupByKey namiesto GroupBy.

Je jasné, čo robia metódy mapValues ​​​​a groupBy, takže sa pozrime na metódu sum() (nachádza sa v src/main/java/bbejeck/model/ShareVolume.java) (výpis 5.3).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Metóda ShareVolume.sum vráti priebežný súčet objemu predaja zásob a výsledkom celého reťazca výpočtov je objekt KTable . Teraz chápete, akú úlohu hrá KTable. Keď prídu objekty ShareVolume, príslušný objekt KTable uloží najnovšiu aktuálnu aktualizáciu. Je dôležité si zapamätať, že všetky aktualizácie sa odrážajú v predchádzajúcej tabuľke shareVolumeKTable, ale nie všetky sa posielajú ďalej.

Túto tabuľku KT potom použijeme na agregáciu (podľa počtu obchodovaných akcií), aby sme dospeli k piatim spoločnostiam s najvyššími objemami obchodovaných akcií v každom odvetví. Naše kroky v tomto prípade budú podobné ako pri prvej agregácii.

  1. Vykonajte ďalšiu operáciu groupBy na zoskupenie jednotlivých objektov ShareVolume podľa odvetvia.
  2. Začnite sumarizovať objekty ShareVolume. Tentoraz je objektom agregácie prioritný front s pevnou veľkosťou. V tomto rade s pevnou veľkosťou zostáva len päť spoločností s najväčším počtom predaných akcií.
  3. Namapujte fronty z predchádzajúceho odseku na hodnotu reťazca a vráťte päť najobchodovanejších akcií podľa počtu podľa odvetvia.
  4. Výsledky zapíšte v reťazcovom tvare k téme.

Na obr. Obrázok 5.10 znázorňuje graf topológie dátového toku. Ako vidíte, druhé kolo spracovania je celkom jednoduché.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Teraz, keď sme jasne pochopili štruktúru tohto druhého kola spracovania, môžeme prejsť k jeho zdrojovému kódu (nájdete ho v súbore src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Výpis 5.4) .

Tento inicializátor obsahuje premennú fixedQueue. Toto je vlastný objekt, ktorý je adaptérom pre java.util.TreeSet, ktorý sa používa na sledovanie prvých N výsledkov v zostupnom poradí obchodovaných akcií.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Už ste videli volania groupBy a mapValues ​​​​, takže sa nimi nebudeme zaoberať (voláme metódu KTable.toStream, pretože metóda KTable.print je zastaraná). Ale ešte ste nevideli KTable verziu agregátu(), takže strávime trochu času diskusiou o tom.

Ako si pamätáte, KTable sa líši tým, že záznamy s rovnakými kľúčmi sa považujú za aktualizácie. KTable nahradí starý záznam novým. Agregácia prebieha podobným spôsobom: agregujú sa najnovšie záznamy s rovnakým kľúčom. Keď príde záznam, pridá sa do inštancie triedy FixedSizePriorityQueue pomocou sčítačky (druhý parameter vo volaní agregovanej metódy), ale ak už existuje iný záznam s rovnakým kľúčom, potom sa starý záznam odstráni pomocou odčítača (tretí parameter v volanie agregovanej metódy).

To všetko znamená, že náš agregátor FixedSizePriorityQueue nezhromažďuje všetky hodnoty pomocou jedného kľúča, ale ukladá pohyblivý súčet množstiev N najobchodovanejších typov akcií. Každá prichádzajúca položka obsahuje celkový počet doteraz predaných akcií. KTable vám poskytne informácie o tom, ktoré akcie spoločností sú v súčasnosti najobchodovanejšie, bez toho, aby ste vyžadovali priebežnú agregáciu každej aktualizácie.

Naučili sme sa robiť dve dôležité veci:

  • zoskupiť hodnoty v KTtable spoločným kľúčom;
  • vykonávať užitočné operácie, ako je súhrn a agregácia na týchto zoskupených hodnotách.

Vedieť, ako vykonávať tieto operácie, je dôležité na pochopenie významu údajov, ktoré sa pohybujú cez aplikáciu Kafka Streams, a na pochopenie toho, aké informácie nesie.

Zhrnuli sme aj niektoré kľúčové koncepty, o ktorých sme v tejto knihe hovorili vyššie. V kapitole 4 sme diskutovali o tom, aký dôležitý je lokálny stav odolný voči chybám pre aplikáciu na streamovanie. Prvý príklad v tejto kapitole ukázal, prečo je miestny štát taký dôležitý – dáva vám možnosť sledovať, aké informácie ste už videli. Lokálny prístup zabraňuje sieťovým oneskoreniam, vďaka čomu je aplikácia výkonnejšia a odolnejšia voči chybám.

Pri vykonávaní akejkoľvek operácie súhrnu alebo agregácie musíte zadať názov stavového úložiska. Operácie súhrnu a agregácie vrátia inštanciu KTable a KTable používa stavové úložisko na nahradenie starých výsledkov novými. Ako ste videli, nie všetky aktualizácie sa odosielajú v potrubí, a to je dôležité, pretože operácie agregácie sú navrhnuté tak, aby vytvárali súhrnné informácie. Ak nepoužijete miestny štát, KTable prepošle všetky výsledky agregácie a súhrnu.

Ďalej sa pozrieme na vykonávanie operácií, ako je agregácia v rámci určitého časového obdobia – takzvané operácie okien.

5.3.2. Operácie s oknami

V predchádzajúcej časti sme predstavili posuvnú konvolúciu a agregáciu. Aplikácia vykonávala nepretržité zhrnutie objemu predaja akcií, po ktorom nasledovalo agregovanie piatich najobchodovanejších akcií na burze.

Niekedy je takéto priebežné zhromažďovanie a zhrnutie výsledkov nevyhnutné. A niekedy je potrebné vykonať operácie iba počas určitého časového obdobia. Vypočítajte napríklad, koľko výmenných transakcií sa uskutočnilo s akciami konkrétnej spoločnosti za posledných 10 minút. Alebo koľko používateľov kliklo na nový reklamný banner za posledných 15 minút. Aplikácia môže vykonávať takéto operácie viackrát, ale s výsledkami, ktoré sa vzťahujú len na určené časové obdobia (časové okná).

Počítanie výmenných transakcií kupujúcim

V ďalšom príklade budeme sledovať akciové transakcie u viacerých obchodníkov – buď veľkých organizácií, alebo inteligentných individuálnych finančníkov.

Pre toto sledovanie existujú dva možné dôvody. Jednou z nich je potreba vedieť, čo lídri na trhu kupujú/predávajú. Ak títo veľkí hráči a sofistikovaní investori vidia príležitosť, má zmysel nasledovať ich stratégiu. Druhým dôvodom je túžba odhaliť akékoľvek možné známky nezákonného obchodovania s využitím dôverných informácií. Aby ste to dosiahli, budete musieť analyzovať koreláciu veľkých nárastov predaja s dôležitými tlačovými správami.

Takéto sledovanie pozostáva z nasledujúcich krokov:

  • vytvorenie streamu na čítanie z témy burzové transakcie;
  • zoskupenie došlých záznamov podľa ID kupujúceho a symbolu skladu. Volaním metódy groupBy sa vráti inštancia triedy KGroupedStream;
  • Metóda KGroupedStream.windowedBy vracia dátový tok obmedzený na časové okno, čo umožňuje agregáciu v okne. V závislosti od typu okna sa vráti buď TimeWindowedKStream alebo SessionWindowedKStream;
  • počet transakcií pre operáciu agregácie. Tok údajov v okne určuje, či sa v tomto počte berie do úvahy konkrétny záznam;
  • zapisovanie výsledkov do témy alebo ich výstup do konzoly počas vývoja.

Topológia tejto aplikácie je jednoduchá, ale pomohol by jej jasný obraz. Pozrime sa na Obr. 5.11.

Ďalej sa pozrieme na funkčnosť operácií s oknami a zodpovedajúci kód.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"

Typy okien

V Kafkových prúdoch sú tri typy okien:

  • relácia;
  • „prevracanie“ (prevracanie);
  • kĺzanie/skákanie.

Ktorý z nich si vybrať, závisí od vašich obchodných požiadaviek. Okná hádzania a skákania sú časovo obmedzené, zatiaľ čo okná relácií sú obmedzené aktivitou používateľa – trvanie relácie (relácií) je určené výlučne tým, ako je používateľ aktívny. Hlavná vec na zapamätanie je, že všetky typy okien sú založené na dátumových/časových pečiatkach položiek, nie na systémovom čase.

Ďalej implementujeme našu topológiu s každým z typov okien. Úplný kód bude uvedený iba v prvom príklade, pre ostatné typy okien sa okrem typu operácie okna nič nezmení.

Okná relácie

Okná relácií sa veľmi líšia od všetkých ostatných typov okien. Nie sú obmedzené ani tak časom, ako skôr aktivitou používateľa (alebo aktivitou subjektu, ktorý by ste chceli sledovať). Okná relácií sú ohraničené obdobiami nečinnosti.

Obrázok 5.12 znázorňuje koncept okien relácie. Menšia relácia sa spojí s reláciou naľavo. A relácia vpravo bude oddelená, pretože nasleduje dlhé obdobie nečinnosti. Okná relácie sú založené na aktivite používateľa, ale na určenie, do ktorej relácie daný záznam patrí, použite dátumové/časové pečiatky zo záznamov.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"

Používanie okien relácií na sledovanie akciových transakcií

Využime okná relácií na zachytenie informácií o výmenných transakciách. Implementácia okien relácie je znázornená vo výpise 5.5 (ktorý možno nájsť v src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Väčšinu operácií v tejto topológii ste už videli, takže tu nie je potrebné sa na ne znova pozerať. Je tu však aj niekoľko nových prvkov, o ktorých si teraz povieme.

Akákoľvek operácia groupBy zvyčajne vykonáva určitý druh operácie agregácie (agregácia, súhrn alebo počítanie). Môžete vykonať buď kumulatívnu agregáciu s priebežným súčtom, alebo agregáciu okien, ktorá zohľadňuje záznamy v rámci zadaného časového okna.

Kód vo výpise 5.5 počíta počet transakcií v rámci okien relácie. Na obr. 5.13 tieto akcie sa analyzujú krok za krokom.

Volaním windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) vytvoríme okno relácie s intervalom nečinnosti 20 sekúnd a intervalom zotrvania 15 minút. Nečinný interval 20 sekúnd znamená, že aplikácia zahrnie do aktuálnej (aktívnej) relácie akýkoľvek záznam, ktorý príde do 20 sekúnd od konca alebo začiatku aktuálnej relácie.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Ďalej určíme, ktorá operácia agregácie sa musí vykonať v okne relácie - v tomto prípade počítať. Ak sa prichádzajúci záznam dostane mimo okno nečinnosti (na ktorejkoľvek strane dátumovej/časovej pečiatky), aplikácia vytvorí novú reláciu. Interval uchovávania znamená udržiavanie relácie na určitý čas a umožňuje neskoršie údaje, ktoré presahujú obdobie nečinnosti relácie, ale stále môžu byť pripojené. Začiatok a koniec novej relácie, ktorá je výsledkom zlúčenia, navyše zodpovedá najskoršej a najnovšej dátumovej/časovej pečiatke.

Pozrime sa na niekoľko záznamov z metódy počítania, aby sme videli, ako relácie fungujú (tabuľka 5.1).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Keď dorazia záznamy, hľadáme existujúce relácie s rovnakým kľúčom, časom ukončenia menším, než je aktuálny dátum/časová pečiatka – interval nečinnosti, a časom začiatku väčším ako aktuálny dátum/čas + interval nečinnosti. Berúc do úvahy, štyri záznamy z tabuľky. 5.1 sa zlúčia do jednej relácie nasledovne.

1. Záznam 1 prichádza ako prvý, takže čas začiatku sa rovná času ukončenia a je 00:00:00.

2. Ďalej prichádza záznam 2 a hľadáme relácie, ktoré končia najskôr o 23:59:55 a začnú najneskôr o 00:00:35. Nájdeme záznam 1 a skombinujeme relácie 1 a 2. Vezmeme čas začiatku relácie 1 (skôr) a čas ukončenia relácie 2 (neskôr), takže naša nová relácia začína o 00:00:00 a končí o 00:00: 15:XNUMX.

3. Prichádza záznam 3, hľadáme relácie medzi 00:00:30 a 00:01:10 a nenájdeme žiadne. Pridajte druhú reláciu pre kľúč 123-345-654,FFBE, začínajúcu a končiacu o 00:00:50.

4. Prichádza záznam 4 a hľadáme relácie medzi 23:59:45 a 00:00:25. Tentokrát sú nájdené obe relácie 1 a 2. Všetky tri relácie sú spojené do jednej s časom začiatku 00:00:00 a časom ukončenia 00:00:15.

Z toho, čo je opísané v tejto časti, stojí za to pamätať na nasledujúce dôležité nuansy:

  • relácie nie sú okná s pevnou veľkosťou. Trvanie relácie je určené aktivitou v rámci daného časového obdobia;
  • Pečiatky dátumu a času v údajoch určujú, či udalosť spadá do existujúcej relácie alebo do obdobia nečinnosti.

Ďalej budeme diskutovať o ďalšom type okna - „prepadové“ okná.

"Tomling" okná

Otočné okná zachytávajú udalosti, ktoré spadajú do určitého časového obdobia. Predstavte si, že musíte každých 20 sekúnd zachytiť všetky burzové transakcie určitej spoločnosti, takže zbierate všetky udalosti počas tohto časového obdobia. Na konci 20-sekundového intervalu sa okno prevráti a prejde na nový 20-sekundový interval pozorovania. Obrázok 5.14 znázorňuje túto situáciu.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Ako vidíte, v okne sú zahrnuté všetky udalosti prijaté za posledných 20 sekúnd. Na konci tohto časového obdobia sa vytvorí nové okno.

Výpis 5.6 ukazuje kód, ktorý demonštruje použitie okien na zachytenie akciových transakcií každých 20 sekúnd (nájdené v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
S touto malou zmenou vo volaní metódy TimeWindows.of môžete použiť okno s pádom. Tento príklad nevolá metódu till(), takže sa použije predvolený interval uchovávania 24 hodín.

Nakoniec je čas prejsť na poslednú z možností okien – „hopping“ okná.

Posuvné ("skákacie") okná

Posuvné/skákacie okná sú podobné ako sklápacie okná, no s malým rozdielom. Posuvné okná nečakajú na koniec časového intervalu pred vytvorením nového okna na spracovanie posledných udalostí. Nové výpočty začnú po čakacom intervale kratšom ako je trvanie okna.

Pre ilustráciu rozdielov medzi omieľaním a skákaním okien sa vráťme k príkladu počítania burzových transakcií. Naším cieľom je stále počítať počet transakcií, ale nechceme čakať celý čas na aktualizáciu počítadla. Namiesto toho budeme počítadlo aktualizovať v kratších intervaloch. Napríklad budeme stále počítať počet transakcií každých 20 sekúnd, ale počítadlo aktualizujeme každých 5 sekúnd, ako je znázornené na obr. 5.15. V tomto prípade skončíme s tromi oknami výsledkov s prekrývajúcimi sa údajmi.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Výpis 5.7 zobrazuje kód na definovanie posuvných okien (nachádza sa v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Otočné okno možno previesť na skokové okno pridaním volania metódy advanceBy(). V zobrazenom príklade je interval ukladania 15 minút.

V tejto časti ste videli, ako obmedziť výsledky agregácie na časové okná. Chcem, aby ste si z tejto časti zapamätali najmä tieto tri veci:

  • veľkosť okien relácie nie je obmedzená časovým obdobím, ale aktivitou používateľa;
  • „prepadové“ okná poskytujú prehľad udalostí v danom časovom období;
  • Trvanie preskakovania okien je pevné, ale často sa aktualizujú a môžu obsahovať prekrývajúce sa položky vo všetkých oknách.

Ďalej sa naučíme, ako previesť KTable späť na KStream pre pripojenie.

5.3.3. Prepojenie objektov KStream a KTable

V kapitole 4 sme diskutovali o prepojení dvoch objektov KStream. Teraz sa musíme naučiť, ako prepojiť KTable a KStream. Môže to byť potrebné z nasledujúceho jednoduchého dôvodu. KStream je prúd záznamov a KTable je prúd aktualizácií záznamov, ale niekedy možno budete chcieť pridať ďalší kontext do prúdu záznamov pomocou aktualizácií z KTable.

Zoberme si údaje o počte burzových transakcií a spojme ich s burzovými novinkami pre príslušné odvetvia. Tu je to, čo musíte urobiť, aby ste to dosiahli vzhľadom na kód, ktorý už máte.

  1. Skonvertujte objekt KTable s údajmi o počte akciových transakcií na KSstream, po ktorom nasleduje nahradenie kľúča kľúčom označujúcim odvetvie priemyslu zodpovedajúce tomuto symbolu akcií.
  2. Vytvorte objekt KTable, ktorý číta údaje z témy s burzovými novinkami. Tento nový KTtable bude kategorizovaný podľa priemyselného sektora.
  3. Prepojte aktuality s informáciami o počte burzových transakcií podľa odvetvia.

Teraz sa pozrime, ako implementovať tento akčný plán.

Previesť KTable na KStream

Ak chcete previesť KTable na KStream, musíte urobiť nasledovné.

  1. Zavolajte metódu KTable.toStream().
  2. Zavolaním metódy KStream.map nahraďte kľúč názvom odvetvia a potom získajte objekt TransactionSummary z inštancie Windowed.

Tieto operácie zreťazíme nasledovne (kód možno nájsť v súbore src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (výpis 5.8).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Pretože vykonávame operáciu KStream.map, vrátená inštancia KStream sa automaticky znova rozdelí, keď sa použije v pripojení.

Dokončili sme proces konverzie, ďalej musíme vytvoriť objekt KTable na čítanie akciových správ.

Vytvorenie tabuľky KT pre akciové novinky

Našťastie vytvorenie objektu KTable zaberie len jeden riadok kódu (kód možno nájsť v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (výpis 5.9).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Stojí za zmienku, že nie je potrebné zadať žiadne objekty Serde, pretože v nastaveniach sa používajú reťazce Serdes. Taktiež použitím NAJSKOREJ enumerácie sa tabuľka naplní záznamami hneď na začiatku.

Teraz môžeme prejsť na posledný krok – pripojenie.

Prepojenie aktualizácií noviniek s údajmi o počte transakcií

Vytvorenie spojenia nie je ťažké. Ľavé spojenie použijeme v prípade, že neexistujú žiadne novinky o akciách pre príslušné odvetvie (potrebný kód nájdete v súbore src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Výpis 5.10).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Tento operátor leftJoin je celkom jednoduchý. Na rozdiel od spojení v kapitole 4 sa metóda JoinWindow nepoužíva, pretože pri vykonávaní spojenia KStream-KTable je v tabuľke KTable len jeden záznam pre každý kľúč. Takéto spojenie nie je časovo obmedzené: záznam je buď v KTable, alebo chýba. Hlavný záver: pomocou objektov KTable môžete obohatiť KStream o menej často aktualizované referenčné údaje.

Teraz sa pozrieme na efektívnejší spôsob obohatenia udalostí z KStreamu.

5.3.4. Objekty GlobalKTable

Ako vidíte, je potrebné obohatiť streamy udalostí alebo im pridať kontext. V kapitole 4 ste videli prepojenia medzi dvoma objektmi KStream a v predchádzajúcej časti ste videli prepojenie medzi KStreamom a KTable. Vo všetkých týchto prípadoch je potrebné pri mapovaní kľúčov na nový typ alebo hodnotu znova rozdeliť dátový tok. Niekedy sa prerozdelenie robí explicitne a niekedy to robí Kafka Streams automaticky. Opätovné rozdelenie je potrebné, pretože kľúče sa zmenili a záznamy musia skončiť v nových sekciách, inak spojenie nebude možné (o tom sme hovorili v kapitole 4 v časti „Opätovné rozdelenie údajov“ v podsekcii 4.2.4).

Opätovné rozdelenie má svoje náklady

Opätovné rozdelenie si vyžaduje náklady – dodatočné náklady na zdroje na vytváranie prechodných tém, ukladanie duplicitných údajov v inej téme; znamená to aj zvýšenú latenciu v dôsledku písania a čítania z tejto témy. Okrem toho, ak sa potrebujete spojiť vo viacerých aspektoch alebo dimenziách, musíte spojenia zreťaziť, namapovať záznamy s novými kľúčmi a znova spustiť proces opätovného rozdelenia.

Pripojenie k menším súborom údajov

V niektorých prípadoch je objem referenčných údajov, ktoré sa majú pripojiť, relatívne malý, takže ich úplné kópie sa ľahko zmestia lokálne na každý uzol. Pre situácie, ako je táto, poskytuje Kafka Streams triedu GlobalKTable.

Inštancie GlobalKTable sú jedinečné, pretože aplikácia replikuje všetky údaje do každého z uzlov. A keďže všetky údaje sú prítomné na každom uzle, nie je potrebné rozdeliť tok udalostí podľa referenčného dátového kľúča, aby bol dostupný pre všetky oddiely. Môžete tiež vytvárať bezkľúčové spojenia pomocou objektov GlobalKTable. Vráťme sa k jednému z predchádzajúcich príkladov, aby sme túto funkciu demonštrovali.

Pripojenie objektov KStream k objektom GlobalKTable

V podsekcii 5.3.2 sme vykonali okennú agregáciu výmenných transakcií podľa kupujúcich. Výsledky tejto agregácie vyzerali asi takto:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Hoci tieto výsledky splnili svoj účel, bolo by užitočnejšie, keby sa zobrazilo aj meno zákazníka a celý názov spoločnosti. Ak chcete pridať meno zákazníka a názov spoločnosti, môžete vykonať bežné spojenia, ale budete musieť vykonať dve kľúčové mapovania a opätovné rozdelenie. S GlobalKTable sa môžete vyhnúť nákladom na takéto operácie.

Na to použijeme objekt countStream z výpisu 5.11 (zodpovedajúci kód nájdete v src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) a pripojíme ho k dvom objektom GlobalKTable.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Už sme o tom diskutovali, takže to nebudem opakovať. Všimol som si však, že kód vo funkcii toStream().map je z dôvodu čitateľnosti abstrahovaný do funkčného objektu namiesto inline výrazu lambda.

Ďalším krokom je deklarácia dvoch inštancií GlobalKTable (zobrazený kód možno nájsť v súbore src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Výpis 5.12).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"

Upozorňujeme, že názvy tém sú popísané pomocou vymenovaných typov.

Teraz, keď máme všetky komponenty pripravené, ostáva už len napísať kód pre pripojenie (ktorý nájdete v súbore src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Výpis 5.13).

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Aj keď sú v tomto kóde dve spojenia, sú zreťazené, pretože ani jeden z ich výsledkov sa nepoužíva samostatne. Výsledky sa zobrazia na konci celej operácie.

Keď spustíte vyššie uvedenú operáciu spojenia, dostanete takéto výsledky:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Podstata sa nezmenila, ale tieto výsledky vyzerajú jasnejšie.

Ak odpočítavate do kapitoly 4, už ste videli niekoľko typov spojení v akcii. Sú uvedené v tabuľke. 5.2. Táto tabuľka odráža možnosti pripojenia od verzie 1.0.0 Kafka Streams; V budúcich vydaniach sa môže niečo zmeniť.

Kniha „Kafka Streams in Action. Aplikácie a mikroslužby pre prácu v reálnom čase"
Aby sme to uzavreli, zopakujme si základy: môžete pripojiť streamy udalostí (KStream) a aktualizovať streamy (KTable) pomocou miestneho stavu. Prípadne, ak veľkosť referenčných údajov nie je príliš veľká, môžete použiť objekt GlobalKTable. GlobalKTables replikujú všetky oddiely do každého aplikačného uzla Kafka Streams, čím zaisťujú dostupnosť všetkých údajov bez ohľadu na to, ktorému oddielu zodpovedá kľúč.

Ďalej uvidíme funkciu Kafka Streams, vďaka ktorej môžeme pozorovať zmeny stavu bez spotreby dát z kafkovskej témy.

5.3.5. Dopytovateľný stav

Už sme vykonali niekoľko operácií zahŕňajúcich stav a vždy výstup výsledkov do konzoly (pre účely vývoja) alebo ich zapísanie do témy (na účely produkcie). Pri písaní výsledkov k téme musíte na ich zobrazenie použiť spotrebiteľa Kafka.

Čítanie údajov z týchto tém možno považovať za typ zhmotnených pohľadov. Pre naše účely môžeme použiť definíciu materializovaného pohľadu z Wikipédie: „...fyzický databázový objekt obsahujúci výsledky dotazu. Môže to byť napríklad lokálna kópia vzdialených údajov alebo podmnožina riadkov a/alebo stĺpcov tabuľky alebo výsledkov spojenia, alebo súhrnná tabuľka získaná prostredníctvom agregácie“ (https://en.wikipedia.org/wiki /Materializované_zobrazenie).

Kafka Streams vám tiež umožňuje spúšťať interaktívne dopyty na štátne obchody, čo vám umožňuje priamo čítať tieto zhmotnené pohľady. Je dôležité poznamenať, že dotaz na stavový sklad je operácia len na čítanie. To zaisťuje, že sa nemusíte obávať náhodného nekonzistentnosti stavu, keď vaša aplikácia spracováva údaje.

Schopnosť priamo dopytovať stavové obchody je dôležitá. To znamená, že môžete vytvárať aplikácie dashboardu bez toho, aby ste museli najprv načítať údaje od spotrebiteľa Kafka. Zvyšuje tiež efektivitu aplikácie, pretože nie je potrebné znova zapisovať dáta:

  • vďaka lokalite údajov sú k nim rýchlo prístupné;
  • duplicita údajov je eliminovaná, pretože sa nezapisujú na externé úložisko.

Hlavná vec, ktorú chcem, aby ste si zapamätali, je, že môžete priamo dotazovať stav z vašej aplikácie. Príležitosti, ktoré vám to dáva, nemožno preceňovať. Namiesto spotrebovávania údajov z Kafky a ukladania záznamov do databázy pre aplikáciu môžete dotazovať stavové sklady s rovnakým výsledkom. Priame dopyty do štátnych obchodov znamenajú menej kódu (žiadny spotrebiteľ) a menej softvéru (nie je potrebná tabuľka databázy na ukladanie výsledkov).

V tejto kapitole sme prebrali dosť veľa vecí, takže diskusiu o interaktívnych dopytoch proti štátnym obchodom zatiaľ necháme. Ale nebojte sa: v kapitole 9 vytvoríme jednoduchú aplikáciu dashboard s interaktívnymi dopytmi. Použije niektoré príklady z tejto a predchádzajúcich kapitol na demonštráciu interaktívnych dopytov a na to, ako ich môžete pridať do aplikácií Kafka Streams.

Zhrnutie

  • Objekty KStream predstavujú prúdy udalostí, porovnateľné s vkladmi do databázy. Objekty KTable predstavujú aktualizačné prúdy, skôr aktualizácie databázy. Veľkosť objektu KTable nerastie, staré záznamy sú nahradené novými.
  • Pre operácie agregácie sú potrebné objekty KTable.
  • Pomocou operácií okien môžete rozdeliť agregované údaje do časových segmentov.
  • Vďaka objektom GlobalKTable máte prístup k referenčným údajom kdekoľvek v aplikácii, bez ohľadu na rozdelenie.
  • Sú možné prepojenia medzi objektmi KStream, KTable a GlobalKTable.

Doteraz sme sa zamerali na vytváranie aplikácií Kafka Streams pomocou vysokoúrovňového KStream DSL. Hoci vám prístup na vysokej úrovni umožňuje vytvárať prehľadné a stručné programy, jeho používanie predstavuje kompromis. Práca s DSL KStream znamená zvýšenie stručnosti vášho kódu znížením stupňa kontroly. V ďalšej kapitole sa pozrieme na rozhranie API uzla obslužného programu na nízkej úrovni a vyskúšame ďalšie kompromisy. Programy budú dlhšie ako predtým, ale budeme môcť vytvoriť takmer akýkoľvek uzol obsluhy, ktorý by sme mohli potrebovať.

→ Viac podrobností o knihe nájdete na webová stránka vydavateľa

→ Pre Habrozhiteli 25% zľavu pomocou kupónu - Kafkove prúdy

→ Po zaplatení papierovej verzie knihy bude elektronická kniha zaslaná e-mailom.

Zdroj: hab.com

Pridať komentár