Redis Stream: fiabilidade e escalabilidade dos teus sistemas de mensaxería

Redis Stream: fiabilidade e escalabilidade dos teus sistemas de mensaxería

Redis Stream é un novo tipo de datos abstractos introducido en Redis coa versión 5.0
Conceptualmente, Redis Stream é unha lista á que pode engadir entradas. Cada entrada ten un identificador único. Por defecto, o ID xérase automaticamente e inclúe unha marca de tempo. Polo tanto, pode consultar intervalos de rexistros ao longo do tempo ou recibir novos datos a medida que chegan ao fluxo, ao igual que o comando "tail -f" de Unix le un ficheiro de rexistro e conxélase mentres esperan novos datos. Teña en conta que varios clientes poden escoitar un fío ao mesmo tempo, así como moitos procesos "tail -f" poden ler un ficheiro ao mesmo tempo sen entrar en conflito entre si.

Para comprender todas as vantaxes do novo tipo de datos, imos dar unha rápida ollada ás estruturas de Redis que hai moito tempo que replican parcialmente a funcionalidade de Redis Stream.

Redis PUB/SUB

Redis Pub/Sub é un sistema de mensaxería sinxelo que xa está integrado na túa tenda de valores clave. Non obstante, a sinxeleza ten un custo:

  • Se o editor falla por algún motivo, entón perde todos os seus subscritores
  • O editor necesita coñecer o enderezo exacto de todos os seus subscritores
  • Un editor pode sobrecargar de traballo aos seus subscritores se os datos se publican máis rápido do que se procesan
  • A mensaxe elimínase do búfer do editor inmediatamente despois da publicación, independentemente de cantos subscritores se entregou e da rapidez con que puideron procesar esta mensaxe.
  • Todos os subscritores recibirán a mensaxe ao mesmo tempo. Os propios subscritores deben acordar dalgún xeito entre eles a orde de procesamento da mesma mensaxe.
  • Non hai ningún mecanismo integrado para confirmar que un subscritor procesou correctamente unha mensaxe. Se un subscritor recibe unha mensaxe e falla durante o procesamento, o editor non o saberá.

Lista Redis

Redis List é unha estrutura de datos que admite o bloqueo de comandos de lectura. Podes engadir e ler mensaxes desde o principio ou o final da lista. Con base nesta estrutura, podes facer unha boa pila ou cola para o teu sistema distribuído e, na maioría dos casos, será suficiente. Principais diferenzas con Redis Pub/Sub:

  • A mensaxe entrégase a un cliente. O primeiro cliente bloqueado en lectura recibirá os datos primeiro.
  • Clint debe iniciar a operación de lectura de cada mensaxe por si mesmo. Lista non sabe nada de clientes.
  • As mensaxes gárdanse ata que alguén as le ou as elimina de forma explícita. Se configuras o servidor Redis para descargar datos no disco, a fiabilidade do sistema aumenta drasticamente.

Introdución a Stream

Engadir unha entrada a un fluxo

Equipo XADD engade unha nova entrada ao fluxo. Un rexistro non é só unha cadea, está formado por un ou máis pares clave-valor. Así, cada entrada xa está estruturada e aseméllase á estrutura dun ficheiro CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

No exemplo anterior, engadimos dous campos ao fluxo co nome (chave) "mystream": "sensor-id" e "temperature" cos valores "1234" e "19.8", respectivamente. Como segundo argumento, o comando toma un identificador que se asignará á entrada; este identificador identifica de forma única cada entrada no fluxo. Non obstante, neste caso pasamos * porque queremos que Redis xere un novo ID para nós. Cada novo ID aumentará. Polo tanto, cada nova entrada terá un identificador superior en relación ás entradas anteriores.

Formato do identificador

O ID de entrada devolto polo comando XADD, consta de dúas partes:

{millisecondsTime}-{sequenceNumber}

