Pisanje lastnega omejenega modula expirationd za tarantool

Pisanje lastnega omejenega modula expirationd za tarantool

Pred časom smo se soočili s problemom čiščenja tuplev v prostorih tarantool. Čiščenje je bilo treba začeti ne takrat, ko je tarantoolu že zmanjkovalo pomnilnika, ampak vnaprej in z določeno frekvenco. Za to nalogo ima tarantool modul, napisan v Lui, imenovan iztek. Po krajši uporabi tega modula smo ugotovili, da ni primeren za nas: zaradi nenehnega čiščenja velikih količin podatkov je Lua visela v GC. Zato smo razmišljali o razvoju lastnega modula capped expirationd, v upanju, da bo koda, napisana v domačem programskem jeziku, rešila naše težave na najboljši možni način.

Dober primer za nas je bil modul tarantool, imenovan memcached. Pristop, uporabljen v njem, temelji na dejstvu, da se v prostoru ustvari ločeno polje, ki označuje življenjsko dobo tuple, z drugimi besedami, ttl. Modul v ozadju skenira prostor, primerja TTL s trenutnim časom in se odloči, ali naj izbriše tuple ali ne. Koda modula memcached je preprosta in elegantna, a preveč splošna. Prvič, ne upošteva vrste indeksa, ki se pajka in briše. Drugič, pri vsakem prehodu se pregledajo vse tuple, katerih število je lahko precej veliko. In če je bila v modulu expirationd prva težava rešena (drevesni indeks je bil ločen v ločen razred), potem drugi še vedno ni bil deležen nobene pozornosti. Te tri točke so vnaprej določile izbiro v korist pisanja lastne kode.

Opis

Dokumentacija za tarantool je zelo dobra vadnica o tem, kako napisati svoje shranjene procedure v C. Najprej predlagam, da se seznanite s tem, da boste razumeli tiste vstavke z ukazi in kodo, ki bodo prikazani spodaj. Prav tako je vredno pozornosti referenca na objekte, ki so na voljo pri pisanju lastnega omejenega modula, namreč škatla, Vlakno, Indeks и txn.

Začnimo od daleč in poglejmo, kako je modul z omejeno veljavnostjo videti od zunaj:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Zaradi poenostavitve zaženemo tarantool v imeniku, kjer se nahaja naša knjižnica libcapped-expirationd.so. Iz knjižnice sta izvoženi dve funkciji: start in kill. Prvi korak je omogočiti te funkcije na voljo iz Lua z uporabo box.schema.func.create in box.schema.user.grant. Nato ustvarite prostor, katerega tuple bodo vsebovale samo tri polja: prvo je enolični identifikator, drugo je e-pošta in tretje je življenjska doba tuple. Na vrhu prvega polja zgradimo drevesni indeks in ga imenujemo primarni. Nato dobimo objekt povezave z našo izvorno knjižnico.

Po pripravljalnem delu zaženite zagonsko funkcijo:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ta primer bo med skeniranjem deloval popolnoma enako kot modul expirationd, ki je napisan v Lua. Prvi argument začetne funkcije je edinstveno ime opravila. Drugi je identifikator prostora. Tretji je unikatni indeks, s katerim bodo tuple izbrisane. Četrti je indeks, s katerim bodo prečkane tuple. Peto je številka polja tuple z življenjsko dobo (številčenje se začne z 1, ne z 0!). Šesti in sedmi sta nastavitvi skeniranja. 1024 je največje število tupl, ki si jih je mogoče ogledati v eni transakciji. 3600 — čas celotnega skeniranja v sekundah.

Upoštevajte, da primer uporablja isti indeks za pajkanje in brisanje. Če je to drevesni indeks, se prečkanje izvede od manjšega ključa do večjega. Če obstaja kakšen drug, na primer hash indeks, se prečkanje praviloma izvede v naključnem vrstnem redu. Vse prostorske tuple so skenirane v enem skeniranju.

