“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko" Kaixo, Khabro bizilagunak! Liburu hau haria prozesatzea ulertu nahi duen edozein garatzailerentzat egokia da. Banatutako programazioa ulertzeak Kafka eta Kafka Streams hobeto ulertzen lagunduko dizu. Polita litzateke Kafka markoa bera ezagutzea, baina hau ez da beharrezkoa: behar duzun guztia esango dizut. Kafka garatzaile esperientziadunek eta hasiberriek korronteen prozesatzeko aplikazio interesgarriak nola sortzen ikasiko dute liburu honetan Kafka Streams liburutegia erabiliz. Serializazioa bezalako kontzeptuak ezagutzen dituzten Java garatzaile ertaineko eta aurreratuek beren gaitasunak aplikatzen ikasiko dute Kafka Streams aplikazioak sortzeko. Liburuaren iturburu-kodea Java 8n idatzita dago eta Java 8 lambda adierazpen sintaxia nabarmen erabiltzen du, beraz, lambda funtzioekin lan egiten jakitea (baita beste programazio-lengoaia batean ere) ondo etorriko da.

Laburpena. 5.3. Agregazio- eta leiho-eragiketak

Atal honetan, Kafka Streams-en zatirik itxaropentsuenak arakatzen joango gara. Orain arte Kafka Streams-en alderdi hauek landu ditugu:

  • prozesatzeko topologia bat sortzea;
  • egoera erabiltzea streaming aplikazioetan;
  • datu-korronte konexioak egitea;
  • gertaeren korronteen (KStream) eta eguneratze korronteen (KTable) arteko desberdintasunak.

Hurrengo adibideetan elementu horiek guztiak elkartuko ditugu. Leihoei buruz ere ikasiko duzu, streaming aplikazioen beste ezaugarri handi bat. Gure lehen adibidea agregazio sinple bat izango da.

5.3.1. Stocken salmentak industria-sektoreen arabera batzea

Agregazioa eta taldekatzea ezinbesteko tresnak dira streaming datuekin lan egiteko. Erregistro indibidualak jasotzen diren heinean aztertzea nahikoa ez da askotan. Datuetatik informazio gehigarria ateratzeko, taldekatu eta konbinatu egin behar da.

Adibide honetan, hainbat industriatako enpresen izakinen salmenta-bolumenaren jarraipena egin behar duen eguneko dendari baten mozorroa jarriko duzu. Zehazki, industria bakoitzean akzio salmenta handienak dituzten bost enpresetan interesatzen zaizu.

Agregazio horrek hurrengo urratsak beharko ditu datuak nahi den formara itzultzeko (termino orokorrean hitz eginez).

  1. Sortu gaietan oinarritutako iturri bat, stocken merkataritzako informazio gordina argitaratzen duena. StockTransaction motako objektu bat ShareVolume motako objektu batekin mapatu beharko dugu. Kontua da StockTransaction objektuak salmenta metadatuak dituela, baina saltzen diren akzio kopuruari buruzko datuak soilik behar ditugula.
  2. Taldeak partekatuBolumearen datuak stock ikurren arabera. Ikurren arabera taldekatu ondoren, datu hauek akzioen salmenten bolumenen azpitotaletan bildu ditzakezu. Kontuan izan behar da KStream.groupBy metodoak KGroupedStream motako instantzia bat itzultzen duela. Eta KTable instantzia bat lor dezakezu KGroupedStream.reduce metodora deituz.

Zer da KGroupedStream interfazea

KStream.groupBy eta KStream.groupByKey metodoek KGroupedStream-en instantzia bat itzultzen dute. KGroupedStream gakoen arabera taldekatu ondoren gertaeren korronte baten tarteko irudikapena da. Ez dago batere berarekin lan zuzenerako pentsatuta. Horren ordez, KGroupedStream agregazio-eragiketetarako erabiltzen da, beti KTable bat sortzen dutenak. Eta agregazio-eragiketen emaitza KTable bat denez eta egoera biltegi bat erabiltzen dutenez, baliteke horren ondorioz eguneratze guztiak ez bidaltzea kanalizaziotik behera.

KTable.groupBy metodoak KGroupedTable antzeko bat itzultzen du - eguneratze-korrontearen tarteko irudikapena, gakoen arabera birtaldetuta.

