Scrierea propriului nostru modul cu expirare plafonată pentru tarantool

Scrierea propriului nostru modul cu expirare plafonată pentru tarantool

În urmă cu ceva timp ne-am confruntat cu problema curățării tuplurilor în spații tarantool. Curățarea trebuia începută nu când tarantoolul rămânea deja fără memorie, ci în avans și la o anumită frecvență. Pentru această sarcină, tarantool are un modul scris în Lua numit expirare. După ce am folosit acest modul pentru o perioadă scurtă de timp, ne-am dat seama că nu era potrivit pentru noi: datorită curățării constante a cantităților mari de date, Lua a stat în GC. Prin urmare, ne-am gândit să dezvoltăm propriul nostru modul cu expirare limitată, sperând că codul scris într-un limbaj de programare nativ ne va rezolva problemele în cel mai bun mod posibil.

Un bun exemplu pentru noi a fost modulul tarantool numit memcached. Abordarea folosită în acesta se bazează pe faptul că în spațiu este creat un câmp separat, care indică durata de viață a tuplului, cu alte cuvinte, ttl. Modulul din fundal scanează spațiul, compară TTL-ul cu ora curentă și decide dacă șterge sau nu tuplu. Codul modulului memcached este simplu și elegant, dar prea generic. În primul rând, nu ia în considerare tipul de index care este accesat cu crawlere și șters. În al doilea rând, la fiecare trecere sunt scanate toate tuplurile, al căror număr poate fi destul de mare. Și dacă în modulul expirat prima problemă a fost rezolvată (indexul arborelui a fost separat într-o clasă separată), atunci a doua încă nu a primit nicio atenție. Aceste trei puncte au predeterminat alegerea în favoarea scrierii propriului meu cod.

descriere

Documentația pentru tarantool are o foarte bună tutorial despre cum să scrieți procedurile stocate în C. În primul rând, vă sugerez să vă familiarizați cu el pentru a înțelege acele inserții cu comenzi și cod care vor apărea mai jos. De asemenea, merită să acordați atenție referinţă la obiectele care sunt disponibile atunci când scrieți propriul dvs. modul limitat, și anume cutie, fibră, index и txn.

Să începem de departe și să vedem cum arată un modul cu expirare plafonată din exterior:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Pentru simplitate, lansăm tarantool în directorul în care se află biblioteca noastră libcapped-expirationd.so. Două funcții sunt exportate din bibliotecă: start și kill. Primul pas este să faceți aceste funcții disponibile din Lua folosind box.schema.func.create și box.schema.user.grant. Apoi creați un spațiu ale cărui tupluri vor conține doar trei câmpuri: primul este un identificator unic, al doilea este e-mail, iar al treilea este durata de viață a tuplului. Construim un index de arbore deasupra primului câmp și îl numim primar. Apoi obținem obiectul de conexiune la biblioteca noastră nativă.

După lucrările pregătitoare, rulați funcția de pornire:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Acest exemplu va funcționa în timpul scanării exact la fel ca modulul expirat, care este scris în Lua. Primul argument al funcției de pornire este numele unic al sarcinii. Al doilea este identificatorul spațiului. Al treilea este un index unic prin care tuplurile vor fi șterse. Al patrulea este indicele prin care vor fi parcurse tuplurile. Al cincilea este numărul câmpului tuplu cu durata de viață (numerotarea începe de la 1, nu de la 0!). Al șaselea și al șaptelea sunt setările de scanare. 1024 este numărul maxim de tupluri care pot fi vizualizate într-o singură tranzacție. 3600 — timp de scanare complet în secunde.

Rețineți că exemplul folosește același index pentru accesare cu crawlere și ștergere. Dacă acesta este un index de arbore, atunci traversarea este efectuată de la cheia mai mică la cea mai mare. Dacă există un alt indice hash, de exemplu, atunci traversarea este efectuată, de regulă, în ordine aleatorie. Toate tuplurile spațiale sunt scanate într-o singură scanare.

