Replikace na vysoké úrovni v Tarantool DBMS

Dobrý den, vytvářím aplikace pro DBMS Tarantool je platforma vyvinutá společností Mail.ru Group, která kombinuje vysoce výkonný DBMS a aplikační server v jazyce Lua. Vysoké rychlosti řešení založených na Tarantool je dosaženo zejména díky podpoře in-memory režimu DBMS a schopnosti spouštět aplikační obchodní logiku v jediném adresním prostoru s daty. Zároveň je zajištěna perzistence dat pomocí ACID transakcí (na disku je udržován WAL log). Tarantool má vestavěnou podporu pro replikaci a sharding. Od verze 2.1 jsou podporovány dotazy v jazyce SQL. Tarantool je open source a je licencován pod licencí Simplified BSD. K dispozici je také komerční verze Enterprise.

Replikace na vysoké úrovni v Tarantool DBMS
Cítit sílu! (...také užijte si představení)

Díky všemu výše uvedenému je Tarantool atraktivní platformou pro vytváření vysoce zatěžovaných aplikací, které pracují s databázemi. V takových aplikacích je často potřeba replikace dat.

Jak bylo uvedeno výše, Tarantool má vestavěnou replikaci dat. Principem jeho fungování je sekvenční provádění na replikách všech transakcí obsažených v hlavním protokolu (WAL). Obvykle taková replikace (dále ji budeme nazývat nízká úroveň) se používá k zajištění odolnosti proti chybám aplikace a/nebo k rozložení zátěže čtení mezi uzly clusteru.

Replikace na vysoké úrovni v Tarantool DBMS
Rýže. 1. Replikace v rámci klastru

Příkladem alternativního scénáře by byl přenos dat vytvořených v jedné databázi do jiné databáze pro zpracování/monitorování. V druhém případě může být pohodlnějším řešením použití vysoká úroveň replikace - replikace dat na úrovni aplikační obchodní logiky. Tito. Nepoužíváme hotové řešení zabudované v DBMS, ale implementujeme replikaci sami v rámci aplikace, kterou vyvíjíme. Tento přístup má výhody i nevýhody. Pojďme si vyjmenovat výhody.

1. Úspora provozu:

  • Nelze přenést všechna data, ale pouze jejich část (můžete například přenést jen některé tabulky, některé jejich sloupce nebo záznamy, které splňují určité kritérium);
  • Na rozdíl od nízkoúrovňové replikace, která se provádí nepřetržitě v asynchronním (implementovaném v aktuální verzi Tarantool - 1.10) nebo synchronním (bude implementována v následujících verzích Tarantoolu) režimu, lze replikaci na vysoké úrovni provádět v relacích (tj. aplikace nejprve synchronizuje data - výměna dat relace, poté dojde k pauze v replikaci, po které dojde k další výměnné relaci atd.);
  • pokud se záznam několikrát změnil, můžete přenést pouze jeho nejnovější verzi (na rozdíl od nízkoúrovňové replikace, ve které se všechny změny provedené na masteru přehrají postupně na replikách).

2. Nejsou žádné potíže s implementací výměny HTTP, která umožňuje synchronizovat vzdálené databáze.

Replikace na vysoké úrovni v Tarantool DBMS
Rýže. 2. Replikace přes HTTP

3. Databázové struktury, mezi kterými se data přenášejí, nemusí být stejné (navíc v obecném případě je dokonce možné použít různé DBMS, programovací jazyky, platformy atd.).

Replikace na vysoké úrovni v Tarantool DBMS
Rýže. 3. Replikace v heterogenních systémech

Nevýhodou je, že v průměru je programování obtížnější/nákladnější než konfigurace a místo přizpůsobení vestavěné funkcionality budete muset implementovat vlastní.

Pokud jsou ve vaší situaci výše uvedené výhody zásadní (nebo jsou nezbytnou podmínkou), pak má smysl používat replikaci na vysoké úrovni. Podívejme se na několik způsobů, jak implementovat replikaci dat na vysoké úrovni v Tarantool DBMS.