V prostor vstavimo več tuplev z življenjsko dobo 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Preverimo, ali je bila vstavitev uspešna:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ponovimo izbiro po 60+ sekundah (šteto od začetka vstavitve prve tuple) in vidimo, da je modul capped expirationd že obdelan:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Ustavimo nalogo:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Oglejmo si drugi primer, kjer se za pajkanje uporablja ločen indeks:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Tukaj je vse enako kot v prvem primeru, z nekaj izjemami. Na vrhu tretjega polja zgradimo drevesni indeks in ga imenujemo exp. Ni nujno, da je ta indeks edinstven, za razliko od indeksa, imenovanega primarni. Prehod bo izvedel indeks exp, brisanje pa primarni. Spomnimo se, da je bilo prej oboje narejeno samo z uporabo primarnega indeksa.

Po pripravljalnem delu zaženemo funkcijo start z novimi argumenti:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Ponovno vstavimo več tuplev v prostor z življenjsko dobo 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Po 30 sekundah bomo po analogiji dodali še nekaj tuplev:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Preverimo, ali je bila vstavitev uspešna:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ponovimo izbiro po 60+ sekundah (šteto od začetka vstavitve prve tuple) in vidimo, da je modul capped expirationd že obdelan:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

V prostoru je še nekaj tulp, ki bodo imele še približno 30 sekund življenja. Poleg tega se je skeniranje ustavilo pri premiku iz tuple z ID-jem 2 in življenjsko dobo 1576421257 v tulp z ID-jem 3 in življenjsko dobo 1576421287. Tuple z življenjsko dobo 1576421287 ali več niso bile pregledane zaradi vrstnega reda ključi indeksa exp. To so prihranki, ki smo jih želeli doseči na samem začetku.

Ustavimo nalogo:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Najboljši način za povedati o vseh značilnostih projekta je njegov izvirni vir. koda! V okviru publikacije se bomo osredotočili le na najpomembnejše točke, in sicer algoritme obvoda prostora.

Argumenti, ki jih posredujemo začetni metodi, so shranjeni v strukturi, imenovani expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atribut imena je ime opravila. Atribut space_id je identifikator prostora. Atribut rm_index_id je identifikator edinstvenega indeksa, s katerim bodo tuple izbrisane. Atribut it_index_id je identifikator indeksa, s katerim bodo prečkane tuple. Atribut it_index_type je vrsta indeksa, s katerim bodo prečkane tuple. Atribut filed_no je številka polja tuple z življenjsko dobo. Atribut scan_size je največje število tulp, ki se pregledajo v eni transakciji. Atribut scan_time je celoten čas skeniranja v sekundah.

Argumentov za razčlenjevanje ne bomo upoštevali. To je mukotrpno, a preprosto delo, pri katerem vam bo pomagala knjižnica msgpuck. Težave lahko nastanejo samo pri indeksih, ki so posredovani iz Lua kot zapletena podatkovna struktura s tipom mp_map in ne uporabljajo preprostih tipov mp_bool, mp_double, mp_int, mp_uint in mp_array. Ni pa treba razčleniti celotnega indeksa. Samo preveriti morate njegovo edinstvenost, izračunati vrsto in izluščiti identifikator.

Navajamo prototipe vseh funkcij, ki se uporabljajo za razčlenjevanje:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Zdaj pa preidimo na najpomembnejšo stvar - logiko obhoda prostora in brisanja tuplev. Vsak blok tork, ki ni večji od scan_size, se pregleda in spremeni v eni transakciji. Če je uspešna, je ta transakcija odobrena; če pride do napake, se povrne. Zadnji argument funkcije expirationd_iterate je kazalec na iterator, od katerega se skeniranje začne ali nadaljuje. Ta iterator se interno povečuje, dokler ne pride do napake, zmanjka prostora ali postopka ni mogoče ustaviti vnaprej. Funkcija expirationd_expired preveri življenjsko dobo tuple, expirationd_delete izbriše tuple, expirationd_breakable preveri, ali moramo iti naprej.

Koda funkcije Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funkcijska koda expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Koda funkcije Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funkcijska koda expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Vloga

Izvorno kodo si lahko ogledate na tukaj!

Vir: www.habr.com

Dodaj komentar