milisegundosTempo — Tempo Unix en milisegundos (hora do servidor Redis). Non obstante, se a hora actual é igual ou inferior á hora da gravación anterior, utilízase a marca de tempo da gravación anterior. Polo tanto, se o tempo do servidor retrocede no tempo, o novo identificador aínda conservará a propiedade de incremento.

sequenceNumber usado para rexistros creados no mesmo milisegundo. sequenceNumber incrementarase en 1 con respecto á entrada anterior. Porque o sequenceNumber ten un tamaño de 64 bits, entón na práctica non deberías atopar un límite no número de rexistros que se poden xerar nun milisegundo.

O formato destes identificadores pode parecer estraño a primeira vista. Un lector desconfiado pode preguntarse por que o tempo forma parte do identificador. O motivo é que os fluxos de Redis admiten consultas de intervalo por ID. Dado que o identificador está asociado coa hora en que se creou o rexistro, isto fai posible consultar intervalos de tempo. Veremos un exemplo específico cando vexamos o comando XRANGE.

Se por algún motivo o usuario precisa especificar o seu propio identificador, que, por exemplo, está asociado a algún sistema externo, podemos pasalo ao comando XADD en lugar de * como se mostra a continuación:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Teña en conta que, neste caso, debe supervisar vostede mesmo o incremento do DNI. No noso exemplo, o identificador mínimo é "0-1", polo que o comando non aceptará outro identificador igual ou inferior a "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Número de rexistros por emisión

É posible obter o número de rexistros nun fluxo simplemente usando o comando XLEN. Para o noso exemplo, este comando devolverá o seguinte valor:

> XLEN somestream
(integer) 2

Consultas de intervalo: XRANGE e XREVRANGE

Para solicitar datos por intervalo, necesitamos especificar dous identificadores: o principio e o final do intervalo. O intervalo devolto incluirá todos os elementos, incluídos os límites. Tamén hai dous identificadores especiais "-" e "+", que significan respectivamente o identificador máis pequeno (primeiro rexistro) e maior (último rexistro) do fluxo. O seguinte exemplo enumerará todas as entradas do fluxo.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Cada rexistro devolto é unha matriz de dous elementos: un identificador e unha lista de pares clave-valor. Xa dixemos que os identificadores de rexistro están relacionados co tempo. Polo tanto, podemos solicitar un intervalo dun período de tempo específico. Non obstante, podemos especificar na solicitude non o identificador completo, senón só a hora de Unix, omitindo a parte relacionada con sequenceNumber. A parte omitida do identificador establecerase automaticamente en cero ao principio do intervalo e ao máximo valor posible ao final do intervalo. A continuación móstrase un exemplo de como pode solicitar un intervalo de dous milisegundos.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Só temos unha entrada neste intervalo, pero en conxuntos de datos reais o resultado devolto pode ser enorme. Por esta razón XRANGE admite a opción COUNT. Ao especificar a cantidade, podemos simplemente obter os primeiros N rexistros. Se necesitamos obter os seguintes N rexistros (paxinación), podemos usar o último ID recibido, aumentalo sequenceNumber por unha e pregunta de novo. Vexamos isto no seguinte exemplo. Comezamos a engadir 10 elementos con XADD (supoñendo que mystream xa estaba cheo de 10 elementos). Para comezar a iteración obtendo 2 elementos por comando, comezamos co rango completo pero con COUNT igual a 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Para seguir iterando cos dous elementos seguintes, necesitamos seleccionar o último ID recibido, é dicir, 1519073279157-0, e engadir 1 a sequenceNumber.
O ID resultante, neste caso 1519073279157-1, agora pódese usar como o novo argumento de inicio do intervalo para a seguinte chamada XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Etcétera. Porque a complexidade XRANGE é O(log(N)) para buscar e despois O(M) para devolver M elementos, entón cada paso de iteración é rápido. Así, utilizando XRANGE os fluxos pódense iterar de forma eficiente.