Să introducem mai multe tupluri în spațiu cu o durată de viață de 60 de secunde:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Să verificăm dacă inserarea a avut succes:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Să repetăm ​​selecția după 60+ secunde (numărând de la începutul inserării primului tuplu) și să vedem că modulul cu expirare plafonată a procesat deja:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Să oprim sarcina:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Să ne uităm la un al doilea exemplu în care este folosit un index separat pentru accesare cu crawlere:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Totul aici este la fel ca în primul exemplu, cu câteva excepții. Construim un index de arbore deasupra celui de-al treilea câmp și îl numim exp. Acest index nu trebuie să fie unic, spre deosebire de indexul numit primar. Traversarea va fi efectuată prin index exp, iar ștergerea prin primar. Ne amintim că anterior ambele se făceau doar folosind indicele primar.

După lucrările pregătitoare, rulăm funcția de pornire cu argumente noi:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Să inserăm din nou câteva tupluri în spațiu cu o durată de viață de 60 de secunde:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

După 30 de secunde, prin analogie, vom mai adăuga câteva tupluri:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Să verificăm dacă inserarea a avut succes:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Să repetăm ​​selecția după 60+ secunde (numărând de la începutul inserării primului tuplu) și să vedem că modulul cu expirare plafonată a procesat deja:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Au mai rămas câteva tupluri în spațiu care vor avea încă vreo 30 de secunde de trăit. Mai mult, scanarea s-a oprit la trecerea de la un tuplu cu un ID de 2 și o durată de viață de 1576421257 la un tuplu cu un ID de 3 și o viață de 1576421287. Tuplurile cu o durată de viață de 1576421287 sau mai mult nu au fost scanate din cauza comandării tastele index exp. Acestea sunt economiile pe care am vrut să le realizăm de la bun început.

Să oprim sarcina:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

punerea în aplicare

Cel mai bun mod de a spune despre toate caracteristicile unui proiect este sursa lui originală. cod! Ca parte a publicației, ne vom concentra doar pe cele mai importante puncte, și anume, algoritmii de ocolire a spațiului.

Argumentele pe care le transmitem metodei start sunt stocate într-o structură numită expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atributul nume este numele sarcinii. Atributul space_id este identificatorul spațiului. Atributul rm_index_id este identificatorul indexului unic prin care tuplurile vor fi șterse. Atributul it_index_id este identificatorul indexului prin care vor fi parcurse tuplurile. Atributul it_index_type este tipul de index prin care vor fi parcurse tuplurile. Atributul filed_no este numărul câmpului tuplu cu durata de viață. Atributul scan_size este numărul maxim de tupluri care sunt scanate într-o tranzacție. Atributul scan_time este timpul complet de scanare în secunde.

Nu vom lua în considerare analizarea argumentelor. Aceasta este o treabă minuțioasă, dar simplă, cu care biblioteca vă va ajuta msgpuck. Dificultățile pot apărea numai cu indici care sunt transmise de la Lua ca o structură de date complexă cu tipul mp_map și nu folosind tipurile simple mp_bool, mp_double, mp_int, mp_uint și mp_array. Dar nu este nevoie să analizați întregul index. Trebuie doar să verificați unicitatea acestuia, să calculați tipul și să extrageți identificatorul.

Enumerăm prototipurile tuturor funcțiilor care sunt utilizate pentru analiza:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Acum să trecem la cel mai important lucru - logica ocolirii spațiului și a ștergerii tuplurilor. Fiecare bloc de tupluri nu mai mare decât scan_size este scanat și modificat în cadrul unei singure tranzacții. Dacă are succes, această tranzacție este comisă; dacă apare o eroare, aceasta este anulată. Ultimul argument al funcției expirationd_iterate este un pointer către iteratorul de la care începe sau continuă scanarea. Acest iterator este incrementat intern până când apare o eroare, spațiul se epuizează sau nu este posibilă oprirea procesului în avans. Funcția expirationd_expired verifică durata de viață a unui tuplu, expirationd_delete șterge un tuplu, expirationd_breakable verifică dacă trebuie să mergem mai departe.

Codul funcției Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Codul funcției expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Codul funcției Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Codul funcției Expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

App

Puteți vizualiza codul sursă la aici!

Sursa: www.habr.com

Adauga un comentariu