Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time" Halo warga Khabro! Buku ini cocok untuk pengembang mana pun yang ingin memahami pemrosesan thread. Memahami pemrograman terdistribusi akan membantu Anda lebih memahami Kafka dan Kafka Streams. Akan menyenangkan untuk mengetahui kerangka Kafka itu sendiri, tetapi ini tidak perlu: Saya akan memberi tahu Anda semua yang Anda butuhkan. Pengembang Kafka yang berpengalaman dan pemula akan mempelajari cara membuat aplikasi pemrosesan aliran yang menarik menggunakan perpustakaan Kafka Streams dalam buku ini. Pengembang Java tingkat menengah dan lanjutan yang sudah memahami konsep seperti serialisasi akan belajar menerapkan keterampilan mereka untuk membuat aplikasi Kafka Streams. Kode sumber buku ini ditulis dalam Java 8 dan banyak menggunakan sintaks ekspresi lambda Java 8, jadi mengetahui cara bekerja dengan fungsi lambda (bahkan dalam bahasa pemrograman lain) akan berguna.

Kutipan. 5.3. Operasi agregasi dan windowing

Di bagian ini, kita akan melanjutkan menjelajahi bagian Kafka Streams yang paling menjanjikan. Sejauh ini kami telah membahas aspek Kafka Streams berikut:

  • membuat topologi pemrosesan;
  • menggunakan status dalam aplikasi streaming;
  • melakukan koneksi aliran data;
  • perbedaan antara aliran peristiwa (KStream) dan aliran pembaruan (KTable).

Dalam contoh berikut kami akan menyatukan semua elemen ini. Anda juga akan belajar tentang windowing, fitur hebat lainnya pada aplikasi streaming. Contoh pertama kita adalah agregasi sederhana.

5.3.1. Agregasi penjualan saham berdasarkan sektor industri

Agregasi dan pengelompokan adalah alat penting saat bekerja dengan data streaming. Pemeriksaan terhadap catatan individual pada saat diterima seringkali tidak cukup. Untuk mengekstrak informasi tambahan dari data, perlu untuk mengelompokkan dan menggabungkannya.

Dalam contoh ini, Anda akan mengenakan kostum seorang day trader yang perlu melacak volume penjualan saham perusahaan di beberapa industri. Secara khusus, Anda tertarik pada lima perusahaan dengan penjualan saham terbesar di setiap industri.

Agregasi tersebut memerlukan beberapa langkah berikut untuk menerjemahkan data ke dalam bentuk yang diinginkan (secara umum).

  1. Buat sumber berbasis topik yang menerbitkan informasi perdagangan saham mentah. Kita harus memetakan objek bertipe StockTransaction ke objek bertipe ShareVolume. Intinya objek StockTransaction berisi metadata penjualan, namun kita hanya membutuhkan data jumlah saham yang terjual.
  2. Kelompokkan data ShareVolume berdasarkan simbol saham. Setelah dikelompokkan berdasarkan simbol, Anda dapat mengelompokkan data ini menjadi subtotal volume penjualan saham. Perlu diperhatikan bahwa metode KStream.groupBy mengembalikan instance tipe KGroupedStream. Dan Anda bisa mendapatkan instance KTable dengan memanggil metode KGroupedStream.reduce lebih lanjut.

Apa itu antarmuka KGroupedStream

Metode KStream.groupBy dan KStream.groupByKey mengembalikan instance KGroupedStream. KGroupedStream adalah representasi perantara dari aliran peristiwa setelah dikelompokkan berdasarkan kunci. Itu sama sekali tidak dimaksudkan untuk bekerja langsung dengannya. Sebaliknya, KGroupedStream digunakan untuk operasi agregasi, yang selalu menghasilkan KTable. Dan karena hasil dari operasi agregasi adalah KTable dan mereka menggunakan penyimpanan negara, ada kemungkinan bahwa tidak semua pembaruan sebagai hasilnya dikirim lebih jauh.

Metode KTable.groupBy mengembalikan KGroupedTable serupa - representasi perantara dari aliran pembaruan, yang dikelompokkan kembali berdasarkan kunci.

