前段時間我們遇到了清理空間中元組的問題
對我們來說一個很好的例子是 tarantool 模組,名為
描述
tarantool 的文檔非常好
讓我們從遠處開始,從外部看一下上限過期模組是什麼樣子的:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
為簡單起見,我們在 libcapped-expirationd.so 庫所在的目錄中啟動 tarantool。 從庫中導出兩個函數:start 和kill。 第一步是使用 box.schema.func.create 和 box.schema.user.grant 從 Lua 中使用這些函數。 然後建立一個空間,其元組僅包含三個欄位:第一個是唯一標識符,第二個是電子郵件,第三個是元組的生命週期。 我們在第一個欄位之上建立一個樹索引,並將其稱為主索引。 接下來我們取得到本機庫的連接物件。
準備工作完成後,執行start函數:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
這個範例在掃描過程中的工作方式與用 Lua 編寫的expirationd 模組完全相同。 啟動函數的第一個參數是任務的唯一名稱。 第二個是空間識別符。 第三個是刪除元組的唯一索引。 第四個是遍歷元組的索引。 第五個是具有生命週期的元組欄位的編號(編號從1開始,而不是0!)。 第六和第七是掃描設定。 1024 是單一事務中可以查看的最大元組數。 3600 — 完整掃描時間(以秒為單位)。
請注意,該範例使用相同的索引進行爬網和刪除。 如果這是一個樹索引,則從較小的鍵到較大的鍵進行遍歷。 如果存在其他一些索引,例如雜湊索引,則通常以隨機順序進行遍歷。 所有空間元組都在一次掃描中被掃描。
讓我們在空間中插入幾個生命週期為 60 秒的元組:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
我們來檢查是否插入成功:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
讓我們在 60 秒以上(從插入第一個元組開始算起)後重複選擇,看看 capped expiryd 模組已經處理完畢:
tarantool> box.space.tester.index.primary:select()
---
- []
...
讓我們停止任務:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
讓我們看第二個範例,其中使用單獨的索引進行爬網:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
這裡的所有內容都與第一個範例中的相同,但有一些例外。 我們在第三個欄位之上建立一個樹索引並將其稱為 exp。 與稱為主索引的索引不同,該索引不必是唯一的。 遍歷將透過exp索引進行,刪除則透過primary進行。 我們記得以前兩者都是只使用主索引完成的。
準備工作完成後,我們使用新參數執行啟動函數:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
讓我們再次將幾個元組插入到該空間中,生命週期為 60 秒:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
30秒後,以此類推,我們再增加幾個元組:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
我們來檢查是否插入成功:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
讓我們在 60 秒以上(從插入第一個元組開始算起)後重複選擇,看看 capped expiryd 模組已經處理完畢:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
空間中還剩下一些元組,它們還有大約 30 秒的生存時間。 此外,當從ID 為2、生命週期為1576421257 的元組移動到ID 為3、生命週期為1576421287 的元組時,掃描停止。由於以下順序,生命週期為1576421287 或更長的元組不會被掃描。 exp 索引鍵。 這是我們一開始就想實現的節省。
讓我們停止任務:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
履行
講述專案所有功能的最佳方式是其原始來源。
我們傳遞給start方法的參數儲存在一個名為expirationd_task的結構中:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
name 屬性是任務的名稱。 space_id 屬性是空間識別碼。 rm_index_id 屬性是刪除元組的唯一索引的識別碼。 it_index_id 屬性是遍歷元組的索引的識別碼。 it_index_type 屬性是遍歷元組的索引類型。 fields_no 屬性是具有生存期的元組欄位的編號。 scan_size 屬性是一次事務中掃描的最大元組數。 scan_time 屬性是以秒為單位的完整掃描時間。
我們不會考慮解析參數。 這是一項艱苦但簡單的工作,圖書館將幫助您
我們列出了用於解析的所有函數的原型:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
現在讓我們繼續討論最重要的事情 - 繞過空間和刪除元組的邏輯。 在單一事務下掃描和修改不大於 scan_size 的每個元組塊。 如果成功,則提交該交易;如果發生錯誤,則回滾。 expirationd_iterate 函數的最後一個參數是一個指向迭代器的指針,掃描從該迭代器開始或繼續。 此迭代器在內部遞增,直到發生錯誤、空間耗盡或無法提前停止進程。 函數expirationd_expired 檢查元組的生命週期,expirationd_delete 刪除元組,expirationd_breakable 檢查是否需要繼續。
Expirationd_iterate函數程式碼:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
函數程式碼expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete函數程式碼:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
函數程式碼expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
應用
您可以在以下位置查看原始程式碼
來源: www.habr.com