KDB+ oinarria, Q programazio-lengoaia, zein indargune eta ahulgune diren irakur dezakezu nire aurreko
Sarrera
KDB+ datu-kantitate oso handietara bideratutako zutabe-datu-base bat da, modu zehatz batean ordenatuta (denboraren arabera batez ere). Batez ere finantza-erakundeetan erabiltzen da - bankuetan, inbertsio-funtsetan, aseguru-etxeetan. Q hizkuntza KDB+-ren barne hizkuntza da, datu hauekin modu eraginkorrean lan egiteko aukera ematen duena. Q ideologia laburtasuna eta eraginkortasuna da, argitasuna sakrifikatzen den bitartean. Honen bidez justifikatzen da bektore-lengoaia ulertzea zaila izango dela edozein kasutan, eta grabazioaren laburtasunari eta aberastasunari esker, programaren zati askoz handiagoa pantaila batean ikusteko aukera ematen du, eta horrek, azken finean, errazago ulertzen du.
Artikulu honetan Q-en osoko programa bat inplementatzen dugu eta baliteke probatu nahi izatea. Horretarako, benetako Q-a beharko duzu. Doako 32 biteko bertsioa deskarga dezakezu kx enpresaren webgunean -
Arazoaren formulazioa
Badago iturri bat 25 milisegundotik behin datuekin taula bat bidaltzen duena. KDB+ finantzaketan batez ere erabiltzen denez, hau transakzioen taula bat dela suposatuko dugu, zutabe hauek dituena: denbora (denbora milisegundotan), sym (burtsan enpresaren izendapena - IBM, AAPL,β¦), prezioa (akzioak erosi ziren prezioa), tamaina (eragiketaren tamaina). 25 milisegundoko tartea arbitrarioa da, ez txikiegia eta ez luzeegia. Bere presentziak esan nahi du datuak dagoeneko buffered dagoen zerbitzura iristen direla. Erraza izango litzateke buffering-a ezartzea zerbitzuaren aldetik, uneko kargaren araberako buffer dinamikoa barne, baina sinpletasunerako, tarte finko batean zentratuko gara.
Zerbitzuak minutu bakoitza zenbatu behar du sym zutabetik sartzen den ikur bakoitzeko funtzio multzo bat - gehienezko prezioa, batez besteko prezioa, batura-tamaina, etab. informazio baliagarria. Sinpletasunerako, funtzio guztiak modu inkrementalean kalkula daitezkeela suposatuko dugu, hau da. balio berri bat lortzeko, nahikoa da bi zenbaki ezagutzea: zaharrak eta sarrerako balioak. Adibidez, max, batez bestekoa, batura funtzioek propietate hau dute, baina mediana funtzioak ez.
Sarrerako datu-korrontea denbora ordenatuta dagoela ere hartuko dugu. Honek azken momentuan bakarrik lan egiteko aukera emango digu. Praktikan, nahikoa da uneko eta aurreko minutuekin lan egin ahal izatea, eguneraketa batzuk berandu badaude. Sinpletasunagatik, ez dugu kasu hau kontuan hartuko.
Agregazio-funtzioak
Beharrezko agregazio-funtzioak behean zerrendatzen dira. Ahalik eta gehien hartu ditut zerbitzuaren karga handitzeko:
- altua β prezio maximoa β minutuko gehienezko prezioa.
- baxua β prezio minimoa β minutuko gutxieneko prezioa.
- firstPrice β lehen prezioa β lehenengo prezioa minutuko.
- lastPrice β azken prezioa β azken prezioa minutuko.
- firstSize - lehen tamaina - lehenengo merkataritza-tamaina minutuko.
- lastSize - azken tamaina - azken merkataritza-tamaina minutu batean.
- numTrades β count i β minutuko salerosketa kopurua.
- bolumena - batura tamaina - minutuko merkataritza-tamainen batura.
- pvolume - sum price - minutuko prezioen batura, batez besteko Prezioa egiteko beharrezkoa.
- β fakturazioaren prezioa*tamaina β minutuko transakzioen bolumen osoa.
- avgPrice β pvolume%numTrades β minutuko batez besteko prezioa.
- avgSize - bolumena%numTrades - minutuko merkataritzaren batez besteko tamaina.
- vwap - fakturazioaren% bolumena - transakzio tamainaren arabera haztatuta minutu bakoitzeko batez besteko prezioa.
- cumVolume - batura bolumena - denbora osoan zehar transakzioen tamaina metatua.
Goazen berehala eztabaidatu begi bistakoa ez den puntu bat: zutabe hauek lehen aldiz eta hurrengo minutu bakoitzeko nola hasieratu. FirstPrice motako zutabe batzuk hasieratu behar dira aldi bakoitzean null gisa; haien balioa zehaztu gabe dago. Beste bolumen-motak 0-an ezarri behar dira beti. Ikuspegi konbinatua behar duten zutabeak ere badaude - adibidez, cumVolume aurreko minututik kopiatu behar da, eta lehenengorako 0-n ezarri. Ezar ditzagun parametro horiek guztiak hiztegiko datuak erabiliz. mota (erregistro baten antzekoa):
// list ! list β ΡΠΎΠ·Π΄Π°ΡΡ ΡΠ»ΠΎΠ²Π°ΡΡ, 0n β float null, 0N β long null, `sym β ΡΠΈΠΏ ΡΠΈΠΌΠ²ΠΎΠ», `sym1`sym2 β ΡΠΏΠΈΡΠΎΠΊ ΡΠΈΠΌΠ²ΠΎΠ»ΠΎΠ²
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // ΡΠΏΠΈΡΠΎΠΊ Π²ΡΠ΅Ρ
Π²ΡΡΠΈΡΠ»ΡΠ΅ΠΌΡΡ
ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse ΠΎΠ±ΡΡΡΠ½Π΅Π½ Π½ΠΈΠΆΠ΅
Sym eta denbora gehitu nizkion hiztegiari erosotasunerako, orain initWith azken taula agregatutik prestatutako lerroa da, non sym eta ordu egokia ezartzeko geratzen den. Taula bati errenkada berriak gehitzeko erabil dezakezu.
AggCols beharko dugu agregazio funtzio bat sortzerakoan. Zerrenda alderantzikatu egin behar da Q-ko adierazpenak ebaluatzen diren ordenagatik (eskuinetik ezkerrera). Helburua da kalkulua handitik CumVolumera doazela ziurtatzea, zutabe batzuk aurrekoen araberakoak baitira.
Aurrekoaren minutu berri batean kopiatu behar diren zutabeak, sym zutabea gehitzen da erosotasunerako:
rollColumns:`sym`cumVolume;
Orain zati ditzagun zutabeak taldetan nola eguneratu behar diren arabera. Hiru mota bereiz daitezke:
- Metatzaileak (bolumena, fakturazioa,..) β sarrerako balioa aurrekoari gehitu behar diogu.
- Puntu berezi batekin (altua, baxua, ..) β minutuko lehen balioa sarrerako datuetatik hartzen da, gainerakoak funtzioa erabiliz kalkulatzen dira.
- Atsedena. Funtzio baten bidez kalkulatzen da beti.
Defini ditzagun klase hauetarako aldagaiak:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Kalkulu-ordena
Taula agregatua bi fasetan eguneratuko dugu. Eraginkortasuna lortzeko, lehenengo sarrerako taula txikitzen dugu, karaktere eta minutu bakoitzeko errenkada bakarra egon dadin. Gure funtzio guztiak inkrementalak eta elkartuak izateak bermatzen du urrats gehigarri honen emaitza ez dela aldatuko. Taula txikitu dezakezu hautatu hau erabiliz:
select high:max price, low:min price β¦ by sym,time.minute from table
Metodo honek desabantaila bat du - kalkulatutako zutabeen multzoa aurrez definituta dago. Zorionez, Q-en, hautatu ere dinamikoki sortutako argumentuak ordezka ditzakezun funtzio gisa inplementatzen da:
?[table;whereClause;byClause;selectClause]
Ez dut zehatz-mehatz deskribatuko argumentuen formatua; gure kasuan, by eta select esamoldeak bakarrik ez dira hutsalak izango eta formako zutabeen!esamoldeen hiztegiak izan behar dira. Beraz, murrizketa funtzioa honela defini daiteke:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each ΡΡΠΎ ΡΡΠ½ΠΊΡΠΈΡ map Π² Q Π΄Π»Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠΏΠΈΡΠΊΠ°
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Argitasunerako, parse funtzioa erabili dut, Q adierazpena duen kate bat eval funtziora pasa daitekeen eta funtzio hautatzeko beharrezkoa den balio batean bihurtzen duena. Kontuan izan, halaber, aurreprozesua hautatzeko funtzioaren proiekzio gisa (hau da, partzialki definitutako argumentuak dituen funtzio bat) definitzen dela, argumentu bat (taula) falta dela. Taula bati aurreprozesua aplikatzen badiogu, taula konprimitua lortuko dugu.
Bigarren fasea taula agregatua eguneratzea da. Idatz dezagun lehenik algoritmoa pseudokodean:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
β¦
Q-n, ohikoa da mapa/murrizketa funtzioak erabiltzea begizten ordez. Baina Q lengoaia bektoriala denez eta eragiketa guztiak aldi berean ikur guztiei erraz aplika diezazkiekegunez, lehenengo hurbilketa batean begiztarik gabe egin dezakegu, ikur guztietan eragiketak aldi berean eginez:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
β¦
Baina harago joan gaitezke, Q-k operadore bakarra eta oso indartsua du - esleipen-operadore orokortua. Datu-egitura konplexu batean balio multzo bat aldatzeko aukera ematen du indize, funtzio eta argumentuen zerrenda erabiliz. Gure kasuan honelakoa da:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;β¦]: function[target[idx 0;idx 1;β¦];argument], Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΡΡΠ½ΠΊΡΠΈΡ β ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Zoritxarrez, taula bati esleitzeko errenkaden zerrenda bat behar duzu, ez zutabeak, eta matrizea (zutabeen zerrenda errenkaden zerrendara) transposatu behar duzu irauli funtzioa erabiliz. Taula handi baterako garestia da, beraz, zutabe bakoitzari esleipen orokor bat aplikatzen diogu bereizita, mapa funtzioa erabiliz (apostrofo baten itxura duena):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Berriz ere funtzioen proiekzioa erabiltzen dugu. Kontuan izan, gainera, Q-n, zerrenda bat sortzea funtzio bat ere badela eta bakoitza (mapa) funtzioa erabiliz dei dezakegula zerrenden zerrenda bat lortzeko.
Kalkulatutako zutabeen multzoa finkoa ez dela ziurtatzeko, goiko adierazpena dinamikoki sortuko dugu. Lehenik eta behin, defini ditzagun zutabe bakoitza kalkulatzeko funtzioak, errenkada eta inp aldagaiak erabiliz, gehitutako eta sarrerako datuei erreferentzia egiteko:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Zutabe batzuk bereziak dira; haien lehen balioa ez du funtzioak kalkulatu behar. Lehena dela zehaztu dezakegu errenkada [`numTrades] zutabearen bidez - 0 badu, balioa lehena da. Q-k hautatze-funtzioa du - ?[Boolean list;list1;list2] - 1 edo 2 zerrendako balio bat hautatzen duena, lehen argumentuko baldintzaren arabera:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - ΡΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΠΎΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π΄Π»Ρ ΡΠ»ΡΡΠ°Ρ ΠΊΠΎΠ³Π΄Π° ΠΈΠ½Π΄Π΅ΠΊΡ Π½Π΅Π³Π»ΡΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Hemen esleipen orokor bati deitu nion nire funtzioarekin (giltza kizkurdun adierazpena). Uneko balioa (lehen argumentua) eta argumentu gehigarri bat jasotzen ditu, 4. parametroan pasatzen dudana.
Gehi ditzagun bateriaren bozgorailuak bereizita, funtzioa bera baita beraientzat:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Q estandarren esleipen normala da, baina balio zerrenda bat esleitzen ari naiz aldi berean. Azkenik, sor dezagun funtzio nagusia:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" ΠΏΡΠΈΡΠ²ΠΎΠΈΠΌ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΠΎΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ Π·Π°Π²ΠΈΡΡΡ ΠΎΡ ΡΠΆΠ΅ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΡΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅ΡΡΠΈΠΌ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΡ. ,β ΡΠ°ΡΡΠΈΡΡΠΎΠ²ΡΠ²Π°Π΅ΡΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs β String from Vector (sv), ΡΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΡΡΡΠΎΠΊ Π²ΡΡΠ°Π²Π»ΡΡ β;β ΠΏΠΎΡΡΠ΅Π΄ΠΈΠ½Π΅
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Adierazpen honekin, goian eman dudan adierazpena duen kate batetik funtzio bat sortzen dut dinamikoki. Emaitza honela izango da:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;β¦ ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
Zutabeen ebaluazio-ordena alderantzikatu egiten da Q-n ebaluazio-ordena eskuinetik ezkerrera delako.
Orain kalkuluetarako beharrezkoak diren bi funtzio nagusi ditugu, azpiegitura apur bat gehitu besterik ez dugu egin behar eta zerbitzua prest dago.
Azken urratsak
Lan guztia egiten duten aurreprozesatu eta updateAgg funtzioak ditugu. Baina oraindik beharrezkoa da minutuen bidez trantsizio zuzena bermatzea eta agregaziorako indizeak kalkulatzea. Lehenik eta behin, defini dezagun init funtzioa:
init:{
tradeAgg:: 0#enlist[initWith]; // ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΠΏΡΡΡΡΡ ΡΠΈΠΏΠΈΠ·ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ, enlist ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠ»ΠΎΠ²Π°ΡΡ Π² ΡΠ°Π±Π»ΠΈΡΡ, Π° 0# ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ Π²Π·ΡΡΡ 0 ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΠΈΠ· Π½Π΅Π΅
currTime::00:00; // Π½Π°ΡΠ½Π΅ΠΌ Ρ 0, :: ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π² Π³Π»ΠΎΠ±Π°Π»ΡΠ½ΡΡ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ
currSyms::`u#`symbol$(); // `u# - ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ Π² Π΄Π΅ΡΠ΅Π²ΠΎ, Π΄Π»Ρ ΡΡΠΊΠΎΡΠ΅Π½ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ²
offset::0; // ΠΈΠ½Π΄Π΅ΠΊΡ Π² tradeAgg, Π³Π΄Π΅ Π½Π°ΡΠΈΠ½Π°Π΅ΡΡΡ ΡΠ΅ΠΊΡΡΠ°Ρ ΠΌΠΈΠ½ΡΡΠ°
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // ΠΊΡΡ Π΄Π»Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, ΡΠ°Π±Π»ΠΈΡΠ° Ρ ΠΊΠ»ΡΡΠΎΠΌ sym
}
Roll funtzioa ere definituko dugu, uneko minutua aldatuko duena:
roll:{[tm]
if[currTime>tm; :init[]]; // Π΅ΡΠ»ΠΈ ΠΏΠ΅ΡΠ΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡΡ, ΡΠΎ ΠΏΡΠΎΡΡΠΎ Π²ΡΠ·ΠΎΠ²Π΅ΠΌ init
rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ ΠΊΡΡ β Π²Π·ΡΡΡ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±ΡΠ΅Π·Π°ΡΡ, Π²ΡΡΠ°Π²ΠΈΡΡ Π² rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Karaktere berriak gehitzeko funtzio bat beharko dugu:
addSyms:{[syms]
currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠΏΠΈΡΠΎΠΊ ΠΈΠ·Π²Π΅ΡΡΠ½ΡΡ
// Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠ°Π±Π»ΠΈΡΡ sym, time ΠΈ rollColumns Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π²ΡΠΈΡΡ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΡΠΌ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ΠΌ.
// Π€ΡΠ½ΠΊΡΠΈΡ ^ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΠ΅Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ Π΄Π»Ρ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Π΅ΡΠ»ΠΈ ΡΠΈΠΌΠ²ΠΎΠ»Π° Π½Π΅Ρ Π² ΠΊΡΡΠ΅. value flip table Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² ΡΠ°Π±Π»ΠΈΡΠ΅.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
Eta azkenik, upd funtzioa (Q zerbitzuetarako funtzio honen izen tradizionala), bezeroak datuak gehitzeko deitzen duena:
upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±ΡΡΠ½ΠΎ ΡΠ΅ΡΠ²ΠΈΡ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π΅Ρ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ°Π±Π»ΠΈΡ
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½ΡΠ΅ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡΡΡ
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // ΠΏΠΎΠΌΠ΅Π½ΡΠ΅ΠΌ ΠΌΠΈΠ½ΡΡΡ, Π΅ΡΠ»ΠΈ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ
data:select from data where time=tm; // ΡΠΈΠ»ΡΡΡΠ°ΡΠΈΡ
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²ΡΠ΅ ΡΠΈΠΌΠ²ΠΎΠ»Ρ
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ. Π€ΡΠ½ΠΊΡΠΈΡ ? ΠΈΡΠ΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡ ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΡΠΏΠΈΡΠΊΠ° ΡΠΏΡΠ°Π²Π° Π² ΡΠΏΠΈΡΠΊΠ΅ ΡΠ»Π΅Π²Π°.
};
Hori da dena. Hona hemen gure zerbitzuaren kodea osoa, agindu bezala, lerro batzuk besterik ez:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Testing
Ikus dezagun zerbitzuaren errendimendua. Horretarako, exekutatu dezagun prozesu bereizi batean (jarri kodea service.q fitxategian) eta deitu init funtzioari:
q service.q βp 5566
q)init[]
Beste kontsola batean, hasi bigarren Q prozesua eta konektatu lehenengora:
h:hopen `:host:5566
h:hopen 5566 // Π΅ΡΠ»ΠΈ ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ Ρ
ΠΎΡΡΠ΅
Lehenik eta behin, sor ditzagun sinboloen zerrenda - 10000 pieza eta gehitu funtzio bat ausazko taula bat sortzeko. Bigarren kontsolan:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Zerrendari hiru sinbolo erreal gehitu dizkiot, taulan errazago bilatzeko. rnd funtzioak ausazko taula bat sortzen du n errenkadarekin, non denbora t-tik t+25 milisegundora aldatzen den.
Orain saiatu zaitezke datuak bidaltzen zerbitzura (gehitu lehen hamar orduak):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Zerbitzuan egiaztatu dezakezu taula eguneratu dela:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Emaitza:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
Egin ditzagun orain karga-probak zerbitzuak minutuko zenbat datu prozesatu ditzakeen jakiteko. Gogorarazten dizut eguneratze-tartea 25 milisegundoan ezarri dugula. Horren arabera, zerbitzuak (batez beste) gutxienez 20 milisegundotan sartu behar du eguneratze bakoitzeko erabiltzaileei datuak eskatzeko denbora emateko. Idatzi honako hau bigarren prozesuan:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 bi minutu dira. Saia zaitezke lehenik 1000 errenkadetan exekutatzen 25 milisegundotik behin:
start 1000
Nire kasuan, emaitza eguneratze bakoitzeko milisegundo pare bat ingurukoa da. Beraz, berehala igoko dut errenkada kopurua 10.000ra:
start 10000
Emaitza:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Berriz ere, ezer berezirik ez, baina hau da 24 milioi lerro minutuko, 400 mila segundoko. 25 milisegundo baino gehiagoz, eguneratzea 5 aldiz bakarrik moteldu zen, itxuraz minutua aldatzean. Handi gaitezen 100.000ra:
start 100000
Emaitza:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Ikusten duzunez, zerbitzuak apenas aurre egin dezake, baina, hala ere, urpean mantentzea lortzen du. Horrelako datu-bolumena (240 milioi errenkada minutuko) izugarri handia da; halakoetan, ohikoa da zerbitzuaren hainbat klon (edo dozenaka klon) abiarazteko, eta bakoitzak karaktereen zati bat soilik prozesatzen du. Hala ere, emaitza ikusgarria da, batez ere, datuak biltegiratzean zentratzen den hizkuntza interpretatu baterako.
Galdera sor daiteke zergatik hazten den denbora ez-linealki eguneratze bakoitzaren tamainarekin. Arrazoia da txikitzeko funtzioa benetan C funtzio bat dela, updateAgg baino askoz eraginkorragoa dena. Eguneratze-tamaina jakin batetik abiatuta (10.000 inguru), updateAgg bere sabaira iristen da eta orduan bere exekuzio-denbora ez da eguneratzearen tamainaren araberakoa. Q aurretiazko urratsari esker zerbitzuak horrelako datu-bolumenak digeritzeko gai da. Horrek nabarmentzen du zein garrantzitsua den algoritmo egokia aukeratzea big datarekin lan egitean. Beste puntu bat datuak memorian gordetzea da. Datuak zutabeetan gordetzen ez balira edo denboraren arabera ordenatuta egongo ez balira, TLB cache hutsa bezalako gauza bat ezagutuko genuke - prozesadorearen helbidearen cachean memoria orri-helbiderik ez izatea. Helbide bat bilatzeko 30 aldiz gehiago behar da arrakastarik ezean, eta datuak sakabanatuta badaude, zerbitzua hainbat aldiz moteldu daiteke.
Ondorioa
Artikulu honetan, erakutsi nuen KDB+ eta Q datu-baseak egokiak direla datu handiak gordetzeko eta hautaketaren bidez erraz sartzeko, baizik eta ehunka milioi errenkada/gigabyte datu digeritzeko gai diren datuak prozesatzeko zerbitzuak sortzeko ere. Q prozesu bakar bat. Q lengoaiak berak datuen prozesamenduarekin erlazionatutako algoritmoen ezarpen oso zehatza eta eraginkorra ahalbidetzen du, bere izaera bektoriala, SQL dialekto interpretatzailea integratua eta liburutegiko funtzio multzo oso arrakastatsua direla eta.
Kontuan izango dut goikoa Q-k egin dezakeenaren zati bat dela, beste ezaugarri berezi batzuk ere badituela. Esate baterako, IPC protokolo oso sinplea, Q prozesu indibidualen arteko muga ezabatzen duena eta prozesu horietako ehunka sare bakar batean konbinatzeko aukera ematen duena, munduko hainbat lekutako dozenaka zerbitzaritan egon daitekeena.
Iturria: www.habr.com