Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər" Salam Habrites! Bu kitab axını anlamaq istəyən hər bir tərtibatçı üçün uyğundur. Paylanmış proqramlaşdırmanı başa düşmək Kafka və Kafka Axınlarını daha yaxşı başa düşməyə kömək edəcək. Kafka çərçivəsinin özünü bilmək yaxşı olardı, amma bu lazım deyil: sizə lazım olan hər şeyi söyləyəcəyəm. Təcrübəli Kafka tərtibatçıları, eləcə də yeni başlayanlar bu kitabla Kafka Axınları kitabxanasından istifadə edərək maraqlı axın proqramları qurmağı öyrənəcəklər. Serializasiya kimi anlayışlarla artıq tanış olan orta səviyyədən qabaqcıl Java tərtibatçıları öz bacarıqlarını Kafka Streams proqramlarının yaradılmasında necə tətbiq etməyi öyrənəcəklər. Kitabın mənbə kodu Java 8-də yazılmışdır və Java 8 lambda ifadə sintaksisindən çox istifadə edir, ona görə də lambda funksiyaları ilə necə işləməyi bilmək (hətta başqa proqramlaşdırma dilində də) faydalı olacaq.

Çıxarış. 5.3. Toplama və pəncərə əməliyyatları

Bu bölmədə biz Kafka axınlarının ən perspektivli hissələrini araşdırmağa davam edəcəyik. İndiyə qədər Kafka Axınlarının aşağıdakı aspektlərini əhatə etdik:

  • emal topologiyasının yaradılması;
  • axın tətbiqlərində vəziyyətdən istifadə;
  • məlumat axını əlaqələrinin həyata keçirilməsi;
  • hadisə axınları (KStream) və yeniləmə axınları (KTable) arasındakı fərqlər.

Aşağıdakı nümunələrdə bütün bu elementləri birlikdə toplayacağıq. Siz həmçinin axın tətbiqlərinin başqa bir əla xüsusiyyəti olan pəncərə əməliyyatları haqqında da öyrənəcəksiniz. İlk nümunəmiz sadə bir birləşmə olacaq.

5.3.1. Səhm satışının sənaye üzrə ümumiləşdirilməsi

Toplama və qruplaşdırma axın məlumatları ilə işləyərkən vacib alətlərdir. Fərdi qeydləri daxil olduqdan sonra araşdırmaq çox vaxt kifayət deyil. Məlumatlardan əlavə məlumat çıxarmaq üçün onları qruplaşdırmaq və birləşdirmək lazımdır.

Bu nümunədə siz bir çox sənaye üzrə şirkətlərdə səhmlərin satışını izləməli olan bir günlük treyder kimi geyinəcəksiniz. Konkret olaraq, hər bir sənayedə ən böyük pay satışına malik beş şirkətlə maraqlanırsınız.

Bu cür toplama üçün məlumatları istədiyiniz formaya (ümumi mənada) çevirmək üçün bir neçə növbəti addım tələb olunacaq.

  1. Xam fond ticarəti məlumatlarını dərc edən mövzuya əsaslanan mənbə yaradın. StockTransaction tipli obyekti ShareVolume tipli obyektlə müqayisə etməli olacağıq. Fakt budur ki, StockTransaction obyektində satış metadatası var və bizə yalnız satılan səhmlərin sayı haqqında məlumat lazımdır.
  2. ShareVolume datasını pay simvollarına görə qruplaşdırın. Simvollara görə qruplaşdırıldıqdan sonra siz bu məlumatları səhm satış həcminin ara cəminə qədər yığışdıra bilərsiniz. Qeyd edək ki, KStream.groupBy metodu KGroupedStream növünün nümunəsini qaytarır. Və siz növbəti KGroupedStream.reduce metoduna zəng edərək KTable nümunəsini əldə edə bilərsiniz.

KGroupedStream interfeysi nədir

KStream.groupBy və KStream.groupByKey metodları KGroupedStream instansiyasını qaytarır. KGroupedStream düymələr üzrə qruplaşdırıldıqdan sonra hadisələr axınının aralıq təmsilidir. Onunla birbaşa işləmək üçün nəzərdə tutulmayıb. Bunun əvəzinə, KGroupedStream həmişə KTable ilə nəticələnən toplama əməliyyatları üçün istifadə olunur. Və toplama əməliyyatlarının nəticəsi KTable olduğundan və onlar dövlət mağazasından istifadə etdikləri üçün, nəticədəki bütün yeniləmələrin boru kəməri ilə daha aşağıya göndərilməməsi mümkündür.

KTable.groupBy metodu oxşar KGroupedTable-ı qaytarır - açarla yenidən qruplaşdırılmış yeniləmələr axınının aralıq təsviri.

Qısa bir fasilə verək və Şek. 5.9, bu, nəyə nail olduğumuzu göstərir. Bu topologiya artıq sizə tanış olmalıdır.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
İndi bu topologiyanın koduna nəzər salaq (onu src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java-da tapmaq olar) (Siyahı 5.2).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Yuxarıdakı kod qısalığı və bir neçə sətirdə yerinə yetirilən böyük həcmli hərəkətləri ilə diqqət çəkir. Siz builder.stream metodunun birinci parametrində yeni bir şey görə bilərsiniz: Consumed.withOffsetResetPolicy metodundan istifadə etməklə təyin olunan AutoOffsetReset.EARLIEST sadalanan növün (LATEST də var) dəyəri. Bu sadalanan növ hər bir KStream və ya KTable üçün ofset sıfırlama strategiyasını təyin etmək üçün istifadə edilə bilər və konfiqurasiyadan ofset sıfırlama parametrindən üstünlük təşkil edir.