Minimalizace dopravy

Jednou z výhod replikace na vysoké úrovni je tedy úspora provozu. Aby byla tato výhoda plně realizována, je nutné minimalizovat množství přenášených dat během každé výměnné relace. Samozřejmě bychom neměli zapomínat, že na konci relace musí být přijímač dat synchronizován se zdrojem (alespoň pro tu část dat, která se účastní replikace).

Jak minimalizovat množství dat přenášených během replikace na vysoké úrovni? Přímým řešením může být výběr dat podle data a času. Chcete-li to provést, můžete použít pole datum-čas, které již v tabulce existuje (pokud existuje). Například dokument „objednávka“ může mít pole „požadovaný čas provedení objednávky“ - delivery_time. Problém tohoto řešení je v tom, že hodnoty v tomto poli nemusí být v pořadí, které odpovídá vytváření objednávek. Takže si nemůžeme vzpomenout na maximální hodnotu pole delivery_time, přenášené během předchozí relace výměny a během následující relace výměny vyberte všechny záznamy s vyšší hodnotou pole delivery_time. Mezi výměnnými relacemi mohly být přidány záznamy s nižší hodnotou pole delivery_time. Také pořadí mohlo doznat změn, které se nicméně oboru nedotkly delivery_time. V obou případech se změny nepřenesou ze zdroje do cíle. K vyřešení těchto problémů budeme muset přenášet data „překrývající se“. Tito. v každé výměnné relaci přeneseme všechna data s hodnotou pole delivery_time, přesahující nějaký bod v minulosti (například N hodin od aktuálního okamžiku). Je však zřejmé, že pro velké systémy je tento přístup vysoce nadbytečný a může snížit úspory provozu, o které usilujeme, na nic. Přenášená tabulka navíc nemusí mít pole spojené s datem a časem.

Dalším implementačně složitějším řešením je potvrzování příjmu dat. V tomto případě jsou během každé výměnné relace přenášena všechna data, jejichž příjem nebyl potvrzen příjemcem. Chcete-li to implementovat, budete muset do zdrojové tabulky přidat booleovský sloupec (např. is_transferred). Pokud příjemce potvrdí přijetí záznamu, odpovídající pole převezme hodnotu true, po kterém již vstup není zapojen do výměn. Tato možnost implementace má následující nevýhody. Nejprve je nutné pro každý přenesený záznam vygenerovat a odeslat potvrzení. Zhruba řečeno by to mohlo být srovnatelné se zdvojnásobením objemu přenesených dat a vedoucím ke zdvojnásobení počtu zpátečních cest. Za druhé neexistuje možnost poslat stejný záznam několika příjemcům (první příjemce potvrdí příjem sobě i všem ostatním).

Metodou, která nemá výše uvedené nevýhody, je přidání sloupce do přenášené tabulky pro sledování změn v jejích řádcích. Takový sloupec může být typu datum a čas a musí být aplikací nastaven/aktualizován na aktuální čas při každém přidání/změně záznamů (atomicky s přidáním/změnou). Jako příklad nazvěme sloupec update_time. Uložením maximální hodnoty pole tohoto sloupce pro přenesené záznamy můžeme zahájit další výměnnou relaci s touto hodnotou (vyberte záznamy s hodnotou pole update_time, překračující dříve uloženou hodnotu). Problém s druhým přístupem je, že ke změnám dat může docházet v dávkách. V důsledku hodnot polí ve sloupci update_time nemusí být jedinečné. Tento sloupec tedy nelze použít pro výstup dat po částech (po jednotlivých stránkách). Chcete-li zobrazit data stránku po stránce, budete muset vymyslet další mechanismy, které budou mít s největší pravděpodobností velmi nízkou efektivitu (například načíst z databáze všechny záznamy s hodnotou update_time vyšší než daný a produkuje určitý počet záznamů, počínaje určitým posunem od začátku vzorku).