Har dezagun atseden txiki bat eta ikus dezagun irudira. 5.9, lortu duguna erakusten duena. Topologia hau dagoeneko oso ezaguna izan beharko zenuke.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Ikus dezagun orain topologia honen kodea (src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java fitxategian aurki daiteke) (5.2 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Emandako kodea bere laburtasunagatik eta hainbat lerrotan egindako ekintzen bolumen handiagatik bereizten da. Baliteke zerbait berria nabaritzea builder.stream metodoaren lehen parametroan: AutoOffsetReset.EARLIEST enumerazio motako balio bat (AZKENENA ere badago), Consumed.withOffsetResetPolicy metodoa erabiliz ezarria. Zenbaketa mota hau KStream edo KTable bakoitzerako desplazamendu berrezartzeko estrategia bat zehazteko erabil daiteke eta konfigurazioko desplazamenduaren berrezartze aukeraren aurrean lehentasuna du.

GroupByKey eta GroupBy

KStream interfazeak bi metodo ditu erregistroak taldekatzeko: GroupByKey eta GroupBy. Biek KGroupedTable itzultzen dute, beraz, galdetzen ari zarete zein den haien arteko aldea eta noiz erabili zein den?

GroupByKey metodoa erabiltzen da KStream-eko gakoak jada hutsik ez daudenean. Eta garrantzitsuena, "berriro partizionatzea eskatzen du" bandera ez zen inoiz ezarri.

GroupBy metodoak taldekatzeko gakoak aldatu dituzula suposatzen du, beraz, zatiketa-marka egia gisa ezarrita dago. GroupBy metodoaren ondoren batzeak, agregazioak eta abar egiteak berriro partizio automatikoa izango du.
Laburpena: Ahal den guztietan, GroupBy erabili beharrean, GroupByKey.

Argi dago mapValues ​​eta groupBy metodoek zer egiten duten, beraz, ikus dezagun sum() metodoa (src/main/java/bbejeck/model/ShareVolume.java-n aurkitzen dena) (5.3 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
ShareVolume.sum metodoak akzioen salmenten bolumenaren guztizkoa itzultzen du, eta kalkulu-kate osoaren emaitza KTable objektu bat da. . Orain ulertzen duzu KTablek betetzen duen papera. ShareVolume objektuak iristen direnean, dagokion KTable objektuak uneko azken eguneratzea gordetzen du. Garrantzitsua da gogoratzea eguneratze guztiak aurreko shareVolumeKTable-n islatzen direla, baina guztiak ez direla aurrerago bidaltzen.

Ondoren, KTable hau erabiltzen dugu (negoziatutako akzio kopuruaren arabera) industria bakoitzean negoziatutako akzio-bolumen handiena duten bost enpresetara iristeko. Kasu honetan gure ekintzak lehen agregaziokoen antzekoak izango dira.

  1. Egin beste groupBy eragiketa banakako ShareVolume objektuak industriaren arabera taldekatzeko.
  2. Hasi ShareVolume objektuak laburtzen. Oraingoan agregazio-objektua tamaina finkoko lehentasun-ilara bat da. Tamaina finkoko ilara honetan, saldutako akzio kopuru handienak dituzten bost enpresak bakarrik mantentzen dira.
  3. Mapeatu aurreko paragrafoko ilarak kate-balio batera eta itzuli gehien saltzen diren bost akzio nagusiak industriaren arabera.
  4. Idatzi emaitzak kate moduan gaiari.

Irudian. 5.10 irudiak datu-fluxuaren topologia grafikoa erakusten du. Ikus dezakezunez, bigarren prozesatzeko txanda nahiko erraza da.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Bigarren prozesatzeko txanda honen egitura argi ulertzen dugunean, bere iturburu-kodera jo dezakegu (src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java fitxategian aurkituko duzu) (5.4 zerrenda). .

Hasieratzaile honek fixedQueue aldagai bat dauka. Hau java.util.TreeSet-erako egokitzaile bat den objektu pertsonalizatua da, eta N emaitzen goi mailako N emaitzen jarraipena egiteko erabiltzen den akzioen beheranzko ordenan.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Dagoeneko ikusi dituzu groupBy eta mapValues ​​​​deiak, beraz, ez gara horietan sartuko (KTable.toStream metodoari deitzen diogu KTable.print metodoa zaharkituta dagoelako). Baina oraindik ez duzu aggregate()-ren KTable bertsioa ikusi, beraz, denbora pixka bat emango dugu horri buruz eztabaidatzen.

Gogoratzen duzunez, KTable ezberdintzen duena da gako berdinak dituzten erregistroak eguneratzetzat hartzen direla. KTablek sarrera zaharra berri batekin ordezkatzen du. Agregazioa antzera gertatzen da: gako bera duten azken erregistroak batu egiten dira. Erregistro bat iristen denean, FixedSizePriorityQueue klaseko instantziara gehitzen da gehigarri bat erabiliz (metodo agregatuaren deian bigarren parametroa), baina dagoeneko beste erregistro bat badago gako berarekin, orduan erregistro zaharra kenduko da kengailu baten bidez (hirugarren parametroa). metodo agregatua deia).

Horrek guztiak esan nahi du gure agregatzaileak, FixedSizePriorityQueue, ez dituela balio guztiak gako batekin batzen, baizik eta gehien negoziatzen diren N akzio motaren kantitateen batura mugikorra gordetzen du. Sarrerako sarrera bakoitzak orain arte saldutako akzio kopurua dauka. KTable-k gaur egun gehien negoziatzen diren enpresen akzioei buruzko informazioa emango dizu, eguneraketa bakoitzaren agregazio iraunkorra behar izan gabe.

Bi gauza garrantzitsu egiten ikasi genuen:

  • taldekatu balioak KTable-n gako komun baten bidez;
  • egin eragiketa erabilgarriak, hala nola bilketa eta batuketa, taldekatutako balio horien gainean.

Eragiketa hauek egiten jakitea garrantzitsua da Kafka Streams aplikazio batean mugitzen diren datuen esanahia ulertzeko eta zer informazio daraman ulertzeko.

Liburu honetan lehen landutako funtsezko kontzeptu batzuk ere bildu ditugu. 4. kapituluan, akatsak toleranteak diren tokiko egoerak streaming aplikazio baterako garrantzitsuak diren aztertu dugu. Kapitulu honetako lehen adibidean tokiko estatua zergatik den hain garrantzitsua frogatu zen; lehendik ikusi duzun informazioaren jarraipena egiteko aukera ematen du. Tokiko sarbideak sareko atzerapenak saihesten ditu, aplikazioa eraginkorragoa eta akatsekiko erresistenteagoa bihurtuz.

Bilketa- edo agregazio-eragiketa bat egitean, egoera-biltegiaren izena zehaztu behar duzu. Bilketa- eta agregazio-eragiketek KTable-ren instantzia bat itzultzen dute, eta KTable-k egoera-biltegia erabiltzen du emaitza zaharrak berriekin ordezkatzeko. Ikusi duzun bezala, eguneratze guztiak ez dira bidetik bidaltzen, eta hori garrantzitsua da agregazio-eragiketak laburpen-informazioa sortzeko diseinatuta daudelako. Tokiko estatua aplikatzen ez baduzu, KTable-k agregazio eta bilketa-emaitza guztiak birbidaltuko ditu.

Ondoren, denbora-tarte zehatz batean agregazioa bezalako eragiketak egitea aztertuko dugu - leiho-eragiketak deiturikoak.

5.3.2. Leiho eragiketak

Aurreko atalean, irristakorra konboluzioa eta agregazioa sartu dugu. Aplikazioak akzioen salmenten bolumenaren etengabeko bilketa egin zuen, eta ondoren, trukean gehien negoziatzen diren bost akzioen batuketa egin zen.

Batzuetan, etengabeko agregazioa eta emaitzak biltzea beharrezkoa da. Eta batzuetan denbora-tarte jakin batean bakarrik egin behar dituzu eragiketak. Adibidez, kalkulatu zenbat truke-transakzio egin diren enpresa jakin baten akzioekin azken 10 minutuetan. Edo zenbat erabiltzailek klik egin duten publizitate-banner berri batean azken 15 minutuetan. Aplikazio batek eragiketa horiek hainbat aldiz egin ditzake, baina denbora-tarte zehatzei soilik aplikatzen zaizkien emaitzak (denbora-leioak).

Eroslearen truke-eragiketak zenbatzea

Hurrengo adibidean, hainbat merkatariren stock-transakzioen jarraipena egingo dugu, erakunde handietan edo finantzatzaile adimendunetan.

Bi arrazoi egon daitezke jarraipena egiteko. Horietako bat merkatuko liderrak zer erosten/saltzen ari diren jakitea da. Jokalari handi eta inbertitzaile sofistikatu hauek aukera ikusten badute, zentzuzkoa da euren estrategia jarraitzea. Bigarren arrazoia legez kanpoko barruko merkataritzaren zantzu posibleak antzematea da. Horretarako, salmenta-puntu handiek prentsa ohar garrantzitsuekin duten korrelazioa aztertu beharko duzu.

Jarraipen horrek urrats hauek ditu:

  • akzio-transakzioen gaitik irakurtzeko korronte bat sortzea;
  • sarrerako erregistroak erosle IDaren eta akzio-ikurren arabera taldekatzea. groupBy metodoari deitzeak KGroupedStream klasearen instantzia bat itzultzen du;
  • KGroupedStream.windowedBy metodoak denbora-leiho batera mugatutako datu-korrontea itzultzen du, eta horrek leiho-agregazioa ahalbidetzen du. Leiho motaren arabera, TimeWindowedKStream edo SessionWindowedKStream bat itzultzen da;
  • batuketa eragiketarako transakzio kopurua. Leihodun datu-fluxuak zehazten du zenbaketa honetan erregistro jakin bat kontuan hartzen den;
  • emaitzak idaztea gai bati edo kontsolara ateratzea garapenean zehar.

Aplikazio honen topologia sinplea da, baina horren irudi argia lagungarria izango litzateke. Ikus dezagun irudira. 5.11.

Ondoren, leihoen eragiketen funtzionaltasuna eta dagokion kodea ikusiko dugu.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"

Leiho motak

Kafka Streams-en hiru leiho mota daude:

  • saioa;
  • “buelta”;
  • irristatu/jauzika.

Zein aukeratu zure negozioaren eskakizunen araberakoa da. Tumbling eta jauzi-leihoak denbora mugatuta daude, eta saio-leihoak, berriz, erabiltzailearen jarduerak mugatzen ditu. Gogoratu beharreko gauza nagusia da leiho mota guztiak sarreren data/orduaren zigiletan oinarritzen direla, ez sistemaren orduan.

Ondoren, gure topologia inplementatzen dugu leiho mota bakoitzarekin. Kode osoa lehenengo adibidean bakarrik emango da; beste leiho motetarako ezer ez da aldatuko leihoaren eragiketa mota izan ezik.

Saioaren leihoak

Saio-leihoak oso desberdinak dira beste leiho mota guztien aldean. Ez dira denboraren arabera mugatzen erabiltzailearen jarduerak (edo jarraipena egin nahi duzun entitatearen jarduerak). Saio-leihoak jarduerarik gabeko aldiek mugatzen dituzte.

5.12 irudian saio-leihoen kontzeptua azaltzen da. Saio txikiagoak bere ezkerreko saioarekin bat egingo du. Eta eskuineko saioa apartekoa izango da, jarduerarik gabeko denbora luzea jarraitzen duelako. Saio-leihoak erabiltzailearen jardueran oinarritzen dira, baina erabili sarreretako data/ordu-zigiluak sarrera zein saiotakoa den zehazteko.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"

Saio-leihoak erabiltzea stock-transakzioen jarraipena egiteko

Erabili ditzagun saio-leihoak truke-transakzioei buruzko informazioa harrapatzeko. Saio-leihoen ezarpena 5.5 zerrendan agertzen da (src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java-n aurki daiteke).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Topologia honetako eragiketa gehienak ikusi dituzu dagoeneko, beraz, ez dago hemen berriro begiratu beharrik. Baina hemen ere badaude hainbat elementu berri, orain eztabaidatuko ditugunak.

Edozein groupBy-ren eragiketak batuketa-eragiketa bat egiten du normalean (agregazioa, bilketa edo zenbaketa). Bateratze metatua egin dezakezu guztizko exekutibo batekin, edo leiho-agregazioa, denbora-leiho zehatz bateko erregistroak kontuan hartzen dituena.

5.5 zerrendako kodeak saio-leihoetako transakzio kopurua zenbatzen du. Irudian. 5.13 ekintza hauek urratsez urrats aztertzen dira.

windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) deituz, saio-leiho bat sortzen dugu 20 segundoko inaktibitate tartearekin eta 15 minutuko iraunkortasun tartearekin. 20 segundoko inaktibo-tarte batek esan nahi du aplikazioak uneko saioa amaitu edo hasi eta 20 segundoko epean sartzen den edozein sarrera sartuko duela uneko saioan (aktiboan).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Ondoren, saio-leihoan zein agregazio-eragiketa egin behar den zehazten dugu; kasu honetan, zenbatu. Sarrerako sarrera bat jarduerarik gabeko leihotik kanpo geratzen bada (data/orduaren zigiluaren bi aldeetan), aplikazioak saio berri bat sortzen du. Atxikitze-tarteak saio bat denbora jakin batean mantentzea esan nahi du eta saioaren jarduerarik gabeko alditik haratago luzatzen diren baina oraindik erantsi daitezkeen berantiar datuak onartzen ditu. Gainera, bateratzearen ondoriozko saio berriaren hasiera eta amaiera data/orduaren zigilu goiztiarrari eta azkenari dagokio.

