Нейкі час таму перад намі паўстала праблема чысткі картэжаў у спейсах.
Добрым прыкладам нам паслужыў модуль tarantool пад назовам
Апісанне
У дакументацыі да tarantool ёсць вельмі добры
Давайце пачнем здалёк і паглядзім на тое, як выглядае capped expirationd модуль звонку:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Для прастаты запускаем tarantool у каталогу, у якім знаходзіцца наша бібліятэка libcapped-expirationd.so. З бібліятэкі экспартуюцца дзве функцыі: start і kill. Спачатку самім неабходна зрабіць гэтыя функцыі даступнымі з Lua з дапамогай box.schema.func.create і box.schema.user.grant. Затым стварыць спейс, картэжы якога будуць утрымоўваць усяго тры палі: першае - гэта ўнікальны ідэнтыфікатар, другое - электронная пошта, трэцяе - час жыцця картэжа. Па-над першым полем будуем tree-індэкс і завем яго primary. Далей атрымліваем аб'ект падключэння да нашай натыўнай бібліятэкі.
Пасля падрыхтоўчых прац запускаем функцыю start:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Гэты прыклад будзе працаваць пры сканаванні роўна таксама, як і модуль expirationd, які напісаны на Lua. Першым аргументам у функцыю start перадаецца ўнікальнае імя цяга. Другім - ідэнтыфікатар спейсу. Трэцім - унікальны індэкс, па якім будзе ажыццяўляцца выдаленне картэжаў. Чацвёртым - індэкс, па якім будзе ажыццяўляцца абыход картэжаў. Пятым - нумар поля картэжа з часам жыцця (нумарацыя ідзе ад 1, а не 0!). Шостым і сёмым - налады сканавання. 1024 - гэта максімальная колькасць картэжаў, якое праглядаецца ў рамках адной транзакцыі. 3600 - час поўнага сканавання ў секундах.
Звярніце ўвагу, што для абыходу і выдаленні ў прыкладзе выкарыстоўваецца адзін і той жа індэкс. Калі гэта tree-індэкс, то абыход ажыццяўляецца ад меншага ключа да большага. Калі нейкі іншы, напрыклад, hash-індэкс, то абыход ажыццяўляецца, як правіла, у адвольным парадку. За адно сканіраванне праглядаюцца ўсе картэжы спейсу.
Давайце зробім устаўку ў спейс некалькіх картэжаў з часам жыцця 60 секунд:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Праверым, што ўстаўка прайшла паспяхова:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Паўторым select праз 60+ секунд (лічым ад пачатку ўстаўкі першага картэжа) і ўбачым, што модуль capped expirationd ужо адпрацаваў:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Спынім таск:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Давайце разгледзім другі прыклад, калі для абыходу выкарыстоўваецца асобны азначнік:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Тут усё тое самае, што і ў першым прыкладзе, за малым выключэннем. Па-над трэцім полем будуем tree-індэкс і завем яго exp. Гэты індэкс не павінен быць унікальным, у адрозненне ад індэкса пад назовам primary. Абыход будзе ажыццяўляцца па exp індэксу, а выдаленне па primary. Мы памятаем, што раней і тое, і іншае рабілася толькі з выкарыстаннем primary азначніка.
Пасля падрыхтоўчых прац запускаем функцыю start з новымі аргументамі:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Зноў зробім устаўку ў спейс некалькіх картэжаў з часам жыцця 60 секунд:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Праз 30 секунд па аналогіі дадамо яшчэ некалькі картэжаў:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Праверым, што ўстаўка прайшла паспяхова:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Паўторым select праз 60+ секунд (лічым ад пачатку ўстаўкі першага картэжа) і ўбачым, што модуль capped expirationd ужо адпрацаваў:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
У спейсе засталіся картэжы, якім жыць яшчэ прыкладна 30 секунд. Больш таго, сканаванне спынілася пры пераходзе ад картэжа з ідэнтыфікатарам 2 і часам жыцця 1576421257 да картэжа з ідэнтыфікатарам 3 і часам жыцця 1576421287. Картэжы з часам жыцця 1576421287 і больш прагледжаны не былі за рахунак у. Гэта і ёсць тая эканомія, якой мы хацелі дабіцца ў самым пачатку.
Спынім таск:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Рэалізацыя
Лепш за ўсё пра ўсе асаблівасці праекту заўсёды распавядзе яго зыходны
Аргументы, якія мы перадаем у метад start, захоўваюцца ў структуры пад назовам expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Атрыбут name - гэта імя цяга. Атрыбут space_id - ідэнтыфікатар спейсу. Атрыбут rm_index_id - ідэнтыфікатар унікальнага індэкса, па якім будзе ажыццяўляцца выдаленне картэжаў. Атрыбут it_index_id - ідэнтыфікатар індэкса, па якім будзе ажыццяўляцца абыход картэжаў. Атрыбут it_index_type - тып індэкса, па якім будзе ажыццяўляцца абыход картэжаў. Атрыбут filed_no - нумар поля картэжа з часам жыцця. Атрыбут scan_size - максімальная колькасць картэжаў, якое праглядаецца ў рамках адной транзакцыі. Атрыбут scan_time - час поўнага сканавання ў секундах.
Парсінг аргументаў разглядаць не будзем. Гэта карпатлівая, але нескладаная праца, у якой вам дапаможа бібліятэка
Пералічоны прататыпы ўсіх функцый, якія выкарыстоўваюцца для парсінгу:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
А зараз пяройдзем да самага галоўнага - да логікі абыходу спейсу і выдаленні картэжаў. Кожны блок картэжаў памерам не больш за scan_size праглядаецца і змяняецца пад адной транзакцыяй. У выпадку поспеху гэтая транзакцыя каміціцца, у выпадку памылкі - адкочваецца. Апошнім аргументам у функцыю expirationd_iterate перадаецца паказальнік на ітэратар, з якога пачынаецца ці працягваецца сканаванне. Гэты ітэратар інкрыментуецца ўсярэдзіне пакуль не адбудзецца памылка, не скончыцца спейс, альбо не прадставіцца магчымасці загадзя спыніць працэс. Функцыя expirationd_expired правярае час жыцця картэжа, expirationd_delete - выдаляе картэж, expirationd_breakable - правярае, ці трэба нам рухацца далей.
Код функцыі expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Код функцыі expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Код функцыі expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Код функцыі expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Дадатак
Азнаёміцца з зыходным кодам можна на
Крыніца: habr.com