At skrive vores eget lukkede udløbsmodul til tarantool

At skrive vores eget lukkede udløbsmodul til tarantool

For noget tid siden stod vi over for problemet med at rense tupler i rummet tarantværktøj. Rengøring skulle ikke startes, når tarantool allerede var ved at løbe tør for hukommelse, men på forhånd og med en bestemt frekvens. Til denne opgave har tarantool et modul skrevet i Lua kaldet udløb. Efter at have brugt dette modul i kort tid, indså vi, at det ikke var egnet til os: på grund af konstant rensning af store mængder data, hang Lua i GC. Derfor tænkte vi på at udvikle vores eget lukkede udløbsmodul i håb om, at koden skrevet på et indfødt programmeringssprog ville løse vores problemer på den bedst mulige måde.

Et godt eksempel for os var tarantool-modulet kaldet memcached. Den anvendte tilgang er baseret på, at der skabes et separat felt i rummet, som angiver tupelens levetid, med andre ord ttl. Modulet i baggrunden scanner rummet, sammenligner TTL med det aktuelle klokkeslæt og beslutter, om tuplet skal slettes eller ej. Den memcachede modulkode er enkel og elegant, men for generisk. For det første tager det ikke højde for den type indeks, der bliver gennemgået og slettet. For det andet scannes alle tupler ved hver passage, hvoraf antallet kan være ret stort. Og hvis det første problem i det udløbne modul blev løst (træindekset blev opdelt i en separat klasse), så fik det andet stadig ingen opmærksomhed. Disse tre punkter forudbestemte valget til fordel for at skrive min egen kode.

beskrivelse

Dokumentationen for tarantool har en meget god tutorial om, hvordan du skriver dine lagrede procedurer i C. Først og fremmest foreslår jeg, at du gør dig bekendt med det for at forstå de indsættelser med kommandoer og kode, der vises nedenfor. Det er også værd at være opmærksom på reference til objekter, der er tilgængelige, når du skriver dit eget afgrænsede modul, nemlig kasse, fiber, indeks и TXN.

Lad os starte langvejs fra og se på, hvordan et udløbet udløbsmodul ser ud udefra:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

For nemheds skyld lancerer vi tarantool i den mappe, hvor vores libcapped-expirationd.so bibliotek er placeret. To funktioner eksporteres fra biblioteket: start og dræb. Det første trin er at gøre disse funktioner tilgængelige fra Lua ved hjælp af box.schema.func.create og box.schema.user.grant. Opret derefter et rum, hvis tuples kun vil indeholde tre felter: det første er en unik identifikator, det andet er e-mail, og det tredje er tuplens levetid. Vi bygger et træindeks oven på det første felt og kalder det primært. Dernæst får vi forbindelsesobjektet til vores oprindelige bibliotek.

Efter det forberedende arbejde skal du køre startfunktionen:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Dette eksempel vil fungere under scanning nøjagtigt det samme som det udløbne modul, som er skrevet i Lua. Det første argument til startfunktionen er det unikke navn på opgaven. Den anden er pladsidentifikatoren. Det tredje er et unikt indeks, hvormed tupler vil blive slettet. Den fjerde er det indeks, som tuplerne vil blive gennemløbet med. Den femte er nummeret på tupelfeltet med levetid (nummereringen starter fra 1, ikke 0!). Den sjette og syvende er scanningsindstillinger. 1024 er det maksimale antal tuples, der kan ses i en enkelt transaktion. 3600 — fuld scanningstid i sekunder.

Bemærk, at eksemplet bruger det samme indeks til at gennemgå og slette. Hvis dette er et træindeks, udføres gennemgangen fra den mindre nøgle til den større. Hvis der er et andet, for eksempel hash-indeks, udføres gennemgangen som regel i tilfældig rækkefølge. Alle space tuples scannes i én scanning.

