For noget tid siden stod vi over for problemet med at rense tupler i rummet
Et godt eksempel for os var tarantool-modulet kaldet
beskrivelse
Dokumentationen for tarantool har en meget god
Lad os starte langvejs fra og se på, hvordan et udløbet udløbsmodul ser ud udefra:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
For nemheds skyld lancerer vi tarantool i den mappe, hvor vores libcapped-expirationd.so bibliotek er placeret. To funktioner eksporteres fra biblioteket: start og dræb. Det første trin er at gøre disse funktioner tilgængelige fra Lua ved hjælp af box.schema.func.create og box.schema.user.grant. Opret derefter et rum, hvis tuples kun vil indeholde tre felter: det første er en unik identifikator, det andet er e-mail, og det tredje er tuplens levetid. Vi bygger et træindeks oven på det første felt og kalder det primært. Dernæst får vi forbindelsesobjektet til vores oprindelige bibliotek.
Efter det forberedende arbejde skal du køre startfunktionen:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Dette eksempel vil fungere under scanning nøjagtigt det samme som det udløbne modul, som er skrevet i Lua. Det første argument til startfunktionen er det unikke navn på opgaven. Den anden er pladsidentifikatoren. Det tredje er et unikt indeks, hvormed tupler vil blive slettet. Den fjerde er det indeks, som tuplerne vil blive gennemløbet med. Den femte er nummeret på tupelfeltet med levetid (nummereringen starter fra 1, ikke 0!). Den sjette og syvende er scanningsindstillinger. 1024 er det maksimale antal tuples, der kan ses i en enkelt transaktion. 3600 — fuld scanningstid i sekunder.
Bemærk, at eksemplet bruger det samme indeks til at gennemgå og slette. Hvis dette er et træindeks, udføres gennemgangen fra den mindre nøgle til den større. Hvis der er et andet, for eksempel hash-indeks, udføres gennemgangen som regel i tilfældig rækkefølge. Alle space tuples scannes i én scanning.
Lad os indsætte flere tuples i rummet med en levetid på 60 sekunder:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Lad os kontrollere, at indsættelsen lykkedes:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Lad os gentage valget efter 60+ sekunder (tæller fra begyndelsen af indsættelsen af den første tuple) og se, at det begrænsede udløbsmodul allerede har behandlet:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Lad os stoppe opgaven:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Lad os se på et andet eksempel, hvor et separat indeks bruges til gennemgangen:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Alt her er det samme som i det første eksempel, med nogle få undtagelser. Vi bygger et træindeks oven på det tredje felt og kalder det exp. Dette indeks behøver ikke at være unikt, i modsætning til det indeks, der kaldes primært. Gennemgang vil blive udført af exp indeks, og sletning af primær. Vi husker, at begge tidligere kun blev udført ved hjælp af det primære indeks.
Efter det forberedende arbejde kører vi startfunktionen med nye argumenter:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Lad os indsætte flere tupler i rummet igen med en levetid på 60 sekunder:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Efter 30 sekunder vil vi analogt tilføje et par flere tupler:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Lad os kontrollere, at indsættelsen lykkedes:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Lad os gentage valget efter 60+ sekunder (tæller fra begyndelsen af indsættelsen af den første tuple) og se, at det begrænsede udløbsmodul allerede har behandlet:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Der er stadig nogle tupler tilbage i rummet, der vil have omkring 30 sekunder mere at leve. Desuden stoppede scanningen, da man flyttede fra en tupel med et ID på 2 og en levetid på 1576421257 til en tuple med et ID på 3 og en levetid på 1576421287. Tuples med en levetid på 1576421287 eller mere blev ikke scannet på grund af bestilling af exp indeks nøglerne. Det er de besparelser, vi ønskede at opnå i begyndelsen.
Lad os stoppe opgaven:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
implementering
Den bedste måde at fortælle om alle funktionerne i et projekt er dets originale kilde.
Argumenterne, vi sender til startmetoden, er gemt i en struktur kaldet expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Navnattributten er navnet på opgaven. Space_id-attributten er space-id'et. Attributten rm_index_id er identifikatoren for det unikke indeks, hvormed tupler vil blive slettet. Attributten it_index_id er identifikatoren for det indeks, som tupler vil blive krydset med. Attributten it_index_type er den type indeks, som tupler vil blive krydset med. Filed_no-attributten er nummeret på tupelfeltet med levetid. Attributten scan_size er det maksimale antal tuples, der scannes i en transaktion. Attributten scan_time er den fulde scanningstid i sekunder.
Vi vil ikke overveje at analysere argumenter. Dette er et møjsommeligt, men enkelt arbejde, som biblioteket vil hjælpe dig med
Vi lister prototyperne af alle funktioner, der bruges til at analysere:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Lad os nu gå videre til det vigtigste - logikken i at omgå plads og slette tupler. Hver blok af tupler, der ikke er større end scan_size, scannes og ændres under en enkelt transaktion. Hvis den lykkes, er denne transaktion begået; hvis der opstår fejl, rulles den tilbage. Det sidste argument til expirationd_iterate-funktionen er en pegepind til iteratoren, hvorfra scanningen begynder eller fortsætter. Denne iterator øges internt, indtil der opstår en fejl, pladsen løber tør, eller det ikke er muligt at stoppe processen på forhånd. Funktionen expirationd_expired kontrollerer levetiden for en tupel, expirationd_delete sletter en tupel, expirationd_breakable tjekker om vi skal videre.
Expirationd_iterate funktionskode:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Funktionskode expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete funktionskode:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Funktionskode expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
App
Du kan se kildekoden på
Kilde: www.habr.com