Equipo XREVRANGE é o equivalente XRANGE, pero devolve os elementos en orde inversa:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Teña en conta que o comando XREVRANGE toma os argumentos do intervalo de inicio e detención en orde inversa.

Lendo novas entradas usando XREAD

Moitas veces xorde a tarefa de subscribirse a un fluxo e recibir só mensaxes novas. Este concepto pode parecer similar ao Redis Pub/Sub ou ao bloqueo de Redis List, pero hai diferenzas fundamentais na forma de usar Redis Stream:

  1. Cada nova mensaxe entrégase a todos os subscritores por defecto. Este comportamento é diferente dunha Lista Redis de bloqueo, onde unha mensaxe nova só será lida por un subscritor.
  2. Mentres en Redis Pub/Sub todas as mensaxes son esquecidas e nunca persisten, en Stream todas as mensaxes consérvanse indefinidamente (a non ser que o cliente provoque explícitamente a eliminación).
  3. Redis Stream permítelle diferenciar o acceso ás mensaxes dentro dun fluxo. Un subscritor específico só pode ver o seu historial de mensaxes persoal.

Podes subscribirte a un fío e recibir novas mensaxes usando o comando XREAD. É un pouco máis complicado que XRANGE, entón comezaremos cos exemplos máis sinxelos primeiro.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

O exemplo anterior mostra un formulario non bloqueador XREAD. Teña en conta que a opción COUNT é opcional. De feito, a única opción de comando necesaria é a opción STREAMS, que especifica unha lista de fluxos xunto co identificador máximo correspondente. Escribimos "STREAMS mystream 0": queremos recibir todos os rexistros do mystream stream cun identificador superior a "0-0". Como podes ver no exemplo, o comando devolve o nome do fío porque podemos subscribirnos a varios fíos ao mesmo tempo. Poderíamos escribir, por exemplo, "STREAMS mystream otherstream 0 0". Teña en conta que despois da opción STREAMS necesitamos proporcionar primeiro os nomes de todos os fluxos necesarios e só despois unha lista de identificadores.

Nesta forma sinxela, o comando non fai nada especial en comparación con XRANGE. Non obstante, o interesante é que podemos virar facilmente XREAD a un comando de bloqueo, especificando o argumento BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

No exemplo anterior, especifícase unha nova opción BLOQUEO cun tempo de espera de 0 milisegundos (isto significa esperar indefinidamente). Ademais, en lugar de pasar o identificador habitual para o fluxo mystream, pasou un identificador especial $. Este identificador especial significa que XREAD debe usar o identificador máximo en mystream como identificador. Polo tanto, só recibiremos novas mensaxes a partir do momento en que comezamos a escoitar. Nalgúns aspectos, isto é semellante ao comando "tail -f" de Unix.

Teña en conta que ao usar a opción BLOQUEAR non necesitamos necesariamente utilizar o identificador especial $. Podemos usar calquera identificador existente no fluxo. Se o equipo pode atender a nosa solicitude de inmediato sen bloquear, farao, se non, bloquearase.

Bloqueo XREAD tamén pode escoitar varios fíos á vez, só precisa especificar os seus nomes. Neste caso, o comando devolverá un rexistro do primeiro fluxo que recibiu datos. O primeiro subscritor bloqueado para un fío determinado recibirá os datos primeiro.

Grupos de Consumidores

En determinadas tarefas, queremos limitar o acceso dos subscritores ás mensaxes dentro dun fío. Un exemplo no que isto podería ser útil é unha fila de mensaxes con traballadores que recibirán diferentes mensaxes dun fío, permitindo que o procesamento de mensaxes se escala.

