Å skrive vår egen avkortet utløpsmodul for tarantool

Å skrive vår egen avkortet utløpsmodul for tarantool

For en tid siden ble vi møtt med problemet med å rense tupler i mellomrom tarantverktøy. Rengjøring måtte ikke startes når tarantool allerede var tom for minne, men på forhånd og med en viss frekvens. For denne oppgaven har tarantool en modul skrevet i Lua kalt utløp. Etter å ha brukt denne modulen i kort tid, innså vi at den ikke var egnet for oss: på grunn av konstant rengjøring av store datamengder, hang Lua i GC. Derfor tenkte vi på å utvikle vår egen begrensede utløpsmodul, i håp om at koden skrevet på et eget programmeringsspråk ville løse problemene våre på best mulig måte.

Et godt eksempel for oss var tarantool-modulen kalt memcached. Tilnærmingen som brukes i den er basert på det faktum at det opprettes et eget felt i rommet, som indikerer levetiden til tupelen, med andre ord, ttl. Modulen i bakgrunnen skanner rommet, sammenligner TTL med gjeldende klokkeslett og bestemmer om tuppelen skal slettes eller ikke. Den memcachede modulkoden er enkel og elegant, men for generisk. For det første tar den ikke hensyn til typen indeks som blir gjennomsøkt og slettet. For det andre, på hvert pass skannes alle tupler, hvorav antallet kan være ganske stort. Og hvis det første problemet ble løst i den utløpte modulen (treindeksen ble delt inn i en egen klasse), fikk den andre fortsatt ingen oppmerksomhet. Disse tre punktene forutbestemte valget til fordel for å skrive min egen kode.

beskrivelse

Dokumentasjonen for tarantool har en veldig god opplæringen om hvordan du skriver lagrede prosedyrer i C. Først av alt foreslår jeg at du gjør deg kjent med det for å forstå de innleggene med kommandoer og kode som vises nedenfor. Det er også verdt å være oppmerksom på henvisning til objekter som er tilgjengelige når du skriver din egen avgrensede modul, nemlig eske, fiber, indeks и txn.

La oss starte langveis fra og se på hvordan en utløpt modul ser ut fra utsiden:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

For enkelhets skyld lanserer vi tarantool i katalogen der biblioteket vårt libcapped-expirationd.so ligger. To funksjoner eksporteres fra biblioteket: start og drep. Det første trinnet er å gjøre disse funksjonene tilgjengelige fra Lua ved å bruke box.schema.func.create og box.schema.user.grant. Deretter oppretter du en plass hvis tuples bare vil inneholde tre felt: det første er en unik identifikator, det andre er e-post, og det tredje er levetiden til tupelen. Vi bygger en treindeks på toppen av det første feltet og kaller det primær. Deretter får vi tilkoblingsobjektet til vårt opprinnelige bibliotek.

Etter det forberedende arbeidet, kjør startfunksjonen:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Dette eksemplet vil fungere under skanning nøyaktig det samme som utløpsmodulen, som er skrevet i Lua. Det første argumentet til startfunksjonen er det unike navnet på oppgaven. Den andre er plassidentifikatoren. Den tredje er en unik indeks der tupler vil bli slettet. Den fjerde er indeksen som tuplene vil bli krysset med. Den femte er nummeret på tuppelfeltet med levetid (nummereringen starter fra 1, ikke 0!). Den sjette og syvende er skanneinnstillinger. 1024 er det maksimale antallet tupler som kan sees i en enkelt transaksjon. 3600 — full skannetid i sekunder.

Merk at eksemplet bruker den samme indeksen for gjennomsøking og sletting. Hvis dette er en treindeks, utføres kryssingen fra den mindre nøkkelen til den større. Hvis det er en annen, for eksempel hash-indeks, utføres gjennomgangen som regel i tilfeldig rekkefølge. Alle romtupler skannes i én skanning.

La oss sette inn flere tupler i rommet med en levetid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

La oss sjekke at innsettingen var vellykket:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

