Saját sapkás lejárt modulunk írása a tarantoolhoz

Saját sapkás lejárt modulunk írása a tarantoolhoz

Nemrég szembesültünk azzal a problémával, hogy a terekben lévő sorokat tisztítsuk meg tarantool. A takarítást nem akkor kellett elkezdeni, amikor a tarantool már kifogyott a memóriából, hanem előre és bizonyos gyakorisággal. Ehhez a feladathoz a tarantool rendelkezik egy Lua nyelven írt modullal lejárat. Miután rövid ideig használtuk ezt a modult, rájöttünk, hogy nem való nekünk: a nagy mennyiségű adat folyamatos tisztítása miatt a Lua a GC-ben lógott. Ezért elgondolkodtunk egy saját capped lejáratú modul kifejlesztésén, remélve, hogy a natív programozási nyelven írt kód a lehető legjobban megoldja problémáinkat.

Jó példa volt számunkra a tarantool modul memcached. A benne alkalmazott megközelítés azon alapul, hogy a térben külön mező jön létre, amely a tuple élettartamát jelzi, más szóval ttl. A háttérben lévő modul átvizsgálja a teret, összehasonlítja a TTL-t az aktuális idővel, és eldönti, hogy törölje-e a leírót vagy sem. A memcached modul kódja egyszerű és elegáns, de túl általános. Először is, nem veszi figyelembe a bejárt és törölt index típusát. Másodszor, minden lépésnél az összes sor beolvasásra kerül, amelyek száma meglehetősen nagy lehet. És ha a lejárt modulban az első probléma megoldódott (a faindex külön osztályba került), akkor a második még mindig nem kapott figyelmet. Ez a három pont előre meghatározta a saját kód írása melletti választást.

Leírás

A tarantool dokumentációja nagyon jó oktatóanyag Mindenekelőtt azt javaslom, hogy ismerkedjen meg vele, hogy megértse azokat a parancsokat és kódokat tartalmazó beszúrásokat, amelyek alább fognak megjelenni. Arra is érdemes odafigyelni referencia azokra az objektumokra, amelyek a saját sapkás modul írásakor elérhetőek, nevezetesen doboz, rost, index и txn.

Kezdjük messziről, és nézzük meg, hogyan néz ki kívülről egy kupakkal lejárt modul:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Az egyszerűség kedvéért elindítjuk a tarantoolt abban a könyvtárban, ahol a libcapped-expirationd.so könyvtárunk található. Két függvény exportálódik a könyvtárból: start és kill. Az első lépés az, hogy ezeket a funkciókat elérhetővé kell tenni a Lua-ból a box.schema.func.create és box.schema.user.grant segítségével. Ezután hozzon létre egy teret, amelynek sorai csak három mezőt tartalmaznak: az első egy egyedi azonosító, a második az e-mail, a harmadik pedig a sor élettartama. Az első mező tetejére építünk egy faindexet, és elsődlegesnek nevezzük. Ezután megkapjuk a kapcsolat objektumot a natív könyvtárunkhoz.

Az előkészítő munka után futtassa a start funkciót:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ez a példa a vizsgálat során pontosan ugyanúgy működik, mint a lejárt modul, amely Lua nyelven íródott. A start függvény első argumentuma a feladat egyedi neve. A második a térazonosító. A harmadik egy egyedi index, amellyel a sorok törlődnek. A negyedik az az index, amellyel a sorokat át kell haladni. Az ötödik az élettartammal rendelkező sor mező száma (a számozás 1-től kezdődik, nem 0-tól!). A hatodik és a hetedik a szkennelési beállítások. 1024 az egyetlen tranzakció során megtekinthető sorok maximális száma. 3600 – teljes pásztázási idő másodpercben.

Vegye figyelembe, hogy a példa ugyanazt az indexet használja a feltérképezéshez és a törléshez. Ha ez egy faindex, akkor a bejárás a kisebb kulcsról a nagyobbra történik. Ha van más, például hash index, akkor a bejárást általában véletlenszerű sorrendben hajtják végre. Az összes térsort egy szkennelésben vizsgálja a rendszer.

