Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real" Helo, drigolion Khabro! Mae'r llyfr hwn yn addas ar gyfer unrhyw ddatblygwr sydd am ddeall prosesu edau. Bydd deall rhaglennu dosbarthedig yn eich helpu i ddeall Kafka a Kafka Streams yn well. Byddai'n braf gwybod fframwaith Kafka ei hun, ond nid yw hyn yn angenrheidiol: byddaf yn dweud wrthych bopeth sydd ei angen arnoch. Bydd datblygwyr profiadol Kafka a dechreuwyr fel ei gilydd yn dysgu sut i greu cymwysiadau prosesu ffrydiau diddorol gan ddefnyddio llyfrgell Kafka Streams yn y llyfr hwn. Bydd datblygwyr Java canolradd ac uwch sydd eisoes yn gyfarwydd â chysyniadau fel cyfresoli yn dysgu cymhwyso eu sgiliau i greu cymwysiadau Kafka Streams. Mae cod ffynhonnell y llyfr wedi'i ysgrifennu yn Java 8 ac mae'n gwneud defnydd sylweddol o gystrawen mynegiant lambda Java 8, felly bydd gwybod sut i weithio gyda swyddogaethau lambda (hyd yn oed mewn iaith raglennu arall) yn dod yn ddefnyddiol.

Dyfyniad. 5.3. Gweithrediadau agregu a ffenestri

Yn yr adran hon, byddwn yn symud ymlaen i archwilio rhannau mwyaf addawol Nentydd Kafka. Hyd yn hyn rydym wedi ymdrin â'r agweddau canlynol ar Kafka Streams:

  • creu topoleg prosesu;
  • defnyddio cyflwr mewn cymwysiadau ffrydio;
  • perfformio cysylltiadau ffrwd data;
  • gwahaniaethau rhwng ffrydiau digwyddiad (KSstream) a ffrydiau diweddaru (KTable).

Yn yr enghreifftiau canlynol byddwn yn dod â'r holl elfennau hyn at ei gilydd. Byddwch hefyd yn dysgu am ffenestru, nodwedd wych arall o gymwysiadau ffrydio. Ein hesiampl gyntaf fydd cydgasgliad syml.

5.3.1. Cydgasglu gwerthiannau stoc fesul sector diwydiant

Mae cydgasglu a grwpio yn arfau hanfodol wrth weithio gyda ffrydio data. Mae archwilio cofnodion unigol wrth iddynt ddod i law yn aml yn annigonol. Er mwyn tynnu gwybodaeth ychwanegol o ddata, mae angen eu grwpio a'u cyfuno.

Yn yr enghraifft hon, byddwch chi'n gwisgo gwisg masnachwr dydd sydd angen olrhain cyfaint gwerthiant stociau cwmnïau mewn sawl diwydiant. Yn benodol, mae gennych ddiddordeb yn y pum cwmni sydd â'r gwerthiant cyfranddaliadau mwyaf ym mhob diwydiant.

Bydd agregiad o'r fath yn gofyn am y sawl cam canlynol i drosi'r data i'r ffurf a ddymunir (yn siarad yn gyffredinol).

  1. Creu ffynhonnell sy'n seiliedig ar bynciau sy'n cyhoeddi gwybodaeth masnachu stoc amrwd. Bydd yn rhaid i ni fapio gwrthrych o fath StockTransaction i wrthrych o fath ShareVolume. Y pwynt yw bod y gwrthrych StockTransaction yn cynnwys metadata gwerthiant, ond dim ond data am nifer y cyfranddaliadau sy'n cael eu gwerthu sydd ei angen arnom.
  2. Grwpio data ShareVolume yn ôl symbol stoc. Ar ôl eu grwpio yn ôl symbol, gallwch chi gwympo'r data hwn yn is-gyfansymiau o gyfeintiau gwerthiant stoc. Mae'n werth nodi bod y dull KStream.groupBy yn dychwelyd enghraifft o fath KGroupedStream. A gallwch gael enghraifft KTable trwy ffonio ymhellach y dull KGroupedStream.reduce.

Beth yw'r rhyngwyneb KGroupedStream

Mae'r dulliau KStream.groupBy a KStream.groupByKey yn dychwelyd enghraifft o KGroupedStream. Mae KGroupedStream yn gynrychiolaeth ganolraddol o ffrwd o ddigwyddiadau ar ôl grwpio yn ôl allweddi. Nid yw wedi'i fwriadu o gwbl ar gyfer gwaith uniongyrchol ag ef. Yn lle hynny, defnyddir KGroupedStream ar gyfer gweithrediadau agregu, sydd bob amser yn arwain at KTable. A chan mai canlyniad gweithrediadau agregu yw KTable a'u bod yn defnyddio storfa wladwriaethol, mae'n bosibl na fydd yr holl ddiweddariadau o ganlyniad yn cael eu hanfon ymhellach i lawr y biblinell.

Mae'r dull KTable.groupBy yn dychwelyd KGroupedTable tebyg - cynrychioliad canolradd o'r llif o ddiweddariadau, wedi'i ail-grwpio gan allwedd.

