Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo" Ndewo, ndị bi na Khabro! Akwụkwọ a dabara adaba maka onye nrụpụta ọ bụla chọrọ ịghọta nhazi eri. Ịghọta mmemme kesara ga-enyere gị aka ịghọta Kafka na Kafka iyi. Ọ ga-adị mma ịmara usoro nke Kafka n'onwe ya, mana nke a adịghị mkpa: M ga-agwa gị ihe niile ị chọrọ. Ndị mmepe Kafka nwere ahụmahụ na ndị novice ga-amụta otu esi emepụta ngwa nhazi iyi na-atọ ụtọ site na iji ọba akwụkwọ Kafka Streams dị n'akwụkwọ a. Ndị mmepe Java dị n'etiti na nke dị elu amaralarị echiche dị ka serialization ga-amụta itinye nka ha iji mepụta ngwa Kafka Streams. Edere koodu isi mmalite akwụkwọ ahụ na Java 8 ma jiri Java 8 lambda okwu syntax mee ihe nke ọma, yabụ ịmara ka esi arụ ọrụ na ọrụ lambda (ọbụlagodi n'asụsụ mmemme ọzọ) ga-aba uru.

Mpempe akwụkwọ. 5.3. Ọrụ nchịkọta na windowing

N'akụkụ a, anyị ga-aga n'ihu inyocha akụkụ iyi Kafka kacha mma. Ka ọ dị ugbu a, anyị ekpuchila akụkụ ndị a nke iyi iyi Kafka:

  • ịmepụta topology nhazi;
  • iji steeti na gụgharia ngwa;
  • na-arụ ọrụ njikọ data iyi;
  • ọdịiche dị n'etiti iyi mmemme (KStream) na iyi mmelite (KTable).

N'ihe atụ ndị a, anyị ga-ejikọta ihe ndị a niile ọnụ. Ị ga-amụtakwa gbasara ime windo, akụkụ ọzọ dị mma nke ngwa mgbasa ozi. Ihe atụ mbụ anyị ga-abụ nchịkọta dị mfe.

5.3.1. Mkpokọta nke ngwaahịa ahịa site na ngalaba ụlọ ọrụ

Nchịkọta na nchịkọta bụ ngwa ọrụ dị mkpa mgbe ị na-arụ ọrụ na data nkwanye. Nyocha nke ndekọ nke onye ọ bụla ka a na-enweta ya anaghị ezughị oke. Iji wepụta ozi ndị ọzọ na data, ọ dị mkpa ijikọta ma jikọta ha.

N'ihe atụ a, ị ga-eyiri uwe nke onye na-ere ahịa ụbọchị nke kwesịrị iji nyochaa oke ahịa nke ebuka nke ụlọ ọrụ na ọtụtụ ụlọ ọrụ. Kpọmkwem, ị nwere mmasị na ụlọ ọrụ ise nwere oke ahịa ahịa na ụlọ ọrụ ọ bụla.

Nchịkọta dị otú ahụ ga-achọ usoro dị iche iche ndị a iji tụgharịa data n'ụdị achọrọ (na-ekwu okwu n'ozuzu).

  1. Mepụta isi iyi dabere na isiokwu nke na-ebipụta ozi azụmaahịa ngwaahịa ngwaahịa. Anyị ga-edepụta ihe ụdị StockTransaction na ihe ụdị ShareVolume. Isi ihe bụ na ihe StockTransaction nwere metadata ahịa, mana naanị anyị chọrọ data gbasara ọnụọgụ mbak a na-ere.
  2. Otu ShareVolume data site na akara ngwaahịa. Ozugbo ejiri akara chịkọta ya, ị nwere ike daa data a n'ime mkpokọta mkpokọta ọnụ ahịa ngwaahịa. Ọ dị mma ịmara na usoro KStream.groupBy na-eweghachi ihe atụ nke ụdị KGroupedStream. Ị nwere ike nweta ihe atụ KTable site n'ịkpọba usoro KGroupedStream.reduce.

Kedu ihe bụ interface KGroupedStream

Ụzọ KStream.groupBy na KStream.groupByKey na-eweghachi ihe atụ nke KGroupedStream. KGroupedStream bụ ihe nnọchite anya n'etiti ọtụtụ mmemme mgbe ejiri igodo chịkọtachara. Ebughị ya ma ọlị maka ọrụ ya na ya. Kama, a na-eji KGroupedStream arụ ọrụ mkpokọta, nke na-ebute KTable mgbe niile. Ma ebe ọ bụ na nsonaazụ nke mkpokọta ọrụ bụ KTable na ha na-eji ụlọ ahịa steeti, ọ ga-ekwe omume na ọ bụghị mmelite niile ka a na-ezigara n'ihu na pipeline.

Usoro KTable.groupBy na-eweghachite KGroupedTable yiri ya - ihe nnochite anya n'etiti iyi mmelite, ejiri igodo chịkọtakwara.

Ka anyị were obere oge ezumike wee lelee fig. 5.9, nke na-egosi ihe anyị rụzuru. Topology a kwesịrị ịmara gị nke ọma.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ka anyị leba anya na koodu maka topology a (enwere ike ịchọta ya na faịlụ src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Ndepụta 5.2).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
A na-amata koodu enyere site na nkenke ya na nnukwu olu omume emere n'ọtụtụ ahịrị. Ị nwere ike ịhụ ihe ọhụrụ na paramita mbụ nke usoro builder.stream: uru nke enum ụdị AutoOffsetReset.EARLIEST (enwekwara nke ikpeazụ), tọọ site na iji usoro Consumed.withOffsetResetPolicy. Enwere ike iji ụdị ngụkọ a iji kọwapụta atụmatụ nrụpụta nrụpụta maka KStream ma ọ bụ KTable ọ bụla wee buru ụzọ karịa nhọrọ nrụpụta nrụpụta site na nhazi.

