Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl" Bonjou, rezidan Khabro! Liv sa a apwopriye pou nenpòt pwomotè ki vle konprann pwosesis fil. Konprann pwogram distribiye pral ede w pi byen konprann Kafka ak Kafka Streams. Li ta bon pou konnen kad Kafka tèt li, men sa a pa nesesè: ​​Mwen pral di ou tout sa ou bezwen. Devlopè Kafka ki gen eksperyans ak inisyasyon yo pral aprann kijan pou kreye aplikasyon pou pwosesis kouran ki enteresan lè l sèvi avèk bibliyotèk Kafka Streams nan liv sa a. Devlopè Java entèmedyè ak avanse ki deja abitye ak konsèp tankou serializasyon pral aprann aplike konpetans yo pou kreye aplikasyon Kafka Streams. Kòd sous liv la ekri nan Java 8 epi li fè anpil itilizasyon sentaks ekspresyon lambda Java 8, kidonk konnen kijan pou travay ak fonksyon lambda (menm nan yon lòt langaj pwogramasyon) ap itil.

Ekstrè. 5.3. Agrégation ak fennèt operasyon yo

Nan seksyon sa a, nou pral kontinye eksplore pati ki pi pwomèt nan Kafka Streams. Jiskaprezan nou te kouvri aspè sa yo nan Kafka Streams:

  • kreye yon topoloji pwosesis;
  • lè l sèvi avèk eta nan aplikasyon difizyon;
  • fè koneksyon kouran done;
  • diferans ki genyen ant kouran evènman (KStream) ak kouran ajou (KTable).

Nan egzanp sa yo nou pral mete tout eleman sa yo ansanm. Ou pral aprann tou sou fenèt, yon lòt gwo karakteristik nan aplikasyon difizyon. Premye egzanp nou an pral yon senp agrégation.

5.3.1. Agregasyon nan lavant stock pa sektè endistri

Agregasyon ak gwoupman se zouti enpòtan anpil lè w ap travay ak done difizyon. Egzamen dosye endividyèl yo pandan y ap resevwa yo souvan pa ase. Pou ekstrè enfòmasyon adisyonèl nan done, li nesesè pou gwoupe yo ak konbine yo.

Nan egzanp sa a, ou pral mete sou kostim yon machann vann jou ki bezwen swiv volim lavant nan aksyon nan konpayi nan plizyè endistri. Espesyalman, ou enterese nan senk konpayi yo ki gen pi gwo lavant pati nan chak endistri.

Agrégation sa yo pral mande pou plizyè etap sa yo pou tradui done yo nan fòm ou vle a (pale an tèm jeneral).

  1. Kreye yon sous ki baze sou sijè ki pibliye enfòmasyon komès anvan tout koreksyon. Nou pral gen kat yon objè ki kalite StockTransaction nan yon objè ki kalite ShareVolume. Pwen an se ke objè a StockTransaction gen metadata lavant, men nou sèlman bezwen done sou kantite aksyon yo te vann.
  2. Gwoup ShareVolume done pa senbòl stock. Yon fwa gwoupe pa senbòl, ou ka tonbe done sa yo nan subtotal volim lavant stock. Li se vo sonje ke metòd la KStream.groupBy retounen yon egzanp nan kalite KGroupedStream. Epi ou ka jwenn yon egzanp KTable lè w rele plis metòd KGroupedStream.reduce.

Ki sa ki koòdone KGroupedStream la

Metòd KStream.groupBy ak KStream.groupByKey retounen yon egzanp KGroupedStream. KGroupedStream se yon reprezantasyon entèmedyè nan yon kouran evènman apre gwoupman pa kle. Li pa ditou fèt pou travay dirèk ak li. Olye de sa, yo itilize KGroupedStream pou operasyon agrégasyon, ki toujou lakòz yon KTable. Epi depi rezilta a nan operasyon agrégation se yon KTable epi yo itilize yon magazen leta, li posib ke se pa tout mizajou kòm yon rezilta yo voye pi lwen desann tiyo a.

Metòd KTable.groupBy la retounen yon KGroupedTable ki sanble - yon reprezantasyon entèmedyè kouran mizajou, regroupe pa kle.