Ikus ditzagun zenbaketa metodoko sarrera batzuk saioak nola funtzionatzen duten ikusteko (5.1 taula).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Erregistroak iristen direnean, lehendik dauden saioak bilatzen ditugu gako berarekin, uneko data/ordu zigilua baino amaiera-ordu txikiagoa - jarduerarik gabeko tartea eta hasiera-ordua uneko data/ordu zigilua + jarduerarik gabeko tartea baino handiagoa. Hori kontuan hartuta, taulako lau sarrera. 5.1 saio bakarrean batzen dira honela.

1. Erregistroa 1 iristen da lehenik, beraz hasiera-ordua amaiera-orduaren berdina da eta 00:00:00 da.

2. Jarraian, 2. sarrera iristen da, eta 23:59:55 baino lehen amaitzen diren eta 00:00:35 beranduago hasten diren saioak bilatzen ditugu. 1. erregistroa aurkitzen dugu eta 1. eta 2. saioak konbinatzen ditugu. 1. saioaren hasiera-ordua (lehenago) eta 2. saioaren amaiera-ordua (geroago) hartzen ditugu, gure saio berria 00:00:00etan hasi eta 00:00etan amai dadin: 15:XNUMX.

3. 3. erregistroa iristen da, 00:00:30 eta 00:01:10 arteko saioak bilatzen ditugu eta ez dugu aurkitzen. Gehitu bigarren saio bat 123-345-654,FFBE gakorako, 00:00:50ean hasi eta amaitu.

