Mga tampok ng wikang Q at KDB+ gamit ang halimbawa ng isang real-time na serbisyo

Maaari mong basahin ang tungkol sa kung ano ang KDB+ base, ang Q programming language, kung ano ang kanilang mga lakas at kahinaan sa aking nakaraang Artikulo at maikli sa pagpapakilala. Sa artikulo, ipapatupad namin ang isang serbisyo sa Q na magpoproseso ng papasok na stream ng data at magkalkula ng iba't ibang mga function ng pagsasama-sama bawat minuto sa mode na "real time" (ibig sabihin, magkakaroon ito ng oras upang kalkulahin ang lahat bago ang susunod na bahagi ng data). Ang pangunahing tampok ng Q ay ito ay isang wikang vector na nagbibigay-daan sa iyo upang gumana hindi sa mga solong bagay, ngunit sa kanilang mga array, array ng mga array at iba pang kumplikadong mga bagay. Ang mga wika tulad ng Q at ang mga kamag-anak nito na K, J, APL ay sikat sa kanilang kaiklian. Kadalasan, ang isang programa na kumukuha ng ilang mga screen ng code sa isang pamilyar na wika tulad ng Java ay maaaring isulat sa mga ito sa ilang linya. Ito ang gusto kong ipakita sa artikulong ito.

Mga tampok ng wikang Q at KDB+ gamit ang halimbawa ng isang real-time na serbisyo

Pagpapakilala

Ang KDB+ ay isang columnar database na nakatuon sa napakalaking dami ng data, na inayos sa isang partikular na paraan (pangunahin sa pamamagitan ng oras). Ito ay pangunahing ginagamit sa mga institusyong pinansyal - mga bangko, mga pondo sa pamumuhunan, mga kompanya ng seguro. Ang wikang Q ay ang panloob na wika ng KDB+ na nagbibigay-daan sa iyong epektibong magtrabaho sa data na ito. Ang Q ideology ay kaiklian at kahusayan, habang ang kalinawan ay isinakripisyo. Ito ay nabibigyang katwiran sa pamamagitan ng katotohanan na ang wikang vector ay magiging mahirap na maunawaan sa anumang kaso, at ang kaiklian at kayamanan ng pag-record ay nagbibigay-daan sa iyo upang makita ang isang mas malaking bahagi ng programa sa isang screen, na sa huli ay ginagawang mas madaling maunawaan.

Sa artikulong ito ay nagpapatupad kami ng isang ganap na programa sa Q at maaaring gusto mong subukan ito. Upang gawin ito, kakailanganin mo ang aktwal na Q. Maaari mong i-download ang libreng 32-bit na bersyon sa website ng kumpanya ng kx – www.kx.com. Doon, kung interesado ka, makakahanap ka ng reference na impormasyon sa Q, ang libro Q Para sa mga Mortal at iba't ibang artikulo sa paksang ito.

Pahayag ng problema

May source na nagpapadala ng talahanayan na may data bawat 25 millisecond. Dahil ang KDB+ ay pangunahing ginagamit sa pananalapi, ipagpalagay namin na ito ay isang talahanayan ng mga transaksyon (trades), na mayroong mga sumusunod na column: oras (oras sa millisecond), sym (pagtatalaga ng kumpanya sa stock exchange - IBM, AAPL,…), presyo (ang presyo kung saan binili ang mga pagbabahagi), laki (laki ng transaksyon). Ang 25 millisecond interval ay arbitrary, hindi masyadong maliit at hindi masyadong mahaba. Ang presensya nito ay nangangahulugan na ang data ay dumarating sa serbisyo na naka-buffer na. Madaling ipatupad ang buffering sa bahagi ng serbisyo, kabilang ang dynamic na buffering depende sa kasalukuyang pagkarga, ngunit para sa pagiging simple, tututuon tayo sa isang nakapirming agwat.

