Redis Stream - kebolehpercayaan dan skalabiliti sistem pemesejan anda

Redis Stream - kebolehpercayaan dan skalabiliti sistem pemesejan anda

Redis Stream ialah jenis data abstrak baharu yang diperkenalkan dalam Redis dengan versi 5.0
Dari segi konsep, Redis Stream ialah Senarai yang boleh anda tambahkan entri. Setiap entri mempunyai pengecam unik. Secara lalai, ID dijana secara automatik dan termasuk cap masa. Oleh itu, anda boleh menanyakan julat rekod dari semasa ke semasa, atau menerima data baharu apabila ia tiba dalam strim, sama seperti perintah "tail -f" Unix membaca fail log dan membeku sementara menunggu data baharu. Ambil perhatian bahawa berbilang pelanggan boleh mendengar benang pada masa yang sama, sama seperti banyak proses "tail -f" boleh membaca fail secara serentak tanpa bercanggah antara satu sama lain.

Untuk memahami semua faedah jenis data baharu, mari kita lihat dengan pantas struktur Redis yang telah lama wujud yang sebahagiannya mereplikasi kefungsian Redis Stream.

Redis PUB/SUB

Redis Pub/Sub ialah sistem pemesejan mudah yang telah dibina ke dalam stor nilai kunci anda. Walau bagaimanapun, kesederhanaan datang pada harga:

  • Jika penerbit atas sebab tertentu gagal, maka dia kehilangan semua pelanggannya
  • Penerbit perlu mengetahui alamat sebenar semua pelanggannya
  • Penerbit mungkin membebankan pelanggannya dengan kerja jika data diterbitkan lebih cepat daripada diproses
  • Mesej dipadamkan daripada penimbal penerbit sejurus selepas penerbitan, tanpa mengira bilangan pelanggan yang dihantar dan seberapa cepat mereka dapat memproses mesej ini.
  • Semua pelanggan akan menerima mesej pada masa yang sama. Pelanggan sendiri entah bagaimana mesti bersetuju sesama mereka mengenai susunan pemprosesan mesej yang sama.
  • Tiada mekanisme terbina dalam untuk mengesahkan bahawa pelanggan telah berjaya memproses mesej. Jika pelanggan menerima mesej dan ranap semasa pemprosesan, penerbit tidak akan mengetahui tentangnya.

Senarai Redis

Senarai Redis ialah struktur data yang menyokong menyekat perintah baca. Anda boleh menambah dan membaca mesej dari awal atau akhir senarai. Berdasarkan struktur ini, anda boleh membuat timbunan atau baris gilir yang baik untuk sistem edaran anda, dan dalam kebanyakan kes ini sudah mencukupi. Perbezaan utama daripada Redis Pub/Sub:

  • Mesej dihantar kepada satu pelanggan. Pelanggan yang disekat baca pertama akan menerima data terlebih dahulu.
  • Clint mesti memulakan operasi baca untuk setiap mesej sendiri. Senarai tidak tahu apa-apa tentang pelanggan.
  • Mesej disimpan sehingga seseorang membacanya atau memadamkannya secara eksplisit. Jika anda mengkonfigurasi pelayan Redis untuk mengepam data ke cakera, maka kebolehpercayaan sistem meningkat secara mendadak.

Pengenalan kepada Strim

Menambah entri pada strim

Pasukan XADD menambah entri baharu pada strim. Rekod bukan sekadar rentetan, ia terdiri daripada satu atau lebih pasangan nilai kunci. Oleh itu, setiap entri sudah berstruktur dan menyerupai struktur fail CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Dalam contoh di atas, kami menambah dua medan pada strim dengan nama (kunci) "mystream": "id sensor" dan "suhu" dengan nilai "1234" dan "19.8", masing-masing. Sebagai hujah kedua, arahan mengambil pengecam yang akan diberikan kepada entri - pengecam ini secara unik mengenal pasti setiap entri dalam strim. Walau bagaimanapun, dalam kes ini kami lulus * kerana kami mahu Redis menjana ID baharu untuk kami. Setiap ID baharu akan meningkat. Oleh itu, setiap entri baharu akan mempunyai pengecam yang lebih tinggi berhubung dengan entri sebelumnya.

