Redis Stream: affidabilità e scalabilità dei tuoi sistemi di messaggistica

Redis Stream: affidabilità e scalabilità dei tuoi sistemi di messaggistica

Redis Stream è un nuovo tipo di dati astratti introdotto in Redis con la versione 5.0
Concettualmente, Redis Stream è un elenco a cui è possibile aggiungere voci. Ogni voce ha un identificatore univoco. Per impostazione predefinita, l'ID viene generato automaticamente e include un timestamp. Pertanto, è possibile eseguire query su intervalli di record nel tempo o ricevere nuovi dati non appena arrivano nel flusso, proprio come il comando "tail -f" di Unix legge un file di registro e si blocca durante l'attesa di nuovi dati. Tieni presente che più client possono ascoltare un thread contemporaneamente, proprio come molti processi "tail -f" possono leggere un file simultaneamente senza entrare in conflitto tra loro.

Per comprendere tutti i vantaggi del nuovo tipo di dati, diamo una rapida occhiata alle strutture Redis esistenti da tempo che replicano parzialmente la funzionalità di Redis Stream.

Redis PUB/SUB

Redis Pub/Sub è un semplice sistema di messaggistica già integrato nel tuo negozio di valori-chiave. Tuttavia, la semplicità ha un prezzo:

  • Se l'editore per qualche motivo fallisce, perde tutti i suoi abbonati
  • L'editore ha bisogno di conoscere l'indirizzo esatto di tutti i suoi abbonati
  • Un editore può sovraccaricare di lavoro i suoi abbonati se i dati vengono pubblicati più velocemente di quanto vengono elaborati
  • Il messaggio viene eliminato dal buffer dell'editore immediatamente dopo la pubblicazione, indipendentemente dal numero di abbonati a cui è stato consegnato e dalla velocità con cui sono stati in grado di elaborare il messaggio.
  • Tutti gli abbonati riceveranno il messaggio contemporaneamente. Gli stessi abbonati devono in qualche modo concordare tra loro l'ordine di elaborazione dello stesso messaggio.
  • Non esiste un meccanismo integrato per confermare che un sottoscrittore abbia elaborato correttamente un messaggio. Se un abbonato riceve un messaggio e si blocca durante l'elaborazione, l'editore non lo saprà.

Elenco Redis

Redis List è una struttura dati che supporta il blocco dei comandi di lettura. Puoi aggiungere e leggere i messaggi dall'inizio o dalla fine dell'elenco. Sulla base di questa struttura, puoi creare un buon stack o coda per il tuo sistema distribuito e nella maggior parte dei casi questo sarà sufficiente. Principali differenze rispetto a Redis Pub/Sub:

  • Il messaggio viene recapitato a un client. Il primo client con lettura bloccata riceverà per primo i dati.
  • Clint deve avviare lui stesso l'operazione di lettura di ciascun messaggio. List non sa nulla dei clienti.
  • I messaggi vengono archiviati finché qualcuno non li legge o li elimina esplicitamente. Se si configura il server Redis per scaricare i dati su disco, l'affidabilità del sistema aumenta notevolmente.

Introduzione allo streaming

Aggiunta di una voce a uno stream

Squadra XAGGIUNGI aggiunge una nuova voce allo stream. Un record non è solo una stringa, ma è costituito da una o più coppie chiave-valore. Pertanto ogni voce è già strutturata e ricorda la struttura di un file CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Nell'esempio sopra, aggiungiamo due campi allo stream con il nome (chiave) “mystream”: “sensor-id” e “temperature” con i valori “1234” e “19.8”, rispettivamente. Come secondo argomento, il comando accetta un identificatore che verrà assegnato alla voce: questo identificatore identifica in modo univoco ciascuna voce nello stream. Tuttavia, in questo caso abbiamo superato * perché vogliamo che Redis generi un nuovo ID per noi. Ogni nuovo ID aumenterà. Pertanto ogni nuova voce avrà un identificatore più elevato rispetto alle voci precedenti.

Formato dell'identificatore

L'ID voce restituito dal comando XAGGIUNGI, è composto da due parti:

{millisecondsTime}-{sequenceNumber}