GroupByKey və GroupBy

KStream interfeysi qeydləri qruplaşdırmaq üçün iki üsula malikdir: GroupByKey və GroupBy. Hər ikisi KGroupedTable-ı qaytarır, buna görə də aralarındakı fərqin nə olduğunu və hansını nə vaxt istifadə edəcəyinizi maraqlandıra bilərsiniz?

GroupByKey metodu KStream-dəki açarlar artıq boş olduqda istifadə olunur. Daha da əhəmiyyətlisi, "yenidən bölmə tələb edir" bayrağı heç vaxt təyin edilməmişdir.

GroupBy metodu qruplaşdırma üçün düymələri dəyişdiyinizi güman edir, beləliklə, yenidən bölmə bayrağı doğru olaraq təyin olunur. GroupBy metodundan sonra birləşmələr, aqreqasiyalar və s. avtomatik olaraq yenidən bölünəcək.
Xülasə: Siz mümkün olduqda GroupBy deyil, GroupByKey istifadə etməlisiniz.

mapValues ​​və groupBy metodlarının etdikləri aydındır, ona görə də gəlin sum() metoduna nəzər salaq (src/main/java/bbejeck/model/ShareVolume.java) (Siyahı 5.3).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
ShareVolume.sum metodu səhm satış həcminin ara cəmini qaytarır və bütün hesablama zəncirinin nəticəsi KTable obyektidir . İndi KTable-ın hansı rol oynadığını başa düşürsünüz. ShareVolume obyektləri gəldikdə, ən son yenilənmə müvafiq KTable obyektində saxlanılır. Yadda saxlamaq lazımdır ki, bütün yeniləmələr əvvəlki shareVolumeKTable-da əks olunur, lakin hamısı daha sonra göndərilmir.

Daha sonra hər bir sənayedə səhm satışının ən yüksək həcminə malik beş şirkəti əldə etmək üçün bu KTable-dan istifadə edirik. Bu halda bizim hərəkətlərimiz birinci toplama zamanı olan hərəkətlərə bənzəyəcək.

  1. Fərdi ShareVolume obyektlərini sənaye üzrə qruplaşdırmaq üçün başqa groupBy əməliyyatını yerinə yetirin.
  2. ShareVolume obyektlərinin cəmlənməsinə davam edin. Bu dəfə toplama obyekti sabit ölçülü prioritet növbədir. Bu sabit ölçülü növbədə yalnız ən çox satılan səhmləri olan beş şirkət qalır.
  3. Növbələri əvvəlki elementdən sətir dəyərinə uyğunlaşdırın və sənaye üzrə sənaye üzrə ən çox satılan beş ehtiyatı qaytarın.
  4. Nəticələri sətir şəklində mövzuya yazın.

Əncirdə. Şəkil 5.10-da məlumat axını topologiyası qrafiki göstərilir. Gördüyünüz kimi, emalın ikinci mərhələsi olduqca sadədir.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
İndi bu ikinci emal dairəsinin strukturunu aydın başa düşərək onun mənbə koduna müraciət edə bilərik (siz onu src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java faylında tapa bilərsiniz) (Siyahı 5.4).

Bu başlatıcı sabitQueue dəyişəninə malikdir. Bu java.util.TreeSet üçün fərdi adapter obyektidir və satılan səhmlərin sayının azalan ardıcıllığı ilə ən yaxşı N nəticələrini izləmək üçün istifadə olunur.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Siz artıq groupBy və mapValues ​​çağırışlarını görmüsünüz, ona görə də biz onların üzərində dayanmayacağıq (KTable.toStream metodunu çağırırıq, çünki KTable.print metodu köhnəlmişdir). Amma siz hələ aggregate()-in KTable versiyasını görməmisiniz, ona görə də biz onu müzakirə etməyə bir az vaxt sərf edəcəyik.

Xatırladığınız kimi, KTable fərqlidir ki, eyni açarı olan qeydlər yeniləmə hesab olunur. KTable köhnə girişi yenisi ilə əvəz edir. Toplama oxşar şəkildə işləyir: eyni açarı olan ən son qeydlər birləşdirilir. Giriş gəldikdə, o, toplayıcıdan istifadə edərək FixedSizePriorityQueue sinifinin nümunəsinə əlavə edilir (ümumi metoda çağırışda ikinci parametr), lakin eyni düymə ilə başqa bir giriş artıq mövcuddursa, köhnə giriş istifadə edərək silinir. çıxarıcı (ümumi metoda çağırışda üçüncü parametr).

Bütün bunlar o deməkdir ki, aqreqatorumuz, FixedSizePriorityQueue, ümumiyyətlə, bütün dəyərləri bir açarla birləşdirmir, lakin ən çox satılan səhm növlərinin N miqdarının yuvarlanan cəmini saxlayır. Hər bir daxil olan giriş indiyə qədər satılan səhmlərin ümumi sayını ehtiva edir. KTable sizə hazırda hansı səhmlərin ən çox satıldığı barədə məlumat verəcək, hər yeniləmənin yuvarlanan aqreqasiyası tələb olunmur.

