Redis Stream - fidindeco kaj skaleblo de viaj mesaĝaj sistemoj

Redis Stream - fidindeco kaj skaleblo de viaj mesaĝaj sistemoj

Redis Stream estas nova abstrakta datumtipo enkondukita en Redis kun versio 5.0
Koncipe, Redis Stream estas Listo al kiu vi povas aldoni enskribojn. Ĉiu eniro havas unikan identigilon. Defaŭlte, la ID estas aŭtomate generita kaj inkluzivas tempomarkon. Sekve, vi povas pridemandi intervalojn de rekordoj laŭlonge de la tempo, aŭ ricevi novajn datumojn kiam ĝi alvenas en la fluo, same kiel la Unikso "vosto -f" komando legas protokoldosieron kaj frostas dum atendado de novaj datumoj. Notu, ke pluraj klientoj povas aŭskulti fadenon samtempe, same kiel multaj "vosto -f" procezoj povas legi dosieron samtempe sen konflikti unu kun la alia.

Por kompreni ĉiujn avantaĝojn de la nova datumtipo, ni rapide rigardu la longe ekzistantajn Redis-strukturojn, kiuj parte reproduktas la funkciecon de Redis Stream.

Redis PUB/SUB

Redis Pub/Sub estas simpla mesaĝa sistemo jam enkonstruita en via ŝlosilvalora vendejo. Tamen, simpleco havas prezon:

  • Se la eldonisto ial malsukcesas, tiam li perdas ĉiujn siajn abonantojn
  • La eldonejo bezonas scii la precizan adreson de ĉiuj siaj abonantoj
  • Eldonisto povas troŝarĝi siajn abonantojn per laboro se datumoj estas publikigitaj pli rapide ol ĝi estas prilaborita
  • La mesaĝo estas forigita el la bufro de la eldonisto tuj post publikigo, sendepende de kiom da abonantoj ĝi estis liverita kaj kiom rapide ili povis prilabori ĉi tiun mesaĝon.
  • Ĉiuj abonantoj ricevos la mesaĝon samtempe. La abonantoj mem devas iel interkonsenti pri la ordo de prilaboro de la sama mesaĝo.
  • Ne ekzistas enkonstruita mekanismo por konfirmi ke abonanto sukcese prilaboris mesaĝon. Se abonanto ricevas mesaĝon kaj kraŝas dum prilaborado, la eldonejo ne scios pri ĝi.

Redis Listo

Redis List estas datumstrukturo kiu subtenas blokadon de legkomandoj. Vi povas aldoni kaj legi mesaĝojn de la komenco aŭ fino de la listo. Surbaze de ĉi tiu strukturo, vi povas fari bonan stakon aŭ voston por via distribuita sistemo, kaj plejofte ĉi tio sufiĉos. Ĉefaj diferencoj de Redis Pub/Sub:

  • La mesaĝo estas transdonita al unu kliento. La unua leg-blokita kliento ricevos la datumojn unue.
  • Clint devas mem iniciati la legoperacion por ĉiu mesaĝo. Listo scias nenion pri klientoj.
  • Mesaĝoj estas konservitaj ĝis iu legas ilin aŭ eksplicite forigas ilin. Se vi agordas la Redis-servilon por flui datumojn al disko, tiam la fidindeco de la sistemo draste pliiĝas.

Enkonduko al Stream

Aldonante eniron al rivereto

teamo XADD aldonas novan eniron al la fluo. Rekordo ne estas nur ĉeno, ĝi konsistas el unu aŭ pluraj ŝlosil-valoraj paroj. Tiel, ĉiu eniro jam estas strukturita kaj similas la strukturon de CSV-dosiero.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

