Recursos da linguagem Q e KDB+ usando o exemplo de um serviço em tempo real

Você pode ler sobre o que são a base KDB+, a linguagem de programação Q, quais são seus pontos fortes e fracos em meu artigo anterior. статье e brevemente na introdução. No artigo, implementaremos um serviço em Q que processará o fluxo de dados de entrada e calculará várias funções de agregação a cada minuto no modo “tempo real” (ou seja, terá tempo para calcular tudo antes da próxima porção de dados). A principal característica do Q é que é uma linguagem vetorial que permite operar não com objetos únicos, mas com seus arrays, arrays de arrays e outros objetos complexos. Idiomas como Q e seus parentes K, J, APL são famosos por sua brevidade. Freqüentemente, um programa que ocupa várias telas de código em uma linguagem familiar como Java pode ser escrito nelas em poucas linhas. É isso que quero demonstrar neste artigo.

Recursos da linguagem Q e KDB+ usando o exemplo de um serviço em tempo real

Introdução

KDB+ é um banco de dados colunar focado em grandes quantidades de dados, ordenados de uma maneira específica (principalmente por tempo). É utilizado principalmente em instituições financeiras – bancos, fundos de investimento, seguradoras. A linguagem Q é a linguagem interna do KDB+ que permite trabalhar efetivamente com esses dados. A ideologia Q é brevidade e eficiência, enquanto a clareza é sacrificada. Isso se justifica pelo fato de que a linguagem vetorial será de difícil compreensão em qualquer caso, e a brevidade e riqueza da gravação permitem ver uma parte muito maior do programa em uma tela, o que acaba facilitando a compreensão.

Neste artigo implementamos um programa completo em Q e você pode querer experimentá-lo. Para fazer isso, você precisará do Q real. Você pode baixar a versão gratuita de 32 bits no site da empresa kx – www.kx.com. Lá, caso tenha interesse, você encontrará informações de referência sobre Q, o livro Q para mortais e vários artigos sobre este tema.

Formulação do problema

Existe uma fonte que envia uma tabela com dados a cada 25 milissegundos. Como o KDB+ é usado principalmente em finanças, assumiremos que se trata de uma tabela de transações (negociações), que possui as seguintes colunas: tempo (tempo em milissegundos), sym (designação da empresa na bolsa - IBM, AAPL,…), preço (o preço pelo qual as ações foram compradas), tamanho (tamanho da transação). O intervalo de 25 milissegundos é arbitrário, nem muito pequeno nem muito longo. Sua presença significa que os dados chegam ao serviço já armazenados em buffer. Seria fácil implementar buffer no lado do serviço, incluindo buffer dinâmico dependendo da carga atual, mas para simplificar, focaremos em um intervalo fixo.

O serviço deve contar a cada minuto para cada símbolo recebido da coluna sym um conjunto de funções de agregação - preço máximo, preço médio, tamanho da soma, etc. informação útil. Para simplificar, assumiremos que todas as funções podem ser calculadas de forma incremental, ou seja, para obter um novo valor, basta conhecer dois números - o valor antigo e o valor recebido. Por exemplo, as funções max, média, soma possuem esta propriedade, mas a função mediana não.

Também assumiremos que o fluxo de dados de entrada é ordenado no tempo. Isto nos dará a oportunidade de trabalhar apenas com o último minuto. Na prática, basta poder trabalhar com os minutos atuais e anteriores caso algumas atualizações atrasem. Por simplicidade, não consideraremos este caso.

Funções de agregação

As funções de agregação necessárias estão listadas abaixo. Peguei o máximo possível para aumentar a carga do serviço:

  • alto – preço máximo – preço máximo por minuto.
  • baixo – preço mínimo – preço mínimo por minuto.
  • firstPrice – primeiro preço – primeiro preço por minuto.
  • lastPrice – último preço – último preço por minuto.
  • firstSize – primeiro tamanho – primeiro tamanho de negociação por minuto.
  • lastSize – último tamanho – último tamanho de negociação em um minuto.
  • numTrades – contagem i – número de negociações por minuto.
  • volume – tamanho da soma – soma dos tamanhos de negociação por minuto.
  • pvolume – sum price – soma dos preços por minuto, necessária para avgPrice.
  • – soma do preço do giro*tamanho – volume total de transações por minuto.
  • avgPrice – pvolume%numTrades – preço médio por minuto.
  • avgSize – volume%numTrades – tamanho médio da negociação por minuto.
  • vwap – volume de negócios%volume – preço médio por minuto ponderado pelo tamanho da transação.
  • cumVolume – soma do volume – tamanho acumulado das transações ao longo de todo o tempo.

