Q eta KDB+ hizkuntzaren ezaugarriak denbora errealeko zerbitzu baten adibidea erabiliz

KDB+ oinarria, Q programazio-lengoaia, zein indargune eta ahulgune diren irakur dezakezu nire aurreko Artikulu eta labur-labur sarreran. Artikuluan, Q-n sarrerako datu-fluxua prozesatu eta minuturo agregazio-funtzio ezberdinak kalkulatuko dituen zerbitzu bat ezarriko dugu "denbora errealean" moduan (hau da, hurrengo datu-zatiaren aurretik guztia kalkulatzeko denbora izango du). Q-ren ezaugarri nagusia lengoaia bektoriala dela da, eta ez objektu bakarrekin funtzionatzea ahalbidetzen duena, baizik eta haien array, array eta bestelako objektu konplexuekin. Q eta bere erlatiboak K, J, APL bezalako hizkuntzak ezagunak dira beren laburtasunagatik. Askotan, Java bezalako hizkuntza ezagun batean kode-pantaila batzuk hartzen dituen programa bat lerro gutxitan idatz daiteke. Hau da artikulu honetan erakutsi nahi dudana.

Q eta KDB+ hizkuntzaren ezaugarriak denbora errealeko zerbitzu baten adibidea erabiliz

Sarrera

KDB+ datu-kantitate oso handietara bideratutako zutabe-datu-base bat da, modu zehatz batean ordenatuta (denboraren arabera batez ere). Batez ere finantza-erakundeetan erabiltzen da - bankuetan, inbertsio-funtsetan, aseguru-etxeetan. Q hizkuntza KDB+-ren barne hizkuntza da, datu hauekin modu eraginkorrean lan egiteko aukera ematen duena. Q ideologia laburtasuna eta eraginkortasuna da, argitasuna sakrifikatzen den bitartean. Honen bidez justifikatzen da bektore-lengoaia ulertzea zaila izango dela edozein kasutan, eta grabazioaren laburtasunari eta aberastasunari esker, programaren zati askoz handiagoa pantaila batean ikusteko aukera ematen du, eta horrek, azken finean, errazago ulertzen du.

Artikulu honetan Q-en osoko programa bat inplementatzen dugu eta baliteke probatu nahi izatea. Horretarako, benetako Q-a beharko duzu. Doako 32 biteko bertsioa deskarga dezakezu kx enpresaren webgunean - www.kx.com. Bertan, interesa izanez gero, Q liburuari buruzko erreferentziazko informazioa aurkituko duzu Q Hilkorrentzat eta gai honi buruzko hainbat artikulu.

Arazoaren formulazioa

Badago iturri bat 25 milisegundotik behin datuekin taula bat bidaltzen duena. KDB+ finantzaketan batez ere erabiltzen denez, hau transakzioen taula bat dela suposatuko dugu, zutabe hauek dituena: denbora (denbora milisegundotan), sym (burtsan enpresaren izendapena - IBM, AAPL,…), prezioa (akzioak erosi ziren prezioa), tamaina (eragiketaren tamaina). 25 milisegundoko tartea arbitrarioa da, ez txikiegia eta ez luzeegia. Bere presentziak esan nahi du datuak dagoeneko buffered dagoen zerbitzura iristen direla. Erraza izango litzateke buffering-a ezartzea zerbitzuaren aldetik, uneko kargaren araberako buffer dinamikoa barne, baina sinpletasunerako, tarte finko batean zentratuko gara.

Zerbitzuak minutu bakoitza zenbatu behar du sym zutabetik sartzen den ikur bakoitzeko funtzio multzo bat - gehienezko prezioa, batez besteko prezioa, batura-tamaina, etab. informazio baliagarria. Sinpletasunerako, funtzio guztiak modu inkrementalean kalkula daitezkeela suposatuko dugu, hau da. balio berri bat lortzeko, nahikoa da bi zenbaki ezagutzea: zaharrak eta sarrerako balioak. Adibidez, max, batez bestekoa, batura funtzioek propietate hau dute, baina mediana funtzioak ez.

Sarrerako datu-korrontea denbora ordenatuta dagoela ere hartuko dugu. Honek azken momentuan bakarrik lan egiteko aukera emango digu. Praktikan, nahikoa da uneko eta aurreko minutuekin lan egin ahal izatea, eguneraketa batzuk berandu badaude. Sinpletasunagatik, ez dugu kasu hau kontuan hartuko.

Agregazio-funtzioak

