Redis Stream - viestintäjärjestelmien luotettavuus ja skaalautuvuus

Redis Stream - viestintäjärjestelmien luotettavuus ja skaalautuvuus

Redis Stream on uusi abstrakti tietotyyppi, joka esiteltiin Rediksen versiossa 5.0
Käsitteellisesti Redis Stream on luettelo, johon voit lisätä merkintöjä. Jokaisella merkinnällä on yksilöllinen tunniste. Oletusarvoisesti tunnus luodaan automaattisesti ja sisältää aikaleiman. Siksi voit kysellä tietuealueita ajan kuluessa tai vastaanottaa uusia tietoja, kun ne saapuvat virtaan, aivan kuten Unixin "tail -f" -komento lukee lokitiedoston ja jumiutuu odottaessaan uusia tietoja. Huomaa, että useat asiakkaat voivat kuunnella säiettä samanaikaisesti, aivan kuten monet "tail -f" -prosessit voivat lukea tiedoston samanaikaisesti ilman ristiriitaa keskenään.

Ymmärtääksemme kaikki uuden tietotyypin edut, katsotaanpa nopeasti kauan olemassa olevia Redis-rakenteita, jotka osittain toistavat Redis Streamin toimintoja.

Redis PUB/SUB

Redis Pub/Sub on yksinkertainen viestintäjärjestelmä, joka on jo sisäänrakennettu avainarvokauppaasi. Yksinkertaisuudella on kuitenkin hintansa:

  • Jos kustantaja jostain syystä epäonnistuu, hän menettää kaikki tilaajansa
  • Kustantajan on tiedettävä kaikkien tilaajiensa tarkka osoite
  • Kustantaja voi ylikuormittaa tilaajiaan työllä, jos tiedot julkaistaan ​​nopeammin kuin ne käsitellään
  • Viesti poistetaan julkaisijan puskurista välittömästi julkaisun jälkeen riippumatta siitä, kuinka monelle tilaajalle se on toimitettu ja kuinka nopeasti he pystyivät käsittelemään tämän viestin.
  • Kaikki tilaajat saavat viestin samanaikaisesti. Tilaajien itsensä täytyy jotenkin sopia keskenään saman viestin käsittelyjärjestyksestä.
  • Ei ole sisäänrakennettua mekanismia sen varmistamiseksi, että tilaaja on käsitellyt viestin onnistuneesti. Jos tilaaja saa viestin ja kaatuu käsittelyn aikana, julkaisija ei tiedä siitä.

Redis-lista

Redis List on tietorakenne, joka tukee lukukomentojen estämistä. Voit lisätä ja lukea viestejä luettelon alusta tai lopusta. Tämän rakenteen perusteella voit tehdä hyvän pinon tai jonon hajautetulle järjestelmällesi, ja useimmissa tapauksissa tämä riittää. Tärkeimmät erot Redis Pub/Subista:

  • Viesti toimitetaan yhdelle asiakkaalle. Ensimmäinen lukuestetty asiakas vastaanottaa tiedot ensin.
  • Clintin on aloitettava jokaisen viestin lukutoiminto itse. Lista ei tiedä asiakkaista mitään.
  • Viestit säilytetään, kunnes joku lukee ne tai poistaa ne. Jos määrität Redis-palvelimen huuhtelemaan tiedot levylle, järjestelmän luotettavuus kasvaa dramaattisesti.

Johdatus Streamiin

Merkinnän lisääminen streamiin

Joukkue XADD lisää uuden merkinnän streamiin. Tietue ei ole vain merkkijono, se koostuu yhdestä tai useammasta avain-arvo-parista. Siten jokainen merkintä on jo jäsennelty ja muistuttaa CSV-tiedoston rakennetta.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Yllä olevassa esimerkissä lisäämme streamiin kaksi kenttää, joiden nimi (avain) on "mystream": "sensor-id" ja "temperature" arvoilla "1234" ja "19.8". Toisena argumenttina komento ottaa merkinnälle määritettävän tunnisteen - tämä tunniste yksilöi jokaisen virran merkinnän. Tässä tapauksessa hyväksyimme *, koska haluamme Redisin luovan meille uuden tunnuksen. Jokainen uusi tunnus kasvaa. Siksi jokaisella uudella merkinnällä on korkeampi tunniste verrattuna aikaisempiin merkintöihin.

