Prije nekog vremena suočili smo se s problemom čišćenja tuple-a u prostorima
Dobar primjer za nas je bio tarantool modul pod nazivom
Opis
Dokumentacija za tarantool ima vrlo dobru
Počnimo izdaleka i pogledajmo kako spolja izgleda ograničeni modul s istekom roka trajanja:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Radi jednostavnosti, pokrećemo tarantool u direktoriju u kojem se nalazi naša biblioteka libcapped-expirationd.so. Iz biblioteke se izvoze dvije funkcije: start i kill. Prvi korak je da ove funkcije učinite dostupnim iz Lua koristeći box.schema.func.create i box.schema.user.grant. Zatim kreirajte prostor čije će torke sadržavati samo tri polja: prvo je jedinstveni identifikator, drugo je e-pošta, a treće je životni vijek torke. Mi gradimo indeks stabla na vrhu prvog polja i nazivamo ga primarnim. Zatim dobijamo objekat veze sa našom matičnom bibliotekom.
Nakon pripremnih radova, pokrenite funkciju start:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Ovaj primjer će raditi tokom skeniranja potpuno isto kao i istekao modul, koji je napisan u Lua. Prvi argument funkcije start je jedinstveno ime zadatka. Drugi je identifikator prostora. Treći je jedinstveni indeks pomoću kojeg će se tuple brisati. Četvrti je indeks kojim će se preći torke. Peti je broj polja tuple sa životnim vijekom (numeracija počinje od 1, a ne od 0!). Šesta i sedma su postavke skeniranja. 1024 je maksimalni broj torki koje se mogu vidjeti u jednoj transakciji. 3600 — puno vrijeme skeniranja u sekundama.
Imajte na umu da primjer koristi isti indeks za indeksiranje i brisanje. Ako je ovo indeks stabla, tada se prelazak vrši od manjeg ključa do većeg. Ako postoji neki drugi, na primjer, hash indeks, tada se obilaženje vrši, u pravilu, slučajnim redoslijedom. Svi prostorni tokovi se skeniraju u jednom skeniranju.
Ubacimo nekoliko tuple-ova u prostor sa životnim vijekom od 60 sekundi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Provjerimo da li je umetanje bilo uspješno:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve tuple) i vidimo da je ograničeni modul isteka već radio:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Zaustavimo zadatak:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Pogledajmo drugi primjer gdje se zasebni indeks koristi za indeksiranje:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Ovdje je sve isto kao u prvom primjeru, sa nekoliko izuzetaka. Na vrhu trećeg polja gradimo indeks stabla i zovemo ga exp. Ovaj indeks ne mora biti jedinstven, za razliku od indeksa koji se naziva primarni. Prelazak će se vršiti indeksom exp, a brisanje primarnim. Sjećamo se da su prethodno oba rađena samo pomoću primarnog indeksa.
Nakon pripremnog rada, pokrećemo funkciju start s novim argumentima:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Ponovo ubacimo nekoliko tuple u prostor sa životnim vijekom od 60 sekundi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Nakon 30 sekundi, po analogiji, dodaćemo još nekoliko tuple-ova:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Provjerimo da li je umetanje bilo uspješno:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve tuple) i vidimo da je ograničeni modul isteka već radio:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
U prostoru je ostalo još nekih torva kojima je ostalo još oko 30 sekundi života. Štaviše, skeniranje je zaustavljeno pri prelasku sa torke s ID-om 2 i životnim vijekom 1576421257 na torku sa ID-om 3 i životnim vijekom 1576421287. Torke sa životnim vijekom 1576421287 ili više nisu skenirane zbog redoslijeda ključeva indeksa exp. To je ušteda koju smo željeli postići na samom početku.
Zaustavimo zadatak:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Реализация
Najbolji način da se ispriča o svim karakteristikama projekta je njegov izvorni izvor.
Argumenti koje prosljeđujemo start metodi pohranjeni su u strukturi koja se zove expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atribut name je naziv zadatka. Atribut space_id je identifikator prostora. Atribut rm_index_id je identifikator jedinstvenog indeksa pomoću kojeg će se tuple brisati. It_index_id atribut je identifikator indeksa kojim će se torke prelaziti. Atribut it_index_type je tip indeksa kojim će se torke prelaziti. Atribut filed_no je broj polja tuple sa životnim vijekom. Atribut scan_size je maksimalni broj torkova koji se skeniraju u jednoj transakciji. Atribut scan_time je puno vrijeme skeniranja u sekundama.
Nećemo razmatrati raščlanjivanje argumenata. Ovo je mukotrpan, ali jednostavan posao u kojem će vam biblioteka pomoći
Navodimo prototipove svih funkcija koje se koriste za raščlanjivanje:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Pređimo sada na najvažniju stvar - logiku zaobilaženja prostora i brisanja torki. Svaki blok torki ne veći od scan_size se skenira i modificira u okviru jedne transakcije. Ako je uspješna, ova transakcija se predaje; ako dođe do greške, vraća se nazad. Posljednji argument funkcije expirationd_iterate je pokazivač na iterator iz kojeg skeniranje počinje ili se nastavlja. Ovaj iterator se interno povećava sve dok ne dođe do greške, dok ne ponestane prostora ili nije moguće zaustaviti proces unaprijed. Funkcija expirationd_expired provjerava životni vijek torke, expirationd_delete briše tuple, expirationd_breakable provjerava da li trebamo ići dalje.
Expirationd_iterate kod funkcije:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Kôd funkcije expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete kod funkcije:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Kôd funkcije expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplikacija
Izvorni kod možete pogledati na
izvor: www.habr.com