Beharrezko agregazio-funtzioak behean zerrendatzen dira. Ahalik eta gehien hartu ditut zerbitzuaren karga handitzeko:

  • altua – prezio maximoa – minutuko gehienezko prezioa.
  • baxua – prezio minimoa – minutuko gutxieneko prezioa.
  • firstPrice – lehen prezioa – lehenengo prezioa minutuko.
  • lastPrice – azken prezioa – azken prezioa minutuko.
  • firstSize - lehen tamaina - lehenengo merkataritza-tamaina minutuko.
  • lastSize - azken tamaina - azken merkataritza-tamaina minutu batean.
  • numTrades – count i – minutuko salerosketa kopurua.
  • bolumena - batura tamaina - minutuko merkataritza-tamainen batura.
  • pvolume - sum price - minutuko prezioen batura, batez besteko Prezioa egiteko beharrezkoa.
  • – fakturazioaren prezioa*tamaina – minutuko transakzioen bolumen osoa.
  • avgPrice – pvolume%numTrades – minutuko batez besteko prezioa.
  • avgSize - bolumena%numTrades - minutuko merkataritzaren batez besteko tamaina.
  • vwap - fakturazioaren% bolumena - transakzio tamainaren arabera haztatuta minutu bakoitzeko batez besteko prezioa.
  • cumVolume - batura bolumena - denbora osoan zehar transakzioen tamaina metatua.

Goazen berehala eztabaidatu begi bistakoa ez den puntu bat: zutabe hauek lehen aldiz eta hurrengo minutu bakoitzeko nola hasieratu. FirstPrice motako zutabe batzuk hasieratu behar dira aldi bakoitzean null gisa; haien balioa zehaztu gabe dago. Beste bolumen-motak 0-an ezarri behar dira beti. Ikuspegi konbinatua behar duten zutabeak ere badaude - adibidez, cumVolume aurreko minututik kopiatu behar da, eta lehenengorako 0-n ezarri. Ezar ditzagun parametro horiek guztiak hiztegiko datuak erabiliz. mota (erregistro baten antzekoa):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Sym eta denbora gehitu nizkion hiztegiari erosotasunerako, orain initWith azken taula agregatutik prestatutako lerroa da, non sym eta ordu egokia ezartzeko geratzen den. Taula bati errenkada berriak gehitzeko erabil dezakezu.

AggCols beharko dugu agregazio funtzio bat sortzerakoan. Zerrenda alderantzikatu egin behar da Q-ko adierazpenak ebaluatzen diren ordenagatik (eskuinetik ezkerrera). Helburua da kalkulua handitik CumVolumera doazela ziurtatzea, zutabe batzuk aurrekoen araberakoak baitira.

Aurrekoaren minutu berri batean kopiatu behar diren zutabeak, sym zutabea gehitzen da erosotasunerako:

rollColumns:`sym`cumVolume;

Orain zati ditzagun zutabeak taldetan nola eguneratu behar diren arabera. Hiru mota bereiz daitezke:

  1. Metatzaileak (bolumena, fakturazioa,..) – sarrerako balioa aurrekoari gehitu behar diogu.
  2. Puntu berezi batekin (altua, baxua, ..) – minutuko lehen balioa sarrerako datuetatik hartzen da, gainerakoak funtzioa erabiliz kalkulatzen dira.
  3. Atsedena. Funtzio baten bidez kalkulatzen da beti.

Defini ditzagun klase hauetarako aldagaiak:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Kalkulu-ordena

Taula agregatua bi fasetan eguneratuko dugu. Eraginkortasuna lortzeko, lehenengo sarrerako taula txikitzen dugu, karaktere eta minutu bakoitzeko errenkada bakarra egon dadin. Gure funtzio guztiak inkrementalak eta elkartuak izateak bermatzen du urrats gehigarri honen emaitza ez dela aldatuko. Taula txikitu dezakezu hautatu hau erabiliz:

select high:max price, low:min price … by sym,time.minute from table

Metodo honek desabantaila bat du - kalkulatutako zutabeen multzoa aurrez definituta dago. Zorionez, Q-en, hautatu ere dinamikoki sortutako argumentuak ordezka ditzakezun funtzio gisa inplementatzen da:

?[table;whereClause;byClause;selectClause]

Ez dut zehatz-mehatz deskribatuko argumentuen formatua; gure kasuan, by eta select esamoldeak bakarrik ez dira hutsalak izango eta formako zutabeen!esamoldeen hiztegiak izan behar dira. Beraz, murrizketa funtzioa honela defini daiteke:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Argitasunerako, parse funtzioa erabili dut, Q adierazpena duen kate bat eval funtziora pasa daitekeen eta funtzio hautatzeko beharrezkoa den balio batean bihurtzen duena. Kontuan izan, halaber, aurreprozesua hautatzeko funtzioaren proiekzio gisa (hau da, partzialki definitutako argumentuak dituen funtzio bat) definitzen dela, argumentu bat (taula) falta dela. Taula bati aurreprozesua aplikatzen badiogu, taula konprimitua lortuko dugu.