Vamos discutir imediatamente um ponto não óbvio - como inicializar essas colunas pela primeira vez e para cada minuto subsequente. Algumas colunas do tipo firstPrice devem ser inicializadas como nulas todas as vezes; seu valor é indefinido. Outros tipos de volume devem sempre ser definidos como 0. Existem também colunas que requerem uma abordagem combinada - por exemplo, cumVolume deve ser copiado do minuto anterior e definido como 0 para o primeiro. Vamos definir todos esses parâmetros usando os dados do dicionário tipo (análogo a um registro):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Adicionei sym e time ao dicionário por conveniência, agora initWith é uma linha pronta da tabela agregada final, onde resta definir o sym e o horário corretos. Você pode usá-lo para adicionar novas linhas a uma tabela.

Precisaremos de aggCols ao criar uma função de agregação. A lista deve ser invertida devido à ordem em que as expressões em Q são avaliadas (da direita para a esquerda). O objetivo é garantir que o cálculo vá de alto para cumVolume, já que algumas colunas dependem das anteriores.

Colunas que precisam ser copiadas para um novo minuto do anterior, a coluna sym é adicionada por conveniência:

rollColumns:`sym`cumVolume;

Agora vamos dividir as colunas em grupos de acordo com a forma como devem ser atualizadas. Três tipos podem ser distinguidos:

  1. Acumuladores (volume, volume de negócios,..) – devemos somar o valor recebido ao anterior.
  2. Com um ponto especial (alto, baixo, ..) – o primeiro valor do minuto é obtido dos dados recebidos, o restante é calculado usando a função.
  3. Descansar. Sempre calculado usando uma função.

Vamos definir variáveis ​​para estas classes:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Ordem de cálculo

Atualizaremos a tabela agregada em duas etapas. Para maior eficiência, primeiro reduzimos a tabela recebida para que haja apenas uma linha para cada caractere e minuto. O facto de todas as nossas funções serem incrementais e associativas garante que o resultado deste passo adicional não mudará. Você poderia reduzir a tabela usando select:

select high:max price, low:min price … by sym,time.minute from table

Este método tem uma desvantagem - o conjunto de colunas calculadas é predefinido. Felizmente, em Q, select também é implementado como uma função onde você pode substituir argumentos criados dinamicamente:

?[table;whereClause;byClause;selectClause]

Não descreverei em detalhes o formato dos argumentos; no nosso caso, apenas as expressões by e select não serão triviais e deverão ser dicionários na forma colunas!expressões. Assim, a função de encolhimento pode ser definida da seguinte forma:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Para maior clareza, usei a função parse, que transforma uma string com uma expressão Q em um valor que pode ser passado para a função eval e que é necessário na função select. Observe também que o pré-processo é definido como uma projeção (ou seja, uma função com argumentos parcialmente definidos) da função select, faltando um argumento (a tabela). Se aplicarmos o pré-processamento a uma tabela, obteremos uma tabela compactada.

A segunda etapa é atualizar a tabela agregada. Vamos primeiro escrever o algoritmo em pseudocódigo:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Em Q, é comum usar funções map/reduce em vez de loops. Mas como Q é uma linguagem vetorial e podemos facilmente aplicar todas as operações a todos os símbolos de uma só vez, então, numa primeira aproximação, podemos dispensar nenhum loop, realizando operações em todos os símbolos de uma só vez:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Mas podemos ir mais longe, Q tem um operador único e extremamente poderoso – o operador de atribuição generalizada. Ele permite alterar um conjunto de valores em uma estrutura de dados complexa usando uma lista de índices, funções e argumentos. No nosso caso é assim:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Infelizmente, para atribuir a uma tabela você precisa de uma lista de linhas, não de colunas, e precisa transpor a matriz (lista de colunas para lista de linhas) usando a função flip. Isso é caro para uma tabela grande, então, em vez disso, aplicamos uma atribuição generalizada a cada coluna separadamente, usando a função map (que se parece com um apóstrofo):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Novamente usamos projeção de função. Observe também que em Q, criar uma lista também é uma função e podemos chamá-la usando a função each(map) para obter uma lista de listas.

