Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu" Halo warga Khabro! Buku iki cocok kanggo pangembang sing pengin ngerti proses thread. Ngerteni pemrograman sing disebarake bakal mbantu sampeyan luwih ngerti Aliran Kafka lan Kafka. Luwih becik ngerti kerangka Kafka dhewe, nanging iki ora perlu: ​​Aku bakal ngandhani kabeh sing dibutuhake. Pangembang lan wong anyar Kafka sing duwe pengalaman bakal sinau carane nggawe aplikasi pangolahan stream sing menarik nggunakake perpustakaan Kafka Streams ing buku iki. Pangembang Jawa menengah lan maju sing wis ngerti konsep kaya serialisasi bakal sinau ngetrapake katrampilan kanggo nggawe aplikasi Kafka Streams. Kode sumber buku kasebut ditulis ing Jawa 8 lan nggunakake sintaksis ekspresi lambda Java 8, supaya ngerti cara nggarap fungsi lambda (sanajan ing basa pamrograman liyane) bakal migunani.

Kutipan. 5.3. Agregasi lan operasi windowing

Ing bagean iki, kita bakal nerusake kanggo njelajah bagean paling njanjeni saka Kafka Streams. Nganti saiki, kita wis nutupi aspek Kafka Streams ing ngisor iki:

  • nggawe topologi pangolahan;
  • nggunakake negara ing aplikasi streaming;
  • nindakake sambungan stream data;
  • beda antarane stream acara (KStream) lan nganyari stream (KTable).

Ing conto ing ngisor iki, kita bakal nggabungake kabeh unsur kasebut. Sampeyan uga bakal sinau babagan windowing, fitur gedhe liyane saka aplikasi streaming. Conto pisanan kita bakal dadi agregasi sing prasaja.

5.3.1. Agregasi penjualan saham miturut sektor industri

Agregasi lan pengelompokan minangka alat penting nalika nggarap data streaming. Pemeriksaan cathetan individu nalika ditampa asring ora cukup. Kanggo ngekstrak informasi tambahan saka data, perlu kanggo klompok lan gabungke.

Ing conto iki, sampeyan bakal nganggo kostum pedagang dina sing kudu nglacak volume dodolan saham perusahaan ing sawetara industri. Khusus, sampeyan kasengsem ing limang perusahaan kanthi dodolan saham paling gedhe ing saben industri.

Panggabungan kasebut mbutuhake sawetara langkah ing ngisor iki kanggo nerjemahake data menyang wangun sing dikarepake (nganggo istilah umum).

  1. Nggawe sumber adhedhasar topik sing nerbitake informasi dagang saham mentah. Kita kudu peta obyek saka jinis StockTransaction kanggo obyek saka jinis ShareVolume. Intine yaiku obyek StockTransaction ngemot metadata dodolan, nanging kita mung butuh data babagan jumlah saham sing didol.
  2. Kelompok data ShareVolume kanthi simbol saham. Sawise diklompokaké miturut simbol, sampeyan bisa ambruk data iki menyang subtotal volume dodolan saham. Wigati dicathet yen metode KStream.groupBy ngasilake conto jinis KGroupedStream. Lan sampeyan bisa njaluk conto KTable kanthi luwih nelpon metode KGroupedStream.reduce.

Apa antarmuka KGroupedStream

Cara KStream.groupBy lan KStream.groupByKey ngasilake conto KGroupedStream. KGroupedStream minangka perwakilan penengah saka aliran acara sawise dikelompokake miturut tombol. Iku ora kabeh dimaksudaké kanggo karya langsung karo. Nanging, KGroupedStream digunakake kanggo operasi agregasi, sing tansah ngasilake KTable. Lan wiwit asil operasi agregasi punika KTable lan padha nggunakake nyimpen negara, bisa ora kabeh nganyari minangka asil dikirim luwih mudhun pipo.

Cara KTable.groupBy ngasilake KGroupedTable sing padha - perwakilan penengah saka stream nganyari, dikumpulake maneh kanthi kunci.