Ann pran yon ti repo epi gade Fig. 5.9, ki montre sa nou reyalize. Topoloji sa a ta dwe deja trè abitye pou ou.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Ann gade kounye a kòd pou topoloji sa a (li ka jwenn nan dosye src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lis 5.2).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Kòd yo bay yo distenge pa konsizyon li yo ak gwo volim aksyon ki fèt nan plizyè liy. Ou ka remake yon bagay nouvo nan premye paramèt metòd builder.stream la: yon valè enum kalite AutoOffsetReset.EARLIEST (gen tou yon DÈNYE), mete lè l sèvi avèk metòd Consumed.withOffsetResetPolicy. Kalite enimerasyon sa a ka itilize pou presize yon estrateji reset konpanse pou chak KStream oswa KTable epi li pran priyorite sou opsyon reset konpanse nan konfigirasyon an.

GroupByKey ak GroupBy

Koòdone KStream la gen de metòd pou gwoupe dosye: GroupByKey ak GroupBy. Tou de retounen yon KGroupedTable, kidonk ou ta ka mande ki diferans ki genyen ant yo ak ki lè yo sèvi ak youn?

Metòd GroupByKey yo itilize lè kle yo nan KStream la deja pa vid. Ak sa ki pi enpòtan, drapo "mande pou re-partitionnement" pa janm mete.

Metòd GroupBy a sipoze ke ou te chanje kle gwoupman yo, kidonk drapo repartisyon an mete sou vre. Fè rantre, agrégation, elatriye apre metòd GroupBy la pral lakòz otomatik re-partitioning.
Rezime: Chak fwa sa posib, ou ta dwe itilize GroupByKey olye ke GroupBy.

Li klè kisa metòd mapValues ​​ak groupBy fè, kidonk ann gade metòd sum() (ki jwenn nan src/main/java/bbejeck/model/ShareVolume.java) (Lis 5.3).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Metòd ShareVolume.sum la retounen total volim lavant stock, ak rezilta tout chèn kalkil la se yon objè KTable. . Kounye a ou konprann wòl KTable jwe. Lè objè ShareVolume rive, objè KTable ki koresponn lan estoke dènye aktyalizasyon aktyèl la. Li enpòtan pou sonje ke tout mizajou yo reflete nan shareVolumeKTable anvan an, men se pa tout yo voye pi lwen.

Lè sa a, nou itilize KTable sa a pou rasanble (pa kantite aksyon ki te fè kòmès) pou rive nan senk konpayi yo ki gen pi gwo volim aksyon ki te fè kòmès nan chak endistri. Aksyon nou yo nan ka sa a pral sanble ak sa yo pou premye rasanbleman an.

  1. Fè yon lòt groupBy operasyon pou gwoupe endividyèl ShareVolume objè pa endistri.
  2. Kòmanse rezime objè ShareVolume. Fwa sa a, objè aggregasyon an se yon keu priyorite gwosè fiks. Nan keu sa a gwosè fiks, se sèlman senk konpayi yo ki gen pi gwo kantite aksyon vann yo kenbe.
  3. Map ke moun kap kriye yo soti nan paragraf anvan an nan yon valè fisèl epi retounen senk pi gwo aksyon ki pi komès pa nimewo pa endistri.
  4. Ekri rezilta yo nan fòm fisèl nan sijè a.

Nan Fig. Figi 5.10 montre graf topoloji koule done yo. Kòm ou ka wè, dezyèm tou nan pwosesis se byen senp.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Kounye a ke nou gen yon konpreyansyon klè sou estrikti dezyèm tou sa a nan pwosesis, nou ka ale nan kòd sous li yo (ou pral jwenn li nan dosye a src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lis 5.4) .

Inisyalisateur sa a gen yon varyab fixedQueue. Sa a se yon objè koutim ki se yon adaptè pou java.util.TreeSet ki itilize pou swiv N rezilta yo nan lòd desandan aksyon yo te fè kòmès.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Ou te deja wè groupBy ak mapValues ​​apèl yo, kidonk nou pa pral antre nan sa yo (nou ap rele metòd KTable.toStream paske metòd KTable.print depreche). Men, ou poko wè vèsyon KTable aggregate() an, kidonk nou pral pase yon ti tan diskite sou sa.

Kòm ou sonje, sa ki fè KTable diferan se ke dosye ki gen menm kle yo konsidere kòm mizajou. KTable ranplase ansyen antre a ak yon nouvo. Agregasyon fèt nan yon fason menm jan an: dènye dosye yo ak menm kle a yo rasanble. Lè yon dosye rive, li ajoute nan egzanp klas FixedSizePriorityQueue lè l sèvi avèk yon ajoute (dezyèm paramèt nan apèl metòd total la), men si yon lòt dosye deja egziste ak menm kle a, Lè sa a, ansyen dosye a retire lè l sèvi avèk yon soustrè (twazyèm paramèt nan apèl metòd total la).

