Репликация на високо ниво в СУБД на Tarantool

Здравейте, създавам приложения за СУБД Tarantool е платформа, разработена от Mail.ru Group, която съчетава високопроизводителна СУБД и сървър за приложения на езика Lua. Високата скорост на решенията, базирани на Tarantool, се постига по-специално благодарение на поддръжката на режима в паметта на СУБД и възможността за изпълнение на бизнес логиката на приложението в едно адресно пространство с данни. В същото време устойчивостта на данните се осигурява чрез ACID транзакции (на диска се поддържа WAL дневник). Tarantool има вградена поддръжка за репликация и шардинг. От версия 2.1 се поддържат заявки на SQL език. Tarantool е с отворен код и е лицензиран под лиценза Simplified BSD. Има и търговска версия Enterprise.

Репликация на високо ниво в СУБД на Tarantool
Почувствайте силата! (...насладете се на изпълнението)

Всичко по-горе прави Tarantool привлекателна платформа за създаване на приложения с високо натоварване, които работят с бази данни. В такива приложения често има нужда от репликация на данни.

Както бе споменато по-горе, Tarantool има вградена репликация на данни. Принципът на неговата работа е последователно да изпълнява върху реплики всички транзакции, съдържащи се в главния дневник (WAL). Обикновено такава репликация (по-нататък ще я наричаме ниско ниво) се използва за осигуряване на устойчивост на грешки на приложението и/или за разпределяне на натоварването при четене между възлите на клъстера.

Репликация на високо ниво в СУБД на Tarantool
Ориз. 1. Репликация в рамките на клъстер

Пример за алтернативен сценарий би бил прехвърлянето на данни, създадени в една база данни, в друга база данни за обработка/мониторинг. В последния случай по-удобно решение може да бъде използването високо ниво репликация - репликация на данни на ниво бизнес логика на приложението. Тези. Ние не използваме готово решение, вградено в СУБД, а реализираме репликация сами в рамките на приложението, което разработваме. Този подход има както предимства, така и недостатъци. Нека изброим предимствата.

1. Спестяване на трафик:

  • Не можете да прехвърлите всички данни, а само част от тях (например можете да прехвърлите само някои таблици, някои техни колони или записи, които отговарят на определен критерий);
  • За разлика от репликацията на ниско ниво, която се извършва непрекъснато в асинхронен (имплементиран в текущата версия на Tarantool - 1.10) или синхронен (ще бъде внедрен в следващите версии на Tarantool) режим, репликацията на високо ниво може да се извършва в сесии (т.е. приложението първо синхронизира данните - сесия за обмен на данни, след това има пауза в репликацията, след което се появява следващата сесия за обмен и т.н.);
  • ако даден запис се е променил няколко пъти, можете да прехвърлите само последната му версия (за разлика от репликацията на ниско ниво, при която всички промени, направени на главния запис, ще се възпроизвеждат последователно върху репликите).

2. Няма трудности при внедряването на HTTP обмен, което ви позволява да синхронизирате отдалечени бази данни.

Репликация на високо ниво в СУБД на Tarantool
Ориз. 2. Репликация през HTTP

3. Не е задължително структурите на базите данни, между които се прехвърлят данни, да са еднакви (нещо повече, в общия случай дори е възможно да се използват различни СУБД, езици за програмиране, платформи и т.н.).

Репликация на високо ниво в СУБД на Tarantool
Ориз. 3. Репликация в хетерогенни системи

Недостатъкът е, че средно програмирането е по-трудно/скъпо от конфигурацията и вместо да персонализирате вградената функционалност, ще трябва да внедрите своя собствена.

Ако във вашата ситуация горните предимства са от решаващо значение (или са необходимо условие), тогава има смисъл да използвате репликация на високо ниво. Нека разгледаме няколко начина за внедряване на репликация на данни на високо ниво в СУБД на Tarantool.

Минимизиране на трафика

И така, едно от предимствата на репликацията на високо ниво е спестяването на трафик. За да се реализира напълно това предимство, е необходимо да се минимизира количеството на прехвърляните данни по време на всяка сесия на обмен. Разбира се, не трябва да забравяме, че в края на сесията приемникът на данни трябва да бъде синхронизиран с източника (поне за тази част от данните, която участва в репликацията).

