Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“ Halló, Khabro íbúar! Þessi bók er hentugur fyrir alla þróunaraðila sem vilja skilja þráðavinnslu. Að skilja dreifða forritun mun hjálpa þér að skilja Kafka og Kafka Streams betur. Það væri gaman að þekkja Kafka rammann sjálfan, en þetta er ekki nauðsynlegt: ​​Ég mun segja þér allt sem þú þarft. Reyndir Kafka forritarar og nýliðar munu læra hvernig á að búa til áhugaverð straumvinnsluforrit með Kafka Streams bókasafninu í þessari bók. Meðalstigs- og háþróaðir Java forritarar sem þegar þekkja hugtök eins og serialization munu læra að beita færni sinni til að búa til Kafka Streams forrit. Frumkóði bókarinnar er skrifaður í Java 8 og nýtir verulega Java 8 lambda tjáningu setningafræði, svo að kunna að vinna með lambda aðgerðir (jafnvel á öðru forritunarmáli) kemur sér vel.

Útdráttur. 5.3. Söfnun og gluggaaðgerðir

Í þessum hluta munum við halda áfram að kanna efnilegustu hluta Kafka Streams. Hingað til höfum við fjallað um eftirfarandi þætti Kafka Streams:

  • búa til vinnslustaðfræði;
  • nota ástand í streymisforritum;
  • framkvæma gagnastraumstengingar;
  • munur á viðburðastraumum (KStream) og uppfærslustraumum (KTable).

Í eftirfarandi dæmum munum við leiða alla þessa þætti saman. Þú munt líka læra um gluggastillingu, annar frábær eiginleiki streymisforrita. Fyrsta dæmið okkar verður einföld samansafn.

5.3.1. Samanlagning hlutabréfasölu eftir atvinnugreinum

Söfnun og flokkun eru mikilvæg verkfæri þegar unnið er með streymigögn. Athugun einstakra gagna eins og þær berast er oft ófullnægjandi. Til að draga viðbótarupplýsingar úr gögnum er nauðsynlegt að flokka þær og sameina þær.

Í þessu dæmi muntu fara í búning dagkaupmanns sem þarf að fylgjast með sölumagni hlutabréfa fyrirtækja í nokkrum atvinnugreinum. Nánar tiltekið hefur þú áhuga á þeim fimm fyrirtækjum sem hafa mesta hlutasölu í hverri atvinnugrein.

Slík samansöfnun mun krefjast eftirfarandi nokkurra skrefa til að þýða gögnin á æskilegt form (tala almennt).

  1. Búðu til efnisbundna heimild sem birtir hráar upplýsingar um hlutabréfaviðskipti. Við verðum að kortleggja hlut af gerðinni StockTransaction við hlut af gerðinni ShareVolume. Málið er að StockTransaction hluturinn inniheldur sölulýsigögn, en við þurfum aðeins gögn um fjölda hluta sem eru seldir.
  2. Flokkaðu ShareVolume gögn eftir hlutabréfatákni. Þegar þú hefur flokkað þau eftir táknum geturðu fellt þessi gögn saman í undirsamtölur af sölumagni á lager. Þess má geta að KStream.groupBy aðferðin skilar tilviki af gerðinni KGroupedStream. Og þú getur fengið KTable dæmi með því að kalla frekar KGroupedStream.reduce aðferðina.

Hvað er KGroupedStream viðmótið

Aðferðirnar KStream.groupBy og KStream.groupByKey skila tilviki af KGroupedStream. KGroupedStream er milliframsetning á straumi atburða eftir flokkun eftir lyklum. Það er alls ekki ætlað til beina vinnu með það. Þess í stað er KGroupedStream notað fyrir söfnunaraðgerðir, sem alltaf leiða til KTable. Og þar sem niðurstaðan af samsöfnunaraðgerðum er KTable og þeir nota ríkisverslun, er mögulegt að ekki allar uppfærslur af þeim sökum séu sendar lengra í leiðinni.

KTable.groupBy aðferðin skilar svipaðri KGroupedTable - milliframsetning á uppfærslustraumnum, endurflokkað eftir lykli.

Við skulum taka stutta pásu og skoða mynd. 5.9, sem sýnir hvað við höfum áorkað. Þessi staðfræði ætti nú þegar að vera mjög kunnugleg fyrir þig.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Við skulum nú skoða kóðann fyrir þessa staðfræði (hann er að finna í skránni src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Skrá 5.2).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Tilgreindur kóði er aðgreindur af stuttu máli og miklu magni aðgerða sem gerðar eru í nokkrum línum. Þú gætir tekið eftir einhverju nýju í fyrstu breytu builder.stream aðferðarinnar: gildi af enum gerðinni AutoOffsetReset.EARLIEST (það er líka LATEST), stillt með Consumed.withOffsetResetPolicy aðferðinni. Þessa upptalningartegund er hægt að nota til að tilgreina offset endurstillingarstefnu fyrir hvern KStream eða KTable og hefur forgang fram yfir offset endurstillingarvalkostinn úr uppsetningunni.