Tout sa vle di ke agrégateur nou an, FixedSizePriorityQueue, pa rasanble tout valè ak yon sèl kle, men sere yon sòm k ap deplase nan kantite N ki pi fè kòmès nan aksyon yo. Chak antre fèk ap rantre genyen kantite total aksyon ki vann jiskaprezan. KTable ap ba ou enfòmasyon sou ki aksyon konpayi yo ki pi fè kòmès kounye a, san yo pa mande pou chak ajou.

Nou te aprann fè de bagay enpòtan:

  • gwoup valè nan KTable pa yon kle komen;
  • fè operasyon itil tankou rollup ak agrégation sou valè gwoupe sa yo.

Konnen kijan pou fè operasyon sa yo enpòtan pou konprann siyifikasyon done yo k ap deplase atravè yon aplikasyon Kafka Streams epi konprann ki enfòmasyon li pote.

Nou te rasanble tou kèk nan konsèp kle yo te diskite pi bonè nan liv sa a. Nan Chapit 4, nou te diskite sou fason eta lokal ki toleran ak fay enpòtan pou yon aplikasyon difizyon. Premye egzanp nan chapit sa a te montre poukisa eta lokal la tèlman enpòtan—li ba ou kapasite pou kenbe tras enfòmasyon ou te deja wè. Aksè lokal yo evite reta nan rezo a, sa ki fè aplikasyon an pi efikas ak reziste erè.

Lè w ap fè nenpòt operasyon totalman oswa agrégasyon, ou dwe presize non magazen eta a. Operasyon rasanbleman ak agrégasyon yo retounen yon egzanp KTable, epi KTable itilize depo eta pou ranplase ansyen rezilta yo ak nouvo rezilta yo. Kòm ou te wè, se pa tout mizajou yo voye desann nan tiyo a, e sa a enpòtan paske operasyon agrégation yo fèt pou pwodui enfòmasyon rezime. Si ou pa aplike eta lokal la, KTable ap voye tout rezilta agrégasyon ak rasanbleman yo.

Apre sa, nou pral gade nan fè operasyon tankou agrégation nan yon peryòd tan espesifik - sa yo rele operasyon fenèt.

5.3.2. Operasyon fenèt yo

Nan seksyon anvan an, nou te prezante glisman konvolusyon ak agrégation. Aplikasyon an te fè yon woule kontinyèl nan volim lavant stock, ki te swiv pa agrégation nan senk aksyon ki pi fè kòmès sou echanj la.

Pafwa sa yo agrégation kontinyèl ak woule-up nan rezilta yo nesesè. Epi pafwa ou bezwen fè operasyon sèlman sou yon peryòd tan bay yo. Pou egzanp, kalkile konbyen tranzaksyon echanj yo te fèt ak aksyon nan yon konpayi an patikilye nan 10 dènye minit yo. Oswa konbyen itilizatè yo klike sou yon nouvo banyè piblisite nan dènye 15 minit yo. Yon aplikasyon ka fè operasyon sa yo plizyè fwa, men ak rezilta ki aplike sèlman nan peryòd tan espesifik (fenèt tan).

Konte tranzaksyon echanj pa achtè

Nan pwochen egzanp lan, nou pral swiv tranzaksyon aksyon atravè plizyè komèsan—swa gwo òganizasyon oswa bayeur endividyèl entelijan.

Gen de rezon posib pou swiv sa a. Youn nan yo se bezwen pou konnen sa lidè mache ap achte/vann. Si gwo jwè sa yo ak envestisè sofistike yo wè opòtinite, li fè sans pou yo swiv estrateji yo. Dezyèm rezon an se dezi a tach nenpòt siy posib nan komès inisye ilegal. Pou w fè sa, w ap bezwen analize korelasyon gwo kliyan ki pou vann pointes ak communiqués de près enpòtan.

Suivi sa yo konsiste de etap sa yo:

  • kreye yon kouran pou lekti nan sijè a stock-tranzaksyon;
  • gwoupman dosye fèk ap rantre pa ID achtè ak senbòl stock. Lè w rele metòd groupBy la, retounen yon egzanp klas KGroupedStream la;
  • Metòd KGroupedStream.windowedBy la retounen yon kouran done limite a yon fenèt tan, ki pèmèt agrégasyon fenèt. Tou depan de kalite fenèt la, swa yon TimeWindowedKStream oswa yon SessionWindowedKStream retounen;
  • konte tranzaksyon pou operasyon an agrégation. Flux done fennèt la detèmine si yo pran an kont yon dosye patikilye nan konte sa a;
  • ekri rezilta nan yon sijè oswa pwodiksyon yo nan konsole a pandan devlopman.

