Písanie vlastného obmedzeného modulu s vypršanou platnosťou pre tarantool

Písanie vlastného obmedzeného modulu s vypršanou platnosťou pre tarantool

Pred časom sme sa stretli s problémom čistenia n-tic v priestoroch tarantool. Čistenie sa muselo začať nie vtedy, keď už tarantoolu dochádzala pamäť, ale v predstihu a v určitej frekvencii. Pre túto úlohu má tarantool modul napísaný v jazyku Lua s názvom expirácia. Po krátkom používaní tohto modulu sme si uvedomili, že pre nás nie je vhodný: kvôli neustálemu upratovaniu veľkého množstva dát Lua visel v GC. Preto sme uvažovali o vývoji vlastného modulu s obmedzenou platnosťou, dúfajúc, že ​​kód napísaný v natívnom programovacom jazyku vyrieši naše problémy tým najlepším možným spôsobom.

Dobrým príkladom pre nás bol modul tarantool tzv memcached. V nej použitý prístup je založený na tom, že v priestore je vytvorené samostatné pole, ktoré udáva životnosť n-tice, inými slovami ttl. Modul na pozadí prehľadá priestor, porovná TTL s aktuálnym časom a rozhodne, či n-ticu vymaže alebo nie. Kód modulu memcached je jednoduchý a elegantný, ale príliš všeobecný. Po prvé, neberie do úvahy typ indexu, ktorý sa prehľadáva a odstraňuje. Po druhé, pri každom prechode sa skenujú všetky n-tice, ktorých počet môže byť dosť veľký. A ak sa v module so skončenou platnosťou vyriešil prvý problém (index stromu bol rozdelený do samostatnej triedy), druhému sa stále nevenovala žiadna pozornosť. Tieto tri body predurčili voľbu v prospech písania vlastného kódu.

Popis

Dokumentácia pre tarantool je veľmi dobrá tutoriál o tom, ako napísať uložené procedúry v C. V prvom rade vám navrhujem, aby ste sa s tým oboznámili, aby ste pochopili tie vložky s príkazmi a kódom, ktoré sa objavia nižšie. Tiež stojí za pozornosť odkaz na objekty, ktoré sú dostupné pri písaní vlastného obmedzeného modulu, a to box, vlákno, index и TXN.

Začnime z diaľky a pozrime sa, ako zvonku vyzerá modul s ukončenou platnosťou:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Pre jednoduchosť spustíme tarantool v adresári, kde sa nachádza naša knižnica libcapped-expirationd.so. Z knižnice sú exportované dve funkcie: štart a kill. Prvým krokom je sprístupniť tieto funkcie z Lua pomocou box.schema.func.create a box.schema.user.grant. Potom vytvorte priestor, ktorého n-tice budú obsahovať iba tri polia: prvé je jedinečný identifikátor, druhé je e-mail a tretie je životnosť n-tice. Postavíme stromový index na prvé pole a nazývame ho primárne. Ďalej dostaneme objekt pripojenia k našej natívnej knižnici.

Po prípravných prácach spustite funkciu štart:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Tento príklad bude fungovať počas skenovania presne tak isto ako modul s exspiráciou, ktorý je napísaný v jazyku Lua. Prvým argumentom funkcie štart je jedinečný názov úlohy. Druhým je identifikátor priestoru. Tretím je jedinečný index, pomocou ktorého budú n-tice odstránené. Štvrtý je index, ktorým sa budú prechádzať n-tice. Piate je číslo n-ticového poľa so životnosťou (číslovanie začína od 1, nie od 0!). Šiesty a siedmy sú nastavenia skenovania. 1024 je maximálny počet n-tic, ktoré je možné zobraziť v jednej transakcii. 3600 — úplný čas skenovania v sekundách.

Všimnite si, že príklad používa rovnaký index na prehľadávanie a odstraňovanie. Ak ide o index stromu, potom sa prechod vykoná od menšieho kľúča k väčšiemu. Ak existuje nejaký iný, napríklad hash index, potom sa prechod spravidla vykonáva v náhodnom poradí. Všetky priestorové n-tice sa skenujú pri jednom skenovaní.