En la supra ekzemplo, ni aldonas du kampojn al la fluo kun la nomo (ŝlosilo) "mystream": "sensilo-id" kaj "temperaturo" kun la valoroj "1234" kaj "19.8", respektive. Kiel la dua argumento, la komando prenas identigilon kiu estos asignita al la eniro - ĉi tiu identigilo unike identigas ĉiun eniron en la fluo. Tamen, ĉi-kaze ni pasis * ĉar ni volas, ke Redis generu novan identigilon por ni. Ĉiu nova identigilo pliiĝos. Tial, ĉiu nova enskribo havos pli altan identigilon rilate al antaŭaj enskriboj.

Formato de identigilo

La enira ID resendita de la komando XADD, konsistas el du partoj:

{millisecondsTime}-{sequenceNumber}

milisekundojTempo — Unikso-simila tempo en milisekundoj (Redis-servila tempo). Tamen, se la nuna tempo estas la sama aŭ malpli ol la tempo de la antaŭa registrado, tiam la tempomarko de la antaŭa registrado estas uzata. Tial, se la servila tempo iras reen en la tempo, la nova identigilo ankoraŭ konservos la pliigan posedaĵon.

sekvencoNumero uzata por rekordoj kreitaj en la sama milisekundo. sekvencoNumero estos pliigita je 1 rilate al la antaŭa eniro. Ĉar la sekvencoNumero estas 64 bitoj en grandeco, tiam praktike vi ne devus renkonti limon de la nombro da rekordoj kiuj povas esti generitaj ene de unu milisekundo.

La formato de tiaj identigiloj povas ŝajni stranga unuavide. Malfida leganto povus scivoli kial la tempo estas parto de la identigilo. La kialo estas, ke Redis-fluoj subtenas gamo-demandojn per ID. Ĉar la identigilo estas rilata al la tempo kiam la rekordo estis kreita, tio ebligas pridemandi tempintervalojn. Ni rigardos specifan ekzemplon kiam ni rigardos la komandon XRANGO.

Se ial la uzanto bezonas specifi sian propran identigilon, kiu, ekzemple, estas asociita kun iu ekstera sistemo, tiam ni povas transdoni ĝin al la komando. XADD anstataŭ * kiel montrite sube:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Bonvolu noti, ke en ĉi tiu kazo vi mem devas kontroli la ID-pliigon. En nia ekzemplo, la minimuma identigilo estas "0-1", do la komando ne akceptos alian identigilon kiu estas egala aŭ malpli ol "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Nombro da rekordoj per rivereto

Eblas akiri la nombron da rekordoj en rivereto simple uzante la komandon XLEN. Por nia ekzemplo, ĉi tiu komando resendos la sekvan valoron:

> XLEN somestream
(integer) 2

Demandoj pri intervalo - XRANGE kaj XREVRANGE

Por peti datumojn laŭ intervalo, ni devas specifi du identigilojn - la komencon kaj la finon de la intervalo. La revena intervalo inkluzivos ĉiujn elementojn, inkluzive de la limoj. Ekzistas ankaŭ du specialaj identigiloj "-" kaj "+", respektive signifanta la plej malgrandan (unuan rekordon) kaj plej grandan (lastan rekordon) identigilon en la rivereto. La malsupra ekzemplo listigos ĉiujn fluajn enskribojn.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Ĉiu revenis rekordo estas tabelo de du elementoj: identigilo kaj listo de ŝlosil-valoraj paroj. Ni jam diris, ke rekordaj identigiloj rilatas al tempo. Tial ni povas peti gamon de specifa tempodaŭro. Tamen, ni povas specifi en la peto ne la plenan identigilon, sed nur la Uniksan tempon, preterlasante la parton rilatan al sekvencoNumero. La preterlasita parto de la identigilo aŭtomate estos agordita al nulo komence de la intervalo kaj al la maksimuma ebla valoro ĉe la fino de la intervalo. Malsupre estas ekzemplo de kiel vi povas peti gamon de du milisekundoj.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Ni havas nur unu eniron en ĉi tiu gamo, tamen en realaj datumaj aroj la rezulto redonita povas esti grandega. Tial XRANGO subtenas la opcion COUNT. Specifante la kvanton, ni povas simple ricevi la unuajn N rekordojn. Se ni bezonas ricevi la sekvajn N rekordojn (paĝigo), ni povas uzi la lastan ricevitan identigilon, pliigi ĝin sekvencoNumero per unu kaj demandu denove. Ni rigardu ĉi tion en la sekva ekzemplo. Ni komencas aldoni 10 elementojn kun XADD (supoze ke mystream jam estis plenigita kun 10 elementoj). Por komenci la ripeton ricevante 2 elementojn per komando, ni komencas kun la plena gamo sed kun COUNT egala al 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Por daŭrigi ripetante kun la sekvaj du elementoj, ni devas elekti la lastan identigilon ricevitan, t.e. 1519073279157-0, kaj aldoni 1 al sekvencoNumero.
La rezulta ID, ĉi-kaze 1519073279157-1, nun povas esti uzata kiel la nova argumento de komenco de intervalo por la sekva voko XRANGO:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Kaj tiel plu. Ĉar komplekseco XRANGO estas O(log(N)) por serĉi kaj tiam O(M) por resendi M elementojn, tiam ĉiu ripeta paŝo estas rapida. Tiel, uzante XRANGO riveretoj povas esti ripetitaj efike.

