以实时服务为例说明 Q 和 KDB+ 语言的特性

你可以在我之前的文章中了解KDB+基础、Q编程语言是什么、它们的优点和缺点是什么 文章 并在简介中简要介绍。 在本文中,我们将在 Q 上实现一个服务,该服务将处理传入的数据流并以“实时”模式每分钟计算各种聚合函数(即,它将有时间在下一部分数据之前计算所有内容)。 Q 的主要特点是它是一种矢量语言,允许您操作的不是单个对象,而是它们的数组、数组的数组和其他复杂对象。 Q 及其亲戚 K、J、APL 等语言以其简洁而闻名。 通常,用熟悉的语言(如 Java)占用几屏代码的程序只需几行即可编写。 这就是我想在本文中展示的内容。

以实时服务为例说明 Q 和 KDB+ 语言的特性

介绍

KDB+ 是一个专注于大量数据的列式数据库,以特定方式(主要按时间)排序。 它主要用于金融机构——银行、投资基金、保险公司。 Q 语言是 KDB+ 的内部语言,可让您有效地处理这些数据。 Q 的理念是简洁和高效,但牺牲了清晰度。 这是合理的,因为矢量语言在任何情况下都很难理解,而录制的简洁性和丰富性使您可以在一个屏幕上看到程序的更大部分,这最终使其更容易理解。

在本文中,我们实现了一个成熟的 Q 程序,您可能想尝试一下。 为此,您需要 Q 本身。您可以在 kx 公司网站上下载免费的 32 位版本 - www.kx.com。 在那里,如果你有兴趣,你可以找到关于Q这本书的参考信息 Q 对于凡人 以及有关该主题的各种文章。

制定问题

有一个源每 25 毫秒发送一个包含数据的表。 由于 KDB+ 主要用于金融领域,我们假设这是一个交易表,其中包含以下列:时间(以毫秒为单位的时间)、sym(证券交易所的公司名称 - IBM, AAPL,...),价格(购买股票的价格),规模(交易规模)。 25毫秒的间隔是任意的,不能太小也不能太长。 它的存在意味着到达服务的数据已经缓冲。 在服务端实现缓冲很容易,包括根据当前负载进行动态缓冲,但为了简单起见,我们将重点关注固定间隔。

该服务必须每分钟计算 sym 列中每个传入符号的一组聚合函数 - 最大价格、平均价格、总和大小等。 有用的信息。 为了简单起见,我们假设所有函数都可以增量计算,即要获得新值,只需知道两个数字即可 - 旧值和传入值。 例如,函数 max、average、sum 具有此属性,但中值函数则没有。

我们还将假设传入的数据流是按时间排序的。 这将使我们有机会只在最后一刻工作。 实际上,能够使用当前和之前的分钟数就足够了,以防某些更新延迟。 为了简单起见,我们不会考虑这种情况。

聚合函数

下面列出了所需的聚合函数。 我尽可能多地使用它们来增加服务的负载:

  • high – 最高价格 – 每分钟的最高价格。
  • low – 最低价格 – 每分钟最低价格。
  • firstPrice – 第一个价格 – 每分钟的第一个价格。
  • lastPrice – 最后价格 – 每分钟的最后价格。
  • firstSize – 第一个尺寸 – 每分钟的第一个交易尺寸。
  • lastSize – 最后尺寸 – 一分钟内的最后交易尺寸。
  • numTrades – 计数 i – 每分钟的交易数量。
  • 交易量 – 总规模 – 每分钟交易规模总和。
  • pvolume – 总价格 – 每分钟价格总和,avgPrice 所需。
  • – 成交价格总和*大小 – 每分钟的交易总量。
  • avgPrice – pvolume%numTrades – 每分钟平均价格。
  • avgSize – 交易量%numTrades – 每分钟平均交易规模。
  • vwap – 成交量% – 按交易规模加权的每分钟平均价格。
  • cumVolume – 交易量总和 – 整个时间内交易的累积大小。