Tunnisteen muoto

Komennon palauttama merkintätunnus XADD, koostuu kahdesta osasta:

{millisecondsTime}-{sequenceNumber}

millisekuntiaAika — Unix-aika millisekunteina (Redis-palvelinaika). Jos nykyinen aika on kuitenkin sama tai pienempi kuin edellisen tallennuksen aika, käytetään edellisen tallennuksen aikaleimaa. Siksi, jos palvelimen aika palaa ajassa taaksepäin, uusi tunniste säilyttää edelleen lisäysominaisuuden.

sekvenssi numero käytetään tietueisiin, jotka on luotu samassa millisekunnissa. sekvenssi numero korotetaan 1:llä edelliseen merkintään verrattuna. Koska sekvenssi numero on kooltaan 64 bittiä, niin käytännössä yhden millisekunnin sisällä generoitavien tietueiden lukumäärää ei pitäisi rajoittaa.

Tällaisten tunnisteiden muoto voi tuntua ensi silmäyksellä oudolta. Epäluuloinen lukija saattaa ihmetellä, miksi aika on osa tunnistetta. Syynä on, että Redis-streamit tukevat aluekyselyitä tunnuksen perusteella. Koska tunniste liittyy tietueen luomisaikaan, tämä mahdollistaa aikajaksojen kyselyn. Tarkastellaan tiettyä esimerkkiä, kun katsomme komentoa XRANGE.

Jos käyttäjän on jostain syystä määritettävä oma tunniste, joka liittyy esimerkiksi johonkin ulkoiseen järjestelmään, voimme välittää sen komennolla XADD alla olevan kuvan sijaan *:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Huomaa, että tässä tapauksessa sinun on itse seurattava tunnuksen lisäystä. Esimerkissämme minimitunniste on "0-1", joten komento ei hyväksy toista tunnistetta, joka on yhtä suuri tai pienempi kuin "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Tietueiden määrä streamia kohden

On mahdollista saada tietueiden määrä streamissa yksinkertaisesti käyttämällä komentoa XLEN. Esimerkissämme tämä komento palauttaa seuraavan arvon:

> XLEN somestream
(integer) 2

Aluekyselyt - XRANGE ja XREVRANGE

Tietojen pyytämiseksi alueen mukaan meidän on määritettävä kaksi tunnistetta - alueen alku ja loppu. Palautettu alue sisältää kaikki elementit, mukaan lukien rajat. On myös kaksi erityistä tunnistetta "-" ja "+", jotka tarkoittavat vastaavasti pienintä (ensimmäinen tietue) ja suurin (viimeinen tietue) tunnistetta streamissa. Alla olevassa esimerkissä luetellaan kaikki stream-merkinnät.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Jokainen palautettu tietue on kahden elementin joukko: tunniste ja luettelo avainarvopareista. Sanoimme jo, että tietuetunnisteet liittyvät aikaan. Siksi voimme pyytää tietyn ajanjakson. Pyynnössä voimme kuitenkin määrittää, ettei koko tunnistetta, vaan vain Unix-aikaa, jättäen pois liittyvän osan sekvenssi numero. Tunnisteen pois jätetty osa asetetaan automaattisesti nollaan alueen alussa ja suurimmaksi mahdolliseksi arvoksi alueen lopussa. Alla on esimerkki siitä, kuinka voit pyytää kahden millisekunnin vaihteluväliä.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Meillä on vain yksi merkintä tällä alueella, mutta todellisissa tietojoukoissa palautettu tulos voi olla valtava. Tästä syystä XRANGE tukee COUNT-vaihtoehtoa. Määrittämällä määrän saamme yksinkertaisesti ensimmäiset N tietuetta. Jos haluamme saada seuraavat N tietuetta (sivutus), voimme käyttää viimeksi vastaanotettua ID:tä, lisätä sitä sekvenssi numero yksi kerrallaan ja kysy uudelleen. Katsotaanpa tätä seuraavassa esimerkissä. Aloitamme 10 elementin lisäämisen XADD (olettaen, että mystream oli jo täynnä 10 elementtiä). Aloittaaksesi iteroinnin, jossa on 2 elementtiä komentoa kohden, aloitamme koko alueesta, mutta COUNT on yhtä suuri kuin 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Jos haluat jatkaa iterointia kahden seuraavan elementin kanssa, meidän on valittava viimeksi vastaanotettu tunnus, eli 1519073279157-0, ja lisättävä 1 sekvenssi numero.
Tuloksena olevaa tunnusta, tässä tapauksessa 1519073279157-1, voidaan nyt käyttää seuraavan kutsun uutena alueen alkuargumenttina. XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Ja niin edelleen. Koska monimutkaisuus XRANGE on O(log(N)) etsimään ja sitten O(M) palauttamaan M elementtiä, silloin jokainen iteraatiovaihe on nopea. Siten käyttämällä XRANGE virtoja voidaan iteroida tehokkaasti.

