Replikering på hög nivå i Tarantool DBMS

Hej, jag skapar applikationer för DBMS Tarantool är en plattform utvecklad av Mail.ru Group som kombinerar ett högpresterande DBMS och en applikationsserver på Lua-språket. Den höga hastigheten på lösningar baserade på Tarantool uppnås, särskilt tack vare stöd för minnesläget i DBMS och möjligheten att exekvera applikationsaffärslogik i ett enda adressutrymme med data. Samtidigt säkerställs databeständighet med ACID-transaktioner (en WAL-logg upprätthålls på disken). Tarantool har inbyggt stöd för replikering och skärning. Från och med version 2.1 stöds frågor i SQL-språk. Tarantool är öppen källkod och licensieras under Simplified BSD-licensen. Det finns också en kommersiell Enterprise-version.

Replikering på hög nivå i Tarantool DBMS
Känn kraften! (...aka njut av föreställningen)

Allt ovanstående gör Tarantool till en attraktiv plattform för att skapa högbelastningsapplikationer som fungerar med databaser. I sådana applikationer finns det ofta ett behov av datareplikering.

Som nämnts ovan har Tarantool inbyggd datareplikering. Principen för dess funktion är att sekventiellt utföra på repliker alla transaktioner som finns i masterloggen (WAL). Vanligtvis sådan replikering (vi kommer att kalla det vidare låg nivå) används för att säkerställa applikationsfeltolerans och/eller för att fördela läsbelastningen mellan klusternoder.

Replikering på hög nivå i Tarantool DBMS
Ris. 1. Replikering inom ett kluster

Ett exempel på ett alternativt scenario skulle vara att överföra data som skapats i en databas till en annan databas för bearbetning/övervakning. I det senare fallet kan en mer bekväm lösning vara att använda hög nivå replikering - datareplikering på applikationens affärslogiknivå. De där. Vi använder inte en färdig lösning inbyggd i DBMS, utan implementerar replikering på egen hand inom applikationen vi utvecklar. Detta tillvägagångssätt har både fördelar och nackdelar. Låt oss lista fördelarna.

1. Trafikbesparingar:

  • Du kan inte överföra all data, utan bara en del av den (till exempel kan du överföra endast vissa tabeller, några av deras kolumner eller poster som uppfyller ett visst kriterium);
  • Till skillnad från lågnivåreplikering, som utförs kontinuerligt i asynkront (implementerat i den nuvarande versionen av Tarantool - 1.10) eller synkront (som ska implementeras i efterföljande versioner av Tarantool), kan replikering på hög nivå utföras i sessioner (d.v.s. applikationen synkroniserar först data - en utbytessessionsdata, sedan blir det en paus i replikeringen, varefter nästa utbytesession inträffar, etc.);
  • om en post har ändrats flera gånger kan du överföra endast dess senaste version (till skillnad från replikering på låg nivå, där alla ändringar som görs på mastern kommer att spelas upp sekventiellt på replikerna).

2. Det finns inga svårigheter med att implementera HTTP-utbyte, vilket gör att du kan synkronisera fjärrdatabaser.

Replikering på hög nivå i Tarantool DBMS
Ris. 2. Replikering över HTTP

3. Databasstrukturerna mellan vilka data överförs behöver inte vara desamma (dettare, i det allmänna fallet, är det till och med möjligt att använda olika DBMS, programmeringsspråk, plattformar, etc.).

Replikering på hög nivå i Tarantool DBMS
Ris. 3. Replikation i heterogena system

Nackdelen är att programmering i genomsnitt är svårare/dyrare än konfiguration, och istället för att anpassa den inbyggda funktionaliteten måste du implementera din egen.

Om ovanstående fördelar i din situation är avgörande (eller är ett nödvändigt villkor), är det vettigt att använda replikering på hög nivå. Låt oss titta på flera sätt att implementera datareplikering på hög nivå i Tarantool DBMS.

Trafikminimering

Så en av fördelarna med replikering på hög nivå är trafikbesparingar. För att denna fördel ska kunna realiseras fullt ut är det nödvändigt att minimera mängden data som överförs under varje utbytessession. Naturligtvis bör vi inte glömma att i slutet av sessionen måste datamottagaren synkroniseras med källan (åtminstone för den del av datan som är involverad i replikering).

