Redis Stream - keandalan dan skalabilitas sistem pesan Anda

Redis Stream - keandalan dan skalabilitas sistem pesan Anda

Redis Stream adalah tipe data abstrak baru yang diperkenalkan di Redis dengan versi 5.0
Secara konseptual, Aliran Redis adalah Daftar yang dapat Anda tambahkan entri. Setiap entri memiliki pengidentifikasi unik. Secara default, ID dibuat secara otomatis dan menyertakan stempel waktu. Oleh karena itu, Anda dapat menanyakan rentang rekaman dari waktu ke waktu, atau menerima data baru saat data tersebut tiba di aliran, seperti perintah Unix "tail -f" yang membaca file log dan terhenti saat menunggu data baru. Perhatikan bahwa beberapa klien dapat mendengarkan thread pada saat yang sama, sama seperti banyak proses "tail -f" yang dapat membaca file secara bersamaan tanpa konflik satu sama lain.

Untuk memahami semua manfaat tipe data baru ini, mari kita lihat sekilas struktur Redis yang sudah lama ada yang sebagian mereplikasi fungsi Redis Stream.

Redis PUB/SUB

Redis Pub/Sub adalah sistem pesan sederhana yang sudah terpasang di penyimpanan nilai kunci Anda. Namun, kesederhanaan harus dibayar mahal:

  • Jika penerbit gagal karena alasan tertentu, ia kehilangan semua pelanggannya
  • Penerbit perlu mengetahui alamat pasti semua pelanggannya
  • Penerbit mungkin membebani pelanggannya dengan pekerjaan jika data dipublikasikan lebih cepat daripada pemrosesannya
  • Pesan akan dihapus dari buffer penerbit segera setelah dipublikasikan, terlepas dari berapa banyak pelanggan yang menerima pesan tersebut dan seberapa cepat mereka dapat memproses pesan tersebut.
  • Semua pelanggan akan menerima pesan secara bersamaan. Pelanggan sendiri entah bagaimana harus sepakat di antara mereka sendiri tentang urutan pemrosesan pesan yang sama.
  • Tidak ada mekanisme bawaan untuk mengonfirmasi bahwa pelanggan telah berhasil memproses pesan. Jika pelanggan menerima pesan dan mengalami crash selama pemrosesan, penerbit tidak akan mengetahuinya.

Daftar Ulang

Daftar Redis adalah struktur data yang mendukung pemblokiran perintah baca. Anda dapat menambah dan membaca pesan dari awal atau akhir daftar. Berdasarkan struktur ini, Anda dapat membuat tumpukan atau antrian yang bagus untuk sistem terdistribusi Anda, dan dalam banyak kasus ini sudah cukup. Perbedaan utama dari Redis Pub/Sub:

  • Pesan dikirimkan ke satu klien. Klien yang diblokir baca pertama akan menerima data terlebih dahulu.
  • Clint harus memulai sendiri operasi baca untuk setiap pesan. Daftar tidak tahu apa-apa tentang klien.
  • Pesan disimpan sampai seseorang membacanya atau menghapusnya secara eksplisit. Jika Anda mengonfigurasi server Redis untuk membuang data ke disk, keandalan sistem meningkat secara dramatis.

Pengantar Aliran

Menambahkan entri ke aliran

Tim XTAMBAH menambahkan entri baru ke aliran. Sebuah record bukan sekedar string, ia terdiri dari satu atau lebih pasangan nilai kunci. Dengan demikian, setiap entri sudah terstruktur dan menyerupai struktur file CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Pada contoh di atas, kita menambahkan dua kolom ke aliran dengan nama (kunci) “mystream”: “sensor-id” dan “temperature” dengan nilai masing-masing “1234” dan “19.8”. Sebagai argumen kedua, perintah mengambil pengidentifikasi yang akan ditetapkan ke entri - pengidentifikasi ini secara unik mengidentifikasi setiap entri dalam aliran. Namun, dalam kasus ini kami meneruskan * karena kami ingin Redis membuatkan ID baru untuk kami. Setiap ID baru akan bertambah. Oleh karena itu, setiap entri baru akan memiliki pengenal yang lebih tinggi dibandingkan entri sebelumnya.

