Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön" Hei Khabron asukkaat! Tämä kirja sopii kaikille kehittäjille, jotka haluavat ymmärtää säikeiden käsittelyä. Hajautetun ohjelmoinnin ymmärtäminen auttaa sinua ymmärtämään paremmin Kafkaa ja Kafka Streamia. Olisi mukava tietää itse Kafka-kehys, mutta tämä ei ole välttämätöntä: kerron sinulle kaiken tarvitsemasi. Kokeneet Kafka-kehittäjät ja aloittelijat oppivat tässä kirjassa luomaan mielenkiintoisia streaminkäsittelysovelluksia Kafka Streams -kirjaston avulla. Keskitason ja edistyneet Java-kehittäjät, jotka jo tuntevat serialisoinnin kaltaiset käsitteet, oppivat soveltamaan taitojaan Kafka Streams -sovellusten luomiseen. Kirjan lähdekoodi on kirjoitettu Java 8 -kielellä ja siinä hyödynnetään merkittävästi Java 8:n lambda-lausekkeen syntaksia, joten lambda-funktioiden työskentely (jopa toisella ohjelmointikielellä) on hyödyllistä.

Ote. 5.3. Aggregointi- ja ikkunointitoiminnot

Tässä osiossa siirrymme tutkimaan Kafka Streamsin lupaavimpia osia. Tähän mennessä olemme käsitelleet seuraavat Kafka Streamsin näkökohdat:

  • prosessointitopologian luominen;
  • tilan käyttö suoratoistosovelluksissa;
  • tietovirtayhteyksien suorittaminen;
  • erot tapahtumavirtojen (KStream) ja päivitysvirtojen (KTable) välillä.

Seuraavissa esimerkeissä kokoamme kaikki nämä elementit yhteen. Opit myös ikkunoista, joka on toinen suoratoistosovellusten loistava ominaisuus. Ensimmäinen esimerkkimme on yksinkertainen aggregaatio.

5.3.1. Osakemyynnin aggregointi toimialojen mukaan

Aggregointi ja ryhmittely ovat tärkeitä työkaluja striimaustiedon käsittelyssä. Yksittäisten tietueiden tutkiminen niiden saapuessa on usein riittämätöntä. Jotta tiedoista saadaan lisätietoa, ne on ryhmiteltävä ja yhdistettävä.

Tässä esimerkissä puet päällesi päiväkauppiaan puvun, jonka on seurattava useiden toimialojen yritysten osakkeiden myyntiä. Erityisesti olet kiinnostunut viidestä yrityksestä, joilla on suurin osuus myynnistä kullakin toimialalla.

Tällainen yhdistäminen vaatii useita seuraavia vaiheita tietojen kääntämiseksi haluttuun muotoon (yleisesti sanottuna).

  1. Luo aihepohjainen lähde, joka julkaisee raakaa osakekaupan tietoja. Meidän on kartoitettava StockTransaction-tyyppinen objekti ShareVolume-tyypin objektiin. Asia on siinä, että StockTransaction-objekti sisältää myynnin metadataa, mutta tarvitsemme vain tietoja myytävien osakkeiden määrästä.
  2. Ryhmittele ShareVolume-tiedot osakesymbolin mukaan. Kun tiedot on ryhmitelty symbolien mukaan, voit tiivistää nämä tiedot varaston myyntimäärien välisummiksi. On syytä huomata, että KStream.groupBy-metodi palauttaa esiintymän, jonka tyyppi on KGroupedStream. Ja voit saada KTable-ilmentymän kutsumalla edelleen KGroupedStream.reduce-metodia.

Mikä on KGroupedStream-käyttöliittymä

KStream.groupBy- ja KStream.groupByKey-menetelmät palauttavat KGroupedStreamin esiintymän. KGroupedStream on tapahtumavirran väliesitys avainten ryhmittelyn jälkeen. Sitä ei ole ollenkaan tarkoitettu suoraan työskentelyyn sen kanssa. Sen sijaan KGroupedStreamia käytetään yhdistämistoimintoihin, jotka aina johtavat KT-taulukkoon. Ja koska aggregointitoimintojen tulos on KT-taulukko ja ne käyttävät tilavarastoa, on mahdollista, että kaikkia päivityksiä ei lähetetä pidemmälle putkilinjassa.

KTable.groupBy-metodi palauttaa samanlaisen KGroupedTablen - päivitysvirran väliesityksen avaimen mukaan ryhmiteltynä.