Biz iki vacib şeyi etməyi öyrəndik:

  • KTable-də dəyərləri ümumi açarı ilə qruplaşdırmaq;
  • bu qruplaşdırılmış dəyərlər üzərində konvolyutsiya və toplama kimi faydalı əməliyyatları yerinə yetirin.

Bu əməliyyatların necə yerinə yetiriləcəyini bilmək Kafka Streams tətbiqi vasitəsilə hərəkət edən məlumatların mənasını anlamaq və onun hansı məlumatı ötürdüyünü anlamaq üçün vacibdir.

Biz həmçinin bu kitabda əvvəllər müzakirə edilən bəzi əsas anlayışları bir araya gətirdik. 4-cü fəsildə biz axın tətbiqində xətaya dözümlü, yerli vəziyyətin vacibliyini müzakirə etdik. Bu fəsildəki ilk nümunə yerli dövlətin niyə bu qədər vacib olduğunu göstərdi - bu, sizə artıq gördüyünüz məlumatları izləmək imkanı verir. Yerli giriş şəbəkə gecikmələrinin qarşısını alır, tətbiqi daha performanslı və səhvlərə dözümlü edir.

Hər hansı toplama və ya toplama əməliyyatını yerinə yetirərkən, dövlət mağazasının adını göstərməlisiniz. Qatlama və toplama əməliyyatları KTable nümunəsini qaytarır və KTable köhnə nəticələri yeniləri ilə əvəz etmək üçün dövlət anbarından istifadə edir. Gördüyünüz kimi, bütün yeniləmələr boru kəməri ilə daha da aşağı göndərilmir və bu vacibdir, çünki toplama əməliyyatları ümumi məlumat əldə etmək üçün nəzərdə tutulub. Yerli dövlətə müraciət etməsəniz, KTable bütün toplama və azalma nəticələrini daha da göndərəcək.

Sonra, müəyyən bir müddət ərzində toplama kimi əməliyyatların yerinə yetirilməsinə baxacağıq - sözdə pəncərə əməliyyatları (pəncərələmə əməliyyatları).

5.3.2. Pəncərə əməliyyatları

Əvvəlki bölmədə biz "sürüşmə" konvolusiya və aqreqasiya ilə tanış olduq. Tətbiq birjada ən çox satılan beş səhmin birləşdirilməsini izləyən səhm satışlarının həcmini davamlı olaraq artırdı.

Bəzən nəticələrin belə davamlı birləşməsi və konvoyasiyası lazımdır. Və bəzən əməliyyatları yalnız müəyyən bir müddət ərzində yerinə yetirmək lazımdır. Məsələn, son 10 dəqiqə ərzində müəyyən bir şirkətin səhmləri ilə neçə mübadilə əməliyyatı aparıldığını hesablamaq üçün. Və ya son 15 dəqiqə ərzində nə qədər istifadəçi yeni banner reklamına kliklədi. Tətbiq bu cür əməliyyatları təkrar-təkrar yerinə yetirə bilər, lakin nəticələr yalnız müəyyən edilmiş vaxt intervallarına (vaxt pəncərələri) aid edilir.

Mübadilə əməliyyatlarının alıcı tərəfindən hesablanması

Aşağıdakı nümunədə biz bir neçə treyder üçün birja əməliyyatlarını izləyəcəyik - ya böyük təşkilatlar, ya da ağıllı tək maliyyəçilər.

Bu izləmənin iki mümkün səbəbi var. Onlardan biri bazar liderlərinin nə aldığını/satdığını bilmək ehtiyacıdır. Bu böyük oyunçular və inkişaf etmiş investorlar özləri üçün bir fürsət görürlərsə, onların strategiyasına riayət etmək mənasızdır. İkinci səbəb daxili məlumatlardan istifadə edərək qeyri-qanuni əməliyyatların mümkün əlamətlərini görmək istəyidir. Bunun üçün böyük satış artımlarının mühüm press-relizlərlə əlaqəsini təhlil etməli olacaqsınız.

Bu izləmə aşağıdakı addımlardan ibarətdir:

  • səhm əməliyyatları mövzusundan oxumaq üçün mövzu yaratmaq;
  • alıcı ID və fond simvolu ilə daxil olan qeydləri qruplaşdırmaq. groupBy metodunun çağırılması KGroupedStream sinifinin nümunəsini qaytarır;
  • KGroupedStream.windowedBy metodu ilə pəncərəli birləşməyə imkan verən zaman pəncərəsi ilə məhdudlaşan məlumat axınını qaytarın. Pəncərə növündən asılı olaraq ya TimeWindowedKStream, ya da SessionWindowedKStream qaytarılır;
  • toplama əməliyyatı üçün əməliyyat sayı. Pəncərə məlumat axını müəyyən qeydin bu saya daxil olub-olmadığını müəyyən edir;
  • mövzuya nəticələri yazmaq və ya dizayn zamanı konsola çıxarmaq.

Bu tətbiqin topologiyası sadədir, lakin onun vizual şəkli zərər vermir. Əncirə baxaq. 5.11.