Format pengecam

ID entri dikembalikan oleh arahan XADD, terdiri daripada dua bahagian:

{millisecondsTime}-{sequenceNumber}

milisaatMasa β€” Masa Unix dalam milisaat (masa pelayan Redis). Walau bagaimanapun, jika masa semasa adalah sama atau kurang daripada masa rakaman sebelumnya, maka cap masa rakaman sebelumnya digunakan. Oleh itu, jika masa pelayan kembali ke masa, pengecam baharu masih akan mengekalkan sifat kenaikan.

Nombor urutan digunakan untuk rekod yang dibuat dalam milisaat yang sama. Nombor urutan akan ditambah 1 berbanding entri sebelumnya. Kerana ia Nombor urutan adalah 64 bit dalam saiz, maka dalam amalan anda tidak sepatutnya menghadapi had pada bilangan rekod yang boleh dijana dalam satu milisaat.

Format pengecam sedemikian mungkin kelihatan pelik pada pandangan pertama. Pembaca yang tidak percaya mungkin tertanya-tanya mengapa masa adalah sebahagian daripada pengecam. Sebabnya ialah aliran Redis menyokong pertanyaan julat mengikut ID. Memandangkan pengecam dikaitkan dengan masa rekod dibuat, ini memungkinkan untuk membuat pertanyaan julat masa. Kami akan melihat contoh khusus apabila kami melihat arahan XRANGE.

Jika atas sebab tertentu pengguna perlu menentukan pengecamnya sendiri, yang, sebagai contoh, dikaitkan dengan beberapa sistem luaran, maka kita boleh menyampaikannya kepada arahan XADD bukannya * seperti yang ditunjukkan di bawah:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Sila ambil perhatian bahawa dalam kes ini anda mesti memantau sendiri kenaikan ID. Dalam contoh kami, pengecam minimum ialah "0-1", jadi arahan itu tidak akan menerima pengecam lain yang sama atau kurang daripada "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Bilangan rekod setiap aliran

Adalah mungkin untuk mendapatkan bilangan rekod dalam aliran hanya dengan menggunakan arahan XLEN. Untuk contoh kami, arahan ini akan mengembalikan nilai berikut:

> XLEN somestream
(integer) 2

Pertanyaan julat - XRANGE dan XREVRANGE

Untuk meminta data mengikut julat, kita perlu menentukan dua pengecam - permulaan dan penghujung julat. Julat yang dikembalikan akan merangkumi semua elemen, termasuk sempadan. Terdapat juga dua pengecam khas "-" dan "+", masing-masing bermaksud pengecam terkecil (rekod pertama) dan terbesar (rekod terakhir) dalam strim. Contoh di bawah akan menyenaraikan semua entri strim.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Setiap rekod yang dikembalikan ialah tatasusunan dua elemen: pengecam dan senarai pasangan nilai kunci. Kami telah mengatakan bahawa pengecam rekod berkaitan dengan masa. Oleh itu, kami boleh meminta julat tempoh masa tertentu. Walau bagaimanapun, kami boleh menentukan dalam permintaan bukan pengecam penuh, tetapi hanya masa Unix, meninggalkan bahagian yang berkaitan dengan Nombor urutan. Bahagian pengecam yang ditinggalkan akan secara automatik ditetapkan kepada sifar pada permulaan julat dan kepada nilai maksimum yang mungkin pada penghujung julat. Di bawah ialah contoh cara anda boleh meminta julat dua milisaat.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Kami hanya mempunyai satu entri dalam julat ini, namun dalam set data sebenar, hasil yang dikembalikan boleh menjadi besar. Atas sebab ini XRANGE menyokong pilihan COUNT. Dengan menyatakan kuantiti, kita hanya boleh mendapatkan N rekod pertama. Jika kita perlu mendapatkan rekod N seterusnya (penomboran), kita boleh menggunakan ID yang diterima terakhir, meningkatkannya Nombor urutan oleh seorang dan bertanya lagi. Mari kita lihat ini dalam contoh berikut. Kami mula menambah 10 elemen dengan XADD (dengan mengandaikan aliran saya sudah diisi dengan 10 elemen). Untuk memulakan lelaran mendapat 2 elemen bagi setiap arahan, kita mulakan dengan julat penuh tetapi dengan COUNT sama dengan 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Untuk meneruskan lelaran dengan dua elemen seterusnya, kita perlu memilih ID terakhir yang diterima, iaitu 1519073279157-0, dan menambah 1 kepada Nombor urutan.
ID yang terhasil, dalam kes ini 1519073279157-1, kini boleh digunakan sebagai hujah julat permulaan baharu untuk panggilan seterusnya XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Dan sebagainya. Kerana kerumitan XRANGE ialah O(log(N)) untuk mencari dan kemudian O(M) untuk mengembalikan elemen M, maka setiap langkah lelaran adalah pantas. Oleh itu, menggunakan XRANGE aliran boleh diulang dengan cekap.

