Escribindo o noso propio módulo de caducidade para tarantool

Escribindo o noso propio módulo de caducidade para tarantool

Hai tempo afrontámonos co problema da limpeza de tuplas nos espazos tarantool. A limpeza tivo que comezar non cando o tarantool xa se esgotaba a memoria, senón con antelación e cunha determinada frecuencia. Para esta tarefa, tarantool ten un módulo escrito en Lua chamado caducidade. Despois de usar este módulo por pouco tempo, decatámonos de que non era axeitado para nós: debido á limpeza constante de grandes cantidades de datos, Lua colgou no GC. Polo tanto, pensamos en desenvolver o noso propio módulo con caducidade limitada, esperando que o código escrito nunha linguaxe de programación nativa resolva os nosos problemas da mellor maneira posible.

Un bo exemplo para nós foi o módulo Tarantool chamado memcached. O enfoque utilizado nel baséase no feito de que se crea un campo separado no espazo, o que indica a vida útil da tupla, noutras palabras, ttl. O módulo en segundo plano escanea o espazo, compara o TTL coa hora actual e decide se eliminar a tupla ou non. O código do módulo memcached é sinxelo e elegante, pero demasiado xenérico. En primeiro lugar, non ten en conta o tipo de índice que se está explorando e eliminando. En segundo lugar, en cada pasada son escaneadas todas as tuplas, cuxo número pode ser bastante grande. E se no módulo con caducidade resolveuse o primeiro problema (o índice da árbore separouse nunha clase separada), entón o segundo aínda non recibiu atención. Estes tres puntos predeterminaron a elección a favor de escribir o meu propio código.

Descrición

A documentación para tarantool ten unha moi boa titorial sobre como escribir os teus procedementos almacenados en C. En primeiro lugar, suxíroche que te familiarices con el para comprender esas insercións con comandos e código que aparecerán a continuación. Tamén paga a pena prestarlle atención referencia aos obxectos que están dispoñibles ao escribir o seu propio módulo limitado, é dicir caixa, fibra, índice и txn.

Comecemos de lonxe e vexamos como é un módulo con caducidade limitada desde fóra:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Para simplificar, lanzamos tarantool no directorio onde se atopa a nosa biblioteca libcapped-expirationd.so. Desde a biblioteca expórtanse dúas funcións: iniciar e matar. O primeiro paso é facer que estas funcións estean dispoñibles en Lua usando box.schema.func.create e box.schema.user.grant. A continuación, cree un espazo cuxas tuplas conterán só tres campos: o primeiro é un identificador único, o segundo é o correo electrónico e o terceiro é o tempo de vida da tupla. Construímos un índice de árbore enriba do primeiro campo e chamámolo principal. A continuación, obtemos o obxecto de conexión á nosa biblioteca nativa.

Despois do traballo preparatorio, execute a función de inicio:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Este exemplo funcionará durante a dixitalización exactamente igual que o módulo caducado, que está escrito en Lua. O primeiro argumento da función de inicio é o nome único da tarefa. O segundo é o identificador do espazo. O terceiro é un índice único polo cal se eliminarán as tuplas. O cuarto é o índice polo que se percorrerán as tuplas. O quinto é o número do campo de tupla con vida útil (a numeración comeza a partir de 1, non de 0!). O sexto e o sétimo son a configuración de dixitalización. 1024 é o número máximo de tuplas que se poden ver nunha única transacción. 3600: tempo de exploración completo en segundos.

Teña en conta que o exemplo usa o mesmo índice para rastrexar e eliminar. Se este é un índice de árbore, entón o percorrido realízase desde a clave máis pequena ata a máis grande. Se hai outro índice, por exemplo, hash, entón o percorrido realízase, por regra xeral, en orde aleatoria. Todas as tuplas espaciais son dixitalizadas nunha soa exploración.

Imos inserir varias tuplas no espazo cunha vida útil de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Comprobamos que a inserción foi correcta:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Repetimos a selección despois de máis de 60 segundos (contando desde o inicio da inserción da primeira tupla) e vexamos que o módulo con caducidade limitada xa se procesou:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Detemos a tarefa:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Vexamos un segundo exemplo onde se usa un índice separado para o rastrexo:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Aquí todo é o mesmo que no primeiro exemplo, con algunhas excepcións. Construímos un índice de árbore enriba do terceiro campo e chamámoslle exp. Este índice non ten que ser único, a diferenza do índice chamado primario. A travesía realizarase mediante o índice exp e a eliminación polo principal. Lembramos que anteriormente ambas se facían só usando o índice primario.

Despois do traballo preparatorio, executamos a función de inicio con novos argumentos:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Imos inserir varias tuplas no espazo de novo cunha vida útil de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Despois de 30 segundos, por analoxía, engadiremos algunhas tuplas máis:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Comprobamos que a inserción foi correcta:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Repetimos a selección despois de máis de 60 segundos (contando desde o inicio da inserción da primeira tupla) e vexamos que o módulo con caducidade limitada xa se procesou:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Aínda quedan algunhas tuplas no espazo que terán uns 30 segundos máis de vida. Ademais, a exploración detívose ao pasar dunha tupla cun ID de 2 e unha vida útil de 1576421257 a unha tupla cun ID de 3 e unha vida útil de 1576421287. Non se analizaron as tuplas cunha vida útil de 1576421287 ou máis debido á orde de as claves do índice exp. Este é o aforro que queriamos conseguir desde o principio.

Detemos a tarefa:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Implantación

A mellor forma de contar todas as características dun proxecto é a súa fonte orixinal. código! Como parte da publicación, centrarémonos só nos puntos máis importantes, a saber, os algoritmos de bypass espacial.

Os argumentos que pasamos ao método de inicio almacénanse nunha estrutura chamada expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

O atributo name é o nome da tarefa. O atributo space_id é o identificador do espazo. O atributo rm_index_id é o identificador do índice único polo cal se eliminarán as tuplas. O atributo it_index_id é o identificador do índice polo que se percorrerán as tuplas. O atributo it_index_type é o tipo de índice polo que se atravesarán as tuplas. O atributo filed_no é o número do campo de tupla con duración. O atributo scan_size é o número máximo de tuplas que se analizan nunha transacción. O atributo scan_time é o tempo de exploración completo en segundos.

Non consideraremos analizar argumentos. Este é un traballo minucioso pero sinxelo, co que a biblioteca vos axudará msgpuck. As dificultades só poden xurdir con índices que se pasan desde Lua como unha estrutura de datos complexa co tipo mp_map, e non usando os tipos simples mp_bool, mp_double, mp_int, mp_uint e mp_array. Pero non é necesario analizar todo o índice. Só tes que comprobar a súa unicidade, calcular o tipo e extraer o identificador.

Listamos os prototipos de todas as funcións que se usan para analizar:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Agora pasemos ao máis importante: a lóxica de ignorar o espazo e eliminar tuplas. Cada bloque de tuplas non maior que scan_size é escaneado e modificado nunha única transacción. Se ten éxito, esta transacción confirmarase; se se produce un erro, revérase. O último argumento da función expirationd_iterate é un punteiro ao iterador desde o cal comeza ou continúa a exploración. Este iterador increméntase internamente ata que se produce un erro, se esgota o espazo ou non é posible deter o proceso con antelación. A función expirationd_expired comproba a vida útil dunha tupla, expirationd_delete elimina unha tupla, expirationd_breakable verifica se hai que seguir adiante.

Código de función Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Código de función expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Código de función Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Código de función expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

App

Podes ver o código fonte en aquí!

Fonte: www.habr.com

Engadir un comentario