Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem" Silav, niştecîhên Xabro! Ev pirtûk ji bo her pêşdebirkerê ku dixwaze pêvajoya mijarê fam bike maqûl e. Fêmkirina bernameya belavbûyî dê ji we re bibe alîkar ku hûn Kafka û Kafka Streams baştir fam bikin. Dê xweş be ku meriv çarçoveya Kafka bixwe nas bike, lê ev ne hewce ye: Ez ê her tiştê ku hûn hewce ne ji we re vebêjim. Pêşdebirên Kafka yên xwedî ezmûn û nûjen dê fêr bibin ka meriv çawa bi karanîna pirtûkxaneya Kafka Streams a di vê pirtûkê de sepanên pêvajoya tîrêjê ya balkêş biafirîne. Pêşdebirên Java-ya navîn û pêşkeftî ku jixwe bi têgehên mîna serialîzasyonê nas dikin dê fêr bibin ku jêhatîyên xwe bicîh bînin da ku sepanên Kafka Streams biafirînin. Koda çavkaniyê ya pirtûkê bi Java 8-ê hatî nivîsandin û girîngî ji hevoksaziya îfadeya lambda ya Java 8 bikar tîne, ji ber vê yekê zanibin meriv çawa bi fonksiyonên lambda re bixebite (tevî zimanek bernamenûsek din) dê bikêr be.

Excerpt. 5.3. Operasyonên komkirin û paceyê

Di vê beşê de, em ê li ser beşên herî hêvîdar ên Kafka Streams bigerin. Heta niha me van aliyên Kafka Streams vegirtiye:

  • afirandina topolojiya pêvajoyê;
  • bikaranîna dewletê di sepanên streaming;
  • pêkanîna peywendiyên herikîna daneyê;
  • ciyawaziyên di navbera herikên bûyerê (KStream) û herikên nûvekirinê (KTable).

Di mînakên jêrîn de em ê van hemû hêmanan bînin cem hev. Her weha hûn ê di derheqê pencereyê de, taybetmendiyek din a girîng a serîlêdanên streaming de jî fêr bibin. Mînaka meya yekem dê kombûnek hêsan be.

5.3.1. Berhevkirina firotana stokan ji hêla sektora pîşesaziyê ve

Kombûn û kombûn amûrên girîng in dema ku bi daneya vekêşanê re dixebitin. Vekolîna tomarên kesane yên ku têne wergirtin pir caran têrê nakin. Ji bo derxistina agahdariya zêde ji daneyan, pêdivî ye ku wan kom bikin û berhev bikin.

Di vê nimûneyê de, hûn ê cilê bazirganek rojane li xwe bikin ku hewce dike ku qebareya firotanê ya pargîdaniyên di gelek pîşesaziyê de bişopîne. Bi taybetî, hûn bala xwe didin pênc pargîdaniyên ku di her pîşesaziyê de firotana parvekirina herî mezin in.

Kombûna wusa dê çend gavên jêrîn hewce bike da ku daneyan bi forma xwestinê wergerîne (di axaftinên gelemperî de).

  1. Çavkaniyek-based mijarek biafirînin ku agahdariya bazirganiya pargîdaniya xav diweşîne. Em ê neçar bin ku neşeyek ji celebê StockTransaction li ser tiştek celeb ShareVolume nexşeyê bikin. Mesele ev e ku tişta StockTransaction metadata firotanê dihewîne, lê em tenê di derheqê hejmara parvekirinên têne firotin de hewceyê daneyê ne.
  2. Daneyên ShareVolume bi sembola stockê kom bikin. Piştî ku ji hêla sembolê ve têne kom kirin, hûn dikarin van daneyan di binhevokên cildên firotanê yên stock de hilweşînin. Hêjayî gotinê ye ku rêbaza KStream.groupBy mînakek ji celebê KGroupedStream vedigerîne. Û hûn dikarin mînakek KTable-ê bi gazîkirina rêbaza KGroupedStream.reduce bêtir bistînin.

Têkiliya KGroupedStream çi ye

Rêbazên KStream.groupBy û KStream.groupByKey mînakek KGroupedStream vedigerînin. KGroupedStream piştî komkirina bi kilîtan temsîla navbirî ya herikîna bûyeran e. Ew qet ji bo xebata rasterast bi wê re ne armanc e. Di şûna wê de, KGroupedStream ji bo operasyonên berhevkirinê tê bikar anîn, ku her gav di KTable de encam dide. Û ji ber ku encama operasyonên berhevkirinê KTable ye û ew firotgehek dewletê bikar tînin, mimkun e ku ne hemî nûvekirinên wekî encamek bêtir li boriyê têne şandin.

Rêbaza KTable.groupBy KGroupedTable-ya heman rengî vedigerîne - nûnertiyek navîn a herikîna nûvekirinê, ku ji hêla mifteyê ve ji nû ve hatî kom kirin.

