Jy kan lees oor wat die KDB+-basis, die Q-programmeertaal is, wat hul sterk- en swakpunte is in my vorige
Inleiding
KDB+ is 'n kolomdatabasis wat op baie groot hoeveelhede data gefokus is, op 'n spesifieke manier (hoofsaaklik volgens tyd) georden. Dit word hoofsaaklik in finansiΓ«le instellings gebruik - banke, beleggingsfondse, versekeringsmaatskappye. Die Q-taal is die interne taal van KDB+ wat jou toelaat om effektief met hierdie data te werk. Die Q-ideologie is bondigheid en doeltreffendheid, terwyl duidelikheid opgeoffer word. Dit word geregverdig deur die feit dat die vektortaal in elk geval moeilik sal wees om te verstaan, en die bondigheid en rykheid van die opname laat jou toe om 'n baie groter deel van die program op een skerm te sien, wat dit uiteindelik makliker maak om te verstaan.
In hierdie artikel implementeer ons 'n volwaardige program in Q en jy wil dit dalk probeer. Om dit te doen, sal jy die werklike Q nodig hΓͺ. Jy kan die gratis 32-bis weergawe op die kx maatskappy webwerf aflaai β
Probleemstelling
Daar is 'n bron wat elke 25 millisekondes 'n tabel met data stuur. Aangesien KDB+ hoofsaaklik in finansies gebruik word, sal ons aanvaar dat dit 'n tabel van transaksies (transaksies) is, wat die volgende kolomme het: tyd (tyd in millisekondes), sym (maatskappybenaming op die aandelebeurs - IBM, AAPL,...), prys (die prys waarteen die aandele gekoop is), grootte (grootte van die transaksie). Die 25 millisekonde interval is arbitrΓͺr, nie te klein en nie te lank nie. Die teenwoordigheid daarvan beteken dat die data na die diens kom wat reeds gebuffer is. Dit sal maklik wees om buffering aan die dienskant te implementeer, insluitend dinamiese buffering, afhangende van die huidige las, maar vir eenvoud fokus ons op 'n vaste interval.
Die diens moet elke minuut tel vir elke inkomende simbool vanaf die sim-kolom 'n stel samevoegingsfunksies - maksimum prys, gemiddelde prys, somgrootte, ens. Nuttige inligting. Vir eenvoud sal ons aanvaar dat alle funksies inkrementeel bereken kan word, m.a.w. om 'n nuwe waarde te verkry, is dit genoeg om twee getalle te ken - die ou en die inkomende waardes. Byvoorbeeld, die funksies maks, gemiddeld, som het hierdie eienskap, maar die mediaanfunksie nie.
Ons sal ook aanvaar dat die inkomende datastroom tydgeorden is. Dit sal ons die geleentheid gee om net met die laaste minuut te werk. In die praktyk is dit genoeg om met die huidige en vorige minute te kan werk ingeval sommige opdaterings laat is. Vir eenvoud sal ons nie hierdie saak oorweeg nie.
Aggregasie funksies
Die vereiste samevoegingsfunksies word hieronder gelys. Ek het soveel as moontlik van hulle geneem om die las op die diens te verhoog:
- hoog β maksimum prys β maksimum prys per minuut.
- laag β min prys β minimum prys per minuut.
- eerste prys β eerste prys β eerste prys per minuut.
- laaste prys β laaste prys β laaste prys per minuut.
- firstSize - eerste grootte - eerste handelsgrootte per minuut.
- lastSize - laaste grootte - laaste handelsgrootte in 'n minuut.
- numTrades - tel i - aantal ambagte per minuut.
- volume β somgrootte β som van handelsgroottes per minuut.
- pvolume β somprys β som van pryse per minuut, vereis vir gem.Prys.
- β somomsetprys*grootte β totale volume transaksies per minuut.
- avgPrice β pvolume%numTrades β gemiddelde prys per minuut.
- avgSize β volume%numTrades β gemiddelde handelsgrootte per minuut.
- vwap β omset% volume β gemiddelde prys per minuut geweeg volgens transaksiegrootte.
- cumVolume β som volume β opgehoopte grootte van transaksies oor die hele tyd.
Kom ons bespreek dadelik een nie-vanselfsprekende punt β hoe om hierdie kolomme vir die eerste keer en vir elke daaropvolgende minuut te inisialiseer. Sommige kolomme van die firstPrice-tipe moet elke keer geΓ―nisialiseer word om nul te wees; hul waarde is ongedefinieerd. Ander volumetipes moet altyd op 0 gestel word. Daar is ook kolomme wat 'n gekombineerde benadering vereis - byvoorbeeld, cumVolume moet vanaf die vorige minuut gekopieer word, en vir die eerste een op 0 gestel. Kom ons stel al hierdie parameters met behulp van die woordeboekdata tipe (analoog aan 'n rekord):
// list ! list β ΡΠΎΠ·Π΄Π°ΡΡ ΡΠ»ΠΎΠ²Π°ΡΡ, 0n β float null, 0N β long null, `sym β ΡΠΈΠΏ ΡΠΈΠΌΠ²ΠΎΠ», `sym1`sym2 β ΡΠΏΠΈΡΠΎΠΊ ΡΠΈΠΌΠ²ΠΎΠ»ΠΎΠ²
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // ΡΠΏΠΈΡΠΎΠΊ Π²ΡΠ΅Ρ
Π²ΡΡΠΈΡΠ»ΡΠ΅ΠΌΡΡ
ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse ΠΎΠ±ΡΡΡΠ½Π΅Π½ Π½ΠΈΠΆΠ΅
Ek het sim en tyd by die woordeboek gevoeg vir gerief, nou is initWith 'n klaargemaakte reΓ«l van die finale saamgevoegde tabel, waar dit nog oorbly om die korrekte sim en tyd in te stel. Jy kan dit gebruik om nuwe rye by 'n tabel te voeg.
Ons sal aggCols nodig hΓͺ wanneer 'n samevoegingsfunksie geskep word. Die lys moet omgekeer word as gevolg van die volgorde waarin uitdrukkings in Q geΓ«valueer word (van regs na links). Die doel is om te verseker dat die berekening van hoog na cumVolume gaan, aangesien sommige kolomme van voriges afhang.
Kolomme wat na 'n nuwe minuut van die vorige een gekopieer moet word, die simkolom word gerieflikheidshalwe bygevoeg:
rollColumns:`sym`cumVolume;
Kom ons verdeel nou die kolomme in groepe volgens hoe hulle opgedateer moet word. Drie tipes kan onderskei word:
- Akkumulators (volume, omset, ..) β ons moet die inkomende waarde by die vorige een voeg.
- Met 'n spesiale punt (hoog, laag, ..) - die eerste waarde in die minuut word geneem uit die inkomende data, die res word met behulp van die funksie bereken.
- Rus. Altyd bereken deur 'n funksie te gebruik.
Kom ons definieer veranderlikes vir hierdie klasse:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Berekening volgorde
Ons sal die saamgevoegde tabel in twee fases opdateer. Vir doeltreffendheid verklein ons eers die inkomende tabel sodat daar net een ry vir elke karakter en minuut is. Die feit dat al ons funksies inkrementeel en assosiatief is, waarborg dat die resultaat van hierdie bykomende stap nie sal verander nie. Jy kan die tabel verklein deur te kies:
select high:max price, low:min price β¦ by sym,time.minute from table
Hierdie metode het 'n nadeel - die stel berekende kolomme is vooraf gedefinieer. Gelukkig, in Q, word select ook geΓ―mplementeer as 'n funksie waar jy dinamies geskepte argumente kan vervang:
?[table;whereClause;byClause;selectClause]
Ek sal nie die formaat van die argumente in detail beskryf nie; in ons geval sal slegs deur en uitgesoekte uitdrukkings nie-triviaal wees en hulle moet woordeboeke van die vormkolomme!uitdrukkings wees. Die krimpfunksie kan dus soos volg gedefinieer word:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each ΡΡΠΎ ΡΡΠ½ΠΊΡΠΈΡ map Π² Q Π΄Π»Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠΏΠΈΡΠΊΠ°
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Vir duidelikheid het ek die ontleed-funksie gebruik, wat 'n string met 'n Q-uitdrukking verander in 'n waarde wat na die eval-funksie oorgedra kan word en wat in die funksie-seleksie vereis word. Let ook op dat voorproses gedefinieer word as 'n projeksie (d.w.s. 'n funksie met gedeeltelik gedefinieerde argumente) van die kiesfunksie, een argument (die tabel) ontbreek. As ons voorproses op 'n tabel toepas, sal ons 'n saamgeperste tabel kry.
Die tweede fase is die opdatering van die saamgevoegde tabel. Kom ons skryf eers die algoritme in pseudokode:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
β¦
In Q is dit algemeen om kaart-/verminderfunksies in plaas van lusse te gebruik. Maar aangesien Q 'n vektortaal is en ons maklik alle bewerkings op alle simbole gelyktydig kan toepas, kan ons met 'n eerste benadering hoegenaamd sonder 'n lus klaarkom en bewerkings op alle simbole gelyktydig uitvoer:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
β¦
Maar ons kan verder gaan, Q het 'n unieke en uiters kragtige operateur - die algemene toewysingsoperateur. Dit laat jou toe om 'n stel waardes in 'n komplekse datastruktuur te verander deur 'n lys van indekse, funksies en argumente te gebruik. In ons geval lyk dit so:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;β¦]: function[target[idx 0;idx 1;β¦];argument], Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΡΡΠ½ΠΊΡΠΈΡ β ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Ongelukkig, om aan 'n tabel toe te wys, benodig jy 'n lys van rye, nie kolomme nie, en jy moet die matriks (lys van kolomme na lys van rye) transponeer deur die flip-funksie te gebruik. Dit is duur vir 'n groot tabel, dus pas ons eerder 'n algemene opdrag op elke kolom afsonderlik toe deur die kaartfunksie (wat soos 'n apostrof lyk) te gebruik:
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Ons gebruik weer funksieprojeksie. Let ook daarop dat in Q, die skep van 'n lys ook 'n funksie is en ons kan dit noem deur die each(map) funksie te gebruik om 'n lys van lyste te kry.
Om te verseker dat die stel berekende kolomme nie vas is nie, sal ons bogenoemde uitdrukking dinamies skep. Kom ons definieer eers funksies om elke kolom te bereken, deur die ry- en inp-veranderlikes te gebruik om na die saamgevoegde en insette data te verwys:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Sommige kolomme is spesiaal; hul eerste waarde moet nie deur die funksie bereken word nie. Ons kan bepaal dat dit die eerste is deur die ry[`numTrades] kolom - as dit 0 bevat, dan is die waarde eerste. Q het 'n kiesfunksie - ?[Booleaanse lys;lys1;lys2] - wat 'n waarde uit lys 1 of 2 kies, afhangende van die voorwaarde in die eerste argument:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - ΡΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΠΎΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π΄Π»Ρ ΡΠ»ΡΡΠ°Ρ ΠΊΠΎΠ³Π΄Π° ΠΈΠ½Π΄Π΅ΠΊΡ Π½Π΅Π³Π»ΡΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Hier het ek 'n algemene opdrag met my funksie genoem ('n uitdrukking in krulhakies). Dit ontvang die huidige waarde (die eerste argument) en 'n bykomende argument, wat ek in die 4de parameter deurgee.
Kom ons voeg batteryluidsprekers afsonderlik by, aangesien die funksie vir hulle dieselfde is:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Dit is 'n normale opdrag volgens Q-standaarde, maar ek gee dadelik 'n lys waardes toe. Laastens, laat ons die hooffunksie skep:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" ΠΏΡΠΈΡΠ²ΠΎΠΈΠΌ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΠΎΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ Π·Π°Π²ΠΈΡΡΡ ΠΎΡ ΡΠΆΠ΅ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΡΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅ΡΡΠΈΠΌ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΡ. ,β ΡΠ°ΡΡΠΈΡΡΠΎΠ²ΡΠ²Π°Π΅ΡΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs β String from Vector (sv), ΡΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΡΡΡΠΎΠΊ Π²ΡΡΠ°Π²Π»ΡΡ β;β ΠΏΠΎΡΡΠ΅Π΄ΠΈΠ½Π΅
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Met hierdie uitdrukking skep ek dinamies 'n funksie uit 'n string wat die uitdrukking bevat wat ek hierbo gegee het. Die resultaat sal so lyk:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;β¦ ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
Die kolomevalueringsvolgorde word omgekeer, want in Q is die evalueringsvolgorde van regs na links.
Nou het ons twee hooffunksies wat nodig is vir berekeninge, ons moet net 'n bietjie infrastruktuur byvoeg en die diens is gereed.
Finale stappe
Ons het preprocess en updateAgg funksies wat al die werk doen. Maar dit is steeds nodig om die korrekte oorgang deur minute te verseker en indekse vir samevoeging te bereken. Eerstens, laat ons die init-funksie definieer:
init:{
tradeAgg:: 0#enlist[initWith]; // ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΠΏΡΡΡΡΡ ΡΠΈΠΏΠΈΠ·ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ, enlist ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠ»ΠΎΠ²Π°ΡΡ Π² ΡΠ°Π±Π»ΠΈΡΡ, Π° 0# ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ Π²Π·ΡΡΡ 0 ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΠΈΠ· Π½Π΅Π΅
currTime::00:00; // Π½Π°ΡΠ½Π΅ΠΌ Ρ 0, :: ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π² Π³Π»ΠΎΠ±Π°Π»ΡΠ½ΡΡ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ
currSyms::`u#`symbol$(); // `u# - ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ Π² Π΄Π΅ΡΠ΅Π²ΠΎ, Π΄Π»Ρ ΡΡΠΊΠΎΡΠ΅Π½ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ²
offset::0; // ΠΈΠ½Π΄Π΅ΠΊΡ Π² tradeAgg, Π³Π΄Π΅ Π½Π°ΡΠΈΠ½Π°Π΅ΡΡΡ ΡΠ΅ΠΊΡΡΠ°Ρ ΠΌΠΈΠ½ΡΡΠ°
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // ΠΊΡΡ Π΄Π»Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, ΡΠ°Π±Π»ΠΈΡΠ° Ρ ΠΊΠ»ΡΡΠΎΠΌ sym
}
Ons sal ook die rolfunksie definieer, wat die huidige minuut sal verander:
roll:{[tm]
if[currTime>tm; :init[]]; // Π΅ΡΠ»ΠΈ ΠΏΠ΅ΡΠ΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡΡ, ΡΠΎ ΠΏΡΠΎΡΡΠΎ Π²ΡΠ·ΠΎΠ²Π΅ΠΌ init
rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ ΠΊΡΡ β Π²Π·ΡΡΡ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±ΡΠ΅Π·Π°ΡΡ, Π²ΡΡΠ°Π²ΠΈΡΡ Π² rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Ons sal 'n funksie nodig hΓͺ om nuwe karakters by te voeg:
addSyms:{[syms]
currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠΏΠΈΡΠΎΠΊ ΠΈΠ·Π²Π΅ΡΡΠ½ΡΡ
// Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠ°Π±Π»ΠΈΡΡ sym, time ΠΈ rollColumns Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π²ΡΠΈΡΡ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΡΠΌ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ΠΌ.
// Π€ΡΠ½ΠΊΡΠΈΡ ^ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΠ΅Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ Π΄Π»Ρ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Π΅ΡΠ»ΠΈ ΡΠΈΠΌΠ²ΠΎΠ»Π° Π½Π΅Ρ Π² ΠΊΡΡΠ΅. value flip table Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² ΡΠ°Π±Π»ΠΈΡΠ΅.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
En laastens, die upd-funksie (die tradisionele naam vir hierdie funksie vir Q-dienste), wat deur die kliΓ«nt geroep word om data by te voeg:
upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±ΡΡΠ½ΠΎ ΡΠ΅ΡΠ²ΠΈΡ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π΅Ρ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ°Π±Π»ΠΈΡ
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½ΡΠ΅ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡΡΡ
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // ΠΏΠΎΠΌΠ΅Π½ΡΠ΅ΠΌ ΠΌΠΈΠ½ΡΡΡ, Π΅ΡΠ»ΠΈ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ
data:select from data where time=tm; // ΡΠΈΠ»ΡΡΡΠ°ΡΠΈΡ
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²ΡΠ΅ ΡΠΈΠΌΠ²ΠΎΠ»Ρ
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ. Π€ΡΠ½ΠΊΡΠΈΡ ? ΠΈΡΠ΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡ ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΡΠΏΠΈΡΠΊΠ° ΡΠΏΡΠ°Π²Π° Π² ΡΠΏΠΈΡΠΊΠ΅ ΡΠ»Π΅Π²Π°.
};
Dis al. Hier is die volledige kode van ons diens, soos belowe, net 'n paar reΓ«ls:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
toets
Kom ons kyk na die prestasie van die diens. Om dit te doen, laat ons dit in 'n aparte proses laat loop (plaas die kode in die service.q-lΓͺer) en noem die init-funksie:
q service.q βp 5566
q)init[]
In 'n ander konsole, begin die tweede Q-proses en koppel aan die eerste:
h:hopen `:host:5566
h:hopen 5566 // Π΅ΡΠ»ΠΈ ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ Ρ
ΠΎΡΡΠ΅
Kom ons skep eers 'n lys simbole - 10000 XNUMX stukke en voeg 'n funksie by om 'n ewekansige tabel te skep. In die tweede konsole:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Ek het drie regte simbole by die lys gevoeg om dit makliker te maak om hulle in die tabel te soek. Die rnd-funksie skep 'n ewekansige tabel met n rye, waar die tyd wissel van t tot t+25 millisekondes.
Nou kan jy probeer om data na die diens te stuur (voeg die eerste tien uur by):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
U kan in die diens kyk of die tabel opgedateer is:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Gevolg:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
Kom ons voer nou lastoetse uit om uit te vind hoeveel data die diens per minuut kan verwerk. Laat ek jou daaraan herinner dat ons die opdateringsinterval op 25 millisekondes gestel het. Gevolglik moet die diens (gemiddeld) in ten minste 20 millisekondes per opdatering pas om gebruikers tyd te gee om data aan te vra. Voer die volgende in in die tweede proses:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 is twee minute. Jy kan probeer om eerste vir 1000 rye elke 25 millisekondes te hardloop:
start 1000
In my geval is die resultaat ongeveer 'n paar millisekondes per opdatering. So ek sal dadelik die aantal rye verhoog na 10.000 XNUMX:
start 10000
Gevolg:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Weereens, niks besonders nie, maar dit is 24 miljoen reΓ«ls per minuut, 400 duisend per sekonde. Vir meer as 25 millisekondes het die opdatering slegs 5 keer verlangsaam, blykbaar toe die minuut verander het. Kom ons verhoog tot 100.000 XNUMX:
start 100000
Gevolg:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Soos u kan sien, kan die diens skaars klaarkom, maar dit slaag nietemin daarin om kop bo water te hou. So 'n volume data (240 miljoen rye per minuut) is uiters groot; in sulke gevalle is dit algemeen om verskeie klone (of selfs dosyne klone) van die diens te begin, wat elkeen slegs 'n deel van die karakters verwerk. Tog is die resultaat indrukwekkend vir 'n geΓ―nterpreteerde taal wat hoofsaaklik op databerging fokus.
Die vraag kan ontstaan ββwaarom tyd nie-lineΓͺr groei met die grootte van elke opdatering. Die rede is dat die krimpfunksie eintlik 'n C-funksie is, wat baie meer doeltreffend is as updateAgg. Vanaf 'n sekere opdateringsgrootte (ongeveer 10.000 30), bereik updateAgg sy plafon en dan hang die uitvoeringstyd daarvan nie af van die opdateringsgrootte nie. Dit is as gevolg van die voorlopige stap Q dat die diens in staat is om sulke volumes data te verteer. Dit beklemtoon hoe belangrik dit is om die regte algoritme te kies wanneer jy met groot data werk. Nog 'n punt is die korrekte berging van data in die geheue. As die data nie kolomvormig gestoor is nie of nie volgens tyd georden is nie, dan sou ons vertroud raak met iets soos 'n TLB-kasmis - die afwesigheid van 'n geheuebladsyadres in die verwerkeradreskas. Om na 'n adres te soek neem ongeveer XNUMX keer langer as dit onsuksesvol is, en as die data verstrooi is, kan dit die diens verskeie kere vertraag.
Gevolgtrekking
In hierdie artikel het ek gewys dat die KDB+ en Q-databasis nie net geskik is om groot data te stoor en maklik toegang daartoe te verkry deur middel van uitgesoekte nie, maar ook vir die skep van dataverwerkingsdienste wat in staat is om honderde miljoene rye/gigagrepe data te verteer, selfs in een enkele Q-proses. Die Q-taal self maak voorsiening vir uiters bondige en doeltreffende implementering van algoritmes wat verband hou met dataverwerking as gevolg van sy vektor aard, ingeboude SQL dialek tolk en 'n baie suksesvolle stel biblioteek funksies.
Ek sal daarop let dat bogenoemde net deel is van wat Q kan doen, dit het ook ander unieke kenmerke. Byvoorbeeld, 'n uiters eenvoudige IPC-protokol wat die grens tussen individuele Q-prosesse uitvee en jou toelaat om honderde van hierdie prosesse in 'n enkele netwerk te kombineer, wat op dosyne bedieners in verskillende wΓͺrelddele geleΓ« kan wees.
Bron: will.com