Joukkue XREVRANGE on vastaava XRANGE, mutta palauttaa elementit käänteisessä järjestyksessä:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Huomaa, että komento XREVRANGE ottaa alueen argumentit aloitus- ja loppuargumentit käänteisessä järjestyksessä.

Uusien merkintöjen lukeminen XREAD:llä

Usein tehtävänä on tilata stream ja vastaanottaa vain uusia viestejä. Tämä konsepti saattaa näyttää samanlaiselta kuin Redis Pub/Sub tai Redis Listin esto, mutta Redis Streamin käytössä on perustavanlaatuisia eroja:

  1. Jokainen uusi viesti toimitetaan oletusarvoisesti jokaiselle tilaajalle. Tämä toiminta eroaa estävästä Redis-luettelosta, jossa vain yksi tilaaja lukee uuden viestin.
  2. Kun Redis Pub/Subissa kaikki viestit unohdetaan, eikä niitä koskaan säilytetä, Streamissä kaikki viestit säilytetään toistaiseksi (ellei asiakas nimenomaisesti aiheuta poistamista).
  3. Redis Streamin avulla voit erottaa pääsyn viesteihin yhdessä streamissa. Tietty tilaaja näkee vain henkilökohtaisen viestihistoriansa.

Voit tilata ketjun ja vastaanottaa uusia viestejä komennolla LUE. Se on hieman monimutkaisempi kuin XRANGE, joten aloitamme ensin yksinkertaisilla esimerkeillä.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Yllä oleva esimerkki näyttää estävän lomakkeen LUE. Huomaa, että COUNT-vaihtoehto on valinnainen. Itse asiassa ainoa vaadittu komentovaihtoehto on STREAMS-vaihtoehto, joka määrittää virtojen luettelon ja vastaavan enimmäistunnisteen. Kirjoitimme "STREAMS mystream 0" - haluamme vastaanottaa kaikki mystream-virran tietueet, joiden tunniste on suurempi kuin "0-0". Kuten esimerkistä näkyy, komento palauttaa säikeen nimen, koska voimme tilata useita säikeitä samanaikaisesti. Voisimme kirjoittaa esimerkiksi "STREAMS mystream otherstream 0 0". Huomaa, että STREAMS-vaihtoehdon jälkeen meidän on ensin annettava kaikkien vaadittujen streamien nimet ja vasta sitten luettelo tunnisteista.

Tässä yksinkertaisessa muodossa komento ei tee mitään erityistä verrattuna XRANGE. Mielenkiintoista on kuitenkin se, että voimme kääntyä helposti LUE estokomentoon, joka määrittää BLOCK-argumentin:

> XREAD BLOCK 0 STREAMS mystream $

