為 tarantool 編寫我們自己的上限過期模組

為 tarantool 編寫我們自己的上限過期模組

前段時間我們遇到了清理空間中元組的問題 塔蘭圖爾。 清理不能在 tarantool 內存不足時開始,而是提前並以一定的頻率開始。 對於這個任務,tarantool 有一個用 Lua 寫的模組,名為 過期。 使用這個模組一段時間後,我們意識到它不適合我們:由於不斷清理大量數據,Lua掛在了GC中。 因此,我們考慮開發自己的 cappedexpirationd 模組,希望用原生程式語言編寫的程式碼能夠以最好的方式解決我們的問題。

對我們來說一個很好的例子是 tarantool 模組,名為 memcached的。 其中使用的方法是基於在空間中創建一個單獨的字段,該字段指示元組的生命週期,換句話說,ttl。 後台模組掃描空間,將TTL與目前時間進行比較,決定是否刪除該元組。 memcached模組程式碼簡單優雅,但過於通用。 首先,它沒有考慮正在爬行和刪除的索引的類型。 其次,每次掃描都會掃描所有元組,其數量可能非常大。 如果在expirationd模組中解決了第一個問題(樹索引被分離到一個單獨的類別中),那麼第二個問題仍然沒有受到任何關注。 這三點決定了我自己寫程式的選擇。

描述

tarantool 的文檔非常好 教程 關於如何用 C 語言編寫預存程序。首先,我建議您熟悉它,以便理解下面將出現的那些使用命令和程式碼進行插入的內容。 也值得關注 參考 編寫您自己的上限模組時可用的對象,即 , 纖維, 指數 и 傳送門.

讓我們從遠處開始,從外部看一下上限過期模組是什麼樣子的:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

為簡單起見,我們在 libcapped-expirationd.so 庫所在的目錄中啟動 tarantool。 從庫中導出兩個函數:start 和kill。 第一步是使用 box.schema.func.create 和 box.schema.user.grant 從 Lua 中使用這些函數。 然後建立一個空間,其元組僅包含三個欄位:第一個是唯一標識符,第二個是電子郵件,第三個是元組的生命週期。 我們在第一個欄位之上建立一個樹索引,並將其稱為主索引。 接下來我們取得到本機庫的連接物件。

準備工作完成後,執行start函數:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

這個範例在掃描過程中的工作方式與用 Lua 編寫的expirationd 模組完全相同。 啟動函數的第一個參數是任務的唯一名稱。 第二個是空間識別符。 第三個是刪除元組的唯一索引。 第四個是遍歷元組的索引。 第五個是具有生命週期的元組欄位的編號(編號從1開始,而不是0!)。 第六和第七是掃描設定。 1024 是單一事務中可以查看的最大元組數。 3600 — 完整掃描時間(以秒為單位)。

請注意,該範例使用相同的索引進行爬網和刪除。 如果這是一個樹索引,則從較小的鍵到較大的鍵進行遍歷。 如果存在其他一些索引,例如雜湊索引,則通常以隨機順序進行遍歷。 所有空間元組都在一次掃描中被掃描。

讓我們在空間中插入幾個生命週期為 60 秒的元組:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

我們來檢查是否插入成功:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

讓我們在 60 秒以上(從插入第一個元組開始算起)後重複選擇,看看 capped expiryd 模組已經處理完畢:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

讓我們停止任務:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

讓我們看第二個範例,其中使用單獨的索引進行爬網:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

這裡的所有內容都與第一個範例中的相同,但有一些例外。 我們在第三個欄位之上建立一個樹索引並將其稱為 exp。 與稱為主索引的索引不同,該索引不必是唯一的。 遍歷將透過exp索引進行,刪除則透過primary進行。 我們記得以前兩者都是只使用主索引完成的。

準備工作完成後,我們使用新參數執行啟動函數:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

讓我們再次將幾個元組插入到該空間中,生命週期為 60 秒:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

30秒後,以此類推,我們再增加幾個元組:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

我們來檢查是否插入成功:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

讓我們在 60 秒以上(從插入第一個元組開始算起)後重複選擇,看看 capped expiryd 模組已經處理完畢:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

空間中還剩下一些元組,它們還有大約 30 秒的生存時間。 此外,當從ID 為2、生命週期為1576421257 的元組移動到ID 為3、生命週期為1576421287 的元組時,掃描停止。由於以下順序,生命週期為1576421287 或更長的元組不會被掃描。 exp 索引鍵。 這是我們一開始就想實現的節省。

讓我們停止任務:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

履行

講述專案所有功能的最佳方式是其原始來源。 ! 作為出版物的一部分,我們將只關注最重要的一點,即空間旁路演算法。

我們傳遞給start方法的參數儲存在一個名為expirationd_task的結構中:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

name 屬性是任務的名稱。 space_id 屬性是空間識別碼。 rm_index_id 屬性是刪除元組的唯一索引的識別碼。 it_index_id 屬性是遍歷元組的索引的識別碼。 it_index_type 屬性是遍歷元組的索引類型。 fields_no 屬性是具有生存期的元組欄位的編號。 scan_size 屬性是一次事務中掃描的最大元組數。 scan_time 屬性是以秒為單位的完整掃描時間。

我們不會考慮解析參數。 這是一項艱苦但簡單的工作,圖書館將幫助您 訊息包。 只有當索引從 Lua 作為 mp_map 類型的複雜資料結構傳遞時才會出現困難,而不是使用簡單類型 mp_bool、mp_double、mp_int、mp_uint 和 mp_array。 但不需要解析整個索引。 您只需要檢查其唯一性,計算類型並提取識別碼。

我們列出了用於解析的所有函數的原型:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

現在讓我們繼續討論最重要的事情 - 繞過空間和刪除元組的邏輯。 在單一事務下掃描和修改不大於 scan_size 的每個元組塊。 如果成功,則提交該交易;如果發生錯誤,則回滾。 expirationd_iterate 函數的最後一個參數是一個指向迭代器的指針,掃描從該迭代器開始或繼續。 此迭代器在內部遞增,直到發生錯誤、空間耗盡或無法提前停止進程。 函數expirationd_expired 檢查元組的生命週期,expirationd_delete 刪除元組,expirationd_breakable 檢查是否需要繼續。

Expirationd_iterate函數程式碼:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

函數程式碼expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete函數程式碼:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

函數程式碼expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

應用

您可以在以下位置查看原始程式碼 這裡!

來源: www.habr.com

添加評論