Kirjoitetaan omaa rajoittunutta vanhentunutta moduuliamme tarantoolille

Kirjoitetaan omaa rajoittunutta vanhentunutta moduuliamme tarantoolille

Jokin aika sitten kohtasimme tilojen siivousongelman tarantool. Puhdistus ei pitänyt aloittaa kun tarantolin muisti oli jo loppumassa, vaan etukäteen ja tietyllä tiheydellä. Tätä tehtävää varten tarantoolilla on lua-kielellä kirjoitettu moduuli nimeltään vanheneminen. Käytyämme tätä moduulia lyhyen aikaa huomasimme, että se ei sovellu meille: suurten tietomäärien jatkuvan puhdistuksen vuoksi Lua roikkui GC:ssä. Siksi ajattelimme kehittää oman capped expirated moduulimme, toivoen, että natiivi ohjelmointikielellä kirjoitettu koodi ratkaisee ongelmamme parhaalla mahdollisella tavalla.

Hyvä esimerkki meille oli tarantool-moduuli ns memcached. Siinä käytetty lähestymistapa perustuu siihen, että tilaan luodaan erillinen kenttä, joka ilmaisee monikon eliniän eli ttl. Taustalla oleva moduuli skannaa tilan, vertaa TTL:ää nykyiseen aikaan ja päättää, poistetaanko monikko vai ei. Välimuistissa oleva moduulikoodi on yksinkertainen ja tyylikäs, mutta liian yleinen. Ensinnäkin se ei ota huomioon indeksoitavan ja poistettavan hakemiston tyyppiä. Toiseksi, jokaisella siirrolla skannataan kaikki monikot, joiden lukumäärä voi olla melko suuri. Ja jos vanhentuneessa moduulissa ensimmäinen ongelma ratkesi (puuindeksi erotettiin erilliseen luokkaan), toinen ei vieläkään saanut huomiota. Nämä kolme pistettä määrittelivät ennalta valinnan oman koodini kirjoittamisen puolesta.

Kuvaus

Tarantoolin dokumentaatio on erittäin hyvä opetusohjelma siitä, kuinka kirjoitat tallennetut proseduurit C-kielellä. Ensinnäkin ehdotan, että tutustut siihen, jotta ymmärrät alla näkyvät lisäykset komentoineen ja koodiineen. Siihen kannattaa myös kiinnittää huomiota viite objekteihin, jotka ovat käytettävissä, kun kirjoitat omaa rajattua moduulia, nimittäin laatikko, kuitu, indeksi и txn.

Aloitetaan kaukaa ja katsotaan miltä vanhentunut moduuli näyttää ulkopuolelta:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Yksinkertaisuuden vuoksi käynnistämme tarantoolin hakemistossa, jossa libcapped-expirationd.so-kirjastomme sijaitsee. Kirjastosta viedään kaksi funktiota: start ja kill. Ensimmäinen vaihe on ottaa nämä toiminnot käyttöön Luasta käyttämällä box.schema.func.create ja box.schema.user.grant. Luo sitten tila, jonka monikot sisältävät vain kolme kenttää: ensimmäinen on yksilöllinen tunniste, toinen sähköposti ja kolmas monikko elinikä. Rakennamme puuindeksin ensimmäisen kentän päälle ja kutsumme sitä ensisijaiseksi. Seuraavaksi saamme yhteysobjektin alkuperäiseen kirjastoomme.

Valmistelun jälkeen suorita käynnistystoiminto:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Tämä esimerkki toimii tarkistuksen aikana täsmälleen samalla tavalla kuin vanhentunut moduuli, joka on kirjoitettu Luassa. Aloitusfunktion ensimmäinen argumentti on tehtävän yksilöllinen nimi. Toinen on tilan tunniste. Kolmas on yksilöllinen indeksi, jolla monikot poistetaan. Neljäs on indeksi, jolla monikot kulkevat. Viides on monikkokentän numero, jolla on elinikä (numerointi alkaa 1:stä, ei 0:sta!). Kuudes ja seitsemäs ovat skannausasetukset. 1024 on enimmäismäärä monikkoja, joita voidaan tarkastella yhdessä tapahtumassa. 3600 — täysi skannausaika sekunneissa.

Huomaa, että esimerkki käyttää samaa indeksiä indeksointiin ja poistamiseen. Jos tämä on puuindeksi, läpikulku suoritetaan pienemmästä avaimesta suurempaan. Jos on jokin muu, esimerkiksi hash-indeksi, niin läpikulku suoritetaan pääsääntöisesti satunnaisessa järjestyksessä. Kaikki avaruusjoukot skannataan yhdellä skannauksella.

