Kenmerke van die Q- en KDB+-taal met die voorbeeld van 'n intydse diens

Jy kan lees oor wat die KDB+-basis, die Q-programmeertaal is, wat hul sterk- en swakpunte is in my vorige Artikel en kortliks in die inleiding. In die artikel sal ons 'n diens op Q implementeer wat die inkomende datastroom sal verwerk en elke minuut verskillende samevoegingsfunksies sal bereken in "intydse" modus (dit wil sΓͺ, dit sal tyd hΓͺ om alles te bereken voor die volgende gedeelte data). Die hoofkenmerk van Q is dat dit 'n vektortaal is wat jou toelaat om nie met enkele voorwerpe te werk nie, maar met hul skikkings, skikkings skikkings en ander komplekse voorwerpe. Tale soos Q en sy familielede K, J, APL is bekend vir hul beknoptheid. Dikwels kan 'n program wat verskeie skerms kode in 'n bekende taal soos Java opneem, in 'n paar reΓ«ls daarop geskryf word. Dit is wat ek in hierdie artikel wil demonstreer.

Kenmerke van die Q- en KDB+-taal met die voorbeeld van 'n intydse diens

Inleiding

KDB+ is 'n kolomdatabasis wat op baie groot hoeveelhede data gefokus is, op 'n spesifieke manier (hoofsaaklik volgens tyd) georden. Dit word hoofsaaklik in finansiΓ«le instellings gebruik - banke, beleggingsfondse, versekeringsmaatskappye. Die Q-taal is die interne taal van KDB+ wat jou toelaat om effektief met hierdie data te werk. Die Q-ideologie is bondigheid en doeltreffendheid, terwyl duidelikheid opgeoffer word. Dit word geregverdig deur die feit dat die vektortaal in elk geval moeilik sal wees om te verstaan, en die bondigheid en rykheid van die opname laat jou toe om 'n baie groter deel van die program op een skerm te sien, wat dit uiteindelik makliker maak om te verstaan.

In hierdie artikel implementeer ons 'n volwaardige program in Q en jy wil dit dalk probeer. Om dit te doen, sal jy die werklike Q nodig hΓͺ. Jy kan die gratis 32-bis weergawe op die kx maatskappy webwerf aflaai – www.kx.com. Daar, as jy belangstel, sal jy verwysingsinligting oor Q, die boek, vind V Vir sterflinge en verskeie artikels oor hierdie onderwerp.

Probleemstelling

Daar is 'n bron wat elke 25 millisekondes 'n tabel met data stuur. Aangesien KDB+ hoofsaaklik in finansies gebruik word, sal ons aanvaar dat dit 'n tabel van transaksies (transaksies) is, wat die volgende kolomme het: tyd (tyd in millisekondes), sym (maatskappybenaming op die aandelebeurs - IBM, AAPL,...), prys (die prys waarteen die aandele gekoop is), grootte (grootte van die transaksie). Die 25 millisekonde interval is arbitrΓͺr, nie te klein en nie te lank nie. Die teenwoordigheid daarvan beteken dat die data na die diens kom wat reeds gebuffer is. Dit sal maklik wees om buffering aan die dienskant te implementeer, insluitend dinamiese buffering, afhangende van die huidige las, maar vir eenvoud fokus ons op 'n vaste interval.

Die diens moet elke minuut tel vir elke inkomende simbool vanaf die sim-kolom 'n stel samevoegingsfunksies - maksimum prys, gemiddelde prys, somgrootte, ens. Nuttige inligting. Vir eenvoud sal ons aanvaar dat alle funksies inkrementeel bereken kan word, m.a.w. om 'n nuwe waarde te verkry, is dit genoeg om twee getalle te ken - die ou en die inkomende waardes. Byvoorbeeld, die funksies maks, gemiddeld, som het hierdie eienskap, maar die mediaanfunksie nie.

Ons sal ook aanvaar dat die inkomende datastroom tydgeorden is. Dit sal ons die geleentheid gee om net met die laaste minuut te werk. In die praktyk is dit genoeg om met die huidige en vorige minute te kan werk ingeval sommige opdaterings laat is. Vir eenvoud sal ons nie hierdie saak oorweeg nie.

Aggregasie funksies