Sonra, pəncərə əməliyyatlarının funksionallığına və müvafiq kodu nəzərdən keçirəcəyik.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"

Pəncərə növləri

Kafka axınlarında üç növ pəncərə var:

  • sessiya;
  • "yürümək" (yıxılma);
  • sürüşmə / "atlama" (sürüşmə / atlama).

Hansı birini seçmək biznesin tələblərindən asılıdır. Dönmə və tullanan pəncərələr vaxtla məhdudlaşır, seans pəncərələri isə vaxta bağlıdır, seans(lar)ın müddəti yalnız istifadəçinin nə qədər aktiv olması ilə müəyyən edilir. Xatırlamaq lazım olan əsas odur ki, bütün pəncərə növləri sistem vaxtına deyil, girişlərin tarix/saat möhürlərinə əsaslanır.

Sonra, topologiyamızı pəncərə tiplərinin hər biri ilə həyata keçiririk. Tam kod yalnız birinci nümunədə göstəriləcək, pəncərə əməliyyatının növü istisna olmaqla, digər pəncərə növləri üçün heç nə dəyişməyəcək.

Sessiya pəncərələri

Sessiya pəncərələri bütün digər növ pəncərələrdən çox fərqlidir. Onlar istifadəçi fəaliyyəti (və ya izləmək istədiyiniz qurumun fəaliyyəti) ilə məhdudlaşır. Sessiya pəncərələri fəaliyyətsizlik dövrləri ilə ayrılır.

Şəkil 5.12-də sessiya pəncərələri konsepsiyası təsvir edilmişdir. Daha kiçik sessiya sol tərəfdəki sessiya ilə birləşəcək. Sağdakı seans isə uzun müddət fəaliyyətsiz qaldığı üçün ayrı olacaq. Sessiya pəncərələri istifadəçi hərəkətlərinə əsaslanır, lakin girişin hansı seansa aid olduğunu müəyyən etmək üçün qeydlərdəki tarix/saat möhürlərindən istifadə edin.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"

Mübadilə əməliyyatlarını izləmək üçün sessiya pəncərələrindən istifadə

Mübadilə əməliyyatları haqqında məlumat əldə etmək üçün sessiya pəncərələrindən istifadə edək. Sessiya pəncərələrinin tətbiqi Siyahı 5.5-də göstərilmişdir (onu src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java-da tapmaq olar).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Siz bu topologiyanın əksər əməliyyatlarını artıq görmüsünüz, ona görə də onları burada yenidən nəzərdən keçirməyə ehtiyac yoxdur. Ancaq burada bir neçə yeni element də var ki, biz onları indi müzakirə edəcəyik.

İstənilən groupBy əməliyyatı adətən bir növ toplama əməliyyatını yerinə yetirir (birləşdirmə, toplama və ya sayma). Siz ya işləyən ümumi məcmu aqreqasiyanı, ya da müəyyən bir zaman pəncərəsində qeydləri nəzərə alan pəncərəli aqreqasiyanı həyata keçirə bilərsiniz.

Siyahı 5.5-dəki kod sessiya pəncərələrindəki əməliyyatların sayını hesablayır. Əncirdə. 5.13 Bu hərəkətlər addım-addım təhlil edilir.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) çağırmaqla biz 20 saniyəlik yuxu intervalı və 15 dəqiqəlik qənaət intervalı ilə sessiya pəncərəsi yaradırıq. 20 saniyəlik boş interval o deməkdir ki, tətbiq cari (aktiv) sessiyada cari sessiyanın bitməsindən və ya başlamasından 20 saniyə ərzində daxil olan hər hansı girişi daxil edəcək.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Sonra, biz seans pəncərəsində hansı toplama əməliyyatının yerinə yetiriləcəyini müəyyənləşdiririk - bu halda sayın. Əgər daxil olan yazı boş intervaldan kənara çıxarsa (tarix/vaxt möhürünün hər iki tərəfində), onda proqram yeni sessiya yaradır. Davamlılıq intervalı sessiyanın müəyyən müddət ərzində canlı saxlanılması deməkdir və sessiyanın boş müddətindən kənara çıxan, lakin hələ də əlavə edilə bilən gec məlumatlara imkan verir. Bundan əlavə, birləşmə nəticəsində yaranan yeni sessiyanın başlanğıcı və sonu ən erkən və ən son tarix/saat möhürlərinə uyğun gəlir.

Sessiyaların necə işlədiyini görmək üçün sayma metodundan bir neçə qeydə baxaq (Cədvəl 5.1).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Qeydlər gəldikdə, biz eyni açarla artıq mövcud sessiyaları axtarırıq, bitmə vaxtı cari tarix/vaxt möhüründən azdır - boş interval və cari tarix/vaxt möhürü + boş intervaldan daha böyük başlama vaxtı. Bunu nəzərə alaraq, Cədvəldən dörd giriş. 5.1 aşağıdakı kimi bir seansa birləşdirin.

1. 1-ci qeyd birinci gəlir, ona görə də başlanğıc vaxtı bitmə vaxtına bərabərdir və 00:00:00-dır.