Gadewch i ni gymryd seibiant byr ac edrych ar Ffig. 5.9, sy'n dangos yr hyn yr ydym wedi'i gyflawni. Dylai'r dopoleg hon fod yn gyfarwydd iawn i chi eisoes.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Edrychwn yn awr ar y cod ar gyfer y topoleg hon (mae i'w gael yn y ffeil src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Rhestr 5.2).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Mae'r cod a roddir yn cael ei wahaniaethu gan ei grynodeb a'r nifer fawr o gamau gweithredu a gyflawnir mewn sawl llinell. Efallai y byddwch yn sylwi ar rywbeth newydd ym mharamedr cyntaf y dull builder.stream: gwerth o'r math enum AutoOffsetReset.EARLIEST (mae DIWEDDARAF hefyd), wedi'i osod gan ddefnyddio'r dull Consumed.withOffsetResetPolicy. Gellir defnyddio'r math hwn o gyfrifiad i bennu strategaeth ailosod gwrthbwyso ar gyfer pob KSstream neu KTable ac mae'n cael blaenoriaeth dros yr opsiwn ailosod gwrthbwyso o'r ffurfweddiad.

GroupByKey a GroupBy

Mae gan ryngwyneb KStream ddau ddull ar gyfer grwpio cofnodion: GroupByKey a GroupBy. Mae'r ddau yn dychwelyd KGroupedTable, felly efallai eich bod yn pendroni beth yw'r gwahaniaeth rhyngddynt a phryd i ddefnyddio pa un?

Defnyddir y dull GroupByKey pan nad yw'r bysellau yn y ffrwd KS yn wag yn barod. Ac yn bwysicaf oll, ni osodwyd y faner “angen ail-rannu”.

Mae'r dull GroupBy yn cymryd yn ganiataol eich bod wedi newid y bysellau grwpio, felly mae'r faner ailrannu wedi'i gosod yn wir. Bydd uniadau perfformio, agregau, ac ati ar ôl y dull GroupBy yn arwain at ail-rannu'n awtomatig.
Crynodeb: Lle bynnag y bo modd, dylech ddefnyddio GroupByKey yn hytrach na GroupBy.

Mae'n amlwg beth mae'r dulliau mapValues ​​a groupBy yn ei wneud, felly gadewch i ni edrych ar y dull sum() (a geir yn src/main/java/bbejeck/model/ShareVolume.java) (Rhestr 5.3).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Mae'r dull ShareVolume.sum yn dychwelyd cyfanswm rhedegol y cyfaint gwerthiant stoc, ac mae canlyniad y gadwyn gyfan o gyfrifiadau yn wrthrych KTable . Nawr rydych chi'n deall y rôl mae KTable yn ei chwarae. Pan fydd gwrthrychau ShareVolume yn cyrraedd, mae'r gwrthrych KTable cyfatebol yn storio'r diweddariad cyfredol diweddaraf. Mae'n bwysig cofio bod yr holl ddiweddariadau yn cael eu hadlewyrchu yn y shareVolumeKTable blaenorol, ond nid yw pob un yn cael ei anfon ymhellach.

Yna byddwn yn defnyddio'r KTable hwn i agregu (yn ôl nifer y cyfranddaliadau a fasnachwyd) i gyrraedd y pum cwmni sydd â'r cyfeintiau uchaf o gyfranddaliadau a fasnachir ym mhob diwydiant. Bydd ein gweithredoedd yn yr achos hwn yn debyg i'r rhai ar gyfer y casgliad cyntaf.

  1. Perfformio gweithrediad groupBy arall i grwpio gwrthrychau ShareVolume unigol fesul diwydiant.
  2. Dechreuwch grynhoi gwrthrychau ShareVolume. Y tro hwn mae'r gwrthrych agregu yn giw blaenoriaeth maint sefydlog. Yn y ciw maint sefydlog hwn, dim ond y pum cwmni sydd â’r symiau mwyaf o gyfranddaliadau a werthwyd sy’n cael eu cadw.
  3. Mapiwch y ciwiau o'r paragraff blaenorol i werth llinynnol a dychwelwch y pum stoc a fasnachwyd fwyaf yn ôl rhif fesul diwydiant.
  4. Ysgrifennwch y canlyniadau ar ffurf llinyn i'r testun.

Yn Ffig. Mae Ffigur 5.10 yn dangos y graff topoleg llif data. Fel y gwelwch, mae'r ail rownd o brosesu yn eithaf syml.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Nawr bod gennym ddealltwriaeth glir o strwythur yr ail rownd hon o brosesu, gallwn droi at ei god ffynhonnell (fe welwch ef yn y ffeil src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Rhestr 5.4) .

Mae'r cychwynnwr hwn yn cynnwys newidyn fixedQueue. Mae hwn yn wrthrych arferiad sy'n addasydd ar gyfer java.util.TreeSet a ddefnyddir i olrhain y canlyniadau N uchaf yn nhrefn ddisgynnol y cyfranddaliadau a fasnachir.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Rydych chi eisoes wedi gweld y galwadau groupBy a mapValues, felly ni fyddwn yn mynd i mewn i'r rheini (rydym yn galw'r dull KTable.toStream oherwydd bod y dull KTable.print yn anghymeradwy). Ond nid ydych wedi gweld fersiwn KTable o agreg() eto, felly byddwn yn treulio ychydig o amser yn trafod hynny.

Fel y cofiwch, yr hyn sy'n gwneud KTable yn wahanol yw bod cofnodion gyda'r un allweddi yn cael eu hystyried yn ddiweddariadau. Mae KTable yn disodli'r hen gofnod gydag un newydd. Mae agregu yn digwydd mewn ffordd debyg: mae'r cofnodion diweddaraf gyda'r un allwedd yn cael eu hagregu. Pan fydd cofnod yn cyrraedd, caiff ei ychwanegu at enghraifft dosbarth FixedSizePriorityQueue gan ddefnyddio gwiber (ail baramedr yn yr alwad dull cyfanredol), ond os oes cofnod arall eisoes yn bodoli gyda'r un allwedd, yna caiff yr hen gofnod ei dynnu gan ddefnyddio tynnu (trydydd paramedr yn yr alwad dull cyfanredol).