La oss gjenta valget etter 60+ sekunder (teller fra begynnelsen av innsettingen av den første tuppelen) og se at den begrensede utløpsmodulen allerede har behandlet:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

La oss stoppe oppgaven:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

La oss se på et annet eksempel der en egen indeks brukes for gjennomsøkingen:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Alt her er det samme som i det første eksemplet, med noen få unntak. Vi bygger en treindeks på toppen av det tredje feltet og kaller det exp. Denne indeksen trenger ikke å være unik, i motsetning til indeksen som kalles primær. Traversering vil bli utført av exp-indeks, og sletting av primær. Vi husker at tidligere ble begge utført kun ved bruk av primærindeksen.

Etter forarbeidet kjører vi startfunksjonen med nye argumenter:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

La oss sette inn flere tupler i rommet igjen med en levetid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Etter 30 sekunder, analogt, vil vi legge til noen flere tupler:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

La oss sjekke at innsettingen var vellykket:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

La oss gjenta valget etter 60+ sekunder (teller fra begynnelsen av innsettingen av den første tuppelen) og se at den begrensede utløpsmodulen allerede har behandlet:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Det er fortsatt noen tupler igjen i rommet som vil ha omtrent 30 sekunder igjen å leve. Dessuten stoppet skanningen ved overgang fra en tuppel med en ID på 2 og en levetid på 1576421257 til en tuppel med en ID på 3 og en levetid på 1576421287. Tupler med en levetid på 1576421287 eller mer ble ikke skannet på grunn av bestillingen av exp-indeksnøklene. Dette er besparelsene vi ønsket å oppnå helt i begynnelsen.

La oss stoppe oppgaven:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

implementering

Den beste måten å fortelle om alle funksjonene til et prosjekt er dens opprinnelige kilde. kode! Som en del av publikasjonen vil vi kun fokusere på de viktigste punktene, nemlig space bypass-algoritmer.

Argumentene vi sender til startmetoden er lagret i en struktur kalt expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Navnattributtet er navnet på oppgaven. Space_id-attributtet er space-identifikatoren. Attributtet rm_index_id er identifikatoren til den unike indeksen som tupler vil bli slettet med. It_index_id-attributtet er identifikatoren for indeksen som tupler vil bli krysset med. It_index_type-attributtet er typen indeks som tupler vil bli krysset med. Filed_no-attributtet er nummeret på tuppelfeltet med levetid. scan_size-attributtet er det maksimale antallet tupler som skannes i en transaksjon. scan_time-attributtet er hele skannetiden i sekunder.

Vi vil ikke vurdere å analysere argumenter. Dette er en møysommelig, men enkel jobb, som biblioteket vil hjelpe deg med msgpuck. Vanskeligheter kan bare oppstå med indekser som sendes fra Lua som en kompleks datastruktur med mp_map-typen, og ikke bruker de enkle typene mp_bool, mp_double, mp_int, mp_uint og mp_array. Men det er ikke nødvendig å analysere hele indeksen. Du trenger bare å sjekke dens unikhet, beregne typen og trekke ut identifikatoren.

Vi lister opp prototypene til alle funksjoner som brukes til å analysere:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

La oss nå gå videre til det viktigste - logikken med å omgå plass og slette tupler. Hver blokk med tupler som ikke er større enn scan_size, skannes og endres under en enkelt transaksjon. Hvis den lykkes, blir denne transaksjonen forpliktet; hvis det oppstår feil, rulles den tilbake. Det siste argumentet til funksjonen expirationd_iterate er en peker til iteratoren som skanningen starter eller fortsetter fra. Denne iteratoren økes internt til det oppstår en feil, plassen går tom eller det ikke er mulig å stoppe prosessen på forhånd. Funksjonen expirationd_expired sjekker levetiden til en tuppel, expirationd_delete sletter en tuppel, expirationd_breakable sjekker om vi må gå videre.

Expirationd_iterate funksjonskode:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funksjonskode expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete funksjonskode:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funksjonskode expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

App

Du kan se kildekoden på her!

Kilde: www.habr.com

Legg til en kommentar