Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale" Përshëndetje, banorë të Khabro! Ky libër është i përshtatshëm për çdo zhvillues që dëshiron të kuptojë përpunimin e fijeve. Kuptimi i programimit të shpërndarë do t'ju ndihmojë të kuptoni më mirë Kafka dhe Kafka Streams. Do të ishte mirë të njihje vetë kornizën e Kafkës, por kjo nuk është e nevojshme: Unë do t'ju tregoj gjithçka që ju nevojitet. Zhvilluesit me përvojë dhe fillestarët e Kafkës do të mësojnë se si të krijojnë aplikacione interesante të përpunimit të transmetimit duke përdorur bibliotekën Kafka Streams në këtë libër. Zhvilluesit e ndërmjetëm dhe të avancuar të Java-s tashmë të njohur me koncepte si serializimi do të mësojnë të zbatojnë aftësitë e tyre për të krijuar aplikacione Kafka Streams. Kodi burimor i librit është shkruar në Java 8 dhe përdor sintaksën e shprehjes lambda të Java 8, kështu që të dish se si të punosh me funksionet lambda (madje edhe në një gjuhë tjetër programimi) do të jetë e dobishme.

Fragment. 5.3. Operacionet e grumbullimit dhe dritareve

Në këtë seksion, ne do të vazhdojmë të eksplorojmë pjesët më premtuese të Rrjedhat e Kafkës. Deri më tani kemi mbuluar aspektet e mëposhtme të Rrjedhat e Kafkës:

  • krijimi i një topologjie përpunimi;
  • përdorimi i gjendjes në aplikacionet e transmetimit;
  • kryerja e lidhjeve të rrjedhës së të dhënave;
  • dallimet midis transmetimeve të ngjarjeve (KStream) dhe transmetimeve të përditësimit (KTable).

Në shembujt e mëposhtëm do t'i bashkojmë të gjithë këta elementë. Do të mësoni gjithashtu rreth dritares, një tjetër veçori e shkëlqyer e aplikacioneve të transmetimit. Shembulli ynë i parë do të jetë një grumbullim i thjeshtë.

5.3.1. Grumbullimi i shitjeve të aksioneve sipas sektorit të industrisë

Mbledhja dhe grupimi janë mjete jetike kur punoni me të dhënat e transmetimit. Shqyrtimi i të dhënave individuale pasi ato merren shpesh është i pamjaftueshëm. Për të nxjerrë informacion shtesë nga të dhënat, është e nevojshme të grupohen dhe kombinohen ato.

Në këtë shembull, ju do të vishni kostumin e një tregtari ditor që duhet të gjurmojë vëllimin e shitjeve të aksioneve të kompanive në disa industri. Konkretisht, ju jeni të interesuar për pesë kompanitë me shitjet më të mëdha të aksioneve në çdo industri.

Një grumbullim i tillë do të kërkojë disa hapa të mëposhtëm për të përkthyer të dhënat në formën e dëshiruar (duke folur në terma të përgjithshëm).

  1. Krijoni një burim të bazuar në temë që publikon informacione të papërpunuara për tregtimin e aksioneve. Ne do të duhet të hartojmë një objekt të tipit StockTransaction në një objekt të tipit ShareVolume. Çështja është se objekti StockTransaction përmban metadata të shitjeve, por na duhen vetëm të dhëna për numrin e aksioneve që shiten.
  2. Gruponi të dhënat e vëllimit të ndarë sipas simbolit të aksioneve. Pasi të grupohen sipas simbolit, mund t'i ndani këto të dhëna në nëntotalet e vëllimeve të shitjeve të aksioneve. Vlen të përmendet se metoda KStream.groupBy kthen një shembull të llojit KGroupedStream. Dhe mund të merrni një shembull KTable duke thirrur më tej metodën KGroupedStream.reduce.

Çfarë është ndërfaqja KGroupedStream

Metodat KStream.groupBy dhe KStream.groupByKey kthejnë një shembull të KGroupedStream. KGroupedStream është një paraqitje e ndërmjetme e një rrjedhe ngjarjesh pas grupimit sipas çelësave. Nuk është aspak i destinuar për punë të drejtpërdrejtë me të. Në vend të kësaj, KGroupedStream përdoret për operacionet e grumbullimit, të cilat gjithmonë rezultojnë në një KTable. Dhe meqenëse rezultati i operacioneve të grumbullimit është një KTable dhe ata përdorin një dyqan shtetëror, është e mundur që jo të gjitha përditësimet si rezultat të dërgohen më tej në tubacion.

Metoda KTable.groupBy kthen një KGroupedTable të ngjashme - një paraqitje e ndërmjetme e rrjedhës së përditësimeve, të rigrupuar sipas çelësit.

