For en tid siden ble vi møtt med problemet med å rense tupler i mellomrom
Et godt eksempel for oss var tarantool-modulen kalt
beskrivelse
Dokumentasjonen for tarantool har en veldig god
La oss starte langveis fra og se på hvordan en utløpt modul ser ut fra utsiden:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
For enkelhets skyld lanserer vi tarantool i katalogen der biblioteket vårt libcapped-expirationd.so ligger. To funksjoner eksporteres fra biblioteket: start og drep. Det første trinnet er å gjøre disse funksjonene tilgjengelige fra Lua ved å bruke box.schema.func.create og box.schema.user.grant. Deretter oppretter du en plass hvis tuples bare vil inneholde tre felt: det første er en unik identifikator, det andre er e-post, og det tredje er levetiden til tupelen. Vi bygger en treindeks på toppen av det første feltet og kaller det primær. Deretter får vi tilkoblingsobjektet til vårt opprinnelige bibliotek.
Etter det forberedende arbeidet, kjør startfunksjonen:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Dette eksemplet vil fungere under skanning nøyaktig det samme som utløpsmodulen, som er skrevet i Lua. Det første argumentet til startfunksjonen er det unike navnet på oppgaven. Den andre er plassidentifikatoren. Den tredje er en unik indeks der tupler vil bli slettet. Den fjerde er indeksen som tuplene vil bli krysset med. Den femte er nummeret på tuppelfeltet med levetid (nummereringen starter fra 1, ikke 0!). Den sjette og syvende er skanneinnstillinger. 1024 er det maksimale antallet tupler som kan sees i en enkelt transaksjon. 3600 — full skannetid i sekunder.
Merk at eksemplet bruker den samme indeksen for gjennomsøking og sletting. Hvis dette er en treindeks, utføres kryssingen fra den mindre nøkkelen til den større. Hvis det er en annen, for eksempel hash-indeks, utføres gjennomgangen som regel i tilfeldig rekkefølge. Alle romtupler skannes i én skanning.
La oss sette inn flere tupler i rommet med en levetid på 60 sekunder:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
La oss sjekke at innsettingen var vellykket:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
La oss gjenta valget etter 60+ sekunder (teller fra begynnelsen av innsettingen av den første tuppelen) og se at den begrensede utløpsmodulen allerede har behandlet:
tarantool> box.space.tester.index.primary:select()
---
- []
...
La oss stoppe oppgaven:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
La oss se på et annet eksempel der en egen indeks brukes for gjennomsøkingen:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Alt her er det samme som i det første eksemplet, med noen få unntak. Vi bygger en treindeks på toppen av det tredje feltet og kaller det exp. Denne indeksen trenger ikke å være unik, i motsetning til indeksen som kalles primær. Traversering vil bli utført av exp-indeks, og sletting av primær. Vi husker at tidligere ble begge utført kun ved bruk av primærindeksen.
Etter forarbeidet kjører vi startfunksjonen med nye argumenter:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
La oss sette inn flere tupler i rommet igjen med en levetid på 60 sekunder:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Etter 30 sekunder, analogt, vil vi legge til noen flere tupler:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
La oss sjekke at innsettingen var vellykket:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
La oss gjenta valget etter 60+ sekunder (teller fra begynnelsen av innsettingen av den første tuppelen) og se at den begrensede utløpsmodulen allerede har behandlet:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Det er fortsatt noen tupler igjen i rommet som vil ha omtrent 30 sekunder igjen å leve. Dessuten stoppet skanningen ved overgang fra en tuppel med en ID på 2 og en levetid på 1576421257 til en tuppel med en ID på 3 og en levetid på 1576421287. Tupler med en levetid på 1576421287 eller mer ble ikke skannet på grunn av bestillingen av exp-indeksnøklene. Dette er besparelsene vi ønsket å oppnå helt i begynnelsen.
La oss stoppe oppgaven:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
implementering
Den beste måten å fortelle om alle funksjonene til et prosjekt er dens opprinnelige kilde.
Argumentene vi sender til startmetoden er lagret i en struktur kalt expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Navnattributtet er navnet på oppgaven. Space_id-attributtet er space-identifikatoren. Attributtet rm_index_id er identifikatoren til den unike indeksen som tupler vil bli slettet med. It_index_id-attributtet er identifikatoren for indeksen som tupler vil bli krysset med. It_index_type-attributtet er typen indeks som tupler vil bli krysset med. Filed_no-attributtet er nummeret på tuppelfeltet med levetid. scan_size-attributtet er det maksimale antallet tupler som skannes i en transaksjon. scan_time-attributtet er hele skannetiden i sekunder.
Vi vil ikke vurdere å analysere argumenter. Dette er en møysommelig, men enkel jobb, som biblioteket vil hjelpe deg med
Vi lister opp prototypene til alle funksjoner som brukes til å analysere:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
La oss nå gå videre til det viktigste - logikken med å omgå plass og slette tupler. Hver blokk med tupler som ikke er større enn scan_size, skannes og endres under en enkelt transaksjon. Hvis den lykkes, blir denne transaksjonen forpliktet; hvis det oppstår feil, rulles den tilbake. Det siste argumentet til funksjonen expirationd_iterate er en peker til iteratoren som skanningen starter eller fortsetter fra. Denne iteratoren økes internt til det oppstår en feil, plassen går tom eller det ikke er mulig å stoppe prosessen på forhånd. Funksjonen expirationd_expired sjekker levetiden til en tuppel, expirationd_delete sletter en tuppel, expirationd_breakable sjekker om vi må gå videre.
Expirationd_iterate funksjonskode:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Funksjonskode expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete funksjonskode:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Funksjonskode expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
App
Du kan se kildekoden på
Kilde: www.habr.com