Écrire notre propre module plafonné pour tarantool

Écrire notre propre module plafonné pour tarantool

Il y a quelque temps, nous avons été confrontés au problème du nettoyage des tuples dans les espaces tarentule. Le nettoyage ne devait pas être démarré lorsque tarantool manquait déjà de mémoire, mais à l'avance et à une certaine fréquence. Pour cette tâche, tarantool dispose d'un module écrit en Lua appelé expiration. Après avoir utilisé ce module pendant une courte période, nous avons réalisé qu'il ne nous convenait pas : en raison du nettoyage constant de grandes quantités de données, Lua s'est bloqué dans le GC. Par conséquent, nous avons pensé à développer notre propre module plafonné expirationd, en espérant que le code écrit dans un langage de programmation natif résoudrait nos problèmes de la meilleure façon possible.

Un bon exemple pour nous était le module tarantool appelé Memcached. L'approche utilisée est basée sur le fait qu'un champ séparé est créé dans l'espace, qui indique la durée de vie du tuple, en d'autres termes, ttl. Le module en arrière-plan scanne l'espace, compare le TTL avec l'heure actuelle et décide de supprimer ou non le tuple. Le code du module memcached est simple et élégant, mais trop générique. Premièrement, il ne prend pas en compte le type d’index exploré et supprimé. Deuxièmement, à chaque passage, tous les tuples sont analysés, dont le nombre peut être assez important. Et si dans le module expiré le premier problème était résolu (l'index de l'arborescence était séparé en une classe distincte), alors le second ne recevait toujours aucune attention. Ces trois points ont prédéterminé le choix d'écrire mon propre code.

description

La documentation de tarantool a un très bon Didacticiel sur comment écrire vos procédures stockées en C. Tout d'abord, je vous propose de vous familiariser avec celui-ci afin de comprendre les insertions de commandes et de code qui apparaîtront ci-dessous. Il convient également de prêter attention référence aux objets disponibles lors de l'écriture de votre propre module plafonné, à savoir boîte, fibre, indice и txn.

Commençons de loin et regardons à quoi ressemble un module expiré plafonné de l'extérieur :

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Pour plus de simplicité, nous lançons tarantool dans le répertoire où se trouve notre bibliothèque libcapped-expirationd.so. Deux fonctions sont exportées depuis la bibliothèque : start et kill. La première étape consiste à rendre ces fonctions disponibles depuis Lua en utilisant box.schema.func.create et box.schema.user.grant. Créez ensuite un espace dont les tuples ne contiendront que trois champs : le premier est un identifiant unique, le deuxième est l'e-mail et le troisième est la durée de vie du tuple. Nous construisons un index arborescent au-dessus du premier champ et l'appelons primaire. Ensuite, nous obtenons l'objet de connexion à notre bibliothèque native.

Après les travaux préparatoires, exécutez la fonction start :

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Cet exemple fonctionnera pendant l'analyse exactement de la même manière que le module expiré, qui est écrit en Lua. Le premier argument de la fonction start est le nom unique de la tâche. Le second est l'identifiant de l'espace. Le troisième est un index unique par lequel les tuples seront supprimés. Le quatrième est l'index par lequel les tuples seront parcourus. Le cinquième est le numéro du champ de tuple avec durée de vie (la numérotation commence à 1, pas à 0 !). Les sixième et septième sont les paramètres de numérisation. 1024 est le nombre maximum de tuples pouvant être visualisés en une seule transaction. 3600 — durée d'analyse complète en secondes.

Notez que l'exemple utilise le même index pour l'exploration et la suppression. S'il s'agit d'un index d'arbre, alors le parcours est effectué de la plus petite clé à la plus grande. S'il existe un autre index de hachage, par exemple, le parcours est généralement effectué dans un ordre aléatoire. Tous les tuples spatiaux sont analysés en une seule analyse.