Yllä olevassa esimerkissä on määritetty uusi BLOCK-vaihtoehto, jonka aikakatkaisu on 0 millisekuntia (tämä tarkoittaa odottamista toistaiseksi). Lisäksi sen sijaan, että olisi välitetty tavallinen tunniste stream mystreamille, erityinen tunniste $ välitettiin. Tämä erityinen tunniste tarkoittaa sitä LUE on käytettävä mystreamin enimmäistunnistetta tunnisteena. Joten saamme uusia viestejä vain siitä hetkestä lähtien, kun aloitimme kuuntelemisen. Jollain tapaa tämä on samanlainen kuin Unixin "tail -f" -komento.

Huomaa, että käytettäessä BLOCK-vaihtoehtoa meidän ei välttämättä tarvitse käyttää erityistä tunnistetta $. Voimme käyttää mitä tahansa streamissa olevaa tunnistetta. Jos tiimi pystyy käsittelemään pyyntömme välittömästi ilman estoa, se tekee niin, muuten se estää.

Estäminen LUE voi myös kuunnella useita säikeitä kerralla, sinun tarvitsee vain määrittää niiden nimet. Tässä tapauksessa komento palauttaa tietueen ensimmäisestä dataa vastaanottaneesta virrasta. Ensimmäinen tietyn säikeen estetty tilaaja vastaanottaa tiedot ensin.

Kuluttajaryhmät

Tietyissä tehtävissä haluamme rajoittaa tilaajien pääsyn yhden säikeen viesteihin. Esimerkki, jossa tämä voisi olla hyödyllistä, on viestijono työntekijöiden kanssa, jotka saavat erilaisia ​​viestejä säikeestä, mikä mahdollistaa viestien käsittelyn skaalaamisen.

Jos kuvittelemme, että meillä on kolme tilaajaa C1, C2, C3 ja viestiketju, joka sisältää viestit 1, 2, 3, 4, 5, 6, 7, niin viestit toimitetaan alla olevan kaavion mukaisesti:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Tämän vaikutuksen saavuttamiseksi Redis Stream käyttää Consumer Group -nimistä konseptia. Tämä konsepti on samanlainen kuin pseudotilaaja, joka vastaanottaa dataa virrasta, mutta jota itse asiassa palvelevat useat tilaajat ryhmän sisällä, mikä tarjoaa tietyt takeet:

  1. Jokainen viesti toimitetaan eri tilaajalle ryhmän sisällä.
  2. Ryhmän sisällä tilaajat tunnistetaan heidän nimensä perusteella, joka on merkkijono, jossa kirjainkoolla on merkitystä. Jos tilaaja poistuu tilapäisesti ryhmästä, hänet voidaan palauttaa ryhmään omalla yksilöllisellä nimellä.
  3. Jokainen kuluttajaryhmä noudattaa "ensimmäisen lukemattoman viestin" käsitettä. Kun tilaaja pyytää uusia viestejä, hän voi vastaanottaa vain viestejä, joita ei ole koskaan aiemmin toimitettu kenellekään ryhmän tilaajalle.
  4. Siellä on komento, joka vahvistaa, että tilaaja on käsitellyt viestin onnistuneesti. Kunnes tätä komentoa kutsutaan, pyydetty viesti pysyy "odottaa"-tilassa.
  5. Kuluttajaryhmässä jokainen tilaaja voi pyytää historiaa viesteistä, jotka on toimitettu hänelle, mutta joita ei ole vielä käsitelty ("odottaa"-tilassa)

Eräässä mielessä ryhmän tila voidaan ilmaista seuraavasti:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Nyt on aika tutustua kuluttajaryhmän pääkäskyihin, nimittäin:

  • XGROUP käytetään ryhmien luomiseen, tuhoamiseen ja hallintaan
  • XREADGROUP käytetään streamin lukemiseen ryhmän läpi
  • XACK - Tämän komennon avulla tilaaja voi merkitä viestin onnistuneesti käsitellyksi

