Redis Stream: confiabilidad y escalabilidad de sus sistemas de mensajería

Redis Stream: confiabilidad y escalabilidad de sus sistemas de mensajería

Redis Stream es un nuevo tipo de datos abstracto introducido en Redis con la versión 5.0
Conceptualmente, Redis Stream es una lista a la que puede agregar entradas. Cada entrada tiene un identificador único. De forma predeterminada, la identificación se genera automáticamente e incluye una marca de tiempo. Por lo tanto, puede consultar rangos de registros a lo largo del tiempo o recibir nuevos datos a medida que llegan a la secuencia, de forma muy parecida al comando "tail -f" de Unix que lee un archivo de registro y se congela mientras espera nuevos datos. Tenga en cuenta que varios clientes pueden escuchar un hilo al mismo tiempo, del mismo modo que muchos procesos "tail -f" pueden leer un archivo simultáneamente sin entrar en conflicto entre sí.

Para comprender todos los beneficios del nuevo tipo de datos, echemos un vistazo rápido a las estructuras de Redis existentes desde hace mucho tiempo que replican parcialmente la funcionalidad de Redis Stream.

Redis PUB / SUB

Redis Pub/Sub es un sistema de mensajería simple que ya está integrado en su almacén de clave-valor. Sin embargo, la simplicidad tiene un precio:

  • Si el editor por alguna razón falla, pierde todos sus suscriptores.
  • El editor necesita saber la dirección exacta de todos sus suscriptores.
  • Un editor puede sobrecargar de trabajo a sus suscriptores si los datos se publican más rápido de lo que se procesan.
  • El mensaje se elimina del búfer del editor inmediatamente después de su publicación, independientemente de a cuántos suscriptores se entregó y de la rapidez con la que pudieron procesar este mensaje.
  • Todos los suscriptores recibirán el mensaje al mismo tiempo. Los propios suscriptores deben ponerse de acuerdo de alguna manera sobre el orden de procesamiento del mismo mensaje.
  • No existe ningún mecanismo integrado para confirmar que un suscriptor haya procesado correctamente un mensaje. Si un suscriptor recibe un mensaje y falla durante el procesamiento, el editor no lo sabrá.

Lista Redis

Redis List es una estructura de datos que admite el bloqueo de comandos de lectura. Puede agregar y leer mensajes desde el principio o el final de la lista. Con base en esta estructura, puede crear una buena pila o cola para su sistema distribuido y, en la mayoría de los casos, esto será suficiente. Principales diferencias con Redis Pub/Sub:

  • El mensaje se entrega a un cliente. El primer cliente bloqueado por lectura recibirá los datos primero.
  • Clint debe iniciar él mismo la operación de lectura de cada mensaje. List no sabe nada sobre los clientes.
  • Los mensajes se almacenan hasta que alguien los lee o los elimina explícitamente. Si configura el servidor Redis para descargar datos en el disco, la confiabilidad del sistema aumenta dramáticamente.

Introducción a la transmisión

Agregar una entrada a una secuencia

Equipo XAGREGAR agrega una nueva entrada a la secuencia. Un registro no es sólo una cadena, sino que consta de uno o más pares clave-valor. Así, cada entrada ya está estructurada y se asemeja a la estructura de un archivo CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

En el ejemplo anterior, agregamos dos campos a la transmisión con el nombre (clave) "mystream": "sensor-id" y "temperatura" con los valores "1234" y "19.8", respectivamente. Como segundo argumento, el comando toma un identificador que se asignará a la entrada; este identificador identifica de forma única cada entrada en la secuencia. Sin embargo, en este caso pasamos * porque queremos que Redis genere una nueva identificación para nosotros. Cada nueva identificación aumentará. Por tanto, cada nueva entrada tendrá un identificador superior en relación a las entradas anteriores.

Formato de identificador

El ID de entrada devuelto por el comando. XAGREGAR, consta de dos partes:

{millisecondsTime}-{sequenceNumber}