Как да минимизирам количеството прехвърлени данни по време на репликация на високо ниво? Едно просто решение може да бъде да изберете данни по дата и час. За да направите това, можете да използвате полето за дата-час, което вече съществува в таблицата (ако съществува). Например документ „поръчка“ може да има поле „необходимо време за изпълнение на поръчката“ - delivery_time. Проблемът с това решение е, че стойностите в това поле не трябва да са в последователността, която съответства на създаването на поръчки. Така че не можем да си спомним максималната стойност на полето delivery_time, предадени по време на предишната сесия на обмен, а по време на следващата сесия на обмен изберете всички записи с по-висока стойност на полето delivery_time. Записи с по-ниска стойност на полето може да са добавени между сесиите на обмен delivery_time. Освен това редът може да е претърпял промени, които обаче не са засегнали полето delivery_time. И в двата случая промените няма да бъдат прехвърлени от източника към дестинацията. За да разрешим тези проблеми, ще трябва да прехвърлим данни „припокриващи се“. Тези. във всяка сесия на обмен ще прехвърляме всички данни със стойността на полето delivery_time, надхвърлящ някаква точка в миналото (например N часа от текущия момент). Очевидно е обаче, че за големи системи този подход е силно излишен и може да намали спестяването на трафик, към което се стремим, до нищо. Освен това таблицата, която се прехвърля, може да няма поле, свързано с дата-час.

Друго решение, по-сложно от гледна точка на изпълнение, е да се потвърди получаването на данни. В този случай по време на всяка сесия на обмен се предават всички данни, чието получаване не е потвърдено от получателя. За да приложите това, ще трябва да добавите булева колона към изходната таблица (например, is_transferred). Ако получателят потвърди получаването на записа, съответното поле приема стойността true, след което записът вече не участва в борси. Този вариант на изпълнение има следните недостатъци. Първо, за всеки прехвърлен запис трябва да се генерира и изпрати потвърждение. Грубо казано, това може да бъде сравнимо с удвояване на количеството прехвърлени данни и да доведе до удвояване на броя на двупосочните пътувания. Второ, няма възможност за изпращане на един и същи запис до няколко получателя (първият получател, който получи, ще потвърди получаването за себе си и за всички останали).

Метод, който няма посочените по-горе недостатъци, е добавянето на колона към предаваната таблица за проследяване на промените в нейните редове. Такава колона може да бъде от тип дата-час и трябва да бъде зададена/актуализирана от приложението до текущия час всеки път, когато се добавят/променят записи (атомично с добавянето/промяната). Като пример, нека наречем колоната update_time. Като запазим максималната стойност на полето на тази колона за прехвърлените записи, можем да започнем следващата сесия на обмен с тази стойност (изберете записи със стойността на полето update_time, надвишаваща предварително съхранената стойност). Проблемът с последния подход е, че промените в данните могат да се извършват на партиди. В резултат на стойностите на полето в колоната update_time може да не е уникален. По този начин тази колона не може да се използва за изходни данни (страница по страница). За да показвате данните страница по страница, ще трябва да измислите допълнителни механизми, които най-вероятно ще имат много ниска ефективност (например извличане от базата данни на всички записи със стойността update_time по-висока от дадена и създава определен брой записи, започвайки от определено отместване от началото на извадката).

Можете да подобрите ефективността на преноса на данни, като подобрите леко предишния подход. За да направим това, ще използваме целочислен тип (дълго цяло число) като стойности на полето на колоната за проследяване на промените. Да назовем колоната row_ver. Стойността на полето на тази колона все още трябва да се задава/актуализира всеки път, когато се създава/модифицира запис. Но в този случай на полето няма да бъде присвоена текущата дата-час, а стойността на някакъв брояч, увеличена с единица. В резултат на това колоната row_ver ще съдържа уникални стойности и може да се използва не само за показване на „делта“ данни (данни, добавени/променени от края на предишната сесия на обмен), но също така и за просто и ефективно разбиване на страници.

Последният предложен метод за минимизиране на количеството прехвърлени данни в рамките на репликация на високо ниво ми се струва най-оптималният и универсален. Нека го разгледаме по-подробно.

Предаване на данни с помощта на брояч на версия на ред

Внедряване на сървърната/главната част

В MS SQL Server има специален тип колона за прилагане на този подход - rowversion. Всяка база данни има брояч, който се увеличава с единица всеки път, когато се добави/промени запис в таблица, която има колона като rowversion. Стойността на този брояч автоматично се присвоява на полето на тази колона в добавения/променен запис. СУБД на Tarantool няма подобен вграден механизъм. В Tarantool обаче не е трудно да го внедрите ръчно. Нека да разгледаме как се прави това.

Първо, малко терминология: таблиците в Tarantool се наричат ​​пространства, а записите се наричат ​​кортежи. В Tarantool можете да създавате поредици. Последователностите не са нищо повече от именувани генератори на подредени цели числа. Тези. това е точно това, от което се нуждаем за нашите цели. По-долу ще създадем такава последователност.