GroupByKey na GroupBy

Ihe ngosi KStream nwere ụzọ abụọ maka ịchịkọta ndekọ: GroupByKey na GroupBy. Ha abụọ na-eweghachite KGroupedTable, yabụ ị nwere ike ịnọ na-eche ihe dị iche n'etiti ha na mgbe ị ga-eji nke?

A na-eji usoro GroupByKey mgbe igodo dị na KStream adịlarị efu. Na nke kachasị mkpa, ọkọlọtọ "chọrọ nhazigharị ọzọ" edobebeghị ọkọlọtọ.

Usoro GroupBy na-eche na ị gbanweela igodo nchịkọta, ya mere a na-edozi ọkọlọtọ nkewa ka ọ bụrụ eziokwu. Ịrụ njikọ, nchịkọta, wdg mgbe usoro GroupBy ga-eme ka ọ bụrụ nkewa na-akpaghị aka.
Nchịkọta: Mgbe ọ bụla enwere ike, ịkwesịrị iji GroupByKey karịa GroupBy.

O doro anya ihe mapValues ​​& GroupSite na-eme, yabụ ka anyị lelee usoro nchikota (nke dị na src/main/java/bbejeck/model/ShareVolume.java) (Ndepụta 5.3).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Usoro ShareVolume.sum na-eweghachi mkpokọta ọsọ nke oke ahịa ngwaahịa, yana nsonaazụ nke usoro mgbako niile bụ ihe KTable. . Ugbu a ị ghọtara ọrụ KTable na-arụ. Mgbe ihe ShareVolume rutere, ihe KTable kwekọrọ na-echekwa mmelite kachasị ọhụrụ. Ọ dị mkpa icheta na mmelite niile na-egosipụta na shareVolumeKTable gara aga, mana ọ bụghị ha niile ka ezigara n'ihu.

Anyị na-eji KTable a chịkọta (site na ọnụ ọgụgụ nke mbak ahịa) iji rute na ụlọ ọrụ ise nwere ọnụ ọgụgụ kachasị elu nke ahịa na ụlọ ọrụ ọ bụla. Omume anyị na nke a ga-adị ka nke mbụ nchịkọta.

  1. Mee otu ọzọSite n'ọrụ ịchịkọta ihe ShareVolume n'otu n'otu site na ụlọ ọrụ.
  2. Malite ichikota ihe ShareVolume. N'oge a, ihe nchịkọta bụ kwụ n'ahịrị kacha mkpa. Na kwụ n'ahịrị nha a, ọ bụ naanị ụlọ ọrụ ise nwere oke oke ahịa ere ka edobere.
  3. Map ndị kwụ n'ahịrị site na paragraf gara aga ka ọ bụrụ uru eriri wee weghachi ebuka ise kacha ere ahịa site na nọmba site na ụlọ ọrụ.
  4. Dee nsonaazụ ya n'ụdị eriri na isiokwu.

Na fig. Ọgụgụ 5.10 na-egosi eserese data eruba topology. Dị ka ị pụrụ ịhụ, usoro nhazi nke abụọ dị nnọọ mfe.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ugbu a anyị nwere nghọta doro anya banyere nhazi nke nhazi nke abụọ a, anyị nwere ike ịtụgharị na koodu isi ya (ị ga-ahụ ya na faịlụ src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Ndepụta 5.4) .

Ihe mmalite a nwere mgbanwe Queue a kapịrị ọnụ. Nke a bụ ihe omenala bụ ihe nkwụnye maka java.util.TreeSet nke a na-eji soro nsonaazụ N kachasị elu na-agbada n'usoro ahịa ahịa.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ị hụlarị ọkpụkpọ otuBy na mapValue, yabụ anyị agaghị abanye na ndị ahụ (anyị na-akpọ usoro KTable.toStream n'ihi na usoro KTable.print kwụsịrị). Mana ị hụbeghị ụdị mkpokọta KTable, yabụ anyị ga-etinye obere oge na-ekwurịta nke ahụ.

Dị ka ị na-echeta, ihe na-eme ka KTable dị iche bụ na a na-ewere ndekọ nwere otu igodo dị ka mmelite. KTable jiri nke ọhụrụ dochie ntinye ochie. Nchịkọta na-eme n'otu ụzọ ahụ: a na-ejikọta ihe ndekọ ọhụrụ nwere otu igodo ahụ. Mgbe ndekọ bịarutere, a na-agbakwunye ya na klas FixedSizePriorityQueue site na iji adder (nkeji nke abụọ na usoro nchịkọta mkpokọta), mana ọ bụrụ na ndekọ ọzọ adịlarị na otu igodo ahụ, mgbe ahụ, a ga-ewepụ ihe ndekọ ochie site na iji subtractor (parameter nke atọ n'ime ya). usoro ikpokọta oku).