Pasukan XREVRANGE adalah setara XRANGE, tetapi mengembalikan elemen dalam susunan terbalik:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Sila ambil perhatian bahawa arahan XREVRANGE mengambil hujah julat bermula dan berhenti dalam susunan terbalik.

Membaca entri baharu menggunakan XREAD

Selalunya tugas timbul untuk melanggan aliran dan hanya menerima mesej baharu. Konsep ini mungkin kelihatan serupa dengan Redis Pub/Sub atau menyekat Senarai Redis, tetapi terdapat perbezaan asas dalam cara menggunakan Redis Stream:

  1. Setiap mesej baharu dihantar kepada setiap pelanggan secara lalai. Tingkah laku ini berbeza daripada Senarai Redis yang menyekat, di mana mesej baharu hanya akan dibaca oleh seorang pelanggan.
  2. Semasa dalam Redis Pub/Sub semua mesej dilupakan dan tidak pernah berterusan, dalam Strim semua mesej dikekalkan selama-lamanya (melainkan pelanggan secara eksplisit menyebabkan pemadaman).
  3. Redis Stream membolehkan anda membezakan akses kepada mesej dalam satu aliran. Pelanggan tertentu hanya boleh melihat sejarah mesej peribadi mereka.

Anda boleh melanggan urutan dan menerima mesej baharu menggunakan arahan itu XBACA. Ia lebih rumit sedikit daripada XRANGE, jadi kita akan mulakan dengan contoh yang lebih mudah dahulu.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Contoh di atas menunjukkan borang tidak menyekat XBACA. Ambil perhatian bahawa pilihan COUNT adalah pilihan. Malah, satu-satunya pilihan arahan yang diperlukan ialah pilihan STREAMS, yang menentukan senarai strim bersama-sama dengan pengecam maksimum yang sepadan. Kami menulis "STREAMS mystream 0" - kami mahu menerima semua rekod strim mystream dengan pengecam yang lebih besar daripada "0-0". Seperti yang anda lihat daripada contoh, arahan itu mengembalikan nama utas kerana kami boleh melanggan berbilang utas pada masa yang sama. Kita boleh menulis, sebagai contoh, "STREAMS mystream otherstream 0 0". Sila ambil perhatian bahawa selepas pilihan STREAMS kami perlu memberikan nama semua strim yang diperlukan dahulu dan kemudian senarai pengecam.

