La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro" Saluton, loĝantoj de Khabro! Ĉi tiu libro taŭgas por ajna programisto, kiu volas kompreni fadenan prilaboradon. Kompreni distribuitan programadon helpos vin pli bone kompreni Kafka kaj Kafka Streams. Estus bone koni la Kafkan kadron mem, sed ĉi tio ne estas necesa: mi rakontos al vi ĉion, kion vi bezonas. Spertaj Kafka-programistoj kaj novuloj same lernos kiel krei interesajn fluajn prilaborajn aplikaĵojn uzante la bibliotekon Kafka Streams en ĉi tiu libro. Mezaj kaj progresintaj Java programistoj jam konataj kun konceptoj kiel seriigo lernos apliki siajn kapablojn por krei Kafka Streams-aplikaĵojn. La fontkodo de la libro estas skribita en Java 8 kaj faras signifan uzon de Java 8 lambda esprimsintakso, do scii kiel labori kun lambda funkcioj (eĉ en alia programlingvo) estos utile.

Eltiraĵo. 5.3. Agregaj kaj fenestraj operacioj

En ĉi tiu sekcio, ni daŭrigos esplori la plej esperigajn partojn de Kafka Streams. Ĝis nun ni kovris la sekvajn aspektojn de Kafka Streams:

  • kreante prilaboran topologion;
  • uzado de stato en streaming-aplikoj;
  • elfarado de datumfluaj konektoj;
  • diferencoj inter eventofluoj (KStream) kaj ĝisdatigaj fluoj (KTable).

En la sekvaj ekzemploj ni kunigos ĉiujn ĉi tiujn elementojn. Vi ankaŭ lernos pri fenestrado, alia bonega funkcio de streaming-aplikoj. Nia unua ekzemplo estos simpla agregacio.

5.3.1. Agregado de akciaj vendoj laŭ industria sektoro

Agregado kaj grupiĝo estas esencaj iloj kiam oni laboras kun fluantaj datumoj. Ekzameno de individuaj rekordoj kiel ili estas ricevitaj estas ofte nesufiĉa. Por ĉerpi pliajn informojn el datumoj, necesas grupigi kaj kombini ilin.

En ĉi tiu ekzemplo, vi surmetos la kostumon de tagkomercisto, kiu bezonas spuri la vendan volumon de akcioj de kompanioj en pluraj industrioj. Specife, vi interesiĝas pri la kvin kompanioj kun la plej grandaj akciaj vendoj en ĉiu industrio.

Tia agregado postulos la sekvajn plurajn paŝojn por traduki la datumojn en la deziratan formon (parolante ĝenerale).

  1. Kreu teman fonton, kiu publikigas krudajn komercajn informojn. Ni devos mapi objekton de tipo StockTransaction al objekto de tipo ShareVolume. La punkto estas, ke la StockTransaction-objekto enhavas vendajn metadatumojn, sed ni bezonas nur datumojn pri la nombro da akcioj venditaj.
  2. Grupo ShareVolumdatumoj per akcia simbolo. Fojo grupigita per simbolo, vi povas kolapsi ĉi tiujn datumojn en subtotalojn de akciaj vendaj volumoj. Indas noti, ke la metodo KStream.groupBy resendas ekzemplon de tipo KGroupedStream. Kaj vi povas akiri KTable-instancon per pluvoko de la metodo KGroupedStream.reduce.

Kio estas la interfaco KGroupedStream

La metodoj KStream.groupBy kaj KStream.groupByKey resendas ekzemplon de KGroupedStream. KGroupedStream estas meza reprezentado de fluo de eventoj post grupiĝo per ŝlosiloj. Ĝi tute ne estas destinita por rekta laboro kun ĝi. Anstataŭe, KGroupedStream estas uzata por agregaciaj operacioj, kiuj ĉiam rezultigas KTable. Kaj ĉar la rezulto de agregaciaj operacioj estas KTable kaj ili uzas ŝtatan vendejon, eblas ke ne ĉiuj ĝisdatigoj kiel rezulto estas senditaj pli malsupre en la dukto.

La metodo KTable.groupBy resendas similan KGroupedTable - mezan reprezentadon de la fluo de ĝisdatigoj, regrupigita per ŝlosilo.

Ni prenu mallongan paŭzon kaj rigardu Fig. 5.9, kiu montras kion ni atingis. Ĉi tiu topologio jam devus esti tre konata al vi.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Ni nun rigardu la kodon por ĉi tiu topologio (ĝi troviĝas en la dosiero src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listo 5.2).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
La donita kodo distingiĝas per sia koncizeco kaj la granda kvanto de agoj faritaj en pluraj linioj. Vi eble rimarkos ion novan en la unua parametro de la metodo builder.stream: valoro de la enum-tipo AutoOffsetReset.EARLIEST (estas ankaŭ LATEST), agordita per la metodo Consumed.withOffsetResetPolicy. Ĉi tiu nombra tipo povas esti uzata por specifi ofsetan rekomencigitan strategion por ĉiu KStream aŭ KTable kaj havas prioritaton super la ofseta rekomencigita opcio de la agordo.