Nke a pụtara na onye nchịkọta anyị, FixedSizePriorityQueue, anaghị eji otu igodo chịkọta ụkpụrụ niile, kama ọ na-echekwa nchikota na-akpụ akpụ nke ọnụọgụ nke ụdị ebuka ahịa kacha ere. Ntinye ọ bụla na-abata nwere ọnụ ọgụgụ nke mbak e rere ugbu a. KTable ga-enye gị ozi gbasara oke ahịa nke ụlọ ọrụ na-ere ahịa ugbu a, na-achọghị nchịkọta mpịakọta nke mmelite ọ bụla.

Anyị mụtara ime ihe abụọ dị mkpa:

  • ụkpụrụ otu na KTable site na igodo nkịtị;
  • rụọ ọrụ bara uru dị ka mkpọkọta na nchịkọta na ụkpụrụ ndị a agbakọtara.

Ịmara otu esi arụ ọrụ ndị a dị mkpa ịghọta ihe data na-aga site na ngwa Kafka Streams na ịghọta ozi ọ na-ebu.

Anyị achịkọtakwala ụfọdụ isi echiche ndị a tụlere na mbụ n'akwụkwọ a. N'Isi nke 4, anyị tụlere otu esi anabata mmejọ, steeti ime obodo dị mkpa maka ngwa mgbasa ozi. Ihe atụ nke mbụ n’isiakwụkwọ a gosiri ihe mere steeti ime obodo ji dị mkpa—ọ na-enye gị ohere idobe ozi ndị ị hụworo. Ịnweta mpaghara na-ezere igbu oge netwọk, na-eme ka ngwa ahụ na-arụ ọrụ nke ọma na-eguzogide njehie.

Mgbe ị na-arụ ọrụ mpịakọta ọ bụla ma ọ bụ mkpokọta, ị ga-ezipụta aha ụlọ ahịa steeti ahụ. Ọrụ mpịakọta na nchịkọta na-eweghachi ihe atụ KTable, na KTable na-eji nchekwa steeti were nke ọhụrụ dochie nsonaazụ ochie. Dị ka ị hụla, ọ bụghị mmelite niile ka a na-ezigara pipeline, nke a dịkwa mkpa n'ihi na a na-ahazi ọrụ nchịkọta iji mepụta ozi nchịkọta. Ọ bụrụ na itinyeghị steeti mpaghara, KTable ga-ebugharị nsonaazụ mkpokọta na mpịakọta niile.

Ọzọ, anyị ga-eleba anya na ịrụ ọrụ dịka nchịkọta n'ime oge a kapịrị ọnụ - nke a na-akpọ arụmọrụ windowing.

5.3.2. Ọrụ windo

Na ngalaba gara aga, anyị webatara convolution na mkpokọta. Ngwa ahụ rụrụ mpịakọta na-aga n'ihu na-ere ahịa ngwaahịa na-esote nchịkọta nke ngwaahịa ise kachasị ere na mgbanwe ahụ.

Mgbe ụfọdụ nchịkọta dị otú ahụ na-aga n'ihu na nchịkọta nke nsonaazụ dị mkpa. Ma mgbe ụfọdụ, ịkwesịrị ịrụ ọrụ naanị n'ime oge enyere. Dịka ọmụmaatụ, gbakọọ ọnụ ahịa mgbanwe mgbanwe nke otu ụlọ ọrụ na nkeji iri gara aga. Ma ọ bụ ole ndị ọrụ pịrị na ọkọlọtọ mgbasa ozi ọhụrụ n'ime nkeji iri na ise gara aga. Ngwa nwere ike ịrụ ọrụ dị otú ahụ ọtụtụ oge, mana yana nsonaazụ na-emetụta naanị oge a kapịrị ọnụ (window oge).

Ịgụ mgbanwe mgbanwe site n'azụ ahịa

N'ihe atụ na-esote, anyị ga-esochi azụmahịa ngwaahịa n'ofe ọtụtụ ndị ahịa-ma ọ bụ nnukwu ụlọ ọrụ ma ọ bụ ndị nwere ego nwere ọgụgụ isi.

Enwere ihe abụọ nwere ike ime maka nsochi a. Otu n'ime ha bụ mkpa ịmara ihe ndị isi ahịa na-azụta / na-ere. Ọ bụrụ na ndị a nnukwu egwuregwu na ọkaibe investors na-ahụ ohere, ọ bụ ihe ezi uche na-eso ha atụmatụ. Ihe nke abụọ kpatara ya bụ ọchịchọ ịhụ ihe ịrịba ama ọ bụla nwere ike ime nke ịzụ ahịa onye n'ime iwu na-akwadoghị. Iji mee nke a, ị ga-achọ nyochaa njikọ nke nnukwu spikes ahịa na mwepụta mgbasa ozi dị mkpa.

Nchikota dị otú ahụ nwere usoro ndị a:

  • ịmepụta iyi maka ịgụ ihe site na isiokwu azụmahịa-ahịa;
  • na-achịkọta ndekọ na-abata site na ID onye zụrụ na akara ngwaahịa. Ịkpọ otu site na usoro na-eweghachi ihe atụ nke klas KGroupedStream;
  • Usoro KGroupedStream.windowedBy na-eweghachite iyi data na-ejedebe na windo oge, nke na-enye ohere nchịkọta nke windo. Dabere na ụdị mpio ahụ, a ga-eweghachite ma TimeWindowedKStream ma ọ bụ SessionWindowedKStream;
  • Ọnụọgụ azụmahịa maka ọrụ mkpokọta. Usoro data nke windowed na-ekpebi ma a na-etinye otu ndekọ na ọnụ ọgụgụ a;
  • ide rịzọlt na isiokwu ma ọ bụ wepụta ha na njikwa n'oge mmepe.

