Schreiben unseres eigenen Moduls mit begrenztem Ablaufdatum für Tarantool

Schreiben unseres eigenen Moduls mit begrenztem Ablaufdatum für Tarantool

Vor einiger Zeit standen wir vor dem Problem, Tupel in Räumen zu bereinigen Tarantool. Die Reinigung musste nicht gestartet werden, wenn tarantool bereits keinen Speicher mehr hatte, sondern im Voraus und in einer bestimmten Häufigkeit. Für diese Aufgabe verfügt Tarantool über ein in Lua geschriebenes Modul namens Ablauf. Nachdem wir dieses Modul für kurze Zeit verwendet hatten, stellten wir fest, dass es für uns nicht geeignet war: Aufgrund der ständigen Bereinigung großer Datenmengen blieb Lua im GC hängen. Deshalb haben wir darüber nachgedacht, ein eigenes Modul mit begrenztem Ablaufdatum zu entwickeln, in der Hoffnung, dass der in einer nativen Programmiersprache geschriebene Code unsere Probleme bestmöglich lösen würde.

Ein gutes Beispiel für uns war das Tarantool-Modul namens memcached. Der darin verwendete Ansatz basiert darauf, dass im Raum ein separates Feld erstellt wird, das die Lebensdauer des Tupels angibt, also ttl. Das Modul im Hintergrund scannt den Raum, vergleicht die TTL mit der aktuellen Zeit und entscheidet, ob das Tupel gelöscht wird oder nicht. Der Code des zwischengespeicherten Moduls ist einfach und elegant, aber zu allgemein. Erstens wird der Indextyp, der gecrawlt und gelöscht wird, nicht berücksichtigt. Zweitens werden bei jedem Durchgang alle Tupel gescannt, deren Anzahl recht groß sein kann. Und wenn im abgelaufenen Modul das erste Problem gelöst wurde (der Baumindex wurde in eine separate Klasse aufgeteilt), dann wurde dem zweiten immer noch keine Aufmerksamkeit geschenkt. Diese drei Punkte gaben die Entscheidung vor, meinen eigenen Code zu schreiben.

Beschreibung

Die Dokumentation für Tarantool ist sehr gut Lernprogramm Informationen zum Schreiben Ihrer gespeicherten Prozeduren in C. Zunächst empfehle ich Ihnen, sich damit vertraut zu machen, um die unten aufgeführten Befehls- und Codeeinfügungen zu verstehen. Es lohnt sich auch, darauf zu achten Referenz auf Objekte, die beim Schreiben Ihres eigenen Capped-Moduls verfügbar sind, nämlich Box, Faser, Index и txn.

Beginnen wir aus der Ferne und schauen wir uns an, wie ein abgelaufenes Modul mit einer Kappe von außen aussieht:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)

Der Einfachheit halber starten wir tarantool in dem Verzeichnis, in dem sich unsere Bibliothek libcapped-expirationd.so befindet. Aus der Bibliothek werden zwei Funktionen exportiert: start und kill. Der erste Schritt besteht darin, diese Funktionen mithilfe von box.schema.func.create und box.schema.user.grant in Lua verfügbar zu machen. Erstellen Sie dann einen Bereich, dessen Tupel nur drei Felder enthalten: Das erste ist eine eindeutige Kennung, das zweite ist E-Mail und das dritte ist die Lebensdauer des Tupels. Wir erstellen einen Baumindex über dem ersten Feld und nennen ihn primär. Als nächstes erhalten wir das Verbindungsobjekt zu unserer nativen Bibliothek.

Führen Sie nach den Vorarbeiten die Startfunktion aus:

capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})

