Merkmale der Q- und KDB+-Sprache am Beispiel eines Echtzeitdienstes

Was die KDB+-Basis und die Q-Programmiersprache sind und welche Stärken und Schwächen sie haben, können Sie in meinem vorherigen Artikel nachlesen Artikel und kurz in der Einleitung. In dem Artikel werden wir einen Dienst auf Q implementieren, der den eingehenden Datenstrom verarbeitet und jede Minute verschiedene Aggregationsfunktionen im „Echtzeit“-Modus berechnet (d. h. er hat Zeit, alles zu berechnen, bevor der nächste Datenabschnitt kommt). Das Hauptmerkmal von Q ist, dass es sich um eine Vektorsprache handelt, die es Ihnen ermöglicht, nicht mit einzelnen Objekten, sondern mit deren Arrays, Arrays aus Arrays und anderen komplexen Objekten zu arbeiten. Sprachen wie Q und seine Verwandten K, J, APL sind für ihre Kürze bekannt. Oft kann ein Programm, das mehrere Bildschirme mit Code in einer vertrauten Sprache wie Java einnimmt, in wenigen Zeilen darauf geschrieben werden. Das möchte ich in diesem Artikel zeigen.

Merkmale der Q- und KDB+-Sprache am Beispiel eines Echtzeitdienstes

Einführung

KDB+ ist eine spaltenbasierte Datenbank, die sich auf sehr große Datenmengen konzentriert, die auf eine bestimmte Weise (hauptsächlich nach Zeit) geordnet sind. Es wird hauptsächlich in Finanzinstituten eingesetzt – Banken, Investmentfonds, Versicherungen. Die Q-Sprache ist die interne Sprache von KDB+, die es Ihnen ermöglicht, effektiv mit diesen Daten zu arbeiten. Die Q-Ideologie ist Kürze und Effizienz, während Klarheit geopfert wird. Begründet wird dies damit, dass die Vektorsprache in jedem Fall schwer zu verstehen sein wird und die Kürze und Fülle der Aufzeichnung es ermöglicht, einen viel größeren Teil des Programms auf einem Bildschirm zu sehen, was letztendlich das Verständnis erleichtert.

In diesem Artikel implementieren wir ein vollwertiges Programm in Q und vielleicht möchten Sie es ausprobieren. Dazu benötigen Sie das aktuelle Q. Die kostenlose 32-Bit-Version können Sie auf der Website der Firma kx herunterladen – www.kx.com. Dort finden Sie bei Interesse Referenzinformationen zu Q, dem Buch Q Für Sterbliche und verschiedene Artikel zu diesem Thema.

Formulierung des Problems

Es gibt eine Quelle, die alle 25 Millisekunden eine Tabelle mit Daten sendet. Da KDB+ hauptsächlich im Finanzwesen verwendet wird, gehen wir davon aus, dass es sich um eine Tabelle von Transaktionen (Trades) handelt, die folgende Spalten enthält: time (Zeit in Millisekunden), sym (Firmenbezeichnung an der Börse – IBM, AAPL,…), Preis (der Preis, zu dem die Aktien gekauft wurden), Größe (Umfang der Transaktion). Das 25-Millisekunden-Intervall ist willkürlich, nicht zu klein und nicht zu lang. Sein Vorhandensein bedeutet, dass die Daten bereits gepuffert beim Dienst ankommen. Es wäre einfach, eine Pufferung auf der Dienstseite zu implementieren, einschließlich einer dynamischen Pufferung abhängig von der aktuellen Auslastung, aber der Einfachheit halber konzentrieren wir uns auf ein festes Intervall.

Der Dienst muss jede Minute für jedes eingehende Symbol aus der Sym-Spalte eine Reihe von Aggregationsfunktionen zählen – Höchstpreis, Durchschnittspreis, Summengröße usw. nützliche Informationen. Der Einfachheit halber gehen wir davon aus, dass alle Funktionen inkrementell berechnet werden können, d. h. Um einen neuen Wert zu erhalten, reicht es aus, zwei Zahlen zu kennen – den alten und den eingehenden Wert. Beispielsweise haben die Funktionen max, Average, sum diese Eigenschaft, die Medianfunktion jedoch nicht.