Werin em demek kurt bisekinin û li Fig. 5.9, ku nîşan dide ku me çi bi dest xistiye. Divê ev topolojî jixwe ji we re pir nas be.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Ka em niha li koda vê topolojiyê binêrin (ew dikare di pelê src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java de were dîtin) (Listing 5.2).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Koda hatî dayîn ji hêla kurtbûna xwe û hejmûna mezin a çalakiyên ku di çend rêzan de têne kirin ve têne cûda kirin. Hûn dikarin di pîvana yekem a rêbaza builder.stream de tiştek nû bibînin: nirxek ji celebê enum AutoOffsetReset.EARLIEST (YÊ DAWÎ jî heye), bi karanîna rêbaza Consumed.withOffsetResetPolicy hatî danîn. Ev celebê hejmartinê dikare were bikar anîn da ku ji bo her KStream an KTable-ê stratejiyek ji nû vesazkirinê diyar bike û pêşî li vebijarka vesazkirina vesazkirinê ya ji veavakirinê digire.

GroupByKey û GroupBy

Têkiliya KStream ji bo komkirina tomaran du rêbaz hene: GroupByKey û GroupBy. Her du KGroupedTable vedigerin, ji ber vê yekê hûn dikarin bipirsin ka ferqa wan çi ye û kengê hûn kîjan bikar bînin?

Rêbaza GroupByKey dema ku bişkokên di KStream-ê de jixwe vala nebin tê bikar anîn. Û ya herî girîng, ala "ji nû ve dabeşkirinê hewce dike" qet nehat danîn.

Rêbaza GroupBy dihesibîne ku we bişkojkên komkirinê guhertiye, ji ber vê yekê ala ji nû ve dabeşkirinê wekî rast tê danîn. Piştî rêbaza GroupBy pêkanîna tevlêbûn, komkirin û hwd dê ji nû ve dabeşkirina otomatîkî encam bide.
Kurte: Kengî gengaz be, divê hûn ji GroupBy bêtir GroupByKey bikar bînin.

Eşkere ye ku rêbazên mapValues ​​û groupBy çi dikin, ji ber vê yekê em mêze bikin li rêbaza sum() (li src/main/java/bbejeck/model/ShareVolume.java tê dîtin) (Listing 5.3).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Rêbaza ShareVolume.sum serjimara berbelav a qebareya firotana stock vedigerîne, û encama tevahiya zincîra hesaban tiştek KTable e. . Naha hûn rola ku KTable dilîze fêm dikin. Dema ku tiştên ShareVolume digihîjin, tişta têkildar a KTable nûvekirina heyî ya herî dawî hildide. Girîng e ku ji bîr mekin ku hemî nûvekirin di parvekirina berêVolumeKTable de têne xuyang kirin, lê ne hemî bêtir têne şandin.

Dûv re em vê KTable-ê bikar tînin da ku berhev bikin (li gorî hêjmara hîseyên ku têne danûstendin) da ku bigihîjin pênc pargîdaniyên ku di her pîşesaziyê de hejmûna herî zêde parvekirî ne. Çalakiyên me yên di vê rewşê de dê mîna yên kombûna yekem bin.

  1. Operasyonek groupBy ya din bikin da ku tiştên ShareVolume yên ferdî li gorî pîşesaziyê kom bikin.
  2. Dest bi kurtkirina tiştên ShareVolume bikin. Vê carê tişta berhevkirinê rêzek pêşîn a bi pîvana sabît e. Di vê rêza sabît de, tenê pênc pargîdaniyên ku herî zêde hîseyên wan têne firotin têne girtin.
  3. Rêzên ji paragrafa berê li ser nirxek rêzixê nexşînin û pênc stokên herî bazirganî ji hêla hejmarê ve ji hêla pîşesaziyê ve vegerînin.
  4. Encaman bi forma rêzê ji mijarê re binivîsin.

Di Fig. Xiflteya 5.10 grafiya topolojiya herikîna daneyê nîşan dide. Wekî ku hûn dikarin bibînin, qonaxa duyemîn a pêvajoyê pir hêsan e.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Naha ku me têgihiştinek zelal a strukturê vê gera duyemîn a pêvajoyê heye, em dikarin berê xwe bidin koda çavkaniya wê (hûn ê wê di pelê de bibînin src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Lîsteya 5.4) .

Ev destpêker guhêrbarek fixedQueue dihewîne. Ev tişta xwerû ye ku ji bo java.util.TreeSet adapterek e ku ji bo şopandina encamên N-ya jorîn di rêza daketinê ya parveyên ku têne kirîn de tê bikar anîn.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
We berê bangên groupBy û mapValues'an dîtiye, ji ber vê yekê em ê neçin wan (em gazî rêbaza KTable.toStream dikin ji ber ku rêbaza KTable.print betal bûye). Lê we hêj guhertoya KTable ya aggregate() nedîtiye, ji ber vê yekê em ê demek hindik li ser nîqaş bikin.

Wekî ku tê bîra we, ya ku KTable-ê cûda dike ev e ku tomarên bi heman keys nûvekirin têne hesibandin. KTable têketina kevn bi yekî nû diguhezîne. Tevhevbûn bi heman rengî pêk tê: tomarên herî dawî yên bi heman mifteyê têne berhev kirin. Dema ku tomarek tê, ew bi karanîna zêdekerek (parametreya duyemîn di bangewaziya rêbaza hevgirtî de) li mînaka pola FixedSizePriorityQueue tê zêdekirin, lê heke tomarek din jixwe bi heman mifteyê hebe, wê hingê tomara kevn bi karanîna dakêşkerek tê rakirin (parametreya sêyemîn di gazîkirina rêbaza tevhev).

