Qualche tempo fa ci siamo trovati di fronte al problema della pulizia delle tuple negli spazi
Un buon esempio per noi è stato il modulo tarantool chiamato
descrizione
La documentazione per tarantool è molto buona
Partiamo da lontano e vediamo come appare dall'esterno un modulo con scadenza limitata:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Per semplicità, lanciamo tarantool nella directory in cui si trova la nostra libreria libcapped-expirationd.so. Dalla libreria vengono esportate due funzioni: start e kill. Il primo passo è rendere queste funzioni disponibili da Lua utilizzando box.schema.func.create e box.schema.user.grant. Quindi crea uno spazio le cui tuple conterranno solo tre campi: il primo è un identificatore univoco, il secondo è l'e-mail e il terzo è la durata della tupla. Costruiamo un indice ad albero sopra il primo campo e lo chiamiamo primario. Successivamente otteniamo l'oggetto connessione nella nostra libreria nativa.
Dopo il lavoro preparatorio, esegui la funzione di avvio:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Questo esempio funzionerà durante la scansione esattamente come il modulo expirationd, che è scritto in Lua. Il primo argomento della funzione start è il nome univoco dell'attività. Il secondo è l'identificatore dello spazio. Il terzo è un indice univoco mediante il quale verranno eliminate le tuple. Il quarto è l'indice con cui verranno attraversate le tuple. Il quinto è il numero del campo tupla con durata (la numerazione inizia da 1, non da 0!). Il sesto e il settimo sono impostazioni di scansione. 1024 è il numero massimo di tuple che possono essere visualizzate in una singola transazione. 3600: tempo di scansione completa in secondi.
Tieni presente che l'esempio utilizza lo stesso indice per la scansione e l'eliminazione. Se si tratta di un indice ad albero, allora l'attraversamento viene effettuato dalla chiave più piccola a quella più grande. Se esiste un altro indice hash, ad esempio, l'attraversamento viene eseguito, di regola, in ordine casuale. Tutte le tuple spaziali vengono scansionate in un'unica scansione.
Inseriamo diverse tuple nello spazio con una durata di 60 secondi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Verifichiamo che l'inserimento sia andato a buon fine:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Ripetiamo la selezione dopo 60+ secondi (contando dall'inizio dell'inserimento della prima tupla) e vediamo che il modulo capped expirationd ha già funzionato:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Interrompiamo il compito:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Diamo un'occhiata a un secondo esempio in cui viene utilizzato un indice separato per la scansione:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Tutto qui è uguale al primo esempio, con poche eccezioni. Costruiamo un indice ad albero sopra il terzo campo e lo chiamiamo exp. Questo indice non deve essere univoco, a differenza dell'indice chiamato primario. L'attraversamento verrà effettuato per indice exp e l'eliminazione per primario. Ricordiamo che in precedenza entrambe venivano eseguite utilizzando solo l'indice primario.
Dopo il lavoro preparatorio, eseguiamo la funzione start con nuovi argomenti:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Inseriamo nuovamente diverse tuple nello spazio con una durata di 60 secondi:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Dopo 30 secondi, per analogia, aggiungeremo qualche altra tupla:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Verifichiamo che l'inserimento sia andato a buon fine:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ripetiamo la selezione dopo 60+ secondi (contando dall'inizio dell'inserimento della prima tupla) e vediamo che il modulo capped expirationd ha già funzionato:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ci sono ancora alcune tuple rimaste nello spazio che avranno circa altri 30 secondi di vita. Inoltre, la scansione si interrompeva quando si passava da una tupla con un ID pari a 2 e una durata pari a 1576421257 a una tupla con un ID pari a 3 e una durata pari a 1576421287. Le tuple con una durata pari a 1576421287 o più non venivano scansionate a causa dell'ordinamento di i tasti dell'indice exp. Questo è il risparmio che volevamo ottenere fin dall'inizio.
Interrompiamo il compito:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
implementazione
Il modo migliore per raccontare tutte le caratteristiche di un progetto è la sua fonte originale.
Gli argomenti che passiamo al metodo start sono memorizzati in una struttura chiamata expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
L'attributo name è il nome dell'attività. L'attributo space_id è l'identificatore dello spazio. L'attributo rm_index_id è l'identificatore dell'indice univoco con cui verranno eliminate le tuple. L'attributo it_index_id è l'identificatore dell'indice mediante il quale verranno attraversate le tuple. L'attributo it_index_type è il tipo di indice con cui verranno attraversate le tuple. L'attributo filed_no è il numero del campo tupla con durata. L'attributo scan_size rappresenta il numero massimo di tuple scansionate in una transazione. L'attributo scan_time è il tempo di scansione completo in secondi.
Non prenderemo in considerazione l'analisi degli argomenti. Questo è un lavoro minuzioso ma semplice, in cui la biblioteca ti aiuterà
Elenchiamo i prototipi di tutte le funzioni utilizzate per l'analisi:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Passiamo ora alla cosa più importante: la logica di bypassare lo spazio ed eliminare le tuple. Ogni blocco di tuple non più grande di scan_size viene scansionato e modificato in una singola transazione. In caso di successo, questa transazione viene confermata; se si verifica un errore, viene eseguito il rollback. L'ultimo argomento della funzione expirationd_iterate è un puntatore all'iteratore da cui inizia o continua la scansione. Questo iteratore viene incrementato internamente fino a quando non si verifica un errore, lo spazio si esaurisce o non è possibile interrompere il processo in anticipo. La funzione expirationd_expired controlla la durata di una tupla, expirationd_delete cancella una tupla, expirationd_breakable controlla se dobbiamo andare avanti.
Codice funzione Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Codice funzione expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Codice funzione Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Codice funzione Expirationd_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
applicazione
È possibile visualizzare il codice sorgente su
Fonte: habr.com