Skribante nian propran kapitan eksvaliditan modulon por tarantool

Skribante nian propran kapitan eksvaliditan modulon por tarantool

Antaŭ iom da tempo ni alfrontis la problemon de purigado de opoj en spacoj tarantool. Purigado devis komenciĝi ne kiam tarantoolo jam mankis memoro, sed anticipe kaj je certa frekvenco. Por ĉi tiu tasko, tarantool havas modulon skribitan en Lua nomita eksvalidiĝo. Post uzi ĉi tiun modulon por mallonga tempo, ni rimarkis, ke ĝi ne taŭgas por ni: pro konstanta purigado de grandaj kvantoj da datumoj, Lua pendis en la GC. Sekve, ni pensis pri disvolvi nian propran limigitan eksvalidiĝintan modulon, esperante ke la kodo skribita en denaska programlingvo solvos niajn problemojn en la plej bona ebla maniero.

Bona ekzemplo por ni estis la tarantool-modulo nomita memcached. La aliro uzata en ĝi baziĝas sur la fakto, ke aparta kampo estas kreita en la spaco, kiu indikas la vivdaŭron de la opo, alivorte, ttl. La modulo en la fono skanas la spacon, komparas la TTL kun la nuna tempo kaj decidas ĉu forigi la opon aŭ ne. La memcached-modulkodo estas simpla kaj eleganta, sed tro ĝenerala. Unue, ĝi ne konsideras la specon de indekso kiu estas rampita kaj forigita. Due, sur ĉiu paŝo ĉiuj opoj estas skanitaj, kies nombro povas esti sufiĉe granda. Kaj se en la eksvalidiĝinta modulo la unua problemo estis solvita (la arbo indekso estis apartigita en apartan klason), tiam la dua ankoraŭ ne ricevis atenton. Ĉi tiuj tri punktoj antaŭdeterminis la elekton favore al skribi mian propran kodon.

Priskribo

La dokumentado por tarantool havas tre bonan lernilo pri kiel skribi viajn konservitajn procedurojn en C. Antaŭ ĉio, mi sugestas vin familiarigi vin kun ĝi por kompreni tiujn enmetojn kun komandoj kaj kodo, kiuj aperos sube. Ankaŭ indas atenti referenco al objektoj disponeblaj kiam vi verkas vian propran ĉapelitan modulon, nome skatolo, fibro, indekso и txn.

Ni komencu de malproksime kaj rigardu, kiel aspektas eksvalidigita modulo de ekstere:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Por simpleco, ni lanĉas tarantool en la dosierujo kie troviĝas nia biblioteko libcapped-expirationd.so. Du funkcioj estas eksportitaj el la biblioteko: komenci kaj mortigi. La unua paŝo estas disponigi ĉi tiujn funkciojn de Lua uzante box.schema.func.create kaj box.schema.user.grant. Poste kreu spacon, kies opoj enhavos nur tri kampojn: la unua estas unika identigilo, la dua estas retpoŝto, kaj la tria estas la vivdaŭro de la opo. Ni konstruas arbindekson sur la unua kampo kaj nomas ĝin primara. Poste ni ricevas la konektan objekton al nia denaska biblioteko.

Post la prepara laboro, rulu la komencan funkcion:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ĉi tiu ekzemplo funkcios dum skanado ĝuste same kiel la eksvalidiĝinta modulo, kiu estas skribita en Lua. La unua argumento al la startfunkcio estas la unika nomo de la tasko. La dua estas la spaca identigilo. La tria estas unika indekso per kiu opoj estos forigitaj. La kvara estas la indekso per kiu la opoj estos trapasitaj. La kvina estas la nombro de la opokampo kun vivdaŭro (numerado komenciĝas de 1, ne 0!). La sesa kaj sepa estas skanaj agordoj. 1024 estas la maksimuma nombro da opoj kiuj povas esti viditaj en ununura transakcio. 3600 — plena skanado en sekundoj.

Notu, ke la ekzemplo uzas la saman indekson por rampi kaj forigi. Se ĉi tio estas arbo-indekso, tiam la trapasado estas farita de la pli malgranda ŝlosilo al la pli granda. Se ekzistas iu alia, ekzemple, hash-indekso, tiam la trairado estas farita, kiel regulo, en hazarda ordo. Ĉiuj spacaj opoj estas skanitaj en unu skanado.