Die vereiste samevoegingsfunksies word hieronder gelys. Ek het soveel as moontlik van hulle geneem om die las op die diens te verhoog:

  • hoog – maksimum prys – maksimum prys per minuut.
  • laag – min prys – minimum prys per minuut.
  • eerste prys – eerste prys – eerste prys per minuut.
  • laaste prys – laaste prys – laaste prys per minuut.
  • firstSize - eerste grootte - eerste handelsgrootte per minuut.
  • lastSize - laaste grootte - laaste handelsgrootte in 'n minuut.
  • numTrades - tel i - aantal ambagte per minuut.
  • volume – somgrootte – som van handelsgroottes per minuut.
  • pvolume – somprys – som van pryse per minuut, vereis vir gem.Prys.
  • – somomsetprys*grootte – totale volume transaksies per minuut.
  • avgPrice – pvolume%numTrades – gemiddelde prys per minuut.
  • avgSize – volume%numTrades – gemiddelde handelsgrootte per minuut.
  • vwap – omset% volume – gemiddelde prys per minuut geweeg volgens transaksiegrootte.
  • cumVolume – som volume – opgehoopte grootte van transaksies oor die hele tyd.

Kom ons bespreek dadelik een nie-vanselfsprekende punt – hoe om hierdie kolomme vir die eerste keer en vir elke daaropvolgende minuut te inisialiseer. Sommige kolomme van die firstPrice-tipe moet elke keer geΓ―nisialiseer word om nul te wees; hul waarde is ongedefinieerd. Ander volumetipes moet altyd op 0 gestel word. Daar is ook kolomme wat 'n gekombineerde benadering vereis - byvoorbeeld, cumVolume moet vanaf die vorige minuut gekopieer word, en vir die eerste een op 0 gestel. Kom ons stel al hierdie parameters met behulp van die woordeboekdata tipe (analoog aan 'n rekord):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Ek het sim en tyd by die woordeboek gevoeg vir gerief, nou is initWith 'n klaargemaakte reΓ«l van die finale saamgevoegde tabel, waar dit nog oorbly om die korrekte sim en tyd in te stel. Jy kan dit gebruik om nuwe rye by 'n tabel te voeg.

Ons sal aggCols nodig hΓͺ wanneer 'n samevoegingsfunksie geskep word. Die lys moet omgekeer word as gevolg van die volgorde waarin uitdrukkings in Q geΓ«valueer word (van regs na links). Die doel is om te verseker dat die berekening van hoog na cumVolume gaan, aangesien sommige kolomme van voriges afhang.

Kolomme wat na 'n nuwe minuut van die vorige een gekopieer moet word, die simkolom word gerieflikheidshalwe bygevoeg:

rollColumns:`sym`cumVolume;

Kom ons verdeel nou die kolomme in groepe volgens hoe hulle opgedateer moet word. Drie tipes kan onderskei word:

  1. Akkumulators (volume, omset, ..) – ons moet die inkomende waarde by die vorige een voeg.
  2. Met 'n spesiale punt (hoog, laag, ..) - die eerste waarde in die minuut word geneem uit die inkomende data, die res word met behulp van die funksie bereken.
  3. Rus. Altyd bereken deur 'n funksie te gebruik.

Kom ons definieer veranderlikes vir hierdie klasse:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Berekening volgorde

Ons sal die saamgevoegde tabel in twee fases opdateer. Vir doeltreffendheid verklein ons eers die inkomende tabel sodat daar net een ry vir elke karakter en minuut is. Die feit dat al ons funksies inkrementeel en assosiatief is, waarborg dat die resultaat van hierdie bykomende stap nie sal verander nie. Jy kan die tabel verklein deur te kies:

select high:max price, low:min price … by sym,time.minute from table

Hierdie metode het 'n nadeel - die stel berekende kolomme is vooraf gedefinieer. Gelukkig, in Q, word select ook geΓ―mplementeer as 'n funksie waar jy dinamies geskepte argumente kan vervang:

?[table;whereClause;byClause;selectClause]

