Escribiendo nuestro propio módulo caducado limitado para tarantool

Escribiendo nuestro propio módulo caducado limitado para tarantool

Hace algún tiempo nos enfrentamos al problema de limpiar tuplas en espacios. herramienta de tarano. La limpieza no debía iniciarse cuando Tarantool ya se estaba quedando sin memoria, sino con antelación y con cierta frecuencia. Para esta tarea, tarantool tiene un módulo escrito en Lua llamado vencimiento. Después de usar este módulo por un corto tiempo, nos dimos cuenta de que no era adecuado para nosotros: debido a la limpieza constante de grandes cantidades de datos, Lua se colgaba en el GC. Por lo tanto, pensamos en desarrollar nuestro propio módulo caducado con límite, con la esperanza de que el código escrito en un lenguaje de programación nativo resolviera nuestros problemas de la mejor manera posible.

Un buen ejemplo para nosotros fue el módulo tarantool llamado memcached. El enfoque utilizado en él se basa en el hecho de que se crea un campo separado en el espacio, que indica la vida útil de la tupla, en otras palabras, ttl. El módulo en segundo plano escanea el espacio, compara el TTL con la hora actual y decide si eliminar la tupla o no. El código del módulo Memcached es simple y elegante, pero demasiado genérico. En primer lugar, no tiene en cuenta el tipo de índice que se rastrea y elimina. En segundo lugar, en cada pasada se escanean todas las tuplas, cuyo número puede ser bastante grande. Y si en el módulo caducado se resolvió el primer problema (el índice del árbol se separó en una clase separada), el segundo aún no recibió atención. Estos tres puntos predeterminaron la elección a favor de escribir mi propio código.

Descripción

La documentación de tarantool tiene una muy buena tutorial sobre cómo escribir tus procedimientos almacenados en C. En primer lugar, te sugiero que te familiarices con él para poder entender esas inserciones con comandos y código que aparecerán a continuación. También vale la pena prestar atención a referencia a objetos que están disponibles al escribir su propio módulo limitado, es decir box, fibra, índice и txn.

Comencemos desde lejos y veamos cómo se ve un módulo caducado limitado desde el exterior:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Para simplificar, iniciamos tarantool en el directorio donde se encuentra nuestra biblioteca libcapped-expirationd.so. Se exportan dos funciones desde la biblioteca: iniciar y finalizar. El primer paso es hacer que estas funciones estén disponibles en Lua usando box.schema.func.create y box.schema.user.grant. Luego cree un espacio cuyas tuplas contendrán solo tres campos: el primero es un identificador único, el segundo es el correo electrónico y el tercero es la vida útil de la tupla. Construimos un índice de árbol encima del primer campo y lo llamamos primario. A continuación obtenemos el objeto de conexión a nuestra biblioteca nativa.

Después del trabajo preparatorio, ejecute la función de inicio:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Este ejemplo funcionará durante el escaneo exactamente igual que el módulo caducado, que está escrito en Lua. El primer argumento de la función de inicio es el nombre único de la tarea. El segundo es el identificador de espacio. El tercero es un índice único mediante el cual se eliminarán las tuplas. El cuarto es el índice por el cual se recorrerán las tuplas. El quinto es el número del campo de tupla con vida útil (¡la numeración comienza desde 1, no desde 0!). El sexto y el séptimo son configuraciones de escaneo. 1024 es el número máximo de tuplas que se pueden ver en una sola transacción. 3600: tiempo de escaneo completo en segundos.

Tenga en cuenta que el ejemplo utiliza el mismo índice para rastrear y eliminar. Si se trata de un índice de árbol, entonces el recorrido se realiza desde la clave más pequeña a la más grande. Si hay algún otro, por ejemplo, un índice hash, entonces el recorrido se realiza, por regla general, en orden aleatorio. Todas las tuplas espaciales se escanean en una sola exploración.