milisegundosTiempo — Hora Unix en milisegundos (hora del servidor Redis). Sin embargo, si la hora actual es igual o menor que la hora de la grabación anterior, entonces se utiliza la marca de tiempo de la grabación anterior. Por lo tanto, si la hora del servidor retrocede en el tiempo, el nuevo identificador aún conservará la propiedad de incremento.

secuencia de números Se utiliza para registros creados en el mismo milisegundo. secuencia de números se incrementará en 1 respecto a la entrada anterior. Porque el secuencia de números tiene un tamaño de 64 bits, entonces, en la práctica, no debería encontrarse con un límite en la cantidad de registros que se pueden generar en un milisegundo.

El formato de estos identificadores puede parecer extraño a primera vista. Un lector desconfiado podría preguntarse por qué el tiempo forma parte del identificador. La razón es que las transmisiones de Redis admiten consultas de rango por ID. Dado que el identificador está asociado a la hora de creación del registro, esto permite consultar rangos de tiempo. Veremos un ejemplo específico cuando veamos el comando RANGO X.

Si por alguna razón el usuario necesita especificar su propio identificador, que, por ejemplo, está asociado con algún sistema externo, entonces podemos pasarlo al comando XAGREGAR en lugar de * como se muestra a continuación:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Tenga en cuenta que en este caso usted mismo debe controlar el incremento de ID. En nuestro ejemplo, el identificador mínimo es "0-1", por lo que el comando no aceptará otro identificador que sea igual o menor que "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Número de registros por secuencia

Es posible obtener la cantidad de registros en una secuencia simplemente usando el comando XLEN. Para nuestro ejemplo, este comando devolverá el siguiente valor:

> XLEN somestream
(integer) 2

Consultas de rango: XRANGE y XREVRANGE

Para solicitar datos por rango, debemos especificar dos identificadores: el principio y el final del rango. El rango devuelto incluirá todos los elementos, incluidos los límites. También hay dos identificadores especiales "-" y "+", que significan respectivamente el identificador más pequeño (primer registro) y más grande (último registro) de la secuencia. El siguiente ejemplo enumerará todas las entradas de la transmisión.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Cada registro devuelto es una matriz de dos elementos: un identificador y una lista de pares clave-valor. Ya dijimos que los identificadores de registros están relacionados con el tiempo. Por tanto, podemos solicitar un rango de un periodo de tiempo concreto. Sin embargo, podemos especificar en la solicitud no el identificador completo, sino solo la hora Unix, omitiendo la parte relacionada con secuencia de números. La parte omitida del identificador se pondrá automáticamente a cero al principio del rango y al valor máximo posible al final del rango. A continuación se muestra un ejemplo de cómo puede solicitar un rango de dos milisegundos.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Solo tenemos una entrada en este rango; sin embargo, en conjuntos de datos reales el resultado obtenido puede ser enorme. Por esta razón RANGO X admite la opción CONTAR. Al especificar la cantidad, simplemente podemos obtener los primeros N registros. Si necesitamos obtener los siguientes N registros (paginación), podemos usar el último ID recibido y aumentarlo. secuencia de números por uno y preguntar de nuevo. Veamos esto en el siguiente ejemplo. Empezamos sumando 10 elementos con XAGREGAR (suponiendo que mystream ya estuviera lleno con 10 elementos). Para comenzar la iteración obteniendo 2 elementos por comando, comenzamos con el rango completo pero con COUNT igual a 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Para continuar iterando con los siguientes dos elementos, debemos seleccionar el último ID recibido, es decir, 1519073279157-0, y agregar 1 a secuencia de números.
El ID resultante, en este caso 1519073279157-1, ahora se puede utilizar como nuevo argumento de inicio de rango para la siguiente llamada. RANGO X:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Etcétera. Porque la complejidad RANGO X es O(log(N)) para buscar y luego O(M) para devolver M elementos, entonces cada paso de iteración es rápido. Así, utilizando RANGO X Los flujos se pueden iterar de manera eficiente.

