Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time" Halo warga Khabro! Buku ieu cocog pikeun pamekar anu hoyong ngartos ngolah benang. Ngartos pemrograman anu disebarkeun bakal ngabantosan anjeun langkung ngartos Aliran Kafka sareng Kafka. Éta hadé pikeun terang kerangka Kafka sorangan, tapi ieu henteu diperyogikeun: Kuring bakal nyarioskeun ka anjeun sadayana anu anjeun peryogikeun. Pangembang Kafka sareng novice anu berpengalaman sami bakal diajar kumaha cara nyiptakeun aplikasi pamrosesan aliran anu pikaresepeun nganggo perpustakaan Kafka Streams dina buku ieu. Pangembang Java panengah sareng maju anu parantos akrab sareng konsép sapertos serialisasi bakal diajar nerapkeun kaahlianna pikeun nyiptakeun aplikasi Kafka Streams. Kode sumber buku ditulis dina Java 8 sareng ngagunakeun sintaksis éksprési lambda Java 8, ku kituna terang kumaha cara damel sareng fungsi lambda (sanaos dina basa pamrograman sanés) bakal mangpaat.

Petikan. 5.3. Aggregation sarta operasi windowing

Dina bagian ieu, urang bakal ngajalajah bagian anu paling ngajangjikeun tina Kafka Streams. Sajauh ieu kami parantos nutupan aspék Kafka Streams ieu:

  • nyieun topologi processing;
  • ngagunakeun kaayaan dina aplikasi streaming;
  • ngajalankeun sambungan aliran data;
  • bédana antara aliran acara (KStream) jeung aliran update (KTable).

Dina conto di handap ieu urang bakal mawa sakabeh elemen ieu babarengan. Anjeun ogé bakal diajar ngeunaan windowing, fitur hébat séjén tina aplikasi streaming. conto kahiji urang bakal aggregation basajan.

5.3.1. Aggregation tina jualan saham ku sektor industri

Agregasi sareng pengelompokeun mangrupikeun alat anu penting nalika damel sareng data streaming. Pamariksaan rékaman individu anu ditampi sering henteu cekap. Pikeun nimba informasi tambahan tina data, perlu pikeun grup jeung ngagabungkeun aranjeunna.

Dina conto ieu, anjeun bakal ngagem kostum padagang dinten anu kedah ngalacak volume penjualan saham perusahaan di sababaraha industri. Husus, anjeun resep kana lima perusahaan anu gaduh penjualan saham panggedéna di unggal industri.

Aggregation sapertos ngabutuhkeun sababaraha léngkah di handap ieu pikeun narjamahkeun data kana bentuk anu dipikahoyong (sacara umum).

  1. Jieun sumber dumasar-topik anu nyebarkeun inpormasi dagang saham atah. Urang kedah peta hiji obyék tipe StockTransaction ka obyék tipe ShareVolume. Intina nyaéta obyék StockTransaction ngandung metadata penjualan, tapi urang ngan ukur peryogi data ngeunaan jumlah saham anu dijual.
  2. Data Grup ShareVolume ku simbol saham. Sakali dikelompokkeun ku simbol, anjeun tiasa ngaragragkeun data ieu kana subtotal tina volume penjualan saham. Eta sia noting yén métode KStream.groupBy mulih hiji conto tipe KGroupedStream. Tur anjeun bisa meunangkeun conto KTable ku salajengna nelepon metoda KGroupedStream.reduce.

Naon panganteur KGroupedStream

Metodeu KStream.groupBy sareng KStream.groupByKey ngabalikeun conto KGroupedStream. KGroupedStream mangrupikeun perwakilan perantara tina aliran acara saatos dikelompokkeun ku konci. Éta henteu dimaksudkeun pikeun dianggo langsung sareng éta. Gantina, KGroupedStream dipaké pikeun operasi aggregation, nu salawasna ngahasilkeun KTable. Sarta saprak hasil tina operasi aggregation nyaeta KTable sarta aranjeunna ngagunakeun toko kaayaan, mungkin teu kabeh apdet salaku hasilna dikirim salajengna handap pipa nu.

Metoda KTable.groupBy mulih hiji KGroupedTable sarupa - ngagambarkeun panengah tina aliran apdet, regrouped ku konci.

