Jakiś czas temu stanęliśmy przed problemem czyszczenia krotek w przestrzeniach
Dobrym przykładem dla nas był moduł tarantool tzw
Opis
Dokumentacja tarantoola jest bardzo dobra
Zacznijmy z daleka i przyjrzyjmy się, jak wygląda z zewnątrz zamknięty, wygasły moduł:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Dla uproszczenia uruchamiamy tarantool w katalogu, w którym znajduje się nasza biblioteka libcapped-expirationd.so. Z biblioteki eksportowane są dwie funkcje: start i kill. Pierwszym krokiem jest udostępnienie tych funkcji z Lua za pomocą box.schema.func.create i box.schema.user.grant. Następnie utwórz przestrzeń, w której krotki będą zawierać tylko trzy pola: pierwsze to unikalny identyfikator, drugie to e-mail, a trzecie to czas życia krotki. Budujemy indeks drzewa na pierwszym polu i nazywamy go podstawowym. Następnie otrzymujemy obiekt połączenia z naszą natywną biblioteką.
Po pracach przygotowawczych uruchom funkcję start:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Ten przykład będzie działał podczas skanowania dokładnie tak samo, jak wygasły moduł napisany w Lua. Pierwszym argumentem funkcji start jest unikalna nazwa zadania. Drugi to identyfikator spacji. Trzeci to unikalny indeks, dzięki któremu krotki zostaną usunięte. Czwarty to indeks, według którego będą przechodzić krotki. Piąty to numer pola krotki z czasem życia (numeracja zaczyna się od 1, a nie od 0!). Szósty i siódmy to ustawienia skanowania. 1024 to maksymalna liczba krotek, które można wyświetlić w pojedynczej transakcji. 3600 — czas pełnego skanowania w sekundach.
Należy pamiętać, że w przykładzie zastosowano ten sam indeks do przeszukiwania i usuwania. Jeśli jest to indeks drzewa, to przejście odbywa się od mniejszego klucza do większego. Jeśli istnieje inny, na przykład indeks skrótu, wówczas przechodzenie odbywa się z reguły w losowej kolejności. Wszystkie krotki przestrzenne są skanowane w jednym skanie.
Wstawmy w przestrzeń kilka krotek o czasie życia 60 sekund:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Sprawdźmy, czy wstawienie się powiodło:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Powtórzmy wybór po ponad 60 sekundach (licząc od początku wstawienia pierwszej krotki) i zobaczmy, że moduł z ograniczeniem, który wygasł, został już przetworzony:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Zatrzymajmy zadanie:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Przyjrzyjmy się drugiemu przykładowi, w którym do indeksowania używany jest oddzielny indeks:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Wszystko tutaj jest takie samo jak w pierwszym przykładzie, z kilkoma wyjątkami. Budujemy indeks drzewa na trzecim polu i nazywamy go exp. Indeks ten nie musi być unikalny, w przeciwieństwie do indeksu zwanego indeksem podstawowym. Przejście zostanie przeprowadzone według indeksu exp, a usunięcie przez podstawowy. Pamiętamy, że poprzednio oba były wykonywane tylko przy użyciu indeksu podstawowego.
Po pracach przygotowawczych uruchamiamy funkcję start z nowymi argumentami:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Wstawmy ponownie w przestrzeń kilka krotek z czasem życia wynoszącym 60 sekund:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Po 30 sekundach analogicznie dodamy jeszcze kilka krotek:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Sprawdźmy, czy wstawienie się powiodło:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Powtórzmy wybór po ponad 60 sekundach (licząc od początku wstawienia pierwszej krotki) i zobaczmy, że moduł z ograniczeniem, który wygasł, został już przetworzony:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
W przestrzeni pozostało jeszcze kilka krotek, którym będzie pozostało około 30 sekund życia. Co więcej, skanowanie zatrzymywało się przy przejściu z krotki o identyfikatorze 2 i czasie życia 1576421257 do krotki o identyfikatorze 3 i czasie życia 1576421287. Krotki o okresie życia 1576421287 i więcej nie były skanowane ze względu na kolejność klucze indeksu exp. To są oszczędności, które chcieliśmy uzyskać na samym początku.
Zatrzymajmy zadanie:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
realizacja
Najlepszym sposobem na przedstawienie wszystkich cech projektu jest jego oryginalne źródło.
Argumenty, które przekazujemy do metody start, przechowywane są w strukturze o nazwie expirationd_task:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Atrybut name jest nazwą zadania. Atrybut space_id jest identyfikatorem przestrzeni. Atrybut rm_index_id jest identyfikatorem unikalnego indeksu, według którego krotki zostaną usunięte. Atrybut it_index_id jest identyfikatorem indeksu, według którego będą poruszane krotki. Atrybut it_index_type to typ indeksu, według którego będą przeglądane krotki. Atrybut file_no to numer pola krotki z czasem życia. Atrybut scan_size określa maksymalną liczbę krotek skanowanych w jednej transakcji. Atrybut scan_time to czas pełnego skanowania w sekundach.
Nie będziemy rozważać analizowania argumentów. To żmudna, ale prosta praca, w której pomoże Ci biblioteka
Podajemy prototypy wszystkich funkcji używanych do parsowania:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Przejdźmy teraz do najważniejszej rzeczy - logiki omijania spacji i usuwania krotek. Każdy blok krotek nie większy niż scan_size jest skanowany i modyfikowany w ramach pojedynczej transakcji. Jeśli się powiedzie, transakcja zostanie zatwierdzona; jeśli wystąpi błąd, zostanie wycofana. Ostatnim argumentem funkcji expirationd_iterate jest wskaźnik do iteratora, od którego rozpoczyna się lub kontynuuje skanowanie. Ten iterator jest zwiększany wewnętrznie, aż do wystąpienia błędu, wyczerpania się miejsca lub braku możliwości wcześniejszego zatrzymania procesu. Funkcja expirationd_expired sprawdza czas życia krotki, expirationd_delete usuwa krotkę, expirationd_breakable sprawdza, czy musimy kontynuować.
Kod funkcji Expirationd_iterate:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Kod funkcji expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Kod funkcji Expirationd_delete:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Kod funkcji expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Aplikacja
Kod źródłowy możesz zobaczyć pod adresem
Źródło: www.habr.com