リアルタイム サービスの例を使用した Q および KDB+ 言語の機能

KDB+ ベース、Q プログラミング言語とは何か、それらの長所と短所については、以前の記事で読むことができます。 статье そして導入部で簡単に。 この記事では、受信データ ストリームを処理し、さまざまな集計関数を「リアルタイム」モードで毎分計算するサービスを Q に実装します (つまり、データの次の部分の前にすべてを計算する時間があります)。 Q の主な特徴は、単一のオブジェクトではなく、その配列、配列の配列、その他の複雑なオブジェクトを操作できるベクトル言語であることです。 Q やその関連言語である K、J、APL などの言語は、その簡潔さで有名です。 多くの場合、Java などの使い慣れた言語では、数画面のコードを必要とするプログラムを数行で記述することができます。 これがこの記事で説明したいことです。

リアルタイム サービスの例を使用した Q および KDB+ 言語の機能

導入

KDB+ は、特定の方法 (主に時間順) で順序付けされた、非常に大量のデータに焦点を当てた列型データベースです。 主に銀行、投資ファンド、保険会社などの金融機関で使用されます。 Q 言語は、このデータを効果的に操作できるようにする KDB+ の内部言語です。 Q のイデオロギーは簡潔さと効率性を重視する一方で、明瞭さは犠牲にされています。 これは、どのような場合でもベクトル言語を理解するのが難しいという事実によって正当化され、記録の簡潔さと豊富さにより、プログラムの大部分を XNUMX つの画面上に表示できるため、最終的には理解しやすくなります。

この記事では、Q で本格的なプログラムを実装します。ぜひ試してみてください。 これを行うには、実際の Q が必要になります。kx 社の Web サイトから無料の 32 ビット バージョンをダウンロードできます。 www.kx.com。 ご興味がございましたら、Q に関する参考情報をご覧ください。 Qフォーモータルズ およびこのトピックに関するさまざまな記事。

問題の定式化

25 ミリ秒ごとにデータを含むテーブルを送信するソースがあります。 KDB+ は主に金融で使用されるため、これは次の列を持つトランザクション (取引) テーブルであると仮定します: time (ミリ秒単位の時間)、sym (証券取引所での会社名 - IBM, AAPL、…)、価格 (株式が購入された価格)、サイズ (取引の規模)。 25 ミリ秒の間隔は任意であり、小さすぎず、長すぎません。 この存在は、データがすでにバッファリングされてサービスに送信されることを意味します。 現在の負荷に応じた動的バッファリングなど、サービス側でバッファリングを実装するのは簡単ですが、簡単にするために、固定間隔に焦点を当てます。

このサービスは、最大価格、平均価格、合計サイズなどの一連の集計関数を使用して、sym 列から受信する各シンボルを毎分カウントする必要があります。 有用な情報。 簡単にするために、すべての関数が増分的に計算できると仮定します。 新しい値を取得するには、古い値と新しい値という XNUMX つの数値を知っていれば十分です。 たとえば、関数 max、average、sum にはこの特性がありますが、median 関数にはありません。

また、受信データ ストリームは時間順であると仮定します。 これにより、最後の瞬間だけで作業する機会が得られます。 実際には、一部の更新が遅れた場合に備えて、現在および過去の分を操作できれば十分です。 簡単にするために、このケースは考慮しません。

集計関数

必要な集計関数を以下に示します。 サービスの負荷を増やすために、可能な限り多くのそれらを採用しました。

  • 高 – 最大価格 – XNUMX 分あたりの最大価格。
  • low – 最低価格 – XNUMX分あたりの最低価格。
  • firstPrice – 最初の価格 – XNUMX 分あたりの最初の価格。
  • lastPrice – 最後の価格 – XNUMX 分あたりの最後の価格。
  • firstSize – 最初のサイズ – XNUMX 分あたりの最初の取引サイズ。
  • lastSize – 最後のサイズ – XNUMX分間の最後の取引サイズ。
  • numTrades – count i – XNUMX 分あたりの取引数。
  • ボリューム – 合計サイズ – XNUMX 分あたりの取引サイズの合計。
  • pvolume – 合計価格 – avgPrice に必要な XNUMX 分あたりの価格の合計。
  • – 総売上高*サイズ – XNUMX 分あたりの合計トランザクション量。
  • avgPrice – pvolume%numTrades – XNUMX 分あたりの平均価格。
  • avgSize – volume%numTrades – XNUMX 分あたりの平均取引サイズ。
  • vwap – 売上高%volume – 取引サイズで重み付けされた XNUMX 分あたりの平均価格。
  • CumVolume – 合計ボリューム – 全期間にわたるトランザクションの累積サイズ。

