Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času" Pozdravljeni prebivalci Khabra! Ta knjiga je primerna za vse razvijalce, ki želijo razumeti obdelavo niti. Razumevanje porazdeljenega programiranja vam bo pomagalo bolje razumeti Kafko in Kafkine tokove. Lepo bi bilo poznati sam okvir Kafke, vendar to ni potrebno: Povedal vam bom vse, kar potrebujete. Izkušeni razvijalci Kafka in začetniki se bodo v tej knjigi naučili, kako ustvariti zanimive aplikacije za obdelavo tokov z uporabo knjižnice Kafka Streams. Srednje in napredni razvijalci Java, ki so že seznanjeni s koncepti, kot je serializacija, se bodo naučili uporabljati svoje veščine za ustvarjanje aplikacij Kafka Streams. Izvorna koda knjige je napisana v Javi 8 in v veliki meri uporablja sintakso lambda izraza Java 8, zato bo znanje o delu z lambda funkcijami (tudi v drugem programskem jeziku) prišlo prav.

Izvleček. 5.3. Operacije združevanja in okna

V tem razdelku bomo prešli na raziskovanje najbolj obetavnih delov Kafkinih tokov. Doslej smo obravnavali naslednje vidike Kafkinih tokov:

  • ustvarjanje procesne topologije;
  • uporaba stanja v pretočnih aplikacijah;
  • izvajanje povezav podatkovnega toka;
  • razlike med tokovi dogodkov (KStream) in tokovi posodobitev (KTable).

V naslednjih primerih bomo združili vse te elemente. Naučili se boste tudi o oknih, še eni odlični funkciji pretočnih aplikacij. Naš prvi primer bo preprosto združevanje.

5.3.1. Združevanje prodaje zalog po industrijskih sektorjih

Združevanje in združevanje sta ključna orodja pri delu s pretočnimi podatki. Preverjanje posameznih zapisov ob prejemu je pogosto nezadostno. Če želite iz podatkov pridobiti dodatne informacije, jih je potrebno združiti in združiti.

V tem primeru se boste oblekli v kostum dnevnega trgovca, ki mora slediti obsegu prodaje delnic podjetij v več panogah. Natančneje, zanima vas pet podjetij z največjo prodajo delnic v vsaki panogi.

Takšno združevanje bo zahtevalo naslednjih nekaj korakov za pretvorbo podatkov v želeno obliko (na splošno).

  1. Ustvarite tematski vir, ki objavlja neobdelane informacije o trgovanju z delnicami. Objekt tipa StockTransaction bomo morali preslikati v objekt tipa ShareVolume. Gre za to, da objekt StockTransaction vsebuje metapodatke o prodaji, potrebujemo pa le podatke o številu prodanih delnic.
  2. Podatke ShareVolume združite po simbolu delnice. Ko jih razvrstite po simbolih, lahko te podatke strnete v delne vsote obsega prodaje delnic. Omeniti velja, da metoda KStream.groupBy vrne primerek tipa KGroupedStream. Primerek KTable lahko dobite z nadaljnjim klicem metode KGroupedStream.reduce.

Kaj je vmesnik KGroupedStream

Metodi KStream.groupBy in KStream.groupByKey vrneta primerek KGroupedStream. KGroupedStream je vmesna predstavitev toka dogodkov po združevanju po ključih. Sploh ni namenjen neposrednemu delu z njim. Namesto tega se KGroupedStream uporablja za operacije združevanja, ki vedno povzročijo KTable. In ker je rezultat operacij združevanja KTable in uporabljajo shrambo stanja, je možno, da vse posodobitve kot rezultat niso poslane naprej po cevovodu.

Metoda KTable.groupBy vrne podobno KGroupedTable – vmesno predstavitev toka posodobitev, ponovno razvrščenih po ključu.

