Пишување на нашиот сопствен модул со истечен рок на траење за тарантул

Пишување на нашиот сопствен модул со истечен рок на траење за тарантул

Пред извесно време се соочивме со проблемот на чистење торки во простори тарантол. Чистењето мораше да се започне не кога тарантулата веќе беше без меморија, туку однапред и со одредена фреквенција. За оваа задача, tarantool има модул напишан на Lua наречен Истек. Откако го користевме овој модул кратко време, сфативме дека не ни одговара: поради постојаното чистење на големи количини на податоци, Луа висеше во GC. Затоа, размислувавме да развиеме сопствен модул со истечен рок, надевајќи се дека кодот напишан на мајчин програмски јазик ќе ги реши нашите проблеми на најдобар можен начин.

Добар пример за нас беше модулот tarantool наречен мемориран. Пристапот што се користи во него се заснова на фактот дека во просторот се создава посебно поле, што го означува животниот век на торката, со други зборови, ttl. Модулот во позадина го скенира просторот, го споредува TTL со моменталното време и одлучува дали да ја избрише торката или не. Кодот на мемкешираниот модул е ​​едноставен и елегантен, но премногу генерички. Прво, не го зема предвид типот на индекс што се индексира и брише. Второ, на секој премин се скенираат сите торки, чиј број може да биде доста голем. И ако во истечениот модул беше решен првиот проблем (индексот на дрвото беше поделен во посебна класа), тогаш вториот сè уште не доби никакво внимание. Овие три точки го предодредени изборот во корист на пишување на мојот сопствен код.

Опис

Документацијата за тарантот има многу добра упатство За тоа како да ги напишете вашите зачувани процедури во C. Прво на сите, предлагам да се запознаете со тоа за да ги разберете тие инсерти со команди и код што ќе се појави подолу. Исто така, вреди да се обрне внимание референца на објекти кои се достапни при пишување на ваш сопствен модул со ограничување, имено кутија, влакна, индекс и txn.

Да почнеме од далеку и да погледнеме како изгледа однадвор ограничен истечен модул:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

За едноставност, стартуваме tarantool во директориумот каде што се наоѓа нашата библиотека libcapped-expirationd.so. Две функции се извезуваат од библиотеката: стартувај и убиј. Првиот чекор е да ги направите овие функции достапни од Lua користејќи box.schema.func.create и box.schema.user.grant. Потоа креирајте простор чии торки ќе содржат само три полиња: првото е единствен идентификатор, второто е е-пошта и третото е животниот век на торката. Ние градиме индекс на дрво на врвот на првото поле и го нарекуваме примарен. Следно, го добиваме објектот за поврзување со нашата матична библиотека.

По подготвителната работа, извршете ја функцијата за почеток:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Овој пример ќе работи за време на скенирањето точно исто како и истечениот модул, кој е напишан на Луа. Првиот аргумент на функцијата старт е единственото име на задачата. Вториот е идентификаторот на просторот. Третиот е единствен индекс со кој ќе се бришат торките. Четвртиот е индексот со кој ќе се поминуваат торките. Петтиот е бројот на полето за торба со животен век (нумерирањето започнува од 1, а не од 0!). Шестиот и седмиот се поставките за скенирање. 1024 е максималниот број на множества што може да се видат во една трансакција. 3600 — целосно време на скенирање во секунди.

Забележете дека примерот го користи истиот индекс за индексирање и бришење. Ако ова е индекс на дрво, тогаш преминувањето се врши од помалиот клуч до поголемиот. Ако има некој друг, на пример, хаш индекс, тогаш преминувањето се врши, по правило, по случаен редослед. Сите вселенски множества се скенираат во едно скенирање.