Le të bëjmë një pushim të shkurtër dhe të shohim Fig. 5.9, që tregon se çfarë kemi arritur. Kjo topologji tashmë duhet të jetë shumë e njohur për ju.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Le të shohim tani kodin për këtë topologji (ai mund të gjendet në skedarin src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lista 5.2).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Kodi i dhënë dallohet nga shkurtësia dhe vëllimi i madh i veprimeve të kryera në disa rreshta. Mund të vëreni diçka të re në parametrin e parë të metodës builder.stream: një vlerë e tipit enum AutoOffsetReset.EARLIEST (ekziston edhe një LATEST), e vendosur duke përdorur metodën Consumed.withOffsetResetPolicy. Ky lloj numërimi mund të përdoret për të specifikuar një strategji të rivendosjes së kompensimit për çdo KStream ose KTable dhe ka përparësi ndaj opsionit të rivendosjes së kompensimit nga konfigurimi.

GroupByKey dhe GroupBy

Ndërfaqja KStream ka dy metoda për grupimin e të dhënave: GroupByKey dhe GroupBy. Të dy kthejnë një KGroupedTable, kështu që mund të pyesni se cili është ndryshimi midis tyre dhe kur të përdorni cilin?

Metoda GroupByKey përdoret kur çelësat në KStream janë tashmë jo bosh. Dhe më e rëndësishmja, flamuri "kërkon ri-ndarje" nuk u vendos kurrë.

Metoda GroupBy supozon se ju keni ndryshuar çelësat e grupimit, kështu që flamuri i rindarjes është vendosur në true. Kryerja e bashkimeve, grumbullimeve, etj. pas metodës GroupBy do të rezultojë në ri-ndarje automatike.
Përmbledhje: Kur është e mundur, duhet të përdorni GroupByKey në vend të GroupBy.

Është e qartë se çfarë bëjnë metodat mapValues ​​dhe groupBy, kështu që le të hedhim një vështrim në metodën sum() (që gjendet në src/main/java/bbejeck/model/ShareVolume.java) (Lista 5.3).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Metoda ShareVolume.sum kthen totalin aktual të vëllimit të shitjeve të aksioneve dhe rezultati i të gjithë zinxhirit të llogaritjeve është një objekt KTable . Tani e kuptoni rolin që luan KTable. Kur mbërrijnë objektet ShareVolume, objekti përkatës KTable ruan përditësimin më të fundit aktual. Është e rëndësishme të mbani mend se të gjitha përditësimet janë pasqyruar në shareVolumeKTable e mëparshme, por jo të gjitha dërgohen më tej.

Më pas, duke përdorur këtë KTable, ne grumbullojmë (sipas numrit të aksioneve të tregtuara) për të arritur në pesë kompanitë me vëllimet më të larta të aksioneve të tregtuara në secilën industri. Veprimet tona në këtë rast do të jenë të ngjashme me ato të grumbullimit të parë.

  1. Kryeni një operacion tjetër groupBy për të grupuar objekte individuale ShareVolume sipas industrisë.
  2. Filloni të përmbledhni objektet ShareVolume. Këtë herë objekti i grumbullimit është një radhë prioritare me madhësi fikse. Në këtë radhë me madhësi fikse, mbahen vetëm pesë kompanitë me sasinë më të madhe të aksioneve të shitura.
  3. Hartoni radhët nga paragrafi i mëparshëm në një vlerë vargu dhe ktheni pesë aksionet kryesore më të tregtuara sipas numrit sipas industrisë.
  4. Shkruani rezultatet në formë vargu në temë.

Në Fig. Figura 5.10 tregon grafikun e topologjisë së rrjedhës së të dhënave. Siç mund ta shihni, raundi i dytë i përpunimit është mjaft i thjeshtë.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Tani që kemi një kuptim të qartë të strukturës së këtij raundi të dytë të përpunimit, mund t'i drejtohemi kodit të tij burimor (do ta gjeni në skedarin src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lista 5.4) .

Ky inicializues përmban një variabël fixedQueue. Ky është një objekt i personalizuar që është një përshtatës për java.util.TreeSet që përdoret për të gjurmuar rezultatet kryesore N në rend zbritës të aksioneve të tregtuara.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Ju keni parë tashmë thirrjet groupBy dhe mapValues, kështu që ne nuk do të hyjmë në ato (po e quajmë metodën KTable.toStream sepse metoda KTable.print është e vjetëruar). Por ju nuk e keni parë ende versionin KTable të aggregate(), kështu që ne do të kalojmë pak kohë duke e diskutuar atë.

Siç e mbani mend, ajo që e bën KTable të ndryshme është se të dhënat me të njëjtat çelësa konsiderohen si përditësime. KTable zëvendëson hyrjen e vjetër me një të re. Grumbullimi ndodh në një mënyrë të ngjashme: të dhënat më të fundit me të njëjtin çelës grumbullohen. Kur arrin një rekord, ai shtohet në instancën e klasës FixedSizePriorityQueue duke përdorur një grumbullues (parametri i dytë në thirrjen e metodës agregate), por nëse një tjetër rekord tashmë ekziston me të njëjtin çelës, atëherë rekordi i vjetër hiqet duke përdorur një zbritës (parametri i tretë në thirrja e metodës agregate).