Преди да извършите каквато и да е операция с база данни в Tarantool, трябва да изпълните следната команда:

box.cfg{}

В резултат на това Tarantool ще започне да записва моментни снимки на база данни и журнали на транзакции в текущата директория.

Нека създадем последователност row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })

Вариант if_not_exists позволява скриптът за създаване да бъде изпълнен многократно: ако обектът съществува, Tarantool няма да се опита да го създаде отново. Тази опция ще се използва във всички следващи DDL команди.

Нека създадем пространство като пример.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        },
        {
            name = 'row_ver',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

Тук задаваме името на пространството (goods), имена на полета и техните типове.

Автоматично увеличаващите се полета в Tarantool също се създават с помощта на последователности. Нека създадем автоматично увеличаващ се първичен ключ по поле id:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Tarantool поддържа няколко вида индекси. Най-често използваните индекси са типове TREE и HASH, които се основават на структури, съответстващи на името. TREE е най-универсалният тип индекс. Тя ви позволява да извличате данни по организиран начин. Но за избор на равенство, HASH е по-подходящ. Съответно е препоръчително да използвате HASH за първичен ключ (което и направихме).

За да използвате колоната row_ver за да прехвърлите променени данни, трябва да свържете стойностите на последователността към полетата на тази колона row_ver. Но за разлика от първичния ключ, стойността на полето на колоната row_ver трябва да се увеличи с единица не само при добавяне на нови записи, но и при промяна на съществуващи. Можете да използвате тригери за това. Tarantool има два вида задействания за пространство: before_replace и on_replace. Тригерите се задействат всеки път, когато данните в пространството се променят (за всеки кортеж, засегнат от промените, се стартира тригерна функция). За разлика от on_replace, before_replace-тригерите ви позволяват да променяте данните на кортежа, за който се изпълнява тригерът. Съответно, последният тип тригери ни подхожда.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)

Следният тригер замества стойността на полето row_ver съхранен кортеж към следващата стойност на последователността row_version.

За да може да извлича данни от космоса goods по колона row_ver, нека създадем индекс:

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})

Тип индекс - дърво (TREE), защото ще трябва да извлечем данните във възходящ ред на стойностите в колоната row_ver.

Нека добавим малко данни към пространството:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

защото Първото поле е автоматично увеличаващ се брояч; вместо това предаваме нула. Tarantool автоматично ще замени следващата стойност. По същия начин, като стойността на полетата на колоната row_ver можете да подадете нула - или изобщо да не посочите стойността, защото тази колона заема последната позиция в пространството.

Нека проверим резултата от вмъкването:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...

Както можете да видите, първото и последното поле се попълват автоматично. Сега ще бъде лесно да напишете функция за качване страница по страница на промените в пространството goods:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end

Функцията приема като параметър стойността row_ver, започвайки от който е необходимо да се разтоварят промените, и връща част от променените данни.

Извадката от данни в Tarantool се извършва чрез индекси. функция get_goods използва итератор по индекс row_ver за получаване на променени данни. Типът на итератора е GT (Greater Than, по-голямо от). Това означава, че итераторът последователно ще премине през стойностите на индекса, започвайки от подаден ключ (стойност на полето row_ver).

Итераторът връща кортежи. За да може впоследствие да се прехвърлят данни през HTTP, е необходимо да се преобразуват кортежите в структура, удобна за последваща сериализация. Примерът използва стандартната функция за това tomap. Вместо да използвате tomap можете да напишете своя собствена функция. Например, може да искаме да преименуваме поле name, не подминавайте полето code и добавете поле comment:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end

Размерът на страницата на изходните данни (броят на записите в една част) се определя от променливата page_size. В примера стойността page_size е 5. В реална програма размерът на страницата обикновено е по-важен. Зависи от средния размер на космическия кортеж. Оптималният размер на страницата може да се определи емпирично чрез измерване на времето за трансфер на данни. Колкото по-голям е размерът на страницата, толкова по-малък е броят на обръщенията между изпращащата и получаващата страна. По този начин можете да намалите общото време за изтегляне на промените. Ако обаче размерът на страницата е твърде голям, ние ще прекараме твърде много време на сървъра, сериализирайки извадката. В резултат на това може да има забавяне при обработката на други заявки, идващи към сървъра. Параметър page_size може да се зареди от конфигурационния файл. За всяко предавано пространство можете да зададете собствена стойност. За повечето интервали обаче стойността по подразбиране (например 100) може да е подходяща.

Нека изпълним функцията get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...

