Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks" Tere, Khabro elanikud! See raamat sobib igale arendajale, kes soovib mõista niiditöötlust. Hajutatud programmeerimise mõistmine aitab teil paremini mõista Kafkat ja Kafka voogusid. Oleks tore teada Kafka raamistikku ennast, kuid see pole vajalik: ma ütlen teile kõike, mida vajate. Kogenud Kafka arendajad ja algajad saavad sellest raamatust teada, kuidas luua huvitavaid vootöötlusrakendusi, kasutades Kafka Streamsi teeki. Kesktaseme ja edasijõudnud Java-arendajad, kes on juba tuttavad selliste kontseptsioonidega nagu serialiseerimine, õpivad rakendama oma oskusi Kafka Streamsi rakenduste loomisel. Raamatu lähtekood on kirjutatud Java 8 keeles ja kasutab märkimisväärselt Java 8 lambda avaldise süntaksit, seega tuleb kasuks teadmine, kuidas töötada lambda funktsioonidega (isegi mõnes teises programmeerimiskeeles).

Väljavõte. 5.3. Koondamis- ja aknaoperatsioonid

Selles jaotises jätkame Kafka Streamsi kõige lootustandvamate osade uurimisega. Siiani oleme käsitlenud järgmisi Kafka Streamsi aspekte:

  • töötlemistopoloogia loomine;
  • oleku kasutamine voogedastusrakendustes;
  • andmevooühenduste teostamine;
  • erinevused sündmuste voogude (KStream) ja värskendusvoogude (KTable) vahel.

Järgmistes näidetes koondame kõik need elemendid kokku. Samuti saate teada akende kasutamise kohta, mis on veel üks voogesitusrakenduste suurepärane funktsioon. Meie esimene näide on lihtne liitmine.

5.3.1. Varude müügi summeerimine tööstusharude kaupa

Koondamine ja rühmitamine on andmete voogesituse töötlemisel olulised tööriistad. Üksikute dokumentide uurimine nende kättesaamisel on sageli ebapiisav. Andmetest lisateabe eraldamiseks on vaja need rühmitada ja kombineerida.

Selles näites paned selga päevakaupleja kostüümi, kes peab jälgima mitme tööstusharu ettevõtete aktsiate müügimahtu. Täpsemalt, teid huvitavad viis ettevõtet, millel on igas tööstusharus suurim müügiosakaal.

Selline koondamine nõuab andmete soovitud vormi (üldiselt öeldes) tõlkimiseks mitut järgmist sammu.

  1. Looge teemapõhine allikas, mis avaldab toores aktsiate kauplemise teavet. Peame kaardistama StockTransaction-tüüpi objekti ShareVolume-tüüpi objektiga. Asi on selles, et StockTransactioni objekt sisaldab müügi metaandmeid, kuid meil on vaja andmeid ainult müüdavate aktsiate arvu kohta.
  2. Rühmitage ShareVolume'i andmed aktsia sümboli järgi. Pärast sümboli järgi rühmitamist saate need andmed ahendada varude müügimahtude vahesummadeks. Tasub märkida, et meetod KStream.groupBy tagastab KGroupedStream tüüpi eksemplari. Ja KTable-eksemplari saate hankida, kutsudes edasi meetodit KGroupedStream.reduce.

Mis on KGroupedStreami liides

Meetodid KStream.groupBy ja KStream.groupByKey tagastavad KGroupedStreami eksemplari. KGroupedStream on sündmuste voo vahepealne esitus pärast võtmete järgi rühmitamist. See ei ole üldse mõeldud sellega otseseks tööks. Selle asemel kasutatakse liitmistoiminguteks KGroupedStreami, mille tulemuseks on alati KTable. Ja kuna liitmistoimingute tulemuseks on KT-tabel ja nad kasutavad olekusalvet, on võimalik, et kõiki selle tulemusel tehtud värskendusi ei saadeta konveieri kaudu edasi.

Meetod KTable.groupBy tagastab sarnase KGroupedTable-i – uuenduste voo vahepealse esituse, mis on ümber rühmitatud võtme järgi.

