High-level replication in Tarantool DBMS

Hello! I build applications for the Tarantool DBMS. Tarantool is a platform developed by Mail.ru Group that combines a high-performance DBMS with an application server in the Lua language. The high speed of Tarantool-based solutions is achieved, in particular, by supporting an in-memory DBMS mode and by the ability to execute the application's business logic in the same address space as the data. At the same time, data persistence is ensured using ACID transactions (a WAL log is kept on disk). Tarantool has built-in support for replication and sharding. Starting with version 2.1, SQL queries are supported. Tarantool is open source and distributed under the Simplified BSD license. There is also a commercial Enterprise version.

All of the above makes Tarantool an attractive platform for creating high-load database applications. In such applications, there is often a need to replicate data.

As mentioned above, Tarantool has built-in data replication. It works by sequentially executing on the replicas all transactions contained in the master's write-ahead log (WAL). Typically, such replication (we will call it low-level) is used to provide fault tolerance for the application and/or to distribute the read load among the cluster nodes.

Fig. 1. Replication within a cluster

An example of an alternative scenario is transferring data created in one database to another database for processing or monitoring. In that case, a more convenient solution may be high-level replication: data replication at the level of application business logic. That is, we do not use a ready-made solution built into the DBMS, but implement replication inside the application we are developing. This approach has both advantages and disadvantages. Let's list the pros.

1. Traffic saving:

  • you can transfer not all data, but only part of it (for example, you can transfer only some tables, some of their columns or records that meet a certain criterion);
  • unlike low-level replication, which runs continuously in asynchronous mode (implemented in the current Tarantool version, 1.10) or synchronous mode (to be implemented in future Tarantool versions), high-level replication can be performed in sessions (i.e., the application first synchronizes the data - an exchange session - then replication pauses until the next exchange session, and so on);
  • if a record has changed several times, only its latest version can be transferred (unlike low-level replication, where all changes made on the master are replayed sequentially on the replicas).

2. The exchange is easy to implement over HTTP, which makes it possible to synchronize remote databases.

Fig. 2. HTTP replication

3. The database structures between which data is transferred do not have to be the same (moreover, in the general case, it is even possible to use different DBMS, programming languages, platforms, etc.).

Fig. 3. Replication in heterogeneous systems

The downside is that, on average, programming is more difficult and more expensive than configuration: instead of customizing built-in functionality, you will have to implement your own.

If in your situation the above pluses play a decisive role (or are a necessary condition), then it makes sense to use high-level replication. Let's consider several ways to implement high-level data replication in the Tarantool DBMS.

Traffic minimization

So, one of the advantages of high-level replication is traffic saving. For this advantage to pay off fully, the amount of data transmitted during each exchange session must be minimized. Of course, one should not forget that at the end of a session the receiver must still be in sync with the source (at least for the part of the data involved in replication).

How do we minimize the amount of data transferred during high-level replication? An obvious first approach is to select data by date-time. To do this, you can use a date-time field already present in the table (if one exists). For example, an "order" document might have a field "required order lead time", delivery_time. The problem with this solution is that the values in this field do not have to follow the order in which the orders were created. So we cannot simply remember the maximum delivery_time value transferred during the previous exchange session and, during the next session, select all records with a higher delivery_time. Between exchange sessions, records with a lower delivery_time value could have been added. An order could also have been modified in a way that did not affect delivery_time. In both cases, the changes would not be propagated from the source to the destination. To solve these problems, we would have to transfer data with overlap, i.e., at each exchange session transfer all data whose delivery_time exceeds some moment in the past (for example, N hours before the current moment). However, for large systems this approach is clearly highly redundant and can reduce the traffic savings we are striving for to nothing. Besides, the table being transferred may simply not have a date-time field.
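
As an illustration, a minimal sketch of such a selection with overlap might look as follows. The orders space, its delivery_time field (stored here as a unix timestamp), the TREE index on it, and the six-hour overlap window are assumptions made for this example only:

local fiber = require('fiber')

-- overlap window: everything newer than N hours ago is (re)transferred
local OVERLAP_SECONDS = 6 * 3600

local function get_recent_orders()
    local since = fiber.time() - OVERLAP_SECONDS
    -- assumes a TREE index named delivery_time on the delivery_time field
    return box.space.orders.index.delivery_time
        :select(since, { iterator = 'GE' })
end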

Another solution, more complex to implement, is to acknowledge receipt of the data. In this case, each exchange session transfers all data whose receipt has not been confirmed by the recipient. To implement this, you add a Boolean column to the source table (for example, is_transferred). Once the receiver acknowledges receipt of a record, the corresponding field is set to true, and the record no longer participates in exchanges. This option has the following disadvantages. First, an acknowledgment must be generated and sent for every transferred record. Roughly speaking, this is comparable to doubling the amount of data transferred, and it doubles the number of round trips. Second, the same record cannot be sent to several receivers (the first receiver to get it would acknowledge receipt on behalf of all the others).
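
A minimal sketch of the acknowledgement approach is shown below. The docs space, its field layout (is_transferred as the fourth field), and the function names are illustrative assumptions only:

-- returns records whose receipt has not yet been acknowledged;
-- a full scan keeps the sketch short, a real implementation would
-- index the is_transferred flag
local function get_untransferred_docs()
    local result = {}
    for _, t in box.space.docs:pairs() do
        if not t.is_transferred then
            table.insert(result, t:tomap({ names_only = true }))
        end
    end
    return result
end

-- called when the receiver acknowledges receipt of a record;
-- is_transferred is assumed to be the fourth field of the docs space
local function confirm_doc(id)
    box.space.docs:update(id, {{'=', 4, true}})
end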

A method free of the disadvantages above is to add a column to the transferred table for tracking changes to its rows. Such a column can be of a date-time type and must be set/updated by the application to the current time each time a record is added or modified (atomically with the addition/modification). Let's call this column update_time. Having saved the maximum update_time value among the transferred records, we can start the next exchange session from this value (select records whose update_time exceeds the previously stored value). The problem with this approach is that data changes can happen in batches, so the values in the update_time column may not be unique. This means the column cannot be used for batched (page-by-page) data output. To paginate the data, you would have to invent additional mechanisms that are likely to be very inefficient (for example, fetching from the database all records with update_time greater than the given value and outputting a certain number of records starting from some offset within the sample).

You can improve the efficiency of data transfer by slightly refining the previous approach. This time we will use an integer type (a long integer) for the change-tracking column. Let's call the column row_ver. Its value still needs to be set/updated each time a record is created or modified, but now the field is assigned not the current date-time, but the value of a counter incremented by one. As a result, the row_ver column will contain unique values and can be used not only to select "delta" data (data added or changed since the end of the previous exchange session), but also for simple and efficient pagination.

Of the methods proposed above for minimizing the amount of data transferred during high-level replication, the last one seems to me the best and the most universal. Let's dwell on it in more detail.

Transferring Data Using a Row Version Counter

Implementation of the server (master) part

MS SQL Server has a special column type for implementing this approach: rowversion. Every database keeps a counter that is incremented by one each time a record is added or modified in a table that has a rowversion column, and the counter's value is automatically assigned to that column in the added or changed record. The Tarantool DBMS has no similar built-in mechanism, but it is easy to implement manually. Let's see how.

First, some terminology: tables in Tarantool are called spaces, and records are called tuples. Tarantool lets you create sequences, which are nothing more than named generators of ordered integer values - exactly what we need for our purposes. We will create such a sequence below.

Before performing any database operation in Tarantool, you need to run the following command:

box.cfg{}

As a result, Tarantool will start writing database snapshots and the transaction log to the current directory.

Let's create a sequence row_version:

box.schema.sequence.create('row_version',
    { if_not_exists = true })

The if_not_exists option allows the creation script to be executed multiple times: if the object already exists, Tarantool will not try to create it again. This option will be used in all subsequent DDL commands.

Let's create a space for an example.

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'
        },
        {
            name = 'name',
            type = 'string'
        },
        {
            name = 'code',
            type = 'unsigned'
        },
        {
            name = 'row_ver',
            type = 'unsigned'
        }
    },
    if_not_exists = true
})

Here we have given the name of the space (goods), field names and their types.

Auto-increment fields in Tarantool are also created using sequences. Let's create an auto-incrementing primary key on the id field:

box.schema.sequence.create('goods_id',
    { if_not_exists = true })
box.space.goods:create_index('primary', {
    parts = { 'id' },
    sequence = 'goods_id',
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

Tarantool supports several index types. The most commonly used are TREE and HASH indexes, based on the data structures their names suggest. TREE is the most versatile index type: it allows data to be retrieved in sorted order. For selection by equality, however, HASH is a better fit, so it makes sense to use HASH for the primary key (which is what we did).

To use the row_ver column for transferring changed data, we need to bind sequence values to the fields of this column. Unlike the primary key, however, the row_ver field value must be incremented not only when new records are added but also when existing ones are modified. Triggers can be used for this. Tarantool has two types of space triggers: before_replace and on_replace. Triggers fire whenever the data in the space changes (the trigger function is run for each tuple affected by the change). Unlike on_replace, a before_replace trigger can modify the tuple on which it fires. Accordingly, before_replace is the type we need.

box.space.goods:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)

The trigger replaces the row_ver field of the tuple being stored with the next value of the row_version sequence.

To be able to extract data from the goods space by the row_ver column, let's create an index:

box.space.goods:create_index('row_ver', {
    parts = { 'row_ver' },
    unique = true,
    type = 'TREE',
    if_not_exists = true
})

The index type is TREE, because we need to extract the data in ascending order of the row_ver values.

Let's add some data to the space:

box.space.goods:insert{nil, 'pen', 123}
box.space.goods:insert{nil, 'pencil', 321}
box.space.goods:insert{nil, 'brush', 100}
box.space.goods:insert{nil, 'watercolour', 456}
box.space.goods:insert{nil, 'album', 101}
box.space.goods:insert{nil, 'notebook', 800}
box.space.goods:insert{nil, 'rubber', 531}
box.space.goods:insert{nil, 'ruler', 135}

Since the first field is an auto-increment counter, we pass nil instead of a value and Tarantool substitutes the next counter value automatically. Similarly, for the row_ver field you can pass nil, or omit the value entirely, since this column occupies the last position in the space.

Let's check the result of the insert:

tarantool> box.space.goods:select()
---
- - [1, 'pen', 123, 1]
  - [2, 'pencil', 321, 2]
  - [3, 'brush', 100, 3]
  - [4, 'watercolour', 456, 4]
  - [5, 'album', 101, 5]
  - [6, 'notebook', 800, 6]
  - [7, 'rubber', 531, 7]
  - [8, 'ruler', 135, 8]
...

As you can see, the first and last fields are filled in automatically. Now it is easy to write a function for page-by-page retrieval of changes in the goods space:

local page_size = 5
local function get_goods(row_ver)
    local index = box.space.goods.index.row_ver
    local goods = {}
    local counter = 0
    for _, tuple in index:pairs(row_ver, {
        iterator = 'GT' }) do
        local obj = tuple:tomap({ names_only = true })
        table.insert(goods, obj)
        counter = counter + 1
        if counter >= page_size then
            break
        end
    end
    return goods
end

The function takes as a parameter the row_ver value starting from which changes should be retrieved, and returns a portion of the changed data.

Data selection in Tarantool is done through indexes. The get_goods function uses an iterator over the row_ver index to retrieve the changed data. The iterator type is GT (Greater Than), meaning the iterator traverses the index values sequentially, starting right after the key passed in (the row_ver field value).

The iterator returns tuples. To be able to transfer the data over HTTP later, the tuples must be converted into a structure convenient for serialization. The example uses the standard tomap function for this. Instead of tomap, you can write your own conversion function. For example, we might want to rename the name field, skip the code field, and add a comment field:

local function unflatten_goods(tuple)
    local obj = {}
    obj.id = tuple.id
    obj.goods_name = tuple.name
    obj.comment = 'some comment'
    obj.row_ver = tuple.row_ver
    return obj
end

The size of a data page (the number of records in one portion) is determined by the page_size variable. In the example, page_size is 5. In a real application, the page size is usually larger; it depends on the average size of a tuple in the space. The optimal page size can be chosen empirically by measuring the data transfer time. The larger the page size, the fewer round trips between the transmitting and receiving sides, which reduces the total time needed to retrieve the changes. However, if the page size is too large, the server will spend too much time serializing the selection, and other requests arriving at the server may be delayed as a result. The page_size parameter can be loaded from a config file, with its own value for each transmitted space; for most spaces a default value (for example, 100) may be suitable.
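
For example, per-space page sizes could be kept in a simple Lua table loaded at startup; the structure and names below are assumptions for illustration only:

-- hypothetical config: a default page size plus per-space overrides
local config = {
    default_page_size = 100,
    page_sizes = {
        goods = 5,
    },
}

local function get_page_size(space_name)
    return config.page_sizes[space_name] or config.default_page_size
end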

Execute the function get_goods:

tarantool> get_goods(0)

---
- - row_ver: 1
    code: 123
    name: pen
    id: 1
  - row_ver: 2
    code: 321
    name: pencil
    id: 2
  - row_ver: 3
    code: 100
    name: brush
    id: 3
  - row_ver: 4
    code: 456
    name: watercolour
    id: 4
  - row_ver: 5
    code: 101
    name: album
    id: 5
...

Take the value of the field row_ver from the last line and call the function again:

tarantool> get_goods(5)

---
- - row_ver: 6
    code: 800
    name: notebook
    id: 6
  - row_ver: 7
    code: 531
    name: rubber
    id: 7
  - row_ver: 8
    code: 135
    name: ruler
    id: 8
...

Once again:

tarantool> get_goods(8)
---
- []
...

As you can see, when used this way the function returns all records of the goods space page by page. The last page is followed by an empty selection.

Let's make changes to the space:

box.space.goods:update(6, {{'=', 2, 'copybook'}})
box.space.goods:insert{nil, 'clip', 234}
box.space.goods:insert{nil, 'folder', 432}

We changed the value of the name field for one record and added two new records.

Let's repeat the last function call:

tarantool> get_goods(8)
---
- - row_ver: 9
    code: 800
    name: copybook
    id: 6
  - row_ver: 10
    code: 234
    name: clip
    id: 9
  - row_ver: 11
    code: 432
    name: folder
    id: 10
...

The function returned the changed and added records. Thus, get_goods lets us receive the data that has changed since its last call, which is the basis of the replication method under consideration.

Serving the results over HTTP as JSON is beyond the scope of this article; you can read about it here: https://habr.com/ru/company/mailru/blog/272141/
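
Still, for orientation, here is a minimal sketch of what such an endpoint could look like with the tarantool/http rock (assumed to be installed). The address, port and route match the client code shown below; the key renaming and everything else are assumptions for illustration:

local http_server = require('http.server')

local server = http_server.new('127.0.0.1', 81)

server:route({ path = '/test/goods/list' }, function(req)
    local row_ver = tonumber(req:query_param('rowVer')) or 0
    local goods = get_goods(row_ver)
    -- the client code below expects a rowVer key, so rename row_ver here
    for _, obj in ipairs(goods) do
        obj.rowVer, obj.row_ver = obj.row_ver, nil
    end
    return req:render({ json = goods })
end)

server:start()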

Implementation of the client (slave) part

Consider what the implementation of the receiving side looks like. Let's create a space on the receiving side to store the downloaded data:

box.schema.space.create('goods', {
    format = {
        {
            name = 'id',
            type = 'unsigned'
        },
        {
            name = 'name',
            type = 'string'
        },
        {
            name = 'code',
            type = 'unsigned'
        }
    },
    if_not_exists = true
})

box.space.goods:create_index('primary', {
    parts = { 'id' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

The structure of this space resembles the structure of the source space. But since we are not going to pass the received data any further, the row_ver column is absent from the recipient's space. The id field will hold the source identifiers, so there is no need to make it auto-incrementing on the receiver side.

In addition, we need a space to store the row_ver values:

box.schema.space.create('row_ver', {
    format = {
        {
            name = 'space_name',
            type = 'string'
        },
        {
            name = 'value',
            type = 'unsigned'
        }
    },
    if_not_exists = true
})

box.space.row_ver:create_index('primary', {
    parts = { 'space_name' },
    unique = true,
    type = 'HASH',
    if_not_exists = true
})

For each loaded space (the space_name field) we will store here the last loaded row_ver value (the value field). The space_name column is the primary key.

Let's create a function that loads goods data over HTTP. For this we need a library implementing an HTTP client. The following line loads the library and instantiates an HTTP client:

local http_client = require('http.client').new()

We also need a json deserialization library:

local json = require('json')

This is enough to create a data loading function:

local function load_goods(url, row_ver)
    local url = ('%s?rowVer=%s'):format(url,
        tostring(row_ver))
    local body = nil
    local data = http_client:request('GET', url, body, {
        keepalive_idle =  1,
        keepalive_interval = 1
    })
    return json.decode(data.body)
end

The function makes an HTTP request to url, passing row_ver to it as a parameter, and returns the deserialized response body.

The function to save the received data is as follows:

local function save_goods(goods)
    local n = #goods
    box.atomic(function()
        for i = 1, n do
            local obj = goods[i]
            box.space.goods:put({
                obj.id, obj.name, obj.code })
        end
    end)
end

The loop that saves data into the goods space is wrapped in a transaction (using the box.atomic function) to reduce the number of disk operations.

Finally, the function that synchronizes the local goods space with the source can look like this:

local function sync_goods()
    local tuple = box.space.row_ver:get('goods')
    local row_ver = tuple and tuple.value or 0

    -- set your url here:
    local url = 'http://127.0.0.1:81/test/goods/list'

    while true do
        local goods = load_goods(url, row_ver)

        local count = #goods
        if count == 0 then
            return
        end

        save_goods(goods)

        row_ver = goods[count].rowVer
        box.space.row_ver:put({'goods', row_ver})
    end
end

First, we read the previously saved row_ver value for the goods space. If it is absent (the first exchange session), we take zero as row_ver. Then, in a loop, we load the changed data page by page from the source at the specified url. At each iteration, we save the received data into the corresponding local space and update the row_ver value (in the row_ver space and in the row_ver variable), taking it from the last row of the loaded data.

To protect against accidental endless looping (in case of a bug in the program), the while loop can be replaced by a for loop:

for _ = 1, max_req do ...

After sync_goods completes, the goods space on the receiver will contain the latest versions of all records of the goods space on the source.

Obviously, deletions cannot be propagated this way. If such a need exists, you can use a deletion mark: add a Boolean field is_deleted to the goods space and, instead of physically deleting a record, delete it logically by setting is_deleted to true. Sometimes, instead of a Boolean is_deleted field, it is more convenient to use a deleted field that stores the date-time of the logical deletion. After a logical deletion, the record marked as deleted will be transferred from the source to the destination (according to the logic discussed above), as sketched below.
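
A hypothetical sketch of such a deletion mark (the field position and the adjusted trigger are assumptions, not code from the original example):

-- assume is_deleted is appended as the fifth field of the goods format
-- and the before_replace trigger is adjusted to preserve it, e.g.:
--   return box.tuple.new({new[1], new[2], new[3],
--       box.sequence.row_version:next(), new[5]})
-- a "delete" then becomes an update, so the record gets a new row_ver
-- and is picked up by the next exchange session
local function mark_deleted(id)
    box.space.goods:update(id, {{'=', 5, true}})
end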

The row_version sequence can also be used to transmit data from other spaces: there is no need to create a separate sequence for each transmitted space.
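
For example, a second transmitted space could attach a similar before_replace trigger drawing values from the same sequence (the orders space and its field layout are assumptions for illustration):

-- hypothetical second space; row_ver is assumed to be its fourth field
box.space.orders:before_replace(function(old, new)
    return box.tuple.new({new[1], new[2], new[3],
        box.sequence.row_version:next()})
end)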

We have considered an efficient way to implement high-level data replication in applications based on the Tarantool DBMS.

Conclusions

  1. Tarantool DBMS is an attractive, promising product for creating high-load applications.
  2. High-level data replication has several advantages over low-level replication.
  3. The method of high-level replication considered in the article allows minimizing the amount of data transferred by transferring only those records that have changed since the last exchange session.

Source: habr.com