Kuluttajaryhmän perustaminen

Oletetaan, että mystream on jo olemassa. Sitten ryhmän luomiskomento näyttää tältä:

> XGROUP CREATE mystream mygroup $
OK

Ryhmää luotaessa meidän on välitettävä tunniste, josta alkaen ryhmä saa viestejä. Jos haluamme vain vastaanottaa kaikki uudet viestit, voimme käyttää erityistä tunnistetta $ (kuten yllä olevassa esimerkissämme). Jos määrität 0:n erikoistunnisteen sijaan, kaikki viestiketjun viestit ovat ryhmän käytettävissä.

Nyt kun ryhmä on luotu, voimme heti aloittaa viestien lukemisen komennolla XREADGROUP. Tämä komento on hyvin samanlainen kuin LUE ja tukee valinnaista BLOCK-vaihtoehtoa. On kuitenkin olemassa pakollinen GROUP-vaihtoehto, joka on aina määritettävä kahdella argumentilla: ryhmän nimi ja tilaajan nimi. Myös COUNT-vaihtoehtoa tuetaan.

Ennen kuin luet ketjun, laitetaan sinne muutama viesti:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Yritetään nyt lukea tämä stream ryhmän läpi:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Yllä oleva komento kuuluu sanatarkasti seuraavasti:

"Minä, tilaaja Alice, oman ryhmän jäsen, haluan lukea yhden viestin mystreamistä, jota ei ole koskaan aiemmin toimitettu kenellekään."

Joka kerta kun tilaaja suorittaa toiminnon ryhmässä, sen on annettava nimensä, joka yksilöi itsensä ryhmässä. Yllä olevassa komennossa on vielä yksi erittäin tärkeä yksityiskohta - erityinen tunniste ">". Tämä erityinen tunniste suodattaa viestit jättäen vain ne, joita ei ole koskaan toimitettu.

Erikoistapauksissa voit myös määrittää todellisen tunnisteen, kuten 0, tai minkä tahansa muun kelvollisen tunnisteen. Tässä tapauksessa komento XREADGROUP palauttaa sinulle historian viesteistä, joiden tila on "odottaa", jotka on toimitettu määritetylle tilaajalle (Alice), mutta joita ei ole vielä kuitattu komennolla XACK.

Voimme testata tämän käyttäytymisen määrittämällä välittömästi tunnuksen 0 ilman vaihtoehtoa COUNT. Näemme vain yhden odottavan viestin, eli omenaviestin:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Jos kuitenkin vahvistamme, että viesti on käsitelty onnistuneesti, sitä ei enää näytetä:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Nyt on Bobin vuoro lukea jotain:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, oman ryhmän jäsen, pyysi korkeintaan kahta viestiä. Komento raportoi vain toimittamattomat viestit erikoistunnisteen ">" vuoksi. Kuten näet, viestiä "omena" ei näytetä, koska se on jo toimitettu Alicelle, joten Bob saa "oranssi" ja "mansikka".

Tällä tavalla Alice, Bob ja muut ryhmän tilaajat voivat lukea erilaisia ​​viestejä samasta virrasta. He voivat myös lukea käsittelemättömien viestiensä historiaa tai merkitä viestit käsitellyiksi.

Muutama asia on hyvä pitää mielessä:

  • Heti kun tilaaja pitää viestiä käskynä XREADGROUP, tämä viesti siirtyy odottavaan tilaan ja se on määritetty kyseiselle tilaajalle. Muut ryhmän tilaajat eivät voi lukea tätä viestiä.
  • Tilaajat luodaan automaattisesti ensimaininnan jälkeen, niitä ei tarvitse erikseen luoda.
  • Kanssa XREADGROUP voit lukea viestejä useista eri säikeistä samanaikaisesti, mutta jotta tämä toimisi, sinun on ensin luotava samannimiset ryhmät jokaiselle säikeelle käyttämällä XGROUP

Toipuminen epäonnistumisen jälkeen

