Пішам свой capped expirationd модуль для tarantool

Пішам свой capped expirationd модуль для tarantool

Нейкі час таму перад намі паўстала праблема чысткі картэжаў у спейсах. tarantool. Чыстку трэба было запускаць не тады, калі ў tarantool ужо сканчалася памяць, а загадзя і з вызначанай перыядычнасцю. Для гэтай задачы ў tarantool ёсць модуль, напісаны на Lua, пад назвай expirationd. Пасля непрацяглага выкарыстання гэтага модуля мы зразумелі, што нам ён не падыходзіць: на сталых чыстках вялікіх аб'ёмаў дадзеных Lua вісеў у GC. Таму мы задумаліся аб распрацоўцы свайго capped expirationd модуля, спадзяючыся, што код, напісаны на натыўнай мове праграмавання, вырашыць нашы задачы найлепшай выявай.

Добрым прыкладам нам паслужыў модуль tarantool пад назовам memcached. Выкарыстоўваны ў ім падыход засноўваецца на тым, што ў спейсе заводзіцца асобнае поле, у якім паказваецца час жыцця картэжа, іншымі словамі, ttl. Модуль у фоне скануе спейс, параўноўвае ttl з бягучым часам і прымае рашэнне аб тым, выдаляць картэж ці не. Код модуля memcached просты і элегантны, але занадта агульны. Па-першае, ён не ўлічвае тып азначніка, па якім ажыццяўляецца абыход і выдаленне. Па-другое, на кожным праходзе скануюцца ўсе картэжы, колькасць якіх можа быць даволі вялікай. І калі ў модулі expirationd першая праблема была вырашана (tree-індэкс выдзелены ў асобны клас), то другі ўсё гэтак жа не нададзена ніякай увагі. Гэтыя тры пункты абумовілі выбар на карысць напісання свайго кода.

Апісанне

У дакументацыі да tarantool ёсць вельмі добры тутарыял аб тым, як пісаць свае захоўваемыя працэдуры на мове C. У першую чаргу прапаную азнаёміцца ​​з ім, каб разумець тыя ўстаўкі з камандамі і кодам, якія будуць сустракацца ніжэй. Таксама варта звярнуць увагу на рэферэнс да аб'ектаў, якія даступныя пры напісанні свайго ўласнага capped модуля, а менавіта на скрынка, валакно, індэкс и txn.

Давайце пачнем здалёк і паглядзім на тое, як выглядае capped expirationd модуль звонку:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Для прастаты запускаем tarantool у каталогу, у якім знаходзіцца наша бібліятэка libcapped-expirationd.so. З бібліятэкі экспартуюцца дзве функцыі: start і kill. Спачатку самім неабходна зрабіць гэтыя функцыі даступнымі з Lua з дапамогай box.schema.func.create і box.schema.user.grant. Затым стварыць спейс, картэжы якога будуць утрымоўваць усяго тры палі: першае - гэта ўнікальны ідэнтыфікатар, другое - электронная пошта, трэцяе - час жыцця картэжа. Па-над першым полем будуем tree-індэкс і завем яго primary. Далей атрымліваем аб'ект падключэння да нашай натыўнай бібліятэкі.

Пасля падрыхтоўчых прац запускаем функцыю start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Гэты прыклад будзе працаваць пры сканаванні роўна таксама, як і модуль expirationd, які напісаны на Lua. Першым аргументам у функцыю start перадаецца ўнікальнае імя цяга. Другім - ідэнтыфікатар спейсу. Трэцім - унікальны індэкс, па якім будзе ажыццяўляцца выдаленне картэжаў. Чацвёртым - індэкс, па якім будзе ажыццяўляцца абыход картэжаў. Пятым - нумар поля картэжа з часам жыцця (нумарацыя ідзе ад 1, а не 0!). Шостым і сёмым - налады сканавання. 1024 - гэта максімальная колькасць картэжаў, якое праглядаецца ў рамках адной транзакцыі. 3600 - час поўнага сканавання ў секундах.

Звярніце ўвагу, што для абыходу і выдаленні ў прыкладзе выкарыстоўваецца адзін і той жа індэкс. Калі гэта tree-індэкс, то абыход ажыццяўляецца ад меншага ключа да большага. Калі нейкі іншы, напрыклад, hash-індэкс, то абыход ажыццяўляецца, як правіла, у адвольным парадку. За адно сканіраванне праглядаюцца ўсе картэжы спейсу.

