Pisanje vlastitog ograničenog modula za tarantool

Pisanje vlastitog ograničenog modula za tarantool

Prije nekog vremena suočili smo se s problemom čišćenja torki u razmacima tarantool. Čišćenje se nije moralo započeti kada je tarantoolu već ponestalo memorije, već unaprijed i određenom učestalošću. Za ovaj zadatak, tarantool ima modul napisan u Lua jeziku pod nazivom isteka roka trajanja. Nakon što smo kratko vrijeme koristili ovaj modul, shvatili smo da nam ne odgovara: zbog stalnog čišćenja velikih količina podataka Lua je visila u GC-u. Stoga smo razmišljali o razvoju vlastitog modula s ograničenim rokom trajanja, nadajući se da će kod napisan u izvornom programskom jeziku riješiti naše probleme na najbolji mogući način.

Dobar primjer za nas je tarantool modul tzv Memcached. Pristup koji se u njemu koristi temelji se na činjenici da se u prostoru stvara zasebno polje koje označava životni vijek torke, drugim riječima ttl. Modul u pozadini skenira prostor, uspoređuje TTL s trenutnim vremenom i odlučuje hoće li izbrisati torku ili ne. Kod modula memcached je jednostavan i elegantan, ali previše generičan. Prvo, ne uzima u obzir vrstu indeksa koji se indeksira i briše. Drugo, pri svakom prolazu skeniraju se sve torke, čiji broj može biti prilično velik. I ako je u expirationd modulu prvi problem riješen (indeks stabla je odvojen u zasebnu klasu), onda drugi još uvijek nije dobio nikakvu pozornost. Ove tri točke predodredile su izbor u korist pisanja vlastitog koda.

Opis

Dokumentacija za tarantool je vrlo dobra tutorial o tome kako napisati svoje pohranjene procedure u C. Prije svega, predlažem da se upoznate s tim kako biste razumjeli one umetke s naredbama i kodom koji će se pojaviti ispod. Također je vrijedno pažnje referenca na objekte koji su dostupni kada pišete svoj ograničeni modul, naime kutija, vlakno, indeks и txn.

Krenimo izdaleka i pogledajmo kako modul s ograničenim rokom valjanosti izgleda izvana:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Radi jednostavnosti, pokrećemo tarantool u direktoriju gdje se nalazi naša biblioteka libcapped-expirationd.so. Iz knjižnice se izvoze dvije funkcije: start i kill. Prvi korak je učiniti ove funkcije dostupnima iz Lua koristeći box.schema.func.create i box.schema.user.grant. Zatim kreirajte prostor čije će torke sadržavati samo tri polja: prvo je jedinstveni identifikator, drugo je email, a treće je životni vijek torke. Gradimo indeks stabla na vrhu prvog polja i nazivamo ga primarnim. Zatim dobivamo objekt povezivanja s našom izvornom bibliotekom.

Nakon pripremnih radova pokrenite funkciju pokretanja:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ovaj će primjer raditi tijekom skeniranja potpuno isto kao modul expirationd, koji je napisan u Lua. Prvi argument funkciji start je jedinstveni naziv zadatka. Drugi je identifikator prostora. Treći je jedinstveni indeks kojim će se brisati torke. Četvrti je indeks pomoću kojeg će se obići torke. Peti je broj polja tuple s vijekom trajanja (numeriranje počinje od 1, ne od 0!). Šesta i sedma su postavke skeniranja. 1024 je najveći broj torki koje se mogu vidjeti u jednoj transakciji. 3600 — vrijeme punog skeniranja u sekundama.

Imajte na umu da primjer koristi isti indeks za indeksiranje i brisanje. Ako je ovo indeks stabla, tada se obilazak provodi od manjeg ključa prema većem. Ako postoji neki drugi, na primjer, hash indeks, tada se obilazak provodi, u pravilu, nasumičnim redoslijedom. Sve prostorne torke se skeniraju u jednom skeniranju.