Dalam bentuk mudah ini arahan tidak melakukan sesuatu yang istimewa berbanding dengan XRANGE. Walau bagaimanapun, perkara yang menarik ialah kita boleh berpaling dengan mudah XBACA kepada arahan menyekat, menyatakan hujah BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Dalam contoh di atas, pilihan BLOCK baharu ditentukan dengan tamat masa 0 milisaat (ini bermakna menunggu selama-lamanya). Selain itu, daripada menghantar pengecam biasa untuk strim mystream, pengecam khas $ telah diluluskan. Pengecam khas ini bermaksud bahawa XBACA mesti menggunakan pengecam maksimum dalam aliran saya sebagai pengecam. Jadi kami hanya akan menerima mesej baharu bermula dari saat kami mula mendengar. Dalam beberapa cara ini serupa dengan perintah "tail -f" Unix.

Ambil perhatian bahawa apabila menggunakan pilihan BLOCK kita tidak semestinya perlu menggunakan pengecam khas $. Kami boleh menggunakan sebarang pengecam yang sedia ada dalam strim. Jika pasukan boleh memberikan permintaan kami dengan segera tanpa menyekat, ia akan berbuat demikian, jika tidak ia akan menyekat.

Menyekat XBACA juga boleh mendengar berbilang utas serentak, anda hanya perlu menentukan namanya. Dalam kes ini, arahan akan mengembalikan rekod aliran pertama yang menerima data. Pelanggan pertama yang disekat untuk urutan tertentu akan menerima data terlebih dahulu.

Kumpulan Pengguna

Dalam tugas tertentu, kami ingin mengehadkan akses pelanggan kepada mesej dalam satu urutan. Contoh di mana ini boleh berguna ialah baris gilir mesej dengan pekerja yang akan menerima mesej berbeza daripada urutan, membenarkan pemprosesan mesej berskala.

Jika kita bayangkan bahawa kita mempunyai tiga pelanggan C1, C2, C3 dan urutan yang mengandungi mesej 1, 2, 3, 4, 5, 6, 7, maka mesej akan disampaikan seperti dalam rajah di bawah:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Untuk mencapai kesan ini, Redis Stream menggunakan konsep yang dipanggil Kumpulan Pengguna. Konsep ini serupa dengan pelanggan pseudo, yang menerima data daripada aliran, tetapi sebenarnya disediakan oleh berbilang pelanggan dalam kumpulan, memberikan jaminan tertentu:

  1. Setiap mesej dihantar kepada pelanggan yang berbeza dalam kumpulan.
  2. Dalam kumpulan, pelanggan dikenal pasti dengan nama mereka, yang merupakan rentetan sensitif huruf besar-besaran. Jika pelanggan terkeluar daripada kumpulan buat sementara waktu, dia boleh dipulihkan kepada kumpulan menggunakan nama uniknya sendiri.
  3. Setiap Kumpulan Pengguna mengikut konsep "mesej pertama yang belum dibaca". Apabila pelanggan meminta mesej baharu, ia hanya boleh menerima mesej yang tidak pernah dihantar sebelum ini kepada mana-mana pelanggan dalam kumpulan.
  4. Terdapat arahan untuk mengesahkan secara jelas bahawa mesej telah berjaya diproses oleh pelanggan. Sehingga arahan ini dipanggil, mesej yang diminta akan kekal dalam status "pending".
  5. Dalam Kumpulan Pengguna, setiap pelanggan boleh meminta sejarah mesej yang dihantar kepadanya, tetapi masih belum diproses (dalam status "pending")

Dalam erti kata lain, keadaan kumpulan itu boleh dinyatakan seperti berikut:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Kini tiba masanya untuk membiasakan diri dengan arahan utama untuk Kumpulan Pengguna, iaitu:

  • XGROUP digunakan untuk mencipta, memusnahkan dan mengurus kumpulan
  • XREADGROUP digunakan untuk membaca aliran melalui kumpulan
  • XACK - arahan ini membolehkan pelanggan menandakan mesej sebagai berjaya diproses

Penciptaan Kumpulan Pengguna

Mari kita anggap bahawa aliran saya sudah wujud. Kemudian arahan penciptaan kumpulan akan kelihatan seperti:

> XGROUP CREATE mystream mygroup $
OK

