Rašome savo „tarantool“ modulį, kurio galiojimo laikas pasibaigęs

Rašome savo „tarantool“ modulį, kurio galiojimo laikas pasibaigęs

Prieš kurį laiką susidūrėme su kortelių valymo erdvėse problema tarantolis. Valymą reikėjo pradėti ne tada, kai tarantool jau baigdavosi atminties, o iš anksto ir tam tikru dažnumu. Šiai užduočiai tarantool turi modulį, parašytą Lua kalba galiojimo laikas. Panaudoję šį modulį neilgai supratome, kad jis mums netinka: dėl nuolatinio didelio duomenų kiekio valymo Lua pakibo GC. Todėl galvojome sukurti savo riboto galiojimo modulį, tikėdamiesi, kad gimtąja programavimo kalba parašytas kodas geriausiai išspręs mūsų problemas.

Geras pavyzdys mums buvo tarantool modulis vadinamas memcached. Jame naudojamas metodas grindžiamas tuo, kad erdvėje sukuriamas atskiras laukas, nurodantis kortelės gyvavimo trukmę, kitaip tariant, ttl. Fone esantis modulis nuskaito erdvę, lygina TTL su esamu laiku ir nusprendžia, ar ištrinti seką, ar ne. Atmintyje saugomo modulio kodas yra paprastas ir elegantiškas, bet pernelyg bendras. Pirma, neatsižvelgiama į indekso tipą, kuris tikrinamas ir ištrinamas. Antra, kiekviename žingsnyje nuskaitomos visos eilutės, kurių skaičius gali būti gana didelis. Ir jei pasibaigusiame modulyje pirmoji problema buvo išspręsta (medžio indeksas buvo išskirtas į atskirą klasę), tai antroji vis tiek nesulaukė jokio dėmesio. Šie trys punktai iš anksto nulėmė pasirinkimą rašyti savo kodą.

aprašymas

Tarantolio dokumentacija yra labai gera pamoka apie tai, kaip įrašyti išsaugotas procedūras C. Pirmiausia siūlau susipažinti su ja, kad suprastumėte tuos intarpus su komandomis ir kodu, kurie pasirodys žemiau. Taip pat verta atkreipti dėmesį nuoroda objektams, kurie yra prieinami rašant savo ribotą modulį, būtent dėžė, pluoštas, rodyklė и txn.

Pradėkime nuo tolo ir pažiūrėkime, kaip iš išorės atrodo pasibaigęs modulis, kurio galiojimo laikas baigėsi:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Kad būtų paprasčiau, mes paleidžiame tarantool kataloge, kuriame yra mūsų libcapped-expirationd.so biblioteka. Iš bibliotekos eksportuojamos dvi funkcijos: paleisti ir nužudyti. Pirmas žingsnis yra padaryti šias funkcijas pasiekiamas iš Lua naudojant box.schema.func.create ir box.schema.user.grant. Tada sukurkite erdvę, kurios eilutėse bus tik trys laukai: pirmasis yra unikalus identifikatorius, antrasis yra el. Pirmojo lauko viršuje sukuriame medžio indeksą ir vadiname jį pirminiu. Tada gauname ryšio objektą su savo gimtąja biblioteka.

Baigę parengiamąjį darbą, paleiskite paleidimo funkciją:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Šis pavyzdys nuskaitymo metu veiks lygiai taip pat, kaip pasibaigęs modulis, parašytas Lua kalba. Pirmasis pradžios funkcijos argumentas yra unikalus užduoties pavadinimas. Antrasis yra erdvės identifikatorius. Trečiasis yra unikalus indeksas, pagal kurį bus ištrintos eilutės. Ketvirtasis yra indeksas, pagal kurį bus kertamos kortelės. Penktasis yra eilutės lauko skaičius su galiojimo laiku (numeracija prasideda nuo 1, o ne nuo 0!). Šeštas ir septintas yra nuskaitymo nustatymai. 1024 yra maksimalus kortelių, kurias galima peržiūrėti per vieną operaciją, skaičius. 3600 – visas nuskaitymo laikas sekundėmis.

Atminkite, kad pavyzdyje tikrinant ir trinant naudojamas tas pats indeksas. Jei tai yra medžio indeksas, tada perėjimas atliekamas nuo mažesnio rakto iki didesnio. Jei yra koks nors kitas, pavyzdžiui, maišos indeksas, tada perėjimas paprastai atliekamas atsitiktine tvarka. Visos erdvės eilutės nuskaitomos vienu nuskaitymu.

