Cách đây một thời gian, chúng ta đã phải đối mặt với vấn đề làm sạch các bộ dữ liệu trong không gian
Một ví dụ điển hình cho chúng tôi là mô-đun tarantool có tên
Описание
Tài liệu về tarantool rất hay
Hãy bắt đầu từ xa và xem mô-đun hết hạn được giới hạn trông như thế nào từ bên ngoài:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Để đơn giản, chúng tôi khởi chạy tarantool trong thư mục chứa thư viện libcapped-expirationd.so của chúng tôi. Hai hàm được xuất từ thư viện: bắt đầu và hủy. Bước đầu tiên là cung cấp các hàm này từ Lua bằng cách sử dụng box.schema.func.create và box.schema.user.grant. Sau đó, tạo một không gian trong đó các bộ dữ liệu sẽ chỉ chứa ba trường: trường đầu tiên là mã định danh duy nhất, trường thứ hai là email và trường thứ ba là thời gian tồn tại của bộ dữ liệu. Chúng tôi xây dựng một chỉ mục cây trên đầu trường đầu tiên và gọi nó là chính. Tiếp theo, chúng ta lấy đối tượng kết nối đến thư viện gốc của mình.
Sau công việc chuẩn bị, hãy chạy chức năng bắt đầu:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Ví dụ này sẽ hoạt động trong quá trình quét giống hệt như mô-đun đã hết hạn được viết bằng Lua. Đối số đầu tiên của hàm bắt đầu là tên duy nhất của tác vụ. Thứ hai là mã định danh không gian. Thứ ba là chỉ mục duy nhất mà các bộ dữ liệu sẽ bị xóa. Thứ tư là chỉ mục mà các bộ dữ liệu sẽ được duyệt qua. Thứ năm là số trường tuple có thời gian tồn tại (đánh số bắt đầu từ 1, không phải 0!). Thứ sáu và thứ bảy là cài đặt quét. 1024 là số lượng bộ dữ liệu tối đa có thể được xem trong một giao dịch. 3600 — thời gian quét toàn bộ tính bằng giây.
Lưu ý rằng ví dụ này sử dụng cùng một chỉ mục để thu thập thông tin và xóa. Nếu đây là chỉ mục cây thì việc truyền tải được thực hiện từ khóa nhỏ hơn đến khóa lớn hơn. Nếu có một số thứ khác, chẳng hạn như chỉ số băm, thì việc truyền tải được thực hiện, theo quy luật, theo thứ tự ngẫu nhiên. Tất cả các bộ dữ liệu không gian được quét trong một lần quét.
Hãy chèn một số bộ dữ liệu vào không gian với thời gian tồn tại là 60 giây:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Hãy kiểm tra xem việc chèn đã thành công chưa:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Hãy lặp lại thao tác chọn sau hơn 60 giây (tính từ khi bắt đầu chèn bộ dữ liệu đầu tiên) và thấy rằng mô-đun hết hạn có giới hạn đã được xử lý:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Hãy dừng nhiệm vụ:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Hãy xem ví dụ thứ hai trong đó một chỉ mục riêng biệt được sử dụng để thu thập thông tin:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Mọi thứ ở đây đều giống như trong ví dụ đầu tiên, với một vài ngoại lệ. Chúng tôi xây dựng một chỉ mục cây trên trường thứ ba và gọi nó là exp. Chỉ mục này không nhất thiết phải là duy nhất, không giống như chỉ mục được gọi là chính. Việc truyền tải sẽ được thực hiện theo chỉ số exp và xóa theo chỉ số chính. Chúng tôi nhớ rằng trước đây cả hai đều chỉ được thực hiện bằng cách sử dụng chỉ mục chính.
Sau công việc chuẩn bị, chúng ta chạy hàm start với các đối số mới:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Hãy chèn lại một số bộ dữ liệu vào khoảng trống với thời gian tồn tại là 60 giây:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Sau 30 giây, bằng cách tương tự, chúng ta sẽ thêm một vài bộ dữ liệu nữa:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Hãy kiểm tra xem việc chèn đã thành công chưa:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Hãy lặp lại thao tác chọn sau hơn 60 giây (tính từ khi bắt đầu chèn bộ dữ liệu đầu tiên) và thấy rằng mô-đun hết hạn có giới hạn đã được xử lý:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Vẫn còn một số bộ dữ liệu trong không gian và sẽ còn khoảng 30 giây nữa để tồn tại. Ngoài ra, quá trình quét dừng lại khi chuyển từ bộ dữ liệu có ID là 2 và thời gian tồn tại là 1576421257 sang bộ dữ liệu có ID là 3 và thời gian tồn tại là 1576421287. Các bộ dữ liệu có thời gian tồn tại là 1576421287 trở lên không được quét do thứ tự của các phím chỉ số exp. Đây là mức tiết kiệm mà chúng tôi muốn đạt được ngay từ đầu.
Hãy dừng nhiệm vụ:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Thực hiện
Cách tốt nhất để kể về tất cả các tính năng của dự án là nguồn gốc của nó.
Các đối số chúng ta truyền cho phương thức start được lưu trữ trong cấu trúc có tên là Expired_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Thuộc tính name là tên của nhiệm vụ. Thuộc tính space_id là mã định danh không gian. Thuộc tính rm_index_id là mã định danh của chỉ mục duy nhất mà các bộ dữ liệu sẽ bị xóa. Thuộc tính it_index_id là mã định danh của chỉ mục mà các bộ dữ liệu sẽ được duyệt qua. Thuộc tính it_index_type là loại chỉ mục mà các bộ dữ liệu sẽ được duyệt qua. Thuộc tính file_no là số trường tuple có thời gian tồn tại. Thuộc tính scan_size là số lượng bộ dữ liệu tối đa được quét trong một giao dịch. Thuộc tính scan_time là thời gian quét toàn bộ tính bằng giây.
Chúng tôi sẽ không xem xét việc phân tích các đối số. Đây là một công việc vất vả nhưng đơn giản, thư viện sẽ giúp bạn
Chúng tôi liệt kê các nguyên mẫu của tất cả các hàm được sử dụng để phân tích cú pháp:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Bây giờ chúng ta hãy chuyển sang điều quan trọng nhất - logic bỏ qua khoảng trắng và xóa các bộ dữ liệu. Mỗi khối bộ dữ liệu không lớn hơn scan_size được quét và sửa đổi trong một giao dịch. Nếu thành công, giao dịch này được cam kết; nếu xảy ra lỗi, nó sẽ được khôi phục. Đối số cuối cùng của hàm exid_iterate là một con trỏ tới trình vòng lặp mà từ đó quá trình quét bắt đầu hoặc tiếp tục. Trình vòng lặp này được tăng nội bộ cho đến khi xảy ra lỗi, hết dung lượng hoặc không thể dừng quá trình trước. Hàm hết hạn_hết hạn kiểm tra thời gian tồn tại của một bộ dữ liệu, Expired_delete xóa một bộ dữ liệu, Expired_breakable kiểm tra xem chúng ta có cần tiếp tục hay không.
Mã hàm Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Mã chức năng đã hết hạn_hết hạn:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Mã hàm Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Mã chức năng hết hạn_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Ứng dụng
Bạn có thể xem mã nguồn tại
Nguồn: www.habr.com