Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе» Прывітанне, Хаброжыцелі! Гэтая кніга падыдзе для любога распрацоўніка, які хоча разабрацца ў струменевай апрацоўцы. Разуменне размеркаванага праграмавання дапаможа лепш вывучыць Kafka і Kafka Streams. Было б нядрэнна ведаць і сам фрэймворк Kafka, але гэта не абавязкова: я раскажу вам усё, што трэба. Доследныя распрацоўшчыкі Kafka, як і навічкі, дзякуючы гэтай кнізе асвояць стварэнне цікавых прыкладанняў для струменевай апрацоўкі з дапамогай бібліятэкі Kafka Streams. Java-распрацоўшчыкі сярэдняга і высокага ўзроўню, ужо звыклыя да такіх паняццяў, як серыялізацыя, навучацца прымяняць свае навыкі для стварэння прыкладанняў Kafka Streams. Зыходны код кнігі напісаны на Java 8 і істотна выкарыстоўвае сінтаксіс лямбда-выразаў Java 8, так што ўменне працаваць з лямбда-функцыямі (нават на іншай мове праграмавання) вам спатрэбіцца.

Урывак. 5.3. Агрэгаванне і аконныя аперацыі

У гэтым раздзеле мы пяройдзем да вывучэння найболей шматспадзеўных частак Kafka Streams. Пакуль мы разгледзелі наступныя аспекты Kafka Streams:

  • стварэнне тапалогіі апрацоўкі;
  • выкарыстанне стану ў струменевых дадатках;
  • выкананне злучэнняў патокаў даных;
  • адрозненні паміж патокамі падзей (KStream) і патокамі абнаўленняў (KTable).

У наступных жа прыкладах мы збярэм усе гэтыя элементы разам. Акрамя таго, вы пазнаёміцеся з аконнымі аперацыямі - яшчэ адной выдатнай магчымасцю струменевых прыкладанняў. Першым нашым прыкладам будзе простае агрэгаванне.

5.3.1. Агрэгаванне аб'ёму продажаў акцый па галінах прамысловасці

Агрэгаванне і групоўка - жыццёва неабходныя прылады пры працы з струменевымі дадзенымі. Даследаванні асобных запісаў па меры паступлення часта аказваецца недастаткова. Для вымання з дадзеных дадатковай інфармацыі неабходны іх групоўка і камбінаванне.

У гэтым прыкладзе вам трэба будзе прымерыць касцюм ўнутрыдзённага трэйдара, якому трэба адсочваць аб'ёмы продажаў акцый кампаній у некалькіх галінах прамысловасці. У прыватнасці, вас цікавяць пяць кампаній з найвялікімі аб'ёмамі продажаў акцый у кожнай з галін прамысловасці.

Для падобнага агрэгавання запатрабуецца некалькі наступных крокаў па перакладзе дадзеных у патрэбны выгляд (калі казаць у агульных рысах).

  1. Стварыць крыніцу на аснове топіка, які публікуе неапрацаваную інфармацыю па гандлі акцыямі. Нам давядзецца адлюстраваць аб'ект тыпу StockTransaction у аб'ект тыпу ShareVolume. Справа ў тым, што аб'ект StockTransaction змяшчае метададзеныя продажаў, а нам патрэбныя толькі дадзеныя аб колькасці прадаваных акцый.
  2. Згрупаваць дадзеныя ShareVolume па сімвалах акцый. Пасля групоўкі па сімвалах можна згарнуць гэтыя дадзеныя да прамежкавых сум аб'ёмаў продажаў акцый. Варта адзначыць, што метад KStream.groupBy вяртае асобнік тыпу KGroupedStream. А атрымаць асобнік KTable можна, выклікаўшы далей метад KGroupedStream.reduce.

Што такое інтэрфейс KGroupedStream

Метады KStream.groupBy і KStream.groupByKey вяртаюць асобнік KGroupedStream. KGroupedStream з'яўляецца прамежкавым уяўленнем патоку падзей пасля групоўкі па ключах. Ён зусім не прызначаны для непасрэднай працы з ім. Замест гэтага KGroupedStream выкарыстоўваецца для аперацый агрэгавання, вынікам якіх заўсёды з'яўляецца KTable. А паколькі вынікам аперацый агрэгавання з'яўляецца KTable і ў іх ужываецца сховішча стану, то, магчыма, не ўсе абнаўленні ў выніку адпраўляюцца далей па канвееры.

Метад KTable.groupBy вяртае аналагічны KGroupedTable - прамежкавае ўяўленне струменя абнаўленняў, перагрупаваць па ключы.