4. 4. erregistroa iristen da eta 23:59:45 eta 00:00:25 arteko saioak bilatzen ari gara. Oraingo honetan 1. eta 2. saioak aurkitzen dira.Hiru saioak bakarrean konbinatuta daude, hasierako ordua 00:00:00 eta amaierako ordua 00:00:15.

Atal honetan deskribatzen denaren arabera, ñabardura garrantzitsu hauek gogoratzea komeni da:

  • saioak ez dira tamaina finkoko leihoak. Saio baten iraupena epe jakin bateko jarduerak zehazten du;
  • Datuen data/orduaren zigiluak zehazten du gertaera lehendik dagoen saio batean edo inaktibo batean dagoen.

Jarraian, hurrengo leiho motari buruz eztabaidatuko dugu: leiho "errotagarriak".

"Tumbling" leihoak

Tumbling leihoek denbora-tarte jakin batean gertatzen diren gertaerak jasotzen dituzte. Imajinatu enpresa jakin baten akzio-transakzio guztiak 20 segundoro harrapatu behar dituzula, denbora tarte horretan gertaera guztiak bilduko dituzula. 20 segundoko tartearen amaieran, leihoa irauli eta 20 segundoko behaketa tarte berri batera mugitzen da. 5.14 irudiak egoera hau erakusten du.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Ikus dezakezunez, azken 20 segundoetan jasotako gertaera guztiak leihoan sartzen dira. Denbora-tarte hori amaitzean, leiho berri bat sortzen da.