Ni enigu plurajn opoj en la spacon kun vivdaŭro de 60 sekundoj:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Ni kontrolu, ke la enmeto sukcesis:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ni ripetu la elekton post 60+ sekundoj (kalkulante de la komenco de la enmeto de la unua opo) kaj vidu, ke la limigita eksvalidiĝinta modulo jam procesiĝis:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Ni ĉesigu la taskon:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Ni rigardu duan ekzemplon, kie aparta indekso estas uzata por la rampado:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Ĉio ĉi tie estas la sama kiel en la unua ekzemplo, kun kelkaj esceptoj. Ni konstruas arbindekson sur la tria kampo kaj nomas ĝin exp. Ĉi tiu indekso ne devas esti unika, male al la indekso nomata primara. Traversado estos farita per eksp-indekso, kaj forigo per primara. Ni memoras, ke antaŭe ambaŭ estis faritaj nur uzante la primaran indicon.

Post la prepara laboro, ni rulas la startfunkcion kun novaj argumentoj:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Ni enigu plurajn opoj en la spacon denove kun vivdaŭro de 60 sekundoj:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Post 30 sekundoj, analoge, ni aldonos kelkajn pliajn opoj:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Ni kontrolu, ke la enmeto sukcesis:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ni ripetu la elekton post 60+ sekundoj (kalkulante de la komenco de la enmeto de la unua opo) kaj vidu, ke la limigita eksvalidiĝinta modulo jam procesiĝis:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Restas ankoraŭ kelkaj opoj en la spaco, kiuj havos ĉirkaŭ 30 pliajn sekundojn por vivi. Plie, la skanado ĉesis dum moviĝado de opo kun ID de 2 kaj vivdaŭro de 1576421257 al opo kun ID de 3 kaj vivdaŭro de 1576421287. Opoj kun vivdaŭro de 1576421287 aŭ pli ne estis skanitaj pro la mendo de la exp-indeksklavoj. Ĉi tio estas la ŝparado, kiun ni volis atingi ĉe la komenco mem.

Ni ĉesigu la taskon:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

La plej bona maniero rakonti pri ĉiuj trajtoj de projekto estas ĝia originala fonto. kodo! Kadre de la publikigo, ni koncentriĝos nur pri la plej gravaj punktoj, nome, spacaj pretervojaj algoritmoj.

La argumentoj, kiujn ni transdonas al la startmetodo, estas konservitaj en strukturo nomata expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

La nomo atributo estas la nomo de la tasko. La space_id-atributo estas la spaca identigilo. La atributo rm_index_id estas la identigilo de la unika indekso per kiu opoj estos forigitaj. La atributo it_index_id estas la identigilo de la indekso per kiu opoj estos trapasitaj. La atributo it_index_type estas la speco de indekso per kiu opoj estos trapasitaj. La atributo filed_no estas la nombro de la opa kampo kun vivdaŭro. La atributo scan_size estas la maksimuma nombro da opoj kiuj estas skanitaj en unu transakcio. La atributo scan_time estas la plena skanado en sekundoj.

Ni ne konsideros analizajn argumentojn. Ĉi tio estas peniga sed simpla laboro, per kiu la biblioteko helpos vin msgpuck. Malfacilaĵoj povas aperi nur kun indeksoj kiuj estas pasigitaj de Lua kiel kompleksa datumstrukturo kun la mp_map-tipo, kaj ne uzante la simplajn tipojn mp_bool, mp_double, mp_int, mp_uint kaj mp_array. Sed ne necesas analizi la tutan indekson. Vi nur bezonas kontroli ĝian unikecon, kalkuli la tipon kaj ĉerpi la identigilon.

Ni listigas la prototipojn de ĉiuj funkcioj uzataj por analizado:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Nun ni transiru al la plej grava afero - la logiko preterpasi spacon kaj forigi opoj. Ĉiu bloko de opoj ne pli granda ol scan_size estas skanita kaj modifita sub ununura transakcio. Se sukcesa, ĉi tiu transakcio estas farita; se eraro okazas, ĝi estas reigita. La lasta argumento al la funkcio expirationd_iterate estas montrilo al la iteratoro de kiu skanado komenciĝas aŭ daŭras. Ĉi tiu iteratoro estas pliigita interne ĝis eraro okazas, la spaco finiĝas, aŭ ne eblas ĉesigi la procezon anticipe. La funkcio expirationd_expired kontrolas la vivdaŭron de opo, expirationd_delete forigas opon, expirationd_breakable kontrolas ĉu ni bezonas pluiri.

Funkcia kodo Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funkcia kodo expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Funkcia kodo Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funkcia kodo expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Apliko

Vi povas vidi la fontkodon ĉe tie!

fonto: www.habr.com

Aldoni komenton