Pred časom sme sa stretli s problémom čistenia n-tic v priestoroch
Dobrým príkladom pre nás bol modul tarantool tzv
Popis
Dokumentácia pre tarantool je veľmi dobrá
Začnime z diaľky a pozrime sa, ako zvonku vyzerá modul s ukončenou platnosťou:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Pre jednoduchosť spustíme tarantool v adresári, kde sa nachádza naša knižnica libcapped-expirationd.so. Z knižnice sú exportované dve funkcie: štart a kill. Prvým krokom je sprístupniť tieto funkcie z Lua pomocou box.schema.func.create a box.schema.user.grant. Potom vytvorte priestor, ktorého n-tice budú obsahovať iba tri polia: prvé je jedinečný identifikátor, druhé je e-mail a tretie je životnosť n-tice. Postavíme stromový index na prvé pole a nazývame ho primárne. Ďalej dostaneme objekt pripojenia k našej natívnej knižnici.
Po prípravných prácach spustite funkciu štart:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Tento príklad bude fungovať počas skenovania presne tak isto ako modul s exspiráciou, ktorý je napísaný v jazyku Lua. Prvým argumentom funkcie štart je jedinečný názov úlohy. Druhým je identifikátor priestoru. Tretím je jedinečný index, pomocou ktorého budú n-tice odstránené. Štvrtý je index, ktorým sa budú prechádzať n-tice. Piate je číslo n-ticového poľa so životnosťou (číslovanie začína od 1, nie od 0!). Šiesty a siedmy sú nastavenia skenovania. 1024 je maximálny počet n-tic, ktoré je možné zobraziť v jednej transakcii. 3600 — úplný čas skenovania v sekundách.
Všimnite si, že príklad používa rovnaký index na prehľadávanie a odstraňovanie. Ak ide o index stromu, potom sa prechod vykoná od menšieho kľúča k väčšiemu. Ak existuje nejaký iný, napríklad hash index, potom sa prechod spravidla vykonáva v náhodnom poradí. Všetky priestorové n-tice sa skenujú pri jednom skenovaní.
Vložme do priestoru niekoľko n-tic so životnosťou 60 sekúnd:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Skontrolujte, či bolo vloženie úspešné:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Zopakujme výber po 60+ sekundách (počítajúc od začiatku vloženia prvej n-tice) a uvidíme, že obmedzený modul s exspiráciou už spracoval:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Zastavme úlohu:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Pozrime sa na druhý príklad, kde sa na indexové prehľadávanie používa samostatný index:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Všetko je tu rovnaké ako v prvom príklade, až na pár výnimiek. Na tretie pole vytvoríme stromový index a nazveme ho exp. Tento index nemusí byť jedinečný, na rozdiel od indexu nazývaného primárny. Prechod bude vykonaný indexom exp a vymazanie primárnym. Pamätáme si, že predtým sa obe robili iba pomocou primárneho indexu.
Po prípravných prácach spustíme funkciu štart s novými argumentmi:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Vložme do priestoru opäť niekoľko n-tic so životnosťou 60 sekúnd:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Po 30 sekundách, analogicky, pridáme niekoľko ďalších n-tic:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Skontrolujte, či bolo vloženie úspešné:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Zopakujme výber po 60+ sekundách (počítajúc od začiatku vloženia prvej n-tice) a uvidíme, že obmedzený modul s exspiráciou už spracoval:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
V priestore ešte zostali nejaké n-tice, ktoré budú mať ešte asi 30 sekúnd života. Okrem toho sa skenovanie zastavilo pri prechode z n-tice s ID 2 a životnosťou 1576421257 na n-ticu s ID 3 a životnosťou 1576421287. N-tice so životnosťou 1576421287 alebo viac neboli naskenované z dôvodu objednania indexové kľúče exp. To sú úspory, ktoré sme chceli dosiahnuť hneď na začiatku.
Zastavme úlohu:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Реализация
Najlepší spôsob, ako povedať o všetkých funkciách projektu, je jeho pôvodný zdroj.
Argumenty, ktoré odovzdávame metóde štart, sú uložené v štruktúre s názvom expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atribút name je názov úlohy. Atribút space_id je identifikátor priestoru. Atribút rm_index_id je identifikátor jedinečného indexu, pomocou ktorého budú n-tice vymazané. Atribút it_index_id je identifikátor indexu, ktorým sa budú prechádzať n-tice. Atribút it_index_type je typ indexu, ktorým sa budú prechádzať n-tice. Atribút filed_no je číslo poľa n-tice so životnosťou. Atribút scan_size je maximálny počet n-tic, ktoré sú naskenované v jednej transakcii. Atribút scan_time je celkový čas skenovania v sekundách.
Nebudeme uvažovať o analýze argumentov. Ide o namáhavú, no jednoduchú prácu, s ktorou vám pomôže knižnica
Uvádzame prototypy všetkých funkcií, ktoré sa používajú na analýzu:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Teraz prejdime k tomu najdôležitejšiemu – k logike obchádzania priestoru a mazania n-tic. Každý blok n-tic nie väčší ako scan_size je naskenovaný a upravený v rámci jednej transakcie. Ak je transakcia úspešná, transakcia je potvrdená, ak sa vyskytne chyba, je vrátená späť. Posledný argument funkcie expirationd_iterate je ukazovateľ na iterátor, od ktorého skenovanie začína alebo pokračuje. Tento iterátor sa interne inkrementuje, kým sa nevyskytne chyba, neminie miesto alebo nie je možné zastaviť proces vopred. Funkcia expirationd_expired kontroluje životnosť n-tice, expirationd_delete maže n-ticu, expirationd_breakable kontroluje, či musíme ísť ďalej.
Kód funkcie Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Kód funkcie expired_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Kód funkcie expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Funkčný kód expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplikácia
Zdrojový kód si môžete pozrieť na
Zdroj: hab.com