Можливості мови Q та KDB+ на прикладі сервісу реального часу

Про те, що таке база KDB+, мову програмування Q, які у них є сильні та слабкі сторони, можна прочитати у моїй попередній статті і коротко у вступі. У статті ж ми реалізуємо на Q сервіс, який оброблятиме вхідний потік даних і вираховуватиме щохвилини різні функції, що агрегують, в режимі “реального часу” (тобто буде встигати все порахувати до наступної порції даних). Головна особливість Q полягає в тому, що це векторна мова, що дозволяє оперувати не одиничними об'єктами, які масивами, масивами масивів та іншими складені об'єктами. Такі мови як Q і споріднені з ним K, J, APL відомі своєю стислістю. Нерідко програму, що займає кілька екранів коду звичною мовою типу Java, можна записати на них у кілька рядків. Саме це я й хочу продемонструвати у цій статті.

Можливості мови Q та KDB+ на прикладі сервісу реального часу

Запровадження

KDB+ — це колонкова база даних, орієнтована дуже великі обсяги даних, впорядковані певним чином (насамперед у часі). Використовується вона насамперед у фінансових організаціях – банках, інвестиційних фондах, страхових компаніях. Мова Q – це внутрішня мова KDB+, яка дозволяє ефективно працювати з цими даними. Ідеологія Q – це стислість та ефективність, зрозумілість при цьому приноситься в жертву. Обґрунтовується це тим, що векторна мова в будь-якому випадку буде складною для сприйняття, а стислість і насиченість запису дозволяє побачити на одному екрані набагато більшу частину програми, що в результаті полегшує її розуміння.

У статті ми реалізуємо повноцінну програму Q і вам, можливо, захочеться спробувати її у справі. Для цього вам знадобиться Q. Завантажити безкоштовну 32-бітну версію можна на сайті компанії kx – www.kx.com. Там же, якщо вам цікаво, ви знайдете довідкову інформацію про Q, книгу Q For Mortals та різноманітні статті на цю тему.

Постановка завдання

Є джерело, яке надсилає кожні 25 мілісекунд таблицю з даними. Оскільки KDB+ застосовується насамперед у фінансах, вважатимемо, що це таблиця угод (trades), в якій є такі колонки: time (час у мілісекундах), sym (позначення компанії на біржі – IBM, AAPL,…), price (ціна, за якою куплені акції), size (розмір угоди). Інтервал 25 мілісекунд вибраний довільно, він не надто маленький і не надто великий. Його наявність означає, що дані надходять до сервісу вже буферизовані. Можна було б легко реалізувати буферизацію на стороні сервісу, у тому числі динамічну, яка залежить від поточного навантаження, але для простоти зупинимося на фіксованому інтервалі.

Сервіс повинен вважати щохвилини для кожного вхідного символу з колонки sym набір функцій, що агрегують - max price, avg price, sum size і т.п. корисну інформацію. Для простоти ми припустимо, що це функції можна обчислювати инкрементально, тобто. для отримання нового значення достатньо знати два числа - старе та вхідне значення. Наприклад, функції max, average, sum мають цю властивість, а функція медіана немає.

Також ми припустимо, що вхідний потік даних упорядкований за часом. Це дасть нам можливість працювати лише з останньою хвилиною. На практиці достатньо вміти працювати з поточною та попередньою хвилинами на випадок, якщо якісь апдейти запізнилися. Для простоти ми не розглядатимемо цей випадок.

Агрегуючі функції

Нижче перераховані необхідні функції, що агрегують. Я взяв їх якнайбільше, щоб збільшити навантаження на сервіс:

  • high - max price - максимальна ціна за хвилину.
  • low – min price – мінімальна ціна за хвилину.
  • firstPrice – перша ціна за хвилину.
  • lastPrice – last price – остання ціна за хвилину.
  • firstSize – first size – перший розмір угоди за хвилину.
  • lastSize - last size - останній розмір угоди за хвилину.
  • numTrades – count i – кількість угод за хвилину.
  • volume - sum size - сума розмірів угод за хвилину.
  • pvolume – sum price – сума цін за хвилину, потрібна для avgPrice.
  • turnover - sum price * size - сумарний обсяг угод за хвилину.
  • avgPrice – pvolume%numTrades – середня ціна за хвилину.
  • avgSize – volume%numTrades – середній розмір угоди за хвилину.
  • vwap - turnover%volume - виважена за розміром угоди середня ціна за хвилину.
  • cumVolume – sum volume – накопичений розмір угод за весь час.

