Redis Stream – confiabilidade e escalabilidade de seus sistemas de mensagens

Redis Stream – confiabilidade e escalabilidade de seus sistemas de mensagens

Redis Stream é um novo tipo de dados abstrato introduzido no Redis na versão 5.0
Conceitualmente, Redis Stream é uma lista à qual você pode adicionar entradas. Cada entrada possui um identificador exclusivo. Por padrão, o ID é gerado automaticamente e inclui um carimbo de data/hora. Portanto, você pode consultar intervalos de registros ao longo do tempo ou receber novos dados à medida que chegam no fluxo, da mesma forma que o comando "tail -f" do Unix lê um arquivo de log e congela enquanto espera por novos dados. Observe que vários clientes podem escutar um thread ao mesmo tempo, assim como muitos processos "tail -f" podem ler um arquivo simultaneamente sem entrar em conflito entre si.

Para entender todos os benefícios do novo tipo de dados, vamos dar uma olhada rápida nas estruturas Redis existentes há muito tempo que replicam parcialmente a funcionalidade do Redis Stream.

RedisPUB/SUB

Redis Pub/Sub é um sistema de mensagens simples já integrado ao seu armazenamento de valores-chave. No entanto, a simplicidade tem um custo:

  • Se o editor falhar por algum motivo, ele perderá todos os seus assinantes
  • O editor precisa saber o endereço exato de todos os seus assinantes
  • Um editor pode sobrecarregar seus assinantes com trabalho se os dados forem publicados mais rapidamente do que processados
  • A mensagem é excluída do buffer do editor imediatamente após a publicação, independentemente de quantos assinantes ela foi entregue e da rapidez com que eles conseguiram processar a mensagem.
  • Todos os assinantes receberão a mensagem ao mesmo tempo. Os próprios assinantes devem de alguma forma concordar entre si sobre a ordem de processamento da mesma mensagem.
  • Não há nenhum mecanismo integrado para confirmar se um assinante processou uma mensagem com êxito. Se um assinante receber uma mensagem e travar durante o processamento, o editor não saberá disso.

Lista Redis

Redis List é uma estrutura de dados que oferece suporte ao bloqueio de comandos de leitura. Você pode adicionar e ler mensagens do início ou do final da lista. Com base nessa estrutura, você pode criar uma boa pilha ou fila para o seu sistema distribuído e, na maioria dos casos, isso será suficiente. Principais diferenças do Redis Pub/Sub:

  • A mensagem é entregue a um cliente. O primeiro cliente com leitura bloqueada receberá os dados primeiro.
  • Clint deve iniciar ele mesmo a operação de leitura para cada mensagem. List não sabe nada sobre clientes.
  • As mensagens são armazenadas até que alguém as leia ou as exclua explicitamente. Se você configurar o servidor Redis para liberar dados no disco, a confiabilidade do sistema aumentará drasticamente.

Introdução ao fluxo

Adicionando uma entrada a um stream

Equipe XADD adiciona uma nova entrada ao fluxo. Um registro não é apenas uma string, ele consiste em um ou mais pares de valores-chave. Assim, cada entrada já vem estruturada e lembra a estrutura de um arquivo CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

No exemplo acima, adicionamos dois campos ao stream com o nome (chave) “mystream”: “sensor-id” e “temperature” com os valores “1234” e “19.8”, respectivamente. Como segundo argumento, o comando usa um identificador que será atribuído à entrada - esse identificador identifica exclusivamente cada entrada no fluxo. Porém, neste caso passamos * porque queremos que o Redis gere um novo ID para nós. Cada novo ID aumentará. Portanto, cada nova entrada terá um identificador superior em relação às entradas anteriores.

Formato do identificador

O ID de entrada retornado pelo comando XADD, consiste em duas partes:

{millisecondsTime}-{sequenceNumber}

milissegundosTime — Tempo Unix em milissegundos (horário do servidor Redis). No entanto, se a hora atual for igual ou menor que a hora da gravação anterior, será usado o carimbo de data/hora da gravação anterior. Portanto, se a hora do servidor voltar no tempo, o novo identificador ainda manterá a propriedade de incremento.

número sequencial usado para registros criados no mesmo milissegundo. número sequencial será aumentado em 1 em relação à entrada anterior. Porque o número sequencial tiver 64 bits de tamanho, então, na prática, você não deverá atingir um limite no número de registros que podem ser gerados em um milissegundo.