Topoloji aplikasyon sa a se senp, men yon foto klè sou li ta itil. Ann pran yon gade nan Fig. 5.11.

Apre sa, nou pral gade fonksyonalite operasyon fenèt yo ak kòd ki koresponn lan.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"

Kalite fenèt yo

Gen twa kalite fenèt nan Kafka Streams:

  • sesyonè;
  • "tonbe" (tonbe);
  • glisman / sote.

Kilès pou chwazi depann sou kondisyon biznis ou. Tumbling ak sote fenèt yo limite nan tan, pandan y ap fenèt sesyon yo limite pa aktivite itilizatè a-se dire a nan sesyon an (yo) detèmine sèlman pa ki jan aktif itilizatè a ye. Bagay pwensipal lan sonje se ke tout kalite fenèt yo baze sou koupon pou dat/lè nan antre yo, pa tan nan sistèm.

Apre sa, nou aplike topoloji nou an ak chak kalite fenèt yo. Yo pral bay kòd konplè a sèlman nan premye egzanp pou lòt kalite fenèt pa gen anyen ki pral chanje eksepte ki kalite operasyon fenèt la.

Fenèt sesyon yo

Fenèt sesyon yo trè diferan de tout lòt kalite fenèt yo. Yo limite pa tèlman pa tan ke pa aktivite a nan itilizatè a (oswa aktivite a nan antite a ke ou ta renmen swiv). Fenèt sesyon yo delimite pa peryòd inaktivite.

Figi 5.12 montre konsèp fenèt sesyon yo. Sesyon ki pi piti a pral rantre ak sesyon ki sou bò gòch li. Ak sesyon an sou bò dwat la pral separe paske li swiv yon peryòd tan nan inaktivite. Fenèt sesyon yo baze sou aktivite itilizatè yo, men sèvi ak koupon pou dat/lè nan antre yo pou detèmine nan ki sesyon antre a fè pati.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"

Sèvi ak fenèt sesyon yo pou swiv tranzaksyon aksyon yo

Ann sèvi ak fenèt sesyon yo pou pran enfòmasyon sou tranzaksyon echanj yo. Aplikasyon fenèt sesyon yo montre nan Lis 5.5 (ki ka jwenn nan src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Ou te deja wè pi fò nan operasyon yo nan topoloji sa a, kidonk pa gen okenn bezwen gade nan yo ankò isit la. Men, gen tou plizyè eleman nouvo isit la, ki nou pral kounye a diskite.

Nenpòt operasyon groupBy tipikman fè kèk kalite operasyon agrégation (agrégation, woule, oswa konte). Ou ka fè swa agrégation kimilatif ak yon total kouri, oswa agrégasyon fenèt, ki pran an kont dosye nan yon fenèt tan espesifye.

Kòd ki nan Lis 5.5 konte kantite tranzaksyon ki genyen nan fenèt sesyon yo. Nan Fig. 5.13 aksyon sa yo analize etap pa etap.

Lè nou rele windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) nou kreye yon fenèt sesyon ak yon entèval inaktivite 20 segonn ak yon entèval pèsistans 15 minit. Yon entèval san fè anyen konsa nan 20 segonn vle di ke aplikasyon an pral gen ladan nenpòt antre ki rive nan 20 segonn nan fen oswa kòmanse nan sesyon aktyèl la nan sesyon aktyèl la (aktif).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Apre sa, nou presize ki operasyon agrégation ki bezwen fèt nan fenèt sesyon an - nan ka sa a, konte. Si yon antre ki fèk ap rantre tonbe deyò fenèt inaktivite a (nenpòt kote nan koupon pou dat/lè), aplikasyon an kreye yon nouvo sesyon. Entèval retansyon vle di kenbe yon sesyon pou yon sèten kantite tan epi pèmèt done an reta ki pwolonje pi lwen pase peryòd inaktivite sesyon an, men yo ka toujou tache. Anplis de sa, kòmansman ak fen nouvo sesyon ki soti nan fizyon an koresponn ak pi bonè ak dènye koupon pou dat/lè.

Ann gade kèk antre nan metòd konte a pou wè ki jan sesyon yo fonksyone (Tablo 5.1).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Lè dosye yo rive, nou chèche sesyon ki egziste deja yo ak menm kle a, yon tan fini mwens pase koupon pou dat/tan aktyèl la - entèval inaktivite, ak yon tan kòmanse ki pi gran pase koupon pou dat/tan aktyèl la + entèval inaktivite. Lè w ap pran sa a an kont, kat antre nan tab la. 5.1 yo fizyone nan yon sèl sesyon jan sa a.