Equipo XREVRANGO es el equivalente RANGO X, pero devuelve los elementos en orden inverso:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Tenga en cuenta que el comando XREVRANGO toma los argumentos de rango para iniciar y detener en orden inverso.

Leer nuevas entradas usando XREAD

A menudo surge la tarea de suscribirse a una transmisión y recibir solo mensajes nuevos. Este concepto puede parecer similar a Redis Pub/Sub o bloquear Redis List, pero existen diferencias fundamentales en cómo usar Redis Stream:

  1. Cada mensaje nuevo se entrega a cada suscriptor de forma predeterminada. Este comportamiento es diferente de una Lista de Redis de bloqueo, donde solo un suscriptor leerá un mensaje nuevo.
  2. Mientras que en Redis Pub/Sub todos los mensajes se olvidan y nunca persisten, en Stream todos los mensajes se retienen indefinidamente (a menos que el cliente provoque explícitamente su eliminación).
  3. Redis Stream le permite diferenciar el acceso a los mensajes dentro de una secuencia. Un suscriptor específico sólo puede ver su historial de mensajes personales.

Puedes suscribirte a un hilo y recibir nuevos mensajes usando el comando XLEER. Es un poco más complicado que RANGO X, así que comenzaremos primero con los ejemplos más simples.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

El ejemplo anterior muestra un formulario sin bloqueo. XLEER. Tenga en cuenta que la opción CONTAR es opcional. De hecho, la única opción de comando requerida es la opción STREAMS, que especifica una lista de transmisiones junto con el identificador máximo correspondiente. Escribimos "STREAMS mystream 0" - queremos recibir todos los registros de mystream stream con un identificador mayor que "0-0". Como puede ver en el ejemplo, el comando devuelve el nombre del hilo porque podemos suscribirnos a varios hilos al mismo tiempo. Podríamos escribir, por ejemplo, "STREAMS mystream otherstream 0 0". Tenga en cuenta que después de la opción STREAMS debemos proporcionar primero los nombres de todas las transmisiones requeridas y solo luego una lista de identificadores.

En esta forma simple, el comando no hace nada especial en comparación con RANGO X. Sin embargo, lo interesante es que podemos convertir fácilmente XLEER a un comando de bloqueo, especificando el argumento BLOQUE:

> XREAD BLOCK 0 STREAMS mystream $

En el ejemplo anterior, se especifica una nueva opción BLOQUE con un tiempo de espera de 0 milisegundos (esto significa esperar indefinidamente). Además, en lugar de pasar el identificador habitual para la transmisión mystream, se pasó un identificador especial $. Este identificador especial significa que XLEER debe utilizar el identificador máximo en mystream como identificador. Por lo tanto, solo recibiremos mensajes nuevos a partir del momento en que comenzamos a escuchar. En cierto modo, esto es similar al comando "tail -f" de Unix.

Tenga en cuenta que cuando usamos la opción BLOQUEAR no necesariamente necesitamos usar el identificador especial $. Podemos utilizar cualquier identificador existente en la secuencia. Si el equipo puede atender nuestra solicitud inmediatamente sin bloquear, lo hará; de lo contrario, se bloqueará.

Bloqueo XLEER También puedes escuchar varios hilos a la vez, solo necesitas especificar sus nombres. En este caso, el comando devolverá un registro del primer flujo que recibió datos. El primer suscriptor bloqueado para un hilo determinado recibirá los datos primero.

Grupos de consumidores

En determinadas tareas, queremos limitar el acceso de los suscriptores a los mensajes dentro de un hilo. Un ejemplo en el que esto podría resultar útil es una cola de mensajes con trabajadores que recibirán diferentes mensajes de un hilo, lo que permitirá escalar el procesamiento de mensajes.

