Características da linguaxe Q e KDB+ usando o exemplo dun servizo en tempo real

Podes ler sobre a base de KDB+, a linguaxe de programación Q, cales son os seus puntos fortes e débiles no meu anterior Artigo e brevemente na introdución. No artigo, implementaremos un servizo en Q que procesará o fluxo de datos entrantes e calculará varias funcións de agregación cada minuto en modo "en tempo real" (é dicir, terá tempo para calcular todo antes da seguinte parte de datos). A principal característica de Q é que é unha linguaxe vectorial que permite operar non con obxectos únicos, senón coas súas matrices, matrices de matrices e outros obxectos complexos. Linguas como Q e os seus parentes K, J, APL son famosas pola súa brevidade. Moitas veces, un programa que ocupa varias pantallas de código nunha linguaxe familiar como Java pódese escribir nelas en poucas liñas. Isto é o que quero demostrar neste artigo.

Características da linguaxe Q e KDB+ usando o exemplo dun servizo en tempo real

Introdución

KDB+ é unha base de datos en columnas centrada en cantidades moi grandes de datos, ordenados dun xeito específico (principalmente por tempo). Utilízase principalmente en institucións financeiras: bancos, fondos de investimento, compañías de seguros. A linguaxe Q é a linguaxe interna de KDB+ que che permite traballar eficazmente con estes datos. A ideoloxía Q é a brevidade e a eficiencia, mentres que a claridade é sacrificada. Isto xustifícase polo feito de que a linguaxe vectorial será difícil de entender en calquera caso, e a brevidade e riqueza da gravación permítelle ver unha parte moito máis grande do programa nunha soa pantalla, o que finalmente facilita a súa comprensión.

Neste artigo implementamos un programa completo en Q e quizais queiras probalo. Para iso, necesitará a Q real. Podes descargar a versión gratuíta de 32 bits no sitio web da empresa kx: www.kx.com. Alí, se che interesa, atoparás información de referencia sobre Q, o libro Q Para Mortais e diversos artigos sobre este tema.

Declaración de problemas

Hai unha fonte que envía unha táboa con datos cada 25 milisegundos. Dado que KDB+ úsase principalmente en finanzas, asumiremos que esta é unha táboa de transaccións (operacións), que ten as seguintes columnas: tempo (tempo en milisegundos), sym (denominación da empresa na bolsa - IBM, AAPL,…), prezo (o prezo ao que se adquiriron as accións), tamaño (tamaño da transacción). O intervalo de 25 milisegundos é arbitrario, nin demasiado pequeno nin demasiado longo. A súa presenza significa que os datos chegan ao servizo xa almacenados en búfer. Sería doado implementar o búfer no lado do servizo, incluíndo o búfer dinámico dependendo da carga actual, pero para simplificar, centrarémonos nun intervalo fixo.

O servizo debe contar cada minuto para cada símbolo entrante da columna sym un conxunto de funcións de agregación: prezo máximo, prezo medio, tamaño da suma, etc. información útil. Para simplificar, asumiremos que todas as funcións poden calcularse de forma incremental, é dicir. para obter un novo valor, abonda con coñecer dous números: o antigo e os valores entrantes. Por exemplo, as funcións max, average, sum teñen esta propiedade, pero a función mediana non.

Tamén asumiremos que o fluxo de datos entrante está ordenado no tempo. Isto daranos a oportunidade de traballar só co último minuto. Na práctica, abonda con poder traballar cos minutos actuais e anteriores no caso de que algunha actualización tarde. Por simplicidade, non consideraremos este caso.

Funcións de agregación

As funcións de agregación necesarias están listadas a continuación. Tomei o maior número posible deles para aumentar a carga do servizo:

  • alto – prezo máximo – prezo máximo por minuto.
  • baixo – prezo mínimo – prezo mínimo por minuto.
  • firstPrice - primeiro prezo - primeiro prezo por minuto.
  • lastPrice - último prezo - último prezo por minuto.
  • firstSize - primeiro tamaño - primeiro tamaño comercial por minuto.
  • lastSize - último tamaño - último tamaño de intercambio nun minuto.
  • numTrades – count i – número de operacións por minuto.
  • volume - tamaño da suma - suma dos tamaños comerciais por minuto.
  • pvolume – prezo suma – suma de prezos por minuto, necesario para avgPrice.
  • - prezo total de facturación * tamaño - volume total de transaccións por minuto.
  • avgPrice - pvolume%numTrades - prezo medio por minuto.
  • avgSize – volume%numTrades – tamaño medio de comercio por minuto.
  • vwap - volume de negocio% - prezo medio por minuto ponderado polo tamaño da transacción.
  • cumVolume - volume suma - tamaño acumulado das transaccións durante todo o tempo.