Ang serbisyo ay dapat magbilang bawat minuto para sa bawat papasok na simbolo mula sa sym column ng isang hanay ng mga pinagsama-samang function - max na presyo, avg na presyo, sum size, atbp. kapaki-pakinabang na impormasyon. Para sa pagiging simple, ipagpalagay namin na ang lahat ng mga function ay maaaring kalkulahin nang paunti-unti, i.e. upang makakuha ng bagong halaga, sapat na malaman ang dalawang numero - ang luma at ang mga papasok na halaga. Halimbawa, ang mga function na max, average, sum ay may ganitong katangian, ngunit ang median function ay wala.

Ipagpalagay din namin na ang papasok na stream ng data ay naayos ng oras. Bibigyan tayo nito ng pagkakataong magtrabaho lamang sa huling minuto. Sa pagsasagawa, sapat na upang makapagtrabaho sa kasalukuyan at nakaraang mga minuto kung sakaling huli ang ilang mga pag-update. Para sa pagiging simple, hindi namin isasaalang-alang ang kasong ito.

Mga function ng pagsasama-sama

Ang mga kinakailangang aggregation function ay nakalista sa ibaba. Kinuha ko ang marami sa kanila hangga't maaari upang madagdagan ang pagkarga sa serbisyo:

  • mataas – max na presyo – maximum na presyo kada minuto.
  • mababa – min na presyo – pinakamababang presyo kada minuto.
  • unang Presyo – unang presyo – unang presyo kada minuto.
  • huling Presyo – huling presyo – huling presyo kada minuto.
  • firstSize – unang laki – unang laki ng kalakalan kada minuto.
  • lastSize - huling laki - huling laki ng kalakalan sa isang minuto.
  • numTrades – bilang i – bilang ng mga trade kada minuto.
  • dami – laki ng kabuuan – kabuuan ng mga laki ng kalakalan kada minuto.
  • pvolume – kabuuan ng presyo – kabuuan ng mga presyo kada minuto, kinakailangan para sa avgPrice.
  • – sum turnover price*size – kabuuang dami ng mga transaksyon kada minuto.
  • avgPrice – pvolume%numTrades – average na presyo kada minuto.
  • avgSize – volume%numTrades – average na laki ng kalakalan kada minuto.
  • vwap – turnover%volume – average na presyo kada minuto na natimbang ayon sa laki ng transaksyon.
  • cumVolume – sum volume – naipon na laki ng mga transaksyon sa buong panahon.

Agad nating talakayin ang isang di-halatang punto - kung paano simulan ang mga column na ito sa unang pagkakataon at para sa bawat susunod na minuto. Ang ilang mga column ng uri ng firstPrice ay dapat masimulan sa null sa bawat oras; ang kanilang halaga ay hindi natukoy. Ang iba pang mga uri ng volume ay dapat palaging nakatakda sa 0. Mayroon ding mga column na nangangailangan ng pinagsamang diskarte - halimbawa, ang cumVolume ay dapat makopya mula sa nakaraang minuto, at para sa unang isa ay itakda sa 0. Itakda natin ang lahat ng mga parameter na ito gamit ang data ng diksyunaryo uri (katulad sa isang talaan):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Nagdagdag ako ng sym at oras sa diksyunaryo para sa kaginhawahan, ngayon ang initWith ay isang handa na linya mula sa huling pinagsama-samang talahanayan, kung saan nananatili itong itakda ang tamang sym at oras. Magagamit mo ito upang magdagdag ng mga bagong row sa isang talahanayan.

Kakailanganin namin ang aggCols kapag gumagawa ng aggregation function. Dapat baligtarin ang listahan dahil sa pagkakasunud-sunod kung saan sinusuri ang mga expression sa Q (mula kanan pakaliwa). Ang layunin ay upang matiyak na ang pagkalkula ay napupunta mula sa mataas hanggang sa cumVolume, dahil ang ilang mga haligi ay nakadepende sa mga nauna.

Mga column na kailangang kopyahin sa isang bagong minuto mula sa nauna, idinaragdag ang column ng sym para sa kaginhawahan:

rollColumns:`sym`cumVolume;