millisecondiTempo — Ora Unix in millisecondi (ora del server Redis). Tuttavia, se l'ora corrente è uguale o inferiore all'ora della registrazione precedente, viene utilizzato il timestamp della registrazione precedente. Pertanto, se l'ora del server va indietro nel tempo, il nuovo identificatore manterrà comunque la proprietà di incremento.

sequenza di numeri utilizzato per i record creati nello stesso millisecondo. sequenza di numeri verrà incrementato di 1 rispetto alla voce precedente. Perché il sequenza di numeri ha una dimensione di 64 bit, in pratica non dovresti incorrere in un limite al numero di record che possono essere generati entro un millisecondo.

Il formato di tali identificatori può sembrare strano a prima vista. Un lettore diffidente potrebbe chiedersi perché il tempo fa parte dell'identificatore. Il motivo è che i flussi Redis supportano query di intervallo in base all'ID. Poiché l'identificatore è associato all'ora in cui è stato creato il record, ciò rende possibile interrogare gli intervalli di tempo. Vedremo un esempio specifico quando esamineremo il comando XRANGE.

Se per qualche motivo l'utente ha bisogno di specificare il proprio identificatore, che, ad esempio, è associato a qualche sistema esterno, allora possiamo passarlo al comando XAGGIUNGI invece di * come mostrato di seguito:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Tieni presente che in questo caso dovrai monitorare tu stesso l'incremento dell'ID. Nel nostro esempio, l'identificatore minimo è "0-1", quindi il comando non accetterà un altro identificatore uguale o inferiore a "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Numero di record per flusso

È possibile ottenere il numero di record in un flusso semplicemente utilizzando il comando XLEN. Per il nostro esempio, questo comando restituirà il seguente valore:

> XLEN somestream
(integer) 2

Query di intervallo: XRANGE e XREVRANGE

Per richiedere dati per intervallo, dobbiamo specificare due identificatori: l'inizio e la fine dell'intervallo. L'intervallo restituito includerà tutti gli elementi, compresi i confini. Sono inoltre presenti due identificatori speciali “-” e “+”, che indicano rispettivamente l'identificatore più piccolo (primo record) e quello più grande (ultimo record) nel flusso. L'esempio seguente elencherà tutte le voci del flusso.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Ogni record restituito è un array di due elementi: un identificatore e un elenco di coppie chiave-valore. Abbiamo già detto che gli identificatori dei record sono legati al tempo. Pertanto, possiamo richiedere un intervallo di un periodo di tempo specifico. Possiamo però specificare nella richiesta non l'identificatore completo, ma solo l'ora Unix, omettendo la parte relativa a sequenza di numeri. La parte omessa dell'identificatore verrà automaticamente impostata su zero all'inizio dell'intervallo e sul valore massimo possibile alla fine dell'intervallo. Di seguito è riportato un esempio di come è possibile richiedere un intervallo di due millisecondi.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Abbiamo solo una voce in questo intervallo, tuttavia nei set di dati reali il risultato restituito può essere enorme. Per questa ragione XRANGE supporta l'opzione COUNT. Specificando la quantità possiamo semplicemente ottenere i primi N record. Se dobbiamo ottenere i successivi N record (impaginazione), possiamo utilizzare l'ultimo ID ricevuto, incrementandolo sequenza di numeri per uno e chiedi di nuovo. Consideriamolo nell'esempio seguente. Iniziamo ad aggiungere 10 elementi con XAGGIUNGI (supponendo che mystream fosse già pieno di 10 elementi). Per iniziare l'iterazione ottenendo 2 elementi per comando, iniziamo con l'intervallo completo ma con COUNT uguale a 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Per continuare l'iterazione con i due elementi successivi, dobbiamo selezionare l'ultimo ID ricevuto, ovvero 1519073279157-0, e aggiungere 1 a sequenza di numeri.
L'ID risultante, in questo caso 1519073279157-1, può ora essere utilizzato come nuovo argomento di inizio intervallo per la chiamata successiva XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

E così via. Perché la complessità XRANGE è O(log(N)) per cercare e quindi O(M) per restituire M elementi, quindi ogni passaggio di iterazione è veloce. Quindi, utilizzando XRANGE i flussi possono essere iterati in modo efficiente.

