Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu" Zdravo, stanovnici Khabro! Ova knjiga je pogodna za sve programere koji žele razumjeti obradu niti. Razumijevanje distribuiranog programiranja pomoći će vam da bolje razumijete Kafku i Kafka Streams. Bilo bi lijepo znati sam Kafkin okvir, ali to nije neophodno: reći ću vam sve što vam treba. Iskusni Kafka programeri i početnici će naučiti kako da kreiraju zanimljive aplikacije za obradu streamova koristeći biblioteku Kafka Streams u ovoj knjizi. Srednji i napredni Java programeri koji su već upoznati s konceptima poput serijalizacije naučit će primijeniti svoje vještine za kreiranje Kafka Streams aplikacija. Izvorni kod knjige je napisan u Javi 8 i značajno koristi sintaksu Java 8 lambda izraza, tako da će poznavanje rada sa lambda funkcijama (čak i u drugom programskom jeziku) dobro doći.

Izvod. 5.3. Operacije agregacije i prozora

U ovom odeljku ćemo preći na istraživanje delova Kafkinih tokova koji najviše obećavaju. Do sada smo pokrili sljedeće aspekte Kafkinih tokova:

  • kreiranje topologije obrade;
  • korištenje stanja u streaming aplikacijama;
  • izvođenje povezivanja toka podataka;
  • razlike između tokova događaja (KStream) i tokova ažuriranja (KTable).

U sljedećim primjerima spojit ćemo sve ove elemente. Također ćete naučiti o prozoru, još jednoj sjajnoj osobini streaming aplikacija. Naš prvi primjer će biti jednostavno agregiranje.

5.3.1. Agregacija prodaje dionica po industrijskim sektorima

Agregacija i grupisanje su vitalni alati kada radite sa streaming podacima. Ispitivanje pojedinačnih zapisa kako su primljeni često je nedovoljno. Da biste izvukli dodatne informacije iz podataka, potrebno ih je grupirati i kombinovati.

U ovom primjeru ćete obući kostim dnevnog trgovca koji treba da prati obim prodaje dionica kompanija u nekoliko industrija. Konkretno, zanima vas pet kompanija s najvećim udjelom u prodaji u svakoj djelatnosti.

Takvo agregiranje će zahtijevati nekoliko sljedećih koraka kako bi se podaci preveli u željeni oblik (uopšteno govoreći).

  1. Kreirajte izvor zasnovan na temama koji objavljuje sirove informacije o trgovanju dionicama. Morat ćemo mapirati objekt tipa StockTransaction u objekt tipa ShareVolume. Poenta je da objekat StockTransaction sadrži metapodatke o prodaji, ali su nam potrebni samo podaci o broju akcija koje se prodaju.
  2. Grupirajte podatke o količini dionica prema simbolu dionice. Kada se grupišu po simbolu, ove podatke možete sažmiti u međuzbroje obima prodaje zaliha. Vrijedi napomenuti da metoda KStream.groupBy vraća instancu tipa KGroupedStream. I možete dobiti KTable instancu daljim pozivanjem metode KGroupedStream.reduce.

Šta je sučelje KGroupedStream

Metode KStream.groupBy i KStream.groupByKey vraćaju instancu KGroupedStream. KGroupedStream je posredni prikaz toka događaja nakon grupiranja po ključevima. Uopće nije namijenjen direktnom radu s njim. Umjesto toga, KGroupedStream se koristi za operacije agregacije, čiji je rezultat uvijek KTable. A budući da je rezultat operacija agregacije KTable i oni koriste skladište stanja, moguće je da se sva ažuriranja kao rezultat ne šalju dalje niz cjevovod.

Metoda KTable.groupBy vraća sličnu KGroupedTable - posredni prikaz toka ažuriranja, pregrupisanih po ključu.

