Antaŭ iom da tempo ni alfrontis la problemon de purigado de opoj en spacoj
Bona ekzemplo por ni estis la tarantool-modulo nomita
Priskribo
La dokumentado por tarantool havas tre bonan
Ni komencu de malproksime kaj rigardu, kiel aspektas eksvalidigita modulo de ekstere:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Por simpleco, ni lanĉas tarantool en la dosierujo kie troviĝas nia biblioteko libcapped-expirationd.so. Du funkcioj estas eksportitaj el la biblioteko: komenci kaj mortigi. La unua paŝo estas disponigi ĉi tiujn funkciojn de Lua uzante box.schema.func.create kaj box.schema.user.grant. Poste kreu spacon, kies opoj enhavos nur tri kampojn: la unua estas unika identigilo, la dua estas retpoŝto, kaj la tria estas la vivdaŭro de la opo. Ni konstruas arbindekson sur la unua kampo kaj nomas ĝin primara. Poste ni ricevas la konektan objekton al nia denaska biblioteko.
Post la prepara laboro, rulu la komencan funkcion:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Ĉi tiu ekzemplo funkcios dum skanado ĝuste same kiel la eksvalidiĝinta modulo, kiu estas skribita en Lua. La unua argumento al la startfunkcio estas la unika nomo de la tasko. La dua estas la spaca identigilo. La tria estas unika indekso per kiu opoj estos forigitaj. La kvara estas la indekso per kiu la opoj estos trapasitaj. La kvina estas la nombro de la opokampo kun vivdaŭro (numerado komenciĝas de 1, ne 0!). La sesa kaj sepa estas skanaj agordoj. 1024 estas la maksimuma nombro da opoj kiuj povas esti viditaj en ununura transakcio. 3600 — plena skanado en sekundoj.
Notu, ke la ekzemplo uzas la saman indekson por rampi kaj forigi. Se ĉi tio estas arbo-indekso, tiam la trapasado estas farita de la pli malgranda ŝlosilo al la pli granda. Se ekzistas iu alia, ekzemple, hash-indekso, tiam la trairado estas farita, kiel regulo, en hazarda ordo. Ĉiuj spacaj opoj estas skanitaj en unu skanado.
Ni enigu plurajn opoj en la spacon kun vivdaŭro de 60 sekundoj:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Ni kontrolu, ke la enmeto sukcesis:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Ni ripetu la elekton post 60+ sekundoj (kalkulante de la komenco de la enmeto de la unua opo) kaj vidu, ke la limigita eksvalidiĝinta modulo jam procesiĝis:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Ni ĉesigu la taskon:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Ni rigardu duan ekzemplon, kie aparta indekso estas uzata por la rampado:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Ĉio ĉi tie estas la sama kiel en la unua ekzemplo, kun kelkaj esceptoj. Ni konstruas arbindekson sur la tria kampo kaj nomas ĝin exp. Ĉi tiu indekso ne devas esti unika, male al la indekso nomata primara. Traversado estos farita per eksp-indekso, kaj forigo per primara. Ni memoras, ke antaŭe ambaŭ estis faritaj nur uzante la primaran indicon.
Post la prepara laboro, ni rulas la startfunkcion kun novaj argumentoj:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Ni enigu plurajn opoj en la spacon denove kun vivdaŭro de 60 sekundoj:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Post 30 sekundoj, analoge, ni aldonos kelkajn pliajn opoj:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Ni kontrolu, ke la enmeto sukcesis:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Ni ripetu la elekton post 60+ sekundoj (kalkulante de la komenco de la enmeto de la unua opo) kaj vidu, ke la limigita eksvalidiĝinta modulo jam procesiĝis:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Restas ankoraŭ kelkaj opoj en la spaco, kiuj havos ĉirkaŭ 30 pliajn sekundojn por vivi. Plie, la skanado ĉesis dum moviĝado de opo kun ID de 2 kaj vivdaŭro de 1576421257 al opo kun ID de 3 kaj vivdaŭro de 1576421287. Opoj kun vivdaŭro de 1576421287 aŭ pli ne estis skanitaj pro la mendo de la exp-indeksklavoj. Ĉi tio estas la ŝparado, kiun ni volis atingi ĉe la komenco mem.
Ni ĉesigu la taskon:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Реализация
La plej bona maniero rakonti pri ĉiuj trajtoj de projekto estas ĝia originala fonto.
La argumentoj, kiujn ni transdonas al la startmetodo, estas konservitaj en strukturo nomata expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
La nomo atributo estas la nomo de la tasko. La space_id-atributo estas la spaca identigilo. La atributo rm_index_id estas la identigilo de la unika indekso per kiu opoj estos forigitaj. La atributo it_index_id estas la identigilo de la indekso per kiu opoj estos trapasitaj. La atributo it_index_type estas la speco de indekso per kiu opoj estos trapasitaj. La atributo filed_no estas la nombro de la opa kampo kun vivdaŭro. La atributo scan_size estas la maksimuma nombro da opoj kiuj estas skanitaj en unu transakcio. La atributo scan_time estas la plena skanado en sekundoj.
Ni ne konsideros analizajn argumentojn. Ĉi tio estas peniga sed simpla laboro, per kiu la biblioteko helpos vin
Ni listigas la prototipojn de ĉiuj funkcioj uzataj por analizado:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Nun ni transiru al la plej grava afero - la logiko preterpasi spacon kaj forigi opoj. Ĉiu bloko de opoj ne pli granda ol scan_size estas skanita kaj modifita sub ununura transakcio. Se sukcesa, ĉi tiu transakcio estas farita; se eraro okazas, ĝi estas reigita. La lasta argumento al la funkcio expirationd_iterate estas montrilo al la iteratoro de kiu skanado komenciĝas aŭ daŭras. Ĉi tiu iteratoro estas pliigita interne ĝis eraro okazas, la spaco finiĝas, aŭ ne eblas ĉesigi la procezon anticipe. La funkcio expirationd_expired kontrolas la vivdaŭron de opo, expirationd_delete forigas opon, expirationd_breakable kontrolas ĉu ni bezonas pluiri.
Funkcia kodo Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Funkcia kodo expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Funkcia kodo Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Funkcia kodo expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Apliko
Vi povas vidi la fontkodon ĉe
fonto: www.habr.com