Нека вземем стойността на полето row_ver от последния ред и отново извикайте функцията:

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...

Още веднъж:

tarantool> get_goods(8)
---
- []
...

Както можете да видите, когато се използва по този начин, функцията връща всички записи за пространство страница по страница goods. Последната страница е последвана от празна селекция.

Нека направим промени в пространството:

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

Променихме стойността на полето name за един запис и добави два нови записа.

Нека повторим последното извикване на функция:

tarantool> get_goods(8)
---



- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...

Функцията върна променените и добавени записи. Така че функцията get_goods ви позволява да получавате данни, които са се променили от последното му извикване, което е в основата на разглеждания метод на репликация.

Ще оставим издаването на резултати чрез HTTP под формата на JSON извън обхвата на тази статия. Можете да прочетете за това тук: https://habr.com/ru/company/mailru/blog/272141/

Внедряване на част клиент/подчинен

Нека да разгледаме как изглежда изпълнението на приемащата страна. Нека създадем пространство от приемащата страна за съхраняване на изтеглените данни:

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Структурата на пространството наподобява структурата на пространството в източника. Но тъй като няма да предаваме получените данни никъде другаде, колоната row_ver не е в пространството на получателя. В полето id идентификаторите на източника ще бъдат записани. Следователно от страната на приемника няма нужда да се прави автоматично нарастване.

Освен това се нуждаем от пространство за запазване на стойности row_ver:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'

        },
        {
            name = 'value',
            type = 'string'

        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

За всяко заредено пространство (поле space_name) ще запазим последната заредена стойност тук row_ver (поле value). Колоната действа като първичен ключ space_name.

Нека създадем функция за зареждане на данни за пространство goods чрез HTTP. За да направим това, имаме нужда от библиотека, която внедрява HTTP клиент. Следният ред зарежда библиотеката и инстанцира HTTP клиента:

local http_client = require('http.client').new()

Нуждаем се също от библиотека за json десериализация:

local json = require('json')

Това е достатъчно, за да създадете функция за зареждане на данни:

local function load_data(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end

Функцията изпълнява HTTP заявка към url адреса и я изпраща row_ver като параметър и връща десериализирания резултат от заявката.

Функцията за запазване на получените данни изглежда така:

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put(
                obj.id, obj.name, obj.code)
        end
    end)
end

Цикъл на записване на данни в пространството goods поставени в транзакция (функцията се използва за това box.atomic), за да намалите броя на дисковите операции.

И накрая, функцията за синхронизиране на локалното пространство goods с източник можете да го реализирате по следния начин:

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    —— set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end

Първо четем предварително записаната стойност row_ver за пространство goods. Ако липсва (първата сесия на обмен), тогава го приемаме като row_ver нула. След това в цикъла извършваме изтегляне страница по страница на променените данни от източника на посочения URL адрес. При всяка итерация запазваме получените данни в съответното локално пространство и актуализираме стойността row_ver (в космоса row_ver и в променливата row_ver) - вземете стойността row_ver от последния ред на заредените данни.

За да се предпази от случайно зацикляне (в случай на грешка в програмата), цикълът while може да се замени с for:

for _ = 1, max_req do ...

В резултат на изпълнение на функцията sync_goods пространство goods приемникът ще съдържа най-новите версии на всички космически записи goods в източника.

Очевидно изтриването на данни не може да се излъчва по този начин. Ако има такава необходимост, можете да използвате знак за изтриване. Добавяне към пространството goods булево поле is_deleted и вместо физическо изтриване на запис, използваме логическо изтриване - задаваме стойността на полето is_deleted в смисъл true. Понякога вместо булево поле is_deleted по-удобно е да използвате полето deleted, който съхранява датата-час на логическото изтриване на записа. След извършване на логическо изтриване, записът, маркиран за изтриване, ще бъде прехвърлен от източника към дестинацията (според логиката, обсъдена по-горе).

последователност row_ver може да се използва за предаване на данни от други пространства: няма нужда да се създава отделна последователност за всяко предавано пространство.

Разгледахме ефективен начин за репликация на данни на високо ниво в приложения, използващи СУБД на Tarantool.

Данни

  1. Tarantool DBMS е атрактивен, обещаващ продукт за създаване на приложения с високо натоварване.
  2. Репликацията на данни на високо ниво има редица предимства пред репликацията на ниско ниво.
  3. Методът за репликация на високо ниво, разгледан в статията, ви позволява да минимизирате количеството прехвърлени данни, като прехвърлите само онези записи, които са се променили след последната сесия на обмен.

Източник: www.habr.com

Добавяне на нов коментар