Ek sal nie die formaat van die argumente in detail beskryf nie; in ons geval sal slegs deur en uitgesoekte uitdrukkings nie-triviaal wees en hulle moet woordeboeke van die vormkolomme!uitdrukkings wees. Die krimpfunksie kan dus soos volg gedefinieer word:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Vir duidelikheid het ek die ontleed-funksie gebruik, wat 'n string met 'n Q-uitdrukking verander in 'n waarde wat na die eval-funksie oorgedra kan word en wat in die funksie-seleksie vereis word. Let ook op dat voorproses gedefinieer word as 'n projeksie (d.w.s. 'n funksie met gedeeltelik gedefinieerde argumente) van die kiesfunksie, een argument (die tabel) ontbreek. As ons voorproses op 'n tabel toepas, sal ons 'n saamgeperste tabel kry.

Die tweede fase is die opdatering van die saamgevoegde tabel. Kom ons skryf eers die algoritme in pseudokode:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

In Q is dit algemeen om kaart-/verminderfunksies in plaas van lusse te gebruik. Maar aangesien Q 'n vektortaal is en ons maklik alle bewerkings op alle simbole gelyktydig kan toepas, kan ons met 'n eerste benadering hoegenaamd sonder 'n lus klaarkom en bewerkings op alle simbole gelyktydig uitvoer:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Maar ons kan verder gaan, Q het 'n unieke en uiters kragtige operateur - die algemene toewysingsoperateur. Dit laat jou toe om 'n stel waardes in 'n komplekse datastruktuur te verander deur 'n lys van indekse, funksies en argumente te gebruik. In ons geval lyk dit so:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ongelukkig, om aan 'n tabel toe te wys, benodig jy 'n lys van rye, nie kolomme nie, en jy moet die matriks (lys van kolomme na lys van rye) transponeer deur die flip-funksie te gebruik. Dit is duur vir 'n groot tabel, dus pas ons eerder 'n algemene opdrag op elke kolom afsonderlik toe deur die kaartfunksie (wat soos 'n apostrof lyk) te gebruik:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ons gebruik weer funksieprojeksie. Let ook daarop dat in Q, die skep van 'n lys ook 'n funksie is en ons kan dit noem deur die each(map) funksie te gebruik om 'n lys van lyste te kry.