Bigarren fasea taula agregatua eguneratzea da. Idatz dezagun lehenik algoritmoa pseudokodean:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q-n, ohikoa da mapa/murrizketa funtzioak erabiltzea begizten ordez. Baina Q lengoaia bektoriala denez eta eragiketa guztiak aldi berean ikur guztiei erraz aplika diezazkiekegunez, lehenengo hurbilketa batean begiztarik gabe egin dezakegu, ikur guztietan eragiketak aldi berean eginez:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Baina harago joan gaitezke, Q-k operadore bakarra eta oso indartsua du - esleipen-operadore orokortua. Datu-egitura konplexu batean balio multzo bat aldatzeko aukera ematen du indize, funtzio eta argumentuen zerrenda erabiliz. Gure kasuan honelakoa da:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Zoritxarrez, taula bati esleitzeko errenkaden zerrenda bat behar duzu, ez zutabeak, eta matrizea (zutabeen zerrenda errenkaden zerrendara) transposatu behar duzu irauli funtzioa erabiliz. Taula handi baterako garestia da, beraz, zutabe bakoitzari esleipen orokor bat aplikatzen diogu bereizita, mapa funtzioa erabiliz (apostrofo baten itxura duena):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Berriz ere funtzioen proiekzioa erabiltzen dugu. Kontuan izan, gainera, Q-n, zerrenda bat sortzea funtzio bat ere badela eta bakoitza (mapa) funtzioa erabiliz dei dezakegula zerrenden zerrenda bat lortzeko.