Wir gehen außerdem davon aus, dass der eingehende Datenstrom zeitlich geordnet ist. Dies gibt uns die Möglichkeit, nur in letzter Minute zu arbeiten. In der Praxis reicht es aus, mit dem aktuellen und dem vorherigen Protokoll arbeiten zu können, falls sich einige Updates verspäten. Der Einfachheit halber werden wir diesen Fall nicht betrachten.

Aggregationsfunktionen

Die erforderlichen Aggregationsfunktionen sind unten aufgeführt. Ich habe so viele davon wie möglich genommen, um die Belastung des Dienstes zu erhöhen:

  • hoch – maximaler Preis – maximaler Preis pro Minute.
  • niedrig – Mindestpreis – Mindestpreis pro Minute.
  • firstPrice – erster Preis – erster Preis pro Minute.
  • lastPrice – letzter Preis – letzter Preis pro Minute.
  • firstSize – erste Größe – erste Handelsgröße pro Minute.
  • lastSize – letzte Größe – letzte Handelsgröße in einer Minute.
  • numTrades – count i – Anzahl der Trades pro Minute.
  • Volumen – Summengröße – Summe der Handelsgrößen pro Minute.
  • pvolume – Summenpreis – Summe der Preise pro Minute, erforderlich für avgPrice.
  • – Gesamtumsatzpreis*Größe – Gesamtvolumen der Transaktionen pro Minute.
  • avgPrice – pvolume%numTrades – Durchschnittspreis pro Minute.
  • avgSize – volume%numTrades – durchschnittliche Handelsgröße pro Minute.
  • vwap – Umsatz%Volumen – Durchschnittspreis pro Minute, gewichtet nach Transaktionsgröße.
  • cumVolume – Gesamtvolumen – kumulierte Größe der Transaktionen über die gesamte Zeit.

Lassen Sie uns gleich einen nicht offensichtlichen Punkt besprechen – wie diese Spalten zum ersten Mal und für jede weitere Minute initialisiert werden. Einige Spalten vom Typ „firstPrice“ müssen jedes Mal auf null initialisiert werden; ihr Wert ist undefiniert. Andere Volumentypen müssen immer auf 0 gesetzt werden. Es gibt auch Spalten, die einen kombinierten Ansatz erfordern – zum Beispiel muss cumVolume aus der vorherigen Minute kopiert und für die erste auf 0 gesetzt werden. Lassen Sie uns alle diese Parameter mithilfe der Wörterbuchdaten festlegen Typ (analog zu einem Datensatz):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Der Einfachheit halber habe ich sym und time zum Wörterbuch hinzugefügt. Jetzt ist initWith eine vorgefertigte Zeile aus der endgültigen aggregierten Tabelle, in der noch das richtige sym und die richtige Zeit festgelegt werden müssen. Sie können damit neue Zeilen zu einer Tabelle hinzufügen.

Wir benötigen aggCols, wenn wir eine Aggregationsfunktion erstellen. Aufgrund der Reihenfolge, in der Ausdrücke in Q ausgewertet werden (von rechts nach links), muss die Liste invertiert werden. Das Ziel besteht darin, sicherzustellen, dass die Berechnung von „high“ auf „cumVolume“ übergeht, da einige Spalten von den vorherigen abhängen.

Spalten, die von der vorherigen in eine neue Minute kopiert werden müssen, der Einfachheit halber wird die Sym-Spalte hinzugefügt:

rollColumns:`sym`cumVolume;