Format pengenal

ID entri dikembalikan oleh perintah XTAMBAH, terdiri dari dua bagian:

{millisecondsTime}-{sequenceNumber}

milidetikWaktu — Waktu Unix dalam milidetik (waktu server Redis). Namun, jika waktu saat ini sama atau kurang dari waktu rekaman sebelumnya, maka stempel waktu rekaman sebelumnya akan digunakan. Oleh karena itu, jika waktu server kembali ke masa lalu, pengidentifikasi baru akan tetap mempertahankan properti kenaikan.

nomor urut digunakan untuk rekaman yang dibuat dalam milidetik yang sama. nomor urut akan bertambah 1 dibandingkan entri sebelumnya. Karena nomor urut berukuran 64 bit, maka dalam praktiknya Anda tidak akan mengalami batasan jumlah record yang dapat dihasilkan dalam satu milidetik.

Sekilas format pengidentifikasi tersebut mungkin tampak aneh. Pembaca yang tidak percaya mungkin bertanya-tanya mengapa waktu menjadi bagian dari pengenal. Alasannya adalah aliran Redis mendukung kueri rentang berdasarkan ID. Karena pengidentifikasi dikaitkan dengan waktu pembuatan rekaman, hal ini memungkinkan untuk menanyakan rentang waktu. Kita akan melihat contoh spesifik ketika kita melihat perintahnya XRANGE.

Jika karena alasan tertentu pengguna perlu menentukan pengenalnya sendiri, yang, misalnya, dikaitkan dengan beberapa sistem eksternal, maka kita dapat meneruskannya ke perintah XTAMBAH alih-alih * seperti yang ditunjukkan di bawah ini:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Harap dicatat bahwa dalam hal ini Anda harus memantau sendiri penambahan ID. Dalam contoh kita, pengidentifikasi minimum adalah "0-1", sehingga perintah tidak akan menerima pengidentifikasi lain yang sama dengan atau kurang dari "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Jumlah catatan per aliran

Dimungkinkan untuk mendapatkan jumlah catatan dalam suatu aliran hanya dengan menggunakan perintah XLEN. Sebagai contoh kita, perintah ini akan mengembalikan nilai berikut:

> XLEN somestream
(integer) 2

Rentang kueri - XRANGE dan XREVRANGE