Se imaxinamos que temos tres subscritores C1, C2, C3 e un fío que contén as mensaxes 1, 2, 3, 4, 5, 6, 7, entón as mensaxes serán servidas como no diagrama seguinte:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Para conseguir este efecto, Redis Stream utiliza un concepto chamado Consumer Group. Este concepto é similar a un pseudo-abonado, que recibe datos dun fluxo, pero en realidade é atendido por varios subscritores dentro dun grupo, o que ofrece certas garantías:

  1. Cada mensaxe envíase a un subscritor diferente dentro do grupo.
  2. Dentro dun grupo, os subscritores identifícanse polo seu nome, que é unha cadea que distingue entre maiúsculas e minúsculas. Se un subscritor abandona temporalmente o grupo, pódese restaurar no grupo usando o seu propio nome único.
  3. Cada grupo de consumidores segue o concepto de "primeira mensaxe sen ler". Cando un subscritor solicita novas mensaxes, só pode recibir mensaxes que nunca foron entregadas previamente a ningún abonado do grupo.
  4. Hai un comando para confirmar explícitamente que a mensaxe foi procesada correctamente polo subscritor. Ata que se chame a este comando, a mensaxe solicitada permanecerá no estado "pendente".
  5. Dentro do Grupo de Consumidores, cada abonado pode solicitar un historial das mensaxes que lle foron entregadas, pero que aínda non foron procesadas (no estado "pendente").

En certo sentido, o estado do grupo pódese expresar do seguinte xeito:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Agora é hora de familiarizarse cos principais comandos para o Grupo de Consumidores, a saber:

  • GRUPO X usado para crear, destruír e xestionar grupos
  • XREADGROUP úsase para ler o fluxo a través do grupo
  • XACK - este comando permítelle ao subscritor marcar a mensaxe como procesada correctamente

Creación do Grupo de Consumidores

Supoñamos que mystream xa existe. A continuación, o comando de creación do grupo terá o seguinte aspecto:

> XGROUP CREATE mystream mygroup $
OK

Ao crear un grupo, debemos pasar un identificador, a partir do cal o grupo recibirá mensaxes. Se só queremos recibir todas as mensaxes novas, entón podemos usar o identificador especial $ (como no noso exemplo anterior). Se especificas 0 en lugar dun identificador especial, todas as mensaxes do fío estarán dispoñibles para o grupo.

Agora que o grupo está creado, podemos comezar inmediatamente a ler mensaxes usando o comando XREADGROUP. Este comando é moi semellante a XREAD e admite a opción BLOQUEAR opcional. Non obstante, hai unha opción GROUP obrigatoria que sempre debe especificarse con dous argumentos: o nome do grupo e o nome do abonado. Tamén se admite a opción COUNT.

Antes de ler o fío, poñamos alí algunhas mensaxes:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Agora imos tentar ler este fluxo a través do grupo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

O comando anterior di textualmente do seguinte xeito:

"Eu, a subscritora Alice, membro do meu grupo, quero ler unha mensaxe de mystream que nunca antes se entregara a ninguén".

Cada vez que un abonado realiza unha operación nun grupo, debe proporcionar o seu nome, identificándose de forma única dentro do grupo. Hai un detalle moi importante máis no comando anterior: o identificador especial ">". Este identificador especial filtra as mensaxes, deixando só as que nunca se entregaron antes.

Ademais, en casos especiais, pode especificar un identificador real como 0 ou calquera outro identificador válido. Neste caso o comando XREADGROUP devolverache un historial de mensaxes cun estado "pendente" que foron entregadas ao subscritor especificado (Alice) pero que aínda non se recoñeceron mediante o comando XACK.

Podemos probar este comportamento especificando inmediatamente o ID 0, sen a opción COUNT. Simplemente veremos unha única mensaxe pendente, é dicir, a mensaxe da mazá:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Non obstante, se confirmamos que a mensaxe se procesou correctamente, xa non se mostrará:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Agora tócalle a Bob ler algo:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, membro do meu grupo, non pediu máis de dúas mensaxes. O comando só informa das mensaxes non entregadas debido ao identificador especial ">". Como podes ver, a mensaxe "mazá" non se mostrará xa que xa foi entregada a Alicia, polo que Bob recibe "laranxa" e "amorodo".

