Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym” Witam mieszkańców Khabro! Ta książka jest odpowiednia dla każdego programisty, który chce zrozumieć przetwarzanie wątków. Zrozumienie programowania rozproszonego pomoże Ci lepiej zrozumieć platformę Kafka i strumienie Kafka. Byłoby miło poznać sam framework Kafki, ale nie jest to konieczne: opowiem Ci wszystko, czego potrzebujesz. Zarówno doświadczeni programiści, jak i nowicjusze platformy Kafka dowiedzą się z tej książki, jak tworzyć ciekawe aplikacje do przetwarzania strumieni przy użyciu biblioteki Kafka Streams. Średnio-zaawansowani i zaawansowani programiści Java, zaznajomieni już z koncepcjami takimi jak serializacja, nauczą się wykorzystywać swoje umiejętności do tworzenia aplikacji Kafka Streams. Kod źródłowy książki jest napisany w języku Java 8 i w znacznym stopniu wykorzystuje składnię wyrażeń lambda języka Java 8, więc wiedza o pracy z funkcjami lambda (nawet w innym języku programowania) będzie przydatna.

Fragment. 5.3. Operacje agregacji i okienkowania

W tej sekcji przejdziemy do eksploracji najbardziej obiecujących części strumieni Kafka. Do tej pory omówiliśmy następujące aspekty strumieni Kafka:

  • tworzenie topologii przetwarzania;
  • wykorzystanie stanu w aplikacjach strumieniowych;
  • wykonywanie połączeń strumienia danych;
  • różnice pomiędzy strumieniami zdarzeń (KStream) i strumieniami aktualizacji (KTable).

W poniższych przykładach połączymy wszystkie te elementy w jedną całość. Dowiesz się także o okienku, kolejnej wspaniałej funkcji aplikacji do przesyłania strumieniowego. Naszym pierwszym przykładem będzie prosta agregacja.

5.3.1. Agregacja sprzedaży zapasów według branż

Agregacja i grupowanie to niezbędne narzędzia podczas pracy z danymi przesyłanymi strumieniowo. Badanie poszczególnych zapisów w momencie ich otrzymania jest często niewystarczające. Aby wydobyć dodatkowe informacje z danych, konieczne jest ich pogrupowanie i połączenie.

W tym przykładzie przebierzesz się za day tradera, który musi śledzić wielkość sprzedaży akcji spółek z kilku branż. W szczególności interesuje Cię pięć spółek o największej sprzedaży udziałów w każdej branży.

Taka agregacja będzie wymagała kilku kolejnych kroków w celu przetłumaczenia danych na pożądaną formę (mówiąc ogólnie).

  1. Utwórz źródło tematyczne, które publikuje informacje dotyczące handlu surowcami. Będziemy musieli zmapować obiekt typu StockTransaction na obiekt typu ShareVolume. Chodzi o to, że obiekt StockTransaction zawiera metadane sprzedażowe, ale nam potrzebne są jedynie dane o liczbie sprzedawanych akcji.
  2. Grupuj dane ShareVolume według symbolu akcji. Po pogrupowaniu według symboli możesz zwinąć te dane w podsumy wielkości sprzedaży zapasów. Warto zaznaczyć, że metoda KStream.groupBy zwraca instancję typu KGroupedStream. Możesz także uzyskać instancję KTable, wywołując metodę KGroupedStream.reduce.

Co to jest interfejs KGroupedStream

Metody KStream.groupBy i KStream.groupByKey zwracają instancję KGroupedStream. KGroupedStream jest pośrednią reprezentacją strumienia zdarzeń po zgrupowaniu według kluczy. Wcale nie jest przeznaczony do bezpośredniej pracy z nim. Zamiast tego KGroupedStream jest używany do operacji agregacji, które zawsze dają w wyniku KTable. A ponieważ wynikiem operacji agregacji jest tabela KTable, która korzysta z magazynu stanu, możliwe jest, że nie wszystkie aktualizacje zostaną w rezultacie przesłane w dalszej części potoku.

Metoda KTable.groupBy zwraca podobną KGroupedTable — pośrednią reprezentację strumienia aktualizacji, pogrupowaną według klucza.

