Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu" Pozdrav, stanovnici Khabra! Ova je knjiga prikladna za svakog programera koji želi razumjeti obradu niti. Razumijevanje distribuiranog programiranja pomoći će vam da bolje razumijete Kafku i Kafkine tokove. Bilo bi lijepo znati sam Kafkin okvir, ali to nije potrebno: ​​Reći ću vam sve što trebate. Iskusni Kafka programeri i početnici u ovoj će knjizi naučiti kako stvoriti zanimljive aplikacije za obradu toka pomoću biblioteke Kafka Streams. Srednji i napredni Java programeri koji su već upoznati s konceptima poput serijalizacije naučit će primijeniti svoje vještine za stvaranje aplikacija Kafka Streams. Izvorni kod knjige napisan je u Javi 8 i značajno koristi sintaksu lambda izraza Jave 8, tako da će znanje o radu s lambda funkcijama (čak i u drugom programskom jeziku) dobro doći.

Izvod. 5.3. Operacije agregacije i prozora

U ovom ćemo odjeljku krenuti dalje s istraživanjem dijelova Kafkinih tokova koji najviše obećavaju. Do sada smo pokrili sljedeće aspekte Kafkinih tokova:

  • stvaranje topologije obrade;
  • korištenje stanja u aplikacijama za strujanje;
  • izvođenje veza toka podataka;
  • razlike između tokova događaja (KStream) i tokova ažuriranja (KTable).

U sljedećim primjerima spojit ćemo sve te elemente. Također ćete naučiti o otvaranju prozora, još jednoj sjajnoj značajci aplikacija za strujanje. Naš prvi primjer bit će jednostavno zbrajanje.

5.3.1. Agregacija prodaje zaliha po sektoru industrije

Agregacija i grupiranje vitalni su alati pri radu sa strujanjem podataka. Ispitivanje pojedinačnih zapisa kako su primljeni često je nedovoljno. Da biste izvukli dodatne informacije iz podataka, potrebno ih je grupirati i kombinirati.

U ovom primjeru obući ćete kostim dnevnog trgovca koji treba pratiti količinu prodaje dionica tvrtki u nekoliko industrija. Konkretno, zanima vas pet tvrtki s najvećom prodajom udjela u svakoj industriji.

Takvo prikupljanje će zahtijevati sljedećih nekoliko koraka za prevođenje podataka u željeni oblik (govoreći općenito).

  1. Stvorite tematski izvor koji objavljuje neobrađene informacije o trgovanju dionicama. Morat ćemo mapirati objekt tipa StockTransaction u objekt tipa ShareVolume. Radi se o tome da objekt StockTransaction sadrži metapodatke o prodaji, ali nam trebaju samo podaci o broju prodanih dionica.
  2. Grupirajte podatke ShareVolume prema simbolu dionice. Nakon grupiranja po simbolu, ove podatke možete sažeti u međuzbrojeve obujma prodaje dionica. Vrijedno je napomenuti da metoda KStream.groupBy vraća instancu tipa KGroupedStream. Instancu KTable možete dobiti daljnjim pozivanjem metode KGroupedStream.reduce.

Što je sučelje KGroupedStream

Metode KStream.groupBy i KStream.groupByKey vraćaju instancu KGroupedStream. KGroupedStream je posredni prikaz toka događaja nakon grupiranja po ključevima. Uopće nije namijenjen izravnom radu s njim. Umjesto toga, KGroupedStream se koristi za operacije združivanja, koje uvijek rezultiraju KTable. Budući da je rezultat operacija združivanja KTable i oni koriste pohranu stanja, moguće je da se sva ažuriranja kao rezultat ne šalju dalje niz cjevovod.

Metoda KTable.groupBy vraća sličnu KGroupedTable - međureprezentaciju toka ažuriranja, pregrupiranih po ključu.

