Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata" Hello, penduduk Khabro! Buku ini sesuai untuk mana-mana pembangun yang ingin memahami pemprosesan benang. Memahami pengaturcaraan teragih akan membantu anda memahami Aliran Kafka dan Kafka dengan lebih baik. Adalah baik untuk mengetahui rangka kerja Kafka itu sendiri, tetapi ini tidak perlu: ​​Saya akan memberitahu anda semua yang anda perlukan. Pembangun Kafka dan orang baru yang berpengalaman akan belajar cara mencipta aplikasi pemprosesan strim yang menarik menggunakan perpustakaan Kafka Streams dalam buku ini. Pembangun Java pertengahan dan lanjutan yang sudah biasa dengan konsep seperti bersiri akan belajar menggunakan kemahiran mereka untuk mencipta aplikasi Kafka Streams. Kod sumber buku ini ditulis dalam Java 8 dan menggunakan sintaks ekspresi lambda Java 8 dengan ketara, jadi mengetahui cara bekerja dengan fungsi lambda (walaupun dalam bahasa pengaturcaraan lain) akan berguna.

Petikan. 5.3. Operasi pengagregatan dan tingkap

Dalam bahagian ini, kami akan meneruskan untuk meneroka bahagian yang paling menjanjikan dalam Aliran Kafka. Setakat ini kami telah membincangkan aspek berikut dari Aliran Kafka:

  • mencipta topologi pemprosesan;
  • menggunakan keadaan dalam aplikasi penstriman;
  • melaksanakan sambungan aliran data;
  • perbezaan antara strim acara (KStream) dan strim kemas kini (KTable).

Dalam contoh berikut kami akan menyatukan semua elemen ini. Anda juga akan belajar tentang windowing, satu lagi ciri hebat aplikasi penstriman. Contoh pertama kami ialah pengagregatan mudah.

5.3.1. Pengagregatan jualan saham mengikut sektor industri

Pengagregatan dan pengelompokan adalah alat penting apabila bekerja dengan data penstriman. Pemeriksaan rekod individu semasa ia diterima selalunya tidak mencukupi. Untuk mengekstrak maklumat tambahan daripada data, adalah perlu untuk mengumpulkan dan menggabungkannya.

Dalam contoh ini, anda akan memakai kostum seorang peniaga harian yang perlu menjejaki volum jualan saham syarikat dalam beberapa industri. Secara khusus, anda berminat dengan lima syarikat yang mempunyai jualan saham terbesar dalam setiap industri.

Pengagregatan sedemikian memerlukan beberapa langkah berikut untuk menterjemah data ke dalam bentuk yang dikehendaki (bercakap dalam istilah umum).

  1. Cipta sumber berasaskan topik yang menerbitkan maklumat dagangan saham mentah. Kita perlu memetakan objek jenis StockTransaction kepada objek jenis ShareVolume. Maksudnya ialah objek StockTransaction mengandungi metadata jualan, tetapi kami hanya memerlukan data tentang bilangan saham yang dijual.
  2. Kumpulkan data ShareVolume mengikut simbol saham. Setelah dikumpulkan mengikut simbol, anda boleh meruntuhkan data ini kepada subjumlah volum jualan saham. Perlu diingat bahawa kaedah KStream.groupBy mengembalikan contoh jenis KGroupedStream. Dan anda boleh mendapatkan contoh KTable dengan terus memanggil kaedah KGroupedStream.reduce.

Apakah antara muka KGroupedStream

Kaedah KStream.groupBy dan KStream.groupByKey mengembalikan tika KGroupedStream. KGroupedStream ialah perwakilan perantaraan aliran acara selepas dikumpulkan mengikut kekunci. Ia sama sekali tidak bertujuan untuk kerja langsung dengannya. Sebaliknya, KGroupedStream digunakan untuk operasi pengagregatan, yang sentiasa menghasilkan KTable. Dan kerana hasil operasi pengagregatan ialah KTable dan mereka menggunakan stor negeri, kemungkinan tidak semua kemas kini sebagai hasilnya dihantar lebih jauh ke bawah saluran paip.

Kaedah KTable.groupBy mengembalikan KGroupedTable yang serupa - perwakilan perantaraan aliran kemas kini, dikumpulkan semula mengikut kunci.