Hur minimerar man mängden data som överförs under replikering på hög nivå? En enkel lösning kan vara att välja data efter datum och tid. För att göra detta kan du använda det datum-tid-fält som redan finns i tabellen (om det finns). Till exempel kan ett "order"-dokument ha ett fält "required order execution time" - delivery_time. Problemet med denna lösning är att värdena i detta fält inte behöver vara i den sekvens som motsvarar skapandet av beställningar. Så vi kan inte komma ihåg det maximala fältvärdet delivery_time, sänds under föregående utbytessession, och under nästa utbytessession välj alla poster med ett högre fältvärde delivery_time. Poster med ett lägre fältvärde kan ha lagts till mellan utbytessessioner delivery_time. Ordern kunde också ha genomgått förändringar, som ändå inte påverkade fältet delivery_time. I båda fallen kommer ändringarna inte att överföras från källan till destinationen. För att lösa dessa problem kommer vi att behöva överföra data "överlappande". De där. i varje utbytessession kommer vi att överföra all data med fältvärdet delivery_time, som överskrider någon punkt i det förflutna (till exempel N timmar från det aktuella ögonblicket). Det är dock uppenbart att för stora system är detta tillvägagångssätt högst redundant och kan minska de trafikbesparingar som vi strävar efter till ingenting. Dessutom kanske tabellen som överförs inte har ett fält kopplat till en datum-tid.

En annan lösning, mer komplex när det gäller implementering, är att bekräfta mottagandet av data. I detta fall, under varje utbytessession, överförs all data, vars mottagande inte har bekräftats av mottagaren. För att implementera detta måste du lägga till en boolesk kolumn i källtabellen (till exempel, is_transferred). Om mottagaren bekräftar mottagandet av posten tar motsvarande fält värdet true, varefter inträdet inte längre är involverat i utbyten. Detta implementeringsalternativ har följande nackdelar. Först måste en bekräftelse genereras och skickas för varje post som överförs. Grovt sett kan detta jämföras med att fördubbla mängden data som överförs och leda till att antalet tur- och returresor fördubblas. För det andra finns det ingen möjlighet att skicka samma post till flera mottagare (den första mottagaren som tar emot kommer att bekräfta mottagandet för sig själv och för alla andra).

En metod som inte har de ovan angivna nackdelarna är att lägga till en kolumn i den överförda tabellen för att spåra ändringar i dess rader. En sådan kolumn kan vara av datum-tid-typ och måste ställas in/uppdateras av applikationen till aktuell tid varje gång poster läggs till/ändras (atomiskt med tillägget/ändringen). Som ett exempel, låt oss kalla kolumnen update_time. Genom att spara det maximala fältvärdet för denna kolumn för de överförda posterna kan vi starta nästa utbytessession med detta värde (välj poster med fältvärdet update_time, överskrider det tidigare lagrade värdet). Problemet med det senare tillvägagångssättet är att dataförändringar kan ske i omgångar. Som ett resultat av fältvärdena i kolumnen update_time kanske inte är unik. Den här kolumnen kan alltså inte användas för portionerad (sida för sida) datautmatning. För att visa data sida för sida måste du uppfinna ytterligare mekanismer som med största sannolikhet kommer att ha mycket låg effektivitet (till exempel att hämta alla poster med värdet från databasen update_time högre än en given och producerar ett visst antal poster, med början från en viss offset från början av provet).

Du kan förbättra effektiviteten i dataöverföringen genom att förbättra det tidigare tillvägagångssättet något. För att göra detta kommer vi att använda heltalstypen (långt heltal) som kolumnfältsvärden för att spåra ändringar. Låt oss namnge kolumnen row_ver. Fältvärdet för denna kolumn måste fortfarande ställas in/uppdateras varje gång en post skapas/ändras. Men i det här fallet kommer fältet inte att tilldelas aktuell datum-tid, utan värdet på någon räknare, ökat med ett. Som ett resultat, kolumnen row_ver kommer att innehålla unika värden och kan användas inte bara för att visa "delta"-data (data som lagts till/ändrats sedan slutet av föregående utbytessession), utan också för att enkelt och effektivt dela upp dem på sidor.

Den sista föreslagna metoden för att minimera mängden data som överförs inom ramen för replikering på hög nivå förefaller mig vara den mest optimala och universella. Låt oss titta på det mer detaljerat.

Skicka data med hjälp av en radversionsräknare

Implementering av server/masterdelen

I MS SQL Server finns det en speciell kolumntyp för att implementera detta tillvägagångssätt - rowversion. Varje databas har en räknare som ökar med en varje gång en post läggs till/ändras i en tabell som har en kolumn som rowversion. Värdet på denna räknare tilldelas automatiskt till fältet i denna kolumn i den tillagda/ändrade posten. Tarantool DBMS har inte en liknande inbyggd mekanism. Men i Tarantool är det inte svårt att implementera det manuellt. Låt oss titta på hur detta görs.

Först, lite terminologi: tabeller i Tarantool kallas mellanslag och poster kallas tupler. I Tarantool kan du skapa sekvenser. Sekvenser är inget annat än namngivna generatorer av ordnade heltalsvärden. De där. det är precis vad vi behöver för våra syften. Nedan kommer vi att skapa en sådan sekvens.

Innan du utför någon databasoperation i Tarantool måste du köra följande kommando:

box.cfg{}