GroupByKey kaj GroupBy

La KStream-interfaco havas du metodojn por grupigi rekordojn: GroupByKey kaj GroupBy. Ambaŭ resendas KGroupedTable, do vi eble demandas, kio estas la diferenco inter ili kaj kiam uzi kiun?

La metodo GroupByKey estas uzata kiam la ŝlosiloj en la KStream jam estas nemalplenaj. Kaj plej grave, la flago "postulas re-dispartigon" neniam estis metita.

La GroupBy-metodo supozas, ke vi ŝanĝis la grupajn klavojn, do la reparticia flago estas agordita al vera. Fari kuniĝojn, kuniĝojn ktp post la metodo GroupBy rezultos en aŭtomata re-dispartigo.
Resumo: Kiam ajn eblas, vi devus uzi GroupByKey prefere ol GroupBy.

Estas klare, kion faras la metodoj mapValues ​​kaj groupBy, do ni rigardu la metodon sum() (trovebla en src/main/java/bbejeck/model/ShareVolume.java) (Listo 5.3).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
La metodo ShareVolume.sum redonas la totalan totalon de la akcia vendokvanto, kaj la rezulto de la tuta ĉeno de kalkuloj estas KTable objekto . Nun vi komprenas la rolon de KTable. Kiam ShareVolume-objektoj alvenas, la responda KTable-objekto konservas la plej lastan aktualan ĝisdatigon. Gravas memori, ke ĉiuj ĝisdatigoj estas reflektitaj en la antaŭa shareVolumeKTable, sed ne ĉiuj estas senditaj plu.

Ni tiam uzas ĉi tiun KTable por kunigi (laŭ nombro da akcioj komercitaj) por alveni al la kvin kompanioj kun la plej altaj volumoj de akcioj komercitaj en ĉiu industrio. Niaj agoj en ĉi tiu kazo estos similaj al tiuj por la unua agregado.

  1. Faru alian grupPer operacio por grupigi individuajn ShareVolume objektojn laŭ industrio.
  2. Komencu resumi ShareVolume-objektojn. Ĉi-foje la agregacia objekto estas fiksgranda prioritata atendovico. En ĉi tiu fiks-granda vico, nur la kvin kompanioj kun la plej grandaj kvantoj de akcioj venditaj estas retenitaj.
  3. Mapu la vicojn de la antaŭa alineo al korda valoro kaj redonu la plej bonajn kvin plej komercitajn akciojn laŭ nombro laŭ industrio.
  4. Skribu la rezultojn en ŝnuro al la temo.

En Fig. Figuro 5.10 montras la datumfluan topologian grafeon. Kiel vi povas vidi, la dua raŭndo de prilaborado estas sufiĉe simpla.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Nun kiam ni havas klaran komprenon pri la strukturo de ĉi tiu dua raŭndo de prilaborado, ni povas turni sin al ĝia fontkodo (vi trovos ĝin en la dosiero src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listo 5.4) .

Ĉi tiu komencanto enhavas fiksan Queue-variablon. Ĉi tio estas kutima objekto, kiu estas adaptilo por java.util.TreeSet, kiu estas uzata por spuri la suprajn N rezultojn en malkreskanta ordo de interŝanĝitaj akcioj.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Vi jam vidis la alvokojn groupBy kaj mapValues, do ni ne eniros en tiujn (ni nomas la metodon KTable.toStream ĉar la metodo KTable.print estas malrekomendita). Sed vi ankoraŭ ne vidis la KTable-version de aggregate() , do ni pasigos iom da tempo diskuti pri tio.

Kiel vi memoras, kio diferencas KTable estas, ke registroj kun la samaj ŝlosiloj estas konsiderataj ĝisdatigoj. KTable anstataŭigas la malnovan enskribon per nova. Agregado okazas en simila maniero: la plej novaj rekordoj kun la sama ŝlosilo estas kunigitaj. Kiam rekordo alvenas, ĝi estas aldonita al la klaso FixedSizePriorityQueue per aldonilo (dua parametro en la entuta metodovoko), sed se alia rekordo jam ekzistas kun la sama ŝlosilo, tiam la malnova rekordo estas forigita per subtraktoro (tria parametro en la entuta metodovoko).