Apabila membuat kumpulan, kita mesti lulus pengecam, bermula dari mana kumpulan akan menerima mesej. Jika kami hanya mahu menerima semua mesej baharu, maka kami boleh menggunakan pengecam khas $ (seperti dalam contoh kami di atas). Jika anda menentukan 0 dan bukannya pengecam khas, maka semua mesej dalam urutan akan tersedia kepada kumpulan.

Sekarang kumpulan itu dibuat, kita boleh mula membaca mesej dengan segera menggunakan arahan XREADGROUP. Perintah ini sangat mirip dengan XBACA dan menyokong pilihan BLOCK pilihan. Walau bagaimanapun, terdapat pilihan KUMPULAN yang diperlukan yang mesti sentiasa dinyatakan dengan dua argumen: nama kumpulan dan nama pelanggan. Pilihan COUNT juga disokong.

Sebelum membaca benang, mari letakkan beberapa mesej di sana:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Sekarang mari cuba baca aliran ini melalui kumpulan:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Perintah di atas dibaca secara verbatim seperti berikut:

"Saya, pelanggan Alice, ahli kumpulan saya, ingin membaca satu mesej daripada aliran saya yang tidak pernah dihantar kepada sesiapa sebelum ini."

Setiap kali pelanggan melakukan operasi pada kumpulan, ia mesti memberikan namanya, secara unik mengenal pasti dirinya dalam kumpulan. Terdapat satu lagi butiran yang sangat penting dalam arahan di atas - pengecam khas ">". Pengecam khas ini menapis mesej, meninggalkan hanya mesej yang tidak pernah dihantar sebelum ini.

Juga, dalam kes khas, anda boleh menentukan pengecam sebenar seperti 0 atau mana-mana pengecam sah lain. Dalam kes ini arahan XREADGROUP akan mengembalikan anda sejarah mesej dengan status "pending" yang dihantar kepada pelanggan tertentu (Alice) tetapi belum lagi diakui menggunakan arahan XACK.

Kita boleh menguji tingkah laku ini dengan segera menyatakan ID 0, tanpa pilihan COUNT. Kami hanya akan melihat satu mesej yang belum selesai, iaitu mesej epal:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Walau bagaimanapun, jika kami mengesahkan mesej sebagai berjaya diproses, maka ia tidak akan dipaparkan lagi:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Kini giliran Bob untuk membaca sesuatu:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, ahli kumpulan saya, meminta tidak lebih daripada dua mesej. Perintah hanya melaporkan mesej yang tidak dihantar kerana pengecam khas ">". Seperti yang anda lihat, mesej "epal" tidak akan dipaparkan kerana ia telah dihantar kepada Alice, jadi Bob menerima "oren" dan "strawberi".

Dengan cara ini, Alice, Bob dan mana-mana pelanggan kumpulan lain boleh membaca mesej yang berbeza daripada strim yang sama. Mereka juga boleh membaca sejarah mesej mereka yang belum diproses atau menandai mesej sebagai diproses.

Terdapat beberapa perkara yang perlu diingat:

  • Sebaik sahaja pelanggan menganggap mesej itu sebagai arahan XREADGROUP, mesej ini masuk ke dalam keadaan "belum selesai" dan diberikan kepada pelanggan tertentu itu. Pelanggan kumpulan lain tidak akan dapat membaca mesej ini.
  • Pelanggan dibuat secara automatik selepas sebutan pertama, tidak perlu menciptanya secara eksplisit.
  • Dengan XREADGROUP anda boleh membaca mesej daripada beberapa utas berbeza pada masa yang sama, namun untuk ini berfungsi, anda perlu membuat kumpulan dengan nama yang sama untuk setiap utas menggunakan XGROUP

Pemulihan selepas kegagalan