Imos discutir inmediatamente un punto non obvio: como inicializar estas columnas por primeira vez e para cada minuto posterior. Algunhas columnas do tipo firstPrice deben inicializarse como nulas cada vez; o seu valor non está definido. Outros tipos de volume deben estar sempre configurados en 0. Tamén hai columnas que requiren un enfoque combinado; por exemplo, cumVolume debe ser copiado do minuto anterior e para o primeiro definido en 0. Axustemos todos estes parámetros usando os datos do dicionario. tipo (análogo a un rexistro):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Engadín sym e time ao dicionario por comodidade, agora initWith é unha liña preparada da táboa agregada final, onde queda establecer o sym e o tempo correctos. Podes usalo para engadir novas filas a unha táboa.

Necesitaremos aggCols ao crear unha función de agregación. A lista debe ser invertida debido á orde na que se avalían as expresións en Q (de dereita a esquerda). O obxectivo é asegurar que o cálculo vai de alto a cumVolume, xa que algunhas columnas dependen das anteriores.

As columnas que deben copiarse nun minuto novo do anterior, engádese a columna sym para comodidade:

rollColumns:`sym`cumVolume;

Agora imos dividir as columnas en grupos segundo como se deben actualizar. Pódense distinguir tres tipos:

  1. Acumuladores (volume, facturación,..) – debemos engadir o valor de entrada ao anterior.
  2. Cun punto especial (alto, baixo, ..): o primeiro valor do minuto tómase dos datos entrantes, o resto calcúlase mediante a función.
  3. Descansa. Calcúlase sempre mediante unha función.

Definamos variables para estas clases:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Orde de cálculo

Actualizaremos a táboa agregada en dúas etapas. Para a eficacia, primeiro reducimos a táboa de entrada para que só haxa unha fila para cada carácter e minuto. O feito de que todas as nosas funcións sexan incrementais e asociativas garante que o resultado deste paso adicional non cambiará. Podes reducir a táboa usando select:

select high:max price, low:min price … by sym,time.minute from table

Este método ten unha desvantaxe: o conxunto de columnas calculadas está predefinido. Afortunadamente, en Q, select tamén se implementa como unha función onde podes substituír argumentos creados dinámicamente:

?[table;whereClause;byClause;selectClause]

Non describirei en detalle o formato dos argumentos; no noso caso, só as expresións by e select non serán triviais e deberían ser dicionarios da forma columnas!expresións. Así, a función de redución pódese definir do seguinte xeito:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Para claridade, usei a función de análise, que converte unha cadea cunha expresión Q nun valor que se pode pasar á función eval e que se require na función select. Teña en conta tamén que o preproceso defínese como unha proxección (é dicir, unha función con argumentos parcialmente definidos) da función de selección, falta un argumento (a táboa). Se aplicamos un preproceso a unha táboa, obteremos unha táboa comprimida.

A segunda etapa é a actualización da táboa agregada. Escribamos primeiro o algoritmo en pseudocódigo:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

En Q, é común usar funcións de mapa/reducir en lugar de bucles. Pero dado que Q é unha linguaxe vectorial e podemos aplicar facilmente todas as operacións a todos os símbolos á vez, a unha primeira aproximación podemos prescindir dun bucle, realizando operacións sobre todos os símbolos á vez:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Pero podemos ir máis aló, Q ten un operador único e extremadamente poderoso: o operador de asignación xeneralizada. Permítelle cambiar un conxunto de valores nunha estrutura de datos complexa mediante unha lista de índices, funcións e argumentos. No noso caso ten o seguinte aspecto:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Desafortunadamente, para asignarlle a unha táboa necesitas unha lista de filas, non columnas, e tes que transpoñer a matriz (lista de columnas a lista de filas) usando a función flip. Isto é caro para unha táboa grande, polo que aplicamos unha asignación xeneralizada a cada columna por separado, usando a función de mapa (que semella un apóstrofo):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Usamos de novo a función de proxección. Teña en conta tamén que en Q, crear unha lista tamén é unha función e podemos chamala usando a función each(map) para obter unha lista de listas.