Untuk meminta data berdasarkan rentang, kita perlu menentukan dua pengidentifikasi - awal dan akhir rentang. Rentang yang dikembalikan akan mencakup semua elemen, termasuk batasnya. Ada juga dua pengidentifikasi khusus “-” dan “+”, masing-masing berarti pengidentifikasi terkecil (catatan pertama) dan terbesar (catatan terakhir) dalam aliran. Contoh di bawah ini akan mencantumkan semua entri aliran.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Setiap rekaman yang dikembalikan adalah larik yang terdiri dari dua elemen: pengidentifikasi dan daftar pasangan nilai kunci. Kami telah mengatakan bahwa pengidentifikasi rekaman berkaitan dengan waktu. Oleh karena itu, kami dapat meminta rentang jangka waktu tertentu. Namun, kami dapat menentukan dalam permintaan bukan pengidentifikasi lengkap, tetapi hanya waktu Unix, dengan menghilangkan bagian yang terkait dengan nomor urut. Bagian pengidentifikasi yang dihilangkan akan secara otomatis disetel ke nol di awal rentang dan ke nilai maksimum yang mungkin di akhir rentang. Di bawah ini adalah contoh bagaimana Anda dapat meminta rentang dua milidetik.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Kami hanya memiliki satu entri dalam rentang ini, namun dalam kumpulan data nyata, hasil yang dikembalikan bisa sangat besar. Untuk alasan ini XRANGE mendukung opsi COUNT. Dengan menentukan kuantitas, kita bisa mendapatkan N record pertama. Jika kita perlu mendapatkan N record berikutnya (pagination), kita bisa menggunakan ID yang terakhir diterima, tingkatkan nomor urut satu per satu dan bertanya lagi. Mari kita lihat pada contoh berikut. Kami mulai menambahkan 10 elemen dengan XTAMBAH (dengan asumsi mystream sudah diisi dengan 10 elemen). Untuk memulai iterasi dengan mendapatkan 2 elemen per perintah, kita mulai dengan rentang penuh tetapi dengan COUNT sama dengan 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Untuk melanjutkan iterasi dengan dua elemen berikutnya, kita perlu memilih ID terakhir yang diterima, yaitu 1519073279157-0, dan menambahkan 1 ke nomor urut.
ID yang dihasilkan, dalam hal ini 1519073279157-1, sekarang dapat digunakan sebagai argumen awal rentang baru untuk panggilan berikutnya XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Dan seterusnya. Karena kompleksitas XRANGE adalah O(log(N)) untuk mencari dan kemudian O(M) untuk mengembalikan elemen M, maka setiap langkah iterasi menjadi cepat. Jadi, menggunakan XRANGE aliran dapat diulang secara efisien.

Tim XREVRANGE adalah setara XRANGE, tetapi mengembalikan elemen dalam urutan terbalik:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Harap dicatat bahwa perintahnya XREVRANGE mengambil argumen rentang mulai dan berhenti dalam urutan terbalik.

Membaca entri baru menggunakan XREAD

Seringkali muncul tugas untuk berlangganan suatu aliran dan hanya menerima pesan baru. Konsep ini mungkin tampak mirip dengan Redis Pub/Sub atau memblokir Daftar Redis, namun ada perbedaan mendasar dalam cara menggunakan Redis Stream:

  1. Setiap pesan baru dikirimkan ke setiap pelanggan secara default. Perilaku ini berbeda dengan pemblokiran Daftar Redis, di mana pesan baru hanya akan dibaca oleh satu pelanggan.
  2. Saat di Redis Pub/Sub semua pesan dilupakan dan tidak pernah disimpan, di Stream semua pesan dipertahankan tanpa batas waktu (kecuali klien secara eksplisit menyebabkan penghapusan).
  3. Redis Stream memungkinkan Anda membedakan akses ke pesan dalam satu aliran. Pelanggan tertentu hanya dapat melihat riwayat pesan pribadinya.

Anda dapat berlangganan thread dan menerima pesan baru menggunakan perintah XBACA. Ini sedikit lebih rumit dari itu XRANGE, jadi kita akan mulai dengan contoh yang lebih sederhana terlebih dahulu.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Contoh di atas menunjukkan formulir non-pemblokiran XBACA. Perhatikan bahwa opsi COUNT bersifat opsional. Faktanya, satu-satunya opsi perintah yang diperlukan adalah opsi STREAMS, yang menentukan daftar aliran bersama dengan pengidentifikasi maksimum yang sesuai. Kami menulis "STREAMS mystream 0" - kami ingin menerima semua rekaman aliran mystream dengan pengidentifikasi lebih besar dari "0-0". Seperti yang Anda lihat dari contoh, perintah mengembalikan nama thread karena kita dapat berlangganan beberapa thread secara bersamaan. Kita dapat menulis, misalnya, "STREAMS mystream otherstream 0 0". Harap dicatat bahwa setelah opsi STREAMS kita harus terlebih dahulu memberikan nama semua aliran yang diperlukan dan baru kemudian daftar pengidentifikasi.