Topology nke ngwa a dị mfe, mana nkọwa doro anya banyere ya ga-enyere aka. Ka anyị leba anya na fig. 5.11.

Na-esote, anyị ga-eleba anya na arụmọrụ nke arụmọrụ windo yana koodu kwekọrọ.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"

Ụdị windo

Enwere ụdị windo atọ na iyi Kafka:

  • oge oge;
  • "ịkụ azụ" (ịkụ azụ);
  • slide/hopping.

Nke ị ga-ahọrọ dabere na ihe azụmahịa gị chọrọ. Window mwụda na ịwụ elu bụ oge nwere oke, ebe mpio nnọkọ nwere oke site na ọrụ onye ọrụ — A na-ekpebi oge nke oge nnọkọ naanị site na ka onye ọrụ si arụ ọrụ. Isi ihe ị ga-echeta bụ na ụdị windo niile dabere na akara ụbọchị / oge nke ntinye, ọ bụghị oge usoro.

Na-esote, anyị na-emejuputa topology anyị na ụdị windo ọ bụla. A ga-enye koodu zuru oke naanị na ihe atụ mbụ; maka ụdị windo ndị ọzọ ọ dịghị ihe ga-agbanwe ma e wezụga ụdị ọrụ windo.

Window nke oge

Window oge dị nnọọ iche na ụdị windo ndị ọzọ niile. Ha na-ejedebeghị bụghị nke ukwuu site na oge dị ka ọrụ nke onye ọrụ (ma ọ bụ ọrụ nke kwadoro na ị ga-amasị soro). A na-amachi windo oge site na oge anaghị arụ ọrụ.

Ọgụgụ 5.12 na-egosi echiche nke windo nnọkọ. Obere nnọkọ ga-ejikọta ya na nnọkọ ahụ n'aka ekpe ya. Na nnọkọ dị n'aka nri ga-adị iche n'ihi na ọ na-esote ogologo oge nke enweghị ọrụ. Window oge dabere na ọrụ onye ọrụ, mana jiri stampụ ụbọchị/oge sitere na ndenye iji chọpụta nnọkọ nke ntinye.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"

Iji windo oge iji soro azụmahịa azụmaahịa

Ka anyị jiri windo oge weghara ozi gbasara azụmahịa mgbanwe. Egosiri mmejuputa windo oge na Ndepụta 5.5 (nke enwere ike ịhụ na src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ị hụlarị ọtụtụ ọrụ na topology a, yabụ na ọ dịghị mkpa ilele ha anya ọzọ ebe a. Ma enwerekwa ọtụtụ ihe ọhụrụ ebe a, nke anyị ga-atụle ugbu a.

Ọrụ otu ọ bụlaSite na-arụkarị ụdị ọrụ nchịkọta (nchịkọta, mpịakọta, ma ọ bụ agụta). Ị nwere ike ịme mkpokọta mkpokọta na mkpokọta na-agba ọsọ, ma ọ bụ nchịkọta mpio, nke na-eburu n'uche ndekọ n'ime windo oge akọwapụtara.

Koodu dị na Ndepụta 5.5 na-agụta ọnụọgụ azụmahịa n'ime windo nnọkọ. Na fig. 5.13 A na-enyocha omume ndị a site na nzọụkwụ.

Site n'ịkpọ windowedBy(SessionWindows.with(twentySeconds)) ruo (nkeji iri na ise) anyị na-emepụta windo nnọkọ nwere oge adịghị arụ ọrụ nke 20 sekọnd yana nkwụsị nkwụsi ike nke nkeji iri na ise. Oge nkeji iri abụọ na-adịghị arụ ọrụ pụtara na ngwa a ga-agụnye ntinye ọ bụla bịarutere n'ime 15 sekọnd nke njedebe ma ọ bụ mmalite nke nnọkọ dị ugbu a n'ime oge ugbu a (na-arụ ọrụ).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Na-esote, anyị na-akọwapụta ọrụ nchịkọta nke kwesịrị ịrụ ọrụ na windo nnọkọ - na nke a, gụọ. Ọ bụrụ na ntinye mbata daa na mpụga mpio anaghị arụ ọrụ (akụkụ ọ bụla nke stampụ ụbọchị/oge), ngwa a na-emepụta nnọkọ ọhụrụ. Ogologo oge njide pụtara idowe nnọkọ maka oge ụfọdụ ma na-enye ohere maka data n'oge na-agbatị karịa oge anaghị arụ ọrụ nke oge mana enwere ike itinye ya. Na mgbakwunye, mmalite na njedebe nke nnọkọ ọhụrụ sitere na njikọta kwekọrọ na stampụ ụbọchị/oge izizi na nke kachasị ọhụrụ.

Ka anyị leba anya na ntinye ole na ole sitere na usoro ọnụọgụ iji hụ ka nnọkọ si arụ ọrụ (Table 5.1).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Mgbe ndekọ batara, anyị na-achọ nnọkọ ndị dị ugbu a nwere otu igodo ahụ, oge njedebe na-erughị stampụ ụbọchị/oge dị ugbu a - oge adịghị arụ ọrụ, yana oge mmalite karịa ụbọchị/oge stampụ + oge adịghị arụ ọrụ. N'iburu nke a n'uche, ndenye anọ sitere na tebụl. 5.1 na-ejikọta ya na otu nnọkọ dị ka ndị a.

1. Ndekọ 1 na-abata na mbụ, ya mere oge mmalite dị nhata na oge njedebe na bụ 00:00:00.

2. Na-esote, ntinye 2 na-abịarute, anyị na-achọkwa nnọkọ ndị na-akwụsị na mbụ karịa 23:59:55 ma malite n'ikpeazụ karịa 00:00:35. Anyị na-achọta ndekọ 1 ma jikọta nnọkọ 1 na 2. Anyị na-ewere oge mmalite nke nnọkọ 1 (n'oge gara aga) na njedebe oge nke 2 (mgbe e mesịrị), ka nnọkọ ọhụrụ anyị malite na 00:00:00 wee kwụsị na 00: 00:15.

