为 tarantool 编写我们自己的上限过期模块

为 tarantool 编写我们自己的上限过期模块

前段时间我们遇到了清理空间中元组的问题 塔兰图尔。 清理不能在 tarantool 内存不足时开始,而是提前并以一定的频率开始。 对于这个任务,tarantool 有一个用 Lua 编写的模块,名为 过期。 使用这个模块一段时间后,我们意识到它不适合我们:由于不断清理大量数据,Lua挂在了GC中。 因此,我们考虑开发自己的 cappedexpirationd 模块,希望用原生编程语言编写的代码能够以最好的方式解决我们的问题。

对我们来说一个很好的例子是 tarantool 模块,名为 memcached。 其中使用的方法是基于在空间中创建一个单独的字段,该字段指示元组的生命周期,换句话说,ttl。 后台模块扫描空间,将TTL与当前时间进行比较,决定是否删除该元组。 memcached模块代码简单优雅,但过于通用。 首先,它没有考虑正在爬行和删除的索引的类型。 其次,每次扫描都会扫描所有元组,其数量可能非常大。 如果在expirationd模块中解决了第一个问题(树索引被分离到一个单独的类中),那么第二个问题仍然没有受到任何关注。 这三点决定了我自己编写代码的选择。

使用说明

tarantool 的文档非常好 教程 关于如何用 C 编写存储过程。首先,我建议您熟悉它,以便理解下面将出现的那些使用命令和代码进行插入的内容。 也值得关注 参考 编写您自己的上限模块时可用的对象,即 , 纤维, 指数 и TXN.

让我们从远处开始,从外部看一下上限过期模块是什么样子的:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

为简单起见,我们在 libcapped-expirationd.so 库所在的目录中启动 tarantool。 从库中导出两个函数:start 和kill。 第一步是使用 box.schema.func.create 和 box.schema.user.grant 从 Lua 中使用这些函数。 然后创建一个空间,其元组仅包含三个字段:第一个是唯一标识符,第二个是电子邮件,第三个是元组的生命周期。 我们在第一个字段之上构建一个树索引,并将其称为主索引。 接下来我们获取到本机库的连接对象。

准备工作完成后,运行start函数:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

该示例在扫描过程中的工作方式与用 Lua 编写的expirationd 模块完全相同。 启动函数的第一个参数是任务的唯一名称。 第二个是空间标识符。 第三个是唯一索引,通过该索引可以删除元组。 第四个是遍历元组的索引。 第五个是具有生命周期的元组字段的编号(编号从1开始,而不是0!)。 第六和第七是扫描设置。 1024 是单个事务中可以查看的最大元组数。 3600 — 完整扫描时间(以秒为单位)。

请注意,该示例使用相同的索引进行爬网和删除。 如果这是一个树索引,则从较小的键到较大的键进行遍历。 如果存在其他一些索引,例如散列索引,则通常以随机顺序进行遍历。 所有空间元组都在一次扫描中被扫描。

让我们向空间中插入几个生命周期为 60 秒的元组:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

我们来检查一下是否插入成功:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

让我们在 60 秒以上(从第一个元组插入开始算起)后重复选择,看看 capped expiryd 模块已经处理完毕:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

让我们停止任务:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

让我们看第二个示例,其中使用单独的索引进行爬网:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

这里的所有内容都与第一个示例中的相同,但有一些例外。 我们在第三个字段之上构建一个树索引并将其称为 exp。 与称为主索引的索引不同,该索引不必是唯一的。 遍历将通过exp索引进行,删除则通过primary进行。 我们记得以前两者都是仅使用主索引完成的。

准备工作完成后,我们使用新参数运行启动函数:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

让我们再次将几个元组插入到该空间中,生命周期为 60 秒:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

30秒后,以此类推,我们再添加几个元组:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

我们来检查一下是否插入成功:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

让我们在 60 秒以上(从第一个元组插入开始算起)后重复选择,看看 capped expiryd 模块已经处理完毕:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

空间中还剩下一些元组,它们还有大约 30 秒的生存时间。 此外,当从 ID 为 2、生命周期为 1576421257 的元组移动到 ID 为 3、生命周期为 1576421287 的元组时,扫描停止。由于以下顺序,生命周期为 1576421287 或更长的元组不会被扫描。 exp 索引键。 这是我们一开始就想实现的节省。

让我们停止任务:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

履行

讲述项目所有功能的最佳方式是其原始来源。 ! 作为出版物的一部分,我们将只关注最重要的一点,即空间旁路算法。

我们传递给start方法的参数存储在一个名为expirationd_task的结构中:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

name 属性是任务的名称。 space_id 属性是空间标识符。 rm_index_id 属性是删除元组的唯一索引的标识符。 it_index_id 属性是遍历元组的索引的标识符。 it_index_type 属性是遍历元组的索引类型。 fields_no 属性是具有生存期的元组字段的编号。 scan_size 属性是一次事务中扫描的最大元组数。 scan_time 属性是以秒为单位的完整扫描时间。

我们不会考虑解析参数。 这是一项艰苦但简单的工作,图书馆将帮助您 消息包。 仅当索引从 Lua 作为 mp_map 类型的复杂数据结构传递时才会出现困难,而不是使用简单类型 mp_bool、mp_double、mp_int、mp_uint 和 mp_array。 但不需要解析整个索引。 您只需要检查其唯一性,计算类型并提取标识符。

我们列出了用于解析的所有函数的原型:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

现在让我们继续讨论最重要的事情 - 绕过空间和删除元组的逻辑。 在单个事务下扫描和修改不大于 scan_size 的每个元组块。 如果成功,则提交该事务;如果发生错误,则回滚。 expirationd_iterate 函数的最后一个参数是一个指向迭代器的指针,扫描从该迭代器开始或继续。 该迭代器在内部递增,直到发生错误、空间耗尽或无法提前停止进程。 函数expirationd_expired 检查元组的生命周期,expirationd_delete 删除元组,expirationd_breakable 检查是否需要继续。

Expirationd_iterate函数代码:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

函数代码expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete函数代码:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Expirationd_breakable函数代码:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

应用

您可以在以下位置查看源代码 这里!

来源: habr.com

添加评论