teamo XREVRANGE estas la ekvivalento XRANGO, sed resendas la elementojn en inversa sinsekvo:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Bonvolu noti, ke la komando XREVRANGE prenas gamon argumentojn komenci kaj halti en inversa sinsekvo.

Legante novajn enskribojn uzante XREAD

Ofte aperas la tasko aboni al rivereto kaj ricevi nur novajn mesaĝojn. Ĉi tiu koncepto povas ŝajni simila al Redis Pub/Sub aŭ blokado de Redis List, sed estas fundamentaj diferencoj pri kiel uzi Redis Stream:

  1. Ĉiu nova mesaĝo estas liverita al ĉiu abonanto defaŭlte. Ĉi tiu konduto diferencas de bloka Redis Listo, kie nova mesaĝo nur estos legata de unu abonanto.
  2. Dum en Redis Pub/Sub ĉiuj mesaĝoj estas forgesitaj kaj neniam daŭras, en Stream ĉiuj mesaĝoj estas konservitaj senfine (krom se la kliento eksplicite kaŭzas forigon).
  3. Redis Stream permesas diferencigi aliron al mesaĝoj ene de unu rivereto. Specifa abonanto povas nur vidi sian personan mesaĝhistorion.

Vi povas aboni fadenon kaj ricevi novajn mesaĝojn per la komando XREAD. Ĝi estas iom pli komplika ol XRANGO, do ni komencos unue per la pli simplaj ekzemploj.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

La supra ekzemplo montras ne-blokan formon XREAD. Notu, ke la opcio COUNT estas nedeviga. Fakte, la nura postulata komanda opcio estas la opcio STREAMS, kiu specifas liston de fluoj kune kun la responda maksimuma identigilo. Ni skribis "STREAMS mystream 0" - ni volas ricevi ĉiujn rekordojn de la mystream stream kun identigilo pli granda ol "0-0". Kiel vi povas vidi de la ekzemplo, la komando resendas la nomon de la fadeno ĉar ni povas aboni plurajn fadenojn samtempe. Ni povus skribi, ekzemple, "STREAMS mystream otherstream 0 0". Bonvolu noti, ke post la opcio STREMS ni unue devas provizi la nomojn de ĉiuj postulataj fluoj kaj nur poste liston de identigiloj.

En ĉi tiu simpla formo la komando faras nenion specialan kompare kun XRANGO. Tamen, la interesa afero estas, ke ni povas facile turniĝi XREAD al bloka komando, precizigante la BLOK-argumenton:

> XREAD BLOCK 0 STREAMS mystream $

