Fa un temps ens vam trobar amb el problema de netejar les tuples als espais
Un bon exemple per a nosaltres va ser el mòdul tarantool anomenat
Descripció
La documentació de tarantool té una molt bona
Comencem des de lluny i mirem com és un mòdul amb caducitat limitada des de l'exterior:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Per simplificar, iniciem tarantool al directori on es troba la nostra biblioteca libcapped-expirationd.so. Des de la biblioteca s'exporten dues funcions: iniciar i matar. El primer pas és fer que aquestes funcions estiguin disponibles a Lua mitjançant box.schema.func.create i box.schema.user.grant. A continuació, creeu un espai les tuples del qual només contindran tres camps: el primer és un identificador únic, el segon és el correu electrònic i el tercer és la vida útil de la tupla. Construïm un índex d'arbre a la part superior del primer camp i l'anomenem principal. A continuació, obtenim l'objecte de connexió a la nostra biblioteca nativa.
Després del treball preparatori, executeu la funció d'inici:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Aquest exemple funcionarà durant l'escaneig exactament igual que el mòdul caducat, que està escrit en Lua. El primer argument de la funció d'inici és el nom únic de la tasca. El segon és l'identificador de l'espai. El tercer és un índex únic pel qual s'eliminaran les tuples. El quart és l'índex pel qual es recorreran les tuples. El cinquè és el nombre del camp de tupla amb vida útil (la numeració comença des de l'1, no del 0!). El sisè i el setè són paràmetres d'escaneig. 1024 és el nombre màxim de tuples que es poden veure en una sola transacció. 3600: temps d'escaneig complet en segons.
Tingueu en compte que l'exemple utilitza el mateix índex per rastrejar i suprimir. Si es tracta d'un índex d'arbre, el recorregut es realitza des de la clau més petita a la més gran. Si hi ha algun altre, per exemple, índex hash, el recorregut es realitza, per regla general, en ordre aleatori. Totes les tuples espacials s'escanegen en una sola exploració.
Inseriu diverses tuples a l'espai amb una vida útil de 60 segons:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Comprovem que la inserció ha estat correcta:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Repetim la selecció després de més de 60 segons (comptant des de l'inici de la inserció de la primera tupla) i veiem que el mòdul amb caducitat límit ja s'ha processat:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Aturem la tasca:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Vegem un segon exemple on s'utilitza un índex separat per al rastreig:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Tot aquí és igual que al primer exemple, amb algunes excepcions. Construïm un índex d'arbre a la part superior del tercer camp i l'anomenem exp. Aquest índex no ha de ser únic, a diferència de l'índex anomenat primari. La travessa es durà a terme mitjançant l'índex exp, i l'eliminació per primària. Recordem que anteriorment tots dos es feien només utilitzant l'índex primari.
Després del treball preparatori, executem la funció d'inici amb nous arguments:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Tornem a inserir diverses tuples a l'espai amb una vida útil de 60 segons:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Després de 30 segons, per analogia, afegirem unes quantes tuples més:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Comprovem que la inserció ha estat correcta:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Repetim la selecció després de més de 60 segons (comptant des de l'inici de la inserció de la primera tupla) i veiem que el mòdul amb caducitat límit ja s'ha processat:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Encara queden algunes tuples a l'espai que tindran uns 30 segons més de vida. A més, l'exploració es va aturar en passar d'una tupla amb un ID de 2 i una vida útil de 1576421257 a una tupla amb una ID de 3 i una vida útil de 1576421287. Les tuples amb una vida útil de 1576421287 o més no es van escanejar a causa de l'ordre de les claus d'índex exp. Aquest és l'estalvi que volíem aconseguir al principi.
Aturem la tasca:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementació
La millor manera d'explicar totes les característiques d'un projecte és la seva font original.
Els arguments que passem al mètode d'inici s'emmagatzemen en una estructura anomenada expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
L'atribut de nom és el nom de la tasca. L'atribut space_id és l'identificador de l'espai. L'atribut rm_index_id és l'identificador de l'índex únic pel qual s'eliminaran les tuples. L'atribut it_index_id és l'identificador de l'índex pel qual es recorreran les tuples. L'atribut it_index_type és el tipus d'índex pel qual es recorreran les tuples. L'atribut filed_no és el número del camp de tupla amb vida útil. L'atribut scan_size és el nombre màxim de tuples que s'escanegen en una transacció. L'atribut scan_time és el temps d'exploració complet en segons.
No considerarem l'anàlisi d'arguments. Aquesta és una feina minuciosa però senzilla, amb la qual la biblioteca us ajudarà
Enumerem els prototips de totes les funcions que s'utilitzen per analitzar:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Ara passem al més important: la lògica d'ometre l'espai i eliminar tuples. Cada bloc de tuples no més gran que scan_size s'escaneja i es modifica en una única transacció. Si té èxit, aquesta transacció es confirma; si es produeix un error, es revertirà. L'últim argument de la funció expirationd_iterate és un punter a l'iterador des del qual comença o continua l'exploració. Aquest iterador s'incrementa internament fins que es produeix un error, s'esgota l'espai o no és possible aturar el procés per endavant. La funció expirationd_expired comprova la vida útil d'una tupla, expirationd_delete elimina una tupla, expirationd_breakable comprova si hem de seguir endavant.
Codi de funció Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Codi de funció expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Codi de funció Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Codi de funció expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplicació
Podeu veure el codi font a
Font: www.habr.com