Pelanggan boleh pulih daripada kegagalan dan membaca semula senarai mesejnya dengan status "pending". Walau bagaimanapun, dalam dunia sebenar, pelanggan akhirnya mungkin gagal. Apakah yang berlaku kepada mesej pelanggan yang tersekat jika pelanggan tidak dapat pulih daripada kegagalan?
Kumpulan Pengguna menawarkan ciri yang digunakan untuk kes sedemikian sahaja - apabila anda perlu menukar pemilik mesej.

Perkara pertama yang perlu anda lakukan ialah memanggil arahan XPENDING, yang memaparkan semua mesej dalam kumpulan dengan status "pending". Dalam bentuk yang paling mudah, arahan dipanggil dengan hanya dua argumen: nama benang dan nama kumpulan:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Pasukan itu memaparkan bilangan mesej yang tidak diproses untuk keseluruhan kumpulan dan untuk setiap pelanggan. Kami hanya mempunyai Bob dengan dua mesej tertunggak kerana satu-satunya mesej yang diminta Alice telah disahkan XACK.

Kami boleh meminta lebih banyak maklumat menggunakan lebih banyak hujah:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - julat pengecam (anda boleh menggunakan β€œ-” dan β€œ+”)
{count} β€” bilangan percubaan penghantaran
{consumer-name} - nama kumpulan

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Kini kami mempunyai butiran untuk setiap mesej: ID, nama pelanggan, masa melahu dalam milisaat dan akhirnya bilangan percubaan penghantaran. Kami mempunyai dua mesej daripada Bob dan ia telah melahu selama 74170458 milisaat, kira-kira 20 jam.

Sila ambil perhatian bahawa tiada siapa yang menghalang kami daripada menyemak kandungan mesej itu hanya dengan menggunakan XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Kita hanya perlu mengulangi pengecam yang sama dua kali dalam hujah. Memandangkan kami mempunyai sedikit idea, Alice mungkin memutuskan bahawa selepas 20 jam masa rehat, Bob mungkin tidak akan pulih dan sudah tiba masanya untuk menanyakan mesej tersebut dan menyambung semula memprosesnya untuk Bob. Untuk ini kami menggunakan arahan XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Menggunakan arahan ini, kami boleh menerima mesej "asing" yang masih belum diproses dengan menukar pemilik kepada {consumer}. Walau bagaimanapun, kami juga boleh menyediakan masa terbiar minimum {min-idle-time}. Ini membantu mengelakkan situasi di mana dua pelanggan cuba menukar pemilik mesej yang sama secara serentak:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pelanggan pertama akan menetapkan semula masa henti dan meningkatkan kaunter penghantaran. Jadi pelanggan kedua tidak akan dapat memintanya.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Mesej itu berjaya dituntut oleh Alice, yang kini boleh memproses mesej dan mengakuinya.

Daripada contoh di atas, anda boleh melihat bahawa permintaan yang berjaya mengembalikan kandungan mesej itu sendiri. Walau bagaimanapun, ini tidak perlu. Pilihan JUSTID boleh digunakan untuk mengembalikan ID mesej sahaja. Ini berguna jika anda tidak berminat dengan butiran mesej dan ingin meningkatkan prestasi sistem.

Kaunter penghantaran

Kaunter yang anda lihat dalam output XPENDING ialah bilangan penghantaran setiap mesej. Kaunter sedemikian ditambah dalam dua cara: apabila mesej berjaya diminta melalui XCLAIM atau apabila panggilan digunakan XREADGROUP.

Adalah perkara biasa untuk sesetengah mesej dihantar beberapa kali. Perkara utama ialah semua mesej akhirnya diproses. Kadangkala masalah berlaku semasa memproses mesej kerana mesej itu sendiri rosak, atau pemprosesan mesej menyebabkan ralat dalam kod pengendali. Dalam kes ini, nampaknya tiada siapa yang akan dapat memproses mesej ini. Memandangkan kami mempunyai kaunter percubaan penghantaran, kami boleh menggunakan kaunter ini untuk mengesan situasi sedemikian. Oleh itu, sebaik sahaja kiraan penghantaran mencapai jumlah tinggi yang anda tentukan, mungkin lebih bijak untuk meletakkan mesej sedemikian pada urutan lain dan menghantar pemberitahuan kepada pentadbir sistem.

