Пишем свой capped expirationd модуль для tarantool

Пишем свой capped expirationd модуль для tarantool

Какое-то время назад перед нами встала проблема чистки кортежей в спейсах tarantool. Чистку нужно было запускать не тогда, когда у tarantool уже заканчивалась память, а заранее и с определенной периодичностью. Для этой задачи в tarantool есть модуль, написанный на Lua, под названием expirationd. После непродолжительного использования этого модуля мы поняли, что нам он не подходит: на постоянных чистках больших объемов данных Lua висел в GC. Поэтому мы задумались о разработке своего capped expirationd модуля, надеясь, что код, написанный на нативном языке программирования, решит наши задачи наилучшим образом.

Хорошим примером нам послужил модуль tarantool под названием memcached. Используемый в нем подход основывается на том, что в спейсе заводится отдельное поле, в котором указывается время жизни кортежа, иными словами, ttl. Модуль в фоне сканирует спейс, сравнивает ttl с текущим временем и принимает решение о том, удалять кортеж или нет. Код модуля memcached простой и элегантный, но слишком общий. Во-первых, он не учитывает тип индекса, по которому осуществляется обход и удаление. Во-вторых, на каждом проходе сканируются все кортежи, количество которых может быть довольно большим. И если в модуле expirationd первая проблема была решена (tree-индекс выделен в отдельный класс), то второй все так же не уделено никакого внимания. Эти три пункта предопределили выбор в пользу написания своего кода.

Описание

В документации к tarantool есть очень хороший туториал о том, как писать свои хранимые процедуры на языке C. В первую очередь предлагаю ознакомиться с ним, чтобы понимать те вставки с командами и кодом, которые будут встречаться ниже. Также стоит обратить внимание на референс к объектам, которые доступны при написании своего собственного capped модуля, а именно на box, fiber, index и txn.

Давайте начнем издалека и посмотрим на то, как выглядит capped expirationd модуль снаружи:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Для простоты запускаем tarantool в каталоге, в котором находится наша библиотека libcapped-expirationd.so. Из библиотеки экспортируются две функции: start и kill. Вначале самом необходимо сделать эти функции доступными из Lua с помощью box.schema.func.create и box.schema.user.grant. Затем создать спейс, кортежи которого будут содержать всего три поля: первое — это уникальный идентификатор, второе — электронная почта, третье — время жизни кортежа. Поверх первого поля строим tree-индекс и называем его primary. Далее получаем объект подключения к нашей нативной библиотеке.

После подготовительных работ запускаем функцию start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Этот пример будет работать при сканировании ровно также, как и модуль expirationd, который написан на Lua. Первым аргументом в функцию start передается уникальное имя таска. Вторым — идентификатор спейса. Третьим — уникальный индекс, по которому будет осуществляться удаление кортежей. Четвертым — индекс, по которому будет осуществляться обход кортежей. Пятым — номер поля кортежа с временем жизни (нумерация идет от 1, а не 0!). Шестым и седьмым — настройки сканирования. 1024 — это максимальное количество кортежей, которое просматривается в рамках одной транзакции. 3600 — время полного сканирования в секундах.

Обратите внимание, что для обхода и удаления в примере используется один и тот же индекс. Если это tree-индекс, то обход осуществляется от меньшего ключа к большему. Если какой-то другой, например, hash-индекс, то обход осуществляется, как правило, в произвольном порядке. За одно сканирование просматриваются все кортежи спейса.

Давайте сделаем вставку в спейс нескольких кортежей с временем жизни 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Проверим, что вставка прошла успешно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Повторим select через 60+ секунд (считаем от начала вставки первого кортежа) и увидим, что модуль capped expirationd уже отработал:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Остановим таск:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Давайте рассмотрим второй пример, когда для обхода используется отдельный индекс:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Здесь все то же самое, что и в первом примере, за малым исключением. Поверх третьего поля строим tree-индекс и называем его exp. Этот индекс не обязан быть уникальным, в отличие от индекса под названием primary. Обход будет осуществляться по exp индексу, а удаление по primary. Мы помним, что ранее и то, и другое делалось только с использованием primary индекса.

После подготовительных работ запускаем функцию start с новыми аргументами:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Снова сделаем вставку в спейс нескольких кортежей с временем жизни 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Через 30 секунд по аналогии добавим еще несколько кортежей:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Проверим, что вставка прошла успешно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Повторим select через 60+ секунд (считаем от начала вставки первого кортежа) и увидим, что модуль capped expirationd уже отработал:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

В спейсе остались кортежи, которым жить еще примерно 30 секунд. Более того, сканирование остановилось при переходе от кортежа с идентификатором 2 и временем жизни 1576421257 к кортежу с идентификатором 3 и временем жизни 1576421287. Кортежи с временем жизни 1576421287 и более просмотрены не были за счет упорядоченности ключей exp индекса. Эта и есть та экономия, которой мы хотели добиться в самом начале.

Остановим таск:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Лучше всего о всех особенностях проекта всегда расскажет его исходный код! В рамках публикации мы остановимся лишь на самых важных моментах, а именно, на алгоритмах обхода спейса.

Аргументы, которые мы передаем в метод start, сохраняются в структуре под названием expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Атрибут name — это имя таска. Атрибут space_id — идентификатор спейса. Атрибут rm_index_id — идентификатор уникального индекса, по которому будет осуществляться удаление кортежей. Атрибут it_index_id — идентификатор индекса, по которому будет осуществляться обход кортежей. Атрибут it_index_type — тип индекса, по которому будет осуществляться обход кортежей. Атрибут filed_no — номер поля кортежа с временем жизни. Атрибут scan_size — максимальное количество кортежей, которое просматривается в рамках одной транзакции. Атрибут scan_time — время полного сканирования в секундах.

Парсинг аргументов рассматривать не будем. Это кропотливая, но несложная работа, в которой вам поможет библиотека msgpuck. Трудности могут возникнуть лишь с индексами, которые передаются из Lua в виде сложной структуры данных с типом mp_map, а не с помощь простых типов mp_bool, mp_double, mp_int, mp_uint и mp_array. Но весь индекс парсить и не нужно. Достаточно лишь проверить его уникальность, вычислить тип и извлечь идентификатор.

Перечислим прототипы всех функций, которые используются для парсинга:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

А теперь перейдем к самому главному — к логике обхода спейса и удаления кортежей. Каждый блок кортежей размером не более scan_size просматривается и изменяется под одной транзакцией. В случае успеха эта транзакция коммитится, в случае ошибки — откатывается. Последним аргументом в функцию expirationd_iterate передается указатель на итератор, с которого начинается или продолжается сканирование. Этот итератор инкрементируется внутри пока не произойдет ошибка, не закончится спейс, либо не представится возможности заранее остановить процесс. Функция expirationd_expired проверяет время жизни кортежа, expirationd_delete — удаляет кортеж, expirationd_breakable — проверяет, нужно ли нам двигаться дальше.

Код функции expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Код функции expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Код фукнции expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Код функции expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Приложение

Ознакомиться с исходным кодом можно на тут!

Источник: habr.com

Добавить комментарий