Umetnimo nekoliko tupleova u prostor sa životnim vijekom od 60 sekundi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Provjerimo je li umetanje uspješno:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve torke) i vidimo da je ograničeni expirationd modul već obrađen:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Zaustavimo zadatak:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Pogledajmo drugi primjer gdje se za indeksiranje koristi zasebni indeks:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Ovdje je sve isto kao u prvom primjeru, uz nekoliko iznimaka. Gradimo indeks stabla na vrhu trećeg polja i nazivamo ga exp. Ovaj indeks ne mora biti jedinstven, za razliku od indeksa koji se zove primarni. Prolazak će se izvršiti exp indeksom, a brisanje primarnim. Sjećamo se da su se prije oba radila samo pomoću primarnog indeksa.

Nakon pripremnog rada, pokrećemo funkciju start s novim argumentima:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Umetnimo ponovno nekoliko torki u prostor sa životnim vijekom od 60 sekundi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Nakon 30 sekundi, po analogiji ćemo dodati još nekoliko torki:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Provjerimo je li umetanje uspješno:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve torke) i vidimo da je ograničeni expirationd modul već obrađen:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ostalo je još nekoliko tupleova u prostoru koji će imati još oko 30 sekundi života. Štoviše, skeniranje je zaustavljeno pri prelasku s torke s ID-om 2 i životnim vijekom 1576421257 na torku s ID-om 3 i životnim vijekom 1576421287. Torke s životnim vijekom 1576421287 ili više nisu skenirane zbog redoslijeda ključeva exp indeksa. To je ušteda koju smo htjeli postići na samom početku.

Zaustavimo zadatak:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Provedba

Najbolji način da ispričate sve značajke projekta je njegov izvorni izvor. šifra! U sklopu publikacije fokusirat ćemo se samo na najvažnije točke, a to su algoritmi zaobilaženja prostora.

Argumenti koje prosljeđujemo početnoj metodi pohranjeni su u strukturi koja se zove expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atribut name je naziv zadatka. Space_id atribut je identifikator prostora. Atribut rm_index_id identifikator je jedinstvenog indeksa kojim će se torke brisati. Atribut it_index_id je identifikator indeksa kojim će se obići torke. Atribut it_index_type je tip indeksa kojim će se obići torke. Atribut filed_no broj je polja tuple s vijekom trajanja. Atribut scan_size je najveći broj torki koje se skeniraju u jednoj transakciji. Atribut scan_time je puno vrijeme skeniranja u sekundama.

Nećemo razmatrati argumente raščlanjivanja. Ovo je mukotrpan, ali jednostavan posao, u kojem će vam knjižnica pomoći msgpuck. Poteškoće mogu nastati samo s indeksima koji se prosljeđuju iz Lue kao složene podatkovne strukture s tipom mp_map, a ne korištenjem jednostavnih tipova mp_bool, mp_double, mp_int, mp_uint i mp_array. Ali nema potrebe analizirati cijeli indeks. Samo trebate provjeriti njegovu jedinstvenost, izračunati tip i izdvojiti identifikator.

Navodimo prototipove svih funkcija koje se koriste za parsiranje:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Sada prijeđimo na ono najvažnije - logiku zaobilaženja prostora i brisanja tuplea. Svaki blok torki ne veći od scan_size skenira se i modificira u jednoj transakciji. Ako je uspješna, ova se transakcija predaje; ako se dogodi pogreška, vraća se unatrag. Posljednji argument funkcije expirationd_iterate je pokazivač na iterator od kojeg skeniranje počinje ili se nastavlja. Ovaj se iterator interno povećava dok se ne pojavi greška, ne ponestane prostora ili nije moguće zaustaviti proces unaprijed. Funkcija expirationd_expired provjerava vijek trajanja tuplea, expirationd_delete briše tuple, expirationd_breakable provjerava trebamo li ići dalje.

Kôd funkcije Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kôd funkcije expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Kôd funkcije Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Kod funkcije Expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

primjena

Izvorni kod možete pogledati na ovdje!

Izvor: www.habr.com

Dodajte komentar