Kalkulatutako zutabeen multzoa finkoa ez dela ziurtatzeko, goiko adierazpena dinamikoki sortuko dugu. Lehenik eta behin, defini ditzagun zutabe bakoitza kalkulatzeko funtzioak, errenkada eta inp aldagaiak erabiliz, gehitutako eta sarrerako datuei erreferentzia egiteko:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Zutabe batzuk bereziak dira; haien lehen balioa ez du funtzioak kalkulatu behar. Lehena dela zehaztu dezakegu errenkada [`numTrades] zutabearen bidez - 0 badu, balioa lehena da. Q-k hautatze-funtzioa du - ?[Boolean list;list1;list2] - 1 edo 2 zerrendako balio bat hautatzen duena, lehen argumentuko baldintzaren arabera:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Hemen esleipen orokor bati deitu nion nire funtzioarekin (giltza kizkurdun adierazpena). Uneko balioa (lehen argumentua) eta argumentu gehigarri bat jasotzen ditu, 4. parametroan pasatzen dudana.

Gehi ditzagun bateriaren bozgorailuak bereizita, funtzioa bera baita beraientzat:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Q estandarren esleipen normala da, baina balio zerrenda bat esleitzen ari naiz aldi berean. Azkenik, sor dezagun funtzio nagusia:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Adierazpen honekin, goian eman dudan adierazpena duen kate batetik funtzio bat sortzen dut dinamikoki. Emaitza honela izango da:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Zutabeen ebaluazio-ordena alderantzikatu egiten da Q-n ebaluazio-ordena eskuinetik ezkerrera delako.

Orain kalkuluetarako beharrezkoak diren bi funtzio nagusi ditugu, azpiegitura apur bat gehitu besterik ez dugu egin behar eta zerbitzua prest dago.

Azken urratsak

Lan guztia egiten duten aurreprozesatu eta updateAgg funtzioak ditugu. Baina oraindik beharrezkoa da minutuen bidez trantsizio zuzena bermatzea eta agregaziorako indizeak kalkulatzea. Lehenik eta behin, defini dezagun init funtzioa:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Roll funtzioa ere definituko dugu, uneko minutua aldatuko duena:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Karaktere berriak gehitzeko funtzio bat beharko dugu:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Eta azkenik, upd funtzioa (Q zerbitzuetarako funtzio honen izen tradizionala), bezeroak datuak gehitzeko deitzen duena:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Hori da dena. Hona hemen gure zerbitzuaren kodea osoa, agindu bezala, lerro batzuk besterik ez:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Testing

Ikus dezagun zerbitzuaren errendimendua. Horretarako, exekutatu dezagun prozesu bereizi batean (jarri kodea service.q fitxategian) eta deitu init funtzioari:

q service.q –p 5566

q)init[]

Beste kontsola batean, hasi bigarren Q prozesua eta konektatu lehenengora:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Lehenik eta behin, sor ditzagun sinboloen zerrenda - 10000 pieza eta gehitu funtzio bat ausazko taula bat sortzeko. Bigarren kontsolan:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Zerrendari hiru sinbolo erreal gehitu dizkiot, taulan errazago bilatzeko. rnd funtzioak ausazko taula bat sortzen du n errenkadarekin, non denbora t-tik t+25 milisegundora aldatzen den.

Orain saiatu zaitezke datuak bidaltzen zerbitzura (gehitu lehen hamar orduak):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Zerbitzuan egiaztatu dezakezu taula eguneratu dela:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Emaitza:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Egin ditzagun orain karga-probak zerbitzuak minutuko zenbat datu prozesatu ditzakeen jakiteko. Gogorarazten dizut eguneratze-tartea 25 milisegundoan ezarri dugula. Horren arabera, zerbitzuak (batez beste) gutxienez 20 milisegundotan sartu behar du eguneratze bakoitzeko erabiltzaileei datuak eskatzeko denbora emateko. Idatzi honako hau bigarren prozesuan:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 bi minutu dira. Saia zaitezke lehenik 1000 errenkadetan exekutatzen 25 milisegundotik behin:

start 1000

Nire kasuan, emaitza eguneratze bakoitzeko milisegundo pare bat ingurukoa da. Beraz, berehala igoko dut errenkada kopurua 10.000ra:

start 10000

Emaitza:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Berriz ere, ezer berezirik ez, baina hau da 24 milioi lerro minutuko, 400 mila segundoko. 25 milisegundo baino gehiagoz, eguneratzea 5 aldiz bakarrik moteldu zen, itxuraz minutua aldatzean. Handi gaitezen 100.000ra:

start 100000

Emaitza:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Ikusten duzunez, zerbitzuak apenas aurre egin dezake, baina, hala ere, urpean mantentzea lortzen du. Horrelako datu-bolumena (240 milioi errenkada minutuko) izugarri handia da; halakoetan, ohikoa da zerbitzuaren hainbat klon (edo dozenaka klon) abiarazteko, eta bakoitzak karaktereen zati bat soilik prozesatzen du. Hala ere, emaitza ikusgarria da, batez ere, datuak biltegiratzean zentratzen den hizkuntza interpretatu baterako.

Galdera sor daiteke zergatik hazten den denbora ez-linealki eguneratze bakoitzaren tamainarekin. Arrazoia da txikitzeko funtzioa benetan C funtzio bat dela, updateAgg baino askoz eraginkorragoa dena. Eguneratze-tamaina jakin batetik abiatuta (10.000 inguru), updateAgg bere sabaira iristen da eta orduan bere exekuzio-denbora ez da eguneratzearen tamainaren araberakoa. Q aurretiazko urratsari esker zerbitzuak horrelako datu-bolumenak digeritzeko gai da. Horrek nabarmentzen du zein garrantzitsua den algoritmo egokia aukeratzea big datarekin lan egitean. Beste puntu bat datuak memorian gordetzea da. Datuak zutabeetan gordetzen ez balira edo denboraren arabera ordenatuta egongo ez balira, TLB cache hutsa bezalako gauza bat ezagutuko genuke - prozesadorearen helbidearen cachean memoria orri-helbiderik ez izatea. Helbide bat bilatzeko 30 aldiz gehiago behar da arrakastarik ezean, eta datuak sakabanatuta badaude, zerbitzua hainbat aldiz moteldu daiteke.

Ondorioa

Artikulu honetan, erakutsi nuen KDB+ eta Q datu-baseak egokiak direla datu handiak gordetzeko eta hautaketaren bidez erraz sartzeko, baizik eta ehunka milioi errenkada/gigabyte datu digeritzeko gai diren datuak prozesatzeko zerbitzuak sortzeko ere. Q prozesu bakar bat. Q lengoaiak berak datuen prozesamenduarekin erlazionatutako algoritmoen ezarpen oso zehatza eta eraginkorra ahalbidetzen du, bere izaera bektoriala, SQL dialekto interpretatzailea integratua eta liburutegiko funtzio multzo oso arrakastatsua direla eta.

Kontuan izango dut goikoa Q-k egin dezakeenaren zati bat dela, beste ezaugarri berezi batzuk ere badituela. Esate baterako, IPC protokolo oso sinplea, Q prozesu indibidualen arteko muga ezabatzen duena eta prozesu horietako ehunka sare bakar batean konbinatzeko aukera ematen duena, munduko hainbat lekutako dozenaka zerbitzaritan egon daitekeena.

Iturria: www.habr.com

Gehitu iruzkin berria