Dalam bentuk sederhana ini perintah tidak melakukan sesuatu yang istimewa dibandingkan dengan XRANGE. Namun, yang menarik adalah kita bisa dengan mudah berbelok XBACA ke perintah pemblokiran, tentukan argumen BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Pada contoh di atas, opsi BLOCK baru ditentukan dengan batas waktu 0 milidetik (ini berarti menunggu tanpa batas waktu). Selain itu, alih-alih meneruskan pengidentifikasi biasa untuk aliran mystream, pengidentifikasi khusus $ telah diteruskan. Pengidentifikasi khusus ini berarti itu XBACA harus menggunakan pengidentifikasi maksimum di mystream sebagai pengidentifikasi. Jadi kami hanya akan menerima pesan baru mulai dari saat kami mulai mendengarkan. Dalam beberapa hal ini mirip dengan perintah Unix "tail -f".

Perhatikan bahwa ketika menggunakan opsi BLOCK kita tidak perlu menggunakan pengenal khusus $. Kita dapat menggunakan pengenal apa pun yang ada di aliran. Jika tim dapat segera melayani permintaan kami tanpa memblokir, maka tim akan melakukannya, jika tidak maka akan diblokir.

Pemblokiran XBACA juga dapat mendengarkan beberapa thread sekaligus, Anda hanya perlu menentukan namanya. Dalam hal ini, perintah akan mengembalikan catatan aliran pertama yang menerima data. Pelanggan pertama yang diblokir untuk thread tertentu akan menerima data terlebih dahulu.

Kelompok Konsumen

Dalam tugas tertentu, kami ingin membatasi akses pelanggan ke pesan dalam satu thread. Contoh di mana hal ini dapat berguna adalah antrean pesan dengan pekerja yang akan menerima pesan berbeda dari thread, sehingga pemrosesan pesan dapat ditingkatkan skalanya.

Jika kita bayangkan kita memiliki tiga pelanggan C1, C2, C3 dan sebuah thread yang berisi pesan 1, 2, 3, 4, 5, 6, 7, maka pesan-pesan tersebut akan disajikan seperti pada diagram di bawah ini:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Untuk mencapai efek ini, Redis Stream menggunakan konsep yang disebut Consumer Group. Konsep ini mirip dengan pelanggan semu, yang menerima data dari aliran, namun sebenarnya dilayani oleh beberapa pelanggan dalam satu grup, sehingga memberikan jaminan tertentu:

  1. Setiap pesan dikirim ke pelanggan berbeda dalam grup.
  2. Dalam grup, pelanggan diidentifikasi berdasarkan namanya, yang merupakan string peka huruf besar-kecil. Jika pelanggan keluar sementara dari grup, dia dapat dikembalikan ke grup menggunakan nama uniknya sendiri.
  3. Setiap Grup Konsumen mengikuti konsep “pesan pertama yang belum dibaca”. Ketika pelanggan meminta pesan baru, ia hanya dapat menerima pesan yang belum pernah dikirimkan sebelumnya ke pelanggan mana pun dalam grup.
  4. Terdapat perintah untuk mengonfirmasi secara eksplisit bahwa pesan berhasil diproses oleh pelanggan. Sampai perintah ini dipanggil, pesan yang diminta akan tetap dalam status "menunggu keputusan".
  5. Dalam Grup Konsumen, setiap pelanggan dapat meminta riwayat pesan yang telah dikirimkan kepadanya, namun belum diproses (dalam status “tertunda”)

Dalam arti tertentu, keadaan kelompok dapat dinyatakan sebagai berikut:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Sekarang saatnya mengenal perintah utama Kelompok Konsumen, yaitu:

  • KELOMPOK X digunakan untuk membuat, menghancurkan, dan mengelola kelompok
  • XREADGROUP digunakan untuk membaca streaming melalui grup
  • XACK - perintah ini memungkinkan pelanggan menandai pesan sebagai berhasil diproses

Penciptaan Grup Konsumen