1. Dosye 1 rive an premye, kidonk lè kòmansman an egal ak lè fen a e li se 00:00:00.

2. Apre sa, antre 2 rive, epi nou chèche sesyon ki fini pa pi bonè pase 23:59:55 epi ki kòmanse pa pita pase 00:00:35. Nou jwenn dosye 1 epi konbine sesyon 1 ak 2. Nou pran lè kòmansman sesyon 1 (pi bonè) ak lè fen sesyon 2 (pita), pou nouvo sesyon nou an kòmanse a 00:00:00 epi fini a 00:00: 15:XNUMX.

3. Dosye 3 rive, nou chèche seyans ant 00:00:30 ak 00:01:10 epi nou pa jwenn okenn. Ajoute yon dezyèm sesyon pou kle a 123-345-654,FFBE, kòmanse ak fini a 00:00:50.

4. Dosye 4 rive epi n ap chèche seyans ant 23:59:45 ak 00:00:25. Fwa sa a, tou de sesyon 1 ak 2 yo jwenn tout twa sesyon yo konbine nan yon sèl, ak yon lè kòmanse nan 00:00:00 ak yon lè fini nan 00:00:15.

Soti nan sa ki dekri nan seksyon sa a, li vo sonje nuans enpòtan sa yo:

  • sesyon yo pa fenèt gwosè fiks. Se aktivite a ki detèmine dire a nan yon sesyon nan yon peryòd de tan bay;
  • Koupon pou dat/lè nan done yo detèmine si evènman an tonbe nan yon sesyon ki egziste deja oswa pandan yon peryòd san fè anyen konsa.

Apre sa, nou pral diskite sou pwochen kalite fenèt la - "tonbe" fenèt yo.

"Tumbling" fenèt yo

Tumbling fenèt yo kaptire evènman ki tonbe nan yon sèten peryòd tan. Imajine ke ou bezwen pran tout tranzaksyon yo stock nan yon konpayi sèten chak 20 segonn, kidonk, ou kolekte tout evènman yo pandan peryòd tan sa a. Nan fen entèval 20 segonn, fenèt la woule epi li deplase nan yon nouvo entèval obsèvasyon 20 segonn. Figi 5.14 montre sitiyasyon sa a.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Kòm ou ka wè, tout evènman yo te resevwa nan dènye 20 segonn yo enkli nan fenèt la. Nan fen peryòd tan sa a, yo kreye yon nouvo fenèt.