Įterpkime į erdvę kelias eilutes, kurių tarnavimo laikas yra 60 sekundžių:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Patikrinkime, ar įterpimas buvo sėkmingas:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Pakartokime pasirinkimą po 60+ sekundžių (skaičiuojant nuo pirmosios eilutės įterpimo pradžios) ir pažiūrėkime, ar modulis, kurio galiojimo laikas baigėsi, jau apdorotas:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Sustabdome užduotį:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Pažvelkime į antrą pavyzdį, kai tikrinimui naudojamas atskiras indeksas:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Čia viskas taip pat, kaip ir pirmame pavyzdyje, su keliomis išimtimis. Trečiojo lauko viršuje sukuriame medžio indeksą ir vadiname jį exp. Šis indeksas neturi būti unikalus, skirtingai nei indeksas, vadinamas pirminiu. Perėjimas bus atliktas pagal exp indeksą, o ištrynimas pagal pirminį. Prisimename, kad anksčiau abu buvo daromi tik naudojant pirminį indeksą.

Po parengiamojo darbo paleidžiame paleidimo funkciją su naujais argumentais:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Vėl įterpkime į erdvę kelias eilutes, kurių tarnavimo laikas yra 60 sekundžių:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Po 30 sekundžių pagal analogiją pridėsime dar keletą eilučių:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Patikrinkime, ar įterpimas buvo sėkmingas:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Pakartokime pasirinkimą po 60+ sekundžių (skaičiuojant nuo pirmosios eilutės įterpimo pradžios) ir pažiūrėkime, ar modulis, kurio galiojimo laikas baigėsi, jau apdorotas:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Erdvėje vis dar liko keletas kortelių, kurioms teks gyventi dar apie 30 sekundžių. Be to, nuskaitymas sustojo, kai iš eilės, kurios ID yra 2 ir 1576421257, į eilutę, kurios ID yra 3, o eksploatavimo trukmė 1576421287. Kortelės, kurių tarnavimo laikas yra 1576421287 ar daugiau, nebuvo nuskaitytos dėl užsakymo exp indekso klavišus. Tai yra tas sutaupymas, kurį norėjome sutaupyti pačioje pradžioje.

Sustabdome užduotį:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Vykdymas

Geriausias būdas papasakoti apie visas projekto ypatybes yra jo pirminis šaltinis. kodas! Kaip dalį leidinio, mes sutelksime dėmesį tik į svarbiausius dalykus, būtent į erdvės apėjimo algoritmus.

Argumentai, kuriuos perduodame pradžios metodui, yra saugomi struktūroje, vadinamoje expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Pavadinimo atributas yra užduoties pavadinimas. Atributas space_id yra erdvės identifikatorius. Atributas rm_index_id yra unikalaus indekso, pagal kurį bus ištrintos eilutės, identifikatorius. Atributas it_index_id yra indekso identifikatorius, pagal kurį bus einamos eilutės. Atributas it_index_type yra indekso tipas, kuriuo bus einamos eilutės. Atributas filed_no yra eilutės lauko su galiojimo laiku numeris. Atributas scan_size yra maksimalus kortelių, kurios nuskaitomos per vieną operaciją, skaičius. Atributas scan_time yra visas nuskaitymo laikas sekundėmis.

Mes nenagrinėsime argumentų. Tai kruopštus, bet paprastas darbas, kurį atlikti jums padės biblioteka msgpuck. Sunkumai gali kilti tik su indeksais, kurie perduodami iš Lua kaip sudėtinga duomenų struktūra su mp_map tipu, o ne naudojant paprastus tipus mp_bool, mp_double, mp_int, mp_uint ir mp_array. Tačiau nereikia analizuoti viso indekso. Jums tereikia patikrinti jo unikalumą, apskaičiuoti tipą ir išgauti identifikatorių.

Pateikiame visų analizei naudojamų funkcijų prototipus:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Dabar pereikime prie svarbiausio dalyko – tarpo apėjimo ir kortelių ištrynimo logikos. Kiekvienas kortelių blokas, ne didesnis nei scan_size, yra nuskaitomas ir modifikuojamas atliekant vieną operaciją. Jei sėkminga, ši operacija atliekama; jei įvyksta klaida, ji atšaukiama. Paskutinis funkcijos expirationd_iterate argumentas yra rodyklė į iteratorių, nuo kurio pradedamas arba tęsiamas nuskaitymas. Šis iteratorius padidinamas viduje, kol įvyksta klaida, baigiasi vieta arba neįmanoma iš anksto sustabdyti proceso. Funkcija expirationd_expired tikrina sektos gyvavimo trukmę, expirationd_delete ištrina seką, expirationd_breakable patikrina, ar mums reikia judėti toliau.

Expirationd_iterate funkcijos kodas:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funkcijos kodas expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete funkcijos kodas:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funkcijos kodas expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Taikymas

Šaltinio kodą galite peržiūrėti adresu čia!

Šaltinis: www.habr.com

Добавить комментарий