3. Ndekọ 3 bịarutere, we look for sessions between 00:00:30 na 00:01:10 ma ghara ịhụ ihe ọ bụla. Tinye nnọkọ nke abụọ maka igodo 123-345-654,FFBE, malite na agwụ na 00:00:50.

4. Ndekọ 4 bịarutere ma anyị na-achọ nnọkọ n'etiti 23:59:45 na 00:00:25. A na-achọta oge a abụọ sessions 1 na 2. A na-ejikọta oge atọ niile n'ime otu, with a start time of 00:00:00 na njedebe nke 00:00:15.

Site na ihe akọwara na ngalaba a, ọ bara uru icheta nuances ndị a dị mkpa:

  • Oge oge abụghị windo ndị nwere oke. A na-ekpebi oge nke nnọkọ site na ọrụ n'ime oge enyere;
  • Stambọchị/oge stampụ dị na data na-ekpebi ma mmemme ahụ ọ dabara n'ime oge dị adị ma ọ bụ n'oge enweghị ọrụ.

Ọzọ, anyị ga-atụle ụdị windo ọzọ - windo "tumbling".

Window "na-akụda".

Window na-atụgharị na-ejide ihe ndị na-ada n'ime oge ụfọdụ. Were ya na ị ga-eweghara azụmahịa niile nke otu ụlọ ọrụ na sekọnd 20 ọ bụla, yabụ ị na-anakọta ihe omume niile n'ime oge ahụ. N'ọgwụgwụ nke nkeji iri abụọ nke abụọ, mpio ahụ na-atụgharị wee gaa na nkeji nleba anya nke 20 nke abụọ ọhụrụ. Ọgụgụ 20 na-egosi ọnọdụ a.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Dịka ị na-ahụ, mmemme niile enwetara n'ime sekọnd iri abụọ gara aga na-esonye na windo. Na njedebe nke oge a, a na-emepụta windo ọhụrụ.

