Het schrijven van onze eigen afgetopte vervalmodule voor tarantool

Het schrijven van onze eigen afgetopte vervalmodule voor tarantool

Enige tijd geleden werden we geconfronteerd met het probleem van het schoonmaken van tupels in ruimtes tarantool. Het schoonmaken moest niet worden gestart toen tarantool al bijna geen geheugen meer had, maar van tevoren en met een bepaalde frequentie. Voor deze taak heeft tarantool een module geschreven in Lua genaamd vervaldatum. Nadat we deze module een korte tijd hadden gebruikt, beseften we dat deze niet geschikt was voor ons: door het voortdurend opschonen van grote hoeveelheden gegevens hing Lua in de GC. Daarom dachten we erover om onze eigen module met beperkte vervaldatum te ontwikkelen, in de hoop dat de code geschreven in een eigen programmeertaal onze problemen op de best mogelijke manier zou oplossen.

Een goed voorbeeld voor ons was de tarantool-module genaamd memcached. De aanpak die daarin wordt gebruikt, is gebaseerd op het feit dat er in de ruimte een apart veld wordt gecreëerd dat de levensduur van het tupel aangeeft, met andere woorden ttl. De module op de achtergrond scant de ruimte, vergelijkt de TTL met de huidige tijd en beslist of de tuple wordt verwijderd of niet. De opgeslagen modulecode is eenvoudig en elegant, maar te algemeen. Ten eerste wordt er geen rekening gehouden met het type index dat wordt gecrawld en verwijderd. Ten tweede worden bij elke doorgang alle tupels gescand, waarvan het aantal behoorlijk groot kan zijn. En als in de verlopen module het eerste probleem was opgelost (de boomindex was opgesplitst in een aparte klasse), dan kreeg de tweede nog steeds geen aandacht. Deze drie punten bepaalden vooraf de keuze om mijn eigen code te schrijven.

beschrijving

De documentatie voor tarantool heeft een zeer goed zelfstudie over hoe u uw opgeslagen procedures in C schrijft. Allereerst raad ik u aan er vertrouwd mee te raken, zodat u de invoegingen met opdrachten en code die hieronder zullen verschijnen, begrijpt. Het is ook de moeite waard om op te letten referentie naar objecten die beschikbaar zijn bij het schrijven van uw eigen afgedekte module, namelijk doos, vezel, index и bedankt.

Laten we van veraf beginnen en kijken hoe een afgetopte verlopen module er van buitenaf uitziet:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Voor de eenvoud starten we tarantool in de map waar onze libcapped-expirationd.so-bibliotheek zich bevindt. Er worden twee functies uit de bibliotheek geëxporteerd: starten en doden. De eerste stap is om deze functies beschikbaar te maken vanuit Lua met behulp van box.schema.func.create en box.schema.user.grant. Maak vervolgens een ruimte waarvan de tupels slechts drie velden bevatten: de eerste is een unieke identificatie, de tweede is e-mail en de derde is de levensduur van de tupel. We bouwen een boomindex bovenop het eerste veld en noemen deze primair. Vervolgens krijgen we het verbindingsobject naar onze eigen bibliotheek.

Voer na het voorbereidende werk de startfunctie uit:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Dit voorbeeld werkt tijdens het scannen precies hetzelfde als de verlopen module, die in Lua is geschreven. Het eerste argument voor de startfunctie is de unieke naam van de taak. De tweede is de ruimte-ID. De derde is een unieke index waarmee tupels worden verwijderd. De vierde is de index waarmee de tupels zullen worden doorlopen. De vijfde is het nummer van het tupelveld met levensduur (nummering begint bij 1, niet bij 0!). De zesde en zevende zijn scaninstellingen. 1024 is het maximale aantal tupels dat in één transactie kan worden bekeken. 3600 — volledige scantijd in seconden.

Houd er rekening mee dat in het voorbeeld dezelfde index wordt gebruikt voor crawlen en verwijderen. Als dit een boomindex is, wordt de verplaatsing uitgevoerd van de kleinere sleutel naar de grotere. Als er een andere is, bijvoorbeeld een hash-index, wordt de doortocht in de regel in willekeurige volgorde uitgevoerd. Alle ruimte-tupels worden in één scan gescand.