2. Sonra 2-ci rekord gəlir və biz 23:59:55-dən tez bitməyən və 00:00:35-dən gec olmayaraq başlayan sessiyaları axtarırıq. Biz rekord 1 tapırıq və 1 və 2-ci sessiyaları birləşdiririk. Biz 1-ci sessiyanın başlama vaxtını (əvvəllər) və 2-ci sessiyanın bitmə vaxtını (sonra) götürürük, beləliklə, yeni sessiyamız 00:00:00-da başlayır və 00:00-da bitir. :15.

3. Record 3 gəlir, biz 00:00:30 ilə 00:01:10 arasında seanslar axtarırıq və heç birini tapmırıq. 123-345-654,FFBE açarı üçün saat 00:00:50-də başlayan və bitən ikinci sessiya əlavə edirik.

4. Rekord 4 gəlir və biz 23:59:45 və 00:00:25 arasında seanslar axtarırıq. Bu dəfə hər iki seans tapıldı - 1 və 2. Hər üç seans 00:00:00 başlama vaxtı və 00:00:15 bitmə vaxtı ilə bir yerdə birləşdirilir.

Bu bölmədə nəzərə alınmalı olan bəzi vacib şeylər bunlardır:

  • sessiyalar sabit ölçülü pəncərələr deyil. Sessiyanın müddəti müəyyən bir müddət ərzindəki fəaliyyətlə müəyyən edilir;
  • məlumatdakı tarix/saat ştampları hadisənin mövcud sessiyaya və ya boş dövrə düşdüyünü müəyyən edir.

Bundan sonra, növbəti pəncərə növünü - "yıxılan" pəncərələri müzakirə edəcəyik.

"Yürür" pəncərələr

"Tumbling" pəncərələri müəyyən bir müddətə düşən hadisələri çəkir. Təsəvvür edin ki, hər 20 saniyədən bir müəyyən bir şirkətin bütün mübadilə əməliyyatlarını tutmalısınız, buna görə də bu müddət üçün bütün hadisələri toplamırsınız. 20 saniyəlik intervalın sonunda pəncərə "yıxılır" və yeni 20 saniyəlik müşahidə intervalına keçir. Şəkil 5.14 bu vəziyyəti təsvir edir.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Göründüyü kimi, son 20 saniyə ərzində alınan bütün hadisələr pəncərəyə daxil edilir. Bu müddətdən sonra yeni bir pəncərə yaradılır.

Siyahı 5.6 hər 20 saniyədən bir mübadilə əməliyyatlarını ələ keçirmək üçün yuvarlanan pəncərələrin istifadəsini nümayiş etdirən kodu göstərir (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-da tapılıb).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
TimeWindows.of metoduna edilən çağırışda edilən bu kiçik dəyişikliklə, dönmə pəncərəsindən istifadə etmək olar. Bu nümunədə, qədər() metoduna heç bir çağırış yoxdur, ona görə də defolt saxlama intervalı 24 saat istifadə olunacaq.

Nəhayət, pəncərələri atlayaraq, pəncərə seçimlərinin sonuncusuna keçməyin vaxtı gəldi.

Sürüşən ("atlanan") pəncərələr

Sürüşən/hoppanan pəncərələr yuvarlanan pəncərələrə bənzəyir, lakin cüzi fərqlə. Sürüşən pəncərələr son hadisələri idarə etmək üçün yeni pəncərə yaratmazdan əvvəl vaxt intervalının keçməsini gözləmir. Onlar yeni hesablamalara pəncərənin müddətindən az olan fasilədən sonra başlayırlar.

"Tumbling" və "atlanan" pəncərələr arasındakı fərqləri göstərmək üçün birja əməliyyatlarının hesablanması nümunəsinə qayıdaq. Məqsədimiz hələ də tranzaksiyaların sayını hesablamaqdır, lakin sayğacı yeniləmədən əvvəl hər zaman gözləmək istəmirik. Əvəzində sayğacı daha qısa fasilələrlə yeniləyəcəyik. Məsələn, biz hələ də hər 20 saniyədən bir əməliyyatların sayını sayacağıq, lakin Şəkil 5-də göstərildiyi kimi sayğacı hər 5.15 saniyədən bir yeniləyin. XNUMX. Bu halda, üst-üstə düşən məlumatları olan üç nəticə pəncərəmiz var.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Siyahı 5.7 sürüşən pəncərələrin qurulması üçün kodu göstərir (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-da tapılıb).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Aparılan pəncərəni advanceBy() metoduna zəng əlavə etməklə sıçrayan pəncərəyə çevirmək olar. Göstərilən nümunədə qənaət intervalı 15 dəqiqədir.

Siz bu bölmədə birləşdirmənin nəticələrini vaxt pəncərələri ilə necə məhdudlaşdıracağınızı gördünüz. Xüsusilə, bu bölmədən aşağıdakı üç şeyi xatırlamağınızı istərdim:

  • sessiya pəncərələrinin ölçüsü müəyyən bir müddətlə deyil, istifadəçi fəaliyyəti ilə məhdudlaşır;
  • "Tumble" pəncərələri müəyyən bir müddət ərzində baş verən hadisələr haqqında fikir verir;
  • "Atlanan" pəncərələrin müddəti sabitdir, lakin onlar tez-tez yenilənir və bütün pəncərələrdə üst-üstə düşən qeydlər ola bilər.