Ĉi tio ĉio signifas, ke nia agregatoro, FixedSizePriorityQueue, ne kunigas ĉiujn valorojn per unu ŝlosilo, sed stokas moviĝantan sumon de la kvantoj de la N plej komercitaj specoj de akcioj. Ĉiu envenanta enskribo enhavas la totalan nombron da akcioj venditaj ĝis nun. KTable donos al vi informojn pri kiuj akcioj de kompanioj estas nuntempe la plej interŝanĝitaj, sen postuli daŭran agregadon de ĉiu ĝisdatigo.

Ni lernis fari du gravajn aferojn:

  • grupigu valorojn en KTable per komuna ŝlosilo;
  • fari utilajn operaciojn kiel kunigo kaj agregado sur ĉi tiuj grupigitaj valoroj.

Scii kiel fari ĉi tiujn operaciojn gravas por kompreni la signifon de la datumoj moviĝantaj tra aplikaĵo Kafka Streams kaj kompreni kiajn informojn ĝi portas.

Ni ankaŭ kunigis kelkajn el la ŝlosilaj konceptoj diskutitaj pli frue en ĉi tiu libro. En Ĉapitro 4, ni parolis pri kiom grava mistolerema, loka ŝtato estas por streaming-apliko. La unua ekzemplo en ĉi tiu ĉapitro pruvis kial loka ŝtato estas tiel grava—ĝi donas al vi la kapablon konservi trakon de kiaj informoj vi jam vidis. Loka aliro evitas retajn prokrastojn, igante la aplikaĵon pli efika kaj erarrezista.

Kiam vi faras ajnan kolektadon aŭ agregan operacion, vi devas specifi la nomon de la ŝtata vendejo. La kunigo kaj agregaciaj operacioj resendas KTable-instancon, kaj la KTable uzas ŝtatan stokadon por anstataŭigi malnovajn rezultojn per novaj. Kiel vi vidis, ne ĉiuj ĝisdatigoj estas senditaj laŭ la dukto, kaj ĉi tio estas grava ĉar agregaciaj operacioj estas dizajnitaj por produkti resumajn informojn. Se vi ne aplikas lokan ŝtaton, KTable plusendos ĉiujn agregajn kaj kolektajn rezultojn.

Poste, ni rigardos plenumi operaciojn kiel agregado ene de specifa tempodaŭro - tiel nomataj fenestraj operacioj.

5.3.2. Fenestraj operacioj

En la antaŭa sekcio, ni enkondukis glitan kunvolucion kaj agregadon. La aplikaĵo elfaris kontinuan kunigon de akciaj vendoj sekvitaj de agregado de la kvin plej komercitaj akcioj sur la interŝanĝo.

Foje tia kontinua kunigo kaj kunigo de rezultoj estas necesaj. Kaj foje vi bezonas fari operaciojn nur dum difinita tempodaŭro. Ekzemple, kalkulu kiom da interŝanĝaj transakcioj estis faritaj kun akcioj de aparta kompanio en la lastaj 10 minutoj. Aŭ kiom da uzantoj klakis sur nova reklama standardo en la lastaj 15 minutoj. Apliko povas plenumi tiajn operaciojn plurfoje, sed kun rezultoj kiuj validas nur por specifitaj tempodaŭroj (tempofenestroj).

Nombri interŝanĝajn transakciojn de aĉetanto

En la sekva ekzemplo, ni spuros akciajn transakciojn tra pluraj komercistoj - ĉu grandaj organizoj aŭ inteligentaj individuaj financistoj.

Estas du eblaj kialoj por ĉi tiu spurado. Unu el ili estas la bezono scii, kion merkataj gvidantoj aĉetas/vendas. Se ĉi tiuj grandaj ludantoj kaj sofistikaj investantoj vidas ŝancon, estas senco sekvi ilian strategion. La dua kialo estas la deziro ekvidi ajnajn eblajn signojn de kontraŭleĝa interna komerco. Por fari tion, vi devos analizi la korelacion de grandaj vendaj pikiloj kun gravaj gazetaraj komunikoj.

Tia spurado konsistas el la sekvaj paŝoj:

  • kreante fluon por legado de la stoko-transakcia temo;
  • grupigante envenantajn rekordojn laŭ aĉetanto-identigilo kaj akcia simbolo. Voki la metodon groupBy resendas ekzemplon de la klaso KGroupedStream;
  • La metodo KGroupedStream.windowedBy resendas datumfluon limigitan al tempofenestro, kiu permesas fenestran agregadon. Depende de la fenestrotipo, aŭ TimeWindowedKStream aŭ SessionWindowedKStream estas resendita;
  • transakciokalkulo por la agregacia operacio. La fenestra datumfluo determinas ĉu aparta rekordo estas konsiderata en ĉi tiu kalkulo;
  • skribi rezultojn al temo aŭ eligi ilin al la konzolo dum evoluo.