Tilaaja voi toipua virheestä ja lukea uudelleen viestiluettelonsa, jonka tila on "odottaa". Todellisessa maailmassa tilaajat voivat kuitenkin lopulta epäonnistua. Mitä tapahtuu tilaajan jumissa oleville viesteille, jos tilaaja ei pysty toipumaan viasta?
Consumer Group tarjoaa ominaisuuden, jota käytetään juuri sellaisiin tilanteisiin - kun sinun on vaihdettava viestien omistajaa.

Ensimmäinen asia, joka sinun on tehtävä, on kutsua komento EXENDING, joka näyttää kaikki ryhmän viestit, joiden tila on "odottaa". Yksinkertaisimmassa muodossaan komentoa kutsutaan vain kahdella argumentilla: säikeen nimi ja ryhmän nimi:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Tiimi näytti käsittelemättömien viestien määrän koko ryhmälle ja jokaiselle tilaajalle. Meillä on vain Bob, jolla on kaksi odottamatonta viestiä, koska ainoa Alice pyytämä viesti vahvistettiin XACK.

Voimme pyytää lisätietoja käyttämällä muita argumentteja:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - tunnistealue (voit käyttää "-" ja "+")
{count} – toimitusyritysten määrä
{consumer-name} - ryhmän nimi

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Nyt meillä on tiedot jokaisesta viestistä: ID, tilaajan nimi, tyhjäkäyntiaika millisekunteina ja lopuksi toimitusyritysten lukumäärä. Meillä on kaksi viestiä Bobilta ja ne ovat olleet käyttämättömänä 74170458 millisekuntia eli noin 20 tuntia.

Huomaa, että kukaan ei estä meitä tarkistamasta viestin sisältöä pelkästään käyttämällä sitä XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Meidän on vain toistettava sama tunniste kahdesti argumenteissa. Nyt kun meillä on jokin käsitys, Alice saattaa päättää, että 20 tunnin seisokkiajan jälkeen Bob ei todennäköisesti toivu, ja on aika kysyä nämä viestit ja jatkaa niiden käsittelyä Bobin puolesta. Tätä varten käytämme komentoa XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Tämän komennon avulla voimme vastaanottaa "vieraan" viestin, jota ei ole vielä käsitelty vaihtamalla omistajaksi {kuluttaja}. Voimme kuitenkin määrittää myös vähimmäistyhjäkäyntiajan {min-idle-time}. Tämä auttaa välttämään tilanteen, jossa kaksi asiakasta yrittävät samanaikaisesti vaihtaa samojen viestien omistajaa:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Ensimmäinen asiakas nollaa seisokkiajan ja lisää toimituslaskuria. Joten toinen asiakas ei voi pyytää sitä.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice otti viestin onnistuneesti vastaan, ja hän voi nyt käsitellä viestin ja kuitata sen.

Yllä olevasta esimerkistä voit nähdä, että onnistunut pyyntö palauttaa itse viestin sisällön. Tämä ei kuitenkaan ole välttämätöntä. JUSTID-vaihtoehtoa voidaan käyttää vain viestitunnusten palauttamiseen. Tämä on hyödyllistä, jos et ole kiinnostunut viestin yksityiskohdista ja haluat lisätä järjestelmän suorituskykyä.

Toimituslaskuri

Tulosteessa näkyvä laskuri EXENDING on kunkin viestin toimitusten määrä. Tällaista laskuria kasvatetaan kahdella tavalla: kun viestiä pyydetään onnistuneesti kautta XCLAIM tai kun puhelua käytetään XREADGROUP.

On normaalia, että jotkin viestit toimitetaan useita kertoja. Tärkeintä on, että kaikki viestit käsitellään lopulta. Joskus viestin käsittelyssä ilmenee ongelmia, koska itse viesti on vioittunut tai viestin käsittely aiheuttaa virheen käsittelijäkoodissa. Tässä tapauksessa voi käydä niin, että kukaan ei pysty käsittelemään tätä viestiä. Koska meillä on toimitusyrityslaskuri, voimme käyttää tätä laskuria havaitaksemme tällaiset tilanteet. Siksi, kun toimitusten määrä saavuttaa määrittämäsi suuren luvun, olisi luultavasti viisaampaa laittaa tällainen viesti toiseen säikeeseen ja lähettää ilmoitus järjestelmänvalvojalle.