Keadaan Benang

Pasukan XINFO digunakan untuk meminta pelbagai maklumat tentang benang dan kumpulannya. Sebagai contoh, arahan asas kelihatan seperti ini:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Perintah di atas memaparkan maklumat umum tentang aliran yang ditentukan. Sekarang contoh yang lebih kompleks:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Perintah di atas memaparkan maklumat umum untuk semua kumpulan benang yang ditentukan

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Perintah di atas memaparkan maklumat untuk semua pelanggan aliran dan kumpulan yang ditentukan.
Jika anda terlupa sintaks arahan, cuma minta arahan itu sendiri untuk mendapatkan bantuan:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Had Saiz Strim

Banyak aplikasi tidak mahu mengumpul data ke dalam aliran selama-lamanya. Selalunya berguna untuk mempunyai bilangan maksimum mesej yang dibenarkan bagi setiap urutan. Dalam kes lain, adalah berguna untuk mengalihkan semua mesej dari utas ke kedai berterusan yang lain apabila saiz utas yang ditentukan dicapai. Anda boleh mengehadkan saiz aliran menggunakan parameter MAXLEN dalam arahan XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Apabila menggunakan MAXLEN, rekod lama dipadamkan secara automatik apabila ia mencapai panjang yang ditentukan, jadi strim mempunyai saiz yang tetap. Walau bagaimanapun, pemangkasan dalam kes ini tidak berlaku dengan cara yang paling berkesan dalam ingatan Redis. Anda boleh memperbaiki keadaan seperti berikut:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Argumen ~ dalam contoh di atas bermakna kita tidak semestinya perlu mengehadkan panjang aliran kepada nilai tertentu. Dalam contoh kami, ini boleh menjadi sebarang nombor yang lebih besar daripada atau sama dengan 1000 (contohnya, 1000, 1010 atau 1030). Kami hanya menyatakan dengan jelas bahawa kami mahu strim kami menyimpan sekurang-kurangnya 1000 rekod. Ini menjadikan pengurusan memori lebih cekap di dalam Redis.

Terdapat juga pasukan yang berasingan XTRIM, yang melakukan perkara yang sama:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Penyimpanan dan replikasi berterusan

Redis Stream direplikasi secara tak segerak ke nod hamba dan disimpan ke fail seperti AOF (snapshot semua data) dan RDB (log semua operasi tulis). Replikasi keadaan Kumpulan Pengguna juga disokong. Oleh itu, jika mesej berada dalam status "pending" pada nod induk, maka pada nod hamba mesej ini akan mempunyai status yang sama.

Mengalih keluar elemen individu daripada aliran

Terdapat arahan khas untuk memadam mesej XDEL. Perintah itu mendapat nama utas diikuti dengan ID mesej yang akan dipadamkan:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Apabila menggunakan arahan ini, anda perlu mengambil kira bahawa memori sebenar tidak akan dikeluarkan serta-merta.

Aliran panjang sifar

Perbezaan antara aliran dan struktur data Redis yang lain ialah apabila struktur data lain tidak lagi mempunyai elemen di dalamnya, sebagai kesan sampingan, struktur data itu sendiri akan dialih keluar daripada ingatan. Jadi, sebagai contoh, set yang diisih akan dialih keluar sepenuhnya apabila panggilan ZREM mengalih keluar elemen terakhir. Sebaliknya, benang dibenarkan kekal dalam ingatan walaupun tanpa sebarang unsur di dalamnya.

Kesimpulan

Redis Stream sesuai untuk membuat broker mesej, baris gilir mesej, pengelogan bersatu dan sistem sembang menyimpan sejarah.

Seperti yang pernah saya katakan Niklaus Wirth, program ialah algoritma serta struktur data dan Redis sudah memberikan anda kedua-duanya.

Sumber: www.habr.com

Tambah komen