E gjithë kjo do të thotë që grumbulluesi ynë, FixedSizePriorityQueue, nuk i grumbullon të gjitha vlerat me një çelës, por ruan një shumë lëvizëse të sasive të N llojeve më të tregtuara të stoqeve. Çdo hyrje hyrëse përmban numrin total të aksioneve të shitura deri më tani. KTable do t'ju japë informacion se cilat aksione të kompanive janë aktualisht më të tregtueshme, pa kërkuar grumbullim të vazhdueshëm të çdo përditësimi.

Mësuam të bëjmë dy gjëra të rëndësishme:

  • gruponi vlerat në KTable me një çelës të përbashkët;
  • kryejnë operacione të dobishme si grumbullimi dhe grumbullimi në këto vlera të grupuara.

Njohja se si të kryhen këto operacione është e rëndësishme për të kuptuar kuptimin e të dhënave që lëvizin përmes një aplikacioni Kafka Streams dhe për të kuptuar se çfarë informacioni mbart.

Ne kemi bashkuar gjithashtu disa nga konceptet kryesore të diskutuara më parë në këtë libër. Në kapitullin 4, ne diskutuam se sa i rëndësishëm është gjendja lokale e tolerancës ndaj gabimeve për një aplikacion transmetimi. Shembulli i parë në këtë kapitull tregoi pse shteti lokal është kaq i rëndësishëm—ju jep mundësinë të mbani gjurmët e informacionit që keni parë tashmë. Qasja lokale shmang vonesat në rrjet, duke e bërë aplikacionin më performues dhe rezistent ndaj gabimeve.

Kur kryeni ndonjë operacion grumbullimi ose grumbullimi, duhet të specifikoni emrin e dyqanit shtetëror. Operacionet e grumbullimit dhe grumbullimit kthejnë një shembull KTable dhe KTable përdor ruajtjen e gjendjes për të zëvendësuar rezultatet e vjetra me të reja. Siç e keni parë, jo të gjitha përditësimet dërgohen në linjë dhe kjo është e rëndësishme sepse operacionet e grumbullimit janë krijuar për të prodhuar informacion përmbledhës. Nëse nuk aplikoni gjendjen lokale, KTable do të përcjellë të gjitha rezultatet e grumbullimit dhe grumbullimit.

Më pas, ne do të shikojmë kryerjen e operacioneve të tilla si grumbullimi brenda një periudhe të caktuar kohore - të ashtuquajturat operacione të dritareve.

5.3.2. Operacionet e dritareve

Në seksionin e mëparshëm, ne prezantuam konvolucionin dhe grumbullimin rrëshqitës. Aplikacioni realizoi një përmbledhje të vazhdueshme të vëllimit të shitjeve të aksioneve, e ndjekur nga agregimi i pesë aksioneve më të tregtuara në bursë.

Ndonjëherë një grumbullim dhe përmbledhje e tillë e vazhdueshme e rezultateve është e nevojshme. Dhe ndonjëherë ju duhet të kryeni operacione vetëm për një periudhë të caktuar kohe. Për shembull, llogaritni sa transaksione shkëmbimi janë bërë me aksionet e një kompanie të caktuar në 10 minutat e fundit. Ose sa përdorues klikuan në një baner të ri reklamues në 15 minutat e fundit. Një aplikacion mund të kryejë operacione të tilla disa herë, por me rezultate që zbatohen vetëm për periudha të caktuara kohore (dritaret kohore).

Numërimi i transaksioneve të këmbimit sipas blerësit

Në shembullin tjetër, ne do të gjurmojmë transaksionet e aksioneve nëpër tregtarë të shumtë—ose organizata të mëdha ose financues të zgjuar individualë.

Ka dy arsye të mundshme për këtë gjurmim. Një prej tyre është nevoja për të ditur se çfarë janë duke blerë/shitur liderët e tregut. Nëse këta lojtarë të mëdhenj dhe investitorë të sofistikuar shohin mundësi, ka kuptim të ndjekin strategjinë e tyre. Arsyeja e dytë është dëshira për të dalluar ndonjë shenjë të mundshme të tregtimit të brendshëm të paligjshëm. Për ta bërë këtë, do t'ju duhet të analizoni lidhjen e rritjeve të mëdha të shitjeve me njoftimet e rëndësishme për shtyp.

Një gjurmim i tillë përbëhet nga hapat e mëposhtëm:

  • krijimi i një transmetimi për lexim nga tema e transaksioneve të aksioneve;
  • grupimi i regjistrimeve hyrëse sipas ID-së së blerësit dhe simbolit të aksioneve. Thirrja e metodës groupBy kthen një shembull të klasës KGroupedStream;
  • Metoda KGroupedStream.windowedBy kthen një rrjedhë të dhënash të kufizuar në një dritare kohore, e cila lejon grumbullimin e dritareve. Në varësi të llojit të dritares, kthehet ose një TimeWindowedKStream ose një SessionWindowedKStream;
  • numërimin e transaksioneve për operacionin e grumbullimit. Rrjedha e të dhënave me dritare përcakton nëse një regjistrim i veçantë merret parasysh në këtë numërim;
  • shkrimi i rezultateve në një temë ose nxjerrja e tyre në tastierë gjatë zhvillimit.

