Escrevendo nosso próprio módulo expirado limitado para tarantool

Escrevendo nosso próprio módulo expirado limitado para tarantool

Há algum tempo nos deparamos com o problema de limpar tuplas em espaços taranol. A limpeza não teve que ser iniciada quando o tarantool já estava sem memória, mas com antecedência e com certa frequência. Para esta tarefa, tarantool possui um módulo escrito em Lua chamado expiração. Após um curto período de uso deste módulo, percebemos que ele não era adequado para nós: devido à limpeza constante de grandes quantidades de dados, Lua travou no GC. Portanto, pensamos em desenvolver nosso próprio módulo de expiração limitada, esperando que o código escrito em uma linguagem de programação nativa resolvesse nossos problemas da melhor maneira possível.

Um bom exemplo para nós foi o módulo tarantool chamado memcached. A abordagem nele utilizada baseia-se no fato de ser criado um campo separado no espaço, que indica o tempo de vida da tupla, ou seja, ttl. O módulo em segundo plano verifica o espaço, compara o TTL com a hora atual e decide se deseja excluir a tupla ou não. O código do módulo memcached é simples e elegante, mas muito genérico. Primeiro, não leva em consideração o tipo de índice que está sendo rastreado e excluído. Em segundo lugar, em cada passagem todas as tuplas são varridas, cujo número pode ser bastante grande. E se no módulo expirado o primeiro problema foi resolvido (o índice da árvore foi separado em uma classe separada), o segundo ainda não recebeu atenção. Esses três pontos predeterminaram a escolha de escrever meu próprio código.

descrição

A documentação do tarantool tem um ótimo tutorial sobre como escrever seus procedimentos armazenados em C. Primeiramente sugiro que você se familiarize com ele para entender aquelas inserções com comandos e códigos que aparecerão a seguir. Também vale a pena prestar atenção referência para objetos que estão disponíveis ao escrever seu próprio módulo limitado, ou seja, caixa, fibra, índice и txn.

Vamos começar de longe e ver como é um módulo expirado com limite visto de fora:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Para simplificar, iniciamos o tarantool no diretório onde nossa biblioteca libcapped-expirationd.so está localizada. Duas funções são exportadas da biblioteca: start e kill. O primeiro passo é disponibilizar essas funções em Lua usando box.schema.func.create e box.schema.user.grant. Em seguida, crie um espaço cujas tuplas conterão apenas três campos: o primeiro é um identificador exclusivo, o segundo é email e o terceiro é o tempo de vida da tupla. Construímos um índice de árvore no topo do primeiro campo e o chamamos de primário. Em seguida, obtemos o objeto de conexão com nossa biblioteca nativa.

Após o trabalho preparatório, execute a função start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Este exemplo funcionará durante a varredura exatamente da mesma forma que o módulo expirationd, que está escrito em Lua. O primeiro argumento para a função start é o nome exclusivo da tarefa. O segundo é o identificador do espaço. O terceiro é um índice exclusivo pelo qual as tuplas serão excluídas. O quarto é o índice pelo qual as tuplas serão percorridas. O quinto é o número do campo da tupla com tempo de vida (a numeração começa em 1, não em 0!). O sexto e o sétimo são configurações de digitalização. 1024 é o número máximo de tuplas que podem ser visualizadas em uma única transação. 3600 — tempo de varredura completo em segundos.

Observe que o exemplo usa o mesmo índice para rastreamento e exclusão. Se este for um índice de árvore, o percurso será realizado da chave menor para a maior. Se houver algum outro, por exemplo, índice hash, então o percurso é realizado, via de regra, em ordem aleatória. Todas as tuplas espaciais são varridas em uma varredura.

Vamos inserir várias tuplas no espaço com vida útil de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Vamos verificar se a inserção foi bem-sucedida:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Vamos repetir o select após mais de 60 segundos (contando desde o início da inserção da primeira tupla) e ver se o módulo capped expirationd já foi processado:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Vamos parar a tarefa:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Vejamos um segundo exemplo em que um índice separado é usado para o rastreamento:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Tudo aqui é igual ao primeiro exemplo, com algumas exceções. Construímos um índice de árvore no topo do terceiro campo e o chamamos de exp. Este índice não precisa ser único, ao contrário do índice denominado primário. A travessia será realizada pelo índice exp e a exclusão pelo primário. Lembramos que anteriormente ambos eram feitos apenas utilizando o índice primário.

Após o trabalho preparatório, executamos a função start com novos argumentos:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Vamos inserir várias tuplas no espaço novamente com um tempo de vida de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Após 30 segundos, por analogia, adicionaremos mais algumas tuplas:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Vamos verificar se a inserção foi bem-sucedida:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Vamos repetir o select após mais de 60 segundos (contando desde o início da inserção da primeira tupla) e ver se o módulo capped expirationd já foi processado:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ainda restam algumas tuplas no espaço que terão cerca de mais 30 segundos de vida. Além disso, a varredura parou ao passar de uma tupla com um ID 2 e um tempo de vida de 1576421257 para uma tupla com um ID de 3 e um tempo de vida de 1576421287. Tuplas com um tempo de vida de 1576421287 ou mais não foram varridas devido à ordem de as chaves de índice exp. Esta é a economia que queríamos alcançar desde o início.

Vamos parar a tarefa:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Implementação

A melhor maneira de contar todas as características de um projeto é sua fonte original. código! Como parte da publicação, focaremos apenas nos pontos mais importantes, ou seja, algoritmos de desvio de espaço.

Os argumentos que passamos para o método start são armazenados em uma estrutura chamada expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

O atributo name é o nome da tarefa. O atributo space_id é o identificador do espaço. O atributo rm_index_id é o identificador do índice exclusivo pelo qual as tuplas serão excluídas. O atributo it_index_id é o identificador do índice pelo qual as tuplas serão percorridas. O atributo it_index_type é o tipo de índice pelo qual as tuplas serão percorridas. O atributo arquivado_no é o número do campo da tupla com tempo de vida. O atributo scan_size é o número máximo de tuplas que são varridas em uma transação. O atributo scan_time é o tempo completo de verificação em segundos.

Não consideraremos a análise de argumentos. Este é um trabalho árduo mas simples, com o qual a biblioteca irá ajudá-lo msgpuck. Dificuldades só podem surgir com índices que são passados ​​​​de Lua como uma estrutura de dados complexa com o tipo mp_map, e não usando os tipos simples mp_bool, mp_double, mp_int, mp_uint e mp_array. Mas não há necessidade de analisar todo o índice. Basta verificar sua exclusividade, calcular o tipo e extrair o identificador.

Listamos os protótipos de todas as funções usadas para análise:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Agora vamos para a coisa mais importante - a lógica de ignorar o espaço e excluir tuplas. Cada bloco de tuplas não maior que scan_size é verificado e modificado em uma única transação. Se for bem-sucedida, esta transação será confirmada; se ocorrer um erro, ela será revertida. O último argumento para a função expirationd_iterate é um ponteiro para o iterador a partir do qual a varredura começa ou continua. Este iterador é incrementado internamente até que ocorra um erro, o espaço acabe ou não seja possível interromper o processo antecipadamente. A função expirationd_expired verifica o tempo de vida de uma tupla, expirationd_delete exclui uma tupla, expirationd_breakable verifica se precisamos seguir em frente.

Código da função Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Código de função expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Código da função Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Código da função Expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplicação

Você pode visualizar o código-fonte em aqui!

Fonte: habr.com

Adicionar um comentário