Laten we verschillende tupels in de ruimte plaatsen met een levensduur van 60 seconden:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Laten we controleren of het invoegen is gelukt:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Laten we de selectie herhalen na meer dan 60 seconden (gerekend vanaf het begin van het invoegen van de eerste tuple) en kijken of de afgetopte expiratiemodule al is verwerkt:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Laten we de taak stoppen:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Laten we een tweede voorbeeld bekijken waarbij een afzonderlijke index wordt gebruikt voor de crawl:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Alles is hier hetzelfde als in het eerste voorbeeld, op enkele uitzonderingen na. We bouwen een boomindex bovenop het derde veld en noemen deze exp. Deze index hoeft niet uniek te zijn, in tegenstelling tot de index die primair wordt genoemd. Traversal wordt uitgevoerd door exp-index en verwijdering door primair. We herinneren ons dat beide voorheen alleen met behulp van de primaire index werden gedaan.

Na het voorbereidende werk voeren we de startfunctie uit met nieuwe argumenten:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Laten we opnieuw verschillende tupels in de ruimte plaatsen met een levensduur van 60 seconden:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Na 30 seconden voegen we naar analogie nog een paar tupels toe:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Laten we controleren of het invoegen is gelukt:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Laten we de selectie herhalen na meer dan 60 seconden (gerekend vanaf het begin van het invoegen van de eerste tuple) en kijken of de afgetopte expiratiemodule al is verwerkt:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Er zijn nog enkele tupels over in de ruimte die nog ongeveer 30 seconden te leven hebben. Bovendien stopte de scan bij het overgaan van een tupel met een ID van 2 en een levensduur van 1576421257 naar een tupel met een ID van 3 en een levensduur van 1576421287. Tupels met een levensduur van 1576421287 of meer werden niet gescand vanwege de volgorde van de exp-indexsleutels. Dit is de besparing die we vanaf het allereerste begin wilden realiseren.

Laten we de taak stoppen:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

uitvoering

De beste manier om over alle kenmerken van een project te vertellen, is de oorspronkelijke bron. code! Als onderdeel van de publicatie zullen we ons alleen concentreren op de belangrijkste punten, namelijk ruimte-bypass-algoritmen.

De argumenten die we doorgeven aan de startmethode worden opgeslagen in een structuur genaamd expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Het naamattribuut is de naam van de taak. Het space_id attribuut is de spatie-ID. Het attribuut rm_index_id is de identificatie van de unieke index waarmee tupels worden verwijderd. Het it_index_id attribuut is de identificatie van de index waarmee tupels zullen worden doorlopen. Het attribuut it_index_type is het type index waarmee tupels worden doorlopen. Het filed_no attribuut is het nummer van het tupelveld met levensduur. Het kenmerk scan_size is het maximale aantal tupels dat in één transactie wordt gescand. Het kenmerk scan_time is de volledige scantijd in seconden.

We zullen geen rekening houden met het ontleden van argumenten. Dit is een moeizaam maar eenvoudig klusje, waarbij de bibliotheek u helpt berichtpuck. Er kunnen alleen problemen optreden met indexen die vanuit Lua worden doorgegeven als een complexe datastructuur met het mp_map-type, en niet met behulp van de eenvoudige typen mp_bool, mp_double, mp_int, mp_uint en mp_array. Maar het is niet nodig om de hele index te ontleden. U hoeft alleen maar de uniciteit ervan te controleren, het type te berekenen en de identificatie te extraheren.

We vermelden de prototypes van alle functies die worden gebruikt voor het parseren:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Laten we nu verder gaan met het belangrijkste: de logica van het omzeilen van spatie en het verwijderen van tupels. Elk blok met tupels dat niet groter is dan scan_size, wordt in één enkele transactie gescand en gewijzigd. Als deze succesvol is, wordt deze transactie vastgelegd; als er een fout optreedt, wordt deze teruggedraaid. Het laatste argument voor de functie expirationd_iterate is een verwijzing naar de iterator van waaruit het scannen begint of doorgaat. Deze iterator wordt intern opgehoogd totdat er een fout optreedt, de ruimte opraakt of het niet mogelijk is om het proces vooraf te stoppen. De functie expirationd_expired controleert de levensduur van een tupel, expirationd_delete verwijdert een tupel, expirationd_breakable controleert of we verder moeten.

Expirationd_iterate functiecode:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Functiecode expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete functiecode:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Functiecode expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

toepassing

Je kunt de broncode bekijken op hier!

Bron: www.habr.com

Voeg een reactie