すぐに、最初とその後の各分でこれらの列を初期化する方法という、明らかではない点について説明しましょう。 firstPrice タイプの一部の列は毎回 null に初期化する必要があり、その値は未定義です。 他のボリューム タイプは常に 0 に設定する必要があります。また、組み合わせたアプローチが必要な列もあります。たとえば、cumVolume は前の分からコピーする必要があり、最初のボリュームは 0 に設定する必要があります。辞書データを使用して、これらすべてのパラメータを設定しましょう。タイプ (レコードに似ています):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

便宜上、sym と time をディクショナリに追加しましたが、initWith は最終的な集計テーブルからの既成の行となり、正しい sym と time を設定するために残ります。 これを使用して、テーブルに新しい行を追加できます。

集計関数を作成するときは、aggCols が必要になります。 Q の式が評価される順序 (右から左) により、リストは反転する必要があります。 一部の列は前の列に依存するため、目標は、計算が high からcumVolume まで確実に行われるようにすることです。

前の分から新しい分にコピーする必要がある列。便宜上、sym 列が追加されています。

rollColumns:`sym`cumVolume;

次に、更新方法に応じて列をグループに分割しましょう。 次の XNUMX つのタイプを区別できます。

  1. アキュムレータ (出来高、売上高など) – 受信した値を前の値に加算する必要があります。
  2. 特別なポイント (高値、低値、..) を使用すると、その分の最初の値が受信データから取得され、残りは関数を使用して計算されます。
  3. 休む。 常に関数を使用して計算されます。

これらのクラスの変数を定義しましょう。

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

計算順序

集計テーブルは XNUMX 段階で更新します。 効率性を高めるために、まず受信テーブルを縮小して、各文字と分に XNUMX 行のみが存在するようにします。 すべての関数が増分的かつ結合的であるという事実により、この追加ステップの結果が変わらないことが保証されます。 select を使用してテーブルを縮小できます。

select high:max price, low:min price … by sym,time.minute from table

この方法には欠点があります。計算列のセットが事前定義されているということです。 幸いなことに、Q では、動的に作成された引数を置き換えることができる関数としても select が実装されています。

?[table;whereClause;byClause;selectClause]

引数の形式については詳しく説明しません。この場合、by 式と select 式のみが非自明であり、それらは columns!expressions 形式の辞書である必要があります。 したがって、縮小関数は次のように定義できます。

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

わかりやすくするために、parse 関数を使用しました。この関数は、Q 式を含む文字列を、eval 関数に渡すことができ、関数 select で必要な値に変換します。 また、プリプロセスは select 関数の射影 (つまり、部分的に定義された引数を持つ関数) として定義されており、XNUMX つの引数 (テーブル) が欠落していることにも注意してください。 テーブルに前処理を適用すると、圧縮されたテーブルが得られます。

第 XNUMX 段階では、集計テーブルを更新します。 まずアルゴリズムを擬似コードで書きましょう:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q では、ループの代わりにマップ/リデュース関数を使用するのが一般的です。 しかし、Q はベクトル言語であり、すべての演算をすべてのシンボルに一度に簡単に適用できるため、ループをまったく使用せずに最初の近似を行うことができ、すべてのシンボルに対して一度に演算を実行できます。

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

しかし、さらに進むことができます。Q には、一般化された代入演算子という、ユニークで非常に強力な演算子があります。 インデックス、関数、引数のリストを使用して、複雑なデータ構造内の値のセットを変更できます。 私たちの場合は次のようになります。

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

残念ながら、テーブルに代入するには列ではなく行のリストが必要で、flip 関数を使用して行列 (列のリストから行のリスト) を転置する必要があります。 これは大きなテーブルではコストがかかるため、代わりに、map 関数 (アポストロフィのように見えます) を使用して、一般化された割り当てを各列に個別に適用します。

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

ここでも関数射影を使用します。 また、Q ではリストの作成も関数であり、 each(map) 関数を使用してそれを呼び出してリストのリストを取得できることにも注意してください。

計算列のセットが固定されないように、上記の式を動的に作成します。 まず、row 変数と inp 変数を使用して集計データと入力データを参照し、各列を計算する関数を定義しましょう。

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