O formato desses identificadores pode parecer estranho à primeira vista. Um leitor desconfiado pode se perguntar por que o tempo faz parte do identificador. O motivo é que os streams do Redis suportam consultas de intervalo por ID. Como o identificador está associado ao horário de criação do registro, isso possibilita a consulta de intervalos de tempo. Veremos um exemplo específico quando examinarmos o comando XRANGE.

Se por algum motivo o usuário precisar especificar seu próprio identificador, que, por exemplo, está associado a algum sistema externo, podemos passá-lo para o comando XADD em vez de * como mostrado abaixo:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Observe que, neste caso, você mesmo deve monitorar o incremento de ID. No nosso exemplo, o identificador mínimo é “0-1”, portanto o comando não aceitará outro identificador igual ou menor que “0-1”.

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Número de registros por stream

É possível obter o número de registros em um stream simplesmente usando o comando XLEN. Para nosso exemplo, este comando retornará o seguinte valor:

> XLEN somestream
(integer) 2

Consultas de intervalo - XRANGE e XREVRANGE

Para solicitar dados por intervalo, precisamos especificar dois identificadores - o início e o fim do intervalo. O intervalo retornado incluirá todos os elementos, incluindo os limites. Existem também dois identificadores especiais “-” e “+”, significando respectivamente o menor (primeiro registro) e o maior (último registro) identificador no fluxo. O exemplo abaixo listará todas as entradas do stream.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Cada registro retornado é uma matriz de dois elementos: um identificador e uma lista de pares chave-valor. Já dissemos que os identificadores de registros estão relacionados ao tempo. Portanto, podemos solicitar um intervalo de tempo específico. Porém, podemos especificar na solicitação não o identificador completo, mas apenas o horário Unix, omitindo a parte relacionada ao número sequencial. A parte omitida do identificador será automaticamente zerada no início do intervalo e com o valor máximo possível no final do intervalo. Abaixo está um exemplo de como você pode solicitar um intervalo de dois milissegundos.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Temos apenas uma entrada neste intervalo, porém em conjuntos de dados reais o resultado retornado pode ser enorme. Por esta razão XRANGE suporta a opção COUNT. Ao especificar a quantidade, podemos simplesmente obter os primeiros N registros. Se precisarmos obter os próximos N registros (paginação), podemos usar o último ID recebido, aumentá-lo número sequencial por um e pergunte novamente. Vejamos isso no exemplo a seguir. Começamos a adicionar 10 elementos com XADD (assumindo que mystream já estava preenchido com 10 elementos). Para iniciar a iteração obtendo 2 elementos por comando, começamos com o intervalo completo, mas com COUNT igual a 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Para continuar a iteração com os próximos dois elementos, precisamos selecionar o último ID recebido, ou seja, 1519073279157-0, e adicionar 1 a número sequencial.
O ID resultante, neste caso 1519073279157-1, agora pode ser usado como o novo argumento de início de intervalo para a próxima chamada XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

E assim por diante. Porque a complexidade XRANGE é O(log(N)) para pesquisar e então O(M) para retornar M elementos, então cada etapa de iteração é rápida. Assim, usando XRANGE fluxos podem ser iterados com eficiência.

Equipe XREVRANGE é o equivalente XRANGE, mas retorna os elementos na ordem inversa:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Observe que o comando XREVRANGE leva argumentos de intervalo para iniciar e parar na ordem inversa.

Lendo novas entradas usando XREAD

Muitas vezes surge a tarefa de assinar um stream e receber apenas novas mensagens. Este conceito pode parecer semelhante ao Redis Pub/Sub ou ao bloqueio da Redis List, mas existem diferenças fundamentais em como usar o Redis Stream:

  1. Cada nova mensagem é entregue a todos os assinantes por padrão. Este comportamento é diferente de uma Lista Redis de bloqueio, onde uma nova mensagem será lida apenas por um assinante.
  2. Enquanto no Redis Pub/Sub todas as mensagens são esquecidas e nunca persistidas, no Stream todas as mensagens são retidas indefinidamente (a menos que o cliente cause explicitamente a exclusão).
  3. O Redis Stream permite diferenciar o acesso às mensagens dentro de um stream. Um assinante específico só pode ver seu histórico de mensagens pessoais.

Você pode assinar um tópico e receber novas mensagens usando o comando XLER. É um pouco mais complicado do que XRANGE, então começaremos primeiro com os exemplos mais simples.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

