Prije nekog vremena suočili smo se s problemom čišćenja torki u razmacima
Dobar primjer za nas je tarantool modul tzv
Opis
Dokumentacija za tarantool je vrlo dobra
Krenimo izdaleka i pogledajmo kako modul s ograničenim rokom valjanosti izgleda izvana:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Radi jednostavnosti, pokrećemo tarantool u direktoriju gdje se nalazi naša biblioteka libcapped-expirationd.so. Iz knjižnice se izvoze dvije funkcije: start i kill. Prvi korak je učiniti ove funkcije dostupnima iz Lua koristeći box.schema.func.create i box.schema.user.grant. Zatim kreirajte prostor čije će torke sadržavati samo tri polja: prvo je jedinstveni identifikator, drugo je email, a treće je životni vijek torke. Gradimo indeks stabla na vrhu prvog polja i nazivamo ga primarnim. Zatim dobivamo objekt povezivanja s našom izvornom bibliotekom.
Nakon pripremnih radova pokrenite funkciju pokretanja:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Ovaj će primjer raditi tijekom skeniranja potpuno isto kao modul expirationd, koji je napisan u Lua. Prvi argument funkciji start je jedinstveni naziv zadatka. Drugi je identifikator prostora. Treći je jedinstveni indeks kojim će se brisati torke. Četvrti je indeks pomoću kojeg će se obići torke. Peti je broj polja tuple s vijekom trajanja (numeriranje počinje od 1, ne od 0!). Šesta i sedma su postavke skeniranja. 1024 je najveći broj torki koje se mogu vidjeti u jednoj transakciji. 3600 — vrijeme punog skeniranja u sekundama.
Imajte na umu da primjer koristi isti indeks za indeksiranje i brisanje. Ako je ovo indeks stabla, tada se obilazak provodi od manjeg ključa prema većem. Ako postoji neki drugi, na primjer, hash indeks, tada se obilazak provodi, u pravilu, nasumičnim redoslijedom. Sve prostorne torke se skeniraju u jednom skeniranju.
Umetnimo nekoliko tupleova u prostor sa životnim vijekom od 60 sekundi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Provjerimo je li umetanje uspješno:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve torke) i vidimo da je ograničeni expirationd modul već obrađen:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Zaustavimo zadatak:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Pogledajmo drugi primjer gdje se za indeksiranje koristi zasebni indeks:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Ovdje je sve isto kao u prvom primjeru, uz nekoliko iznimaka. Gradimo indeks stabla na vrhu trećeg polja i nazivamo ga exp. Ovaj indeks ne mora biti jedinstven, za razliku od indeksa koji se zove primarni. Prolazak će se izvršiti exp indeksom, a brisanje primarnim. Sjećamo se da su se prije oba radila samo pomoću primarnog indeksa.
Nakon pripremnog rada, pokrećemo funkciju start s novim argumentima:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Umetnimo ponovno nekoliko torki u prostor sa životnim vijekom od 60 sekundi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Nakon 30 sekundi, po analogiji ćemo dodati još nekoliko torki:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Provjerimo je li umetanje uspješno:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve torke) i vidimo da je ograničeni expirationd modul već obrađen:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ostalo je još nekoliko tupleova u prostoru koji će imati još oko 30 sekundi života. Štoviše, skeniranje je zaustavljeno pri prelasku s torke s ID-om 2 i životnim vijekom 1576421257 na torku s ID-om 3 i životnim vijekom 1576421287. Torke s životnim vijekom 1576421287 ili više nisu skenirane zbog redoslijeda ključeva exp indeksa. To je ušteda koju smo htjeli postići na samom početku.
Zaustavimo zadatak:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Provedba
Najbolji način da ispričate sve značajke projekta je njegov izvorni izvor.
Argumenti koje prosljeđujemo početnoj metodi pohranjeni su u strukturi koja se zove expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atribut name je naziv zadatka. Space_id atribut je identifikator prostora. Atribut rm_index_id identifikator je jedinstvenog indeksa kojim će se torke brisati. Atribut it_index_id je identifikator indeksa kojim će se obići torke. Atribut it_index_type je tip indeksa kojim će se obići torke. Atribut filed_no broj je polja tuple s vijekom trajanja. Atribut scan_size je najveći broj torki koje se skeniraju u jednoj transakciji. Atribut scan_time je puno vrijeme skeniranja u sekundama.
Nećemo razmatrati argumente raščlanjivanja. Ovo je mukotrpan, ali jednostavan posao, u kojem će vam knjižnica pomoći
Navodimo prototipove svih funkcija koje se koriste za parsiranje:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Sada prijeđimo na ono najvažnije - logiku zaobilaženja prostora i brisanja tuplea. Svaki blok torki ne veći od scan_size skenira se i modificira u jednoj transakciji. Ako je uspješna, ova se transakcija predaje; ako se dogodi pogreška, vraća se unatrag. Posljednji argument funkcije expirationd_iterate je pokazivač na iterator od kojeg skeniranje počinje ili se nastavlja. Ovaj se iterator interno povećava dok se ne pojavi greška, ne ponestane prostora ili nije moguće zaustaviti proces unaprijed. Funkcija expirationd_expired provjerava vijek trajanja tuplea, expirationd_delete briše tuple, expirationd_breakable provjerava trebamo li ići dalje.
Kôd funkcije Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Kôd funkcije expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Kôd funkcije Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Kod funkcije Expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
primjena
Izvorni kod možete pogledati na
Izvor: www.habr.com