Ayo ngaso sedhela lan deleng Fig. 5.9, sing nuduhake apa sing wis digayuh. Topologi iki mesthine wis akrab karo sampeyan.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Ayo saiki ndeleng kode kanggo topologi iki (bisa ditemokake ing file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Kode sing diwenehake dibedakake kanthi ringkes lan volume gedhe saka tumindak sing ditindakake ing sawetara baris. Sampeyan bisa uga sok dong mirsani soko anyar ing parameter pisanan cara builder.stream: Nilai saka jinis enum AutoOffsetReset.EARLIEST (ana uga LATEST), nyetel nggunakake cara Consumed.withOffsetResetPolicy. Jinis enumerasi iki bisa digunakake kanggo nemtokake strategi reset offset kanggo saben KStream utawa KTable lan luwih dhisik tinimbang opsi reset offset saka konfigurasi.

GroupByKey lan GroupBy

Antarmuka KStream duwe rong cara kanggo nglumpukake rekaman: GroupByKey lan GroupBy. Loro-lorone ngasilake KGroupedTable, supaya sampeyan bisa uga mikir apa bedane ing antarane lan nalika nggunakake sing endi?

Cara GroupByKey digunakake nalika tombol ing KStream wis ora kosong. Lan sing paling penting, gendera "mbutuhake partisi maneh" ora nate disetel.

Cara GroupBy nganggep yen sampeyan wis ngganti tombol klompok, mula gendéra partisi ulang disetel dadi bener. Nindakake gabungan, agregasi, lan liya-liyane sawise metode GroupBy bakal ngasilake partisi maneh kanthi otomatis.
Ringkesan: Yen bisa, sampeyan kudu nggunakake GroupByKey tinimbang GroupBy.

Cetha apa sing ditindakake dening metode mapValues ​​lan groupBy, mula ayo goleki metode sum () (ditemokake ing src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Cara ShareVolume.sum ngasilake total volume dodolan saham, lan asil saka kabeh chain petungan minangka obyek KTable . Saiki sampeyan ngerti peran KTable. Nalika obyek ShareVolume teka, obyek KTable sing cocog nyimpen nganyari saiki paling anyar. Iku penting kanggo elinga yen kabeh nganyari dibayangke ing shareVolumeKTable sadurungé, nanging ora kabeh dikirim luwih.

Kita banjur nggunakake KTable iki kanggo aggregate (miturut jumlah Enggo bareng perdagangan) kanggo teka ing limang perusahaan karo volume paling saka Enggo bareng perdagangan ing saben industri. Tumindak kita ing kasus iki bakal padha karo sing kanggo agregasi pisanan.

  1. Nindakake operasi groupBy liyane kanggo klompok obyek ShareVolume individu miturut industri.
  2. Mulai ngringkes obyek ShareVolume. Wektu iki obyek agregasi minangka antrian prioritas ukuran tetep. Ing antrian ukuran tetep iki, mung limang perusahaan kanthi jumlah saham paling gedhe sing didol sing ditahan.
  3. Peta antrian saka paragraf sadurunge menyang nilai senar lan bali limang saham paling dagang paling dhuwur miturut nomer industri.
  4. Tulis asil ing wangun string menyang topik.

Ing Fig. Gambar 5.10 nuduhake grafik topologi aliran data. Nalika sampeyan bisa ndeleng, babak kapindho Processing cukup prasaja.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Saiki kita duwe pangerten sing jelas babagan struktur pangolahan babak kapindho iki, kita bisa nguripake kode sumber (sampeyan bakal nemokake ing file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Initializer iki ngemot variabel fixedQueue. Iki obyek adat sing adaptor kanggo java.util.TreeSet sing digunakake kanggo trek ndhuwur N asil ing urutan mudhun saka Enggo bareng perdagangan.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Sampeyan wis ndeleng grupBy lan mapValues ​​​​telpon, mula kita ora bakal mlebu (kita nelpon metode KTable.toStream amarga metode KTable.print ora digunakake). Nanging sampeyan durung weruh versi KTable saka agregat (), supaya kita bakal nglampahi sethitik wektu ngrembug babagan.

Nalika sampeyan ngelingi, sing ndadekake KTable beda yaiku rekaman kanthi tombol sing padha dianggep nganyari. KTable ngganti entri lawas karo anyar. Agregasi dumadi kanthi cara sing padha: cathetan paling anyar kanthi kunci sing padha dikumpulake. Nalika rekaman teka, ditambahake menyang conto kelas FixedSizePriorityQueue nggunakake adder (parameter kapindho ing panggilan metode agregat), nanging yen rekaman liyane wis ana kanthi tombol sing padha, banjur rekaman lawas dibusak nggunakake subtractor (parameter katelu ing panggilan metode agregat).

Iki kabeh tegese agregator kita, FixedSizePriorityQueue, ora nglumpukake kabeh nilai kanthi siji kunci, nanging nyimpen jumlah obah saka jumlah saham N sing paling akeh didagangake. Saben entri mlebu ngemot jumlah total saham sing didol nganti saiki. KTable bakal menehi informasi babagan saham perusahaan sing saiki paling akeh didol, tanpa mbutuhake panggabungan gulung saben nganyari.

Kita sinau kanggo nindakake rong perkara penting:

  • nilai klompok ing KTtable kanthi kunci umum;
  • nindakake operasi migunani kayata rollup lan aggregation ing nilai diklompokaké iki.

Ngerti carane nindakake operasi kasebut penting kanggo mangerteni makna data sing obah liwat aplikasi Kafka Streams lan mangerteni informasi apa sing digawa.

Kita uga wis nglumpukake sawetara konsep utama sing dibahas sadurunge ing buku iki. Ing Bab 4, kita ngomong babagan pentinge fault-tolerant, negara lokal kanggo aplikasi streaming. Conto pisanan ing bab iki nduduhake sebabe negara lokal penting banget - menehi sampeyan kemampuan kanggo nglacak informasi apa wae sing wis sampeyan deleng. Akses lokal ngindhari wektu tundha jaringan, nggawe aplikasi luwih performa lan tahan kesalahan.

Nalika nindakake operasi rollup utawa agregasi, sampeyan kudu nemtokake jeneng toko negara. Operasi rollup lan agregasi ngasilake conto KTable, lan KTable nggunakake panyimpenan negara kanggo ngganti asil lawas karo sing anyar. Kaya sing sampeyan ngerteni, ora kabeh nganyari dikirim menyang saluran pipa, lan iki penting amarga operasi agregasi dirancang kanggo ngasilake informasi ringkesan. Yen sampeyan ora ngetrapake negara lokal, KTable bakal nerusake kabeh asil agregasi lan rollup.

Sabanjure, kita bakal nliti nindakake operasi kayata agregasi sajrone wektu tartamtu - sing disebut operasi windowing.

5.3.2. Operasi jendhela

Ing bagean sadurunge, kita ngenalake konvolusi lan agregasi geser. Aplikasi kasebut nindakake volume dodolan saham sing terus-terusan, banjur dikumpulake saka limang saham sing paling didagang ing bursa.

Kadhangkala agregasi terus-terusan lan gulung asil kasebut perlu. Lan kadhangkala sampeyan kudu nindakake operasi mung sajrone wektu tartamtu. Contone, ngitung jumlah transaksi ijol-ijolan sing digawe karo saham perusahaan tartamtu sajrone 10 menit pungkasan. Utawa pirang-pirang pangguna ngeklik spanduk pariwara anyar sajrone 15 menit pungkasan. Aplikasi bisa nindakake operasi kasebut kaping pirang-pirang, nanging kanthi asil mung ditrapake kanggo wektu tartamtu (windows wektu).

Ngitung transaksi ijol-ijolan dening panuku

Ing conto sabanjure, kita bakal nglacak transaksi saham ing pirang-pirang pedagang - organisasi gedhe utawa pemodal individu sing cerdas.

Ana rong alasan kanggo nelusuri iki. Salah sijine yaiku kudu ngerti apa sing dituku / didol para pemimpin pasar. Yen para pemain gedhe lan investor sing canggih iki ndeleng kesempatan, mula kudu ngetutake strategi kasebut. Alasan liya yaiku kepinginan kanggo nemokake pratandha saka perdagangan njero ilegal. Kanggo nindakake iki, sampeyan kudu nganalisa korélasi lonjakan penjualan gedhe karo siaran pers sing penting.

Pelacakan kasebut kalebu langkah-langkah ing ngisor iki:

  • nggawe stream kanggo maca saka topik saham-transaksi;
  • ngelompokake cathetan mlebu dening ID panuku lan simbol Simpenan. Nelpon cara groupBy ngasilake conto kelas KGroupedStream;
  • Cara KGroupedStream.windowedBy ngasilake aliran data sing diwatesi ing jendhela wektu, sing ngidini agregasi windowed. Gumantung ing jinis jendhela, salah siji TimeWindowedKStream utawa SessionWindowedKStream bali;
  • jumlah transaksi kanggo operasi agregasi. Aliran data windowed nemtokake manawa rekaman tartamtu dijupuk menyang akun ing count iki;
  • nulis asil kanggo topik utawa outputting menyang console sak pembangunan.

Topologi aplikasi iki prasaja, nanging gambar sing jelas bakal mbiyantu. Ayo dideleng ing Fig. 5.11.

Sabanjure, kita bakal ndeleng fungsi operasi jendhela lan kode sing cocog.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"

Jinis jendhela

Ana telung jinis jendhela ing Kafka Streams:

  • sesional;
  • "tumbling";
  • ngusapake / mlumpat.

Kang siji kanggo milih gumantung ing syarat bisnis. Jendhela tumbling lan mlumpat diwatesi wektu, dene jendhela sesi diwatesi dening aktivitas pangguna-durasi sesi ditemtokake mung dening aktifitas pangguna. Wangsulan: Bab ingkang utama kanggo elinga iku kabeh jinis jendhela adhedhasar tanggal / wektu prangko entri, ora wektu sistem.

Sabanjure, kita ngleksanakake topologi kita karo saben jinis jendhela. Kode lengkap bakal diwenehi mung ing conto pisanan; kanggo jinis windows liyane ora bakal ngganti kajaba jinis operasi jendhela.

Jendhela sesi

Jendhela sesi beda banget karo kabeh jinis jendhela liyane. Padha diwatesi ora dadi luwih dening wektu minangka dening kegiatan saka pangguna (utawa aktivitas saka entitas sing pengin dilacak). Jendhela sesi diwatesi kanthi wektu ora aktif.

Gambar 5.12 nggambarake konsep jendhela sesi. Sesi sing luwih cilik bakal digabung karo sesi ing sisih kiwa. Lan sesi ing sisih tengen bakal kapisah amarga ngetutake wektu ora aktif. Jendhela sesi adhedhasar aktivitas pangguna, nanging gunakake prangko tanggal / wektu saka entri kanggo nemtokake sesi entri kasebut.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"

Nggunakake jendhela sesi kanggo nglacak transaksi saham

Ayo nggunakake jendhela sesi kanggo njupuk informasi babagan transaksi ijol-ijolan. Implementasine windows sesi ditampilake ing Listing 5.5 (sing bisa ditemokake ing src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Sampeyan wis ndeleng umume operasi ing topologi iki, mula sampeyan ora perlu ndeleng maneh ing kene. Nanging ana uga sawetara unsur anyar ing kene, sing saiki bakal dibahas.

Operasi groupBy biasane nindakake sawetara jinis operasi agregasi (agregasi, rollup, utawa counting). Sampeyan bisa nindakake panggabungan kumulatif kanthi total mlaku, utawa panggabungan jendhela, sing nyathet cathetan ing wektu sing ditemtokake.

Kode ing Listing 5.5 ngitung jumlah transaksi ing jendhela sesi. Ing Fig. 5.13 tumindak kasebut dianalisis langkah demi langkah.

Kanthi nelpon windowedBy(SessionWindows.with(twentySeconds).nganti(fifteenMinutes)) kita nggawe jendhela sesi kanthi interval ora aktif 20 detik lan interval ketekunan 15 menit. Interval nganggur 20 detik tegese aplikasi bakal kalebu entri apa wae sing teka sajrone 20 detik sawise pungkasan utawa wiwitan sesi saiki menyang sesi saiki (aktif).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Sabanjure, kita nemtokake operasi agregasi sing kudu ditindakake ing jendela sesi - ing kasus iki, count. Yen entri mlebu ana ing njaba jendela ora aktif (salah siji sisih prangko tanggal/wektu), aplikasi kasebut nggawe sesi anyar. Interval retensi tegese njaga sesi sajrone wektu tartamtu lan ngidini data telat sing ngluwihi periode ora aktif sesi nanging isih bisa dilampirake. Kajaba iku, wiwitan lan pungkasan sesi anyar asil saka gabungan cocog karo prangko tanggal/wektu paling wiwitan lan paling anyar.

Ayo katon ing sawetara entri saka cara count kanggo ndeleng carane sesi bisa (Tabel 5.1).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Nalika cathetan teka, kita nggolèki sesi sing ana kanthi tombol sing padha, wektu pungkasan kurang saka tanggal / wektu saiki - interval ora aktif, lan wektu wiwitan luwih gedhe tinimbang tanggal saiki / stempel wektu + interval ora aktif. Njupuk iki menyang akun, papat entri saka Tabel. 5.1 digabung dadi siji sesi kaya ing ngisor iki.

1. Rekam 1 teka dhisik, mula wektu wiwitan padha karo wektu pungkasan lan 00:00:00.

2. Sabanjure, entri 2 teka, lan kita nggoleki sesi sing pungkasan ora luwih awal saka 23:59:55 lan diwiwiti ora luwih saka 00:00:35. Kita nemokake rekaman 1 lan gabungke sesi 1 lan 2. Kita njupuk wektu wiwitan sesi 1 (sadurunge) lan wektu pungkasan sesi 2 (mengko), supaya sesi anyar kita diwiwiti ing 00:00:00 lan rampung ing 00: 00:15.

3. Rekam 3 teka, kita goleki sesi antarane 00:00:30 lan 00:01:10 lan ora nemokake. Tambah sesi kapindho kanggo tombol 123-345-654, FFBE, miwiti lan pungkasan ing 00:00:50.

4. Rekam 4 teka lan kita nggoleki sesi antarane 23:59:45 lan 00:00:25. Wektu iki loro sesi 1 lan 2 ditemokake. Kabeh telung sesi digabungake dadi siji, kanthi wektu wiwitan 00:00:00 lan wektu pungkasan 00:00:15.

Saka apa sing diterangake ing bagean iki, sampeyan kudu ngelingi nuansa penting ing ngisor iki:

  • sesi ora windows ukuran tetep. Suwene sesi ditemtokake dening kegiatan ing wektu tartamtu;
  • Prangko tanggal/wektu ing data nemtokake manawa acara kasebut ana ing sesi sing wis ana utawa sajrone wektu nganggur.

Sabanjure kita bakal ngrembug jinis jendhela sabanjure - "tumbling" windows.

Jendela "tumbling".

Tumbling windows njupuk acara sing tiba ing wektu tartamtu. Mbayangno yen sampeyan kudu nyekel kabeh transaksi saham perusahaan tartamtu saben 20 detik, supaya sampeyan ngumpulake kabeh acara ing wektu kasebut. Ing pungkasan interval 20 detik, jendhela muter lan pindhah menyang interval observasi 20 detik sing anyar. Gambar 5.14 nggambarake kahanan iki.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Kaya sing sampeyan ngerteni, kabeh acara sing ditampa sajrone 20 detik pungkasan kalebu ing jendela. Ing pungkasan wektu iki, jendhela anyar digawe.

Listing 5.6 nuduhake kode sing nduduhake panggunaan tumbling windows kanggo njupuk transaksi saham saben 20 detik (ditemokake ing src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Kanthi owah-owahan cilik iki kanggo telpon TimeWindows.of method, sampeyan bisa nggunakake jendhela tumbling. Conto iki ora nelpon nganti () cara, supaya interval penylametan standar 24 jam bakal digunakake.

Akhire, iku wektu kanggo pindhah menyang pungkasan saka opsi jendhela - "hopping" windows.

Sliding ("mlumpat") jendhela

Jendhela sliding / mlumpat padha karo jendhela tumbling, nanging ana bedane tipis. Jendhela geser ora ngenteni nganti pungkasan interval wektu sadurunge nggawe jendhela anyar kanggo ngolah acara paling anyar. Dheweke miwiti petungan anyar sawise interval nunggu kurang saka durasi jendhela.

Kanggo nggambarake beda antarane jendhela tumbling lan mlumpat, ayo bali menyang conto ngetung transaksi bursa saham. Tujuan kita isih kanggo ngetung jumlah transaksi, nanging kita ora pengin ngenteni kabeh wektu sadurunge nganyari counter. Nanging, kita bakal nganyari counter ing interval sing luwih cendhek. Contone, kita isih bakal ngetung jumlah transaksi saben 20 detik, nanging nganyari counter saben 5 detik, minangka ditampilake ing Fig. 5.15. Ing kasus iki, kita mungkasi karo telung asil jendhela karo data tumpang tindih.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Listing 5.7 nuduhake kode kanggo nemtokake windows ngusapake (ditemokake ing src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Jendhela tumbling bisa diowahi menyang jendhela hopping kanthi nambah telpon kanggo advanceBy () cara. Ing conto sing ditampilake, interval nyimpen yaiku 15 menit.

Sampeyan ndeleng ing bagean iki carane mbatesi asil agregasi kanggo wektu windows. Utamane, aku pengin sampeyan ngelingi telung perkara ing ngisor iki saka bagean iki:

  • ukuran jendhela sesi diwatesi ora miturut periode wektu, nanging dening aktivitas pangguna;
  • "tumbling" windows nyedhiyakake ringkesan acara ing wektu tartamtu;
  • Suwene jumping windows tetep, nanging asring dianyari lan bisa ngemot entri sing tumpang tindih ing kabeh jendhela.

Sabanjure, kita bakal sinau carane ngowahi KTable bali menyang KStream kanggo sambungan.

5.3.3. Nyambungake obyek KStream lan KTable

Ing Bab 4, kita rembugan nyambungake rong obyek KStream. Saiki kita kudu sinau carane nyambungake KTable lan KStream. Iki bisa uga dibutuhake kanggo alasan prasaja ing ngisor iki. KStream minangka stream rekaman, lan KTable minangka stream nganyari rekaman, nanging kadhangkala sampeyan pengin nambah konteks tambahan menyang stream rekaman nggunakake nganyari saka KTable.

Ayo njupuk data babagan jumlah transaksi bursa saham lan gabungke karo warta bursa saham kanggo industri sing relevan. Mangkene apa sing kudu sampeyan lakoni kanggo entuk kode sing wis ana.

  1. Ngonversi obyek KTable kanthi data babagan jumlah transaksi saham menyang KStream, banjur diganti tombol kanthi tombol sing nuduhake sektor industri sing cocog karo simbol saham iki.
  2. Nggawe obyek KTable sing maca data saka topik karo warta bursa saham. KTable anyar iki bakal dikategorikaké miturut sektor industri.
  3. Sambungake nganyari warta kanthi informasi babagan jumlah transaksi bursa saham miturut sektor industri.

Saiki ayo ndeleng carane ngleksanakake rencana aksi iki.

Ngonversi KTable menyang KStream

Kanggo ngowahi KTable dadi KStream sampeyan kudu nindakake ing ngisor iki.

  1. Nelpon metode KTable.toStream().
  2. Kanthi nelpon cara KStream.map, ngganti tombol karo jeneng industri, lan banjur njupuk obyek TransactionSummary saka Windowed Kayata.

Kita bakal chain operasi iki bebarengan minangka nderek (kode bisa ditemokaké ing file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Amarga kita nindakake operasi KStream.map, contone KStream bali dipisahake kanthi otomatis nalika digunakake ing sambungan.

Kita wis ngrampungake proses konversi, sabanjure kita kudu nggawe obyek KTable kanggo maca warta saham.

Nggawe KTable kanggo warta saham

Begjanipun, nggawe obyek KTable njupuk mung siji baris kode (kode bisa ditemokaké ing src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Wigati dicathet menawa ora ana obyek Serde sing kudu ditemtokake, amarga string Serdes digunakake ing setelan kasebut. Uga, kanthi nggunakake enumerasi AWAL, tabel diisi karo cathetan ing wiwitan.

Saiki kita bisa nerusake menyang langkah pungkasan - sambungan.

Nyambungake nganyari warta karo data count transaksi

Nggawe sambungan ora angel. Kita bakal nggunakake gabung kiwa yen ora ana warta saham kanggo industri sing cocog (kode sing dibutuhake bisa ditemokake ing file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Operator leftJoin iki cukup prasaja. Beda karo gabungan ing Bab 4, metode JoinWindow ora digunakake amarga nalika nindakake gabungan KStream-KTable, mung ana siji entri ing KTable kanggo saben tombol. Sambungan kasebut ora diwatesi ing wektu: rekaman kasebut ana ing KTable utawa ora ana. Kesimpulan utama: nggunakake obyek KTable sampeyan bisa nambah KStream karo data referensi sing kurang kerep dianyari.

Saiki kita bakal ndeleng cara sing luwih efisien kanggo nambah acara saka KStream.

5.3.4. Objek GlobalKTable

Kaya sing sampeyan ngerteni, perlu kanggo nambah aliran acara utawa nambah konteks. Ing Bab 4 sampeyan ndeleng sambungan antarane rong obyek KStream, lan ing bagean sadurunge sampeyan ndeleng sambungan antarane KStream lan KTable. Ing kabeh kasus iki, perlu kanggo partisi maneh stream data nalika pemetaan tombol kanggo jinis anyar utawa nilai. Kadhangkala partisi ulang ditindakake kanthi jelas, lan kadhangkala Kafka Streams nindakake kanthi otomatis. Pemisahan maneh perlu amarga tombol wis diganti lan cathetan kudu rampung ing bagean anyar, yen ora, sambungan bakal ora bisa (iki wis rembugan ing Bab 4, ing bagean "Re-partisi data" ing bagean 4.2.4).

Pemisahan maneh duwe biaya

Pemisahan ulang mbutuhake biaya - biaya sumber daya tambahan kanggo nggawe topik penengah, nyimpen data duplikat ing topik liyane; iku uga tegese tambah latensi amarga nulis lan maca saka topik iki. Kajaba iku, yen sampeyan kudu nggabung ing luwih saka siji aspek utawa dimensi, sampeyan kudu chain gabungan, map cathetan karo tombol anyar, lan mbukak maneh proses partisi maneh.

Nyambung menyang dataset cilik

Ing sawetara kasus, volume data referensi sing bakal disambungake relatif cilik, supaya salinan lengkap bisa gampang pas sacara lokal ing saben simpul. Kanggo kahanan kaya iki, Kafka Streams nyedhiyakake kelas GlobalKTable.

Kedadean GlobalKTable unik amarga aplikasi niru kabeh data menyang saben simpul. Lan amarga kabeh data ana ing saben simpul, ora perlu pamisah stream acara kanthi referensi kunci data supaya kasedhiya kanggo kabeh partisi. Sampeyan uga bisa nggawe gabungan tanpa tombol nggunakake obyek GlobalKTable. Ayo bali menyang salah sawijining conto sadurunge kanggo nduduhake fitur iki.

Nyambungake obyek KStream menyang obyek GlobalKTable

Ing bagean 5.3.2, kita nindakake panggabungan jendhela transaksi ijol-ijolan dening para panuku. Asil saka agregasi iki katon kaya iki:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Nalika asil kasebut nyedhiyakake tujuan kasebut, mesthine bakal luwih migunani yen jeneng pelanggan lan jeneng perusahaan lengkap uga ditampilake. Kanggo nambah jeneng pelanggan lan jeneng perusahaan, sampeyan bisa gabung normal, nanging sampeyan kudu nindakake rong pemetaan tombol lan partisi maneh. Kanthi GlobalKTable sampeyan bisa ngindhari biaya operasi kasebut.

Kanggo nindakake iki, kita bakal nggunakake countStream obyek saka Listing 5.11 (kode sing cocog bisa ditemokaké ing src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) lan nyambung menyang loro obyek GlobalKTable.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Kita wis tau ngrembug babagan iki, mula aku ora bakal mbaleni. Nanging aku Wigati sing kode ing toStream () fungsi map abstrak menyang obyek fungsi tinimbang expression lambda inline kanggo diwaca.

Langkah sabanjure yaiku ngumumake rong conto GlobalKTable (kode sing ditampilake bisa ditemokake ing file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"

Wigati dimangerteni manawa jeneng topik diterangake nggunakake jinis enumerasi.

Saiki yen kabeh komponen wis siyap, kabeh sing isih ana yaiku nulis kode kanggo sambungan kasebut (sing bisa ditemokake ing file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Sanajan ana rong gabungan ing kode iki, padha dirantai amarga ora ana asil sing digunakake kanthi kapisah. Asil ditampilake ing mburi kabeh operasi.

Nalika sampeyan mbukak operasi gabungan ing ndhuwur, sampeyan bakal entuk asil kaya iki:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Intine ora owah, nanging asil kasebut katon luwih jelas.

Yen sampeyan ngetung nganti Bab 4, sampeyan wis ndeleng sawetara jinis sambungan sing ditindakake. Padha kapacak ing tabel. 5.2. Tabel iki nggambarake kemampuan konektivitas ing versi 1.0.0 saka Kafka Streams; Ana sing bisa diganti ing rilis mbesuk.

Buku "Kafka Streams in Action. Aplikasi lan layanan mikro kanggo karya nyata-wektu"
Kanggo ngrampungake, ayo ringkesan dhasar: sampeyan bisa nyambungake stream acara (KStream) lan nganyari stream (KTable) nggunakake negara lokal. Utawa, yen ukuran data referensi ora gedhe banget, sampeyan bisa nggunakake obyek GlobalKTable. GlobalKTables niru kabeh partisi menyang saben simpul aplikasi Kafka Streams, mesthekake yen kabeh data kasedhiya preduli saka partisi sing cocog karo tombol kasebut.

Sabanjure kita bakal weruh fitur Kafka Streams, amarga kita bisa mirsani owah-owahan negara tanpa nggunakake data saka topik Kafka.

5.3.5. Negara sing bisa ditakoni

Kita wis nindakake sawetara operasi sing nglibatake negara lan mesthi ngasilake asil menyang konsol (kanggo tujuan pangembangan) utawa nulis menyang topik (kanggo tujuan produksi). Nalika nulis asil menyang topik, sampeyan kudu nggunakake konsumen Kafka kanggo ndeleng.

Maca data saka topik kasebut bisa dianggep minangka jinis tampilan sing diwujudake. Kanggo tujuan kita, kita bisa nggunakake definisi tampilan materialized saka Wikipedia: "...objek database fisik ngemot asil saka pitakonan. Contone, bisa uga salinan lokal data remot, utawa subset saka baris lan/utawa kolom saka tabel utawa asil gabungan, utawa tabel ringkesan sing dipikolehi liwat agregasi” (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams uga ngidini sampeyan mbukak pitakon interaktif ing toko negara, supaya sampeyan bisa langsung maca tampilan kasebut. Wigati dimangerteni manawa pitakon menyang toko negara minangka operasi mung diwaca. Iki mesthekake yen sampeyan ora perlu kuwatir yen ora sengaja nggawe negara ora konsisten nalika aplikasi sampeyan ngolah data.

Kemampuan kanggo langsung takon toko negara penting. Iki tegese sampeyan bisa nggawe aplikasi dashboard tanpa kudu njupuk data dhisik saka konsumen Kafka. Iki uga nambah efisiensi aplikasi, amarga ora perlu nulis data maneh:

  • thanks kanggo lokalitas data, bisa diakses kanthi cepet;
  • duplikasi data wis ngilangi, awit iku ora ditulis kanggo panyimpenan external.

Wangsulan: Bab ingkang utama aku pengin sampeyan elinga iku sampeyan bisa langsung takon negara saka ing aplikasi. Kesempatan sing diwenehake sampeyan ora bisa diremehake. Tinimbang nggunakake data saka Kafka lan nyimpen cathetan ing basis data kanggo aplikasi kasebut, sampeyan bisa takon toko negara kanthi asil sing padha. Pitakonan langsung menyang toko negara tegese kode kurang (ora ana konsumen) lan piranti lunak kurang (ora perlu tabel database kanggo nyimpen asil).

Kita wis nutupi sawetara dhasar ing bab iki, mula kita bakal ninggalake diskusi babagan pitakon interaktif marang toko negara saiki. Nanging aja kuwatir: ing Bab 9, kita bakal nggawe aplikasi dashboard sing prasaja kanthi pitakon interaktif. Bakal nggunakake sawetara conto saka bab iki lan sadurunge kanggo nduduhake pitakon interaktif lan carane sampeyan bisa nambah menyang aplikasi Kafka Streams.

Ringkesan

  • Obyek KStream makili aliran acara, bisa dibandhingake karo sisipan menyang database. Objek KTable makili stream nganyari, luwih kaya nganyari kanggo database. Ukuran obyek KTable ora tuwuh, cathetan lawas diganti karo sing anyar.
  • Objek KTable dibutuhake kanggo operasi agregasi.
  • Nggunakake operasi windowing, sampeyan bisa pamisah data agregat menyang ember wektu.
  • Thanks kanggo obyek GlobalKTable, sampeyan bisa ngakses data referensi ing ngendi wae ing aplikasi, preduli saka partisi.
  • Sambungan antarane obyek KStream, KTable lan GlobalKTable bisa uga.

Nganti saiki, kita wis fokus kanggo mbangun aplikasi Kafka Streams nggunakake KStream DSL tingkat dhuwur. Senajan pendekatan tingkat dhuwur ngijini sampeyan kanggo nggawe program rapi lan ringkes, nggunakake iku makili trade-off. Nggarap DSL KStream tegese nambah ringkesan kode kanthi nyuda tingkat kontrol. Ing bab sabanjure, kita bakal ndeleng API simpul panangan tingkat rendah lan nyoba ganti rugi liyane. Program kasebut bakal luwih dawa tinimbang sadurunge, nanging kita bakal bisa nggawe meh kabeh simpul panangan sing dibutuhake.

→ Rincian liyane babagan buku bisa ditemokake ing situs web penerbit

→ Kanggo Habrozhiteli diskon 25% nggunakake kupon - Aliran Kafka

→ Sawise mbayar buku versi kertas, buku elektronik bakal dikirim liwat e-mail.

Source: www.habr.com

Add a comment