Anggaplah mystream sudah ada. Maka perintah pembuatan grup akan terlihat seperti:

> XGROUP CREATE mystream mygroup $
OK

Saat membuat grup, kita harus memberikan pengenal, mulai dari grup mana yang akan menerima pesan. Jika kita hanya ingin menerima semua pesan baru, maka kita dapat menggunakan pengenal khusus $ (seperti pada contoh kita di atas). Jika Anda menentukan 0 dan bukan pengidentifikasi khusus, maka semua pesan di thread akan tersedia untuk grup.

Sekarang grup telah dibuat, kita dapat segera mulai membaca pesan menggunakan perintah XREADGROUP. Perintah ini sangat mirip dengan XBACA dan mendukung opsi BLOK opsional. Namun, ada opsi GROUP wajib yang harus selalu ditentukan dengan dua argumen: nama grup dan nama pelanggan. Opsi COUNT juga didukung.

Sebelum membaca thread ini, mari kita taruh beberapa pesan di sana:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Sekarang mari kita coba membaca aliran ini melalui grup:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Perintah di atas berbunyi kata demi kata sebagai berikut:

“Saya, pelanggan Alice, anggota grup saya, ingin membaca satu pesan dari aliran saya yang belum pernah dikirimkan kepada siapa pun sebelumnya.”

Setiap kali pelanggan melakukan operasi pada grup, ia harus memberikan namanya, yang secara unik mengidentifikasi dirinya di dalam grup. Ada detail lain yang sangat penting dalam perintah di atas - pengidentifikasi khusus ">". Pengidentifikasi khusus ini memfilter pesan, hanya menyisakan pesan yang belum pernah terkirim sebelumnya.

Selain itu, dalam kasus khusus, Anda dapat menentukan pengenal sebenarnya seperti 0 atau pengenal valid lainnya. Dalam hal ini perintahnya XREADGROUP akan mengembalikan kepada Anda riwayat pesan dengan status "menunggu keputusan" yang dikirimkan ke pelanggan tertentu (Alice) tetapi belum diakui menggunakan perintah XACK.

Kita dapat menguji perilaku ini dengan segera menentukan ID 0, tanpa opsi COUNT. Kami hanya akan melihat satu pesan yang tertunda, yaitu pesan apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Namun, jika kami mengonfirmasi bahwa pesan berhasil diproses, maka pesan tersebut tidak akan ditampilkan lagi:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Sekarang giliran Bob untuk membaca sesuatu:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, salah satu anggota grup saya, meminta tidak lebih dari dua pesan. Perintah ini hanya melaporkan pesan yang tidak terkirim karena pengidentifikasi khusus ">". Seperti yang Anda lihat, pesan "apel" tidak akan ditampilkan karena sudah terkirim ke Alice, jadi Bob menerima "oranye" dan "strawberry".

Dengan cara ini, Alice, Bob, dan pelanggan grup lainnya dapat membaca pesan berbeda dari aliran yang sama. Mereka juga dapat membaca riwayat pesan yang belum diproses atau menandai pesan sebagai sudah diproses.

Ada beberapa hal yang perlu diingat:

  • Segera setelah pelanggan menganggap pesan tersebut sebagai perintah XREADGROUP, pesan ini masuk ke status "menunggu keputusan" dan ditetapkan ke pelanggan tertentu. Pelanggan grup lain tidak akan dapat membaca pesan ini.
  • Pelanggan dibuat secara otomatis saat pertama kali disebutkan, tidak perlu membuatnya secara eksplisit.
  • Dengan XREADGROUP Anda dapat membaca pesan dari beberapa thread berbeda secara bersamaan, namun agar ini berfungsi, Anda harus terlebih dahulu membuat grup dengan nama yang sama untuk setiap thread menggunakan KELOMPOK X

Pemulihan setelah kegagalan