En la supra ekzemplo, nova BLOK-opcio estas specifita kun tempodaŭro de 0 milisekundoj (ĉi tio signifas atendi senfine). Krome, anstataŭ pasigi la kutiman identigilon por la rivereto mystream, speciala identigilo $ estis pasigita. Ĉi tiu speciala identigilo signifas tion XREAD devas uzi la maksimuman identigilon en mystream kiel la identigilon. Do ni ricevos novajn mesaĝojn nur ekde la momento kiam ni komencis aŭskulti. Iel ĉi tio similas al la Unikso "vosto -f" komando.

Rimarku, ke kiam oni uzas la opcion BLOKI, ni ne nepre bezonas uzi la specialan identigilon $. Ni povas uzi ajnan identigilon ekzistantan en la rivereto. Se la teamo povas servi nian peton tuj sen blokado, ĝi faros tion, alie ĝi blokos.

Blokado XREAD povas ankaŭ aŭskulti plurajn fadenojn samtempe, vi nur bezonas specifi iliajn nomojn. En ĉi tiu kazo, la komando resendos rekordon de la unua fluo kiu ricevis datumojn. La unua abonanto blokita por antaŭfiksita fadeno ricevos datumojn unue.

Konsumantaj Grupoj

En certaj taskoj, ni volas limigi aliron de abonanto al mesaĝoj ene de unu fadeno. Ekzemplo kie tio povus esti utila estas mesaĝvico kun laboristoj kiuj ricevos malsamajn mesaĝojn de fadeno, permesante mesaĝpretigon al skalo.

Se ni imagas, ke ni havas tri abonantojn C1, C2, C3 kaj fadenon kiu enhavas mesaĝojn 1, 2, 3, 4, 5, 6, 7, tiam la mesaĝoj estos servataj kiel en la suba diagramo:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Por atingi ĉi tiun efikon, Redis Stream uzas koncepton nomitan Consumer Group. Ĉi tiu koncepto estas simila al pseŭdo-abonanto, kiu ricevas datumojn de rivereto, sed estas fakte servata de multoblaj abonantoj ene de grupo, provizante certajn garantiojn:

  1. Ĉiu mesaĝo estas liverita al malsama abonanto ene de la grupo.
  2. Ene de grupo, abonantoj estas identigitaj sub sia nomo, kiu estas uskle-sentema ĉeno. Se abonanto provizore forlasas el la grupo, li povas esti restarigita al la grupo uzante sian propran unikan nomon.
  3. Ĉiu Konsumanto-Grupo sekvas la koncepton de "unua nelegita mesaĝo". Kiam abonanto petas novajn mesaĝojn, ĝi povas nur ricevi mesaĝojn kiuj neniam antaŭe estis liveritaj al iu ajn abonanto ene de la grupo.
  4. Estas komando por eksplicite konfirmi ke la mesaĝo estis sukcese prilaborita de la abonanto. Ĝis ĉi tiu komando estos vokita, la petita mesaĝo restos en la "pritraktata" statuso.
  5. Ene de la Konsumanto-Grupo, ĉiu abonanto povas peti historion de mesaĝoj kiuj estis transdonitaj al li, sed ankoraŭ ne estis prilaboritaj (en la "pritraktata" statuso)

Iasence, la stato de la grupo povas esti esprimita jene:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nun estas tempo konatiĝi kun la ĉefaj komandoj por la Konsumanto-Grupo, nome:

  • XGROUP uzata por krei, detrui kaj administri grupojn
  • XREADGROUP uzata por legi fluon tra grupo
  • XACK - ĉi tiu komando permesas al la abonanto marki la mesaĝon kiel sukcese prilaborita

Kreo de Consumer Group

Ni supozu, ke mystream jam ekzistas. Tiam la komando por krei grupon aspektos kiel:

> XGROUP CREATE mystream mygroup $
OK