5.6 zerrendan 20 segundoz behin akzio-transakzioak harrapatzeko leiho biribilen erabilera erakusten duen kodea erakusten du (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-n aurkitzen da).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
TimeWindows.of metodo-deiaren aldaketa txiki honekin, leiho birringarri bat erabil dezakezu. Adibide honek ez dio until() metodoari deitzen, beraz, 24 orduko atxikipen-tarte lehenetsia erabiliko da.

Azkenik, leihoetako azken aukeretara pasatzeko garaia da: leihoak "saltoka".

Leiho irristagarriak ("jauzika")

Leiho irristagarriak/jausteko leihoak erortzen diren leihoen antzekoak dira, baina alde txiki batekin. Leiho irristagarriak ez dira denbora-tartearen amaierara arte itxaron leiho berri bat sortu aurretik azken gertaerak prozesatzeko. Kalkulu berriak hasten dituzte leihoaren iraupena baino itxarote-tarte baten ondoren.

Leihoak erortzearen eta saltoaren arteko desberdintasunak ilustratzeko, itzul gaitezen burtsaren transakzioak zenbatzearen adibidera. Gure helburua transakzio kopurua zenbatzea da oraindik, baina ez dugu denbora guztia itxaron nahi kontagailua eguneratu aurretik. Horren ordez, kontagailua tarte laburragoetan eguneratuko dugu. Adibidez, transakzio kopurua 20 segundoro zenbatuko dugu oraindik, baina kontagailua 5 segundoro eguneratuko dugu, irudian ikusten den bezala. 5.15. Kasu honetan, hiru emaitza-leiho aurkitzen ditugu datu gainjarriekin.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
5.7 zerrendak leiho irristagarriak definitzeko kodea erakusten du (src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-n aurkitzen da).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Leiho irauli bat saltoka leiho bihur daiteke advanceBy() metodoari dei bat gehituz. Erakutsitako adibidean, gordetzeko tartea 15 minutukoa da.

Atal honetan ikusi duzu nola mugatu agregazio-emaitzak denbora-leihoetara. Bereziki, atal honetako hiru gauza hauek gogoratzea nahi dut:

  • saio-leihoen tamaina ez da denbora-tartearen arabera mugatzen, erabiltzailearen jardueraren arabera baizik;
  • "Turbling" leihoek denbora-tarte jakin bateko gertaeren ikuspegi orokorra eskaintzen dute;
  • Leiho jauzien iraupena finkoa da, baina maiz eguneratzen dira eta baliteke leiho guztietan gainjarritako sarrerak edukitzea.