Зробім невялікі перапынак і паглядзім на мал. 5.9, на якім паказана, чаго мы дабіліся. Гэтая тапалогія павінна быць вам ужо добра знаёмая.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Зірнем зараз на код для гэтай тапалогіі (яго можна знайсці ў файле src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (лістынг 5.2).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Прыведзены код адрозніваецца сцісласцю і вялікім аб'ёмам вырабляных у некалькіх радках дзеянняў. У першым параметры метаду builder.stream вы можаце заўважыць нешта новае для сябе: значэнне пералічанага тыпу AutoOffsetReset.EARLIEST (існуе таксама і LATEST), якое задаецца з дапамогай метаду Consumed.withOffsetResetPolicy. З дапамогай гэтага пералічанага тыпу можна паказаць стратэгію скіду зрушэнняў для кожнага з KStream ці KTable, ён валодае прыярытэтам над параметрам скіду зрушэнняў з канфігурацыі.

GroupByKey і GroupBy

У інтэрфейсе KStream ёсць два метады для групоўкі запісаў: GroupByKey і GroupBy. Абодва вяртаюць KGroupedTable, так што ў вас можа з'явіцца заканамернае пытанне: у чым жа адрозненне паміж імі і калі выкарыстоўваць які з іх?

Метад GroupByKey ужываецца, калі ключы ў KStream ужо непустыя. А галоўнае, сцяг "патрабуе паўторнага секцыянавання" ніколі не ўсталёўваўся.

Метад GroupBy мяркуе, што вы мянялі ключы для групоўкі, так што сцяг паўторнага секцыянавання ўсталяваны ў true. Выкананне пасля метаду GroupBy злучэнняў, агрэгавання і т. п. прывядзе да аўтаматычнага паўторнага секцыянавання.
Рэзюмэ: варта пры найменшай магчымасці выкарыстоўваць GroupByKey, а не GroupBy.

Што робяць метады mapValues ​​і groupBy – зразумела, так што зірнем на метад sum() (яго можна знайсці ў файле src/main/java/bbejeck/model/ShareVolume.java) (лістынг 5.3).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Метад ShareVolume.sum вяртае прамежкавую суму аб'ёму продажаў акцый, а вынік усяго ланцужка вылічэнняў уяўляе сабой аб'ект KTable . Цяпер вы разумееце, якую ролю гуляе KTable. Пры паступленні аб'ектаў ShareVolume у які адпавядае аб'екце KTable захоўваецца апошняе актуальнае абнаўленне. Важна не забываць, што ўсе абнаўленні адлюстроўваюцца ў папярэднім shareVolumeKTable, але не ўсё адпраўляюцца далей.

Далей з дапамогай гэтага KTable мы выконваем агрэгавання (па колькасці прадаваных акцый), каб атрымаць пяць кампаній з найбольшымі аб'ёмамі продажаў акцый у кожнай з галін прамысловасці. Нашы дзеянні пры гэтым будуць аналагічныя дзеянням пры першым агрэгаванні.

  1. Выканаць яшчэ адну аперацыю groupBy для групоўкі асобных аб'ектаў ShareVolume па галінах прамысловасці.
  2. Прыступіць да падсумоўвання аб'ектаў ShareVolume. На гэты раз аб'ект агрэгавання ўяўляе сабой чаргу па прыярытэце фіксаванага памеру. У такой чарзе фіксаванага памеру захоўваюцца толькі пяць кампаній з найбольшымі колькасцямі прададзеных акцый.
  3. Паказваць чэргі з папярэдняга пункта ў радковае значэнне і вярнуць пяць найбольш прадаваных па колькасці акцый па галінах прамысловасці.
  4. Запісаць вынікі ў радковым выглядзе ў топік.

На мал. 5.10 паказаны граф тапалогіі руху дадзеных. Як вы бачыце, другі круг апрацоўкі дастаткова просты.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Зараз, выразна ўразумеўшы сабе структуру гэтага другога круга апрацоўкі, можна звярнуцца да яго зыходнага кода (вы знойдзеце яго ў файле src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (лістынг 5.4).

У дадзеным ініцыялізатары ёсць зменная fixedQueue. Гэта карыстацкі аб'ект - адаптар для java.util.TreeSet, які ўжываецца для адсочвання N найбольшых вынікаў у парадку змяншэння колькасці прададзеных акцый.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Вы ўжо сустракаліся з выклікамі groupBy і mapValues, так што не будзем на іх спыняцца (мы выклікаем метад KTable.toStream, паколькі метад KTable.print лічыцца састарэлым). Але вы пакуль яшчэ не бачылі KTable-версію метаду aggregate(), так што мы выдаткуем крыху часу на яго абмеркаванне.

Як вы падушыце, KTable адрознівае тое, што запісы з аднолькавымі ключамі лічацца абнаўленнямі. KTable замяняе стары запіс новай. Агрэгаванне адбываецца падобнай жа выявай: агрэгуюцца апошнія запісы з адным ключом. Пры паступленні запісу яна дадаецца ў асобнік класа FixedSizePriorityQueue з дапамогай суматара (другі параметр у выкліку метаду aggregate), але калі ўжо існуе іншы запіс з тым жа ключом, то стары запіс выдаляецца з дапамогай вычытальніка (трэці параметр у выкліку метаду aggregate).

Гэта ўсё значыць, што наш агрэгатар, FixedSizePriorityQueue, зусім не агрэгуе ўсе значэння з адным ключом, а захоўвае слізгальную суму колькасцяў N найболей прадаваных выглядаў акцый. У кожным паступаючым запісе змяшчаецца агульная колькасць прададзеных да гэтага часу акцый. KTable дасць вам інфармацыю аб тым, акцый якіх кампаній прадаецца больш за ўсё ў сапраўдны момант, слізгальнае агрэгаванне кожнага з абнаўленняў не патрабуецца.

Мы навучыліся рабіць дзве важныя рэчы:

  • групаваць значэнні ў KTable па агульным для іх ключы;
  • выконваць над гэтымі згрупаванымі значэннямі такія карысныя аперацыі, як скрутка і агрэгаванне.

Уменне выконваць гэтыя аперацыі важна для разумення сэнсу даных, якія рухаюцца праз прыкладанне Kafka Streams, і высвятлення таго, якую інфармацыю яны нясуць.

Мы таксама злучылі разам некаторыя з ключавых паняццяў, якія абмяркоўваліся раней у гэтай кнізе. У главе 4 мы распавядалі, наколькі важна для струменевага прыкладання адмоваўстойлівы, лакальны стан. Першы прыклад з гэтага раздзела прадэманстраваў, чаму настолькі важны лакальны стан - ён дае магчымасць адсочваць, якую інфармацыю вы ўжо бачылі. Лакальны доступ дазваляе пазбегнуць сеткавых затрымак, дзякуючы чаму прыкладанне становіцца больш прадукцыйным і ўстойлівым да памылак.

Пры выкананні любой аперацыі скруткі або агрэгавання неабходна паказаць назву сховішчы стану. Аперацыі скруткі і агрэгаванні вяртаюць асобнік KTable, а KTable выкарыстае сховішча стану для замены старых вынікаў новымі. Як вы бачылі, далёка не ўсе абнаўленні адпраўляюцца далей па канвееры, і гэта важна, паколькі аперацыі агрэгавання прызначаны для атрымання выніковай інфармацыі. Калі не ўжываць лакальны стан, KTable будзе адпраўляць далей усе вынікі агрэгавання і скруткі.

Далей мы паглядзім на выкананне такіх аперацый, як агрэгавання, у межах канкрэтнага прамежку часу - так званых аконных аперацый (windowing operations).

5.3.2. Аконныя аперацыі

У папярэднім раздзеле мы пазнаёміліся са «слізгальнымі» скруткам і агрэгаваннем. Прыкладанне вырабляла бесперапынную скрутку аб'ёму продажаў акцый з наступным агрэгаваннем пяці найболей прадаваных на біржы акцый.

Часам падобныя бесперапынныя агрэгавання і скрутка вынікаў неабходны. А часам трэба выканаць аперацыі толькі над зададзеным прамежкам часу. Напрыклад, вылічыць, колькі было зроблена біржавых аперацый з акцыямі канкрэтнай кампаніі за апошнія 10 хвілін. Або колькі карыстальнікаў націснула на новы рэкламны банэр за апошнія 15 хвілін. Прыкладанне можа вырабляць такія аперацыі шматкроць, але з вынікамі, якія адносяцца толькі да зададзеных прамежкаў часу (часовым вокнам).

Падлік біржавых транзакцый па пакупніку

У наступным прыкладзе мы зоймемся адсочваннем біржавых транзакцый па некалькіх трэйдарам – альбо буйным арганізацыям, альбо цямлівым фінансістам-адзіночкам.

Існуе дзве магчымыя прычыны для падобнага адсочвання. Адна з іх - неабходнасць ведаць, што купляюць / прадаюць лідэры рынку. Калі гэтыя буйныя гульцы і спрактыкаваныя інвестары бачаць для сябе адкрываныя магчымасці, мае сэнс прытрымлівацца іх стратэгіі. Другая прычына заключаецца ў жаданні заўважыць любыя магчымыя прыкметы незаконных здзелак з выкарыстаннем унутранай інфармацыі. Для гэтага вам спатрэбіцца прааналізаваць карэляцыю буйных усплёскаў продажаў з важнымі прэс-рэлізамі.

Такое адсочванне складаецца з такіх этапаў, як:

  • стварэнне патоку для чытання з топіка stock-transactions;
  • групоўка ўваходных запісаў па ідэнтыфікатару пакупніка і біржавому знаку акцыі. Выклік метаду groupBy вяртае асобнік класа KGroupedStream;
  • вяртанне метадам KGroupedStream.windowedBy патоку дадзеных, абмежаванага часовым акном, што дазваляе выконваць аконнае агрэгаванне. У залежнасці ад тыпу акна вяртаецца альбо TimeWindowedKStream, альбо SessionWindowedKStream;
  • падлік транзакцый для аперацыі агрэгавання. Аконны паток даных вызначае, ці ўлічваецца пры гэтым падліку канкрэтны запіс;
  • запіс вынікаў у топік або вывад іх у кансоль падчас распрацоўкі.

Тапалогія дадзенага прыкладання простая, але навочная яе карцінка не перашкодзіць. Зірнем на мал. 5.11.

Далей мы разгледзім функцыянальнасць аконных аперацый і адпаведны код.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»

Тыпы вокнаў

У Kafka Streams існуе тры тыпу вокнаў:

  • сеансавыя;
  • "кульгаюцца" (tumbling);
  • слізгальныя/«скачучыя» (sliding/hopping).

Якое абраць - залежыць ад бізнес-патрабаванняў. "Кіляюцца" і "скачуць" вокны абмяжоўваюцца па часе, у то час як абмежаванні сеансавых злучаны з дзеяннямі карыстачоў - працягласць сеансу (-ов) вызначаецца выключна тым, наколькі актыўна паводзіць сябе карыстач. Галоўнае - не забываць, што ўсе тыпы вокнаў засноўваюцца на пазнаках даты/часу запісаў, а не на сістэмным часе.

Далей мы рэалізуем нашу тапалогію з кожным з тыпаў вокнаў. Поўны код будзе прыведзены толькі ў першым прыкладзе, для іншых тыпаў вокнаў нічога не зменіцца, акрамя тыпу аконнай аперацыі.

Сеансавыя вокны

Сеансавыя вокны моцна адрозніваюцца ад усіх астатніх тыпаў вокнаў. Яны абмяжоўваюцца не столькі па часе, колькі актыўнасцю карыстальніка (ці актыўнасцю той сутнасці, якую вы хацелі б адсочваць). Сеансавыя вокны размяжоўваюцца перыядамі бяздзейнасці.

Малюнак 5.12 ілюструе паняцце сеансавых вокнаў. Меншы сеанс будзе злівацца з сеансам злева ад яго. А сеанс справа будзе асобным, паколькі ідзе за працяглым перыядам бяздзейнасці. Сеансавыя вокны засноўваюцца на дзеяннях карыстачоў, але ўжываюць пазнакі даты/часу з запісаў для вызначэння таго, да якога сеансу адносіцца запіс.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»

Выкарыстанне сеансавых вокнаў для адсочвання біржавых транзакцый

Скарыстаемся сеансавымі вокнамі для захопу інфармацыі аб біржавых транзакцыях. Рэалізацыя сеансавых вокнаў паказана ў лістынгу 5.5 (які можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Большасць аперацый гэтай тапалогіі вы ўжо сустракалі, так што няма галечы разглядаць іх тут ізноў. Але ёсць тут і некалькі новых элементаў, якія мы зараз абмяркуем.

Пры ўсякай аперацыі groupBy звычайна выконваецца якая-небудзь аперацыя агрэгавання (агрэгаванне, скрутка або падлік колькасці). Можна выканаць ці назапашвальнае агрэгаванне з нарастальным вынікам, ці аконнае агрэгаванне, пры якім улічваюцца запісы ў межах зададзенага часавага акна.

Код з лістынга 5.5/5.13 выконвае падлік колькасці транзакцый у межах сеансавых вокнаў. На мал. XNUMX гэтыя дзеянні аналізуюцца пакрокава.

З дапамогай выкліку windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) мы ствараем сеансавае акно з інтэрвалам бяздзейнасці 20 секунд і інтэрвалам захавання 15 хвілін. Інтэрвал бяздзейнасці 20 секунд азначае, што прыкладанне будзе ўключаць любую запіс, якая паступіць у межах 20 секунд ад заканчэння ці пачатку бягучага сеансу ў бягучы (актыўны) сеанс.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Далей мы паказваем, якую аперацыю агрэгавання трэба выканаць у сеансавым акне - у дадзеным выпадку count. Калі ўваходны запіс выходзіць за межы інтэрвалу бяздзейнасці (з любой з бакоў ад пазнакі даты/часу), то прыкладанне стварае новы сеанс. Інтэрвал захавання азначае падтрыманне сеансу на працягу вызначанага часу і дапушчае запозненыя дадзеныя, якія выходзяць за перыяд бяздзейнасці сеансу, але ўсё яшчэ могуць быць далучаныя. Акрамя таго, пачатак і канец новага сеансу, які атрымаўся ў выніку аб'яднання, адпавядаюць самай ранняй і самай позняй метцы даты/часу.

Разгледзім некалькі запісаў з метаду count, каб убачыць, як працуюць сеансы (табл. 5.1).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Пры паступленні запісаў мы шукаем ужо існуючыя сеансы з тым жа ключом, часам заканчэння менш чым бягучая пазнака даты/часу — інтэрвал бяздзейнасці і часам пачала больш чым бягучая пазнака даты/часу + інтэрвал бяздзейнасці. З улікам гэтага чатыры запісы з табл. 5.1/XNUMX зліваюцца ў адзіны сеанс наступным чынам.

1. Першай паступае запіс 1, так што час пачатку роўна часу канчатка і роўна 00:00:00.

2. Далей паступае запіс 2, і мы шукаем сеансы, якія сканчаюцца не раней 23:59:55 і якія пачынаюцца не пазней 00:00:35. Знаходзім запіс 1 і аб'ядноўваем сеансы 1 і 2. Бярэм час пачатку сеанса 1 (ранейшы) і час канчатка сеансу 2 (пазней), так што наш новы сеанс пачынаецца ў 00:00:00 і сканчаецца ў 00:00:15.

3. Паступае запіс 3, мы шукаем сеансы паміж 00:00:30 і 00:01:10 і не знаходзім ніводнага. Дадаем другі сеанс для ключа 123-345-654,FFBE, які пачынаецца і заканчваецца ў 00:00:50.

4. Паступае запіс 4, і мы шукаем сеансы паміж 23:59:45 і 00:00:25. На гэты раз знаходзяцца абодва сеансы - 1 і 2. Усе тры сеансы аб'ядноўваюцца ў адзін, з часам пачатку 00:00:00 і часам заканчэння 00:00:15.

З расказанага ў гэтым раздзеле варта запомніць наступныя важныя нюансы:

  • сеансы - не вокны фіксаванага памеру. Працягласць сеанса вызначаецца актыўнасцю ў рамках зададзенага прамежку часу;
  • пазнакі даты/часу ў дадзеных вызначаюць, ці трапляе падзея ў існуючы сеанс або ў прамежак бяздзейнасці.

Далей мы абмяркуем наступную разнавіднасць вокнаў - «куляюцца» вокны.

«Кіляюцца» вокны

«Кіляюцца» (tumbling) вокны захопліваюць падзеі, якія трапляюць у вызначаны прамежак часу. Уявіце сабе, што вам трэба захопліваць усе біржавыя транзакцыі нейкай кампаніі кожныя 20 секунд, так што вы збіраеце ўсе падзеі за гэты прамежак часу. Па канчатку 20-секунднага інтэрвалу акно "куляецца" і пераходзіць на новы 20-секундны інтэрвал назірання. Малюнак 5.14 ілюструе гэтую сытуацыю.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Як вы можаце бачыць, усё паступілі за апошнія 20 секунд падзеі ўключаны ў акно. Па канчатку гэтага прамежку часу ствараецца новае акно.

У лістынгу 5.6 прыведзены код, які дэманструе выкарыстанне "куляюцца" вокнаў для захопу кожныя 20 секунд біржавых транзакцый (яго можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Дзякуючы гэтай невялікай змене выкліку метаду TimeWindows.of можна выкарыстоўваць «кульгае» акно. У дадзеным прыкладзе няма выкліку метаду until(), з прычыны чаго будзе выкарыстоўвацца інтэрвал захавання па змаўчанні, роўны 24 гадзінам.

Нарэшце, пара перайсці да апошняга з варыянтаў вокнаў – «скачуць» (hopping) вокнам.

Слізгальныя («скачучыя») вокны

Слізгальныя/«скачучыя» (sliding/hopping) вокны падобныя на «куляюцца», але з невялікім адрозненнем. Слізгальныя вокны не чакаюць канчаткі інтэрвалу часу перад стварэннем новага акна для апрацоўкі нядаўніх падзей. Яны запускаюць новыя вылічэнні пасля інтэрвалу чакання, меншага чым працягласць акна.

Для ілюстрацыі адрозненняў "куляюцца" і "скачуць" вокнаў вернемся да прыкладу з падлікам біржавых транзакцый. Наша мэта па-ранейшаму заключаецца ў падліку колькасці транзакцый, але нам не хацелася б чакаць увесь прамежак часу перад абнаўленнем лічыльніка. Замест гэтага мы будзем абнаўляць лічыльнік праз карацейшыя прамежкі часу. Напрыклад, падлічваць колькасць транзакцый мы будзем па-ранейшаму кожныя 20 секунд, але абнаўляць лічыльнік - кожныя 5 секунд, як паказана на мал. 5.15. Пры гэтым у нас аказваецца тры акны вынікаў з перакрываюцца дадзенымі.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
У лістынгу 5.7 прыведзены код для задання слізгальных вокнаў (яго можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
"Калябаецца" акно можна пераўтварыць у "скача" з дапамогай дадання выкліку метаду advanceBy(). У прыведзеным прыкладзе інтэрвал захавання роўны 15 хвілінам.

Вы ўбачылі ў гэтым раздзеле, як абмяжоўваць вынікі агрэгавання часовымі вокнамі. У прыватнасці, хацелася б, каб вы запомнілі з гэтага раздзела наступныя тры рэчы:

  • памер сеансавых вокнаў абмяжоўваецца не прамежкам часу, а актыўнасцю карыстальнікаў;
  • «куляюцца» вокны даюць уяўленне пра падзеі ў рамках зададзенага перыяду часу;
  • працягласць працы «скачуць» вокнаў фіксаваная, але яны часта абнаўляюцца і могуць утрымоўваць ва ўсіх вокнах якія перасякаюцца запісы.

Далей мы даведаемся, як пераўтварыць KTable назад у KStream для злучэння.

5.3.3. Злучэнне аб'ектаў KStream і KTable

У раздзеле 4 мы абмяркоўвалі злучэнне двух аб'ектаў KStream. Цяпер нам трэба будзе навучыцца злучаць KTable і KStream. Спатрэбіцца гэта можа па наступным простым чынніку. KStream - струмень запісаў, а KTable - струмень абнаўленняў запісаў, але часам можа быць трэба дадаць дадатковы кантэкст да струменя запісаў з дапамогай абнаўленняў з KTable.

Возьмем дадзеныя аб колькасці біржавых транзакцый і злучым іх з біржавымі навінамі па адпаведных галінах прамысловасці. Вось што трэба зрабіць, што дабіцца гэтага з улікам ужо наяўнага кода.

  1. Пераўтварыць аб'ект KTable з дадзенымі аб колькасці біржавых транзакцый у KStream з наступнай заменай ключа на ключ, які пазначае галіну прамысловасці, якая адпавядае дадзенаму знаку акцый.
  2. Стварыць аб'ект KTable, які чытае дадзеныя з топіка з біржавымі навінамі. Гэты новы KTable будзе катэгарызаваць па галінах прамысловасці.
  3. Злучыць абнаўленні навін з інфармацыяй аб колькасці біржавых транзакцый па галінах прамысловасці.

Цяпер паглядзім, як рэалізаваць гэты план дзеянняў.

Пераўтварэнне KTable ў KStream

Для пераўтварэння KTable у KStream неабходна зрабіць наступнае.

  1. Выклікаць метад KTable.toStream().
  2. З дапамогай выкліку метаду KStream.map замяніць ключ назвай галіны прамысловасці, пасля чаго выняць з асобніка Windowed аб'ект TransactionSummary.

Мы звяжам гэтыя аперацыі ланцужком наступным чынам (код можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (лістынг 5.8).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Паколькі мы выконваем аперацыю KStream.map, то паўторнае секцыянаванне для які вяртаецца асобніка KStream вырабляецца аўтаматычна пры ім выкарыстанні ў злучэнні.

Мы завяршылі працэс пераўтварэнні, далей нам трэба стварыць аб'ект KTable для чытання біржавых навін.

Стварэнне KTable для біржавых навін

На шчасце, для стварэння аб'екта KTable дастаткова аднаго радка кода (гэты код можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (лістынг 5.9).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Варта адзначыць, што ніякіх аб'ектаў Serde паказваць не патрабуецца, паколькі ў наладах выкарыстоўваюцца радковыя Serde. Таксама дзякуючы прымяненню пераліку EARLIEST табліца запаўняецца запісамі ў самым пачатку.

Цяпер мы можам перайсці да заключнага кроку – злучэння.

Злучэнне абнаўленняў навін са звесткамі аб ліку транзакцый

Стварэнне злучэння не ўяўляе складанасцей. Мы скарыстаемся левым злучэннем на выпадак, калі па адпаведнай галіне прамысловасці няма біржавых навін (патрэбны код можна знайсці ў файле src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (лістынг 5.10).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Гэты аператар leftJoin дастаткова просты. У адрозненне ад злучэнняў з раздзела 4, метад JoinWindow не выкарыстоўваецца, паколькі пры выкананні злучэння KStream-KTable для кожнага ключа ў KTable прысутнічае толькі адзін запіс. Такое злучэнне не абмяжоўваецца па часе: запіс ці ёсць у KTable, ці адсутнічае. Асноўная выснова: з дапамогай аб'ектаў KTable можна ўзбагачаць KStream радзей якія абнаўляюцца даведкавымі дадзенымі.

А зараз мы разгледзім больш эфектыўны спосаб узбагачэння падзей з KStream.

5.3.4. Аб'екты GlobalKTable

Як вы зразумелі, існуе неабходнасць узбагачэння патокаў падзей або дадання да іх кантэксту. У чале 4 вы бачылі злучэнні двух аб'ектаў KStream, а ў папярэдняй частцы - злучэнне KStream і KTable. Ва ўсіх гэтых выпадках неабходна паўторнае секцыянаванне струменя дадзеных пры адлюстраванні ключоў на новы тып або значэнне. Часам паўторнае секцыянаванне выконваецца відавочнай выявай, а часам Kafka Streams робіць гэта аўтаматычна. Паўторнае секцыянаванне неабходна, паколькі ключы змяніліся і запісы павінны апынуцца ў новых секцыях, інакш злучэнне апынецца немагчымым (гэта абмяркоўвалася ў главе 4, у пункце "Паўторнае секцыянаванне дадзеных" падраздзела 4.2.4).

Паўторнае секцыянаванне мае свой кошт

Паўторнае секцыянаванне патрабуе выдаткаў - дадатковых выдаткаў рэсурсаў на стварэнне прамежкавых топікаў, захаванне якія дублююцца дадзеных у яшчэ адным топіцы; яно таксама азначае падвышэнне затрымкі з прычыны запісу і чытанні з гэтага топіка. Акрамя таго, пры неабходнасці выканаць злучэнне больш за па адным аспекце або вымярэнні трэба арганізаваць злучэнні ланцужком, адлюстраваць запісы з новымі ключамі і зноў правесці працэс паўторнага секцыянавання.

Злучэнне з наборамі дадзеных меншага памеру

У некаторых выпадках аб'ём даведачных дадзеных, з якімі плануецца злучэнне, адносна невялікі, так што поўныя іх копіі могуць змясціцца лакальна на кожным з вузлоў. Для падобных сітуацый у Kafka Streams прадугледжаны клас GlobalKTable.

Асобнікі GlobalKTable унікальныя, паколькі прыкладанне рэплікуе ўсе дадзеныя на кожны з вузлоў. А паколькі на кожным з вузлоў прысутнічаюць усе дадзеныя, няма неабходнасці секцыянаваць струмень падзей па ключы даведачных дадзеных, каб ён быў даступны ўсім секцыям. З дапамогай аб'ектаў GlobalKTable можна таксама выконваць бясключавыя злучэнні. Вернемся да аднаго з папярэдніх прыкладаў для дэманстрацыі гэтай магчымасці.

Злучэнне аб'ектаў KStream з аб'ектамі GlobalKTable

У падраздзеле 5.3.2 мы выканалі аконнае агрэгаванне біржавых транзакцый па пакупніках. Вынікі гэтага агрэгавання выглядалі прыкладна наступным чынам:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Хоць гэтыя вынікі адпавядалі пастаўленай мэце, было б зручней, калі б выводзілася таксама імя кліента і поўная назва кампаніі. Каб дадаць імя пакупніка і назоў кампаніі, можна выконваць звычайныя злучэнні, але пры гэтым спатрэбіцца вырабіць два адлюстравання ключоў і паўторнае секцыянаванне. З дапамогай GlobalKTable можна пазбегнуць выдаткаў на падобныя аперацыі.

Для гэтага мы скарыстаемся аб'ектам countStream з лістынга 5.11 (адпаведны код можна знайсці ў файле src/main/java/bbejeck/chapter_5/GlobalKTableExample.java), злучыўшы яго з двума аб'ектамі GlobalKTable.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Мы ўжо абмяркоўвалі гэта раней, так што не буду паўтарацца. Але адзначу, што код у функцыі toStream().map дзеля удобочитаемости абстрагаваны ў аб'ект-функцыю замест убудаванага лямбда-выразы.

Наступны этап - аб'ява двух асобнікаў GlobalKTable (прыведзены код можна знайсці ў файле src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (лістынг 5.12).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»

Звярніце ўвагу, што назвы топікаў апісваюцца з дапамогай пералічаных тыпаў.

Цяпер, калі мы падрыхтавалі ўсе кампаненты, засталося напісаць код для злучэння (які можна знайсці ў файле src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (лістынг 5.13).

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
Хоць у гэтым кодзе прысутнічаюць два злучэнні, яны арганізаваны ў выглядзе ланцужка, паколькі асобна ніводны з іх вынікаў не выкарыстоўваецца. Вынікі выводзяцца напрыканцы ўсёй аперацыі.

Пры запуску вышэйпрыведзенай аперацыі злучэння вы атрымаеце вынікі наступнага выгляду:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Іста не змянілася, але гэтыя вынікі выглядаюць больш зразумела.

Калі лічыць раздзел 4, вы ўжо бачылі некалькі тыпаў злучэнняў у дзеянні. Яны пералічаны ў табл. 5.2. Гэтая табліца адлюстроўвае магчымасці злучэння, актуальныя для версіі Kafka Streams 1.0.0; у будучых выпусках, магчыма, нешта памяняецца.

Кніга «Kafka Streams у дзеянні. Прыкладанні і мікрасэрвісы для працы ў рэальным часе»
У заключэнне нагадаю асноўнае: вы можаце злучаць струмені падзей (KStream) і струмені абнаўленняў (KTable) з дапамогай лакальнага стану. Акрамя таго, калі памер даведачных дадзеных не занадта вялікі, можна скарыстацца аб'ектам GlobalKTable. GlobalKTable рэплікуюць усе секцыі на кожны з вузлоў прыкладання Kafka Streams, забяспечваючы тым самым даступнасць усіх дадзеных незалежна ад таго, які секцыі адпавядае ключ.

Далей мы ўбачым магчымасць Kafka Streams, дзякуючы якой можна назіраць змены стану без спажывання дадзеных з топіка Kafka.

5.3.5. Даступны для запытаў стан

Мы ўжо выконвалі некалькі аперацый з удзелам стану і заўсёды выводзілі вынікі ў кансоль (для мэт распрацоўкі) або запісвалі іх у топік (для мэт прамысловай эксплуатацыі). Пры запісе вынікаў у топік даводзіцца выкарыстоўваць спажывец Kafka для іх прагляду.

Чытанне дадзеных з гэтых топікаў можна лічыць разнавіднасцю матэрыялізаваных уяўленняў (materialized views). Для нашых задач можна выкарыстоўваць вызначэнне матэрыялізаванага прадстаўлення з «Вікіпедыі»: «…фізічны аб'ект базы дадзеных, які змяшчае вынікі выканання запыту. Напрыклад, яно можа быць лакальнай копіяй выдаленых дадзеных, ці падмноствам радкоў і/ці слупкоў табліцы ці вынікаў злучэння, ці зводнай табліцай, атрыманай з дапамогай агрэгавання» (https://en.wikipedia.org/wiki/Materialized_view).

Kafka Streams таксама дазваляе выконваць інтэрактыўныя запыты (interactive queries) да сховішчаў стану, што дае магчымасць непасрэднага чытання гэтых матэрыялізаваных уяўленняў. Важна адзначыць, што запыт да сховішча стану носіць характар ​​аперацыі "толькі для чытання". Дзякуючы гэтаму вы можаце не баяцца выпадкова зрабіць стан няўзгодненым падчас апрацоўкі дадзеных дадаткам.

Магчымасць непасрэдных запытаў да сховішчаў стану мае вялікае значэнне. Яна значыць, што можна ствараць прыкладанні - інфармацыйныя панэлі без неабходнасці спачатку атрымліваць дадзеныя ад спажыўца Kafka. Падвышае яна і эфектыўнасць прыкладання, дзякуючы таму што не патрабуецца зноў запісваць дадзеныя:

  • дзякуючы лакальнасці дадзеных да іх можна хутка звярнуцца;
  • выключаецца дубляванне дадзеных, паколькі яны не запісваюцца ў вонкавае сховішча.

Галоўнае, што я хацеў бы, каб вы запомнілі: можна напрамую выконваць запыты да стану з прыкладання. Нельга пераацаніць магчымасці, якія вам гэта дае. Замест таго каб спажываць дадзеныя з Kafka і захоўваць запісы ў базе дадзеных для прыкладання, можна выконваць запыты да сховішчаў стану з тым жа вынікам. Непасрэдныя запыты да сховішчаў стану азначаюць меншы аб'ём кода (адсутнасць спажыўца) і менш праграмнага забеспячэння (адсутнасць запатрабавання ў табліцы базы дадзеных для захоўвання вынікаў).

Мы ахапілі немалы аб'ём інфармацыі ў гэтым раздзеле, таму на час спынім наша абмеркаванне інтэрактыўных запытаў да сховішчаў стану. Але не хвалюйцеся: у главе 9 мы будзем ствараць простае прыкладанне - інфармацыйную панэль з інтэрактыўнымі запытамі. Для дэманстрацыі інтэрактыўных запытаў і магчымасцяў іх дадання ў прыкладанні Kafka Streams у ім будуць выкарыстоўвацца некаторыя з прыкладаў гэтай і папярэдніх раздзелаў.

Рэзюмэ

  • Аб'екты KStream увасабляюць струмені падзей, параўнальныя са ўстаўкамі ў базу дадзеных. Аб'екты KTable увасабляюць струмені абнаўленняў, яны больш падобныя з абнаўленнямі ў базе дадзеных. Памер аб'екта KTable не расце, старыя запісы замяняюцца новымі.
  • Аб'екты KTable неабходныя для аперацый агрэгавання.
  • З дапамогай аконных аперацый можна разбіць агрэгаваныя дадзеныя па часавых кошыках.
  • Дзякуючы аб'ектам GlobalKTable можна атрымаць доступ да даведачных дадзеных у любым пункце прыкладання, незалежна ад разбіцця па секцыях.
  • Магчымы злучэнні паміж сабой аб'ектаў KStream, KTable і GlobalKTable.

Да гэтага часу мы канцэнтравалі ўвагу на стварэнні прыкладанняў Kafka Streams з дапамогай высокаўзроўневага DSL KStream. Хоць высокаўзроўневы падыход дазваляе ствараць акуратныя і лаканічныя праграмы, яго выкарыстанне ўяўляе сабой пэўны кампраміс. Праца з DSL KStream азначае павышэнне лаканічнасці кода за кошт зніжэння ступені кантролю. У наступным раздзеле мы разгледзім нізкаўзроўневы API вузлоў-апрацоўшчыкаў і паспрабуем іншыя кампрамісы. Праграмы стануць даўжэй, чым былі да гэтага часу, затое ў нас з'явіцца магчымасць стварэння практычна любога вузла-апрацоўшчыка, які толькі можа нам спатрэбіцца.

→ Больш падрабязна з кнігай можна азнаёміцца ​​на сайце выдавецтва

→ Для Хаброжыцеляў зніжка 25% па купоне Kafka Streams

→ Па факце аплаты папяровай версіі кнігі на e-mail высылаецца электронная кніга.

Крыніца: habr.com

Дадаць каментар