Dieses Beispiel funktioniert beim Scannen genauso wie das expirationd-Modul, das in Lua geschrieben ist. Das erste Argument der Startfunktion ist der eindeutige Name der Aufgabe. Der zweite ist die Raumkennung. Der dritte ist ein eindeutiger Index, anhand dessen Tupel gelöscht werden. Der vierte ist der Index, nach dem die Tupel durchlaufen werden. Die fünfte ist die Nummer des Tupelfelds mit Lebensdauer (Nummerierung beginnt bei 1, nicht bei 0!). Die sechste und siebte sind Scaneinstellungen. 1024 ist die maximale Anzahl von Tupeln, die in einer einzelnen Transaktion angezeigt werden können. 3600 – vollständige Scanzeit in Sekunden.

Beachten Sie, dass das Beispiel zum Crawlen und Löschen denselben Index verwendet. Wenn es sich um einen Baumindex handelt, erfolgt die Traversierung vom kleineren zum größeren Schlüssel. Gibt es einen anderen, beispielsweise einen Hash-Index, erfolgt die Durchquerung in der Regel in zufälliger Reihenfolge. Alle Raumtupel werden in einem Scan gescannt.

Fügen wir mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Raum ein:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Überprüfen wir, ob das Einfügen erfolgreich war:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
  - [1, '[email protected]', 1576418976]
  - [2, '[email protected]', 1576418976]
...

Wiederholen wir die Auswahl nach mehr als 60 Sekunden (gezählt ab Beginn der Einfügung des ersten Tupels) und stellen wir fest, dass das Modul „capped expirationd“ bereits verarbeitet wurde:

tarantool> box.space.tester.index.primary:select()
---
  - []
...

Stoppen wir die Aufgabe:

capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})

Schauen wir uns ein zweites Beispiel an, bei dem ein separater Index für den Crawl verwendet wird:

fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)

Bis auf wenige Ausnahmen ist hier alles wie im ersten Beispiel. Wir erstellen einen Baumindex über dem dritten Feld und nennen ihn exp. Im Gegensatz zum Primärindex muss dieser Index nicht eindeutig sein. Die Durchquerung erfolgt über den Exp-Index und die Löschung über den Primärindex. Wir erinnern uns, dass beides zuvor nur mit dem Primärindex durchgeführt wurde.

Nach der Vorarbeit führen wir die Startfunktion mit neuen Argumenten aus:

capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})

Fügen wir noch einmal mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Raum ein:

box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}

Nach 30 Sekunden fügen wir analog noch ein paar Tupel hinzu:

box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}

Überprüfen wir, ob das Einfügen erfolgreich war:

tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
  - [1, '[email protected]', 1576421257]
  - [2, '[email protected]', 1576421257]
  - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Wiederholen wir die Auswahl nach mehr als 60 Sekunden (gezählt ab Beginn der Einfügung des ersten Tupels) und stellen wir fest, dass das Modul „capped expirationd“ bereits verarbeitet wurde:

tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
  - [4, '[email protected]', 1576421287]
  - [5, '[email protected]', 1576421287]
...

Es sind noch einige Tupel im Raum übrig, die noch etwa 30 Sekunden zu leben haben. Darüber hinaus wurde der Scan gestoppt, wenn von einem Tupel mit einer ID von 2 und einer Lebensdauer von 1576421257 zu einem Tupel mit einer ID von 3 und einer Lebensdauer von 1576421287 gewechselt wurde. Tupel mit einer Lebensdauer von 1576421287 oder mehr wurden aufgrund der Reihenfolge von nicht gescannt die exp-Indexschlüssel. Das sind die Einsparungen, die wir von Anfang an erreichen wollten.

Stoppen wir die Aufgabe:

capped_connection:call('libcapped-expirationd.kill', {'indexed'})

Implementierung

Der beste Weg, über alle Merkmale eines Projekts zu berichten, ist die Originalquelle. Code! Im Rahmen der Veröffentlichung konzentrieren wir uns nur auf die wichtigsten Punkte, nämlich Space-Bypass-Algorithmen.

Die Argumente, die wir an die Startmethode übergeben, werden in einer Struktur namens expirationd_task gespeichert:

struct expirationd_task
{
  char name[256];
  uint32_t space_id;
  uint32_t rm_index_id;
  uint32_t it_index_id;
  uint32_t it_index_type; 
  uint32_t field_no;
  uint32_t scan_size;
  uint32_t scan_time;
};

Das Namensattribut ist der Name der Aufgabe. Das Attribut „space_id“ ist die Raumkennung. Das Attribut rm_index_id ist die Kennung des eindeutigen Index, anhand dessen Tupel gelöscht werden. Das Attribut it_index_id ist die Kennung des Index, nach dem Tupel durchlaufen werden. Das Attribut it_index_type ist der Indextyp, nach dem Tupel durchlaufen werden. Das Attribut filed_no ist die Nummer des Tupelfelds mit Lebensdauer. Das Attribut scan_size gibt die maximale Anzahl von Tupeln an, die in einer Transaktion gescannt werden. Das Attribut scan_time ist die vollständige Scanzeit in Sekunden.

Wir werden das Parsen von Argumenten nicht berücksichtigen. Dies ist eine mühsame, aber einfache Aufgabe, bei der Ihnen die Bibliothek behilflich sein wird msgpuck. Schwierigkeiten können nur bei Indizes auftreten, die von Lua als komplexe Datenstruktur mit dem Typ mp_map übergeben werden und nicht mit den einfachen Typen mp_bool, mp_double, mp_int, mp_uint und mp_array. Es ist jedoch nicht erforderlich, den gesamten Index zu analysieren. Sie müssen lediglich die Eindeutigkeit überprüfen, den Typ berechnen und die Kennung extrahieren.

Wir listen die Prototypen aller Funktionen auf, die zum Parsen verwendet werden:

bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);

Kommen wir nun zum Wichtigsten – der Logik, Leerzeichen zu umgehen und Tupel zu löschen. Jeder Tupelblock, der nicht größer als scan_size ist, wird in einer einzigen Transaktion gescannt und geändert. Bei Erfolg wird diese Transaktion festgeschrieben; wenn ein Fehler auftritt, wird sie zurückgesetzt. Das letzte Argument der expirationd_iterate-Funktion ist ein Zeiger auf den Iterator, von dem aus der Scanvorgang beginnt oder fortgesetzt wird. Dieser Iterator wird intern inkrementiert, bis ein Fehler auftritt, der Speicherplatz aufgebraucht ist oder es nicht möglich ist, den Prozess im Voraus zu stoppen. Die Funktion expirationd_expired prüft die Lebensdauer eines Tupels, expirationd_delete löscht ein Tupel, expirationd_breakable prüft, ob wir weitermachen müssen.

Expirationd_iterate-Funktionscode:

static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
  box_iterator_t *iter = *iterp;
  box_txn_begin();
  for (uint32_t i = 0; i < task->scan_size; ++i) {
    box_tuple_t *tuple = NULL;
    if (box_iterator_next(iter, &tuple) < 0) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_rollback();
      return false;
    }
    if (!tuple) {
      box_iterator_free(iter);
      *iterp = NULL;
      box_txn_commit();
      return true;
    }
    if (expirationd_expired(task, tuple))
      expirationd_delete(task, tuple);
    else if (expirationd_breakable(task))
      break;
  }
  box_txn_commit();
  return true;
}

Funktionscode expirationd_expired:

static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
  const char *buf = box_tuple_field(tuple, task->field_no - 1);
  if (!buf || mp_typeof(*buf) != MP_UINT)
    return false;
  uint64_t val = mp_decode_uint(&buf);
  if (val > fiber_time64() / 1000000)
    return false;
  return true;
}

Expirationd_delete-Funktionscode:

static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
  uint32_t len;
  const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
  box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}

Funktionscode expiration_breakable:

static bool
expirationd_breakable(struct expirationd_task *task)
{
  return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}

Anwendung

Den Quellcode können Sie unter einsehen hier!

Source: habr.com

Kommentar hinzufügen