Pisanje našeg vlastitog ograničenog modula za tarantool

Pisanje našeg vlastitog ograničenog modula za tarantool

Prije nekog vremena suočili smo se s problemom čišćenja tuple-a u prostorima tarantool. Čišćenje je trebalo započeti ne kada je tarantoolu već ponestalo memorije, već unaprijed i na određenoj frekvenciji. Za ovaj zadatak, tarantool ima modul napisan na Lui pod nazivom isteka. Nakon kratkog korištenja ovog modula, shvatili smo da nam nije prikladan: zbog stalnog čišćenja velikih količina podataka Lua je visila u GC-u. Stoga smo razmišljali o razvoju vlastitog modula sa ograničenim rokom trajanja, nadajući se da će kod napisan na maternjem programskom jeziku riješiti naše probleme na najbolji mogući način.

Dobar primjer za nas je bio tarantool modul pod nazivom memcached. Pristup koji se koristi u njemu zasniva se na činjenici da se u prostoru kreira posebno polje koje ukazuje na životni vijek tuple, drugim riječima, ttl. Modul u pozadini skenira prostor, uspoređuje TTL sa trenutnim vremenom i odlučuje hoće li obrisati tuple ili ne. Kod modula memcached je jednostavan i elegantan, ali previše generički. Prvo, ne uzima u obzir tip indeksa koji se indeksira i briše. Drugo, na svakom prolazu se skeniraju sve torke, čiji broj može biti prilično velik. I ako je u modulu istekao prvi problem riješen (indeks stabla je izdvojen u zasebnu klasu), onda se na drugi i dalje nije obraćala pažnja. Ove tri tačke predodredile su izbor u korist pisanja sopstvenog koda.

Opis

Dokumentacija za tarantool ima vrlo dobru tutorial o tome kako napisati svoje pohranjene procedure u C. Prije svega, predlažem da se upoznate s tim kako biste razumjeli one umetke sa komandama i kodom koji će se pojaviti ispod. Takođe vredi obratiti pažnju referenca na objekte koji su dostupni prilikom pisanja vašeg vlastitog ograničenog modula, naime kutija, vlakno, indeks и txn.

Počnimo izdaleka i pogledajmo kako spolja izgleda ograničeni modul s istekom roka trajanja:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Radi jednostavnosti, pokrećemo tarantool u direktoriju u kojem se nalazi naša biblioteka libcapped-expirationd.so. Iz biblioteke se izvoze dvije funkcije: start i kill. Prvi korak je da ove funkcije učinite dostupnim iz Lua koristeći box.schema.func.create i box.schema.user.grant. Zatim kreirajte prostor čije će torke sadržavati samo tri polja: prvo je jedinstveni identifikator, drugo je e-pošta, a treće je životni vijek torke. Mi gradimo indeks stabla na vrhu prvog polja i nazivamo ga primarnim. Zatim dobijamo objekat veze sa našom matičnom bibliotekom.

Nakon pripremnih radova, pokrenite funkciju start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ovaj primjer će raditi tokom skeniranja potpuno isto kao i istekao modul, koji je napisan u Lua. Prvi argument funkcije start je jedinstveno ime zadatka. Drugi je identifikator prostora. Treći je jedinstveni indeks pomoću kojeg će se tuple brisati. Četvrti je indeks kojim će se preći torke. Peti je broj polja tuple sa životnim vijekom (numeracija počinje od 1, a ne od 0!). Šesta i sedma su postavke skeniranja. 1024 je maksimalni broj torki koje se mogu vidjeti u jednoj transakciji. 3600 — puno vrijeme skeniranja u sekundama.

Imajte na umu da primjer koristi isti indeks za indeksiranje i brisanje. Ako je ovo indeks stabla, tada se prelazak vrši od manjeg ključa do većeg. Ako postoji neki drugi, na primjer, hash indeks, tada se obilaženje vrši, u pravilu, slučajnim redoslijedom. Svi prostorni tokovi se skeniraju u jednom skeniranju.