Mari kita berehat sebentar dan lihat Rajah. 5.9, yang menunjukkan apa yang telah kami capai. Topologi ini sepatutnya sudah biasa kepada anda.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Sekarang mari kita lihat kod untuk topologi ini (ia boleh didapati dalam fail src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Penyenaraian 5.2).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Kod yang diberikan dibezakan dengan singkatnya dan jumlah besar tindakan yang dilakukan dalam beberapa baris. Anda mungkin melihat sesuatu yang baharu dalam parameter pertama kaedah builder.stream: nilai jenis enum AutoOffsetReset.EARLIEST (terdapat juga TERKINI), ditetapkan menggunakan kaedah Consumed.withOffsetResetPolicy. Jenis penghitungan ini boleh digunakan untuk menentukan strategi tetapan semula ofset untuk setiap KStream atau KTable dan diutamakan daripada pilihan tetapan semula ofset daripada konfigurasi.

GroupByKey dan GroupBy

Antara muka KStream mempunyai dua kaedah untuk mengumpulkan rekod: GroupByKey dan GroupBy. Kedua-duanya mengembalikan KGroupedTable, jadi anda mungkin tertanya-tanya apakah perbezaan antara mereka dan bila hendak menggunakan yang mana satu?

Kaedah GroupByKey digunakan apabila kunci dalam KStream sudah tidak kosong. Dan yang paling penting, bendera "memerlukan pembahagian semula" tidak pernah ditetapkan.

Kaedah GroupBy menganggap bahawa anda telah menukar kunci kumpulan, jadi bendera partisi semula ditetapkan kepada benar. Melakukan pencantuman, pengagregatan, dsb. selepas kaedah GroupBy akan menghasilkan pembahagian semula automatik.
Ringkasan: Apabila boleh, anda harus menggunakan GroupByKey dan bukannya GroupBy.

Adalah jelas apa yang dilakukan oleh kaedah mapValues ​​dan groupBy, jadi mari kita lihat kaedah sum() (terdapat dalam src/main/java/bbejeck/model/ShareVolume.java) (Penyenaraian 5.3).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Kaedah ShareVolume.sum mengembalikan jumlah larian volum jualan saham dan hasil daripada keseluruhan rantaian pengiraan ialah objek KTable . Kini anda memahami peranan yang dimainkan oleh KTable. Apabila objek ShareVolume tiba, objek KTable yang sepadan menyimpan kemas kini semasa terkini. Adalah penting untuk diingat bahawa semua kemas kini ditunjukkan dalam ShareVolumeKTable sebelumnya, tetapi tidak semuanya dihantar lebih jauh.

Kami kemudiannya menggunakan KTable ini untuk mengagregat (mengikut bilangan saham yang didagangkan) untuk mencapai lima syarikat dengan volum tertinggi saham yang didagangkan dalam setiap industri. Tindakan kami dalam kes ini akan serupa dengan tindakan untuk pengagregatan pertama.

  1. Lakukan operasi groupBy lain untuk mengumpulkan objek ShareVolume individu mengikut industri.
  2. Mula meringkaskan objek ShareVolume. Kali ini objek pengagregatan ialah baris gilir keutamaan saiz tetap. Dalam baris gilir bersaiz tetap ini, hanya lima syarikat dengan jumlah terbesar saham yang dijual dikekalkan.
  3. Petakan baris gilir dari perenggan sebelumnya kepada nilai rentetan dan kembalikan lima saham teratas yang paling banyak didagangkan mengikut nombor mengikut industri.
  4. Tulis keputusan dalam bentuk rentetan kepada topik.

Dalam Rajah. Rajah 5.10 menunjukkan graf topologi aliran data. Seperti yang anda lihat, pusingan kedua pemprosesan agak mudah.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Kini setelah kita mempunyai pemahaman yang jelas tentang struktur pemprosesan pusingan kedua ini, kita boleh beralih kepada kod sumbernya (anda akan menemuinya dalam fail src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Penyenaraian 5.4) .

Pemula ini mengandungi pembolehubah fixedQueue. Ini ialah objek tersuai yang merupakan penyesuai untuk java.util.TreeSet yang digunakan untuk menjejak keputusan N teratas dalam tertib menurun bagi saham yang didagangkan.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Anda telah pun melihat panggilan groupBy dan mapValues, jadi kami tidak akan masuk ke dalam panggilan tersebut (kami memanggil kaedah KTable.toStream kerana kaedah KTable.print tidak digunakan lagi). Tetapi anda belum melihat versi KTable aggregate() lagi, jadi kami akan meluangkan sedikit masa untuk membincangkannya.

Seperti yang anda ingat, perkara yang menjadikan KTable berbeza ialah rekod dengan kunci yang sama dianggap kemas kini. KTable menggantikan entri lama dengan yang baru. Pengagregatan berlaku dengan cara yang sama: rekod terkini dengan kunci yang sama diagregatkan. Apabila rekod tiba, ia ditambahkan pada contoh kelas FixedSizePriorityQueue menggunakan penambah (parameter kedua dalam panggilan kaedah agregat), tetapi jika rekod lain sudah wujud dengan kunci yang sama, maka rekod lama dialih keluar menggunakan penolakan (parameter ketiga dalam panggilan kaedah agregat).