Mari kita istirahat sejenak dan melihat Gambar. 5.9, yang menunjukkan apa yang telah kami capai. Topologi ini pastinya sudah tidak asing lagi bagi Anda.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Sekarang mari kita lihat kode untuk topologi ini (dapat ditemukan di file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Kode yang diberikan dibedakan berdasarkan singkatnya dan banyaknya tindakan yang dilakukan dalam beberapa baris. Anda mungkin melihat sesuatu yang baru di parameter pertama metode build.stream: nilai tipe enum AutoOffsetReset.EARLIEST (ada juga yang TERBARU), yang disetel menggunakan metode Consumed.withOffsetResetPolicy. Jenis enumerasi ini dapat digunakan untuk menentukan strategi penyetelan ulang offset untuk setiap KStream atau KTable dan lebih diutamakan daripada opsi penyetelan ulang offset dari konfigurasi.

GroupByKey dan GroupBy

Antarmuka KStream memiliki dua metode untuk mengelompokkan rekaman: GroupByKey dan GroupBy. Keduanya mengembalikan KGroupedTable, jadi Anda mungkin bertanya-tanya apa perbedaan antara keduanya dan kapan menggunakan yang mana?

Metode GroupByKey digunakan ketika kunci di KStream sudah tidak kosong. Dan yang paling penting, tanda “memerlukan partisi ulang” tidak pernah disetel.

Metode GroupBy mengasumsikan bahwa Anda telah mengubah kunci pengelompokan, sehingga tanda partisi ulang disetel ke true. Melakukan penggabungan, agregasi, dll. setelah metode GroupBy akan menghasilkan partisi ulang otomatis.
Ringkasan: Bila memungkinkan, Anda harus menggunakan GroupByKey daripada GroupBy.

Sudah jelas apa yang dilakukan metode mapValues ​​​​dan groupBy, jadi mari kita lihat metode sum() (ditemukan di src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Metode ShareVolume.sum mengembalikan total volume penjualan stok yang berjalan, dan hasil dari seluruh rantai perhitungan adalah objek KTable . Sekarang Anda memahami peran yang dimainkan KTable. Ketika objek ShareVolume tiba, objek KTable terkait menyimpan pembaruan terkini. Penting untuk diingat bahwa semua pembaruan tercermin dalam shareVolumeKTable sebelumnya, tetapi tidak semua dikirim lebih lanjut.

Kami kemudian menggunakan KTable ini untuk mengagregasi (berdasarkan jumlah saham yang diperdagangkan) untuk mendapatkan lima perusahaan dengan volume saham tertinggi yang diperdagangkan di setiap industri. Tindakan kami dalam kasus ini akan serupa dengan tindakan agregasi pertama.

  1. Lakukan operasi groupBy lainnya untuk mengelompokkan objek ShareVolume individual berdasarkan industri.
  2. Mulai meringkas objek ShareVolume. Kali ini objek agregasi adalah antrian prioritas berukuran tetap. Dalam antrian ukuran tetap ini, hanya lima perusahaan dengan jumlah penjualan saham terbesar yang dipertahankan.
  3. Petakan antrean dari paragraf sebelumnya ke nilai string dan kembalikan lima saham teratas yang paling banyak diperdagangkan berdasarkan nomor berdasarkan industri.
  4. Tulis hasilnya dalam bentuk string sesuai topik.

Pada Gambar. Gambar 5.10 menunjukkan grafik topologi aliran data. Seperti yang Anda lihat, pemrosesan putaran kedua cukup sederhana.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Sekarang kita memiliki pemahaman yang jelas tentang struktur pemrosesan putaran kedua ini, kita dapat beralih ke kode sumbernya (Anda akan menemukannya di file src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4) .

Penginisialisasi ini berisi variabel fixedQueue. Ini adalah objek khusus yang merupakan adaptor untuk java.util.TreeSet yang digunakan untuk melacak hasil N teratas dalam urutan menurun dari saham yang diperdagangkan.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Anda telah melihat panggilan groupBy dan mapValues ​​​​jadi kami tidak akan membahasnya (kami memanggil metode KTable.toStream karena metode KTable.print sudah tidak digunakan lagi). Namun Anda belum melihat agregat() versi KTable, jadi kami akan meluangkan sedikit waktu untuk membahasnya.

Seperti yang Anda ingat, apa yang membuat KTable berbeda adalah bahwa catatan dengan kunci yang sama dianggap sebagai pembaruan. KTable mengganti entri lama dengan yang baru. Agregasi terjadi dengan cara yang sama: catatan terbaru dengan kunci yang sama digabungkan. Ketika sebuah record tiba, record tersebut ditambahkan ke instance kelas FixedSizePriorityQueue menggunakan adder (parameter kedua dalam pemanggilan metode agregat), tetapi jika record lain sudah ada dengan kunci yang sama, maka record lama dihapus menggunakan pengurang (parameter ketiga dalam pemanggilan metode agregat).

Ini semua berarti bahwa agregator kami, FixedSizePriorityQueue, tidak menggabungkan semua nilai dengan satu kunci, namun menyimpan jumlah bergerak dari jumlah N jenis saham yang paling banyak diperdagangkan. Setiap entri yang masuk berisi jumlah total saham yang terjual sejauh ini. KTable akan memberi Anda informasi tentang saham perusahaan mana yang paling banyak diperdagangkan saat ini, tanpa memerlukan agregasi bergulir pada setiap pembaruan.

Kami belajar melakukan dua hal penting:

  • mengelompokkan nilai di KTable dengan kunci umum;
  • melakukan operasi yang berguna seperti rollup dan agregasi pada nilai-nilai yang dikelompokkan ini.

Mengetahui cara melakukan operasi ini penting untuk memahami arti data yang dipindahkan melalui aplikasi Kafka Streams dan memahami informasi apa yang dibawanya.

Kami juga telah mengumpulkan beberapa konsep utama yang dibahas sebelumnya dalam buku ini. Di Bab 4, kita membahas betapa pentingnya toleransi terhadap kesalahan, keadaan lokal untuk aplikasi streaming. Contoh pertama dalam bab ini menunjukkan mengapa negara bagian setempat sangat penting—ini memberi Anda kemampuan untuk melacak informasi apa yang telah Anda lihat. Akses lokal menghindari penundaan jaringan, menjadikan aplikasi lebih berkinerja dan tahan kesalahan.

Saat melakukan operasi rollup atau agregasi, Anda harus menentukan nama penyimpanan negara. Operasi rollup dan agregasi mengembalikan instans KTable, dan KTable menggunakan penyimpanan status untuk menggantikan hasil lama dengan hasil baru. Seperti yang Anda lihat, tidak semua pembaruan dikirimkan, dan ini penting karena operasi agregasi dirancang untuk menghasilkan informasi ringkasan. Jika Anda tidak menerapkan negara bagian lokal, KTable akan meneruskan semua hasil agregasi dan rollup.

Selanjutnya, kita akan melihat cara melakukan operasi seperti agregasi dalam periode waktu tertentu - yang disebut operasi windowing.

5.3.2. Operasi jendela

Di bagian sebelumnya, kami memperkenalkan konvolusi geser dan agregasi. Aplikasi ini melakukan roll-up penjualan saham secara terus menerus diikuti dengan agregasi dari lima saham yang paling banyak diperdagangkan di bursa.

Kadang-kadang diperlukan pengumpulan dan penggabungan hasil secara terus-menerus. Dan terkadang Anda perlu melakukan operasi hanya untuk jangka waktu tertentu. Misalnya menghitung berapa banyak transaksi pertukaran yang dilakukan dengan saham perusahaan tertentu dalam 10 menit terakhir. Atau berapa banyak pengguna yang mengklik banner iklan baru dalam 15 menit terakhir. Suatu aplikasi dapat melakukan operasi tersebut beberapa kali, namun dengan hasil yang hanya berlaku pada periode waktu tertentu (jendela waktu).

Menghitung transaksi penukaran oleh pembeli

Pada contoh berikutnya, kami akan melacak transaksi saham di beberapa pedagang—baik organisasi besar atau pemodal individu yang cerdas.

Ada dua kemungkinan alasan pelacakan ini. Salah satunya adalah kebutuhan untuk mengetahui apa yang dibeli/dijual oleh pemimpin pasar. Jika para pemain besar dan investor canggih ini melihat peluang, masuk akal untuk mengikuti strategi mereka. Alasan kedua adalah keinginan untuk menemukan kemungkinan adanya tanda-tanda insider trading yang ilegal. Untuk melakukan ini, Anda perlu menganalisis korelasi lonjakan penjualan yang besar dengan siaran pers yang penting.

Pelacakan tersebut terdiri dari langkah-langkah berikut:

  • membuat aliran untuk membaca topik transaksi saham;
  • mengelompokkan catatan masuk berdasarkan ID pembeli dan simbol stok. Memanggil metode groupBy akan mengembalikan instance kelas KGroupedStream;
  • Metode KGroupedStream.windowedBy mengembalikan aliran data terbatas pada jangka waktu tertentu, yang memungkinkan agregasi berjendela. Tergantung pada jenis jendela, TimeWindowedKStream atau SessionWindowedKStream dikembalikan;
  • jumlah transaksi untuk operasi agregasi. Aliran data berjendela menentukan apakah catatan tertentu diperhitungkan dalam penghitungan ini;
  • menulis hasil ke suatu topik atau mengeluarkannya ke konsol selama pengembangan.

Topologi aplikasi ini sederhana, namun gambaran yang jelas akan sangat membantu. Mari kita lihat Gambar. 5.11.

Selanjutnya, kita akan melihat fungsionalitas operasi jendela dan kode terkait.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"

Jenis jendela

Ada tiga jenis jendela di Kafka Streams:

  • sesi;
  • “jatuh” (jatuh);
  • meluncur/melompat.

Yang mana yang harus dipilih bergantung pada kebutuhan bisnis Anda. Jendela jatuh dan melompat memiliki batasan waktu, sedangkan jendela sesi dibatasi oleh aktivitas pengguna—durasi sesi hanya ditentukan oleh seberapa aktif pengguna. Hal utama yang perlu diingat adalah bahwa semua tipe jendela didasarkan pada cap tanggal/waktu entri, bukan waktu sistem.

Selanjutnya, kami mengimplementasikan topologi kami dengan masing-masing tipe jendela. Kode lengkapnya hanya akan diberikan pada contoh pertama; untuk jenis windows lainnya tidak ada yang berubah kecuali jenis operasi jendela.

Jendela sesi

Jendela sesi sangat berbeda dari semua jenis jendela lainnya. Mereka dibatasi bukan oleh waktu melainkan oleh aktivitas pengguna (atau aktivitas entitas yang ingin Anda lacak). Jendela sesi dibatasi berdasarkan periode tidak aktif.

Gambar 5.12 mengilustrasikan konsep jendela sesi. Sesi yang lebih kecil akan bergabung dengan sesi di sebelah kirinya. Dan sesi di sebelah kanan akan terpisah karena mengikuti periode tidak aktif yang lama. Jendela sesi didasarkan pada aktivitas pengguna, tetapi menggunakan stempel tanggal/waktu dari entri untuk menentukan sesi mana yang termasuk dalam entri tersebut.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"

Menggunakan jendela sesi untuk melacak transaksi saham

Mari gunakan jendela sesi untuk menangkap informasi tentang transaksi pertukaran. Implementasi jendela sesi ditunjukkan pada Listing 5.5 (yang dapat ditemukan di src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Anda telah melihat sebagian besar operasi dalam topologi ini, jadi tidak perlu melihatnya lagi di sini. Namun ada juga beberapa elemen baru disini yang akan kita bahas sekarang.

Operasi groupBy apa pun biasanya melakukan beberapa jenis operasi agregasi (agregasi, rollup, atau penghitungan). Anda dapat melakukan agregasi kumulatif dengan total berjalan, atau agregasi jendela, yang memperhitungkan catatan akun dalam jangka waktu tertentu.

Kode di Listing 5.5 menghitung jumlah transaksi dalam jendela sesi. Pada Gambar. 5.13 Tindakan-tindakan ini dianalisis langkah demi langkah.

Dengan memanggil windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) kita membuat jendela sesi dengan interval tidak aktif 20 detik dan interval persistensi 15 menit. Interval idle 20 detik berarti aplikasi akan menyertakan entri apa pun yang masuk dalam waktu 20 detik setelah akhir atau awal sesi saat ini ke dalam sesi (aktif) saat ini.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Selanjutnya, kami menentukan operasi agregasi mana yang perlu dilakukan di jendela sesi - dalam hal ini, hitung. Jika entri masuk berada di luar jendela tidak aktif (di kedua sisi cap tanggal/waktu), aplikasi akan membuat sesi baru. Interval retensi berarti mempertahankan sesi selama jangka waktu tertentu dan memungkinkan data terlambat melampaui periode tidak aktif sesi tetapi masih dapat dilampirkan. Selain itu, awal dan akhir sesi baru yang dihasilkan dari penggabungan sesuai dengan stempel tanggal/waktu paling awal dan terbaru.

Mari kita lihat beberapa entri dari metode penghitungan untuk melihat cara kerja sesi (Tabel 5.1).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Ketika catatan tiba, kami mencari sesi yang ada dengan kunci yang sama, waktu berakhir kurang dari cap tanggal/waktu saat ini - interval tidak aktif, dan waktu mulai lebih besar dari cap tanggal/waktu saat ini + interval tidak aktif. Dengan mempertimbangkan hal ini, empat entri dari tabel. 5.1 digabungkan menjadi satu sesi sebagai berikut.

1. Catatan 1 tiba lebih dulu, sehingga waktu mulai sama dengan waktu berakhir yaitu 00:00:00.

2. Selanjutnya, entri 2 tiba, dan kami mencari sesi yang berakhir paling lambat 23:59:55 dan dimulai paling lambat 00:00:35. Kami menemukan catatan 1 dan menggabungkan sesi 1 dan 2. Kami mengambil waktu mulai sesi 1 (sebelumnya) dan waktu akhir sesi 2 (nanti), sehingga sesi baru kami dimulai pada 00:00:00 dan berakhir pada 00: 00:15.

3. Rekor 3 tiba, kami mencari sesi antara 00:00:30 dan 00:01:10 dan tidak menemukannya. Tambahkan sesi kedua untuk kunci 123-345-654,FFBE, dimulai dan diakhiri pada 00:00:50.

4. Rekor 4 tiba dan kami mencari sesi antara 23:59:45 dan 00:00:25. Kali ini ditemukan sesi 1 dan 2. Ketiga sesi digabungkan menjadi satu, dengan waktu mulai 00:00:00 dan waktu berakhir 00:00:15.

Dari apa yang dijelaskan di bagian ini, perlu diingat nuansa penting berikut ini:

  • sesi bukanlah jendela berukuran tetap. Durasi suatu sesi ditentukan oleh aktivitas dalam jangka waktu tertentu;
  • Stempel tanggal/waktu dalam data menentukan apakah peristiwa tersebut termasuk dalam sesi yang ada atau selama periode tidak aktif.

Selanjutnya kita akan membahas jenis jendela berikutnya - jendela “jatuh”.

Jendela "jatuh".

Jendela berjatuhan menangkap peristiwa yang terjadi dalam jangka waktu tertentu. Bayangkan Anda perlu mencatat semua transaksi saham perusahaan tertentu setiap 20 detik, jadi Anda mengumpulkan semua peristiwa selama periode waktu tersebut. Di akhir interval 20 detik, jendela akan bergulir dan berpindah ke interval pengamatan 20 detik yang baru. Gambar 5.14 mengilustrasikan situasi ini.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Seperti yang Anda lihat, semua peristiwa yang diterima dalam 20 detik terakhir disertakan dalam jendela. Di akhir periode waktu ini, jendela baru akan dibuat.

Listing 5.6 menunjukkan kode yang menunjukkan penggunaan jendela berjatuhan untuk menangkap transaksi saham setiap 20 detik (ditemukan di src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Dengan perubahan kecil pada pemanggilan metode TimeWindows.of ini, Anda dapat menggunakan jendela berjatuhan. Contoh ini tidak memanggil metode sampai(), sehingga interval retensi default 24 jam akan digunakan.

Terakhir, saatnya beralih ke opsi jendela terakhir - jendela "melompat".

Jendela geser ("melompat").

Jendela geser/melompat mirip dengan jendela berjatuhan, namun dengan sedikit perbedaan. Jendela geser tidak menunggu hingga akhir interval waktu sebelum membuat jendela baru untuk memproses kejadian terkini. Mereka memulai penghitungan baru setelah interval tunggu kurang dari durasi jendela.

Untuk mengilustrasikan perbedaan antara jendela tumbling dan jendela lompat, mari kita kembali ke contoh penghitungan transaksi bursa. Tujuan kami tetap menghitung jumlah transaksi, namun kami tidak ingin menunggu sepanjang waktu sebelum memperbarui penghitung. Sebaliknya, kami akan memperbarui penghitung pada interval yang lebih pendek. Misalnya, kita masih akan menghitung jumlah transaksi setiap 20 detik, namun memperbarui penghitung setiap 5 detik, seperti yang ditunjukkan pada Gambar. 5.15. Dalam kasus ini, kita mendapatkan tiga jendela hasil dengan data yang tumpang tindih.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Listing 5.7 menunjukkan kode untuk mendefinisikan jendela geser (ditemukan di src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Jendela tumbling dapat diubah menjadi jendela lompat dengan menambahkan panggilan ke metode advanceBy(). Pada contoh yang ditunjukkan, interval penghematan adalah 15 menit.

Anda melihat di bagian ini cara membatasi hasil agregasi ke jangka waktu. Secara khusus, saya ingin Anda mengingat tiga hal berikut dari bagian ini:

  • ukuran jendela sesi tidak dibatasi oleh jangka waktu, tetapi oleh aktivitas pengguna;
  • Jendela “jatuh” memberikan gambaran umum tentang peristiwa dalam periode waktu tertentu;
  • Durasi jendela lompat sudah ditetapkan, namun sering diperbarui dan mungkin berisi entri yang tumpang tindih di semua jendela.

Selanjutnya, kita akan mempelajari cara mengubah KTable kembali menjadi KStream untuk koneksi.

5.3.3. Menghubungkan objek KStream dan KTable

Pada Bab 4, kita membahas cara menghubungkan dua objek KStream. Sekarang kita harus mempelajari cara menghubungkan KTable dan KStream. Ini mungkin diperlukan karena alasan sederhana berikut. KStream adalah aliran rekaman, dan KTable adalah aliran pembaruan rekaman, namun terkadang Anda mungkin ingin menambahkan konteks tambahan ke aliran rekaman menggunakan pembaruan dari KTable.

Mari kita ambil data jumlah transaksi bursa dan gabungkan dengan berita bursa untuk industri terkait. Inilah yang perlu Anda lakukan untuk mencapai hal ini mengingat kode yang sudah Anda miliki.

  1. Konversikan objek KTable yang berisi data jumlah transaksi saham menjadi KStream, dilanjutkan dengan mengganti kunci dengan kunci yang menunjukkan sektor industri yang sesuai dengan simbol saham tersebut.
  2. Buat objek KTable yang membaca data dari suatu topik dengan berita bursa. KTable baru ini akan dikategorikan berdasarkan sektor industri.
  3. Menghubungkan update berita dengan informasi jumlah transaksi bursa berdasarkan sektor industri.

Sekarang mari kita lihat bagaimana menerapkan rencana aksi ini.

Ubah KTable menjadi KStream

Untuk mengonversi KTable ke KStream Anda perlu melakukan hal berikut.

  1. Panggil metode KTable.toStream().
  2. Dengan memanggil metode KStream.map, ganti kunci dengan nama industri, lalu ambil objek TransactionSummary dari instance Windowed.

Kami akan merangkai operasi ini bersama-sama sebagai berikut (kode dapat ditemukan di file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Karena kita menjalankan operasi KStream.map, instance KStream yang dikembalikan dipartisi ulang secara otomatis saat digunakan dalam koneksi.

Proses konversi sudah kita selesaikan, selanjutnya kita perlu membuat objek KTable untuk membaca berita saham.

Pembuatan KTable untuk berita saham

Untungnya, membuat objek KTable hanya membutuhkan satu baris kode (kode dapat ditemukan di src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Perlu dicatat bahwa tidak ada objek Serde yang perlu ditentukan, karena string Serdes digunakan dalam pengaturan. Selain itu, dengan menggunakan enumerasi EARLIEST, tabel diisi dengan record di awal.

Sekarang kita dapat melanjutkan ke langkah terakhir - koneksi.

Menghubungkan update berita dengan data jumlah transaksi

Membuat koneksi tidaklah sulit. Kami akan menggunakan gabungan kiri jika tidak ada berita saham untuk industri terkait (kode yang diperlukan dapat ditemukan di file src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Operator leftJoin ini cukup sederhana. Berbeda dengan join di Bab 4, metode JoinWindow tidak digunakan karena saat melakukan join KStream-KTable, hanya ada satu entri di KTable untuk setiap kunci. Koneksi seperti itu tidak dibatasi waktu: catatannya ada di KTable atau tidak ada. Kesimpulan utama: menggunakan objek KTable Anda dapat memperkaya KStream dengan data referensi yang lebih jarang diperbarui.

Sekarang kita akan melihat cara yang lebih efisien untuk memperkaya acara dari KStream.

5.3.4. Objek GlobalKTable

Seperti yang Anda lihat, ada kebutuhan untuk memperkaya aliran peristiwa atau menambahkan konteks ke dalamnya. Di Bab 4 Anda melihat hubungan antara dua objek KStream, dan di bagian sebelumnya Anda melihat hubungan antara KStream dan KTable. Dalam semua kasus ini, aliran data perlu dipartisi ulang saat memetakan kunci ke tipe atau nilai baru. Terkadang partisi ulang dilakukan secara eksplisit, dan terkadang Kafka Streams melakukannya secara otomatis. Partisi ulang diperlukan karena kunci telah berubah dan catatan harus berakhir di bagian baru, jika tidak, koneksi tidak akan mungkin dilakukan (hal ini telah dibahas di Bab 4, di bagian “Mempartisi ulang data” di subbagian 4.2.4).

Mempartisi ulang memerlukan biaya

Mempartisi ulang memerlukan biaya - biaya sumber daya tambahan untuk membuat topik perantara, menyimpan data duplikat di topik lain; ini juga berarti peningkatan latensi karena menulis dan membaca topik ini. Selain itu, jika Anda perlu menggabungkan lebih dari satu aspek atau dimensi, Anda harus merangkai gabungan tersebut, memetakan rekaman dengan kunci baru, dan menjalankan proses partisi ulang lagi.

Menghubungkan ke kumpulan data yang lebih kecil

Dalam beberapa kasus, volume data referensi yang akan dihubungkan relatif kecil, sehingga salinan lengkapnya dapat dengan mudah ditampung secara lokal di setiap node. Untuk situasi seperti ini, Kafka Streams menyediakan kelas GlobalKTable.

Instance GlobalKTable bersifat unik karena aplikasi mereplikasi semua data ke setiap node. Dan karena semua data ada di setiap node, tidak perlu mempartisi aliran peristiwa dengan kunci data referensi sehingga tersedia untuk semua partisi. Anda juga dapat membuat gabungan tanpa kunci menggunakan objek GlobalKTable. Mari kita kembali ke salah satu contoh sebelumnya untuk mendemonstrasikan fitur ini.

Menghubungkan objek KStream ke objek GlobalKTable

Di subbagian 5.3.2, kami melakukan agregasi jendela transaksi pertukaran oleh pembeli. Hasil agregasi ini terlihat seperti ini:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Meskipun hasil ini sesuai dengan tujuannya, akan lebih berguna jika nama pelanggan dan nama lengkap perusahaan juga ditampilkan. Untuk menambahkan nama pelanggan dan nama perusahaan, Anda dapat melakukan penggabungan normal, namun Anda perlu melakukan dua pemetaan kunci dan partisi ulang. Dengan GlobalKTable Anda dapat menghindari biaya operasi tersebut.

Untuk melakukan ini, kita akan menggunakan objek countStream dari Listing 5.11 (kode terkait dapat ditemukan di src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) dan menghubungkannya ke dua objek GlobalKTable.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Kita sudah membicarakan hal ini sebelumnya, jadi saya tidak akan mengulanginya. Namun saya perhatikan bahwa kode dalam fungsi toStream().map diabstraksi menjadi objek fungsi alih-alih ekspresi lambda sebaris demi keterbacaan.

Langkah selanjutnya adalah mendeklarasikan dua instance GlobalKTable (kode yang ditampilkan dapat ditemukan di file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"

Harap dicatat bahwa nama topik dijelaskan menggunakan tipe yang disebutkan.

Sekarang semua komponen sudah siap, yang tersisa hanyalah menulis kode untuk koneksi (yang dapat ditemukan di file src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Meskipun ada dua gabungan dalam kode ini, keduanya dirangkai karena tidak ada hasil yang digunakan secara terpisah. Hasilnya ditampilkan di akhir seluruh operasi.

Saat Anda menjalankan operasi penggabungan di atas, Anda akan mendapatkan hasil seperti ini:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Esensinya tidak berubah, namun hasil ini terlihat lebih jelas.

Jika Anda menghitung mundur ke Bab 4, Anda telah melihat beberapa jenis koneksi beraksi. Mereka tercantum dalam tabel. 5.2. Tabel ini mencerminkan kemampuan konektivitas pada Kafka Streams versi 1.0.0; Sesuatu mungkin berubah dalam rilis mendatang.

Buku “Arus Kafka Beraksi. Aplikasi dan layanan mikro untuk pekerjaan real-time"
Sebagai penutup, mari kita rekap dasar-dasarnya: Anda dapat menghubungkan aliran peristiwa (KStream) dan memperbarui aliran (KTable) menggunakan status lokal. Alternatifnya, jika ukuran data referensi tidak terlalu besar, Anda bisa menggunakan objek GlobalKTable. GlobalKTables mereplikasi semua partisi ke setiap node aplikasi Kafka Streams, memastikan bahwa semua data tersedia terlepas dari partisi mana yang terkait dengan kunci tersebut.

Selanjutnya kita akan melihat fitur Kafka Streams, berkat itu kita dapat mengamati perubahan status tanpa menggunakan data dari topik Kafka.

5.3.5. Status yang dapat dikueri

Kami telah melakukan beberapa operasi yang melibatkan status dan selalu menampilkan hasilnya ke konsol (untuk tujuan pengembangan) atau menuliskannya ke suatu topik (untuk tujuan produksi). Saat menulis hasil pada suatu topik, Anda harus menggunakan konsumen Kafka untuk melihatnya.

Membaca data dari topik-topik ini dapat dianggap sebagai jenis pandangan yang terwujud. Untuk tujuan kita, kita dapat menggunakan definisi tampilan terwujud dari Wikipedia: “...objek database fisik yang berisi hasil kueri. Misalnya, itu bisa berupa salinan lokal dari data jarak jauh, atau subkumpulan baris dan/atau kolom dari tabel atau hasil gabungan, atau tabel ringkasan yang diperoleh melalui agregasi” (https://en.wikipedia.org/wiki /Tampilan_Terwujud).

Kafka Streams juga memungkinkan Anda menjalankan kueri interaktif di penyimpanan negara, memungkinkan Anda membaca langsung tampilan material ini. Penting untuk dicatat bahwa permintaan ke penyimpanan negara adalah operasi baca-saja. Hal ini memastikan bahwa Anda tidak perlu khawatir membuat status tidak konsisten secara tidak sengaja saat aplikasi Anda memproses data.

Kemampuan untuk menanyakan penyimpanan negara secara langsung adalah penting. Artinya, Anda dapat membuat aplikasi dasbor tanpa harus mengambil data terlebih dahulu dari konsumen Kafka. Ini juga meningkatkan efisiensi aplikasi, karena tidak perlu menulis data lagi:

  • berkat lokalitas datanya, data tersebut dapat diakses dengan cepat;
  • duplikasi data dihilangkan, karena tidak ditulis ke penyimpanan eksternal.

Hal utama yang saya ingin Anda ingat adalah Anda bisa langsung menanyakan status dari dalam aplikasi Anda. Peluang yang diberikan hal ini kepada Anda tidak dapat dilebih-lebihkan. Daripada menggunakan data dari Kafka dan menyimpan catatan dalam database untuk aplikasi, Anda dapat menanyakan penyimpanan status dengan hasil yang sama. Kueri langsung ke penyimpanan negara berarti lebih sedikit kode (tidak ada konsumen) dan lebih sedikit perangkat lunak (tidak memerlukan tabel database untuk menyimpan hasilnya).

Kita telah membahas cukup banyak hal dalam bab ini, jadi kita akan meninggalkan diskusi kita mengenai query interaktif terhadap penyimpanan negara untuk saat ini. Namun jangan khawatir: di Bab 9, kita akan membuat aplikasi dashboard sederhana dengan query interaktif. Ini akan menggunakan beberapa contoh dari bab ini dan bab sebelumnya untuk mendemonstrasikan kueri interaktif dan bagaimana Anda dapat menambahkannya ke aplikasi Kafka Streams.

Ringkasan

  • Objek KStream mewakili aliran peristiwa, sebanding dengan penyisipan ke dalam database. Objek KTable mewakili aliran pembaruan, lebih seperti pembaruan pada database. Ukuran objek KTable tidak bertambah, record lama diganti dengan yang baru.
  • Objek KTable diperlukan untuk operasi agregasi.
  • Dengan menggunakan operasi windowing, Anda dapat membagi data gabungan ke dalam kelompok waktu.
  • Berkat objek GlobalKTable, Anda dapat mengakses data referensi di mana saja dalam aplikasi, apa pun partisinya.
  • Koneksi antara objek KStream, KTable dan GlobalKTable dimungkinkan.

Sejauh ini, kami fokus membangun aplikasi Kafka Streams menggunakan KStream DSL tingkat tinggi. Meskipun pendekatan tingkat tinggi memungkinkan Anda membuat program yang rapi dan ringkas, menggunakannya merupakan trade-off. Bekerja dengan DSL KStream berarti meningkatkan keringkasan kode Anda dengan mengurangi tingkat kontrol. Pada bab berikutnya, kita akan melihat API node pengendali tingkat rendah dan mencoba trade-off lainnya. Programnya akan lebih panjang dari sebelumnya, namun kita akan mampu membuat hampir semua node pengendali yang mungkin kita perlukan.

→ Detail lebih lanjut tentang buku ini dapat ditemukan di situs web penerbit

→ Untuk Habrozhiteli diskon 25% menggunakan kupon - Aliran Kafka

→ Setelah pembayaran untuk buku versi kertas, buku elektronik akan dikirim melalui email.

Sumber: www.habr.com

Tambah komentar