Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase" Dobrý den, obyvatelé Khabra! Tato kniha je vhodná pro každého vývojáře, který chce porozumět zpracování vláken. Pochopení distribuovaného programování vám pomůže lépe porozumět Kafkovi a Kafka Streams. Bylo by hezké znát samotný framework Kafka, ale není to nutné: řeknu vám vše, co potřebujete. Zkušení vývojáři Kafka i nováčci se v této knize naučí vytvářet zajímavé aplikace pro zpracování streamů pomocí knihovny Kafka Streams. Středně pokročilí a pokročilí vývojáři v Javě, kteří jsou již obeznámeni s koncepty, jako je serializace, se naučí uplatnit své dovednosti při vytváření aplikací Kafka Streams. Zdrojový kód knihy je napsán v jazyce Java 8 a výrazně využívá syntaxi výrazu Java 8 lambda, takže znalost práce s funkcemi lambda (i v jiném programovacím jazyce) přijde vhod.

Výňatek. 5.3. Agregační a okenní operace

V této sekci přejdeme k prozkoumání nejslibnějších částí Kafkových proudů. Dosud jsme se zabývali následujícími aspekty Kafka Streams:

  • vytvoření topologie zpracování;
  • použití stavu ve streamovacích aplikacích;
  • provádění připojení toku dat;
  • rozdíly mezi streamy událostí (KStream) a streamy aktualizací (KTable).

V následujících příkladech svedeme všechny tyto prvky dohromady. Dozvíte se také o vytváření oken, další skvělé funkci streamovacích aplikací. Náš první příklad bude jednoduchá agregace.

5.3.1. Agregace tržeb akcií podle odvětví

Agregace a seskupování jsou životně důležité nástroje při práci se streamovanými daty. Zkoumání jednotlivých záznamů tak, jak jsou přijímány, je často nedostatečné. Chcete-li získat další informace z dat, je nutné je seskupit a kombinovat.

V tomto příkladu si obléknete kostým denního obchodníka, který potřebuje sledovat objem prodeje akcií společností v několika odvětvích. Konkrétně vás zajímá pět společností s největším podílem prodeje v každém odvětví.

Taková agregace bude vyžadovat několik následujících kroků k převedení dat do požadované formy (obecně řečeno).

  1. Vytvořte tematický zdroj, který publikuje nezpracované informace o obchodování s akciemi. Budeme muset namapovat objekt typu StockTransaction na objekt typu ShareVolume. Jde o to, že objekt StockTransaction obsahuje prodejní metadata, ale potřebujeme pouze data o počtu prodávaných akcií.
  2. Seskupit data ShareVolume podle symbolu akcií. Po seskupení podle symbolu můžete tato data sbalit do mezisoučtů objemů prodeje zásob. Stojí za zmínku, že metoda KStream.groupBy vrací instanci typu KGroupedStream. A instanci KTable můžete získat dalším voláním metody KGroupedStream.reduce.

Co je rozhraní KGroupedStream

Metody KStream.groupBy a KStream.groupByKey vracejí instanci KGroupedStream. KGroupedStream je střední reprezentace proudu událostí po seskupení podle klíčů. Pro přímou práci s ním není vůbec určen. Místo toho se KGroupedStream používá pro agregační operace, jejichž výsledkem je vždy KTable. A protože výsledkem agregačních operací je KTable a používají stavové úložiště, je možné, že ne všechny aktualizace jako výsledek budou odeslány dále v kanálu.

Metoda KTable.groupBy vrací podobnou KGroupedTable - přechodnou reprezentaci proudu aktualizací, přeskupených podle klíče.