GroupByKey og GroupBy

KStream viðmótið hefur tvær aðferðir til að flokka færslur: GroupByKey og GroupBy. Báðir skila KGroupedTable, svo þú gætir verið að velta fyrir þér hver munurinn er á þeim og hvenær á að nota hvora?

GroupByKey aðferðin er notuð þegar lyklarnir í KStream eru þegar ekki tómir. Og síðast en ekki síst, „þarfst endurskipting“ fáninn var aldrei stilltur.

GroupBy aðferðin gerir ráð fyrir að þú hafir breytt hóplykla, þannig að endurskiptingarfáninn er stilltur á satt. Að framkvæma sameiningu, samsöfnun o.s.frv. eftir GroupBy aðferðinni mun leiða til sjálfvirkrar skiptingar.
Samantekt: Þegar mögulegt er ættirðu að nota GroupByKey frekar en GroupBy.

Það er ljóst hvað mapValues ​​​​og groupBy aðferðirnar gera, svo við skulum skoða sum() aðferðina (finnst í src/main/java/bbejeck/model/ShareVolume.java) (skráning 5.3).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
ShareVolume.sum aðferðin skilar hlaupandi heildarmagni birgðasölumagns og niðurstaða allrar útreikningakeðjunnar er KTable hlutur . Nú skilurðu hlutverkið sem KTable gegnir. Þegar ShareVolume hlutir koma geymir samsvarandi KTable hlutur nýjustu núverandi uppfærsluna. Það er mikilvægt að muna að allar uppfærslur endurspeglast í fyrri shareVolumeKTable, en ekki allar eru sendar lengra.

Við notum síðan þessa KTable til að leggja saman (eftir fjölda hlutabréfa sem verslað er með) til að komast að þeim fimm fyrirtækjum sem eru með mest magn hlutabréfa sem verslað er með í hverri atvinnugrein. Aðgerðir okkar í þessu tilfelli verða svipaðar og við fyrstu söfnunina.

  1. Framkvæmdu aðra GroupBy aðgerð til að flokka einstaka ShareVolume hluti eftir atvinnugreinum.
  2. Byrjaðu að draga saman ShareVolume hluti. Að þessu sinni er safnhluturinn forgangsröð í fastri stærð. Í þessari fastri stærðarröð eru aðeins þau fimm fyrirtæki sem eru með mest seld hlutabréf.
  3. Kortleggðu biðraðirnar frá fyrri málsgrein yfir á strengsgildi og skilaðu fimm efstu hlutabréfunum sem mest viðskipti eru með eftir fjölda eftir atvinnugreinum.
  4. Skrifaðu niðurstöðurnar á strengjaformi við efnið.

Í mynd. Mynd 5.10 sýnir gröf gagnaflæðis svæðisfræðinnar. Eins og þú sérð er önnur umferð vinnslunnar frekar einföld.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Nú þegar við höfum skýran skilning á uppbyggingu þessarar annarrar lotu vinnslu, getum við snúið okkur að frumkóða hans (þú finnur hann í skránni src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Skrá 5.4) .

Þessi frumstilli inniheldur fasta biðröð breytu. Þetta er sérsniðinn hlutur sem er millistykki fyrir java.util.TreeSet sem er notað til að fylgjast með efstu N niðurstöðum í lækkandi röð hlutabréfa sem verslað er með.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Þú hefur þegar séð groupBy og mapValues ​​símtölin, svo við förum ekki í þau (við köllum KTable.toStream aðferðina vegna þess að KTable.print aðferðin er úrelt). En þú hefur ekki séð KTable útgáfuna af aggregate() ennþá, svo við munum eyða smá tíma í að ræða það.

Eins og þú manst, það sem gerir KTable öðruvísi er að færslur með sömu lykla eru taldar uppfærslur. KTable skiptir gömlu færslunni út fyrir nýja. Söfnun á sér stað á svipaðan hátt: nýjustu færslur með sama lykli eru teknar saman. Þegar færsla kemur er henni bætt við FixedSizePriorityQueue flokkatilvikið með því að nota adder (annar færibreyta í samanlagða aðferðarkallinu), en ef önnur færsla er þegar til með sama lykli, þá er gamla færslan fjarlægð með frádráttarbúnaði (þriðja færibreytan í heildaraðferðarkallið).

