Shkrimi i modulit tonë të skaduar me kapak për tarantool

Shkrimi i modulit tonë të skaduar me kapak për tarantool

Pak kohë më parë u përballëm me problemin e pastrimit të tupave në hapësira tarantool. Pastrimi duhej të fillonte jo kur tarantool tashmë po i mbaronte memoria, por paraprakisht dhe në një frekuencë të caktuar. Për këtë detyrë, tarantool ka një modul të shkruar në Lua të quajtur skadimit. Pas përdorimit të këtij moduli për një kohë të shkurtër, kuptuam se ai nuk ishte i përshtatshëm për ne: për shkak të pastrimit të vazhdueshëm të sasive të mëdha të të dhënave, Lua u var në GC. Prandaj, ne menduam të zhvillonim modulin tonë të skaduar, duke shpresuar që kodi i shkruar në një gjuhë programimi amtare do të zgjidhte problemet tona në mënyrën më të mirë të mundshme.

Një shembull i mirë për ne ishte moduli tarantool i quajtur e memorizuar. Qasja e përdorur në të bazohet në faktin se në hapësirë ​​krijohet një fushë e veçantë, e cila tregon jetëgjatësinë e tuples, me fjalë të tjera, ttl. Moduli në sfond skanon hapësirën, krahason TTL me kohën aktuale dhe vendos nëse do të fshihet tupleja apo jo. Kodi i modulit memcached është i thjeshtë dhe elegant, por shumë i përgjithshëm. Së pari, nuk merr parasysh llojin e indeksit që po zvarritet dhe fshihet. Së dyti, në çdo kalim skanohen të gjitha tufat, numri i të cilave mund të jetë mjaft i madh. Dhe nëse në modulin e skaduar problemi i parë u zgjidh (indeksi i pemës u nda në një klasë të veçantë), atëherë i dyti ende nuk mori ndonjë vëmendje. Këto tre pika paracaktuan zgjedhjen në favor të shkrimit të kodit tim.

Përshkrim

Dokumentacioni për tarantool ka një shumë të mirë tutorial se si të shkruani procedurat tuaja të ruajtura në C. Së pari, ju sugjeroj të njiheni me të në mënyrë që të kuptoni ato inserte me komanda dhe kode që do të shfaqen më poshtë. Vlen gjithashtu t'i kushtohet vëmendje referencë për objektet që janë të disponueshme kur shkruani modulin tuaj me kapak, domethënë kuti, fibër, indeks и txn.

Le të fillojmë nga larg dhe të shohim se si duket nga jashtë një modul i skaduar i kufizuar:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Për thjeshtësi, ne hapim tarantool në drejtorinë ku ndodhet biblioteka jonë libcapped-expirationd.so. Dy funksione eksportohen nga biblioteka: start dhe kill. Hapi i parë është t'i vini këto funksione të disponueshme nga Lua duke përdorur box.schema.func.create dhe box.schema.user.grant. Më pas krijoni një hapësirë, tuplet e së cilës do të përmbajnë vetëm tre fusha: e para është një identifikues unik, e dyta është email dhe e treta është jetëgjatësia e tuples. Ne ndërtojmë një indeks peme në krye të fushës së parë dhe e quajmë atë primare. Më pas marrim objektin e lidhjes me bibliotekën tonë amtare.

Pas punës përgatitore, ekzekutoni funksionin e fillimit:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ky shembull do të funksionojë gjatë skanimit saktësisht njësoj si moduli i skaduar, i cili është shkruar në Lua. Argumenti i parë për funksionin e fillimit është emri unik i detyrës. E dyta është identifikuesi i hapësirës. E treta është një indeks unik me të cilin do të fshihen tupat. E katërta është indeksi me të cilin do të përshkohen tuplet. E pesta është numri i fushës tuple me jetëgjatësi (numërimi fillon nga 1, jo nga 0!). E gjashta dhe e shtata janë cilësimet e skanimit. 1024 është numri maksimal i tupleve që mund të shikohen në një transaksion të vetëm. 3600 — koha e plotë e skanimit në sekonda.

Vini re se shembulli përdor të njëjtin indeks për zvarritje dhe fshirje. Nëse ky është një indeks peme, atëherë kalimi kryhet nga çelësi më i vogël në atë më të madh. Nëse ka ndonjë tjetër, për shembull, indeks hash, atëherë kalimi kryhet, si rregull, në mënyrë të rastësishme. Të gjitha tuplet e hapësirës skanohen në një skanim.

