Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі» Привіт, Хаброжителі! Ця книга підійде для будь-якого розробника, який хоче розібратися у потоковій обробці. Розуміння розподіленого програмування допоможе краще вивчити Kafka та Kafka Streams. Було б добре знати і сам фреймворк Kafka, але це не обов'язково: я розповім вам все, що потрібно. Досвідчені розробники Kafka, як і новачки, завдяки цій книзі освоять створення цікавих програм для потокової обробки за допомогою бібліотеки Kafka Streams. Java-розробники середнього та високого рівня, вже звичні до таких понять, як серіалізація, навчаться застосовувати свої навички для створення програм Kafka Streams. Вихідний код книги написаний на Java 8 і суттєво використовує синтаксис лямбда-виразів Java 8, так що вміння працювати з лямбда-функціями (навіть іншою мовою програмування) вам знадобиться.

Уривок. 5.3. Агрегування та віконні операції

У цьому розділі ми перейдемо до вивчення найперспективніших частин Kafka Streams. Поки що ми розглянули наступні аспекти Kafka Streams:

  • створення топології обробки;
  • використання стану у потокових додатках;
  • виконання з'єднань потоків даних;
  • різницю між потоками подій (KStream) і потоками оновлень (KTable).

У наступних прикладах ми зберемо всі ці елементи воєдино. Крім того, ви познайомитеся з віконними операціями - ще однією чудовою можливістю потокових програм. Першим нашим прикладом буде просте агрегування.

5.3.1. Агрегування обсягу продажу акцій з галузей промисловості

Агрегування та угруповання - життєво необхідні інструменти при роботі з потоковими даними. Дослідження окремих записів у міру надходження часто виявляється недостатньо. Для отримання даних додаткової інформації необхідні їх угруповання і комбінування.

У цьому прикладі вам доведеться приміряти костюм внутрішньоденного трейдера, якому потрібно відстежувати обсяги продажу акцій компаній у кількох галузях промисловості. Зокрема, вас цікавлять п'ять компаній із найбільшими обсягами продажу акцій у кожній із галузей промисловості.

Для такого агрегування знадобиться кілька наступних кроків щодо переведення даних у потрібний вигляд (якщо говорити в загальних рисах).

  1. Створити джерело на основі топіка, що публікує необроблену інформацію з торгівлі акціями. Нам доведеться відобразити об'єкт типу StockTransaction в об'єкт типу ShareVolume. Справа в тому, що об'єкт StockTransaction містить метадані продажів, а нам потрібні тільки дані про кількість акцій, що продаються.
  2. Згрупувати дані ShareVolume за символами акцій. Після угруповання символів можна згорнути ці дані до проміжних сум обсягів продажу акцій. Варто зазначити, що метод KStream.group повертає екземпляр типу KGroupedStream. А отримати екземпляр KTable можна, викликавши далі метод KGroupedStream.reduce.

Що таке інтерфейс KGroupedStream

Методи KStream.groupBy та KStream.groupByKey повертають екземпляр KGroupedStream. KGroupedStream є проміжним поданням потоку подій після угруповання за ключами. Він не призначений для безпосередньої роботи з ним. Натомість KGroupedStream використовується для операцій агрегування, результатом яких завжди є KTable. А оскільки результатом операцій агрегування є KTable і в них застосовується сховище стану, то, можливо, не всі оновлення в результаті вирушають далі конвеєром.

Метод KTable.groupBy повертає аналогічний KGroupedTable - проміжне подання потоку оновлень, перегрупованих за ключом.

