Vor einiger Zeit standen wir vor dem Problem, Tupel in Räumen zu bereinigen
Ein gutes Beispiel für uns war das Tarantool-Modul namens
Beschreibung
Die Dokumentation für Tarantool ist sehr gut
Beginnen wir aus der Ferne und schauen wir uns an, wie ein abgelaufenes Modul mit einer Kappe von außen aussieht:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
capped_connection = net_box:new(3300)
Der Einfachheit halber starten wir tarantool in dem Verzeichnis, in dem sich unsere Bibliothek libcapped-expirationd.so befindet. Aus der Bibliothek werden zwei Funktionen exportiert: start und kill. Der erste Schritt besteht darin, diese Funktionen mithilfe von box.schema.func.create und box.schema.user.grant in Lua verfügbar zu machen. Erstellen Sie dann einen Bereich, dessen Tupel nur drei Felder enthalten: Das erste ist eine eindeutige Kennung, das zweite ist E-Mail und das dritte ist die Lebensdauer des Tupels. Wir erstellen einen Baumindex über dem ersten Feld und nennen ihn primär. Als nächstes erhalten wir das Verbindungsobjekt zu unserer nativen Bibliothek.
Führen Sie nach den Vorarbeiten die Startfunktion aus:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Dieses Beispiel funktioniert beim Scannen genauso wie das expirationd-Modul, das in Lua geschrieben ist. Das erste Argument der Startfunktion ist der eindeutige Name der Aufgabe. Der zweite ist die Raumkennung. Der dritte ist ein eindeutiger Index, anhand dessen Tupel gelöscht werden. Der vierte ist der Index, nach dem die Tupel durchlaufen werden. Die fünfte ist die Nummer des Tupelfelds mit Lebensdauer (Nummerierung beginnt bei 1, nicht bei 0!). Die sechste und siebte sind Scaneinstellungen. 1024 ist die maximale Anzahl von Tupeln, die in einer einzelnen Transaktion angezeigt werden können. 3600 – vollständige Scanzeit in Sekunden.
Beachten Sie, dass das Beispiel zum Crawlen und Löschen denselben Index verwendet. Wenn es sich um einen Baumindex handelt, erfolgt die Traversierung vom kleineren zum größeren Schlüssel. Gibt es einen anderen, beispielsweise einen Hash-Index, erfolgt die Durchquerung in der Regel in zufälliger Reihenfolge. Alle Raumtupel werden in einem Scan gescannt.
Fügen wir mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Raum ein:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Überprüfen wir, ob das Einfügen erfolgreich war:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576418976]
- [1, '[email protected]', 1576418976]
- [2, '[email protected]', 1576418976]
...
Wiederholen wir die Auswahl nach mehr als 60 Sekunden (gezählt ab Beginn der Einfügung des ersten Tupels) und stellen wir fest, dass das Modul „capped expirationd“ bereits verarbeitet wurde:
tarantool> box.space.tester.index.primary:select()
---
- []
...
Stoppen wir die Aufgabe:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Schauen wir uns ein zweites Beispiel an, bei dem ein separater Index für den Crawl verwendet wird:
fiber = require('fiber')
net_box = require('net.box')
box.cfg{listen = 3300}
box.schema.func.create('libcapped-expirationd.start', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start')
box.schema.func.create('libcapped-expirationd.kill', {language = 'C'})
box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill')
box.schema.space.create('tester')
box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}})
box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}})
capped_connection = net_box:new(3300)
Bis auf wenige Ausnahmen ist hier alles wie im ersten Beispiel. Wir erstellen einen Baumindex über dem dritten Feld und nennen ihn exp. Im Gegensatz zum Primärindex muss dieser Index nicht eindeutig sein. Die Durchquerung erfolgt über den Exp-Index und die Löschung über den Primärindex. Wir erinnern uns, dass beides zuvor nur mit dem Primärindex durchgeführt wurde.
Nach der Vorarbeit führen wir die Startfunktion mit neuen Argumenten aus:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Fügen wir noch einmal mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Raum ein:
box.space.tester:insert{0, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{1, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{2, '[email protected]', math.floor(fiber.time()) + 60}
Nach 30 Sekunden fügen wir analog noch ein paar Tupel hinzu:
box.space.tester:insert{3, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{4, '[email protected]', math.floor(fiber.time()) + 60}
box.space.tester:insert{5, '[email protected]', math.floor(fiber.time()) + 60}
Überprüfen wir, ob das Einfügen erfolgreich war:
tarantool> box.space.tester.index.primary:select()
---
- - [0, '[email protected]', 1576421257]
- [1, '[email protected]', 1576421257]
- [2, '[email protected]', 1576421257]
- [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Wiederholen wir die Auswahl nach mehr als 60 Sekunden (gezählt ab Beginn der Einfügung des ersten Tupels) und stellen wir fest, dass das Modul „capped expirationd“ bereits verarbeitet wurde:
tarantool> box.space.tester.index.primary:select()
---
- - [3, '[email protected]', 1576421287]
- [4, '[email protected]', 1576421287]
- [5, '[email protected]', 1576421287]
...
Es sind noch einige Tupel im Raum übrig, die noch etwa 30 Sekunden zu leben haben. Darüber hinaus wurde der Scan gestoppt, wenn von einem Tupel mit einer ID von 2 und einer Lebensdauer von 1576421257 zu einem Tupel mit einer ID von 3 und einer Lebensdauer von 1576421287 gewechselt wurde. Tupel mit einer Lebensdauer von 1576421287 oder mehr wurden aufgrund der Reihenfolge von nicht gescannt die exp-Indexschlüssel. Das sind die Einsparungen, die wir von Anfang an erreichen wollten.
Stoppen wir die Aufgabe:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementierung
Der beste Weg, über alle Merkmale eines Projekts zu berichten, ist die Originalquelle.
Die Argumente, die wir an die Startmethode übergeben, werden in einer Struktur namens expirationd_task gespeichert:
struct expirationd_task
{
char name[256];
uint32_t space_id;
uint32_t rm_index_id;
uint32_t it_index_id;
uint32_t it_index_type;
uint32_t field_no;
uint32_t scan_size;
uint32_t scan_time;
};
Das Namensattribut ist der Name der Aufgabe. Das Attribut „space_id“ ist die Raumkennung. Das Attribut rm_index_id ist die Kennung des eindeutigen Index, anhand dessen Tupel gelöscht werden. Das Attribut it_index_id ist die Kennung des Index, nach dem Tupel durchlaufen werden. Das Attribut it_index_type ist der Indextyp, nach dem Tupel durchlaufen werden. Das Attribut filed_no ist die Nummer des Tupelfelds mit Lebensdauer. Das Attribut scan_size gibt die maximale Anzahl von Tupeln an, die in einer Transaktion gescannt werden. Das Attribut scan_time ist die vollständige Scanzeit in Sekunden.
Wir werden das Parsen von Argumenten nicht berücksichtigen. Dies ist eine mühsame, aber einfache Aufgabe, bei der Ihnen die Bibliothek behilflich sein wird
Wir listen die Prototypen aller Funktionen auf, die zum Parsen verwendet werden:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos);
bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos);
bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos);
bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos);
bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos);
bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Kommen wir nun zum Wichtigsten – der Logik, Leerzeichen zu umgehen und Tupel zu löschen. Jeder Tupelblock, der nicht größer als scan_size ist, wird in einer einzigen Transaktion gescannt und geändert. Bei Erfolg wird diese Transaktion festgeschrieben; wenn ein Fehler auftritt, wird sie zurückgesetzt. Das letzte Argument der expirationd_iterate-Funktion ist ein Zeiger auf den Iterator, von dem aus der Scanvorgang beginnt oder fortgesetzt wird. Dieser Iterator wird intern inkrementiert, bis ein Fehler auftritt, der Speicherplatz aufgebraucht ist oder es nicht möglich ist, den Prozess im Voraus zu stoppen. Die Funktion expirationd_expired prüft die Lebensdauer eines Tupels, expirationd_delete löscht ein Tupel, expirationd_breakable prüft, ob wir weitermachen müssen.
Expirationd_iterate-Funktionscode:
static bool
expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp)
{
box_iterator_t *iter = *iterp;
box_txn_begin();
for (uint32_t i = 0; i < task->scan_size; ++i) {
box_tuple_t *tuple = NULL;
if (box_iterator_next(iter, &tuple) < 0) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_rollback();
return false;
}
if (!tuple) {
box_iterator_free(iter);
*iterp = NULL;
box_txn_commit();
return true;
}
if (expirationd_expired(task, tuple))
expirationd_delete(task, tuple);
else if (expirationd_breakable(task))
break;
}
box_txn_commit();
return true;
}
Funktionscode expirationd_expired:
static bool
expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple)
{
const char *buf = box_tuple_field(tuple, task->field_no - 1);
if (!buf || mp_typeof(*buf) != MP_UINT)
return false;
uint64_t val = mp_decode_uint(&buf);
if (val > fiber_time64() / 1000000)
return false;
return true;
}
Expirationd_delete-Funktionscode:
static void
expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple)
{
uint32_t len;
const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len);
box_delete(task->space_id, task->rm_index_id, str, str + len, NULL);
}
Funktionscode expiration_breakable:
static bool
expirationd_breakable(struct expirationd_task *task)
{
return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT;
}
Anwendung
Den Quellcode können Sie unter einsehen
Source: habr.com