Hemî ev tê vê wateyê ku berhevkarê me, FixedSizePriorityQueue, hemî nirxan bi yek mifteyê berhev nake, lê berhevokek herikbar ji hejmarên N celebên herî bazirganî yên stokan hilîne. Her têketina gihîştî hejmara giştî ya parên ku heya nuha hatine firotin vedihewîne. KTable dê agahdarî bide we ku hîseyên kîjan pargîdaniyan niha herî zêde têne danûstendinê ne, bêyî ku hewce bike ku her nûvekirinek bihevrekêşî hebe.

Em hîn bûn ku du tiştên girîng bikin:

  • nirxên komê yên di KTable de ji hêla mifteyek hevpar ve;
  • li ser van nirxên komkirî operasyonên bikêr ên wekî berhevkirin û komkirin pêk bînin.

Fêrbûna çawaniya pêkanîna van operasyonan ji bo têgihîştina wateya daneya ku di serîlêdana Kafka Streams re derbas dibe û têgihîştina agahdariya ku ew hildigire girîng e.

Me jî hin têgehên sereke yên ku berê di vê pirtûkê de hatine nîqaş kirin anîne cem hev. Di Beşa 4-ê de, me nîqaş kir ka çiqas xelet-tolerans, dewleta herêmî ji bo serîlêdanek streaming girîng e. Mînaka yekem a di vê beşê de destnîşan kir ku çima dewleta herêmî ew qas girîng e - ew jêhatîbûnê dide we ku hûn agahdariya ku we berê dîtiye bişopînin. Gihîştina herêmî ji derengiya torê dûr dikeve, serîlêdanê performansa û xelet-berxwedêrtir dike.

Dema ku hûn operasyonek berhevkirinê an berhevkirinê pêk tînin, divê hûn navê firotgeha dewletê diyar bikin. Operasyonên berhevkirin û berhevkirinê mînakek KTable vedigerînin, û KTable hilanîna dewletê bikar tîne da ku encamên kevin bi yên nû biguhezîne. Wekî ku we dîtiye, ne hemî nûvekirin li ser boriyê têne şandin, û ev girîng e ji ber ku operasyonên berhevkirinê ji bo hilberîna agahdariya kurt têne çêkirin. Heke hûn dewleta herêmî bicîh neynin, KTable dê hemî encamên berhevkirin û berhevkirinê bişîne.

Dûv re, em ê li pêkanîna operasyonên wekî kombûnê di nav demek diyarkirî de binihêrin - bi navê operasyonên pencereyê.

5.3.2. Operasyonên pencereyê

Di beşa paşîn de, me tevlihevbûn û kombûnek şemitîn destnîşan kir. Serlêdan berhevokek domdar a firotana borsayê pêk anî û li dûv wê jî pênc stokên herî bazirganî li ser danûstendinê kom kirin.

Carinan kombûn û berhevkirina encamên weha domdar hewce ye. Û carinan hûn hewce ne ku operasyonan tenê di demek diyarkirî de bikin. Mînakî, hesab bikin ka di 10 hûrdemên paşîn de çend danûstendinên danûstendinê bi hîseyên pargîdaniyek taybetî re hatine kirin. An jî çend bikarhêneran di 15 hûrdemên paşîn de li ser pankartek reklamê ya nû bikirtînin. Dibe ku serîlêdanek gelek caran operasyonên weha pêk bîne, lê bi encamên ku tenê ji bo demên diyarkirî (paceyên demê) derbas dibin.

Ji hêla kirrûbirrê ve danûstandinên danûstendinê têne hesibandin

Di mînaka paşîn de, em ê danûstendinên stokê di nav gelek bazirganan de bişopînin - an rêxistinên mezin an jî fînanserên kesane yên zîrek.

Du sedemên gengaz ên vê şopandinê hene. Yek ji wan hewce ye ku meriv zanibe ka serokên bazarê çi dikirin/difroşin. Ger van lîstikvanên mezin û veberhênerên sofîstîke fersendê bibînin, maqûl e ku hûn stratejiya wan bişopînin. Sedema duyemîn ev e ku meriv her nîşanên mumkin ên bazirganiya navxweyî ya neqanûnî bibîne. Ji bo vê yekê, hûn ê hewce bikin ku pêwendiya pêlên firotanê yên mezin bi daxuyaniyên çapameniyê yên girîng re analîz bikin.

Şopandina bi vî rengî ji gavên jêrîn pêk tê:

  • afirandina çemek ji bo xwendina ji mijara danûstendinên stock;
  • komkirina tomarên hatinê ji hêla nasnameya kirrûbir û sembola stock. Gazîkirina rêbaza groupBy mînakek çîna KGroupedStream vedigerîne;
  • Rêbaza KGroupedStream.windowedBy, herikîna daneyê ya ku bi pencereya demê ve sînorkirî ye vedigerîne, ku destûrê dide berhevkirina paceyî. Li gorî celebê pencereyê, an TimeWindowedKStream an jî SessionWindowedKStream tê vegerandin;
  • jimartina danûstendinê ji bo operasyona berhevkirinê. Herikîna daneya paceyî diyar dike ka qeydek taybetî di vê hejmartinê de tê hesibandin an na;
  • nivîsandina encaman li ser mijarekê an derxistina wan li konsolê di dema pêşkeftinê de.