Pidetään pieni tauko ja katsotaan kuvaa. 5.9, joka osoittaa, mitä olemme saavuttaneet. Tämän topologian pitäisi olla jo hyvin tuttu sinulle.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Katsotaan nyt tämän topologian koodia (se löytyy tiedostosta src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Luettelo 5.2).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Annettu koodi erottuu lyhyydestään ja useilla riveillä suoritettavien toimien suuresta määrästä. Saatat huomata jotain uutta builder.stream-metodin ensimmäisessä parametrissa: enum-tyypin AutoOffsetReset.EARLIEST arvo (on myös LATEST), joka asetetaan Consumed.withOffsetResetPolicy-menetelmällä. Tätä luettelointityyppiä voidaan käyttää poikkeaman palautusstrategian määrittämiseen kullekin KStreamille tai KTablelle, ja se on ensisijainen määrityksessä olevaan offset-nollausvaihtoehtoon nähden.

GroupByKey ja GroupBy

KStream-käyttöliittymässä on kaksi tapaa tietueiden ryhmittelyyn: GroupByKey ja GroupBy. Molemmat palauttavat KGrouped-taulukon, joten saatat ihmetellä, mikä ero niillä on ja milloin kumpaa kannattaa käyttää?

GroupByKey-menetelmää käytetään, kun KStreamin avaimet eivät ole jo tyhjiä. Ja mikä tärkeintä, "vaatii uudelleenosion" -lippua ei koskaan asetettu.

GroupBy-menetelmä olettaa, että olet muuttanut ryhmittelyavaimia, joten uudelleenosiolipun arvoksi on asetettu tosi. Liitosten, aggregaatioiden jne. suorittaminen GroupBy-menetelmän jälkeen johtaa automaattiseen uudelleenosioimiseen.
Yhteenveto: Aina kun mahdollista, käytä GroupByKeyä GroupByn sijaan.

On selvää, mitä mapValues- ja groupBy-menetelmät tekevät, joten katsotaanpa sum()-metodia (löytyy hakemistosta src/main/java/bbejeck/model/ShareVolume.java) (listaus 5.3).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
ShareVolume.sum-menetelmä palauttaa varastomyynnin volyymin juoksevan summan ja koko laskentaketjun tulos on KTable-objekti . Nyt ymmärrät KTablen roolin. Kun ShareVolume-objektit saapuvat, vastaava KTable-objekti tallentaa uusimman päivityksen. On tärkeää muistaa, että kaikki päivitykset näkyvät edellisessä shareVolumeKTablessa, mutta kaikkia ei lähetetä eteenpäin.

Käytämme sitten tätä KT-taulukkoa aggregointiin (kaupattujen osakkeiden lukumäärän mukaan) saadaksemme viisi yritystä, joilla kullakin toimialalla vaihdetaan eniten osakkeita. Toimintamme tässä tapauksessa on samanlainen kuin ensimmäisessä yhdistämisessä.

  1. Suorita toinen groupBy-toiminto ryhmitelläksesi yksittäisiä ShareVolume-objekteja toimialan mukaan.
  2. Aloita ShareVolume-objektien yhteenveto. Tällä kertaa koontiobjektina on kiinteäkokoinen prioriteettijono. Tässä kiinteän kokoisessa jonossa säilyvät vain viisi eniten myytyä yhtiötä.
  3. Yhdistä edellisen kappaleen jonot merkkijonoarvoon ja palauta viisi eniten vaihdettua osaketta numeroittain toimialan mukaan.
  4. Kirjoita tulokset merkkijonomuodossa aiheeseen.

Kuvassa Kuva 5.10 näyttää datavirran topologiakaavion. Kuten näet, toinen käsittelykierros on melko yksinkertainen.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Nyt kun meillä on selkeä käsitys tämän toisen käsittelykierroksen rakenteesta, voimme siirtyä sen lähdekoodiin (löydät sen tiedostosta src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Luettelo 5.4) .

Tämä alustus sisältää FixQueue-muuttujan. Tämä on mukautettu objekti, joka on java.util.TreeSet-sovitin, jota käytetään seuraamaan N:n parasta tulosta vaihdettujen osakkeiden laskevassa järjestyksessä.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Olet jo nähnyt groupBy- ja mapValues-kutsut, joten emme käsittele niitä (kutsumme KTable.toStream-menetelmää, koska KTable.print-menetelmä on vanhentunut). Mutta et ole vielä nähnyt aggregate():n KTable-versiota, joten käytämme vähän aikaa keskustelemaan siitä.

Kuten muistat, KTablesta eroaa se, että tietueita, joissa on samat avaimet, pidetään päivityksinä. KTable korvaa vanhan merkinnän uudella. Aggregointi tapahtuu samalla tavalla: viimeisimmät samalla avaimella olevat tietueet kootaan yhteen. Kun tietue saapuu, se lisätään FixedSizePriorityQueue-luokan ilmentymään summaimella (toinen parametri koostemenetelmäkutsussa), mutta jos toinen tietue on jo olemassa samalla avaimella, vanha tietue poistetaan vähennyslaskulla (kolmas parametri yhdistetty menetelmäkutsu).