Jarraian, KTable bat berriro KStream bihurtzeko konexio bat egiteko ikasiko dugu.

5.3.3. KStream eta KTable objektuak konektatzea

4. kapituluan, bi KStream objektu konektatzeari buruz eztabaidatu dugu. Orain KTable eta KStream nola konektatzen ikasi behar dugu. Baliteke honako arrazoi sinple honengatik beharrezkoa izatea. KStream erregistro-korronte bat da, eta KTable erregistro-eguneratze-korronte bat da, baina batzuetan baliteke erregistro-korronteari testuinguru gehigarria gehitu nahi izatea KTable-ren eguneraketak erabiliz.

Har ditzagun burtsa-transakzio-kopuruari buruzko datuak eta konbina ditzagun sektore garrantzitsuetarako burtsako albisteekin. Hona hemen hori lortzeko egin behar duzuna dagoeneko duzun kodea kontuan hartuta.

  1. Bihurtu KTable objektu bat akzio-transakzio-kopuruari buruzko datuak KStream batean, eta ondoren, akzio-ikur horri dagokion industria-sektorea adierazten duen gakoa ordezkatuz.
  2. Sortu KTable objektu bat, burtsako albisteekin gai bateko datuak irakurtzen dituena. KTable berri hau industria-sektoreen arabera sailkatuko da.
  3. Konektatu albisteen eguneraketak industria-sektorearen arabera burtsako transakzio kopuruari buruzko informazioarekin.

Orain ikus dezagun nola gauzatu ekintza plan hau.

Bihurtu KTable KStream-era

KTable KStream bihurtzeko honako hau egin behar duzu.

  1. Deitu KTable.toStream() metodoari.
  2. KStream.map metodoari deituz, ordezkatu gakoa industria-izenarekin eta, ondoren, berreskuratu TransactionSummary objektua Windowed instantziatik.