Mírným vylepšením předchozího přístupu můžete zlepšit efektivitu přenosu dat. K tomu použijeme typ celé číslo (long integer) jako hodnoty pole sloupců pro sledování změn. Pojmenujme sloupec row_ver. Hodnota pole tohoto sloupce musí být stále nastavena/aktualizována při každém vytvoření/změně záznamu. Ale v tomto případě nebude poli přiřazen aktuální datum a čas, ale hodnota nějakého čítače zvýšená o jedničku. V důsledku toho sloupec row_ver bude obsahovat jedinečné hodnoty a lze je použít nejen k zobrazení „delta“ dat (data přidaná/změněná od konce předchozí výměnné relace), ale také k jejich jednoduchému a efektivnímu rozdělení na stránky.

Poslední navrhovaný způsob minimalizace množství přenášených dat v rámci vysokoúrovňové replikace se mi jeví jako nejoptimálnější a nejuniverzálnější. Pojďme se na to podívat podrobněji.

Předávání dat pomocí počítadla verzí řádku

Implementace serverové/master části

V MS SQL Server existuje speciální typ sloupce pro implementaci tohoto přístupu - rowversion. Každá databáze má čítač, který se zvýší o jedničku pokaždé, když je záznam přidán/změněn v tabulce, která má sloupec podobný rowversion. Hodnota tohoto počítadla se automaticky přiřadí do pole tohoto sloupce v přidaném/změněném záznamu. Tarantool DBMS nemá podobný vestavěný mechanismus. V Tarantool to však není těžké implementovat ručně. Podívejme se, jak se to dělá.

Nejprve trocha terminologie: tabulky v Tarantoolu se nazývají mezery a záznamy se nazývají n-tice. V Tarantool můžete vytvářet sekvence. Sekvence nejsou nic jiného než pojmenované generátory uspořádaných celočíselných hodnot. Tito. to je přesně to, co potřebujeme pro naše účely. Níže takovou sekvenci vytvoříme.

Před provedením jakékoli operace databáze v Tarantool musíte spustit následující příkaz:

box.cfg{}

Výsledkem je, že Tarantool začne zapisovat databázové snímky a transakční protokoly do aktuálního adresáře.

Vytvořme sekvenci row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })

Možnost if_not_exists umožňuje spuštění skriptu pro vytvoření vícekrát: pokud objekt existuje, Tarantool se jej znovu nepokusí vytvořit. Tato možnost bude použita ve všech následujících příkazech DDL.

Vytvořme si prostor jako příklad.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        },
        {
            name = 'row_ver',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

Zde nastavíme název prostoru (goods), názvy polí a jejich typy.

Auto-inkrementační pole v Tarantool jsou také vytvořena pomocí sekvencí. Vytvořme automaticky se zvyšující primární klíč podle pole id:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Tarantool podporuje několik typů indexů. Nejčastěji používané indexy jsou typy TREE a HASH, které jsou založeny na strukturách odpovídajících názvu. TREE je nejuniverzálnější typ indexu. Umožňuje vám získávat data organizovaným způsobem. Ale pro výběr rovnosti je vhodnější HASH. V souladu s tím je vhodné použít HASH pro primární klíč (což jsme udělali).

Chcete-li použít sloupec row_ver pro přenos změněných dat je třeba svázat sekvenční hodnoty do polí v tomto sloupci row_ver. Ale na rozdíl od primárního klíče, hodnota pole sloupce row_ver by se měla zvýšit o jedničku nejen při přidávání nových záznamů, ale i při změně stávajících. K tomu můžete použít spouštěče. Tarantool má dva typy vesmírných spouštěčů: before_replace и on_replace. Spouštěče se spouštějí vždy, když se změní data v prostoru (pro každou n-tici ovlivněnou změnami se spustí funkce spouštění). Na rozdíl od on_replace, before_replace-triggers vám umožní upravit data n-tice, pro kterou je spouštěč spuštěn. Podle toho nám vyhovuje poslední typ spouštěčů.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)

