Skriver vår egen begränsade utgångsmodul för tarantool

Skriver vår egen begränsade utgångsmodul för tarantool

För en tid sedan stod vi inför problemet med att rengöra tuplar i utrymmen tarantverktyg. Rengöringen måste påbörjas inte när tarantool redan hade slut på minne, utan i förväg och med en viss frekvens. För denna uppgift har tarantool en modul skriven i Lua som kallas utgång. Efter att ha använt den här modulen under en kort tid insåg vi att den inte var lämplig för oss: på grund av konstant rensning av stora mängder data hängde Lua i GC. Därför funderade vi på att utveckla vår egen begränsade utgångsmodul, i hopp om att koden skriven på ett inhemskt programmeringsspråk skulle lösa våra problem på bästa möjliga sätt.

Ett bra exempel för oss var tarantool-modulen som heter memcached. Tillvägagångssättet som används i den bygger på det faktum att ett separat fält skapas i utrymmet, vilket anger tupelns livslängd, med andra ord, ttl. Modulen i bakgrunden skannar utrymmet, jämför TTL med den aktuella tiden och bestämmer om tupeln ska raderas eller inte. Den memcachade modulkoden är enkel och elegant, men för generisk. För det första tar den inte hänsyn till vilken typ av index som genomsöks och tas bort. För det andra skannas alla tuplar vid varje pass, vars antal kan vara ganska stort. Och om det första problemet i den utgångna modulen löstes (trädindexet var uppdelat i en separat klass), fick den andra fortfarande ingen uppmärksamhet. Dessa tre punkter förutbestämde valet till förmån för att skriva min egen kod.

beskrivning

Dokumentationen för tarantool har en mycket bra handledning om hur du skriver dina lagrade procedurer i C. Först och främst föreslår jag att du bekantar dig med det för att förstå de inlägg med kommandon och kod som kommer att visas nedan. Det är också värt att uppmärksamma referens till objekt som är tillgängliga när du skriver din egen capped modul, nämligen låda, fiber, index и txn.

Låt oss börja på långt håll och titta på hur en modul med utgångsdatum ser ut från utsidan:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

För enkelhetens skull lanserar vi tarantool i katalogen där vårt libcapped-expirationd.so bibliotek finns. Två funktioner exporteras från biblioteket: start och kill. Det första steget är att göra dessa funktioner tillgängliga från Lua med box.schema.func.create och box.schema.user.grant. Skapa sedan ett utrymme vars tuplar endast kommer att innehålla tre fält: det första är en unik identifierare, det andra är e-post och det tredje är tupelns livstid. Vi bygger ett trädindex ovanpå det första fältet och kallar det primärt. Därefter får vi anslutningsobjektet till vårt inhemska bibliotek.

Efter det förberedande arbetet, kör startfunktionen:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Det här exemplet kommer att fungera under skanning på exakt samma sätt som den utgångna modulen, som är skriven i Lua. Det första argumentet till startfunktionen är det unika namnet på uppgiften. Den andra är mellanslagsidentifieraren. Det tredje är ett unikt index genom vilket tupler kommer att raderas. Det fjärde är indexet med vilket tuplarna kommer att passeras. Den femte är numret på tuppelfältet med livslängd (numreringen börjar från 1, inte 0!). Den sjätte och sjunde är skanningsinställningar. 1024 är det maximala antalet tupler som kan ses i en enda transaktion. 3600 — full skanningstid i sekunder.

Observera att exemplet använder samma index för att genomsöka och ta bort. Om detta är ett trädindex utförs övergången från den mindre nyckeln till den större. Om det finns något annat, till exempel, hashindex, utförs övergången som regel i slumpmässig ordning. Alla rymdtuplar skannas i en skanning.