Teeme väikese pausi ja vaatame joonist fig. 5.9, mis näitab, mida oleme saavutanud. See topoloogia peaks teile juba väga tuttav olema.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Vaatame nüüd selle topoloogia koodi (selle võib leida failist src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (loetelu 5.2).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Antud kood eristub lühiduse ja mitmes reas tehtavate toimingute suure mahu poolest. Meetodi builder.stream esimeses parameetris võite märgata midagi uut: enum-tüübi väärtust AutoOffsetReset.EARLIEST (on olemas ka LATEST), mis on määratud meetodi Consumed.withOffsetResetPolicy abil. Seda loenditüüpi saab kasutada nihke lähtestamise strateegia määramiseks igale KStreamile või KTable-le ja see on konfiguratsioonis oleva nihke lähtestamise valiku ees ülimuslik.

GroupByKey ja GroupBy

KStreami liidesel on kirjete rühmitamiseks kaks meetodit: GroupByKey ja GroupBy. Mõlemad tagastavad KGroupedTable-i, seega võib tekkida küsimus, mis vahe neil on ja millal kumba kasutada?

GroupByKey meetodit kasutatakse siis, kui KStreami võtmed pole juba tühjad. Ja mis kõige tähtsam, lippu "nõuab ümberjaotamist" ei seatud kunagi.

GroupBy meetod eeldab, et olete rühmitamisvõtmeid muutnud, seega on ümberjaotamise lipp seatud väärtusele Tõene. Ühenduste, liitmiste jms teostamine pärast GroupBy meetodit toob kaasa automaatse ümberpartitsioneerimise.
Kokkuvõte: võimalusel peaksite kasutama GroupByKey asemel GroupBy-d.

On selge, mida teevad meetodid mapValues ​​ja groupBy, nii et vaatame meetodit sum() (leitud failist src/main/java/bbejeck/model/ShareVolume.java) (loetelu 5.3).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Meetod ShareVolume.sum tagastab varude müügimahu jooksva kogusumma ja kogu arvutusahela tulemuseks on KTable objekt . Nüüd saate aru, millist rolli KTable mängib. ShareVolume'i objektide saabumisel salvestab vastav KTable objekt uusima värskenduse. Oluline on meeles pidada, et kõik värskendused kajastuvad eelmises shareVolumeKTable-s, kuid kõiki ei saadeta edasi.

Seejärel kasutame seda KT-tabelit, et koondada (kaubeldavate aktsiate arvu järgi), et jõuda viie ettevõtteni, millel on igas tööstusharus kõige rohkem kaubeldavaid aktsiaid. Meie toimingud on sel juhul sarnased esimese koondamisega.

  1. Üksikute ShareVolume'i objektide rühmitamiseks valdkonna järgi tehke teine ​​operatsioon groupBy.
  2. Alustage ShareVolume'i objektide kokkuvõtete tegemist. Seekord on koondamisobjektiks fikseeritud suurusega prioriteetne järjekord. Sellesse fikseeritud suurusega järjekorda jäävad ainult viis ettevõtet, kelle aktsiad on kõige rohkem müüdud.
  3. Kaardistage eelmise lõigu järjekorrad stringiväärtusega ja tagastage XNUMX kõige enam kaubeldavat aktsiat arvu järgi tegevusalade kaupa.
  4. Kirjutage tulemused stringi kujul teemasse.

Joonisel fig. Joonis 5.10 näitab andmevoo topoloogia graafikut. Nagu näete, on töötlemise teine ​​voor üsna lihtne.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Nüüd, kui oleme selle teise töötlemisringi struktuurist selgelt aru saanud, võime pöörduda selle lähtekoodi poole (leiate selle failist src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Loend 5.4) .

See initsialiseerija sisaldab muutujatfixQueue. See on kohandatud objekt, mis on java.util.TreeSeti adapter, mida kasutatakse N parima tulemuse jälgimiseks kaubeldavate aktsiate kahanevas järjekorras.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Olete juba näinud groupBy ja mapValues'i kõnesid, seega me nendesse ei lasku (me kutsume meetodit KTable.toStream, kuna meetod KTable.print on aegunud). Kuid te pole veel aggregate() KTable versiooni näinud, seega kulutame selle üle veidi aega.

Nagu mäletate, teeb KTable erinevaks see, et samade võtmetega kirjeid peetakse uuendusteks. KTable asendab vana kirje uuega. Agregeerimine toimub sarnaselt: viimased sama võtmega kirjed agregeeritakse. Kui kirje saabub, lisatakse see liitja abil FixedSizePriorityQueue klassi eksemplari (teine ​​parameeter liitmeetodi kutses), kuid kui sama võtmega on juba mõni muu kirje olemas, siis eemaldatakse vana kirje lahutaja abil (kolmas parameeter koondmeetodi kutse).

See kõik tähendab, et meie koondaja FixedSizePriorityQueue ei koonda kõiki väärtusi ühe võtmega, vaid salvestab liikuva summa N kõige enam kaubeldava tüüpi aktsiate kogustest. Iga sissetulev kanne sisaldab seni müüdud aktsiate koguarvu. KTable annab teile teavet selle kohta, milliste ettevõtete aktsiatega kaubeldakse praegu kõige rohkem, ilma et oleks vaja iga värskendust jooksvalt koondada.

Õppisime tegema kahte olulist asja:

  • rühmitage väärtused KT-tabelis ühise võtmega;
  • tehke nendel rühmitatud väärtustel kasulikke toiminguid, nagu koondamine ja liitmine.

Nende toimingute tegemise teadmine on oluline, et mõista Kafka Streamsi rakenduse kaudu liikuvate andmete tähendust ja mõista, millist teavet need sisaldavad.

Oleme koondanud ka mõned põhimõisted, mida selles raamatus varem käsitleti. 4. peatükis arutasime, kui oluline on tõrketaluv kohalik olek voogesitusrakenduse jaoks. Selle peatüki esimene näide näitas, miks kohalik osariik on nii oluline – see annab teile võimaluse jälgida, millist teavet olete juba näinud. Kohalik juurdepääs väldib võrgu viivitusi, muutes rakenduse tõhusamaks ja veakindlamaks.

Mis tahes koond- või koondamistoimingu tegemisel peate määrama olekusalve nime. Koondamis- ja liitmistoimingud tagastavad KTable eksemplari ja KTable kasutab olekusalvestust, et asendada vanad tulemused uutega. Nagu olete näinud, ei saadeta kõiki värskendusi protsessi käigus ja see on oluline, kuna koondamistoimingud on loodud kokkuvõtliku teabe saamiseks. Kui te kohalikku osariiki ei rakenda, edastab KTable kõik koond- ja koondtulemused.

Järgmisena vaatleme selliste toimingute sooritamist nagu liitmine teatud aja jooksul – nn aknaoperatsioonid.

5.3.2. Aknatoimingud

Eelmises osas tutvustasime libisevat konvolutsiooni ja liitmist. Rakendus teostas aktsiamüügi pideva kokkuvõtte, millele järgnes viie enimkaubeldava aktsia liitmine börsil.

Mõnikord on selline pidev tulemuste koondamine ja koondamine vajalik. Ja mõnikord peate toiminguid tegema ainult teatud aja jooksul. Näiteks arvutage, mitu vahetustehingut konkreetse ettevõtte aktsiatega tehti viimase 10 minuti jooksul. Või kui palju kasutajaid klõpsas uuel reklaambänneril viimase 15 minuti jooksul. Rakendus võib selliseid toiminguid teha mitu korda, kuid tulemustega, mis kehtivad ainult kindlaksmääratud ajaperioodidel (ajaaknad).

Vahetustehingute loendamine ostja järgi

Järgmises näites jälgime aktsiatehinguid mitme kaupleja – kas suurte organisatsioonide või nutikate üksikute rahastajate – vahel.

Sellel jälgimisel on kaks võimalikku põhjust. Üks neist on vajadus teada, mida turuliidrid ostavad/müüvad. Kui need suured tegijad ja kogenud investorid näevad võimalust, on mõistlik järgida nende strateegiat. Teiseks põhjuseks on soov märgata võimalikke märke ebaseaduslikust siseringi kauplemisest. Selleks peate analüüsima suurte müüginäitajate korrelatsiooni oluliste pressiteadetega.

Selline jälgimine koosneb järgmistest sammudest:

  • voo loomine lugemiseks aktsiatehingute teemast;
  • sissetulevate kirjete rühmitamine ostja ID ja laosümboli järgi. Meetodi groupBy kutsumine tagastab KGroupedStream klassi eksemplari;
  • Meetod KGroupedStream.windowedBy tagastab ajaaknaga piiratud andmevoo, mis võimaldab akende koondamist. Sõltuvalt akna tüübist tagastatakse kas TimeWindowedKStream või SessionWindowedKStream;
  • tehingute arv liitmistoimingu jaoks. Aknastatud andmevoog määrab, kas konkreetne kirje võetakse selle loenduse puhul arvesse;
  • tulemuste kirjutamine teemasse või nende väljastamine konsooli arenduse käigus.

Selle rakenduse topoloogia on lihtne, kuid selle selge pilt oleks abiks. Heidame pilgu joonisele fig. 5.11.

Järgmisena vaatame aknatoimingute funktsionaalsust ja vastavat koodi.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"

Akende tüübid

Kafka Streamsis on kolme tüüpi aknaid.

  • istungiline;
  • "tumblemine" (trummeldamine);
  • libisemine/hüppamine.

Milline neist valida, sõltub teie ettevõtte vajadustest. Kukkumise ja hüppamise aknad on ajaliselt piiratud, samas kui seansiaknaid piirab kasutaja tegevus – seansi(te) kestuse määrab ainult kasutaja aktiivne aktiivsus. Peamine asi, mida meeles pidada, on see, et kõik aknatüübid põhinevad kirjete kuupäeva/kellaaja templitel, mitte süsteemi kellaajal.

Järgmisena rakendame oma topoloogiat iga aknatüübiga. Täielik kood antakse ainult esimeses näites, muud tüüpi akende puhul ei muutu midagi peale akna toimimise tüübi.

Seansi aknad

Seansiaknad erinevad oluliselt kõigist teistest akende tüüpidest. Neid ei piira mitte niivõrd aeg, kuivõrd kasutaja tegevus (või selle olemi tegevus, mida soovite jälgida). Seansiaknad on piiritletud tegevusetusperioodidega.

Joonis 5.12 illustreerib seansiakende kontseptsiooni. Väiksem seanss liidetakse sellest vasakul oleva seansiga. Ja parempoolne seanss on eraldi, kuna see järgneb pikale tegevusetuse perioodile. Seansiaknad põhinevad kasutaja tegevusel, kuid kasutage kirjete kuupäeva/kellaaja templeid, et määrata, millisesse seansse kirje kuulub.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"

Seansiakende kasutamine aktsiatehingute jälgimiseks

Kasutagem seansiaknaid, et koguda teavet börsitehingute kohta. Seansiakende rakendamine on näidatud loendis 5.5 (mille leiate failist src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Olete juba näinud enamikku selle topoloogia tehteid, seega pole vaja neid siin uuesti vaadata. Kuid siin on ka mitmeid uusi elemente, mida me nüüd arutame.

Iga operatsioon groupBy teostab tavaliselt mingi liitmistoimingu (liitmine, koondamine või loendamine). Saate teostada kumulatiivse koondamise jooksva kogusummaga või akna liitmise, mis võtab arvesse kirjeid määratud ajaaknas.

Loendis 5.5 olev kood loeb seansiakendes tehtud tehingute arvu. Joonisel fig. 5.13 neid toiminguid analüüsitakse samm-sammult.

Kutsudes välja windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)), loome seansiakna tegevusetuse intervalliga 20 sekundit ja püsivusintervalliga 15 minutit. 20-sekundiline jõudeoleku intervall tähendab, et rakendus kaasab praegusesse (aktiivsesse) seansse kõik kirjed, mis saabuvad 20 sekundi jooksul pärast praeguse seansi lõppu või algust.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Järgmisena täpsustame, millist liitmistoimingut tuleb seansiaknas teha – antud juhul loenda. Kui sissetulev kirje jääb passiivsuse aknast väljapoole (kuupäeva/kellaaja templi kummalegi poolele), loob rakendus uue seansi. Säilitusintervall tähendab seansi säilitamist teatud aja jooksul ja võimaldab hilinenud andmeid, mis ulatuvad kauemaks kui seansi passiivsusperiood, kuid mida saab siiski lisada. Lisaks vastavad liitmisest tuleneva uue seansi algus ja lõpp kõige varasemale ja hilisemale kuupäeva/kellaaja templile.