Mae hyn i gyd yn golygu nad yw ein cydgrynwr, FixedSizePriorityQueue, yn agregu'r holl werthoedd ag un allwedd, ond yn storio swm symudol o feintiau'r N mathau o stociau a fasnachir fwyaf. Mae pob cofnod sy'n dod i mewn yn cynnwys cyfanswm y cyfrannau a werthwyd hyd yn hyn. Bydd KTable yn rhoi gwybodaeth i chi am gyfranddaliadau cwmnïau sy'n cael eu masnachu fwyaf ar hyn o bryd, heb fod angen agregiad treigl o bob diweddariad.

Dysgon ni wneud dau beth pwysig:

  • gwerthoedd grŵp yn KTable gan allwedd gyffredin;
  • perfformio gweithrediadau defnyddiol fel rholio i fyny a chyfuno ar y gwerthoedd grŵp hyn.

Mae gwybod sut i gyflawni'r gweithrediadau hyn yn bwysig er mwyn deall ystyr y data sy'n symud trwy raglen Kafka Streams a deall pa wybodaeth sydd ganddo.

Rydym hefyd wedi dod â rhai o'r cysyniadau allweddol a drafodwyd yn gynharach yn y llyfr hwn ynghyd. Ym Mhennod 4, buom yn trafod pa mor bwysig yw cyflwr lleol goddefgar o fai ar gyfer cymhwysiad ffrydio. Roedd yr enghraifft gyntaf yn y bennod hon yn dangos pam mae gwladwriaeth leol mor bwysig - mae'n rhoi'r gallu i chi gadw golwg ar ba wybodaeth rydych chi wedi'i gweld eisoes. Mae mynediad lleol yn osgoi oedi rhwydwaith, gan wneud y cais yn fwy perfformiadol ac yn gallu gwrthsefyll gwallau.

Wrth berfformio unrhyw weithrediad rholio neu agregu, rhaid i chi nodi enw'r storfa wladwriaeth. Mae'r gweithrediadau rholio a chydgrynhoi yn dychwelyd enghraifft KTable, ac mae'r KTable yn defnyddio storfa wladwriaeth i ddisodli hen ganlyniadau â rhai newydd. Fel y gwelsoch, nid yw pob diweddariad yn cael ei anfon ar y gweill, ac mae hyn yn bwysig oherwydd bod gweithrediadau cydgasglu wedi'u cynllunio i gynhyrchu gwybodaeth gryno. Os na fyddwch yn cymhwyso cyflwr lleol, bydd KTable yn anfon yr holl ganlyniadau agregu a rholio ymlaen.

Nesaf, byddwn yn edrych ar gyflawni gweithrediadau megis agregu o fewn cyfnod penodol o amser - gweithrediadau ffenestru fel y'u gelwir.

5.3.2. Gweithrediadau ffenestr

Yn yr adran flaenorol, fe wnaethom gyflwyno convolution llithro a agregu. Perfformiodd y cais gynnydd parhaus o gyfaint gwerthiant stoc, ac yna agregu'r pum stoc a fasnachwyd fwyaf ar y gyfnewidfa.

Weithiau mae angen cydgrynhoi a chyflwyno canlyniadau yn barhaus. Ac weithiau dim ond dros gyfnod penodol o amser y mae angen i chi berfformio llawdriniaethau. Er enghraifft, cyfrifwch faint o drafodion cyfnewid a wnaed gyda chyfranddaliadau cwmni penodol yn y 10 munud diwethaf. Neu faint o ddefnyddwyr a gliciodd ar faner hysbysebu newydd yn y 15 munud diwethaf. Gall cais gyflawni gweithrediadau o'r fath sawl gwaith, ond gyda chanlyniadau sy'n berthnasol i gyfnodau penodol o amser yn unig (ffenestri amser).

Cyfrif trafodion cyfnewid yn ôl prynwr

Yn yr enghraifft nesaf, byddwn yn olrhain trafodion stoc ar draws masnachwyr lluosog - naill ai sefydliadau mawr neu arianwyr unigol craff.

Mae dau reswm posibl dros olrhain hwn. Un ohonynt yw'r angen i wybod beth mae arweinwyr y farchnad yn ei brynu/gwerthu. Os yw'r chwaraewyr mawr hyn a'r buddsoddwyr soffistigedig hyn yn gweld cyfle, mae'n gwneud synnwyr i ddilyn eu strategaeth. Yr ail reswm yw'r awydd i weld unrhyw arwyddion posibl o fasnachu mewnol anghyfreithlon. I wneud hyn, bydd angen i chi ddadansoddi cydberthynas pigau gwerthiant mawr â datganiadau pwysig i'r wasg.

Mae olrhain o'r fath yn cynnwys y camau canlynol:

  • creu ffrwd ar gyfer darllen o'r testun trafodion stoc;
  • grwpio cofnodion sy'n dod i mewn yn ôl ID prynwr a symbol stoc. Mae galw'r dull grŵpBy yn dychwelyd enghraifft o'r dosbarth KGroupedStream;
  • Mae'r dull KGroupedStream.windowedBy yn dychwelyd llif data wedi'i gyfyngu i ffenestr amser, sy'n caniatáu agregu ffenestr. Yn dibynnu ar y math o ffenestr, dychwelir naill ai TimeWindowedKSstream neu SessionWindowedKSstream;
  • cyfrif trafodion ar gyfer y gweithrediad agregu. Mae'r llif data ffenestr yn pennu a yw cofnod penodol yn cael ei ystyried yn y cyfrif hwn;
  • ysgrifennu canlyniadau i bwnc neu eu hallbynnu i'r consol yn ystod datblygiad.