La topologio de ĉi tiu aplikaĵo estas simpla, sed klara bildo de ĝi estus helpema. Ni rigardu Fig. 5.11.

Poste, ni rigardos la funkciojn de fenestraj operacioj kaj la respondan kodon.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"

Fenestroj

Estas tri specoj de fenestroj en Kafka Streams:

  • sesio;
  • “falanta”;
  • glitante/saltetante.

Kiun elekti dependas de viaj komercaj postuloj. Tumblaj kaj saltantaj fenestroj estas templimigitaj, dum sesiofenestroj estas limigitaj per uzantagado—la daŭro de la sesio(j) estas determinita nur de kiom aktiva la uzanto estas. La ĉefa afero por memori estas, ke ĉiuj fenestraj tipoj baziĝas sur la dataj/horaj poŝtmarkoj de la enskriboj, ne la sistema tempo.

Poste ni efektivigas nian topologion kun ĉiu el la fenestrospecoj. La kompleta kodo estos donita nur en la unua ekzemplo; por aliaj specoj de fenestroj nenio ŝanĝiĝos krom la speco de fenestrofunkciado.

Sesiaj fenestroj

Sesiaj fenestroj estas tre malsamaj de ĉiuj aliaj specoj de fenestroj. Ili estas limigitaj ne tiom de tempo, kiom de la agado de la uzanto (aŭ la agado de la ento, kiun vi ŝatus spuri). Sesiaj fenestroj estas limigitaj per periodoj de neaktiveco.

Figuro 5.12 ilustras la koncepton de sesiofenestroj. La pli malgranda sesio kunfandiĝos kun la sesio maldekstre. Kaj la sesio dekstre estos aparta ĉar ĝi sekvas longan periodon de neaktiveco. Sesiaj fenestroj baziĝas sur uzanta agado, sed uzu datajn/horajn poŝtmarkojn de enskriboj por determini al kiu sesio apartenas la eniro.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"

Uzante sesiajn fenestrojn por spuri akciajn transakciojn