Sonra, əlaqə üçün KTable-ni yenidən KStream-ə çevirməyi öyrənəcəyik.

5.3.3. KStream və KTable obyektlərinin birləşdirilməsi

4-cü fəsildə biz iki KStream obyektini birləşdirməyi müzakirə etdik. İndi KTable və KStream-i necə birləşdirməyi öyrənməliyik. Bu, aşağıdakı sadə səbəbə görə lazım ola bilər. KStream rekord axındır və KTable rekord yeniləmə axınıdır, lakin bəzən KTable-dan yeniləmələrlə qeyd axınına əlavə kontekst əlavə etmək lazım ola bilər.

Birja əməliyyatlarının sayı haqqında məlumatları götürək və onları müvafiq sənayelər üçün birja xəbərləri ilə birləşdirək. Artıq əldə etdiyiniz kodu nəzərə alaraq, buna nail olmaq üçün nə etməlisiniz.

  1. Mübadilə əməliyyatlarının sayı haqqında məlumatları olan KTable obyektini KStream-ə çevirin və sonra açarı verilmiş fond simvoluna uyğun sənayeni bildirən açarla əvəz edin.
  2. Mübadilə xəbərləri mövzusundan məlumatları oxuyan KTable obyekti yaradın. Bu yeni KTable sənaye üzrə kateqoriyalara bölünəcək.
  3. Xəbər yeniləmələrini sənaye üzrə mübadilə əməliyyatlarının sayı haqqında məlumatla birləşdirin.

İndi gəlin bu fəaliyyət planını necə həyata keçirəcəyimizə baxaq.

KTable KStream-ə çevrilir

KTable-ni KStream-ə çevirmək üçün aşağıdakıları etməlisiniz.

  1. KTable.toStream() metoduna zəng edin.
  2. KStream.map metoduna zəng etməklə açarı sənayenin adı ilə əvəz edin və sonra Windowed instansiyasından TransactionSummary obyektini əldə edin.

Biz bu əməliyyatları aşağıdakı kimi zəncirləyəcəyik (kodu src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-da tapmaq olar) (Siyahı 5.8).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Biz KStream.map əməliyyatını yerinə yetirdiyimiz üçün, qaytarılmış KStream nümunəsi üçün yenidən bölmələr o, əlaqədə istifadə edildikdə avtomatik olaraq həyata keçirilir.

Dönüşüm prosesini tamamladıq, bundan sonra birja xəbərlərini oxumaq üçün KTable obyekti yaratmalıyıq.

Birja xəbərləri üçün KTable yaratmaq

Xoşbəxtlikdən, KTable obyektini yaratmaq üçün bir kod sətri kifayətdir (bu kodu src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-da tapmaq olar) (Listinq 5.9).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Qeyd etmək lazımdır ki, heç bir Serde obyekti tələb olunmur, çünki parametrlərdə Serde sətri istifadə olunur. Həmçinin, EARLIEST sadalamanın istifadəsi sayəsində cədvəl ən başlanğıcda qeydlərlə doldurulur.

İndi son mərhələyə - əlaqəyə keçə bilərik.

Xəbər yeniləmələrinin əməliyyatların sayı məlumatı ilə əlaqələndirilməsi

Bağlantı yaratmaq çətin deyil. Müvafiq sənaye üçün birja xəbərləri olmadığı halda sola birləşmədən istifadə edəcəyik (kodu src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-da tapa bilərsiniz) (Siyahı 5.10-XNUMX).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Bu leftJoin operatoru olduqca sadədir. 4-cü fəsildəki birləşmələrdən fərqli olaraq, JoinWindow metodu istifadə edilmir, çünki KStream-KTable birləşməsi həyata keçirildikdə, KTable-da hər bir açar üçün yalnız bir giriş olur. Belə bir əlaqə zamanla məhdudlaşmır: giriş ya KTable-də mövcuddur, ya da yoxdur. Əsas götürmə: KTable obyektlərindən istifadə edərək, siz KStream-i daha az yenilənən istinad məlumatları ilə zənginləşdirə bilərsiniz.

İndi biz KStream-dən hadisələri zənginləşdirməyin daha səmərəli yoluna baxacağıq.

5.3.4. GlobalKTable obyektləri

Anladığınız kimi, hadisə axınlarını zənginləşdirməyə və ya onlara kontekst əlavə etməyə ehtiyac var. 4-cü fəsildə siz iki KStream obyektinin, əvvəlki bölmədə isə KStream və KTable-nin əlaqəsini gördünüz. Bütün bu hallarda açarları yeni bir növə və ya dəyərə uyğunlaşdırarkən məlumat axınını yenidən bölmək lazımdır. Bəzən yenidən bölmələr açıq şəkildə həyata keçirilir və bəzən Kafka Streams bunu avtomatik edir. Yenidən bölmə zəruridir, çünki açarlar dəyişib və qeydlər yeni bölmələrdə bitməlidir, əks halda birləşmə mümkün olmayacaq (bu, Bölmə 4-də "Məlumatların yenidən bölüşdürülməsi" 4.2.4-cü Fəsildə müzakirə edilmişdir).

Yenidən bölmənin qiyməti var