Topolojiya vê serîlêdanê hêsan e, lê wêneyek zelal a wê dê bibe alîkar. Ka em li Fig. 5.11.

Piştre, em ê li fonksiyona operasyonên pencereyê û koda têkildar binêrin.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"

Cureyên pencereyê

Di Kafka Streams de sê celeb pencereyan hene:

  • sessional;
  • "tumbling";
  • dişemitin/diçûn.

Kîjan yek hilbijêrin bi daxwazên karsaziya we ve girêdayî ye. Pencereyên şûştin û avêtinê bi demê re sînordar in, dema ku paceyên danişînê ji hêla çalakiya bikarhêner ve têne sînorkirin - dirêjahiya danişîn(an) tenê ji hêla bikarhêner ve tê destnîşankirin ka çiqas çalak e. Ya sereke ku divê were bîra me ev e ku hemî celebên pencereyê li ser bingeha tarîx / demjimêra navnîşan in, ne dema pergalê.

Piştre, em topolojiya xwe bi her cûreyên pencereyê re bicîh dikin. Koda bêkêmasî dê tenê di mînaka yekem de were dayîn; ji bo celebên din ên pencereyê ji bilî celebê xebata pencereyê tiştek nayê guhertin.

Paceyên rûniştinê

Paceyên rûniştinê ji hemî celebên pencereyên din pir cûda ne. Ew ne ew qas ji hêla demê ve, lê ji hêla çalakiya bikarhêner (an jî çalakiya saziya ku hûn dixwazin bişopînin) ve têne sînorkirin. Pencereyên danişînê ji hêla demên bêçalaktiyê ve têne veqetandin.

Xiflteya 5.12 têgeha paceyên danişînê nîşan dide. Danişîna piçûk dê bi rûniştina li milê wê yê çepê re bibe yek. Û danişîna li rastê dê veqetandî be ji ber ku ew demek dirêj neçalakiyê dişopîne. Paceyên danişînê li ser bingeha çalakiya bikarhêner in, lê ji navnîşan mohra tarîx/demê bikar tînin da ku diyar bikin ku navnîş ji kîjan danişînê ye.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"

Bikaranîna paceyên rûniştinê ji bo şopandina danûstendinên stock

Werin em paceyên danişînê bikar bînin da ku agahdariya li ser danûstendinên danûstendinê bigirin. Pêkanîna paceyên danişînê di Lîsteya 5.5 de (ku dikare li src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java were dîtin) tê xuyang kirin.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
We berê piraniya operasyonên di vê topolojiyê de dîtiye, ji ber vê yekê ne hewce ye ku hûn li vir dîsa wan binirxînin. Lê li vir çend hêmanên nû jî hene, ku em ê niha behsa wan bikin.

Her operasyonek groupBy bi gelemperî cûreyek operasyona berhevkirinê (hevkirin, berhevkirin, an hejmartin) pêk tîne. Hûn dikarin bi tevheviyek birêkûpêk, an berhevkirina paceyê, ku tomarên di nav pencereyek demkî diyarkirî de hesab dike, berhevkirina kumulatîf pêk bînin.

Koda di Lîsteya 5.5 de hejmara danûstendinên di paceyên danişînê de dihejmêre. Di Fig. 5.13 ev kiryar gav bi gav têne analîz kirin.

Bi gazîkirina windowedBy(SessionWindows.with(twentySeconds).until(panzdehMinutes)) em pencereyek danişînê bi navbera neçalaktiyê 20 saniye û navberek domdar 15 hûrdeman diafirînin. Navberek bêkar a 20 saniyeyan tê vê wateyê ku serîlêdan dê her têketina ku di nav 20 saniyeyan de ji bidawîhatin an destpêka danişîna heyî de bigihîje danişîna heyî (çalak).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Dûv re, em destnîşan dikin ka kîjan operasyona berhevkirinê divê di pencereya danişînê de were kirin - di vê rewşê de, hejmartin. Ger têketinek li derveyî pencereya bêçalaktiyê derkeve (her aliyek mohra tarîx/demê), serîlêdan danişînek nû diafirîne. Navbera ragirtinê tê wateya domandina danişînê ji bo demek diyarkirî û destûrê dide daneyên dereng ên ku ji heyama neçalaktiyê ya danişînê dirêj dibe lê dîsa jî dikare were girêdan. Digel vê yekê, destpêk û dawiya danişîna nû ya ku ji hevgirtinê derketiye, bi tarîx/dema herî zû û herî dawî re têkildar e.

Ka em li çend navnîşan ji rêbaza hejmartinê binêrin da ku bibînin ka danişîn çawa dixebitin (Tablo 5.1).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Dema ku tomar digihîjin, em bi heman mifteyê li danişînên heyî digerin, dema dawîn ji mohra tarîx/dema heyî kêmtir - navbera bêçalaktiyê, û demek destpêkê ji mohra tarîx/demjimêra heyî + navbera bêçalaktiyê mezintir e. Li gorî vê yekê, çar navnîşan ji tabloyê. 5.1 di yek danişînê de wekî jêrîn têne yek kirin.

1. Qeyda 1 pêşî tê, ji ber vê yekê dema destpêkê bi dema dawîyê re wekhev e û 00:00:00 e.

