Réplication de haut niveau dans le SGBD Tarantool

Bonjour, je crée des applications pour SGBD Tarantoutil est une plateforme développée par Mail.ru Group qui combine un SGBD performant et un serveur d'applications en langage Lua. La vitesse élevée des solutions basées sur Tarantool est obtenue notamment grâce à la prise en charge du mode in-memory du SGBD et à la capacité d'exécuter la logique métier des applications dans un seul espace d'adressage avec les données. Dans le même temps, la persistance des données est assurée grâce aux transactions ACID (un journal WAL est conservé sur le disque). Tarantool prend en charge la réplication et le partitionnement. A partir de la version 2.1, les requêtes en langage SQL sont prises en charge. Tarantool est open source et sous licence BSD simplifiée. Il existe également une version commerciale Entreprise.

Réplication de haut niveau dans le SGBD Tarantool
Ressentez la puissance! (…c'est-à-dire profitez de la performance)

Tout ce qui précède fait de Tarantool une plate-forme attrayante pour créer des applications à forte charge fonctionnant avec des bases de données. Dans de telles applications, la réplication des données est souvent nécessaire.

Comme mentionné ci-dessus, Tarantool dispose d'une réplication de données intégrée. Le principe de son fonctionnement est d'exécuter séquentiellement sur les réplicas toutes les transactions contenues dans le master log (WAL). Habituellement, une telle réplication (nous l'appellerons plus loin niveau faible) est utilisé pour garantir la tolérance aux pannes des applications et/ou pour répartir la charge de lecture entre les nœuds du cluster.

Réplication de haut niveau dans le SGBD Tarantool
Riz. 1. Réplication au sein d'un cluster

Un exemple de scénario alternatif consisterait à transférer des données créées dans une base de données vers une autre base de données à des fins de traitement/surveillance. Dans ce dernier cas, une solution plus pratique peut consister à utiliser haut niveau réplication - réplication des données au niveau de la logique métier de l'application. Ceux. Nous n'utilisons pas de solution toute faite intégrée au SGBD, mais implémentons nous-mêmes la réplication au sein de l'application que nous développons. Cette approche présente à la fois des avantages et des inconvénients. Listons les avantages.

1. Économies de trafic :

  • Vous ne pouvez pas transférer toutes les données, mais seulement une partie (par exemple, vous pouvez transférer uniquement certaines tables, certaines de leurs colonnes ou enregistrements qui répondent à un certain critère) ;
  • Contrairement à la réplication de bas niveau, qui est effectuée en continu en mode asynchrone (implémenté dans la version actuelle de Tarantool - 1.10) ou synchrone (à implémenter dans les versions ultérieures de Tarantool), la réplication de haut niveau peut être effectuée en sessions (c'est-à-dire, le l'application synchronise d'abord les données - une session d'échange de données, puis il y a une pause dans la réplication, après quoi la prochaine session d'échange a lieu, etc.) ;
  • si un enregistrement a changé plusieurs fois, vous ne pouvez transférer que sa dernière version (contrairement à la réplication de bas niveau, dans laquelle toutes les modifications apportées sur le maître seront relues séquentiellement sur les répliques).

2. Il n'y a aucune difficulté à mettre en œuvre l'échange HTTP, qui vous permet de synchroniser des bases de données distantes.

Réplication de haut niveau dans le SGBD Tarantool
Riz. 2. Réplication sur HTTP

3. Les structures de bases de données entre lesquelles les données sont transférées ne doivent pas nécessairement être les mêmes (de plus, dans le cas général, il est même possible d'utiliser différents SGBD, langages de programmation, plateformes, etc.).

Réplication de haut niveau dans le SGBD Tarantool
Riz. 3. Réplication dans des systèmes hétérogènes

L'inconvénient est qu'en moyenne, la programmation est plus difficile/coûteuse que la configuration, et au lieu de personnaliser les fonctionnalités intégrées, vous devrez implémenter les vôtres.

Si dans votre situation les avantages ci-dessus sont cruciaux (ou constituent une condition nécessaire), alors il est logique d'utiliser la réplication de haut niveau. Examinons plusieurs façons d'implémenter la réplication de données de haut niveau dans le SGBD Tarantool.

Minimisation du trafic

Ainsi, l’un des avantages de la réplication de haut niveau réside dans les économies de trafic. Pour que cet avantage soit pleinement exploité, il est nécessaire de minimiser la quantité de données transférées lors de chaque session d'échange. Bien entendu, il ne faut pas oublier qu'à la fin de la session, le récepteur des données doit être synchronisé avec la source (au moins pour la partie des données impliquée dans la réplication).

Comment minimiser la quantité de données transférées lors d’une réplication de haut niveau ? Une solution simple pourrait consister à sélectionner les données par date et heure. Pour ce faire, vous pouvez utiliser le champ date-heure déjà existant dans la table (s'il existe). Par exemple, un document « commande » peut avoir un champ « délai d'exécution de la commande requis » - delivery_time. Le problème avec cette solution est que les valeurs de ce champ ne doivent pas nécessairement être dans l'ordre qui correspond à la création des commandes. Nous ne pouvons donc pas nous souvenir de la valeur maximale du champ delivery_time, transmis lors de la session d'échange précédente, et lors de la session d'échange suivante, sélectionnez tous les enregistrements avec une valeur de champ plus élevée delivery_time. Des enregistrements avec une valeur de champ inférieure peuvent avoir été ajoutés entre les sessions d'échange delivery_time. Aussi, l'ordre aurait pu subir des modifications, qui n'ont néanmoins pas affecté le domaine delivery_time. Dans les deux cas, les modifications ne seront pas transférées de la source vers la destination. Pour résoudre ces problèmes, nous devrons transférer des données « qui se chevauchent ». Ceux. à chaque session d'échange, nous transférerons toutes les données avec la valeur du champ delivery_time, dépassant un certain point dans le passé (par exemple, N heures à partir du moment actuel). Cependant, il est évident que pour les grands systèmes, cette approche est hautement redondante et peut réduire à néant les économies de trafic que nous recherchons. De plus, la table en cours de transfert ne peut pas comporter de champ associé à une date-heure.

Une autre solution, plus complexe en termes de mise en œuvre, consiste à accuser réception des données. Dans ce cas, lors de chaque session d'échange, toutes les données sont transmises dont la réception n'a pas été confirmée par le destinataire. Pour implémenter cela, vous devrez ajouter une colonne booléenne à la table source (par exemple, is_transferred). Si le destinataire accuse réception de l'enregistrement, le champ correspondant prend la valeur true, après quoi l'entrée n'est plus impliquée dans les échanges. Cette option de mise en œuvre présente les inconvénients suivants. Premièrement, pour chaque enregistrement transféré, un accusé de réception doit être généré et envoyé. En gros, cela pourrait équivaloir à doubler la quantité de données transférées et à doubler le nombre d’allers-retours. Deuxièmement, il n'est pas possible d'envoyer le même enregistrement à plusieurs destinataires (le premier destinataire accusera réception pour lui-même et pour tous les autres).

Une méthode qui ne présente pas les inconvénients indiqués ci-dessus consiste à ajouter une colonne au tableau transmis pour suivre les modifications apportées à ses lignes. Une telle colonne peut être de type date-heure et doit être définie/mise à jour par l'application à l'heure actuelle à chaque fois que des enregistrements sont ajoutés/modifiés (de manière atomique avec l'ajout/la modification). A titre d'exemple, appelons la colonne update_time. En enregistrant la valeur maximale du champ de cette colonne pour les enregistrements transférés, nous pouvons démarrer la prochaine session d'échange avec cette valeur (sélectionner les enregistrements avec la valeur du champ update_time, dépassant la valeur précédemment stockée). Le problème avec cette dernière approche est que les modifications des données peuvent se produire par lots. En raison des valeurs des champs dans la colonne update_time n'est peut-être pas unique. Ainsi, cette colonne ne peut pas être utilisée pour la sortie de données portionnées (page par page). Pour afficher les données page par page, vous devrez inventer des mécanismes supplémentaires qui seront probablement très peu efficaces (par exemple, récupérer de la base de données tous les enregistrements portant la valeur update_time supérieur à un donné et produisant un certain nombre d'enregistrements, à partir d'un certain décalage par rapport au début de l'échantillon).

Vous pouvez améliorer l'efficacité du transfert de données en améliorant légèrement l'approche précédente. Pour ce faire, nous utiliserons le type entier (entier long) comme valeurs de champ de colonne pour le suivi des modifications. Nommons la colonne row_ver. La valeur du champ de cette colonne doit toujours être définie/mise à jour à chaque fois qu'un enregistrement est créé/modifié. Mais dans ce cas, le champ ne se verra pas attribuer la date-heure actuelle, mais la valeur d'un compteur, augmentée de un. En conséquence, la colonne row_ver contiendra des valeurs uniques et pourra être utilisé non seulement pour afficher des données « delta » (données ajoutées/modifiées depuis la fin de la session d'échange précédente), mais aussi pour les décomposer simplement et efficacement en pages.

La dernière méthode proposée pour minimiser la quantité de données transférées dans le cadre d'une réplication de haut niveau me semble la plus optimale et la plus universelle. Regardons cela plus en détail.

Transmission de données à l'aide d'un compteur de versions de lignes

Mise en place de la partie serveur/maître

Dans MS SQL Server, il existe un type de colonne spécial pour implémenter cette approche : rowversion. Chaque base de données possède un compteur qui augmente de un à chaque fois qu'un enregistrement est ajouté/modifié dans une table comportant une colonne comme rowversion. La valeur de ce compteur est automatiquement affectée au champ de cette colonne dans l'enregistrement ajouté/modifié. Le SGBD Tarantool ne dispose pas d'un mécanisme intégré similaire. Cependant, dans Tarantool, il n’est pas difficile de l’implémenter manuellement. Voyons comment cela se fait.

Tout d'abord, un peu de terminologie : les tables dans Tarantool sont appelées espaces et les enregistrements sont appelés tuples. Dans Tarantool, vous pouvez créer des séquences. Les séquences ne sont rien de plus que des générateurs nommés de valeurs entières ordonnées. Ceux. c'est exactement ce dont nous avons besoin pour nos objectifs. Ci-dessous, nous allons créer une telle séquence.

Avant d'effectuer toute opération de base de données dans Tarantool, vous devez exécuter la commande suivante :

box.cfg{}

En conséquence, Tarantool commencera à écrire des instantanés de base de données et des journaux de transactions dans le répertoire actuel.

Créons une séquence row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })

Option if_not_exists permet d'exécuter le script de création plusieurs fois : si l'objet existe, Tarantool ne tentera pas de le créer à nouveau. Cette option sera utilisée dans toutes les commandes DDL ultérieures.

Créons un espace à titre d'exemple.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        },
        {
            name = 'row_ver',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

Ici, nous définissons le nom de l'espace (goods), les noms de champs et leurs types.

Les champs à incrémentation automatique dans Tarantool sont également créés à l'aide de séquences. Créons une clé primaire auto-incrémentée par champ id:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Tarantool prend en charge plusieurs types d'index. Les index les plus couramment utilisés sont les types TREE et HASH, qui sont basés sur des structures correspondant au nom. TREE est le type d’index le plus polyvalent. Il vous permet de récupérer des données de manière organisée. Mais pour la sélection d'égalité, HASH est plus approprié. En conséquence, il est conseillé d'utiliser HASH pour la clé primaire (c'est ce que nous avons fait).

Pour utiliser la colonne row_ver pour transférer les données modifiées, vous devez lier les valeurs de séquence aux champs de cette colonne row_ver. Mais contrairement à la clé primaire, la valeur du champ de colonne row_ver devrait augmenter de un non seulement lors de l'ajout de nouveaux enregistrements, mais également lors de la modification d'enregistrements existants. Vous pouvez utiliser des déclencheurs pour cela. Tarantool dispose de deux types de déclencheurs spatiaux : before_replace и on_replace. Les déclencheurs sont déclenchés chaque fois que les données dans l'espace changent (pour chaque tuple affecté par les modifications, une fonction de déclenchement est lancée). Contrairement à on_replace, before_replace-triggers permettent de modifier les données du tuple pour lequel le trigger est exécuté. En conséquence, le dernier type de déclencheurs nous convient.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)

Le déclencheur suivant remplace la valeur du champ row_ver tuple stocké à la valeur suivante de la séquence row_version.

Afin de pouvoir extraire des données de l'espace goods par colonne row_ver, créons un index :

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})

Type d'index - arbre (TREE), parce que nous devrons extraire les données par ordre croissant des valeurs de la colonne row_ver.

Ajoutons quelques données à l'espace :

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

Parce que Le premier champ est un compteur auto-incrémenté ; nous passons nil à la place. Tarantool remplacera automatiquement la valeur suivante. De même, comme la valeur des champs de colonne row_ver vous pouvez passer nil - ou ne pas spécifier la valeur du tout, car cette colonne occupe la dernière position dans l'espace.

Vérifions le résultat de l'insertion :

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...

Comme vous pouvez le constater, le premier et le dernier champs sont remplis automatiquement. Il sera désormais facile d'écrire une fonction pour le téléchargement page par page des modifications d'espace goods:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end

La fonction prend en paramètre la valeur row_ver, à partir duquel il est nécessaire de décharger les modifications, et renvoie une partie des données modifiées.

L'échantillonnage des données dans Tarantool se fait via des index. Fonction get_goods utilise un itérateur par index row_ver pour recevoir les données modifiées. Le type d’itérateur est GT (Greater Than, supérieur à). Cela signifie que l'itérateur parcourra séquentiellement les valeurs d'index à partir de la clé transmise (valeur du champ row_ver).

L'itérateur renvoie des tuples. Afin de pouvoir ensuite transférer des données via HTTP, il est nécessaire de convertir les tuples en une structure pratique pour une sérialisation ultérieure. L'exemple utilise la fonction standard pour cela tomap. À la place d'utiliser tomap vous pouvez écrire votre propre fonction. Par exemple, nous pourrions vouloir renommer un champ name, ne passe pas le terrain code et ajoutez un champ comment:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end

La taille de la page des données de sortie (le nombre d'enregistrements dans une partie) est déterminée par la variable page_size. Dans l'exemple la valeur page_size est 5. Dans un programme réel, la taille de la page est généralement plus importante. Cela dépend de la taille moyenne du tuple spatial. La taille de page optimale peut être déterminée empiriquement en mesurant le temps de transfert de données. Plus la taille de la page est grande, plus le nombre d’allers-retours entre les côtés expéditeur et récepteur est petit. De cette façon, vous pouvez réduire le temps global de téléchargement des modifications. Cependant, si la taille de la page est trop grande, nous passerons trop de temps sur le serveur à sérialiser l'échantillon. En conséquence, il peut y avoir des retards dans le traitement des autres requêtes arrivant au serveur. Paramètre page_size peut être chargé à partir du fichier de configuration. Pour chaque espace transmis, vous pouvez définir sa propre valeur. Toutefois, pour la plupart des espaces, la valeur par défaut (par exemple, 100) peut convenir.

Exécutons la fonction get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...

Prenons la valeur du champ row_ver à partir de la dernière ligne et appelez à nouveau la fonction :

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...

Et encore:

tarantool> get_goods(8)
---
- []
...

Comme vous pouvez le voir, lorsqu'elle est utilisée de cette manière, la fonction renvoie tous les enregistrements d'espace page par page. goods. La dernière page est suivie d'une sélection vide.

Apportons des modifications à l'espace :

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

Nous avons modifié la valeur du champ name pour une entrée et ajouté deux nouvelles entrées.

Répétons le dernier appel de fonction :

tarantool> get_goods(8)
---



- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...

La fonction a renvoyé les enregistrements modifiés et ajoutés. Donc la fonction get_goods permet de recevoir des données modifiées depuis son dernier appel, qui constitue la base de la méthode de réplication considérée.

Nous laisserons la délivrance des résultats via HTTP sous forme de JSON hors du cadre de cet article. Vous pouvez lire à ce sujet ici : https://habr.com/ru/company/mailru/blog/272141/

Implémentation de la partie client/esclave

Regardons à quoi ressemble la mise en œuvre du côté récepteur. Créons un espace côté réception pour stocker les données téléchargées :

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

La structure de l'espace ressemble à la structure de l'espace dans la source. Mais comme nous n'allons transmettre les données reçues nulle part ailleurs, la colonne row_ver n'est pas dans l'espace du destinataire. Sur le terrain id les identifiants de source seront enregistrés. Par conséquent, du côté du récepteur, il n’est pas nécessaire de le rendre auto-incrémenté.

De plus, nous avons besoin d'un espace pour sauvegarder les valeurs row_ver:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'

        },
        {
            name = 'value',
            type = 'string'

        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Pour chaque espace chargé (champ space_name) nous enregistrerons ici la dernière valeur chargée row_ver (champ value). La colonne fait office de clé primaire space_name.

Créons une fonction pour charger les données spatiales goods via HTTP. Pour ce faire, nous avons besoin d'une bibliothèque qui implémente un client HTTP. La ligne suivante charge la bibliothèque et instancie le client HTTP :

local http_client = require('http.client').new()

Nous avons également besoin d'une bibliothèque pour la désérialisation json :

local json = require('json')

Cela suffit pour créer une fonction de chargement de données :

local function load_data(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end

La fonction exécute une requête HTTP à l'adresse URL et l'envoie row_ver en paramètre et renvoie le résultat désérialisé de la requête.

La fonction de sauvegarde des données reçues ressemble à ceci :

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put(
                obj.id, obj.name, obj.code)
        end
    end)
end

Cycle de sauvegarde des données dans l'espace goods placé dans une transaction (la fonction est utilisée pour cela box.atomic) pour réduire le nombre d'opérations sur le disque.

Enfin, la fonction de synchronisation spatiale locale goods avec une source, vous pouvez l'implémenter comme ceci :

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    —— set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end

Nous lisons d’abord la valeur précédemment enregistrée row_ver pour l'espace goods. S'il manque (la première séance d'échange), alors nous le prenons comme row_ver zéro. Ensuite, dans le cycle, nous effectuons un téléchargement page par page des données modifiées à partir de la source à l'URL spécifiée. A chaque itération, nous sauvegardons les données reçues dans l'espace local approprié et mettons à jour la valeur row_ver (dans l'espace row_ver et dans la variable row_ver) - prendre la valeur row_ver à partir de la dernière ligne de données chargées.

Pour se protéger contre un bouclage accidentel (en cas d'erreur dans le programme), la boucle while peut être remplacé par for:

for _ = 1, max_req do ...

Suite à l'exécution de la fonction sync_goods espace goods le récepteur contiendra les dernières versions de tous les enregistrements spatiaux goods dans la source.

Évidemment, la suppression des données ne peut pas être diffusée de cette manière. Si un tel besoin existe, vous pouvez utiliser une marque de suppression. Ajouter à l'espace goods champ booléen is_deleted et au lieu de supprimer physiquement un enregistrement, nous utilisons la suppression logique - nous définissons la valeur du champ is_deleted dans le sens true. Parfois au lieu d'un champ booléen is_deleted il est plus pratique d'utiliser le terrain deleted, qui stocke la date et l'heure de la suppression logique de l'enregistrement. Après avoir effectué une suppression logique, l'enregistrement marqué pour suppression sera transféré de la source vers la destination (selon la logique décrite ci-dessus).

Séquence row_ver peut être utilisé pour transmettre des données provenant d’autres espaces : il n’est pas nécessaire de créer une séquence distincte pour chaque espace transmis.

Nous avons examiné un moyen efficace de réplication de données de haut niveau dans les applications utilisant le SGBD Tarantool.

résultats

  1. Tarantool DBMS est un produit attrayant et prometteur pour créer des applications à forte charge.
  2. La réplication de données de haut niveau présente de nombreux avantages par rapport à la réplication de bas niveau.
  3. La méthode de réplication de haut niveau abordée dans l'article vous permet de minimiser la quantité de données transférées en transférant uniquement les enregistrements modifiés depuis la dernière session d'échange.

Source: habr.com

Ajouter un commentaire