Відразу обговоримо один неочевидний момент - як ініціалізувати ці колонки вперше і кожної наступної хвилини. Деякі стовпчики типу firstPrice щоразу потрібно ініціалізувати значенням null, їх значення не визначено. Інші типи volume потрібно встановлювати завжди в 0. Ще є колонки, які вимагають комбінованого підходу – наприклад, cumVolume необхідно копіювати з попередньої хвилини, а для першої встановити в 0. Задамо всі ці параметри використовуючи тип даних словник (аналог запису):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Я додав sym і time у словник для зручності, тепер initWith – це готовий рядок із фінальної агрегованої таблиці, де залишилося задати правильні sym та time. Можна використовувати її для додавання нових рядків до таблиці.

aggCols нам знадобиться при створенні функції, що агрегує. Список необхідно інвертувати через особливості порядку обчислень виразів Q (справа ліворуч). Мета – забезпечити обчислення у напрямку від high до cumVolume, оскільки деякі колонки залежать від попередніх.

Колонки, які потрібно скопіювати в нову хвилину з попередньої, колонка sym додана для зручності:

rollColumns:`sym`cumVolume;

Тепер розділимо колонки на групи згідно з тим, як їх слід оновлювати. Можна виділити три типи:

  1. Акумулятори (volume, turnover,..) – ми повинні додати значення до попереднього.
  2. З особливою точкою (high, low, ..) – перше значення за хвилину береться з вхідних даних, інші вважаються з допомогою функції.
  3. Інші. Завжди рахуються за допомогою функції.

Визначимо змінні для цих класів:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Порядок обчислень

Оновлювати агреговану таблицю ми будемо у два етапи. Для ефективності ми спочатку утиснемо вхідну таблицю так, щоб там залишився один рядок для кожного символу та хвилини. Те, що всі наші функції є інкрементальними та асоціативними, гарантує нам, що результат від цього додаткового кроку не зміниться. Утиснути таблицю можна було б за допомогою селекту:

select high:max price, low:min price … by sym,time.minute from table

Цей спосіб має мінус – набір обчислюваних колонок заданий заздалегідь. На щастя, у Q селект реалізований і як функція, куди можна підставити динамічно створені аргументи:

?[table;whereClause;byClause;selectClause]

Я не буду докладно описувати формат аргументів, у нашому випадку нетривіальними будуть тільки by і select вирази і вони повинні бути словниками columns!expressions. Таким чином, стискаючу функцію можна задати так:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Для зрозумілості я використовував функцію parse, яка перетворює рядок з виразом Q на значення, яке може бути передано в функцію eval і яке потрібно в функціональному селекті. Також зазначимо, що preprocess задана як проекція (тобто функція з частково визначеними аргументами) функції селекту, один аргумент (таблиця) відсутня. Якщо ми застосуємо preprocess до таблиці, то отримаємо стислу таблицю.

Другий етап – це оновлення агрегованої таблиці. Напишемо спочатку алгоритм у псевдокоді:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q замість циклів прийнято використовувати функції map/reduce. Але оскільки Q - векторний мову і всі операції ми можемо спокійно застосовувати до всіх символів відразу, то в першому наближенні ми можемо обійтися взагалі без циклу, роблячи операції з усіма символами відразу:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Але ми можемо піти і далі, Q є унікальний і виключно потужний оператор - оператор узагальненого присвоювання. Він дозволяє змінити набір значень у складній структурі даних, використовуючи список індексів, функцій та аргументів. У нашому випадку він виглядає так:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

На жаль, для присвоєння таблицю потрібен список рядків, а чи не колонок, і доводиться транспонувати матрицю (список колонок список рядків) з допомогою функції flip. Для великої таблиці це накладно, тому натомість застосуємо узагальнене привласнення до кожної колонки окремо, використовуючи функцію map (яка виглядає як апостроф):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ми знову використовуємо проекцію функції. Також зауважте, що Q створення списку – це теж функція і ми можемо викликати її за допомогою функції each(map), щоб отримати список списків.