Topologjia e këtij aplikacioni është e thjeshtë, por një pamje e qartë e tij do të ishte e dobishme. Le të hedhim një vështrim në Fig. 5.11.

Më pas, do të shikojmë funksionalitetin e operacioneve të dritares dhe kodin përkatës.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"

Llojet e dritareve

Ekzistojnë tre lloje të dritareve në Kafka Streams:

  • sesionale;
  • “tumbling” (tumbling);
  • rrëshqitje/hopping.

Cilin të zgjidhni varet nga kërkesat e biznesit tuaj. Dritaret e rrëshqitjes dhe kërcimit janë të kufizuara në kohë, ndërsa dritaret e sesioneve janë të kufizuara nga aktiviteti i përdoruesit - kohëzgjatja e sesionit(eve) përcaktohet vetëm nga sa aktiv është përdoruesi. Gjëja kryesore për t'u mbajtur mend është se të gjitha llojet e dritareve bazohen në vulat e datës/kohës së hyrjeve, jo në kohën e sistemit.

Më pas, ne implementojmë topologjinë tonë me secilin nga llojet e dritareve. Kodi i plotë do të jepet vetëm në shembullin e parë; për llojet e tjera të dritareve asgjë nuk do të ndryshojë përveç llojit të funksionimit të dritares.

Dritaret e sesioneve

Dritaret e sesioneve janë shumë të ndryshme nga të gjitha llojet e tjera të dritareve. Ato janë të kufizuara jo aq nga koha sa nga aktiviteti i përdoruesit (ose aktiviteti i entitetit që dëshironi të gjurmoni). Dritaret e sesioneve kufizohen nga periudhat e pasivitetit.

Figura 5.12 ilustron konceptin e dritareve të sesionit. Sesioni më i vogël do të bashkohet me seancën në të majtë të tij. Dhe seanca në të djathtë do të jetë e ndarë sepse pason një periudhë të gjatë pasiviteti. Dritaret e sesioneve bazohen në aktivitetin e përdoruesit, por përdorin vulat e datës/kohës nga hyrjet për të përcaktuar se cilës sesion i përket hyrja.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"

Përdorimi i dritareve të sesioneve për të gjurmuar transaksionet e aksioneve