Kreante grupon, ni devas pasi identigilon, de kiu la grupo ricevos mesaĝojn. Se ni nur volas ricevi ĉiujn novajn mesaĝojn, tiam ni povas uzi la specialan identigilon $ (kiel en nia supra ekzemplo). Se vi specifas 0 anstataŭ speciala identigilo, tiam ĉiuj mesaĝoj en la fadeno estos disponeblaj por la grupo.

Nun kiam la grupo estas kreita, ni povas tuj komenci legi mesaĝojn per la komando XREADGROUP. Ĉi tiu komando tre similas al XREAD kaj subtenas la laŭvolan opcion BLOCK. Tamen, ekzistas bezonata GROUP-opcio, kiu ĉiam devas esti specifita per du argumentoj: la grupnomo kaj la abonantnomo. La COUNT opcio ankaŭ estas subtenata.

Antaŭ ol legi la fadenon, ni metu kelkajn mesaĝojn tie:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Nun ni provu legi ĉi tiun fluon per la grupo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

La supra komando laŭvorte legas jene:

"Mi, abonanto Alice, membro de la mia grupo, volas legi unu mesaĝon de mystream, kiu neniam antaŭe estis transdonita al iu ajn."

Ĉiufoje kiam abonanto faras operacion sur grupo, ĝi devas disponigi sian nomon, unike identigante sin ene de la grupo. Estas unu plia tre grava detalo en la supra komando - la speciala identigilo ">". Ĉi tiu speciala identigilo filtras mesaĝojn, lasante nur tiujn, kiuj neniam antaŭe estis liveritaj.

Ankaŭ, en specialaj kazoj, vi povas specifi realan identigilon kiel 0 aŭ ajnan alian validan identigilon. En ĉi tiu kazo la komando XREADGROUP resendos al vi historion de mesaĝoj kun stato "pritraktataj", kiuj estis transdonitaj al la specifita abonanto (Alice) sed ankoraŭ ne estis agnoskitaj per la komando. XACK.

Ni povas testi ĉi tiun konduton tuj specifante la ID 0, sen la opcio COUNT. Ni simple vidos ununuran pritraktatan mesaĝon, tio estas, la pommesaĝon:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Tamen, se ni konfirmas la mesaĝon kiel sukcese prilaborita, tiam ĝi ne plu aperos:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nun estas la vico de Bob legi ion:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, membro de mia grupo, petis ne pli ol du mesaĝojn. La komando nur raportas neliverajn mesaĝojn pro la speciala identigilo ">". Kiel vi povas vidi, la mesaĝo "pomo" ne estos montrata ĉar ĝi jam estis transdonita al Alicio, do Bob ricevas "oranĝon" kaj "fragon".

Tiel, Alice, Bob, kaj ajna alia abonanto de la grupo povas legi malsamajn mesaĝojn de la sama rivereto. Ili ankaŭ povas legi sian historion de neprilaboritaj mesaĝoj aŭ marki mesaĝojn kiel prilaboritaj.

Estas kelkaj aferoj por konsideri:

  • Tuj kiam la abonanto konsideras la mesaĝon komando XREADGROUP, ĉi tiu mesaĝo iras en la "pritraktatan" staton kaj estas asignita al tiu specifa abonanto. Aliaj grupaj abonantoj ne povos legi ĉi tiun mesaĝon.
  • Abonantoj aŭtomate kreiĝas je la unua mencio, ne necesas eksplicite krei ilin.
  • Kun la helpo de XREADGROUP vi povas legi mesaĝojn de pluraj malsamaj fadenoj samtempe, tamen por ke tio funkciu, vi unue devas krei grupojn kun la sama nomo por ĉiu fadeno uzante XGROUP

Reakiro post fiasko

La abonanto povas resaniĝi de la malsukceso kaj relegi sian liston de mesaĝoj kun la "pritraktata" statuso. Tamen, en la reala mondo, abonantoj povas finfine malsukcesi. Kio okazas al la blokitaj mesaĝoj de abonanto se la abonanto ne povas resaniĝi post malsukceso?
Consumer Group ofertas funkcion, kiu estas uzata nur por tiaj kazoj - kiam vi bezonas ŝanĝi la posedanton de mesaĝoj.

