以即時服務為例說明 Q 和 KDB+ 語言的特性

你可以在我之前的文章中了解KDB+基礎、Q程式語言是什麼、它們的優點和缺點是什麼 文章 並在簡介中簡要介紹。 在本文中,我們將在Q 上實作一個服務,該服務將處理傳入的資料流並以「即時」模式每分鐘計算各種聚合函數(即,它將有時間在下一部分資料之前計算所有內容) 。 Q 的主要特點是它是一種向量語言,允許您操作的不是單個對象,而是它們的數組、數組的數組和其他複雜對象。 Q 及其親戚 K、J、APL 等語言以其簡潔而聞名。 通常,用熟悉的語言(如 Java)佔用幾屏程式碼的程式只需幾行即可編寫。 這就是我想在本文中展示的內容。

以即時服務為例說明 Q 和 KDB+ 語言的特性

介紹

KDB+ 是一個專注於大量資料的列式資料庫,以特定方式(主要按時間)排序。 它主要用於金融機構—銀行、投資基金、保險公司。 Q 語言是 KDB+ 的內部語言,可讓您有效地處理這些資料。 Q 的理念是簡潔和高效,但犧牲了清晰度。 這是合理的,因為向量語言在任何情況下都難以理解,而錄製的簡潔性和豐富性使您可以在一個螢幕上看到程式的更大部分,這最終使其更容易理解。

在本文中,我們在 Q 中實現了一個成熟的程序,您可能想嘗試一下。 為此,您需要實際的 Q。您可以在 kx 公司網站上下載免費的 32 位元版本 – www.kx.com。 在那裡,如果你有興趣,你可以找到關於Q這本書的參考信息 Q 對於凡人 以及有關該主題的各種文章。

制定問題

有一個來源每 25 毫秒發送一個包含資料的表。 由於 KDB+ 主要用於金融領域,我們假設這是一個交易表,其中包含以下列:時間(以毫秒為單位的時間)、sym(證券交易所的公司名稱 - IBM, AAPL,...),價格(購買股票的價格),規模(交易規模)。 25毫秒的間隔是任意的,不能太小也不能太長。 它的存在意味著到達服務的資料已經緩衝。 在服務端實現緩衝很容易,包括根據當前負載進行動態緩衝,但為了簡單起見,我們將專注於固定間隔。

此服務必須每分鐘計算 sym 列中每個傳入符號的一組聚合函數 - 最大價格、平均價格、總和大小等。 有用的信息。 為了簡單起見,我們假設所有函數都可以增量計算,即要獲得新值,只需知道兩個數字即可 - 舊值和傳入值。 例如,函數 max、average、sum 具有此屬性,但中位數函數則沒有。

我們也將假設傳入的資料流是按時間排序的。 這將使我們有機會只在最後一刻工作。 實際上,能夠使用當前和之前的分鐘數就足夠了,以防某些更新延遲。 為了簡單起見,我們不會考慮這種情況。

聚合函數

下面列出了所需的聚合函數。 我盡可能多地使用它們來增加服務的負載:

  • high – 最高價 – 每分鐘的最高價。
  • low – 最低價格 – 每分鐘最低價。
  • firstPrice – 第一個價格 – 每分鐘的第一個價格。
  • lastPrice – 最後價格 – 每分鐘的最後價格。
  • firstSize – 第一尺寸 – 每分鐘的第一個交易尺寸。
  • lastSize – 最後尺寸 – 一分鐘內的最後交易尺寸。
  • numTrades – 計數 i – 每分鐘的交易數量。
  • 成交量 – 總規模 – 每分鐘交易規模總和。
  • pvolume – 總價格 – 每分鐘價格總和,avgPrice 所需。
  • – 成交價格總和*大小 – 每分鐘的交易總量。
  • avgPrice – pvolume%numTrades – 每分鐘平均價格。
  • avgSize – 交易量%numTrades – 每分鐘平均交易規模。
  • vwap – 成交量% – 以交易規模加權的每分鐘平均價格。
  • cumVolume – 交易量總和 – 整個時間內交易的累積大小。