Ni uzu sesiajn fenestrojn por kapti informojn pri interŝanĝaj transakcioj. La efektivigo de sesiaj fenestroj estas montrita en Listo 5.5 (kiu troveblas en src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Vi jam vidis la plej multajn operaciojn en ĉi tiu topologio, do ne necesas rigardi ilin denove ĉi tie. Sed estas ankaŭ pluraj novaj elementoj ĉi tie, kiujn ni nun diskutos.

Ajna operacio groupBy tipe elfaras ian agregan operacion (agregado, kunigo aŭ nombrado). Vi povas fari aŭ akumulan agregadon kun kuranta totalo, aŭ fenestra agregado, kiu enkalkulas rekordojn ene de specifa tempofenestro.

La kodo en Listo 5.5 kalkulas la nombron da transakcioj ene de sesiaj fenestroj. En Fig. 5.13 ĉi tiuj agoj estas analizitaj paŝo post paŝo.

Vokante windowedBy(SessionWindows.with(dudekSekundoj).ĝis(dek kvin Minutoj)) ni kreas kunsidan fenestron kun senaktiveca intervalo de 20 sekundoj kaj persista intervalo de 15 minutoj. Neaktiva intervalo de 20 sekundoj signifas, ke la aplikaĵo inkludos ajnan eniron kiu alvenos ene de 20 sekundoj de la fino aŭ komenco de la nuna sesio en la nunan (aktivan) sesio.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Tuj poste, ni specifas, kian agregan operacion devas esti farita en la seanca fenestro - en ĉi tiu kazo, kalkulu. Se envenanta eniro falas ekster la neaktiveca fenestro (ambaŭ flankoj de la dato/hora stampo), la aplikaĵo kreas novan sesion. Retena intervalo signifas konservi seancon dum certa tempo kaj permesas malfruajn datumojn, kiuj etendiĝas preter la neaktivecperiodo de la sesio sed ankoraŭ povas esti alkroĉita. Aldone, la komenco kaj fino de la nova sesio rezultiĝanta el la kunfando respondas al la plej frua kaj lasta dato/hora stampo.

Ni rigardu kelkajn enskribojn de la kalkulmetodo por vidi kiel funkcias sesioj (Tabelo 5.1).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Kiam rekordoj alvenas, ni serĉas ekzistantajn sesiojn kun la sama ŝlosilo, fintempo malpli granda ol la nuna dato/tempo-marko - senaktiveca intervalo, kaj komenca tempo pli granda ol la nuna dato/tempomarko + senaktiva intervalo. Konsiderante ĉi tion, kvar enskriboj de tabelo. 5.1 estas kunfanditaj en ununuran sesion jene.

1. Rekordo 1 alvenas unue, do la komenca tempo estas egala al la fintempo kaj estas 00:00:00.

2. Poste, eniro 2 alvenas, kaj ni serĉas sesiojn kiuj finiĝas ne pli frue ol 23:59:55 kaj komenciĝas ne pli malfrue ol 00:00:35. Ni trovas rekordon 1 kaj kombinas sesiojn 1 kaj 2. Ni prenas la komencan tempon de sesio 1 (pli frue) kaj la fintempon de sesio 2 (poste), tiel ke nia nova sesio komenciĝas je 00:00:00 kaj finiĝas je 00: 00:15.

3. Rekordo 3 alvenas, ni serĉas sesiojn inter 00:00:30 kaj 00:01:10 kaj ne trovas iujn. Aldonu duan sesion por la ŝlosilo 123-345-654,FFBE, komencante kaj finiĝanta je 00:00:50.

4. Rekordo 4 alvenas kaj ni serĉas sesiojn inter 23:59:45 kaj 00:00:25. Ĉi-foje troviĝas ambaŭ sesioj 1 kaj 2. Ĉiuj tri sesioj estas kombinitaj en unu, kun komenca tempo de 00:00:00 kaj fintempo de 00:00:15.

El tio, kio estas priskribita en ĉi tiu sekcio, indas memori la jenajn gravajn nuancojn:

  • sesioj ne estas fiksgrandaj fenestroj. La daŭro de sesio estas determinita de la agado ene de difinita tempodaŭro;
  • La dataj/tempomarkoj en la datumoj determinas ĉu la okazaĵo falas ene de ekzistanta sesio aŭ dum neaktiva periodo.

Poste ni diskutos la sekvan tipon de fenestro - "falantaj" fenestroj.

"Tumbling" fenestroj

Tumblaj fenestroj kaptas eventojn, kiuj falas en certa tempodaŭro. Imagu, ke vi devas kapti ĉiujn akciajn transakciojn de certa kompanio ĉiujn 20 sekundojn, do vi kolektas ĉiujn eventojn dum tiu tempodaŭro. Je la fino de la 20-sekunda intervalo, la fenestro ruliĝas kaj moviĝas al nova 20-sekunda observa intervalo. Figuro 5.14 ilustras ĉi tiun situacion.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Kiel vi povas vidi, ĉiuj eventoj ricevitaj en la lastaj 20 sekundoj estas inkluzivitaj en la fenestro. Fine de ĉi tiu periodo, nova fenestro estas kreita.

Listo 5.6 montras kodon, kiu montras la uzon de falantaj fenestroj por kapti akciajn transakciojn ĉiujn 20 sekundojn (troveblas en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Kun ĉi tiu eta ŝanĝo al la metodovoko de TimeWindows.of, vi povas uzi falantan fenestron. Ĉi tiu ekzemplo ne nomas la metodon ĝis(), do la defaŭlta retenintervalo de 24 horoj estos uzata.

Fine, estas tempo pluiri al la lasta el la fenestro-opcioj - "saltetaj" fenestroj.

Glitantaj ("saltante") fenestroj

Glitantaj/saltetaj fenestroj similas al falantaj fenestroj, sed kun eta diferenco. Glitfenestroj ne atendas ĝis la fino de la tempointervalo antaŭ krei novan fenestron por prilabori lastatempajn eventojn. Ili komencas novajn kalkulojn post atenda intervalo malpli ol la fenestrodaŭro.

Por ilustri la diferencojn inter falantaj kaj saltantaj fenestroj, ni revenu al la ekzemplo de kalkulado de borsaj transakcioj. Nia celo ankoraŭ estas kalkuli la nombron da transakcioj, sed ni ne volas atendi la tutan tempon antaŭ ĝisdatigi la nombrilon. Anstataŭe, ni ĝisdatigos la nombrilon je pli mallongaj intervaloj. Ekzemple, ni ankoraŭ kalkulos la nombron da transakcioj ĉiujn 20 sekundojn, sed ĝisdatigos la nombrilon ĉiujn 5 sekundojn, kiel montrite en Fig. 5.15. En ĉi tiu kazo, ni finas kun tri rezultfenestroj kun interkovritaj datumoj.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Listo 5.7 montras la kodon por difini glitajn fenestrojn (troveblas en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Falanta fenestro povas esti konvertita al salteta fenestro aldonante vokon al la metodo advanceBy(). En la ekzemplo montrita, la ŝparintervalo estas 15 minutoj.

Vi vidis en ĉi tiu sekcio kiel limigi agregajn rezultojn al tempofenestroj. Aparte, mi volas, ke vi memoru la jenajn tri aferojn el ĉi tiu sekcio:

  • la grandeco de sesiaj fenestroj estas limigita ne de tempoperiodo, sed de uzanta agado;
  • "falantaj" fenestroj provizas superrigardon de eventoj en difinita tempodaŭro;
  • La daŭro de saltaj fenestroj estas fiksita, sed ili estas ofte ĝisdatigitaj kaj povas enhavi interkovrantajn enskribojn en ĉiuj fenestroj.

Poste, ni lernos kiel konverti KTable-on reen al KStream por konekto.

5.3.3. Konektante objektojn KStream kaj KTable

En Ĉapitro 4, ni diskutis konekti du KStream-objektojn. Nun ni devas lerni kiel konekti KTable kaj KStream. Ĉi tio povas esti bezonata pro la sekva simpla kialo. KStream estas fluo de rekordoj, kaj KTable estas fluo de rekordaj ĝisdatigoj, sed foje vi eble volas aldoni plian kuntekston al la rekordfluo uzante ĝisdatigojn de la KTable.

Ni prenu datumojn pri la nombro da borsaj transakcioj kaj kombinu ilin kun borsaj novaĵoj por la koncernaj industrioj. Jen kion vi devas fari por atingi tion pro la kodo, kiun vi jam havas.

  1. Konvertu KTable-objekton kun datumoj pri la nombro da akciaj transakcioj en KStream, sekvita per anstataŭigo de la ŝlosilo per la ŝlosilo indikanta la industrian sektoron respondan al ĉi tiu akcia simbolo.
  2. Kreu KTable-objekton, kiu legas datumojn de temo kun borsaj novaĵoj. Ĉi tiu nova KTable kategoriiĝos laŭ industria sektoro.
  3. Konektu novaĵajn ĝisdatigojn kun informoj pri la nombro da borsaj transakcioj laŭ industria sektoro.

Nun ni vidu kiel efektivigi ĉi tiun agadplanon.

Konverti KTable al KStream

Por konverti KTable al KStream vi devas fari la jenon.

  1. Voku la metodon KTable.toStream().
  2. Vokante la metodon KStream.map, anstataŭigu la ŝlosilon per la industrinomo, kaj poste prenu la objekton TransactionSummary el la Fenestra petskribo.

Ni ĉenos ĉi tiujn operaciojn kune jene (la kodo troviĝas en la dosiero src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listo 5.8).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Ĉar ni faras operacion KStream.map, la redonita KStream-instanco estas re-dispartita aŭtomate kiam ĝi estas uzata en konekto.

Ni kompletigis la konvertan procezon, poste ni devas krei KTable-objekton por legi akciajn novaĵojn.

Kreo de KTable por akciaj novaĵoj

Feliĉe, krei KTable-objekton bezonas nur unu linion de kodo (la kodo troviĝas en src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listo 5.9).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Indas noti, ke neniuj Serde-objektoj estas bezonataj por esti specifitaj, ĉar ĉenoj Serdes estas uzataj en la agordoj. Ankaŭ, uzante la PLEJ FRUAN nombradon, la tabelo estas plenigita kun registroj ĉe la komenco mem.

Nun ni povas pluiri al la fina paŝo - konekto.

Konektante novaĵajn ĝisdatigojn kun transakciaj nombro-datumoj

Krei konekton ne estas malfacila. Ni uzos maldekstran kunigon, se ne ekzistas akciaj novaĵoj por la koncerna industrio (la necesa kodo troviĝas en la dosiero src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listo 5.10).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Ĉi tiu leftJoin-funkciigisto estas sufiĉe simpla. Male al la kuniĝoj en Ĉapitro 4, la JoinWindow-metodo ne estas uzata ĉar kiam oni faras kunigon KStream-KTable, estas nur unu eniro en la KTable por ĉiu ŝlosilo. Tia konekto ne estas limigita en tempo: la registro estas aŭ en la KTable aŭ forestas. La ĉefa konkludo: uzante KTable-objektojn vi povas riĉigi KStream per malpli ofte ĝisdatigitaj referencaj datumoj.

Nun ni rigardos pli efikan manieron riĉigi eventojn de KStream.

5.3.4. GlobalKTable objektoj

Kiel vi povas vidi, necesas riĉigi eventofluojn aŭ aldoni kuntekston al ili. En Ĉapitro 4 vi vidis la ligojn inter du KStream-objektoj, kaj en la antaŭa sekcio vi vidis la ligon inter KStream kaj KTable. En ĉiuj ĉi tiuj kazoj, estas necese re-dispartigi la datumfluon dum mapado de la ŝlosiloj al nova tipo aŭ valoro. Foje redividado estas farita eksplicite, kaj foje Kafka Streams faras ĝin aŭtomate. Redispartigo estas necesa ĉar la ŝlosiloj ŝanĝiĝis kaj la rekordoj devas finiĝi en novaj sekcioj, alie la konekto estos neebla (tio estis diskutita en Ĉapitro 4, en la sekcio "Redispartigi datumojn" en subsekcio 4.2.4).

Re-dispartigo havas koston

Re-partitioning requires costs - aldonaj rimedkostoj por krei mezajn temojn, stoki duplikatajn datumojn en alia temo; ĝi ankaŭ signifas pliigitan latentecon pro skribo kaj legado de ĉi tiu temo. Aldone, se vi bezonas aliĝi trans pli ol unu aspekto aŭ dimensio, vi devas ĉeni la kuniĝojn, mapi la rekordojn per novaj ŝlosiloj kaj ruli la re-dispartiga procezo denove.

Konektante al pli malgrandaj datumaroj

En kelkaj kazoj, la volumeno de referencdatenoj por esti ligita estas relative malgranda, tiel ke kompletaj kopioj de ĝi povas facile konveni loke sur ĉiu nodo. Por situacioj kiel ĉi tio, Kafka Streams disponigas la GlobalKTable-klason.

GlobalKTable-kazoj estas unikaj ĉar la aplikaĵo reproduktas ĉiujn datumojn al ĉiu el la nodoj. Kaj ĉar ĉiuj datumoj ĉeestas sur ĉiu nodo, ne necesas dividi la eventofluon per referenca datuma ŝlosilo por ke ĝi estu disponebla por ĉiuj sekcioj. Vi ankaŭ povas fari senŝlosilajn kuniĝojn per GlobalKTable-objektoj. Ni reiru al unu el la antaŭaj ekzemploj por pruvi ĉi tiun funkcion.

Konektante KStream-objektojn al GlobalKTable-objektoj

En subsekcio 5.3.2, ni faris fenestran agregadon de interŝanĝaj transakcioj de aĉetantoj. La rezultoj de ĉi tiu agregado aspektis kiel ĉi tio:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Dum ĉi tiuj rezultoj servis la celon, estus pli utile se la nomo de la kliento kaj la plena firmaonomo ankaŭ estus montritaj. Por aldoni la klientnomon kaj firmaonomon, vi povas fari normalajn kuniĝojn, sed vi devos fari du ŝlosilajn mapojn kaj re-dispartigon. Kun GlobalKTable vi povas eviti la koston de tiaj operacioj.

Por fari tion, ni uzos la objekton countStream el Listo 5.11 (la responda kodo troviĝas en src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) kaj konektos ĝin al du GlobalKTable-objektoj.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Ni jam diskutis tion antaŭe, do mi ne ripetos ĝin. Sed mi rimarkas, ke la kodo en la toStream().map-funkcio estas abstraktita en funkciobjekton anstataŭ enlinia lambda esprimo por la legebleco.

La sekva paŝo estas deklari du okazojn de GlobalKTable (la montrita kodo troviĝas en la dosiero src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listo 5.12).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"

Bonvolu noti, ke temoj nomoj estas priskribitaj uzante listigitajn tipojn.

Nun kiam ni havas ĉiujn komponantojn pretajn, restas nur skribi la kodon por la konekto (kiu troviĝas en la dosiero src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listo 5.13).

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Kvankam estas du kuniĝoj en ĉi tiu kodo, ili estas ĉenitaj ĉar neniu el iliaj rezultoj estas uzataj aparte. La rezultoj estas montrataj ĉe la fino de la tuta operacio.

Kiam vi rulas ĉi-supran kunig-operacion, vi ricevos rezultojn kiel ĉi tion:

{customer='Barney, Smith' company="Exxon", transactions= 17}

La esenco ne ŝanĝiĝis, sed ĉi tiuj rezultoj aspektas pli klaraj.

Se vi kalkulas malsupren ĝis Ĉapitro 4, vi jam vidis plurajn specojn de ligoj en ago. Ili estas listigitaj en tabelo. 5.2. Ĉi tiu tabelo reflektas la konektebleckapablojn de versio 1.0.0 de Kafka Streams; Io povas ŝanĝiĝi en estontaj eldonoj.

La libro “Kafka Riveroj en Agado. Aplikoj kaj mikroservoj por realtempa laboro"
Por fini aferojn, ni resumu la bazojn: vi povas konekti eventofluojn (KStream) kaj ĝisdatigi fluojn (KTable) uzante lokan ŝtaton. Alternative, se la grandeco de la referencaj datumoj ne estas tro granda, vi povas uzi la objekton GlobalKTable. GlobalKTables reproduktas ĉiujn sekciojn al ĉiu aplikaĵnodo de Kafka Streams, certigante ke ĉiuj datumoj estas haveblaj sendepende de kiu sekcio la ŝlosilo respondas.

Poste ni vidos la funkcion de Kafka Streams, danke al kiu ni povas observi ŝtatŝanĝojn sen konsumi datumojn de Kafka temo.

5.3.5. Demandebla stato

Ni jam faris plurajn operaciojn implikantajn ŝtaton kaj ĉiam eligis la rezultojn al la konzolo (por disvolvaj celoj) aŭ skribas ilin al temo (por produktadaj celoj). Skribante rezultojn al temo, vi devas uzi Kafka-konsumanton por vidi ilin.

Legado de datumoj de ĉi tiuj temoj povas esti konsiderata speco de realigitaj vidoj. Por niaj celoj, ni povas uzi la difinon de materiigita vido de Vikipedio: “...fizika datumbaza objekto enhavanta la rezultojn de demando. Ekzemple, ĝi povus esti loka kopio de foraj datumoj, aŭ subaro de la vicoj kaj/aŭ kolumnoj de tabelo aŭ kunigrezultoj, aŭ resuma tabelo akirita per agregado" (https://en.wikipedia.org/wiki /Materiigita_vido).

Kafka Streams ankaŭ ebligas al vi fari interagajn demandojn en ŝtataj butikoj, permesante al vi rekte legi ĉi tiujn realigitajn vidojn. Gravas noti, ke la demando al la ŝtata vendejo estas nurlegebla operacio. Ĉi tio certigas, ke vi ne devas zorgi pri hazarde malkonsekvenca stato dum via aplikaĵo prilaboras datumojn.

Gravas la kapablo rekte pridemandi ŝtatbutikojn. Ĉi tio signifas, ke vi povas krei panelajn aplikaĵojn sen devi unue preni datumojn de la konsumanto de Kafka. Ĝi ankaŭ pliigas la efikecon de la aplikaĵo, pro la fakto, ke ne necesas skribi datumojn denove:

  • danke al la loko de la datumoj, ili povas esti rapide alireblaj;
  • duobligo de datumoj estas forigita, ĉar ĝi ne estas skribita al ekstera stokado.

La ĉefa afero, kiun mi volas, ke vi memoru, estas, ke vi povas rekte demandi staton el via aplikaĵo. La ŝancoj, kiujn ĉi tio donas al vi, ne povas esti troigitaj. Anstataŭ konsumi datumojn de Kafka kaj konservi rekordojn en datumbazo por la aplikaĵo, vi povas pridemandi ŝtatajn butikojn kun la sama rezulto. Rektaj demandoj al ŝtataj butikoj signifas malpli da kodo (neniu konsumanto) kaj malpli da programaro (neniu bezono de datumbaza tablo por stoki la rezultojn).

Ni kovris sufiĉe da grundo en ĉi tiu ĉapitro, do ni lasos nian diskuton pri interagaj demandoj kontraŭ ŝtataj butikoj nuntempe. Sed ne maltrankviliĝu: en Ĉapitro 9, ni kreos simplan panelan aplikaĵon kun interagaj demandoj. Ĝi uzos kelkajn el la ekzemploj de ĉi tiu kaj antaŭaj ĉapitroj por montri interagajn demandojn kaj kiel vi povas aldoni ilin al Kafka Streams-aplikoj.

Resumo

  • KStream-objektoj reprezentas fluojn de eventoj, kompareblaj al enigaĵoj en datumbazon. KTable-objektoj reprezentas ĝisdatigajn fluojn, pli kiel ĝisdatigojn al datumbazo. La grandeco de la objekto KTable ne kreskas, malnovaj registroj estas anstataŭigitaj per novaj.
  • KTable-objektoj estas postulataj por agregaciaj operacioj.
  • Uzante fenestrajn operaciojn, vi povas dividi agregitajn datumojn en tempajn sitelojn.
  • Danke al GlobalKTable-objektoj, vi povas aliri referencajn datumojn ie ajn en la aplikaĵo, sendepende de dispartigo.
  • Ligoj inter objektoj KStream, KTable kaj GlobalKTable eblas.

Ĝis nun, ni koncentriĝis pri konstruado de Kafka Streams-aplikoj uzante la altnivelan KStream DSL. Kvankam la altnivela aliro permesas krei bonordajn kaj koncizajn programojn, uzi ĝin reprezentas interŝanĝon. Labori kun DSL KStream signifas pliigi la koncizecon de via kodo reduktante la gradon de kontrolo. En la sekva ĉapitro, ni rigardos la malaltnivelan prizorgan nodon API kaj provos aliajn kompromisojn. La programoj estos pli longaj ol antaŭe, sed ni povos krei preskaŭ ajnan prizorgan nodon, kiun ni eble bezonos.

→ Pliaj detaloj pri la libro troveblas ĉe retejo de la eldonisto

→ Por Habrozhiteli 25% rabato uzante kuponon - Kafka Rojoj

→ Paginte por la papera versio de la libro, elektronika libro estos sendita retpoŝte.

fonto: www.habr.com

Aldoni komenton