Ini bermakna pengagregat kami, FixedSizePriorityQueue, tidak mengagregatkan semua nilai dengan satu kunci, tetapi menyimpan jumlah bergerak kuantiti N jenis saham yang paling banyak didagangkan. Setiap entri masuk mengandungi jumlah bilangan saham yang dijual setakat ini. KTable akan memberi anda maklumat tentang saham syarikat yang paling banyak didagangkan pada masa ini, tanpa memerlukan pengagregatan bergulir bagi setiap kemas kini.

Kami belajar melakukan dua perkara penting:

  • nilai kumpulan dalam KTable dengan kunci biasa;
  • melaksanakan operasi berguna seperti rollup dan pengagregatan pada nilai berkumpulan ini.

Mengetahui cara melaksanakan operasi ini adalah penting untuk memahami maksud data yang bergerak melalui aplikasi Kafka Streams dan memahami maklumat yang dibawanya.

Kami juga telah mengumpulkan beberapa konsep utama yang dibincangkan sebelum ini dalam buku ini. Dalam Bab 4, kami bercakap tentang betapa pentingnya toleransi kesalahan, negeri tempatan untuk aplikasi penstriman. Contoh pertama dalam bab ini menunjukkan mengapa negeri setempat sangat penting—ia memberi anda keupayaan untuk menjejaki maklumat yang telah anda lihat. Akses setempat mengelakkan kelewatan rangkaian, menjadikan aplikasi lebih berprestasi dan tahan ralat.

Apabila melakukan sebarang operasi rollup atau pengagregatan, anda mesti menyatakan nama kedai negeri. Operasi penggulungan dan pengagregatan mengembalikan tika KTable, dan KTable menggunakan storan keadaan untuk menggantikan hasil lama dengan yang baharu. Seperti yang anda lihat, tidak semua kemas kini dihantar ke saluran paip, dan ini penting kerana operasi pengagregatan direka bentuk untuk menghasilkan maklumat ringkasan. Jika anda tidak menggunakan keadaan setempat, KTable akan memajukan semua hasil pengagregatan dan gulungan.

Seterusnya, kita akan melihat pada melaksanakan operasi seperti pengagregatan dalam tempoh masa tertentu - yang dipanggil operasi tetingkap.

5.3.2. Operasi tingkap

Dalam bahagian sebelumnya, kami memperkenalkan lilitan gelongsor dan pengagregatan. Aplikasi ini melakukan penggulungan berterusan volum jualan saham, diikuti dengan pengagregatan lima saham yang paling banyak didagangkan di bursa.

Kadangkala pengagregatan berterusan dan penggulungan keputusan sedemikian diperlukan. Dan kadangkala anda perlu melakukan operasi hanya dalam tempoh masa tertentu. Sebagai contoh, hitung bilangan urus niaga pertukaran yang dibuat dengan saham syarikat tertentu dalam 10 minit terakhir. Atau berapa ramai pengguna mengklik sepanduk pengiklanan baharu dalam 15 minit yang lalu. Aplikasi boleh melakukan operasi sedemikian beberapa kali, tetapi dengan hasil yang digunakan hanya untuk tempoh masa tertentu (tetingkap masa).

Mengira transaksi pertukaran oleh pembeli

Dalam contoh seterusnya, kami akan menjejaki urus niaga saham merentas berbilang pedagang—sama ada organisasi besar atau pembiaya individu pintar.

Terdapat dua sebab yang mungkin untuk penjejakan ini. Salah satunya ialah keperluan untuk mengetahui apa yang pemimpin pasaran beli/jual. Jika pemain besar dan pelabur canggih ini melihat peluang, masuk akal untuk mengikuti strategi mereka. Sebab kedua ialah keinginan untuk melihat sebarang kemungkinan tanda perdagangan orang dalam haram. Untuk melakukan ini, anda perlu menganalisis korelasi lonjakan jualan besar dengan siaran akhbar yang penting.

Penjejakan sedemikian terdiri daripada langkah-langkah berikut:

  • mencipta aliran untuk membaca daripada topik urus niaga saham;
  • mengumpulkan rekod masuk mengikut ID pembeli dan simbol saham. Memanggil kaedah groupBy mengembalikan contoh kelas KGroupedStream;
  • Kaedah KGroupedStream.windowedBy mengembalikan aliran data terhad kepada tetingkap masa, yang membenarkan pengagregatan bertingkap. Bergantung pada jenis tetingkap, sama ada TimeWindowedKStream atau SessionWindowedKStream dikembalikan;
  • kiraan transaksi untuk operasi pengagregatan. Aliran data bertingkap menentukan sama ada rekod tertentu diambil kira dalam kiraan ini;
  • menulis hasil ke topik atau mengeluarkannya ke konsol semasa pembangunan.