Teilen wir nun die Spalten in Gruppen ein, je nachdem, wie sie aktualisiert werden sollen. Es lassen sich drei Typen unterscheiden:

  1. Akkumulatoren (Volumen, Umsatz,...) – wir müssen den eingehenden Wert zum vorherigen addieren.
  2. Mit einem speziellen Punkt (hoch, niedrig, ..) – der erste Wert in der Minute wird aus den eingehenden Daten übernommen, der Rest wird mit der Funktion berechnet.
  3. Ausruhen. Wird immer über eine Funktion berechnet.

Definieren wir Variablen für diese Klassen:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Berechnungsreihenfolge

Wir werden die aggregierte Tabelle in zwei Schritten aktualisieren. Aus Effizienzgründen verkleinern wir zunächst die eingehende Tabelle, sodass für jedes Zeichen und jede Minute nur eine Zeile vorhanden ist. Die Tatsache, dass alle unsere Funktionen inkrementell und assoziativ sind, garantiert, dass sich das Ergebnis dieses zusätzlichen Schritts nicht ändert. Sie können die Tabelle mit select verkleinern:

select high:max price, low:min price … by sym,time.minute from table

Diese Methode hat einen Nachteil: Der Satz berechneter Spalten ist vordefiniert. Glücklicherweise ist select in Q auch als Funktion implementiert, mit der Sie dynamisch erstellte Argumente ersetzen können:

?[table;whereClause;byClause;selectClause]

Ich werde das Format der Argumente nicht im Detail beschreiben; in unserem Fall sind nur by- und select-Ausdrücke nichttrivial und sollten Wörterbücher der Form columns!expressions sein. Somit kann die Schrumpfungsfunktion wie folgt definiert werden:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Der Übersichtlichkeit halber habe ich die Funktion „parse“ verwendet, die einen String mit einem Q-Ausdruck in einen Wert umwandelt, der an die Funktion „eval“ übergeben werden kann und der in der Funktion „select“ benötigt wird. Beachten Sie auch, dass der Vorprozess als Projektion (d. h. eine Funktion mit teilweise definierten Argumenten) der Auswahlfunktion definiert ist und ein Argument (die Tabelle) fehlt. Wenn wir eine Vorverarbeitung auf eine Tabelle anwenden, erhalten wir eine komprimierte Tabelle.

Die zweite Stufe ist die Aktualisierung der aggregierten Tabelle. Schreiben wir zunächst den Algorithmus in Pseudocode:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

In Q ist es üblich, Map/Reduction-Funktionen anstelle von Schleifen zu verwenden. Aber da Q eine Vektorsprache ist und wir alle Operationen problemlos auf alle Symbole gleichzeitig anwenden können, können wir in erster Näherung überhaupt auf eine Schleife verzichten und Operationen auf alle Symbole gleichzeitig ausführen:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Aber wir können noch weiter gehen: Q verfügt über einen einzigartigen und äußerst leistungsfähigen Operator – den verallgemeinerten Zuweisungsoperator. Es ermöglicht Ihnen, eine Reihe von Werten in einer komplexen Datenstruktur mithilfe einer Liste von Indizes, Funktionen und Argumenten zu ändern. In unserem Fall sieht es so aus:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Leider benötigen Sie für die Zuweisung zu einer Tabelle eine Liste von Zeilen, keine Spalten, und Sie müssen die Matrix (Liste von Spalten in Liste von Zeilen) mithilfe der Flip-Funktion transponieren. Bei einer großen Tabelle ist das teuer, daher wenden wir stattdessen eine verallgemeinerte Zuweisung auf jede Spalte separat an, indem wir die Map-Funktion verwenden (die wie ein Apostroph aussieht):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Wir verwenden erneut die Funktionsprojektion. Beachten Sie auch, dass in Q das Erstellen einer Liste ebenfalls eine Funktion ist und wir sie mit der Funktion every(map) aufrufen können, um eine Liste von Listen zu erhalten.