Om te verseker dat die stel berekende kolomme nie vas is nie, sal ons bogenoemde uitdrukking dinamies skep. Kom ons definieer eers funksies om elke kolom te bereken, deur die ry- en inp-veranderlikes te gebruik om na die saamgevoegde en insette data te verwys:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Sommige kolomme is spesiaal; hul eerste waarde moet nie deur die funksie bereken word nie. Ons kan bepaal dat dit die eerste is deur die ry[`numTrades] kolom - as dit 0 bevat, dan is die waarde eerste. Q het 'n kiesfunksie - ?[Booleaanse lys;lys1;lys2] - wat 'n waarde uit lys 1 of 2 kies, afhangende van die voorwaarde in die eerste argument:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Hier het ek 'n algemene opdrag met my funksie genoem ('n uitdrukking in krulhakies). Dit ontvang die huidige waarde (die eerste argument) en 'n bykomende argument, wat ek in die 4de parameter deurgee.

Kom ons voeg batteryluidsprekers afsonderlik by, aangesien die funksie vir hulle dieselfde is:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Dit is 'n normale opdrag volgens Q-standaarde, maar ek gee dadelik 'n lys waardes toe. Laastens, laat ons die hooffunksie skep:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Met hierdie uitdrukking skep ek dinamies 'n funksie uit 'n string wat die uitdrukking bevat wat ek hierbo gegee het. Die resultaat sal so lyk:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Die kolomevalueringsvolgorde word omgekeer, want in Q is die evalueringsvolgorde van regs na links.

Nou het ons twee hooffunksies wat nodig is vir berekeninge, ons moet net 'n bietjie infrastruktuur byvoeg en die diens is gereed.

Finale stappe

Ons het preprocess en updateAgg funksies wat al die werk doen. Maar dit is steeds nodig om die korrekte oorgang deur minute te verseker en indekse vir samevoeging te bereken. Eerstens, laat ons die init-funksie definieer:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Ons sal ook die rolfunksie definieer, wat die huidige minuut sal verander:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Ons sal 'n funksie nodig hΓͺ om nuwe karakters by te voeg:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

En laastens, die upd-funksie (die tradisionele naam vir hierdie funksie vir Q-dienste), wat deur die kliΓ«nt geroep word om data by te voeg:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Dis al. Hier is die volledige kode van ons diens, soos belowe, net 'n paar reΓ«ls:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

toets

Kom ons kyk na die prestasie van die diens. Om dit te doen, laat ons dit in 'n aparte proses laat loop (plaas die kode in die service.q-lΓͺer) en noem die init-funksie:

q service.q –p 5566

q)init[]

In 'n ander konsole, begin die tweede Q-proses en koppel aan die eerste:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Kom ons skep eers 'n lys simbole - 10000 XNUMX stukke en voeg 'n funksie by om 'n ewekansige tabel te skep. In die tweede konsole:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Ek het drie regte simbole by die lys gevoeg om dit makliker te maak om hulle in die tabel te soek. Die rnd-funksie skep 'n ewekansige tabel met n rye, waar die tyd wissel van t tot t+25 millisekondes.

Nou kan jy probeer om data na die diens te stuur (voeg die eerste tien uur by):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

U kan in die diens kyk of die tabel opgedateer is:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Gevolg:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Kom ons voer nou lastoetse uit om uit te vind hoeveel data die diens per minuut kan verwerk. Laat ek jou daaraan herinner dat ons die opdateringsinterval op 25 millisekondes gestel het. Gevolglik moet die diens (gemiddeld) in ten minste 20 millisekondes per opdatering pas om gebruikers tyd te gee om data aan te vra. Voer die volgende in in die tweede proses:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 is twee minute. Jy kan probeer om eerste vir 1000 rye elke 25 millisekondes te hardloop:

start 1000

In my geval is die resultaat ongeveer 'n paar millisekondes per opdatering. So ek sal dadelik die aantal rye verhoog na 10.000 XNUMX:

start 10000

Gevolg:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Weereens, niks besonders nie, maar dit is 24 miljoen reΓ«ls per minuut, 400 duisend per sekonde. Vir meer as 25 millisekondes het die opdatering slegs 5 keer verlangsaam, blykbaar toe die minuut verander het. Kom ons verhoog tot 100.000 XNUMX:

start 100000

Gevolg:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Soos u kan sien, kan die diens skaars klaarkom, maar dit slaag nietemin daarin om kop bo water te hou. So 'n volume data (240 miljoen rye per minuut) is uiters groot; in sulke gevalle is dit algemeen om verskeie klone (of selfs dosyne klone) van die diens te begin, wat elkeen slegs 'n deel van die karakters verwerk. Tog is die resultaat indrukwekkend vir 'n geΓ―nterpreteerde taal wat hoofsaaklik op databerging fokus.

Die vraag kan ontstaan ​​waarom tyd nie-lineΓͺr groei met die grootte van elke opdatering. Die rede is dat die krimpfunksie eintlik 'n C-funksie is, wat baie meer doeltreffend is as updateAgg. Vanaf 'n sekere opdateringsgrootte (ongeveer 10.000 30), bereik updateAgg sy plafon en dan hang die uitvoeringstyd daarvan nie af van die opdateringsgrootte nie. Dit is as gevolg van die voorlopige stap Q dat die diens in staat is om sulke volumes data te verteer. Dit beklemtoon hoe belangrik dit is om die regte algoritme te kies wanneer jy met groot data werk. Nog 'n punt is die korrekte berging van data in die geheue. As die data nie kolomvormig gestoor is nie of nie volgens tyd georden is nie, dan sou ons vertroud raak met iets soos 'n TLB-kasmis - die afwesigheid van 'n geheuebladsyadres in die verwerkeradreskas. Om na 'n adres te soek neem ongeveer XNUMX keer langer as dit onsuksesvol is, en as die data verstrooi is, kan dit die diens verskeie kere vertraag.

Gevolgtrekking

In hierdie artikel het ek gewys dat die KDB+ en Q-databasis nie net geskik is om groot data te stoor en maklik toegang daartoe te verkry deur middel van uitgesoekte nie, maar ook vir die skep van dataverwerkingsdienste wat in staat is om honderde miljoene rye/gigagrepe data te verteer, selfs in een enkele Q-proses. Die Q-taal self maak voorsiening vir uiters bondige en doeltreffende implementering van algoritmes wat verband hou met dataverwerking as gevolg van sy vektor aard, ingeboude SQL dialek tolk en 'n baie suksesvolle stel biblioteek funksies.

Ek sal daarop let dat bogenoemde net deel is van wat Q kan doen, dit het ook ander unieke kenmerke. Byvoorbeeld, 'n uiters eenvoudige IPC-protokol wat die grens tussen individuele Q-prosesse uitvee en jou toelaat om honderde van hierdie prosesse in 'n enkele netwerk te kombineer, wat op dosyne bedieners in verskillende wΓͺrelddele geleΓ« kan wees.

Bron: will.com

Voeg 'n opmerking