Napisanie własnego, ograniczonego, wygasłego modułu dla tarantoolu

Napisanie własnego, ograniczonego, wygasłego modułu dla tarantoolu

Jakiś czas temu stanęliśmy przed problemem czyszczenia krotek w przestrzeniach tarantol. Czyszczenie trzeba było rozpoczynać nie wtedy, gdy w tarantoolu kończyła się już pamięć, ale wcześniej i z określoną częstotliwością. Do tego zadania tarantool posiada moduł napisany w Lua o nazwie wygaśnięcie. Po krótkim użytkowaniu tego modułu zdaliśmy sobie sprawę, że nie jest on dla nas odpowiedni: ze względu na ciągłe czyszczenie dużych ilości danych Lua zawieszała się w GC. Dlatego pomyśleliśmy o stworzeniu własnego modułu z limitem ważności, mając nadzieję, że kod napisany w natywnym języku programowania najlepiej rozwiąże nasze problemy.

Dobrym przykładem dla nas był moduł tarantool tzw memcached. Zastosowane w nim podejście polega na tym, że w przestrzeni tworzone jest osobne pole, które wskazuje czas życia krotki, czyli inaczej ttl. Moduł w tle skanuje przestrzeń, porównuje TTL z aktualnym czasem i decyduje, czy usunąć krotkę, czy nie. Kod modułu memcached jest prosty i elegancki, ale zbyt ogólny. Po pierwsze, nie bierze pod uwagę typu indeksu, który jest przeszukiwany i usuwany. Po drugie, przy każdym przebiegu skanowane są wszystkie krotki, których liczba może być dość duża. A jeśli w wygasłym module rozwiązano pierwszy problem (indeks drzewa został wydzielony do osobnej klasy), to na drugi nadal nie zwrócono uwagi. Te trzy punkty z góry przesądziły o wyborze na korzyść napisania własnego kodu.

Opis

Dokumentacja tarantoola jest bardzo dobra instruktaż o tym, jak pisać procedury składowane w C. Na początek sugeruję zapoznanie się z tym, aby zrozumieć wstawki z poleceniami i kodem, które pojawią się poniżej. Na to też warto zwrócić uwagę odniesienie do obiektów dostępnych podczas pisania własnego modułu z ograniczeniami, a mianowicie pudełko, włókno, wskaźnik и txn.

Zacznijmy z daleka i przyjrzyjmy się, jak wygląda z zewnątrz zamknięty, wygasły moduł:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Dla uproszczenia uruchamiamy tarantool w katalogu, w którym znajduje się nasza biblioteka libcapped-expirationd.so. Z biblioteki eksportowane są dwie funkcje: start i kill. Pierwszym krokiem jest udostępnienie tych funkcji z Lua za pomocą box.schema.func.create i box.schema.user.grant. Następnie utwórz przestrzeń, w której krotki będą zawierać tylko trzy pola: pierwsze to unikalny identyfikator, drugie to e-mail, a trzecie to czas życia krotki. Budujemy indeks drzewa na pierwszym polu i nazywamy go podstawowym. Następnie otrzymujemy obiekt połączenia z naszą natywną biblioteką.

Po pracach przygotowawczych uruchom funkcję start:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Ten przykład będzie działał podczas skanowania dokładnie tak samo, jak wygasły moduł napisany w Lua. Pierwszym argumentem funkcji start jest unikalna nazwa zadania. Drugi to identyfikator spacji. Trzeci to unikalny indeks, dzięki któremu krotki zostaną usunięte. Czwarty to indeks, według którego będą przechodzić krotki. Piąty to numer pola krotki z czasem życia (numeracja zaczyna się od 1, a nie od 0!). Szósty i siódmy to ustawienia skanowania. 1024 to maksymalna liczba krotek, które można wyświetlić w pojedynczej transakcji. 3600 — czas pełnego skanowania w sekundach.

Należy pamiętać, że w przykładzie zastosowano ten sam indeks do przeszukiwania i usuwania. Jeśli jest to indeks drzewa, to przejście odbywa się od mniejszego klucza do większego. Jeśli istnieje inny, na przykład indeks skrótu, wówczas przechodzenie odbywa się z reguły w losowej kolejności. Wszystkie krotki przestrzenne są skanowane w jednym skanie.