Þetta þýðir allt að samansafnari okkar, FixedSizePriorityQueue, safnar ekki saman öllum gildum með einum lykli, heldur geymir hreyfanlegur summa af magni N mest verslaðra tegunda hlutabréfa. Hver innkoma færsla inniheldur heildarfjölda seldra hlutabréfa hingað til. KTable mun gefa þér upplýsingar um hvaða hlutabréf eru mest viðskipti með hlutabréf eins og er, án þess að þurfa að safna saman hverri uppfærslu.

Við lærðum að gera tvo mikilvæga hluti:

  • flokka gildi í KTable með sameiginlegum lykli;
  • framkvæma gagnlegar aðgerðir eins og uppröðun og samsöfnun á þessum flokkuðu gildum.

Að vita hvernig á að framkvæma þessar aðgerðir er mikilvægt til að skilja merkingu gagna sem fara í gegnum Kafka Streams forrit og skilja hvaða upplýsingar þau bera.

Við höfum líka tekið saman nokkur af lykilhugtökum sem fjallað var um fyrr í þessari bók. Í kafla 4 ræddum við hversu bilunarþolið staðbundið ríki er mikilvægt fyrir streymisforrit. Fyrsta dæmið í þessum kafla sýndi hvers vegna staðbundið ríki er svo mikilvægt - það gefur þér möguleika á að halda utan um hvaða upplýsingar þú hefur þegar séð. Staðbundinn aðgangur kemur í veg fyrir nettafir, sem gerir forritið afkastameira og villuþolnara.

Þegar þú framkvæmir einhverja upprifjun eða söfnunaraðgerð verður þú að tilgreina nafn ríkisverslunarinnar. Upprifjun og samsöfnunaraðgerðir skila KTable tilviki og KTable notar ástandsgeymslu til að skipta út gömlum niðurstöðum fyrir nýjar. Eins og þú hefur séð eru ekki allar uppfærslur sendar niður í leiðslunni og þetta er mikilvægt vegna þess að samansafnunaraðgerðir eru hannaðar til að framleiða yfirlitsupplýsingar. Ef þú notar ekki staðbundið ríki mun KTable áframsenda allar samansöfnunar- og samsetningarniðurstöður.

Næst munum við skoða að framkvæma aðgerðir eins og samsöfnun innan ákveðins tíma - svokallaðar gluggaaðgerðir.

5.3.2. Gluggaaðgerðir

Í fyrri hlutanum kynntum við rennibraut og samsöfnun. Umsóknin framkvæmdi samfellda uppröðun á sölumagni hlutabréfa, fylgt eftir með því að safna saman fimm mest veltu hlutabréfum í kauphöllinni.

Stundum er slík samfelld samansöfnun og uppröðun niðurstaðna nauðsynleg. Og stundum þarftu að framkvæma aðgerðir aðeins á tilteknu tímabili. Til dæmis, reiknaðu út hversu mörg skipti hafa verið gerð með hlutabréf í tilteknu fyrirtæki á síðustu 10 mínútum. Eða hversu margir notendur smelltu á nýjan auglýsingaborða á síðustu 15 mínútum. Forrit getur framkvæmt slíkar aðgerðir margsinnis, en með niðurstöðum sem eiga aðeins við um tilgreind tímabil (tímaglugga).

Talning skiptaviðskipta eftir kaupanda

Í næsta dæmi munum við fylgjast með hlutabréfaviðskiptum milli margra kaupmanna - annað hvort stórra stofnana eða klárra einstakra fjármálamanna.

Það eru tvær mögulegar ástæður fyrir þessari mælingu. Ein af þeim er þörfin á að vita hvað markaðsleiðtogar eru að kaupa/selja. Ef þessir stóru leikmenn og háþróaðir fjárfestar sjá tækifæri er skynsamlegt að fylgja stefnu þeirra. Önnur ástæðan er löngunin til að koma auga á hugsanleg merki um ólögleg innherjaviðskipti. Til að gera þetta þarftu að greina fylgni stórra söluhækkana við mikilvægar fréttatilkynningar.

Slík rakning samanstendur af eftirfarandi skrefum:

  • búa til straum til að lesa úr umræðuefninu um hlutabréfaviðskipti;
  • flokka innkomnar færslur eftir auðkenni kaupanda og hlutabréfatákni. Að kalla groupBy aðferðina skilar tilviki af KGroupedStream bekknum;
  • KGroupedStream.windowedBy aðferðin skilar gagnastraumi sem takmarkast við tímaglugga, sem gerir gluggasamsöfnun kleift. Það fer eftir gluggagerðinni, annað hvort TimeWindowedKSstream eða SessionWindowedKSstream er skilað;
  • færslufjölda fyrir söfnunaraðgerðina. Gagnaflæðið í glugganum ákvarðar hvort tekið sé tillit til tiltekinnar skráningar í þessari talningu;
  • skrifa niðurstöður við efni eða senda þær á stjórnborðið meðan á þróun stendur.