2. Paşê, têketina 2 tê, û em li danişînên ku ne zûtir ji 23:59:55 bi dawî dibin û ne dereng ji 00:00:35 dest pê dikin digerin. Em qeyda 1-ê dibînin û danişînên 1 û 2 li hev dikin. Em dema destpêkirina danişîna 1 (berê) û dema dawiya danişîna 2 (paşê) digirin, da ku rûniştina meya nû di 00:00:00 de dest pê bike û di 00 de biqede: 00:15.

3. Qeyda 3 tê, em li danişînan di navbera 00:00:30 û 00:01:10 de digerin û tu yekê nabînin. Ji bo mifteya 123-345-654, FFBE danişîna duyemîn zêde bikin, ku di 00:00:50 de dest pê dike û diqede.

4. Qeyda 4 tê û em li danişînan di navbera 23:59:45 û 00:00:25 de digerin. Vê carê her du danişînên 1 û 2 têne dîtin. Her sê danişîn di yek de têne berhev kirin, bi demjimêra destpêkê 00:00:00 û dema bidawîbûnê 00:00:15.

Ji tiştê ku di vê beşê de tê rave kirin, hêja ye ku nuansên girîng ên jêrîn bi bîr bînin:

  • danişînên paceyên sabît-size ne. Demjimêra danişînê ji hêla çalakiyê ve di nav demek diyarkirî de tê destnîşankirin;
  • Di daneyan de mohra tarîx/dem diyar dike ka bûyer dikeve nav danişînek heyî an di heyamek bêkar de.

Dûv re em ê li ser celebê paceya paşîn - pencereyên "teqandin" nîqaş bikin.

Pencereyên "hilweşîn".

Pencereyên ku diherikin bûyerên ku di nav demek diyarkirî de ne digirin. Bifikirin ku hûn hewce ne ku her 20 saniyan de hemî danûstendinên pargîdaniyek diyarkirî bigirin, ji ber vê yekê hûn hemî bûyeran di wê heyamê de berhev dikin. Di dawiya navbera 20 çirke de, pencere dizivire û diçe navberek nû ya çavdêriya 20 saniyeyî. Xiflteya 5.14 vê rewşê nîşan dide.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Wekî ku hûn dikarin bibînin, hemî bûyerên ku di 20 saniyeyên paşîn de hatine wergirtin di pencereyê de cih digirin. Di dawiya vê heyamê de, pencereyek nû tê afirandin.

Lîsteya 5.6 kodê nîşan dide ku karanîna pencereyên şikestî ji bo girtina danûstendinên stokê her 20 çirkeyan nîşan dide (li src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java tê dîtin).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Bi vê guheztina piçûk a bangewaziya rêbazê TimeWindows.of, hûn dikarin pencereyek tîrêjê bikar bînin. Ev mînak rêbaza heta() nagere, ji ber vê yekê navbera ragirtina xwerû ya 24 demjimêran dê were bikar anîn.

Di dawiyê de, ew dem e ku em biçin ser vebijarkên paceyê yên paşîn - paceyên "hopping".

Pencereyên diherikin ("bazirin").

Pencereyên şemitî/hilweşînî dişibin pencereyên tîrêjê, lê bi ferqek sivik. Pencereyên xêzkirî li benda dawiya navberê namînin berî ku pencereyek nû biafirînin da ku bûyerên vê dawiyê bişopînin. Ew piştî navberek li bendê ji dema paceyê kêmtir dest bi hesabên nû dikin.

Ji bo ronîkirina cûdahiyên di navbera pencereyên şûştin û bazdanê de, em vegerin ser mînaka jimartina danûstendinên borsayê. Armanca me hîn jî ev e ku em hejmara danûstendinan bijmêrin, lê em naxwazin berî nûvekirina hejmarê li benda tevahî demê bisekinin. Di şûna wê de, em ê hejmarê di navberên kurt de nûve bikin. Mînakî, em ê dîsa jî her 20 saniyeyan hejmara danûstendinan bijmêrin, lê wekî ku di Hêjîrê de tê xuyang kirin her 5 çirkeyan jimarvan nûve bikin. 5.15. Di vê rewşê de, em bi sê pencereyên encamê yên bi daneya hevgirtî re diqedin.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Lîsteya 5.7 koda ji bo pênasekirina pencereyên xêzkirî nîşan dide (li src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java tê dîtin).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Pencereka gemarî dikare bi lêzêdekirina bangekê li rêbaza advanceBy() veguhezîne pencereyek bazdanê. Di mînaka nîşankirî de, navbera tomarkirinê 15 hûrdem e.

We di vê beşê de dît ku meriv çawa encamên berhevkirinê li pencereyên demê sînordar dike. Bi taybetî, ez dixwazim ku hûn sê tiştên jêrîn ji vê beşê bi bîr bînin:

  • mezinahiya paceyên danişînê ne ji hêla dema demê ve, lê ji hêla çalakiya bikarhêner ve sînorkirî ye;
  • Pencereyên "hilweşîn" di nav demek diyarkirî de nêrînek giştî ya bûyeran peyda dikin;
  • Demjimêra paceyên hilkişînê sabît e, lê ew pir caran têne nûve kirin û dibe ku di hemî pencereyan de navnîşên hevgirtî hebin.