Topologi aplikasi ini mudah, tetapi gambaran yang jelas mengenainya akan membantu. Mari kita lihat Rajah. 5.11.

Seterusnya, kita akan melihat kefungsian operasi tetingkap dan kod yang sepadan.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"

Jenis tetingkap

Terdapat tiga jenis tetingkap dalam Kafka Streams:

  • sesi;
  • "terguling";
  • gelongsor/melompat.

Mana satu untuk dipilih bergantung pada keperluan perniagaan anda. Tetingkap jatuh dan melompat adalah terhad masa, manakala tetingkap sesi dihadkan oleh aktiviti pengguna—tempoh sesi ditentukan semata-mata oleh sejauh mana aktif pengguna. Perkara utama yang perlu diingat ialah semua jenis tetingkap adalah berdasarkan pada cap tarikh/masa entri, bukan masa sistem.

Seterusnya, kami melaksanakan topologi kami dengan setiap jenis tetingkap. Kod lengkap akan diberikan hanya dalam contoh pertama; untuk jenis tingkap lain tiada apa yang akan berubah kecuali jenis operasi tetingkap.

Tingkap sesi

Tetingkap sesi sangat berbeza daripada semua jenis tetingkap lain. Ia tidak terhad mengikut masa tetapi oleh aktiviti pengguna (atau aktiviti entiti yang anda ingin jejaki). Tetingkap sesi dihadkan oleh tempoh tidak aktif.

Rajah 5.12 menggambarkan konsep tetingkap sesi. Sesi yang lebih kecil akan bergabung dengan sesi di sebelah kirinya. Dan sesi di sebelah kanan akan berasingan kerana ia mengikuti tempoh tidak aktif yang lama. Tetingkap sesi adalah berdasarkan aktiviti pengguna, tetapi gunakan cop tarikh/masa daripada entri untuk menentukan sesi mana entri itu tergolong.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"

Menggunakan tetingkap sesi untuk menjejak urus niaga saham

Mari gunakan tetingkap sesi untuk menangkap maklumat tentang urus niaga pertukaran. Pelaksanaan tetingkap sesi ditunjukkan dalam Penyenaraian 5.5 (yang boleh didapati dalam src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Anda telah melihat kebanyakan operasi dalam topologi ini, jadi anda tidak perlu melihatnya lagi di sini. Tetapi terdapat juga beberapa elemen baru di sini, yang kini kita akan bincangkan.

Mana-mana operasi groupBy lazimnya melakukan beberapa jenis operasi pengagregatan (pengagregatan, penggulungan atau pengiraan). Anda boleh melakukan sama ada pengagregatan terkumpul dengan jumlah yang sedang berjalan, atau pengagregatan tetingkap, yang mengambil kira rekod dalam tetingkap masa yang ditentukan.

Kod dalam Penyenaraian 5.5 mengira bilangan transaksi dalam tetingkap sesi. Dalam Rajah. 5.13 tindakan ini dianalisis langkah demi langkah.

Dengan memanggil windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) kami mencipta tetingkap sesi dengan selang tidak aktif selama 20 saat dan selang kegigihan selama 15 minit. Selang melahu selama 20 saat bermakna bahawa aplikasi akan memasukkan sebarang entri yang tiba dalam masa 20 saat dari akhir atau permulaan sesi semasa ke dalam sesi semasa (aktif).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Seterusnya, kami menentukan operasi pengagregatan yang perlu dilakukan dalam tetingkap sesi - dalam kes ini, kira. Jika entri masuk jatuh di luar tetingkap tidak aktif (sama ada sisi cap tarikh/masa), aplikasi mencipta sesi baharu. Selang pengekalan bermaksud mengekalkan sesi untuk jangka masa tertentu dan membenarkan data lewat yang melangkaui tempoh tidak aktif sesi tetapi masih boleh dilampirkan. Selain itu, permulaan dan penghujung sesi baharu yang terhasil daripada gabungan sepadan dengan cop tarikh/masa yang paling awal dan terkini.

Mari lihat beberapa entri daripada kaedah kiraan untuk melihat cara sesi berfungsi (Jadual 5.1).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Apabila rekod tiba, kami mencari sesi sedia ada dengan kunci yang sama, masa tamat kurang daripada cap tarikh/masa semasa - selang tidak aktif dan masa mula lebih besar daripada cap tarikh/masa semasa + selang tidak aktif. Dengan mengambil kira ini, empat entri daripada jadual. 5.1 digabungkan menjadi satu sesi seperti berikut.