Tämä kaikki tarkoittaa, että aggregaattorimme, FixedSizePriorityQueue, ei kokoa kaikkia arvoja yhdellä avaimella, vaan tallentaa liikkuvan summan N eniten vaihdetun osaketyypin määristä. Jokainen saapuva merkintä sisältää tähän mennessä myytyjen osakkeiden kokonaismäärän. KTable antaa sinulle tietoa siitä, minkä yritysten osakkeilla tällä hetkellä käydään eniten kauppaa, ilman että jokaisen päivityksen rullaava aggregointi edellyttää.

Opimme tekemään kaksi tärkeää asiaa:

  • ryhmittele arvot KT-taulukossa yhteisellä avaimella;
  • suorittaa hyödyllisiä toimintoja, kuten kokoamisen ja yhdistämisen näille ryhmitellyille arvoille.

Näiden toimintojen suorittamisen tunteminen on tärkeää Kafka Streams -sovelluksen kautta liikkuvan tiedon merkityksen ymmärtämiseksi ja sen sisältämän tiedon ymmärtämiseksi.

Olemme myös koonneet yhteen joitakin keskeisiä käsitteitä, joita on käsitelty aiemmin tässä kirjassa. Luvussa 4 käsittelimme, kuinka tärkeää vikasietoinen paikallinen tila on suoratoistosovellukselle. Tämän luvun ensimmäinen esimerkki osoitti, miksi paikallinen osavaltio on niin tärkeä – sen avulla voit seurata, mitä tietoja olet jo nähnyt. Paikallinen pääsy välttää verkon viiveet, mikä tekee sovelluksesta tehokkaamman ja virheenkestävämmän.

Kun suoritat koonti- tai koontitoimintoa, sinun on määritettävä tilavaraston nimi. Kokoamis- ja koontioperaatiot palauttavat KTable-ilmentymän, ja KTable käyttää tilamuistia vanhojen tulosten korvaamiseen uusilla. Kuten olet nähnyt, kaikkia päivityksiä ei lähetetä putkeen, ja tämä on tärkeää, koska yhdistämistoiminnot on suunniteltu tuottamaan yhteenvetotietoja. Jos et käytä paikallista osavaltiota, KTable välittää kaikki koonti- ja koontitulokset.

Seuraavaksi tarkastellaan toimintojen, kuten yhdistämisen, suorittamista tietyn ajanjakson sisällä - niin sanottuja ikkunointitoimintoja.

5.3.2. Ikkunatoiminnot

Edellisessä osiossa esittelimme liukuvan konvoluution ja aggregoinnin. Sovellus suoritti jatkuvan osakemyynnin keruun, jota seurasi viiden eniten vaihdetun pörssin osakkeen yhdistäminen.

Joskus tällainen jatkuva tulosten aggregointi ja kokoaminen on tarpeen. Ja joskus sinun on suoritettava toimintoja vain tietyn ajan kuluessa. Laske esimerkiksi, kuinka monta vaihtotapahtumaa tietyn yrityksen osakkeilla tehtiin viimeisen 10 minuutin aikana. Tai kuinka moni käyttäjä on napsauttanut uutta mainosbanneria viimeisen 15 minuutin aikana. Sovellus voi suorittaa tällaisia ​​toimintoja useita kertoja, mutta tuloksilla, jotka koskevat vain tiettyjä ajanjaksoja (aikaikkunat).

Vaihtotapahtumien laskeminen ostajan mukaan

Seuraavassa esimerkissä seuraamme useiden kauppiaiden osaketapahtumia – joko suuria organisaatioita tai älykkäitä yksittäisiä rahoittajia.

Tälle seurannalle on kaksi mahdollista syytä. Yksi niistä on tarve tietää, mitä markkinajohtajat ostavat/myyvät. Jos nämä suuret toimijat ja kehittyneet sijoittajat näkevät mahdollisuuden, on järkevää seurata heidän strategiaansa. Toinen syy on halu havaita mahdolliset merkit laittomasta sisäpiirikaupasta. Tätä varten sinun on analysoitava suurten myyntipiikkien korrelaatio tärkeiden lehdistötiedotteiden kanssa.

Tällainen seuranta koostuu seuraavista vaiheista:

  • streamin luominen osaketransaktioiden aiheeseen lukemista varten;
  • ryhmittele saapuvat tietueet ostajan tunnuksen ja varastotunnuksen mukaan. GroupBy-metodin kutsuminen palauttaa KGroupedStream-luokan esiintymän;
  • KGroupedStream.windowedBy-metodi palauttaa aikaikkunaan rajoitetun tietovirran, mikä mahdollistaa ikkunoidun aggregoinnin. Ikkunatyypistä riippuen palautetaan joko TimeWindowedKStream tai SessionWindowedKStream;
  • tapahtumien laskeminen yhdistämisoperaatiota varten. Ikkunallinen tietovirta määrittää, otetaanko tietty tietue huomioon tässä laskennassa;
  • tulosten kirjoittaminen aiheeseen tai tulostaminen konsoliin kehityksen aikana.