Lisätään tilaan useita sarjoja, joiden elinikä on 60 sekuntia:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Tarkistetaan, että lisäys onnistui:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Toistetaan valinta yli 60 sekunnin kuluttua (lasketaan ensimmäisen monikon lisäyksen alusta) ja katsotaan, että vanhentunut moduuli on jo käsitelty:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Lopetetaan tehtävä:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Katsotaanpa toista esimerkkiä, jossa indeksointiin käytetään erillistä hakemistoa:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Kaikki tässä on sama kuin ensimmäisessä esimerkissä muutamaa poikkeusta lukuun ottamatta. Rakennamme puuindeksin kolmannen kentän päälle ja kutsumme sitä exp. Tämän indeksin ei tarvitse olla yksilöllinen, toisin kuin ensisijainen indeksi. Läpikulku suoritetaan exp-indeksillä ja poistaminen ensisijaisella. Muistamme, että aiemmin molemmat tehtiin vain ensisijaisella indeksillä.

Valmistelun jälkeen suoritamme aloitusfunktion uusilla argumenteilla:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Lisätään tilaan taas useita monikoita, joiden elinikä on 60 sekuntia:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

30 sekunnin kuluttua lisäämme analogisesti muutama monikko:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Tarkistetaan, että lisäys onnistui:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Toistetaan valinta yli 60 sekunnin kuluttua (lasketaan ensimmäisen monikon lisäyksen alusta) ja katsotaan, että vanhentunut moduuli on jo käsitelty:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Avaruudessa on vielä joitakin tupleja jäljellä, joilla on vielä noin 30 sekuntia elinaikaa. Lisäksi skannaus pysähtyi siirryttäessä monikosta, jonka tunnus on 2 ja elinikä 1576421257 monikkoon, jonka tunnus on 3 ja elinikä 1576421287. Tupleja, joiden elinikä on 1576421287 tai enemmän, ei skannattu johtuen tilauksesta exp-indeksiavaimet. Tämä on se säästö, jonka halusimme saavuttaa heti alussa.

Lopetetaan tehtävä:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Реализация

Paras tapa kertoa projektin kaikista ominaisuuksista on sen alkuperäinen lähde. koodi! Osana julkaisua keskitymme vain tärkeimpiin kohtiin, nimittäin avaruuden ohitusalgoritmeihin.

Argumentit, jotka välitämme aloitusmenetelmälle, on tallennettu rakenteeseen nimeltä expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Nimi-attribuutti on tehtävän nimi. Attribuutti space_id on tilan tunniste. Attribuutti rm_index_id on yksilöllisen indeksin tunniste, jolla monikot poistetaan. Attribuutti it_index_id on sen indeksin tunniste, jonka avulla monikot kulkevat. Attribuutti it_index_type on indeksityyppi, jolla monikot kulkevat. Filed_no-attribuutti on monikkokentän numero, jossa on elinikä. Attribuutti scan_size on lukujen enimmäismäärä, jotka tarkistetaan yhdessä tapahtumassa. Attribuutti scan_time on koko skannausaika sekunneissa.

Emme harkitse argumenttien jäsentämistä. Tämä on vaivalloinen mutta yksinkertainen työ, jossa kirjasto auttaa sinua msgpuck. Vaikeuksia voi syntyä vain indekseillä, jotka välitetään Luasta monimutkaisena tietorakenteena mp_map-tyypin kanssa, eivätkä yksinkertaisia ​​tyyppejä mp_bool, mp_double, mp_int, mp_uint ja mp_array käyttämällä. Mutta koko indeksiä ei tarvitse jäsentää. Sinun tarvitsee vain tarkistaa sen ainutlaatuisuus, laskea tyyppi ja purkaa tunniste.

Luettelemme kaikkien jäsentämiseen käytettyjen funktioiden prototyypit:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Siirrytään nyt tärkeimpään asiaan - tilan ohittamisen ja monikoiden poistamisen logiikkaan. Jokainen lukujono, joka ei ole suurempi kuin scan_size, tarkistetaan ja muokataan yhden tapahtuman aikana. Jos se onnistuu, tämä tapahtuma hyväksytään; jos tapahtuu virhe, se peruutetaan. Expirationd_iterate-funktion viimeinen argumentti on osoitin iteraattoriin, josta skannaus alkaa tai jatkuu. Tätä iteraattoria kasvatetaan sisäisesti, kunnes tapahtuu virhe, tila loppuu tai prosessia ei ole mahdollista pysäyttää etukäteen. Funktio expirationd_expired tarkistaa monikon käyttöiän, expirationd_delete poistaa monikon, expirationd_breakable tarkistaa, tarvitseeko meidän jatkaa.

Expirationd_iterate-funktiokoodi:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Toimintokoodi expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete-funktiokoodi:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Expirationd_breakable-funktiokoodi:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Sovellus

Voit tarkastella lähdekoodia osoitteessa täällä!

Lähde: will.com

Lisää kommentti