1. Rekod 1 tiba dahulu, jadi masa mula adalah sama dengan masa tamat dan ialah 00:00:00.

2. Seterusnya, entri 2 tiba, dan kami mencari sesi yang berakhir tidak lebih awal daripada 23:59:55 dan bermula selewat-lewatnya 00:00:35. Kami mencari rekod 1 dan menggabungkan sesi 1 dan 2. Kami mengambil masa mula sesi 1 (lebih awal) dan masa tamat sesi 2 (kemudian), supaya sesi baharu kami bermula pada 00:00:00 dan berakhir pada 00: 00:15.

3. Rekod 3 tiba, kami mencari sesi antara 00:00:30 dan 00:01:10 dan tidak menemui apa-apa. Tambahkan sesi kedua untuk kunci 123-345-654,FFBE, bermula dan berakhir pada 00:00:50.

4. Rekod 4 tiba dan kami sedang mencari sesi antara 23:59:45 dan 00:00:25. Kali ini kedua-dua sesi 1 dan 2 ditemui. Ketiga-tiga sesi digabungkan menjadi satu, dengan masa mula 00:00:00 dan masa tamat 00:00:15.

Daripada apa yang diterangkan dalam bahagian ini, perlu diingati nuansa penting berikut:

  • sesi bukan tetingkap bersaiz tetap. Tempoh sesi ditentukan oleh aktiviti dalam tempoh masa tertentu;
  • Setem tarikh/masa dalam data menentukan sama ada acara itu berada dalam sesi sedia ada atau semasa tempoh terbiar.

Seterusnya kita akan membincangkan jenis tetingkap seterusnya - tetingkap "berjatuhan".

Tingkap "bergulung".

Tetingkap runtuh menangkap peristiwa yang berlaku dalam tempoh masa tertentu. Bayangkan anda perlu menangkap semua urus niaga saham syarikat tertentu setiap 20 saat, jadi anda mengumpul semua peristiwa dalam tempoh masa tersebut. Pada penghujung selang 20 saat, tetingkap berguling dan beralih ke selang pemerhatian 20 saat yang baharu. Rajah 5.14 menggambarkan keadaan ini.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Seperti yang anda lihat, semua acara yang diterima dalam 20 saat terakhir disertakan dalam tetingkap. Pada penghujung tempoh masa ini, tetingkap baharu dibuat.

