Před časem jsme se potýkali s problémem čištění n-tic v prostorech
Dobrým příkladem pro nás byl modul tarantool tzv
popis
Dokumentace pro tarantool je velmi dobrá
Začněme zpovzdálí a podívejme se, jak vypadá modul s ukončenou platností zvenčí:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Pro jednoduchost spustíme tarantool v adresáři, kde se nachází naše knihovna libcapped-expirationd.so. Z knihovny jsou exportovány dvě funkce: start a kill. Prvním krokem je zpřístupnění těchto funkcí z Lua pomocí box.schema.func.create a box.schema.user.grant. Poté vytvořte prostor, jehož n-tice budou obsahovat pouze tři pole: první je jedinečný identifikátor, druhé je e-mail a třetí je životnost n-tice. Na první pole vytvoříme stromový index a nazveme ho primární. Dále získáme objekt připojení k naší nativní knihovně.
Po přípravných pracích spusťte funkci start:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Tento příklad bude fungovat během skenování úplně stejně jako expirovaný modul, který je napsán v Lua. První argument funkce start je jedinečný název úlohy. Druhým je identifikátor prostoru. Třetí je jedinečný index, pomocí kterého budou n-tice odstraněny. Čtvrtý je index, kterým se budou procházet n-tice. Páté je číslo n-ticového pole s životností (číslování začíná od 1, ne od 0!). Šestý a sedmý jsou nastavení skenování. 1024 je maximální počet n-tic, které lze zobrazit v jedné transakci. 3600 — doba úplného skenování v sekundách.
Všimněte si, že příklad používá stejný index pro procházení a mazání. Pokud se jedná o index stromu, pak se procházení provádí od menšího klíče k většímu. Pokud existuje nějaký jiný, například hash index, pak se procházení provádí zpravidla v náhodném pořadí. Všechny vesmírné n-tice jsou skenovány v jednom skenování.
Vložme do prostoru několik n-tic s životností 60 sekund:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Zkontrolujeme, zda bylo vložení úspěšné:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Zopakujme výběr po 60+ sekundách (počítáno od začátku vložení první n-tice) a uvidíme, že omezený modul s expirací již zpracoval:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Zastavme úkol:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Podívejme se na druhý příklad, kde se pro procházení používá samostatný index:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Zde je až na pár výjimek vše stejné jako v prvním příkladu. Na třetí pole vytvoříme stromový index a nazveme ho exp. Tento index nemusí být jedinečný, na rozdíl od indexu zvaného primární. Průchod bude proveden indexem exp a smazání primárním. Pamatujeme si, že dříve se obojí dělalo pouze pomocí primárního indexu.
Po přípravných pracích spustíme funkci start s novými argumenty:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Vložme do prostoru opět několik n-tic s životností 60 sekund:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Po 30 sekundách analogicky přidáme několik dalších n-tic:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Zkontrolujeme, zda bylo vložení úspěšné:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Zopakujme výběr po 60+ sekundách (počítáno od začátku vložení první n-tice) a uvidíme, že omezený modul s expirací již zpracoval:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
V prostoru ještě zbývají nějaké n-tice, které budou mít ještě asi 30 sekund života. Navíc se skenování zastavilo při přechodu z n-tice s ID 2 a životností 1576421257 na n-tice s ID 3 a životností 1576421287. N-tice s životností 1576421287 nebo více nebyly skenovány kvůli objednání indexové klíče exp. To jsou úspory, kterých jsme chtěli dosáhnout hned na začátku.
Zastavme úkol:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
uskutečnění
Nejlepší způsob, jak říct o všech funkcích projektu, je jeho původní zdroj.
Argumenty, které předáme metodě start, jsou uloženy ve struktuře nazvané expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atribut name je název úkolu. Atribut space_id je identifikátor prostoru. Atribut rm_index_id je identifikátor jedinečného indexu, kterým budou n-tice odstraněny. Atribut it_index_id je identifikátor indexu, kterým se budou procházet n-tice. Atribut it_index_type je typ indexu, kterým se budou procházet n-tice. Atribut filed_no je číslo pole n-tice s životností. Atribut scan_size je maximální počet n-tic, které jsou skenovány v jedné transakci. Atribut scan_time je celková doba skenování v sekundách.
Nebudeme uvažovat o analýze argumentů. Jde o namáhavou, ale jednoduchou práci, se kterou vám knihovna pomůže
Uvádíme prototypy všech funkcí, které se používají k analýze:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Nyní přejdeme k tomu nejdůležitějšímu – logice obcházení prostoru a mazání n-tic. Každý blok n-tic, který není větší než scan_size, je naskenován a upraven v rámci jedné transakce. Pokud je úspěšná, je tato transakce potvrzena, pokud dojde k chybě, je vrácena zpět. Poslední argument funkce expirationd_iterate je ukazatel na iterátor, od kterého skenování začíná nebo pokračuje. Tento iterátor je interně inkrementován, dokud nedojde k chybě, dojde místo nebo není možné proces předem zastavit. Funkce expirationd_expired kontroluje životnost n-tice, expirationd_delete maže n-tice, expirationd_breakable kontroluje, zda musíme jít dál.
Kód funkce Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Kód funkce expired_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Kód funkce Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Kód funkce expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplikace
Zdrojový kód si můžete prohlédnout na
Zdroj: www.habr.com