让我们立即讨论一个不明显的问题 - 如何第一次以及随后的每一分钟初始化这些列。 firstPrice 类型的某些列每次都必须初始化为 null;它们的值是未定义的。 其他卷类型必须始终设置为 0。还有一些列需要组合方法 - 例如,必须从前一分钟复制 cumVolume,并将第一个设置为 0。让我们使用字典数据设置所有这些参数类型(类似于记录):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

为了方便起见,我将 sym 和 time 添加到字典中,现在 initWith 是来自最终聚合表的现成行,它仍然用于设置正确的 sym 和时间。 您可以使用它向表中添加新行。

创建聚合函数时我们需要 aggCols。 由于 Q 中表达式的求值顺序(从右到左),该列表必须颠倒。 目标是确保计算从 high 到 cumVolume,因为某些列依赖于之前的列。

需要从前一分钟复制到新分钟的列,为了方便起见添加了 sym 列:

rollColumns:`sym`cumVolume;

现在让我们根据列的更新方式将它们分组。 可以区分三种类型:

  1. 累加器(交易量、营业额……)——我们必须将传入的值添加到前一个值上。
  2. 对于特殊点(高、低……)——分钟中的第一个值取自传入数据,其余值使用该函数计算。
  3. 休息。 始终使用函数进行计算。

让我们为这些类定义变量:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

计算顺序

我们将分两个阶段更新聚合表。 为了提高效率,我们首先缩小传入表,以便每个字符和分钟只有一行。 事实上,我们所有的函数都是增量和关联的,这保证了这个额外步骤的结果不会改变。 您可以使用 select 缩小表:

select high:max price, low:min price … by sym,time.minute from table

此方法有一个缺点 - 计算列集是预定义的。 幸运的是,在 Q 中,select 也被实现为一个函数,您可以在其中替换动态创建的参数:

?[table;whereClause;byClause;selectClause]

我不会详细描述参数的格式;在我们的例子中,只有 by 和 select 表达式才重要,它们应该是 columns!expressions 形式的字典。 因此,收缩函数可以定义如下:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

为了清楚起见,我使用了 parse 函数,它将带有 Q 表达式的字符串转换为可以传递给 eval 函数并且在函数 select 中需要的值。 另请注意,预处理被定义为 select 函数的投影(即具有部分定义参数的函数),缺少一个参数(表)。 如果我们对表应用预处理,我们将得到一个压缩表。

第二阶段是更新聚合表。 我们首先用伪代码编写算法:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

在 Q 中,通常使用 map/reduce 函数而不是循环。 但由于 Q 是一种向量语言,我们可以轻松地将所有操作同时应用于所有符号,因此对于第一个近似,我们可以完全不需要循环,立即对所有符号执行操作:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

但我们可以更进一步,Q 有一个独特且极其强大的运算符——广义赋值运算符。 它允许您使用索引、函数和参数列表来更改复杂数据结构中的一组值。 在我们的例子中,它看起来像这样:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

不幸的是,要分配给表,您需要行列表,而不是列,并且必须使用翻转函数转置矩阵(列列表到行列表)。 这对于大型表来说代价高昂,因此我们使用 map 函数(看起来像撇号)分别对每一列应用通用赋值:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

我们再次使用函数投影。 另请注意,在 Q 中,创建列表也是一个函数,我们可以使用each(map) 函数调用它来获取列表列表。

为了确保计算列的集合不固定,我们将动态创建上述表达式。 我们首先定义函数来计算每一列,使用 row 和 inp 变量来引用聚合数据和输入数据:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

