Podes ler sobre a base de KDB+, a linguaxe de programación Q, cales son os seus puntos fortes e débiles no meu anterior
Introdución
KDB+ é unha base de datos en columnas centrada en cantidades moi grandes de datos, ordenados dun xeito específico (principalmente por tempo). Utilízase principalmente en institucións financeiras: bancos, fondos de investimento, compañías de seguros. A linguaxe Q é a linguaxe interna de KDB+ que che permite traballar eficazmente con estes datos. A ideoloxía Q é a brevidade e a eficiencia, mentres que a claridade é sacrificada. Isto xustifícase polo feito de que a linguaxe vectorial será difícil de entender en calquera caso, e a brevidade e riqueza da gravación permítelle ver unha parte moito máis grande do programa nunha soa pantalla, o que finalmente facilita a súa comprensión.
Neste artigo implementamos un programa completo en Q e quizais queiras probalo. Para iso, necesitará a Q real. Podes descargar a versión gratuíta de 32 bits no sitio web da empresa kx:
Declaración de problemas
Hai unha fonte que envía unha táboa con datos cada 25 milisegundos. Dado que KDB+ úsase principalmente en finanzas, asumiremos que esta é unha táboa de transaccións (operacións), que ten as seguintes columnas: tempo (tempo en milisegundos), sym (denominación da empresa na bolsa - IBM, AAPL,…), prezo (o prezo ao que se adquiriron as accións), tamaño (tamaño da transacción). O intervalo de 25 milisegundos é arbitrario, nin demasiado pequeno nin demasiado longo. A súa presenza significa que os datos chegan ao servizo xa almacenados en búfer. Sería doado implementar o búfer no lado do servizo, incluíndo o búfer dinámico dependendo da carga actual, pero para simplificar, centrarémonos nun intervalo fixo.
O servizo debe contar cada minuto para cada símbolo entrante da columna sym un conxunto de funcións de agregación: prezo máximo, prezo medio, tamaño da suma, etc. información útil. Para simplificar, asumiremos que todas as funcións poden calcularse de forma incremental, é dicir. para obter un novo valor, abonda con coñecer dous números: o antigo e os valores entrantes. Por exemplo, as funcións max, average, sum teñen esta propiedade, pero a función mediana non.
Tamén asumiremos que o fluxo de datos entrante está ordenado no tempo. Isto daranos a oportunidade de traballar só co último minuto. Na práctica, abonda con poder traballar cos minutos actuais e anteriores no caso de que algunha actualización tarde. Por simplicidade, non consideraremos este caso.
Funcións de agregación
As funcións de agregación necesarias están listadas a continuación. Tomei o maior número posible deles para aumentar a carga do servizo:
- alto – prezo máximo – prezo máximo por minuto.
- baixo – prezo mínimo – prezo mínimo por minuto.
- firstPrice - primeiro prezo - primeiro prezo por minuto.
- lastPrice - último prezo - último prezo por minuto.
- firstSize - primeiro tamaño - primeiro tamaño comercial por minuto.
- lastSize - último tamaño - último tamaño de intercambio nun minuto.
- numTrades – count i – número de operacións por minuto.
- volume - tamaño da suma - suma dos tamaños comerciais por minuto.
- pvolume – prezo suma – suma de prezos por minuto, necesario para avgPrice.
- - prezo total de facturación * tamaño - volume total de transaccións por minuto.
- avgPrice - pvolume%numTrades - prezo medio por minuto.
- avgSize – volume%numTrades – tamaño medio de comercio por minuto.
- vwap - volume de negocio% - prezo medio por minuto ponderado polo tamaño da transacción.
- cumVolume - volume suma - tamaño acumulado das transaccións durante todo o tempo.
Imos discutir inmediatamente un punto non obvio: como inicializar estas columnas por primeira vez e para cada minuto posterior. Algunhas columnas do tipo firstPrice deben inicializarse como nulas cada vez; o seu valor non está definido. Outros tipos de volume deben estar sempre configurados en 0. Tamén hai columnas que requiren un enfoque combinado; por exemplo, cumVolume debe ser copiado do minuto anterior e para o primeiro definido en 0. Axustemos todos estes parámetros usando os datos do dicionario. tipo (análogo a un rexistro):
// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
Engadín sym e time ao dicionario por comodidade, agora initWith é unha liña preparada da táboa agregada final, onde queda establecer o sym e o tempo correctos. Podes usalo para engadir novas filas a unha táboa.
Necesitaremos aggCols ao crear unha función de agregación. A lista debe ser invertida debido á orde na que se avalían as expresións en Q (de dereita a esquerda). O obxectivo é asegurar que o cálculo vai de alto a cumVolume, xa que algunhas columnas dependen das anteriores.
As columnas que deben copiarse nun minuto novo do anterior, engádese a columna sym para comodidade:
rollColumns:`sym`cumVolume;
Agora imos dividir as columnas en grupos segundo como se deben actualizar. Pódense distinguir tres tipos:
- Acumuladores (volume, facturación,..) – debemos engadir o valor de entrada ao anterior.
- Cun punto especial (alto, baixo, ..): o primeiro valor do minuto tómase dos datos entrantes, o resto calcúlase mediante a función.
- Descansa. Calcúlase sempre mediante unha función.
Definamos variables para estas clases:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Orde de cálculo
Actualizaremos a táboa agregada en dúas etapas. Para a eficacia, primeiro reducimos a táboa de entrada para que só haxa unha fila para cada carácter e minuto. O feito de que todas as nosas funcións sexan incrementais e asociativas garante que o resultado deste paso adicional non cambiará. Podes reducir a táboa usando select:
select high:max price, low:min price … by sym,time.minute from table
Este método ten unha desvantaxe: o conxunto de columnas calculadas está predefinido. Afortunadamente, en Q, select tamén se implementa como unha función onde podes substituír argumentos creados dinámicamente:
?[table;whereClause;byClause;selectClause]
Non describirei en detalle o formato dos argumentos; no noso caso, só as expresións by e select non serán triviais e deberían ser dicionarios da forma columnas!expresións. Así, a función de redución pódese definir do seguinte xeito:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Para claridade, usei a función de análise, que converte unha cadea cunha expresión Q nun valor que se pode pasar á función eval e que se require na función select. Teña en conta tamén que o preproceso defínese como unha proxección (é dicir, unha función con argumentos parcialmente definidos) da función de selección, falta un argumento (a táboa). Se aplicamos un preproceso a unha táboa, obteremos unha táboa comprimida.
A segunda etapa é a actualización da táboa agregada. Escribamos primeiro o algoritmo en pseudocódigo:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
…
En Q, é común usar funcións de mapa/reducir en lugar de bucles. Pero dado que Q é unha linguaxe vectorial e podemos aplicar facilmente todas as operacións a todos os símbolos á vez, a unha primeira aproximación podemos prescindir dun bucle, realizando operacións sobre todos os símbolos á vez:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…
Pero podemos ir máis aló, Q ten un operador único e extremadamente poderoso: o operador de asignación xeneralizada. Permítelle cambiar un conxunto de valores nunha estrutura de datos complexa mediante unha lista de índices, funcións e argumentos. No noso caso ten o seguinte aspecto:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
Desafortunadamente, para asignarlle a unha táboa necesitas unha lista de filas, non columnas, e tes que transpoñer a matriz (lista de columnas a lista de filas) usando a función flip. Isto é caro para unha táboa grande, polo que aplicamos unha asignación xeneralizada a cada columna por separado, usando a función de mapa (que semella un apóstrofo):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
Usamos de novo a función de proxección. Teña en conta tamén que en Q, crear unha lista tamén é unha función e podemos chamala usando a función each(map) para obter unha lista de listas.
Para asegurarnos de que o conxunto de columnas calculadas non está fixado, crearemos a expresión anterior de forma dinámica. Imos primeiro definir funcións para calcular cada columna, usando as variables de fila e inp para facer referencia aos datos agregados e de entrada:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Algunhas columnas son especiais; o seu primeiro valor non debe ser calculado pola función. Podemos determinar que é o primeiro pola fila [`numTrades] columna; se contén 0, entón o valor é o primeiro. Q ten unha función select - ?[Boolean list;list1;list2] - que selecciona un valor da lista 1 ou 2 dependendo da condición do primeiro argumento:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Aquí chamei unha asignación xeneralizada coa miña función (unha expresión entre chaves). Recibe o valor actual (o primeiro argumento) e un argumento adicional, que paso no cuarto parámetro.
Imos engadir altofalantes de batería por separado, xa que a función é a mesma para eles:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Esta é unha asignación normal dos estándares Q, pero estou asignando unha lista de valores á vez. Finalmente, imos crear a función principal:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Con esta expresión, creo dinámicamente unha función a partir dunha cadea que contén a expresión que dei anteriormente. O resultado será así:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
A orde de avaliación da columna invírtese porque en Q a orde de avaliación é de dereita a esquerda.
Agora temos dúas funcións principais necesarias para os cálculos, só necesitamos engadir un pouco de infraestrutura e o servizo está listo.
Pasos finais
Temos funcións de preprocesamento e updateAgg que fan todo o traballo. Pero aínda é necesario garantir a transición correcta a través de minutos e calcular índices para a agregación. Primeiro de todo, imos definir a función init:
init:{
tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
offset::0; // индекс в tradeAgg, где начинается текущая минута
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
}
Tamén definiremos a función de rolo, que cambiará o minuto actual:
roll:{[tm]
if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Necesitaremos unha función para engadir novos caracteres:
addSyms:{[syms]
currSyms,::syms; // добавим в список известных
// добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
// Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
E, finalmente, a función upd (o nome tradicional desta función para os servizos Q), que é chamada polo cliente para engadir datos:
upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
data:select from data where time=tm; // фильтрация
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
};
Iso é todo. Aquí tes o código completo do noso servizo, como prometemos, só unhas liñas:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Probas
Comprobamos o rendemento do servizo. Para iso, executémolo nun proceso separado (coloque o código no ficheiro service.q) e chamemos á función init:
q service.q –p 5566
q)init[]
Noutra consola, inicia o segundo proceso Q e conéctate ao primeiro:
h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте
Primeiro, imos crear unha lista de símbolos - 10000 pezas e engadir unha función para crear unha táboa aleatoria. Na segunda consola:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Engadín tres símbolos reais á lista para facilitar a súa busca na táboa. A función rnd crea unha táboa aleatoria con n filas, onde o tempo varía de t a t+25 milisegundos.
Agora podes tentar enviar datos ao servizo (engade as primeiras dez horas):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Podes comprobar no servizo que a táboa está actualizada:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Resultado:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
Agora realicemos probas de carga para saber cantos datos pode procesar o servizo por minuto. Permíteme recordarche que establecemos o intervalo de actualización en 25 milisegundos. En consecuencia, o servizo debe (de media) encaixar en polo menos 20 milisegundos por actualización para dar tempo aos usuarios para solicitar datos. Introduza o seguinte no segundo proceso:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 son dous minutos. Podes probar a executar primeiro durante 1000 filas cada 25 milisegundos:
start 1000
No meu caso, o resultado é duns milisegundos por actualización. Entón, vou aumentar inmediatamente o número de filas a 10.000:
start 10000
Resultado:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
De novo, nada especial, pero isto é de 24 millóns de liñas por minuto, 400 mil por segundo. Durante máis de 25 milisegundos, a actualización diminuíu só 5 veces, ao parecer cando cambiou o minuto. Aumentemos a 100.000:
start 100000
Resultado:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Como podes ver, o servizo apenas pode soportar, pero con todo consegue manterse a flote. Tal volume de datos (240 millóns de filas por minuto) é extremadamente grande; nestes casos, é habitual lanzar varios clons (ou incluso ducias de clons) do servizo, cada un dos cales só procesa parte dos personaxes. Aínda así, o resultado é impresionante para unha linguaxe interpretada que se centra principalmente no almacenamento de datos.
Pode xurdir a pregunta de por que o tempo crece de forma non lineal co tamaño de cada actualización. O motivo é que a función de redución é en realidade unha función C, que é moito máis eficiente que updateAgg. A partir dun determinado tamaño de actualización (uns 10.000), updateAgg alcanza o seu teito e entón o seu tempo de execución non depende do tamaño da actualización. É debido ao paso preliminar Q que o servizo é capaz de dixerir tales volumes de datos. Isto destaca o importante que é escoller o algoritmo correcto cando se traballa con big data. Outro punto é o correcto almacenamento dos datos na memoria. Se os datos non se almacenasen en columnas ou non estivesen ordenados por tempo, familiarizariamos con algo como un fallo de caché TLB: a ausencia dun enderezo de páxina de memoria na caché de enderezos do procesador. A busca dun enderezo leva unhas 30 veces máis tempo se non ten éxito, e se os datos están dispersos, pode ralentizar o servizo varias veces.
Conclusión
Neste artigo, demostrei que a base de datos KDB+ e Q son adecuadas non só para almacenar grandes datos e acceder facilmente a eles mediante select, senón tamén para crear servizos de procesamento de datos que son capaces de dixerir centos de millóns de filas/xigabytes de datos mesmo en un único proceso Q. A propia linguaxe Q permite unha implementación extremadamente concisa e eficiente de algoritmos relacionados co procesamento de datos debido á súa natureza vectorial, ao intérprete de dialectos SQL incorporado e a un conxunto de funcións de biblioteca moi exitoso.
Notarei que o anterior é só parte do que pode facer Q, tamén ten outras características únicas. Por exemplo, un protocolo IPC extremadamente sinxelo que borra o límite entre os procesos Q individuais e permite combinar centos destes procesos nunha única rede, que se pode localizar en decenas de servidores en diferentes partes do mundo.
Fonte: www.habr.com