Ngayon, hatiin natin ang mga column sa mga pangkat ayon sa kung paano sila dapat i-update. Tatlong uri ay maaaring makilala:

  1. Accumulators (volume, turnover,..) - dapat nating idagdag ang papasok na halaga sa nauna.
  2. Sa isang espesyal na punto (mataas, mababa, ..) - ang unang halaga sa minuto ay kinuha mula sa papasok na data, ang natitira ay kinakalkula gamit ang function.
  3. Pahinga. Palaging kalkulahin gamit ang isang function.

Tukuyin natin ang mga variable para sa mga klase na ito:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Pagkakasunud-sunod ng pagkalkula

I-update namin ang pinagsama-samang talahanayan sa dalawang yugto. Para sa kahusayan, pinaliit muna namin ang papasok na talahanayan upang mayroon lamang isang hilera para sa bawat karakter at minuto. Ang katotohanan na ang lahat ng aming mga function ay incremental at nauugnay na mga garantiya na ang resulta ng karagdagang hakbang na ito ay hindi magbabago. Maaari mong paliitin ang talahanayan gamit ang piliin:

select high:max price, low:min price … by sym,time.minute from table

Ang pamamaraang ito ay may isang kawalan - ang hanay ng mga kinakalkula na mga haligi ay paunang natukoy. Sa kabutihang palad, sa Q, ang piliin ay ipinatupad din bilang isang function kung saan maaari mong palitan ang mga dynamic na nilikhang argumento:

?[table;whereClause;byClause;selectClause]

Hindi ko ilalarawan nang detalyado ang format ng mga argumento; sa aming kaso, sa pamamagitan lamang ng at piling mga expression ay hindi mahalaga at dapat silang mga diksyunaryo ng mga form na column!expression. Kaya, ang pag-urong function ay maaaring tukuyin bilang mga sumusunod:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Para sa kalinawan, ginamit ko ang parse function, na ginagawang value ang string na may Q expression na maaaring ipasa sa eval function at kinakailangan sa function na piliin. Tandaan din na ang preprocess ay tinukoy bilang isang projection (ibig sabihin, isang function na may bahagyang tinukoy na mga argumento) ng piling function, isang argumento (ang talahanayan) ang nawawala. Kung ilalapat namin ang preprocess sa isang talahanayan, makakakuha kami ng isang naka-compress na talahanayan.

Ang ikalawang yugto ay ang pag-update ng pinagsama-samang talahanayan. Isulat muna natin ang algorithm sa pseudocode:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Sa Q, karaniwan nang gumamit ng mapa/pagbawas ng mga function sa halip na mga loop. Ngunit dahil ang Q ay isang wikang vector at madali nating mailalapat ang lahat ng mga operasyon sa lahat ng mga simbolo nang sabay-sabay, kung gayon sa isang unang pagtatantya ay magagawa natin nang walang loop, na gumaganap ng mga operasyon sa lahat ng mga simbolo nang sabay-sabay:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Ngunit maaari tayong magpatuloy, ang Q ay may natatangi at napakalakas na operator - ang generalised assignment operator. Pinapayagan ka nitong baguhin ang isang hanay ng mga halaga sa isang kumplikadong istraktura ng data gamit ang isang listahan ng mga indeks, function at argumento. Sa aming kaso, ganito ang hitsura:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Sa kasamaang palad, upang magtalaga sa isang talahanayan kailangan mo ng isang listahan ng mga hilera, hindi mga haligi, at kailangan mong i-transpose ang matrix (listahan ng mga hanay sa listahan ng mga hilera) gamit ang flip function. Ito ay mahal para sa isang malaking talahanayan, kaya sa halip ay naglalapat kami ng pangkalahatang takdang-aralin sa bawat column nang hiwalay, gamit ang function ng mapa (na mukhang apostrophe):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Muli naming ginagamit ang function projection. Tandaan din na sa Q, ang paglikha ng isang listahan ay isa ring function at maaari nating tawagan ito gamit ang bawat(mapa) function upang makakuha ng listahan ng mga listahan.