Squadra XREVRANGE è l'equivalente XRANGE, ma restituisce gli elementi in ordine inverso:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Tieni presente che il comando XREVRANGE accetta argomenti di intervallo che iniziano e si fermano in ordine inverso.

Lettura di nuove voci utilizzando XREAD

Spesso sorge il compito di iscriversi a uno stream e ricevere solo nuovi messaggi. Questo concetto può sembrare simile a Redis Pub/Sub o al blocco dell'elenco Redis, ma esistono differenze fondamentali su come utilizzare Redis Stream:

  1. Ogni nuovo messaggio viene recapitato a ogni abbonato per impostazione predefinita. Questo comportamento è diverso da un elenco Redis di blocco, in cui un nuovo messaggio verrà letto solo da un abbonato.
  2. Mentre in Redis Pub/Sub tutti i messaggi vengono dimenticati e non vengono mai resi persistenti, in Stream tutti i messaggi vengono conservati a tempo indeterminato (a meno che il client non ne causi esplicitamente l'eliminazione).
  3. Redis Stream ti consente di differenziare l'accesso ai messaggi all'interno di un flusso. Un abbonato specifico può vedere solo la cronologia dei messaggi personali.

Puoi iscriverti a un thread e ricevere nuovi messaggi utilizzando il comando XREAD. È un po' più complicato di XRANGE, quindi inizieremo prima con gli esempi più semplici.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

L'esempio sopra mostra un modulo non bloccante XREAD. Tieni presente che l'opzione COUNT è facoltativa. Infatti, l'unica opzione di comando richiesta è l'opzione STREAMS, che specifica un elenco di flussi insieme al corrispondente identificatore massimo. Abbiamo scritto "STREAMS mystream 0": vogliamo ricevere tutti i record del flusso mystream con un identificatore maggiore di "0-0". Come puoi vedere dall'esempio, il comando restituisce il nome del thread perché possiamo iscriverci a più thread contemporaneamente. Potremmo scrivere, ad esempio, "STREAMS mystream otherstream 0 0". Tieni presente che dopo l'opzione STREAMS dobbiamo prima fornire i nomi di tutti gli stream richiesti e solo successivamente un elenco di identificatori.

In questa forma semplice il comando non fa nulla di speciale rispetto a XRANGE. Tuttavia, la cosa interessante è che possiamo girare facilmente XREAD a un comando di blocco, specificando l'argomento BLOCCO:

> XREAD BLOCK 0 STREAMS mystream $

Nell'esempio sopra, viene specificata una nuova opzione BLOCCO con un timeout di 0 millisecondi (questo significa attendere indefinitamente). Inoltre, invece di passare il solito identificatore per lo stream mystream, è stato passato un identificatore speciale $. Questo identificatore speciale significa questo XREAD deve utilizzare l'identificatore massimo in mystream come identificatore. Quindi riceveremo nuovi messaggi solo a partire dal momento in cui abbiamo iniziato ad ascoltare. In un certo senso questo è simile al comando Unix "tail -f".

Tieni presente che quando utilizziamo l'opzione BLOCK non dobbiamo necessariamente utilizzare l'identificatore speciale $. Possiamo utilizzare qualsiasi identificatore esistente nello stream. Se il team può soddisfare immediatamente la nostra richiesta senza bloccarla, lo farà, altrimenti bloccherà.

Blocco XREAD puoi anche ascoltare più thread contemporaneamente, devi solo specificare i loro nomi. In questo caso, il comando restituirà un record del primo flusso che ha ricevuto dati. Il primo abbonato bloccato per un determinato thread riceverà per primo i dati.

Gruppi di consumatori

In alcune attività, vogliamo limitare l'accesso degli iscritti ai messaggi all'interno di un thread. Un esempio in cui ciò potrebbe essere utile è una coda di messaggi con lavoratori che riceveranno messaggi diversi da un thread, consentendo la scalabilità dell'elaborazione dei messaggi.

Se immaginiamo di avere tre abbonati C1, C2, C3 e un thread che contiene i messaggi 1, 2, 3, 4, 5, 6, 7, allora i messaggi verranno serviti come nello schema seguente:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Per ottenere questo effetto, Redis Stream utilizza un concetto chiamato Consumer Group. Questo concetto è simile a uno pseudo-abbonato, che riceve dati da un flusso, ma in realtà è servito da più abbonati all'interno di un gruppo, fornendo alcune garanzie:

  1. Ogni messaggio viene recapitato a un abbonato diverso all'interno del gruppo.
  2. All'interno di un gruppo, gli iscritti sono identificati dal loro nome, che è una stringa con distinzione tra maiuscole e minuscole. Se un iscritto abbandona temporaneamente il gruppo, può essere reintegrato nel gruppo utilizzando il suo nome univoco.
  3. Ogni Gruppo di Consumatori segue il concetto del "primo messaggio non letto". Quando un abbonato richiede nuovi messaggi, può ricevere solo messaggi che non sono mai stati consegnati in precedenza a nessun abbonato all'interno del gruppo.
  4. Esiste un comando per confermare esplicitamente che il messaggio è stato elaborato correttamente dall'abbonato. Finché non verrà richiamato questo comando, il messaggio richiesto rimarrà nello stato "in sospeso".
  5. All'interno del Gruppo Consumatori, ogni iscritto può richiedere lo storico dei messaggi che gli sono stati consegnati, ma non ancora elaborati (nello stato “in sospeso”)

In un certo senso, lo stato del gruppo può essere espresso come segue:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Ora è il momento di familiarizzare con i comandi principali per il Gruppo Consumatori, vale a dire:

  • GRUPPO X utilizzato per creare, distruggere e gestire gruppi
  • XREADGRUPPO utilizzato per leggere il flusso attraverso il gruppo
  • XACK - questo comando consente all'abbonato di contrassegnare il messaggio come elaborato con successo

Creazione del gruppo di consumatori

Supponiamo che mystream esista già. Quindi il comando di creazione del gruppo sarà simile a:

> XGROUP CREATE mystream mygroup $
OK

Quando creiamo un gruppo dobbiamo passare un identificatore, a partire dal quale il gruppo riceverà i messaggi. Se vogliamo solo ricevere tutti i nuovi messaggi, allora possiamo usare l'identificatore speciale $ (come nel nostro esempio sopra). Se specifichi 0 invece di un identificatore speciale, tutti i messaggi nel thread saranno disponibili per il gruppo.

Ora che il gruppo è stato creato possiamo iniziare subito a leggere i messaggi utilizzando il comando XREADGRUPPO. Questo comando è molto simile a XREAD e supporta l'opzione BLOCK opzionale. Tuttavia, esiste un'opzione GRUPPO obbligatoria che deve essere sempre specificata con due argomenti: il nome del gruppo e il nome dell'abbonato. È supportata anche l'opzione COUNT.

Prima di leggere il thread, inseriamo alcuni messaggi lì:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Ora proviamo a leggere questo flusso attraverso il gruppo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Il comando precedente si legge testualmente come segue:

"Io, l'abbonata Alice, membro di mygroup, voglio leggere un messaggio da mystream che non è mai stato recapitato a nessuno prima."

Ogni volta che un utente esegue un'operazione su un gruppo, deve fornire il proprio nome, identificandosi in modo univoco all'interno del gruppo. C'è un altro dettaglio molto importante nel comando precedente: l'identificatore speciale ">". Questo identificatore speciale filtra i messaggi, lasciando solo quelli che non sono mai stati recapitati prima.

Inoltre, in casi speciali, è possibile specificare un identificatore reale come 0 o qualsiasi altro identificatore valido. In questo caso il comando XREADGRUPPO ti restituirà una cronologia dei messaggi con lo stato "in sospeso" che sono stati consegnati all'abbonato specificato (Alice) ma non sono stati ancora riconosciuti utilizzando il comando XACK.

Possiamo testare questo comportamento specificando immediatamente l'ID 0, senza l'opzione COUNT. Vedremo semplicemente un unico messaggio in sospeso, ovvero il messaggio Apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Tuttavia, se confermiamo che il messaggio è stato elaborato correttamente, non verrà più visualizzato:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Ora tocca a Bob leggere qualcosa:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, un membro del mio gruppo, ha chiesto non più di due messaggi. Il comando segnala solo i messaggi non consegnati a causa dell'identificatore speciale ">". Come puoi vedere, il messaggio "mela" non verrà visualizzato poiché è già stato consegnato ad Alice, quindi Bob riceve "arancia" e "fragola".

In questo modo, Alice, Bob e qualsiasi altro iscritto al gruppo possono leggere messaggi diversi dallo stesso flusso. Possono anche leggere la cronologia dei messaggi non elaborati o contrassegnare i messaggi come elaborati.

Ci sono alcune cose da tenere a mente:

  • Non appena l'abbonato considera il messaggio un comando XREADGRUPPO, questo messaggio passa allo stato "in sospeso" e viene assegnato a quello specifico abbonato. Gli altri iscritti al gruppo non saranno in grado di leggere questo messaggio.
  • Gli abbonati vengono creati automaticamente alla prima menzione, non è necessario crearli esplicitamente.
  • Con XREADGRUPPO puoi leggere messaggi da più thread diversi contemporaneamente, tuttavia affinché funzioni devi prima creare gruppi con lo stesso nome per ciascun thread utilizzando GRUPPO X

Recupero dopo un fallimento

L'abbonato può riprendersi dall'errore e rileggere il suo elenco di messaggi con lo stato "in sospeso". Tuttavia, nel mondo reale, gli abbonati potrebbero alla fine fallire. Cosa succede ai messaggi bloccati di un abbonato se l'abbonato non è in grado di riprendersi da un errore?
Consumer Group offre una funzionalità che viene utilizzata proprio in questi casi, quando è necessario modificare il proprietario dei messaggi.

La prima cosa che devi fare è chiamare il comando SPESA, che visualizza tutti i messaggi del gruppo con lo stato "in sospeso". Nella sua forma più semplice, il comando viene chiamato con solo due argomenti: il nome del thread e il nome del gruppo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Il team ha visualizzato il numero di messaggi non elaborati per l'intero gruppo e per ciascun iscritto. Abbiamo solo Bob con due messaggi in sospeso perché l'unico messaggio richiesto da Alice è stato confermato XACK.

Possiamo richiedere maggiori informazioni utilizzando più argomenti:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - intervallo di identificatori (puoi utilizzare "-" e "+")
{count}: numero di tentativi di consegna
{nome-consumatore}: nome del gruppo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Ora abbiamo i dettagli per ciascun messaggio: ID, nome dell'abbonato, tempo di inattività in millisecondi e infine il numero di tentativi di consegna. Abbiamo due messaggi da Bob e sono rimasti inattivi per 74170458 millisecondi, circa 20 ore.

Tieni presente che nessuno ci impedisce di verificare quale fosse il contenuto del messaggio semplicemente utilizzando XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Dobbiamo solo ripetere due volte lo stesso identificatore negli argomenti. Ora che ne abbiamo un'idea, Alice potrebbe decidere che dopo 20 ore di inattività, Bob probabilmente non si riprenderà ed è il momento di interrogare quei messaggi e riprendere l'elaborazione per Bob. Per questo usiamo il comando X RECLAMO:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Utilizzando questo comando possiamo ricevere un messaggio “estraneo” che non è stato ancora elaborato cambiando il proprietario in {consumer}. Tuttavia, possiamo anche fornire un tempo di inattività minimo {min-idle-time}. Ciò aiuta a evitare una situazione in cui due client tentano di modificare contemporaneamente il proprietario degli stessi messaggi:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Il primo cliente azzererà i tempi di inattività e aumenterà il contatore delle consegne. Quindi il secondo client non potrà richiederlo.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Il messaggio è stato rivendicato con successo da Alice, che ora può elaborarlo e riconoscerlo.

Dall'esempio sopra, puoi vedere che una richiesta andata a buon fine restituisce il contenuto del messaggio stesso. Tuttavia, ciò non è necessario. L'opzione JUSTID può essere utilizzata per restituire solo gli ID dei messaggi. Ciò è utile se non sei interessato ai dettagli del messaggio e desideri aumentare le prestazioni del sistema.

Contatore di consegna

Il contatore che vedi nell'output SPESA è il numero di consegne di ciascun messaggio. Tale contatore viene incrementato in due modi: quando un messaggio viene richiesto con successo tramite X RECLAMO o quando viene utilizzata una chiamata XREADGRUPPO.

È normale che alcuni messaggi vengano recapitati più volte. La cosa principale è che tutti i messaggi alla fine vengano elaborati. A volte si verificano problemi durante l'elaborazione di un messaggio perché il messaggio stesso è danneggiato oppure l'elaborazione del messaggio causa un errore nel codice del gestore. In questo caso, potrebbe risultare che nessuno sarà in grado di elaborare questo messaggio. Poiché disponiamo di un contatore dei tentativi di consegna, possiamo utilizzare questo contatore per rilevare tali situazioni. Pertanto, una volta che il numero di consegne raggiunge il numero elevato specificato, probabilmente sarebbe più saggio inserire tale messaggio in un altro thread e inviare una notifica all'amministratore di sistema.

Stato del thread

Squadra XINFO utilizzato per richiedere varie informazioni su un thread e i suoi gruppi. Ad esempio, un comando di base è simile al seguente:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Il comando precedente visualizza informazioni generali sul flusso specificato. Ora un esempio leggermente più complesso:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Il comando precedente visualizza informazioni generali per tutti i gruppi del thread specificato

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Il comando precedente visualizza le informazioni per tutti i sottoscrittori dello stream e del gruppo specificati.
Se dimentichi la sintassi del comando, chiedi aiuto al comando stesso:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limite dimensione flusso

Molte applicazioni non desiderano raccogliere dati in un flusso per sempre. Spesso è utile avere un numero massimo di messaggi consentiti per thread. In altri casi, è utile spostare tutti i messaggi da un thread a un altro archivio persistente quando viene raggiunta la dimensione del thread specificata. Puoi limitare la dimensione di un flusso utilizzando il parametro MAXLEN nel comando XAGGIUNGI:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Quando si utilizza MAXLEN, i vecchi record vengono automaticamente eliminati quando raggiungono una lunghezza specifica, quindi il flusso ha una dimensione costante. Tuttavia, l'eliminazione in questo caso non avviene nel modo più efficiente nella memoria Redis. Puoi migliorare la situazione come segue:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

L'argomento ~ nell'esempio sopra significa che non dobbiamo necessariamente limitare la lunghezza del flusso a un valore specifico. Nel nostro esempio, potrebbe essere qualsiasi numero maggiore o uguale a 1000 (ad esempio, 1000, 1010 o 1030). Abbiamo appena specificato esplicitamente che vogliamo che il nostro stream memorizzi almeno 1000 record. Ciò rende la gestione della memoria molto più efficiente all'interno di Redis.

C'è anche una squadra separata XTRIM, che fa la stessa cosa:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Archiviazione e replica persistenti

Redis Stream viene replicato in modo asincrono sui nodi slave e salvato in file come AOF (istantanea di tutti i dati) e RDB (registro di tutte le operazioni di scrittura). È supportata anche la replica dello stato dei gruppi di consumatori. Pertanto, se un messaggio è nello stato “in sospeso” sul nodo master, allora sui nodi slave questo messaggio avrà lo stesso stato.

Rimozione di singoli elementi da uno stream

C'è un comando speciale per eliminare i messaggi XDEL. Il comando ottiene il nome del thread seguito dagli ID dei messaggi da eliminare:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Quando si utilizza questo comando, è necessario tenere presente che la memoria effettiva non verrà rilasciata immediatamente.

Flussi di lunghezza zero

La differenza tra i flussi e le altre strutture dati Redis è che quando altre strutture dati non hanno più elementi al loro interno, come effetto collaterale, la struttura dati stessa verrà rimossa dalla memoria. Quindi, ad esempio, l'insieme ordinato verrà completamente rimosso quando la chiamata ZREM rimuove l'ultimo elemento. Invece, i thread possono rimanere in memoria anche senza avere elementi al loro interno.

conclusione

Redis Stream è ideale per creare broker di messaggi, code di messaggi, registrazione unificata e sistemi di chat con conservazione della cronologia.

Come ho detto una volta Nicola Wirth, i programmi sono algoritmi più strutture dati e Redis ti offre già entrambi.

Fonte: habr.com

Aggiungi un commento