Deste xeito, Alice, Bob e calquera outro subscritor do grupo poden ler diferentes mensaxes do mesmo fluxo. Tamén poden ler o seu historial de mensaxes non procesadas ou marcar mensaxes como procesadas.

Hai algunhas cousas a ter en conta:

  • Tan pronto como o subscritor considere que a mensaxe é un comando XREADGROUP, esta mensaxe pasa ao estado "pendente" e asígnase a ese subscritor específico. Outros subscritores do grupo non poderán ler esta mensaxe.
  • Os subscritores créanse automaticamente na primeira mención, non é necesario crealos de forma explícita.
  • Con XREADGROUP podes ler mensaxes de varios fíos diferentes ao mesmo tempo, pero para que isto funcione, primeiro debes crear grupos co mesmo nome para cada fío usando GRUPO X

Recuperación despois dun fallo

O subscritor pode recuperarse do fallo e volver ler a súa lista de mensaxes co estado "pendente". Non obstante, no mundo real, os subscritores poden fallar. Que ocorre coas mensaxes atascadas dun subscritor se o subscritor non pode recuperarse dun fallo?
Consumer Group ofrece unha función que se usa só para estes casos, cando precisa cambiar o propietario das mensaxes.

O primeiro que cómpre facer é chamar ao comando EXPENDENTE, que mostra todas as mensaxes do grupo co estado "pendente". Na súa forma máis sinxela, o comando chámase con só dous argumentos: o nome do fío e o nome do grupo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

O equipo mostrou o número de mensaxes sen procesar para todo o grupo e para cada abonado. Só temos a Bob con dúas mensaxes pendentes porque a única mensaxe que pediu Alice foi confirmada XACK.

Podemos solicitar máis información usando máis argumentos:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - intervalo de identificadores (podes usar “-” e “+”)
{count} — número de intentos de entrega
{nome-consumidor} - nome do grupo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Agora temos detalles para cada mensaxe: ID, nome do subscritor, tempo de inactividade en milisegundos e finalmente o número de intentos de entrega. Temos dúas mensaxes de Bob e estiveron inactivas durante 74170458 milisegundos, unhas 20 horas.

Teña en conta que ninguén nos impide comprobar cal era o contido da mensaxe simplemente usando XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Só temos que repetir o mesmo identificador dúas veces nos argumentos. Agora que temos algunha idea, Alice pode decidir que despois de 20 horas de inactividade, Bob probablemente non se recupere, e é hora de consultar esas mensaxes e retomar a procesalas para Bob. Para iso usamos o comando XRECLAMACIÓN:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Usando este comando, podemos recibir unha mensaxe "estranxeira" que aínda non foi procesada cambiando o propietario a {consumer}. Non obstante, tamén podemos proporcionar un tempo mínimo de inactividade {min-idle-time}. Isto axuda a evitar unha situación na que dous clientes intenten cambiar simultaneamente o propietario das mesmas mensaxes:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

O primeiro cliente restablecerá o tempo de inactividade e aumentará o contador de entrega. Polo tanto, o segundo cliente non poderá solicitalo.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

A mensaxe foi reclamada con éxito por Alice, que agora pode procesala e recoñecela.

No exemplo anterior, podes ver que unha solicitude exitosa devolve o contido da propia mensaxe. Non obstante, isto non é necesario. A opción JUSTID só se pode usar para devolver ID de mensaxes. Isto é útil se non estás interesado nos detalles da mensaxe e queres aumentar o rendemento do sistema.

Contador de reparto

O contador que ves na saída EXPENDENTE é o número de entregas de cada mensaxe. Tal contador increméntase de dúas formas: cando se solicita unha mensaxe con éxito mediante XRECLAMACIÓN ou cando se utiliza unha chamada XREADGROUP.

