We write our own capped expirationd module for tarantool

Some time ago, we faced the problem of cleaning up tuples in tarantool spaces. Cleanup had to start not when tarantool was already running out of memory, but in advance and at a regular interval. For this task, tarantool has a module written in Lua called expirationd. After using this module for a short time, we realized that it did not suit us: under constant cleanup of large amounts of data, Lua stalled in the GC. So we decided to develop our own capped expirationd module, hoping that code written in a native programming language would solve our problems in the best way.

A good example for us was the tarantool module called memcached. Its approach is to add a separate field to the space that holds the lifetime of the tuple, in other words, the ttl. The module scans the space in the background, compares the ttl with the current time and decides whether to delete the tuple. The memcached module code is simple and elegant, but too generic. First, it does not take into account the type of the index used for traversal and deletion. Second, every pass scans all tuples, and there can be a great many of them. The first problem was solved in the expirationd module (the tree index was split out into a separate class), but the second one received no attention. These points predetermined the choice in favor of writing our own code.

Description

The tarantool documentation has a very good tutorial on writing stored procedures in C. I suggest reading it first, so that the commands and code snippets below make sense. It is also worth looking at the reference for the objects available when writing your own capped module, namely box, fiber, index and txn.

Let's start from afar and take a look at what the capped expirationd module looks like from the outside:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

For simplicity, we run tarantool in the directory where our libcapped-expirationd.so library is located. Two functions are exported from the library: start and kill. The first step is to make these functions available from Lua using box.schema.func.create and box.schema.user.grant. Then create a space whose tuples will contain only three fields: the first is a unique identifier, the second is email, and the third is the lifetime of the tuple. We build a tree index on top of the first field and call it primary. Next, we get the connection object to our native library.

After the preparatory work, we run the start function:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

In this example, scanning works exactly as in the expirationd module written in Lua. The first argument to the start function is the unique name of the task. The second is the space identifier. The third is the unique index by which tuples will be deleted. The fourth is the index by which the tuples will be traversed. The fifth is the number of the tuple field that holds the lifetime (numbering starts from 1, not 0!). The sixth and seventh are the scan settings: 1024 is the maximum number of tuples that can be scanned in a single transaction, and 3600 is the full scan time in seconds.

Note that the example uses the same index for traversal and deletion. If this is a tree index, the traversal goes from the smaller key to the larger one. If it is some other kind, for example a hash index, the traversal is, as a rule, in arbitrary order. A single scan visits every tuple in the space.

Let's insert several tuples into the space with a lifetime of 60 seconds:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Let's check that the insert was successful:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Let's repeat the select after 60+ seconds (counting from the insertion of the first tuple) and see that the capped expirationd module has already done its work:

tarantool> box.space.tester.index.primary:select()
---
- []
...

Let's stop the task:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Let's look at a second example where a separate index is used for crawling:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Everything here is the same as in the first example, with one small exception. We build a tree index on top of the third field and call it exp. Unlike the index called primary, this index does not have to be unique. Traversal will use the exp index, and deletion will use primary. Recall that previously both were done using only the primary index.

After the preparatory work, we run the start function with new arguments:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Again, we will insert several tuples into the space with a lifetime of 60 seconds:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

After 30 seconds, by analogy, add a few more tuples:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Let's check that the insert was successful:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Let's repeat the select after 60+ seconds (counting from the insertion of the first tuple) and see that the capped expirationd module has already done its work:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

There are tuples left in the space, which will live for about 30 seconds more. Moreover, the scan stopped when moving from a tuple with ID 2 and lifetime 1576421257 to a tuple with ID 3 and lifetime 1576421287. Tuples with lifetime 1576421287 or more were not scanned due to the ordering of index exp keys. This is the savings that we wanted to achieve at the very beginning.
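The saving can be illustrated outside tarantool with a minimal, self-contained sketch. It assumes the tuples are already delivered in ascending order of the lifetime field, as a tree index on that field would deliver them. The names demo_tuple and demo_scan_sorted are hypothetical and are not part of the module.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a tuple: {id, expiration time in seconds}. */
struct demo_tuple {
  uint32_t id;
  uint64_t expires_at;
};

/*
 * Walk tuples sorted by expires_at in ascending order and stop at the
 * first tuple that is still alive. Returns the number of tuples visited;
 * *expired receives how many of them had expired.
 */
size_t
demo_scan_sorted(const struct demo_tuple *tuples, size_t count,
                 uint64_t now, size_t *expired)
{
  size_t visited = 0;
  *expired = 0;
  for (size_t i = 0; i < count; ++i) {
    ++visited;
    if (tuples[i].expires_at > now)
      break; /* every later tuple expires even later, so stop here */
    ++*expired;
  }
  return visited;
}
```

With the six tuples from the example and a current time between the two lifetimes, this sketch visits four tuples (three expired ones plus the first live one) and never touches the tuples with ID 4 and 5.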

Let's stop the task:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Implementation

Nothing describes all the features of a project better than its source code! In this publication, we will focus only on the most important points, namely the algorithms for traversing the space.

The arguments we pass to the start method are stored in a structure called expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

The name attribute is the name of the task. The space_id attribute is the identifier of the space. The rm_index_id attribute is the identifier of the unique index by which tuples will be deleted. The it_index_id attribute is the identifier of the index by which the tuples will be traversed. The it_index_type attribute is the type of the index by which the tuples will be traversed. The field_no attribute is the number of the tuple field that holds the lifetime. The scan_size attribute is the maximum number of tuples that can be scanned within a single transaction. The scan_time attribute is the full scan time in seconds.

We will not go into argument parsing. It is painstaking but simple work, and the msgpuck library will help you with it. Difficulties can arise only with indexes, which are passed from Lua as a complex data structure of type MP_MAP rather than as the simple types MP_BOOL, MP_DOUBLE, MP_INT, MP_UINT and MP_ARRAY. But you do not need to parse the entire index. It is enough to check its uniqueness, determine its type and extract its identifier.

We list the prototypes of all functions that are used for parsing:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
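To give a feel for what such a parser does, here is a minimal sketch that decodes a MessagePack unsigned integer by hand and advances the position pointer. It is not the module's code: the real functions rely on msgpuck, while this version is hand-rolled so that it compiles without extra libraries. The name demo_parse_uint32 and the extra end argument (used for bounds checking) are assumptions.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: decode a MessagePack unsigned integer that fits
 * in 32 bits and advance *pos past it. Returns false and leaves *pos
 * untouched if the next value is not such an integer.
 */
bool
demo_parse_uint32(const char **pos, const char *end, uint32_t *out)
{
  const unsigned char *p = (const unsigned char *)*pos;
  size_t left = (size_t)(end - *pos);
  if (left < 1)
    return false;
  if (p[0] <= 0x7f) {               /* positive fixint */
    *out = p[0];
    *pos += 1;
    return true;
  }
  if (p[0] == 0xcc && left >= 2) {  /* uint 8 */
    *out = p[1];
    *pos += 2;
    return true;
  }
  if (p[0] == 0xcd && left >= 3) {  /* uint 16, big-endian */
    *out = ((uint32_t)p[1] << 8) | p[2];
    *pos += 3;
    return true;
  }
  if (p[0] == 0xce && left >= 5) {  /* uint 32, big-endian */
    *out = ((uint32_t)p[1] << 24) | ((uint32_t)p[2] << 16) |
           ((uint32_t)p[3] << 8) | p[4];
    *pos += 5;
    return true;
  }
  return false;                     /* not an unsigned integer we accept */
}
```

A parser for a numeric argument such as the space identifier or the field number could follow the same pattern: check the type, decode the value, store it in the task and move the position forward.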

And now let's move on to the most important thing: the logic of traversing the space and deleting tuples. Each block of tuples, no larger than scan_size, is scanned and modified within a single transaction. If it succeeds, the transaction is committed; if it fails, it is rolled back. The last argument to the expirationd_iterate function is a pointer to the iterator from which the scan starts or continues. This iterator is advanced internally until an error occurs, the space ends, or it becomes possible to stop the process early. The expirationd_expired function checks the lifetime of the tuple, expirationd_delete deletes the tuple, and expirationd_breakable checks whether we need to move on.

expirationd_iterate function code:

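static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  /* One transaction per block of at most scan_size tuples. */
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      /* Iteration error: drop the iterator and roll the block back. */
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      /* End of the space: drop the iterator and commit the block. */
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break; /* the remaining tuples cannot have expired yet */
  }
  /* Block finished; the iterator stays valid for the next call. */
  box_txn_commit();
  return true;
}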
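The article does not show how the pause between blocks is derived from scan_size and scan_time. Purely as an assumption, one simple pacing rule is to spread the blocks evenly over the full scan time, as in the sketch below. The name demo_batch_delay_ms and the space_len parameter (the current number of tuples in the space) are hypothetical.

```c
#include <stdint.h>

/*
 * Hypothetical pacing rule (an assumption, not the module's code):
 * return the pause in milliseconds between two blocks so that scanning
 * space_len tuples in blocks of scan_size takes about scan_time seconds.
 * The pause is capped at the full scan time. Assumes the intermediate
 * product fits in 64 bits.
 */
uint64_t
demo_batch_delay_ms(uint32_t scan_size, uint32_t scan_time,
                    uint64_t space_len)
{
  uint64_t full_ms = (uint64_t)scan_time * 1000;
  if (space_len == 0)
    return full_ms; /* nothing to scan: wait a whole period */
  uint64_t delay_ms = (uint64_t)scan_size * full_ms / space_len;
  return delay_ms < full_ms ? delay_ms : full_ms;
}
```

Under this rule, the settings from the examples (1024 and 3600) and a space of 1,024,000 tuples give a pause of 3600 milliseconds between blocks.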

expirationd_expired function code:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}
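The comparison above mixes two units: the tuple stores its lifetime in seconds, while the division by 1000000 suggests that the clock value is in microseconds. A minimal sketch of the same check with the clock passed in explicitly makes the boundary behaviour easy to see. The name demo_is_expired is hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical standalone version of the lifetime check: the tuple
 * stores its expiration time in seconds, the clock is in microseconds.
 * A tuple whose expiration time equals the current second counts as
 * expired.
 */
bool
demo_is_expired(uint64_t expires_at_sec, uint64_t now_usec)
{
  uint64_t now_sec = now_usec / 1000000;
  return expires_at_sec <= now_sec;
}
```

Because of the integer division, fractions of the current second are discarded: a tuple that expires at second 101 is still considered alive at 100.999999 seconds.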

expirationd_delete function code:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

expirationd_breakable function code:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Appendix

You can view the source code here!

Source: habr.com