Dáme si krátkou přestávku a podíváme se na Obr. 5.9, která ukazuje, čeho jsme dosáhli. Tato topologie by vám již měla být velmi známá.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Podívejme se nyní na kód této topologie (lze jej nalézt v souboru src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (výpis 5.2).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Daný kód se vyznačuje stručností a velkým objemem akcí prováděných v několika řádcích. V prvním parametru metody builder.stream si můžete všimnout něčeho nového: hodnoty výčtového typu AutoOffsetReset.EARLIEST (existuje také LATEST), nastavené pomocí metody Consumed.withOffsetResetPolicy. Tento typ výčtu lze použít k určení strategie resetování posunu pro každý KStream nebo KTable a má přednost před možností resetování posunu z konfigurace.

GroupByKey a GroupBy

Rozhraní KStream má dvě metody pro seskupování záznamů: GroupByKey a GroupBy. Oba vrací KGroupedTable, takže by vás mohlo zajímat, jaký je mezi nimi rozdíl a kdy použít který z nich?

Metoda GroupByKey se používá, když klíče v KStream již nejsou prázdné. A co je nejdůležitější, příznak „vyžaduje opětovné rozdělení“ nebyl nikdy nastaven.

Metoda GroupBy předpokládá, že jste změnili seskupovací klíče, takže příznak přerozdělení je nastaven na hodnotu true. Provádění spojení, agregací atd. po metodě GroupBy bude mít za následek automatické opětovné rozdělení.
Shrnutí: Kdykoli je to možné, měli byste používat GroupByKey spíše než GroupBy.

Je jasné, co dělají metody mapValues ​​​​a groupBy, takže se podívejme na metodu sum() (najdete ji v src/main/java/bbejeck/model/ShareVolume.java) (výpis 5.3).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Metoda ShareVolume.sum vrací průběžný součet objemu prodeje zásob a výsledkem celého řetězce výpočtů je objekt KTable . Nyní chápete, jakou roli hraje KTable. Když objekty ShareVolume dorazí, odpovídající objekt KTable uloží nejnovější aktuální aktualizaci. Je důležité si uvědomit, že všechny aktualizace se projeví v předchozí tabulce shareVolumeKTable, ale ne všechny se odesílají dále.

Dále pomocí této tabulky KT agregujeme (podle počtu obchodovaných akcií), abychom dospěli k pěti společnostem s nejvyššími objemy akcií obchodovaných v každém odvětví. Naše akce v tomto případě budou podobné těm pro první agregaci.

  1. Proveďte další operaci groupBy a seskupte jednotlivé objekty ShareVolume podle odvětví.
  2. Začněte sumarizovat objekty ShareVolume. Tentokrát je objektem agregace fronta s pevnou prioritou. V této frontě s pevnou velikostí je ponecháno pouze pět společností s největším množstvím prodaných akcií.
  3. Namapujte fronty z předchozího odstavce na hodnotu řetězce a vraťte prvních pět nejobchodovanějších akcií podle počtu podle odvětví.
  4. Výsledky zapište v řetězci k tématu.

Na Obr. Obrázek 5.10 ukazuje graf topologie datového toku. Jak vidíte, druhé kolo zpracování je celkem jednoduché.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Nyní, když jasně rozumíme struktuře tohoto druhého kola zpracování, můžeme přejít k jeho zdrojovému kódu (najdete ho v souboru src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Výpis 5.4) .

Tento inicializátor obsahuje proměnnou fixedQueue. Toto je vlastní objekt, který je adaptérem pro java.util.TreeSet, který se používá ke sledování prvních N výsledků v sestupném pořadí obchodovaných akcií.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Už jste viděli volání groupBy a mapValues ​​​​, takže se jimi nebudeme zabývat (voláme metodu KTable.toStream, protože metoda KTable.print je zastaralá). Ale ještě jste neviděli verzi agregátu () KTable, takže o tom strávíme trochu času.

Jak si pamatujete, KTable se liší tím, že záznamy se stejnými klíči jsou považovány za aktualizace. KTable nahradí starý záznam novým. Agregace probíhá podobným způsobem: agregují se nejnovější záznamy se stejným klíčem. Když přijde záznam, je přidán do instance třídy FixedSizePriorityQueue pomocí sčítačky (druhý parametr ve volání agregované metody), ale pokud již existuje jiný záznam se stejným klíčem, pak se starý záznam odstraní pomocí odčítače (třetí parametr v volání agregované metody).

To vše znamená, že náš agregátor FixedSizePriorityQueue neshromažďuje všechny hodnoty pomocí jednoho klíče, ale ukládá pohyblivý součet množství N nejobchodovanějších typů akcií. Každá příchozí položka obsahuje celkový počet dosud prodaných akcií. KTable vám poskytne informace o tom, které akcie společností jsou v současné době nejobchodovanější, aniž by bylo nutné provádět agregaci každé aktualizace.

Naučili jsme se dělat dvě důležité věci:

  • seskupit hodnoty v KTable pomocí společného klíče;
  • provádět užitečné operace, jako je souhrn a agregace na těchto seskupených hodnotách.

Vědět, jak tyto operace provádět, je důležité pro pochopení významu dat procházejících aplikací Kafka Streams a pochopení toho, jaké informace přenáší.

Shromáždili jsme také některé z klíčových konceptů, o kterých jsme v této knize hovořili dříve. V kapitole 4 jsme diskutovali o tom, jak je pro streamovací aplikaci důležitý místní stav odolný proti chybám. První příklad v této kapitole ukázal, proč je místní stát tak důležitý – dává vám možnost sledovat, jaké informace jste již viděli. Místní přístup zabraňuje zpožděním v síti, díky čemuž je aplikace výkonnější a odolnější proti chybám.

Při provádění jakékoli operace kumulace nebo agregace musíte zadat název úložiště stavu. Operace kumulace a agregace vrátí instanci KTable a KTable používá stavové úložiště k nahrazení starých výsledků novými. Jak jste viděli, ne všechny aktualizace se odesílají do kanálu, a to je důležité, protože agregační operace jsou navrženy tak, aby produkovaly souhrnné informace. Pokud nepoužijete místní stav, KTable předá všechny výsledky agregace a souhrnu.

Dále se podíváme na provádění operací, jako je agregace v určitém časovém období – takzvané operace oken.

5.3.2. Operace s okny

V předchozí části jsme představili posuvnou konvoluci a agregaci. Aplikace prováděla kontinuální roll-up prodeje akcií s následnou agregací pěti nejobchodovanějších akcií na burze.

Někdy je taková průběžná agregace a shrnování výsledků nezbytná. A někdy je potřeba provádět operace pouze za dané časové období. Spočítejte například, kolik směnných transakcí bylo provedeno s akciemi konkrétní společnosti za posledních 10 minut. Nebo kolik uživatelů kliklo na nový reklamní banner za posledních 15 minut. Aplikace může provádět takové operace vícekrát, ale s výsledky, které se vztahují pouze na určitá časová období (časová okna).

Počítání směnných transakcí kupujícím

V dalším příkladu budeme sledovat akciové transakce mezi více obchodníky – buď velkými organizacemi, nebo chytrými jednotlivými finančníky.

Existují dva možné důvody pro toto sledování. Jedním z nich je potřeba vědět, co lídři na trhu kupují/prodávají. Pokud tito velcí hráči a sofistikovaní investoři vidí příležitost, má smysl řídit se jejich strategií. Druhým důvodem je touha odhalit jakékoli možné známky nezákonného obchodování zasvěcených osob. K tomu budete muset analyzovat korelaci velkých prodejních špiček s důležitými tiskovými zprávami.

Takové sledování se skládá z následujících kroků:

  • vytvoření streamu pro čtení z tématu burzovní transakce;
  • seskupení příchozích záznamů podle ID kupujícího a symbolu akcií. Volání metody groupBy vrátí instanci třídy KGroupedStream;
  • Metoda KGroupedStream.windowedBy vrací datový proud omezený na časové okno, což umožňuje agregaci v okně. V závislosti na typu okna se vrátí buď TimeWindowedKStream nebo SessionWindowedKStream;
  • počet transakcí pro operaci agregace. Datový tok v okně určuje, zda je v tomto počtu zohledněn konkrétní záznam;
  • zápis výsledků do tématu nebo jejich výstup do konzole během vývoje.

Topologie této aplikace je jednoduchá, ale byl by užitečný její jasný obrázek. Podívejme se na Obr. 5.11.

Dále se podíváme na funkčnost operací s okny a odpovídající kód.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"

Typy oken

V Kafkových proudech jsou tři typy oken:

  • relační;
  • „omílání“ (omílání);
  • klouzání/poskakování.

Který z nich si vybrat, závisí na požadavcích vašeho podnikání. Okna přehazování a skákání jsou časově omezena, zatímco okna relací jsou omezena aktivitou uživatele – trvání relací (relací) je určeno výhradně tím, jak aktivní je uživatel. Hlavní věc, kterou je třeba si zapamatovat, je, že všechny typy oken jsou založeny na datech a časech položek, nikoli na systémovém čase.

Dále implementujeme naši topologii s každým z typů oken. Úplný kód bude uveden pouze v prvním příkladu, pro ostatní typy oken se kromě typu ovládání okna nic nemění.

Okna relace

Okna relací se velmi liší od všech ostatních typů oken. Nejsou omezeny ani tak časem, jako spíše aktivitou uživatele (nebo aktivitou subjektu, který byste chtěli sledovat). Okna relace jsou ohraničena obdobími nečinnosti.

Obrázek 5.12 znázorňuje koncept oken relace. Menší relace se spojí s relací nalevo. A relace vpravo bude samostatná, protože následuje po dlouhé době nečinnosti. Okna relace jsou založena na aktivitě uživatele, ale k určení, ke které relaci položka patří, se používají razítka data/času z položek.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"

Použití oken relace ke sledování burzovních transakcí

Použijme okna relací k zachycení informací o výměnných transakcích. Implementace oken relace je uvedena ve výpisu 5.5 (který lze nalézt v src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Většinu operací v této topologii jste již viděli, takže zde není třeba se na ně znovu dívat. Je zde ale také několik nových prvků, které si nyní probereme.

Jakákoli operace groupBy obvykle provádí určitý druh operace agregace (agregace, souhrn nebo počítání). Můžete provádět buď kumulativní agregaci s průběžným součtem, nebo agregaci oken, která bere v úvahu záznamy v rámci zadaného časového okna.

Kód ve výpisu 5.5 počítá počet transakcí v oknech relace. Na Obr. 5.13 tyto akce jsou analyzovány krok za krokem.

Voláním windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) vytvoříme okno relace s intervalem nečinnosti 20 sekund a intervalem trvání 15 minut. Nečinný interval 20 sekund znamená, že aplikace zahrne jakýkoli záznam, který dorazí do 20 sekund od konce nebo začátku aktuální relace, do aktuální (aktivní) relace.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Dále určíme, kterou operaci agregace je třeba provést v okně relace – v tomto případě počítat. Pokud se příchozí záznam dostane mimo okno nečinnosti (na obou stranách razítka data/času), aplikace vytvoří novou relaci. Interval uchování znamená udržování relace po určitou dobu a umožňuje pozdní data, která přesahují dobu nečinnosti relace, ale stále mohou být připojena. Začátek a konec nové relace vyplývající ze sloučení navíc odpovídá nejstaršímu a nejnovějšímu datovému/časovému razítku.

Podívejme se na několik záznamů z metody počítání, abychom viděli, jak relace fungují (tabulka 5.1).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Když dorazí záznamy, hledáme existující relace se stejným klíčem, časem ukončení menším, než je aktuální datum/časové razítko – interval nečinnosti, a počátečním časem větším než aktuální datum/časové razítko + interval nečinnosti. Vezmeme-li toto v úvahu, čtyři záznamy z tabulky. 5.1 jsou sloučeny do jedné relace následovně.

1. Záznam 1 dorazí jako první, takže čas začátku je roven času konce a je 00:00:00.

2. Dále přichází záznam 2 a hledáme relace, které končí nejdříve ve 23:59:55 a začínají nejpozději v 00:00:35. Najdeme záznam 1 a spojíme relace 1 a 2. Vezmeme čas začátku relace 1 (dříve) a čas ukončení relace 2 (později), takže naše nová relace začíná v 00:00:00 a končí v 00:00: 15:XNUMX.

3. Přichází záznam 3, hledáme relace mezi 00:00:30 a 00:01:10 a žádné nenacházíme. Přidejte druhou relaci pro klíč 123-345-654,FFBE, začínající a končící v 00:00:50.

4. Přichází záznam 4 a my hledáme relace mezi 23:59:45 a 00:00:25. Tentokrát jsou nalezeny obě relace 1 a 2. Všechny tři relace jsou spojeny do jedné s časem zahájení 00:00:00 a časem ukončení 00:00:15.

Z toho, co je popsáno v této části, stojí za to zapamatovat si následující důležité nuance:

  • relace nejsou okna s pevnou velikostí. Doba trvání relace je určena aktivitou v daném časovém období;
  • Datum/časová razítka v datech určují, zda událost spadá do existující relace nebo do období nečinnosti.

Dále budeme diskutovat o dalším typu oken - „sklápěcích“ oknech.

"Sklápěcí" okna

Otáčející se okna zachycují události, které spadají do určitého časového období. Představte si, že musíte každých 20 sekund zachytit všechny transakce s akciemi určité společnosti, takže shromáždíte všechny události během tohoto časového období. Na konci 20sekundového intervalu se okno přetočí a přesune se na nový 20sekundový interval pozorování. Obrázek 5.14 znázorňuje tuto situaci.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Jak vidíte, v okně jsou zahrnuty všechny události přijaté za posledních 20 sekund. Na konci této doby se vytvoří nové okno.

Výpis 5.6 ukazuje kód, který demonstruje použití otočných oken k zachycení burzovních transakcí každých 20 sekund (nalezeno v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
S touto malou změnou ve volání metody TimeWindows.of můžete použít omílací okno. Tento příklad nevolá metodu till(), takže bude použit výchozí interval uchování 24 hodin.

Konečně je čas přejít k poslední z možností okna – „hopping“ oknům.

Posuvná ("skákací") okna

Posuvná/hoppingová okna jsou podobná omílacím oknům, ale s drobným rozdílem. Posuvná okna nečekají až do konce časového intervalu před vytvořením nového okna pro zpracování nedávných událostí. Nové výpočty zahájí po čekacím intervalu kratším, než je doba trvání okna.

Abychom ilustrovali rozdíly mezi tumbling a jumping windows, vraťme se k příkladu počítání burzovních transakcí. Naším cílem je stále počítat počet transakcí, ale nechceme čekat celou dobu před aktualizací počítadla. Místo toho budeme počítadlo aktualizovat v kratších intervalech. Například budeme stále počítat počet transakcí každých 20 sekund, ale aktualizujeme počítadlo každých 5 sekund, jak je znázorněno na Obr. 5.15. V tomto případě skončíme se třemi výsledkovými okny s překrývajícími se daty.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Výpis 5.7 ukazuje kód pro definování posuvných oken (nachází se v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Otočné okno lze převést na skákací okno přidáním volání metody advanceBy() . V uvedeném příkladu je interval ukládání 15 minut.

V této části jste viděli, jak omezit výsledky agregace na časová okna. Zejména chci, abyste si zapamatovali následující tři věci z této části:

  • velikost oken relace není omezena časovým obdobím, ale aktivitou uživatele;
  • „omílací“ okna poskytují přehled událostí v daném časovém období;
  • Doba trvání přeskakování oken je pevná, ale jsou často aktualizována a mohou obsahovat překrývající se položky ve všech oknech.

Dále se naučíme, jak převést KTable zpět na KStream pro připojení.

5.3.3. Propojení objektů KStream a KTable

V kapitole 4 jsme diskutovali o propojení dvou objektů KStream. Nyní se musíme naučit, jak propojit KTable a KStream. To může být potřeba z následujícího jednoduchého důvodu. KStream je proud záznamů a KTable je proud aktualizací záznamů, ale někdy možná budete chtít přidat další kontext do streamu záznamů pomocí aktualizací z KTable.

Vezměme data o počtu burzovních transakcí a spojme je s burzovními novinkami pro příslušná odvětví. Zde je to, co musíte udělat, abyste toho dosáhli s ohledem na kód, který již máte.

  1. Převeďte objekt KTable s údaji o počtu burzovních transakcí na KStream, poté nahradíte klíč klíčem označujícím průmyslový sektor odpovídající tomuto akciovému symbolu.
  2. Vytvořte objekt KTable, který čte data z tématu s burzovními zprávami. Tento nový KTtable bude rozdělen do kategorií podle průmyslového sektoru.
  3. Propojte aktuality s informacemi o počtu burzovních transakcí podle odvětví.

Nyní se podívejme, jak tento akční plán implementovat.

Převést KTable na KStream

Chcete-li převést KTable na KStream, musíte provést následující.

  1. Zavolejte metodu KTable.toStream().
  2. Voláním metody KStream.map nahraďte klíč názvem odvětví a poté načtěte objekt TransactionSummary z instance Windowed.

Tyto operace zřetězíme následovně (kód lze nalézt v souboru src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (výpis 5.8).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Protože provádíme operaci KStream.map, je vrácená instance KStream automaticky znovu rozdělena, když je použita v připojení.

Dokončili jsme proces převodu, dále musíme vytvořit objekt KTable pro čtení burzovních zpráv.

Vytvoření KTtable pro novinky o akciích

Naštěstí vytvoření objektu KTable zabere pouze jeden řádek kódu (kód lze nalézt v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (výpis 5.9).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Stojí za zmínku, že není nutné zadat žádné objekty Serde, protože v nastavení jsou použity řetězce Serdes. Také pomocí výčtu NEJSTARŠÍ je tabulka naplněna záznamy hned na začátku.

Nyní můžeme přejít k poslednímu kroku – připojení.

Propojení aktualizací novinek s údaji o počtu transakcí

Vytvoření spojení není obtížné. Levé spojení použijeme v případě, že pro příslušné odvětví nejsou žádné novinky o akciích (potřebný kód najdete v souboru src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Výpis 5.10).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Tento operátor leftJoin je docela jednoduchý. Na rozdíl od spojení v kapitole 4 se metoda JoinWindow nepoužívá, protože při provádění spojení KStream-KTable je v tabulce KTable pouze jeden záznam pro každý klíč. Takové spojení není časově omezeno: záznam je buď v KTable, nebo chybí. Hlavní závěr: pomocí objektů KTable můžete obohatit KStream o méně často aktualizovaná referenční data.

Nyní se podíváme na efektivnější způsob, jak obohatit události z KStream.

5.3.4. Objekty GlobalKTable

Jak vidíte, je potřeba obohatit streamy událostí nebo k nim přidat kontext. V kapitole 4 jste viděli spojení mezi dvěma objekty KStream a v předchozí části jste viděli spojení mezi KStreamem a KTable. Ve všech těchto případech je nutné při mapování klíčů na nový typ nebo hodnotu znovu rozdělit datový tok. Někdy se přerozdělení provádí explicitně a někdy to Kafka Streams provádí automaticky. Přerozdělení je nutné, protože se změnily klíče a záznamy musí skončit v nových sekcích, jinak nebude připojení možné (toto bylo probráno v kapitole 4, v části „Přerozdělování dat“ v podsekci 4.2.4).

Opětovné rozdělení má své náklady

Přerozdělení vyžaduje náklady – dodatečné náklady na zdroje pro vytváření dílčích témat, ukládání duplicitních dat v jiném tématu; znamená to také zvýšenou latenci kvůli psaní a čtení z tohoto tématu. Pokud se navíc potřebujete spojit ve více než jednom aspektu nebo dimenzi, musíte spojení zřetězit, namapovat záznamy s novými klíči a znovu spustit proces opětovného rozdělení.

Připojení k menším datovým sadám

V některých případech je objem referenčních dat, která mají být připojena, relativně malý, takže jejich kompletní kopie se snadno vejdou lokálně na každý uzel. Pro situace jako je tato poskytuje Kafka Streams třídu GlobalKTable.

Instance GlobalKTable jsou jedinečné, protože aplikace replikuje všechna data do každého z uzlů. A protože všechna data jsou přítomna na každém uzlu, není potřeba rozdělovat tok událostí pomocí referenčního datového klíče, aby byl dostupný všem oddílům. Můžete také provádět bezklíčová spojení pomocí objektů GlobalKTable. Vraťme se k jednomu z předchozích příkladů, abychom tuto funkci demonstrovali.

Připojení objektů KStream k objektům GlobalKTable

V podsekci 5.3.2 jsme provedli okenní agregaci směnných transakcí podle kupujících. Výsledky této agregace vypadaly asi takto:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

I když tyto výsledky splnily svůj účel, bylo by užitečnější, kdyby se také zobrazilo jméno zákazníka a celý název společnosti. Chcete-li přidat jméno zákazníka a název společnosti, můžete provést normální spojení, ale budete muset provést dvě mapování klíčů a nové rozdělení. S GlobalKTable se můžete vyhnout nákladům na takové operace.

K tomu použijeme objekt countStream z výpisu 5.11 (odpovídající kód lze nalézt v src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) a propojíme jej se dvěma objekty GlobalKTable.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Už jsme to probírali, takže to nebudu opakovat. Ale podotýkám, že kód ve funkci toStream().map je z důvodu čitelnosti abstrahován do funkčního objektu namísto vloženého výrazu lambda.

Dalším krokem je deklarace dvou instancí GlobalKTable (zobrazený kód lze nalézt v souboru src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (výpis 5.12).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"

Vezměte prosím na vědomí, že názvy témat jsou popsány pomocí výčtových typů.

Nyní, když máme všechny komponenty hotové, zbývá už jen napsat kód pro připojení (který najdete v souboru src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Výpis 5.13).

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Přestože v tomto kódu existují dvě spojení, jsou zřetězená, protože žádný z jejich výsledků není použit samostatně. Výsledky se zobrazí na konci celé operace.

Když spustíte výše uvedenou operaci spojení, získáte výsledky jako tento:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Podstata se nezměnila, ale tyto výsledky vypadají jasněji.

Pokud odpočítáváte do kapitoly 4, už jste viděli několik typů spojení v akci. Jsou uvedeny v tabulce. 5.2. Tato tabulka odráží možnosti připojení od verze 1.0.0 Kafka Streams; V budoucích verzích se může něco změnit.

Kniha „Kafka Streams in Action. Aplikace a mikroslužby pro práci v reálném čase"
Abychom to uzavřeli, zrekapitulujme si základy: můžete připojit streamy událostí (KStream) a aktualizovat streamy (KTable) pomocí místního stavu. Případně, pokud velikost referenčních dat není příliš velká, můžete použít objekt GlobalKTable. GlobalKTables replikují všechny oddíly do každého aplikačního uzlu Kafka Streams a zajišťují, že všechna data jsou dostupná bez ohledu na to, kterému oddílu klíč odpovídá.

Dále uvidíme funkci Kafka Streams, díky které můžeme pozorovat změny stavu, aniž bychom spotřebovávali data z kafkovského tématu.

5.3.5. Dotazovatelný stav

Již jsme provedli několik operací zahrnujících stav a vždy výstup výsledků do konzole (pro účely vývoje) nebo zapsání do tématu (pro účely produkce). Při psaní výsledků k tématu musíte k jejich zobrazení použít spotřebitele Kafka.

Čtení dat z těchto témat lze považovat za typ materializovaných pohledů. Pro naše účely můžeme použít definici materializovaného pohledu z Wikipedie: „...fyzický databázový objekt obsahující výsledky dotazu. Může to být například místní kopie vzdálených dat nebo podmnožina řádků a/nebo sloupců tabulky nebo výsledků spojení nebo souhrnná tabulka získaná agregací“ (https://en.wikipedia.org/wiki /Materializované_zobrazení).

Kafka Streams také umožňuje spouštět interaktivní dotazy na státní obchody, což vám umožňuje přímo číst tyto materializované pohledy. Je důležité si uvědomit, že dotaz na stavové úložiště je operace pouze pro čtení. Tím je zajištěno, že se nemusíte obávat náhodného nekonzistentnosti stavu, když vaše aplikace zpracovává data.

Schopnost přímo dotazovat stavové obchody je důležitá. To znamená, že můžete vytvářet aplikace řídicích panelů, aniž byste museli nejprve načítat data od uživatele Kafka. Zvyšuje také efektivitu aplikace, protože není potřeba znovu zapisovat data:

  • díky lokalizaci dat jsou rychle dostupná;
  • duplikace dat je eliminována, protože se nezapisují na externí úložiště.

Hlavní věc, kterou chci, abyste si zapamatovali, je, že můžete přímo dotazovat stav z vaší aplikace. Příležitosti, které vám to dává, nelze přeceňovat. Místo spotřebovávání dat z Kafky a ukládání záznamů do databáze pro aplikaci můžete se stejným výsledkem dotazovat stavové obchody. Přímé dotazy do státních úložišť znamenají méně kódu (žádný spotřebitel) a méně softwaru (není potřeba databázová tabulka pro uložení výsledků).

V této kapitole jsme toho probrali docela dost, takže naši diskusi o interaktivních dotazech proti státním obchodům zatím opustíme. Ale nebojte se: v kapitole 9 vytvoříme jednoduchou aplikaci dashboard s interaktivními dotazy. Použije některé příklady z této a předchozích kapitol k demonstraci interaktivních dotazů a toho, jak je můžete přidat do aplikací Kafka Streams.

Shrnutí

  • Objekty KStream představují proudy událostí, srovnatelné s vkládáním do databáze. Objekty KTable představují aktualizační proudy, spíše aktualizace databáze. Velikost objektu KTable neroste, staré záznamy jsou nahrazovány novými.
  • Pro operace agregace jsou vyžadovány objekty KTable.
  • Pomocí operací oken můžete rozdělit agregovaná data do časových segmentů.
  • Díky objektům GlobalKTable můžete přistupovat k referenčním datům kdekoli v aplikaci, bez ohledu na rozdělení.
  • Spojení mezi objekty KStream, KTable a GlobalKTable je možné.

Doposud jsme se soustředili na vytváření aplikací Kafka Streams pomocí vysokoúrovňového KStream DSL. Přestože přístup na vysoké úrovni umožňuje vytvářet úhledné a stručné programy, jeho použití představuje kompromis. Práce s DSL KStream znamená zvýšení stručnosti vašeho kódu snížením míry kontroly. V další kapitole se podíváme na nízkoúrovňové rozhraní API pro obslužný uzel a vyzkoušíme další kompromisy. Programy budou delší, než byly dříve, ale budeme schopni vytvořit téměř jakýkoli uzel obslužného programu, který bychom mohli potřebovat.

→ Více podrobností o knize naleznete na webové stránky vydavatele

→ Pro Habrozhiteli 25% slevu pomocí kupónu - Kafkovy proudy

→ Při platbě za papírovou verzi knihy bude elektronická kniha zaslána e-mailem.

Zdroj: www.habr.com

Přidat komentář