Ndepụta 5.6 na-egosi koodu na-egosi iji windo tumbling iji weghara azụmahịa ngwaahịa kwa sekọnd 20 (dị na src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Site na obere mgbanwe a na oku usoro TimeWindows., ị nwere ike iji windo tumbling. Ọmụmaatụ a anaghị akpọ usoro ruo (), yabụ, a ga-eji oge njide ndabara nke awa 24.

N'ikpeazụ, ọ bụ oge ịkwaga na nke ikpeazụ nke nhọrọ windo - "hopping" windo.

Window na-amị amị ("na-awụlikwa elu")

Window na-amị amị/nwụpụ yiri windo na-akụda, mana ọ nwere ntakịrị ihe dị iche. Window na-amị amị anaghị echere ruo ngwụcha oge etiti oge tupu ịmepụta windo ọhụrụ iji hazie ihe omume na-adịbeghị anya. Ha na-amalite ngụkọ ọhụrụ mgbe oge nchere na-erughị oge windo.

Iji gosi ọdịiche dị n'etiti windo ịwụ na ịwụ elu, ka anyị laghachi n'ihe atụ nke ịgụta azụmahịa mgbanwe ngwaahịa. Ebumnuche anyị ka bụ ịgụta ọnụọgụ azụmahịa, mana anyị achọghị ichere oge niile tupu imelite counter. Kama, anyị ga-emelite counter na obere oge. Dịka ọmụmaatụ, anyị ka ga-agụta ọnụọgụ azụmahịa kwa sekọnd 20, mana melite counter ọ bụla 5 sekọnd, dị ka egosiri na fig. 5.15. Na nke a, anyị na-ejedebe na atọ results windows na overlapping data.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ndepụta 5.7 na-egosi koodu maka ịkọwapụta windo sliding (dị na src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Enwere ike ịtụgharị windo na-akụda mmụọ ka ọ bụrụ windo hopping site na ịgbakwunye oku na usoro advanceBy(). N'ihe atụ egosiri, oge nchekwa bụ nkeji iri na ise.

Ị hụrụ na ngalaba a ka esi amachi nsonaazụ nchịkọta na windo oge. Karịsịa, achọrọ m ka ị cheta ihe atọ ndị a sitere na ngalaba a:

  • Ogo nke oge windo na-ejedebe ọ bụghị site na oge, kama site na ọrụ onye ọrụ;
  • Window "tumbling" na-enye nkọwa nke ihe omume n'ime oge enyere;
  • Ogologo oge mpio ịwụ elu ka edobere, mana a na-emelite ha ugboro ugboro ma nwee ike ịnwe ndenye kpuchiri ekpuchi na mpio niile.

Ọzọ, anyị ga-amụta otu esi agbanwe KTable laghachi na KStream maka njikọ.

5.3.3. Jikọọ KStream na KTable ihe

N'Isi nke 4, anyị tụlere ijikọ ihe abụọ KStream. Ugbu a, anyị ga-amụta ka esi jikọọ KTable na KStream. Nke a nwere ike ịdị mkpa maka ihe kpatara ya dị mfe. KStream bụ iyi nke ndekọ, na KTable bụ iyi mmelite ndekọ, mana mgbe ụfọdụ ị nwere ike itinye mgbakwunye mgbakwunye na iyi ndekọ site na iji mmelite sitere na KTable.

Ka anyị were data na ọnụọgụ ahịa azụmaahịa ma jikọta ha na ozi mgbanwe ngwaahịa maka ụlọ ọrụ dị mkpa. Nke a bụ ihe ị ga-eme iji nweta nke a nyere koodu ị nweburu.

  1. Tụgharịa ihe KTable nwere data na ọnụọgụ ahịa azụmaahịa ka ọ bụrụ KStream, soro ya jiri igodo dochie igodo ya na mpaghara ụlọ ọrụ dabara na akara ngwaahịa a.
  2. Mepụta ihe KTable na-agụ data sitere na isiokwu nwere ozi mgbanwe ngwaahịa ngwaahịa. A ga-ahazi KTable ọhụrụ a site na ngalaba ụlọ ọrụ.
  3. Jikọọ mmelite akụkọ yana ozi gbasara ọnụọgụ ahịa mgbanwe ngwaahịa site na ngalaba ụlọ ọrụ.

Ugbu a, ka anyị hụ otu esi etinye atụmatụ mmemme a.

Tugharia KTable ka ọ bụrụ KStream

Iji tọghata KTable ka ọ bụrụ KStream ị ga-eme ihe ndị a.

  1. Kpọọ usoro KTable.toStream().
  2. Site n'ịkpọ usoro KStream.map, jiri aha ụlọ ọrụ dochie igodo ahụ, wee weghachite ihe TransactionSummary na ihe atụ Windowed.

Anyị ga-ekekọta ọrụ ndị a ọnụ dị ka ndị a (a pụrụ ịchọta koodu ahụ na faịlụ src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Ndepụta 5.8).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
N'ihi na anyị na-arụ ọrụ KStream.map, ihe atụ KStream eweghachiri eweghachi na-akpaghị aka mgbe ejiri ya na njikọ.

Anyị emechaala usoro ntụgharị, ọzọ anyị kwesịrị ịmepụta ihe KTable maka ịgụ akụkọ ngwaahịa.

Ịmepụta KTable maka akụkọ ngwaahịa

Ọ dabara nke ọma, ịmepụta ihe KTable na-ewe naanị otu ahịrị koodu (koodu enwere ike ịhụ na src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Ndepụta 5.9).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ọ dị mma ịmara na ọ nweghị ihe Serde achọrọ ka akọwapụtara ya, ebe ọ bụ na a na-eji eriri Serdes na ntọala. Ọzọkwa, site na iji akara mbụ, tebụl jupụtara na ndekọ na mmalite.

Ugbu a, anyị nwere ike ịga n'ihu na nzọụkwụ ikpeazụ - njikọ.

Jikọọ mmelite akụkọ yana data ọnụọgụ azụmahịa

Ịmepụta njikọ adịghị ike. Anyị ga-eji njikọ aka ekpe ma ọ bụrụ na enweghị akụkọ ngwaahịa maka ụlọ ọrụ dị mkpa (a pụrụ ịchọta koodu dị mkpa na faịlụ src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Ndepụta 5.10).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Onye ọrụ aka ekpe a dị mfe. N'adịghị ka njikọ ndị dị n'isi nke 4, a naghị eji usoro JoinWindow eme ihe n'ihi na mgbe ị na-eme njikọ KStream-KTable, enwere naanị otu ntinye na KTable maka igodo ọ bụla. Njikọ dị otú ahụ ejedebeghị na oge: ndekọ ahụ dị na KTable ma ọ bụ na-anọghị. Isi nkwubi okwu: iji ihe KTable ị nwere ike ime ka KStream nwee data ntụaka emelitere obere oge.

Ugbu a, anyị ga-eleba anya n'ụzọ ga-arụ ọrụ nke ọma iji mee ka mmemme si na KStream kwalite.

5.3.4. Ihe GlobalKTable

Dịka ị na-ahụ, ọ dị mkpa ịkwalite iyi mmemme ma ọ bụ tinye ihe gbara ya gburugburu na ha. N'isi nke 4 ị hụrụ njikọ dị n'etiti ihe abụọ KStream, na na ngalaba nke gara aga ị hụrụ njikọ dị n'etiti KStream na KTable. N'okwu ndị a niile, ọ dị mkpa ka ekewaghachi iyi data mgbe ị na-esepụta igodo maka ụdị ma ọ bụ uru ọhụrụ. Mgbe ụfọdụ, a na-eme nkewa n'ụzọ doro anya, na mgbe ụfọdụ Kafka Streams na-eme ya na-akpaghị aka. Ịmeghachi nkewa dị mkpa n'ihi na igodo agbanweela na ndekọ ga-ejedebe na ngalaba ọhụrụ, ma ọ bụghị na njikọ ahụ agaghị ekwe omume (nke a na-atụle n'Isi nke 4, na ngalaba "Ikekọrịta data" na nkeji 4.2.4).

Nkewa ọzọ nwere ọnụ ahịa

Mweghachi nkewa na-achọ ụgwọ - ụgwọ ọrụ akụrụngwa agbakwunyere maka ịmepụta isiokwu etiti, na-echekwa data oyiri na isiokwu ọzọ; ọ pụtakwara ịba ụba latency n'ihi ide na ịgụ site na isiokwu a. Na mgbakwunye, ọ bụrụ na ịchọrọ isonye n'ofe ihe karịrị otu akụkụ ma ọ bụ akụkụ, ị ga-emerịrị njikọ ndị ahụ, jiri igodo ọhụrụ mapụta ndekọ ahụ, wee mee usoro nkewa ọzọ.

Jikọọ na obere datasets

N'ọnọdụ ụfọdụ, olu data ntụaka nke a ga-ejikọta dị ntakịrị, ya mere mbipụta ya zuru oke nwere ike dabara na mpaghara ọ bụla n'ọnụ ọnụ nke ọ bụla. Maka ọnọdụ ndị dị otú a, Kafka Streams na-enye klas GlobalKTable.

Ihe atụ GlobalKTable pụrụ iche n'ihi na ngwa a na-emegharị data niile na ọnụ ọnụ nke ọ bụla. Ma ebe ọ bụ na data niile dị na ọnụ nke ọ bụla, ọ dịghị mkpa iji igodo ntụaka kewaa iyi ihe omume ahụ ka ọ dị na nkebi niile. Ị nwekwara ike iji ihe GlobalKTable mee njikọ enweghị igodo. Ka anyị laghachi azụ n'otu n'ime ihe atụ ndị gara aga iji gosi njirimara a.

Ijikọ ihe KStream na ihe GlobalKTable

Na nkebi nke 5.3.2, anyị mere nchịkọta windo nke azụmahịa mgbanwe site n'aka ndị na-azụ ahịa. Nsonaazụ nke mkpokọta a dị ka nke a:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Ọ bụ ezie na nsonaazụ ndị a rụpụtara ebumnuche ahụ, ọ ga-aba uru karị ma ọ bụrụ na egosipụtakwara aha onye ahịa na aha ụlọ ọrụ zuru ezu. Iji tinye aha ndị ahịa na aha ụlọ ọrụ, ị nwere ike ịme njikọ nkịtị, mana ị ga-achọ ịme maapụ igodo abụọ wee kewaa ọzọ. Site na GlobalKTable ị nwere ike zere ụgwọ ọrụ dị otú ahụ.

Iji mee nke a, anyị ga-eji ihe countStream sitere na Ndepụta 5.11 (a pụrụ ịhụ koodu kwekọrọ na src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) wee jikọọ ya na ihe abụọ GlobalKTable.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Anyị atụleworị nke a na mbụ, yabụ na agaghị m ekwugharị ya. Mana achọpụtara m na koodu dị na toStream () arụ ọrụ map na-edobe n'ime ihe arụrụ arụ kama okwu inline lambda maka ịgụta ya.

Nzọụkwụ na-esote bụ ikwupụta ihe atụ abụọ nke GlobalKTable (koodu egosiri nwere ike ịhụ na faịlụ src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Ndepụta 5.12).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"

Biko mara na a na-akọwa aha isiokwu site na iji ụdị agụpụtara.

Ugbu a anyị nwere ihe niile dị njikere, naanị ihe fọdụrụ bụ ide koodu maka njikọ (nke enwere ike ịchọta na faịlụ src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Ndepụta 5.13).

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Ọ bụ ezie na e nwere njikọ abụọ na koodu a, a na-eke ha agbụ n'ihi na ọ dịghị nke ọ bụla n'ime nsonaazụ ha bụ nke a na-eji iche. A na-egosipụta nsonaazụ ya na njedebe nke ọrụ ahụ dum.

Mgbe ị na-arụ ọrụ njikọ dị n'elu, ị ga-enweta nsonaazụ dịka nke a:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Ihe kachasị mkpa agbanwebeghị, mana nsonaazụ ndị a na-ele anya nke ọma.

Ọ bụrụ na ị gbadaa na isi nke 4, ị hụlarị ụdị njikọ dị iche iche na-arụ ọrụ. E depụtara ha na tebụl. 5.2. Tebụl a na-egosipụta ike njikọ dị ka ụdị 1.0.0 nke Kafka Streams; Ihe nwere ike ịgbanwe na mwepụta n'ọdịnihu.

Akwụkwọ "Kafka Streams in Action. Ngwa na microservices maka ọrụ ozugbo"
Iji kechie ihe, ka anyị chegharịa isi ihe: ị nwere ike jikọọ iyi mmemme (KStream) wee melite iyi (KTable) site na iji steeti mpaghara. N'aka nke ọzọ, ọ bụrụ na nha data ntinye aka adịghị oke, ị nwere ike iji ihe GlobalKTable. GlobalKTables na-emegharị akụkụ niile na oghere ngwa Kafka iyi ọ bụla, na-ahụ na data niile dị n'agbanyeghị akụkụ nke igodo kwekọrọ.

Ọzọ, anyị ga-ahụ njirimara Kafka Streams, ekele nke anyị nwere ike ịhụ mgbanwe steeti na-erighị data sitere na isiokwu Kafka.

5.3.5. Steeti nwere ike ịma

Anyị arụlarị ọtụtụ arụmọrụ metụtara steeti ma na-ewepụta nsonaazụ ya na njikwa (maka ebumnuche mmepe) ma ọ bụ dee ha na isiokwu (maka ebumnuche mmepụta). Mgbe ị na-ede nsonaazụ na isiokwu, ị ga-eji onye ahịa Kafka lelee ha.

Ịgụ data sitere na isiokwu ndị a nwere ike iwere ya dị ka ụdị echiche efu. Maka ebumnuche anyị, anyị nwere ike iji nkọwa nke nlele anya sitere na Wikipedia: “...ihe nchekwa data anụ ahụ nwere nsonaazụ nke ajụjụ. Dịka ọmụmaatụ, ọ nwere ike ịbụ nnomi data dịpụrụ adịpụ, ma ọ bụ obere nke ahịrị na/ma ọ bụ ogidi nke tebụl ma ọ bụ sonye rịzọlt, ma ọ bụ tebụl nchịkọta enwetara site na nchịkọta" (https://en.wikipedia.org/wiki). /Materialized_view).

Kafka Streams na-enye gị ohere ịme ajụjụ mkparịta ụka na ụlọ ahịa steeti, na-enye gị ohere ịgụpụta echiche ndị a na-ahụ anya ozugbo. Ọ dị mkpa iburu n'obi na ajụjụ a na-aga na ụlọ ahịa steeti bụ ọrụ na-agụ naanị. Nke a na-eme ka o doo anya na ị gaghị echegbu onwe gị maka ime ka steeti na-ekwekọghị ekwekọ na mberede mgbe ngwa gị na-ahazi data.

Ikike ịjụ ụlọ ahịa steeti ozugbo dị mkpa. Nke a pụtara na ị nwere ike ịmepụta ngwa dashboard na-ebughị ụzọ nweta data n'aka ndị ahịa Kafka. Ọ na-abawanye arụmọrụ nke ngwa ahụ, n'ihi na ọ dịghị mkpa ide data ọzọ:

  • ekele maka mpaghara data, enwere ike ịnweta ha ngwa ngwa;
  • A na-ewepụ mbigharị nke data, ebe ọ bụ na edeghị ya na nchekwa mpụga.

Isi ihe m chọrọ ka icheta bụ na ị nwere ike jụọ ajụjụ ozugbo site na ngwa gị. Ohere ndị a na-enye gị enweghị ike ikwubiga ya ókè. Kama iri data sitere na Kafka na ịchekwa ndekọ na nchekwa data maka ngwa ahụ, ị ​​nwere ike jụọ ụlọ ahịa steeti na otu nsonaazụ ahụ. Ajuju ajuju maka ụlọ ahịa steeti pụtara obere koodu (enweghị onye ahịa) yana obere ngwanrọ (enweghị mkpa maka tebụl nchekwa data iji chekwaa nsonaazụ).

Anyị ekpuchiela ntakịrị ala n'isiakwụkwọ a, yabụ anyị ga-ahapụ mkparịta ụka anyị gbasara ajụjụ mkparịta ụka megide ụlọ ahịa steeti ugbu a. Mana echegbula: n'Isi nke 9, anyị ga-emepụta ngwa dashboard dị mfe yana ajụjụ mmekọrịta. Ọ ga-eji ụfọdụ n'ime ihe atụ si na nke a na nke gara aga na-egosipụta ajụjụ mmekọrịta yana otu ị ga-esi tinye ha na ngwa Kafka Streams.

Nchịkọta

  • Ihe KStream na-anọchite anya iyi ihe omume, dị ka ntinye na nchekwa data. Ihe KTable na-anọchi anya iyi mmelite, dị ka mmelite na nchekwa data. Ogo nke ihe KTable adịghị eto, a na-eji ndị ọhụrụ dochie ihe ndekọ ochie.
  • Achọrọ ihe KTable maka ọrụ nchịkọta.
  • N'iji arụ ọrụ windo, ị nwere ike kewaa data ekpokọtara n'ime bọket oge.
  • Ekele maka ihe GlobalKTable, ị nwere ike ịnweta data ntụaka ebe ọ bụla na ngwa ahụ, n'agbanyeghị nkewa.
  • Njikọ n'etiti KStream, KTable na GlobalKTable ihe ga-ekwe omume.

Ka ọ dị ugbu a, anyị etinyela uche na iwu Kafka Streams ngwa site na iji KStream DSL dị elu. Ọ bụ ezie na usoro dị elu na-enye gị ohere ịmepụta mmemme dị mma na nke dị nkenke, iji ya na-anọchite anya ahia. Ịrụ ọrụ na DSL KStream pụtara ịba ụba nkenke nke koodu gị site na ibelata ogo njikwa. N'isiakwụkwọ na-esote, anyị ga-eleba anya na API ọnụ onye na-ahụ maka ọkwa dị ala wee nwaa azụmaahịa ndị ọzọ. Mmemme ga-adị ogologo karịa ka ọ dị na mbụ, mana anyị ga-enwe ike ịmepụta ihe fọrọ nke nta ka ọ bụrụ onye njikwa ọ bụla anyị nwere ike ịchọ.

→ Enwere ike ịchọta nkọwa ndị ọzọ gbasara akwụkwọ ahụ na webụsaịtị onye nkwusa

→ Maka Habrozhiteli 25% mbelata site na iji coupon - Kafka iyi

→ Mgbe ịkwụ ụgwọ maka ụdị akwụkwọ ahụ, a ga-eziga akwụkwọ eletrọnịkị site na e-mail.

isi: www.habr.com

Tinye a comment