Ајде да вметнеме неколку торки во просторот со животен век од 60 секунди:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Ајде да провериме дали вметнувањето е успешно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ајде да го повториме изборот по 60+ секунди (сметајќи од почетокот на вметнувањето на првата торка) и да видиме дека ограничениот истечен модул е ​​веќе обработен:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Ајде да ја прекинеме задачата:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Ајде да погледнеме во вториот пример каде што се користи посебен индекс за индексирање:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Сè овде е исто како во првиот пример, со неколку исклучоци. Ние градиме индекс на дрво на врвот на третото поле и го нарекуваме exp. Овој индекс не мора да биде единствен, за разлика од индексот наречен примарен. Преминувањето ќе се врши според индексот exp, а бришењето по примарниот. Се сеќаваме дека претходно и двете беа направени само со користење на примарниот индекс.

По подготвителната работа, ја извршуваме функцијата за почеток со нови аргументи:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Ајде повторно да вметнеме неколку торки во просторот со животен век од 60 секунди:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

По 30 секунди, по аналогија, ќе додадеме уште неколку торки:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Ајде да провериме дали вметнувањето е успешно:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ајде да го повториме изборот по 60+ секунди (сметајќи од почетокот на вметнувањето на првата торка) и да видиме дека ограничениот истечен модул е ​​веќе обработен:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Остануваат уште неколку торки во просторот на кои им остануваат уште 30 секунди живот. Покрај тоа, скенирањето престана кога се преместуваше од торка со ID 2 и животен век од 1576421257 на торка со ID 3 и животен век 1576421287. копчињата на индексот exp. Ова е заштедата што сакавме да ја постигнеме на самиот почеток.

Ајде да ја прекинеме задачата:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Најдобар начин да се каже за сите карактеристики на проектот е неговиот оригинален извор. код! Како дел од публикацијата, ќе се фокусираме само на најважните точки, имено, алгоритмите за бајпас на просторот.

Аргументите што ги предаваме на методот за почеток се зачувани во структура наречена expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Атрибутот име е името на задачата. Атрибутот space_id е идентификаторот на просторот. Атрибутот rm_index_id е идентификатор на уникатниот индекс со кој ќе се бришат торките. Атрибутот it_index_id е идентификатор на индексот со кој ќе се поминуваат торките. Атрибутот it_index_type е тип на индекс со кој ќе се поминуваат торките. Атрибутот filed_no е бројот на полето за множество со животен век. Атрибутот scan_size е максималниот број на множества што се скенираат во една трансакција. Атрибутот scan_time е целото време на скенирање во секунди.

Ние нема да разгледуваме парсирање на аргументи. Ова е макотрпна, но едноставна работа, со која библиотеката ќе ви помогне msgpuck. Тешкотиите можат да се појават само со индекси што се пренесуваат од Луа како сложена структура на податоци со типот mp_map, а не користејќи ги едноставните типови mp_bool, mp_double, mp_int, mp_uint и mp_array. Но, нема потреба да се анализира целиот индекс. Треба само да ја проверите неговата уникатност, да го пресметате типот и да го извлечете идентификаторот.

Ги наведуваме прототиповите на сите функции што се користат за парсирање:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Сега да се преселиме во најважната работа - логиката на заобиколување на просторот и бришење на топили. Секој блок на топилци не е поголем од Scan_size не се скенира и се менува под една трансакција. Доколку е успешна, оваа трансакција е извршена; ако се појави грешка, таа се врати назад. Последниот аргумент за функцијата на истекување на истекот е покажувач на итераторот од кој започнува или продолжува скенирањето. Овој итератор се зголемува внатрешно додека не се појави грешка, просторот истекува или не е можно да се запре процесот однапред. Функцијата Истекување на функцијата_експиран го проверува животниот век на tuple, истекот на_делета брише топа, истекот на проверките на распрснување дали треба да продолжиме понатаму.

Код на функцијата Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Кодот на функцијата expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Код на функцијата Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Код на функција expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Апликација

Можете да го видите изворниот код на тука!

Извор: www.habr.com

Додадете коментар