Um sicherzustellen, dass die Menge der berechneten Spalten nicht festgelegt ist, erstellen wir den obigen Ausdruck dynamisch. Definieren wir zunächst Funktionen zur Berechnung jeder Spalte, indem wir die Variablen row und inp verwenden, um auf die aggregierten Daten und die Eingabedaten zu verweisen:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Einige Spalten sind speziell; ihr erster Wert sollte nicht von der Funktion berechnet werden. Wir können anhand der Spalte row[`numTrades] feststellen, dass es der erste ist. Wenn sie 0 enthält, ist der Wert der erste. Q verfügt über eine Auswahlfunktion – ?[Boolesche Liste;Liste1;Liste2] – die abhängig von der Bedingung im ersten Argument einen Wert aus Liste 1 oder 2 auswählt:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Hier habe ich mit meiner Funktion eine verallgemeinerte Zuweisung aufgerufen (ein Ausdruck in geschweiften Klammern). Es erhält den aktuellen Wert (das erste Argument) und ein zusätzliches Argument, das ich im 4. Parameter übergebe.

Fügen wir Batterielautsprecher separat hinzu, da die Funktion für sie dieselbe ist:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Dies ist nach Q-Standards eine normale Zuweisung, aber ich weise gleichzeitig eine Liste von Werten zu. Zum Schluss erstellen wir die Hauptfunktion:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Mit diesem Ausdruck erstelle ich dynamisch eine Funktion aus einer Zeichenfolge, die den oben angegebenen Ausdruck enthält. Das Ergebnis wird so aussehen:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Die Reihenfolge der Spaltenauswertung ist umgekehrt, da in Q die Auswertungsreihenfolge von rechts nach links erfolgt.

Jetzt haben wir zwei Hauptfunktionen, die für Berechnungen notwendig sind, wir müssen nur noch ein wenig Infrastruktur hinzufügen und der Dienst ist fertig.

Letzte Schritte

Wir haben Vorverarbeitungs- und UpdateAgg-Funktionen, die die ganze Arbeit erledigen. Dennoch ist es notwendig, den korrekten Übergang durch die Minuten sicherzustellen und Indizes für die Aggregation zu berechnen. Definieren wir zunächst die Init-Funktion:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Wir werden auch die Rollfunktion definieren, die die aktuelle Minute ändert:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Wir benötigen eine Funktion zum Hinzufügen neuer Zeichen:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Und schließlich die Funktion upd (der traditionelle Name dieser Funktion für Q-Services), die vom Client aufgerufen wird, um Daten hinzuzufügen:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Das ist alles. Hier ist der vollständige Code unseres Dienstes, wie versprochen, nur ein paar Zeilen:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Testing

Lassen Sie uns die Leistung des Dienstes überprüfen. Dazu führen wir es in einem separaten Prozess aus (fügen Sie den Code in die Datei service.q ein) und rufen die Funktion init auf:

q service.q –p 5566

q)init[]

Starten Sie in einer anderen Konsole den zweiten Q-Prozess und stellen Sie eine Verbindung zum ersten her:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Lassen Sie uns zunächst eine Liste mit Symbolen erstellen – 10000 Stück – und eine Funktion hinzufügen, um eine Zufallstabelle zu erstellen. In der zweiten Konsole:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Ich habe der Liste drei echte Symbole hinzugefügt, um das Auffinden in der Tabelle zu erleichtern. Die rnd-Funktion erstellt eine Zufallstabelle mit n Zeilen, wobei die Zeit zwischen t und t+25 Millisekunden variiert.

Jetzt können Sie versuchen, Daten an den Dienst zu senden (fügen Sie die ersten zehn Stunden hinzu):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Sie können im Dienst überprüfen, ob die Tabelle aktualisiert wurde:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Ergebnis:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Führen wir nun einen Lasttest durch, um herauszufinden, wie viele Daten der Dienst pro Minute verarbeiten kann. Ich möchte Sie daran erinnern, dass wir das Aktualisierungsintervall auf 25 Millisekunden festgelegt haben. Dementsprechend muss der Dienst (durchschnittlich) in mindestens 20 Millisekunden pro Update passen, um Benutzern Zeit für die Datenanforderung zu geben. Geben Sie im zweiten Vorgang Folgendes ein:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 sind zwei Minuten. Sie können versuchen, zunächst alle 1000 Millisekunden 25 Zeilen auszuführen:

start 1000

In meinem Fall liegt das Ergebnis bei etwa ein paar Millisekunden pro Update. Deshalb erhöhe ich sofort die Anzahl der Zeilen auf 10.000:

start 10000

Ergebnis:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Auch hier nichts Besonderes, aber das sind 24 Millionen Zeilen pro Minute, 400 pro Sekunde. Für mehr als 25 Millisekunden verlangsamte sich das Update nur fünfmal, offenbar beim Minutenwechsel. Erhöhen wir auf 5:

start 100000

Ergebnis:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Wie Sie sehen, kommt der Dienst kaum zurecht, schafft es aber trotzdem, sich über Wasser zu halten. Ein solches Datenvolumen (240 Millionen Zeilen pro Minute) ist extrem groß; in solchen Fällen ist es üblich, mehrere Klone (oder sogar Dutzende von Klonen) des Dienstes zu starten, von denen jeder nur einen Teil der Zeichen verarbeitet. Dennoch ist das Ergebnis beeindruckend für eine interpretierte Sprache, die sich hauptsächlich auf die Datenspeicherung konzentriert.

Es stellt sich möglicherweise die Frage, warum die Zeit mit der Größe jedes Updates nichtlinear zunimmt. Der Grund dafür ist, dass die Shrink-Funktion tatsächlich eine C-Funktion ist, die viel effizienter ist als updateAgg. Ab einer bestimmten Update-Größe (ca. 10.000) erreicht updateAgg seine Obergrenze und dann hängt seine Ausführungszeit nicht mehr von der Update-Größe ab. Dank des Vorschritts Q ist der Dienst in der Lage, solche Datenmengen zu verarbeiten. Dies verdeutlicht, wie wichtig es ist, bei der Arbeit mit Big Data den richtigen Algorithmus auszuwählen. Ein weiterer Punkt ist die korrekte Speicherung der Daten im Speicher. Wenn die Daten nicht spaltenweise gespeichert oder nicht zeitlich geordnet wären, würden wir mit so etwas wie einem TLB-Cache-Miss vertraut werden – dem Fehlen einer Speicherseitenadresse im Prozessor-Adress-Cache. Die Suche nach einer Adresse dauert etwa 30-mal länger, wenn sie nicht erfolgreich ist, und wenn die Daten verstreut sind, kann dies den Dienst um ein Vielfaches verlangsamen.

Abschluss

In diesem Artikel habe ich gezeigt, dass die Datenbanken KDB+ und Q nicht nur zum Speichern großer Datenmengen und zum einfachen Zugriff darauf über Select geeignet sind, sondern auch zum Erstellen von Datenverarbeitungsdiensten, die selbst in der Lage sind, Hunderte Millionen Zeilen/Gigabyte an Daten zu verarbeiten ein einziger Q-Prozess. Die Q-Sprache selbst ermöglicht aufgrund ihrer Vektornatur, des integrierten SQL-Dialektinterpreters und einer sehr erfolgreichen Reihe von Bibliotheksfunktionen eine äußerst präzise und effiziente Implementierung von Algorithmen im Zusammenhang mit der Datenverarbeitung.

Ich möchte anmerken, dass das oben Genannte nur einen Teil dessen darstellt, was Q leisten kann, es verfügt aber auch über andere einzigartige Funktionen. Zum Beispiel ein extrem einfaches IPC-Protokoll, das die Grenzen zwischen einzelnen Q-Prozessen aufhebt und es Ihnen ermöglicht, Hunderte dieser Prozesse in einem einzigen Netzwerk zu kombinieren, das sich auf Dutzenden von Servern in verschiedenen Teilen der Welt befinden kann.

Source: habr.com

Kommentar hinzufügen