Dûv re, em ê fêr bibin ka meriv çawa ji bo girêdanê KTable vegere KStream-ê.

5.3.3. Girêdana tiştên KStream û KTable

Di Beşa 4 de, me li ser girêdana du tiştên KStream nîqaş kir. Naha divê em fêr bibin ka meriv çawa KTable û KStream girêdide. Ev dibe ku ji ber sedemek hêsan a jêrîn hewce bike. KStream çemek tomaran e, û KTable jî çemek nûvekirina tomaran e, lê carinan dibe ku hûn bixwazin ku bi karanîna nûvekirinên ji KTable re çarçoveyek din li herika tomarê zêde bikin.

Werin em daneyan li ser hejmara danûstendinên borsayê bigirin û wan bi nûçeyên borsayê yên ji bo pîşesaziyên têkildar re bikin yek. Li vir tiştê ku hûn hewce ne bikin ku hûn bigihîjin vê koda ku we berê heye.

  1. Tiştek KTable ya bi daneya li ser hejmara danûstendinên pargîdanî veguhezînin KStream, li dûv wê mifteyê bi mifteya ku sektora pîşesaziyê ya ku bi vê sembola stokê re têkildar nîşan dide veguhezînin.
  2. Tiştek KTable biafirînin ku daneyên ji mijarek bi nûçeyên borsayê dixwîne. Ev KTable-ya nû dê ji hêla sektora pîşesaziyê ve were kategorîze kirin.
  3. Nûvekirinên nûçeyan bi agahdariya li ser hejmara danûstendinên borsayê ji hêla sektora pîşesaziyê ve girêdin.

Naha em bibînin ka meriv çawa vê plana çalakiyê bicîh tîne.

KTable veguherînin KStream

Ji bo veguhertina KTable bo KStream hûn hewce ne ku jêrîn bikin.

  1. Gazî rêbaza KTable.toStream() bikin.
  2. Bi gazîkirina rêbaza KStream.map, mifteyê bi navê pîşesaziyê biguhezînin, û dûv re jî tiştê TransactionSummary ji mînaka Windowed bistînin.

Em ê van operasyonan bi vî rengî bi hev re zencîre bikin (kodê di pelê de src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java dikare were dîtin) (Listing 5.8).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Ji ber ku em operasyonek KStream.map pêk tînin, mînaka KStream-a vegerî dema ku di pêwendiyekê de tê bikar anîn bixwe ji nû ve tê dabeş kirin.

Me pêvajoya veguheztinê qedand, paşê em hewce ne ku ji bo xwendina nûçeyên pargîdaniyê tiştek KTable biafirînin.

Afirandina KTable ji bo nûçeyên stock

Xwezî, çêkirina hêmanek KTable tenê rêzek kodê digire (kod dikare li src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java were dîtin) (Listing 5.9).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Hêjayî gotinê ye ku tu tiştên Serde ne hewce ne ku bêne diyar kirin, ji ber ku di mîhengan de Serdeyên rêzan têne bikar anîn. Di heman demê de, bi karanîna jimartina EARLIYÊ, tablo di destpêkê de bi tomaran tê dagirtin.

Naha em dikarin derbasî qonaxa dawîn bibin - girêdan.

Girêdana nûvekirina nûçeyan bi daneyên hejmartina danûstendinê re

Çêkirina pêwendiyê ne zehmet e. Em ê tevlêbûnek çepê bikar bînin heke ji bo pîşesaziya têkildar nûçeyek stokê tune be (koda pêwîst dikare di pelê de src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java were dîtin) (Lîsteya 5.10).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Ev operatora leftJoin pir hêsan e. Berevajî tevlêbûnên di Beşa 4-ê de, rêbaza JoinWindow nayê bikar anîn ji ber ku dema ku tevlêbûnek KStream-KTable pêk tê, ji bo her keyek di KTable de tenê yek têketinek heye. Têkiliyek wusa di wextê de ne sînorkirî ye: tomar an di KTable de ye an jî tune ye. Encama sereke: bi karanîna tiştên KTable hûn dikarin KStream bi daneyên referansê yên kêm caran nûvekirî dewlemend bikin.

Naha em ê li rêyek bikêrtir ji bo dewlemendkirina bûyerên ji KStream binihêrin.

5.3.4. Tiştên GlobalKTable

Wekî ku hûn dikarin bibînin, hewcedarî bi dewlemendkirina rûdanên bûyeran an lê zêdekirina çarçoveyek li wan heye. Di Beşa 4 de we girêdanên di navbera du tiştên KStream de, û di beşa berê de we girêdana di navbera KStream û KTable de dît. Di van hemî rewşan de, pêdivî ye ku dema ku bişkojan bi celebek an nirxek nû ve nexşe bikin, pêlava daneyê ji nû ve dabeş bikin. Carinan dabeşkirin bi eşkere tê kirin, û carinan jî Kafka Streams wê bixweber dike. Dabeşkirin ji nû ve pêdivî ye ji ber ku bişkok hatine guhertin û tomar divê di beşên nû de biqedin, wekî din dê girêdan ne mumkun be (ev di Beşa 4-ê de, di beşa "Daneyên ji nû ve dabeşkirin" de di binbeşa 4.2.4 de hate nîqaş kirin).

Ji nû ve dabeşkirin lêçûnek heye