Ubacimo nekoliko tuple-ova u prostor sa životnim vijekom od 60 sekundi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Provjerimo da li je umetanje bilo uspješno:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve tuple) i vidimo da je ograničeni modul isteka već radio:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Zaustavimo zadatak:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Pogledajmo drugi primjer gdje se zasebni indeks koristi za indeksiranje:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Ovdje je sve isto kao u prvom primjeru, sa nekoliko izuzetaka. Na vrhu trećeg polja gradimo indeks stabla i zovemo ga exp. Ovaj indeks ne mora biti jedinstven, za razliku od indeksa koji se naziva primarni. Prelazak će se vršiti indeksom exp, a brisanje primarnim. Sjećamo se da su prethodno oba rađena samo pomoću primarnog indeksa.

Nakon pripremnog rada, pokrećemo funkciju start s novim argumentima:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Ponovo ubacimo nekoliko tuple u prostor sa životnim vijekom od 60 sekundi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Nakon 30 sekundi, po analogiji, dodaćemo još nekoliko tuple-ova:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Provjerimo da li je umetanje bilo uspješno:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ponovimo odabir nakon 60+ sekundi (računajući od početka umetanja prve tuple) i vidimo da je ograničeni modul isteka već radio:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

U prostoru je ostalo još nekih torva kojima je ostalo još oko 30 sekundi života. Štaviše, skeniranje je zaustavljeno pri prelasku sa torke s ID-om 2 i životnim vijekom 1576421257 na torku sa ID-om 3 i životnim vijekom 1576421287. Torke sa životnim vijekom 1576421287 ili više nisu skenirane zbog redoslijeda ključeva indeksa exp. To je ušteda koju smo željeli postići na samom početku.

Zaustavimo zadatak:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Najbolji način da se ispriča o svim karakteristikama projekta je njegov izvorni izvor. kod! U sklopu publikacije fokusirat ćemo se samo na najvažnije točke, odnosno algoritme zaobilaženja prostora.

Argumenti koje prosljeđujemo start metodi pohranjeni su u strukturi koja se zove expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atribut name je naziv zadatka. Atribut space_id je identifikator prostora. Atribut rm_index_id je identifikator jedinstvenog indeksa pomoću kojeg će se tuple brisati. It_index_id atribut je identifikator indeksa kojim će se torke prelaziti. Atribut it_index_type je tip indeksa kojim će se torke prelaziti. Atribut filed_no je broj polja tuple sa životnim vijekom. Atribut scan_size je maksimalni broj torkova koji se skeniraju u jednoj transakciji. Atribut scan_time je puno vrijeme skeniranja u sekundama.

Nećemo razmatrati raščlanjivanje argumenata. Ovo je mukotrpan, ali jednostavan posao u kojem će vam biblioteka pomoći msgpuck. Poteškoće mogu nastati samo sa indeksima koji se prosleđuju iz Lua kao složena struktura podataka sa tipom mp_map, a ne sa upotrebom jednostavnih tipova mp_bool, mp_double, mp_int, mp_uint i mp_array. Ali nema potrebe za raščlanjivanjem cijelog indeksa. Vi samo trebate provjeriti njegovu jedinstvenost, izračunati tip i izdvojiti identifikator.

Navodimo prototipove svih funkcija koje se koriste za raščlanjivanje:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Pređimo sada na najvažniju stvar - logiku zaobilaženja prostora i brisanja torki. Svaki blok torki ne veći od scan_size se skenira i modificira u okviru jedne transakcije. Ako je uspješna, ova transakcija se predaje; ako dođe do greške, vraća se nazad. Posljednji argument funkcije expirationd_iterate je pokazivač na iterator iz kojeg skeniranje počinje ili se nastavlja. Ovaj iterator se interno povećava sve dok ne dođe do greške, dok ne ponestane prostora ili nije moguće zaustaviti proces unaprijed. Funkcija expirationd_expired provjerava životni vijek torke, expirationd_delete briše tuple, expirationd_breakable provjerava da li trebamo ići dalje.

Expirationd_iterate kod funkcije:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kôd funkcije expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete kod funkcije:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Kôd funkcije expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplikacija

Izvorni kod možete pogledati na ovdje!

izvor: www.habr.com

Dodajte komentar