Penyenaraian 5.6 menunjukkan kod yang menunjukkan penggunaan tingkap jatuh untuk menangkap urus niaga saham setiap 20 saat (terdapat dalam src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Dengan perubahan kecil ini kepada panggilan kaedah TimeWindows.of ini, anda boleh menggunakan tetingkap jatuh. Contoh ini tidak memanggil kaedah until(), jadi selang pengekalan lalai 24 jam akan digunakan.

Akhirnya, tiba masanya untuk beralih ke pilihan tetingkap terakhir - tetingkap "melompat".

Tingkap gelongsor ("melompat")

Tingkap gelongsor/melompat adalah serupa dengan tingkap jatuh, tetapi dengan sedikit perbezaan. Tetingkap gelongsor jangan tunggu sehingga penghujung selang masa sebelum mencipta tetingkap baharu untuk memproses peristiwa terbaharu. Mereka memulakan pengiraan baharu selepas tempoh menunggu kurang daripada tempoh tetingkap.

Untuk menggambarkan perbezaan antara tingkap jatuh dan melompat, mari kita kembali kepada contoh mengira urus niaga bursa saham. Matlamat kami masih untuk mengira bilangan transaksi, tetapi kami tidak mahu menunggu keseluruhan masa sebelum mengemas kini kaunter. Sebaliknya, kami akan mengemas kini kaunter pada selang masa yang lebih singkat. Sebagai contoh, kami masih akan mengira bilangan transaksi setiap 20 saat, tetapi mengemas kini kaunter setiap 5 saat, seperti yang ditunjukkan dalam Rajah. 5.15. Dalam kes ini, kami mempunyai tiga tetingkap hasil dengan data bertindih.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Penyenaraian 5.7 menunjukkan kod untuk menentukan tetingkap gelongsor (terdapat dalam src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Tetingkap jatuh boleh ditukar kepada tetingkap melompat dengan menambahkan panggilan pada kaedah advanceBy(). Dalam contoh yang ditunjukkan, selang simpanan ialah 15 minit.

Anda melihat dalam bahagian ini cara mengehadkan hasil pengagregatan kepada tetingkap masa. Khususnya, saya ingin anda mengingati tiga perkara berikut dari bahagian ini:

  • saiz tetingkap sesi dihadkan bukan oleh tempoh masa, tetapi oleh aktiviti pengguna;
  • tingkap "berguling" memberikan gambaran keseluruhan peristiwa dalam tempoh masa tertentu;
  • Tempoh tetingkap melompat adalah tetap, tetapi ia dikemas kini dengan kerap dan mungkin mengandungi entri bertindih dalam semua tetingkap.

Seterusnya, kita akan belajar cara menukar KTable kembali kepada KStream untuk sambungan.

5.3.3. Menyambung objek KStream dan KTable

Dalam Bab 4, kami membincangkan menyambungkan dua objek KStream. Sekarang kita perlu belajar cara menyambungkan KTable dan KStream. Ini mungkin diperlukan atas sebab mudah berikut. KStream ialah strim rekod, dan KTable ialah strim kemas kini rekod, tetapi kadangkala anda mungkin mahu menambah konteks tambahan pada strim rekod menggunakan kemas kini daripada KTable.

Mari kita ambil data tentang bilangan urus niaga bursa saham dan gabungkannya dengan berita bursa saham untuk industri yang berkaitan. Inilah yang anda perlu lakukan untuk mencapai ini memandangkan kod yang anda sudah ada.

  1. Tukar objek KTable dengan data tentang bilangan urus niaga saham kepada KStream, diikuti dengan menggantikan kunci dengan kunci yang menunjukkan sektor industri yang sepadan dengan simbol saham ini.
  2. Cipta objek KTable yang membaca data daripada topik dengan berita bursa saham. KTable baharu ini akan dikategorikan mengikut sektor industri.
  3. Sambungkan kemas kini berita dengan maklumat tentang bilangan urus niaga bursa saham mengikut sektor industri.

Sekarang mari kita lihat bagaimana untuk melaksanakan pelan tindakan ini.

Tukar KTable kepada KStream

Untuk menukar KTable kepada KStream anda perlu melakukan perkara berikut.

  1. Panggil kaedah KTable.toStream().
  2. Dengan memanggil kaedah KStream.map, gantikan kunci dengan nama industri, dan kemudian dapatkan semula objek TransactionSummary daripada contoh Windowed.

Kami akan merantai operasi ini bersama-sama seperti berikut (kod boleh didapati dalam fail src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Penyenaraian 5.8).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Oleh kerana kami menjalankan operasi KStream.map, tika KStream yang dikembalikan akan dipartisi semula secara automatik apabila ia digunakan dalam sambungan.

Kami telah menyelesaikan proses penukaran, seterusnya kami perlu mencipta objek KTable untuk membaca berita saham.

Penciptaan KTable untuk berita saham

Nasib baik, mencipta objek KTable hanya memerlukan satu baris kod (kod itu boleh didapati dalam src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Penyenaraian 5.9).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Perlu diingat bahawa tiada objek Serde diperlukan untuk dinyatakan, kerana rentetan Serdes digunakan dalam tetapan. Selain itu, dengan menggunakan penghitungan TERAWAL, jadual diisi dengan rekod pada awal-awal lagi.

Sekarang kita boleh beralih ke langkah terakhir - sambungan.

Menghubungkan kemas kini berita dengan data kiraan transaksi

Membuat sambungan tidak sukar. Kami akan menggunakan gabungan kiri sekiranya tiada berita saham untuk industri yang berkaitan (kod yang diperlukan boleh didapati dalam fail src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Penyenaraian 5.10).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Operator leftJoin ini agak mudah. Tidak seperti cantuman dalam Bab 4, kaedah JoinWindow tidak digunakan kerana semasa melakukan cantuman KStream-KTable, terdapat hanya satu entri dalam KTable untuk setiap kunci. Sambungan sedemikian tidak terhad dalam masa: rekod sama ada dalam KTable atau tiada. Kesimpulan utama: menggunakan objek KTable anda boleh memperkayakan KStream dengan data rujukan yang kurang kerap dikemas kini.

Sekarang kita akan melihat cara yang lebih cekap untuk memperkayakan acara daripada KStream.

5.3.4. Objek GlobalKTable

Seperti yang anda lihat, terdapat keperluan untuk memperkaya strim acara atau menambah konteks padanya. Dalam Bab 4 anda melihat sambungan antara dua objek KStream dan dalam bahagian sebelumnya anda melihat sambungan antara KStream dan KTable. Dalam semua kes ini, adalah perlu untuk membahagikan semula aliran data apabila memetakan kunci kepada jenis atau nilai baharu. Kadangkala pembahagian semula dilakukan secara eksplisit, dan kadangkala Kafka Streams melakukannya secara automatik. Pembahagian semula adalah perlu kerana kekunci telah berubah dan rekod mesti berakhir di bahagian baharu, jika tidak sambungan akan menjadi mustahil (ini telah dibincangkan dalam Bab 4, dalam bahagian "Data pembahagian semula" dalam subseksyen 4.2.4).

Pembahagian semula mempunyai kos

Pembahagian semula memerlukan kos - kos sumber tambahan untuk mencipta topik perantaraan, menyimpan data pendua dalam topik lain; ia juga bermakna peningkatan kependaman kerana menulis dan membaca daripada topik ini. Selain itu, jika anda perlu menyertai lebih daripada satu aspek atau dimensi, anda mesti merantai cantuman, memetakan rekod dengan kunci baharu dan menjalankan proses pembahagian semula sekali lagi.

Menyambung kepada set data yang lebih kecil

Dalam sesetengah kes, volum data rujukan yang hendak disambungkan adalah agak kecil, jadi salinan lengkapnya boleh dimuatkan secara setempat dengan mudah pada setiap nod. Untuk situasi seperti ini, Kafka Streams menyediakan kelas GlobalKTable.

Contoh GlobalKTable adalah unik kerana aplikasi mereplikasi semua data kepada setiap nod. Dan oleh kerana semua data terdapat pada setiap nod, tidak perlu membahagikan strim acara dengan kunci data rujukan supaya ia tersedia untuk semua partition. Anda juga boleh membuat sambung tanpa kunci menggunakan objek GlobalKTable. Mari kita kembali ke salah satu contoh sebelumnya untuk menunjukkan ciri ini.

Menyambungkan objek KStream ke objek GlobalKTable

Dalam subseksyen 5.3.2, kami melakukan pengagregatan tetingkap bagi transaksi pertukaran oleh pembeli. Keputusan pengagregatan ini kelihatan seperti ini:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Walaupun keputusan ini memenuhi tujuan, ia akan menjadi lebih berguna jika nama pelanggan dan nama penuh syarikat turut dipaparkan. Untuk menambah nama pelanggan dan nama syarikat, anda boleh melakukan gabungan biasa, tetapi anda perlu melakukan dua pemetaan utama dan pembahagian semula. Dengan GlobalKTable anda boleh mengelakkan kos operasi sedemikian.

Untuk melakukan ini, kami akan menggunakan objek countStream daripada Penyenaraian 5.11 (kod yang sepadan boleh didapati dalam src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) dan menyambungkannya kepada dua objek GlobalKTable.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Kami telah membincangkan perkara ini sebelum ini, jadi saya tidak akan mengulanginya. Tetapi saya perhatikan bahawa kod dalam fungsi toStream().map diabstraksikan ke dalam objek fungsi dan bukannya ungkapan lambda sebaris demi kebolehbacaan.

Langkah seterusnya ialah mengisytiharkan dua contoh GlobalKTable (kod yang ditunjukkan boleh didapati dalam fail src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Penyenaraian 5.12).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"

Sila ambil perhatian bahawa nama topik diterangkan menggunakan jenis terhitung.

Sekarang kita telah menyediakan semua komponen, yang tinggal hanyalah menulis kod untuk sambungan (yang boleh didapati dalam fail src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Penyenaraian 5.13).

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Walaupun terdapat dua cantuman dalam kod ini, ia dirantai kerana kedua-dua keputusannya tidak digunakan secara berasingan. Hasilnya dipaparkan pada penghujung keseluruhan operasi.

Apabila anda menjalankan operasi gabungan di atas, anda akan mendapat hasil seperti ini:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Intipatinya tidak berubah, tetapi keputusan ini kelihatan lebih jelas.

Jika anda mengira detik ke Bab 4, anda telah melihat beberapa jenis sambungan dalam tindakan. Mereka disenaraikan dalam jadual. 5.2. Jadual ini menggambarkan keupayaan ketersambungan pada versi 1.0.0 Kafka Streams; Sesuatu mungkin berubah dalam keluaran akan datang.

Buku “Kafka Streams in Action. Aplikasi dan perkhidmatan mikro untuk kerja masa nyata"
Untuk menyelesaikan masalah, mari kita imbas semula asas: anda boleh menyambungkan strim acara (KStream) dan mengemas kini strim (KTable) menggunakan keadaan setempat. Sebagai alternatif, jika saiz data rujukan tidak terlalu besar, anda boleh menggunakan objek GlobalKTable. GlobalKTables mereplikasi semua partition pada setiap nod aplikasi Kafka Streams, memastikan semua data tersedia tanpa mengira partition mana yang sepadan dengan kunci.

Seterusnya kita akan melihat ciri Aliran Kafka, yang mana kita boleh melihat perubahan keadaan tanpa menggunakan data daripada topik Kafka.

5.3.5. Keadaan boleh ditanya

Kami telah melakukan beberapa operasi yang melibatkan keadaan dan sentiasa mengeluarkan hasilnya ke konsol (untuk tujuan pembangunan) atau menulisnya ke topik (untuk tujuan pengeluaran). Apabila menulis hasil pada topik, anda perlu menggunakan pengguna Kafka untuk melihatnya.

Membaca data daripada topik ini boleh dianggap sebagai satu jenis pandangan yang terwujud. Untuk tujuan kami, kami boleh menggunakan definisi pandangan terwujud daripada Wikipedia: “...objek pangkalan data fizikal yang mengandungi hasil pertanyaan. Contohnya, ia boleh menjadi salinan tempatan data jauh, atau subset baris dan/atau lajur jadual atau hasil gabungan, atau jadual ringkasan yang diperoleh melalui pengagregatan" (https://en.wikipedia.org/wiki /Materialized_view).

Kafka Streams juga membenarkan anda menjalankan pertanyaan interaktif di kedai negeri, membolehkan anda membaca terus pandangan yang terwujud ini. Adalah penting untuk ambil perhatian bahawa pertanyaan kepada stor negeri ialah operasi baca sahaja. Ini memastikan anda tidak perlu risau tentang membuat keadaan tidak konsisten secara tidak sengaja semasa aplikasi anda sedang memproses data.

Keupayaan untuk menanyakan stor negeri secara langsung adalah penting. Ini bermakna anda boleh membuat aplikasi papan pemuka tanpa perlu mengambil data daripada pengguna Kafka terlebih dahulu. Ia juga meningkatkan kecekapan aplikasi, kerana fakta bahawa tidak perlu menulis data lagi:

  • terima kasih kepada lokaliti data, ia boleh diakses dengan cepat;
  • pertindihan data dihapuskan, kerana ia tidak ditulis ke storan luaran.

Perkara utama yang saya mahu anda ingat ialah anda boleh terus bertanya keadaan dari dalam aplikasi anda. Peluang yang diberikan kepada anda tidak boleh dilebih-lebihkan. Daripada menggunakan data daripada Kafka dan menyimpan rekod dalam pangkalan data untuk aplikasi, anda boleh menanyakan stor keadaan dengan hasil yang sama. Pertanyaan terus ke kedai nyatakan bermakna kurang kod (tiada pengguna) dan kurang perisian (tidak perlu jadual pangkalan data untuk menyimpan hasilnya).

Kami telah membincangkan sedikit perkara dalam bab ini, jadi kami akan meninggalkan perbincangan kami tentang pertanyaan interaktif terhadap kedai negeri buat masa ini. Tetapi jangan risau: dalam Bab 9, kami akan mencipta aplikasi papan pemuka mudah dengan pertanyaan interaktif. Ia akan menggunakan beberapa contoh daripada bab ini dan sebelumnya untuk menunjukkan pertanyaan interaktif dan cara anda boleh menambahkannya pada aplikasi Kafka Streams.

Ringkasan

  • Objek KStream mewakili aliran peristiwa, setanding dengan sisipan ke dalam pangkalan data. Objek KTable mewakili aliran kemas kini, lebih seperti kemas kini kepada pangkalan data. Saiz objek KTable tidak berkembang, rekod lama digantikan dengan yang baru.
  • Objek KTable diperlukan untuk operasi pengagregatan.
  • Menggunakan operasi windowing, anda boleh membahagikan data agregat kepada baldi masa.
  • Terima kasih kepada objek GlobalKTable, anda boleh mengakses data rujukan di mana-mana dalam aplikasi, tanpa mengira pembahagian.
  • Sambungan antara objek KStream, KTable dan GlobalKTable adalah mungkin.

Setakat ini, kami telah menumpukan pada membina aplikasi Kafka Streams menggunakan KStream DSL peringkat tinggi. Walaupun pendekatan peringkat tinggi membolehkan anda membuat program yang kemas dan ringkas, menggunakannya mewakili pertukaran. Bekerja dengan DSL KStream bermakna meningkatkan kepekatan kod anda dengan mengurangkan tahap kawalan. Dalam bab seterusnya, kita akan melihat API nod pengendali peringkat rendah dan mencuba pertukaran lain. Program ini akan menjadi lebih lama daripada sebelumnya, tetapi kami akan dapat mencipta hampir mana-mana nod pengendali yang mungkin kami perlukan.

→ Butiran lanjut tentang buku boleh didapati di laman web penerbit

→ Untuk Habrozhiteli diskaun 25% menggunakan kupon - Aliran Kafka

→ Selepas pembayaran untuk versi kertas buku, buku elektronik akan dihantar melalui e-mel.

Sumber: www.habr.com

Tambah komen