Eragiketa hauek honela kateatu egingo ditugu (kodea src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fitxategian aurki daiteke) (5.8 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
KStream.map eragiketa bat egiten ari garenez, itzulitako KStream instantzia automatikoki birpartizionatzen da konexio batean erabiltzen denean.

Bihurtze-prozesua amaitu dugu, jarraian KTable objektu bat sortu behar dugu stock albisteak irakurtzeko.

Stock berrietarako KTable sortzea

Zorionez, KTable objektu bat sortzeak kode lerro bat besterik ez du behar (kodea src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java-n aurki daiteke) (5.9 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Aipatzekoa da Serde objekturik ez dela zehaztu behar, ezarpenetan Serde kateak erabiltzen baitira. Era berean, EARLIEST zenbaketa erabiliz, taula erregistroz betetzen da hasieran.

Orain azken urratsera pasa gaitezke: konexioa.

Albisteen eguneraketak transakzio-zenbaketa datuekin konektatzea

Konexio bat sortzea ez da zaila. Ezkerreko elkarketa bat erabiliko dugu dagokion industriarako stock-albisterik ez dagoenean (beharrezko kodea src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java fitxategian aurki daiteke) (5.10 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
leftJoin operadore hau nahiko erraza da. 4. kapituluko batzen ez bezala, JoinWindow metodoa ez da erabiltzen KStream-KTable bat egitean KTable-n sarrera bakarra baitago gako bakoitzeko. Lotura hori ez dago denboran mugatua: erregistroa KTaulan edo ez dago. Ondorio nagusia: KTable objektuak erabiliz KStream aberastu dezakezu maiz gutxiago eguneratzen diren erreferentzia datuekin.

Orain KStream-eko gertaerak aberasteko modu eraginkorragoa aztertuko dugu.

5.3.4. GlobalKTable objektuak

Ikus dezakezunez, gertaeren korronteak aberastu edo horiei testuingurua gehitu beharra dago. 4. kapituluan bi KStream objekturen arteko konexioak ikusi dituzu, eta aurreko atalean, KStream baten eta KTable baten arteko konexioa. Kasu hauetan guztietan, beharrezkoa da datu-jarioa berriro partitzea gakoak mota edo balio berri batekin mapatzean. Batzuetan birpartizioa esplizituki egiten da, eta batzuetan Kafka Streamsek automatikoki egiten du. Berriro partizionatzea beharrezkoa da, gakoak aldatu direlako eta erregistroak atal berrietan amaitu behar direlako, bestela ezinezkoa izango da konexioa (4 azpiataleko “Datuak birpartizionatzea” atalean aztertu zen 4.2.4. kapituluan).

Berriro zatitzeak kostua du

Berriro zatitzeak kostuak eskatzen ditu - bitarteko gaiak sortzeko baliabide-kostu gehigarriak, bikoiztutako datuak beste gai batean gordetzeko; gai honetatik idaztearen eta irakurtzearen ondoriozko latentzia areagotzea ere esan nahi du. Gainera, alderdi edo dimentsio batean baino gehiagotan elkartu behar baduzu, elkarketak kateatu behar dituzu, erregistroak gako berriekin mapatu eta berriro partizionatzeko prozesua exekutatu.

Datu multzo txikiagoetara konektatzea

Zenbait kasutan, konektatu beharreko erreferentzia-datuen bolumena nahiko txikia da, beraz, kopia osoak erraz egokitu daitezke nodo bakoitzean lokalean. Horrelako egoeretarako, Kafka Streamsek GlobalKTable klasea eskaintzen du.

GlobalKTable instantzia bakarrak dira aplikazioak datu guztiak nodo bakoitzean errepikatzen dituelako. Eta datu guztiak nodo bakoitzean daudenez, ez dago gertaeren korrontea erreferentziazko datu-gakoaren bidez partizionatu beharrik, partizio guztietan eskuragarri egon dadin. Giltzarik gabeko elkarketak ere egin ditzakezu GlobalKTable objektuak erabiliz. Itzul gaitezen aurreko adibideetako batera ezaugarri hau erakusteko.

KStream objektuak GlobalKTable objektuekin konektatzea

5.3.2 azpiatalean, erosleen truke-transakzioen leiho-agregazioa egin dugu. Agregazio honen emaitzak honelakoak izan ziren:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Emaitza hauek helburua bete zuten arren, erabilgarriagoa izango zen bezeroaren izena eta enpresaren izen osoa ere bistaratu izan balira. Bezeroaren izena eta enpresaren izena gehitzeko, elkarketa arruntak egin ditzakezu, baina bi gako-mapping eta birpartizioa egin beharko dituzu. GlobalKTable-rekin eragiketa horien kostua saihestu dezakezu.

Horretarako, 5.11 zerrendako countStream objektua erabiliko dugu (dagokion kodea src/main/java/bbejeck/chapter_5/GlobalKTableExample.java-n aurki daiteke) eta GlobalKTable bi objektutara konektatuko dugu.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Lehenago ere eztabaidatu dugu honetaz, ez dut errepikatuko. Baina ohartzen naiz toStream().map funtzioko kodea funtzio-objektu batean abstraitzen dela lerroko lambda adierazpen baten ordez, irakurgarritasunaren mesedetan.

Hurrengo urratsa GlobalKTable-ren bi instantzia deklaratzea da (erakusten den kodea src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fitxategian aurki daiteke) (5.12 zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"

Kontuan izan gaiaren izenak zerrendatutako motak erabiliz deskribatzen direla.

Osagai guztiak prest dauzkagunean, konexiorako kodea idaztea baino ez da geratzen (src/main/java/bbejeck/chapter_5/GlobalKTableExample.java fitxategian aurki daiteke) (5.13. zerrenda).

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Kode honetan bi juntaketa badaude ere, kateatuta daude, ez baita haien emaitzak bereizita erabiltzen. Emaitzak eragiketa osoaren amaieran bistaratzen dira.

Goiko batzeko eragiketa exekutatzen duzunean, honelako emaitzak lortuko dituzu:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Funtsa ez da aldatu, baina emaitza hauek argiagoak dirudite.

4. kapituluraino atzerako zenbatzen baduzu, dagoeneko hainbat konexio mota ikusi dituzu martxan. Taulan ageri dira. 5.2. Taula honek Kafka Streams-en 1.0.0 bertsioaren konektagarritasun-gaitasunak islatzen ditu; Etorkizuneko bertsioetan zerbait alda daiteke.

“Kafka Streams in Action” liburua. Aplikazioak eta mikrozerbitzuak denbora errealean lan egiteko"
Gauzak amaitzeko, labur ditzagun oinarriak: gertaeren korronteak (KStream) eta korronteak eguneratu (KTable) konekta ditzakezu tokiko egoera erabiliz. Bestela, erreferentzia datuen tamaina handiegia ez bada, GlobalKTable objektua erabil dezakezu. GlobalKTables-ek partizio guztiak errepikatzen ditu Kafka Streams aplikazio-nodo bakoitzean, datu guztiak eskuragarri daudela ziurtatuz gakoa zein partiziori dagokion edozein dela ere.

Jarraian, Kafka Streams funtzioa ikusiko dugu, eta horri esker egoera aldaketak ikusi ahal izango ditugu Kafka gai bateko datuak kontsumitu gabe.

5.3.5. Kontsulta daitekeen egoera

Dagoeneko hainbat eragiketa egin ditugu egoerarekin eta beti emaitzak kontsolara atera (garapenerako) edo gai batean idatzi (produkziorako). Gai bati emaitzak idaztean, Kafka kontsumitzaile bat erabili behar duzu haiek ikusteko.

Gai hauetako datuak irakurtzea materializatutako ikuspegi motatzat har daiteke. Gure helburuetarako, Wikipediako ikuspegi materializatu baten definizioa erabil dezakegu: “...a physical database object containing the results a query. Esaterako, urruneko datuen kopia lokal bat izan daiteke, edo taula baten edo batzeko emaitzen errenkada eta/edo zutabeen azpimultzo bat, edo agregazio bidez lortutako laburpen-taula bat” (https://en.wikipedia.org/wiki). /Bista_materializatua).

Kafka Streams-ek estatuko dendetan kontsulta interaktiboak egiteko aukera ere ematen dizu, materializatutako ikuspegi hauek zuzenean irakurtzeko aukera emanez. Garrantzitsua da kontuan izan egoera biltegira egindako kontsulta irakurtzeko soilik den eragiketa dela. Horrek bermatzen du ez duzula kezkatu behar egoerak ustekabean koherenteak ez daitezen zure aplikazioak datuak prozesatzen dituen bitartean.

Garrantzitsua da egoera-dendak zuzenean kontsultatzeko gaitasuna. Horrek esan nahi du aginte-aplikazioak sor ditzakezula Kafka kontsumitzailearen datuak lehenbailehen eskuratu beharrik gabe. Aplikazioaren eraginkortasuna ere areagotzen du, datuak berriro idatzi beharrik ez dagoelako:

  • datuen tokiari esker, azkar eskura daitezke;
  • datuen bikoizketa ezabatzen da, ez baita kanpoko biltegiratze batean idazten.

Gogoan izan nahi dudan gauza nagusia da zure aplikaziotik zuzenean kontsulta dezakezula egoera. Horrek ematen dizkizuten aukerak ezin dira gehiegi nabarmendu. Kafka-ko datuak kontsumitu eta erregistroak aplikaziorako datu-base batean gorde beharrean, egoera-biltegiak kontsulta ditzakezu emaitza berarekin. Estatuko dendetara zuzeneko kontsultak kode gutxiago (kontsumitzailerik ez) eta software gutxiago (ez da datu-baseko taularen beharrik emaitzak gordetzeko).

Kapitulu honetan zeresan handia eman dugu, beraz, estatuko denden aurkako kontsulta interaktiboei buruzko eztabaida utziko dugu oraingoz. Baina ez kezkatu: 9. kapituluan, aginte-aplikazio sinple bat sortuko dugu kontsulta interaktiboekin. Kapitulu honetako eta aurrekoetako adibide batzuk erabiliko ditu kontsulta interaktiboak erakusteko eta Kafka Streams aplikazioetan nola gehi ditzakezun erakusteko.

Laburpena

  • KStream objektuek gertaeren korronteak adierazten dituzte, datu-base batean txertatzeen parekoak. KTable objektuek eguneratze-korronteak adierazten dituzte, datu-base baten eguneraketak bezalakoak. KTable objektuaren tamaina ez da hazten, erregistro zaharrak berriekin ordezkatzen dira.
  • KTable objektuak beharrezkoak dira agregazio eragiketetarako.
  • Leiho-eragiketak erabiliz, datu agregatuak denbora-ontzietan zati ditzakezu.
  • GlobalKTable objektuei esker, erreferentzia-datuak aplikazioko edozein tokitan atzi ditzakezu, partizioa edozein dela ere.
  • KStream, KTable eta GlobalKTable objektuen arteko konexioak posible dira.

Orain arte, maila altuko KStream DSL erabiliz Kafka Streams aplikazioak eraikitzera bideratu gara. Maila handiko ikuspegiak programa txukun eta zehatzak sortzeko aukera ematen badu ere, erabiltzeak truke-off bat suposatzen du. DSL KStream-ekin lan egiteak zure kodearen zehaztasuna areagotzea esan nahi du, kontrol-maila murriztuz. Hurrengo kapituluan, behe-mailako kudeatzaile-nodoaren APIa aztertuko dugu eta beste konpromezu batzuk probatuko ditugu. Programak lehen zirenak baino luzeagoak izango dira, baina behar ditugun ia edozein kudeatzaile-nodo sortzeko gai izango gara.

→ Liburuari buruzko xehetasun gehiago hemen aurki daitezke argitaletxearen webgunea

→ Habrozhitelirentzat % 25eko deskontua kupoia erabiliz - Kafka errekak

→ Liburuaren paperezko bertsioa ordaintzean, liburu elektroniko bat bidaliko da posta elektronikoz.

Iturria: www.habr.com

Gehitu iruzkin berria