Následující spouštěč nahrazuje hodnotu pole row_ver uloženou n-tici na další hodnotu sekvence row_version.

Aby bylo možné extrahovat data z vesmíru goods podle sloupce row_ver, vytvoříme index:

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})

Typ indexu - strom (TREE), protože budeme muset extrahovat data ve vzestupném pořadí hodnot ve sloupci row_ver.

Přidejme do prostoru nějaká data:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

Protože První pole je automaticky se zvyšující počítadlo, místo toho předáme nulu. Tarantool automaticky nahradí další hodnotu. Podobně jako hodnota polí sloupců row_ver můžete předat nulu - nebo vůbec neuvádět hodnotu, protože tento sloupec zaujímá poslední pozici v prostoru.

Zkontrolujeme výsledek vložení:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...

Jak vidíte, první a poslední pole se vyplňují automaticky. Nyní bude snadné napsat funkci pro nahrávání změn prostoru stránku po stránce goods:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end

Funkce bere jako parametr hodnotu row_ver, počínaje, ze kterého je nutné vyjmout změny, a vrátí část změněných dat.

Vzorkování dat v Tarantool se provádí pomocí indexů. Funkce get_goods používá iterátor podle indexu row_ver přijímat změněná data. Typ iterátoru je GT (Větší než, větší než). To znamená, že iterátor bude postupně procházet hodnoty indexu počínaje předaným klíčem (hodnota pole row_ver).

Iterátor vrací n-tice. Aby bylo možné následně přenášet data přes HTTP, je nutné převést n-tice do struktury vhodné pro následnou serializaci. Příklad k tomu používá standardní funkci tomap. Místo použití tomap můžete napsat svou vlastní funkci. Můžeme například chtít přejmenovat pole name, neprojděte pole code a přidejte pole comment:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end

Velikost stránky výstupních dat (počet záznamů v jedné části) je určena proměnnou page_size. V příkladu hodnota page_size je 5. Ve skutečném programu obvykle více záleží na velikosti stránky. Záleží na průměrné velikosti vesmírné n-tice. Optimální velikost stránky lze určit empiricky měřením doby přenosu dat. Čím větší je velikost stránky, tím menší je počet zpátečních cest mezi odesílající a přijímající stranou. Tímto způsobem můžete zkrátit celkovou dobu stahování změn. Pokud je však velikost stránky příliš velká, strávíme na serveru serializací vzorku příliš dlouho. V důsledku toho může dojít ke zpoždění při zpracování dalších požadavků přicházejících na server. Parametr page_size lze načíst z konfiguračního souboru. Pro každý přenášený prostor můžete nastavit jeho vlastní hodnotu. Pro většinu prostorů však může být vhodná výchozí hodnota (například 100).

Proveďme funkci get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...

Vezměme hodnotu pole row_ver z posledního řádku a zavolejte funkci znovu:

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...

Ještě jednou:

tarantool> get_goods(8)
---
- []
...

Jak můžete vidět, při použití tímto způsobem funkce vrací všechny záznamy o prostoru stránku po stránce goods. Po poslední stránce následuje prázdný výběr.

Provedeme změny v prostoru:

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

Změnili jsme hodnotu pole name pro jeden záznam a přidal dva nové záznamy.

Zopakujeme poslední volání funkce:

tarantool> get_goods(8)
---



- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...

Funkce vrátila změněné a přidané záznamy. Takže funkce get_goods umožňuje přijímat data, která se od posledního volání změnila, což je základem uvažované metody replikace.

Vydávání výsledků přes HTTP ve formě JSON necháme mimo rámec tohoto článku. Můžete si o tom přečíst zde: https://habr.com/ru/company/mailru/blog/272141/