有些列很特殊;它们的第一个值不应由函数计算。 我们可以通过 row[`numTrades] 列确定它是第一个 - 如果它包含 0,则该值是第一个。 Q 有一个选择函数 - ?[Boolean list;list1;list2] - 它根据第一个参数中的条件从列表 1 或 2 中选择一个值:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

在这里,我用我的函数(大括号中的表达式)调用了广义赋值。 它接收当前值(第一个参数)和一个附加参数,我将其传递到第四个参数中。

让我们单独添加电池扬声器,因为它们的功能是相同的:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

按照 Q 标准,这是正常分配,但我要立即分配一个值列表。 最后,我们来创建 main 函数:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

使用此表达式,我从包含上面给出的表达式的字符串动态创建一个函数。 结果将如下所示:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

列求值顺序是倒置的,因为在 Q 中求值顺序是从右到左。

现在我们有两个计算所需的主要功能,我们只需要添加一点基础设施,服务就准备好了。

最后步骤

我们有 preprocess 和 updateAgg 函数来完成所有工作。 但还是需要保证分钟级的正确过渡,并计算指标进行聚合。 首先我们定义一下init函数:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

我们还将定义 roll 函数,它将更改当前分钟:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

我们需要一个函数来添加新字符:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

最后是 upd 函数(Q 服务中该函数的传统名称),客户端调用该函数来添加数据:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

就这样。 这是我们服务的完整代码,正如所承诺的,只有几行:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

测试

让我们检查一下服务的性能。 为此,让我们在单独的进程中运行它(将代码放在 service.q 文件中)并调用 init 函数:

q service.q –p 5566

q)init[]

在另一个控制台中,启动第二个 Q 进程并连接到第一个:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

首先,我们创建一个符号列表 - 10000 个,并添加一个函数来创建随机表。 在第二个控制台中:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

我在列表中添加了三个真实符号,以便更轻松地在表中查找它们。 rnd 函数创建一个包含 n 行的随机表,其中时间从 t 到 t+25 毫秒变化。

现在您可以尝试向服务发送数据(添加前十个小时):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

您可以在服务中检查该表是否已更新:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

结果:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

现在让我们进行负载测试,看看服务每分钟可以处理多少数据。 让我提醒您,我们将更新间隔设置为 25 毫秒。 因此,该服务必须(平均)每次更新至少需要 20 毫秒,以便为用户提供请求数据的时间。 在第二个进程中输入以下内容:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800是两分钟。 您可以尝试先每 1000 毫秒运行 25 行:

start 1000

就我而言,每次更新的结果大约是几毫秒。 所以我会立即将行数增加到 10.000:

start 10000

结果:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

同样,没什么特别的,但这是每分钟 24 万行,每秒 400 万行。 在超过 25 毫秒的时间内,更新速度仅减慢了 5 倍,显然是在分钟发生变化时。 让我们增加到 100.000:

start 100000

结果:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

正如您所看到的,该服务几乎无法应对,但它仍然设法维持下去。 这样的数据量(每分钟 240 亿行)非常大;在这种情况下,通常会启动该服务的多个克隆(甚至数十个克隆),每个克隆仅处理部分字符。 尽管如此,对于主要关注数据存储的解释语言来说,结果还是令人印象深刻的。

可能会出现这样的问题:为什么时间随着每次更新的大小而非线性增长。 原因是shrink函数实际上是一个C函数,它比updateAgg效率高得多。 从某个更新大小(大约10.000)开始,updateAgg达到上限,然后其执行时间不依赖于更新大小。 正是由于初步步骤 Q,该服务才能够消化如此大量的数据。 这凸显了在处理大数据时选择正确的算法是多么重要。 还有一点就是数据在内存中的正确存储。 如果数据不是按列存储或者没有按时间排序,那么我们就会熟悉 TLB 缓存未命中之类的情况 - 处理器地址缓存中缺少内存页地址。 如果不成功,搜索地址大约需要花费30倍的时间,如果数据分散,可能会使服务速度减慢数倍。

结论

在本文中,我展示了 KDB+ 和 Q 数据库不仅适合存储大数据并通过 select 轻松访问数据,而且还适合创建能够消化数亿行/GB 数据的数据处理服务,即使在一个单一的 Q 过程。 由于其向量性质、内置的 SQL 方言解释器和一组非常成功的库函数,Q 语言本身可以极其简洁、高效地实现与数据处理相关的算法。

我要指出的是,上述只是 Q 功能的一部分,它还有其他独特的功能。 例如,一个极其简单的 IPC 协议,它消除了各个 Q 进程之间的边界,并允许您将数百个这样的进程组合到一个网络中,该网络可以位于世界不同地区的数十台服务器上。

来源: habr.com

添加评论