Mae topoleg y cymhwysiad hwn yn syml, ond byddai darlun clir ohono yn ddefnyddiol. Gadewch i ni edrych ar Ffig. 5.11.

Nesaf, byddwn yn edrych ar ymarferoldeb gweithrediadau ffenestri a'r cod cyfatebol.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"

Mathau o ffenestri

Mae tri math o ffenestr yn Kafka Streams:

  • sesiynol;
  • “tumbling” (tumbling);
  • llithro/hercian.

Mae pa un i'w ddewis yn dibynnu ar eich gofynion busnes. Mae terfyn amser ar ffenestri tumbling a neidio, tra bod ffenestri sesiwn wedi'u cyfyngu gan weithgarwch defnyddwyr - mae hyd y sesiwn(sesiynau) yn cael ei bennu gan ba mor actif yw'r defnyddiwr yn unig. Y prif beth i'w gofio yw bod pob math o ffenestr yn seiliedig ar stampiau dyddiad/amser y cofnodion, nid amser y system.

Nesaf, rydym yn gweithredu ein topoleg gyda phob un o'r mathau o ffenestri. Rhoddir y cod cyflawn yn yr enghraifft gyntaf yn unig; ar gyfer mathau eraill o ffenestri ni fydd unrhyw beth yn newid ac eithrio'r math o weithrediad ffenestr.

Ffenestri sesiwn

Mae ffenestri sesiwn yn wahanol iawn i bob math arall o ffenestri. Maent wedi'u cyfyngu nid cymaint gan amser â gweithgaredd y defnyddiwr (neu weithgaredd yr endid yr hoffech ei olrhain). Mae cyfnodau o anweithgarwch yn cyfyngu ar ffenestri sesiynau.

Mae Ffigur 5.12 yn dangos y cysyniad o ffenestri sesiynau. Bydd y sesiwn lai yn uno â'r sesiwn ar y chwith. A bydd y sesiwn ar y dde ar wahân oherwydd ei fod yn dilyn cyfnod hir o anweithgarwch. Mae ffenestri sesiwn yn seiliedig ar weithgaredd defnyddwyr, ond defnyddiwch stampiau dyddiad/amser o gofnodion i benderfynu i ba sesiwn y mae'r cofnod yn perthyn.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"

Defnyddio ffenestri sesiwn i olrhain trafodion stoc

Gadewch i ni ddefnyddio ffenestri sesiwn i gasglu gwybodaeth am drafodion cyfnewid. Dangosir gweithrediad ffenestri sesiwn yn Rhestr 5.5 (sydd i'w weld yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Rydych chi eisoes wedi gweld y rhan fwyaf o'r llawdriniaethau yn y topoleg hon, felly nid oes angen edrych arnynt eto yma. Ond mae sawl elfen newydd yma hefyd, y byddwn yn eu trafod yn awr.

Mae unrhyw weithrediad grŵpBy fel arfer yn cyflawni rhyw fath o weithrediad agregu (agregu, rholio neu gyfrif). Gallwch berfformio naill ai agregu cronnus gyda chyfanswm rhedeg, neu agregu ffenestri, sy'n ystyried cofnodion o fewn ffenestr amser benodol.

Mae'r cod yn Rhestr 5.5 yn cyfrif nifer y trafodion o fewn ffenestri sesiynau. Yn Ffig. 5.13 dadansoddir y camau hyn gam wrth gam.

Drwy ffonio windowedBy(SessionWindows.with(twentySeconds).until(pimteenMinutes)) rydym yn creu ffenestr sesiwn gyda chyfwng anweithgarwch o 20 eiliad a chyfnod dyfalbarhad o 15 munud. Mae egwyl segur o 20 eiliad yn golygu y bydd y cais yn cynnwys unrhyw gofnod sy'n cyrraedd o fewn 20 eiliad i ddiwedd neu ddechrau'r sesiwn gyfredol i'r sesiwn gyfredol (gweithredol).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Nesaf, rydym yn nodi pa weithrediad agregu sydd angen ei berfformio yn ffenestr y sesiwn - yn yr achos hwn, cyfrif. Os yw cofnod sy'n dod i mewn yn disgyn y tu allan i'r ffenestr anweithgarwch (y naill ochr i'r stamp dyddiad/amser), mae'r cais yn creu sesiwn newydd. Mae cyfwng cadw yn golygu cynnal sesiwn am gyfnod penodol o amser ac mae'n caniatáu ar gyfer data hwyr sy'n ymestyn y tu hwnt i gyfnod anweithgarwch y sesiwn ond y gellir ei atodi o hyd. Yn ogystal, mae dechrau a diwedd y sesiwn newydd sy'n deillio o'r uno yn cyfateb i'r stamp dyddiad/amser cynharaf a diweddaraf.

Edrychwn ar ychydig o gofnodion o'r dull cyfrif i weld sut mae sesiynau'n gweithio (Tabl 5.1).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Pan fydd cofnodion yn cyrraedd, rydym yn edrych am sesiynau presennol gyda'r un allwedd, amser gorffen sy'n llai na'r stamp dyddiad/amser cyfredol - cyfwng anweithgarwch, ac amser cychwyn sy'n fwy na'r stamp dyddiad/amser cyfredol + cyfwng anweithgarwch. Gan gymryd hyn i ystyriaeth, pedwar cofnod o'r tabl. 5.1 yn cael eu huno i un sesiwn fel a ganlyn.