Realizace klientské/slave části

Podívejme se, jak vypadá implementace přijímající strany. Na přijímací straně vytvoříme prostor pro uložení stažených dat:

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Struktura prostoru připomíná strukturu prostoru ve zdroji. Protože ale nebudeme předávat přijatá data nikam jinam, sloupec row_ver není v prostoru příjemce. V terénu id budou zaznamenány identifikátory zdroje. Na straně přijímače tedy není potřeba provádět automatické zvyšování.

Navíc potřebujeme prostor pro uložení hodnot row_ver:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'

        },
        {
            name = 'value',
            type = 'string'

        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Pro každé načtené místo (pole space_name) zde uložíme poslední načtenou hodnotu row_ver (pole value). Sloupec funguje jako primární klíč space_name.

Vytvořme funkci pro načtení prostorových dat goods přes HTTP. K tomu potřebujeme knihovnu, která implementuje HTTP klienta. Následující řádek načte knihovnu a vytvoří instanci klienta HTTP:

local http_client = require('http.client').new()

Potřebujeme také knihovnu pro deserializaci json:

local json = require('json')

To stačí k vytvoření funkce načítání dat:

local function load_data(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end

Funkce provede HTTP požadavek na url adresu a odešle jej row_ver jako parametr a vrátí deserializovaný výsledek požadavku.

Funkce pro ukládání přijatých dat vypadá takto:

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put(
                obj.id, obj.name, obj.code)
        end
    end)
end

Cyklus ukládání dat do prostoru goods vložen do transakce (k tomu slouží funkce box.atomic), aby se snížil počet operací s diskem.

Konečně funkce místní synchronizace prostoru goods se zdrojem to můžete implementovat takto:

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    —— set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end

Nejprve načteme dříve uloženou hodnotu row_ver pro prostor goods. Pokud chybí (první výměnná relace), bereme to jako row_ver nula. Dále v cyklu provedeme stahování změněných dat po jednotlivých stránkách ze zdroje na zadané adrese URL. Při každé iteraci ukládáme přijatá data do příslušného lokálního prostoru a aktualizujeme hodnotu row_ver (ve vesmíru row_ver a v proměnné row_ver) - vezměte hodnotu row_ver z posledního řádku načtených dat.

Pro ochranu proti náhodnému zacyklení (v případě chyby v programu) smyčka while lze nahradit for:

for _ = 1, max_req do ...

V důsledku provedení funkce sync_goods prostor goods přijímač bude obsahovat nejnovější verze všech vesmírných záznamů goods ve zdroji.

Je zřejmé, že smazání dat nelze tímto způsobem vysílat. Pokud taková potřeba existuje, můžete použít značku pro odstranění. Přidat do prostoru goods booleovské pole is_deleted a místo fyzického mazání záznamu použijeme logické mazání - nastavíme hodnotu pole is_deleted do smyslu true. Někdy místo booleovského pole is_deleted je pohodlnější použít pole deleted, který ukládá datum a čas logického smazání záznamu. Po provedení logického smazání bude záznam označený ke smazání přenesen ze zdroje do cíle (podle logiky diskutované výše).

Posloupnost row_ver lze použít pro přenos dat z jiných prostorů: není potřeba vytvářet samostatnou sekvenci pro každý přenášený prostor.

Podívali jsme se na efektivní způsob replikace dat na vysoké úrovni v aplikacích využívajících Tarantool DBMS.

Závěry

  1. Tarantool DBMS je atraktivní, slibný produkt pro vytváření aplikací s vysokou zátěží.
  2. Replikace dat na vysoké úrovni má oproti replikaci na nízké úrovni řadu výhod.
  3. Metoda replikace na vysoké úrovni popsaná v článku umožňuje minimalizovat množství přenesených dat přenosem pouze těch záznamů, které se od poslední relace výměny změnily.

Zdroj: www.habr.com

Přidat komentář