Vaatame loendusmeetodi mõningaid kirjeid, et näha, kuidas seansid töötavad (tabel 5.1).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Kirjete saabumisel otsime olemasolevaid seansse sama võtmega, mille lõpuaeg on väiksem kui praegune kuupäeva/kellaaja tempel – tegevusetuse intervall ja algusaeg, mis on suurem kui praegune kuupäeva/kellaaja tempel + tegevusetuse intervall. Seda arvesse võttes neli sissekannet tabelist. 5.1 liidetakse üheks seansiks järgmiselt.

1. Kirje 1 saabub esimesena, seega algusaeg võrdub lõpuajaga ja on 00:00:00.

2. Järgmisena saabub sissekanne 2 ja me otsime seansse, mis lõpevad mitte varem kui 23:59:55 ja algavad hiljemalt 00:00:35. Leiame kirje 1 ja ühendame seansid 1 ja 2. Võtame seansi 1 algusaja (varasem) ja 2. seansi lõpuaja (hiljem), nii et meie uus seanss algab kell 00:00:00 ja lõpeb kell 00: 00:15.

3. Saabub rekord 3, otsime seansse vahemikus 00:00:30 kuni 00:01:10 ja ei leia ühtegi. Lisage võtme 123-345-654,FFBE teine ​​seanss, mis algab ja lõpeb kell 00:00:50.