Ji nû ve dabeşkirin lêçûn hewce dike - lêçûnên çavkaniyê yên zêde ji bo afirandina mijarên navîn, hilanîna daneyên dubare di mijarek din de; ew jî tê wateya zêdebûna derengmayîna ji ber nivîsandin û xwendina ji vê mijarê. Wekî din, heke hûn hewce ne ku ji zêdetirî yek alî an pîvanek tevlê bibin, divê hûn zencîreyan girêdin, tomaran bi bişkojkên nû nexşînin, û pêvajoya ji nû ve dabeşkirinê ji nû ve bimeşînin.

Girêdana bi daneyên piçûktir

Di hin rewşan de, qebareya daneyên referansê yên ku werin girêdan bi nisbet piçûk e, ji ber vê yekê kopiyên wê yên bêkêmasî dikarin bi hêsanî li ser her girêkek herêmî bi cih bibin. Ji bo rewşên bi vî rengî, Kafka Streams çîna GlobalKTable peyda dike.

Mînakên GlobalKTable yekta ne ji ber ku serîlêdan hemî daneyan li her yek ji girêkan dubare dike. Û ji ber ku hemî dane li ser her girêkek heye, ne hewce ye ku hûn herikîna bûyerê bi mifteya daneya referansê ve parve bikin da ku ew ji hemî dabeşan re peyda bibe. Her weha hûn dikarin bi karanîna tiştên GlobalKTable ve girêdanên bê key. Ka em vegerin ser yek ji mînakên berê da ku vê taybetmendiyê nîşan bidin.

Girêdana tiştên KStream bi tiştên GlobalKTable re

Di binbeşa 5.3.2 de, me berhevkirina pencereya danûstendinên danûstendinê ji hêla kirrûbiran ve pêk anî. Encamên vê kombûnê bi vî rengî xuya bûn:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Dema ku van encaman ji mebestê re xizmet kir, heke navê xerîdar û navê pargîdaniya tevahî jî were xuyang kirin dê bikêrtir bûya. Ji bo ku navê xerîdar û navê pargîdaniyê lê zêde bikin, hûn dikarin tevlêbûna normal bikin, lê hûn ê hewce bikin ku du nexşeyên sereke û ji nû ve dabeşkirinê bikin. Bi GlobalKTable re hûn dikarin ji lêçûna operasyonên weha dûr bixin.

Ji bo vê yekê, em ê tişta countStream ji Lîsteya 5.11 bikar bînin (koda têkildar dikare li src/main/java/bbejeck/chapter_5/GlobalKTableExample.java were dîtin) û wê bi du tiştên GlobalKTable ve girêdin.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Me berê jî li ser vê yekê nîqaş kir, ji ber vê yekê ez ê dubare nekim. Lê ez bala xwe didim ku koda di fonksiyona toStream().map de ji bo xwendinê li şûna bêjeyek lambda ya hundurîn, di fonksiyonek fonksiyonê de tê berhev kirin.

Pêngava paşîn ev e ku du mînakên GlobalKTable were ragihandin (kodê ku tê xuyang kirin dikare di pelê de src/main/java/bbejeck/chapter_5/GlobalKTableExample.java were dîtin) (Listing 5.12).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"

Ji kerema xwe not bikin ku navên mijaran bi karanîna celebên hejmartî têne diyar kirin.

Naha ku me hemî pêkhate amade ne, ya ku dimîne ev e ku em kodê ji bo girêdanê binivîsin (ku di pelê src/main/java/bbejeck/chapter_5/GlobalKTableExample.java de tê dîtin) (Listing 5.13).

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Her çend di vê kodê de du girêdan hebin jî, ew bi zincîran têne girêdan ji ber ku yek ji encamên wan ji hev cuda nayê bikar anîn. Encam di dawiya tevahiya operasyonê de têne xuyang kirin.

Dema ku hûn operasyona tevlêbûna jorîn dimeşînin, hûn ê encamên weha bistînin:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Esas neguheriye, lê ev encam zelaltir xuya dikin.

Ger hûn ji Beşa 4-an re jimartin, we berê çend celeb girêdan di çalakiyê de dîtine. Ew di tabloyê de têne navnîş kirin. 5.2. Ev tablo kapasîteyên girêdanê yên wekî guhertoya 1.0.0 ya Kafka Streams nîşan dide; Dibe ku tiştek di weşanên pêşerojê de biguhere.

Pirtûka "Kafka Streams Di çalakiyê de. Serlêdan û mîkroxizmetên ji bo xebata rast-dem"
Ji bo ku tiştan biqedînin, werin em bingehên xwe ji nû ve bişopînin: hûn dikarin bi karanîna dewleta herêmî veguhên bûyerê (KStream) ve girêdin û herikên (KTable) nûve bikin. Wekî din, heke mezinahiya daneyên referansê ne pir mezin be, hûn dikarin objeya GlobalKTable bikar bînin. GlobalKTables hemî dabeşan li her girêka serîlêdana Kafka Streams dubare dike, piştrast dike ku hemî dane berdest in bêyî ku kilît bi kîjan dabeşkirinê re têkildar be.

Dûv re em ê taybetmendiya Kafka Streams bibînin, bi saya wê em dikarin guheztinên dewletê bişopînin bêyî ku daneyên ji mijarek Kafka bixwin.

