Enige tijd geleden werden we geconfronteerd met het probleem van het schoonmaken van tupels in ruimtes
Een goed voorbeeld voor ons was de tarantool-module genaamd
beschrijving
De documentatie voor tarantool heeft een zeer goed
Laten we van veraf beginnen en kijken hoe een afgetopte verlopen module er van buitenaf uitziet:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Voor de eenvoud starten we tarantool in de map waar onze libcapped-expirationd.so-bibliotheek zich bevindt. Er worden twee functies uit de bibliotheek geëxporteerd: starten en doden. De eerste stap is om deze functies beschikbaar te maken vanuit Lua met behulp van box.schema.func.create en box.schema.user.grant. Maak vervolgens een ruimte waarvan de tupels slechts drie velden bevatten: de eerste is een unieke identificatie, de tweede is e-mail en de derde is de levensduur van de tupel. We bouwen een boomindex bovenop het eerste veld en noemen deze primair. Vervolgens krijgen we het verbindingsobject naar onze eigen bibliotheek.
Voer na het voorbereidende werk de startfunctie uit:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Dit voorbeeld werkt tijdens het scannen precies hetzelfde als de verlopen module, die in Lua is geschreven. Het eerste argument voor de startfunctie is de unieke naam van de taak. De tweede is de ruimte-ID. De derde is een unieke index waarmee tupels worden verwijderd. De vierde is de index waarmee de tupels zullen worden doorlopen. De vijfde is het nummer van het tupelveld met levensduur (nummering begint bij 1, niet bij 0!). De zesde en zevende zijn scaninstellingen. 1024 is het maximale aantal tupels dat in één transactie kan worden bekeken. 3600 — volledige scantijd in seconden.
Houd er rekening mee dat in het voorbeeld dezelfde index wordt gebruikt voor crawlen en verwijderen. Als dit een boomindex is, wordt de verplaatsing uitgevoerd van de kleinere sleutel naar de grotere. Als er een andere is, bijvoorbeeld een hash-index, wordt de doortocht in de regel in willekeurige volgorde uitgevoerd. Alle ruimte-tupels worden in één scan gescand.
Laten we verschillende tupels in de ruimte plaatsen met een levensduur van 60 seconden:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Laten we controleren of het invoegen is gelukt:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Laten we de selectie herhalen na meer dan 60 seconden (gerekend vanaf het begin van het invoegen van de eerste tuple) en kijken of de afgetopte expiratiemodule al is verwerkt:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Laten we de taak stoppen:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Laten we een tweede voorbeeld bekijken waarbij een afzonderlijke index wordt gebruikt voor de crawl:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Alles is hier hetzelfde als in het eerste voorbeeld, op enkele uitzonderingen na. We bouwen een boomindex bovenop het derde veld en noemen deze exp. Deze index hoeft niet uniek te zijn, in tegenstelling tot de index die primair wordt genoemd. Traversal wordt uitgevoerd door exp-index en verwijdering door primair. We herinneren ons dat beide voorheen alleen met behulp van de primaire index werden gedaan.
Na het voorbereidende werk voeren we de startfunctie uit met nieuwe argumenten:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Laten we opnieuw verschillende tupels in de ruimte plaatsen met een levensduur van 60 seconden:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Na 30 seconden voegen we naar analogie nog een paar tupels toe:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Laten we controleren of het invoegen is gelukt:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Laten we de selectie herhalen na meer dan 60 seconden (gerekend vanaf het begin van het invoegen van de eerste tuple) en kijken of de afgetopte expiratiemodule al is verwerkt:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Er zijn nog enkele tupels over in de ruimte die nog ongeveer 30 seconden te leven hebben. Bovendien stopte de scan bij het overgaan van een tupel met een ID van 2 en een levensduur van 1576421257 naar een tupel met een ID van 3 en een levensduur van 1576421287. Tupels met een levensduur van 1576421287 of meer werden niet gescand vanwege de volgorde van de exp-indexsleutels. Dit is de besparing die we vanaf het allereerste begin wilden realiseren.
Laten we de taak stoppen:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
uitvoering
De beste manier om over alle kenmerken van een project te vertellen, is de oorspronkelijke bron.
De argumenten die we doorgeven aan de startmethode worden opgeslagen in een structuur genaamd expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Het naamattribuut is de naam van de taak. Het space_id attribuut is de spatie-ID. Het attribuut rm_index_id is de identificatie van de unieke index waarmee tupels worden verwijderd. Het it_index_id attribuut is de identificatie van de index waarmee tupels zullen worden doorlopen. Het attribuut it_index_type is het type index waarmee tupels worden doorlopen. Het filed_no attribuut is het nummer van het tupelveld met levensduur. Het kenmerk scan_size is het maximale aantal tupels dat in één transactie wordt gescand. Het kenmerk scan_time is de volledige scantijd in seconden.
We zullen geen rekening houden met het ontleden van argumenten. Dit is een moeizaam maar eenvoudig klusje, waarbij de bibliotheek u helpt
We vermelden de prototypes van alle functies die worden gebruikt voor het parseren:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Laten we nu verder gaan met het belangrijkste: de logica van het omzeilen van spatie en het verwijderen van tupels. Elk blok met tupels dat niet groter is dan scan_size, wordt in één enkele transactie gescand en gewijzigd. Als deze succesvol is, wordt deze transactie vastgelegd; als er een fout optreedt, wordt deze teruggedraaid. Het laatste argument voor de functie expirationd_iterate is een verwijzing naar de iterator van waaruit het scannen begint of doorgaat. Deze iterator wordt intern opgehoogd totdat er een fout optreedt, de ruimte opraakt of het niet mogelijk is om het proces vooraf te stoppen. De functie expirationd_expired controleert de levensduur van een tupel, expirationd_delete verwijdert een tupel, expirationd_breakable controleert of we verder moeten.
Expirationd_iterate functiecode:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Functiecode expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete functiecode:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Functiecode expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
toepassing
Je kunt de broncode bekijken op
Bron: www.habr.com