Зробимо невелику перерву та подивимося на рис. 5.9, на якому показано, чого ми досягли. Ця топологія має бути вам уже добре знайома.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Поглянемо тепер на код для цієї топології (його можна знайти у файлі src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (листинг 5.2).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Наведений код відрізняється короткістю і великим обсягом дій, що проводяться в декількох рядках. У першому параметрі методу builder.stream ви можете помітити щось нове для себе: значення типу AutoOffsetReset.EARLIEST (існує також і LATEST), що задається за допомогою методу Consumed.withOffsetResetPolicy. За допомогою цього типу можна вказати стратегію скидання зсувів для кожного з KStream або KTable, він має пріоритет над параметром скидання зміщень з конфігурації.

GroupByKey та GroupBy

В інтерфейсі KStream є два методи для групування записів: GroupByKey та GroupBy. Обидва повертають KGroupedTable, так що у вас може виникнути закономірне питання: в чому ж різниця між ними і коли використовувати якийсь із них?

Метод GroupByKey застосовується, коли ключі KStream вже не порожні. А головне, прапор "вимагає повторного секціонування" ніколи не встановлювався.

Метод GroupBy передбачає, що ви змінювали ключі для угруповання, так що прапор повторного секціонування встановлений у true. Виконання після методу GroupBy з'єднань, агрегування тощо приведе до автоматичного повторного секціонування.
Резюме: слід за найменшої нагоди використовувати GroupByKey, а не GroupBy.

Що роблять методи mapValues ​​і groupBy - зрозуміло, так що поглянемо на метод sum() (його можна знайти у файлі src/main/java/bbejeck/model/ShareVolume.java) (листинг 5.3).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Метод ShareVolume.sum повертає проміжну суму обсягу продажу акцій, а результат всього ланцюжка обчислень є об'єктом KTable . Тепер ви знаєте, яку роль відіграє KTable. Під час надходження об'єктів ShareVolume у відповідному об'єкті KTable зберігається останнє актуальне оновлення. Важливо не забувати, що всі оновлення відображаються в попередньому shareVolumeKTable, але не всі вирушають далі.

Далі за допомогою цього KTable ми виконуємо агрегування (за кількістю акцій, що продаються), щоб отримати п'ять компаній з найбільшими обсягами продажів акцій у кожній з галузей промисловості. Наші дії будуть аналогічні діям при першому агрегуванні.

  1. Виконати ще одну операцію groupBy для групування окремих об'єктів ShareVolume по галузях промисловості.
  2. Почати підсумовування об'єктів ShareVolume. Цього разу об'єкт агрегування є чергою за пріоритетом фіксованого розміру. У такій черзі фіксованого розміру зберігаються лише п'ять компаній із найбільшою кількістю проданих акцій.
  3. Відобразити черги з попереднього пункту в рядкове значення і повернути п'ять акцій по галузях промисловості, що найбільше продаються за кількістю акцій.
  4. Записати результати у рядковому вигляді в топік.

На рис. 5.10 показано граф топології руху даних. Як ви бачите, друге коло обробки досить просте.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Тепер, чітко усвідомивши структуру цього другого кола обробки, можна звернутися до його вихідного коду (ви знайдете його у файлі src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (листинг 5.4).

У цьому ініціалізаторі є змінна fixedQueue. Це об'єкт користувача - адаптер для java.util.TreeSet, який застосовується для відстеження N найбільших результатів у порядку зменшення кількості проданих акцій.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Ви вже зустрічалися з викликами groupBy і mapValues, тому не будемо на них зупинятися (ми викликаємо метод KTable.toStream, оскільки метод KTable.print вважається застарілим). Але ви поки що не бачили KTable-версію методу aggregate(), тому ми витратимо трохи часу на його обговорення.

Як ви пам'ятаєте, KTable відрізняє те, що записи з однаковими ключами вважаються оновленнями. KTable замінює старий новий запис. Агрегування відбувається так само: агрегуються останні записи з одним ключем. При надходженні запису вона додається в екземпляр класу FixedSizePriorityQueue за допомогою суматора (другий параметр у виклику методу aggregate), але якщо вже існує інший запис з тим же ключем, то старий запис видаляється за допомогою від'ємника (третій параметр у виклику методу aggregate).

Це все означає, що наш агрегатор, FixedSizePriorityQueue, зовсім не агрегує всі значення з одним ключем, а зберігає ковзну суму кількостей видів акцій, що найбільше продаються. У кожному записі міститься загальна кількість проданих досі акцій. KTable дасть вам інформацію про те, акцій яких компаній продається найбільше зараз, ковзне агрегування кожного з оновлень не потрібно.

Ми навчилися робити дві важливі речі:

  • групувати значення KTable за спільним для них ключем;
  • виконувати над цими згрупованими значеннями такі корисні операції, як згортка та агрегування.

Вміння виконувати ці операції є важливим для розуміння змісту даних, що рухаються через програму Kafka Streams, і з'ясування того, яку інформацію вони несуть.

Ми також поєднали докупи деякі з ключових понять, які раніше обговорювалися в цій книзі. У розділі 4 ми розповідали, наскільки важливий для потокового застосування відмовостійкий, локальний стан. Перший приклад із цього розділу продемонстрував, чому настільки важливий локальний стан — він дає можливість відстежувати, яку інформацію ви вже бачили. Локальний доступ дозволяє уникнути мережних затримок, завдяки чому програма стає більш продуктивною та стійкою до помилок.

Під час виконання будь-якої операції згортки або агрегування необхідно вказати назву сховища стану. Операції згортки та агрегування повертають екземпляр KTable, а KTable використовує сховище стану для заміни старих результатів на нові. Як ви бачили, далеко не всі оновлення відправляються далі конвеєром, і це важливо, оскільки операції агрегування призначені для отримання підсумкової інформації. Якщо не застосовувати локальний стан, KTable надсилатиме далі всі результати агрегування та згортки.

Далі ми подивимося виконання таких операцій, як агрегування, у межах конкретного проміжку часу — про віконних операцій (windowing operations).

5.3.2. Віконні операції

У попередньому розділі ми познайомилися зі «ковзаючими» пакунками та агрегуванням. Додаток виробляло безперервну згортку обсягу продажу акцій з наступним агрегуванням п'яти акцій, що найбільше продаються на біржі.

Іноді подібні безперервні агрегування та згортка результатів необхідні. А іноді потрібно виконати операції лише над заданим проміжком часу. Наприклад, обчислити, скільки було здійснено біржових операцій з акціями конкретної компанії за останні 10 хвилин. Або скільки користувачів натиснуло на новий рекламний банер за останні 15 хвилин. Додаток може здійснювати такі операції багаторазово, але з результатами, що стосуються лише заданих проміжків часу (тимчасових вікон).

Підрахунок біржових транзакцій за покупцем

У наступному прикладі ми займемося відстеженням біржових транзакцій кількома трейдерами — або великими організаціями, або тямущими фінансистами-одинаками.

Існує дві можливі причини для такого відстеження. Одна з них – необхідність знати, що купують/продають лідери ринку. Якщо ці великі гравці і досвідчені інвестори бачать для себе можливості, що є, має сенс дотримуватися їх стратегії. Друга причина полягає у бажанні помітити будь-які можливі ознаки незаконних угод із використанням внутрішньої інформації. Для цього вам знадобиться проаналізувати кореляцію великих сплесків продажів із важливими прес-релізами.

Таке відстеження складається з таких етапів, як:

  • створення потоку для читання з топіка stock-transactions;
  • угруповання вхідних записів за ідентифікатором покупця та біржовим символом акції. Виклик методу groupBy повертає екземпляр класу KGroupedStream;
  • Повернення методом KGroupedStream.windowedBy потоку даних, обмеженого тимчасовим вікном, що дозволяє виконувати віконне агрегування. Залежно від типу вікна повертається або TimeWindowedKStream, або SessionWindowedKStream;
  • підрахунок транзакцій для операції агрегування. Віконний потік даних визначає, чи при цьому підрахунку враховується конкретний запис;
  • запис результатів у топік або виведення їх у консоль під час розробки.

Топологія цієї програми проста, але наочна її картинка не завадить. Погляньмо на рис. 5.11.

Далі ми розглянемо функціональність віконних операцій та відповідний код.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»

Типи вікон

У Kafka Streams існує три типи вікон:

  • сеансові;
  • «перекидаються» (tumbling);
  • ковзаючі/«стрибають» (sliding/hopping).

Яке вибрати залежить від бізнес-вимог. «Перетворені» і «стрибають» вікна обмежуються за часом, у той час як обмеження сеансових пов'язані з діями користувачів - тривалість сеансу (-ів) визначається виключно тим, наскільки активно поводиться користувач. Головне — не забувати, що всі типи вікон ґрунтуються на мітках дати/часу записів, а не системного часу.

Далі ми реалізуємо нашу топологію з кожним із типів вікон. Повний код буде наведено лише в першому прикладі, для інших типів вікон нічого не зміниться, крім типу віконної операції.

Сеансові вікна

Сеансові вікна сильно відрізняються від інших типів вікон. Вони обмежуються не так за часом, як активністю користувача (або активністю тієї сутності, яку ви хотіли б відстежувати). Сеансові вікна розмежовуються періодами бездіяльності.

Малюнок 5.12 ілюструє поняття сеансових вікон. Менший сеанс зливатиметься з сеансом ліворуч від нього. А сеанс справа буде окремим, оскільки слідує за тривалим періодом бездіяльності. Сеансові вікна ґрунтуються на діях користувачів, але застосовують мітки дати/часу із записів для визначення того, до якого сеансу належить запис.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»

Використання сеансових вікон для відстеження біржових транзакцій

Скористайтеся сеансовими вікнами для захоплення інформації про біржові транзакції. Реалізація сеансових вікон показана у лістингу 5.5 (який можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Більшість операцій цієї топології ви вже зустрічали, тому немає потреби розглядати їх тут знову. Але є тут кілька нових елементів, які ми зараз обговоримо.

При будь-якій операції groupBy зазвичай виконується будь-яка операція агрегування (агрегування, згортка або підрахунок кількості). Можна виконати або накопичувальне агрегування з наростаючим підсумком, або агрегування вікон, при якому враховуються записи в межах заданого тимчасового вікна.

Код із лістингу 5.5 виконує підрахунок кількості транзакцій у межах сеансових вікон. На рис. 5.13 ці дії аналізуються покроково.

За допомогою windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) ми створюємо сеансове вікно з інтервалом бездіяльності 20 секунд і інтервалом збереження 15 хвилин. Інтервал бездіяльності 20 секунд означає, що програма включатиме будь-який запис, який надійде в межах 20 секунд від закінчення або початку поточного сеансу в поточний (активний) сеанс.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Далі ми вказуємо, яку операцію агрегування потрібно виконати у сеансовому вікні – у цьому випадку count. Якщо вхідний запис виходить за межі інтервалу бездіяльності (з будь-якої зі сторін від мітки дати/часу), програма створює новий сеанс. Інтервал збереження означає підтримку сеансу протягом певного часу і допускає запізнілі дані, які виходять за період бездіяльності сеансу, але все ще можуть бути приєднані. Крім того, початок і кінець нового сеансу, що вийшов в результаті об'єднання, відповідають ранній і пізній мітці дати/часу.

Розглянемо кілька записів із методу count, щоб побачити, як працюють сеанси (табл. 5.1).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
При надходженні записів ми шукаємо вже існуючі сеанси з тим же ключем, часом закінчення менше ніж поточна мітка дати/часу - інтервал бездіяльності і часом почала більше ніж поточна мітка дати/часу + інтервал бездіяльності. З огляду на це чотири записи з табл. 5.1 зливаються в єдиний сеанс в такий спосіб.

1. Першою надходить запис 1, так що час початку дорівнює часу закінчення і дорівнює 00:00:00.

2. Далі надходить запис 2, і ми шукаємо сеанси, що закінчуються не раніше 23:59:55 і починаються пізніше 00:00:35. Знаходимо запис 1 і об'єднуємо сеанси 1 і 2. Беремо час початку сеансу 1 (раніше) і час закінчення сеансу 2 (пізніше), так що наш новий сеанс починається о 00:00:00 і закінчується о 00:00:15.

3. Надходить запис 3, ми шукаємо сеанси між 00:00:30 та 00:01:10 і не знаходимо жодного. Додаємо другий сеанс для ключа 123-345-654,FFBE, що починається і закінчується о 00:00:50.

4. Надходить запис 4, і ми шукаємо сеанси між 23:59:45 та 00:00:25. Цього разу знаходяться обидва сеанси - 1 і 2. Всі три сеанси об'єднуються в один, з часом початку 00:00:00 і часом закінчення 00:00:15.

З розказаного у цьому розділі варто запам'ятати такі важливі нюанси:

  • сеанси – не вікна фіксованого розміру. Тривалість сеансу визначається активністю у межах заданого проміжку часу;
  • мітки дати/часу даних визначають, потрапляє подія в існуючий сеанс чи проміжок бездіяльності.

Далі ми обговоримо наступний різновид вікон — вікна, що «викидаються».

«Перетворені» вікна

вікна, що «перекидаються» (tumbling), захоплюють події, що потрапляють у певний проміжок часу. Уявіть собі, що вам потрібно захоплювати всі біржові транзакції якоїсь компанії кожні 20 секунд, тому ви збираєте всі події за цей проміжок часу. Після закінчення 20-секундного інтервалу вікно «перекидається» і переходить на новий 20-секундний інтервал спостереження. Малюнок 5.14 ілюструє цю ситуацію.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Як ви можете бачити, всі події, що надійшли за останні 20 секунд, включені у вікно. Після цього проміжку часу створюється нове вікно.

У лістингу 5.6 наведено код, що демонструє використання вікон, що «перекидаються», для захоплення кожні 20 секунд біржових транзакцій (його можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Завдяки цій невеликій зміні виклику методу TimeWindows.of можна використовувати вікно, що «викидається». У даному прикладі немає виклику методу until(), внаслідок чого використовуватиметься інтервал збереження за умовчанням, рівний 24 годин.

Нарешті, настав час перейти до останнього з варіантів вікон - «стрибаючим» (hopping) вікнам.

Ковзаючі («стрибають») вікна

Ковзаючі/«стрибають» (sliding/hopping) вікна схожі на «перекидаються», але з невеликою відмінністю. Ковзаючі вікна не чекають закінчення інтервалу часу перед створенням нового вікна для обробки недавніх подій. Вони запускають нові обчислення після інтервалу очікування меншого, ніж тривалість вікна.

Для ілюстрації відмінностей вікон, що «стрибаються» і «стрибають», повернемося наприклад з підрахунком біржових транзакцій. Наша мета, як і раніше, полягає в підрахунку числа транзакцій, але нам не хотілося б чекати весь проміжок часу перед оновленням лічильника. Натомість ми будемо оновлювати лічильник через короткі проміжки часу. Наприклад, підраховувати кількість транзакцій ми як і раніше кожні 20 секунд, але оновлювати лічильник — кожні 5 секунд, як показано на рис. 5.15. При цьому у нас виявляється три вікна результатів з даними, що перекриваються.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
У лістингу 5.7 наведено код для завдання ковзних вікон (його можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
«Перекинуте» вікно можна перетворити на «стрибаюче» за допомогою додавання виклику методу advanceBy(). У наведеному прикладі інтервал збереження дорівнює 15 хвилин.

Ви побачили у цьому розділі, як обмежувати результати агрегування тимчасовими вікнами. Зокрема, хотілося б, щоб ви запам'ятали з цього розділу такі три речі:

  • розмір сеансових вікон обмежується не проміжком часу, а активністю користувачів;
  • вікна, що «кидаються», дають уявлення про події в рамках заданого періоду часу;
  • тривалість роботи «стрибають» вікон фіксована, але вони часто оновлюються і можуть містити у всіх вікнах записи, що перетинаються.

Далі ми дізнаємося, як перетворити KTable назад на KStream для з'єднання.

5.3.3. З'єднання об'єктів KStream та KTable

У розділі 4 ми обговорювали з'єднання двох об'єктів KStream. Тепер ми маємо навчитися з'єднувати KTable і KStream. Знадобитися це може з наступної простої причини. KStream – потік записів, а KTable – потік оновлень записів, але іноді може бути потрібно додати додатковий контекст до потоку записів за допомогою оновлень із KTable.

Візьмемо дані про кількість біржових транзакцій та з'єднаємо їх із біржовими новинами за відповідними галузями промисловості. Ось що потрібно зробити, що досягти цього з урахуванням вже наявного коду.

  1. Перетворити об'єкт KTable з даними про кількість біржових транзакцій KStream з наступною заміною ключа на ключ, що позначає галузь промисловості, що відповідає даному символу акцій.
  2. Створити об'єкт KTable, який читає дані з топіка з біржовими новинами. Цей новий KTable буде категоризований за галузями промисловості.
  3. Поєднати оновлення новин з інформацією про кількість біржових транзакцій галузями промисловості.

Тепер побачимо, як реалізувати цей план дій.

Перетворення KTable на KStream

Для перетворення KTable на KStream необхідно зробити таке.

  1. Викликати метод KTable.toStream().
  2. За допомогою виклику методу KStream.map замінити ключ назвою галузі промисловості, після чого витягти з екземпляра Windowed об'єкт TransactionSummary.

Ми зв'яжемо ці операції ланцюжком так (код можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.8).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Оскільки ми виконуємо операцію KStream.map, то повторне секціонування для екземпляра KStream, що повертається, проводиться автоматично при його використанні в з'єднанні.

Ми завершили процес перетворення, далі нам потрібно створити об'єкт KTable для читання біржових новин.

Створення KTable для біржових новин

На щастя, для створення об'єкта KTable достатньо одного рядка коду (цей код можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.9).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Слід зазначити, що ніяких об'єктів Serde вказувати не потрібно, оскільки в налаштуваннях використовуються рядкові Serde. Завдяки застосуванню перерахування EARLIEST таблиця заповнюється записами на самому початку.

Тепер ми можемо перейти до заключного кроку з'єднання.

З'єднання оновлень новин із даними про кількість транзакцій

Створення з'єднання не є складними. Ми скористаємося лівим з'єднанням на випадок, якщо у відповідній галузі промисловості немає біржових новин (потрібний код можна знайти у файлі src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (листинг 5.10).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Цей оператор leftJoin досить простий. На відміну від з'єднань з розділу 4, метод JoinWindow не використовується, оскільки при виконанні з'єднання KStream-KTable для кожного ключа KTable є тільки один запис. Таке з'єднання не обмежується за часом: запис або є в KTable або відсутній. Основний висновок: за допомогою об'єктів KTable можна збагачувати KStream рідше оновлюваними довідковими даними.

А тепер ми розглянемо ефективніший спосіб збагачення подій з KStream.

5.3.4. Об'єкти GlobalKTable

Як ви зрозуміли, існує необхідність збагачення потоків подій чи додавання до них контексту. У розділі 4 ви бачили з'єднання двох об'єктів KStream, а в попередньому розділі з'єднання KStream і KTable. У всіх цих випадках необхідно повторне секціонування потоку даних при відображенні ключів на новий тип або значення. Іноді повторне секціонування виконується явно, а іноді Kafka Streams робить це автоматично. Повторне секціонування необхідно, оскільки ключі змінилися і записи повинні опинитися в нових секціях, інакше з'єднання виявиться неможливим (це обговорювалося в розділі 4, у пункті «Повторне секціювання даних» підрозділу 4.2.4).

Повторне секціонування має власну ціну

Повторне секціонування вимагає витрат — додаткових витрат ресурсів створення проміжних топіків, збереження даних у ще одному топіці; воно також означає підвищення затримки внаслідок запису та читання з цього топіка. Крім того, при необхідності виконати з'єднання більш ніж за одним аспектом або виміром, потрібно організувати з'єднання ланцюжком, відобразити записи з новими ключами і знову провести процес повторного секціонування.

З'єднання з наборами даних меншого розміру

У деяких випадках обсяг довідкових даних, з якими планується з'єднання, відносно невеликий, так що повні їх копії можуть поміститися локально на кожному з вузлів. Для таких ситуацій у Kafka Streams передбачений клас GlobalKTable.

Примірники GlobalKTable є унікальними, оскільки програма реплікує всі дані на кожен з вузлів. А оскільки на кожному з вузлів є всі дані, немає необхідності секціонувати потік подій по ключу довідкових даних, щоб він був доступний усім секціям. За допомогою об'єктів GlobalKTable можна також виконувати безключові з'єднання. Повернімося до одного із попередніх прикладів для демонстрації цієї можливості.

З'єднання об'єктів KStream з об'єктами GlobalKTable

У підрозділі 5.3.2 ми виконали віконне агрегування біржових транзакцій щодо покупців. Результати цього агрегування виглядали приблизно так:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Хоча ці результати відповідали поставленій меті, було б зручніше, якби виводилося також ім'я клієнта та повна назва компанії. Щоб додати ім'я покупця та назву компанії, можна виконувати звичайні з'єднання, але при цьому знадобиться зробити два відображення ключів та повторне секціонування. За допомогою GlobalKTable можна уникнути витрат на такі операції.

Для цього ми скористаємося об'єктом countStream з лістингу 5.11 (відповідний код можна знайти у файлі src/main/java/bbejeck/chapter_5/GlobalKTableExample.java), з'єднавши його з двома об'єктами GlobalKTable.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Ми вже обговорювали це раніше, так що не повторюватимуся. Але зазначу, що код функції toStream().map задля легкочитаності абстрагований в об'єкт-функцію замість вбудованого лямбда-вираження.

Наступний етап – оголошення двох примірників GlobalKTable (наведений код можна знайти у файлі src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (листинг 5.12).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»

Зверніть увагу, що назви топіків описуються за допомогою перерахованих типів.

Тепер, коли ми підготували всі компоненти, залишилося написати код для з'єднання (який можна знайти у файлі src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (листинг 5.13).

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Хоча в цьому коді є дві сполуки, вони організовані у вигляді ланцюжка, оскільки окремо жоден з їх результатів не використовується. Результати виводяться наприкінці всієї операції.

При запуску наведеної вище операції з'єднання ви отримаєте результати наступного виду:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Суть не змінилася, але ці результати виглядають зрозумілішими.

Якщо рахувати розділ 4, ви вже бачили кілька типів з'єднань у дії. Вони перераховані у табл. 5.2. Ця таблиця відображає можливості з'єднання актуальні для версії 1.0.0 Kafka Streams; у майбутніх випусках, можливо, щось зміниться.

Книга «Kafka Streams у дії. Програми та мікросервіси для роботи в реальному часі»
Насамкінець нагадаю основне: ви можете з'єднувати потоки подій (KStream) і потоки оновлень (KTable) за допомогою локального стану. Крім того, якщо розмір довідкових даних не дуже великий, можна скористатися об'єктом GlobalKTable. GlobalKTable реплікує всі секції на кожен з вузлів програми Kafka Streams, забезпечуючи тим самим доступність всіх даних незалежно від того, якій секції відповідає ключ.

Далі ми побачимо можливість Kafka Streams, завдяки якій можна спостерігати зміни стану без споживання даних із топіка Kafka.

5.3.5. Доступний для запитів стан

Ми вже виконували кілька операцій за участю стану і завжди виводили результати в консоль (для цілей розробки) або записували їх у топік (для цілей промислової експлуатації). При записі результатів у топік доводиться використовувати споживач Kafka для їх перегляду.

Читання даних із цих топіків можна вважати різновидом матеріалізованих уявлень (materialized views). Для наших завдань можна використовувати визначення матеріалізованого подання з «Вікіпедії»: «…фізичний об'єкт бази даних, що містить результати виконання запиту. Наприклад, воно може бути локальною копією віддалених даних, або підмножиною рядків та/або стовпців таблиці або результатів з'єднання, або зведеною таблицею, отриманою за допомогою агрегування» (https://en.wikipedia.org/wiki/Materialized_view).

Kafka Streams також дозволяє виконувати інтерактивні запити (interactive queries) до сховищ стану, що дає можливість безпосереднього читання цих матеріалізованих уявлень. Важливо, що запит до сховища стану носить характер операції «тільки читання». Завдяки цьому ви можете не боятися випадково зробити стан неузгодженим під час обробки даних програмою.

Можливість безпосередніх запитів до сховищ стану має велике значення. Вона означає, що можна створювати програми - інформаційні панелі без необхідності спочатку отримувати дані від споживача Kafka. Підвищує вона і ефективність програми, тому що не потрібно знову записувати дані:

  • завдяки локальності даних до них можна швидко звернутись;
  • виключається дублювання даних, оскільки вони не записуються у зовнішнє сховище.

Головне, що я хотів би, щоб ви запам'ятали: можна безпосередньо виконувати запити до стану програми. Не можна переоцінити можливості, які вам дає. Замість того, щоб споживати дані з Kafka і зберігати записи в базі даних для програми, можна виконувати запити до сховищ стану з тим самим результатом. Безпосередні запити до сховищ стану означають менший обсяг коду (відсутність споживача) та менше програмного забезпечення (відсутність потреби в таблиці бази даних для зберігання результатів).

Ми охопили чималий обсяг інформації в цьому розділі, тому на якийсь час припинимо наше обговорення інтерактивних запитів до сховищ стану. Але не хвилюйтеся: у розділі 9 ми створюватимемо простий додаток — інформаційну панель з інтерактивними запитами. Для демонстрації інтерактивних запитів та можливостей їх додавання до Kafka Streams у ньому будуть використовуватися деякі з прикладів цієї та попередніх розділів.

Резюме

  • Об'єкти KStream уособлюють потоки подій, які можна порівняти зі вставками в базу даних. Об'єкти KTable уособлюють потоки оновлень, вони більше схожі на оновлення в базі даних. Розмір об'єкта KTable не зростає, старі записи замінюються на нові.
  • Об'єкти KTable потрібні для операцій агрегування.
  • За допомогою віконних операцій можна розбити агреговані дані по тимчасових кошиках.
  • Завдяки об'єктам GlobalKTable можна отримати доступ до довідкових даних у будь-якій точці програми, незалежно від розбиття по секціях.
  • Можливі з'єднання між собою об'єктів KStream, KTable та GlobalKTable.

Досі ми концентрували увагу на створенні програм Kafka Streams за допомогою високорівневого DSL KStream. Хоча високорівневий підхід дозволяє створювати акуратні та лаконічні програми, його використання є певним компромісом. Робота з DSL KStream означає підвищення лаконічності коду за рахунок зниження рівня контролю. У наступному розділі ми розглянемо низькорівневий API вузлів-обробників та спробуємо інші компроміси. Програми стануть довшими, ніж були дотепер, зате у нас з'явиться можливість створення практично будь-якого вузла-обробника, який може нам знадобитися.

→ Докладніше з книгою можна ознайомитись на сайті видавництва

→ Для Хаброжителів знижка 25% купона Потоки Кафки

→ За фактом оплати паперової версії книги на e-mail надсилається електронна книга.

Джерело: habr.com

Додати коментар або відгук