讓我們立即討論一個不明顯的問題 - 如何第一次以及隨後的每一分鐘初始化這些列。 firstPrice 類型的某些欄位每次都必須初始化為 null;它們的值是未定義的。 其他磁碟區類型必須始終設為 0。還有一些列需要組合方法 - 例如,必須從前一分鐘複製 cumVolume,並將第一個設為 0。讓我們使用字典資料設定所有這些參數類型(類似於記錄) :

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

為了方便起見,我將 sym 和 time 添加到字典中,現在 initWith 是來自最終聚合表的現成行,它仍然用於設定正確的 sym 和時間。 您可以使用它向表中新增一行。

在建立聚合函數時我們需要 aggCols。 由於 Q 中表達式的求值順序(從右到左),該列表必須顛倒。 目標是確保計算從 high 到 cumVolume,因為某些列依賴先前的列。

需要從前一分鐘複製到新分鐘的列,為了方便起見添加了 sym 列:

rollColumns:`sym`cumVolume;

現在讓我們根據列的更新方式將它們分組。 可以區分三種類型:

  1. 累加器(交易量、營業額…)-我們必須將傳入的值加到前一個值上。
  2. 對於特殊點(高、低…)-分鐘中的第一個值取自傳入數據,其餘值使用此函數計算。
  3. 休息。 始終使用函數進行計算。

讓我們為這些類別定義變數:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

計算順序

我們將分兩個階段更新聚合表。 為了提高效率,我們首先縮小傳入表,以便每個字元和分鐘只有一行。 事實上,我們所有的函數都是增量和關聯的,這保證了這個額外步驟的結果不會改變。 您可以使用 select 縮小表:

select high:max price, low:min price … by sym,time.minute from table

此方法有一個缺點 - 計算列集是預先定義的。 幸運的是,在 Q 中,select 也被實作為一個函數,您可以在其中替換動態建立的參數:

?[table;whereClause;byClause;selectClause]

我不會詳細描述參數的格式;在我們的例子中,只有 by 和 select 表達式才重要,它們應該是 columns!expressions 形式的字典。 因此,收縮函數可以定義如下:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

為了清楚起見,我使用了 parse 函數,它將帶有 Q 表達式的字串轉換為可以傳遞給 eval 函數並且在函數 select 中需要的值。 另請注意,預處理被定義為 select 函數的投影(即具有部分定義參數的函數),缺少一個參數(表)。 如果我們對錶應用預處理,我們將得到一個壓縮表。

第二階段是更新聚合表。 我們先用偽代碼寫演算法:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

在 Q 中,通常使用 map/reduce 函數而不是循環。 但由於 Q 是一種向量語言,我們可以輕鬆地將所有操作同時應用於所有符號,因此對於第一個近似,我們可以完全不需要循環,立即對所有符號執行操作:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

但我們可以更進一步,Q 有一個獨特且極其強大的運算子——廣義賦值運算子。 它允許您使用索引、函數和參數列表來更改複雜資料結構中的一組值。 在我們的例子中,它看起來像這樣:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

不幸的是,要指派給表,您需要行列表,而不是列,並且必須使用翻轉函數轉置矩陣(列列表到行列表)。 這對於大型表來說代價高昂,因此我們使用 map 函數(看起來像撇號)分別對每一列應用通用賦值:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

我們再次使用函數投影。 另請注意,在 Q 中,建立列表也是一個函數,我們可以使用each(map) 函數呼叫它來取得列表列表。

為了確保計算列的集合不固定,我們將動態建立上述表達式。 我們首先定義函數來計算每一列,使用 row 和 inp 變數來引用聚合資料和輸入資料:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