5.3.5. Dewleteke bipirse

Me berê gelek operasyonên ku bi dewletê re têkildar in pêk aniye û her gav encaman ji konsolê re derdixe (ji bo mebestên pêşkeftinê) an jî wan li ser mijarekê dinivîse (ji bo mebestên hilberînê). Dema ku encaman li ser mijarekê dinivîsin, divê hûn xerîdarek Kafka bikar bînin da ku wan bibînin.

Xwendina daneyên ji van mijaran dikare wekî celebek dîtinên materyalî were hesibandin. Ji bo mebestên xwe, em dikarin pênaseya dîtina maddî ya ji Wîkîpediyayê bikar bînin: “...objektek databasa fizîkî ya ku encamên pirsekê vedihewîne. Mînakî, ew dibe ku kopiyek herêmî ya daneyên dûr, an binkeyek rêz û/an stûnên tabloyek an encamên tevlêbûnê be, an tabloyek kurt a ku bi berhevkirinê hatî peyda kirin” (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams di heman demê de dihêle hûn pirsên înteraktîf li ser firotgehên dewletê bimeşînin, bihêle hûn rasterast van dîtinên maddî bixwînin. Girîng e ku bala xwe bidinê ku lêpirsîna ji firotgeha dewletê re operasyonek tenê-xwendin e. Ev piştrast dike ku hûn ne hewce ne ku hûn di dema ku serîlêdana we daneyan hildiberîne bi xeletî nakokiya dewletê bikin.

Qabiliyeta ku meriv rasterast li firotgehên dewletê bipirse girîng e. Ev tê vê wateyê ku hûn dikarin serîlêdanên dashboardê biafirînin bêyî ku hûn pêşî daneyan ji xerîdarê Kafka bistînin. Di heman demê de ew karbidestiya serîlêdanê jî zêde dike, ji ber ku ne hewce ye ku ji nû ve daneyan binivîsin:

  • bi saya cîhê daneyan, ew dikarin zû bigihîjin wan;
  • dubarekirina daneyan ji holê tê rakirin, ji ber ku ew li hilanîna derveyî nayê nivîsandin.

Tişta sereke ku ez dixwazim ku hûn ji bîr nekin ev e ku hûn dikarin rasterast ji hundurê serlêdana xwe li dewletê bipirsin. Derfetên ku ev dide we nayê zêde kirin. Li şûna ku hûn daneyên ji Kafka bixwin û tomarên di danegehek ji bo serîlêdanê de hilînin, hûn dikarin bi heman encamê ji firotgehên dewletê bipirsin. Pirsên rasterast ji firotgehên dewletê re tê wateya kêm kod (bê xerîdar) û kêm nermalava (ne hewce ye ku tabloyek databasê hebe ku encaman hilîne).

Me di vê beşê de pir zemîn girtiye, ji ber vê yekê em ê nîqaşa xwe ya li ser pirsên înteraktîf ên li dijî firotgehên dewletê ji bo nuha bihêlin. Lê xem neke: di Beşa 9-ê de, em ê bi pirsên înteraktîf serîlêdanek dashboardek hêsan biafirînin. Ew ê hin mînakên ji vê û beşên berê bikar bîne da ku pirsên înteraktîf nîşan bide û ka hûn çawa dikarin wan li serîlêdanên Kafka Streams zêde bikin.

Nîqaş

  • Tiştên KStream herikên bûyeran temsîl dikin, ku bi têketina databasê re têne berhev kirin. Tiştên KTable nûvekirinên nûvekirinê temsîl dikin, bêtir wekî nûvekirina databasê. Mezinahiya objeya KTable mezin nabe, tomarên kevn bi yên nû têne guheztin.
  • Tiştên KTable ji bo operasyonên komkirinê hewce ne.
  • Bi karanîna operasyonên pencereyê, hûn dikarin daneya berhevkirî li kepçeyên demê veqetînin.
  • Bi saya tiştên GlobalKTable, hûn dikarin bigihîjin daneyên referansê li her cîhê serîlêdanê, bêyî dabeşkirinê.
  • Têkiliyên di navbera tiştên KStream, KTable û GlobalKTable de gengaz in.

Heya nuha, me bala xwe da ser avakirina sepanên Kafka Streams bi karanîna KStream DSL-a asta bilind. Her çend nêzîkatiya asta bilind destûrê dide we ku hûn bernameyên birêkûpêk û kurt biafirînin, lê karanîna wê bazirganiyek nîşan dide. Karkirina bi DSL KStream re tê vê wateyê ku bi kêmkirina asta kontrolê re kurtbûna koda xwe zêde dike. Di beşa pêş de, em ê li API-ya girêk-a-asta nizm binêrin û bazirganiyên din biceribînin. Dê bername ji yên berê dirêjtir bin, lê em ê karibin hema hema her girêk hilgirê ku dibe ku ji me re hewce be biafirînin.

→ Agahiyên bêtir li ser pirtûkê dikarin li vir bibînin malpera weşanger

→ Ji bo Habrozhiteli 25% erzanî bi karanîna kuponê - Kafka Streams

→ Piştî dayina guhertoya kaxezê ya pirtûkê, dê pirtûkek elektronîkî bi e-nameyê were şandin.

Source: www.habr.com

Add a comment