Yfirborðsfræði þessa forrits er einföld, en skýr mynd af því væri gagnleg. Við skulum kíkja á mynd. 5.11.

Næst munum við skoða virkni gluggaaðgerða og samsvarandi kóða.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“

Tegundir glugga

Það eru þrjár gerðir af gluggum í Kafka Streams:

  • fundur;
  • "velta";
  • renna/hoppa.

Hver á að velja fer eftir viðskiptakröfum þínum. Tíma- og stökkgluggar eru takmarkaðir í tíma, en lotugluggar eru takmarkaðir af virkni notenda - lengd lotunnar/lotanna ræðst eingöngu af því hversu virkur notandinn er. Það sem helst þarf að muna er að allar gluggagerðir eru byggðar á dagsetningar-/tímastimplum færslunnar, ekki kerfistímanum.

Næst innleiðum við staðfræði okkar með hverri gluggagerðinni. Allur kóðinn verður aðeins gefinn í fyrsta dæminu; fyrir aðrar gerðir af gluggum breytist ekkert nema tegund gluggaaðgerða.

Fundargluggar

Session gluggar eru mjög frábrugðnir öllum öðrum gerðum glugga. Þau takmarkast ekki svo mikið af tíma heldur af virkni notandans (eða virkni aðilans sem þú vilt fylgjast með). Fundargluggar eru afmarkaðir af óvirknitímabilum.

Mynd 5.12 sýnir hugmyndina um setuglugga. Minni fundurinn mun sameinast fundinum vinstra megin við hann. Og þingið til hægri verður aðskilið vegna þess að það kemur í kjölfar langvarandi aðgerðaleysis. Setugluggar eru byggðir á virkni notenda, en notast við dagsetningar-/tímastimpla úr færslum til að ákvarða hvaða lotu færslan tilheyrir.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“

Notkun setuglugga til að fylgjast með hlutabréfaviðskiptum

Notum lotuglugga til að fanga upplýsingar um skiptiviðskipti. Útfærsla setuglugga er sýnd í Listing 5.5 (sem er að finna í src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Þú hefur nú þegar séð flestar aðgerðir í þessari staðfræði, svo það er engin þörf á að rifja þær upp hér aftur. En það eru líka nokkrir nýir þættir hér, sem við munum nú ræða.

Sérhver groupBy aðgerð framkvæmir venjulega einhvers konar söfnunaraðgerð (söfnun, uppsöfnun eða talningu). Þú getur annað hvort framkvæmt uppsafnaða söfnun með hlaupandi heildarupphæð, eða gluggasöfnun, sem tekur mið af færslum innan tiltekins tímaglugga.

Kóðinn í skráningu 5.5 telur fjölda viðskipta innan setuglugga. Í mynd. 5.13 þessar aðgerðir eru greindar skref fyrir skref.

Með því að kalla windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) búum við til lotuglugga með 20 sekúndna óvirkni og 15 mínútur. 20 sekúndna aðgerðaleysi þýðir að forritið mun innihalda allar færslur sem berast innan 20 sekúndna frá lokum eða upphafi núverandi lotu inn í núverandi (virka) lotu.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Næst tilgreinum við hvaða söfnunaraðgerð þarf að framkvæma í lotuglugganum - í þessu tilviki, telja. Ef færsla sem kemur inn fellur utan óvirknigluggans (hvoru megin við dagsetningar-/tímastimpilinn), býr forritið til nýja lotu. Varðveislubil þýðir að viðhalda lotu í ákveðinn tíma og leyfa seint gögn sem ná út fyrir óvirknitímabil lotunnar en samt er hægt að tengja hana við. Að auki samsvarar upphaf og lok nýju lotunnar sem verður til við sameininguna fyrsta og nýjasta dag-/tímastimplinum.

Við skulum skoða nokkrar færslur úr talningaraðferðinni til að sjá hvernig lotur virka (tafla 5.1).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Þegar skrár berast leitum við að núverandi lotum með sama lykli, lokatíma sem er minni en núverandi dagsetning/tímastimpill - óvirknibil og upphafstími sem er stærri en núverandi dagsetning/tímastimpill + óvirknibil. Að teknu tilliti til þessa, fjórar færslur úr töflu. 5.1 sameinast í eina lotu sem hér segir.