Para asegurarnos de que o conxunto de columnas calculadas non está fixado, crearemos a expresión anterior de forma dinámica. Imos primeiro definir funcións para calcular cada columna, usando as variables de fila e inp para facer referencia aos datos agregados e de entrada:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Algunhas columnas son especiais; o seu primeiro valor non debe ser calculado pola función. Podemos determinar que é o primeiro pola fila [`numTrades] columna; se contén 0, entón o valor é o primeiro. Q ten unha función select - ?[Boolean list;list1;list2] - que selecciona un valor da lista 1 ou 2 dependendo da condición do primeiro argumento:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Aquí chamei unha asignación xeneralizada coa miña función (unha expresión entre chaves). Recibe o valor actual (o primeiro argumento) e un argumento adicional, que paso no cuarto parámetro.

Imos engadir altofalantes de batería por separado, xa que a función é a mesma para eles:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Esta é unha asignación normal dos estándares Q, pero estou asignando unha lista de valores á vez. Finalmente, imos crear a función principal:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Con esta expresión, creo dinámicamente unha función a partir dunha cadea que contén a expresión que dei anteriormente. O resultado será así:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

A orde de avaliación da columna invírtese porque en Q a orde de avaliación é de dereita a esquerda.

Agora temos dúas funcións principais necesarias para os cálculos, só necesitamos engadir un pouco de infraestrutura e o servizo está listo.

Pasos finais

Temos funcións de preprocesamento e updateAgg que fan todo o traballo. Pero aínda é necesario garantir a transición correcta a través de minutos e calcular índices para a agregación. Primeiro de todo, imos definir a función init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Tamén definiremos a función de rolo, que cambiará o minuto actual:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Necesitaremos unha función para engadir novos caracteres:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

E, finalmente, a función upd (o nome tradicional desta función para os servizos Q), que é chamada polo cliente para engadir datos:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Iso é todo. Aquí tes o código completo do noso servizo, como prometemos, só unhas liñas:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Probas

Comprobamos o rendemento do servizo. Para iso, executémolo nun proceso separado (coloque o código no ficheiro service.q) e chamemos á función init:

q service.q –p 5566

q)init[]

Noutra consola, inicia o segundo proceso Q e conéctate ao primeiro:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Primeiro, imos crear unha lista de símbolos - 10000 pezas e engadir unha función para crear unha táboa aleatoria. Na segunda consola:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Engadín tres símbolos reais á lista para facilitar a súa busca na táboa. A función rnd crea unha táboa aleatoria con n filas, onde o tempo varía de t a t+25 milisegundos.

Agora podes tentar enviar datos ao servizo (engade as primeiras dez horas):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Podes comprobar no servizo que a táboa está actualizada:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Resultado:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Agora realicemos probas de carga para saber cantos datos pode procesar o servizo por minuto. Permíteme recordarche que establecemos o intervalo de actualización en 25 milisegundos. En consecuencia, o servizo debe (de media) encaixar en polo menos 20 milisegundos por actualización para dar tempo aos usuarios para solicitar datos. Introduza o seguinte no segundo proceso:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 son dous minutos. Podes probar a executar primeiro durante 1000 filas cada 25 milisegundos:

start 1000

No meu caso, o resultado é duns milisegundos por actualización. Entón, vou aumentar inmediatamente o número de filas a 10.000:

start 10000

Resultado:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

De novo, nada especial, pero isto é de 24 millóns de liñas por minuto, 400 mil por segundo. Durante máis de 25 milisegundos, a actualización diminuíu só 5 veces, ao parecer cando cambiou o minuto. Aumentemos a 100.000:

start 100000

Resultado:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Como podes ver, o servizo apenas pode soportar, pero con todo consegue manterse a flote. Tal volume de datos (240 millóns de filas por minuto) é extremadamente grande; nestes casos, é habitual lanzar varios clons (ou incluso ducias de clons) do servizo, cada un dos cales só procesa parte dos personaxes. Aínda así, o resultado é impresionante para unha linguaxe interpretada que se centra principalmente no almacenamento de datos.

Pode xurdir a pregunta de por que o tempo crece de forma non lineal co tamaño de cada actualización. O motivo é que a función de redución é en realidade unha función C, que é moito máis eficiente que updateAgg. A partir dun determinado tamaño de actualización (uns 10.000), updateAgg alcanza o seu teito e entón o seu tempo de execución non depende do tamaño da actualización. É debido ao paso preliminar Q que o servizo é capaz de dixerir tales volumes de datos. Isto destaca o importante que é escoller o algoritmo correcto cando se traballa con big data. Outro punto é o correcto almacenamento dos datos na memoria. Se os datos non se almacenasen en columnas ou non estivesen ordenados por tempo, familiarizariamos con algo como un fallo de caché TLB: a ausencia dun enderezo de páxina de memoria na caché de enderezos do procesador. A busca dun enderezo leva unhas 30 veces máis tempo se non ten éxito, e se os datos están dispersos, pode ralentizar o servizo varias veces.

Conclusión

Neste artigo, demostrei que a base de datos KDB+ e Q son adecuadas non só para almacenar grandes datos e acceder facilmente a eles mediante select, senón tamén para crear servizos de procesamento de datos que son capaces de dixerir centos de millóns de filas/xigabytes de datos mesmo en un único proceso Q. A propia linguaxe Q permite unha implementación extremadamente concisa e eficiente de algoritmos relacionados co procesamento de datos debido á súa natureza vectorial, ao intérprete de dialectos SQL incorporado e a un conxunto de funcións de biblioteca moi exitoso.

Notarei que o anterior é só parte do que pode facer Q, tamén ten outras características únicas. Por exemplo, un protocolo IPC extremadamente sinxelo que borra o límite entre os procesos Q individuais e permite combinar centos destes procesos nunha única rede, que se pode localizar en decenas de servidores en diferentes partes do mundo.

Fonte: www.habr.com

Engadir un comentario