Säikeen tila

Joukkue XINFO käytetään pyytämään erilaisia ​​tietoja säikeestä ja sen ryhmistä. Esimerkiksi peruskomento näyttää tältä:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Yllä oleva komento näyttää yleiset tiedot määritetystä virrasta. Nyt hieman monimutkaisempi esimerkki:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Yllä oleva komento näyttää yleiset tiedot määritetyn säikeen kaikista ryhmistä

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Yllä oleva komento näyttää tiedot kaikille määritetyn virran ja ryhmän tilaajista.
Jos unohdat komennon syntaksin, pyydä apua itse komennosta:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Striimin kokorajoitus

Monet sovellukset eivät halua kerätä tietoja streamiin ikuisesti. Usein on hyödyllistä sallia viestien enimmäismäärä säiettä kohden. Muissa tapauksissa on hyödyllistä siirtää kaikki viestit säikeestä toiseen pysyvään varastoon, kun määritetty säikeen koko saavutetaan. Voit rajoittaa virran kokoa komennon MAXLEN-parametrilla XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

MAXLENia käytettäessä vanhat tietueet poistetaan automaattisesti, kun ne saavuttavat määritetyn pituuden, joten virran koko on vakio. Tässä tapauksessa karsiminen ei kuitenkaan tapahdu Redis-muistissa tehokkaimmalla tavalla. Voit parantaa tilannetta seuraavasti:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Yllä olevassa esimerkissä ~-argumentti tarkoittaa, että meidän ei välttämättä tarvitse rajoittaa virran pituutta tiettyyn arvoon. Esimerkissämme tämä voi olla mikä tahansa luku, joka on suurempi tai yhtä suuri kuin 1000 (esimerkiksi 1000, 1010 tai 1030). Määritimme juuri, että haluamme streamimme tallentavan vähintään 1000 tietuetta. Tämä tekee muistinhallinnasta paljon tehokkaampaa Redisissä.

Mukana on myös erillinen joukkue XTRIM, joka tekee saman asian:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Jatkuva tallennus ja replikointi

Redis Stream replikoidaan asynkronisesti orjasolmuihin ja tallennetaan tiedostoihin, kuten AOF (snapshot of all data) ja RDB (loki kaikista kirjoitustoiminnoista). Kuluttajaryhmien tilan replikointi on myös tuettu. Siksi, jos viesti on "odottaa"-tilassa isäntäsolmussa, orjasolmuissa tällä viestillä on sama tila.

Yksittäisten elementtien poistaminen streamista

Viestien poistamiseen on erityinen komento XDEL. Komento saa ketjun nimen ja sen jälkeen poistettavat viestitunnukset:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Tätä komentoa käytettäessä on otettava huomioon, että varsinaista muistia ei vapauteta heti.

Nollapituisia virtoja

Ero streamien ja muiden Redis-tietorakenteiden välillä on se, että kun muissa tietorakenteissa ei enää ole elementtejä sisällä, sivuvaikutuksena tietorakenne itse poistetaan muistista. Joten esimerkiksi lajiteltu joukko poistetaan kokonaan, kun ZREM-kutsu poistaa viimeisen elementin. Sen sijaan säikeet saavat jäädä muistiin, vaikka sisällä ei olisi elementtejä.

Johtopäätös

Redis Stream on ihanteellinen viestinvälittäjien, viestijonojen, yhtenäisten lokitietojen ja historiaa säilyttävien chat-järjestelmien luomiseen.

Kuten joskus sanoin Niklaus Wirth, ohjelmat ovat algoritmeja ja tietorakenteita, ja Redis tarjoaa jo sinulle molemmat.

Lähde: will.com

Lisää kommentti