Tämän sovelluksen topologia on yksinkertainen, mutta selkeä kuva siitä olisi hyödyllinen. Katsotaanpa kuvaa Fig. 5.11.

Seuraavaksi tarkastellaan ikkunatoimintojen toimivuutta ja vastaavaa koodia.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"

Ikkunatyypit

Kafka Streamsissa on kolmenlaisia ​​ikkunoita:

  • istunto;
  • "pysähdys" (pysähdys);
  • liukuminen/hyppely.

Kumpi valita, riippuu yrityksesi tarpeista. Hyppy- ja hyppyikkunat ovat ajallisesti rajoitettuja, kun taas istuntoikkunoita rajoittaa käyttäjän toiminta – istunnon (istuntojen) kesto määräytyy vain käyttäjän aktiivisuuden mukaan. Tärkeintä on muistaa, että kaikki ikkunatyypit perustuvat merkintöjen päivämäärä/aikaleimoihin, eivät järjestelmän aikaan.

Seuraavaksi toteutamme topologiamme jokaisen ikkunatyypin kanssa. Täydellinen koodi annetaan vain ensimmäisessä esimerkissä; muun tyyppisissä ikkunoissa mikään ei muutu paitsi ikkunan toiminnan tyyppi.

Session ikkunat

Istuntoikkunat eroavat suuresti kaikista muista ikkunoista. Niitä ei rajoita niinkään aika kuin käyttäjän toiminta (tai sen kokonaisuuden toiminta, jota haluat seurata). Istuntoikkunat on rajattu passiivisuusjaksojen mukaan.

Kuva 5.12 havainnollistaa istuntoikkunoiden käsitettä. Pienempi istunto sulautuu sen vasemmalla olevaan istuntoon. Ja oikealla oleva istunto on erillinen, koska se seuraa pitkän passiivisuuden. Istuntoikkunat perustuvat käyttäjän toimintaan, mutta käytä merkintöjen päivämäärä-/aikaleimoja määrittääksesi, mihin istuntoon merkintä kuuluu.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"

Istuntoikkunoiden käyttäminen osaketapahtumien seuraamiseen

Käytetään istuntoikkunoita vaihtotapahtumien tietojen tallentamiseen. Istuntoikkunoiden toteutus on esitetty listauksessa 5.5 (joka löytyy osoitteesta src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Olet jo nähnyt suurimman osan operaatioista tässä topologiassa, joten sinun ei tarvitse katsoa niitä uudelleen täällä. Mutta tässä on myös useita uusia elementtejä, joista keskustelemme nyt.

Mikä tahansa groupBy-toiminto suorittaa tyypillisesti jonkinlaisen koontioperaation (aggregointi, koonti tai laskenta). Voit suorittaa joko kumulatiivisen aggregoinnin juoksevalla kokonaissummalla tai ikkunoiden yhdistämisen, joka ottaa huomioon tietueet tietyn aikaikkunan sisällä.

Listauksen 5.5 koodi laskee tapahtumien määrän istuntoikkunoissa. Kuvassa 5.13 Näitä toimia analysoidaan vaihe vaiheelta.

Kutsumalla windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) luomme istuntoikkunan, jonka toimettomuusväli on 20 sekuntia ja pysyvyysväli 15 minuuttia. 20 sekunnin tyhjäkäyntiväli tarkoittaa, että sovellus sisällyttää kaikki merkinnät, jotka saapuvat 20 sekunnin sisällä nykyisen istunnon päättymisestä tai alkamisesta, nykyiseen (aktiiviseen) istuntoon.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Seuraavaksi määritämme, mikä kokoamistoiminto on suoritettava istuntoikkunassa - tässä tapauksessa laske. Jos saapuva merkintä jää passiivisuusikkunan ulkopuolelle (päivämäärä/aikaleiman jommallekummalle puolelle), sovellus luo uuden istunnon. Säilytysväli tarkoittaa istunnon ylläpitämistä tietyn ajan ja sallii myöhäisen datan, joka ulottuu istunnon toimettomuusjakson ulkopuolelle, mutta joka voidaan silti liittää. Lisäksi yhdistämisestä johtuvan uuden istunnon alku ja loppu vastaavat vanhinta ja viimeisintä päivämäärä/aikaleimaa.

Katsotaanpa muutamia laskentamenetelmän merkintöjä nähdäksesi kuinka istunnot toimivat (taulukko 5.1).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Kun tietueet saapuvat, etsimme olemassa olevia istuntoja samalla avaimella, jonka päättymisaika on lyhyempi kuin nykyinen päivämäärä/aikaleima – passiivisuusväli ja alkamisaika, joka on suurempi kuin nykyinen päivämäärä/aikaleima + passiivisuusväli. Kun tämä otetaan huomioon, neljä merkintää taulukosta. 5.1 yhdistetään yhdeksi istunnoksi seuraavasti.