Hayu urang istirahat sakedap sareng tingali Gbr. 5.9, anu nunjukkeun naon anu kami parantos dihontal. Topologi ieu kedahna akrab pisan ka anjeun.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Hayu urang tingali dina kode pikeun topologi ieu (bisa kapanggih dina file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Kodeu anu dipasihkeun dibédakeun ku pondokna sareng volume tindakan anu ageung dilakukeun dina sababaraha garis. Anjeun bisa perhatikeun hal anyar dina parameter mimiti metoda builder.stream: a nilai tipe enum AutoOffsetReset.EARLIEST (aya ogé LATEST), diatur ngagunakeun métode Consumed.withOffsetResetPolicy. Jinis enumerasi ieu tiasa dianggo pikeun nangtukeun strategi reset offset pikeun tiap KStream atanapi KTable sareng diutamakeun tina pilihan reset offset tina konfigurasi.

GroupByKey jeung GroupBy

Antarbeungeut KStream gaduh dua metode pikeun ngagolongkeun rékaman: GroupByKey sareng GroupBy. Duanana balik a KGroupedTable, jadi Anjeun bisa jadi wondering naon bédana antara aranjeunna sarta nalika ngagunakeun nu mana?

Metodeu GroupByKey dianggo nalika konci dina KStream tos teu kosong. Sareng anu paling penting, bendera "merlukeun partisi ulang" henteu pernah disetél.

Métode GroupBy nganggap yén anjeun parantos ngarobih konci grup, janten bandéra partisi ulang disetel ka leres. Nedunan ngagabung, aggregations, jsb sanggeus metoda GroupBy bakal ngahasilkeun partisi ulang otomatis.
Ringkesan: Sabisana, anjeun kedah nganggo GroupByKey tinimbang GroupBy.

Éta jelas naon anu dilakukeun ku metode mapValues ​​sareng groupBy, janten hayu urang tingali metode sum () (kapanggih dina src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Metoda ShareVolume.sum mulihkeun total ngajalankeun volume jualan saham, sarta hasil tina sakabéh ranté itungan mangrupa objék KTable . Ayeuna anjeun ngartos peran KTable muterkeun. Nalika objék ShareVolume sumping, objék KTable anu saluyu nyimpen apdet ayeuna panganyarna. Kadé inget yen sakabeh apdet anu reflected dina shareVolumeKTable saméméhna, tapi teu kabeh dikirim salajengna.

Urang lajeng nganggo KTable ieu agrégat (ku jumlah biasa traded) pikeun anjog di lima pausahaan jeung volume pangluhurna biasa traded di unggal industri. Laku lampah urang dina hal ieu bakal sami sareng pikeun agrégasi munggaran.

  1. Laksanakeun operasi groupBy séjén pikeun ngagolongkeun objék ShareVolume individu dumasar industri.
  2. Mimitian nyimpulkeun objék ShareVolume. Waktos ieu obyék agrégasi mangrupikeun antrian prioritas ukuran tetep. Dina antrian ukuran tetep ieu, ngan lima pausahaan kalawan jumlah panggedena saham dijual anu dipikagaduh.
  3. Peta antrian ti paragraf saméméhna ka nilai string jeung balik luhureun lima saham paling traded ku nomer ku industri.
  4. Tulis hasil dina wangun string kana topik.

Dina Gbr. Gambar 5.10 nembongkeun grafik topologi aliran data. Sakumaha anjeun tiasa tingali, babak kadua ngolah cukup basajan.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Ayeuna urang gaduh pamahaman anu jelas ngeunaan struktur pamrosésan babak kadua ieu, urang tiasa kéngingkeun kode sumberna (anjeun bakal mendakanana dina file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

initializer Ieu ngandung variabel fixedQueue. Ieu objék custom nu adaptor pikeun java.util.TreeSet nu dipaké pikeun lagu N luhur hasil dina urutan nurun tina saham traded.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Anjeun parantos ningali grupBy sareng mapValues ​​​​sauran, janten urang moal lebet kana éta (urang nyauran metodeu KTable.toStream sabab metodeu KTable.print teu dianggo). Tapi anjeun teu acan ningali versi KTable of agrégat () acan, sangkan bakal méakkeun saeutik waktos ngabahas éta.

Sakumaha anjeun émut, anu ngajantenkeun KTable béda nyaéta rékaman sareng konci anu sami dianggap apdet. KTable ngagantikeun entri heubeul ku nu anyar. Aggregation lumangsung dina cara nu sarupa: rékaman panganyarna jeung konci nu sarua nu aggregated. Nalika rékaman datang, éta ditambahkeun kana conto kelas FixedSizePriorityQueue maké panambah (parameter kadua dina panggero métode agrégat), tapi lamun rékaman sejen geus aya jeung konci anu sarua, lajeng catetan heubeul dihapus maké subtractor a (parameter katilu dina). panggero métode agrégat).

Ieu sadayana hartosna yén agrégator kami, FixedSizePriorityQueue, henteu ngahijikeun sadaya nilai sareng hiji konci, tapi nyimpen jumlah gerak tina jumlah saham N anu paling didagangkeun. Unggal éntri asup ngandung jumlah total saham dijual jadi jauh. KTable bakal masihan anjeun inpormasi ngeunaan saham perusahaan mana anu ayeuna paling didagangkeun, tanpa meryogikeun agrégat rolling unggal update.

Urang diajar ngalakukeun dua hal penting:

  • nilai grup dina KTable ku konci umum;
  • ngalakukeun operasi mangpaat kayaning rollup na aggregation on ieu nilai dikelompokeun.

Nyaho kumaha ngalakukeun operasi ieu penting pikeun ngarti kana harti data anu ngalir ngaliwatan aplikasi Kafka Streams sareng ngartos inpormasi naon anu dibawa.

Kami ogé parantos ngahijikeun sababaraha konsép konci anu dibahas dina buku ieu. Dina Bab 4, urang bahas kumaha toleran kasalahan, kaayaan lokal penting pikeun aplikasi streaming. Conto anu munggaran dina bab ieu nunjukkeun naha kaayaan lokal penting pisan — éta masihan anjeun kamampuan pikeun ngalacak inpormasi anu anjeun parantos ningali. Aksés lokal ngahindarkeun telat jaringan, ngajantenkeun aplikasi langkung berprestasi sareng tahan kasalahan.

Nalika ngajalankeun sagala rollup atanapi aggregation operasi, anjeun kudu nangtukeun nami toko kaayaan. Operasi rollup sareng agrégasi ngabalikeun conto KTable, sareng KTable nganggo panyimpenan kaayaan pikeun ngagentos hasil anu lami sareng anu énggal. Sakumaha anu anjeun tingali, henteu sadayana pembaruan dikirimkeun ka saluran pipa, sareng ieu penting sabab operasi agrégasi dirancang pikeun ngahasilkeun inpormasi kasimpulan. Upami anjeun henteu nerapkeun kaayaan lokal, KTable bakal neraskeun sadaya hasil agrégasi sareng rollup.

Salajengna, urang bakal ningali ngajalankeun operasi sapertos aggregation dina jangka waktu anu khusus - anu disebut operasi windowing.

5.3.2. Operasi jandela

Dina bagian saméméhna, urang ngawanohkeun konvolusi ngageser jeung aggregation. Aplikasi éta ngalaksanakeun roll-up kontinyu tina volume jualan saham, dituturkeun ku aggregation tina lima saham paling traded on bursa.

Kadang-kadang aggregation kontinyu sapertos na roll-up hasil diperlukeun. Sareng kadang anjeun kedah ngalakukeun operasi ngan ukur dina waktos anu ditangtukeun. Salaku conto, itung sabaraha transaksi bursa anu dilakukeun sareng saham perusahaan tinangtu dina 10 menit terakhir. Atanapi sabaraha pangguna ngaklik spanduk pariwara énggal dina 15 menit terakhir. Hiji aplikasi tiasa ngalakukeun operasi sapertos sababaraha kali, tapi kalawan hasil nu lumaku ngan pikeun période waktu nu tangtu (windows waktos).

Ngitung transaksi bursa ku pembeli

Dina conto salajengna, urang bakal ngalacak transaksi saham dina sababaraha padagang-boh organisasi ageung atanapi pemodal individu anu pinter.

Aya dua kamungkinan alesan pikeun ngalacak ieu. Salah sahijina nyaéta kedah terang naon anu dibeli/dijual para pamimpin pasar. Upami pamaén ageung sareng investor canggih ieu ningali kasempetan, éta masuk akal pikeun nuturkeun strategina. Alesan anu kadua nyaéta kahayang pikeun ningali tanda-tanda anu mungkin tina perdagangan insider ilegal. Jang ngalampahkeun ieu, anjeun bakal kedah analisa korelasi tina paku jualan badag kalayan siaran pers penting.

Pelacakan sapertos ieu diwangun ku léngkah-léngkah ieu:

  • nyieun aliran pikeun maca tina topik stock-transaksi;
  • ngagolongkeun rékaman asup ku ID meuli jeung simbol stock. Nyauran metode groupBy mulihkeun conto kelas KGroupedStream;
  • Metodeu KGroupedStream.windowedBy mulihkeun aliran data dugi ka jandela waktos, anu ngamungkinkeun aggregation windowed. Gumantung kana jinis jandela, boh TimeWindowedKStream atanapi SessionWindowedKStream dipulangkeun;
  • cacah transaksi pikeun operasi aggregation. Aliran data windowed nangtukeun naha rékaman tinangtu dicokot kana akun dina count ieu;
  • nulis hasil kana topik atawa kaluaran aranjeunna ka konsol salila ngembangkeun.

Topologi aplikasi ieu saderhana, tapi gambaran anu jelas ngeunaan éta bakal ngabantosan. Hayu urang tingali Gbr. 5.11.

Salajengna, urang bakal ningali pungsionalitas operasi jandela sareng kode anu saluyu.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"

Jenis jandela

Aya tilu jinis windows di Kafka Streams:

  • sésional;
  • "tumbling" (tumbling);
  • ngageser / hopping.

Mana anu kedah dipilih gumantung kana kabutuhan bisnis anjeun. Jandéla tumbling sareng luncat waktosna terbatas, sedengkeun windows sési diwatesan ku kagiatan pangguna-durasi sési ditangtukeun ngan ku sabaraha aktip pangguna. Hal utama pikeun nginget yén sadaya jinis jandela dumasar kana tanggal / waktos perangko tina éntri, sanes waktos sistem.

Salajengna, urang nerapkeun topologi urang kalawan unggal tipe jandela. Kodeu lengkep bakal dipasihkeun ngan dina conto anu munggaran; pikeun jinis windows anu sanés moal aya anu robih kecuali jinis operasi jandela.

Jandéla sési

Jandéla sési béda pisan sareng jinis windows anu sanés. Aranjeunna diwatesan teu jadi loba ku waktu sakumaha ku aktivitas pamaké (atawa aktivitas entitas nu Anjeun hoyong lacak). Jandéla sési dibatesan ku période teu aktip.

Gambar 5.12 ngagambarkeun konsép jandéla sési. Sesi nu leuwih leutik bakal ngagabung jeung sési ka kénca. Sareng sési di beulah katuhu bakal papisah sabab nuturkeun période anu teu aktip. Jandéla sési dumasar kana kagiatan pangguna, tapi nganggo perangko tanggal / waktos tina éntri pikeun nangtukeun sési mana éntri éta.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"

Ngagunakeun jandéla sési pikeun ngalacak transaksi saham

Hayu urang nganggo jandéla sési pikeun nyandak inpormasi ngeunaan transaksi bursa. Palaksanaan jandéla sési ditémbongkeun dina Listing 5.5 (anu bisa kapanggih dina src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Anjeun parantos ningali seueur operasi dina topologi ieu, janten anjeun henteu kedah ningali deui di dieu. Tapi aya ogé sababaraha elemen anyar di dieu, anu ayeuna urang bahas.

Sakur operasi groupBy ilaharna ngalakukeun sababaraha jenis operasi aggregation (aggregation, rollup, atawa cacah). Anjeun tiasa ngalakukeun aggregation kumulatif sareng total jalan, atanapi aggregation jandela, anu tumut kana catetan akun dina jandela waktos anu ditangtukeun.

Kodeu dina Listing 5.5 ngitung jumlah transaksi dina jandéla sési. Dina Gbr. 5.13 tindakan ieu dianalisis step by step.

Ku nelepon windowedBy(SessionWindows.with(twentySeconds).nepi(fifteenMinutes)) urang nyieun jandela sési kalayan interval inactivity 20 detik sarta interval kegigihan 15 menit. Interval dianggurkeun 20 detik ngandung harti yén aplikasi bakal ngalebetkeun éntri naon waé anu datang dina 20 detik saatos tungtung atanapi ngamimitian sési ayeuna kana sési ayeuna (aktip).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Salajengna, urang tangtukeun mana operasi aggregation perlu dipigawé dina jandela sési - dina hal ieu, cacah. Lamun entri asup ragrag di luar jandela inactivity (boh sisi tanggal/waktu cap), aplikasi nu nyieun sési anyar. Interval retention hartina ngajaga sési pikeun sababaraha waktu jeung ngamungkinkeun pikeun data telat nu manjangan saluareun periode inactivity sési tapi masih bisa napel. Sajaba ti éta, mimiti jeung ahir sési anyar hasil tina ngahiji pakait jeung pangheubeulna jeung panganyarna perangko tanggal/waktu.

Hayu urang tingali sababaraha éntri tina metode cacah pikeun ningali kumaha sési jalanna (Tabel 5.1).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Nalika rékaman sumping, urang milarian sési anu aya sareng konci anu sami, waktos tungtung kirang ti tanggal ayeuna / cap waktos - interval inactivity, sareng waktos mimiti langkung ageung tibatan tanggal ayeuna / cap waktos + interval inactivity. Nyandak ieu kana akun, opat éntri tina tabel. 5.1 dihijikeun kana hiji sési sapertos kieu.

1. Rékam 1 sumping heula, janten waktos ngamimitian sami sareng waktos akhir sareng 00:00:00.

2. Salajengna, asupna 2 datang, sarta kami néangan sési nu tungtung euweuh saméméhna ti 23:59:55 tur mimitian no saterusna ti 00:00:35. Kami mendakan catetan 1 sareng ngagabungkeun sési 1 sareng 2. Urang nyandak waktos ngamimitian sési 1 (saméméhna) sareng waktos ahir sési 2 (engké), supados sési énggal urang dimimitian dina jam 00:00:00 sareng ditungtungan jam 00: 00:15.

3. Rékam 3 datang, urang néangan sesi antara 00:00:30 jeung 00:01:10 sarta teu manggihan nanaon. Tambahkeun sési kadua pikeun konci 123-345-654, FFBE, dimimitian jeung ditungtungan make jam 00:00:50.

4. Rékam 4 sumping sareng urang milarian sési antara 23:59:45 sareng 00:00:25. Waktu ieu kapanggih duanana sesi 1 jeung 2. Katiluna sesi digabungkeun kana hiji, kalawan waktu mimiti 00:00:00 sarta hiji waktu tungtung 00:00:15.

Tina anu dijelaskeun dina bagian ieu, émut kana nuansa penting ieu:

  • sési henteu windows ukuranana tetep. Durasi sési ditangtukeun ku kagiatan dina jangka waktu anu ditangtukeun;
  • Tanggal/waktu perangko dina data nangtukeun naha acara ragrag dina sési aya atawa salila periode dianggurkeun.

Salajengna urang bakal ngabahas jinis jandela salajengna - "tumbling" windows.

Jandéla "Tumbleng".

Jandéla tumbling nangkep kajadian anu tumiba dina jangka waktu anu tangtu. Bayangkeun yén anjeun kedah néwak sadaya transaksi saham perusahaan anu tangtu unggal 20 detik, janten anjeun ngumpulkeun sadaya kajadian dina waktos éta. Dina ahir interval 20 detik, jandela gulung leuwih sarta pindah ka interval observasi 20 detik anyar. Gambar 5.14 ngagambarkeun kaayaan ieu.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Sakumaha anjeun tiasa tingali, sadaya kajadian anu ditampi dina 20 detik terakhir kalebet dina jandela. Dina ahir periode waktu ieu, jandela anyar dijieun.

Listing 5.6 nunjukkeun kode anu nunjukkeun pamakean windows tumbling pikeun nangkep transaksi saham unggal 20 detik (kapanggih dina src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Kalayan parobahan leutik ieu kana panggero metode TimeWindows.of, anjeun tiasa nganggo jandela tumbling. Conto ieu henteu nyauran metodeu dugi (), janten interval ingetan standar 24 jam bakal dianggo.

Tungtungna, éta waktu pikeun ngaléngkah ka panungtungan pilihan jandela - "hopping" jandéla.

Ngageser ("luncat") jandéla

Jandéla sliding/hopping sarupa jeung jandéla tumbling, tapi kalawan saeutik bédana. Ngageser jandéla ulah ngantosan nepi ka ahir interval waktu saméméh nyieun jandela anyar pikeun ngolah acara panganyarna. Aranjeunna ngamimitian itungan anyar saatos interval ngantosan kirang ti durasi jandela.

Pikeun ngagambarkeun bédana antara tumbling sareng jumping windows, hayu urang uih deui kana conto ngitung transaksi bursa saham. Tujuanana kami masih ngitung jumlah transaksi, tapi kami henteu hoyong ngantosan sadayana waktos sateuacan ngapdet loket. Gantina, urang bakal ngamutahirkeun counter dina interval pondok. Contona, urang masih bakal cacah jumlah transaksi unggal 20 detik, tapi ngamutahirkeun counter unggal 5 detik, ditémbongkeun saperti dina Gbr. 5.15. Dina hal ieu, urang mungkas nepi ka tilu hasil jandela jeung data tumpang tindih.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Listing 5.7 nembongkeun kode pikeun nangtukeun jandéla ngageser (kapanggih dina src / utama / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Jandéla tumbling bisa dirobah jadi jandela hopping ku nambahkeun panggero ka advanceBy () métode. Dina conto anu dipidangkeun, interval nyimpen nyaéta 15 menit.

Anjeun ningal dina bagian ieu kumaha ngawatesan hasil agrégasi kana waktos windows. Khususna, kuring hoyong anjeun émut kana tilu hal ieu tina bagian ieu:

  • ukuran jandéla sési diwatesan lain ku periode waktu, tapi ku aktivitas pamaké;
  • Jandéla "tumbling" nyayogikeun tinjauan acara dina waktos anu ditangtukeun;
  • Durasi luncat windows tetep, tapi aranjeunna sering diropéa sareng tiasa ngandung éntri anu tumpang tindih dina sadaya windows.

Salajengna, urang bakal diajar kumaha carana ngarobah KTable deui ka KStream pikeun sambungan.

5.3.3. Nyambungkeun objék KStream sareng KTable

Dina Bab 4, urang ngabahas nyambungkeun dua objék KStream. Ayeuna urang kedah diajar kumaha nyambungkeun KTable sareng KStream. Ieu bisa jadi diperlukeun pikeun alesan basajan handap. KStream mangrupikeun aliran rékaman, sareng KTable mangrupikeun aliran apdet rékaman, tapi sakapeung anjeun badé nambihan kontéks tambahan kana aliran rékaman nganggo apdet tina KTable.

Hayu urang nyandak data dina jumlah transaksi bursa jeung ngagabungkeun aranjeunna kalayan warta bursa saham pikeun industri relevan. Ieu naon anu anjeun kedah laksanakeun pikeun ngahontal ieu tinangtu kode anu anjeun parantos gaduh.

  1. Ngarobah obyék KTable kalawan data dina Jumlah transaksi stock kana KStream a, dituturkeun ku ngaganti konci kalayan konci nunjukkeun sektor industri pakait jeung simbol stock ieu.
  2. Jieun objék KTable nu maca data tina topik kalawan warta bursa saham. KTable anyar ieu bakal digolongkeun dumasar sektor industri.
  3. Sambungkeun apdet warta sareng inpormasi ngeunaan jumlah transaksi bursa saham ku sektor industri.

Ayeuna hayu urang tingali kumaha nerapkeun rencana aksi ieu.

Ngarobih KTable ka KStream

Pikeun ngarobih KTable ka KStream anjeun kedah ngalakukeun ieu.

  1. Nelepon metoda KTable.toStream ().
  2. Ku nelepon metoda KStream.map, ngaganti konci kalayan ngaran industri, lajeng nimba objék TransactionSummary tina conto Windowed.

Urang bakal ranté operasi ieu babarengan saperti kieu (kode bisa kapanggih dina file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Kusabab urang ngalakukeun operasi KStream.map, conto KStream balik ulang partitioned otomatis lamun dipaké dina sambungan.

Kami parantos réngsé prosés konvérsi, salajengna urang kedah nyiptakeun objék KTable pikeun maca warta saham.

Nyiptakeun KTable pikeun warta saham

Untungna, nyieun hiji objek KTable nyokot ngan hiji garis kode (kode nu bisa kapanggih dina src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Eta sia noting yén euweuh objék Serde diperlukeun pikeun nangtukeun, saprak string Serdes dipaké dina setélan. Ogé, ku ngagunakeun enumerasi pangheubeulna, tabél dieusian ku rékaman di pisan awal.

Ayeuna urang tiasa ngaléngkah ka léngkah ahir - sambungan.

Nyambungkeun apdet warta sareng data cacah transaksi

Nyiptakeun sambungan henteu sesah. Urang bakal make a gabung kénca bisi euweuh warta stock pikeun industri relevan (kode diperlukeun bisa kapanggih dina file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
operator leftJoin Ieu cukup basajan. Beda sareng gabung dina Bab 4, metode JoinWindow henteu dianggo sabab nalika ngalakukeun gabungan KStream-KTable, ngan aya hiji éntri dina KTable pikeun unggal konci. Sambungan sapertos kitu henteu dugi ka waktosna: rékamanna aya dina KTable atanapi henteu. Kacindekan utama: ngagunakeun objék KTable anjeun tiasa ngabeungharan KStream kalayan data rujukan anu kirang sering diropéa.

Ayeuna urang bakal ningali cara anu langkung épisién pikeun ngabeungharan acara ti KStream.

5.3.4. objék GlobalKTable

Sakumaha anjeun tiasa tingali, aya anu peryogi pikeun enrich acara aliran atawa tambahkeun konteks ka aranjeunna. Dina Bab 4 anjeun ningali sambungan antara dua objék KStream, sareng dina bagian sateuacana anjeun ningali sambungan antara KStream sareng KTable. Dina sakabéh kasus ieu, perlu partisi ulang aliran data nalika pemetaan konci kana tipe atawa nilai anyar. Kadang partisi ulang dilakukeun sacara eksplisit, sareng kadang Kafka Streams ngalakukeunana sacara otomatis. Re-partitioning perlu sabab kenop geus robah sarta rékaman kudu mungkas nepi di bagian anyar, disebutkeun sambungan bakal jadi teu mungkin (ieu dibahas dina Bab 4, dina bagian "Re-partitioning data" dina subsection 4.2.4).

Re-partitioning boga ongkos

Re-partitioning merlukeun waragad - waragad sumberdaya tambahan pikeun nyieun jejer panengah, nyimpen duplikat data dina topik sejen; hartina ogé ngaronjat latency alatan nulis jeung maca tina topik ieu. Salaku tambahan, upami anjeun kedah ngiluan langkung ti hiji aspék atanapi diménsi, anjeun kedah ranté gabung, peta rékaman nganggo konci énggal, sareng jalankeun prosés pamisahan deui.

Nyambungkeun ka set data anu langkung alit

Dina sababaraha kasus, volume data rujukan nu disambungkeun relatif leutik, jadi salinan lengkep eta bisa gampang pas lokal dina unggal titik. Pikeun kaayaan sapertos kieu, Kafka Streams nyayogikeun kelas GlobalKTable.

Instansi GlobalKTable unik sabab aplikasina ngayakeun réplikasi sadaya data ka unggal titik. Sarta saprak sakabeh data aya dina unggal titik, teu kudu partisi aliran acara ku konci data rujukan meh sadia pikeun sakabéh partitions. Anjeun ogé tiasa ngagabung tanpa konci nganggo objék GlobalKTable. Hayu urang balik deui ka salah sahiji conto sateuacana pikeun nunjukkeun fitur ieu.

Nyambungkeun objék KStream ka objék GlobalKTable

Dina subsection 5.3.2, urang ngalaksanakeun jandela aggregation tina transaksi bursa ku pembeli. Hasil tina aggregation ieu kasampak kawas kieu:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Sanaos hasil ieu nyayogikeun tujuan, éta bakal langkung mangpaat upami nami palanggan sareng nami perusahaan lengkep ogé ditingalikeun. Pikeun nambihan nami palanggan sareng nami perusahaan, anjeun tiasa ngalakukeun gabungan normal, tapi anjeun kedah ngalakukeun dua pemetaan konci sareng partisi ulang. Kalayan GlobalKTable anjeun tiasa nyingkahan biaya operasi sapertos kitu.

Jang ngalampahkeun ieu, urang bakal ngagunakeun objék countStream ti Listing 5.11 (kode pakait bisa kapanggih dina src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) tur sambungkeun ka dua objék GlobalKTable.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Kami parantos ngabahas ieu sateuacanna, janten kuring moal ngulang deui. Tapi kuring dicatet yén kode dina toStream () fungsi peta ieu abstrak kana objek fungsi tinimbang hiji ekspresi lambda inline demi readability.

Lengkah saterusna nyaéta nyatakeun dua instansi GlobalKTable (kode anu dipidangkeun tiasa dipendakan dina file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"

Perhatikeun yén ngaran topik dijelaskeun ngagunakeun tipe enumerated.

Ayeuna urang gaduh sadaya komponén siap, ngan ukur nyerat kodeu pikeun sambungan (anu tiasa dipendakan dina file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Sanajan aya dua ngagabung dina kode ieu, aranjeunna dipasung sabab henteu hasil maranéhanana dipaké misah. Hasilna dipintonkeun dina tungtung sakabéh operasi.

Nalika anjeun ngajalankeun operasi gabung di luhur, anjeun bakal nampi hasil sapertos kieu:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Intina teu robah, tapi hasilna ieu kasampak leuwih jelas.

Upami anjeun ngitung dugi ka Bab 4, anjeun parantos ningali sababaraha jinis sambungan anu aktip. Aranjeunna didaptarkeun dina tabel. 5.2. Tabel ieu ngagambarkeun kamampuan konektipitas dina versi 1.0.0 Kafka Streams; Aya anu tiasa robih dina rilis anu bakal datang.

Buku "Kafka Streams in Action. Aplikasi sareng microservices pikeun digawé sacara real-time"
Pikeun mungkus hal-hal, hayu urang recap dasarna: anjeun tiasa nyambungkeun aliran acara (KStream) sareng ngapdet aliran (KTable) nganggo kaayaan lokal. Alternatipna, upami ukuran data rujukan henteu ageung teuing, anjeun tiasa nganggo obyék GlobalKTable. GlobalKTables ngayakeun réplikasi sadaya partisi ka unggal titik aplikasi Kafka Streams, mastikeun yén sadaya data sayogi henteu paduli partisi mana koncina.

Salajengna urang bakal ningali fitur Aliran Kafka, hatur nuhun anu urang tiasa ningali parobahan kaayaan tanpa nganggo data tina topik Kafka.

5.3.5. kaayaan queryable

Kami parantos ngalaksanakeun sababaraha operasi anu ngalibetkeun kaayaan sareng teras-terasan ngahasilkeun hasil ka konsol (pikeun tujuan pangwangunan) atanapi nyerat kana topik (pikeun tujuan produksi). Nalika nyerat hasil kana topik, anjeun kedah nganggo konsumen Kafka pikeun ningali éta.

Maca data tina topik-topik ieu tiasa dianggap jinis pandangan anu diwujudkeun. Pikeun kaperluan urang, urang bisa make definisi view materialized ti Wikipedia: "... a physical database object containing the results of a query. Contona, éta bisa mangrupa salinan lokal data jauh, atawa sawaréh ti baris jeung/atawa kolom tina tabel atawa hasil gabungan, atawa tabel kasimpulan diala ngaliwatan aggregation" (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams ogé ngidinan Anjeun pikeun ngajalankeun queries interaktif on toko kaayaan, sahingga anjeun langsung maca ieu pintonan materialized. Kadé dicatet yén pamundut ka toko kaayaan mangrupa operasi baca-hijina. Ieu mastikeun yén anjeun henteu kedah hariwang ngeunaan ngahaja ngajantenkeun kaayaan teu konsisten nalika aplikasi anjeun ngolah data.

Kamampuhan pikeun langsung naroskeun toko kaayaan penting. Ieu ngandung harti yén anjeun tiasa nyiptakeun aplikasi dasbor tanpa kedah nyandak data heula ti konsumen Kafka. Éta ogé ningkatkeun efisiensi aplikasi, kusabab kanyataan yén henteu kedah nyerat data deui:

  • hatur nuhun kana lokalitas data, aranjeunna tiasa diaksés gancang;
  • duplikasi data dileungitkeun, sabab teu ditulis ka panyimpenan éksternal.

Hal utama anu kuring hoyong anjeun émut nyaéta anjeun tiasa langsung naroskeun kaayaan tina aplikasi anjeun. Kasempetan anu dipasihkeun ku anjeun henteu tiasa diémutan. Gantina ngonsumsi data ti Kafka sareng nyimpen rékaman dina pangkalan data pikeun aplikasi, anjeun tiasa naroskeun toko kaayaan kalayan hasil anu sami. queries langsung ka toko kaayaan hartina kirang kode (euweuh konsumen) jeung software kirang (teu kudu tabel database pikeun nyimpen hasilna).

Kami parantos nutupan sakedik taneuh dina bab ieu, janten kami bakal ngantunkeun diskusi ngeunaan patarosan interaktif ngalawan toko nagara ayeuna. Tapi tong hariwang: dina Bab 9, urang bakal nyiptakeun aplikasi dasbor saderhana sareng patarosan interaktif. Bakal ngagunakeun sababaraha conto tina ieu sareng bab saméméhna pikeun nunjukkeun patarosan interaktif sareng kumaha anjeun tiasa nambihanana kana aplikasi Kafka Streams.

singgetan

  • Objék KStream ngagambarkeun aliran kajadian, dibandingkeun sareng sisipan kana pangkalan data. Objék KTable ngagambarkeun aliran apdet, langkung sapertos apdet kana database. Ukuran objék KTable teu tumuwuh, rékaman heubeul diganti ku nu anyar.
  • objék KTable diperlukeun pikeun operasi aggregation.
  • Ngagunakeun operasi windowing, anjeun tiasa ngabagi data aggregated kana ember waktos.
  • Hatur nuhun kana objék GlobalKTable, anjeun tiasa ngaksés data rujukan dimana waé dina aplikasi, henteu paduli partisi.
  • Sambungan antara objék KStream, KTable sareng GlobalKTable tiasa waé.

Sajauh ieu, kami parantos fokus kana ngawangun aplikasi Kafka Streams nganggo KStream DSL tingkat luhur. Sanajan pendekatan tingkat luhur ngidinan Anjeun pikeun nyieun program rapih tur singket, ngagunakeun éta ngagambarkeun trade-off. Gawe sareng DSL KStream hartosna ningkatkeun ringkesan kode anjeun ku ngirangan darajat kontrol. Dina bab salajengna, urang bakal ningali API titik panangan tingkat rendah sareng nyobian trade-off anu sanés. Programna bakal langkung panjang tibatan sateuacanna, tapi urang bakal tiasa nyiptakeun ampir sadaya titik panangan anu urang peryogikeun.

→ Rincian langkung seueur ngeunaan buku tiasa dipendakan di ramatloka penerbit

→ Pikeun Habrozhiteli diskon 25% ngagunakeun kupon - Aliran Kafka

→ Saatos mayar kanggo versi kertas buku, buku éléktronik bakal dikirim ku e-mail.

sumber: www.habr.com

Tambahkeun komentar