4. Saabub rekord 4 ja me otsime seansse ajavahemikus 23:59:45-00:00:25. Seekord leiti mõlemad seansid 1 ja 2. Kõik kolm seanssi on ühendatud üheks, algusaeg on 00:00:00 ja lõpuaeg 00:00:15.

Selles jaotises kirjeldatu põhjal tasub meeles pidada järgmisi olulisi nüansse:

  • seansid ei ole kindla suurusega aknad. Seansi kestuse määrab tegevus etteantud ajavahemikus;
  • Andmetes olevad kuupäeva/kellaaja templid määravad, kas sündmus langeb olemasolevasse seansi või jõudeperioodi.

Järgmisena käsitleme järgmist tüüpi aknaid - "kukkuvaid" aknaid.

"Lupuvad" aknad

Kukkuvad aknad jäädvustavad sündmusi, mis jäävad teatud ajavahemikku. Kujutage ette, et peate jäädvustama kõik teatud ettevõtte aktsiatehingud iga 20 sekundi järel, nii et kogute kõik sündmused selle aja jooksul. 20-sekundilise intervalli lõpus rullub aken ümber ja liigub uuele 20-sekundilisele vaatlusintervallile. Joonis 5.14 illustreerib seda olukorda.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Nagu näete, on aknas kaasatud kõik viimase 20 sekundi jooksul vastu võetud sündmused. Selle perioodi lõpus luuakse uus aken.

