Scrivere il nostro modulo con scadenza limitata per tarantool

Scrivere il nostro modulo con scadenza limitata per tarantool

Qualche tempo fa ci siamo trovati di fronte al problema della pulizia delle tuple negli spazi tarantol. La pulizia doveva essere avviata non quando tarantool aveva già esaurito la memoria, ma in anticipo e con una certa frequenza. Per questo compito, tarantool ha un modulo scritto in Lua chiamato scadenza. Dopo aver utilizzato questo modulo per un breve periodo, ci siamo resi conto che non era adatto a noi: a causa della costante pulizia di grandi quantità di dati, Lua è rimasta bloccata nel GC. Pertanto, abbiamo pensato di sviluppare il nostro modulo con scadenza limitata, sperando che il codice scritto in un linguaggio di programmazione nativo risolvesse i nostri problemi nel miglior modo possibile.

Un buon esempio per noi è stato il modulo tarantool chiamato memcached. L'approccio utilizzato in esso si basa sul fatto che nello spazio viene creato un campo separato, che indica la durata della tupla, in altre parole, ttl. Il modulo in background scansiona lo spazio, confronta il TTL con l'ora corrente e decide se eliminare o meno la tupla. Il codice del modulo memcached è semplice ed elegante, ma troppo generico. Innanzitutto, non tiene conto del tipo di indice sottoposto a scansione ed eliminazione. In secondo luogo, ad ogni passaggio vengono scansionate tutte le tuple, il cui numero può essere piuttosto grande. E se nel modulo expirationd il primo problema è stato risolto (l'indice dell'albero è stato separato in una classe separata), il secondo non ha ancora ricevuto alcuna attenzione. Questi tre punti hanno predeterminato la scelta a favore di scrivere il mio codice.

descrizione

La documentazione per tarantool è molto buona tutorial su come scrivere le vostre stored procedure in C. Innanzitutto vi suggerisco di familiarizzare con essa per comprendere quegli inserti con comandi e codice che appariranno di seguito. Vale anche la pena prestare attenzione riferimento agli oggetti disponibili quando si scrive il proprio modulo limitato, vale a dire nella scatola, fibra, Index и grazie.

Partiamo da lontano e vediamo come appare dall'esterno un modulo con scadenza limitata:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Per semplicità, lanciamo tarantool nella directory in cui si trova la nostra libreria libcapped-expirationd.so. Dalla libreria vengono esportate due funzioni: start e kill. Il primo passo è rendere queste funzioni disponibili da Lua utilizzando box.schema.func.create e box.schema.user.grant. Quindi crea uno spazio le cui tuple conterranno solo tre campi: il primo è un identificatore univoco, il secondo è l'e-mail e il terzo è la durata della tupla. Costruiamo un indice ad albero sopra il primo campo e lo chiamiamo primario. Successivamente otteniamo l'oggetto connessione nella nostra libreria nativa.

Dopo il lavoro preparatorio, esegui la funzione di avvio:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Questo esempio funzionerà durante la scansione esattamente come il modulo expirationd, che è scritto in Lua. Il primo argomento della funzione start è il nome univoco dell'attività. Il secondo è l'identificatore dello spazio. Il terzo è un indice univoco mediante il quale verranno eliminate le tuple. Il quarto è l'indice con cui verranno attraversate le tuple. Il quinto è il numero del campo tupla con durata (la numerazione inizia da 1, non da 0!). Il sesto e il settimo sono impostazioni di scansione. 1024 è il numero massimo di tuple che possono essere visualizzate in una singola transazione. 3600: tempo di scansione completa in secondi.

Tieni presente che l'esempio utilizza lo stesso indice per la scansione e l'eliminazione. Se si tratta di un indice ad albero, allora l'attraversamento viene effettuato dalla chiave più piccola a quella più grande. Se esiste un altro indice hash, ad esempio, l'attraversamento viene eseguito, di regola, in ordine casuale. Tutte le tuple spaziali vengono scansionate in un'unica scansione.