Wstawmy w przestrzeń kilka krotek o czasie życia 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Sprawdźmy, czy wstawienie się powiodło:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Powtórzmy wybór po ponad 60 sekundach (licząc od początku wstawienia pierwszej krotki) i zobaczmy, że moduł z ograniczeniem, który wygasł, został już przetworzony:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Zatrzymajmy zadanie:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Przyjrzyjmy się drugiemu przykładowi, w którym do indeksowania używany jest oddzielny indeks:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Wszystko tutaj jest takie samo jak w pierwszym przykładzie, z kilkoma wyjątkami. Budujemy indeks drzewa na trzecim polu i nazywamy go exp. Indeks ten nie musi być unikalny, w przeciwieństwie do indeksu zwanego indeksem podstawowym. Przejście zostanie przeprowadzone według indeksu exp, a usunięcie przez podstawowy. Pamiętamy, że poprzednio oba były wykonywane tylko przy użyciu indeksu podstawowego.

Po pracach przygotowawczych uruchamiamy funkcję start z nowymi argumentami:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Wstawmy ponownie w przestrzeń kilka krotek z czasem życia wynoszącym 60 sekund:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Po 30 sekundach analogicznie dodamy jeszcze kilka krotek:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Sprawdźmy, czy wstawienie się powiodło:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Powtórzmy wybór po ponad 60 sekundach (licząc od początku wstawienia pierwszej krotki) i zobaczmy, że moduł z ograniczeniem, który wygasł, został już przetworzony:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

W przestrzeni pozostało jeszcze kilka krotek, którym będzie pozostało około 30 sekund życia. Co więcej, skanowanie zatrzymywało się przy przejściu z krotki o identyfikatorze 2 i czasie życia 1576421257 do krotki o identyfikatorze 3 i czasie życia 1576421287. Krotki o okresie życia 1576421287 i więcej nie były skanowane ze względu na kolejność klucze indeksu exp. To są oszczędności, które chcieliśmy uzyskać na samym początku.

Zatrzymajmy zadanie:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

realizacja

Najlepszym sposobem na przedstawienie wszystkich cech projektu jest jego oryginalne źródło. kod! W ramach publikacji skupimy się jedynie na najważniejszych punktach, czyli algorytmach obejścia przestrzeni.

Argumenty, które przekazujemy do metody start, przechowywane są w strukturze o nazwie expirationd_task:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Atrybut name jest nazwą zadania. Atrybut space_id jest identyfikatorem przestrzeni. Atrybut rm_index_id jest identyfikatorem unikalnego indeksu, według którego krotki zostaną usunięte. Atrybut it_index_id jest identyfikatorem indeksu, według którego będą poruszane krotki. Atrybut it_index_type to typ indeksu, według którego będą przeglądane krotki. Atrybut file_no to numer pola krotki z czasem życia. Atrybut scan_size określa maksymalną liczbę krotek skanowanych w jednej transakcji. Atrybut scan_time to czas pełnego skanowania w sekundach.

Nie będziemy rozważać analizowania argumentów. To żmudna, ale prosta praca, w której pomoże Ci biblioteka msgpuck. Trudności mogą pojawić się tylko w przypadku indeksów przekazywanych z Lua jako złożona struktura danych typu mp_map, a nie przy użyciu prostych typów mp_bool, mp_double, mp_int, mp_uint i mp_array. Ale nie ma potrzeby analizowania całego indeksu. Wystarczy sprawdzić jego unikalność, obliczyć typ i wyodrębnić identyfikator.

Podajemy prototypy wszystkich funkcji używanych do parsowania:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Przejdźmy teraz do najważniejszej rzeczy - logiki omijania spacji i usuwania krotek. Każdy blok krotek nie większy niż scan_size jest skanowany i modyfikowany w ramach pojedynczej transakcji. Jeśli się powiedzie, transakcja zostanie zatwierdzona; jeśli wystąpi błąd, zostanie wycofana. Ostatnim argumentem funkcji expirationd_iterate jest wskaźnik do iteratora, od którego rozpoczyna się lub kontynuuje skanowanie. Ten iterator jest zwiększany wewnętrznie, aż do wystąpienia błędu, wyczerpania się miejsca lub braku możliwości wcześniejszego zatrzymania procesu. Funkcja expirationd_expired sprawdza czas życia krotki, expirationd_delete usuwa krotkę, expirationd_breakable sprawdza, czy musimy kontynuować.

Kod funkcji Expirationd_iterate:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Kod funkcji expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Kod funkcji Expirationd_delete:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Kod funkcji expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Aplikacja

Kod źródłowy możesz zobaczyć pod adresem tutaj!

Źródło: www.habr.com

Dodaj komentarz