Uzmimo kratku stanku i pogledajmo sl. 5.9, koji pokazuje što smo postigli. Ova topologija bi vam već trebala biti dobro poznata.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Pogledajmo sada kod za ovu topologiju (može se pronaći u datoteci src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Navedeni kod odlikuje se kratkoćom i velikim opsegom radnji koje se izvode u nekoliko redaka. Možda ćete primijetiti nešto novo u prvom parametru metode builder.stream: vrijednost tipa enum AutoOffsetReset.EARLIEST (postoji i LATEST), postavljenu pomoću metode Consumed.withOffsetResetPolicy. Ovaj tip nabrajanja može se koristiti za određivanje strategije resetiranja pomaka za svaki KStream ili KTable i ima prednost nad opcijom resetiranja pomaka iz konfiguracije.

GroupByKey i GroupBy

KStream sučelje ima dvije metode za grupiranje zapisa: GroupByKey i GroupBy. Oba vraćaju KGroupedTable, pa se možda pitate koja je razlika između njih i kada koristiti koju?

Metoda GroupByKey koristi se kada ključevi u KStreamu već nisu prazni. I što je najvažnije, oznaka "zahtijeva ponovno particioniranje" nikada nije postavljena.

Metoda GroupBy pretpostavlja da ste promijenili ključeve grupiranja, tako da je oznaka reparticije postavljena na true. Izvođenje spajanja, združivanja itd. nakon metode GroupBy rezultirat će automatskim ponovnim particioniranjem.
Sažetak: Kad god je to moguće, trebali biste koristiti GroupByKey umjesto GroupBy.

Jasno je što rade metode mapValues ​​​​i groupBy, pa pogledajmo metodu sum() (nalazi se u src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Metoda ShareVolume.sum vraća tekući ukupni volumen prodaje zaliha, a rezultat cijelog lanca izračuna je KTable objekt . Sada razumijete ulogu KTable. Kada ShareVolume objekti stignu, odgovarajući KTable objekt pohranjuje najnovije trenutno ažuriranje. Važno je zapamtiti da se sva ažuriranja odražavaju u prethodnoj tablici shareVolumeKTable, ali se ne šalju sva dalje.

Zatim koristimo ovu KTable za agregiranje (po broju dionica kojima se trguje) kako bismo došli do pet kompanija s najvećim količinama dionica kojima se trguje u svakoj industriji. Naše radnje u ovom slučaju bit će slične onima za prvo združivanje.

  1. Izvedite drugu operaciju groupBy za grupiranje pojedinačnih ShareVolume objekata prema djelatnosti.
  2. Počnite sažimati ShareVolume objekte. Ovaj put objekt združivanja je red čekanja fiksne veličine. U ovom redu čekanja fiksne veličine zadržava se samo pet tvrtki s najvećim količinama prodanih dionica.
  3. Mapirajte redove iz prethodnog odlomka na vrijednost niza i vratite prvih pet najtrgovanijih dionica po broju prema industriji.
  4. Zapišite rezultate u obliku niza u temu.

Na sl. Slika 5.10 prikazuje graf topologije protoka podataka. Kao što vidite, drugi krug obrade je prilično jednostavan.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Sada kada jasno razumijemo strukturu ovog drugog kruga obrade, možemo se okrenuti njegovom izvornom kodu (naći ćete ga u datoteci src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Ispis 5.4) .

Ovaj inicijalizator sadrži varijablu fixedQueue. Ovo je prilagođeni objekt koji je adapter za java.util.TreeSet koji se koristi za praćenje prvih N rezultata u silaznom redoslijedu dionica kojima se trguje.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Već ste vidjeli pozive groupBy i mapValues ​​​​pa nećemo ulaziti u njih (pozivamo metodu KTable.toStream jer je metoda KTable.print zastarjela). Ali još niste vidjeli KTable verziju aggregate(), pa ćemo potrošiti malo vremena na raspravu o tome.

Kao što se sjećate, ono što čini KTable drugačijim je to što se zapisi s istim ključevima smatraju ažuriranjima. KTable zamjenjuje stari unos novim. Agregacija se događa na sličan način: najnoviji zapisi s istim ključem su agregirani. Kada zapis stigne, dodaje se instanci klase FixedSizePriorityQueue pomoću zbrajača (drugi parametar u pozivu agregatne metode), ali ako već postoji drugi zapis s istim ključem, tada se stari zapis uklanja pomoću oduzimača (treći parametar u agregatni poziv metode).

To sve znači da naš agregator, FixedSizePriorityQueue, ne agregira sve vrijednosti s jednim ključem, već pohranjuje pokretni zbroj količina N vrsta dionica kojima se najviše trguje. Svaki dolazni unos sadrži ukupan broj do sada prodanih dionica. KTable će vam dati informacije o tome kojim se dionicama kompanija trenutno najviše trguje, bez potrebe za tekućim zbrajanjem svakog ažuriranja.

Naučili smo raditi dvije važne stvari:

  • grupiranje vrijednosti u KTable zajedničkim ključem;
  • izvodite korisne operacije kao što su zbrajanje i agregacija na ovim grupiranim vrijednostima.

Znati kako izvesti ove operacije važno je za razumijevanje značenja podataka koji se kreću kroz aplikaciju Kafka Streams i razumijevanje informacija koje oni nose.

Također smo sakupili neke od ključnih pojmova o kojima smo ranije govorili u ovoj knjizi. U poglavlju 4 raspravljali smo o tome koliko je lokalno stanje otporno na greške važno za aplikaciju za strujanje. Prvi primjer u ovom poglavlju pokazao je zašto je lokalno stanje toliko važno—omogućuje vam praćenje informacija koje ste već vidjeli. Lokalni pristup izbjegava mrežna kašnjenja, čineći aplikaciju učinkovitijom i otpornijom na pogreške.

Prilikom izvođenja bilo koje operacije skupljanja ili združivanja, morate navesti naziv pohrane stanja. Operacije skupljanja i agregacije vraćaju instancu KTable, a KTable koristi pohranu stanja za zamjenu starih rezultata novima. Kao što ste vidjeli, ne šalju se sva ažuriranja niz cjevovod, a to je važno jer su operacije združivanja osmišljene za proizvodnju sažetih informacija. Ako ne primijenite lokalno stanje, KTable će proslijediti sve rezultate združivanja i zbrajanja.

Zatim ćemo pogledati izvođenje operacija kao što je agregacija unutar određenog vremenskog razdoblja - takozvane operacije prozora.

5.3.2. Operacije prozora

U prethodnom odjeljku predstavili smo kliznu konvoluciju i agregaciju. Aplikacija je vršila kontinuirano roll-up prodaje dionica nakon čega je slijedilo zbrajanje pet dionica kojima se najviše trguje na burzi.

Ponekad je takvo kontinuirano prikupljanje i skupljanje rezultata potrebno. A ponekad morate izvršiti operacije samo tijekom određenog vremenskog razdoblja. Na primjer, izračunajte koliko je mjenjačkih transakcija obavljeno dionicama pojedine tvrtke u zadnjih 10 minuta. Ili koliko je korisnika kliknulo na novi reklamni banner u zadnjih 15 minuta. Aplikacija može izvoditi takve operacije više puta, ali s rezultatima koji se odnose samo na određena vremenska razdoblja (vremenski prozori).

Brojanje mjenjačkih transakcija po kupcu

U sljedećem primjeru pratit ćemo dioničke transakcije među višestrukim trgovcima—bilo velikim organizacijama ili pametnim pojedinačnim financijerima.

Postoje dva moguća razloga za ovo praćenje. Jedna od njih je potreba da se zna što tržišni lideri kupuju/prodaju. Ako ti veliki igrači i sofisticirani investitori vide priliku, ima smisla slijediti njihovu strategiju. Drugi razlog je želja da se uoče svi mogući znakovi nezakonitog trgovanja povlaštenim informacijama. Da biste to učinili, morat ćete analizirati korelaciju velikih porasta prodaje s važnim priopćenjima za javnost.

Takvo praćenje sastoji se od sljedećih koraka:

  • stvaranje streama za čitanje iz teme dioničke transakcije;
  • grupiranje dolaznih zapisa prema ID-u kupca i simbolu dionice. Pozivanje metode groupBy vraća instancu klase KGroupedStream;
  • Metoda KGroupedStream.windowedBy vraća tok podataka ograničen vremenskim okvirom, što omogućuje agregaciju u prozoru. Ovisno o vrsti prozora, vraća se ili TimeWindowedKStream ili SessionWindowedKStream;
  • broj transakcija za operaciju združivanja. Prozorski tijek podataka određuje je li određeni zapis uzet u obzir u ovom brojanju;
  • pisanje rezultata u temu ili njihovo ispisivanje na konzolu tijekom razvoja.

Topologija ove aplikacije je jednostavna, ali bila bi od pomoći njena jasna slika. Pogledajmo Sl. 5.11.

Zatim ćemo pogledati funkcionalnost prozorskih operacija i odgovarajući kod.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"

Vrste prozora

Postoje tri vrste prozora u Kafkinim tokovima:

  • sjednički;
  • “tumbanje” (tumbanje);
  • klizanje/skakanje.

Koju odabrati ovisi o vašim poslovnim zahtjevima. Prevrtanje i skakanje prozora vremenski su ograničeni, dok su prozori sesije ograničeni aktivnošću korisnika—trajanje sesije(a) određeno je isključivo time koliko je korisnik aktivan. Najvažnije je zapamtiti da se sve vrste prozora temelje na datumskim/vremenskim oznakama unosa, a ne na sistemskom vremenu.

Zatim implementiramo našu topologiju sa svakom od vrsta prozora. Cjeloviti kod bit će dan samo u prvom primjeru, za ostale tipove prozora ništa se neće promijeniti osim tipa rada prozora.

Prozori sesije

Prozori sesije vrlo su različiti od svih ostalih vrsta prozora. Oni nisu ograničeni toliko vremenom koliko aktivnošću korisnika (ili aktivnošću subjekta koji želite pratiti). Prozori sesija razgraničeni su razdobljima neaktivnosti.

Slika 5.12 ilustrira koncept prozora sesije. Manja sesija spojit će se sa sesijom s lijeve strane. A sesija s desne strane bit će odvojena jer slijedi dugo razdoblje neaktivnosti. Prozori sesija temelje se na aktivnosti korisnika, ali koriste se oznake datuma/vremena iz unosa kako bi se odredilo kojoj sesiji unos pripada.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"

Korištenje prozora sesije za praćenje dioničkih transakcija

Upotrijebimo prozore sesije za prikupljanje informacija o transakcijama razmjene. Implementacija prozora sesije prikazana je u ispisu 5.5 (koji se može pronaći u src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Već ste vidjeli većinu operacija u ovoj topologiji, tako da nema potrebe ponovno ih pregledavati. Ali ovdje postoji i nekoliko novih elemenata o kojima ćemo sada raspravljati.

Bilo koja operacija groupBy obično izvodi neku vrstu operacije združivanja (zbrajanje, zbrajanje ili brojanje). Možete izvršiti kumulativnu agregaciju s tekućim ukupnim iznosom ili agregaciju prozora, koja uzima u obzir zapise unutar određenog vremenskog okvira.

Kod u ispisu 5.5 broji broj transakcija unutar prozora sesije. Na sl. 5.13 ove radnje se analiziraju korak po korak.

Pozivanjem windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) stvaramo prozor sesije s intervalom neaktivnosti od 20 sekundi i intervalom postojanosti od 15 minuta. Interval mirovanja od 20 sekundi znači da će aplikacija uključiti svaki unos koji stigne unutar 20 sekundi od završetka ili početka trenutne sesije u trenutnu (aktivnu) sesiju.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Zatim navodimo koju operaciju združivanja treba izvršiti u prozoru sesije - u ovom slučaju brojanje. Ako dolazni unos padne izvan prozora neaktivnosti (s obje strane oznake datuma/vremena), aplikacija stvara novu sesiju. Interval zadržavanja znači održavanje sesije određeno vrijeme i dopušta kasne podatke koji nadilaze razdoblje neaktivnosti sesije, ali se još uvijek mogu priložiti. Osim toga, početak i kraj nove sesije koja proizlazi iz spajanja odgovaraju najranijoj i najkasnijoj datumskoj/vremenskoj oznaci.

Pogledajmo nekoliko unosa iz metode brojanja da vidimo kako sesije rade (tablica 5.1).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Kada stignu zapisi, tražimo postojeće sesije s istim ključem, vremenom završetka manjim od trenutne oznake datuma/vremena - interval neaktivnosti i vremenom početka većim od trenutne oznake datuma/vremena + interval neaktivnosti. Uzimajući to u obzir, četiri unosa iz tab. 5.1 spajaju se u jednu sesiju kako slijedi.

1. Zapis 1 stiže prvi, tako da je vrijeme početka jednako vremenu završetka i iznosi 00:00:00.

2. Zatim dolazi unos 2 i tražimo sesije koje završavaju najkasnije u 23:59:55 i počinju najkasnije u 00:00:35. Pronalazimo zapis 1 i kombiniramo sesije 1 i 2. Uzimamo vrijeme početka sesije 1 (ranije) i vrijeme završetka sesije 2 (kasnije), tako da naša nova sesija počinje u 00:00:00 i završava u 00:00:15 XNUMX:XNUMX.

3. Zapis 3 stiže, tražimo sesije između 00:00:30 i 00:01:10 i ne nalazimo nijednu. Dodajte drugu sesiju za ključ 123-345-654,FFBE, s početkom i završetkom u 00:00:50.

4. Stiže rekord 4 i tražimo termine između 23:59:45 i 00:00:25. Ovaj put su pronađene obje sesije 1 i 2. Sve tri sesije su spojene u jednu, s vremenom početka 00:00:00 i vremenom završetka 00:00:15.

Iz onoga što je opisano u ovom odjeljku, vrijedi zapamtiti sljedeće važne nijanse:

  • sesije nisu prozori fiksne veličine. Trajanje sesije određeno je aktivnošću unutar određenog vremenskog razdoblja;
  • Oznake datuma/vremena u podacima određuju spada li događaj unutar postojeće sesije ili tijekom razdoblja mirovanja.

Zatim ćemo razgovarati o sljedećoj vrsti prozora - "okretnim" prozorima.

"Trmbling" prozori

Pokretni prozori bilježe događaje koji padaju unutar određenog vremenskog razdoblja. Zamislite da trebate uhvatiti sve dioničke transakcije određene tvrtke svakih 20 sekundi, tako da prikupite sve događaje u tom vremenskom razdoblju. Na kraju intervala od 20 sekundi, prozor se okreće i pomiče na novi interval promatranja od 20 sekundi. Slika 5.14 ilustrira ovu situaciju.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Kao što vidite, svi događaji primljeni u zadnjih 20 sekundi uključeni su u prozor. Na kraju ovog vremenskog razdoblja kreira se novi prozor.

Ispis 5.6 prikazuje kod koji demonstrira korištenje vrtećih prozora za bilježenje dioničkih transakcija svakih 20 sekundi (nalazi se u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Uz ovu malu promjenu u pozivu metode TimeWindows.of, možete koristiti okretni prozor. Ovaj primjer ne poziva metodu until(), tako da će se koristiti zadani interval zadržavanja od 24 sata.

Konačno, vrijeme je da prijeđemo na posljednju opciju prozora - "skakutanje" prozora.

Klizni ("skačući") prozori

Klizni/skakući prozori slični su prozorima koji se okreću, ali s malom razlikom. Klizni prozori ne čekaju do kraja vremenskog intervala prije stvaranja novog prozora za obradu nedavnih događaja. Oni započinju nove izračune nakon intervala čekanja manjeg od trajanja prozora.

Da bismo ilustrirali razlike između tumbanja i skakanja prozora, vratimo se na primjer brojanja burzovnih transakcija. Naš je cilj i dalje brojati broj transakcija, ali ne želimo čekati cijelo vrijeme prije nego što ažuriramo brojač. Umjesto toga, ažurirat ćemo brojač u kraćim intervalima. Na primjer, i dalje ćemo brojati broj transakcija svakih 20 sekundi, ali ćemo ažurirati brojač svakih 5 sekundi, kao što je prikazano na slici. 5.15. U ovom slučaju, na kraju imamo tri prozora rezultata s preklapajućim podacima.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Ispis 5.7 prikazuje kod za definiranje kliznih prozora (nalazi se u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Okretni prozor može se pretvoriti u skakutajući prozor dodavanjem poziva u metodu advanceBy(). U prikazanom primjeru interval spremanja je 15 minuta.

U ovom ste odjeljku vidjeli kako ograničiti rezultate združivanja na vremenske okvire. Posebno želim da zapamtite sljedeće tri stvari iz ovog odjeljka:

  • veličina prozora sesije nije ograničena vremenskim razdobljem, već aktivnošću korisnika;
  • “tumble” prozori daju pregled događaja u određenom vremenskom razdoblju;
  • Trajanje skakajućih prozora je fiksno, ali se često ažuriraju i mogu sadržavati preklapajuće unose u svim prozorima.

Zatim ćemo naučiti kako pretvoriti KTable natrag u KStream za vezu.

5.3.3. Povezivanje KStream i KTable objekata

U poglavlju 4 raspravljali smo o povezivanju dva KStream objekta. Sada moramo naučiti kako spojiti KTable i KStream. Ovo može biti potrebno iz sljedećeg jednostavnog razloga. KStream je tok zapisa, a KTable je tok ažuriranja zapisa, ali ponekad ćete možda htjeti dodati dodatni kontekst u tok zapisa koristeći ažuriranja iz KTable.

Uzmimo podatke o broju burzovnih transakcija i kombinirajmo ih s burzovnim vijestima za relevantne industrije. Evo što trebate učiniti da biste to postigli s obzirom na kôd koji već imate.

  1. Pretvorite KTable objekt s podacima o broju dioničkih transakcija u KStream, nakon čega slijedi zamjena ključa s ključem koji označava industrijski sektor koji odgovara ovom simbolu dionice.
  2. Napravite objekt KTable koji čita podatke iz teme s burzovnim vijestima. Ova nova KTable bit će kategorizirana prema industrijskom sektoru.
  3. Povežite ažurirane vijesti s informacijama o broju burzovnih transakcija po industrijskim sektorima.

Pogledajmo sada kako provesti ovaj akcijski plan.

Pretvorite KTable u KStream

Za pretvaranje KTable u KStream morate učiniti sljedeće.

  1. Pozovite metodu KTable.toStream().
  2. Pozivanjem metode KStream.map, zamijenite ključ s nazivom industrije, a zatim dohvatite objekt TransactionSummary iz Windowed instance.

Ulančat ćemo ove operacije na sljedeći način (kod se može pronaći u datoteci src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (ispis 5.8).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Budući da izvodimo KStream.map operaciju, vraćena KStream instanca se automatski ponovno particionira kada se koristi u vezi.

Završili smo proces konverzije, zatim moramo kreirati KTable objekt za čitanje dioničkih vijesti.

Izrada KTable za burzovne vijesti

Srećom, stvaranje KTable objekta zahtijeva samo jedan red koda (kod se može pronaći u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Vrijedno je napomenuti da se ne zahtijeva navođenje Serde objekata, budući da se niz Serdes koristi u postavkama. Također, korištenjem EARLIEST enumeracije, tablica se popunjava zapisima na samom početku.

Sada možemo prijeći na posljednji korak - povezivanje.

Povezivanje ažuriranih vijesti s podacima o broju transakcija

Stvaranje veze nije teško. Koristit ćemo lijevi spoj u slučaju da nema vijesti o dionicama za relevantnu industriju (potrebni kod može se pronaći u datoteci src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Ovaj operator leftJoin prilično je jednostavan. Za razliku od spajanja u 4. poglavlju, metoda JoinWindow se ne koristi jer kada se izvodi spajanje KStream-KTable, postoji samo jedan unos u KTable za svaki ključ. Takva veza nije vremenski ograničena: zapis je ili u KTable ili ga nema. Glavni zaključak: korištenjem KTable objekata možete obogatiti KStream referentnim podacima koji se rjeđe ažuriraju.

Sada ćemo pogledati učinkovitiji način obogaćivanja događaja iz KStreama.

5.3.4. GlobalKTable objekti

Kao što vidite, postoji potreba za obogaćivanjem tokova događaja ili dodavanjem konteksta u njih. U poglavlju 4 vidjeli ste veze između dva KStream objekta, au prethodnom odjeljku vidjeli ste vezu između KStream i KTable. U svim tim slučajevima potrebno je ponovno particionirati tok podataka prilikom preslikavanja ključeva u novi tip ili vrijednost. Ponekad se reparticioniranje vrši eksplicitno, a ponekad Kafka Streams to radi automatski. Ponovno particioniranje je potrebno jer su se ključevi promijenili i zapisi moraju završiti u novim odjeljcima, inače će veza biti nemoguća (o tome je bilo riječi u 4. poglavlju, u odjeljku “Ponovno particioniranje podataka” u pododjeljku 4.2.4).

Ponovno particioniranje ima cijenu

Ponovno particioniranje zahtijeva troškove - dodatne troškove resursa za stvaranje srednjih tema, pohranjivanje duplih podataka u drugu temu; to također znači povećanu latenciju zbog pisanja i čitanja iz ove teme. Osim toga, ako se trebate pridružiti preko više od jednog aspekta ili dimenzije, morate ulančati spojeve, mapirati zapise s novim ključevima i ponovno pokrenuti proces ponovnog particioniranja.

Povezivanje s manjim skupovima podataka

U nekim slučajevima, količina referentnih podataka koje treba povezati je relativno mala, tako da njihove potpune kopije mogu lako stati lokalno na svaki čvor. Za situacije poput ove, Kafka Streams pruža klasu GlobalKTable.

Instance GlobalKTable su jedinstvene jer aplikacija replicira sve podatke na svaki od čvorova. Budući da su svi podaci prisutni na svakom čvoru, nema potrebe za particioniranjem toka događaja prema ključu referentnih podataka tako da je dostupan svim particijama. Također možete napraviti spajanja bez ključa pomoću objekata GlobalKTable. Vratimo se jednom od prethodnih primjera da demonstriramo ovu značajku.

Povezivanje KStream objekata s objektima GlobalKTable

U pododjeljku 5.3.2 izvršili smo prozorsku agregaciju mjenjačkih transakcija po kupcima. Rezultati ove agregacije izgledali su otprilike ovako:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Iako su ti rezultati poslužili svrsi, bilo bi korisnije da su prikazani i ime kupca i puni naziv tvrtke. Da biste dodali ime klijenta i naziv tvrtke, možete napraviti normalna spajanja, ali morat ćete napraviti dva preslikavanja ključeva i ponovno particioniranje. Uz GlobalKTable možete izbjeći troškove takvih operacija.

Da bismo to učinili, koristit ćemo objekt countStream iz ispisa 5.11 (odgovarajući kod se može pronaći u src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) i povezati ga s dva objekta GlobalKTable.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
O tome smo već razgovarali, pa neću ponavljati. Ali napominjem da je kod u funkciji toStream().map apstrahiran u funkcijski objekt umjesto u ugrađeni lambda izraz radi čitljivosti.

Sljedeći korak je deklarirati dvije instance GlobalKTable (prikazani kod se može naći u datoteci src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"

Imajte na umu da su nazivi tema opisani pomoću nabrojanih tipova.

Sada kada imamo sve komponente spremne, preostaje samo napisati kod za vezu (koji se može pronaći u datoteci src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Iako postoje dva spoja u ovom kodu, oni su ulančani jer se nijedan od njihovih rezultata ne koristi zasebno. Rezultati se prikazuju na kraju cijele operacije.

Kada pokrenete gornju operaciju spajanja, dobit ćete rezultate poput ovih:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Suština se nije promijenila, ali ovi rezultati izgledaju jasnije.

Ako odbrojavate do 4. poglavlja, već ste vidjeli nekoliko vrsta veza na djelu. Navedeni su u tablici. 5.2. Ova tablica odražava mogućnosti povezivanja od verzije 1.0.0 Kafka Streams; Nešto bi se moglo promijeniti u budućim izdanjima.

Knjiga “Kafkini tokovi na djelu. Aplikacije i mikroservisi za rad u stvarnom vremenu"
Da završimo stvari, ponovimo osnove: možete povezati tokove događaja (KStream) i ažurirati tokove (KTable) koristeći lokalno stanje. Alternativno, ako veličina referentnih podataka nije prevelika, možete koristiti objekt GlobalKTable. GlobalKTables replicira sve particije na svaki čvor aplikacije Kafka Streams, osiguravajući da su svi podaci dostupni bez obzira kojoj particiji ključ odgovara.

Zatim ćemo vidjeti značajku Kafka Streams, zahvaljujući kojoj možemo promatrati promjene stanja bez konzumiranja podataka iz Kafkine teme.

5.3.5. Upitno stanje

Već smo izvršili nekoliko operacija koje uključuju stanje i uvijek šaljemo rezultate na konzolu (za potrebe razvoja) ili ih pišemo u temu (za potrebe proizvodnje). Kada pišete rezultate za temu, morate koristiti Kafka potrošača da biste ih vidjeli.

Čitanje podataka iz ovih tema može se smatrati vrstom materijaliziranih pogleda. Za naše potrebe, možemo koristiti definiciju materijaliziranog pogleda iz Wikipedije: “... fizički objekt baze podataka koji sadrži rezultate upita. Na primjer, to može biti lokalna kopija udaljenih podataka, ili podskup redaka i/ili stupaca tablice ili rezultata spajanja, ili tablica sažetka dobivena agregacijom” (https://en.wikipedia.org/wiki /Materijalizirani_prikaz).

Kafka Streams vam također omogućuje pokretanje interaktivnih upita na državnim pohranama, omogućujući vam izravno čitanje ovih materijaliziranih prikaza. Važno je napomenuti da je upit prema spremištu stanja operacija samo za čitanje. To osigurava da ne morate brinuti o tome da slučajno stanje postane nedosljedno dok vaša aplikacija obrađuje podatke.

Mogućnost izravnog postavljanja upita o pohrani stanja je važna. To znači da možete izraditi aplikacije nadzorne ploče bez potrebe da prvo dohvatite podatke od Kafka potrošača. Također povećava učinkovitost aplikacije, zbog činjenice da nema potrebe za ponovnim pisanjem podataka:

  • zahvaljujući lokalnosti podataka, može im se brzo pristupiti;
  • umnožavanje podataka je eliminirano, budući da se ne zapisuju u vanjsku pohranu.

Glavna stvar koju želim da zapamtite je da možete izravno postaviti upit o stanju unutar svoje aplikacije. Prilike koje vam ovo pruža ne mogu se precijeniti. Umjesto konzumiranja podataka iz Kafke i pohranjivanja zapisa u bazu podataka za aplikaciju, možete postaviti upite pohrani stanja s istim rezultatom. Izravni upiti državnim pohranama znače manje koda (nema potrošača) i manje softvera (nema potrebe za tablicom baze podataka za pohranu rezultata).

U ovom smo poglavlju obradili dosta toga, pa ćemo za sada ostaviti raspravu o interaktivnim upitima prema državnim trgovinama. Ali ne brinite: u 9. poglavlju izradit ćemo jednostavnu aplikaciju nadzorne ploče s interaktivnim upitima. Koristit će se nekim od primjera iz ovog i prethodnih poglavlja da demonstrira interaktivne upite i kako ih možete dodati Kafka Streams aplikacijama.

Rezime

  • KStream objekti predstavljaju tokove događaja, usporedive s umetcima u bazu podataka. KTable objekti predstavljaju tokove ažuriranja, više poput ažuriranja baze podataka. Veličina KTable objekta ne raste, stari zapisi se zamjenjuju novima.
  • KTable objekti potrebni su za operacije združivanja.
  • Koristeći operacije prozora, možete podijeliti agregirane podatke u vremenske segmente.
  • Zahvaljujući GlobalKTable objektima, možete pristupiti referentnim podacima bilo gdje u aplikaciji, bez obzira na particiju.
  • Moguće su veze između KStream, KTable i GlobalKTable objekata.

Do sada smo se fokusirali na izgradnju Kafka Streams aplikacija koristeći KStream DSL visoke razine. Iako vam pristup visoke razine omogućuje stvaranje urednih i sažetih programa, njegova uporaba predstavlja kompromis. Rad s DSL KStreamom znači povećanje konciznosti vašeg koda smanjenjem stupnja kontrole. U sljedećem poglavlju pogledat ćemo API čvora rukovatelja niske razine i isprobati druge kompromise. Programi će biti duži nego što su bili prije, ali ćemo moći stvoriti gotovo bilo koji čvor rukovatelja koji nam može zatrebati.

→ Više detalja o knjizi možete pronaći na web stranica izdavača

→ Za Habrozhiteli 25% popusta korištenjem kupona - Kafkini potoci

→ Po uplati papirnate verzije knjige, elektronička knjiga bit će poslana e-poštom.

Izvor: www.habr.com

Dodajte komentar