Insérons plusieurs tuples dans l'espace avec une durée de vie de 60 secondes :

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Vérifions que l'insertion a réussi :

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Répétons la sélection après plus de 60 secondes (à compter du début de l'insertion du premier tuple) et voyons que le module expiré plafonné a déjà été traité :

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Arrêtons la tâche :

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Examinons un deuxième exemple dans lequel un index distinct est utilisé pour l'exploration :

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Tout ici est le même que dans le premier exemple, à quelques exceptions près. Nous construisons un index d'arborescence au-dessus du troisième champ et l'appelons exp. Cet index ne doit pas nécessairement être unique, contrairement à l'index dit primaire. La traversée sera effectuée par index exp, et la suppression par primaire. Nous nous souvenons qu'auparavant, les deux étaient effectués uniquement en utilisant l'index primaire.

Après le travail préparatoire, nous exécutons la fonction start avec de nouveaux arguments :

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Insérons à nouveau plusieurs tuples dans l'espace avec une durée de vie de 60 secondes :

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Après 30 secondes, par analogie, nous ajouterons quelques tuples supplémentaires :

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Vérifions que l'insertion a réussi :

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Répétons la sélection après plus de 60 secondes (à compter du début de l'insertion du premier tuple) et voyons que le module expiré plafonné a déjà été traité :

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Il reste encore quelques tuples dans l’espace qui auront encore environ 30 secondes à vivre. De plus, l'analyse s'est arrêtée lors du passage d'un tuple avec un ID de 2 et une durée de vie de 1576421257 à un tuple avec un ID de 3 et une durée de vie de 1576421287. Les tuples avec une durée de vie de 1576421287 ou plus n'ont pas été analysés en raison de l'ordre de les clés d'index exp. Ce sont les économies que nous souhaitions réaliser au tout début.

Arrêtons la tâche :

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

exécution

La meilleure façon de présenter toutes les caractéristiques d’un projet est sa source originale. code! Dans le cadre de la publication, nous nous concentrerons uniquement sur les points les plus importants, à savoir les algorithmes de contournement spatial.

Les arguments que nous transmettons à la méthode start sont stockés dans une structure appelée expirationd_task :

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

L'attribut name est le nom de la tâche. L'attribut space_id est l'identifiant de l'espace. L'attribut rm_index_id est l'identifiant de l'index unique par lequel les tuples seront supprimés. L'attribut it_index_id est l'identifiant de l'index par lequel les tuples seront parcourus. L'attribut it_index_type est le type d'index par lequel les tuples seront parcourus. L'attribut field_no est le numéro du champ tuple avec durée de vie. L'attribut scan_size correspond au nombre maximum de tuples analysés en une seule transaction. L'attribut scan_time correspond à la durée totale de l'analyse en secondes.

Nous n'envisagerons pas d'analyser les arguments. C'est un travail minutieux mais simple, pour lequel la bibliothèque vous aidera msgpuck. Des difficultés ne peuvent survenir qu'avec les index transmis depuis Lua sous la forme d'une structure de données complexe avec le type mp_map, et n'utilisant pas les types simples mp_bool, mp_double, mp_int, mp_uint et mp_array. Mais il n’est pas nécessaire d’analyser l’intégralité de l’index. Il suffit de vérifier son unicité, de calculer le type et d'extraire l'identifiant.

Nous répertorions les prototypes de toutes les fonctions utilisées pour l'analyse :

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Passons maintenant à la chose la plus importante : la logique du contournement de l'espace et de la suppression des tuples. Chaque bloc de tuples ne dépassant pas scan_size est analysé et modifié en une seule transaction. En cas de succès, cette transaction est validée ; si une erreur se produit, elle est annulée. Le dernier argument de la fonction expirationd_iterate est un pointeur vers l'itérateur à partir duquel l'analyse commence ou se poursuit. Cet itérateur est incrémenté en interne jusqu'à ce qu'une erreur se produise, que l'espace soit épuisé ou qu'il ne soit pas possible d'arrêter le processus à l'avance. La fonction expirationd_expired vérifie la durée de vie d'un tuple, expirationd_delete supprime un tuple, expirationd_breakable vérifie si nous devons continuer.

Code de fonction Expirationd_iterate :

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Code de fonction expirationd_expired :

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Code de fonction Expirationd_delete :

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Code de fonction expiration_breakable :

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Application

Vous pouvez consulter le code source sur ici!

Source: habr.com

Ajouter un commentaire