Lis 5.6 montre kòd ki montre itilizasyon fennèt ki woule pou pran tranzaksyon aksyon chak 20 segonn (yo jwenn nan src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Avèk ti chanjman sa a nan apèl la metòd TimeWindows.of, ou ka itilize yon fenèt tumbling. Egzanp sa a pa rele metòd until(), kidonk entèval retansyon default 24 èdtan yo pral itilize.

Finalman, li se tan pou avanse pou pi sou dènye opsyon fenèt yo - "sote" fenèt yo.

Glisman ("sote") fenèt yo

Fenèt glisman / sote yo sanble ak fenèt k ap woule, men ak yon ti diferans. Fenèt glisman pa tann jiska fen entèval tan an anvan yo kreye yon nouvo fenèt pou trete dènye evènman yo. Yo kòmanse nouvo kalkil apre yon entèval ap tann mwens pase dire fenèt la.

Pou ilistre diferans ki genyen ant fennèt woule ak sote, ann retounen nan egzanp konte tranzaksyon echanj. Objektif nou se toujou konte kantite tranzaksyon yo, men nou pa vle tann tout kantite tan anvan nou mete ajou kontwa a. Olye de sa, nou pral mete ajou kontwa a nan entèval ki pi kout. Pou egzanp, nou pral toujou konte kantite tranzaksyon yo chak 20 segonn, men mete ajou kontwa a chak 5 segonn, jan yo montre nan Fig. 5.15. Nan ka sa a, nou fini ak twa fenèt rezilta ak done sipèpoze.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Lis 5.7 montre kòd pou defini fenèt glisman (yo jwenn nan src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Ou ka konvèti yon fenèt k ap woule nan yon fenèt so lè w ajoute yon apèl nan metòd advanceBy(). Nan egzanp yo montre a, entèval ekonomize a se 15 minit.

Ou te wè nan seksyon sa a ki jan yo limite rezilta agrégation nan fennèt tan. An patikilye, mwen vle ou sonje twa bagay sa yo nan seksyon sa a:

  • gwosè fenèt sesyon yo limite pa peryòd tan, men pa aktivite itilizatè;
  • Fenèt "tonbe" yo bay yon apèsi sou evènman yo nan yon peryòd tan bay;
  • Se dire a nan fenèt sote fiks, men yo mete ajou souvan epi yo ka gen ladan antre sipèpoze nan tout fenèt yo.

Apre sa, nou pral aprann kijan pou konvèti yon KTable tounen nan yon KStream pou yon koneksyon.

5.3.3. Konekte objè KStream ak KTable

Nan Chapit 4, nou te diskite sou konekte de objè KStream. Koulye a, nou gen pou aprann kijan pou konekte KTable ak KStream. Sa a ka bezwen pou rezon senp sa a. KStream se yon kouran nan dosye, ak KTable se yon kouran nan mizajou dosye, men pafwa ou ka vle ajoute plis kontèks nan kouran dosye a lè l sèvi avèk mizajou ki soti nan KTable la.

Ann pran done sou kantite tranzaksyon echanj ak konbine yo ak nouvèl echanj pou endistri ki enpòtan yo. Men sa ou bezwen fè pou reyalize sa a bay kòd ou deja genyen an.

  1. Konvèti yon objè KTable ak done sou kantite tranzaksyon stock nan yon KStream, ki te swiv pa ranplase kle a ak kle ki endike sektè endistri ki koresponn ak senbòl stock sa a.
  2. Kreye yon objè KTable ki li done ki sòti nan yon sijè ak nouvèl echanj. Nouvo KTable sa a pral klase pa sektè endistri.
  3. Konekte mizajou nouvèl ak enfòmasyon sou kantite tranzaksyon echanj pa sektè endistri.

Koulye a, kite a wè ki jan yo aplike plan aksyon sa a.

Konvèti KTable an KStream

Pou konvèti KTable an KStream ou bezwen fè sa ki annapre yo.

  1. Rele metòd KTable.toStream().
  2. Lè w rele metòd KStream.map la, ranplase kle a ak non endistri a, epi answit rekipere objè TransactionSummary nan egzanp Windowed la.

Nou pral chenn operasyon sa yo ansanm jan sa a (nou ka jwenn kòd la nan dosye src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lis 5.8).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Paske n ap fè yon operasyon KStream.map, egzanp KStream retounen an ap re-patisyone otomatikman lè yo itilize li nan yon koneksyon.

Nou te konplete pwosesis konvèsyon an, apre nou bezwen kreye yon objè KTable pou li nouvèl stock.

Kreyasyon KTable pou nouvèl stock

Erezman, kreye yon objè KTable pran yon sèl liy kòd (ou ka jwenn kòd la nan src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Lis 5.9).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Li se vo anyen ke pa gen okenn objè Serde yo oblije espesifye, paske Serdes fisèl yo itilize nan anviwònman yo. Epitou, lè w itilize enimerasyon ki pi BONÈ a, tablo a ranpli ak dosye nan kòmansman an.

Koulye a, nou ka ale nan etap final la - koneksyon.

Konekte mizajou nouvèl ak done konte tranzaksyon yo

Kreye yon koneksyon pa difisil. Nou pral sèvi ak yon rantre gòch nan ka pa gen okenn nouvèl stock pou endistri ki enpòtan (nou ka jwenn kòd ki nesesè nan dosye a src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Operatè leftJoin sa a byen senp. Kontrèman ak rantre nan Chapit 4, metòd JoinWindow pa itilize paske lè w ap fè yon rantre KStream-KTable, gen yon sèl antre nan KTable pou chak kle. Yon koneksyon konsa pa limite nan tan: dosye a se swa nan KTable a oswa absan. Konklizyon prensipal la: lè l sèvi avèk objè KTable ou ka anrichi KStream ak done referans mwens souvan mete ajou.

Kounye a nou pral gade yon fason ki pi efikas pou anrichi evènman nan KStream.

5.3.4. GlobalKTable objè yo

Kòm ou ka wè, gen yon bezwen anrichi kouran evènman oswa ajoute kontèks yo. Nan Chapit 4 ou te wè koneksyon ki genyen ant de objè KStream, ak nan seksyon anvan an ou te wè koneksyon ki genyen ant yon KStream ak yon KTable. Nan tout ka sa yo, li nesesè re-patisyon kouran done a lè kat kle yo nan yon nouvo kalite oswa valè. Pafwa repatimantasyon fè klèman, epi pafwa Kafka Streams fè li otomatikman. Re-partitionnement nesesè paske kle yo chanje epi dosye yo dwe fini nan nouvo seksyon, sinon koneksyon an ap enposib (sa a te diskite nan Chapit 4, nan seksyon "Done re-partitionnement" nan sou-seksyon 4.2.4).

Re-partitionnement gen yon pri

Re-partisyone mande pou depans - depans resous adisyonèl pou kreye sijè entèmedyè, estoke done kopi nan yon lòt sijè; sa vle di tou ogmante latansi akòz ekri ak lekti nan sijè sa a. Anplis de sa, si ou bezwen rantre nan plis pase yon aspè oswa dimansyon, ou dwe chenn rantre yo, kat dosye yo ak nouvo kle, epi kouri pwosesis re-patisyon an ankò.

Konekte ak pi piti seri done

Nan kèk ka, volim done referans yo dwe konekte relativman ti, kidonk kopi konplè li yo ka fasilman anfòm lokalman sou chak ne. Pou sitiyasyon tankou sa a, Kafka Streams bay klas GlobalKTable la.

Enstans GlobalKTable yo inik paske aplikasyon an repwodui tout done nan chak nœuds yo. Epi depi tout done yo prezan sou chak ne, pa gen okenn nesesite pou patisyon kouran evènman an pa kle done referans pou li disponib nan tout patisyon yo. Ou kapab tou fè rantre san kle lè l sèvi avèk objè GlobalKTable. Ann tounen nan youn nan egzanp anvan yo demontre karakteristik sa a.

Konekte objè KStream ak objè GlobalKTable

Nan seksyon 5.3.2, nou te fè agrégation fenèt nan tranzaksyon echanj pa achtè. Rezilta yo nan agrégation sa a sanble yon bagay tankou sa a:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Pandan ke rezilta sa yo te sèvi objektif la, li ta pi itil si non kliyan an ak non konpayi konplè yo te parèt tou. Pou ajoute non kliyan an ak non konpayi an, ou ka fè rantre nòmal, men w ap bezwen fè de kat kle ak re-partitionnement. Avèk GlobalKTable ou ka evite pri a nan operasyon sa yo.

Pou fè sa, nou pral sèvi ak objè countStream ki soti nan Lis 5.11 (nou ka jwenn kòd ki koresponn lan nan src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) epi konekte li ak de objè GlobalKTable.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Nou te deja diskite sa a anvan, kidonk mwen pa pral repete li. Men, mwen sonje ke kòd la nan fonksyon an toStream().map abstrè nan yon objè fonksyon olye pou yo yon ekspresyon lambda inline pou dedomajman pou la lizibilite.

Pwochen etap la se pou deklare de sikonstans GlobalKTable (ou ka jwenn kòd yo montre nan dosye src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lis 5.12).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"

Tanpri sonje ke non sijè yo dekri lè l sèvi avèk kalite enimere.

Kounye a ke nou gen tout eleman yo pare, tout sa ki rete se ekri kòd la pou koneksyon an (ki ka jwenn nan dosye a src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Lis 5.13).

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Malgre ke gen de rantre nan kòd sa a, yo chenn paske youn nan rezilta yo pa itilize separeman. Rezilta yo parèt nan fen operasyon an antye.

Lè ou kouri operasyon rantre anwo a, ou pral jwenn rezilta tankou sa a:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Sans la pa chanje, men rezilta sa yo parèt pi klè.

Si w konte jiska Chapit 4, ou te deja wè plizyè kalite koneksyon an aksyon. Yo ki nan lis nan tablo. 5.2. Tablo sa a reflete kapasite koneksyon apati vèsyon 1.0.0 Kafka Streams; Yon bagay ka chanje nan pwochen degaje yo.

Liv la "Kafka Streams in Action. Aplikasyon ak mikwosèvis pou travay an tan reyèl"
Pou w fini bagay yo, ann rezime baz yo: ou ka konekte kouran evènman yo (KStream) ak ajou kouran (KTable) lè l sèvi avèk eta lokal yo. Altènativman, si gwosè referans done yo pa twò gwo, ou ka itilize objè GlobalKTable la. GlobalKTables replike tout patisyon yo nan chak ne aplikasyon Kafka Streams, asire ke tout done yo disponib kèlkeswa ki patisyon kle a koresponn ak.

Apre sa, nou pral wè karakteristik Kafka Streams la, gras a ki nou ka obsève chanjman eta yo san yo pa konsome done ki sòti nan yon sijè Kafka.

5.3.5. Eta kestyab

Nou te deja fè plizyè operasyon ki enplike eta epi nou toujou bay rezilta yo nan konsole a (pou rezon devlopman) oswa ekri yo nan yon sijè (pou rezon pwodiksyon). Lè w ap ekri rezilta nan yon sijè, ou dwe sèvi ak yon konsomatè Kafka pou wè yo.

Lekti done ki soti nan sijè sa yo ka konsidere kòm yon kalite opinyon konkretize. Pou rezon nou yo, nou ka itilize definisyon an nan yon View materyalize soti nan Wikipedia: "... yon objè baz done fizik ki gen rezilta yo nan yon rechèch. Pou egzanp, li ta ka yon kopi lokal done aleka, oswa yon sou-ansanm nan ranje yo ak/oswa kolòn nan yon tablo oswa rezilta rantre yo, oswa yon tablo rezime yo jwenn nan agrégation" (https://en.wikipedia.org/wiki /Materyalize_view).

Kafka Streams tou pèmèt ou fè rechèch entèaktif sou magazen leta yo, sa ki pèmèt ou li dirèkteman opinyon materyalize sa yo. Li enpòtan sonje ke rechèch la nan magazen eta a se yon operasyon li sèlman. Sa a asire ke ou pa bezwen enkyete sou aksidantèlman fè eta enkonsistan pandan aplikasyon w lan ap trete done.

Kapasite a dirèkteman rechèch magazen eta a enpòtan. Sa vle di ke ou ka kreye aplikasyon pou tablodbò san yo pa bezwen premye chache done nan men konsomatè Kafka la. Li ogmante tou efikasite aplikasyon an, akòz lefèt ke pa gen okenn bezwen ekri done ankò:

  • gras ak lokalite done yo, yo ka jwenn aksè byen vit;
  • se duplication nan done elimine, paske li pa ekri nan depo ekstèn.

Bagay pwensipal lan mwen vle ou sonje se ke ou ka dirèkteman rechèch eta nan aplikasyon w lan. Opòtinite sa a ba ou yo pa ka egzajere. Olye pou w konsome done ki soti nan Kafka epi estoke dosye nan yon baz done pou aplikasyon an, ou ka fè rechèch sou magazen leta ak menm rezilta a. Rekèt dirèk nan magazen leta vle di mwens kòd (pa gen konsomatè) ak mwens lojisyèl (pa bezwen yon tab baz done pou estoke rezilta yo).

Nou te kouvri anpil nan chapit sa a, kidonk nou pral kite diskisyon nou an sou demann entèaktif kont magazen leta pou kounye a. Men, pa enkyete: nan Chapit 9, nou pral kreye yon senp aplikasyon tablodbò ak demann entèaktif. Li pral sèvi ak kèk nan egzanp sa a ak chapit anvan yo pou montre demann entèaktif ak fason ou ka ajoute yo nan aplikasyon Kafka Streams.

Rezime

  • Objè KStream reprezante kouran evènman, ki konparab ak foure nan yon baz done. Objè KTable reprezante kouran aktyalizasyon, plis tankou mizajou nan yon baz done. Gwosè objè KTable a pa grandi, ansyen dosye yo ranplase pa nouvo.
  • Objè KTable yo obligatwa pou operasyon agrégation.
  • Sèvi ak operasyon fenèt, ou ka divize done total nan bokit tan.
  • Mèsi a objè GlobalKTable, ou ka jwenn aksè nan done referans nenpòt kote nan aplikasyon an, kèlkeswa patisyon yo.
  • Koneksyon ant objè KStream, KTable ak GlobalKTable posib.

Jiskaprezan, nou te konsantre sou bati aplikasyon Kafka Streams lè l sèvi avèk KStream DSL wo nivo. Malgre ke apwòch wo nivo a pèmèt ou kreye pwogram pwòp ak kout, lè l sèvi avèk li reprezante yon echanj. Travay ak DSL KStream vle di ogmante presizyon nan kòd ou a pa diminye degre nan kontwòl. Nan pwochen chapit la, nou pral gade API a ki ba-nivo moun kap okipe yo epi eseye lòt konpwomi. Pwogram yo pral pi long pase yo te anvan, men nou yo pral kapab kreye prèske nenpòt ne jeran ke nou ta ka bezwen.

→ Ou ka jwenn plis detay sou liv la nan sit entènèt piblikatè a

→ Pou Habrozhiteli 25% rabè lè l sèvi avèk koupon - Kafka Streams

→ Lè w peye pou vèsyon papye liv la, yo pral voye yon liv elektwonik pa imel.

Sous: www.habr.com

Add nouvo kòmantè