Para garantir que o conjunto de colunas calculadas não seja fixo, criaremos a expressão acima dinamicamente. Vamos primeiro definir funções para calcular cada coluna, usando as variáveis ​​row e inp para fazer referência aos dados agregados e de entrada:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Algumas colunas são especiais; seu primeiro valor não deve ser calculado pela função. Podemos determinar que é o primeiro pela coluna row[`numTrades] - se contiver 0, então o valor é o primeiro. Q tem uma função de seleção - ?[Boolean list;list1;list2] - que seleciona um valor da lista 1 ou 2 dependendo da condição no primeiro argumento:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Aqui chamei uma atribuição generalizada com minha função (uma expressão entre chaves). Ele recebe o valor atual (o primeiro argumento) e um argumento adicional, que passo no 4º parâmetro.

Vamos adicionar alto-falantes com bateria separadamente, pois a função é a mesma para eles:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Esta é uma atribuição normal para os padrões Q, mas estou atribuindo uma lista de valores de uma vez. Finalmente, vamos criar a função principal:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Com esta expressão, crio dinamicamente uma função a partir de uma string que contém a expressão que forneci acima. O resultado ficará assim:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

A ordem de avaliação das colunas é invertida porque em Q a ordem de avaliação é da direita para a esquerda.

Agora temos duas funções principais necessárias para os cálculos, basta adicionar um pouco de infraestrutura e o serviço está pronto.

Etapas finais

Temos funções preprocess e updateAgg que fazem todo o trabalho. Mas ainda é necessário garantir a correta transição através dos minutos e calcular os índices de agregação. Primeiro de tudo, vamos definir a função init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Também definiremos a função roll, que alterará o minuto atual:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Precisaremos de uma função para adicionar novos caracteres:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

E por fim, a função upd (nome tradicional desta função para serviços Q), que é chamada pelo cliente para adicionar dados:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Isso é tudo. Aqui está o código completo do nosso serviço, conforme prometido, apenas algumas linhas:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Teste

Vamos verificar o desempenho do serviço. Para fazer isso, vamos executá-lo em um processo separado (colocar o código no arquivo service.q) e chamar a função init:

q service.q –p 5566

q)init[]

Em outro console, inicie o segundo processo Q e conecte-se ao primeiro:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Primeiro, vamos criar uma lista de símbolos - 10000 peças e adicionar uma função para criar uma tabela aleatória. No segundo console:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Adicionei três símbolos reais à lista para facilitar sua procura na tabela. A função rnd cria uma tabela aleatória com n linhas, onde o tempo varia de t a t+25 milissegundos.

Agora você pode tentar enviar dados para o serviço (adicione as primeiras dez horas):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Você pode verificar no serviço se a tabela foi atualizada:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Resultado:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Vamos agora realizar testes de carga para descobrir quantos dados o serviço pode processar por minuto. Deixe-me lembrá-lo de que definimos o intervalo de atualização para 25 milissegundos. Conseqüentemente, o serviço deve (em média) caber em pelo menos 20 milissegundos por atualização para dar aos usuários tempo para solicitar dados. Insira o seguinte no segundo processo:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 são dois minutos. Você pode tentar executar primeiro 1000 linhas a cada 25 milissegundos:

start 1000

No meu caso, o resultado é de cerca de alguns milissegundos por atualização. Portanto, aumentarei imediatamente o número de linhas para 10.000:

start 10000

Resultado:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Novamente, nada de especial, mas são 24 milhões de linhas por minuto, 400 mil por segundo. Por mais de 25 milissegundos, a atualização ficou lenta apenas 5 vezes, aparentemente quando o minuto mudou. Vamos aumentar para 100.000:

start 100000

Resultado:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Como você pode ver, o serviço mal consegue lidar com isso, mas mesmo assim consegue se manter funcionando. Esse volume de dados (240 milhões de linhas por minuto) é extremamente grande, nesses casos é comum o lançamento de vários clones (ou mesmo dezenas de clones) do serviço, cada um processando apenas parte dos caracteres. Ainda assim, o resultado é impressionante para uma linguagem interpretada que se concentra principalmente no armazenamento de dados.

Pode surgir a questão de por que o tempo cresce de forma não linear com o tamanho de cada atualização. A razão é que a função de redução é na verdade uma função C, que é muito mais eficiente que updateAgg. A partir de um determinado tamanho de atualização (cerca de 10.000), updateAgg atinge seu teto e então seu tempo de execução não depende do tamanho da atualização. É devido à etapa preliminar Q que o serviço é capaz de digerir tais volumes de dados. Isso destaca a importância de escolher o algoritmo certo ao trabalhar com big data. Outro ponto é o correto armazenamento dos dados na memória. Se os dados não fossem armazenados em colunas ou não fossem ordenados por tempo, então nos familiarizaríamos com algo como falta de cache TLB - a ausência de um endereço de página de memória no cache de endereço do processador. A busca por um endereço leva cerca de 30 vezes mais tempo se não for bem-sucedida e, se os dados estiverem espalhados, pode tornar o serviço várias vezes mais lento.

Conclusão

Neste artigo, mostrei que os bancos de dados KDB+ e Q são adequados não apenas para armazenar grandes dados e acessá-los facilmente por meio de seleção, mas também para criar serviços de processamento de dados capazes de digerir centenas de milhões de linhas/gigabytes de dados, mesmo em um único processo Q. A própria linguagem Q permite uma implementação extremamente concisa e eficiente de algoritmos relacionados ao processamento de dados devido à sua natureza vetorial, interpretador de dialeto SQL integrado e um conjunto de funções de biblioteca muito bem-sucedido.

Observo que o que foi dito acima é apenas parte do que Q pode fazer, mas também possui outros recursos exclusivos. Por exemplo, um protocolo IPC extremamente simples que elimina a fronteira entre processos Q individuais e permite combinar centenas desses processos em uma única rede, que pode estar localizada em dezenas de servidores em diferentes partes do mundo.

Fonte: habr.com

Adicionar um comentário