1. Færsla 1 kemur fyrst, þannig að upphafstíminn er jöfn lokatímanum og er 00:00:00.

2. Næst kemur færsla 2 og við leitum að lotum sem lýkur ekki fyrr en 23:59:55 og hefjast eigi síðar en 00:00:35. Við finnum met 1 og sameinum lotu 1 og 2. Við tökum upphafstíma lotu 1 (fyrr) og lokatíma lotu 2 (síðar), þannig að nýja lotan okkar byrjar klukkan 00:00:00 og lýkur klukkan 00: 00:15.

3. Upptaka 3 kemur, við leitum að fundum á milli 00:00:30 og 00:01:10 og finnum enga. Bættu við annarri lotu fyrir lykilinn 123-345-654,FFBE, sem byrjar og lýkur klukkan 00:00:50.

4. Plata 4 kemur og við erum að leita að lotum á milli 23:59:45 og 00:00:25. Að þessu sinni finnast bæði lotur 1 og 2. Allar þrjár loturnar eru sameinaðar í eina, með upphafstíma 00:00:00 og lokatími 00:00:15.

Af því sem lýst er í þessum kafla er vert að muna eftir eftirfarandi mikilvægu blæbrigði:

  • lotur eru ekki gluggar í fastri stærð. Lengd fundar ræðst af starfseminni innan ákveðins tíma;
  • Dagsetningar-/tímastimplarnir í gögnunum ákvarða hvort atburðurinn falli innan núverandi lotu eða á aðgerðalausu tímabili.

Næst munum við ræða næstu tegund glugga - "velta" glugga.

"Villar" gluggar

Veltandi gluggar fanga atburði sem falla innan ákveðins tíma. Ímyndaðu þér að þú þurfir að fanga öll hlutabréfaviðskipti ákveðins fyrirtækis á 20 sekúndna fresti, svo þú safnar öllum atburðum á því tímabili. Í lok 20 sekúndna bilsins veltur glugginn og færist yfir í nýtt 20 sekúndna athugunarbil. Mynd 5.14 sýnir þessa stöðu.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Eins og þú sérð eru allir atburðir sem berast á síðustu 20 sekúndum með í glugganum. Í lok þessa tímabils er nýr gluggi búinn til.