Vložme do priestoru niekoľko n-tic so životnosťou 60 sekúnd:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Skontrolujte, či bolo vloženie úspešné:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Zopakujme výber po 60+ sekundách (počítajúc od začiatku vloženia prvej n-tice) a uvidíme, že obmedzený modul s exspiráciou už spracoval:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Zastavme úlohu:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Pozrime sa na druhý príklad, kde sa na indexové prehľadávanie používa samostatný index:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Všetko je tu rovnaké ako v prvom príklade, až na pár výnimiek. Na tretie pole vytvoríme stromový index a nazveme ho exp. Tento index nemusí byť jedinečný, na rozdiel od indexu nazývaného primárny. Prechod bude vykonaný indexom exp a vymazanie primárnym. Pamätáme si, že predtým sa obe robili iba pomocou primárneho indexu.

Po prípravných prácach spustíme funkciu štart s novými argumentmi:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Vložme do priestoru opäť niekoľko n-tic so životnosťou 60 sekúnd:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Po 30 sekundách, analogicky, pridáme niekoľko ďalších n-tic:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Skontrolujte, či bolo vloženie úspešné:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Zopakujme výber po 60+ sekundách (počítajúc od začiatku vloženia prvej n-tice) a uvidíme, že obmedzený modul s exspiráciou už spracoval:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

V priestore ešte zostali nejaké n-tice, ktoré budú mať ešte asi 30 sekúnd života. Okrem toho sa skenovanie zastavilo pri prechode z n-tice s ID 2 a životnosťou 1576421257 na n-ticu s ID 3 a životnosťou 1576421287. N-tice so životnosťou 1576421287 alebo viac neboli naskenované z dôvodu objednania indexové kľúče exp. To sú úspory, ktoré sme chceli dosiahnuť hneď na začiatku.

Zastavme úlohu:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Najlepší spôsob, ako povedať o všetkých funkciách projektu, je jeho pôvodný zdroj. kód! V rámci publikácie sa zameriame len na najdôležitejšie body, a to na algoritmy obchádzania priestoru.

Argumenty, ktoré odovzdávame metóde štart, sú uložené v štruktúre s názvom expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atribút name je názov úlohy. Atribút space_id je identifikátor priestoru. Atribút rm_index_id je identifikátor jedinečného indexu, pomocou ktorého budú n-tice vymazané. Atribút it_index_id je identifikátor indexu, ktorým sa budú prechádzať n-tice. Atribút it_index_type je typ indexu, ktorým sa budú prechádzať n-tice. Atribút filed_no je číslo poľa n-tice so životnosťou. Atribút scan_size je maximálny počet n-tic, ktoré sú naskenované v jednej transakcii. Atribút scan_time je celkový čas skenovania v sekundách.

Nebudeme uvažovať o analýze argumentov. Ide o namáhavú, no jednoduchú prácu, s ktorou vám pomôže knižnica msgpuck. Ťažkosti môžu nastať len pri indexoch, ktoré sa prenášajú z Lua ako komplexná dátová štruktúra s typom mp_map a nepoužívajú sa jednoduché typy mp_bool, mp_double, mp_int, mp_uint a mp_array. Nie je však potrebné analyzovať celý index. Stačí skontrolovať jeho jedinečnosť, vypočítať typ a extrahovať identifikátor.

Uvádzame prototypy všetkých funkcií, ktoré sa používajú na analýzu:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Teraz prejdime k tomu najdôležitejšiemu – k logike obchádzania priestoru a mazania n-tic. Každý blok n-tic nie väčší ako scan_size je naskenovaný a upravený v rámci jednej transakcie. Ak je transakcia úspešná, transakcia je potvrdená, ak sa vyskytne chyba, je vrátená späť. Posledný argument funkcie expirationd_iterate je ukazovateľ na iterátor, od ktorého skenovanie začína alebo pokračuje. Tento iterátor sa interne inkrementuje, kým sa nevyskytne chyba, neminie miesto alebo nie je možné zastaviť proces vopred. Funkcia expirationd_expired kontroluje životnosť n-tice, expirationd_delete maže n-ticu, expirationd_breakable kontroluje, či musíme ísť ďalej.

Kód funkcie Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kód funkcie expired_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Kód funkcie expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funkčný kód expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplikácia

Zdrojový kód si môžete pozrieť na tu!

Zdroj: hab.com

Pridať komentár