O exemplo acima mostra um formulário sem bloqueio XLER. Observe que a opção COUNT é opcional. Na verdade, a única opção de comando necessária é a opção STREAMS, que especifica uma lista de fluxos junto com o identificador máximo correspondente. Escrevemos “STREAMS mystream 0” - queremos receber todos os registros do stream mystream com um identificador maior que “0-0”. Como você pode ver no exemplo, o comando retorna o nome do thread porque podemos assinar vários threads ao mesmo tempo. Poderíamos escrever, por exemplo, "STREAMS mystream otherstream 0 0". Observe que após a opção STREAMS precisamos primeiro fornecer os nomes de todos os streams necessários e só então uma lista de identificadores.

Nesta forma simples, o comando não faz nada de especial comparado a XRANGE. No entanto, o interessante é que podemos facilmente transformar XLER para um comando de bloqueio, especificando o argumento BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

No exemplo acima, uma nova opção BLOCK é especificada com um tempo limite de 0 milissegundos (isso significa esperar indefinidamente). Além disso, em vez de passar o identificador usual para o fluxo mystream, um identificador especial $ foi passado. Este identificador especial significa que XLER deve usar o identificador máximo em mystream como identificador. Portanto, só receberemos novas mensagens a partir do momento em que começarmos a ouvi-las. De certa forma, isso é semelhante ao comando "tail -f" do Unix.

Observe que ao usar a opção BLOCK não precisamos necessariamente usar o identificador especial $. Podemos usar qualquer identificador existente no stream. Se a equipe conseguir atender nossa solicitação imediatamente sem bloqueio, ela o fará, caso contrário bloqueará.

Bloqueio XLER também pode ouvir vários threads ao mesmo tempo, bastando especificar seus nomes. Neste caso, o comando retornará um registro do primeiro fluxo que recebeu dados. O primeiro assinante bloqueado para um determinado thread receberá os dados primeiro.

Grupos de consumidores

Em determinadas tarefas, queremos limitar o acesso do assinante às mensagens dentro de um tópico. Um exemplo em que isso pode ser útil é uma fila de mensagens com trabalhadores que receberão mensagens diferentes de um thread, permitindo escalar o processamento de mensagens.

Se imaginarmos que temos três assinantes C1, C2, C3 e um thread que contém as mensagens 1, 2, 3, 4, 5, 6, 7, então as mensagens serão veiculadas conforme o diagrama abaixo:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Para conseguir esse efeito, o Redis Stream usa um conceito chamado Consumer Group. Este conceito é semelhante a um pseudo-assinante, que recebe dados de um fluxo, mas na verdade é servido por vários assinantes dentro de um grupo, fornecendo certas garantias:

  1. Cada mensagem é entregue a um assinante diferente dentro do grupo.
  2. Dentro de um grupo, os assinantes são identificados pelo nome, que é uma string que diferencia maiúsculas de minúsculas. Se um assinante sair temporariamente do grupo, ele poderá ser restaurado ao grupo usando seu próprio nome exclusivo.
  3. Cada Grupo de Consumidores segue o conceito de “primeira mensagem não lida”. Quando um assinante solicita novas mensagens, ele só poderá receber mensagens que nunca tenham sido entregues anteriormente a nenhum assinante do grupo.
  4. Existe um comando para confirmar explicitamente que a mensagem foi processada com sucesso pelo assinante. Até que este comando seja chamado, a mensagem solicitada permanecerá no status “pendente”.
  5. Dentro do Grupo de Consumidores, cada assinante pode solicitar um histórico de mensagens que lhe foram entregues, mas ainda não foram processadas (no status “pendente”)

De certo modo, o estado do grupo pode ser expresso da seguinte forma:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Agora é hora de conhecer os principais comandos do Grupo de Consumidores, a saber:

  • GRUPO X usado para criar, destruir e gerenciar grupos
  • XREADGROUP usado para ler fluxo através do grupo
  • XACK - este comando permite ao assinante marcar a mensagem como processada com sucesso

Criação de Grupo de Consumidores

Vamos supor que mystream já exista. Então o comando de criação de grupo ficará assim:

> XGROUP CREATE mystream mygroup $
OK

Ao criar um grupo devemos passar um identificador, a partir do qual o grupo receberá mensagens. Se quisermos apenas receber todas as novas mensagens, podemos usar o identificador especial $ (como no nosso exemplo acima). Se você especificar 0 em vez de um identificador especial, todas as mensagens no tópico estarão disponíveis para o grupo.

Agora que o grupo foi criado, podemos começar imediatamente a ler as mensagens usando o comando XREADGROUP. Este comando é muito semelhante ao XLER e suporta a opção opcional BLOCK. Entretanto, existe uma opção GROUP obrigatória que sempre deve ser especificada com dois argumentos: o nome do grupo e o nome do assinante. A opção COUNT também é suportada.

