Features of the Q language and KDB+ through the example of a real-time service

You can read about what the KDB+ database and the Q programming language are, and what strengths and weaknesses they have, in my previous article and briefly in the introduction below. In this article we will implement a service in Q that processes an incoming data stream and computes various aggregate functions every minute in "real time" mode (that is, it finishes all calculations before the next chunk of data arrives). The main feature of Q is that it is a vector language, operating not on single objects but on arrays, arrays of arrays and other complex objects. Languages like Q and its relatives K, J and APL are famous for their brevity. A program that spans several screens of code in a familiar language such as Java can often be written in a few lines. That is what I want to demonstrate in this article.


Introduction

KDB+ is a columnar database designed for very large volumes of data, ordered in a particular way (first of all, by time). It is used primarily in financial organizations: banks, investment funds, insurance companies. Q is KDB+'s internal language, which makes it possible to work with this data efficiently. The ideology of Q is brevity and efficiency, with clarity sacrificed along the way. This is justified by the fact that a vector language will be difficult to read in any case, while a brief and rich notation lets you see a much larger part of a program on one screen, which ultimately makes it easier to understand.

In this article we will implement a full-fledged Q program, and you may want to try it out yourself. To do this, you will need Q itself. You can download the free 32-bit version from the kx website, www.kx.com, where you will also find reference information on Q, the book Q For Mortals and various articles on the subject.

Problem statement

There is a source that sends a table of data every 25 milliseconds. Since KDB+ is used primarily in finance, we will assume that it is a table of trades with the following columns: time (time in milliseconds), sym (the company's ticker on the exchange: IBM, AAPL, …), price (the price at which the shares were bought), size (the trade size). The 25 millisecond interval is chosen arbitrarily; it is neither too small nor too large. Its presence means that data arrives at the service already buffered. It would be easy to implement buffering on the service side, including dynamic buffering depending on the current load, but for simplicity we will stick to a fixed interval.
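
To make the setup concrete, here is a minimal sketch of one such chunk (the literal values and exact types are my assumption; the article only fixes the column names):

// a hypothetical 3-row chunk of input data with the columns described above
trade:([] time:09:30:00.000 09:30:00.005 09:30:00.011; sym:`IBM`AAPL`IBM; price:101.1 153.2 101.2; size:100 200 50)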

For each incoming symbol in the sym column the service must compute a set of aggregate functions: max price, avg price, sum size and other useful information. For simplicity, we assume that all functions can be computed incrementally, i.e. that obtaining a new value requires knowing only two numbers: the old value and the incoming one. For example, max, average and sum have this property, while median does not.
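
As a toy illustration of incrementality (not part of the service itself): combining the old aggregate with the aggregate of a new chunk gives the same result as recomputing from scratch:

oldMax:max 1 5 3;       // 5
newMax:oldMax|max 4 2;  // still 5; | is max in Q
oldSum:sum 1 5 3;       // 9
newSum:oldSum+sum 4 2;  // 15
// median has no such two-number update rule, which is why it is excluded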

We also assume that the incoming data stream is ordered by time. This lets us work only with the last minute. In practice it is enough to be able to work with the current and the previous minute, in case some updates arrive late; for simplicity, we will not consider this case.

Aggregate functions

The required aggregate functions are listed below. I took as many of them as possible to increase the load on the service:

  • high - max price - the maximum price in the minute.
  • low - min price - the minimum price in the minute.
  • firstPrice - first price - the first price in the minute.
  • lastPrice - last price - the last price in the minute.
  • firstSize - first size - the first trade size in the minute.
  • lastSize - last size - the last trade size in the minute.
  • numTrades - count i - the number of trades in the minute.
  • volume - sum size - the sum of trade sizes in the minute.
  • pvolume - sum price - the sum of prices in the minute, needed for avgPrice.
  • turnover - sum price*size - the total traded value in the minute.
  • avgPrice - pvolume%numTrades - the average price in the minute.
  • avgSize - volume%numTrades - the average trade size in the minute.
  • vwap - turnover%volume - the average price in the minute weighted by trade size.
  • cumVolume - sum volume - the accumulated trade size over all time.

Let's immediately discuss one non-obvious point: how to initialize these columns for the first minute and for each subsequent one. Columns like firstPrice must be reinitialized to null each time; their value is undefined. Columns like volume must always be reset to 0. There are also columns that require a combined approach: for example, cumVolume must be copied from the previous minute, and set to 0 for the first one. Let's describe all these parameters using the dictionary data type (similar to a record):

// list ! list - create a dictionary; 0n - float null, 0N - long null, `sym - the symbol type, `sym1`sym2 - a list of symbols
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // the list of all computed columns; reverse is explained below

I added sym and time to the dictionary for convenience: initWith is now a ready-made row of the final aggregated table, where it remains only to set the correct sym and time. It can be used to add new rows to the table.
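
For example, a one-row table for a hypothetical new symbol could be formed like this (illustrative only; the service itself constructs rows in addSyms below):

// enlist turns the dictionary into a one-row table; update then sets the correct sym and time
newRow:update sym:`IBM, time:09:30 from enlist initWith;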

We will need aggCols when creating the aggregation function. The list must be reversed because of the order in which expressions are evaluated in Q (right to left). The goal is to ensure the calculation proceeds from high to cumVolume, since some columns depend on previously computed ones.
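
As a reminder of how right-to-left evaluation works in Q:

2*3+4   // 14, not 10: 3+4 is evaluated first, then 2*7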

These are the columns that must be carried over to a new minute from the previous one; the sym column is added for convenience:

rollColumns:`sym`cumVolume;

Now let's divide the columns into groups according to how they should be updated. Three types can be distinguished:

  1. Accumulators (volume, turnover, ..) - the incoming value is added to the previous one.
  2. With a special first value (high, low, ..) - the first value in the minute is taken from the incoming data, the rest are computed with a function.
  3. The rest - always computed with a function.

Let's define variables for these classes:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Calculation Order

We will update the aggregated table in two stages. For efficiency, we first shrink the input table so that there is one row per symbol and minute. The fact that all our functions are incremental and associative guarantees that this extra step does not change the result. We could shrink the table with a select:

select high:max price, low:min price … by sym,time.minute from table

This method has a drawback: the set of computed columns is predefined. Fortunately, in Q, select is also implemented as a function that accepts dynamically created arguments:

?[table;whereClause;byClause;selectClause]
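
For example, the two queries below should be equivalent; t here is a hypothetical table, not part of the service:

t:([] sym:`a`b`a; price:1 2 3f);
select high:max price by sym from t
?[t;();(enlist`sym)!enlist`sym;(enlist`high)!enlist(max;`price)]   // the functional form of the same select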

I will not describe the argument format in detail; in our case only the by and select expressions are non-trivial, and they must be dictionaries of the form columns!expressions. The shrinking function can thus be defined as follows:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each is Q's map function for a single list
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

For clarity, I used the parse function, which turns a string with a Q expression into a value that can be passed to eval and that is required in functional select. Also note that preprocess is defined as a projection of the select function (i.e. a function with partially defined arguments); one argument, the table, is missing. Applying preprocess to a table yields a shrunk table.
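
A quick illustration of both ideas (the results shown in the comments are what parse and a projection return):

parse "max price"   // a parse tree like (max;`price), usable in functional select
f:+[;10];           // projection: + with its second argument fixed
f 5                 // 15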

The second stage is updating the aggregated table. Let's write the algorithm in pseudocode first:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

In Q, map/reduce functions are customarily used instead of loops. But since Q is a vector language and we can safely apply all operations to all symbols at once, as a first approximation we can do without a loop altogether, performing the operations on all symbols at once:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

But we can go further: Q has a unique and extremely powerful operator, generalized assignment. It allows you to change a set of values in a complex data structure using a list of indices, a function and arguments. In our case it looks like this:

idx:calcIdx inputTable;
row:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx0;idx1;…]: function[target[idx0;idx1;…];argument]; in our case the function is assignment
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
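
As a toy example of generalized assignment on a simple nested structure (unrelated to the service itself):

m:(1 2 3;4 5 6);
.[`m;(0;1);:;42];   // assignment: m is now (1 42 3;4 5 6)
.[`m;(1;2);+;10];   // with a function: m is now (1 42 3;4 5 16)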

Unfortunately, assigning into a table requires a list of rows, not columns, so we would have to transpose the matrix (turn the list of columns into a list of rows) with the flip function. For a large table this is expensive, so instead we apply a generalized assignment to each column separately, using the map function (written as an apostrophe):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

We use a function projection again. Also note that in Q, constructing a list is itself a function, so we can call it with each (map) to get a list of lists.
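
For instance, the projection (idx;) is the list constructor with its second element missing, so mapping it over column names yields the per-column index pairs:

(1;) each `a`b`c   // ((1;`a);(1;`b);(1;`c)) - one (index;column) pair per column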

So that the set of computed columns is not fixed, we will create the expression above dynamically. First, let's define functions to calculate each column, using the row and inp variables to refer to the aggregated and the input data:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Some columns are special: their first value must not be computed by the function. We can tell that a value is the first by the row[`numTrades] column: if it is 0, the value is the first. Q has a selector function, ?[boolean list;list1;list2], that picks an element from list1 or list2 depending on the condition in the first argument.
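
For example:

?[101b;1 2 3;10 20 30]   // 1 20 3 - takes from the first list where the boolean is 1b

Applied to our special columns: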

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ is also a generalized assignment, for the case when the index is not deep
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Here I call a generalized assignment with my own function (the expression in curly braces). It receives the current value (first argument) and an extra argument, which I pass in the 4th parameter.
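
A small standalone illustration of @ with a function and an extra argument:

v:1 2 3 4;
@[`v;0 2;+;10 20];   // v is now 11 2 23 4: 10 is added at index 0, 20 at index 2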

Separately, we add the accumulator columns, since the function is the same for all of them:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

This is a normal assignment by Q standards, except that I assign a whole list of values at once. Finally, let's create the main function:

// ":",/:aggExprs ~ map[{":",x};aggExprs] => ":row[`high]|inp`high" - assign the computed value to a variable, because some columns depend on already computed ones
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" - finish building the assignment. ,' reads as map[concat]
// ";" sv exprs - sv (string from vector) joins a list of strings, inserting ";" between them
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

With this expression, I dynamically create a function from a string that contains the expression I gave above. The result will look like this:

{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(cumVolume:row[`cumVolume]+inp`volume;… ;high:?[isFirst;inp`high;row[`high]|inp`high])]}

The order in which the columns are evaluated is inverted because in Q, the evaluation order is from right to left.

We now have the two main functions needed for the computation; it remains to add a little infrastructure, and the service is ready.

Final steps

We have the preprocess and updateAgg functions that do all the work, but we still need to handle the transition between minutes and compute the indices for aggregation. First, let's define the init function:

init:{
  tradeAgg:: 0#enlist[initWith]; // create an empty typed table; enlist turns a dictionary into a table, 0# means take 0 elements from it
  currTime::00:00; // start from 0; :: means assignment to a global variable
  currSyms::`u#`symbol$(); // `u# turns the list into a tree, to speed up element lookups
  offset::0; // the index in tradeAgg where the current minute starts
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // a cache of the last values of the roll columns, a table keyed by sym
 }

We also define the roll function, which will change the current minute:

roll:{[tm]
  if[currTime>tm; :init[]]; // if we rolled past midnight, just call init
  rollCache,::offset _ rollColumns#tradeAgg; // update the cache: take the roll columns from tradeAgg, cut, insert into rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

We need a function to add new symbols:

addSyms:{[syms]
  currSyms,::syms; // add to the list of known symbols
  // add sym, time and rollColumns to the table using generalized assignment.
  // The ^ function substitutes default values for the roll columns if a symbol is not in the cache. value flip table returns the list of the table's columns.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

And finally, the upd function (the traditional name for it in Q services), which clients call to add data:

upd:{[tblName;data] // we don't need tblName, but a service usually handles several tables
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // add the data for each minute
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // change the minute if necessary
  data:select from data where time=tm; // filtering
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // new symbols
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // update the aggregated table. The ? function looks up the indices of the right-hand list's elements in the left-hand list.
 };

That's all. Here is the full code of our service, as promised, just a few lines:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Testing

Let's check the performance of the service. To do this, run it in a separate process (put the code in the service.q file) and call the init function:

q service.q -p 5566

q)init[]

In another console, start the second Q process and connect to the first one:

h:hopen `:host:5566
h:hopen 5566 // if both processes are on the same host

First, let's create a list of symbols (10,000 of them) and a function for generating a random table. In the second console:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

I added three real symbols to the list, to make it easier to look for them in the table. The rnd function creates a random table with n rows, where the time varies from t to t+25 milliseconds.
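
A quick sanity check (the timestamp is arbitrary and the data is random, so only the shape and the row count are meaningful):

count rnd[10000;09:30:00.000]   // 10000 - one buffered chunk covering one 25 ms interval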

Now we can try to send data to the service (let's add the first ten hours):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

You can check in the service that the table has been updated:

\c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Result:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
---|----|----|---|----------|---------|---------|--------|---------|------|-------|--------|--------|-------|----|---------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Now let's load-test the service to find out how much data it can process per minute. Recall that the update interval is 25 milliseconds; accordingly, the service should (on average) fit within 20 milliseconds per update, to leave users time to request data. Enter the following in the second process:

tm:10:00:00.000
// 1 x prints to stdout without a newline (a progress indicator); h (func;arg) runs func remotely over the connection handle
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 iterations is two minutes (4800 * 25 ms). You can try running it first with 1,000 rows every 25 milliseconds:

start 1000

In my case, the result is around a couple of milliseconds per update, so I will immediately increase the number of rows to 10,000:

start 10000

Result (min and max are q times; avg and med are in milliseconds):

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Again nothing special, but this is 24 million rows per minute, or 400 thousand per second. The update exceeded 25 milliseconds only 5 times, apparently when the minute changed. Let's increase to 100,000:

start 100000

Result:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

As you can see, the service barely copes, but it manages to stay afloat. Such a volume of data (240 million rows per minute) is extremely large; in such cases it is common to run several clones (or even dozens of clones) of the service, each processing only a subset of the symbols. Still, the result is impressive for an interpreted language that focuses primarily on data storage.

One might wonder why the time grows non-linearly with the size of each update. The reason is that the shrink function is essentially a C function, much more efficient than updateAgg. Starting from a certain update size (around 10,000 rows), updateAgg reaches its ceiling, after which its execution time does not depend on the update size. It is thanks to this preliminary shrink step that the service is able to digest such volumes of data, which highlights the importance of choosing the right algorithm when working with big data. Another factor is the correct storage of data in memory. If the data were not stored column-wise, or were not ordered by time, we would get acquainted with the TLB cache miss: the absence of a memory page address in the processor's address translation cache. A lookup takes about 30 times longer on a miss, and with scattered data it can slow the service down several times.

Conclusion

In this article I showed that the KDB+ database and Q are suitable not only for storing large data sets and accessing them easily with select, but also for creating data-processing services capable of digesting hundreds of millions of rows / gigabytes of data even in a single Q process. The Q language itself allows extremely concise and efficient implementation of data-processing algorithms thanks to its vector nature, its built-in SQL-dialect interpreter, and a very well-chosen set of library functions.

Note that the above covers only part of Q's capabilities; it has other unique features as well. For example, an extremely simple IPC protocol that blurs the boundary between individual Q processes and lets you combine hundreds of them into a single network, located on dozens of servers in different parts of the world.

Source: habr.com
