În urmă cu ceva timp ne-am confruntat cu problema curățării tuplurilor în spații
Un bun exemplu pentru noi a fost modulul tarantool numit
descriere
Documentația pentru tarantool are o foarte bună
Să începem de departe și să vedem cum arată un modul cu expirare plafonată din exterior:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Pentru simplitate, lansăm tarantool în directorul în care se află biblioteca noastră libcapped-expirationd.so. Două funcții sunt exportate din bibliotecă: start și kill. Primul pas este să faceți aceste funcții disponibile din Lua folosind box.schema.func.create și box.schema.user.grant. Apoi creați un spațiu ale cărui tupluri vor conține doar trei câmpuri: primul este un identificator unic, al doilea este e-mail, iar al treilea este durata de viață a tuplului. Construim un index de arbore deasupra primului câmp și îl numim primar. Apoi obținem obiectul de conexiune la biblioteca noastră nativă.
După lucrările pregătitoare, rulați funcția de pornire:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Acest exemplu va funcționa în timpul scanării exact la fel ca modulul expirat, care este scris în Lua. Primul argument al funcției de pornire este numele unic al sarcinii. Al doilea este identificatorul spațiului. Al treilea este un index unic prin care tuplurile vor fi șterse. Al patrulea este indicele prin care vor fi parcurse tuplurile. Al cincilea este numărul câmpului tuplu cu durata de viață (numerotarea începe de la 1, nu de la 0!). Al șaselea și al șaptelea sunt setările de scanare. 1024 este numărul maxim de tupluri care pot fi vizualizate într-o singură tranzacție. 3600 — timp de scanare complet în secunde.
Rețineți că exemplul folosește același index pentru accesare cu crawlere și ștergere. Dacă acesta este un index de arbore, atunci traversarea este efectuată de la cheia mai mică la cea mai mare. Dacă există un alt indice hash, de exemplu, atunci traversarea este efectuată, de regulă, în ordine aleatorie. Toate tuplurile spațiale sunt scanate într-o singură scanare.
Să introducem mai multe tupluri în spațiu cu o durată de viață de 60 de secunde:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Să verificăm dacă inserarea a avut succes:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Să repetăm selecția după 60+ secunde (numărând de la începutul inserării primului tuplu) și să vedem că modulul cu expirare plafonată a procesat deja:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Să oprim sarcina:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Să ne uităm la un al doilea exemplu în care este folosit un index separat pentru accesare cu crawlere:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Totul aici este la fel ca în primul exemplu, cu câteva excepții. Construim un index de arbore deasupra celui de-al treilea câmp și îl numim exp. Acest index nu trebuie să fie unic, spre deosebire de indexul numit primar. Traversarea va fi efectuată prin index exp, iar ștergerea prin primar. Ne amintim că anterior ambele se făceau doar folosind indicele primar.
După lucrările pregătitoare, rulăm funcția de pornire cu argumente noi:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Să inserăm din nou câteva tupluri în spațiu cu o durată de viață de 60 de secunde:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
După 30 de secunde, prin analogie, vom mai adăuga câteva tupluri:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Să verificăm dacă inserarea a avut succes:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Să repetăm selecția după 60+ secunde (numărând de la începutul inserării primului tuplu) și să vedem că modulul cu expirare plafonată a procesat deja:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Au mai rămas câteva tupluri în spațiu care vor avea încă vreo 30 de secunde de trăit. Mai mult, scanarea s-a oprit la trecerea de la un tuplu cu un ID de 2 și o durată de viață de 1576421257 la un tuplu cu un ID de 3 și o viață de 1576421287. Tuplurile cu o durată de viață de 1576421287 sau mai mult nu au fost scanate din cauza comandării tastele index exp. Acestea sunt economiile pe care am vrut să le realizăm de la bun început.
Să oprim sarcina:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
punerea în aplicare
Cel mai bun mod de a spune despre toate caracteristicile unui proiect este sursa lui originală.
Argumentele pe care le transmitem metodei start sunt stocate într-o structură numită expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atributul nume este numele sarcinii. Atributul space_id este identificatorul spațiului. Atributul rm_index_id este identificatorul indexului unic prin care tuplurile vor fi șterse. Atributul it_index_id este identificatorul indexului prin care vor fi parcurse tuplurile. Atributul it_index_type este tipul de index prin care vor fi parcurse tuplurile. Atributul filed_no este numărul câmpului tuplu cu durata de viață. Atributul scan_size este numărul maxim de tupluri care sunt scanate într-o tranzacție. Atributul scan_time este timpul complet de scanare în secunde.
Nu vom lua în considerare analizarea argumentelor. Aceasta este o treabă minuțioasă, dar simplă, cu care biblioteca vă va ajuta
Enumerăm prototipurile tuturor funcțiilor care sunt utilizate pentru analiza:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Acum să trecem la cel mai important lucru - logica ocolirii spațiului și a ștergerii tuplurilor. Fiecare bloc de tupluri nu mai mare decât scan_size este scanat și modificat în cadrul unei singure tranzacții. Dacă are succes, această tranzacție este comisă; dacă apare o eroare, aceasta este anulată. Ultimul argument al funcției expirationd_iterate este un pointer către iteratorul de la care începe sau continuă scanarea. Acest iterator este incrementat intern până când apare o eroare, spațiul se epuizează sau nu este posibilă oprirea procesului în avans. Funcția expirationd_expired verifică durata de viață a unui tuplu, expirationd_delete șterge un tuplu, expirationd_breakable verifică dacă trebuie să mergem mai departe.
Codul funcției Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Codul funcției expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Codul funcției Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Codul funcției Expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
App
Puteți vizualiza codul sursă la
Sursa: www.habr.com