Há algum tempo nos deparamos com o problema de limpar tuplas em espaços
Um bom exemplo para nós foi o módulo tarantool chamado
descrição
A documentação do tarantool tem um ótimo
Vamos começar de longe e ver como é um módulo expirado com limite visto de fora:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Para simplificar, iniciamos o tarantool no diretório onde nossa biblioteca libcapped-expirationd.so está localizada. Duas funções são exportadas da biblioteca: start e kill. O primeiro passo é disponibilizar essas funções em Lua usando box.schema.func.create e box.schema.user.grant. Em seguida, crie um espaço cujas tuplas conterão apenas três campos: o primeiro é um identificador exclusivo, o segundo é email e o terceiro é o tempo de vida da tupla. Construímos um índice de árvore no topo do primeiro campo e o chamamos de primário. Em seguida, obtemos o objeto de conexão com nossa biblioteca nativa.
Após o trabalho preparatório, execute a função start:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Este exemplo funcionará durante a varredura exatamente da mesma forma que o módulo expirationd, que está escrito em Lua. O primeiro argumento para a função start é o nome exclusivo da tarefa. O segundo é o identificador do espaço. O terceiro é um índice exclusivo pelo qual as tuplas serão excluídas. O quarto é o índice pelo qual as tuplas serão percorridas. O quinto é o número do campo da tupla com tempo de vida (a numeração começa em 1, não em 0!). O sexto e o sétimo são configurações de digitalização. 1024 é o número máximo de tuplas que podem ser visualizadas em uma única transação. 3600 — tempo de varredura completo em segundos.
Observe que o exemplo usa o mesmo índice para rastreamento e exclusão. Se este for um índice de árvore, o percurso será realizado da chave menor para a maior. Se houver algum outro, por exemplo, índice hash, então o percurso é realizado, via de regra, em ordem aleatória. Todas as tuplas espaciais são varridas em uma varredura.
Vamos inserir várias tuplas no espaço com vida útil de 60 segundos:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Vamos verificar se a inserção foi bem-sucedida:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Vamos repetir o select após mais de 60 segundos (contando desde o início da inserção da primeira tupla) e ver se o módulo capped expirationd já foi processado:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Vamos parar a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Vejamos um segundo exemplo em que um índice separado é usado para o rastreamento:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Tudo aqui é igual ao primeiro exemplo, com algumas exceções. Construímos um índice de árvore no topo do terceiro campo e o chamamos de exp. Este índice não precisa ser único, ao contrário do índice denominado primário. A travessia será realizada pelo índice exp e a exclusão pelo primário. Lembramos que anteriormente ambos eram feitos apenas utilizando o índice primário.
Após o trabalho preparatório, executamos a função start com novos argumentos:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Vamos inserir várias tuplas no espaço novamente com um tempo de vida de 60 segundos:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Após 30 segundos, por analogia, adicionaremos mais algumas tuplas:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Vamos verificar se a inserção foi bem-sucedida:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Vamos repetir o select após mais de 60 segundos (contando desde o início da inserção da primeira tupla) e ver se o módulo capped expirationd já foi processado:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ainda restam algumas tuplas no espaço que terão cerca de mais 30 segundos de vida. Além disso, a varredura parou ao passar de uma tupla com um ID 2 e um tempo de vida de 1576421257 para uma tupla com um ID de 3 e um tempo de vida de 1576421287. Tuplas com um tempo de vida de 1576421287 ou mais não foram varridas devido à ordem de as chaves de índice exp. Esta é a economia que queríamos alcançar desde o início.
Vamos parar a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementação
A melhor maneira de contar todas as características de um projeto é sua fonte original.
Os argumentos que passamos para o método start são armazenados em uma estrutura chamada expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
O atributo name é o nome da tarefa. O atributo space_id é o identificador do espaço. O atributo rm_index_id é o identificador do índice exclusivo pelo qual as tuplas serão excluídas. O atributo it_index_id é o identificador do índice pelo qual as tuplas serão percorridas. O atributo it_index_type é o tipo de índice pelo qual as tuplas serão percorridas. O atributo arquivado_no é o número do campo da tupla com tempo de vida. O atributo scan_size é o número máximo de tuplas que são varridas em uma transação. O atributo scan_time é o tempo completo de verificação em segundos.
Não consideraremos a análise de argumentos. Este é um trabalho árduo mas simples, com o qual a biblioteca irá ajudá-lo
Listamos os protótipos de todas as funções usadas para análise:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Agora vamos para a coisa mais importante - a lógica de ignorar o espaço e excluir tuplas. Cada bloco de tuplas não maior que scan_size é verificado e modificado em uma única transação. Se for bem-sucedida, esta transação será confirmada; se ocorrer um erro, ela será revertida. O último argumento para a função expirationd_iterate é um ponteiro para o iterador a partir do qual a varredura começa ou continua. Este iterador é incrementado internamente até que ocorra um erro, o espaço acabe ou não seja possível interromper o processo antecipadamente. A função expirationd_expired verifica o tempo de vida de uma tupla, expirationd_delete exclui uma tupla, expirationd_breakable verifica se precisamos seguir em frente.
Código da função Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Código de função expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Código da função Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Código da função Expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplicação
Você pode visualizar o código-fonte em
Fonte: habr.com