Antes de ler o tópico, vamos colocar algumas mensagens lá:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Agora vamos tentar ler este fluxo através do grupo:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

O comando acima é lido literalmente da seguinte forma:

“Eu, assinante Alice, membro do mygroup, quero ler uma mensagem do mystream que nunca foi entregue a ninguém antes.”

Cada vez que um assinante realiza uma operação em um grupo, ele deve fornecer seu nome, identificando-se de forma única dentro do grupo. Há mais um detalhe muito importante no comando acima - o identificador especial ">". Este identificador especial filtra as mensagens, deixando apenas aquelas que nunca foram entregues antes.

Além disso, em casos especiais, você pode especificar um identificador real como 0 ou qualquer outro identificador válido. Neste caso o comando XREADGROUP retornará um histórico de mensagens com status "pendente" que foram entregues ao assinante especificado (Alice), mas ainda não foram confirmadas usando o comando XACK.

Podemos testar esse comportamento especificando imediatamente o ID 0, sem a opção CONTAGEM. Veremos simplesmente uma única mensagem pendente, ou seja, a mensagem da apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

No entanto, se confirmarmos que a mensagem foi processada com sucesso, ela não será mais exibida:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Agora é a vez de Bob ler algo:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Bob, um membro do mygroup, pediu no máximo duas mensagens. O comando reporta apenas mensagens não entregues devido ao identificador especial ">". Como você pode ver, a mensagem “maçã” não será exibida pois já foi entregue para Alice, então Bob recebe “laranja” e “morango”.

Dessa forma, Alice, Bob e qualquer outro assinante do grupo podem ler mensagens diferentes do mesmo fluxo. Eles também podem ler o histórico de mensagens não processadas ou marcar mensagens como processadas.

Há algumas coisas a ter em mente:

  • Assim que o assinante considerar a mensagem um comando XREADGROUP, esta mensagem entra no estado “pendente” e é atribuída a esse assinante específico. Outros assinantes do grupo não poderão ler esta mensagem.
  • Os assinantes são criados automaticamente na primeira menção, não há necessidade de criá-los explicitamente.
  • Com XREADGROUP você pode ler mensagens de vários tópicos diferentes ao mesmo tempo, no entanto, para que isso funcione, você precisa primeiro criar grupos com o mesmo nome para cada tópico usando GRUPO X

Recuperação após uma falha

O assinante pode se recuperar da falha e reler sua lista de mensagens com status “pendente”. No entanto, no mundo real, os assinantes podem acabar falhando. O que acontece com as mensagens travadas de um assinante se ele não conseguir se recuperar de uma falha?
O Consumer Group oferece um recurso que é usado justamente nesses casos - quando você precisa alterar o proprietário das mensagens.

A primeira coisa que você precisa fazer é chamar o comando XPENDO, que exibe todas as mensagens do grupo com status “pendente”. Na sua forma mais simples, o comando é chamado com apenas dois argumentos: o nome do thread e o nome do grupo:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

A equipe exibiu o número de mensagens não processadas para todo o grupo e para cada assinante. Só temos Bob com duas mensagens pendentes porque a única mensagem solicitada por Alice foi confirmada com XACK.

Podemos solicitar mais informações usando mais argumentos:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - intervalo de identificadores (você pode usar “-” e “+”)
{count} — número de tentativas de entrega
{nome do consumidor} - nome do grupo

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Agora temos detalhes de cada mensagem: ID, nome do assinante, tempo ocioso em milissegundos e por fim o número de tentativas de entrega. Temos duas mensagens de Bob e elas ficaram ociosas por 74170458 milissegundos, cerca de 20 horas.

Observe que ninguém nos impede de verificar qual era o conteúdo da mensagem simplesmente usando XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Só temos que repetir o mesmo identificador duas vezes nos argumentos. Agora que temos uma ideia, Alice pode decidir que, após 20 horas de inatividade, Bob provavelmente não se recuperará e é hora de consultar essas mensagens e retomar o processamento delas para Bob. Para isso usamos o comando XREIVINDICAÇÃO:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Usando este comando, podemos receber uma mensagem “estrangeira” que ainda não foi processada alterando o proprietário para {consumidor}. No entanto, também podemos fornecer um tempo ocioso mínimo {min-idle-time}. Isto ajuda a evitar uma situação em que dois clientes tentam alterar simultaneamente o proprietário das mesmas mensagens:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

O primeiro cliente irá zerar o tempo de inatividade e aumentar o contador de entrega. Portanto, o segundo cliente não poderá solicitá-lo.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

