«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер» Сәлем, Хабро тұрғындары! Бұл кітап ағынды өңдеуді түсінгісі келетін кез келген әзірлеушіге жарамды. Бөлінген бағдарламалауды түсіну сізге Кафка мен Кафка ағындарын жақсырақ түсінуге көмектеседі. Кафка құрылымының өзін білу жақсы болар еді, бірақ бұл қажет емес: мен сізге қажет нәрсенің бәрін айтамын. Тәжірибелі Кафка әзірлеушілері де, жаңадан бастаушылар да осы кітапта Kafka Streams кітапханасын пайдаланып қызықты ағынды өңдеу қосымшаларын жасауды үйренеді. Серияландыру сияқты ұғымдармен бұрыннан таныс орта және жоғары деңгейлі Java әзірлеушілері Kafka Streams қосымшаларын жасау үшін өз дағдыларын қолдануды үйренеді. Кітаптың бастапқы коды Java 8 тілінде жазылған және Java 8 lambda өрнек синтаксисін айтарлықтай пайдаланады, сондықтан lambda функцияларымен (тіпті басқа бағдарламалау тілінде) жұмыс істеуді білу пайдалы болады.

Үзінді. 5.3. Агрегация және терезелеу операциялары

Бұл бөлімде біз Кафка ағындарының ең перспективалы бөліктерін зерттеуге көшеміз. Осы уақытқа дейін біз Кафка ағындарының келесі аспектілерін қарастырдық:

  • өңдеу топологиясын құру;
  • ағынды қолданбаларда күйді пайдалану;
  • деректер ағынының қосылымдарын орындау;
  • оқиғалар ағындары (KStream) және жаңарту ағындары (KTable) арасындағы айырмашылықтар.

Келесі мысалдарда біз осы элементтердің барлығын біріктіреміз. Сондай-ақ, сіз ағынды қолданбалардың тағы бір тамаша мүмкіндігін терезелеу туралы білесіз. Біздің бірінші мысал қарапайым жинақтау болады.

5.3.1. Өнеркәсіп секторы бойынша акцияларды сатуды жинақтау

Ағынды деректермен жұмыс істеу кезінде біріктіру және топтау маңызды құрал болып табылады. Жеке жазбаларды тексеру көбіне жеткіліксіз. Деректерден қосымша ақпаратты алу үшін оларды топтастыру және біріктіру қажет.

Бұл мысалда сіз бірнеше саладағы компаниялардың акцияларының сатылым көлемін бақылауы қажет күнделікті трейдердің костюмін киесіз. Нақтырақ айтқанда, сізді әр салада ең көп сату үлесі бар бес компания қызықтырады.

Мұндай жинақтау деректерді қажетті пішінге (жалпы сөзбен айтқанда) аудару үшін келесі бірнеше қадамдарды қажет етеді.

  1. Шикізат саудасы туралы ақпаратты жариялайтын тақырыпқа негізделген дереккөзді жасаңыз. StockTransaction түріндегі нысанды ShareVolume түріндегі нысанмен салыстыруымыз керек. Мәселе мынада, StockTransaction нысанында сату метадеректері бар, бірақ бізге тек сатылатын акциялар саны туралы деректер қажет.
  2. ShareVolume деректерін акция белгісі бойынша топтаңыз. Таңба бойынша топтастырылғаннан кейін бұл деректерді акцияларды сату көлемдерінің аралық жиындарына жиюге болады. Айта кетейік, KStream.groupBy әдісі KGroupedStream түріндегі дананы қайтарады. KGroupedStream.reduce әдісін одан әрі шақыру арқылы KTable данасын алуға болады.

KGroupedStream интерфейсі дегеніміз не

KStream.groupBy және KStream.groupByKey әдістері KGroupedStream данасын қайтарады. KGroupedStream – пернелер бойынша топтастырудан кейінгі оқиғалар ағынының аралық көрінісі. Онымен тікелей жұмыс істеуге мүлдем арналмаған. Оның орнына KGroupedStream біріктіру операциялары үшін пайдаланылады, ол әрқашан KTable-ге әкеледі. Ал біріктіру әрекеттерінің нәтижесі KTable болғандықтан және олар мемлекеттік қойманы пайдаланатындықтан, нәтижесінде барлық жаңартулар құбырдан әрі қарай жіберілмеуі мүмкін.

KTable.groupBy әдісі ұқсас KGroupedTable - кілт арқылы қайта топтастырылған жаңартулар ағынының аралық көрінісін қайтарады.