Pelanggan dapat pulih dari kegagalan dan membaca kembali daftar pesannya dengan status “tertunda”. Namun, di dunia nyata, pelanggan pada akhirnya bisa saja gagal. Apa yang terjadi pada pesan pelanggan yang macet jika pelanggan tidak dapat pulih dari kegagalan?
Grup Konsumen menawarkan fitur yang digunakan hanya untuk kasus seperti itu - ketika Anda perlu mengubah pemilik pesan.

Hal pertama yang perlu Anda lakukan adalah memanggil perintah PENGELUARAN, yang menampilkan semua pesan di grup dengan status “pending”. Dalam bentuknya yang paling sederhana, perintah ini dipanggil hanya dengan dua argumen: nama thread dan nama grup:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tim menampilkan jumlah pesan yang belum diproses untuk seluruh grup dan setiap pelanggan. Kami hanya memiliki Bob dengan dua pesan yang belum terselesaikan karena satu-satunya pesan yang diminta Alice telah dikonfirmasi XACK.

Kami dapat meminta lebih banyak informasi menggunakan lebih banyak argumen:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - rentang pengidentifikasi (Anda dapat menggunakan “-” dan “+”)
{count} — jumlah upaya pengiriman
{nama-konsumen} - nama grup

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Sekarang kami memiliki detail untuk setiap pesan: ID, nama pelanggan, waktu idle dalam milidetik, dan terakhir jumlah upaya pengiriman. Kami mendapat dua pesan dari Bob dan pesan tersebut tidak aktif selama 74170458 milidetik, sekitar 20 jam.

Harap dicatat bahwa tidak ada yang menghentikan kami untuk memeriksa isi pesan hanya dengan menggunakan XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Kita hanya perlu mengulangi pengidentifikasi yang sama dua kali dalam argumen. Sekarang kita mempunyai gambaran, Alice mungkin memutuskan bahwa setelah 20 jam downtime, Bob mungkin tidak akan pulih, dan inilah waktunya untuk menanyakan pesan-pesan tersebut dan melanjutkan pemrosesannya untuk Bob. Untuk ini kami menggunakan perintah XKLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Dengan menggunakan perintah ini, kita dapat menerima pesan “asing” yang belum diproses dengan mengubah pemiliknya menjadi {consumer}. Namun, kami juga dapat memberikan waktu idle minimum {min-idle-time}. Hal ini membantu menghindari situasi ketika dua klien mencoba mengubah pemilik pesan yang sama secara bersamaan:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pelanggan pertama akan mengatur ulang waktu henti dan menambah konter pengiriman. Jadi klien kedua tidak akan bisa memintanya.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Pesan berhasil diklaim oleh Alice, yang kini dapat memproses pesan tersebut dan mengonfirmasinya.

Dari contoh di atas, Anda dapat melihat bahwa permintaan yang berhasil mengembalikan isi pesan itu sendiri. Namun, hal ini tidak perlu. Opsi JUSTID hanya dapat digunakan untuk mengembalikan ID pesan. Ini berguna jika Anda tidak tertarik dengan detail pesan dan ingin meningkatkan kinerja sistem.

Konter pengiriman

Penghitung yang Anda lihat di output PENGELUARAN adalah jumlah pengiriman setiap pesan. Penghitung tersebut bertambah dalam dua cara: ketika pesan berhasil diminta melalui XKLAIM atau saat panggilan digunakan XREADGROUP.

Adalah normal jika beberapa pesan dikirimkan berkali-kali. Hal utama adalah semua pesan diproses pada akhirnya. Terkadang masalah terjadi saat memproses pesan karena pesan itu sendiri rusak, atau pemrosesan pesan menyebabkan kesalahan pada kode pengendali. Dalam hal ini, mungkin tidak ada seorang pun yang dapat memproses pesan ini. Karena kita mempunyai penghitung upaya pengiriman, kita dapat menggunakan penghitung ini untuk mendeteksi situasi seperti itu. Oleh karena itu, setelah penghitung pengiriman mencapai angka tinggi yang Anda tentukan, mungkin akan lebih bijaksana untuk meletakkan pesan seperti itu di thread lain dan mengirimkan pemberitahuan ke administrator sistem.