Napravimo kratku pauzu i pogledajmo sl. 5.9, koji pokazuje šta smo postigli. Ova topologija bi vam već trebala biti poznata.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Pogledajmo sada kod za ovu topologiju (može se naći u datoteci src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Dati kod se odlikuje kratkoćom i velikim brojem radnji koje se izvode u nekoliko redova. Možda ćete primijetiti nešto novo u prvom parametru metode builder.stream: vrijednost tipa enuma AutoOffsetReset.EARLIEST (postoji i LATEST), postavljena pomoću metode Consumed.withOffsetResetPolicy. Ovaj tip nabrajanja može se koristiti za specificiranje strategije poništavanja pomaka za svaki KStream ili KTable i ima prednost nad opcijom poništavanja pomaka iz konfiguracije.

GroupByKey i GroupBy

KStream interfejs ima dve metode za grupisanje zapisa: GroupByKey i GroupBy. Oba vraćaju KGroupedTable, pa se možda pitate koja je razlika između njih i kada koristiti koju?

Metoda GroupByKey se koristi kada ključevi u KStream-u već nisu prazni. I što je najvažnije, zastavica “zahteva ponovno particioniranje” nikada nije postavljena.

Metoda GroupBy pretpostavlja da ste promijenili ključeve za grupisanje, tako da je zastavica reparticije postavljena na true. Izvođenje spajanja, agregiranja itd. nakon metode GroupBy rezultirat će automatskim ponovnim particioniranjem.
Sažetak: Kad god je moguće, trebali biste koristiti GroupByKey umjesto GroupBy.

Jasno je šta rade metode mapValues ​​i groupBy, pa pogledajmo metodu sum() (koja se nalazi u src/main/java/bbejeck/model/ShareVolume.java) (listing 5.3).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Metoda ShareVolume.sum vraća tekući zbroj obima prodaje dionica, a rezultat cijelog lanca proračuna je KTable objekat . Sada razumete ulogu koju igra KTable. Kada stignu ShareVolume objekti, odgovarajući KTable objekt pohranjuje najnovije trenutno ažuriranje. Važno je zapamtiti da se sva ažuriranja odražavaju u prethodnom shareVolumeKTable, ali se ne šalju sve dalje.

Zatim koristimo ovu KTtable da agregiramo (prema broju trgovanih dionica) da bismo došli do pet kompanija s najvećim obimom dionica kojima se trguje u svakoj industriji. Naše akcije u ovom slučaju će biti slične onima za prvu agregaciju.

  1. Izvršite drugu groupBy operaciju da grupišete pojedinačne ShareVolume objekte prema djelatnostima.
  2. Počnite sa sumiranjem ShareVolume objekata. Ovaj put objekt agregacije je red s prioritetom fiksne veličine. U ovom redu fiksne veličine zadržava se samo pet kompanija sa najvećim brojem prodatih akcija.
  3. Mapirajte redove iz prethodnog pasusa na vrijednost niza i vratite prvih pet dionica koje se najviše trguju po broju prema djelatnostima.
  4. Zapišite rezultate u obliku niza na temu.

Na sl. Slika 5.10 prikazuje graf topologije toka podataka. Kao što vidite, drugi krug obrade je prilično jednostavan.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Sada kada imamo jasno razumijevanje strukture ovog drugog kruga obrade, možemo se obratiti njegovom izvornom kodu (naći ćete ga u datoteci src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Ovaj inicijalizator sadrži varijablu fixedQueue. Ovo je prilagođeni objekat koji je adapter za java.util.TreeSet koji se koristi za praćenje prvih N rezultata u opadajućem redoslijedu dionica kojima se trguje.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Već ste vidjeli pozive groupBy i mapValues, pa nećemo ulaziti u njih (pozivamo metodu KTable.toStream jer je metoda KTable.print zastarjela). Ali još niste vidjeli KTable verziju aggregate(), tako da ćemo provesti malo vremena raspravljajući o tome.

Kao što se sećate, ono što čini KTable drugačijim je to što se zapisi sa istim ključevima smatraju ažuriranjima. KTable zamjenjuje stari unos novim. Agregacija se dešava na sličan način: agregiraju se najnoviji zapisi sa istim ključem. Kada zapis stigne, on se dodaje u instancu klase FixedSizePriorityQueue pomoću zbrajača (drugi parametar u pozivu agregatne metode), ali ako već postoji drugi zapis sa istim ključem, tada se stari zapis uklanja pomoću oduzimanja (treći parametar u poziv agregatne metode).

Sve ovo znači da naš agregator, FixedSizePriorityQueue, ne agregira sve vrijednosti jednim ključem, već pohranjuje pokretni zbir količina N najtrgovanijih vrsta dionica. Svaki ulazni unos sadrži ukupan broj do sada prodatih dionica. KTable će vam dati informacije o tome kojim se dionicama kompanija trenutno najviše trguje, bez potrebe za objedinjavanjem svakog ažuriranja.

Naučili smo da radimo dve važne stvari:

  • grupirati vrijednosti u KTable pomoću zajedničkog ključa;
  • izvoditi korisne operacije kao što su zbrajanje i agregacija na ovim grupisanim vrijednostima.

Poznavanje kako izvršiti ove operacije važno je za razumijevanje značenja podataka koji se kreću kroz Kafka Streams aplikaciju i razumijevanje koje informacije one nose.

Također smo spojili neke od ključnih koncepata o kojima smo ranije govorili u ovoj knjizi. U poglavlju 4, raspravljali smo o tome koliko je lokalno stanje otporno na greške važno za streaming aplikaciju. Prvi primjer u ovom poglavlju pokazao je zašto je lokalna država toliko važna—omogućava vam da pratite koje ste informacije već vidjeli. Lokalni pristup izbjegava mrežna kašnjenja, čineći aplikaciju učinkovitijom i otpornijom na greške.

Kada izvodite bilo koju operaciju skupljanja ili agregacije, morate navesti ime spremišta stanja. Operacije skupljanja i agregacije vraćaju KTable instancu, a KTable koristi pohranu stanja da zamijeni stare rezultate novima. Kao što ste vidjeli, ne šalju se sva ažuriranja u toku, a ovo je važno jer su operacije agregacije dizajnirane da proizvode sažete informacije. Ako ne primijenite lokalno stanje, KTable će proslijediti sve rezultate agregacije i skupljanja.

Zatim ćemo pogledati izvođenje operacija kao što je agregacija unutar određenog vremenskog perioda - takozvane operacije prozora.

5.3.2. Prozorske operacije

U prethodnom odeljku uveli smo kliznu konvoluciju i agregaciju. Aplikacija je izvršila kontinuirani roll-up prodaje dionica nakon čega je uslijedilo agregiranje pet najtrgovanijih dionica na berzi.

Ponekad je potrebno takvo kontinuirano agregiranje i sakupljanje rezultata. A ponekad morate izvršiti operacije samo u određenom vremenskom periodu. Na primjer, izračunajte koliko je transakcija izvršeno s dionicama određene kompanije u posljednjih 10 minuta. Ili koliko je korisnika kliknulo na novi reklamni baner u posljednjih 15 minuta. Aplikacija može izvoditi takve operacije više puta, ali s rezultatima koji se primjenjuju samo na određene vremenske periode (vremenski prozori).

Računanje menjačkih transakcija po kupcu

U sljedećem primjeru pratit ćemo transakcije dionica kod više trgovaca—bilo velikih organizacija ili pametnih pojedinačnih finansijera.

Dva su moguća razloga za ovo praćenje. Jedna od njih je potreba da se zna šta tržišni lideri kupuju/prodaju. Ako ovi veliki igrači i sofisticirani investitori vide priliku, ima smisla slijediti njihovu strategiju. Drugi razlog je želja da se uoče svi mogući znakovi ilegalne trgovine insajderima. Da biste to učinili, morat ćete analizirati korelaciju velikih skokova prodaje s važnim saopštenjima za javnost.

Takvo praćenje se sastoji od sljedećih koraka:

  • kreiranje toka za čitanje iz teme transakcija dionica;
  • grupisanje ulaznih zapisa prema ID-u kupca i simbolu dionice. Pozivanje metode groupBy vraća instancu klase KGroupedStream;
  • Metoda KGroupedStream.windowedBy vraća tok podataka ograničen na vremenski prozor, koji omogućava agregaciju u prozorima. Ovisno o tipu prozora, vraća se ili TimeWindowedKStream ili SessionWindowedKStream;
  • broj transakcija za operaciju agregacije. Prozorski tok podataka određuje da li je određeni zapis uzet u obzir u ovom brojanju;
  • zapisivanje rezultata u temu ili njihovo slanje na konzolu tokom razvoja.

Topologija ove aplikacije je jednostavna, ali bi njena jasna slika bila od pomoći. Pogledajmo sl. 5.11.

Zatim ćemo pogledati funkcionalnost prozorskih operacija i odgovarajući kod.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"

Vrste prozora

Postoje tri tipa prozora u Kafka Streams:

  • sessional;
  • “tumbling” (tumbling);
  • klizanje/skakanje.

Koju ćete odabrati ovisi o vašim poslovnim zahtjevima. Prozori za prevrtanje i skakanje su vremenski ograničeni, dok su prozori sesije ograničeni aktivnošću korisnika – trajanje sesije (sesija) određuje se isključivo prema tome koliko je korisnik aktivan. Glavna stvar koju treba zapamtiti je da se svi tipovi prozora zasnivaju na datumskim/vremenskim oznakama unosa, a ne na sistemskom vremenu.

Zatim implementiramo našu topologiju sa svakim od tipova prozora. Kompletan kod će biti dat samo u prvom primjeru za druge tipove prozora ništa se neće promijeniti osim tipa operacije prozora.

Prozori sesije

Prozori sesije se veoma razlikuju od svih ostalih tipova prozora. Oni su ograničeni ne toliko vremenom koliko aktivnošću korisnika (ili aktivnošću entiteta koji želite pratiti). Prozori sesije su ograničeni periodima neaktivnosti.

Slika 5.12 ilustruje koncept prozora sesije. Manja sesija će se spojiti sa sesijom s njegove lijeve strane. A sesija sa desne strane će biti odvojena jer prati dug period neaktivnosti. Prozori sesije su zasnovani na aktivnosti korisnika, ali koriste oznake datuma/vremena iz unosa kako bi odredili kojoj sesiji taj unos pripada.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"

Korištenje prozora sesije za praćenje transakcija dionica

Koristimo prozore sesije da uhvatimo informacije o transakcijama razmene. Implementacija prozora sesije je prikazana u Listingu 5.5 (koji se može naći u src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Već ste vidjeli većinu operacija u ovoj topologiji, tako da nema potrebe da ih ovdje ponovo gledate. Ali ovdje postoji i nekoliko novih elemenata o kojima ćemo sada razgovarati.

Svaka groupBy operacija obično izvodi neku vrstu operacije agregacije (agregiranje, rollup ili brojanje). Možete izvesti ili kumulativno agregiranje s tekućim ukupnim iznosom ili agregaciju prozora, koja uzima u obzir zapise unutar određenog vremenskog okvira.

Kod u Listingu 5.5 broji broj transakcija unutar prozora sesije. Na sl. 5.13 ove akcije se analiziraju korak po korak.

Pozivanjem windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) kreiramo prozor sesije sa intervalom neaktivnosti od 20 sekundi i intervalom postojanosti od 15 minuta. Interval mirovanja od 20 sekundi znači da će aplikacija uključiti svaki unos koji stigne unutar 20 sekundi od završetka ili početka trenutne sesije u tekuću (aktivnu) sesiju.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Zatim specificiramo koju operaciju agregacije treba izvršiti u prozoru sesije - u ovom slučaju, count. Ako dolazni unos padne izvan prozora neaktivnosti (na obje strane oznake datuma/vremena), aplikacija kreira novu sesiju. Interval zadržavanja znači održavanje sesije određeno vrijeme i omogućava kasne podatke koji se protežu i nakon perioda neaktivnosti sesije, ali se i dalje mogu priložiti. Osim toga, početak i kraj nove sesije koja je rezultat spajanja odgovaraju najranijoj i najnovijoj datum/vremenskoj oznaci.

Pogledajmo nekoliko unosa iz metode brojanja da vidimo kako funkcionišu sesije (tabela 5.1).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Kada zapisi stignu, tražimo postojeće sesije sa istim ključem, vremenom završetka manjim od trenutnog datuma/vremenske oznake - interval neaktivnosti, i vremenom početka većim od trenutnog datuma/vremenskog žiga + intervala neaktivnosti. Uzimajući ovo u obzir, četiri unosa iz tabele. 5.1 se spajaju u jednu sesiju na sljedeći način.

1. Prvi stiže zapis 1, tako da je vrijeme početka jednako vremenu završetka i iznosi 00:00:00.

2. Zatim dolazi unos 2 i tražimo sesije koje se završavaju ne ranije od 23:59:55 i počinju najkasnije u 00:00:35. Pronalazimo zapis 1 i kombinujemo sesije 1 i 2. Uzimamo vreme početka sesije 1 (ranije) i vreme završetka sesije 2 (kasnije), tako da naša nova sesija počinje u 00:00:00 i završava se u 00: 00:15.

3. Stiže zapis 3, tražimo sesije između 00:00:30 i 00:01:10 i ne nalazimo nijednu. Dodajte drugu sesiju za ključ 123-345-654,FFBE, koja počinje i završava u 00:00:50.

4. Stiže zapis 4 i tražimo sesije između 23:59:45 i 00:00:25. Ovog puta se nalaze obje sesije 1 i 2. Sve tri sesije su kombinovane u jednu, sa vremenom početka 00:00:00 i vremenom završetka 00:00:15.

Iz onoga što je opisano u ovom odjeljku, vrijedi zapamtiti sljedeće važne nijanse:

  • sesije nisu prozori fiksne veličine. Trajanje sesije je određeno aktivnošću u datom vremenskom periodu;
  • Oznake datuma/vremena u podacima određuju da li događaj spada u postojeću sesiju ili tokom perioda mirovanja.

Dalje ćemo razgovarati o sljedećoj vrsti prozora - prozorima koji se prevrću.

"Tumbling" prozori

Prozori koji se prevrću hvataju događaje koji spadaju u određeni vremenski period. Zamislite da trebate snimiti sve transakcije dionica određene kompanije svakih 20 sekundi, tako da prikupite sve događaje u tom vremenskom periodu. Na kraju intervala od 20 sekundi, prozor se okreće i prelazi na novi interval posmatranja od 20 sekundi. Slika 5.14 ilustruje ovu situaciju.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Kao što vidite, svi događaji primljeni u posljednjih 20 sekundi su uključeni u prozor. Na kraju ovog vremenskog perioda kreira se novi prozor.

Listing 5.6 prikazuje kod koji demonstrira upotrebu prevrtljivih prozora za snimanje transakcija dionica svakih 20 sekundi (nalazi se u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Sa ovom malom promjenom u pozivu metode TimeWindows.of, možete koristiti prozor koji se okreće. Ovaj primjer ne poziva metodu until(), tako da će se koristiti podrazumevani interval zadržavanja od 24 sata.

Konačno, vrijeme je da prijeđemo na posljednju opciju prozora - "skakanje" prozora.

Klizni ("skakanje") prozori

Klizni/skokovi prozori su slični prozorima koji se prevrću, ali sa malom razlikom. Klizni prozori ne čekaju do kraja vremenskog intervala prije nego što kreiraju novi prozor za obradu nedavnih događaja. Počinju nove proračune nakon intervala čekanja manjeg od trajanja prozora.

Da bismo ilustrovali razlike između prevrtanja i skakanja prozora, vratimo se na primjer brojanja berzanskih transakcija. Naš cilj je i dalje brojati broj transakcija, ali ne želimo čekati cijelo vrijeme prije ažuriranja brojača. Umjesto toga, ažurirat ćemo brojač u kraćim intervalima. Na primjer, i dalje ćemo brojati broj transakcija svakih 20 sekundi, ali ćemo ažurirati brojač svakih 5 sekundi, kao što je prikazano na slici. 5.15. U ovom slučaju, na kraju imamo tri prozora rezultata sa podacima koji se preklapaju.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Listing 5.7 prikazuje kod za definisanje kliznih prozora (naći se u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Prozor koji se prevrće može se konvertovati u prozor za skakanje dodavanjem poziva metodi advanceBy(). U prikazanom primjeru, interval čuvanja je 15 minuta.

U ovom odeljku ste videli kako ograničiti rezultate agregacije na vremenske prozore. Posebno želim da zapamtite sljedeće tri stvari iz ovog odjeljka:

  • veličina prozora sesije nije ograničena vremenskim periodom, već aktivnostima korisnika;
  • Prozori koji se „prevrću“ pružaju pregled događaja u datom vremenskom periodu;
  • Trajanje prozora za preskakanje je fiksno, ali se oni često ažuriraju i mogu sadržavati preklapajuće unose u svim prozorima.

Zatim ćemo naučiti kako pretvoriti KTable natrag u KStream za povezivanje.

5.3.3. Povezivanje KStream i KTable objekata

U poglavlju 4, raspravljali smo o povezivanju dva KStream objekta. Sada moramo naučiti kako povezati KTable i KStream. Ovo može biti potrebno iz sljedećeg jednostavnog razloga. KStream je tok zapisa, a KTable je tok ažuriranja zapisa, ali ponekad ćete možda poželjeti da dodate dodatni kontekst u tok zapisa koristeći ažuriranja iz KTable.

Uzmimo podatke o broju berzanskih transakcija i kombinujmo ih sa berzanskim vijestima za relevantne industrije. Evo šta trebate učiniti da to postignete s obzirom na kod koji već imate.

  1. Pretvorite KTable objekat s podacima o broju transakcija dionica u KStream, nakon čega slijedi zamjena ključa ključem koji označava industrijski sektor koji odgovara ovom simbolu dionice.
  2. Kreirajte KTable objekat koji čita podatke iz teme sa vestima sa berze. Ovaj novi KTable će biti kategoriziran po industrijskim sektorima.
  3. Povežite ažurirane vijesti s informacijama o broju berzanskih transakcija po industrijskim sektorima.

Sada da vidimo kako implementirati ovaj akcioni plan.

Pretvorite KTable u KStream

Da konvertujete KTable u KStream potrebno je da uradite sledeće.

  1. Pozovite metodu KTable.toStream().
  2. Pozivanjem metode KStream.map, zamijenite ključ imenom industrije, a zatim preuzmite TransactionSummary objekat iz Windowed instance.

Ove operacije ćemo ulančati zajedno na sljedeći način (kod se može naći u datoteci src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Budući da izvodimo operaciju KStream.map, vraćena KStream instanca se automatski ponovo particionira kada se koristi u vezi.

Završili smo proces konverzije, zatim moramo kreirati KTable objekat za čitanje vijesti o dionicama.

Kreiranje KTtable za vijesti o dionicama

Na sreću, kreiranje KTable objekta traje samo jedan red koda (kôd se može naći u src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Vrijedi napomenuti da nije potrebno specificirati Serde objekte, jer se niz Serdes koristi u postavkama. Takođe, korišćenjem NAJRANIJE enumeracije, tabela se popunjava zapisima na samom početku.

Sada možemo prijeći na posljednji korak - povezivanje.

Povezivanje ažuriranja vijesti s podacima o broju transakcija

Stvaranje veze nije teško. Koristićemo lijevo spajanje u slučaju da nema vijesti o dionicama za relevantnu industriju (potreban kod se može naći u datoteci src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Ovaj operator leftJoin je prilično jednostavan. Za razliku od spajanja u poglavlju 4, JoinWindow metoda se ne koristi jer kada se izvodi spajanje KStream-KTable, postoji samo jedan unos u KTable za svaki ključ. Takva veza nije vremenski ograničena: zapis je ili u KTable ili ga nema. Glavni zaključak: korištenjem KTable objekata možete obogatiti KStream s manje često ažuriranim referentnim podacima.

Sada ćemo pogledati efikasniji način za obogaćivanje događaja iz KStreama.

5.3.4. GlobalKTable objekti

Kao što vidite, postoji potreba za obogaćivanjem tokova događaja ili dodavanjem konteksta u njih. U 4. poglavlju ste videli veze između dva KStream objekta, au prethodnom odeljku videli ste vezu između KStream-a i KTable-a. U svim ovim slučajevima, potrebno je ponovno particionirati tok podataka prilikom mapiranja ključeva na novi tip ili vrijednost. Ponekad se reparticioniranje vrši eksplicitno, a ponekad Kafka Streams to radi automatski. Ponovno particioniranje je neophodno jer su se ključevi promijenili i zapisi moraju završiti u novim sekcijama, inače će povezivanje biti nemoguće (o tome je bilo riječi u poglavlju 4, u dijelu „Ponovno particioniranje podataka“ u pododjeljku 4.2.4).

Ponovno particioniranje ima cijenu

Ponovno particioniranje zahtijeva troškove – dodatne troškove resursa za kreiranje međutema, pohranjivanje duplih podataka u drugu temu; to također znači povećano kašnjenje zbog pisanja i čitanja iz ove teme. Dodatno, ako trebate spojiti više od jednog aspekta ili dimenzije, morate povezati spojeve, mapirati zapise s novim ključevima i ponovo pokrenuti proces ponovnog particioniranja.

Povezivanje na manje skupove podataka

U nekim slučajevima, obim referentnih podataka koji se povezuju je relativno mali, tako da se njihove kompletne kopije lako mogu smjestiti lokalno na svaki čvor. Za ovakve situacije, Kafka Streams pruža klasu GlobalKTable.

GlobalKTable instance su jedinstvene jer aplikacija replicira sve podatke na svaki od čvorova. A pošto su svi podaci prisutni na svakom čvoru, nema potrebe za particioniranjem toka događaja pomoću referentnog ključa podataka tako da bude dostupan svim particijama. Također možete napraviti spajanja bez ključa koristeći GlobalKTable objekte. Vratimo se na jedan od prethodnih primjera da demonstriramo ovu osobinu.

Povezivanje KStream objekata sa GlobalKTable objektima

U pododjeljku 5.3.2 izvršili smo prozor agregacije transakcija razmjene po kupcima. Rezultati ove agregacije izgledali su otprilike ovako:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Iako su ovi rezultati služili svrsi, bilo bi korisnije da su ime kupca i puni naziv kompanije također bili prikazani. Da biste dodali ime kupca i ime kompanije, možete napraviti uobičajena spajanja, ali ćete morati napraviti dva mapiranja ključa i ponovno particioniranje. Uz GlobalKTable možete izbjeći troškove takvih operacija.

Da bismo to uradili, koristićemo objekat countStream iz Listinga 5.11 (odgovarajući kod se može naći u src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) i povezati ga sa dva GlobalKTable objekta.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
O tome smo već razgovarali, pa neću ponavljati. Ali primjećujem da je kod u funkciji toStream().map apstrahovan u funkcijski objekt umjesto u inline lambda izraz radi čitljivosti.

Sljedeći korak je deklariranje dvije instance GlobalKTable (prikazan kod se može naći u datoteci src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"

Imajte na umu da su nazivi tema opisani pomoću nabrojanih tipova.

Sada kada imamo sve komponente spremne, ostaje samo da napišemo kod za vezu (koji se može naći u datoteci src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Iako postoje dva spoja u ovom kodu, oni su ulančani jer se nijedan njihov rezultat ne koristi zasebno. Rezultati se prikazuju na kraju cijele operacije.

Kada pokrenete gornju operaciju spajanja, dobit ćete sljedeće rezultate:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Suština se nije promijenila, ali ovi rezultati izgledaju jasnije.

Ako odbrojite do 4. poglavlja, već ste vidjeli nekoliko vrsta veza u akciji. Oni su navedeni u tabeli. 5.2. Ova tabela odražava mogućnosti povezivanja od verzije 1.0.0 Kafka Streams-a; Nešto se može promijeniti u budućim izdanjima.

Knjiga „Kafkini tokovi u akciji. Aplikacije i mikroservise za rad u realnom vremenu"
Da završimo stvari, hajde da ponovimo osnove: možete povezati tokove događaja (KStream) i ažurirati tokove (KTable) koristeći lokalno stanje. Alternativno, ako veličina referentnih podataka nije prevelika, možete koristiti GlobalKTable objekt. GlobalKTables replicira sve particije na svaki Kafka Streams aplikacijski čvor, osiguravajući da su svi podaci dostupni bez obzira na to kojoj particiji ključ odgovara.

Zatim ćemo vidjeti funkciju Kafka Streams, zahvaljujući kojoj možemo promatrati promjene stanja bez konzumiranja podataka iz Kafka teme.

5.3.5. Upitno stanje

Već smo izvršili nekoliko operacija koje uključuju stanje i uvijek šaljemo rezultate na konzolu (u svrhu razvoja) ili ih zapisujemo u temu (u svrhu proizvodnje). Kada pišete rezultate na temu, morate koristiti Kafka potrošača da biste ih vidjeli.

Čitanje podataka iz ovih tema može se smatrati vrstom materijaliziranih pogleda. Za naše potrebe možemo koristiti definiciju materijaliziranog pogleda iz Wikipedije: „...fizički objekt baze podataka koji sadrži rezultate upita. Na primjer, to može biti lokalna kopija udaljenih podataka, ili podskup redova i/ili stupaca tablice ili rezultata spajanja, ili sažeta tablica dobivena agregacijom” (https://en.wikipedia.org/wiki /Materijalizirani_pregled).

Kafka Streams vam takođe omogućava pokretanje interaktivnih upita na državnim prodavnicama, omogućavajući vam da direktno čitate ove materijalizovane poglede. Važno je napomenuti da je upit spremištu stanja operacija samo za čitanje. Ovo osigurava da ne morate brinuti o tome da ćete slučajno učiniti stanje nekonzistentnim dok vaša aplikacija obrađuje podatke.

Važna je mogućnost direktnog upita skladišta stanja. To znači da možete kreirati aplikacije za kontrolnu tablu bez potrebe da prvo preuzimate podatke od Kafka potrošača. Takođe povećava efikasnost aplikacije, zbog činjenice da nema potrebe za ponovnim pisanjem podataka:

  • zahvaljujući lokalnosti podataka, može im se brzo pristupiti;
  • Dupliciranje podataka je eliminirano, jer se ne upisuju u eksternu memoriju.

Glavna stvar koju želim da zapamtite je da možete direktno pitati stanje iz vaše aplikacije. Mogućnosti koje vam ovo pruža ne mogu se precijeniti. Umjesto da konzumirate podatke iz Kafke i pohranjujete zapise u bazu podataka za aplikaciju, možete postaviti upit za skladišta stanja s istim rezultatom. Direktni upiti do skladišta stanja znače manje koda (bez potrošača) i manje softvera (nema potrebe za tablicom baze podataka za pohranjivanje rezultata).

U ovom poglavlju smo pokrili dosta terena, tako da ćemo za sada ostaviti našu raspravu o interaktivnim upitima prema državnim prodavnicama. Ali ne brinite: u 9. poglavlju ćemo kreirati jednostavnu aplikaciju za kontrolnu tablu sa interaktivnim upitima. Koristit će se neki od primjera iz ovog i prethodnih poglavlja da bi se demonstrirali interaktivni upiti i kako ih možete dodati Kafka Streams aplikacijama.

Rezime

  • KStream objekti predstavljaju tokove događaja, uporedivi sa umetanjima u bazu podataka. KTable objekti predstavljaju tokove ažuriranja, više kao ažuriranja baze podataka. Veličina KTable objekta ne raste, stari zapisi se zamjenjuju novim.
  • KTable objekti su potrebni za operacije agregacije.
  • Koristeći prozorske operacije, možete podijeliti agregirane podatke u vremenske segmente.
  • Zahvaljujući GlobalKTable objektima, možete pristupiti referentnim podacima bilo gdje u aplikaciji, bez obzira na particioniranje.
  • Moguće su veze između KStream, KTable i GlobalKTable objekata.

Do sada smo se fokusirali na izgradnju Kafka Streams aplikacija koristeći KStream DSL visokog nivoa. Iako vam pristup visokog nivoa omogućava kreiranje urednih i sažetih programa, njegovo korištenje predstavlja kompromis. Rad sa DSL KStream-om znači povećanje konciznosti vašeg koda smanjenjem stepena kontrole. U sledećem poglavlju, pogledaćemo API čvora rukovaoca niskog nivoa i isprobati druge kompromise. Programi će biti duži nego što su bili prije, ali moći ćemo kreirati gotovo svaki čvor rukovanja koji nam može zatrebati.

→ Više detalja o knjizi možete pronaći na web stranicu izdavača

→ Za Habrozhiteli 25% popusta koristeći kupon - Kafka Streams

→ Po uplati papirne verzije knjige, elektronska knjiga će biti poslana e-poštom.

izvor: www.habr.com

Dodajte komentar