Skráning 5.6 sýnir kóða sem sýnir notkun veltandi glugga til að fanga hlutabréfaviðskipti á 20 sekúndna fresti (finnst í src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Með þessari litlu breytingu á TimeWindows.of aðferðarkallinu geturðu notað veltandi glugga. Þetta dæmi kallar ekki til() aðferðina, þannig að sjálfgefið varðveislubil 24 klukkustundir verður notað.

Að lokum er kominn tími til að fara yfir í síðasta gluggavalkostinn - "hoppandi" glugga.

Rennandi ("hoppandi") gluggar

Renni-/hoppgluggar líkjast veltandi gluggum, en með smá mun. Rennandi gluggar bíða ekki þar til tímabilinu lýkur áður en þú býrð til nýjan glugga til að vinna úr nýlegum atburðum. Þeir hefja nýja útreikninga eftir styttri biðtíma en gluggatímann.

Til að sýna muninn á veltandi og stökkgluggum skulum við snúa aftur að dæminu um að telja kauphallarviðskipti. Markmið okkar er enn að telja fjölda færslna, en við viljum ekki bíða allan tímann áður en teljarinn er uppfærður. Þess í stað munum við uppfæra teljarann ​​með styttri millibili. Til dæmis munum við samt telja fjölda viðskipta á 20 sekúndna fresti, en uppfæra teljarann ​​á 5 sekúndna fresti, eins og sýnt er á mynd. 5.15. Í þessu tilviki endum við með þrjá niðurstöðuglugga með gögnum sem skarast.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Skráning 5.7 sýnir kóðann til að skilgreina renniglugga (finnst í src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Hægt er að breyta veltandi glugga í hoppglugga með því að bæta kalli við advanceBy() aðferðina. Í dæminu sem sýnt er er vistunarbilið 15 mínútur.

Þú sást í þessum hluta hvernig á að takmarka niðursöfnunarniðurstöður við tímaglugga. Sérstaklega vil ég að þú munir eftir eftirfarandi þremur hlutum úr þessum hluta:

  • stærð setuglugga er ekki takmörkuð af tímabili, heldur af virkni notenda;
  • „veltandi“ gluggar veita yfirsýn yfir atburði innan ákveðins tíma;
  • Lengd stökkglugga er fast, en þeir eru uppfærðir oft og geta innihaldið færslur sem skarast í öllum gluggum.

Næst munum við læra hvernig á að breyta KTable aftur í KStream fyrir tengingu.

5.3.3. Að tengja KStream og KTable hluti

Í kafla 4 ræddum við að tengja tvo KStream hluti. Nú verðum við að læra hvernig á að tengja KTable og KStream. Þetta gæti verið nauðsynlegt af eftirfarandi einföldu ástæðu. KStream er straumur af færslum og KTable er straumur af færsluuppfærslum, en stundum gætirðu viljað bæta viðbótarsamhengi við færslustrauminn með því að nota uppfærslur frá KTable.

Tökum gögn um fjölda kauphallarviðskipta og sameinum þau við kauphallarfréttir fyrir viðkomandi atvinnugreinar. Hér er það sem þú þarft að gera til að ná þessu miðað við kóðann sem þú ert nú þegar með.

  1. Umbreyttu KTable hlut með gögnum um fjölda hlutabréfaviðskipta í KStream, fylgt eftir með því að skipta út lyklinum fyrir lykilinn sem gefur til kynna iðnaðargeirann sem samsvarar þessu hlutabréfatákni.
  2. Búðu til KTable hlut sem les gögn úr efni með kauphallarfréttum. Þessi nýja KTable verður flokkuð eftir atvinnugreinum.
  3. Tengdu fréttauppfærslur við upplýsingar um fjölda kauphallarviðskipta eftir atvinnugreinum.

Nú skulum við sjá hvernig á að hrinda þessari aðgerðaáætlun í framkvæmd.

Umbreyttu KTable í KStream

Til að breyta KTable í KStream þarftu að gera eftirfarandi.

  1. Hringdu í KTable.toStream() aðferðina.
  2. Með því að kalla á KStream.map aðferðina, skiptu lykilnum út fyrir iðnaðarheitið og sæktu síðan TransactionSummary hlutinn úr Windowed tilvikinu.

Við munum hlekkja þessar aðgerðir saman sem hér segir (kóðann er að finna í skránni src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Skrá 5.8).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Vegna þess að við erum að framkvæma KStream.map aðgerð er KStream tilvikið sem skilað er aftur skipt sjálfkrafa þegar það er notað í tengingu.

Við höfum lokið umbreytingarferlinu, næst þurfum við að búa til KTable hlut til að lesa hlutabréfafréttir.

Stofnun KTable fyrir hlutabréfafréttir

Sem betur fer tekur aðeins eina línu af kóða til að búa til KTable hlut (kóðann er að finna í src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Skrá 5.9).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Það er athyglisvert að ekki þarf að tilgreina Serde hluti, þar sem strengur Serdes er notaður í stillingunum. Einnig, með því að nota FYRSTU upptalninguna, er taflan fyllt með færslum alveg í upphafi.

Nú getum við haldið áfram í síðasta skrefið - tengingu.

Að tengja fréttauppfærslur við færslutalningargögn

Það er ekki erfitt að búa til tengingu. Við munum nota vinstri samtengingu ef það eru engar hlutabréfafréttir fyrir viðkomandi iðnað (nauðsynlegan kóða er að finna í skránni src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Skrá 5.10).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Þessi leftJoin rekstraraðili er frekar einfaldur. Ólíkt joinunum í kafla 4 er JoinWindow aðferðin ekki notuð vegna þess að þegar þú framkvæmir KStream-KTable join er aðeins ein færsla í KTable fyrir hvern lykil. Slík tenging er ekki takmörkuð í tíma: skráin er annaðhvort í KTable eða fjarverandi. Meginniðurstaðan: með því að nota KTable hluti geturðu auðgað KStream með sjaldnar uppfærðum viðmiðunargögnum.

Nú munum við skoða skilvirkari leið til að auðga viðburði frá KStream.

5.3.4. GlobalKTable hlutir

Eins og þú sérð er þörf á að auðga viðburðarstrauma eða bæta samhengi við þá. Í kafla 4 sástu tengingarnar á milli tveggja KStream-hluta og í fyrri hlutanum sástu tenginguna á milli KStream og KTable. Í öllum þessum tilfellum er nauðsynlegt að skipta gagnastraumnum aftur í skiptingu þegar lyklarnir eru kortlagðir á nýja gerð eða gildi. Stundum er endurskipting gerð beinlínis og stundum gerir Kafka Streams það sjálfkrafa. Endurskiptingu er nauðsynleg vegna þess að lyklarnir hafa breyst og færslurnar verða að enda í nýjum hlutum, annars verður tengingin ómöguleg (þetta var rætt í kafla 4, í kaflanum „Endurskipting gagna“ í undirkafla 4.2.4).

Endurskiptingu hefur kostnað í för með sér

Endurskipting krefst kostnaðar - viðbótarkostnaðar til að búa til milliefni, geyma afrit gagna í öðru efni; það þýðir líka aukna leynd vegna ritunar og lestrar úr þessu efni. Að auki, ef þú þarft að sameinast yfir fleiri en einn þátt eða vídd, verður þú að keðja samskeytin, kortleggja færslurnar með nýjum lyklum og keyra endurskiptingarferlið aftur.

Tengist minni gagnasöfnum

Í sumum tilfellum er magn tilvísunargagna sem á að tengja tiltölulega lítið, þannig að heil eintök af þeim geta auðveldlega passað staðbundið á hvern hnút. Fyrir aðstæður sem þessar býður Kafka Streams upp á GlobalKTable bekknum.

GlobalKTable tilvik eru einstök vegna þess að forritið endurtekur öll gögn á hvern hnút. Og þar sem öll gögn eru til staðar á hverjum hnút er engin þörf á að skipta atburðarstraumnum í skiptingu með tilvísunargagnalykli þannig að það sé aðgengilegt öllum skiptingum. Þú getur líka búið til lyklalausa tengingu með GlobalKTable hlutum. Við skulum fara aftur í eitt af fyrri dæmunum til að sýna fram á þennan eiginleika.

Að tengja KStream hluti við GlobalKTable hluti

Í undirkafla 5.3.2 framkvæmdum við gluggasamsöfnun gengisviðskipta kaupenda. Niðurstöður þessarar samsöfnunar litu einhvern veginn svona út:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Þó að þessar niðurstöður þjónuðu tilgangi hefði það verið gagnlegra ef nafn viðskiptavinar og fullt nafn fyrirtækis hefði einnig verið birt. Til að bæta við nafni viðskiptavinarins og nafni fyrirtækis geturðu gert venjulega samskeyti, en þú þarft að gera tvær lyklavörpingar og skipting aftur. Með GlobalKTable geturðu forðast kostnað við slíkar aðgerðir.

Til að gera þetta munum við nota countStream hlutinn úr Listing 5.11 (samsvarandi kóða er að finna í src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) og tengja hann við tvo GlobalKTable hluti.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Við höfum þegar rætt þetta áður, svo ég mun ekki endurtaka það. En ég tek eftir því að kóðinn í toStream().map fallinu er tekinn upp í fallhlut í stað innbyggðrar lambda tjáningar til að auðvelda læsileika.

Næsta skref er að lýsa yfir tveimur tilfellum af GlobalKTable (kóðann sem sýndur er er að finna í skránni src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Skrá 5.12).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“

Vinsamlegast athugaðu að efnisheiti eru lýst með upptalnum gerðum.

Nú þegar við erum með alla íhlutina tilbúna, er allt sem eftir er að skrifa kóðann fyrir tenginguna (sem er að finna í skránni src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Skrá 5.13).

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Þrátt fyrir að það séu tveir samskeyti í þessum kóða eru þeir hlekkjaðir vegna þess að hvorug niðurstöður þeirra eru notaðar sérstaklega. Niðurstöðurnar birtast í lok allrar aðgerðarinnar.

Þegar þú keyrir ofangreinda join-aðgerð færðu niðurstöður eins og þessar:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Kjarninn hefur ekki breyst, en þessar niðurstöður virðast skýrari.

Ef þú telur niður í kafla 4 hefurðu þegar séð nokkrar tegundir af tengingum í gangi. Þau eru skráð í töflu. 5.2. Þessi tafla endurspeglar tengimöguleika frá útgáfu 1.0.0 af Kafka Streams; Eitthvað gæti breyst í komandi útgáfum.

Bókin „Kafka straumar í verki. Forrit og örþjónustur fyrir rauntímavinnu“
Til að klára hlutina skulum við rifja upp grunnatriðin: þú getur tengt viðburðarstrauma (KStream) og uppfært strauma (KTable) með því að nota staðbundið ástand. Að öðrum kosti, ef stærð viðmiðunargagna er ekki of stór, geturðu notað GlobalKTable hlutinn. GlobalKTables endurtaka allar skiptingarnar á hvern Kafka Streams forritshnút og tryggja að öll gögn séu tiltæk óháð því hvaða skipting lykillinn samsvarar.

Næst munum við sjá Kafka Streams eiginleikann, þökk sé honum getum fylgst með ástandsbreytingum án þess að neyta gagna frá Kafka efni.

5.3.5. Spurningahæft ástand

Við höfum þegar framkvæmt nokkrar aðgerðir sem fela í sér ástand og sendum alltaf niðurstöðurnar út á stjórnborðið (í þróunarskyni) eða skrifum þær á efni (í framleiðslutilgangi). Þegar þú skrifar niðurstöður fyrir efni þarftu að nota Kafka neytanda til að skoða þær.

Að lesa gögn úr þessum viðfangsefnum getur talist tegund raunhæfra skoðana. Í okkar tilgangi getum við notað skilgreininguna á efnislegri skoðun frá Wikipedia: „... líkamlegur gagnagrunnshlutur sem inniheldur niðurstöður fyrirspurnar. Til dæmis gæti það verið staðbundið afrit af fjarlægum gögnum, eða undirmengi af línum og/eða dálkum töflu eða sameinað niðurstöður, eða yfirlitstafla sem fengin er með söfnun“ (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams gerir þér einnig kleift að keyra gagnvirkar fyrirspurnir í ríkisverslunum, sem gerir þér kleift að lesa þessar raunhæfu skoðanir beint. Það er mikilvægt að hafa í huga að fyrirspurnin til ríkisverslunarinnar er skrifvarinn aðgerð. Þetta tryggir að þú þarft ekki að hafa áhyggjur af því að gera ástand ósamræmis fyrir slysni meðan forritið þitt er að vinna úr gögnum.

Hæfni til að leita beint eftir ríkisverslunum er mikilvæg. Þetta þýðir að þú getur búið til mælaborðsforrit án þess að þurfa fyrst að sækja gögn frá Kafka neytandanum. Það eykur einnig skilvirkni forritsins, vegna þess að það er engin þörf á að skrifa gögn aftur:

  • þökk sé staðsetningu gagnanna er hægt að nálgast þau fljótt;
  • tvíföldun gagna er eytt þar sem þau eru ekki skrifuð á ytri geymslu.

Það helsta sem ég vil að þú munir er að þú getur spurt um stöðu beint úr umsókn þinni. Það er ekki hægt að ofmeta tækifærin sem þetta gefur þér. Í stað þess að neyta gagna frá Kafka og geyma færslur í gagnagrunni fyrir forritið, geturðu leitað til ríkisverslana með sömu niðurstöðu. Beinar fyrirspurnir til ríkisverslana þýðir minni kóða (enginn neytandi) og minni hugbúnað (engin þörf á gagnagrunnstöflu til að geyma niðurstöðurnar).

Við höfum farið yfir töluvert af jörðu í þessum kafla, svo við látum umfjöllun okkar um gagnvirkar fyrirspurnir gegn ríkisverslunum standa í bili. En ekki hafa áhyggjur: í kafla 9 munum við búa til einfalt mælaborðsforrit með gagnvirkum fyrirspurnum. Það mun nota nokkur af dæmunum úr þessum og fyrri köflum til að sýna fram á gagnvirkar fyrirspurnir og hvernig þú getur bætt þeim við Kafka Streams forrit.

Yfirlit

  • KStream hlutir tákna strauma atburða, sambærilega við innskot í gagnagrunn. KTable hlutir tákna uppfærslustrauma, meira eins og uppfærslur á gagnagrunni. Stærð KTable hlutarins vex ekki, gömlum plötum er skipt út fyrir nýjar.
  • KTable hlutir eru nauðsynlegir fyrir söfnunaraðgerðir.
  • Með því að nota gluggaaðgerðir geturðu skipt uppsöfnuðum gögnum í tímaflokka.
  • Þökk sé GlobalKTable hlutum geturðu nálgast tilvísunargögn hvar sem er í forritinu, óháð skiptingu.
  • Tengingar á milli KStream, KTable og GlobalKTable hluta eru mögulegar.

Hingað til höfum við einbeitt okkur að því að byggja Kafka Streams forrit með því að nota KStream DSL á háu stigi. Þrátt fyrir að háþróað nálgun gerir þér kleift að búa til snyrtileg og hnitmiðuð forrit, þá er það málamiðlun að nota hana. Að vinna með DSL KStream þýðir að auka nákvæmni kóðans með því að draga úr stjórnunarstigi. Í næsta kafla ætlum við að skoða lágstig stjórnunarhnút API og reyna önnur málamiðlun. Forritin verða lengri en þau voru áður, en við munum geta búið til næstum hvaða stjórnunarhnút sem við gætum þurft.

→ Nánari upplýsingar um bókina má finna á heimasíðu útgefanda

→ Fyrir Habrozhiteli 25% afslátt með því að nota afsláttarmiða - Kafka straumar

→ Við greiðslu fyrir pappírsútgáfu bókarinnar er rafbók send með tölvupósti.

Heimild: www.habr.com

Bæta við athugasemd