Upang matiyak na ang hanay ng mga kalkuladong column ay hindi maayos, gagawin namin ang expression sa itaas nang pabago-bago. Tukuyin muna natin ang mga function upang kalkulahin ang bawat column, gamit ang row at inp na mga variable upang sumangguni sa pinagsama-sama at input na data:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Espesyal ang ilang column; hindi dapat kalkulahin ng function ang unang halaga ng mga ito. Matutukoy namin na ito ang una sa pamamagitan ng hanay ng row[`numTrades] - kung naglalaman ito ng 0, ang value ang una. Ang Q ay may piling function - ?[Boolean list;list1;list2] - na pumipili ng value mula sa listahan 1 o 2 depende sa kundisyon sa unang argumento:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Dito ay tumawag ako ng isang pangkalahatang takdang-aralin sa aking pag-andar (isang expression sa mga kulot na braces). Natatanggap nito ang kasalukuyang halaga (ang unang argumento) at isang karagdagang argumento, na ipinapasa ko sa ika-4 na parameter.

Idagdag natin ang mga speaker ng baterya nang hiwalay, dahil pareho ang function para sa kanila:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Ito ay isang normal na pagtatalaga ayon sa mga pamantayan ng Q, ngunit nagtatalaga ako ng isang listahan ng mga halaga nang sabay-sabay. Sa wakas, likhain natin ang pangunahing pag-andar:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Gamit ang expression na ito, dynamic akong lumikha ng isang function mula sa isang string na naglalaman ng expression na ibinigay ko sa itaas. Magiging ganito ang resulta:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Ang pagkakasunud-sunod ng pagsusuri sa hanay ay baligtad dahil sa Q ang pagkakasunud-sunod ng pagsusuri ay mula kanan pakaliwa.

Ngayon mayroon kaming dalawang pangunahing pag-andar na kinakailangan para sa mga kalkulasyon, kailangan lang naming magdagdag ng kaunting imprastraktura at handa na ang serbisyo.

Mga huling hakbang

Mayroon kaming preprocess at updateAgg function na gumagawa ng lahat ng gawain. Ngunit kailangan pa ring tiyakin ang tamang paglipat sa mga minuto at kalkulahin ang mga index para sa pagsasama-sama. Una sa lahat, tukuyin natin ang init function:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Tutukuyin din namin ang function ng roll, na magbabago sa kasalukuyang minuto:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Kakailanganin namin ang isang function upang magdagdag ng mga bagong character:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

At sa wakas, ang upd function (ang tradisyunal na pangalan para sa function na ito para sa mga serbisyo ng Q), na tinatawag ng kliyente upang magdagdag ng data:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Iyon lang. Narito ang kumpletong code ng aming serbisyo, tulad ng ipinangako, ilang linya lamang:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Pagsubok

Suriin natin ang pagganap ng serbisyo. Upang gawin ito, patakbuhin natin ito sa isang hiwalay na proseso (ilagay ang code sa service.q file) at tawagan ang init function:

q service.q –p 5566

q)init[]

Sa isa pang console, simulan ang pangalawang proseso ng Q at kumonekta sa una:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Una, gumawa tayo ng isang listahan ng mga simbolo - 10000 piraso at magdagdag ng isang function upang lumikha ng isang random na talahanayan. Sa pangalawang console:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Nagdagdag ako ng tatlong totoong simbolo sa listahan para mas madaling hanapin ang mga ito sa talahanayan. Ang rnd function ay lumilikha ng random na talahanayan na may n row, kung saan ang oras ay nag-iiba mula t hanggang t+25 millisecond.

Ngayon ay maaari mong subukang magpadala ng data sa serbisyo (idagdag ang unang sampung oras):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Maaari mong suriin sa serbisyo kung na-update ang talahanayan:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Resulta:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Magsagawa tayo ngayon ng pagsubok sa pag-load upang malaman kung gaano karaming data ang maaaring iproseso ng serbisyo bawat minuto. Ipaalala ko sa iyo na itinakda namin ang agwat ng pag-update sa 25 millisecond. Alinsunod dito, ang serbisyo ay dapat (sa karaniwan) na magkasya sa hindi bababa sa 20 millisecond bawat update upang bigyan ang mga user ng oras na humiling ng data. Ipasok ang sumusunod sa pangalawang proseso:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

Ang 4800 ay dalawang minuto. Maaari mong subukang tumakbo muna para sa 1000 row bawat 25 millisecond:

start 1000

Sa aking kaso, ang resulta ay humigit-kumulang ilang millisecond bawat update. Kaya agad kong dadagdagan ang bilang ng mga row sa 10.000:

start 10000

Resulta:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Muli, walang espesyal, ngunit ito ay 24 milyong linya bawat minuto, 400 libo bawat segundo. Para sa higit sa 25 millisecond, bumagal lang ang pag-update nang 5 beses, tila noong nagbago ang minuto. Taasan natin sa 100.000:

start 100000

Resulta:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Tulad ng nakikita mo, ang serbisyo ay halos hindi makayanan, ngunit gayunpaman ito ay namamahala upang manatiling nakalutang. Ang nasabing dami ng data (240 milyong mga hilera kada minuto) ay napakalaki; sa mga ganitong kaso, karaniwan nang maglunsad ng ilang mga clone (o kahit dose-dosenang mga clone) ng serbisyo, na ang bawat isa ay nagpoproseso lamang ng bahagi ng mga character. Gayunpaman, ang resulta ay kahanga-hanga para sa isang binibigyang kahulugan na wika na pangunahing nakatuon sa pag-iimbak ng data.

Maaaring lumitaw ang tanong kung bakit lumalaki ang oras nang hindi linear sa laki ng bawat pag-update. Ang dahilan ay ang pag-urong function ay talagang isang C function, na kung saan ay mas mahusay kaysa sa updateAgg. Simula sa isang tiyak na laki ng pag-update (sa paligid ng 10.000), ang updateAgg ay umabot sa kisame nito at pagkatapos ay ang oras ng pagpapatupad nito ay hindi nakadepende sa laki ng pag-update. Ito ay dahil sa paunang hakbang na Q na ang serbisyo ay nakakapag-digest ng mga naturang volume ng data. Itinatampok nito kung gaano kahalaga ang piliin ang tamang algorithm kapag nagtatrabaho sa malaking data. Ang isa pang punto ay ang tamang pag-iimbak ng data sa memorya. Kung ang data ay hindi naka-imbak sa columnarly o hindi inutusan ng oras, magiging pamilyar tayo sa bagay na tulad ng isang TLB cache miss - ang kawalan ng isang address ng memory page sa cache ng address ng processor. Ang paghahanap para sa isang address ay tumatagal ng humigit-kumulang 30 beses na mas matagal kung hindi matagumpay, at kung ang data ay nakakalat, maaari nitong pabagalin ang serbisyo nang maraming beses.

Konklusyon

Sa artikulong ito, ipinakita ko na ang KDB+ at Q database ay angkop hindi lamang para sa pag-iimbak ng malaking data at madaling pag-access dito sa pamamagitan ng piling, ngunit para din sa paglikha ng mga serbisyo sa pagpoproseso ng data na may kakayahang mag-digest ng daan-daang milyong row/gigabytes ng data kahit na sa isang solong proseso ng Q. Ang wikang Q mismo ay nagbibigay-daan para sa lubos na maigsi at mahusay na pagpapatupad ng mga algorithm na nauugnay sa pagpoproseso ng data dahil sa kalikasan ng vector nito, built-in na SQL dialect interpreter at isang napakatagumpay na hanay ng mga function ng library.

Mapapansin ko na ang nasa itaas ay bahagi lamang ng kung ano ang magagawa ni Q, mayroon din itong iba pang mga natatanging tampok. Halimbawa, isang napakasimpleng protocol ng IPC na nagbubura sa hangganan sa pagitan ng mga indibidwal na proseso ng Q at nagbibigay-daan sa iyong pagsamahin ang daan-daang mga prosesong ito sa isang network, na maaaring matatagpuan sa dose-dosenang mga server sa iba't ibang bahagi ng mundo.

Pinagmulan: www.habr.com

Magdagdag ng komento