Låt oss infoga flera tuplar i utrymmet med en livstid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Låt oss kontrollera att infogningen lyckades:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Låt oss upprepa valet efter 60+ sekunder (räknat från början av infogningen av den första tupeln) och se att den begränsade utgångsmodulen redan har bearbetats:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Låt oss stoppa uppgiften:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Låt oss titta på ett andra exempel där ett separat index används för genomsökningen:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Allt här är detsamma som i det första exemplet, med några få undantag. Vi bygger ett trädindex ovanpå det tredje fältet och kallar det exp. Detta index behöver inte vara unikt, till skillnad från indexet som kallas primärt. Traversering kommer att utföras av exp-index och radering av primär. Vi minns att tidigare gjordes båda endast med det primära indexet.

Efter förarbetet kör vi startfunktionen med nya argument:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Låt oss infoga flera tuplar i utrymmet igen med en livstid på 60 sekunder:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Efter 30 sekunder, analogt, kommer vi att lägga till några fler tuplar:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Låt oss kontrollera att infogningen lyckades:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Låt oss upprepa valet efter 60+ sekunder (räknat från början av infogningen av den första tupeln) och se att den begränsade utgångsmodulen redan har bearbetats:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Det finns fortfarande några tuplar kvar i utrymmet som kommer att ha cirka 30 sekunder kvar att leva. Dessutom avbröts skanningen när man flyttade från en tuppel med ett ID på 2 och en livstid på 1576421257 till en tuppel med ett ID på 3 och en livstid på 1576421287. Tupler med en livstid på 1576421287 eller mer skannades inte på grund av beställningen av exp index-nycklarna. Detta är besparingarna som vi ville uppnå från början.

Låt oss stoppa uppgiften:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

genomförande

Det bästa sättet att berätta om alla funktioner i ett projekt är dess ursprungliga källa. код! Som en del av publikationen kommer vi bara att fokusera på de viktigaste punkterna, nämligen space bypass-algoritmer.

Argumenten vi skickar till startmetoden lagras i en struktur som kallas expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Namnattributet är namnet på uppgiften. Attributet space_id är mellanslagsidentifieraren. Attributet rm_index_id är identifieraren för det unika indexet med vilket tupler kommer att raderas. Attributet it_index_id är identifieraren för indexet med vilket tupler kommer att passeras. Attributet it_index_type är den typ av index med vilken tuplar kommer att passeras. Attributet filed_no är numret på tupelfältet med livslängd. Attributet scan_size är det maximala antalet tuplar som skannas i en transaktion. Attributet scan_time är hela skanningstiden i sekunder.

Vi kommer inte att överväga att analysera argument. Det här är ett mödosamt men enkelt jobb som biblioteket hjälper dig med msgpuck. Svårigheter kan bara uppstå med index som skickas från Lua som en komplex datastruktur med typen mp_map, och inte med de enkla typerna mp_bool, mp_double, mp_int, mp_uint och mp_array. Men det finns inget behov av att analysera hela indexet. Du behöver bara kontrollera dess unikhet, beräkna typen och extrahera identifieraren.

Vi listar prototyperna för alla funktioner som används för att analysera:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Låt oss nu gå vidare till det viktigaste - logiken i att kringgå utrymme och ta bort tupler. Varje block av tuplar som inte är större än scan_size skannas och modifieras under en enda transaktion. Om den lyckas begås denna transaktion, om fel uppstår återställs den. Det sista argumentet för funktionen expirationd_iterate är en pekare till iteratorn från vilken skanningen börjar eller fortsätter. Denna iterator inkrementeras internt tills ett fel uppstår, utrymmet tar slut eller det inte är möjligt att stoppa processen i förväg. Funktionen expirationd_expired kontrollerar livslängden för en tupel, expirationd_delete tar bort en tupel, expirationd_breakable kontrollerar om vi behöver gå vidare.

Expirationd_iterate funktionskod:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funktionskod expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete funktionskod:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funktionskod expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

ansökan

Du kan se källkoden på här!

Källa: will.com

Lägg en kommentar