La unua afero, kiun vi devas fari, estas voki la komandon EXPENDING, kiu montras ĉiujn mesaĝojn en la grupo kun la statuso "pritraktata". En ĝia plej simpla formo, la komando estas vokita kun nur du argumentoj: la fadennomo kaj la grupnomo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

La teamo montris la nombron da neprilaboritaj mesaĝoj por la tuta grupo kaj por ĉiu abonanto. Ni havas nur Bob kun du elstaraj mesaĝoj ĉar la nura mesaĝo, kiun Alico petis, estis konfirmita XACK.

Ni povas peti pli da informoj uzante pli da argumentoj:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - gamo da identigiloj (vi povas uzi "-" kaj "+")
{count} — nombro da liveraj provoj
{konsumanto-nomo} - grupnomo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nun ni havas detalojn por ĉiu mesaĝo: ID, nomo de abonanto, neaktiva tempo en milisekundoj kaj fine la nombro da liveraj provoj. Ni havas du mesaĝojn de Bob kaj ili estis neaktivaj dum 74170458 milisekundoj, ĉirkaŭ 20 horoj.

Bonvolu noti, ke neniu malhelpas nin kontroli, kio estis la enhavo de la mesaĝo simple uzante XRANGO.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Ni nur devas ripeti la saman identigilon dufoje en la argumentoj. Nun kiam ni havas ian ideon, Alico eble decidos, ke post 20 horoj da malfunkcio, Bob verŝajne ne resaniĝos, kaj estas tempo pridemandi tiujn mesaĝojn kaj rekomenci prilabori ilin por Bob. Por tio ni uzas la komandon XKLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Uzante ĉi tiun komandon, ni povas ricevi "fremdan" mesaĝon, kiu ankoraŭ ne estis prilaborita, ŝanĝante la posedanton al {konsumanto}. Tamen, ni ankaŭ povas disponigi minimuman neaktiva tempo {min-idle-time}. Ĉi tio helpas eviti situacion kie du klientoj provas samtempe ŝanĝi la posedanton de la samaj mesaĝoj:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

La unua kliento restarigos la malfunkcion kaj pliigos la liveran nombrilon. Do la dua kliento ne povos peti ĝin.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

La mesaĝo estis sukcese postulita de Alice, kiu nun povas prilabori la mesaĝon kaj agnoski ĝin.

El la supra ekzemplo, vi povas vidi, ke sukcesa peto resendas la enhavon de la mesaĝo mem. Tamen ĉi tio ne estas necesa. La JUSTID-opcio povas esti uzata por resendi mesaĝajn identigilojn nur. Ĉi tio estas utila se vi ne interesiĝas pri la detaloj de la mesaĝo kaj volas pliigi sisteman rendimenton.

Livero vendotablo

La nombrilo, kiun vi vidas en la eligo EXPENDING estas la nombro da liveroj de ĉiu mesaĝo. Tia nombrilo estas pliigita en du manieroj: kiam mesaĝo estas sukcese petita per XKLAIM aŭ kiam voko estas uzata XREADGROUP.

Estas normale, ke iuj mesaĝoj estas liveritaj plurfoje. La ĉefa afero estas, ke ĉiuj mesaĝoj estas eventuale prilaboritaj. Foje problemoj okazas dum prilaborado de mesaĝo ĉar la mesaĝo mem estas koruptita, aŭ mesaĝprilaborado kaŭzas eraron en la pritraktila kodo. En ĉi tiu kazo, povas rezulti, ke neniu povos prilabori ĉi tiun mesaĝon. Ĉar ni havas liverprovan nombrilon, ni povas uzi ĉi tiun nombrilon por detekti tiajn situaciojn. Sekve, post kiam la livernombro atingas la altan nombron, kiun vi specifas, verŝajne estus pli saĝe meti tian mesaĝon en alian fadenon kaj sendi sciigon al la sistemadministranto.