一部の列は特殊なので、その最初の値は関数で計算すべきではありません。 row[`numTrades] 列によってそれが最初であることを判断できます。列に 0 が含まれている場合、その値は最初です。 Q には選択関数 - ?[Boolean list;list1;list2] - があり、最初の引数の条件に応じてリスト 1 または 2 から値を選択します。

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

ここでは、関数 (中括弧で囲まれた式) を使用して一般化された代入を呼び出しました。 現在の値 (最初の引数) と追加の引数を受け取り、それを 4 番目のパラメーターに渡します。

機能は同じなので、バッテリー スピーカーを個別に追加しましょう。

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

これはQ標準による通常の代入ですが、値のリストを一度に代入しています。 最後に、main 関数を作成しましょう。

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

この式を使用して、上で指定した式を含む文字列から関数を動的に作成します。 結果は次のようになります。

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Q では評価順序が右から左であるため、列の評価順序は逆になります。

これで、計算に必要な XNUMX つの主要な関数が完成しました。少しインフラストラクチャを追加するだけで、サービスの準備が整いました。

最終ステップ

すべての作業を行う preprocess 関数と updateAgg 関数があります。 ただし、分単位での正しい移行を確認し、集計用のインデックスを計算する必要があります。 まず最初に、init 関数を定義しましょう。

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

現在の分を変更する roll 関数も定義します。

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

新しい文字を追加する関数が必要になります。

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

最後に、upd 関数 (Q サービスのこの関数の伝統的な名前) は、データを追加するためにクライアントによって呼び出されます。

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

それだけです。 これは、約束どおり、ほんの数行のサービスの完全なコードです。

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

テスト

サービスのパフォーマンスを確認してみましょう。 これを行うには、別のプロセスで実行し (service.q ファイルにコードを置き)、init 関数を呼び出します。

q service.q –p 5566

q)init[]

別のコンソールで XNUMX 番目の Q プロセスを開始し、最初の Q プロセスに接続します。

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

まず、シンボルのリスト (10000 個) を作成し、ランダム テーブルを作成する関数を追加しましょう。 XNUMX 番目のコンソールで:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

表内で見つけやすくするために、リストに 25 つの実際の記号を追加しました。 rnd 関数は、時間が t から t+XNUMX ミリ秒まで変化する n 行のランダム テーブルを作成します。

ここで、サービスにデータを送信してみます (最初の XNUMX 時間を追加します)。

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

サービスでテーブルが更新されたことを確認できます。

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

結果:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

次に、負荷テストを実行して、サービスが 25 分あたりに処理できるデータ量を調べてみましょう。 更新間隔を 20 ミリ秒に設定していることを思い出してください。 したがって、ユーザーにデータを要求する時間を与えるために、サービスは (平均して) 更新ごとに少なくとも XNUMX ミリ秒に収まる必要があります。 XNUMX 番目のプロセスでは次のように入力します。

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800は1000分です。 まずは 25 ミリ秒ごとに XNUMX 行実行してみてください。

start 1000

私の場合、結果は更新ごとに数ミリ秒程度です。 そこで、すぐに行数を 10.000 に増やします。

start 10000

結果:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

これも特別なことではありませんが、これは 24 分あたり 400 万行、25 秒あたり 5 万行になります。 100.000 ミリ秒以上の間、更新が遅くなったのは XNUMX 回だけで、おそらく分が変わったときでした。 XNUMX まで増やしてみましょう:

start 100000

結果:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

ご覧のとおり、このサービスはほとんど対応できませんが、それでもなんとか存続しています。 このようなデータ量 (240 分あたり XNUMX 億 XNUMX 万行) は非常に大きいため、そのような場合は、サービスの複数のクローン (場合によっては数十のクローン) を起動し、それぞれが文字の一部のみを処理することが一般的です。 それでも、主にデータストレージに焦点を当てたインタプリタ型言語としては、その結果は印象的です。

各更新のサイズに応じてなぜ時間が非線形的に増加するのかという疑問が生じるかもしれません。 その理由は、shrink 関数は実際には C 関数であり、updateAgg よりもはるかに効率的であるためです。 特定の更新サイズ (約 10.000) から開始すると、updateAgg は上限に達し、その後、実行時間は更新サイズに依存しなくなります。 サービスがそのような量のデータを消化できるのは、予備ステップ Q のおかげです。 これは、ビッグデータを扱うときに適切なアルゴリズムを選択することがいかに重要であるかを強調しています。 もう 30 つのポイントは、データをメモリに正しく保存することです。 データが列順に格納されていないか、時間順に並べられていない場合、TLB キャッシュ ミス、つまりプロセッサ アドレス キャッシュにメモリ ページ アドレスが存在しないことがよく知られるようになります。 住所の検索に失敗すると約 XNUMX 倍の時間がかかり、データが分散している場合はサービスが数倍遅くなる可能性があります。

まとめ

この記事では、KDB+ および Q データベースが、大規模なデータを保存し、select を通じて簡単にアクセスするだけでなく、数億行/ギガバイトのデータを消化できるデータ処理サービスの作成にも適していることを示しました。単一の Q プロセス。 Q 言語自体は、そのベクトルの性質、組み込みの SQL 方言インタープリター、および非常に優れた一連のライブラリ関数により、データ処理に関連するアルゴリズムの非常に簡潔かつ効率的な実装を可能にします。

上記は Q ができることの一部にすぎず、他にも独自の機能があることに注意してください。 たとえば、非常にシンプルな IPC プロトコルは、個々の Q プロセス間の境界を消去し、これらのプロセスを数百もの単一ネットワークに結合して、世界各地にある数十のサーバーに配置できるようにします。

出所: habr.com

コメントを追加します