Zróbmy sobie krótką przerwę i spójrzmy na rys. 5.9, który pokazuje, co osiągnęliśmy. Ta topologia powinna być już Ci dobrze znana.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Przyjrzyjmy się teraz kodowi tej topologii (znajduje się on w pliku src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Podany kod wyróżnia się zwięzłością i dużą ilością akcji wykonywanych w kilku linijkach. Nowość można zauważyć w pierwszym parametrze metody builder.stream: wartość typu wyliczeniowego AutoOffsetReset.EARLIEST (jest też LATEST), ustawiana metodą Consumed.withOffsetResetPolicy. Tego typu wyliczenia można użyć do określenia strategii resetowania przesunięcia dla każdego KStream lub KTable i ma pierwszeństwo przed opcją resetowania przesunięcia z konfiguracji.

GroupByKey i GroupBy

Interfejs KStream udostępnia dwie metody grupowania rekordów: GroupByKey i GroupBy. Obydwa zwracają KGroupedTable, więc możesz się zastanawiać, jaka jest między nimi różnica i kiedy użyć którego?

Metoda GroupByKey jest używana, gdy klucze w KStream są już niepuste. A co najważniejsze, flaga „wymaga ponownego partycjonowania” nigdy nie została ustawiona.

Metoda GroupBy zakłada, że ​​klucze grupujące zostały zmienione, więc flaga ponownego podziału jest ustawiona na wartość true. Wykonywanie złączeń, agregacji itp. po metodzie GroupBy spowoduje automatyczne ponowne partycjonowanie.
Podsumowanie: Jeśli to możliwe, powinieneś używać GroupByKey zamiast GroupBy.

Jasne jest, co robią metody mapValues ​​i groupBy, więc przyjrzyjmy się metodzie sum() (znalezionej w src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Metoda ShareVolume.sum zwraca bieżącą sumę wolumenu sprzedaży zapasów, a wynikiem całego łańcucha obliczeń jest obiekt KTable . Teraz rozumiesz rolę, jaką odgrywa KTable. Kiedy pojawią się obiekty ShareVolume, odpowiedni obiekt KTable przechowuje najnowszą bieżącą aktualizację. Należy pamiętać, że wszystkie aktualizacje znajdują odzwierciedlenie w poprzedniej tabeli shareVolumeKTable, ale nie wszystkie są przesyłane dalej.

Następnie, korzystając z tej tabeli KTable, dokonujemy agregacji (według liczby akcji w obrocie), aby otrzymać pięć spółek z największym wolumenem akcji w obrocie w każdej branży. Nasze działania w tym przypadku będą podobne jak w przypadku pierwszej agregacji.

  1. Wykonaj kolejną operację groupBy, aby pogrupować poszczególne obiekty ShareVolume według branży.
  2. Rozpocznij podsumowywanie obiektów ShareVolume. Tym razem obiektem agregacji jest kolejka priorytetowa o stałym rozmiarze. W tej kolejce o stałej wielkości zatrzymywanych jest tylko pięć spółek z największą liczbą sprzedanych akcji.
  3. Zamapuj kolejki z poprzedniego akapitu na wartość ciągu i zwróć pięć akcji, którymi handluje się najczęściej według liczby i branży.
  4. Zapisz wyniki w postaci ciągu znaków w temacie.

Na ryc. Rysunek 5.10 przedstawia wykres topologii przepływu danych. Jak widać, druga runda przetwarzania jest dość prosta.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Teraz, gdy mamy już jasne zrozumienie struktury tej drugiej rundy przetwarzania, możemy zająć się jej kodem źródłowym (znajdziesz go w pliku src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Ten inicjator zawiera zmienną fixQueue. Jest to niestandardowy obiekt będący adapterem dla java.util.TreeSet, używany do śledzenia N pierwszych wyników w malejącej kolejności sprzedawanych akcji.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Wywołania groupBy i mapValues ​​już widziałeś, więc nie będziemy się nimi zajmować (wywołujemy metodę KTable.toStream, ponieważ metoda KTable.print jest przestarzała). Ale nie widziałeś jeszcze wersji agregatu() w KTable, więc poświęcimy trochę czasu na omówienie tej kwestii.

Jak pamiętasz, tym, co wyróżnia KTable, jest to, że rekordy z tymi samymi kluczami są uważane za aktualizacje. KTable zastępuje stary wpis nowym. Agregacja odbywa się w podobny sposób: agregowane są najnowsze rekordy o tym samym kluczu. Po nadejściu rekordu jest on dodawany do instancji klasy StałeSizePriorityQueue za pomocą sumatora (drugi parametr w wywołaniu metody agregującej), ale jeśli istnieje już inny rekord z tym samym kluczem, to stary rekord jest usuwany za pomocą odejmowanie (trzeci parametr w wywołaniu metody agregującej) wywołanie metody agregowanej).

To wszystko oznacza, że ​​nasz agregator FixSizePriorityQueue nie agreguje wszystkich wartości jednym kluczem, ale przechowuje ruchomą sumę ilości N najczęściej handlowanych typów akcji. Każdy przychodzący wpis zawiera całkowitą liczbę sprzedanych dotychczas akcji. KTable dostarczy Ci informacji o akcjach spółek, które są obecnie przedmiotem największego obrotu, bez konieczności ciągłego agregowania każdej aktualizacji.

Nauczyliśmy się robić dwie ważne rzeczy:

  • grupuj wartości w KTable wspólnym kluczem;
  • wykonywać przydatne operacje, takie jak zestawienie i agregacja na tych zgrupowanych wartościach.

Znajomość sposobu wykonywania tych operacji jest ważna dla zrozumienia znaczenia danych przesyłanych przez aplikację Kafka Streams i zrozumienia, jakie informacje niosą.

Zebraliśmy także w jednym miejscu niektóre kluczowe pojęcia omówione wcześniej w tej książce. W rozdziale 4 omówiliśmy, jak ważny jest odporny na awarie stan lokalny w aplikacji do przesyłania strumieniowego. Pierwszy przykład w tym rozdziale pokazał, dlaczego stan lokalny jest tak ważny — daje możliwość śledzenia informacji, które już widziałeś. Dostęp lokalny pozwala uniknąć opóźnień w sieci, dzięki czemu aplikacja jest bardziej wydajna i odporna na błędy.

Podczas wykonywania dowolnej operacji podsumowania lub agregacji należy określić nazwę magazynu stanu. Operacje zestawienia i agregacji zwracają instancję KTable, a KTable wykorzystuje pamięć stanu do zastąpienia starych wyników nowymi. Jak widać, nie wszystkie aktualizacje są wysyłane w dół potoku, a to jest ważne, ponieważ operacje agregacji mają na celu wygenerowanie podsumowujących informacji. Jeśli nie zastosujesz stanu lokalnego, KTable przekaże wszystkie wyniki agregacji i podsumowania.

Następnie przyjrzymy się wykonywaniu operacji takich jak agregacja w określonym przedziale czasu – tzw. operacjom okienkowym.

5.3.2. Operacje okienne

W poprzedniej sekcji przedstawiliśmy splot ślizgowy i agregację. Aplikacja wykonywała ciągłe zwiększanie wolumenu sprzedaży akcji, a następnie agregację pięciu akcji, którymi notowano najwięcej na giełdzie.

Czasami konieczna jest ciągła agregacja i podsumowanie wyników. A czasami trzeba wykonywać operacje tylko przez określony czas. Przykładowo oblicz, ile transakcji wymiany akcji danej spółki dokonano w ciągu ostatnich 10 minut. Lub ilu użytkowników kliknęło nowy baner reklamowy w ciągu ostatnich 15 minut. Aplikacja może wykonywać takie operacje wielokrotnie, ale z wynikami, które dotyczą tylko określonych przedziałów czasu (okien czasowych).

Liczenie transakcji wymiany według kupującego

W następnym przykładzie będziemy śledzić transakcje na akcjach pomiędzy wieloma inwestorami — albo dużymi organizacjami, albo inteligentnymi indywidualnymi finansistami.

Istnieją dwie możliwe przyczyny tego śledzenia. Jednym z nich jest potrzeba wiedzy, co kupują/sprzedają liderzy rynku. Jeśli ci wielcy gracze i wyrafinowani inwestorzy dostrzegą szansę, warto zastosować się do ich strategii. Drugim powodem jest chęć wykrycia ewentualnych oznak nielegalnego wykorzystywania informacji poufnych. Aby to zrobić, będziesz musiał przeanalizować korelację dużych skoków sprzedaży z ważnymi komunikatami prasowymi.

Takie śledzenie składa się z następujących kroków:

  • utworzenie strumienia do czytania z tematu transakcji giełdowych;
  • grupowanie przychodzących rekordów według identyfikatora kupującego i symbolu giełdowego. Wywołanie metody groupBy zwraca instancję klasy KGroupedStream;
  • Metoda KGroupedStream.windowedBy zwraca strumień danych ograniczony do okna czasowego, co umożliwia agregację okienkową. W zależności od typu okna zwracany jest TimeWindowedKStream lub SessionWindowedKStream;
  • liczba transakcji dla operacji agregacji. Okienkowy przepływ danych określa, czy konkretny rekord zostanie uwzględniony w tym zliczeniu;
  • zapisywanie wyników w temacie lub wysyłanie ich do konsoli podczas programowania.

Topologia tej aplikacji jest prosta, ale jasny jej obraz byłby pomocny. Przyjrzyjmy się rys. 5.11.

Następnie przyjrzymy się funkcjonalności operacji na oknach i odpowiadającemu im kodowi.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”

Typy okien

W strumieniach Kafka występują trzy typy okien:

  • sesyjny;
  • „upadek” (upadek);
  • ślizganie/skakanie.

To, który z nich wybrać, zależy od wymagań Twojej firmy. Okna wirowania i przeskakiwania są ograniczone czasowo, podczas gdy okna sesji są ograniczone aktywnością użytkownika — czas trwania sesji zależy wyłącznie od aktywności użytkownika. Najważniejszą rzeczą do zapamiętania jest to, że wszystkie typy okien opierają się na znacznikach daty/godziny wpisów, a nie na czasie systemowym.

Następnie implementujemy naszą topologię dla każdego z typów okien. Kompletny kod zostanie podany tylko w pierwszym przykładzie, dla pozostałych typów okien nie zmieni się nic poza rodzajem działania okna.

Okna sesji

Okna sesji bardzo różnią się od wszystkich innych typów okien. Są one ograniczone nie tyle czasem, ile aktywnością użytkownika (lub aktywnością podmiotu, który chcesz śledzić). Okna sesji są ograniczone okresami bezczynności.

Rysunek 5.12 ilustruje koncepcję okien sesji. Mniejsza sesja połączy się z sesją po jej lewej stronie. Sesja po prawej będzie osobna, ponieważ następuje po długim okresie bezczynności. Okna sesji są oparte na aktywności użytkownika, ale używają znaczników daty/godziny z wpisów, aby określić, do której sesji należy dany wpis.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”

Korzystanie z okien sesji do śledzenia transakcji giełdowych

Wykorzystajmy okna sesji do przechwytywania informacji o transakcjach giełdowych. Implementację okien sesji pokazano na Listingu 5.5 (który można znaleźć w pliku src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Większość operacji w tej topologii już widziałeś, więc nie ma potrzeby wracać do nich ponownie. Ale jest tu także kilka nowych elementów, które teraz omówimy.

Każda operacja groupBy zazwyczaj wykonuje pewnego rodzaju operację agregacji (agregację, zestawienie lub zliczanie). Można wykonać agregację skumulowaną z sumą bieżącą lub agregację okienkową, która uwzględnia rekordy w określonym przedziale czasowym.

Kod na Listingu 5.5 zlicza liczbę transakcji w oknach sesji. Na ryc. 5.13 działania te są analizowane krok po kroku.

Wywołując windowedBy(SessionWindows.with(dwadzieścia sekund).until(fifteenMinutes)) tworzymy okno sesji z interwałem braku aktywności wynoszącym 20 sekund i interwałem trwałości wynoszącym 15 minut. Okres bezczynności wynoszący 20 sekund oznacza, że ​​aplikacja uwzględni wpisy, które pojawią się w ciągu 20 sekund od zakończenia lub rozpoczęcia bieżącej sesji, w bieżącej (aktywnej) sesji.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Następnie w oknie sesji określamy jaka operacja agregacji ma zostać wykonana – w tym przypadku count. Jeśli wpis przychodzący wykracza poza okno braku aktywności (po obu stronach znacznika daty/godziny), aplikacja tworzy nową sesję. Interwał przechowywania oznacza utrzymywanie sesji przez określony czas i pozwala na późne przesyłanie danych, które wykraczają poza okres bezczynności sesji, ale nadal można je dołączyć. Dodatkowo początek i koniec nowej sesji powstałej w wyniku połączenia odpowiadają najwcześniejszemu i najpóźniejszemu znacznikowi daty/godziny.

Przyjrzyjmy się kilku wpisom metody count, aby zobaczyć, jak działają sesje (Tabela 5.1).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Po nadejściu rekordów szukamy istniejących sesji z tym samym kluczem, czasem zakończenia krótszym niż bieżący znacznik daty/godziny – interwał bezczynności i czasem rozpoczęcia dłuższym niż bieżący znacznik daty/godziny + interwał bezczynności. Biorąc to pod uwagę, cztery wpisy z tabeli. 5.1 są łączone w jedną sesję w następujący sposób.

1. Rekord 1 pojawia się jako pierwszy, więc czas rozpoczęcia jest równy czasowi zakończenia i wynosi 00:00:00.

2. Następnie pojawia się wpis 2 i szukamy sesji, które kończą się nie wcześniej niż o 23:59:55 i rozpoczynają nie później niż o 00:00:35. Znajdujemy zapis 1 i łączymy sesje 1 i 2. Bierzemy czas rozpoczęcia sesji 1 (wcześniej) i czas zakończenia sesji 2 (później), tak że nasza nowa sesja zaczyna się o 00:00:00 i kończy o 00: 00:15.

3. Przychodzi nagranie nr 3, szukamy sesji pomiędzy 00:00:30 a 00:01:10 i nie znajdujemy. Dodaj drugą sesję dla klucza 123-345-654,FFBE, zaczynając i kończąc o 00:00:50.

4. Nadchodzi Płyta 4 i szukamy sesji pomiędzy 23:59:45 a 00:00:25. Tym razem znalezione zostaną sesje 1 i 2. Wszystkie trzy sesje zostaną połączone w jedną, której czas rozpoczęcia to 00:00:00, a czas zakończenia to 00:00:15.

Z tego, co opisano w tej sekcji, warto pamiętać o następujących ważnych niuansach:

  • sesje nie są oknami o stałym rozmiarze. Czas trwania sesji zależy od aktywności w danym okresie czasu;
  • Znaczki daty/godziny w danych określają, czy zdarzenie przypada w ramach istniejącej sesji, czy w okresie bezczynności.

Następnie omówimy kolejny typ okna - okna „wpadające”.

„Walące się” okna

Upadające okna rejestrują zdarzenia, które przypadają na określony okres czasu. Wyobraź sobie, że musisz rejestrować wszystkie transakcje giełdowe określonej firmy co 20 sekund, aby zebrać wszystkie zdarzenia z tego okresu. Pod koniec 20-sekundowego interwału okno zostaje przewrócone i przechodzi do nowego 20-sekundowego interwału obserwacji. Rysunek 5.14 ilustruje tę sytuację.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Jak widać, w oknie uwzględnione są wszystkie zdarzenia odebrane w ciągu ostatnich 20 sekund. Po upływie tego okresu tworzone jest nowe okno.

Listing 5.6 przedstawia kod demonstrujący użycie okien wirujących do przechwytywania transakcji giełdowych co 20 sekund (znajduje się w src/main/Java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Dzięki tej niewielkiej zmianie w wywołaniu metody TimeWindows.of możesz użyć okna wirującego. W tym przykładzie nie wywołano metody Until(), dlatego zastosowany zostanie domyślny interwał przechowywania wynoszący 24 godziny.

Na koniec czas przejść do ostatniej z opcji okna – „przeskakiwania” okien.

Przesuwane („skaczące”) okna

Okna przesuwane/skaczące są podobne do okien wirujących, ale z niewielką różnicą. Przesuwane okna nie czekają do końca przedziału czasu, zanim utworzą nowe okno w celu przetworzenia ostatnich zdarzeń. Rozpoczynają nowe obliczenia po okresie oczekiwania krótszym niż czas trwania okna.

Aby zilustrować różnice pomiędzy oknami wirującymi i skaczącymi, wróćmy do przykładu liczenia transakcji giełdowych. Naszym celem nadal jest zliczanie liczby transakcji, ale nie chcemy czekać całego czasu przed aktualizacją licznika. Zamiast tego będziemy aktualizować licznik w krótszych odstępach czasu. Przykładowo nadal będziemy zliczać transakcje co 20 sekund, ale aktualizujemy licznik co 5 sekund, jak pokazano na rys. 5.15. W tym przypadku otrzymujemy trzy okna wynikowe z nakładającymi się danymi.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Listing 5.7 przedstawia kod definiujący przesuwane okna (znajdujący się w src/main/Java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Okno wirujące można przekształcić w okno przeskakujące, dodając wywołanie metody beforeBy(). W pokazanym przykładzie interwał zapisywania wynosi 15 minut.

W tej sekcji widziałeś, jak ograniczyć wyniki agregacji do okien czasowych. W szczególności chcę, abyś zapamiętał następujące trzy rzeczy z tej sekcji:

  • wielkość okien sesji jest ograniczona nie okresem czasu, ale aktywnością użytkownika;
  • „przewracające się” okna dają przegląd zdarzeń w danym przedziale czasu;
  • Czas trwania przeskakujących okien jest stały, ale są one często aktualizowane i mogą zawierać nakładające się wpisy we wszystkich oknach.

Następnie dowiemy się, jak przekonwertować KTable z powrotem na KStream w celu uzyskania połączenia.

5.3.3. Łączenie obiektów KStream i KTable

W rozdziale 4 omówiliśmy połączenie dwóch obiektów KStream. Teraz musimy się nauczyć jak połączyć KTable i KStream. Może to być potrzebne z następującego prostego powodu. KStream to strumień rekordów, a KTable to strumień aktualizacji rekordów, ale czasami możesz chcieć dodać dodatkowy kontekst do strumienia rekordów, korzystając z aktualizacji z KTable.

Weźmy dane o liczbie transakcji giełdowych i połączmy je z wiadomościami giełdowymi dla odpowiednich branż. Oto, co musisz zrobić, aby to osiągnąć, biorąc pod uwagę kod, który już masz.

  1. Konwertuj obiekt KTable z danymi o liczbie transakcji giełdowych na KStream, a następnie zastąp klucz kluczem wskazującym branżę odpowiadającą temu symbolowi giełdowemu.
  2. Utwórz obiekt KTable odczytujący dane z tematu z wiadomościami giełdowymi. Ten nowy KTable będzie podzielony na kategorie według sektorów przemysłu.
  3. Połącz aktualizacje aktualności z informacjami o liczbie transakcji giełdowych w podziale na branże.

Zobaczmy teraz, jak wdrożyć ten plan działania.

Konwertuj KTable na KStream

Aby przekonwertować KTable na KStream, wykonaj następujące czynności.

  1. Wywołaj metodę KTable.toStream().
  2. Wywołując metodę KStream.map, zastąp klucz nazwą branży, a następnie pobierz obiekt TransactionSummary z instancji Windowed.

Połączymy te operacje w następujący sposób (kod znajdziesz w pliku src/main/Java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Ponieważ wykonujemy operację KStream.map, zwrócona instancja KStream jest automatycznie ponownie partycjonowana, gdy jest używana w połączeniu.

Zakończyliśmy proces konwersji, następnie musimy utworzyć obiekt KTable do czytania wiadomości giełdowych.

Stworzenie KTable dla wiadomości giełdowych

Na szczęście utworzenie obiektu KTable zajmuje tylko jedną linijkę kodu (kod można znaleźć w src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Warto zauważyć, że nie jest wymagane określanie obiektów Serde, ponieważ w ustawieniach używane są ciągi Serdes. Ponadto, stosując wyliczenie NAJWAŻNIEJSZE, tabela zapełniana jest rekordami już na samym początku.

Teraz możemy przejść do ostatniego kroku – połączenia.

Łączenie aktualizacji wiadomości z danymi dotyczącymi liczby transakcji

Utworzenie połączenia nie jest trudne. Łączenia lewego użyjemy w przypadku braku nowości giełdowych dla danej branży (niezbędny kod można znaleźć w pliku src/main/Java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Ten operator leftJoin jest dość prosty. W przeciwieństwie do złączeń opisanych w rozdziale 4, metoda JoinWindow nie jest używana, ponieważ podczas wykonywania złączenia KStream-KTable dla każdego klucza istnieje tylko jeden wpis w KTable. Takie połączenie nie jest ograniczone w czasie: rekord albo jest w KTable, albo go nie ma. Główny wniosek: korzystając z obiektów KTable można wzbogacić KStream o rzadziej aktualizowane dane referencyjne.

Teraz przyjrzymy się bardziej efektywnemu sposobowi wzbogacania wydarzeń z KStream.

5.3.4. Obiekty GlobalKTable

Jak widać, istnieje potrzeba wzbogacenia strumieni zdarzeń lub nadania im kontekstu. W rozdziale 4 widziałeś połączenia pomiędzy dwoma obiektami KStream, a w poprzedniej sekcji widziałeś połączenie pomiędzy KStream i KTable. We wszystkich tych przypadkach konieczne jest ponowne podzielenie strumienia danych podczas mapowania kluczy na nowy typ lub wartość. Czasami ponowne partycjonowanie odbywa się jawnie, a czasami Kafka Streams robi to automatycznie. Ponowna partycjonowanie jest konieczne, ponieważ klucze się zmieniły, a rekordy muszą znaleźć się w nowych sekcjach, w przeciwnym razie połączenie będzie niemożliwe (było to omówione w Rozdziale 4, w sekcji „Ponowne partycjonowanie danych” w podrozdziale 4.2.4).

Ponowne partycjonowanie wiąże się z kosztami

Ponowne partycjonowanie wymaga kosztów — dodatkowych kosztów zasobów związanych z tworzeniem tematów pośrednich i przechowywaniem zduplikowanych danych w innym temacie; oznacza to również zwiększone opóźnienia wynikające z pisania i czytania z tego tematu. Ponadto, jeśli chcesz połączyć więcej niż jeden aspekt lub wymiar, musisz połączyć złącza w łańcuch, zmapować rekordy za pomocą nowych kluczy i ponownie uruchomić proces ponownego partycjonowania.

Łączenie z mniejszymi zbiorami danych

W niektórych przypadkach objętość danych referencyjnych do połączenia jest stosunkowo niewielka, więc ich pełne kopie z łatwością zmieszczą się lokalnie w każdym węźle. W takich sytuacjach Kafka Streams udostępnia klasę GlobalKTable.

Instancje GlobalKTable są unikalne, ponieważ aplikacja replikuje wszystkie dane do każdego z węzłów. A ponieważ wszystkie dane znajdują się w każdym węźle, nie ma potrzeby dzielenia strumienia zdarzeń według klucza danych referencyjnych, aby był dostępny dla wszystkich partycji. Można także tworzyć łączenia bezkluczowe przy użyciu obiektów GlobalKTable. Wróćmy do jednego z poprzednich przykładów, aby zademonstrować tę funkcję.

Łączenie obiektów KStream z obiektami GlobalKTable

W podrozdziale 5.3.2 dokonaliśmy agregacji okienkowej transakcji giełdowych według kupujących. Wyniki tej agregacji wyglądały mniej więcej tak:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Chociaż wyniki te spełniały swój cel, bardziej przydatne byłoby, gdyby wyświetliło się również imię i nazwisko klienta oraz pełna nazwa firmy. Aby dodać nazwę klienta i nazwę firmy, możesz wykonać normalne łączenia, ale będziesz musiał wykonać dwa kluczowe mapowania i ponowne partycjonowanie. Dzięki GlobalKTable możesz uniknąć kosztów takich operacji.

W tym celu użyjemy obiektu countStream z Listingu 5.11 (odpowiedni kod można znaleźć w src/main/Java/bbejeck/chapter_5/GlobalKTableExample.java) i połączymy go z dwoma obiektami GlobalKTable.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Już o tym rozmawialiśmy, więc nie będę się powtarzać. Zauważam jednak, że ze względu na czytelność kod w funkcji toStream().map jest abstrahowany do obiektu funkcji zamiast wbudowanego wyrażenia lambda.

Następnym krokiem jest zadeklarowanie dwóch instancji GlobalKTable (pokazany kod można znaleźć w pliku src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”

Należy pamiętać, że nazwy tematów są opisywane za pomocą typów wyliczeniowych.

Teraz, gdy mamy już wszystkie komponenty, pozostaje tylko napisać kod połączenia (który można znaleźć w pliku src/main/Java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Chociaż w tym kodzie występują dwa łączenia, są one powiązane, ponieważ żaden z ich wyników nie jest używany oddzielnie. Wyniki wyświetlane są na koniec całej operacji.

Po uruchomieniu powyższej operacji łączenia otrzymasz takie wyniki:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Istota się nie zmieniła, ale te wyniki wyglądają bardziej wyraźnie.

Jeśli odliczasz do rozdziału 4, widziałeś już w akcji kilka typów połączeń. Są one wymienione w tabeli. 5.2. Ta tabela odzwierciedla możliwości łączności w wersji 1.0.0 strumieni Kafka; Coś może się zmienić w przyszłych wydaniach.

Książka „Strumienie Kafki w akcji. Aplikacje i mikroserwisy do pracy w czasie rzeczywistym”
Na koniec podsumujmy podstawy: możesz łączyć strumienie zdarzeń (KStream) i aktualizować strumienie (KTable) przy użyciu stanu lokalnego. Alternatywnie, jeśli rozmiar danych referencyjnych nie jest zbyt duży, możesz użyć obiektu GlobalKTable. GlobalKTables replikuje wszystkie partycje do każdego węzła aplikacji Kafka Streams, zapewniając dostępność wszystkich danych niezależnie od tego, której partycji odpowiada klucz.

Następnie zobaczymy funkcję Kafka Streams, dzięki której możemy obserwować zmiany stanu bez zużywania danych z tematu Kafki.

5.3.5. Stan możliwy do zapytania

Wykonaliśmy już kilka operacji na stanie i zawsze wysyłamy wyniki do konsoli (w celach programistycznych) lub zapisujemy je w temacie (w celach produkcyjnych). Podczas zapisywania wyników w temacie musisz użyć konsumenta platformy Kafka, aby je wyświetlić.

Odczytywanie danych z tych tematów można uznać za rodzaj zmaterializowanych poglądów. Dla naszych celów możemy skorzystać z definicji widoku zmaterializowanego z Wikipedii: „...fizyczny obiekt bazy danych zawierający wyniki zapytania. Może to być na przykład lokalna kopia zdalnych danych lub podzbiór wierszy i/lub kolumn tabeli lub wyników złączenia, albo tabela podsumowująca uzyskana w wyniku agregacji” (https://en.wikipedia.org/wiki /Zmaterializowany_widok).

Kafka Streams umożliwia także uruchamianie interaktywnych zapytań w magazynach stanowych, umożliwiając bezpośredni odczyt tych zmaterializowanych widoków. Należy zauważyć, że zapytanie do magazynu stanu jest operacją tylko do odczytu. Dzięki temu nie musisz się martwić przypadkowym spowodowaniem niespójności stanu podczas przetwarzania danych przez aplikację.

Ważna jest możliwość bezpośredniego wysyłania zapytań do magazynów stanu. Oznacza to, że można tworzyć aplikacje dashboardów bez konieczności pobierania danych od konsumenta platformy Kafka. Zwiększa to także wydajność aplikacji, gdyż nie ma konieczności ponownego zapisywania danych:

  • dzięki lokalizacji danych można uzyskać do nich szybki dostęp;
  • duplikacja danych jest wyeliminowana, ponieważ nie są one zapisywane w pamięci zewnętrznej.

Najważniejszą rzeczą, o której chcę, abyś pamiętał, jest to, że możesz bezpośrednio sprawdzać stan z poziomu swojej aplikacji. Możliwości, jakie to daje, nie da się przecenić. Zamiast zużywać dane z Kafki i przechowywać rekordy w bazie danych dla aplikacji, możesz wykonywać zapytania do magazynów stanu z tym samym wynikiem. Bezpośrednie zapytania do sklepów stanowych oznaczają mniej kodu (brak konsumenta) i mniej oprogramowania (nie ma potrzeby stosowania tabeli bazy danych do przechowywania wyników).

W tym rozdziale omówiliśmy już sporo zagadnień, więc na razie pomińmy dyskusję na temat interaktywnych zapytań względem sklepów stanowych. Ale nie martw się: w rozdziale 9 utworzymy prostą aplikację dashboardu z interaktywnymi zapytaniami. Wykorzysta niektóre przykłady z tego i poprzednich rozdziałów, aby zademonstrować interaktywne zapytania i sposób dodawania ich do aplikacji Kafka Streams.

Streszczenie

  • Obiekty KStream reprezentują strumienie zdarzeń, porównywalne do wstawek do bazy danych. Obiekty KTable reprezentują strumienie aktualizacji, bardziej przypominające aktualizacje bazy danych. Rozmiar obiektu KTable nie rośnie, stare rekordy są zastępowane nowymi.
  • Obiekty KTable są wymagane do operacji agregacji.
  • Korzystając z operacji okienkowych, możesz podzielić zagregowane dane na segmenty czasu.
  • Dzięki obiektom GlobalKTable możesz uzyskać dostęp do danych referencyjnych w dowolnym miejscu aplikacji, niezależnie od partycjonowania.
  • Możliwe są połączenia pomiędzy obiektami KStream, KTable i GlobalKTable.

Do tej pory skupialiśmy się na budowaniu aplikacji Kafka Streams przy użyciu wysokiego poziomu KStream DSL. Chociaż podejście wysokiego poziomu pozwala na tworzenie schludnych i zwięzłych programów, korzystanie z niego stanowi kompromis. Praca z DSL KStream oznacza zwiększenie zwięzłości kodu poprzez zmniejszenie stopnia kontroli. W następnym rozdziale przyjrzymy się interfejsowi API węzła obsługi niskiego poziomu i spróbujemy innych kompromisów. Programy będą dłuższe niż poprzednio, ale będziemy w stanie stworzyć prawie każdy węzeł obsługi, jakiego możemy potrzebować.

→ Więcej szczegółów na temat książki można znaleźć na stronie strona wydawcy

→ Dla Habrozhiteli 25% zniżki przy użyciu kuponu - Strumienie Kafki

→ Po opłaceniu wersji papierowej książki, pocztą elektroniczną zostanie wysłana książka elektroniczna.

Źródło: www.habr.com

Dodaj komentarz