Yenidən bölmək bir baha başa gəlir - aralıq mövzuların yaradılması, dublikat məlumatların başqa bir mövzuda saxlanması üçün əlavə resurs xərcləri; bu həm də həmin mövzuya yazmaq və ondan oxumaq səbəbindən artan gecikmə deməkdir. Həmçinin, birdən çox aspekt və ya ölçüyə qoşulmaq istəyirsinizsə, zəncirvari birləşmə, qeydləri yeni açarlarla xəritələndirməli və yenidən bölmə prosesini yenidən işə salmalısınız.

Daha kiçik verilənlər bazalarına qoşulma

Bəzi hallarda, əlaqənin planlaşdırıldığı istinad məlumatlarının miqdarı nisbətən kiçikdir, belə ki, onun tam nüsxələri qovşaqların hər birində yerli olaraq yaxşı uyğunlaşa bilər. Belə vəziyyətlər üçün Kafka Streams GlobalKTable sinfini təqdim edir.

GlobalKTable nümunələri unikaldır, çünki proqram bütün məlumatları qovşaqların hər birinə təkrarlayır. Və qovşaqların hər birində bütün məlumatlar olduğundan, hadisə axınının bütün bölmələr üçün əlçatan olması üçün istinad məlumat açarı ilə bölməyə ehtiyac yoxdur. Siz həmçinin GlobalKTable obyektlərindən istifadə edərək açarsız əlaqə qura bilərsiniz. Bu ehtimalı nümayiş etdirmək üçün əvvəlki nümunələrdən birinə qayıdaq.

KStream obyektlərinin GlobalKTable obyektlərinə qoşulması

5.3.2-ci yarımbənddə biz alıcılar tərəfindən mübadilə əməliyyatlarının pəncərəli aqreqasiyasını həyata keçirdik. Bu birləşmənin nəticələri belə görünürdü:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Bu nəticələr nəzərdə tutulan məqsədə xidmət etsə də, müştərinin adı və tam şirkət adı da göstərilsəydi, daha rahat olardı. Müştəri adı və şirkət adını əlavə etmək üçün siz normal birləşmələr edə bilərsiniz, lakin siz iki əsas xəritələmə və yenidən bölmə etməli olacaqsınız. GlobalKTable ilə siz bu cür əməliyyatların xərcindən qaça bilərsiniz.

Bunun üçün biz 5.11-5-ci Siyahıdakı countStream obyektindən istifadə edəcəyik (uyğun kodu src/main/java/bbejeck/chapter_XNUMX/GlobalKTableExample.java-da tapmaq olar) və onu iki GlobalKTable obyektinə birləşdirəcəyik.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Biz bunu əvvəllər müzakirə etmişik, ona görə də özümü təkrar etməyəcəm. Lakin nəzərə alın ki, toStream().map funksiyasındakı kod daxili lambda ifadəsi əvəzinə oxunaqlılıq üçün funksiya obyektinə abstraksiya olunub.

Növbəti addım iki GlobalKTable instansiyasını elan etməkdir (bu kodu src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-da tapmaq olar) (siyahı 5.12-XNUMX).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"

Qeyd edək ki, mövzu adları sadalanan növlərdən istifadə etməklə təsvir olunur.

İndi bütün komponentləri hazır etdikdən sonra yalnız əlaqə kodunu yazmaq qalır (onu src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-da tapa bilərsiniz) (Siyahı 5.13-XNUMX).

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Bu kodda iki birləşmə olsa da, onların nəticələrinin heç biri ayrıca istifadə edilmədiyi üçün onlar zəncir kimi təşkil edilmişdir. Nəticələr bütün əməliyyatın sonunda göstərilir.

Yuxarıdakı birləşmə əməliyyatını icra etdikdə belə nəticələr əldə edəcəksiniz:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Mahiyyət dəyişməyib, lakin bu nəticələr daha başa düşülən görünür.

Fəsil 4-ü sayırsınızsa, siz artıq bir neçə növ birləşmənin hərəkətdə olduğunu görmüsünüz. Onlar Cədvəldə verilmişdir. 5.2. Bu cədvəl Kafka Axınlarının 1.0.0 versiyasına olan əlaqəni əks etdirir; gələcək buraxılışlarda hər şey dəyişə bilər.

Kitab "Kafka hərəkətdədir. Real vaxt rejimində iş üçün proqramlar və mikroservislər"
Sonda sizə əsas şeyi xatırlatmaq istərdim: yerli vəziyyətdən istifadə edərək hadisə axınlarını (KStream) birləşdirə və axınları yeniləyə bilərsiniz (KTable). Bundan əlavə, əgər istinad məlumatının ölçüsü çox böyük deyilsə, siz GlobalKTable obyektindən istifadə edə bilərsiniz. GlobalKTable bütün bölmələri Kafka Streams tətbiqinin hər qovşağına təkrarlayır, beləliklə açarın hansı bölməyə uyğun olmasından asılı olmayaraq bütün məlumatların mövcud olmasını təmin edir.

Bundan sonra biz Kafka mövzusundan məlumat istehlak etmədən vəziyyət dəyişikliklərini müşahidə etməyə imkan verən Kafka Axınları funksiyasını görəcəyik.

5.3.5. Tələb olunan vəziyyət