Loendis 5.6 on näidatud kood, mis demonstreerib muutuvate akende kasutamist aktsiatehingute jäädvustamiseks iga 20 sekundi järel (leitud failist src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Selle väikese muudatusega TimeWindows.of meetodikutses saate kasutada akent. See näide ei kutsu meetodit till(), seega kasutatakse vaikimisi 24-tunnist säilitusintervalli.

Lõpuks on aeg liikuda edasi viimase aknavaliku juurde – akende "hüppamine".

Lükandaknad ("hüppavad").

Lükandaknad on sarnased kukkumisakendele, kuid väikese erinevusega. Lükandaknad ei oota enne hiljutiste sündmuste töötlemiseks uue akna loomist ajaintervalli lõpuni. Nad alustavad uusi arvutusi pärast ooteintervalli, mis on väiksem kui akna kestus.

Et illustreerida kukkuvate ja hüppavate akende erinevusi, pöördume tagasi börsitehingute loendamise näite juurde. Meie eesmärk on endiselt tehingute arv loendada, kuid me ei taha oodata loenduri värskendamist tervet aega. Selle asemel uuendame loendurit lühemate ajavahemike järel. Näiteks loendame endiselt tehingute arvu iga 20 sekundi järel, kuid värskendame loendurit iga 5 sekundi järel, nagu on näidatud joonisel fig. 5.15. Sel juhul saame tulemuseks kolm kattuvate andmetega akent.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Loend 5.7 näitab libisevate akende määratlemise koodi (leitud failist src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Pöördakna saab teisendada hüppavaks aknaks, lisades meetodile advanceBy() kutse. Näidatud näites on salvestamise intervall 15 minutit.

Selles jaotises nägite, kuidas piirata koondamise tulemusi ajaakendega. Eelkõige tahan, et jätaksite sellest jaotisest meelde järgmised kolm asja:

  • seansiakende suurust ei piira mitte ajaperiood, vaid kasutaja tegevus;
  • "Kummutavad" aknad annavad ülevaate sündmustest teatud aja jooksul;
  • Akende hüppamise kestus on fikseeritud, kuid neid uuendatakse sageli ja need võivad sisaldada kattuvaid kirjeid kõigis akendes.

Järgmisena õpime, kuidas teisendada KTabel ühenduse loomiseks tagasi KStreamiks.

5.3.3. KStreami ja KTable objektide ühendamine

Peatükis 4 käsitlesime kahe KStreami objekti ühendamist. Nüüd peame õppima, kuidas ühendada KTable ja KStream. Seda võib vaja minna järgmisel lihtsal põhjusel. KStream on kirjete voog ja KTable on kirjete värskenduste voog, kuid mõnikord võib tekkida soov lisada kirjevoole täiendavat konteksti, kasutades KTable-i värskendusi.

Võtame börsitehingute arvu andmed ja kombineerime need vastavate majandusharude börsiuudistega. Siin on, mida peate selle saavutamiseks tegema, võttes arvesse juba olemasolevat koodi.

  1. Teisendage KTable objekt aktsiatehingute arvu andmetega KStreamiks, millele järgneb võtme asendamine võtmega, mis näitab sellele aktsiasümbolile vastavat sektorit.
  2. Loo KTable objekt, mis loeb andmeid börsiuudistega teemast. See uus KT-tabel liigitatakse tööstussektori järgi.
  3. Ühendage uudiste värskendused teabega börsitehingute arvu kohta tööstusharude kaupa.

Nüüd vaatame, kuidas seda tegevuskava ellu viia.

Teisendage KTable KSStreamiks

KTable teisendamiseks KStreamiks peate tegema järgmist.

  1. Kutsuge meetod KTable.toStream() välja.
  2. Meetodi KStream.map kutsumisel asendage võti valdkonna nimega ja tooge seejärel aknast eksemplari TransactionSummary objekt.

Me aheldame need toimingud kokku järgmiselt (koodi leiate failist src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Loend 5.8).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Kuna me teostame toimingut KStream.map, jagatakse tagastatud KStreami eksemplar automaatselt ümber, kui seda ühenduses kasutatakse.

Oleme teisendusprotsessi lõpetanud, järgmiseks tuleb luua börsiuudiste lugemiseks objekt KTable.

KTable loomine aktsiauudiste jaoks

Õnneks võtab KTable objekti loomine vaid ühe koodirea (koodi leiate kaustast src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (loetelu 5.9).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Väärib märkimist, et Serde objekte ei pea määrama, kuna seadetes kasutatakse stringi Serdes. Samuti täidetakse tabel kohe alguses kirjetega, kasutades VARASEMAT loendust.

Nüüd saame liikuda viimase etapi juurde - ühendamine.

Uudiste värskenduste ühendamine tehingute arvu andmetega

Ühenduse loomine pole keeruline. Kasutame vasakpoolset liitumist juhuks, kui vastava valdkonna aktsiauudiseid pole (vajaliku koodi leiate failist src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Loend 5.10).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
See leftJoini operaator on üsna lihtne. Erinevalt 4. peatükis toodud liitumistest ei kasutata JoinWindow meetodit, sest KStream-KTable ühendamisel on iga võtme kohta KT-tabelis ainult üks kirje. Selline ühendus ei ole ajaliselt piiratud: kirje on kas KT-tabelis või puudub. Peamine järeldus: KTable objektide abil saab KStreami rikastada harvemini uuendatavate viiteandmetega.

Nüüd vaatame tõhusamat viisi KStreami sündmuste rikastamiseks.

5.3.4. GlobalKTable objektid

Nagu näete, on vaja sündmuste vooge rikastada või neile konteksti lisada. Peatükis 4 nägid seoseid kahe KStreami objekti vahel ning eelmises osas KStreami ja KTable'i vahelist seost. Kõigil neil juhtudel on võtmete uue tüübi või väärtusega vastendamisel vaja andmevoo uuesti partitsioonideks jagada. Mõnikord tehakse ümberjaotamine selgesõnaliselt ja mõnikord teeb Kafka Streams seda automaatselt. Ümberpartitsioon on vajalik, kuna võtmed on muutunud ja kirjed peavad jõudma uutesse jaotistesse, vastasel juhul on ühendus võimatu (sellest oli juttu 4. peatükis, alajaotises 4.2.4 jaotises “Andmete ümberpartitsioonid”).

Ümberjaotamine on kulukas

Ümberpartitsioneerimine nõuab kulusid - täiendavad ressursikulud vahepealsete teemade loomiseks, dubleerivate andmete hoidmiseks teises teemas; see tähendab ka suurenenud latentsust, mis on tingitud sellest teemast kirjutamisest ja lugemisest. Lisaks, kui peate ühendama rohkem kui ühe aspekti või dimensiooni, peate ühendused aheldama, kaardistama kirjed uute võtmetega ja käivitama uuesti partitsioonide jagamise.

Ühenduse loomine väiksemate andmekogumitega

Mõnel juhul on ühendatavate võrdlusandmete maht suhteliselt väike, nii et nende täielikud koopiad mahuvad hõlpsasti igasse sõlme. Selliste olukordade jaoks pakub Kafka Streams klassi GlobalKTable.

GlobalKTable eksemplarid on ainulaadsed, kuna rakendus kopeerib kõik andmed igasse sõlme. Ja kuna kõik andmed on olemas igas sõlmes, pole vaja sündmuste voogu viiteandmete võtmega sektsioonideks jagada, et see oleks kõigile partitsioonidele kättesaadav. Võtmeta liite saab teha ka GlobalKTable objektide abil. Selle funktsiooni demonstreerimiseks pöördume tagasi ühe eelneva näite juurde.

KStreami objektide ühendamine GlobalKTable objektidega

Alapunktis 5.3.2 teostasime ostjate kaupa vahetustehingute akende liitmise. Selle liitmise tulemused nägid välja umbes sellised:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Kuigi need tulemused täitsid eesmärki, oleks olnud kasulikum, kui kuvatud oleks ka kliendi nimi ja ettevõtte täisnimi. Kliendinime ja ettevõtte nime lisamiseks saate teha tavalisi liitmisi, kuid peate tegema kaks võtme vastendamist ja uuesti partitsioonide määramist. GlobalKTable abil saate selliste toimingute kulusid vältida.

Selleks kasutame loendist 5.11 pärit objekti countStream (vastava koodi leiate failist src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) ja ühendame selle kahe GlobalKTable objektiga.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Oleme seda juba varem arutanud, nii et ma ei korda seda. Kuid märgin, et funktsiooni toStream(.map) kood on loetavuse huvides abstraheeritud funktsiooniobjektiks, mitte sisemise lambda-avaldise asemel.

Järgmine samm on deklareerida kaks GlobalKTable eksemplari (näidatud koodi leiate failist src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (loetelu 5.12).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"

Pange tähele, et teemade nimede kirjeldamisel kasutatakse loendatavaid tüüpe.

Nüüd, kui meil on kõik komponendid valmis, ei jää muud üle kui kirjutada ühenduse kood (selle leiab failist src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (loetelu 5.13).

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Kuigi selles koodis on kaks liitumist, on need aheldatud, kuna kumbagi nende tulemustest ei kasutata eraldi. Tulemused kuvatakse kogu toimingu lõpus.

Kui käivitate ülaltoodud liitumistoimingu, saate järgmised tulemused:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Põhiolemus pole muutunud, kuid need tulemused näevad selgemad.

Kui loendate 4. peatükini, olete juba näinud mitut tüüpi ühendusi töös. Need on loetletud tabelis. 5.2. See tabel kajastab ühenduvusvõimalusi alates Kafka Streamsi versioonist 1.0.0; Midagi võib tulevastes väljaannetes muutuda.

Raamatu „Kafka ojad tegevuses. Rakendused ja mikroteenused reaalajas töötamiseks"
Asjade kokkuvõtteks võtame kokku põhitõed: saate ühendada sündmuste voogusid (KStream) ja värskendada vooge (KTable), kasutades kohalikku olekut. Teise võimalusena, kui võrdlusandmete suurus pole liiga suur, võite kasutada objekti GlobalKTable. GlobalKTables kopeerib kõik partitsioonid igasse Kafka Streamsi rakenduse sõlme, tagades, et kõik andmed on saadaval olenemata sellest, millisele partitsioonile võti vastab.

Järgmisena näeme funktsiooni Kafka Streams, tänu millele saame jälgida olekumuutusi ilma Kafka teema andmeid tarbimata.

5.3.5. Küsitav olek

Oleme juba teinud mitmeid olekuga seotud toiminguid ja väljastame tulemused alati konsooli (arenduse eesmärgil) või kirjutame need teemasse (tootmise eesmärgil). Kui kirjutate teemasse tulemusi, peate nende vaatamiseks kasutama Kafka tarbijat.

Nende teemade andmete lugemist võib pidada materialiseeritud vaadete tüübiks. Oma eesmärkidel saame kasutada materialiseeritud vaate definitsiooni Wikipediast: „...päringu tulemusi sisaldav füüsiline andmebaasiobjekt. Näiteks võib see olla kaugandmete kohalik koopia või tabeli ridade ja/või veergude alamhulk või liitetulemused või koondtabel, mis on saadud koondamise teel” (https://en.wikipedia.org/wiki /Materialiseeritud_vaade).

Kafka Streams võimaldab teil käivitada interaktiivseid päringuid osariigi kauplustes, võimaldades teil neid materialiseeritud vaateid otse lugeda. Oluline on märkida, et päring riigipoele on kirjutuskaitstud toiming. See tagab, et te ei pea muretsema rakenduse andmete töötlemise ajal kogemata oleku ebaühtlaseks muutmise pärast.

Oluline on olekupoodide otsepäringute tegemine. See tähendab, et saate luua armatuurlauarakendusi ilma, et peaksite esmalt Kafka tarbijalt andmeid hankima. See suurendab ka rakenduse tõhusust, kuna pole vaja andmeid uuesti kirjutada:

  • tänu andmete asukohale on neile kiire juurdepääs;
  • andmete dubleerimine on välistatud, kuna neid ei kirjutata välismällu.

Peamine asi, mida ma tahan meeles pidada, on see, et saate oleku päringuid otse oma rakendusest teha. Võimalusi, mida see teile pakub, ei saa üle hinnata. Selle asemel, et tarbida Kafka andmeid ja salvestada kirjeid rakenduse jaoks andmebaasi, saate sama tulemusega päringuid teha olekusalvedest. Otsesed päringud olekupoodidele tähendavad vähem koodi (tarbijat pole) ja tarkvara (tulemuste salvestamiseks pole vaja andmebaasitabelit).

Oleme selles peatükis käsitlenud üsna palju, nii et jätame praegu osariigi kaupluste vastu suunatud interaktiivsete päringute arutelu. Kuid ärge muretsege: 9. peatükis loome lihtsa interaktiivsete päringutega armatuurlauarakenduse. See kasutab mõningaid selle ja eelmiste peatükkide näiteid, et näidata interaktiivseid päringuid ja seda, kuidas saate neid Kafka Streamsi rakendustesse lisada.

Kokkuvõte

  • KStreami objektid esindavad sündmuste vooge, mis on võrreldavad andmebaasi lisadega. KTable objektid esindavad värskendusvooge, pigem nagu andmebaasi värskendusi. KTable objekti suurus ei kasva, vanad rekordid asenduvad uutega.
  • Koondamistoimingute jaoks on vajalikud KTable objektid.
  • Akende toiminguid kasutades saate koondatud andmed jagada ajasalvedeks.
  • Tänu GlobalKTable objektidele pääsete viiteandmetele juurde kõikjal rakenduses, olenemata partitsioonidest.
  • Võimalikud on ühendused KStreami, KTable ja GlobalKTable objektide vahel.

Seni oleme keskendunud Kafka Streamsi rakenduste loomisele kõrgetasemelise KStream DSL-i abil. Kuigi kõrgetasemeline lähenemine võimaldab luua korralikke ja sisutihedaid programme, kujutab selle kasutamine endast kompromissi. DSL KStreamiga töötamine tähendab koodi lakoonilisuse suurendamist, vähendades juhtimisastet. Järgmises peatükis vaatleme madala taseme töötleja sõlme API-t ja proovime teisi kompromisse. Programmid on varasemast pikemad, kuid saame luua peaaegu igasuguse käitlejasõlme, mida vajame.

→ Lisateavet raamatu kohta leiate aadressilt kirjastaja veebisait

→ Habrozhiteli jaoks 25% allahindlus kupongi abil - Kafka ojad

→ Raamatu paberversiooni eest tasumisel saadetakse e-postiga elektrooniline raamat.

Allikas: www.habr.com

Lisa kommentaar