1. Tietue 1 saapuu ensin, joten aloitusaika on yhtä suuri kuin päättymisaika ja on 00:00:00.

2. Seuraavaksi saapuu merkintä 2, ja etsimme istuntoja, jotka päättyvät aikaisintaan klo 23:59:55 ja alkavat viimeistään klo 00:00:35. Etsimme tietueen 1 ja yhdistämme istunnot 1 ja 2. Otamme istunnon 1 alkamisajan (aiemmin) ja istunnon 2 päättymisajan (myöhemmin), niin että uusi istunto alkaa klo 00:00:00 ja päättyy klo 00: 00:15.

3. Tietue 3 saapuu, etsimme istuntoja välillä 00:00:30 ja 00:01:10 emmekä löydä yhtään. Lisää toinen istunto avaimelle 123-345-654,FFBE, joka alkaa ja päättyy klo 00.

4. Levy 4 saapuu ja etsimme istuntoja klo 23:59:45 ja 00:00:25 välisenä aikana. Tällä kertaa löytyy sekä istunnot 1 että 2. Kaikki kolme istuntoa yhdistetään yhdeksi, alkamisaika on 00:00:00 ja päättymisaika 00:00:15.

Tässä osiossa kuvatun perusteella on syytä muistaa seuraavat tärkeät vivahteet:

  • istunnot eivät ole kiinteän kokoisia ikkunoita. Istunnon kesto määräytyy tietyn ajanjakson toiminnan perusteella;
  • Datassa olevat päivämäärä-/aikaleimat määrittävät, kuuluuko tapahtuma olemassa olevaan istuntoon vai lepotilaan.

Seuraavaksi keskustelemme seuraavasta ikkunatyypistä - "pysähdyksistä" ikkunoista.

"Pystyvät" ikkunat

Pyörivät ikkunat tallentavat tapahtumia, jotka osuvat tiettyyn ajanjaksoon. Kuvittele, että sinun täytyy tallentaa kaikki tietyn yrityksen osaketapahtumat 20 sekunnin välein, joten keräät kaikki tapahtumat kyseiseltä ajanjaksolta. 20 sekunnin havaintojakson lopussa ikkuna kääntyy ja siirtyy uuteen 20 sekunnin havaintoväliin. Kuva 5.14 havainnollistaa tätä tilannetta.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Kuten näet, kaikki viimeisen 20 sekunnin aikana vastaanotetut tapahtumat sisältyvät ikkunaan. Tämän ajanjakson lopussa luodaan uusi ikkuna.

