Hai tempo afrontámonos co problema da limpeza de tuplas nos espazos
Un bo exemplo para nós foi o módulo Tarantool chamado
Descrición
A documentación para tarantool ten unha moi boa
Comecemos de lonxe e vexamos como é un módulo con caducidade limitada desde fóra:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Para simplificar, lanzamos tarantool no directorio onde se atopa a nosa biblioteca libcapped-expirationd.so. Desde a biblioteca expórtanse dúas funcións: iniciar e matar. O primeiro paso é facer que estas funcións estean dispoñibles en Lua usando box.schema.func.create e box.schema.user.grant. A continuación, cree un espazo cuxas tuplas conterán só tres campos: o primeiro é un identificador único, o segundo é o correo electrónico e o terceiro é o tempo de vida da tupla. Construímos un índice de árbore enriba do primeiro campo e chamámolo principal. A continuación, obtemos o obxecto de conexión á nosa biblioteca nativa.
Despois do traballo preparatorio, execute a función de inicio:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Este exemplo funcionará durante a dixitalización exactamente igual que o módulo caducado, que está escrito en Lua. O primeiro argumento da función de inicio é o nome único da tarefa. O segundo é o identificador do espazo. O terceiro é un índice único polo cal se eliminarán as tuplas. O cuarto é o índice polo que se percorrerán as tuplas. O quinto é o número do campo de tupla con vida útil (a numeración comeza a partir de 1, non de 0!). O sexto e o sétimo son a configuración de dixitalización. 1024 é o número máximo de tuplas que se poden ver nunha única transacción. 3600: tempo de exploración completo en segundos.
Teña en conta que o exemplo usa o mesmo índice para rastrexar e eliminar. Se este é un índice de árbore, entón o percorrido realízase desde a clave máis pequena ata a máis grande. Se hai outro índice, por exemplo, hash, entón o percorrido realízase, por regra xeral, en orde aleatoria. Todas as tuplas espaciais son dixitalizadas nunha soa exploración.
Imos inserir varias tuplas no espazo cunha vida útil de 60 segundos:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Comprobamos que a inserción foi correcta:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Repetimos a selección despois de máis de 60 segundos (contando desde o inicio da inserción da primeira tupla) e vexamos que o módulo con caducidade limitada xa se procesou:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Detemos a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Vexamos un segundo exemplo onde se usa un índice separado para o rastrexo:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Aquí todo é o mesmo que no primeiro exemplo, con algunhas excepcións. Construímos un índice de árbore enriba do terceiro campo e chamámoslle exp. Este índice non ten que ser único, a diferenza do índice chamado primario. A travesía realizarase mediante o índice exp e a eliminación polo principal. Lembramos que anteriormente ambas se facían só usando o índice primario.
Despois do traballo preparatorio, executamos a función de inicio con novos argumentos:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Imos inserir varias tuplas no espazo de novo cunha vida útil de 60 segundos:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Despois de 30 segundos, por analoxía, engadiremos algunhas tuplas máis:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Comprobamos que a inserción foi correcta:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Repetimos a selección despois de máis de 60 segundos (contando desde o inicio da inserción da primeira tupla) e vexamos que o módulo con caducidade limitada xa se procesou:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Aínda quedan algunhas tuplas no espazo que terán uns 30 segundos máis de vida. Ademais, a exploración detívose ao pasar dunha tupla cun ID de 2 e unha vida útil de 1576421257 a unha tupla cun ID de 3 e unha vida útil de 1576421287. Non se analizaron as tuplas cunha vida útil de 1576421287 ou máis debido á orde de as claves do índice exp. Este é o aforro que queriamos conseguir desde o principio.
Detemos a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implantación
A mellor forma de contar todas as características dun proxecto é a súa fonte orixinal.
Os argumentos que pasamos ao método de inicio almacénanse nunha estrutura chamada expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
O atributo name é o nome da tarefa. O atributo space_id é o identificador do espazo. O atributo rm_index_id é o identificador do índice único polo cal se eliminarán as tuplas. O atributo it_index_id é o identificador do índice polo que se percorrerán as tuplas. O atributo it_index_type é o tipo de índice polo que se atravesarán as tuplas. O atributo filed_no é o número do campo de tupla con duración. O atributo scan_size é o número máximo de tuplas que se analizan nunha transacción. O atributo scan_time é o tempo de exploración completo en segundos.
Non consideraremos analizar argumentos. Este é un traballo minucioso pero sinxelo, co que a biblioteca vos axudará
Listamos os prototipos de todas as funcións que se usan para analizar:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Agora pasemos ao máis importante: a lóxica de ignorar o espazo e eliminar tuplas. Cada bloque de tuplas non maior que scan_size é escaneado e modificado nunha única transacción. Se ten éxito, esta transacción confirmarase; se se produce un erro, revérase. O último argumento da función expirationd_iterate é un punteiro ao iterador desde o cal comeza ou continúa a exploración. Este iterador increméntase internamente ata que se produce un erro, se esgota o espazo ou non é posible deter o proceso con antelación. A función expirationd_expired comproba a vida útil dunha tupla, expirationd_delete elimina unha tupla, expirationd_breakable verifica se hai que seguir adiante.
Código de función Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Código de función expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Código de función Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Código de función expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
App
Podes ver o código fonte en
Fonte: www.habr.com