É normal que algunhas mensaxes se entreguen varias veces. O principal é que todas as mensaxes sexan finalmente procesadas. Ás veces ocorren problemas ao procesar unha mensaxe porque a propia mensaxe está corrompida ou o procesamento da mensaxe provoca un erro no código do controlador. Neste caso, pode resultar que ninguén poderá procesar esta mensaxe. Dado que temos un contador de intentos de entrega, podemos utilizar este contador para detectar tales situacións. Polo tanto, unha vez que o número de entregas alcance o número alto que especificas, probablemente sería máis prudente poñer esa mensaxe noutro fío e enviar unha notificación ao administrador do sistema.

Estado do fío

Equipo XINFO úsase para solicitar información sobre un fío e os seus grupos. Por exemplo, un comando básico ten o seguinte aspecto:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

O comando anterior mostra información xeral sobre o fluxo especificado. Agora un exemplo un pouco máis complexo:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

O comando anterior mostra información xeral para todos os grupos do fío especificado

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

O comando anterior mostra información para todos os subscritores do fluxo e do grupo especificados.
Se esqueces a sintaxe do comando, só tes que pedir axuda ao propio comando:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Límite de tamaño do fluxo

Moitas aplicacións non queren recoller datos nun fluxo para sempre. Moitas veces é útil ter un número máximo de mensaxes permitidas por fío. Noutros casos, é útil mover todas as mensaxes dun fío a outro almacén persistente cando se alcanza o tamaño do fío especificado. Podes limitar o tamaño dun fluxo usando o parámetro MAXLEN do comando XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Ao usar MAXLEN, os rexistros antigos elimínanse automaticamente cando alcanzan unha lonxitude especificada, polo que o fluxo ten un tamaño constante. Non obstante, a poda neste caso non se produce da forma máis eficiente na memoria Redis. Podes mellorar a situación do seguinte xeito:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

O argumento ~ no exemplo anterior significa que non necesariamente necesitamos limitar a lonxitude do fluxo a un valor específico. No noso exemplo, este pode ser calquera número maior ou igual a 1000 (por exemplo, 1000, 1010 ou 1030). Acabamos de especificar explícitamente que queremos que o noso fluxo almacene polo menos 1000 rexistros. Isto fai que a xestión da memoria sexa moito máis eficiente dentro de Redis.

Tamén hai un equipo separado XTRIM, que fai o mesmo:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Almacenamento e replicación persistentes

Redis Stream replícase de forma asíncrona nos nodos escravos e gárdase en ficheiros como AOF (instantánea de todos os datos) e RDB (rexistro de todas as operacións de escritura). Tamén se admite a replicación do estado dos grupos de consumidores. Polo tanto, se unha mensaxe está no estado "pendente" no nodo mestre, entón nos nodos escravos esta mensaxe terá o mesmo estado.

Eliminar elementos individuais dun fluxo

Hai un comando especial para eliminar mensaxes XDEL. O comando recibe o nome do fío seguido dos ID da mensaxe que se queren eliminar:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Ao usar este comando, cómpre ter en conta que a memoria real non se liberará inmediatamente.

Fluxos de lonxitude cero

A diferenza entre fluxos e outras estruturas de datos de Redis é que cando outras estruturas de datos xa non teñen elementos dentro delas, como efecto secundario, a propia estrutura de datos eliminarase da memoria. Así, por exemplo, o conxunto ordenado eliminarase completamente cando a chamada ZREM elimine o último elemento. Pola contra, os fíos poden permanecer na memoria aínda que non teñan ningún elemento dentro.

Conclusión

Redis Stream é ideal para crear intermediarios de mensaxes, colas de mensaxes, rexistro unificado e sistemas de chat para manter o historial.

Como dixen unha vez Niklaus Wirth, os programas son algoritmos máis estruturas de datos, e Redis xa che ofrece os dous.

Fonte: www.habr.com

Engadir un comentario