Si imaginamos que tenemos tres suscriptores C1, C2, C3 y un hilo que contiene los mensajes 1, 2, 3, 4, 5, 6, 7, entonces los mensajes se entregarán como en el siguiente diagrama:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Para lograr este efecto, Redis Stream utiliza un concepto llamado Consumer Group. Este concepto es similar a un pseudo-suscriptor, que recibe datos de una transmisión, pero en realidad es atendido por múltiples suscriptores dentro de un grupo, lo que brinda ciertas garantías:

  1. Cada mensaje se entrega a un suscriptor diferente dentro del grupo.
  2. Dentro de un grupo, los suscriptores se identifican por su nombre, que es una cadena que distingue entre mayúsculas y minúsculas. Si un suscriptor sale temporalmente del grupo, se le puede restaurar al grupo usando su propio nombre exclusivo.
  3. Cada grupo de consumidores sigue el concepto de "primer mensaje no leído". Cuando un suscriptor solicita mensajes nuevos, solo puede recibir mensajes que nunca hayan sido entregados previamente a ningún suscriptor dentro del grupo.
  4. Hay un comando para confirmar explícitamente que el suscriptor procesó exitosamente el mensaje. Hasta que se llame a este comando, el mensaje solicitado permanecerá en el estado "pendiente".
  5. Dentro del Grupo de Consumidores, cada suscriptor puede solicitar un historial de los mensajes que le fueron entregados, pero que aún no han sido procesados ​​(en el estado “pendiente”)

En cierto sentido, el estado del grupo se puede expresar de la siguiente manera:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Ahora es el momento de familiarizarse con los comandos principales del Grupo de Consumidores, a saber:

  • GRUPO X Se utiliza para crear, destruir y gestionar grupos.
  • GRUPO XREAD utilizado para leer la transmisión a través del grupo
  • XACK - este comando permite al suscriptor marcar el mensaje como procesado exitosamente

Creación de grupo de consumidores

Supongamos que mystream ya existe. Entonces el comando de creación de grupo se verá así:

> XGROUP CREATE mystream mygroup $
OK

Al crear un grupo, debemos pasar un identificador, a partir del cual el grupo recibirá mensajes. Si solo queremos recibir todos los mensajes nuevos, podemos usar el identificador especial $ (como en nuestro ejemplo anterior). Si especifica 0 en lugar de un identificador especial, todos los mensajes del hilo estarán disponibles para el grupo.

Ahora que el grupo está creado, podemos comenzar a leer mensajes inmediatamente usando el comando GRUPO XREAD. Este comando es muy similar a XLEER y admite la opción BLOQUE opcional. Sin embargo, existe una opción GRUPO requerida que siempre debe especificarse con dos argumentos: el nombre del grupo y el nombre del suscriptor. También se admite la opción CONTAR.

Antes de leer el hilo, pongamos algunos mensajes allí:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Ahora intentemos leer esta transmisión a través del grupo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

El comando anterior se lee textualmente de la siguiente manera:

"Yo, la suscriptora Alice, miembro de mygroup, quiero leer un mensaje de mystream que nunca antes se haya entregado a nadie".

Cada vez que un suscriptor realiza una operación en un grupo, debe proporcionar su nombre, identificándose de forma única dentro del grupo. Hay otro detalle muy importante en el comando anterior: el identificador especial ">". Este identificador especial filtra los mensajes y deja solo aquellos que nunca antes se han entregado.

Además, en casos especiales, puede especificar un identificador real como 0 o cualquier otro identificador válido. En este caso el comando GRUPO XREAD le devolverá un historial de mensajes con estado "pendiente" que se entregaron al suscriptor especificado (Alice) pero que aún no han sido reconocidos mediante el comando XACK.

Podemos probar este comportamiento especificando inmediatamente el ID 0, sin la opción COUNT. Simplemente veremos un único mensaje pendiente, es decir, el mensaje de la manzana:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Sin embargo, si confirmamos que el mensaje se procesó correctamente, ya no se mostrará:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Ahora es el turno de Bob de leer algo:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, miembro de mygroup, no pidió más de dos mensajes. El comando sólo informa mensajes no entregados debido al identificador especial ">". Como puede ver, el mensaje "manzana" no se mostrará porque ya se le entregó a Alice, por lo que Bob recibe "naranja" y "fresa".