Som ett resultat kommer Tarantool att börja skriva databasögonblicksbilder och transaktionsloggar till den aktuella katalogen.

Låt oss skapa en sekvens row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })

alternativ if_not_exists tillåter att skapandeskriptet körs flera gånger: om objektet finns kommer Tarantool inte att försöka skapa det igen. Detta alternativ kommer att användas i alla efterföljande DDL-kommandon.

Låt oss skapa ett utrymme som ett exempel.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        },
        {
            name = 'row_ver',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

Här anger vi namnet på utrymmet (goods), fältnamn och deras typer.

Auto-inkrementerande fält i Tarantool skapas också med hjälp av sekvenser. Låt oss skapa en automatiskt ökande primärnyckel för fält id:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Tarantool stöder flera typer av index. De vanligaste indexen är TREE- och HASH-typer, som är baserade på strukturer som motsvarar namnet. TREE är den mest mångsidiga indextypen. Det låter dig hämta data på ett organiserat sätt. Men för jämställdhetsurval är HASH mer lämpligt. Följaktligen är det tillrådligt att använda HASH för primärnyckeln (vilket är vad vi gjorde).

För att använda kolumnen row_ver för att överföra ändrade data måste du binda sekvensvärden till fälten i denna kolumn row_ver. Men till skillnad från primärnyckeln, kolumnfältsvärdet row_ver bör öka med ett inte bara när man lägger till nya poster, utan också när man ändrar befintliga. Du kan använda triggers för detta. Tarantool har två typer av rymdutlösare: before_replace и on_replace. Triggers utlöses när data i utrymmet ändras (för varje tuppel som påverkas av ändringarna startas en triggerfunktion). Till skillnad från on_replace, before_replace-triggers låter dig ändra data för tupeln för vilken triggern exekveras. Följaktligen passar den sista typen av triggers oss.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)

Följande utlösare ersätter fältvärdet row_ver lagrad tuple till nästa värde i sekvensen row_version.

För att kunna extrahera data från rymden goods efter kolumn row_ver, låt oss skapa ett index:

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})

Indextyp - träd (TREE), därför att vi kommer att behöva extrahera data i stigande ordning av värdena i kolumnen row_ver.

Låt oss lägga till lite data till utrymmet:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

Därför att Det första fältet är en automatisk inkrementerande räknare, vi skickar noll istället. Tarantool kommer automatiskt att ersätta nästa värde. På samma sätt som värdet på kolumnfälten row_ver du kan passera noll - eller inte ange värdet alls, eftersom denna kolumn upptar den sista positionen i utrymmet.

Låt oss kontrollera insättningsresultatet:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...

Som du kan se fylls det första och sista fältet i automatiskt. Nu blir det enkelt att skriva en funktion för sida för sida uppladdning av utrymmesändringar goods:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end

Funktionen tar värdet som parameter row_ver, från vilken det är nödvändigt att ladda ner ändringar, och returnerar en del av den ändrade datan.

Datasampling i Tarantool görs genom index. Fungera get_goods använder en iterator efter index row_ver för att ta emot ändrad data. Iteratortypen är GT (större än, större än). Detta innebär att iteratorn sekventiellt kommer att gå igenom indexvärdena med början från den godkända nyckeln (fältvärde row_ver).

Iteratorn returnerar tupler. För att sedan kunna överföra data via HTTP är det nödvändigt att konvertera tuplarna till en struktur som är lämplig för efterföljande serialisering. Exemplet använder standardfunktionen för detta tomap. Istället för att använda tomap du kan skriva din egen funktion. Vi kanske till exempel vill byta namn på ett fält name, passera inte fältet code och lägg till ett fält comment:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end

Sidstorleken för utdata (antalet poster i en del) bestäms av variabeln page_size. I exemplet värdet page_size är 5. I ett riktigt program spelar sidstorleken oftast större roll. Det beror på den genomsnittliga storleken på rymdtupeln. Den optimala sidstorleken kan bestämmas empiriskt genom att mäta dataöverföringstiden. Ju större sidstorlek, desto mindre antal rundor mellan sändnings- och mottagningssidan. På så sätt kan du minska den totala tiden för nedladdning av ändringar. Men om sidstorleken är för stor kommer vi att spendera för lång tid på servern med att serialisera provet. Som ett resultat kan det uppstå förseningar i behandlingen av andra förfrågningar som kommer till servern. Parameter page_size kan laddas från konfigurationsfilen. För varje överfört utrymme kan du ställa in ett eget värde. Men för de flesta utrymmen kan standardvärdet (till exempel 100) vara lämpligt.

Låt oss köra funktionen get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...

Låt oss ta fältvärdet row_ver från sista raden och anropa funktionen igen:

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...

Ännu en gång:

tarantool> get_goods(8)
---
- []
...

