Пишемо свій capped expirationd модуль для tarantool

Пишемо свій capped expirationd модуль для tarantool

Якийсь час тому перед нами постала проблема чищення кортежів у спейсах tarantool. Чистку потрібно було запускати не тоді, коли у tarantool вже закінчувалася пам'ять, а заздалегідь і з певною періодичністю. Для цього завдання в tarantool є модуль, написаний на Lua під назвою expirationd. Після нетривалого використання цього модуля ми зрозуміли, що нам він не підходить: на постійних чистках великих обсягів даних Lua висів у GC. Тому ми задумалися про розробку свого capped expirationd модуля, сподіваючись, що код, написаний нативною мовою програмування, вирішить наші завдання якнайкраще.

Хорошим прикладом нам послужив модуль tarantool під назвою memcached. Використовуваний у ньому підхід полягає в тому, що у спейсі заводиться окреме полі, у якому вказується час життя кортежу, інакше кажучи, ttl. Модуль у фоні сканує спейс, порівнює ttl з поточним часом і приймає рішення про видалення кортежу чи ні. Код модуля memcached простий та елегантний, але надто загальний. По-перше, він не враховує тип індексу, яким здійснюється обхід і видалення. По-друге, на кожному проході скануються всі кортежі, кількість яких може бути досить великою. І якщо в модулі expirationd перша проблема була вирішена (tree-індекс виділено в окремий клас), то другий все так само не приділено жодної уваги. Ці три пункти визначили вибір на користь написання коду.

Опис

У документації до tarantool є дуже гарний туторіал про те, як писати свої процедури, що зберігаються на мові C. В першу чергу пропоную ознайомитися з ним, щоб розуміти ті вставки з командами і кодом, які будуть зустрічатися нижче. Також варто звернути увагу на референс до об'єктів, які доступні при написанні свого власного capped модуля, а саме на ящик, волокно, індекс и txn.

Давайте почнемо здалеку і подивимося на те, як виглядає capped expirationd модуль зовні:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Для простоти запускаємо tarantool у каталозі, в якому знаходиться наша бібліотека libcapped-expirationd.so. З бібліотеки експортуються дві функції: start та kill. Спочатку потрібно зробити ці функції доступними з Lua за допомогою box.schema.func.create і box.schema.user.grant. Потім створити спейс, кортежі якого матимуть лише три поля: перше — це унікальний ідентифікатор, друге — електронна пошта, третє — час життя кортежу. Поверх першого поля будуємо tree-індекс і називаємо його primary. Далі отримуємо об'єкт підключення до нашої бібліотеки.

Після підготовчих робіт запускаємо функцію start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Цей приклад буде працювати при скануванні так само, як і модуль expirationd, який написаний на Lua. Першим аргументом на функцію start передається унікальне ім'я таска. Другим – ідентифікатор спейсу. Третім — унікальний індекс, яким буде здійснюватися видалення кортежів. Четвертим — індекс, за яким здійснюватиметься обхід кортежів. П'ятим – номер поля кортежу з часом життя (нумерація йде від 1, а не 0!). Шостим та сьомим – налаштування сканування. 1024 - це максимальна кількість кортежів, що проглядається в рамках однієї транзакції. 3600 – час повного сканування в секундах.

Зверніть увагу, що для обходу та видалення прикладу використовується один і той же індекс. Якщо це tree-індекс, то обхід здійснюється від меншого ключа до більшого. Якщо якийсь інший, наприклад, hash-індекс, то обхід здійснюється, як правило, у довільному порядку. За одне сканування проглядаються усі кортежі спейсу.

Давайте зробимо вставку в спейс кількох кортежів із часом життя 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Перевіримо, що вставка пройшла успішно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Повторимо select через 60+ секунд (вважаємо від початку вставки першого кортежу) і побачимо, що модуль capped expirationd вже відпрацював:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Зупинимо таск:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Давайте розглянемо другий приклад, коли для обходу використовується окремий індекс:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Тут все те саме, що й у першому прикладі, за малим винятком. Поверх третього поля будуємо tree-індекс і називаємо його exp. Цей індекс може бути унікальним, на відміну індексу під назвою primary. Обхід буде здійснюватися за exp індексом, а видалення за першим. Ми пам'ятаємо, що раніше й те, й інше робилося лише з використанням значного індексу.

Після підготовчих робіт запускаємо функцію start із новими аргументами:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Знову зробимо вставку в спейс кількох кортежів із часом життя 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Через 30 секунд за аналогією додамо ще кілька кортежів:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Перевіримо, що вставка пройшла успішно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Повторимо select через 60+ секунд (вважаємо від початку вставки першого кортежу) і побачимо, що модуль capped expirationd вже відпрацював:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

У спейсі залишилися кортежі, яким житиме ще приблизно 30 секунд. Більш того, сканування зупинилося при переході від кортежу з ідентифікатором 2 і часом життя 1576421257 до кортежу з ідентифікатором 3 і часом життя 1576421287. Кортежі з часом життя 1576421287 і більше переглянуті не були за рахунок Це і є та економія, якої ми хотіли досягти на самому початку.

Зупинимо таск:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реалізація

Найкраще про всі особливості проекту завжди розповість його вихідний код! У рамках публікації ми зупинимося лише на найважливіших моментах, а саме на алгоритмах обходу спейсу.

Аргументи, які ми передаємо у метод start, зберігаються у структурі під назвою expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Атрибут name це ім'я тяга. Атрибут space_id є ідентифікатором спейсу. Атрибут rm_index_id — ідентифікатор унікального індексу, яким буде здійснюватися видалення кортежів. Атрибут it_index_id — ідентифікатор індексу, яким буде здійснюватися обхід кортежів. Атрибут it_index_type — тип індексу, яким буде здійснюватися обхід кортежів. Атрибут filed_no – номер поля кортежу з часом життя. Атрибут scan_size - максимальна кількість кортежів, що проглядається в рамках однієї транзакції. Атрибут scan_time – час повного сканування за секунди.

Парсинг аргументів не розглядатимемо. Це копітка, але нескладна робота, в якій допоможе бібліотека. msgpuck. Труднощі можуть виникнути лише з індексами, які передаються з Lua у вигляді складної структури даних з типом mp_map, а не за допомогою простих типів mp_bool, mp_double, mp_int, mp_uint та mp_array. Але весь індекс ширяти і не потрібно. Достатньо лише перевірити його унікальність, обчислити тип та отримати ідентифікатор.

Перелічимо прототипи всіх функцій, які використовуються для парсингу:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

А тепер перейдемо до найголовнішого – до логіки обходу спейсу та видалення кортежів. Кожен блок кортежів розміром трохи більше scan_size проглядається і змінюється під однією транзакцією. У разі успіху ця транзакція комітується, у разі помилки відкочується. Останнім аргументом у функцію expirationd_iterate передається покажчик на ітератор, з якого починається або продовжується сканування. Цей ітератор інкрементується всередині поки не станеться помилка, не закінчиться спейс, або не буде можливості заздалегідь зупинити процес. Функція expirationd_expired перевіряє час життя кортежу, expirationd_delete – видаляє кортеж, expirationd_breakable – перевіряє, чи нам треба рухатися далі.

Код функції expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Код функції expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Код функції expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Код функції expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

додаток

Ознайомитися з вихідним кодом можна на тут!

Джерело: habr.com

Додати коментар або відгук