De esta forma, Alice, Bob y cualquier otro suscriptor del grupo pueden leer diferentes mensajes de la misma transmisión. También pueden leer su historial de mensajes no procesados ​​o marcar mensajes como procesados.

Hay algunas cosas a tener en cuenta:

  • Tan pronto como el suscriptor considere que el mensaje es una orden GRUPO XREAD, este mensaje pasa al estado "pendiente" y se asigna a ese suscriptor específico. Otros suscriptores del grupo no podrán leer este mensaje.
  • Los suscriptores se crean automáticamente tras la primera mención, no es necesario crearlos explícitamente.
  • Con GRUPO XREAD Puedes leer mensajes de varios hilos diferentes al mismo tiempo; sin embargo, para que esto funcione, primero debes crear grupos con el mismo nombre para cada hilo usando GRUPO X

Recuperación después de un fracaso

El suscriptor puede recuperarse del fallo y volver a leer su lista de mensajes con el estado "pendiente". Sin embargo, en el mundo real, los suscriptores pueden terminar fallando. ¿Qué sucede con los mensajes bloqueados de un suscriptor si el suscriptor no puede recuperarse de una falla?
Consumer Group ofrece una función que se utiliza precisamente en estos casos: cuando es necesario cambiar el propietario de los mensajes.

Lo primero que debes hacer es llamar al comando. XPENDER, que muestra todos los mensajes del grupo con el estado "pendiente". En su forma más simple, el comando se llama con solo dos argumentos: el nombre del hilo y el nombre del grupo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

El equipo mostró la cantidad de mensajes sin procesar para todo el grupo y para cada suscriptor. Sólo tenemos a Bob con dos mensajes pendientes porque el único mensaje que Alice solicitó fue confirmado con XACK.

Podemos solicitar más información utilizando más argumentos:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id}: rango de identificadores (puede usar “-” y “+”)
{count}: número de intentos de entrega
{nombre-consumidor} - nombre del grupo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Ahora tenemos detalles de cada mensaje: ID, nombre del suscriptor, tiempo de inactividad en milisegundos y finalmente el número de intentos de entrega. Tenemos dos mensajes de Bob y han estado inactivos durante 74170458 milisegundos, unas 20 horas.

Tenga en cuenta que nadie nos impide comprobar cuál era el contenido del mensaje simplemente utilizando RANGO X.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Sólo tenemos que repetir dos veces el mismo identificador en los argumentos. Ahora que tenemos una idea, Alice podría decidir que después de 20 horas de inactividad, Bob probablemente no se recuperará, y es hora de consultar esos mensajes y continuar procesándolos para Bob. Para esto usamos el comando XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Usando este comando, podemos recibir un mensaje "extranjero" que aún no ha sido procesado cambiando el propietario a {consumidor}. Sin embargo, también podemos proporcionar un tiempo de inactividad mínimo {min-idle-time}. Esto ayuda a evitar una situación en la que dos clientes intenten cambiar simultáneamente el propietario de los mismos mensajes:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

El primer cliente restablecerá el tiempo de inactividad y aumentará el contador de entrega. Entonces el segundo cliente no podrá solicitarlo.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Alice reclamó el mensaje con éxito y ahora puede procesarlo y reconocerlo.

En el ejemplo anterior, puede ver que una solicitud exitosa devuelve el contenido del mensaje. Sin embargo, esto no es necesario. La opción JUSTID se puede utilizar para devolver ID de mensajes únicamente. Esto es útil si no está interesado en los detalles del mensaje y desea aumentar el rendimiento del sistema.

Mostrador de entrega

El contador que ves en la salida. XPENDER es el número de entregas de cada mensaje. Dicho contador se incrementa de dos maneras: cuando un mensaje se solicita con éxito a través de XCLAIM o cuando se utiliza una llamada GRUPO XREAD.