Szúrjunk be több sort a térbe 60 másodperces élettartammal:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Ellenőrizzük, hogy a beillesztés sikeres volt-e:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ismételjük meg a kijelölést 60+ másodperc múlva (az első sor beszúrásának kezdetétől számítva), és nézzük meg, hogy a capped lejárt modul már feldolgozta:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Hagyjuk abba a feladatot:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Nézzünk egy második példát, ahol külön indexet használnak a feltérképezéshez:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Itt minden ugyanaz, mint az első példában, néhány kivételtől eltekintve. A harmadik mező tetejére építünk egy faindexet, és exp-nek nevezzük. Ennek az indexnek nem kell egyedinek lennie, ellentétben az elsődlegesnek nevezett indexszel. A bejárást az exp index, a törlést az elsődleges végzi el. Emlékszünk arra, hogy korábban mindkettő csak az elsődleges index használatával történt.

Az előkészítő munka után új argumentumokkal futtatjuk a start függvényt:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Szúrjunk be ismét több sort a térbe 60 másodperces élettartammal:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

30 másodperc elteltével, analógia útján, hozzáadunk még néhány sort:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Ellenőrizzük, hogy a beillesztés sikeres volt-e:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ismételjük meg a kijelölést 60+ másodperc múlva (az első sor beszúrásának kezdetétől számítva), és nézzük meg, hogy a capped lejárt modul már feldolgozta:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Még mindig maradt néhány sor a térben, amelyeknek még körülbelül 30 másodpercük lesz élni. Ezenkívül a keresés leállt, amikor egy 2-es azonosítójú és 1576421257 élettartamú sorról egy 3-as azonosítójú és 1576421287 élettartamú sorra vált. az exp indexkulcsokat. Ez az a megtakarítás, amit a legelején el akartunk érni.

Hagyjuk abba a feladatot:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

A legjobb módja annak, hogy a projekt összes jellemzőjét elmondhassuk, az eredeti forrás. kód! A kiadvány részeként csak a legfontosabb pontokra koncentrálunk, nevezetesen a térkikerülési algoritmusokra.

A start metódusnak átadott argumentumokat egy expirationd_task nevű struktúra tárolja:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

A name attribútum a feladat neve. A space_id attribútum a space azonosító. Az rm_index_id attribútum annak az egyedi indexnek az azonosítója, amellyel a sorok törlődnek. Az it_index_id attribútum annak az indexnek az azonosítója, amellyel a sorok áthaladnak. Az it_index_type attribútum annak az indexnek a típusa, amellyel a sorok áthaladnak. A filed_no attribútum az élettartammal rendelkező sor mező száma. A scan_size attribútum az egy tranzakció során vizsgált sorok maximális száma. A scan_time attribútum a teljes vizsgálati idő másodpercben.

Nem vesszük figyelembe az érvek elemzését. Ez egy fáradságos, de egyszerű munka, amiben a könyvtár segítségére lesz msgpuck. Nehézségek csak azoknál az indexeknél merülhetnek fel, amelyeket a Lua-ból egy összetett adatstruktúraként mp_map típussal adnak át, nem pedig az mp_bool, mp_double, mp_int, mp_uint és mp_array egyszerű típusokat. De nincs szükség a teljes index elemzésére. Csak ellenőriznie kell az egyediségét, ki kell számítania a típust, és ki kell bontania az azonosítót.

Felsoroljuk az elemzéshez használt összes függvény prototípusát:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Most térjünk át a legfontosabb dologra - a szóköz megkerülésének és a sorok törlésének logikájára. A scan_size-nél nem nagyobb sorok minden egyes blokkját egy tranzakció során megvizsgálja és módosítja. Ha sikeres, a tranzakció véglegesítésre kerül, ha hiba történik, akkor visszaállítja. Az expirationd_iterate függvény utolsó argumentuma egy mutató arra az iterátorra, amelytől a keresés kezdődik vagy folytatódik. Ez az iterátor belsőleg növekszik, amíg hiba nem történik, a hely el nem fogy, vagy nem lehet előre leállítani a folyamatot. Az expirationd_expired függvény a sor élettartamát ellenőrzi, az expirationd_delete letörli, az expirationd_breakable pedig azt ellenőrzi, hogy tovább kell-e lépnünk.

Expirationd_iterate függvénykód:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Expirationd_expired függvénykód:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete függvénykód:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Függvénykód expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

App

A forráskódot itt tekintheti meg itt!

Forrás: will.com

Hozzászólás