1. Mae cofnod 1 yn cyrraedd yn gyntaf, felly mae'r amser cychwyn yn hafal i'r amser gorffen ac yn 00:00:00.

2. Nesaf, mae cofnod 2 yn cyrraedd, ac rydym yn edrych am sesiynau sy'n dod i ben dim cynharach na 23:59:55 ac yn dechrau dim hwyrach na 00:00:35. Rydyn ni'n dod o hyd i gofnod 1 ac yn cyfuno sesiynau 1 a 2. Rydyn ni'n cymryd amser dechrau sesiwn 1 (yn gynharach) ac amser gorffen sesiwn 2 (yn ddiweddarach), fel bod ein sesiwn newydd yn dechrau am 00:00:00 ac yn gorffen am 00: 00:15.

3. Cofnod 3 yn cyrraedd, rydym yn edrych am sesiynau rhwng 00:00:30 a 00:01:10 ac nid ydynt yn dod o hyd i unrhyw un. Ychwanegu ail sesiwn ar gyfer yr allwedd 123-345-654,FFBE, gan ddechrau a gorffen am 00:00:50.

4. Mae record 4 yn cyrraedd ac rydym yn chwilio am sesiynau rhwng 23:59:45 a 00:00:25. Y tro hwn canfyddir y ddwy sesiwn 1 a 2. Cyfunir y tair sesiwn yn un, gydag amser cychwyn o 00:00:00 ac amser gorffen o 00:00:15.

O'r hyn a ddisgrifir yn yr adran hon, mae'n werth cofio'r arlliwiau pwysig canlynol:

  • nid yw sesiynau yn ffenestri maint sefydlog. Mae hyd sesiwn yn cael ei bennu gan y gweithgaredd o fewn cyfnod penodol o amser;
  • Mae'r stampiau dyddiad/amser yn y data yn pennu a yw'r digwyddiad yn dod o fewn sesiwn bresennol neu yn ystod cyfnod segur.

Nesaf byddwn yn trafod y math nesaf o ffenestr - ffenestri “tumbling”.

"Tumbling" ffenestri

Mae ffenestri tumbling yn dal digwyddiadau sy'n dod o fewn cyfnod penodol o amser. Dychmygwch fod angen i chi ddal holl drafodion stoc cwmni penodol bob 20 eiliad, felly rydych chi'n casglu'r holl ddigwyddiadau yn ystod y cyfnod hwnnw. Ar ddiwedd yr egwyl 20 eiliad, mae'r ffenestr yn rholio drosodd ac yn symud i egwyl arsylwi newydd o 20 eiliad. Mae Ffigur 5.14 yn dangos y sefyllfa hon.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Fel y gallwch weld, mae'r holl ddigwyddiadau a dderbyniwyd yn yr 20 eiliad diwethaf wedi'u cynnwys yn y ffenestr. Ar ddiwedd y cyfnod hwn o amser, mae ffenestr newydd yn cael ei chreu.

Mae rhestru 5.6 yn dangos cod sy'n dangos y defnydd o ffenestri tumbling i ddal trafodion stoc bob 20 eiliad (a geir yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Gyda'r newid bach hwn i alwad dull TimeWindows.of, gallwch ddefnyddio ffenestr tumbling. Nid yw'r enghraifft hon yn galw'r dull tan (), felly bydd y cyfnod cadw rhagosodedig o 24 awr yn cael ei ddefnyddio.

Yn olaf, mae'n bryd symud ymlaen i'r olaf o'r opsiynau ffenestr - ffenestri "hopian".

Llithro ("neidio") ffenestri

Mae ffenestri llithro/hopian yn debyg i ffenestri tumbling, ond gyda gwahaniaeth bach. Nid yw ffenestri llithro yn aros tan ddiwedd yr egwyl amser cyn creu ffenestr newydd i brosesu digwyddiadau diweddar. Maent yn dechrau cyfrifiadau newydd ar ôl cyfnod aros sy'n llai na hyd y ffenestr.