Es normal que algunos mensajes se entreguen varias veces. Lo principal es que todos los mensajes finalmente se procesan. A veces se producen problemas al procesar un mensaje porque el mensaje en sí está dañado o el procesamiento del mensaje provoca un error en el código del controlador. En este caso, puede resultar que nadie pueda procesar este mensaje. Como tenemos un contador de intentos de entrega, podemos utilizar este contador para detectar este tipo de situaciones. Por lo tanto, una vez que el recuento de entregas alcance el número alto que usted especifica, probablemente sería más prudente colocar dicho mensaje en otro hilo y enviar una notificación al administrador del sistema.

Estado del hilo

Equipo XINFO Se utiliza para solicitar información diversa sobre un hilo y sus grupos. Por ejemplo, un comando básico se ve así:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

El comando anterior muestra información general sobre la secuencia especificada. Ahora un ejemplo un poco más complejo:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

El comando anterior muestra información general para todos los grupos del hilo especificado.

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

El comando anterior muestra información de todos los suscriptores de la transmisión y el grupo especificados.
Si olvida la sintaxis del comando, simplemente pida ayuda al comando:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Límite de tamaño de transmisión

Muchas aplicaciones no quieren recopilar datos en una secuencia para siempre. A menudo resulta útil tener un número máximo de mensajes permitidos por hilo. En otros casos, resulta útil mover todos los mensajes de un hilo a otro almacén persistente cuando se alcanza el tamaño del hilo especificado. Puede limitar el tamaño de una secuencia utilizando el parámetro MAXLEN en el comando XAGREGAR:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Cuando se utiliza MAXLEN, los registros antiguos se eliminan automáticamente cuando alcanzan una longitud específica, por lo que la secuencia tiene un tamaño constante. Sin embargo, la poda en este caso no se produce de la forma más eficiente en la memoria de Redis. Puedes mejorar la situación de la siguiente manera:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

El argumento ~ en el ejemplo anterior significa que no necesariamente necesitamos limitar la longitud de la transmisión a un valor específico. En nuestro ejemplo, podría ser cualquier número mayor o igual a 1000 (por ejemplo, 1000, 1010 o 1030). Simplemente especificamos explícitamente que queremos que nuestra transmisión almacene al menos 1000 registros. Esto hace que la gestión de la memoria sea mucho más eficiente dentro de Redis.

También hay un equipo separado. XTRIM, que hace lo mismo:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Almacenamiento y replicación persistentes

Redis Stream se replica de forma asíncrona en nodos esclavos y se guarda en archivos como AOF (instantánea de todos los datos) y RDB (registro de todas las operaciones de escritura). También se admite la replicación del estado de grupos de consumidores. Por lo tanto, si un mensaje está en estado "pendiente" en el nodo maestro, entonces en los nodos esclavos este mensaje tendrá el mismo estado.

Eliminar elementos individuales de una secuencia

Hay un comando especial para borrar mensajes. XDEL. El comando obtiene el nombre del hilo seguido de los ID de los mensajes que se eliminarán:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Al utilizar este comando, debe tener en cuenta que la memoria real no se liberará inmediatamente.

Flujos de longitud cero

La diferencia entre los flujos y otras estructuras de datos de Redis es que cuando otras estructuras de datos ya no tienen elementos dentro de ellas, como efecto secundario, la estructura de datos en sí se eliminará de la memoria. Entonces, por ejemplo, el conjunto ordenado se eliminará por completo cuando la llamada ZREM elimine el último elemento. En cambio, se permite que los subprocesos permanezcan en la memoria incluso sin tener ningún elemento en su interior.

Conclusión

Redis Stream es ideal para crear intermediarios de mensajes, colas de mensajes, registros unificados y sistemas de chat para mantener el historial.

Como dije una vez Niklaus Wirth, los programas son algoritmos más estructuras de datos, y Redis ya te ofrece ambos.

Fuente: habr.com

Añadir un comentario