Fadeno Ŝtato

teamo XINFO uzata por peti diversajn informojn pri fadeno kaj ĝiaj grupoj. Ekzemple, baza komando aspektas jene:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

La supra komando montras ĝeneralajn informojn pri la specifita rivereto. Nun iomete pli kompleksa ekzemplo:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

La supra komando montras ĝeneralajn informojn por ĉiuj grupoj de la specifita fadeno

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

La supra komando montras informojn por ĉiuj abonantoj de la specifita rivereto kaj grupo.
Se vi forgesas la komandan sintakson, simple petu helpon de la komando mem:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Rojo Grandeca Limo

Multaj aplikoj ne volas kolekti datumojn en fluon por ĉiam. Ofte utilas havi maksimuman nombron da mesaĝoj permesitaj per fadeno. En aliaj kazoj, estas utile movi ĉiujn mesaĝojn de fadeno al alia konstanta vendejo kiam la specifita fadeno estas atingita. Vi povas limigi la grandecon de rivereto uzante la MAXLEN-parametron en la komando XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kiam oni uzas MAXLEN, malnovaj registroj estas aŭtomate forigitaj kiam ili atingas specifitan longon, do la rivereto havas konstantan grandecon. Tamen, pritondado en ĉi tiu kazo ne okazas en la plej efika maniero en Redis-memoro. Vi povas plibonigi la situacion jene:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

La ~-argumento en la supra ekzemplo signifas, ke ni ne nepre bezonas limigi la fluolongon al specifa valoro. En nia ekzemplo, ĉi tio povus esti ajna nombro pli granda ol aŭ egala al 1000 (ekzemple, 1000, 1010 aŭ 1030). Ni ĵus eksplicite specifis, ke ni volas, ke nia fluo stoku almenaŭ 1000 rekordojn. Ĉi tio faras memoradministradon multe pli efika ene de Redis.

Estas ankaŭ aparta teamo XTRIM, kiu faras la samon:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Konstanta stokado kaj reproduktado

Redis Stream estas nesinkrone reproduktita al sklavnodoj kaj konservita en dosieroj kiel AOF (momentfoto de ĉiuj datumoj) kaj RDB (protokolo de ĉiuj skribaj operacioj). Reproduktado de Konsumantgrupoj-ŝtato ankaŭ estas subtenata. Tial, se mesaĝo estas en la "pritraktata" statuso sur la majstra nodo, tiam sur la sklavaj nodoj ĉi tiu mesaĝo havos la saman statuson.

Forigante individuajn elementojn de rivereto

Estas speciala komando por forigi mesaĝojn XDEL. La komando ricevas la nomon de la fadeno sekvata de la mesaĝo-identigiloj por esti forigita:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Kiam vi uzas ĉi tiun komandon, vi devas konsideri, ke la reala memoro ne tuj estos liberigita.

Nullongaj riveretoj

La diferenco inter fluoj kaj aliaj datumstrukturoj de Redis estas ke kiam aliaj datumstrukturoj ne plu havas elementojn ene de ili, kiel kromefiko, la datumstrukturo mem estos forigita de memoro. Do, ekzemple, la ordigita aro estos tute forigita kiam la ZREM-voko forigas la lastan elementon. Anstataŭe, fadenoj rajtas resti en memoro eĉ sen havi iujn ajn elementojn ene.

konkludo

Redis Stream estas ideala por krei mesaĝajn makleristojn, mesaĝajn atendovicojn, unuigitan registradon kaj historikonservajn babilsistemojn.

Kiel mi iam diris Niklaus Wirth, programoj estas algoritmoj plus datumstrukturoj, kaj Redis jam donas al vi ambaŭ.

fonto: www.habr.com

Aldoni komenton