I ddangos y gwahaniaethau rhwng ffenestri tumbling a neidio, gadewch i ni ddychwelyd at yr enghraifft o gyfrif trafodion cyfnewid stoc. Ein nod o hyd yw cyfrif nifer y trafodion, ond nid ydym am aros am yr holl amser cyn diweddaru'r cownter. Yn lle hynny, byddwn yn diweddaru'r cownter yn fyrrach. Er enghraifft, byddwn yn dal i gyfrif nifer y trafodion bob 20 eiliad, ond yn diweddaru'r cownter bob 5 eiliad, fel y dangosir yn Ffig. 5.15. Yn yr achos hwn, mae gennym dair ffenestr ganlyniad gyda data sy'n gorgyffwrdd.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Mae rhestru 5.7 yn dangos y cod ar gyfer diffinio ffenestri llithro (a geir yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Gellir trosi ffenestr tumbling yn ffenestr hercian trwy ychwanegu galwad at y dull ymlaen llawBy(). Yn yr enghraifft a ddangosir, yr egwyl arbed yw 15 munud.

Fe welsoch chi yn yr adran hon sut i gyfyngu canlyniadau agregu i ffenestri amser. Yn benodol, rwyf am i chi gofio'r tri pheth canlynol o'r adran hon:

  • mae maint ffenestri sesiwn wedi'i gyfyngu nid gan gyfnod amser, ond gan weithgaredd defnyddwyr;
  • Mae ffenestri “tumbling” yn rhoi trosolwg o ddigwyddiadau o fewn cyfnod penodol o amser;
  • Mae hyd ffenestri neidio yn sefydlog, ond cânt eu diweddaru'n aml a gallant gynnwys cofnodion sy'n gorgyffwrdd ym mhob ffenestr.

Nesaf, byddwn yn dysgu sut i drosi KTable yn ôl i Kstream ar gyfer cysylltiad.

5.3.3. Cysylltu gwrthrychau Kstream a KTable

Ym Mhennod 4, buom yn trafod cysylltu dau wrthrych KSstream. Nawr mae'n rhaid i ni ddysgu sut i gysylltu KTable a KSstream. Efallai y bydd angen hyn am y rheswm syml canlynol. Mae KStream yn ffrwd o gofnodion, ac mae KTable yn ffrwd o ddiweddariadau cofnodion, ond weithiau efallai y byddwch am ychwanegu cyd-destun ychwanegol i'r ffrwd cofnodion gan ddefnyddio diweddariadau o'r KTable.

Gadewch i ni gymryd data ar nifer y trafodion cyfnewid stoc a'u cyfuno â newyddion cyfnewid stoc ar gyfer y diwydiannau perthnasol. Dyma beth sydd angen i chi ei wneud i gyflawni hyn o ystyried y cod sydd gennych eisoes.

  1. Trosi gwrthrych KTable gyda data ar nifer y trafodion stoc yn KSstream, ac yna amnewid yr allwedd gyda'r allwedd sy'n nodi'r sector diwydiant sy'n cyfateb i'r symbol stoc hwn.
  2. Creu gwrthrych KTable sy'n darllen data o bwnc gyda newyddion cyfnewid stoc. Bydd y KTable newydd hwn yn cael ei gategoreiddio yn ôl sector diwydiant.
  3. Cysylltu diweddariadau newyddion â gwybodaeth am nifer y trafodion cyfnewid stoc yn ôl sector diwydiant.

Nawr, gadewch i ni weld sut i roi'r cynllun gweithredu hwn ar waith.

Trosi KTable i KSstream

I drosi KTable i KSstream mae angen i chi wneud y canlynol.

  1. Ffoniwch y dull KTable.toStream().
  2. Trwy ffonio'r dull KStream.map, rhowch enw'r diwydiant yn lle'r allwedd, ac yna adalw'r gwrthrych TransactionSummary o'r enghraifft Windowed.

Byddwn yn cadwyno'r gweithrediadau hyn gyda'i gilydd fel a ganlyn (mae'r cod i'w weld yn y ffeil src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Rhestr 5.8).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Oherwydd ein bod yn cyflawni gweithrediad KSstream.map, mae'r enghraifft KStream a ddychwelwyd yn cael ei ail-rannu'n awtomatig pan gaiff ei ddefnyddio mewn cysylltiad.

Rydym wedi cwblhau'r broses drosi, nesaf mae angen i ni greu gwrthrych KTable ar gyfer darllen newyddion stoc.

Creu KTable ar gyfer newyddion stoc

Yn ffodus, mae creu gwrthrych KTable yn cymryd un llinell o god yn unig (mae'r cod i'w weld yn src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Rhestr 5.9).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Mae'n werth nodi nad oes angen nodi unrhyw wrthrychau Serde, gan fod Serdes llinynnol yn cael eu defnyddio yn y gosodiadau. Hefyd, trwy ddefnyddio'r rhif CYNNAR, mae'r tabl yn cael ei lenwi â chofnodion ar y cychwyn cyntaf.

Nawr gallwn symud ymlaen i'r cam olaf - cysylltiad.

Cysylltu diweddariadau newyddion â data cyfrif trafodion

Nid yw creu cysylltiad yn anodd. Byddwn yn defnyddio uniad chwith rhag ofn nad oes unrhyw newyddion stoc ar gyfer y diwydiant perthnasol (mae'r cod angenrheidiol i'w weld yn y ffeil src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Rhestr 5.10).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Mae'r gweithredwr leftJoin hwn yn eithaf syml. Yn wahanol i'r uniadau ym Mhennod 4, ni ddefnyddir y dull JoinWindow oherwydd wrth berfformio uniad KSstream-KTable, dim ond un cofnod sydd yn y KTable ar gyfer pob allwedd. Nid yw cysylltiad o'r fath yn gyfyngedig o ran amser: mae'r cofnod naill ai yn y KTable neu'n absennol. Y prif gasgliad: gan ddefnyddio gwrthrychau KTable gallwch gyfoethogi KSstream gyda data cyfeirio sy'n cael ei ddiweddaru'n llai aml.

Nawr byddwn yn edrych ar ffordd fwy effeithlon o gyfoethogi digwyddiadau o KSstream.

5.3.4. Gwrthrychau GlobalKTable

Fel y gwelwch, mae angen cyfoethogi ffrydiau digwyddiadau neu ychwanegu cyd-destun iddynt. Ym Mhennod 4 gwelsoch y cysylltiadau rhwng dau wrthrych KSstream, ac yn yr adran flaenorol fe welsoch y cysylltiad rhwng KSstream a KTable. Ym mhob un o'r achosion hyn, mae angen ail-rannu'r llif data wrth fapio'r allweddi i fath neu werth newydd. Weithiau mae ail-rannu yn cael ei wneud yn benodol, ac weithiau mae Kafka Streams yn ei wneud yn awtomatig. Mae angen ail-rannu oherwydd bod yr allweddi wedi newid a rhaid i'r cofnodion ddod i ben mewn adrannau newydd, neu bydd y cysylltiad yn amhosib (trafodwyd hyn ym Mhennod 4, yn yr adran “Re-partitioning data” yn isadran 4.2.4).

Mae cost i ail-rannu

Mae ail-rannu yn gofyn am gostau - costau adnoddau ychwanegol ar gyfer creu pynciau canolradd, storio data dyblyg mewn pwnc arall; mae hefyd yn golygu mwy o hwyrni oherwydd ysgrifennu a darllen o'r testun hwn. Yn ogystal, os oes angen i chi ymuno ar draws mwy nag un agwedd neu ddimensiwn, rhaid i chi gadwyno'r uniadau, mapio'r cofnodion gydag allweddi newydd, a rhedeg y broses ail-rannu eto.

Cysylltu â setiau data llai

Mewn rhai achosion, mae cyfaint y data cyfeirio sydd i'w gysylltu yn gymharol fach, felly gall copïau cyflawn ohono ffitio'n lleol ar bob nod yn hawdd. Ar gyfer sefyllfaoedd fel hyn, mae Kafka Streams yn darparu'r dosbarth GlobalKTable.

Mae achosion GlobalKTable yn unigryw oherwydd bod y rhaglen yn dyblygu'r holl ddata i bob un o'r nodau. A chan fod yr holl ddata yn bresennol ar bob nod, nid oes angen rhannu ffrwd y digwyddiad trwy allwedd data cyfeirio fel ei fod ar gael i bob rhaniad. Gallwch hefyd wneud uniadau di-allwedd gan ddefnyddio gwrthrychau GlobalKTable. Gadewch i ni fynd yn ôl at un o'r enghreifftiau blaenorol i ddangos y nodwedd hon.

Cysylltu gwrthrychau KSstream i wrthrychau GlobalKTable

Yn is-adran 5.3.2, gwnaethom agregu trafodion cyfnewid gan brynwyr mewn ffenestr. Roedd canlyniadau'r casgliad hwn yn edrych fel hyn:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Er bod y canlyniadau hyn yn ateb y diben, byddai wedi bod yn fwy defnyddiol pe bai enw'r cwsmer ac enw llawn y cwmni hefyd wedi'u harddangos. I ychwanegu enw'r cwsmer ac enw'r cwmni, gallwch chi wneud uniadau arferol, ond bydd angen i chi wneud dau fapio allweddol ac ail-rannu. Gyda GlobalKTable gallwch osgoi cost gweithrediadau o'r fath.

I wneud hyn, byddwn yn defnyddio'r gwrthrych countStream o Listing 5.11 (mae'r cod cyfatebol i'w weld yn src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) a'i gysylltu â dau wrthrych GlobalKTable.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Rydym eisoes wedi trafod hyn o'r blaen, felly nid wyf am ei ailadrodd. Ond nodaf fod y cod yn ffwythiant map toStream().map yn cael ei dynnu i mewn i wrthrych ffwythiant yn lle mynegiad lambda mewnlin er mwyn ei ddarllen.

Y cam nesaf yw datgan dau achos o GlobalKTable (mae'r cod a ddangosir i'w weld yn y ffeil src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Rhestr 5.12).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"

Sylwch fod enwau pynciau'n cael eu disgrifio gan ddefnyddio mathau wedi'u rhifo.

Nawr bod gennym yr holl gydrannau yn barod, y cyfan sydd ar ôl yw ysgrifennu'r cod ar gyfer y cysylltiad (sydd i'w gael yn y ffeil src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Rhestr 5.13).

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
Er bod dau gysylltiad yn y cod hwn, maent wedi'u cadwyno oherwydd ni ddefnyddir y naill na'r llall o'u canlyniadau ar wahân. Dangosir y canlyniadau ar ddiwedd y llawdriniaeth gyfan.