Insertemos varias tuplas en el espacio con una vida útil de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Comprobemos que la inserción fue exitosa:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Repitamos la selección después de más de 60 segundos (contando desde el comienzo de la inserción de la primera tupla) y veamos que el módulo caducado limitado ya se ha procesado:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Detengamos la tarea:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Veamos un segundo ejemplo en el que se utiliza un índice independiente para el rastreo:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Aquí todo es igual que en el primer ejemplo, con algunas excepciones. Construimos un índice de árbol encima del tercer campo y lo llamamos exp. Este índice no tiene por qué ser único, a diferencia del índice llamado primario. El recorrido se realizará por índice exp y la eliminación por primario. Recordamos que anteriormente ambas se hacían únicamente utilizando el índice primario.

Después del trabajo preparatorio, ejecutamos la función de inicio con nuevos argumentos:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Insertemos nuevamente varias tuplas en el espacio con una vida útil de 60 segundos:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Después de 30 segundos, por analogía, agregaremos algunas tuplas más:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Comprobemos que la inserción fue exitosa:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Repitamos la selección después de más de 60 segundos (contando desde el comienzo de la inserción de la primera tupla) y veamos que el módulo caducado limitado ya se ha procesado:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Aún quedan algunas tuplas en el espacio a las que les quedarán unos 30 segundos más de vida. Además, el escaneo se detuvo al pasar de una tupla con un ID de 2 y una vida útil de 1576421257 a una tupla con un ID de 3 y una vida útil de 1576421287. Las tuplas con una vida útil de 1576421287 o más no se escanearon debido al orden de las claves del índice exp. Estos son los ahorros que queríamos lograr desde el principio.

Detengamos la tarea:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

implementación

La mejor manera de conocer todas las características de un proyecto es su fuente original. código! Como parte de la publicación, nos centraremos sólo en los puntos más importantes, a saber, los algoritmos de derivación espacial.

Los argumentos que pasamos al método de inicio se almacenan en una estructura llamada expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

El atributo de nombre es el nombre de la tarea. El atributo space_id es el identificador del espacio. El atributo rm_index_id es el identificador del índice único mediante el cual se eliminarán las tuplas. El atributo it_index_id es el identificador del índice por el cual se recorrerán las tuplas. El atributo it_index_type es el tipo de índice por el cual se recorrerán las tuplas. El atributo filed_no es el número del campo de tupla con duración. El atributo scan_size es el número máximo de tuplas que se escanean en una transacción. El atributo scan_time es el tiempo de análisis completo en segundos.

No consideraremos analizar argumentos. Este es un trabajo minucioso pero sencillo, en el que la biblioteca te ayudará msgpuck. Las dificultades sólo pueden surgir con índices que se pasan desde Lua como una estructura de datos compleja con el tipo mp_map, y sin utilizar los tipos simples mp_bool, mp_double, mp_int, mp_uint y mp_array. Pero no es necesario analizar todo el índice. Sólo necesitas comprobar su unicidad, calcular el tipo y extraer el identificador.

Enumeramos los prototipos de todas las funciones que se utilizan para el análisis:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Ahora pasemos a lo más importante: la lógica de omitir el espacio y eliminar tuplas. Cada bloque de tuplas no mayor que scan_size se escanea y modifica en una sola transacción. Si tiene éxito, esta transacción se confirma; si se produce un error, se revierte. El último argumento de la función expirationd_iterate es un puntero al iterador desde el cual comienza o continúa el escaneo. Este iterador se incrementa internamente hasta que ocurre un error, se agota el espacio o no es posible detener el proceso por adelantado. La función expirationd_expired verifica la vida útil de una tupla, expirationd_delete elimina una tupla, expirationd_breakable verifica si necesitamos seguir adelante.

Código de función Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Código de función expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Código de función Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Código de función Expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

solicitud

Puedes ver el código fuente en aquí!

Fuente: habr.com

Añadir un comentario