Le të fusim disa tupa në hapësirë ​​me jetëgjatësi prej 60 sekondash:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Le të kontrollojmë nëse futja ishte e suksesshme:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Le të përsërisim përzgjedhjen pas 60+ sekondash (duke llogaritur nga fillimi i futjes së tuples së parë) dhe të shohim që moduli i skaduar i kapur tashmë është përpunuar:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Le të ndalojmë detyrën:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Le të shohim një shembull të dytë ku përdoret një indeks i veçantë për zvarritjen:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Gjithçka këtu është e njëjtë si në shembullin e parë, me disa përjashtime. Ne ndërtojmë një indeks peme në krye të fushës së tretë dhe e quajmë atë exp. Ky indeks nuk duhet të jetë unik, ndryshe nga indeksi i quajtur primar. Kalimi do të kryhet sipas indeksit exp, dhe fshirja nga primar. Kujtojmë se më parë të dyja bëheshin vetëm duke përdorur indeksin primar.

Pas punës përgatitore, ne ekzekutojmë funksionin e fillimit me argumente të reja:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Le të fusim disa tuple në hapësirë ​​përsëri me një jetëgjatësi prej 60 sekondash:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Pas 30 sekondash, për analogji, do të shtojmë disa tuple të tjerë:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Le të kontrollojmë nëse futja ishte e suksesshme:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Le të përsërisim përzgjedhjen pas 60+ sekondash (duke llogaritur nga fillimi i futjes së tuples së parë) dhe të shohim që moduli i skaduar i kapur tashmë është përpunuar:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Kanë mbetur ende disa tupa në hapësirë ​​që do të kenë edhe rreth 30 sekonda për të jetuar. Për më tepër, skanimi u ndal kur lëviz nga një tuple me ID 2 dhe jetëgjatësi 1576421257 në një tuple me ID 3 dhe jetëgjatësi 1576421287. Tuplet me jetëgjatësi 1576421287 ose më shumë nuk u skanuan për shkak të renditjes së çelësat e indeksit exp. Këto janë kursimet që kemi dashur të arrijmë që në fillim.

Le të ndalojmë detyrën:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Zbatimi

Mënyra më e mirë për të treguar për të gjitha tiparet e një projekti është burimi i tij origjinal. kod! Si pjesë e botimit, ne do të përqendrohemi vetëm në pikat më të rëndësishme, përkatësisht, algoritmet e anashkalimit të hapësirës.

Argumentet që kalojmë në metodën e fillimit ruhen në një strukturë të quajtur expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atributi i emrit është emri i detyrës. Atributi space_id është identifikuesi i hapësirës. Atributi rm_index_id është identifikuesi i indeksit unik me të cilin do të fshihen tuples. Atributi it_index_id është identifikuesi i indeksit me të cilin do të përshkohen tuples. Atributi it_index_type është lloji i indeksit me të cilin do të përshkohen tuples. Atributi filed_no është numri i fushës tuple me jetëgjatësi. Atributi scan_size është numri maksimal i tupleve që skanohen në një transaksion. Atributi scan_time është koha e plotë e skanimit në sekonda.

Ne nuk do të shqyrtojmë analizën e argumenteve. Kjo është një punë e mundimshme, por e thjeshtë, me të cilën biblioteka do t'ju ndihmojë msgpuck. Vështirësitë mund të lindin vetëm me indekset që kalojnë nga Lua si një strukturë komplekse të dhënash me llojin mp_map, dhe jo duke përdorur llojet e thjeshta mp_bool, mp_double, mp_int, mp_uint dhe mp_array. Por nuk ka nevojë të analizohet i gjithë indeksi. Thjesht duhet të kontrolloni veçantinë e tij, të llogarisni llojin dhe të nxirrni identifikuesin.

Ne listojmë prototipet e të gjitha funksioneve që përdoren për analizë:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Tani le të kalojmë te gjëja më e rëndësishme - logjika e anashkalimit të hapësirës dhe fshirjes së tupave. Çdo bllok tuplesh jo më i madh se scan_size skanohet dhe modifikohet sipas një transaksioni të vetëm. Nëse është i suksesshëm, ky transaksion kryhet; nëse ndodh gabim, ai rikthehet. Argumenti i fundit për funksionin expirationd_iterate është një tregues drejt përsëritësit nga i cili fillon ose vazhdon skanimi. Ky përsëritës rritet nga brenda derisa të ndodhë një gabim, hapësira të mbarojë ose të mos jetë e mundur të ndalet procesi paraprakisht. Funksioni expirationd_expired kontrollon jetëgjatësinë e një tupleje, expirationd_delete fshin një tuple, expirationd_breakable kontrollon nëse duhet të vazhdojmë përpara.

Kodi i funksionit Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kodi i funksionit expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Kodi i funksionit Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Kodi i funksionit të skaduar_të thyhet:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplikim

Mund ta shikoni kodin burim në këtu!

Burimi: www.habr.com

Shto një koment