Давайце зробім устаўку ў спейс некалькіх картэжаў з часам жыцця 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Праверым, што ўстаўка прайшла паспяхова:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Паўторым select праз 60+ секунд (лічым ад пачатку ўстаўкі першага картэжа) і ўбачым, што модуль capped expirationd ужо адпрацаваў:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Спынім таск:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Давайце разгледзім другі прыклад, калі для абыходу выкарыстоўваецца асобны азначнік:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Тут усё тое самае, што і ў першым прыкладзе, за малым выключэннем. Па-над трэцім полем будуем tree-індэкс і завем яго exp. Гэты індэкс не павінен быць унікальным, у адрозненне ад індэкса пад назовам primary. Абыход будзе ажыццяўляцца па exp індэксу, а выдаленне па primary. Мы памятаем, што раней і тое, і іншае рабілася толькі з выкарыстаннем primary азначніка.

Пасля падрыхтоўчых прац запускаем функцыю start з новымі аргументамі:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Зноў зробім устаўку ў спейс некалькіх картэжаў з часам жыцця 60 секунд:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Праз 30 секунд па аналогіі дадамо яшчэ некалькі картэжаў:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Праверым, што ўстаўка прайшла паспяхова:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Паўторым select праз 60+ секунд (лічым ад пачатку ўстаўкі першага картэжа) і ўбачым, што модуль capped expirationd ужо адпрацаваў:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

У спейсе засталіся картэжы, якім жыць яшчэ прыкладна 30 секунд. Больш таго, сканаванне спынілася пры пераходзе ад картэжа з ідэнтыфікатарам 2 і часам жыцця 1576421257 да картэжа з ідэнтыфікатарам 3 і часам жыцця 1576421287. Картэжы з часам жыцця 1576421287 і больш прагледжаны не былі за рахунак у. Гэта і ёсць тая эканомія, якой мы хацелі дабіцца ў самым пачатку.

Спынім таск:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Рэалізацыя

Лепш за ўсё пра ўсе асаблівасці праекту заўсёды распавядзе яго зыходны код! У рамках публікацыі мы спынімся толькі на найважнейшых момантах, а менавіта, на алгарытмах абыходу спейсу.

Аргументы, якія мы перадаем у метад start, захоўваюцца ў структуры пад назовам expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Атрыбут name - гэта імя цяга. Атрыбут space_id - ідэнтыфікатар спейсу. Атрыбут rm_index_id - ідэнтыфікатар унікальнага індэкса, па якім будзе ажыццяўляцца выдаленне картэжаў. Атрыбут it_index_id - ідэнтыфікатар індэкса, па якім будзе ажыццяўляцца абыход картэжаў. Атрыбут it_index_type - тып індэкса, па якім будзе ажыццяўляцца абыход картэжаў. Атрыбут filed_no - нумар поля картэжа з часам жыцця. Атрыбут scan_size - максімальная колькасць картэжаў, якое праглядаецца ў рамках адной транзакцыі. Атрыбут scan_time - час поўнага сканавання ў секундах.

Парсінг аргументаў разглядаць не будзем. Гэта карпатлівая, але нескладаная праца, у якой вам дапаможа бібліятэка msgpuck. Цяжкасці могуць узнікнуць толькі з індэксамі, якія перадаюцца з Lua у выглядзе складанай структуры дадзеных з тыпам mp_map, а не з дапамогай простых тыпаў mp_bool, mp_double, mp_int, mp_uint і mp_array. Але ўвесь індэкс парсіць і не трэба. Дастаткова толькі праверыць яго ўнікальнасць, вылічыць тып і атрымаць ідэнтыфікатар.

Пералічоны прататыпы ўсіх функцый, якія выкарыстоўваюцца для парсінгу:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

А зараз пяройдзем да самага галоўнага - да логікі абыходу спейсу і выдаленні картэжаў. Кожны блок картэжаў памерам не больш за scan_size праглядаецца і змяняецца пад адной транзакцыяй. У выпадку поспеху гэтая транзакцыя каміціцца, у выпадку памылкі - адкочваецца. Апошнім аргументам у функцыю expirationd_iterate перадаецца паказальнік на ітэратар, з якога пачынаецца ці працягваецца сканаванне. Гэты ітэратар інкрыментуецца ўсярэдзіне пакуль не адбудзецца памылка, не скончыцца спейс, альбо не прадставіцца магчымасці загадзя спыніць працэс. Функцыя expirationd_expired правярае час жыцця картэжа, expirationd_delete - выдаляе картэж, expirationd_breakable - правярае, ці трэба нам рухацца далей.

Код функцыі expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Код функцыі expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Код функцыі expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Код функцыі expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Дадатак

Азнаёміцца ​​з зыходным кодам можна на тут!

Крыніца: habr.com

Дадаць каментар