Você pode ler sobre o que são o banco de dados KDB+ e a linguagem de programação Q, bem como seus pontos fortes e fracos, na minha postagem anterior. e brevemente na introdução. Neste artigo, implementaremos um serviço em Q que processará um fluxo de dados de entrada e calculará várias funções de agregação minuto a minuto em "tempo real" (ou seja, terá tempo de calcular tudo antes do próximo lote de dados). A principal característica do Q é ser uma linguagem vetorial, permitindo-nos operar não em objetos individuais, mas em arrays de objetos, arrays de arrays e outros objetos complexos. Linguagens como Q e suas relacionadas, K, J e APL, são conhecidas por sua concisão. Frequentemente, um programa que ocuparia várias telas de código em uma linguagem familiar como Java pode ser escrito em poucas linhas. É exatamente isso que quero demonstrar neste artigo.

Introdução
O KDB+ é um banco de dados colunar projetado para lidar com grandes volumes de dados organizados de maneira específica (principalmente por tempo). É utilizado principalmente em instituições financeiras, como bancos, fundos de investimento e seguradoras. Q é a linguagem interna do KDB+, permitindo um trabalho eficiente com esses dados. A filosofia do Q é a concisão e a eficiência, sacrificando a clareza. Isso ocorre porque uma linguagem baseada em vetores seria difícil de entender, enquanto a concisão e a riqueza permitem que uma parte muito maior do programa seja exibida em uma única tela, tornando-o mais fácil de compreender.
Neste artigo, vamos implementar um programa completo em Q, e você pode querer experimentá-lo. Para isso, você precisará do próprio Q. Você pode baixar a versão gratuita de 32 bits no site do kx – Lá, caso tenha interesse, você também encontrará informações de referência sobre Q, um livro. e vários artigos sobre este tema.
Formulação do problema
Existe uma fonte que envia uma tabela de dados a cada 25 milissegundos. Como o KDB+ é usado principalmente em finanças, vamos assumir que se trata de uma tabela de negociações com as seguintes colunas: tempo (tempo em milissegundos), símbolo (código da empresa na bolsa de valores – IBM, AAPL…), preço (o preço pelo qual as ações foram compradas) e tamanho (o tamanho da transação). O intervalo de 25 milissegundos foi escolhido arbitrariamente; não é muito pequeno nem muito grande. Sua presença significa que os dados que chegam ao serviço já estão em buffer. Seria fácil implementar o buffer no lado do serviço, incluindo o buffer dinâmico com base na carga atual, mas, por simplicidade, manteremos um intervalo fixo.
O serviço deve calcular um conjunto de funções de agregação — preço máximo, preço médio, soma e outras informações úteis — por minuto para cada símbolo recebido da coluna `sym`. Para simplificar, assumimos que todas as funções podem ser calculadas incrementalmente, ou seja, para obter um novo valor, dois números — o valor anterior e o valor recebido — são suficientes. Por exemplo, as funções de máximo, média e soma possuem essa propriedade, mas a função de mediana não.
Vamos também assumir que o fluxo de dados recebido está ordenado por tempo. Isso nos permitirá trabalhar apenas com o minuto mais recente. Na prática, é suficiente poder trabalhar com o minuto atual e o anterior, caso haja alguma atualização atrasada. Para simplificar, não consideraremos esse caso.
Funções de agregação
Abaixo estão as funções de agregação necessárias. Incluí o máximo possível para aumentar a capacidade de processamento do serviço:
- alto – preço máximo – preço máximo por minuto.
- baixo – preço mínimo – preço mínimo por minuto.
- firstPrice – primeiro preço – o primeiro preço por minuto.
- últimoPreço – último preço – o último preço por minuto.
- firstSize – primeiro tamanho – o tamanho da primeira transação por minuto.
- lastSize – último tamanho — o último tamanho de transação por minuto.
- numTrades – contagem i – número de negociações por minuto.
- Volume – tamanho total – soma dos tamanhos das transações por minuto.
- pvolume – soma de preços – soma dos preços por minuto, necessária para avgPrice.
- Faturamento – soma de preço * tamanho – volume total de transações por minuto.
- avgPrice – pvolume%numTrades – preço médio por minuto.
- avgSize – volume%numTrades – tamanho médio das negociações por minuto.
- VWAP – volume de negócios em % – preço médio por minuto ponderado pelo tamanho da negociação.
- Volume acumulado – volume total – volume acumulado de transações durante todo o período.
Vamos abordar logo um ponto não óbvio: como inicializar essas colunas na primeira vez e a cada minuto subsequente. Algumas colunas, como `firstPrice`, devem ser inicializadas com o valor `null` a cada vez; seu valor é indefinido. Outras, como `volume`, devem sempre ser definidas como 0. Há também colunas que exigem uma abordagem combinada — por exemplo, `cumVolume` deve ser copiado do minuto anterior e definido como 0 para o primeiro minuto. Definiremos todos esses parâmetros usando o tipo de dados dicionário (semelhante a um registro):
// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
Adicionei os parâmetros `sym` e `time` ao dicionário para maior conveniência. Agora, `initWith` é uma linha pronta da tabela agregada final, onde resta especificar os parâmetros corretos `sym` e `time`. Você pode usá-la para adicionar novas linhas à tabela.
Precisaremos de `aggCols` ao criar a função de agregação. A lista precisa ser invertida devido à ordem de avaliação da expressão em Q (da direita para a esquerda). O objetivo é garantir que a avaliação prossiga de `high` para `cumVolume`, já que algumas colunas dependem de colunas anteriores.
As colunas que precisam ser copiadas para o novo minuto a partir do minuto anterior; a coluna "sym" foi adicionada para facilitar a consulta:
rollColumns:`sym`cumVolume;
Agora, vamos dividir as colunas em grupos com base em como elas devem ser atualizadas. Podemos distinguir três tipos:
- Acumuladores (volume, rotatividade, etc.) – devemos adicionar o valor de entrada ao valor anterior.
- Com um ponto específico (máximo, mínimo, etc.) - o primeiro valor do minuto é obtido a partir dos dados recebidos, os restantes são calculados utilizando a função.
- Os demais são sempre calculados usando a função.
Vamos definir variáveis para essas classes:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Ordem de cálculo
Atualizaremos a tabela agregada em duas etapas. Para maior eficiência, primeiro reduziremos a tabela de entrada para que contenha uma linha para cada símbolo e minuto. O fato de todas as nossas funções serem incrementais e associativas garante que o resultado não se alterará com esta etapa adicional. Poderíamos reduzir a tabela usando uma instrução SELECT:
select high:max price, low:min price … by sym,time.minute from table
Este método tem uma desvantagem: o conjunto de colunas calculadas é predefinido. Felizmente, o Q também implementa o select como uma função que aceita argumentos gerados dinamicamente:
?[table;whereClause;byClause;selectClause]
Não descreverei o formato dos argumentos em detalhes; em nosso caso, as únicas expressões não triviais são as expressões `by` e `select`, e elas devem ser dicionários da forma `columns!expressões`. Assim, a função de compressão pode ser definida da seguinte forma:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Para maior clareza, utilizei a função `parse`, que converte uma string contendo a expressão Q em um valor que pode ser passado para a função `eval` e é necessário na função `select`. Observe também que `preprocess` é definida como uma projeção (ou seja, uma função com argumentos parcialmente definidos) da função `select`; um argumento (a tabela) está faltando. Se aplicarmos `preprocess` a uma tabela, obteremos uma tabela compactada.
O segundo passo é atualizar a tabela agregada. Vamos primeiro escrever o algoritmo em pseudocódigo:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
…
Em Q, é comum usar funções map/reduce em vez de loops. Mas como Q é uma linguagem vetorial e podemos aplicar com segurança todas as operações a todos os símbolos de uma só vez, podemos, como uma primeira aproximação, eliminar o loop completamente, realizando operações em todos os símbolos simultaneamente:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…
Mas podemos ir ainda mais longe. Q possui um operador único e excepcionalmente poderoso — o operador de atribuição generalizado. Ele permite modificar um conjunto de valores em uma estrutura de dados complexa usando uma lista de índices, funções e argumentos. No nosso caso, seria algo assim:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
Infelizmente, atribuir valores a uma tabela requer uma lista de linhas, não de colunas, e exige a transposição da matriz (lista de colunas em lista de linhas) usando a função `flip`. Para uma tabela grande, isso é custoso, então, em vez disso, aplicamos uma atribuição generalizada a cada coluna individualmente usando a função `map` (que se parece com um apóstrofo):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
Estamos usando projeção de função novamente. Observe também que, em Q, a criação de listas também é uma função, e podemos chamá-la usando `each(map)` para obter uma lista de listas.
Para evitar um conjunto fixo de colunas calculadas, vamos criar a expressão acima dinamicamente. Primeiro, defina funções para calcular cada coluna, usando as variáveis `row` e `inp` para referenciar os dados agregados e de entrada:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Algumas colunas são especiais; seu primeiro valor não deve ser calculado pela função. Podemos determinar que é o primeiro pela coluna `row[`numTrades]` — se for 0, então o valor é o primeiro. Q possui uma função de seleção — `?[Boolean list;list1;list2]` — que seleciona um valor da lista 1 ou 2 dependendo da condição no primeiro argumento:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Aqui, chamei uma atribuição genérica com minha função (a expressão entre chaves). Ela recebe o valor atual (o primeiro argumento) e um argumento adicional, que passo como quarto parâmetro.
Vamos adicionar as caixas de som alimentadas por bateria separadamente, já que elas desempenham a mesma função:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Esta é uma tarefa típica pelos padrões do Q, mas estou atribuindo uma lista de valores de uma só vez. Finalmente, vamos criar a função principal:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Com essa expressão, eu crio dinamicamente uma função a partir de uma string contendo a expressão que forneci acima. O resultado será semelhante a este:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
A ordem de avaliação das colunas é invertida, uma vez que em Q a ordem de avaliação é da direita para a esquerda.
Agora que temos duas funções principais necessárias para a computação, tudo o que resta é adicionar um pouco de infraestrutura e o serviço estará pronto.
Etapas finais
Temos as funções `preprocess` e `updateAgg` que fazem todo o trabalho. Mas ainda precisamos garantir transições adequadas entre os minutos e calcular os índices para agregação. Primeiro, vamos definir a função `init`:
init:{
tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
offset::0; // индекс в tradeAgg, где начинается текущая минута
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
}
Também definiremos uma função de rolagem que alterará o minuto atual:
roll:{[tm]
if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Precisaremos de uma função para adicionar novos caracteres:
addSyms:{[syms]
currSyms,::syms; // добавим в список известных
// добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
// Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
E, finalmente, a função upd (nome tradicional dessa função para serviços Q), que é chamada pelo cliente para adicionar dados:
upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
data:select from data where time=tm; // фильтрация
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
};
Pronto. Aqui está o código completo do nosso serviço, como prometido, apenas algumas linhas:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Teste
Vamos testar o desempenho do serviço. Para isso, execute-o em um processo separado (coloque o código no arquivo service.q) e chame a função init:
q service.q –p 5566
q)init[]
Em outro console, inicie um segundo processo Q e conecte-o ao primeiro:
h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте
Primeiro, vamos criar uma lista de caracteres — 10000 deles — e adicionar uma função para gerar uma tabela aleatória. No segundo console:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Adicionei três símbolos reais à lista para facilitar a busca na tabela. A função `rnd` cria uma tabela aleatória com n linhas, onde os tempos variam de t a t+25 milissegundos.
Agora você pode tentar enviar dados para o serviço (vamos adicionar as primeiras dez horas):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Você pode verificar no serviço se a tabela foi atualizada:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Resultado:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919Agora, vamos executar um teste de carga para determinar a quantidade de dados que o serviço pode processar por minuto. Lembrando que definimos o intervalo de atualização para 25 milissegundos. Portanto, o serviço deve (em média) atualizar em pelo menos 20 milissegundos para dar tempo aos usuários de solicitarem os dados. Insira o seguinte no segundo processo:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 são dois minutos. Você pode tentar executar primeiro com 1000 linhas a cada 25 milissegundos:
start 1000
No meu caso, o resultado é de cerca de alguns milissegundos por atualização. Portanto, vou aumentar imediatamente o número de linhas para 10.000:
start 10000
Resultado:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Novamente, nada de especial, mas são 24 milhões de linhas por minuto, 400 mil por segundo. A atualização só ficou mais lenta do que 25 milissegundos cinco vezes, aparentemente devido à mudança de minuto. Vamos aumentar isso para 100 mil:
start 100000
Resultado:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Como podemos ver, o serviço está dando conta do recado, mas ainda consegue se manter à tona. Esse volume de dados (240 milhões de linhas por minuto) é extremamente grande; em casos como esse, é comum lançar vários clones (ou até dezenas) do serviço, cada um processando apenas um subconjunto dos caracteres. Mesmo assim, o resultado é impressionante para uma linguagem interpretada que tem como foco principal o armazenamento de dados.
Pode-se questionar por que o tempo cresce de forma não linear com o tamanho de cada atualização. A razão é que a função de compressão é essencialmente uma função em C, que é muito mais eficiente do que o `updateAgg`. A partir de um certo tamanho de atualização (em torno de 10.000), o `updateAgg` atinge seu limite máximo e, depois disso, seu tempo de execução torna-se independente do tamanho da atualização. É precisamente por causa da etapa prévia `Q` que o serviço consegue processar tais volumes de dados. Isso ressalta a importância de escolher o algoritmo correto ao trabalhar com big data. Outra consideração é o armazenamento adequado dos dados na memória. Se os dados não fossem armazenados em colunas ou em ordem cronológica, encontraríamos algo chamado falha de cache TLB — uma falha ao encontrar o endereço de uma página de memória no cache de endereços do processador. As buscas de endereço levam aproximadamente 30 vezes mais tempo se não forem bem-sucedidas e, no caso de dados dispersos, isso pode tornar o serviço muito mais lento.
Conclusão
Neste artigo, demonstrei que o KDB+ e a linguagem Q são adequados não apenas para armazenar grandes conjuntos de dados e acessá-los facilmente por meio de instruções SELECT, mas também para criar serviços de processamento de dados capazes de processar centenas de milhões de linhas/gigabytes de dados, mesmo em um único processo Q. A própria linguagem Q permite uma implementação excepcionalmente concisa e eficiente de algoritmos de processamento de dados devido à sua natureza vetorial, ao interpretador SQL integrado e a um conjunto de funções de biblioteca muito bem-sucedido.
Gostaria de salientar que o que foi apresentado acima é apenas uma amostra das capacidades do Q; ele também possui outros recursos exclusivos. Por exemplo, um protocolo IPC extremamente simples que elimina as barreiras entre os processos individuais do Q e permite que centenas desses processos sejam conectados em uma única rede, que pode abranger dezenas de servidores ao redor do mundo.
Fonte: habr.com