Pan fyddwch yn rhedeg y gweithrediad ymuno uchod, byddwch yn cael canlyniadau fel hyn:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Nid yw'r hanfod wedi newid, ond mae'r canlyniadau hyn yn edrych yn fwy clir.

Os ydych chi'n cyfrif i lawr i Bennod 4, rydych chi eisoes wedi gweld sawl math o gysylltiadau ar waith. Maent wedi'u rhestru yn y tabl. 5.2. Mae'r tabl hwn yn adlewyrchu'r galluoedd cysylltedd fel fersiwn 1.0.0 o Kafka Streams; Gall rhywbeth newid mewn datganiadau yn y dyfodol.

Mae'r llyfr “Kafka Streams in Action. Cymwysiadau a microwasanaethau ar gyfer gwaith amser real"
I gloi pethau, gadewch i ni ailadrodd y pethau sylfaenol: gallwch gysylltu ffrydiau digwyddiad (KStream) a diweddaru ffrydiau (KTable) gan ddefnyddio cyflwr lleol. Fel arall, os nad yw maint y data cyfeirio yn rhy fawr, gallwch ddefnyddio'r gwrthrych GlobalKTable. Mae GlobalKTables yn dyblygu pob rhaniad i bob nod cymhwysiad Kafka Streams, gan sicrhau bod yr holl ddata ar gael ni waeth pa raniad y mae'r allwedd yn cyfateb iddo.

Nesaf fe welwn nodwedd Kafka Streams, a diolch i hynny gallwn arsylwi newidiadau cyflwr heb ddefnyddio data o bwnc Kafka.

5.3.5. Cyflwr amheus

