Il y a quelque temps, nous avons été confrontés au problème du nettoyage des tuples dans les espaces
Un bon exemple pour nous était le module tarantool appelé
description
La documentation de tarantool a un très bon
Commençons de loin et regardons à quoi ressemble un module expiré plafonné de l'extérieur :
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Pour plus de simplicité, nous lançons tarantool dans le répertoire où se trouve notre bibliothèque libcapped-expirationd.so. Deux fonctions sont exportées depuis la bibliothèque : start et kill. La première étape consiste à rendre ces fonctions disponibles depuis Lua en utilisant box.schema.func.create et box.schema.user.grant. Créez ensuite un espace dont les tuples ne contiendront que trois champs : le premier est un identifiant unique, le deuxième est l'e-mail et le troisième est la durée de vie du tuple. Nous construisons un index arborescent au-dessus du premier champ et l'appelons primaire. Ensuite, nous obtenons l'objet de connexion à notre bibliothèque native.
Après les travaux préparatoires, exécutez la fonction start :
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Cet exemple fonctionnera pendant l'analyse exactement de la même manière que le module expiré, qui est écrit en Lua. Le premier argument de la fonction start est le nom unique de la tâche. Le second est l'identifiant de l'espace. Le troisième est un index unique par lequel les tuples seront supprimés. Le quatrième est l'index par lequel les tuples seront parcourus. Le cinquième est le numéro du champ de tuple avec durée de vie (la numérotation commence à 1, pas à 0 !). Les sixième et septième sont les paramètres de numérisation. 1024 est le nombre maximum de tuples pouvant être visualisés en une seule transaction. 3600 — durée d'analyse complète en secondes.
Notez que l'exemple utilise le même index pour l'exploration et la suppression. S'il s'agit d'un index d'arbre, alors le parcours est effectué de la plus petite clé à la plus grande. S'il existe un autre index de hachage, par exemple, le parcours est généralement effectué dans un ordre aléatoire. Tous les tuples spatiaux sont analysés en une seule analyse.
Insérons plusieurs tuples dans l'espace avec une durée de vie de 60 secondes :
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Vérifions que l'insertion a réussi :
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Répétons la sélection après plus de 60 secondes (à compter du début de l'insertion du premier tuple) et voyons que le module expiré plafonné a déjà été traité :
tarantool> box.space.tester.index.primary:select()
---
- []
...
Arrêtons la tâche :
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Examinons un deuxième exemple dans lequel un index distinct est utilisé pour l'exploration :
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Tout ici est le même que dans le premier exemple, à quelques exceptions près. Nous construisons un index d'arborescence au-dessus du troisième champ et l'appelons exp. Cet index ne doit pas nécessairement être unique, contrairement à l'index dit primaire. La traversée sera effectuée par index exp, et la suppression par primaire. Nous nous souvenons qu'auparavant, les deux étaient effectués uniquement en utilisant l'index primaire.
Après le travail préparatoire, nous exécutons la fonction start avec de nouveaux arguments :
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Insérons à nouveau plusieurs tuples dans l'espace avec une durée de vie de 60 secondes :
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Après 30 secondes, par analogie, nous ajouterons quelques tuples supplémentaires :
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Vérifions que l'insertion a réussi :
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Répétons la sélection après plus de 60 secondes (à compter du début de l'insertion du premier tuple) et voyons que le module expiré plafonné a déjà été traité :
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Il reste encore quelques tuples dans l’espace qui auront encore environ 30 secondes à vivre. De plus, l'analyse s'est arrêtée lors du passage d'un tuple avec un ID de 2 et une durée de vie de 1576421257 à un tuple avec un ID de 3 et une durée de vie de 1576421287. Les tuples avec une durée de vie de 1576421287 ou plus n'ont pas été analysés en raison de l'ordre de les clés d'index exp. Ce sont les économies que nous souhaitions réaliser au tout début.
Arrêtons la tâche :
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
exécution
La meilleure façon de présenter toutes les caractéristiques d’un projet est sa source originale.
Les arguments que nous transmettons à la méthode start sont stockés dans une structure appelée expirationd_task :
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
L'attribut name est le nom de la tâche. L'attribut space_id est l'identifiant de l'espace. L'attribut rm_index_id est l'identifiant de l'index unique par lequel les tuples seront supprimés. L'attribut it_index_id est l'identifiant de l'index par lequel les tuples seront parcourus. L'attribut it_index_type est le type d'index par lequel les tuples seront parcourus. L'attribut field_no est le numéro du champ tuple avec durée de vie. L'attribut scan_size correspond au nombre maximum de tuples analysés en une seule transaction. L'attribut scan_time correspond à la durée totale de l'analyse en secondes.
Nous n'envisagerons pas d'analyser les arguments. C'est un travail minutieux mais simple, pour lequel la bibliothèque vous aidera
Nous répertorions les prototypes de toutes les fonctions utilisées pour l'analyse :
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Passons maintenant à la chose la plus importante : la logique du contournement de l'espace et de la suppression des tuples. Chaque bloc de tuples ne dépassant pas scan_size est analysé et modifié en une seule transaction. En cas de succès, cette transaction est validée ; si une erreur se produit, elle est annulée. Le dernier argument de la fonction expirationd_iterate est un pointeur vers l'itérateur à partir duquel l'analyse commence ou se poursuit. Cet itérateur est incrémenté en interne jusqu'à ce qu'une erreur se produise, que l'espace soit épuisé ou qu'il ne soit pas possible d'arrêter le processus à l'avance. La fonction expirationd_expired vérifie la durée de vie d'un tuple, expirationd_delete supprime un tuple, expirationd_breakable vérifie si nous devons continuer.
Code de fonction Expirationd_iterate :
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Code de fonction expirationd_expired :
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Code de fonction Expirationd_delete :
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Code de fonction expiration_breakable :
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Application
Vous pouvez consulter le code source sur
Source: habr.com