Inseriamo diverse tuple nello spazio con una durata di 60 secondi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Verifichiamo che l'inserimento sia andato a buon fine:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Ripetiamo la selezione dopo 60+ secondi (contando dall'inizio dell'inserimento della prima tupla) e vediamo che il modulo capped expirationd ha già funzionato:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Interrompiamo il compito:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Diamo un'occhiata a un secondo esempio in cui viene utilizzato un indice separato per la scansione:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Tutto qui è uguale al primo esempio, con poche eccezioni. Costruiamo un indice ad albero sopra il terzo campo e lo chiamiamo exp. Questo indice non deve essere univoco, a differenza dell'indice chiamato primario. L'attraversamento verrà effettuato per indice exp e l'eliminazione per primario. Ricordiamo che in precedenza entrambe venivano eseguite utilizzando solo l'indice primario.

Dopo il lavoro preparatorio, eseguiamo la funzione start con nuovi argomenti:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Inseriamo nuovamente diverse tuple nello spazio con una durata di 60 secondi:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Dopo 30 secondi, per analogia, aggiungeremo qualche altra tupla:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Verifichiamo che l'inserimento sia andato a buon fine:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ripetiamo la selezione dopo 60+ secondi (contando dall'inizio dell'inserimento della prima tupla) e vediamo che il modulo capped expirationd ha già funzionato:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Ci sono ancora alcune tuple rimaste nello spazio che avranno circa altri 30 secondi di vita. Inoltre, la scansione si interrompeva quando si passava da una tupla con un ID pari a 2 e una durata pari a 1576421257 a una tupla con un ID pari a 3 e una durata pari a 1576421287. Le tuple con una durata pari a 1576421287 o più non venivano scansionate a causa dell'ordinamento di i tasti dell'indice exp. Questo è il risparmio che volevamo ottenere fin dall'inizio.

Interrompiamo il compito:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

implementazione

Il modo migliore per raccontare tutte le caratteristiche di un progetto è la sua fonte originale. codice! Nell'ambito della pubblicazione, ci concentreremo solo sui punti più importanti, vale a dire gli algoritmi di bypass spaziale.

Gli argomenti che passiamo al metodo start sono memorizzati in una struttura chiamata expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

L'attributo name è il nome dell'attività. L'attributo space_id è l'identificatore dello spazio. L'attributo rm_index_id è l'identificatore dell'indice univoco con cui verranno eliminate le tuple. L'attributo it_index_id è l'identificatore dell'indice mediante il quale verranno attraversate le tuple. L'attributo it_index_type è il tipo di indice con cui verranno attraversate le tuple. L'attributo filed_no è il numero del campo tupla con durata. L'attributo scan_size rappresenta il numero massimo di tuple scansionate in una transazione. L'attributo scan_time è il tempo di scansione completo in secondi.

Non prenderemo in considerazione l'analisi degli argomenti. Questo è un lavoro minuzioso ma semplice, in cui la biblioteca ti aiuterà msgpuck. Le difficoltà possono sorgere solo con gli indici che vengono passati da Lua come struttura dati complessa con il tipo mp_map e non utilizzando i tipi semplici mp_bool, mp_double, mp_int, mp_uint e mp_array. Ma non è necessario analizzare l'intero indice. Devi solo verificarne l'unicità, calcolare la tipologia ed estrarre l'identificatore.

Elenchiamo i prototipi di tutte le funzioni utilizzate per l'analisi:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Passiamo ora alla cosa più importante: la logica di bypassare lo spazio ed eliminare le tuple. Ogni blocco di tuple non più grande di scan_size viene scansionato e modificato in una singola transazione. In caso di successo, questa transazione viene confermata; se si verifica un errore, viene eseguito il rollback. L'ultimo argomento della funzione expirationd_iterate è un puntatore all'iteratore da cui inizia o continua la scansione. Questo iteratore viene incrementato internamente fino a quando non si verifica un errore, lo spazio si esaurisce o non è possibile interrompere il processo in anticipo. La funzione expirationd_expired controlla la durata di una tupla, expirationd_delete cancella una tupla, expirationd_breakable controlla se dobbiamo andare avanti.

Codice funzione Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Codice funzione expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Codice funzione Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Codice funzione Expirationd_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

applicazione

È possibile visualizzare il codice sorgente su qui!

Fonte: habr.com

Aggiungi un commento