Som du kan se, när den används på detta sätt, returnerar funktionen alla rymdposter sida för sida goods. Den sista sidan följs av ett tomt urval.

Låt oss göra ändringar i utrymmet:

box.space.goods:update(4, {{'=', 6, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

Vi har ändrat fältvärdet name för en post och lade till två nya poster.

Låt oss upprepa det senaste funktionsanropet:

tarantool> get_goods(8)
---



- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...

Funktionen returnerade de ändrade och tillagda posterna. Funktionen alltså get_goods låter dig ta emot data som har ändrats sedan det senaste anropet, vilket är grunden för den replikeringsmetod som övervägs.

Vi kommer att lämna utgivningen av resultat via HTTP i form av JSON utanför ramen för denna artikel. Du kan läsa om detta här: https://habr.com/ru/company/mailru/blog/272141/

Implementering av klient/slavdelen

Låt oss titta på hur den mottagande sidans implementering ser ut. Låt oss skapa ett utrymme på den mottagande sidan för att lagra nedladdade data:

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'

        },
        {
            name = 'name',
            type = 'string'

        },
        {
            name = 'code',
            type = 'unsigned'

        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Rummets struktur liknar strukturen i utrymmet i källan. Men eftersom vi inte kommer att skicka de mottagna uppgifterna någon annanstans, kolumnen row_ver finns inte i mottagarens utrymme. I fält id källidentifierare kommer att spelas in. På mottagarens sida finns det därför inget behov av att göra den automatiskt inkrementerande.

Dessutom behöver vi ett utrymme för att spara värden row_ver:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'

        },
        {
            name = 'value',
            type = 'string'

        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

För varje laddat utrymme (fält space_name) sparar vi det senast laddade värdet här row_ver (fält value). Kolumnen fungerar som primärnyckel space_name.

Låt oss skapa en funktion för att ladda rymddata goods via HTTP. För att göra detta behöver vi ett bibliotek som implementerar en HTTP-klient. Följande rad laddar biblioteket och instansierar HTTP-klienten:

local http_client = require('http.client').new()

Vi behöver också ett bibliotek för json-deserialisering:

local json = require('json')

Detta räcker för att skapa en dataladdningsfunktion:

local function load_data(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end

Funktionen exekverar en HTTP-begäran till url-adressen och skickar den row_ver som en parameter och returnerar det deserialiserade resultatet av begäran.

Funktionen för att spara mottagen data ser ut så här:

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put(
                obj.id, obj.name, obj.code)
        end
    end)
end

Cykel för att spara data i rymden goods placeras i en transaktion (funktionen används för detta box.atomic) för att minska antalet diskoperationer.

Slutligen den lokala rymdsynkroniseringsfunktionen goods med en källa kan du implementera det så här:

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    —— set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end

Först läser vi det tidigare sparade värdet row_ver för utrymme goods. Om det saknas (första utbytespasset) så tar vi det som row_ver noll. Nästa i cykeln utför vi en sida-för-sida-nedladdning av de ändrade data från källan på den angivna webbadressen. Vid varje iteration sparar vi mottagna data till lämpligt lokalt utrymme och uppdaterar värdet row_ver (i rymden row_ver och i variabeln row_ver) - ta värdet row_ver från den sista raden med laddade data.

För att skydda mot oavsiktlig looping (vid ett fel i programmet), slingan while kan ersättas av for:

for _ = 1, max_req do ...

Som ett resultat av att utföra funktionen sync_goods Plats goods mottagaren kommer att innehålla de senaste versionerna av alla rymdposter goods i källan.

Uppenbarligen kan dataradering inte sändas på detta sätt. Om ett sådant behov finns kan du använda ett raderingsmärke. Lägg till utrymme goods booleskt fält is_deleted och istället för att fysiskt radera en post använder vi logisk radering - vi ställer in fältvärdet is_deleted till mening true. Ibland istället för ett booleskt fält is_deleted det är bekvämare att använda fältet deleted, som lagrar datum-tid för den logiska raderingen av posten. Efter att ha utfört en logisk radering kommer posten som är markerad för radering att överföras från källan till destinationen (enligt logiken som diskuterats ovan).

Efterföljande row_ver kan användas för att överföra data från andra utrymmen: det finns inget behov av att skapa en separat sekvens för varje överfört utrymme.

Vi tittade på ett effektivt sätt att replikera data på hög nivå i applikationer som använder Tarantool DBMS.

Resultat

  1. Tarantool DBMS är en attraktiv, lovande produkt för att skapa högbelastningsapplikationer.
  2. Datareplikering på hög nivå har ett antal fördelar jämfört med replikering på låg nivå.
  3. Den högnivåreplikeringsmetod som diskuteras i artikeln låter dig minimera mängden överförd data genom att endast överföra de poster som har ändrats sedan den senaste utbytessessionen.

Källa: will.com

Lägg en kommentar