有些列很特殊;它們的第一個值不應由函數計算。 我們可以透過 row[`numTrades] 欄位來確定它是第一個 - 如果它包含 0,則該值是第一個。 Q 有一個選擇函數 - ?[Boolean list;list1;list2] - 它根據第一個參數中的條件從列表 1 或 2 中選擇一個值:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

在這裡,我用我的函數(大括號中的表達式)呼叫了廣義賦值。 它接收當前值(第一個參數)和一個附加參數,我將其傳遞到第四個參數。

讓我們單獨添加電池揚聲器,因為它們的功能是相同的:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

按照 Q 標準,這是正常分配,但我要立即分配一個值清單。 最後,我們來建立 main 函數:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

使用此表達式,我從包含上面給出的表達式的字串動態建立一個函數。 結果將如下所示:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

列求值順序是倒置的,因為在 Q 中求值順序是從右到左。

現在我們有了計算所需的兩個主要功能,我們只需要添加一點基礎設施,服務就準備好了。

最後步驟

我們有 preprocess 和 updateAgg 函數來完成所有工作。 但還是需要確保分鐘級的正確過渡,並計算指標進行聚合。 首先,我們來定義一下init函數:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

我們還將定義 roll 函數,它將更改當前分鐘:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

我們需要一個函數來新增字元:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

最後是 upd 函數(Q 服務中函數的傳統名稱),客戶端呼叫該函數來新增資料:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

就這樣。 這是我們服務的完整程式碼,正如所承諾的,只有幾行:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

測試

讓我們檢查一下服務的效能。 為此,讓我們在單獨的進程中運行它(將程式碼放在 service.q 檔案中)並呼叫 init 函數:

q service.q –p 5566

q)init[]

在另一個控制台中,啟動第二個 Q 進程並連接到第一個:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

首先,我們建立一個符號清單 - 10000 個,並新增一個函數來建立隨機表。 在第二個控制台中:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

我在清單中新增了三個真實符號,以便更輕鬆地在表中找到它們。 rnd 函數建立一個包含 n 行的隨機表,其中時間從 t 到 t+25 毫秒變化。

現在您可以嘗試向服務發送資料(新增前十個小時):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

您可以在服務中檢查該表是否已更新:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

其結果是:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

現在讓我們進行負載測試,看看服務每分鐘可以處理多少資料。 讓我提醒您,我們將更新間隔設定為 25 毫秒。 因此,該服務必須(平均)每次更新至少需要 20 毫秒,以便為使用者提供請求資料的時間。 在第二個進程中輸入以下內容:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800是兩分鐘。 您可以嘗試先每 1000 毫秒運行 25 行:

start 1000

就我而言,每次更新的結果大約是幾毫秒。 所以我會立即將行數增加到 10.000:

start 10000

其結果是:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

同樣,沒什麼特別的,但這是每分鐘 24 萬行,每秒 400 萬行。 在超過 25 毫秒的時間內,更新速度僅減慢了 5 倍,顯然是在分鐘發生變化時。 讓我們增加到 100.000:

start 100000

其結果是:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

正如您所看到的,該服務幾乎無法應對,但它仍然設法維持下去。 這樣的資料量(每分鐘 240 億行)非常大;在這種情況下,通常會啟動該服務的多個克隆(甚至數十個克隆),每個克隆僅處理部分字元。 儘管如此,對於主要關注資料儲存的解釋語言來說,結果還是令人印象深刻的。

可能會出現這樣的問題:為什麼時間會隨著每次更新的大小而非線性成長。 原因是shrink函數其實是C函數,它比updateAgg效率高得多。 從某個更新大小(約10.000)開始,updateAgg達到上限,然後其執行時間不依賴更新大小。 正是由於初步步驟 Q,該服務才能夠消化如此大量的數據。 這凸顯了在處理大數據時選擇正確的演算法是多麼重要。 還有一點就是資料在記憶體中的正確儲存。 如果資料不是按列儲存或不是按時間排序,那麼我們就會熟悉 TLB 快取未命中之類的情況 - 處理器位址快取中缺少記憶體頁位址。 如果不成功,搜尋位址大約需要花費30倍的時間,如果資料分散,可能會使服務速度減慢數倍。

結論

在本文中,我展示了 KDB+ 和 Q 資料庫不僅適合存儲大數據並透過 select 輕鬆存取數據,而且還適合創建能夠消化數億行/GB 數據的數據處理服務,即使在單一的 Q 過程中。 由於其向量性質、內建的 SQL 方言解釋器和一組非常成功的函式庫函數,Q 語言本身可以極其簡潔、有效率地實現與資料處理相關的演算法。

我要指出的是,上述只是 Q 功能的一部分,它還有其他獨特的功能。 例如,一個極其簡單的 IPC 協議,它消除了各個 Q 進程之間的邊界,並允許您將數百個這樣的進程組合到一個網路中,該網路可以位於世界不同地區的數十台伺服器上。

來源: www.habr.com

添加評論