Listaus 5.6 näyttää koodin, joka osoittaa pölyttyvien ikkunoiden käytön osaketapahtumien sieppaamiseen 20 sekunnin välein (löytyy hakemistosta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Tällä pienellä muutoksella TimeWindows.of-menetelmäkutsuun voit käyttää pysähdysikkunaa. Tämä esimerkki ei kutsu till()-menetelmää, joten käytetään oletusarvoista 24 tunnin säilytysväliä.

Lopuksi on aika siirtyä viimeiseen ikkunavaihtoehtoon - "hyppääviin" ikkunoihin.

Liukuvat ("hyppäävät") ikkunat

Liuku-/hyppyikkunat ovat samankaltaisia ​​kuin kallistuvat ikkunat, mutta pienellä erolla. Liukuikkunat eivät odota ajanjakson loppuun ennen kuin luodaan uusi ikkuna käsittelemään viimeaikaisia ​​tapahtumia. He aloittavat uudet laskelmat ikkunan kestoa lyhyemmän odotusajan jälkeen.

Havainnollistaaksemme poikkeavien ja hyppäävien ikkunoiden välisiä eroja, palataanpa esimerkkiin pörssitransaktioiden laskemisesta. Tavoitteenamme on edelleen laskea tapahtumien määrä, mutta emme halua odottaa koko aikaa ennen laskurin päivittämistä. Sen sijaan päivitämme laskuria lyhyemmällä aikavälillä. Esimerkiksi, laskemme edelleen tapahtumien määrän 20 sekunnin välein, mutta päivitämme laskurin 5 sekunnin välein, kuten kuvassa 5.15 näkyy. XNUMX. Tässä tapauksessa päädymme kolmeen tulosikkunaan, joissa on päällekkäisiä tietoja.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Listaus 5.7 näyttää koodin liukuikkunoiden määrittämiseen (löytyy hakemistosta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Pyörivä ikkuna voidaan muuntaa hyppiväksi ikkunaksi lisäämällä kutsu advanceBy()-menetelmään. Esitetyssä esimerkissä tallennusväli on 15 minuuttia.

Näit tässä osiossa, kuinka koontitulokset rajoitetaan aikaikkunoihin. Erityisesti haluan sinun muistavan seuraavat kolme asiaa tästä osiosta:

  • istuntoikkunoiden kokoa ei rajoita aika, vaan käyttäjän toiminta;
  • "pyörivät" ikkunat tarjoavat yleiskuvan tapahtumista tietyn ajanjakson sisällä;
  • Hyppäävien ikkunoiden kesto on kiinteä, mutta niitä päivitetään usein ja ne voivat sisältää päällekkäisiä merkintöjä kaikissa ikkunoissa.

Seuraavaksi opimme muuttamaan KT-taulukon takaisin KStreamiksi yhteyttä varten.

5.3.3. KStream- ja KTable-objektien yhdistäminen

Luvussa 4 käsittelimme kahden KStream-objektin yhdistämistä. Nyt meidän on opittava yhdistämään KTable ja KStream. Tämä voi olla tarpeen seuraavasta yksinkertaisesta syystä. KStream on tietueiden virta ja KTable on tietueiden päivitysvirta, mutta joskus saatat haluta lisätä lisäkontekstia tietuevirtaan käyttämällä KTable-päivityksiä.

Otetaan tiedot pörssitapahtumien määrästä ja yhdistetään ne asianomaisten toimialojen pörssiuutisiin. Tässä on mitä sinun on tehtävä saavuttaaksesi tämän, koska sinulla on jo olemassa oleva koodi.

  1. Muunna KTable-objekti, jossa on tietoja osaketapahtumien määrästä, KStreamiksi, minkä jälkeen avain korvataan avaimella, joka ilmaisee tätä osakesymbolia vastaavan toimialan.
  2. Luo KTable-objekti, joka lukee tietoja aiheesta pörssiuutisten kanssa. Tämä uusi KT-taulukko luokitellaan toimialan mukaan.
  3. Yhdistä uutispäivitykset tietoihin pörssitapahtumien määrästä toimialoittain.

Katsotaan nyt, kuinka tämä toimintasuunnitelma toteutetaan.

Muunna KTable KStreamiksi

Jos haluat muuntaa KTablen KStreamiksi, sinun on tehtävä seuraava.

  1. Kutsu KTable.toStream()-metodia.
  2. Kutsumalla KStream.map-metodia korvaa avain toimialan nimellä ja nouta sitten TransactionSummary-objekti ikkunoidusta ilmentymästä.

Ketjaamme nämä toiminnot yhteen seuraavasti (koodi löytyy tiedostosta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (listaus 5.8).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Koska suoritamme KStream.map-toiminnon, palautettu KStream-ilmentymä osioidaan automaattisesti uudelleen, kun sitä käytetään yhteydessä.

Olemme saaneet muunnosprosessin päätökseen, seuraavaksi meidän on luotava KTable-objekti osakeuutisten lukemista varten.

KTablen luominen osakeuutisia varten

Onneksi KTable-objektin luominen vie vain yhden rivin koodia (koodi löytyy hakemistosta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (listaus 5.9).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
On syytä huomata, että Serde-objekteja ei tarvitse määrittää, koska asetuksissa käytetään merkkijonoa Serdes. Myös EARLIEST-luetteloa käyttämällä taulukko täytetään tietueilla heti alussa.

Nyt voimme siirtyä viimeiseen vaiheeseen - yhdistämiseen.

Uutispäivitysten yhdistäminen tapahtumamäärätietoihin

Yhteyden luominen ei ole vaikeaa. Käytämme vasenta liitosta, jos kyseiselle toimialalle ei ole pörssiuutisia (tarvittava koodi löytyy tiedostosta src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (listaus 5.10).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Tämä leftJoin-operaattori on melko yksinkertainen. Toisin kuin luvun 4 liitokset, JoinWindow-menetelmää ei käytetä, koska kun KStream-KTable -liitos suoritetaan, KT-taulukossa on vain yksi merkintä kullekin avaimelle. Tällainen yhteys ei ole ajallisesti rajoitettu: tietue on joko KT-taulukossa tai poissa. Pääjohtopäätös: käyttämällä KTable-objekteja voit rikastuttaa KStreamia harvemmin päivitetyillä viitetiedoilla.

Nyt tarkastellaan tehokkaampaa tapaa rikastuttaa tapahtumia KStreamista.

5.3.4. GlobalKTable-objektit

Kuten näet, on tarpeen rikastaa tapahtumavirtoja tai lisätä niihin kontekstia. Luvussa 4 näit yhteydet kahden KStream-objektin välillä, ja edellisessä osiossa näit yhteyden KStreamin ja KTablen välillä. Kaikissa näissä tapauksissa tietovirta on osioitava uudelleen, kun avaimet kartoitetaan uuteen tyyppiin tai arvoon. Joskus uudelleenosioiminen tehdään eksplisiittisesti, ja joskus Kafka Streams tekee sen automaattisesti. Uudelleenosioiminen on tarpeen, koska avaimet ovat vaihtuneet ja tietueiden on päätyttävä uusiin osioihin, muuten yhteys on mahdoton (tätä käsiteltiin luvussa 4, kohdassa "Tietojen uudelleenosioiminen" kohdassa 4.2.4).

Uudelleen osiointi maksaa

Uudelleenosioiminen vaatii kustannuksia - lisäresurssikustannuksia väliaiheiden luomisesta, päällekkäisten tietojen tallentamisesta toiseen aiheeseen; se tarkoittaa myös lisääntynyttä latenssia tämän aiheen kirjoittamisen ja lukemisen vuoksi. Lisäksi, jos sinun on liitettävä useampi kuin yksi aspekti tai ulottuvuus, sinun on ketjutettava liitokset, kartoitettava tietueet uusilla avaimilla ja suoritettava uudelleenosioprosessi uudelleen.

Yhteyden muodostaminen pienempiin tietokokonaisuuksiin

Joissakin tapauksissa yhdistettävän vertailudatan määrä on suhteellisen pieni, joten sen täydelliset kopiot mahtuvat helposti paikallisesti jokaiseen solmuun. Tällaisia ​​tilanteita varten Kafka Streams tarjoaa GlobalKTable-luokan.

GlobalKTable-esiintymät ovat ainutlaatuisia, koska sovellus replikoi kaikki tiedot kuhunkin solmuun. Ja koska kaikki tiedot ovat läsnä jokaisessa solmussa, tapahtumavirtaa ei tarvitse osioida viitetietoavaimella niin, että se on kaikkien osioiden käytettävissä. Voit myös tehdä avaimettomia liitoksia GlobalKTable-objektien avulla. Palataan yhteen aiemmista esimerkeistä tämän ominaisuuden osoittamiseksi.

KStream-objektien yhdistäminen GlobalKTable-objekteihin

Kohdassa 5.3.2 suoritimme ostajien vaihtotransaktioiden ikkunayhdistyksen. Tämän yhdistämisen tulokset näyttivät tältä:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Vaikka nämä tulokset palvelivat tarkoitusta, olisi ollut hyödyllisempää, jos asiakkaan nimi ja koko yrityksen nimi olisi näkynyt. Voit lisätä asiakkaan nimen ja yrityksen nimen tekemällä normaalit liitokset, mutta sinun on tehtävä kaksi avainkartoitusta ja osiointi uudelleen. GlobalKTablen avulla voit välttää tällaisten toimintojen kustannukset.

Tätä varten käytämme listan 5.11 countStream-objektia (vastaava koodi löytyy osoitteesta src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) ja yhdistämme sen kahteen GlobalKTable-objektiin.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Olemme keskustelleet tästä jo aiemmin, joten en toista sitä. Huomautan kuitenkin, että toStream().map-funktion koodi on abstrahoitu luettavuuden vuoksi funktioobjektiksi rivin sisäisen lambda-lausekkeen sijaan.

Seuraava vaihe on ilmoittaa kaksi GlobalKTable-instanssia (näkyvä koodi löytyy tiedostosta src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (listaus 5.12).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"

Huomaa, että aiheiden nimet on kuvattu käyttäen lueteltuja tyyppejä.

Nyt kun meillä on kaikki komponentit valmiina, ei tarvitse kuin kirjoittaa yhteyden koodi (joka löytyy tiedostosta src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (listaus 5.13).

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Vaikka tässä koodissa on kaksi liitosta, ne on ketjutettu, koska kumpaakaan niiden tuloksista ei käytetä erikseen. Tulokset näkyvät koko toimenpiteen lopussa.

Kun suoritat yllä olevan liitosoperaation, saat seuraavanlaisia ​​tuloksia:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Olemus ei ole muuttunut, mutta nämä tulokset näyttävät selkeämmiltä.

Jos lasket alas lukuun 4, olet jo nähnyt useita yhteyksiä toiminnassa. Ne on lueteltu taulukossa. 5.2. Tämä taulukko heijastaa Kafka Streamsin versiosta 1.0.0 saatavia liitäntöjä. Jokin saattaa muuttua tulevissa julkaisuissa.

Kirja "Kafka Streams in Action. Sovellukset ja mikropalvelut reaaliaikaiseen työhön"
Lopuksi kerrotaan perusasiat: voit yhdistää tapahtumavirtoja (KStream) ja päivittää streameja (KTable) käyttämällä paikallista tilaa. Vaihtoehtoisesti, jos viitetietojen koko ei ole liian suuri, voit käyttää GlobalKTable-objektia. GlobalKTables replikoi kaikki osiot jokaiseen Kafka Streams -sovellussolmuun varmistaen, että kaikki tiedot ovat saatavilla riippumatta siitä, mitä osiota avain vastaa.

Seuraavaksi näemme Kafka Streams -ominaisuuden, jonka ansiosta voimme tarkkailla tilan muutoksia kuluttamatta dataa Kafka-aiheesta.

5.3.5. Kysyttävä tila

Olemme jo suorittaneet useita tilaan liittyviä toimintoja ja aina tulostamme tulokset konsoliin (kehitystarkoituksiin) tai kirjoittaneet ne aiheeseen (tuotantotarkoituksiin). Kun kirjoitat tuloksia aiheeseen, sinun on käytettävä Kafka-kuluttajaa niiden katseluun.

Tietojen lukemista näistä aiheista voidaan pitää eräänlaisena materialisoituneena näkemyksenä. Omissa tarkoituksissamme voimme käyttää Wikipedian materialisoidun näkymän määritelmää: "...fyysinen tietokantaobjekti, joka sisältää kyselyn tulokset. Se voi olla esimerkiksi paikallinen kopio etätiedoista tai taulukon rivien ja/tai sarakkeiden osajoukko tai liitostulokset tai yhteenvetotaulukko, joka on saatu yhdistämällä" (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams -sovelluksen avulla voit myös suorittaa interaktiivisia kyselyitä osavaltion myymälöissä, jolloin voit lukea suoraan näitä materialisoituneita näkymiä. On tärkeää huomata, että tilamuistiin tehtävä kysely on vain luku -toiminto. Tämä varmistaa, että sinun ei tarvitse huolehtia siitä, että tila muuttuu vahingossa epäjohdonmukaiseksi, kun sovelluksesi käsittelee tietoja.

Mahdollisuus tehdä suoraan kyselyitä tilavarastoista on tärkeä. Tämä tarkoittaa, että voit luoda kojelautasovelluksia ilman, että sinun tarvitsee ensin hakea tietoja Kafka-kuluttajalta. Se lisää myös sovelluksen tehokkuutta, koska tietoja ei tarvitse kirjoittaa uudelleen:

  • tietojen paikallisuuden ansiosta ne ovat nopeasti käytettävissä;
  • tietojen päällekkäisyys on eliminoitu, koska niitä ei kirjoiteta ulkoiseen tallennustilaan.

Tärkein asia, jonka haluan sinun muistavan, on, että voit kysyä tilaa suoraan sovelluksestasi. Sen tarjoamia mahdollisuuksia ei voi yliarvioida. Sen sijaan, että kuluttaisit Kafkan tietoja ja tallentaisit tietueita sovelluksen tietokantaan, voit tehdä kyselyn tilavarastoista samalla tuloksella. Suorat kyselyt tilavarastoihin tarkoittavat vähemmän koodia (ei kuluttajaa) ja vähemmän ohjelmistoja (ei tarvitse tietokantataulukkoa tulosten tallentamiseen).

Olemme käsitelleet tässä luvussa melko paljon maaperää, joten jätämme keskustelumme osavaltiokauppojen interaktiivisista kyselyistä toistaiseksi. Mutta älä huoli: luvussa 9 luomme yksinkertaisen kojelautasovelluksen interaktiivisilla kyselyillä. Se käyttää joitain esimerkkejä tästä ja edellisistä luvuista näyttääkseen interaktiivisia kyselyitä ja kuinka voit lisätä niitä Kafka Streams -sovelluksiin.

Yhteenveto

  • KStream-objektit edustavat tapahtumavirtoja, jotka ovat verrattavissa tietokantaan tehtyihin lisäyksiin. KTable-objektit edustavat päivitysvirtoja, enemmän kuin tietokannan päivityksiä. KTable-objektin koko ei kasva, vanhat tietueet korvataan uusilla.
  • KTable-objekteja tarvitaan yhdistämistoimintoihin.
  • Ikkunointitoimintojen avulla voit jakaa aggregoidut tiedot aikaryhmiin.
  • GlobalKTable-objektien ansiosta voit käyttää viitetietoja missä tahansa sovelluksessa osioista riippumatta.
  • Yhteydet KStream-, KTable- ja GlobalKTable-objektien välillä ovat mahdollisia.

Toistaiseksi olemme keskittyneet Kafka Streams -sovellusten rakentamiseen korkean tason KStream DSL:n avulla. Vaikka korkean tason lähestymistapa antaa sinun luoda siistejä ja ytimekkäitä ohjelmia, sen käyttö edustaa kompromissia. Työskentely DSL KStreamin kanssa tarkoittaa koodin tiiviyden lisäämistä vähentämällä hallinnan astetta. Seuraavassa luvussa tarkastellaan matalan tason käsittelijän solmusovellusliittymää ja kokeillaan muita kompromisseja. Ohjelmat ovat pidempiä kuin ennen, mutta voimme luoda melkein minkä tahansa käsittelijän solmun, jota voimme tarvita.

→ Lisätietoja kirjasta löytyy osoitteesta kustantajan verkkosivuilla

→ Habrozhitelille 25% alennus kupongilla - Kafka-virrat

→ Kun kirjan paperiversio on maksettu, lähetetään sähköinen kirja sähköpostitse.

Lähde: will.com

Lisää kommentti