Le të përdorim dritaret e sesioneve për të kapur informacione rreth transaksioneve të shkëmbimit. Zbatimi i dritareve të sesionit tregohet në Listimin 5.5 (i cili mund të gjendet në src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Ju keni parë tashmë shumicën e operacioneve në këtë topologji, kështu që nuk ka nevojë t'i shikoni ato përsëri këtu. Por këtu ka edhe disa elementë të rinj, të cilët do t'i diskutojmë tani.

Çdo operacion groupBy zakonisht kryen një lloj operacioni grumbullimi (agregim, grumbullim ose numërim). Ju mund të kryeni ose grumbullimin kumulativ me një total të ekzekutuar, ose grumbullimin e dritareve, i cili merr parasysh të dhënat brenda një dritareje kohore të caktuar.

Kodi në Listimin 5.5 numëron numrin e transaksioneve brenda dritareve të sesioneve. Në Fig. 5.13 këto veprime janë analizuar hap pas hapi.

Duke thirrur windowedBy(SessionWindows.with(twentySeconds).until(pesëmbëdhjetëMinutes)) krijojmë një dritare sesioni me një interval pasiviteti prej 20 sekondash dhe një interval qëndrueshmërie prej 15 minutash. Një interval i papunë prej 20 sekondash do të thotë që aplikacioni do të përfshijë çdo hyrje që arrin brenda 20 sekondave nga përfundimi ose fillimi i sesionit aktual në sesionin aktual (aktiv).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Më pas, ne specifikojmë se cili operacion grumbullimi duhet të kryhet në dritaren e sesionit - në këtë rast, numëroni. Nëse një hyrje hyrëse bie jashtë dritares së pasivitetit (secila anë e vulës së datës/kohës), aplikacioni krijon një sesion të ri. Intervali i mbajtjes nënkupton mbajtjen e një sesioni për një kohë të caktuar dhe lejon të dhëna të vonuara që shtrihen përtej periudhës së pasivitetit të seancës, por që mund të bashkëngjiten ende. Për më tepër, fillimi dhe mbarimi i sesionit të ri që rezulton nga bashkimi korrespondojnë me vulën më të hershme dhe më të fundit të datës/kohës.

Le të shohim disa hyrje nga metoda e numërimit për të parë se si funksionojnë sesionet (Tabela 5.1).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Kur mbërrijnë regjistrimet, ne kërkojmë sesionet ekzistuese me të njëjtin çelës, një kohë përfundimi më të vogël se data/koha aktuale - intervali i pasivitetit dhe një kohë fillimi më e madhe se data/koha aktuale + intervali i pasivitetit. Duke marrë parasysh këtë, katër hyrje nga tabela. 5.1 shkrihen në një seancë të vetme si më poshtë.

1. Regjistri 1 mbërrin i pari, kështu që ora e fillimit është e barabartë me kohën e përfundimit dhe është 00:00:00.

2. Më pas, mbërrin hyrja 2 dhe ne kërkojmë sesione që përfundojnë jo më herët se ora 23:59:55 dhe fillojnë jo më vonë se ora 00:00:35. Ne gjejmë regjistrimin 1 dhe kombinojmë sesionet 1 dhe 2. Marrim orën e fillimit të sesionit 1 (më herët) dhe kohën e përfundimit të sesionit 2 (më vonë), në mënyrë që sesioni ynë i ri të fillojë në orën 00:00:00 dhe të përfundojë në orën 00: 00:15.

3. Regjistri 3 mbërrin, ne kërkojmë seanca midis orës 00:00:30 dhe 00:01:10 dhe nuk gjejmë asnjë. Shtoni një seancë të dytë për çelësin 123-345-654, FFBE, duke filluar dhe duke përfunduar në 00:00:50.

4. Regjistri 4 mbërrin dhe ne jemi në kërkim të seancave ndërmjet orës 23:59:45 dhe 00:00:25. Këtë herë gjenden të dy sesionet 1 dhe 2. Të tre seancat janë të kombinuara në një, me një orar fillimi 00:00:00 dhe një orar mbarimi 00:00:15.

Nga ajo që përshkruhet në këtë pjesë, ia vlen të mbani mend nuancat e mëposhtme të rëndësishme:

  • seancat nuk janë dritare me madhësi fikse. Kohëzgjatja e një seance përcaktohet nga aktiviteti brenda një periudhe të caktuar kohore;
  • Vulat e datës/kohës në të dhëna përcaktojnë nëse ngjarja bie brenda një sesioni ekzistues ose gjatë një periudhe të papunë.

Më tej do të diskutojmë llojin tjetër të dritares - dritaret "rrëzuese".

Dritare "rrëmbyese".

Dritaret e rrëzuara kapin ngjarje që bien brenda një periudhe të caktuar kohe. Imagjinoni që ju duhet të kapni të gjitha transaksionet e aksioneve të një kompanie të caktuar çdo 20 sekonda, në mënyrë që të grumbulloni të gjitha ngjarjet gjatë asaj periudhe kohore. Në fund të intervalit prej 20 sekondash, dritarja rrotullohet dhe kalon në një interval të ri vëzhgimi prej 20 sekondash. Figura 5.14 ilustron këtë situatë.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Siç mund ta shihni, të gjitha ngjarjet e marra në 20 sekondat e fundit përfshihen në dritare. Në fund të kësaj periudhe kohore, krijohet një dritare e re.

Listimi 5.6 tregon kodin që demonstron përdorimin e dritareve në rënie për të kapur transaksionet e aksioneve çdo 20 sekonda (gjendet në src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Me këtë ndryshim të vogël në thirrjen e metodës TimeWindows.of, mund të përdorni një dritare rrëshqitëse. Ky shembull nuk thërret metodën deri(), kështu që do të përdoret intervali i paracaktuar i mbajtjes prej 24 orësh.

Më në fund, është koha për të kaluar në opsionin e fundit të dritares - dritaret "hopping".

Dritare rrëshqitëse ("kërcuese").

Dritaret rrëshqitëse/rrëshqitëse janë të ngjashme me dritaret që bien, por me një ndryshim të vogël. Dritaret rrëshqitëse nuk presin deri në fund të intervalit kohor përpara se të krijoni një dritare të re për të përpunuar ngjarjet e fundit. Ata fillojnë llogaritjet e reja pas një intervali pritjeje më pak se kohëzgjatja e dritares.

Për të ilustruar dallimet midis dritareve me rënie dhe kërcim, le të kthehemi te shembulli i numërimit të transaksioneve në bursë. Qëllimi ynë është ende të numërojmë numrin e transaksioneve, por nuk duam të presim të gjithë kohën përpara se të përditësojmë sportelin. Në vend të kësaj, ne do ta përditësojmë numëruesin në intervale më të shkurtra. Për shembull, ne do të numërojmë përsëri numrin e transaksioneve çdo 20 sekonda, por përditësojmë numëruesin çdo 5 sekonda, siç tregohet në Fig. 5.15. Në këtë rast, ne përfundojmë me tre dritare rezultatesh me të dhëna të mbivendosura.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Lista 5.7 tregon kodin për përcaktimin e dritareve rrëshqitëse (gjendet në src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Një dritare rrëshqitëse mund të konvertohet në një dritare kërcyese duke shtuar një thirrje në metodën advanceBy(). Në shembullin e treguar, intervali i kursimit është 15 minuta.

Ju patë në këtë seksion se si të kufizoni rezultatet e grumbullimit në dritaret kohore. Në veçanti, dua që ju të mbani mend tre gjërat e mëposhtme nga ky seksion:

  • madhësia e dritareve të sesioneve nuk kufizohet nga periudha kohore, por nga aktiviteti i përdoruesit;
  • Dritaret "rrëmbyese" ofrojnë një pasqyrë të ngjarjeve brenda një periudhe të caktuar kohore;
  • Kohëzgjatja e dritareve kërcyese është fikse, por ato përditësohen shpesh dhe mund të përmbajnë hyrje të mbivendosura në të gjitha dritaret.

Më pas, ne do të mësojmë se si të kthejmë një KTable përsëri në një KStream për një lidhje.

5.3.3. Lidhja e objekteve KStream dhe KTable

Në kapitullin 4, ne diskutuam lidhjen e dy objekteve KStream. Tani duhet të mësojmë se si të lidhim KTable dhe KStream. Kjo mund të jetë e nevojshme për arsyen e mëposhtme të thjeshtë. KStream është një rrymë regjistrimesh dhe KTable është një rrjedhë përditësimesh rekordesh, por ndonjëherë mund të dëshironi të shtoni kontekst shtesë në transmetimin e regjistrimeve duke përdorur përditësime nga KTable.

Le të marrim të dhëna për numrin e transaksioneve në bursë dhe t'i kombinojmë ato me lajmet e bursës për industritë përkatëse. Ja çfarë duhet të bëni për ta arritur këtë duke pasur parasysh kodin që keni tashmë.

  1. Konvertoni një objekt KTable me të dhëna për numrin e transaksioneve të aksioneve në një KStream, pasuar nga zëvendësimi i çelësit me çelësin që tregon sektorin e industrisë që korrespondon me këtë simbol të aksioneve.
  2. Krijo një objekt KTable që lexon të dhëna nga një temë me lajmet e bursës. Kjo KTable e re do të kategorizohet sipas sektorit të industrisë.
  3. Lidhni përditësimet e lajmeve me informacionin mbi numrin e transaksioneve në bursë sipas sektorit të industrisë.

Tani le të shohim se si ta zbatojmë këtë plan veprimi.

Konvertoni KTable në KStream

Për të kthyer KTable në KStream, duhet të bëni sa më poshtë.

  1. Thirrni metodën KTable.toStream().
  2. Duke thirrur metodën KStream.map, zëvendësoni çelësin me emrin e industrisë dhe më pas merrni objektin TransactionSummary nga shembulli Windowed.

Ne do t'i lidhim këto operacione së bashku si më poshtë (kodi mund të gjendet në skedarin src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lista 5.8).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Për shkak se ne po kryejmë një operacion KStream.map, shembulli i kthyer i KStream ndahet automatikisht kur përdoret në një lidhje.

Kemi përfunduar procesin e konvertimit, më pas duhet të krijojmë një objekt KTable për leximin e lajmeve të aksioneve.

Krijimi i KTable për lajmet e aksioneve

Për fat të mirë, krijimi i një objekti KTable kërkon vetëm një rresht kodi (kodi mund të gjendet në src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lista 5.9).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Vlen të përmendet se asnjë objekt Serde nuk kërkohet të specifikohet, pasi në cilësimet përdoren Serdet e vargut. Gjithashtu, duke përdorur numërimin më të hershëm, tabela mbushet me regjistrime që në fillim.

Tani mund të kalojmë në hapin e fundit - lidhjen.

Lidhja e përditësimeve të lajmeve me të dhënat e numërimit të transaksioneve

Krijimi i një lidhjeje nuk është i vështirë. Ne do të përdorim një bashkim majtas në rast se nuk ka lajme të aksioneve për industrinë përkatëse (kodi i nevojshëm mund të gjendet në skedarin src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lista 5.10).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Ky operator leftJoin është mjaft i thjeshtë. Ndryshe nga bashkimet në Kapitullin 4, metoda JoinWindow nuk përdoret sepse kur kryeni një bashkim KStream-KTable, ka vetëm një hyrje në KTable për çdo çelës. Një lidhje e tillë nuk është e kufizuar në kohë: regjistrimi është ose në KTable ose mungon. Përfundimi kryesor: duke përdorur objektet e KTable ju mund të pasuroni KStream me të dhëna referimi të përditësuara më rrallë.

Tani do të shikojmë një mënyrë më efikase për të pasuruar ngjarjet nga KStream.

5.3.4. Objektet GlobalKTable

Siç mund ta shihni, ekziston nevoja për të pasuruar transmetimet e ngjarjeve ose për të shtuar kontekst në to. Në kapitullin 4 keni parë lidhjet midis dy objekteve KStream dhe në pjesën e mëparshme keni parë lidhjen midis një KStream dhe një KTable. Në të gjitha këto raste, është e nevojshme të ri-ndarja e rrjedhës së të dhënave kur harton çelësat në një lloj ose vlerë të re. Ndonjëherë rindarja bëhet në mënyrë eksplicite, dhe ndonjëherë Kafka Streams e bën atë automatikisht. Rindarja është e nevojshme sepse çelësat kanë ndryshuar dhe të dhënat duhet të përfundojnë në seksione të reja, përndryshe lidhja do të jetë e pamundur (kjo u diskutua në kapitullin 4, në seksionin "Ri-ndarja e të dhënave" në nënseksionin 4.2.4).

Rindarja ka një kosto

Ri-ndarja kërkon kosto - kosto burimesh shtesë për krijimin e temave të ndërmjetme, ruajtjen e të dhënave të dyfishta në një temë tjetër; do të thotë gjithashtu rritje e vonesës për shkak të shkrimit dhe leximit nga kjo temë. Për më tepër, nëse keni nevojë të bashkoheni në më shumë se një aspekt ose dimension, duhet t'i lidhni lidhjet, t'i hartoni regjistrimet me çelësa të rinj dhe të ekzekutoni përsëri procesin e ri-ndarjes.

Lidhja me grupe të dhënash më të vogla

Në disa raste, vëllimi i të dhënave të referencës për t'u lidhur është relativisht i vogël, kështu që kopjet e plota të tyre mund të përshtaten lehtësisht në nivel lokal në secilën nyje. Për situata si kjo, Kafka Streams ofron klasën GlobalKTable.

Instancat e GlobalKTable janë unike sepse aplikacioni përsërit të gjitha të dhënat në secilën prej nyjeve. Dhe meqenëse të gjitha të dhënat janë të pranishme në secilën nyje, nuk ka nevojë të ndahet transmetimi i ngjarjes sipas çelësit të të dhënave referencë në mënyrë që të jetë i disponueshëm për të gjitha ndarjet. Ju gjithashtu mund të bëni lidhje pa çelës duke përdorur objektet GlobalKTable. Le të kthehemi te një nga shembujt e mëparshëm për të demonstruar këtë veçori.

Lidhja e objekteve KStream me objektet GlobalKTable

Në nënseksionin 5.3.2, ne kryem agregimin e dritareve të transaksioneve të këmbimit nga blerësit. Rezultatet e këtij grumbullimi dukeshin diçka si kjo:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Ndërsa këto rezultate i shërbenin qëllimit, do të kishte qenë më e dobishme nëse emri i klientit dhe emri i plotë i kompanisë do të ishin shfaqur gjithashtu. Për të shtuar emrin e klientit dhe emrin e kompanisë, mund të bëni bashkime normale, por do t'ju duhet të bëni dy harta kryesore dhe ri-ndarje. Me GlobalKTable ju mund të shmangni koston e operacioneve të tilla.

Për ta bërë këtë, ne do të përdorim objektin countStream nga Lista 5.11 (kodi përkatës mund të gjendet në src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) dhe do ta lidhim atë me dy objekte GlobalKTable.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Ne e kemi diskutuar këtë më parë, kështu që nuk do ta përsëris. Por unë vërej se kodi në funksionin toStream().map është abstraguar në një objekt funksioni në vend të një shprehjeje lambda inline për hir të lexueshmërisë.

Hapi tjetër është deklarimi i dy instancave të GlobalKTable (kodi i treguar mund të gjendet në skedarin src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lista 5.12).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"

Ju lutemi vini re se emrat e temave përshkruhen duke përdorur lloje të numëruara.

Tani që i kemi gati të gjithë komponentët, mbetet vetëm të shkruajmë kodin për lidhjen (i cili mund të gjendet në skedarin src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lista 5.13).

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Edhe pse ka dy bashkime në këtë kod, ato janë të lidhura me zinxhir sepse asnjë nga rezultatet e tyre nuk përdoret veçmas. Rezultatet shfaqen në fund të gjithë operacionit.

Kur ekzekutoni operacionin e mësipërm të bashkimit, do të merrni rezultate si kjo:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Thelbi nuk ka ndryshuar, por këto rezultate duken më të qarta.

Nëse numëroni mbrapsht deri në Kapitullin 4, tashmë keni parë disa lloje lidhjesh në veprim. Ato janë renditur në tabelë. 5.2. Kjo tabelë pasqyron aftësitë e lidhjes që nga versioni 1.0.0 i Kafka Streams; Diçka mund të ndryshojë në publikimet e ardhshme.

Libri “Rrjedhat e Kafkës në Veprim. Aplikacione dhe mikroshërbime për punë në kohë reale"
Për t'i përfunduar gjërat, le të përmbledhim bazat: mund të lidhni transmetimet e ngjarjeve (KStream) dhe të përditësoni transmetimet (KTable) duke përdorur gjendjen lokale. Përndryshe, nëse madhësia e të dhënave të referencës nuk është shumë e madhe, mund të përdorni objektin GlobalKTable. GlobalKTables kopjojnë të gjitha ndarjet në çdo nyje aplikimi të Kafka Streams, duke siguruar që të gjitha të dhënat të jenë të disponueshme pavarësisht se cilës ndarje korrespondon çelësi.

Më pas do të shohim funksionin Kafka Streams, falë të cilit mund të vëzhgojmë ndryshimet e gjendjes pa konsumuar të dhëna nga një temë Kafka.

5.3.5. Gjendje e diskutueshme

Ne kemi kryer tashmë disa operacione që përfshijnë gjendjen dhe gjithmonë nxjerrim rezultatet në tastierë (për qëllime zhvillimi) ose i shkruajmë ato në një temë (për qëllime prodhimi). Kur shkruani rezultate në një temë, duhet të përdorni një konsumator Kafka për t'i parë ato.

Leximi i të dhënave nga këto tema mund të konsiderohet si një lloj pikëpamjesh të materializuara. Për qëllimet tona, ne mund të përdorim përkufizimin e një pamjeje të materializuar nga Wikipedia: “...një objekt fizik i bazës së të dhënave që përmban rezultatet e një pyetjeje. Për shembull, mund të jetë një kopje lokale e të dhënave në distancë, ose një nëngrup rreshtash dhe/ose kolonash të një tabele ose rezultate të bashkimit, ose një tabelë përmbledhëse e marrë përmes grumbullimit” (https://en.wikipedia.org/wiki /Pamja e_materializuar).

Kafka Streams ju lejon gjithashtu të ekzekutoni pyetje interaktive në dyqanet shtetërore, duke ju lejuar të lexoni drejtpërdrejt këto pamje të materializuara. Është e rëndësishme të theksohet se pyetja në dyqanin shtetëror është një operacion vetëm për lexim. Kjo siguron që ju të mos shqetësoheni për mospërputhjen aksidentale të gjendjes ndërsa aplikacioni juaj po përpunon të dhënat.

Aftësia për të kërkuar drejtpërdrejt dyqanet e shtetit është e rëndësishme. Kjo do të thotë që ju mund të krijoni aplikacione në panel pa pasur nevojë të merrni më parë të dhëna nga konsumatori Kafka. Ai gjithashtu rrit efikasitetin e aplikacionit, për faktin se nuk ka nevojë të shkruani përsëri të dhëna:

  • falë lokalitetit të të dhënave, ato mund të aksesohen shpejt;
  • Dyfishimi i të dhënave eliminohet, pasi nuk është shkruar në ruajtjen e jashtme.

Gjëja kryesore që dua të mbani mend është se ju mund të kërkoni drejtpërdrejt gjendjen nga brenda aplikacionit tuaj. Mundësitë që ju jep kjo nuk mund të mbivlerësohen. Në vend që të konsumoni të dhëna nga Kafka dhe të ruani të dhënat në një bazë të dhënash për aplikacionin, ju mund të kërkoni dyqanet shtetërore me të njëjtin rezultat. Pyetjet e drejtpërdrejta në dyqanet shtetërore nënkuptojnë më pak kod (pa konsumator) dhe më pak softuer (nuk ka nevojë për një tabelë të dhënash për të ruajtur rezultatet).

Ne kemi mbuluar mjaft terren në këtë kapitull, kështu që do ta lëmë diskutimin tonë për pyetjet interaktive kundër dyqaneve shtetërore për momentin. Por mos u shqetësoni: në Kapitullin 9, ne do të krijojmë një aplikacion të thjeshtë të panelit me pyetje interaktive. Ai do të përdorë disa nga shembujt nga ky dhe kapitujt e mëparshëm për të demonstruar pyetje interaktive dhe si mund t'i shtoni ato në aplikacionet Kafka Streams.

Përmbledhje

  • Objektet KStream përfaqësojnë rrjedha ngjarjesh, të krahasueshme me futjet në një bazë të dhënash. Objektet e KTable përfaqësojnë rrjedhat e përditësimit, më shumë si përditësime në një bazë të dhënash. Madhësia e objektit KTable nuk rritet, regjistrimet e vjetra zëvendësohen me të reja.
  • Objektet e KTable kërkohen për operacionet e grumbullimit.
  • Duke përdorur operacionet e dritares, mund të ndani të dhënat e grumbulluara në kova kohore.
  • Falë objekteve GlobalKTable, ju mund të përdorni të dhënat e referencës kudo në aplikacion, pavarësisht nga ndarja.
  • Lidhjet ndërmjet objekteve KStream, KTable dhe GlobalKTable janë të mundshme.

Deri më tani, ne jemi fokusuar në ndërtimin e aplikacioneve të Kafka Streams duke përdorur KStream DSL të nivelit të lartë. Megjithëse qasja e nivelit të lartë ju lejon të krijoni programe të rregullta dhe koncize, përdorimi i tij përfaqëson një kompromis. Të punosh me DSL KStream nënkupton rritjen e koncizitetit të kodit duke ulur shkallën e kontrollit. Në kapitullin tjetër, ne do të shikojmë API-në e nyjes së mbajtësit të nivelit të ulët dhe do të provojmë kompromise të tjera. Programet do të jenë më të gjata se sa ishin më parë, por ne do të jemi në gjendje të krijojmë pothuajse çdo nyje mbajtëse që mund të na nevojitet.

→ Më shumë detaje rreth librit mund të gjenden në faqen e internetit të botuesit

→ Për Habrozhiteli 25% zbritje duke përdorur kupon - Rrjedhat e Kafkës

→ Pas pagesës për versionin në letër të librit, një libër elektronik do të dërgohet me e-mail.

Burimi: www.habr.com

Shto një koment