Status Benang

Tim XINFO digunakan untuk meminta berbagai informasi tentang thread dan grupnya. Misalnya, perintah dasar terlihat seperti ini:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Perintah di atas menampilkan informasi umum tentang aliran yang ditentukan. Sekarang contoh yang sedikit lebih rumit:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Perintah di atas menampilkan informasi umum untuk semua grup thread yang ditentukan

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Perintah di atas menampilkan informasi untuk semua pelanggan aliran dan grup tertentu.
Jika Anda lupa sintaks perintahnya, cukup minta bantuan dari perintah itu sendiri:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Batas Ukuran Aliran

Banyak aplikasi tidak ingin mengumpulkan data ke dalam aliran selamanya. Seringkali berguna untuk memiliki jumlah maksimum pesan yang diperbolehkan per thread. Dalam kasus lain, akan berguna untuk memindahkan semua pesan dari thread ke penyimpanan persisten lain ketika ukuran thread yang ditentukan tercapai. Anda dapat membatasi ukuran aliran menggunakan parameter MAXLEN di perintah XTAMBAH:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Saat menggunakan MAXLEN, rekaman lama secara otomatis dihapus ketika mencapai panjang tertentu, sehingga aliran memiliki ukuran yang konstan. Namun, pemangkasan dalam kasus ini tidak terjadi dengan cara yang paling efisien di memori Redis. Anda dapat memperbaiki situasi ini sebagai berikut:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argumen ~ pada contoh di atas berarti kita tidak perlu membatasi panjang aliran ke nilai tertentu. Dalam contoh kita, angka ini bisa berupa angka apa pun yang lebih besar atau sama dengan 1000 (misalnya, 1000, 1010, atau 1030). Kami baru saja secara eksplisit menetapkan bahwa kami ingin aliran kami menyimpan setidaknya 1000 catatan. Hal ini membuat manajemen memori jauh lebih efisien di dalam Redis.

Ada juga tim terpisah XTRIM, yang melakukan hal yang sama:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Penyimpanan dan replikasi yang persisten

Redis Stream direplikasi secara asinkron ke node budak dan disimpan ke file seperti AOF (snapshot semua data) dan RDB (log semua operasi penulisan). Replikasi negara Kelompok Konsumen juga didukung. Oleh karena itu, jika sebuah pesan berstatus “pending” pada node master, maka pada node slave pesan tersebut akan memiliki status yang sama.

Menghapus elemen individual dari aliran

Ada perintah khusus untuk menghapus pesan XDEL. Perintah ini mendapatkan nama thread diikuti dengan ID pesan yang akan dihapus:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Saat menggunakan perintah ini, Anda perlu memperhitungkan bahwa memori sebenarnya tidak akan segera dilepaskan.

Aliran dengan panjang nol

Perbedaan antara aliran dan struktur data Redis lainnya adalah ketika struktur data lain tidak lagi memiliki elemen di dalamnya, sebagai efek sampingnya, struktur data itu sendiri akan dihapus dari memori. Jadi, misalnya, kumpulan yang diurutkan akan dihapus seluruhnya ketika panggilan ZREM menghapus elemen terakhir. Sebaliknya, thread dibiarkan tetap berada di memori bahkan tanpa ada elemen apa pun di dalamnya.

Kesimpulan

Redis Stream sangat ideal untuk membuat perantara pesan, antrean pesan, pencatatan log terpadu, dan sistem obrolan penyimpanan riwayat.

Seperti yang pernah saya katakan Niklaus Wirth, program adalah algoritma dan struktur data, dan Redis telah memberikan Anda keduanya.

Sumber: www.habr.com

Tambah komentar