Lad os indsætte flere tuples i rummet med en levetid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Lad os kontrollere, at indsættelsen lykkedes:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Lad os gentage valget efter 60+ sekunder (tæller fra begyndelsen af ​​indsættelsen af ​​den første tuple) og se, at det begrænsede udløbsmodul allerede har behandlet:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Lad os stoppe opgaven:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Lad os se på et andet eksempel, hvor et separat indeks bruges til gennemgangen:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Alt her er det samme som i det første eksempel, med nogle få undtagelser. Vi bygger et træindeks oven på det tredje felt og kalder det exp. Dette indeks behøver ikke at være unikt, i modsætning til det indeks, der kaldes primært. Gennemgang vil blive udført af exp indeks, og sletning af primær. Vi husker, at begge tidligere kun blev udført ved hjælp af det primære indeks.

Efter det forberedende arbejde kører vi startfunktionen med nye argumenter:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Lad os indsætte flere tupler i rummet igen med en levetid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Efter 30 sekunder vil vi analogt tilføje et par flere tupler:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Lad os kontrollere, at indsættelsen lykkedes:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Lad os gentage valget efter 60+ sekunder (tæller fra begyndelsen af ​​indsættelsen af ​​den første tuple) og se, at det begrænsede udløbsmodul allerede har behandlet:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Der er stadig nogle tupler tilbage i rummet, der vil have omkring 30 sekunder mere at leve. Desuden stoppede scanningen, da man flyttede fra en tupel med et ID på 2 og en levetid på 1576421257 til en tuple med et ID på 3 og en levetid på 1576421287. Tuples med en levetid på 1576421287 eller mere blev ikke scannet på grund af bestilling af exp indeks nøglerne. Det er de besparelser, vi ønskede at opnå i begyndelsen.

Lad os stoppe opgaven:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

implementering

Den bedste måde at fortælle om alle funktionerne i et projekt er dets originale kilde. kode! Som en del af publikationen vil vi kun fokusere på de vigtigste punkter, nemlig space bypass-algoritmer.

Argumenterne, vi sender til startmetoden, er gemt i en struktur kaldet expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Navnattributten er navnet på opgaven. Space_id-attributten er space-id'et. Attributten rm_index_id er identifikatoren for det unikke indeks, hvormed tupler vil blive slettet. Attributten it_index_id er identifikatoren for det indeks, som tupler vil blive krydset med. Attributten it_index_type er den type indeks, som tupler vil blive krydset med. Filed_no-attributten er nummeret på tupelfeltet med levetid. Attributten scan_size er det maksimale antal tuples, der scannes i en transaktion. Attributten scan_time er den fulde scanningstid i sekunder.

Vi vil ikke overveje at analysere argumenter. Dette er et møjsommeligt, men enkelt arbejde, som biblioteket vil hjælpe dig med msgpuck. Vanskeligheder kan kun opstå med indekser, der overføres fra Lua som en kompleks datastruktur med mp_map-typen, og ikke ved at bruge de simple typer mp_bool, mp_double, mp_int, mp_uint og mp_array. Men der er ingen grund til at analysere hele indekset. Du skal bare tjekke dens unikke karakter, beregne typen og udtrække identifikatoren.

Vi lister prototyperne af alle funktioner, der bruges til at analysere:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Lad os nu gå videre til det vigtigste - logikken i at omgå plads og slette tupler. Hver blok af tupler, der ikke er større end scan_size, scannes og ændres under en enkelt transaktion. Hvis den lykkes, er denne transaktion begået; hvis der opstår fejl, rulles den tilbage. Det sidste argument til expirationd_iterate-funktionen er en pegepind til iteratoren, hvorfra scanningen begynder eller fortsætter. Denne iterator øges internt, indtil der opstår en fejl, pladsen løber tør, eller det ikke er muligt at stoppe processen på forhånd. Funktionen expirationd_expired kontrollerer levetiden for en tupel, expirationd_delete sletter en tupel, expirationd_breakable tjekker om vi skal videre.

Expirationd_iterate funktionskode:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funktionskode expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete funktionskode:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funktionskode expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

App

Du kan se kildekoden på her!

Kilde: www.habr.com

Tilføj en kommentar