Кішкене үзіліс жасап, суретке назар аударайық. 5.9, бұл біздің қол жеткізгенімізді көрсетеді. Бұл топология сізге бұрыннан таныс болуы керек.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Енді осы топологияның кодын қарастырайық (оны src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлынан табуға болады) (5.2 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Берілген код өзінің қысқалығымен және бірнеше жолда орындалатын әрекеттердің үлкен көлемімен ерекшеленеді. Builder.stream әдісінің бірінші параметрінде жаңа нәрсені байқауыңыз мүмкін: Consumed.withOffsetResetPolicy әдісі арқылы орнатылған AutoOffsetReset.EARLIEST (сонымен қатар LATEST бар) нөмір түрінің мәні. Бұл санау түрі әрбір KStream немесе KTable үшін ығысуды қалпына келтіру стратегиясын көрсету үшін пайдаланылуы мүмкін және конфигурациядағы ығысуды қалпына келтіру опциясынан басымдыққа ие болады.

GroupByKey және GroupBy

KStream интерфейсінде жазбаларды топтастырудың екі әдісі бар: GroupByKey және GroupBy. Екеуі де KGroupedTable қайтарады, сондықтан олардың арасындағы айырмашылық неде және қайсысын қашан пайдалану керек деген сұрақ туындауы мүмкін.

GroupByKey әдісі KStream ішіндегі кілттер әлдеқашан бос болмаған кезде пайдаланылады. Ең бастысы, «қайта бөлуді қажет етеді» жалауы ешқашан орнатылмаған.

GroupBy әдісі топтау пернелерін өзгерткеніңізді болжайды, сондықтан қайта бөлу жалаушасы шын мәніне орнатылады. GroupBy әдісінен кейін біріктірулерді, біріктірулерді және т.б. орындау автоматты түрде қайта бөлуге әкеледі.
Түйіндеме: Мүмкіндігінше GroupBy емес, GroupByKey пайдалануыңыз керек.

mapValues ​​және groupBy әдістерінің не істейтіні түсінікті, сондықтан sum() әдісін қарастырайық (src/main/java/bbejeck/model/ShareVolume.java) (5.3 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
ShareVolume.sum әдісі акция сату көлемінің ағымдағы жиынын қайтарады және барлық есептеулер тізбегінің нәтижесі KTable нысаны болып табылады . Енді сіз KTable ойнайтын рөлді түсінесіз. ShareVolume нысандары келгенде, сәйкес KTable нысаны соңғы ағымдағы жаңартуды сақтайды. Барлық жаңартулар алдыңғы shareVolumeKTable файлында көрсетілетінін есте ұстаған жөн, бірақ барлығы әрі қарай жіберілмейді.

Содан кейін біз әрбір салада сатылатын акциялардың ең жоғары көлемі бар бес компанияға жету үшін осы KTable деректерін (сатылған акциялар саны бойынша) пайдаланамыз. Бұл жағдайда біздің әрекеттеріміз бірінші жинақтау әрекеттеріне ұқсас болады.

  1. Жеке ShareVolume нысандарын сала бойынша топтау үшін басқа groupBy әрекетін орындаңыз.
  2. ShareVolume нысандарын қорытындылауды бастаңыз. Бұл жолы жинақтау нысаны тіркелген өлшемді басымдық кезегі болып табылады. Бұл белгіленген мөлшердегі кезекте акцияларының ең көп сатылған бес компаниясы ғана сақталады.
  3. Алдыңғы абзацтағы кезектерді жол мәніне салыңыз және сала бойынша саны бойынша ең көп сатылатын бес үздік акцияларды қайтарыңыз.
  4. Нәтижелерді тақырыпқа жол түрінде жазыңыз.

Суретте. 5.10-суретте деректер ағынының топологиясының графигі көрсетілген. Көріп отырғаныңыздай, өңдеудің екінші кезеңі өте қарапайым.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Енді біз өңдеудің осы екінші раундының құрылымын нақты түсіндік, біз оның бастапқы кодына жүгіне аламыз (оны src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java файлынан таба аласыз) (5.4 тізімі) .

Бұл инициализаторда fixedQueue айнымалысы бар. Бұл реттелетін нысан, ол Java.util.TreeSet адаптері болып табылады, ол саудаланатын акциялардың кему реті бойынша N үздік нәтижелерді бақылау үшін пайдаланылады.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Сіз groupBy және mapValues ​​қоңырауларын көрдіңіз, сондықтан біз оларға кірмейміз (KTable.toStream әдісін шақырамыз, себебі KTable.print әдісі ескірген). Бірақ сіз әлі aggregate() KTable нұсқасын көрген жоқсыз, сондықтан біз оны талқылауға аз уақыт жұмсаймыз.

Естеріңізде болса, KTable бағдарламасының айырмашылығы сол кілттері бар жазбалар жаңартулар болып саналады. KTable ескі жазбаны жаңасымен ауыстырады. Агрегация ұқсас жолмен жүреді: бірдей кілті бар соңғы жазбалар біріктіріледі. Жазба келгенде, ол қосқыш арқылы FixedSizePriorityQueue класының данасына қосылады (жиынтық әдіс шақыруындағы екінші параметр), бірақ сол кілтпен басқа жазба бұрыннан бар болса, ескі жазба азайтуды (үшінші параметр жиынтық әдісті шақыру).

Мұның бәрі FixedSizePriorityQueue агрегаторы барлық мәндерді бір кілтпен біріктірмейтінін, бірақ акциялардың ең көп сатылатын N түрлерінің жылжымалы сомасын сақтайтынын білдіреді. Әрбір кіріс жазба осы уақытқа дейін сатылған акциялардың жалпы санын қамтиды. KTable сізге әрбір жаңартуды жылжымалы біріктіруді қажет етпестен, қазіргі уақытта қай компаниялардың акциялары ең көп сатылатындығы туралы ақпаратты береді.

Біз екі маңызды нәрсені істеуді үйрендік:

  • KTable-дегі мәндерді ортақ кілт арқылы топтастыру;
  • осы топталған мәндерде жинақтау және біріктіру сияқты пайдалы әрекеттерді орындаңыз.

Бұл әрекеттерді орындау жолын білу Kafka Streams қолданбасы арқылы қозғалатын деректердің мағынасын түсіну және оның қандай ақпаратты тасымалдайтынын түсіну үшін маңызды.

Біз сондай-ақ осы кітапта бұрын талқыланған кейбір негізгі ұғымдарды біріктірдік. 4-тарауда біз қатеге төзімді, жергілікті күйдің ағынды қолданба үшін қаншалықты маңызды екенін талқыладық. Осы тараудағы бірінші мысал жергілікті мемлекеттің неліктен соншалықты маңызды екенін көрсетті — ол сізге бұрыннан көрген ақпаратты бақылау мүмкіндігін береді. Жергілікті қол жеткізу желілік кідірістерді болдырмайды, бұл қолданбаны өнімдірек және қателерге төзімді етеді.

Кез келген жинақтау немесе біріктіру әрекетін орындаған кезде, сіз мемлекеттік қойманың атын көрсетуіңіз керек. Жинақтау және біріктіру әрекеттері KTable данасын қайтарады және KTable ескі нәтижелерді жаңасымен ауыстыру үшін күй жадын пайдаланады. Көріп отырғаныңыздай, барлық жаңартулар құбырға жіберілмейді және бұл маңызды, себебі біріктіру әрекеттері жиынтық ақпаратты шығаруға арналған. Жергілікті күйді қолданбасаңыз, KTable барлық жинақтау және жинақтау нәтижелерін қайта жібереді.

Әрі қарай, белгілі бір уақыт аралығында біріктіру сияқты операцияларды - терезелеу операциялары деп аталатын әрекеттерді қарастырамыз.

5.3.2. Терезе операциялары

Алдыңғы бөлімде біз жылжымалы конволюция мен біріктіруді таныстырдық. Қолданба биржадағы ең көп сатылатын бес акцияны біріктіру арқылы акцияларды сату көлемін үздіксіз көбейтуді орындады.

Кейде осындай үздіксіз жинақтау және нәтижелерді жинақтау қажет. Ал кейде операцияларды тек белгілі бір уақыт аралығында орындау қажет. Мысалы, соңғы 10 минут ішінде белгілі бір компанияның акцияларымен қанша айырбастау операциясы жасалғанын есептеңіз. Немесе соңғы 15 минутта қанша пайдаланушы жаңа жарнамалық баннерді басқаны. Қолданба мұндай әрекеттерді бірнеше рет орындауы мүмкін, бірақ нәтижелер тек белгілі бір уақыт кезеңдері үшін қолданылады (уақыт терезелері).

Сатып алушы бойынша айырбастау операцияларын санау

Келесі мысалда біз ірі ұйымдар немесе ақылды жеке қаржыгерлер сияқты бірнеше трейдерлердің акцияларының транзакцияларын бақылаймыз.

Бұл бақылаудың екі ықтимал себебі бар. Олардың бірі - нарық көшбасшыларының не сатып алатынын/сататынын білу қажеттілігі. Егер бұл ірі ойыншылар мен күрделі инвесторлар мүмкіндікті көрсе, олардың стратегиясын ұстанған жөн. Екінші себеп - заңсыз инсайдерлік сауданың кез келген ықтимал белгілерін анықтауға деген ұмтылыс. Мұны істеу үшін сізге маңызды пресс-релиздермен үлкен сатылым өсімінің корреляциясын талдау қажет болады.

Мұндай бақылау келесі қадамдардан тұрады:

  • биржалық операциялар тақырыбын оқу үшін ағын құру;
  • сатып алушы идентификаторы және акция белгісі бойынша кіріс жазбаларды топтау. groupBy әдісін шақыру KGroupedStream сыныбының данасын қайтарады;
  • KGroupedStream.windowedBy әдісі терезелі біріктіруге мүмкіндік беретін уақыт терезесімен шектелген деректер ағынын қайтарады. Терезе түріне байланысты TimeWindowedKStream немесе SessionWindowedKStream қайтарылады;
  • біріктіру операциясы үшін транзакциялар саны. Терезелік деректер ағыны осы есепте белгілі бір жазбаның ескерілетінін анықтайды;
  • тақырыпқа нәтижелерді жазу немесе әзірлеу кезінде оларды консольге шығару.

Бұл қолданбаның топологиясы қарапайым, бірақ оның нақты суреті пайдалы болар еді. Суретке назар аударайық. 5.11.

Әрі қарай, біз терезе әрекеттерінің функционалдығын және сәйкес кодты қарастырамыз.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»

Терезе түрлері

Кафка ағындарында терезелердің үш түрі бар:

  • сессиялық;
  • «төңкеру» (төбелесу);
  • сырғанау/секіру.

Қайсысын таңдау сіздің бизнес талаптарыңызға байланысты. Айналдыру және секіру терезелері уақытпен шектеледі, ал сеанс терезелері пайдаланушы әрекетімен шектеледі — сеанс(лар) ұзақтығы тек пайдаланушының қаншалықты белсенді екендігімен анықталады. Есте сақтау керек ең бастысы, барлық терезе түрлері жүйе уақытына емес, жазбалардың күн/уақыт белгілеріне негізделген.

Әрі қарай, біз терезе түрлерінің әрқайсысымен топологиямызды жүзеге асырамыз. Толық код бірінші мысалда ғана беріледі, басқа терезе түрлері үшін терезе әрекетінің түрінен басқа ештеңе өзгермейді.

Сеанс терезелері

Сеанс терезелері терезелердің барлық басқа түрлерінен айтарлықтай ерекшеленеді. Олар уақыт бойынша емес, пайдаланушының әрекетімен (немесе сіз бақылағыңыз келетін нысанның әрекетімен) шектеледі. Сеанс терезелері әрекетсіздік кезеңдерімен шектеледі.

5.12-сурет сеанс терезелерінің түсінігін көрсетеді. Кішірек сеанс сол жағындағы сеанспен біріктіріледі. Ал оң жақтағы сеанс бөлек болады, себебі ол ұзақ әрекетсіздік кезеңінен кейін жүреді. Сеанс терезелері пайдаланушы әрекетіне негізделген, бірақ жазбаның қай сеансқа жататынын анықтау үшін жазбалардағы күн/уақыт белгілерін пайдаланыңыз.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»

Биржа операцияларын бақылау үшін сеанс терезелерін пайдалану

Айырбастау операциялары туралы ақпаратты алу үшін сеанс терезелерін қолданайық. Сеанс терезелерінің орындалуы 5.5 листингінде көрсетілген (оны src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java ішінен табуға болады).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Сіз осы топологиядағы операциялардың көпшілігін көрдіңіз, сондықтан оларды осы жерде қайта қараудың қажеті жоқ. Бірақ біз қазір талқылайтын бірнеше жаңа элементтер бар.

Кез келген groupBy әрекеті әдетте қандай да бір жинақтау әрекетін (агрегация, жинақтау немесе санау) орындайды. Орындалатын жиынтықпен жиынтық біріктіруді немесе белгіленген уақыт терезесінде жазбаларды есепке алатын терезені біріктіруді орындауға болады.

5.5 листингіндегі код сеанс терезелеріндегі транзакциялар санын есептейді. Суретте. 5.13 Бұл әрекеттер кезең-кезеңімен талданады.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) шақыру арқылы біз әрекетсіздік аралығы 20 секунд және тұрақтылық аралығы 15 минут болатын сеанс терезесін жасаймыз. 20 секундтық бос уақыт аралығы қолданбаның ағымдағы сеанс аяқталғаннан немесе басталғаннан кейін 20 секунд ішінде келетін кез келген жазбаны ағымдағы (белсенді) сеансқа қосатынын білдіреді.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Әрі қарай, біз сеанс терезесінде қандай біріктіру операциясын орындау керектігін көрсетеміз - бұл жағдайда санау. Кіріс жазба әрекетсіздік терезесінің сыртына түссе (күн/уақыт белгісінің кез келген жағы), бағдарлама жаңа сеанс жасайды. Сақтау аралығы сеанстың белгілі бір уақыт мөлшерінде сақталуын білдіреді және сеанстың әрекетсіздік кезеңінен асатын, бірақ әлі де тіркелуі мүмкін кеш деректерге мүмкіндік береді. Сонымен қатар, біріктіру нәтижесінде пайда болатын жаңа сеанстың басталуы мен аяқталуы ең ерте және соңғы күн/уақыт белгісіне сәйкес келеді.

Сеанстардың қалай жұмыс істейтінін көру үшін санау әдісінен бірнеше жазбаларды қарастырайық (5.1-кесте).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Жазбалар келгенде, біз бірдей кілтпен бар сеанстарды іздейміз, аяқталу уақыты ағымдағы күн/уақыт белгісінен аз - әрекетсіздік аралығы және ағымдағы күн/уақыт белгісінен үлкен басталу уақыты + әрекетсіздік аралығы. Осыны ескере отырып, кестеден төрт жазба. 5.1 келесідей бір сеансқа біріктіріледі.

1. 1-жазба бірінші келеді, сондықтан басталу уақыты аяқталу уақытына тең және 00:00:00.

2. Содан кейін 2-ші жазба келеді және біз 23:59:55-тен ерте аяқталмайтын және 00:00:35-тен кешіктірмей басталатын сеанстарды іздейміз. Біз 1 жазбаны тауып, 1 және 2 сеанстарды біріктіреміз. Жаңа сессия 1:2:00-де басталып, 00-де аяқталуы үшін 00-сеанстың басталу уақытын (бұрынғы) және 00-сеанстың аяқталу уақытын (кейінірек) аламыз: 00:15.

3. 3 жазба келеді, біз 00:00:30 және 00:01:10 аралығындағы сеанстарды іздейміз және ешқайсысын таппаймыз. 123:345:654-де басталып, аяқталатын 00-00-50,FFBE кілті үшін екінші сеансты қосыңыз.

4. 4 жазба келді және біз 23:59:45 пен 00:00:25 арасындағы сеанстарды іздейміз. Бұл жолы 1 және 2 сеанстардың екеуі де табылды. Барлық үш сеанс бір сеансқа біріктірілген, басталу уақыты 00:00:00 және аяқталу уақыты 00:00:15.

Осы бөлімде сипатталғандардан келесі маңызды нюанстарды есте ұстаған жөн:

  • сеанстар бекітілген өлшемді терезелер емес. Сеанстың ұзақтығы белгілі бір уақыт аралығындағы әрекетпен анықталады;
  • Деректердегі күн/уақыт таңбалары оқиғаның бар сеансқа немесе бос кезеңге сәйкес келетінін анықтайды.

Әрі қарай біз терезенің келесі түрін - «құлдырғыш» терезелерді талқылаймыз.

«Төңкерілген» терезелер

Айналмалы терезелер белгілі бір уақыт аралығындағы оқиғаларды түсіреді. Әр 20 секунд сайын белгілі бір компанияның барлық акцияларын түсіріп алу керек деп елестетіп көріңіз, осылайша сіз осы уақыт аралығындағы барлық оқиғаларды жинайсыз. 20 секундтық интервалдың соңында терезе айналады және жаңа 20 секундтық бақылау аралығына ауысады. 5.14-сурет бұл жағдайды көрсетеді.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Көріп отырғаныңыздай, соңғы 20 секундта алынған барлық оқиғалар терезеге енгізілген. Осы уақыт кезеңінің соңында жаңа терезе жасалады.

5.6 листинг әр 20 секунд сайын биржалық транзакцияларды түсіру үшін құлдырау терезелерін пайдалануды көрсететін кодты көрсетеді (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ішінде табылған).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
TimeWindows.of әдісінің шақыруындағы осы шағын өзгерту арқылы сіз бұрмалау терезесін пайдалана аласыз. Бұл мысал дейін () әдісін шақырмайды, сондықтан 24 сағаттық әдепкі сақтау аралығы пайдаланылады.

Ақырында, терезе опцияларының соңғысына көшудің уақыты келді - «секіру» терезелері.

Жылжымалы («секіру») терезелер

Жылжымалы/секіргіш терезелер құлататын терезелерге ұқсас, бірақ шамалы айырмашылығы бар. Жылжымалы терезелер соңғы оқиғаларды өңдеу үшін жаңа терезе жасамас бұрын уақыт аралығының аяқталуын күтпейді. Олар жаңа есептеулерді терезе ұзақтығынан аз күту аралығынан кейін бастайды.

Терезелердің құлдырауы мен секіруі арасындағы айырмашылықты көрсету үшін биржалық операцияларды санау мысалына оралайық. Біздің мақсатымыз әлі де транзакциялар санын санау, бірақ есептегішті жаңартудан бұрын бүкіл уақытты күткіміз келмейді. Оның орнына біз есептегішті қысқа уақыт аралығымен жаңартамыз. Мысалы, біз әлі де транзакциялар санын 20 секунд сайын санайтын боламыз, бірақ 5 секунд сайын есептегішті жаңартып отырамыз, суретте көрсетілгендей. 5.15. Бұл жағдайда біз деректердің қабаттасуы бар үш нәтиже терезесін аламыз.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Листинг 5.7 жылжымалы терезелерді анықтау кодын көрсетеді (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ішінен табылған).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Қоңырауды advanceBy() әдісіне қосу арқылы құлап бара жатқан терезені секіргіш терезеге түрлендіруге болады. Көрсетілген мысалда сақтау аралығы 15 минутты құрайды.

Сіз осы бөлімде жинақтау нәтижелерін уақыт терезелеріне қалай шектеу керектігін көрдіңіз. Атап айтқанда, осы бөлімнен келесі үш нәрсені есте сақтағаныңызды қалаймын:

  • сеанс терезелерінің өлшемі уақыт кезеңімен емес, пайдаланушы әрекетімен шектеледі;
  • «Төңкерілген» терезелер белгілі бір уақыт аралығындағы оқиғаларға шолуды қамтамасыз етеді;
  • Терезелерді көшіру ұзақтығы бекітілген, бірақ олар жиі жаңартылып отырады және барлық терезелерде бір-біріне сәйкес келетін жазбалар болуы мүмкін.

Әрі қарай, қосылым үшін KTable-ді KStream-ге қайта түрлендіру жолын үйренеміз.

5.3.3. KStream және KTable объектілерін қосу

4-тарауда біз екі KStream нысанын қосуды талқыладық. Енді KTable мен KStream-ді қосуды үйренуіміз керек. Бұл келесі қарапайым себеппен қажет болуы мүмкін. KStream — жазбалар ағыны, ал KTable — жазба жаңартуларының ағыны, бірақ кейде KTable жаңартуларын пайдаланып жазба ағынына қосымша контекст қосқыңыз келуі мүмкін.

Биржалық операциялардың саны туралы деректерді алып, оларды тиісті салаларға арналған биржа жаңалықтарымен біріктірейік. Сізде бар кодты ескере отырып, бұған қол жеткізу үшін не істеу керек.

  1. Акция операцияларының саны туралы деректері бар KTable нысанын KStream түріне түрлендіріңіз, содан кейін кілтті осы акция белгісіне сәйкес сала секторын көрсететін кілтпен ауыстырыңыз.
  2. Биржа жаңалықтары бар тақырыптағы деректерді оқитын KTable нысанын жасаңыз. Бұл жаңа KTable өнеркәсіп секторы бойынша жіктеледі.
  3. Өнеркәсіп секторы бойынша биржалық операциялардың саны туралы ақпаратпен жаңалықтар жаңартуларын қосыңыз.

Енді осы іс-шаралар жоспарын қалай жүзеге асыруға болатынын көрейік.

KTable бағдарламасын KStream түріне түрлендіру

KTable бағдарламасын KStream түріне түрлендіру үшін келесі әрекеттерді орындау қажет.

  1. KTable.toStream() әдісіне қоңырау шалыңыз.
  2. KStream.map әдісін шақыру арқылы кілтті сала атауымен ауыстырыңыз, содан кейін TransactionSummary нысанын Windowed данасынан шығарып алыңыз.

Біз бұл операцияларды төмендегідей тізбектейміз (кодты src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлынан табуға болады) (5.8 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Біз KStream.map әрекетін орындап жатқандықтан, қайтарылған KStream данасы қосылымда пайдаланылған кезде автоматты түрде қайта бөлінеді.

Біз түрлендіру процесін аяқтадық, келесіде биржалық жаңалықтарды оқу үшін KTable нысанын жасау керек.

Биржалық жаңалықтар үшін KTable құру

Бақытымызға орай, KTable нысанын жасау кодтың бір жолын ғана алады (кодты src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java ішінен табуға болады) (5.9 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Айта кету керек, ешқандай Serde нысандарын көрсету қажет емес, өйткені параметрлерде Serdes жолы пайдаланылады. Сондай-ақ, EARLIEST санау арқылы кесте ең басында жазбалармен толтырылады.

Енді біз соңғы қадамға - қосылуға көшеміз.

Жаңалықтар жаңартуларын транзакциялар саны деректерімен қосу

Байланыс құру қиын емес. Тиісті сала үшін қор жаңалықтары болмаған жағдайда сол жақ біріктіруді қолданамыз (қажетті кодты src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java файлынан табуға болады) (Листинг 5.10).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Бұл leftJoin операторы өте қарапайым. 4-тараудағы біріктірулерден айырмашылығы, JoinWindow әдісі қолданылмайды, себебі KStream-KTable біріктіруін орындау кезінде әрбір кілт үшін KTable-де бір ғана жазба бар. Мұндай байланыс уақыт бойынша шектелмейді: жазба не KTable-де, не жоқ. Негізгі қорытынды: KTable нысандарын пайдалану арқылы KStream-ді сирек жаңартылатын анықтамалық деректермен байыта аласыз.

Енді біз KStream оқиғаларын байытудың тиімді жолын қарастырамыз.

5.3.4. GlobalKTable нысандары

Көріп отырғаныңыздай, оқиғалар ағындарын байыту немесе оларға контекст қосу қажет. 4-тарауда сіз екі KStream нысаны арасындағы байланыстарды көрдіңіз, ал алдыңғы бөлімде KStream мен KTable арасындағы байланысты көрдіңіз. Осы жағдайлардың барлығында кілттерді жаңа түрге немесе мәнге салыстыру кезінде деректер ағынын қайта бөлу қажет. Кейде қайта бөлу нақты орындалады, ал кейде Kafka Streams оны автоматты түрде жасайды. Қайта бөлу қажет, себебі кілттер өзгерді және жазбалар жаңа бөлімдерде аяқталуы керек, әйтпесе қосылу мүмкін болмайды (бұл 4-тарауда, 4.2.4-тармақшадағы «Деректерді қайта бөлу» бөлімінде талқыланды).

Қайта бөлу құны бар

Қайта бөлу шығындарды талап етеді - аралық тақырыптарды құру, басқа тақырыпта қайталанатын деректерді сақтау үшін қосымша ресурс шығындары; бұл сонымен қатар осы тақырыпты жазу мен оқуға байланысты кідірістің жоғарылауын білдіреді. Оған қоса, бірнеше аспект немесе өлшем бойынша қосылу қажет болса, біріктірулерді тізбектеп, жазбаларды жаңа кілттермен салыстырып, қайта бөлу процесін қайта іске қосу керек.

Кішірек деректер жиынына қосылу

Кейбір жағдайларда қосылатын анықтамалық деректердің көлемі салыстырмалы түрде аз, сондықтан оның толық көшірмелері әр түйінге оңай орналасады. Осындай жағдайлар үшін Kafka Streams GlobalKTable сыныбын ұсынады.

GlobalKTable даналары бірегей, себебі қолданба барлық деректерді түйіндердің әрқайсысына қайталайды. Және барлық деректер әрбір түйінде болғандықтан, оқиға ағынын барлық бөлімдерге қолжетімді болуы үшін анықтамалық деректер кілті арқылы бөлудің қажеті жоқ. Сондай-ақ GlobalKTable нысандарын пайдаланып кілтсіз қосылулар жасауға болады. Бұл мүмкіндікті көрсету үшін алдыңғы мысалдардың біріне оралайық.

KStream нысандарын GlobalKTable нысандарына қосу

5.3.2-бөлімшеде біз сатып алушылар айырбастау операцияларын терезелік жинақтауды орындадық. Бұл жинақтау нәтижелері келесідей болды:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Бұл нәтижелер мақсатқа сай болғанымен, тұтынушының аты мен толық компания атауы да көрсетілсе, пайдалырақ болар еді. Тұтынушы атын және компания атын қосу үшін қалыпты біріктірулерді орындауға болады, бірақ екі негізгі салыстыруды және қайта бөлуді орындау керек. GlobalKTable көмегімен мұндай операциялардың құнын болдырмауға болады.

Ол үшін біз 5.11 листингіндегі countStream нысанын қолданамыз (тиісті кодты src/main/java/bbejeck/chapter_5/GlobalKTableExample.java ішінен табуға болады) және оны екі GlobalKTable нысанына қосамыз.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Біз бұл туралы бұрын талқылаған болатынбыз, сондықтан мен оны қайталамаймын. Бірақ оқуға ыңғайлы болу үшін toStream().map функциясындағы код кірістірілген лямбда өрнегі орнына функция нысанына абстракцияланғанын ескеремін.

Келесі қадам GlobalKTable екі данасын жариялау болып табылады (көрсетілген кодты src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлында табуға болады) (5.12 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»

Тақырып атаулары санаулы түрлер арқылы сипатталатынын ескеріңіз.

Енді бізде барлық құрамдас бөліктер дайын, тек қосылым кодын жазу ғана қалды (оны src/main/java/bbejeck/chapter_5/GlobalKTableExample.java файлынан табуға болады) (5.13 тізімі).

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Бұл кодта екі біріктіру бар болса да, олардың нәтижелерінің ешқайсысы бөлек пайдаланылмағандықтан, олар тізбектелген. Нәтижелер бүкіл операцияның соңында көрсетіледі.

Жоғарыдағы біріктіру әрекетін іске қосқанда, келесідей нәтижелерді аласыз:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Мәні өзгерген жоқ, бірақ бұл нәтижелер айқынырақ көрінеді.

Егер сіз 4-тарауға дейін кері санасаңыз, қосылымның бірнеше түрін қазірдің өзінде көрдіңіз. Олар кестеде көрсетілген. 5.2. Бұл кесте Kafka Streams 1.0.0 нұсқасындағы қосылым мүмкіндіктерін көрсетеді; Болашақ шығарылымдарда бір нәрсе өзгеруі мүмкін.

«Кафка әрекеттегі ағындар. Нақты уақыттағы жұмыс үшін қолданбалар мен микросервистер»
Қорытындылау үшін, негізгі мәліметтерді қайталап көрейік: жергілікті күйді пайдаланып оқиға ағындарын (KStream) қосуға және ағындарды жаңартуға (KTable) болады. Немесе, егер анықтамалық деректердің өлшемі тым үлкен болмаса, GlobalKTable нысанын пайдалануға болады. GlobalKTables барлық бөлімдерді әрбір Kafka Streams қолданбасының түйініне қайталайды, бұл кілт қай бөлімге сәйкес келетініне қарамастан барлық деректердің қолжетімді болуын қамтамасыз етеді.

Әрі қарай біз Кафка ағындары мүмкіндігін көреміз, соның арқасында біз Кафка тақырыбынан деректерді пайдаланбай күй өзгерістерін бақылай аламыз.

5.3.5. Сұрауға болатын күй

Біз күйге қатысты бірнеше әрекеттерді орындадық және әрқашан нәтижелерді консольге шығарамыз (әзірлеу мақсатында) немесе оларды тақырыпқа жазамыз (өндірістік мақсаттар үшін). Нәтижелерді тақырыпқа жазған кезде оларды көру үшін Кафка тұтынушысын пайдалану керек.

Бұл тақырыптардағы деректерді оқу материалданған көзқарастардың бір түрі деп санауға болады. Біздің мақсаттарымыз үшін біз Википедиядағы материалдандырылған көрініс анықтамасын пайдалана аламыз: «...сұрау нәтижелерін қамтитын физикалық дерекқор нысаны. Мысалы, бұл қашықтағы деректердің жергілікті көшірмесі немесе кестенің жолдары мен/немесе бағандарының ішкі жиыны немесе біріктіру нәтижелері немесе жинақтау арқылы алынған жиынтық кесте болуы мүмкін» (https://en.wikipedia.org/wiki) /Материалдандырылған_көрініс).

Kafka Streams сонымен қатар мемлекеттік дүкендерде интерактивті сұрауларды іске қосуға мүмкіндік береді, бұл материалдандырылған көріністерді тікелей оқуға мүмкіндік береді. Мемлекеттік дүкенге сұрау тек оқуға арналған операция екенін ескеру маңызды. Бұл қолданбаңыз деректерді өңдеу кезінде күйді кездейсоқ сәйкессіздікке айналдыру туралы алаңдамауды қамтамасыз етеді.

Мемлекеттік дүкендерді тікелей сұрау мүмкіндігі маңызды. Бұл алдымен Кафка тұтынушысынан деректерді алудың қажетінсіз бақылау тақтасы қолданбаларын жасай алатыныңызды білдіреді. Сондай-ақ ол қосымшаның тиімділігін арттырады, себебі деректерді қайта жазудың қажеті жоқ:

  • деректердің локализациясының арқасында оларға жылдам қол жеткізуге болады;
  • деректердің қайталануы жойылады, өйткені олар сыртқы жадқа жазылмайды.

Есте сақтағыңыз келетін ең бастысы - күйді қолданбаңыздан тікелей сұрауға болады. Бұл сізге беретін мүмкіндіктерді асыра бағалау мүмкін емес. Кафкадан деректерді тұтынудың және қолданбаға арналған дерекқорда жазбаларды сақтаудың орнына, бірдей нәтижемен күй қоймаларын сұрауға болады. Күй дүкендеріне тікелей сұраулар аз кодты (тұтынушы жоқ) және аз бағдарламалық құралды білдіреді (нәтижелерді сақтау үшін дерекқор кестесінің қажеті жоқ).

Біз осы тарауда біраз жайды қарастырдық, сондықтан біз мемлекеттік дүкендерге қатысты интерактивті сұрауларды талқылауды әзірге қалдырамыз. Бірақ алаңдамаңыз: 9-тарауда біз интерактивті сұраулары бар қарапайым бақылау тақтасы қолданбасын жасаймыз. Ол интерактивті сұрауларды және оларды Kafka Streams қолданбаларына қалай қосуға болатынын көрсету үшін осы және алдыңғы тараулардағы кейбір мысалдарды пайдаланады.

Резюме

  • KStream нысандары дерекқорға кірістірулермен салыстырылатын оқиғалар ағындарын білдіреді. KTable нысандары дерекқордың жаңартулары сияқты жаңарту ағындарын көрсетеді. KTable нысанының көлемі өспейді, ескі жазбалар жаңаларымен ауыстырылады.
  • KTable нысандары біріктіру операциялары үшін қажет.
  • Терезелеу әрекеттерін пайдалана отырып, жиынтық деректерді уақыт шелектеріне бөлуге болады.
  • GlobalKTable нысандарының арқасында бөлімге қарамастан, қолданбаның кез келген жерінде анықтамалық деректерге қол жеткізуге болады.
  • KStream, KTable және GlobalKTable нысандары арасындағы байланыстар мүмкін.

Осы уақытқа дейін біз жоғары деңгейлі KStream DSL көмегімен Kafka Streams қосымшаларын құруға назар аудардық. Жоғары деңгейлі тәсіл сізге ұқыпты және қысқа бағдарламаларды жасауға мүмкіндік берсе де, оны пайдалану сауда-саттықты білдіреді. DSL KStream бағдарламасымен жұмыс істеу басқару дәрежесін азайту арқылы кодтың қысқалығын арттыруды білдіреді. Келесі тарауда біз төменгі деңгейлі өңдегіш түйінінің API интерфейсін қарастырамыз және басқа да айырбастарды қолданып көреміз. Бағдарламалар бұрынғыға қарағанда ұзағырақ болады, бірақ біз қажет болуы мүмкін кез келген өңдеуші түйінін жасай аламыз.

→ Кітап туралы толық ақпаратты мына жерден табуға болады баспагердің веб-сайты

→ Хаброжители үшін купонды пайдаланып 25% жеңілдік - Кафка ағындары

→ Кітаптың қағаз нұсқасына төлем жасалғаннан кейін электронды кітап электронды поштаға жіберіледі.

Ақпарат көзі: www.habr.com

пікір қалдыру