Щоб набір колонок, що обчислюються, не був фіксований, створимо вираз вище динамічно. Визначимо спочатку функції для обчислення кожної колонки, використовуючи змінні row та inp для посилання на агреговані та вхідні дані:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Деякі колонки особливі, їхнє перше значення не повинно обчислюватися функцією. Ми можемо визначити, що вона перша по колонці row[`numTrades] – якщо в ній 0, то значення перше. У Q є функція вибору - ?[Boolean list;list1;list2] - яка вибирає значення зі списку 1 або 2 залежно від умови в першому аргументі:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Тут я викликав узагальнене привласнення з моєю функцією (вираз у фігурних дужках). До неї передається поточне значення (перший аргумент) та додатковий аргумент, який я передаю у 4-му параметрі.

Окремо додамо акумуляторні колонки, оскільки для них функція одна і та ж:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Це звичайне за мірками Q привласнення, тільки надаю я відразу список значень. Нарешті створимо головну функцію:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Цим виразом я динамічно створюю функцію з рядка, який містить вираз, який я наводив вище. Результат виглядатиме так:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Порядок обчислення колонок інвертований, оскільки Q порядок обчислення справа наліво.

Тепер у нас є дві основні функції, необхідні для обчислень, залишилося додати трохи інфраструктури та сервіс готовий.

Фінальні кроки

У нас є функції preprocess і updateAgg, які роблять всю роботу. Але необхідно ще забезпечити правильний перехід через хвилини та обчислити індекси для агрегації. Насамперед визначимо функцію init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Також визначимо функцію roll, яка змінюватиме поточну хвилину:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Нам знадобиться функція для додавання нових символів:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

І, нарешті, функція upd (традиційна назва цієї функції для Q-сервісів), яка викликається клієнтом, для додавання даних:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

От і все. Ось повний код нашого сервісу, як і обіцялося, лише кілька рядків:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Тестування

Перевіримо продуктивність сервісу. Для цього запустимо його в окремому процесі (помістіть код у файл service.q) та викличте функцію init:

q service.q –p 5566

q)init[]

В іншій консолі запустіть другий Q процес і приєднайтеся до першого:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Спочатку створимо список символів – 10000 штук та додамо функцію для створення випадкової таблиці. У другій консолі:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Я додав до списку символів три справжніх, щоб було зручніше шукати їх у таблиці. Функція rnd створює випадкову таблицю з рядками n, де час змінюється від t до t+25 мілісекунд.

Тепер можна спробувати надіслати дані в сервіс (додамо перші десять годин):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Можна перевірити, що таблиця оновилася:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Результат:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Проведемо тепер тестування навантаження, щоб з'ясувати скільки даних сервіс може обробляти за хвилину. Нагадаю, що ми встановили інтервал для апдейтів у 25 мілісекунд. Відповідно, сервіс повинен (у середньому) укладатися хоча б у 20 мілісекунд на апдейт, щоб дати час користувачам запитати дані. Введіть наступне у другому процесі:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 – це дві хвилини. Можна спробувати запустити спочатку для 1000 рядків кожні 25 мілісекунд:

start 1000

У моєму випадку результат виходить у районі пари мілісекунд на апдейт. Так що я одразу збільшу кількість рядків до 10.000:

start 10000

Результат:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Знову нічого особливого, адже це 24 мільйони рядків на хвилину, 400 тисяч на секунду. Більше 25 мілісекунд апдейт гальмував лише 5 разів, мабуть при зміні хвилини. Збільшимо до 100.000:

start 100000

Результат:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Як бачимо, сервіс ледве справляється, проте йому вдається втриматися на плаву. Такий обсяг даних (240 мільйонів рядків на хвилину) надзвичайно великий, у разі прийнято запускати кілька клонів (чи навіть десятків клонів) сервісу, кожен із яких обробляє лише частина символів. Проте результат вражаючий для інтерпретованої мови, який орієнтований насамперед на зберігання даних.

Може виникнути питання, чому час росте нелінійно разом із розміром кожного апдейта. Причина в тому, що утискаюча функція – це фактично С функція, яка працює набагато ефективніше за updateAgg. Починаючи з якогось розміру апдейта (в районі 10.000), updateAgg досягає своєї стелі і далі її час виконання не залежить від розміру апдейту. Саме з допомогою попереднього кроку Q сервіс може перетравлювати такі обсяги даних. Це наголошує, наскільки важливо, працюючи з великими даними, вибирати правильний алгоритм. Ще один момент – правильне зберігання даних у пам'яті. Якби дані зберігалися не по-колоночно або не були впорядковані за часом, ми познайомилися б з такою річчю, як TLB cache miss – відсутність адреси сторінки пам'яті в кеші адрес процесора. Пошук адреси займає десь у 30 разів більше часу у разі невдачі та у разі розсіяних даних може уповільнити сервіс у кілька разів.

Висновок

У цій статті я показав, що база KDB+ і Q придатні не тільки для зберігання великих даних і простого доступу до них через селект, але й для створення сервісів обробки даних, які здатні перетравлювати сотні мільйонів рядків/гігабайти даних навіть в одному окремому Q процесі . Сама мова Q дозволяє виключно коротко та ефективно реалізовувати алгоритми, пов'язані з обробкою даних за рахунок своєї векторної природи, вбудованого інтерпретатора діалекту SQL та дуже вдалого набору бібліотечних функцій.

Я зауважу, що викладене вище це лише частина можливостей Q, у нього є й інші унікальні особливості. Наприклад, надзвичайно простий IPC протокол, який стирає кордон між окремими процесами Q і дозволяє об'єднувати сотні цих процесів в єдину мережу, яка може розташовуватися на десятках серверів в різних кінцях світу.

Джерело: habr.com

Додати коментар або відгук