Psaní našeho vlastního omezeného modulu s vypršením platnosti pro tarantool

Psaní našeho vlastního omezeného modulu s vypršením platnosti pro tarantool

Před časem jsme se potýkali s problémem čištění n-tic v prostorech tarantool. Čištění muselo být zahájeno ne, když už tarantoolu docházela paměť, ale s předstihem a v určité frekvenci. Pro tento úkol má tarantool modul napsaný v Lua nazvaný vypršení. Po krátkém používání tohoto modulu jsme si uvědomili, že pro nás není vhodný: kvůli neustálému čištění velkého množství dat Lua visela v GC. Proto jsme přemýšleli o vývoji vlastního modulu s omezenou expirací a doufali jsme, že kód napsaný v nativním programovacím jazyce vyřeší naše problémy tím nejlepším možným způsobem.

Dobrým příkladem pro nás byl modul tarantool tzv memcached. V něm použitý přístup je založen na tom, že v prostoru je vytvořeno samostatné pole, které udává životnost n-tice, jinými slovy ttl. Modul na pozadí prohledá prostor, porovná TTL s aktuálním časem a rozhodne se, zda n-tici smaže nebo ne. Kód modulu memcached je jednoduchý a elegantní, ale příliš obecný. Za prvé, nebere v úvahu typ indexu, který je prolézán a odstraňován. Za druhé, při každém průchodu jsou skenovány všechny n-tice, jejichž počet může být poměrně velký. A pokud se v modulu s expirací vyřešil první problém (index stromu byl rozdělen do samostatné třídy), druhému se stále nevěnovala žádná pozornost. Tyto tři body předurčily volbu ve prospěch psaní vlastního kódu.

popis

Dokumentace pro tarantool je velmi dobrá tutorial o tom, jak psát uložené procedury v C. Nejprve vám doporučuji se s tím seznámit, abyste porozuměli těm insertům s příkazy a kódem, které se objeví níže. Za pozornost také stojí odkaz na objekty, které jsou dostupné při psaní vlastního omezeného modulu, jmenovitě box, vlákno, index и TXN.

Začněme zpovzdálí a podívejme se, jak vypadá modul s ukončenou platností zvenčí:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Pro jednoduchost spustíme tarantool v adresáři, kde se nachází naše knihovna libcapped-expirationd.so. Z knihovny jsou exportovány dvě funkce: start a kill. Prvním krokem je zpřístupnění těchto funkcí z Lua pomocí box.schema.func.create a box.schema.user.grant. Poté vytvořte prostor, jehož n-tice budou obsahovat pouze tři pole: první je jedinečný identifikátor, druhé je e-mail a třetí je životnost n-tice. Na první pole vytvoříme stromový index a nazveme ho primární. Dále získáme objekt připojení k naší nativní knihovně.

Po přípravných pracích spusťte funkci start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Tento příklad bude fungovat během skenování úplně stejně jako expirovaný modul, který je napsán v Lua. První argument funkce start je jedinečný název úlohy. Druhým je identifikátor prostoru. Třetí je jedinečný index, pomocí kterého budou n-tice odstraněny. Čtvrtý je index, kterým se budou procházet n-tice. Páté je číslo n-ticového pole s životností (číslování začíná od 1, ne od 0!). Šestý a sedmý jsou nastavení skenování. 1024 je maximální počet n-tic, které lze zobrazit v jedné transakci. 3600 — doba úplného skenování v sekundách.

Všimněte si, že příklad používá stejný index pro procházení a mazání. Pokud se jedná o index stromu, pak se procházení provádí od menšího klíče k většímu. Pokud existuje nějaký jiný, například hash index, pak se procházení provádí zpravidla v náhodném pořadí. Všechny vesmírné n-tice jsou skenovány v jednom skenování.

Vložme do prostoru několik n-tic s životností 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Zkontrolujeme, zda bylo vložení úspěšné:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Zopakujme výběr po 60+ sekundách (počítáno od začátku vložení první n-tice) a uvidíme, že omezený modul s expirací již zpracoval:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Zastavme úkol:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Podívejme se na druhý příklad, kde se pro procházení používá samostatný index:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Zde je až na pár výjimek vše stejné jako v prvním příkladu. Na třetí pole vytvoříme stromový index a nazveme ho exp. Tento index nemusí být jedinečný, na rozdíl od indexu zvaného primární. Průchod bude proveden indexem exp a smazání primárním. Pamatujeme si, že dříve se obojí dělalo pouze pomocí primárního indexu.

Po přípravných pracích spustíme funkci start s novými argumenty:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Vložme do prostoru opět několik n-tic s životností 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Po 30 sekundách analogicky přidáme několik dalších n-tic:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Zkontrolujeme, zda bylo vložení úspěšné:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Zopakujme výběr po 60+ sekundách (počítáno od začátku vložení první n-tice) a uvidíme, že omezený modul s expirací již zpracoval:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

V prostoru ještě zbývají nějaké n-tice, které budou mít ještě asi 30 sekund života. Navíc se skenování zastavilo při přechodu z n-tice s ID 2 a životností 1576421257 na n-tice s ID 3 a životností 1576421287. N-tice s životností 1576421287 nebo více nebyly skenovány kvůli objednání indexové klíče exp. To jsou úspory, kterých jsme chtěli dosáhnout hned na začátku.

Zastavme úkol:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

uskutečnění

Nejlepší způsob, jak říct o všech funkcích projektu, je jeho původní zdroj. kód! V rámci publikace se zaměříme pouze na nejdůležitější body, a to na algoritmy obcházení prostoru.

Argumenty, které předáme metodě start, jsou uloženy ve struktuře nazvané expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atribut name je název úkolu. Atribut space_id je identifikátor prostoru. Atribut rm_index_id je identifikátor jedinečného indexu, kterým budou n-tice odstraněny. Atribut it_index_id je identifikátor indexu, kterým se budou procházet n-tice. Atribut it_index_type je typ indexu, kterým se budou procházet n-tice. Atribut filed_no je číslo pole n-tice s životností. Atribut scan_size je maximální počet n-tic, které jsou skenovány v jedné transakci. Atribut scan_time je celková doba skenování v sekundách.

Nebudeme uvažovat o analýze argumentů. Jde o namáhavou, ale jednoduchou práci, se kterou vám knihovna pomůže msgpuck. Potíže mohou nastat pouze u indexů, které jsou předávány z Lua jako komplexní datová struktura s typem mp_map a nepoužívají se jednoduché typy mp_bool, mp_double, mp_int, mp_uint a mp_array. Není ale potřeba analyzovat celý index. Stačí zkontrolovat jeho jedinečnost, vypočítat typ a extrahovat identifikátor.

Uvádíme prototypy všech funkcí, které se používají k analýze:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Nyní přejdeme k tomu nejdůležitějšímu – logice obcházení prostoru a mazání n-tic. Každý blok n-tic, který není větší než scan_size, je naskenován a upraven v rámci jedné transakce. Pokud je úspěšná, je tato transakce potvrzena, pokud dojde k chybě, je vrácena zpět. Poslední argument funkce expirationd_iterate je ukazatel na iterátor, od kterého skenování začíná nebo pokračuje. Tento iterátor je interně inkrementován, dokud nedojde k chybě, dojde místo nebo není možné proces předem zastavit. Funkce expirationd_expired kontroluje životnost n-tice, expirationd_delete maže n-tice, expirationd_breakable kontroluje, zda musíme jít dál.

Kód funkce Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kód funkce expired_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Kód funkce Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Kód funkce expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplikace

Zdrojový kód si můžete prohlédnout na zde!

Zdroj: www.habr.com

Přidat komentář