A mensagem foi reivindicada com sucesso por Alice, que agora pode processar a mensagem e reconhecê-la.

No exemplo acima, você pode ver que uma solicitação bem-sucedida retorna o conteúdo da própria mensagem. No entanto, isso não é necessário. A opção JUSTID pode ser usada apenas para retornar IDs de mensagens. Isto é útil se você não estiver interessado nos detalhes da mensagem e quiser aumentar o desempenho do sistema.

Balcão de entrega

O contador que você vê na saída XPENDO é o número de entregas de cada mensagem. Tal contador é incrementado de duas maneiras: quando uma mensagem é solicitada com sucesso via XREIVINDICAÇÃO ou quando uma chamada é usada XREADGROUP.

É normal que algumas mensagens sejam entregues várias vezes. O principal é que todas as mensagens sejam eventualmente processadas. Às vezes, ocorrem problemas durante o processamento de uma mensagem porque a própria mensagem está corrompida ou o processamento da mensagem causa um erro no código do manipulador. Nesse caso, pode acontecer que ninguém consiga processar esta mensagem. Como temos um contador de tentativas de entrega, podemos utilizá-lo para detectar tais situações. Portanto, quando a contagem de entrega atingir o número alto especificado, provavelmente seria mais sensato colocar essa mensagem em outro thread e enviar uma notificação ao administrador do sistema.

Estado do tópico

Equipe XINFO usado para solicitar diversas informações sobre um thread e seus grupos. Por exemplo, um comando básico se parece com isto:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

O comando acima exibe informações gerais sobre o fluxo especificado. Agora um exemplo um pouco mais complexo:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

O comando acima exibe informações gerais para todos os grupos do thread especificado

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

O comando acima exibe informações de todos os assinantes do stream e grupo especificado.
Se você esquecer a sintaxe do comando, basta pedir ajuda ao próprio comando:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Limite de tamanho do fluxo

Muitos aplicativos não desejam coletar dados em um fluxo para sempre. Muitas vezes é útil ter um número máximo de mensagens permitidas por thread. Em outros casos, é útil mover todas as mensagens de um thread para outro armazenamento persistente quando o tamanho do thread especificado for atingido. Você pode limitar o tamanho de um stream usando o parâmetro MAXLEN no comando XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Ao usar MAXLEN, os registros antigos são excluídos automaticamente quando atingem um comprimento especificado, de modo que o fluxo tenha um tamanho constante. No entanto, a poda neste caso não ocorre da forma mais eficiente na memória Redis. Você pode melhorar a situação da seguinte maneira:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

O argumento ~ no exemplo acima significa que não precisamos necessariamente limitar o comprimento do fluxo a um valor específico. No nosso exemplo, poderia ser qualquer número maior ou igual a 1000 (por exemplo, 1000, 1010 ou 1030). Apenas especificamos explicitamente que queremos que nosso stream armazene pelo menos 1000 registros. Isso torna o gerenciamento de memória muito mais eficiente dentro do Redis.

Há também uma equipe separada XTRIM, que faz a mesma coisa:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Armazenamento e replicação persistentes

O Redis Stream é replicado de forma assíncrona para nós escravos e salvo em arquivos como AOF (instantâneo de todos os dados) e RDB (log de todas as operações de gravação). A replicação do estado de grupos de consumidores também é suportada. Portanto, se uma mensagem estiver no status “pendente” no nó mestre, então nos nós escravos esta mensagem terá o mesmo status.

Removendo elementos individuais de um stream

Existe um comando especial para excluir mensagens XDEL. O comando obtém o nome do thread seguido dos IDs das mensagens a serem excluídas:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Ao usar este comando, você precisa levar em consideração que a memória real não será liberada imediatamente.

Fluxos de comprimento zero

A diferença entre streams e outras estruturas de dados Redis é que quando outras estruturas de dados não possuem mais elementos dentro delas, como efeito colateral, a própria estrutura de dados será removida da memória. Assim, por exemplo, o conjunto classificado será completamente removido quando a chamada ZREM remover o último elemento. Em vez disso, os threads podem permanecer na memória mesmo sem nenhum elemento dentro.

Conclusão

O Redis Stream é ideal para criar agentes de mensagens, filas de mensagens, registro unificado e sistemas de bate-papo para manutenção de histórico.

Como eu disse uma vez Niklaus Wirth, os programas são algoritmos mais estruturas de dados, e o Redis já oferece os dois.

Fonte: habr.com

Adicionar um comentário