Rydym eisoes wedi cyflawni nifer o weithrediadau sy'n cynnwys cyflwr a bob amser yn allbwn y canlyniadau i'r consol (at ddibenion datblygu) neu eu hysgrifennu at bwnc (at ddibenion cynhyrchu). Wrth ysgrifennu canlyniadau i bwnc, mae'n rhaid i chi ddefnyddio defnyddiwr Kafka i'w gweld.

Gellir ystyried bod data darllen o'r pynciau hyn yn fath o safbwyntiau wedi'u gwireddu. At ein dibenion ni, gallwn ddefnyddio'r diffiniad o olygfa wedi'i gwireddu o Wicipedia: “...gwrthrych cronfa ddata ffisegol sy'n cynnwys canlyniadau ymholiad. Er enghraifft, gallai fod yn gopi lleol o ddata o bell, neu’n is-set o resi a/neu golofnau tabl neu ganlyniadau ymuno, neu dabl cryno a gafwyd drwy gydgrynhoi” ( https://en.wikipedia.org/wiki /golwg_materialized).

Mae Kafka Streams hefyd yn caniatáu ichi redeg ymholiadau rhyngweithiol ar siopau'r wladwriaeth, sy'n eich galluogi i ddarllen y golygfeydd sylweddol hyn yn uniongyrchol. Mae'n bwysig nodi bod yr ymholiad i siop y wladwriaeth yn weithrediad darllen yn unig. Mae hyn yn sicrhau nad oes rhaid i chi boeni am wneud cyflwr yn anghyson yn ddamweiniol tra bod eich cais yn prosesu data.

Mae'r gallu i gwestiynu siopau cyflwr yn uniongyrchol yn bwysig. Mae hyn yn golygu y gallwch chi greu cymwysiadau dangosfwrdd heb orfod nôl data gan y defnyddiwr Kafka yn gyntaf. Mae hefyd yn cynyddu effeithlonrwydd y cais, oherwydd y ffaith nad oes angen ysgrifennu data eto:

  • diolch i leoliad y data, gellir eu cyrchu'n gyflym;
  • mae dyblygu data yn cael ei ddileu, gan nad yw wedi'i ysgrifennu i storfa allanol.

Y prif beth yr wyf am i chi ei gofio yw y gallwch chi ymholi'n uniongyrchol o'r tu mewn i'ch cais. Ni ellir gorbwysleisio'r cyfleoedd y mae hyn yn eu rhoi i chi. Yn hytrach na defnyddio data o Kafka a storio cofnodion mewn cronfa ddata ar gyfer y cais, gallwch ymholi storfeydd cyflwr gyda'r un canlyniad. Mae ymholiadau uniongyrchol i storfeydd y wladwriaeth yn golygu llai o god (dim defnyddiwr) a llai o feddalwedd (dim angen tabl cronfa ddata i storio'r canlyniadau).

Rydym wedi ymdrin â chryn dipyn o dir yn y bennod hon, felly byddwn yn gadael ein trafodaeth o ymholiadau rhyngweithiol yn erbyn siopau'r wladwriaeth am y tro. Ond peidiwch â phoeni: ym Mhennod 9, byddwn yn creu rhaglen dangosfwrdd syml gydag ymholiadau rhyngweithiol. Bydd yn defnyddio rhai o'r enghreifftiau o hwn a phenodau blaenorol i ddangos ymholiadau rhyngweithiol a sut y gallwch eu hychwanegu at gymwysiadau Kafka Streams.

Crynodeb

  • Mae gwrthrychau KSstream yn cynrychioli ffrydiau o ddigwyddiadau, sy'n debyg i fewnosodiadau mewn cronfa ddata. Mae gwrthrychau KTable yn cynrychioli ffrydiau diweddaru, yn debycach i ddiweddariadau i gronfa ddata. Nid yw maint y gwrthrych KTable yn tyfu, mae hen gofnodion yn cael eu disodli gan rai newydd.
  • Mae angen gwrthrychau KTable ar gyfer gweithrediadau agregu.
  • Gan ddefnyddio gweithrediadau ffenestru, gallwch rannu data cyfanredol yn fwcedi amser.
  • Diolch i wrthrychau GlobalKTable, gallwch gyrchu data cyfeirio unrhyw le yn y rhaglen, waeth beth fo'r rhaniad.
  • Mae cysylltiadau rhwng gwrthrychau KSstream, KTable a GlobalKTable yn bosibl.

Hyd yn hyn, rydym wedi canolbwyntio ar adeiladu cymwysiadau Kafka Streams gan ddefnyddio'r lefel uchel KSstream DSL. Er bod y dull lefel uchel yn caniatáu ichi greu rhaglenni taclus a chryno, mae ei ddefnyddio yn gyfaddawd. Mae gweithio gyda DSL KStream yn golygu cynyddu crynoder eich cod trwy leihau lefel y rheolaeth. Yn y bennod nesaf, byddwn yn edrych ar yr API nod trin lefel isel ac yn rhoi cynnig ar gyfaddawdau eraill. Bydd y rhaglenni'n hirach nag yr oeddent o'r blaen, ond byddwn yn gallu creu bron unrhyw nod trin y gallai fod ei angen arnom.

→ Ceir rhagor o fanylion am y llyfr yn gwefan y cyhoeddwr

→ Ar gyfer Habrozhiteli gostyngiad o 25% gan ddefnyddio cwpon - Nentydd Kafka

→ Ar ôl talu am fersiwn bapur y llyfr, anfonir llyfr electronig trwy e-bost.

Ffynhonnell: hab.com

Ychwanegu sylw