Vzemimo kratek odmor in poglejmo sl. 5.9, ki prikazuje, kaj smo dosegli. Ta topologija bi vam morala biti že dobro poznana.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Poglejmo zdaj kodo za to topologijo (najdete jo v datoteki src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (izpis 5.2).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Dano kodo odlikuje jedrnatost in velik obseg dejanj, izvedenih v več vrsticah. Morda boste opazili nekaj novega v prvem parametru metode builder.stream: vrednost tipa enum AutoOffsetReset.EARLIEST (obstaja tudi LATEST), nastavljena z metodo Consumed.withOffsetResetPolicy. Ta tip oštevilčenja se lahko uporablja za določitev strategije ponastavitve odmika za vsak KStream ali KTable in ima prednost pred možnostjo ponastavitve odmika iz konfiguracije.

GroupByKey in GroupBy

Vmesnik KStream ima dve metodi za združevanje zapisov: GroupByKey in GroupBy. Oba vrneta KGroupedTable, zato se morda sprašujete, kakšna je razlika med njima in kdaj uporabiti katero?

Metoda GroupByKey se uporablja, ko ključi v KStream že niso prazni. In kar je najpomembnejše, zastavica »zahteva ponovno particioniranje« ni bila nikoli nastavljena.

Metoda GroupBy predvideva, da ste spremenili ključe združevanja, zato je zastavica ponovne razdelitve nastavljena na true. Izvajanje združevanj, združevanj itd. po metodi GroupBy bo povzročilo samodejno ponovno particioniranje.
Povzetek: kadar koli je to mogoče, raje uporabite GroupByKey kot GroupBy.

Jasno je, kaj počneta metodi mapValues ​​​​in groupBy, zato si poglejmo metodo sum() (najdete jo v src/main/java/bbejeck/model/ShareVolume.java) (izpis 5.3).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Metoda ShareVolume.sum vrne tekočo vsoto obsega prodaje zalog, rezultat celotne verige izračunov pa je objekt KTable . Zdaj razumete vlogo KTable. Ko prispejo objekti ShareVolume, ustrezen objekt KTable shrani najnovejšo trenutno posodobitev. Pomembno si je zapomniti, da se vse posodobitve odražajo v prejšnji tabeli shareVolumeKTable, vendar niso vse poslane naprej.

To tabelo KTable nato uporabimo za seštevanje (po številu delnic, s katerimi se trguje), da pridemo do petih podjetij z največjimi količinami delnic, s katerimi se trguje v vsaki panogi. Naša dejanja v tem primeru bodo podobna tistim pri prvem združevanju.

  1. Izvedite drugo operacijo groupBy, da združite posamezne objekte ShareVolume po panogah.
  2. Začnite povzemati objekte ShareVolume. Tokrat je objekt združevanja prednostna čakalna vrsta s fiksno velikostjo. V tej čakalni vrsti s fiksno velikostjo se ohrani le pet podjetij z največjimi količinami prodanih delnic.
  3. Preslikajte čakalne vrste iz prejšnjega odstavka v vrednost niza in vrnite prvih pet najbolj prometnih delnic po številu glede na panogo.
  4. Zapišite rezultate v obliki niza v temo.

Na sl. Slika 5.10 prikazuje graf topologije pretoka podatkov. Kot lahko vidite, je drugi krog obdelave precej preprost.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Zdaj, ko jasno razumemo strukturo tega drugega kroga obdelave, se lahko obrnemo na njegovo izvorno kodo (našli jo boste v datoteki src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (izpis 5.4) .

Ta inicializator vsebuje spremenljivko fixedQueue. To je objekt po meri, ki je adapter za java.util.TreeSet, ki se uporablja za sledenje prvih N rezultatov v padajočem vrstnem redu delnic, s katerimi se trguje.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Videli ste že klice groupBy in mapValues​, zato se vanje ne bomo spuščali (metodo KTable.toStream kličemo, ker je metoda KTable.print zastarela). Toda različice aggregate() KTable še niste videli, zato bomo o tem razpravljali nekaj časa.

Kot se spomnite, se KTable razlikuje po tem, da se zapisi z enakimi ključi štejejo za posodobitve. KTable zamenja stari vnos z novim. Združevanje poteka na podoben način: združijo se zadnji zapisi z istim ključem. Ko prispe zapis, se doda v primerek razreda FixedSizePriorityQueue z uporabo seštevalnika (drugi parameter v klicu agregatne metode), če pa že obstaja drug zapis z istim ključem, se stari zapis odstrani z odštevalnikom (tretji parameter v klic agregatne metode).

Vse to pomeni, da naš agregator, FixedSizePriorityQueue, ne združuje vseh vrednosti z enim ključem, ampak shranjuje premikajočo se vsoto količin N najbolj prometnih vrst delnic. Vsak vhodni vnos vsebuje skupno število do sedaj prodanih delnic. KTable vam bo dal informacije o tem, s katerimi delnicami podjetij se trenutno največ trguje, ne da bi zahtevali tekoče združevanje vsake posodobitve.

Naučili smo se narediti dve pomembni stvari:

  • združite vrednosti v KTable s skupnim ključem;
  • izvajati uporabne operacije, kot sta združevanje in združevanje teh združenih vrednosti.

Poznavanje izvajanja teh operacij je pomembno za razumevanje pomena podatkov, ki se premikajo skozi aplikacijo Kafka Streams, in razumevanje informacij, ki jih prenašajo.

Zbrali smo tudi nekaj ključnih konceptov, o katerih smo prej razpravljali v tej knjigi. V 4. poglavju smo razpravljali o tem, kako pomembno je lokalno stanje, odporno na napake, za pretočno aplikacijo. Prvi primer v tem poglavju je pokazal, zakaj je lokalno stanje tako pomembno – omogoča vam, da spremljate informacije, ki ste jih že videli. Lokalni dostop prepreči zamude v omrežju, zaradi česar je aplikacija bolj zmogljiva in odporna na napake.

Ko izvajate katero koli operacijo zbiranja ali združevanja, morate podati ime shrambe stanja. Operacije zbiranja in združevanja vrnejo primerek KTable, KTable pa uporablja pomnilnik stanja za zamenjavo starih rezultatov z novimi. Kot ste videli, vse posodobitve niso poslane po cevovodu, kar je pomembno, ker so operacije združevanja zasnovane za ustvarjanje povzetkov informacij. Če ne uporabite lokalnega stanja, bo KTable posredoval vse rezultate združevanja in zbiranja.

Nato si bomo ogledali izvajanje operacij, kot je združevanje v določenem časovnem obdobju - tako imenovane operacije oken.

5.3.2. Okenske operacije

V prejšnjem razdelku smo predstavili drsno konvolucijo in združevanje. Aplikacija je izvedla neprekinjeno zbiranje prodaj delnic, čemur je sledilo združevanje petih najbolj prometnih delnic na borzi.

Včasih je potrebno tako stalno združevanje in združevanje rezultatov. In včasih morate operacije izvajati samo v določenem časovnem obdobju. Na primer, izračunajte, koliko menjalnih poslov je bilo opravljenih z delnicami določenega podjetja v zadnjih 10 minutah. Ali koliko uporabnikov je kliknilo na novo oglasno pasico v zadnjih 15 minutah. Aplikacija lahko takšne operacije izvede večkrat, vendar z rezultati, ki veljajo le za določena časovna obdobja (časovna okna).

Štetje menjalnih poslov po kupcu

V naslednjem primeru bomo sledili transakcijam z delnicami med več trgovci – bodisi velikimi organizacijami bodisi pametnimi posameznimi finančniki.

Obstajata dva možna razloga za to sledenje. Eden od njih je potreba po tem, kaj kupujejo/prodajajo vodilni na trgu. Če ti veliki igralci in prefinjeni vlagatelji vidijo priložnost, je smiselno slediti njihovi strategiji. Drugi razlog je želja po odkrivanju morebitnih znakov nezakonitega trgovanja z notranjimi informacijami. Če želite to narediti, boste morali analizirati korelacijo med velikimi skoki prodaje in pomembnimi sporočili za javnost.

Tako sledenje je sestavljeno iz naslednjih korakov:

  • ustvarjanje toka za branje iz teme o delniških transakcijah;
  • združevanje dohodnih zapisov po ID-ju kupca in simbolu delnice. Klic metode groupBy vrne primerek razreda KGroupedStream;
  • Metoda KGroupedStream.windowedBy vrne tok podatkov, omejen na časovno okno, kar omogoča okensko združevanje. Odvisno od vrste okna je vrnjen TimeWindowedKStream ali SessionWindowedKStream;
  • število transakcij za operacijo združevanja. Okenski tok podatkov določa, ali se pri tem štetju upošteva določen zapis;
  • pisanje rezultatov v temo ali njihovo izpisovanje na konzolo med razvojem.

Topologija te aplikacije je preprosta, vendar bi bila njena jasna slika v pomoč. Oglejmo si sl. 5.11.

Nato si bomo ogledali funkcionalnost okenskih operacij in ustrezno kodo.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"

Vrste oken

V Kafkinih tokovih obstajajo tri vrste oken:

  • sejni;
  • "tumble" (tumble);
  • drsenje/skakanje.

Katerega izbrati, je odvisno od vaših poslovnih potreb. Prevračanje in preskakovanje oken je časovno omejeno, medtem ko so okna sej omejena z aktivnostjo uporabnika – trajanje sej je določeno izključno glede na to, kako aktiven je uporabnik. Glavna stvar, ki si jo morate zapomniti, je, da vse vrste oken temeljijo na datumskih/časovnih žigih vnosov in ne na sistemskem času.

Nato implementiramo našo topologijo z vsako od vrst oken. Celotna koda bo podana le v prvem primeru, pri drugih tipih oken se ne bo spremenilo nič razen tipa delovanja okna.

Okna sej

Okna sej se zelo razlikujejo od vseh drugih vrst oken. Niso omejeni toliko s časom kot z aktivnostjo uporabnika (ali dejavnostjo subjekta, ki mu želite slediti). Okna sej so ločena z obdobji nedejavnosti.

Slika 5.12 ponazarja koncept oken sej. Manjša seja se bo združila s sejo na njeni levi. In seja na desni bo ločena, ker sledi dolgemu obdobju nedejavnosti. Okna sej temeljijo na dejavnosti uporabnika, vendar uporabljajo datumske/časovne žige vnosov, da določijo, kateri seji vnos pripada.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"

Uporaba oken sej za sledenje transakcij delnic

Uporabimo okna sej za zajemanje informacij o menjalnih transakcijah. Implementacija oken seje je prikazana v seznamu 5.5 (ki ga najdete v src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Večino operacij v tej topologiji ste že videli, zato jih tukaj ni treba znova gledati. Tu pa je tudi več novih elementov, o katerih bomo zdaj razpravljali.

Vsaka operacija groupBy običajno izvede neke vrste operacijo združevanja (združevanje, zbiranje ali štetje). Izvedete lahko kumulativno združevanje z tekočo vsoto ali okensko združevanje, ki upošteva zapise v določenem časovnem oknu.

Koda v seznamu 5.5 šteje število transakcij znotraj oken seje. Na sl. 5.13 ta dejanja se analizirajo korak za korakom.

S klicem windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) ustvarimo okno seje z intervalom nedejavnosti 20 sekund in intervalom obstojnosti 15 minut. Interval nedejavnosti 20 sekund pomeni, da bo aplikacija vključila vsak vnos, ki prispe v 20 sekundah po koncu ali začetku trenutne seje, v trenutno (aktivno) sejo.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Nato določimo, katero operacijo združevanja je treba izvesti v oknu seje - v tem primeru štetje. Če dohodni vnos pade zunaj okna nedejavnosti (na kateri koli strani datumskega/časovnega žiga), aplikacija ustvari novo sejo. Interval hrambe pomeni vzdrževanje seje določen čas in omogoča pozne podatke, ki presegajo obdobje nedejavnosti seje, vendar jih je še vedno mogoče priložiti. Poleg tega začetek in konec nove seje, ki izhaja iz združitve, ustrezata najzgodnejšemu in najpoznejšemu datumskemu/časovnemu žigu.

Oglejmo si nekaj vnosov iz metode štetja, da vidimo, kako delujejo seje (tabela 5.1).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Ko prispejo zapisi, poiščemo obstoječe seje z enakim ključem, končnim časom, ki je krajši od trenutnega datumskega/časovnega žiga – interval nedejavnosti, in začetnim časom, ki je daljši od trenutnega datumskega/časovnega žiga + interval nedejavnosti. Ob upoštevanju tega so štirje vnosi iz tabele. 5.1 so združeni v eno sejo, kot sledi.

1. Zapis 1 prispe prvi, tako da je začetni čas enak končnemu času in je 00:00:00.

2. Nato pride vnos 2 in poiščemo seje, ki se ne končajo pred 23:59:55 in se začnejo najpozneje ob 00:00:35. Poiščemo zapis 1 in združimo seji 1 in 2. Vzamemo čas začetka seje 1 (prej) in čas konca seje 2 (pozneje), tako da se naša nova seja začne ob 00:00:00 in konča ob 00: 00:15.

3. Prispe zapis 3, iščemo seje med 00:00:30 in 00:01:10 in ne najdemo nobene. Dodajte drugo sejo za ključ 123-345-654,FFBE, ki se začne in konča ob 00:00:50.

4. Prispe zapisnik 4 in iščemo termine med 23:59:45 in 00:00:25. Tokrat sta najdeni obe seji 1 in 2. Vse tri seje so združene v eno z začetnim časom 00:00:00 in končnim časom 00:00:15.

Iz tega, kar je opisano v tem razdelku, je vredno zapomniti naslednje pomembne nianse:

  • seje niso okna fiksne velikosti. Trajanje seje je določeno z aktivnostjo v določenem časovnem obdobju;
  • Datumski/časovni žigi v podatkih določajo, ali dogodek spada v obstoječo sejo ali med obdobjem mirovanja.

Nato bomo razpravljali o naslednji vrsti oken - "prevrnitvenih" oknih.

"Prevrtljiva" okna

Vrteča se okna zajamejo dogodke v določenem časovnem obdobju. Predstavljajte si, da morate vsakih 20 sekund zajeti vse borzne transakcije določenega podjetja, tako da zberete vse dogodke v tem časovnem obdobju. Ob koncu 20-sekundnega intervala se okno prevrne in premakne na nov 20-sekundni interval opazovanja. Slika 5.14 ponazarja to situacijo.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Kot lahko vidite, so vsi dogodki, prejeti v zadnjih 20 sekundah, vključeni v okno. Ob koncu tega časovnega obdobja se ustvari novo okno.

Listing 5.6 prikazuje kodo, ki prikazuje uporabo padajočih oken za zajemanje delniških transakcij vsakih 20 sekund (najdete jo v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
S to majhno spremembo klica metode TimeWindows.of lahko uporabite vrtljivo okno. Ta primer ne kliče metode until(), zato bo uporabljen privzeti interval hrambe 24 ur.

Končno je čas, da se premaknemo na zadnjo možnost oken - "skočna" okna.

Drsna ("skakalna") okna

Drsna/skakajoča okna so podobna vrtljivim oknom, vendar z majhno razliko. Drsna okna ne čakajo do konca časovnega intervala, preden ustvarijo novo okno za obdelavo nedavnih dogodkov. Nove izračune začnejo po čakalnem intervalu, krajšem od trajanja okna.

Za ponazoritev razlik med padajočimi in skakajočimi okni se vrnimo k primeru štetja borznih poslov. Naš cilj je še vedno prešteti število transakcij, vendar ne želimo čakati ves čas, preden posodobimo števec. Namesto tega bomo posodabljali števec v krajših intervalih. Na primer, še vedno bomo šteli število transakcij vsakih 20 sekund, vendar bomo števec posodabljali vsakih 5 sekund, kot je prikazano na sl. 5.15. V tem primeru imamo na koncu tri okna z rezultati s prekrivajočimi se podatki.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Listing 5.7 prikazuje kodo za definiranje drsnih oken (najdete jo v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Vrteče se okno lahko pretvorite v skakajoče okno z dodajanjem klica metodi advanceBy(). V prikazanem primeru je interval shranjevanja 15 minut.

V tem razdelku ste videli, kako omejiti rezultate združevanja na časovna okna. Še posebej želim, da si zapomnite naslednje tri stvari iz tega razdelka:

  • velikost oken seje ni omejena s časovnim obdobjem, ampak z dejavnostjo uporabnika;
  • »tumble« okna omogočajo pregled dogajanja v določenem časovnem obdobju;
  • Trajanje preskakujočih oken je določeno, vendar se pogosto posodabljajo in lahko vsebujejo prekrivajoče se vnose v vseh oknih.

Nato se bomo naučili, kako pretvoriti KTable nazaj v KStream za povezavo.

5.3.3. Povezovanje objektov KStream in KTable

V 4. poglavju smo razpravljali o povezovanju dveh objektov KStream. Zdaj se moramo naučiti, kako povezati KTable in KStream. To je morda potrebno zaradi naslednjega preprostega razloga. KStream je tok zapisov, KTable pa je tok posodobitev zapisov, včasih pa boste morda želeli dodati dodaten kontekst toku zapisov z uporabo posodobitev iz KTable.

Vzemimo podatke o številu borznih poslov in jih združimo z borznimi novicami za ustrezne panoge. Tukaj je opisano, kaj morate storiti, da to dosežete glede na kodo, ki jo že imate.

  1. Pretvorite objekt KTable s podatki o številu delniških transakcij v KStream, čemur sledi zamenjava ključa s ključem, ki označuje sektor industrije, ki ustreza temu simbolu delnice.
  2. Ustvarite objekt KTable, ki bere podatke iz teme z novicami na borzi. Ta nova KTable bo kategorizirana po industrijskih sektorjih.
  3. Povežite posodobitve novic z informacijami o številu borznih transakcij po industrijskih panogah.

Zdaj pa poglejmo, kako uresničiti ta akcijski načrt.

Pretvorite KTable v KStream

Za pretvorbo KTable v KStream morate narediti naslednje.

  1. Pokličite metodo KTable.toStream().
  2. S klicem metode KStream.map zamenjajte ključ z imenom industrije in nato pridobite objekt TransactionSummary iz primerka Windowed.

Te operacije bomo povezali na naslednji način (kodo lahko najdete v datoteki src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (izpis 5.8).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Ker izvajamo operacijo KStream.map, se vrnjeni primerek KStream samodejno ponovno particionira, ko je uporabljen v povezavi.

Končali smo postopek pretvorbe, nato pa moramo ustvariti objekt KTable za branje borznih novic.

Izdelava KTable za borzne novice

Na srečo ustvarjanje predmeta KTable zahteva samo eno vrstico kode (kodo lahko najdete v src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (izpis 5.9).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Omeniti velja, da ni treba podati nobenih objektov Serde, saj se v nastavitvah uporablja niz Serdes. Prav tako se z uporabo EARLIEST naštevanja tabela napolni z zapisi na samem začetku.

Zdaj lahko preidemo na zadnji korak - povezavo.

Povezovanje posodobitev novic s podatki o številu transakcij

Ustvarjanje povezave ni težko. Levo združevanje bomo uporabili v primeru, da za zadevno panogo ni novic o borzi (potrebno kodo najdete v datoteki src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (izpis 5.10).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Ta operator leftJoin je precej preprost. Za razliko od združevanj v 4. poglavju se metoda JoinWindow ne uporablja, ker je pri izvajanju združevanja KStream-KTable v KTable za vsak ključ samo en vnos. Takšna povezava ni časovno omejena: zapis je v KTable ali pa ga ni. Glavni zaključek: z uporabo objektov KTable lahko obogatite KStream z manj pogosto posodobljenimi referenčnimi podatki.

Zdaj si bomo ogledali učinkovitejši način obogatitve dogodkov iz KStream.

5.3.4. Objekti GlobalKTable

Kot lahko vidite, je treba tokove dogodkov obogatiti ali jim dodati kontekst. V 4. poglavju ste videli povezave med dvema objektoma KStream, v prejšnjem razdelku pa povezavo med KStream in KTable. V vseh teh primerih je treba pri preslikavi ključev v novo vrsto ali vrednost znova razdeliti tok podatkov. Včasih se ponovno particioniranje izvede izrecno, včasih pa Kafka Streams to naredi samodejno. Ponovno particioniranje je potrebno, ker so se ključi spremenili in morajo zapisi končati v novih razdelkih, sicer bo povezava nemogoča (to je bilo obravnavano v 4. poglavju, v razdelku »Ponovno particioniranje podatkov« v pododdelku 4.2.4).

Ponovna particija ima stroške

Ponovno particioniranje zahteva stroške - dodatni stroški virov za ustvarjanje vmesnih tem, shranjevanje podvojenih podatkov v drugo temo; pomeni tudi povečano zakasnitev zaradi pisanja in branja iz te teme. Poleg tega, če se morate združiti v več kot enem vidiku ali dimenziji, morate verižiti združevanja, preslikati zapise z novimi ključi in znova zagnati postopek ponovne particije.

Povezovanje z manjšimi zbirkami podatkov

V nekaterih primerih je obseg referenčnih podatkov, ki jih je treba povezati, razmeroma majhen, zato se njegove popolne kopije zlahka prilegajo lokalno na vsako vozlišče. Za takšne situacije ponuja Kafka Streams razred GlobalKTable.

Primerki GlobalKTable so edinstveni, ker aplikacija podvoji vse podatke v vsako od vozlišč. In ker so vsi podatki prisotni na vsakem vozlišču, ni potrebe po particioniranju toka dogodkov po referenčnem podatkovnem ključu, tako da je na voljo vsem particijam. Združevanja brez ključa lahko naredite tudi z uporabo objektov GlobalKTable. Vrnimo se k enemu od prejšnjih primerov, da pokažemo to funkcijo.

Povezovanje objektov KStream z objekti GlobalKTable

V podpoglavju 5.3.2 smo izvedli okensko agregacijo menjalnih poslov po kupcih. Rezultati tega združevanja so izgledali nekako takole:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Čeprav so ti rezultati služili namenu, bi bilo bolj uporabno, če bi bilo prikazano tudi ime stranke in polno ime podjetja. Če želite dodati ime stranke in ime podjetja, lahko izvedete običajna združevanja, vendar boste morali izvesti dve preslikavi ključev in ponovno particioniranje. Z GlobalKTable se lahko izognete stroškom tovrstnih operacij.

Za to bomo uporabili objekt countStream iz seznama 5.11 (ustrezno kodo lahko najdete v src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) in ga povezali z dvema objektoma GlobalKTable.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
O tem smo že razpravljali, zato ne bom ponavljal. Vendar opažam, da je koda v funkciji toStream().map zaradi berljivosti abstrahirana v funkcijski objekt namesto v vgrajeni lambda izraz.

Naslednji korak je deklaracija dveh primerkov GlobalKTable (prikazano kodo lahko najdete v datoteki src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (izpis 5.12).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"

Upoštevajte, da so imena tem opisana z uporabo oštevilčenih tipov.

Zdaj, ko imamo pripravljene vse komponente, ostane le še pisanje kode za povezavo (ki jo najdete v datoteki src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (izpis 5.13).

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Čeprav sta v tej kodi dve združitvi, sta verižni, ker se nobeden od njunih rezultatov ne uporablja ločeno. Rezultati se prikažejo na koncu celotne operacije.

Ko zaženete zgornjo operacijo združevanja, boste dobili takšne rezultate:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Bistvo se ni spremenilo, vendar so ti rezultati videti bolj jasni.

Če odštevate do 4. poglavja, ste že videli več vrst povezav v akciji. Navedeni so v tabeli. 5.2. Ta tabela odraža povezljivostne zmožnosti od različice 1.0.0 Kafka Streams; V prihodnjih izdajah se lahko kaj spremeni.

Knjiga »Kafka Streams in Action. Aplikacije in mikrostoritve za delo v realnem času"
Če zaključimo, povzamemo osnove: povežete lahko tokove dogodkov (KStream) in posodabljate tokove (KTable) z uporabo lokalnega stanja. Če velikost referenčnih podatkov ni prevelika, lahko uporabite objekt GlobalKTable. GlobalKTables podvoji vse particije v vsako vozlišče aplikacije Kafka Streams in zagotovi, da so vsi podatki na voljo ne glede na to, kateri particiji ključ ustreza.

Nato bomo videli funkcijo Kafka Streams, zahvaljujoč kateri lahko opazujemo spremembe stanja brez porabe podatkov iz teme Kafka.

5.3.5. Vprašljivo stanje

Izvedli smo že več operacij, ki vključujejo stanje, in rezultate vedno izpišemo na konzolo (za razvojne namene) ali jih zapišemo v temo (za produkcijske namene). Ko pišete rezultate v temo, morate za ogled uporabiti porabnika Kafka.

Branje podatkov iz teh tem lahko štejemo za vrsto materializiranih pogledov. Za naše namene lahko uporabimo definicijo materializiranega pogleda iz Wikipedije: »... fizični objekt baze podatkov, ki vsebuje rezultate poizvedbe. Lahko je na primer lokalna kopija oddaljenih podatkov ali podmnožica vrstic in/ali stolpcev tabele ali rezultatov združevanja ali tabela s povzetkom, pridobljena z združevanjem« (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams vam omogoča tudi izvajanje interaktivnih poizvedb v državnih trgovinah, kar vam omogoča neposredno branje teh materializiranih pogledov. Pomembno je omeniti, da je poizvedba v shrambo stanja operacija samo za branje. To zagotavlja, da vam ni treba skrbeti, da bi slučajno naredili stanje nedosledno, medtem ko vaša aplikacija obdeluje podatke.

Zmožnost neposrednega poizvedovanja po shrambah stanja je pomembna. To pomeni, da lahko ustvarite aplikacije na nadzorni plošči, ne da bi morali najprej pridobiti podatke od uporabnika Kafka. Prav tako poveča učinkovitost aplikacije, saj ni potrebe po ponovnem zapisovanju podatkov:

  • zahvaljujoč lokalnosti podatkov so lahko hitro dostopni;
  • podvajanje podatkov je odpravljeno, saj se ne zapisujejo v zunanji pomnilnik.

Glavna stvar, ki si jo želim zapomniti, je, da lahko neposredno poizvedujete o stanju znotraj svoje aplikacije. Priložnosti, ki vam jih to ponuja, ni mogoče preceniti. Namesto porabe podatkov iz Kafke in shranjevanja zapisov v zbirko podatkov za aplikacijo, lahko poizvedujete po shrambah stanja z enakim rezultatom. Neposredne poizvedbe do državnih shramb pomenijo manj kode (brez potrošnika) in manj programske opreme (ni potrebe po tabeli baze podatkov za shranjevanje rezultatov).

V tem poglavju smo pokrili kar nekaj teme, zato bomo za zdaj pustili razpravo o interaktivnih poizvedbah proti državnim trgovinam. Vendar ne skrbite: v 9. poglavju bomo ustvarili preprosto aplikacijo na nadzorni plošči z interaktivnimi poizvedbami. Uporabil bo nekaj primerov iz tega in prejšnjih poglavij za predstavitev interaktivnih poizvedb in kako jih lahko dodate v aplikacije Kafka Streams.

Povzetek

  • Objekti KStream predstavljajo tokove dogodkov, primerljive z vstavitvami v bazo podatkov. Objekti KTable predstavljajo tokove posodobitev, bolj podobne posodobitvam baze podatkov. Velikost objekta KTable se ne poveča, stari zapisi se nadomestijo z novimi.
  • Objekti KTable so potrebni za operacije združevanja.
  • Z uporabo okenskih operacij lahko razdelite združene podatke v časovne segmente.
  • Zahvaljujoč objektom GlobalKTable lahko dostopate do referenčnih podatkov kjer koli v aplikaciji, ne glede na particijo.
  • Možne so povezave med objekti KStream, KTable in GlobalKTable.

Doslej smo se osredotočali na izdelavo aplikacij Kafka Streams z uporabo KStream DSL na visoki ravni. Čeprav vam pristop na visoki ravni omogoča ustvarjanje čednih in jedrnatih programov, njegova uporaba predstavlja kompromis. Delo z DSL KStream pomeni povečanje jedrnatosti vaše kode z zmanjšanjem stopnje nadzora. V naslednjem poglavju si bomo ogledali API vozlišča obdelovalnika nizke ravni in preizkusili druge kompromise. Programi bodo daljši, kot so bili prej, vendar bomo lahko ustvarili skoraj vsako upravljalno vozlišče, ki ga bomo morda potrebovali.

→ Več podrobnosti o knjigi najdete na spletno stran založbe

→ Za Habrozhiteli 25% popust z uporabo kupona - Kafkovi tokovi

→ Ob plačilu papirne različice knjige vam po elektronski pošti pošljemo elektronsko knjigo.

Vir: www.habr.com

Dodaj komentar