Biz əvvəllər bir neçə vəziyyətə uyğun əməliyyatlar etmişik və həmişə nəticələri konsola daxil etmişik (inkişaf məqsədləri üçün) və ya onları mövzuya daxil etmişik (istehsal məqsədləri üçün). Mövzuya nəticələr yazarkən onlara baxmaq üçün Kafka istehlakçısından istifadə etməlisiniz.

Bu mövzulardan məlumatların oxunması bir növ maddiləşmiş baxış sayıla bilər. Tapşırıqlarımız üçün Vikipediyadan materiallaşdırılmış görünüşün tərifindən istifadə edə bilərik: “... sorğunun nəticələrini ehtiva edən fiziki verilənlər bazası obyekti. Məsələn, bu, uzaq məlumatların yerli nüsxəsi və ya cədvəlin və ya birləşmə nəticəsinin sətir və/yaxud sütunlarının alt çoxluğu və ya birləşmə nəticəsində əldə edilmiş pivot cədvəl ola bilər” (https://en.wikipedia.org/wiki) /Materiallaşdırılmış_baxış).

Kafka Streams həmçinin dövlət mağazalarına qarşı interaktiv sorğulara imkan verir ki, bu da bu materiallaşdırılmış görünüşləri birbaşa oxumağa imkan verir. Qeyd etmək vacibdir ki, dövlət mağazasına edilən sorğu yalnız oxumaq üçün nəzərdə tutulmuş əməliyyatdır. Bu, proqram məlumatları emal edərkən vəziyyəti təsadüfən uyğunsuzlaşdırmaqdan narahat olmamağınızı təmin edir.

Dövlət mağazalarını birbaşa sorğulamaq bacarığı vacibdir. Bu o deməkdir ki, əvvəlcə Kafka istehlakçısından məlumat almadan tablosuna tətbiqlər yarada bilərsiniz. Yenidən məlumat yazmağa ehtiyac olmadığı üçün tətbiqin səmərəliliyini də artırır:

  • məlumatların lokallığına görə onlara tez daxil olmaq olar;
  • məlumatların təkrarlanması aradan qaldırılır, çünki onlar xarici yaddaşa yazılmır.

Yadda saxlamağınızı istərdim əsas odur ki, dövləti birbaşa tətbiqdən sorğulaya bilərsiniz. Bunun sizə verdiyi imkanları çox qiymətləndirmək olmaz. Kafkadan məlumatları istehlak etmək və proqram üçün verilənlər bazasında qeydlər saxlamaq əvəzinə, eyni nəticə ilə dövlət mağazalarını sorğulaya bilərsiniz. Dövlət mağazalarını birbaşa sorğulamaq daha az kod (istehlakçı yoxdur) və daha az proqram təminatı (nəticələri saxlamaq üçün verilənlər bazası cədvəlinə ehtiyac yoxdur) deməkdir.

Biz bu fəsildə çoxlu məlumatı əhatə etdik, ona görə də dövlət mağazalarında interaktiv sorğuların müzakirəsini bir anlığa dayandıracağıq. Ancaq narahat olmayın: 9-cu Fəsildə biz interaktiv sorğularla sadə tablo tətbiqi yaradacağıq. O, interaktiv sorğuları və onları Kafka Streams proqramlarına necə əlavə etməyi nümayiş etdirmək üçün bu və əvvəlki fəsillərdəki bəzi nümunələrdən istifadə edəcək.

Xülasə

  • KStream obyektləri verilənlər bazası əlavələri ilə müqayisə edilə bilən hadisələr axınlarını təmsil edir. KTable obyektləri daha çox verilənlər bazasındakı yeniləmələr kimi yeniləmə axınlarını təmsil edir. KTable obyektinin ölçüsü böyümür, köhnə qeydlər yeniləri ilə əvəz olunur.
  • KTable obyektləri toplama əməliyyatları üçün tələb olunur.
  • Pəncərələmə, yığılmış məlumatları vaxt qutularına bölməyə imkan verir.
  • GlobalKTable obyektləri sayəsində istinad məlumatlarına bölmədən asılı olmayaraq tətbiqin istənilən yerindən daxil olmaq olar.
  • KStream, KTable və GlobalKTable obyektləri arasında əlaqə mümkündür.

İndiyədək biz yüksək səviyyəli DSL KStream-dən istifadə edərək Kafka Streams proqramlarının yaradılmasına diqqət yetirmişik. Yüksək səviyyəli yanaşma səliqəli və qısa proqramlar yaratmağa imkan versə də, onun istifadəsi müəyyən bir kompromis deməkdir. DSL KStream ilə işləmək daha az nəzarət hesabına kodu daha yığcam etmək deməkdir. Növbəti fəsildə biz aşağı səviyyəli işləyici node API-yə baxacağıq və digər güzəştləri sınayacağıq. Proqramlar indiyə qədər olduğundan daha uzun olacaq, lakin biz ehtiyac duya biləcəyimiz demək olar ki, istənilən işləyici qovşağını yarada biləcəyik.

→ Kitab haqqında ətraflı məlumatı buradan əldə edə bilərsiniz nəşriyyat saytı

→ Khabrozhiteli üçün kuponda 25% endirim - Kafka çayları

→ Kitabın kağız variantı üçün ödəniş edildikdən sonra e-kitab e-poçta göndərilir.

Mənbə: www.habr.com

Добавить комментарий