Fitur basa Q lan KDB+ nggunakake conto layanan wektu nyata

Sampeyan bisa maca babagan apa basis KDB +, basa pemrograman Q, apa kekuwatan lan kelemahane ing sadurunge. artikel lan ringkes ing pambuka. Ing artikel kasebut, kita bakal ngetrapake layanan ing Q sing bakal ngolah aliran data sing mlebu lan ngetung macem-macem fungsi agregasi saben menit ing mode "wektu nyata" (yaiku, bakal duwe wektu kanggo ngetung kabeh sadurunge bagean data sabanjure). Fitur utama Q iku basa vektor sing ngijini sampeyan kanggo operate ora karo obyek siji, nanging karo array, susunan array lan obyek Komplek liyane. Basa kayata Q lan sanak-sedulure K, J, APL misuwur kanthi ringkes. Asring, program sing njupuk sawetara layar kode ing basa sing dikenal kaya Jawa bisa ditulis ing sawetara baris. Iki sing dakkarepake kanggo nuduhake ing artikel iki.

Fitur basa Q lan KDB+ nggunakake conto layanan wektu nyata

Pambuka

KDB+ minangka basis data kolom sing fokus ing jumlah data sing akeh banget, diurutake kanthi cara tartamtu (utamane miturut wektu). Iki digunakake utamane ing institusi finansial - bank, dana investasi, perusahaan asuransi. Basa Q minangka basa internal KDB+ sing ngidini sampeyan nggarap data iki kanthi efektif. Ideologi Q minangka ringkesan lan efisiensi, dene kejelasan dikorbanake. Iki dibenerake kanthi kasunyatan manawa basa vektor bakal angel dingerteni ing kasus apa wae, lan ringkesan lan kasugihan rekaman ngidini sampeyan ndeleng bagean sing luwih gedhe saka program kasebut ing layar siji, sing pungkasane luwih gampang dingerteni.

Ing artikel iki kita ngleksanakake program lengkap ing Q lan sampeyan bisa uga pengin nyoba metu. Kanggo nindakake iki, sampeyan butuh Q sing nyata. Sampeyan bisa ngundhuh versi 32-bit gratis ing situs web perusahaan kx - www.kx.com. Ing kana, yen sampeyan kasengsem, sampeyan bakal nemokake informasi referensi babagan Q, buku kasebut Q Kanggo Mortals lan macem-macem artikel babagan topik iki.

Formulasi masalah

Ana sumber sing ngirim tabel karo data saben 25 milliseconds. Wiwit KDB + digunakake utamane ing keuangan, kita bakal nganggep yen iki minangka tabel transaksi (dagang), sing nduweni kolom ing ngisor iki: wektu (wektu ing milliseconds), sym (sebutan perusahaan ing bursa saham - IBM, AAPL,…), rega (rega ing ngendi saham dituku), ukuran (ukuran transaksi). Interval 25 milidetik iku sembarang, ora cilik banget lan ora dawa banget. Anane tegese data teka menyang layanan sing wis buffered. Iku bakal gampang kanggo ngleksanakake buffering ing sisih layanan, kalebu buffering dinamis gumantung ing mbukak saiki, nanging kanggo gamblang, kita bakal fokus ing interval tetep.

Layanan kudu ngetung saben menit kanggo saben simbol sing mlebu saka kolom sym sakumpulan fungsi panggabungan - rega maksimal, rega rata-rata, ukuran jumlah, lsp. informasi migunani. Kanggo gamblang, kita bakal nganggep kabeh fungsi bisa diwilang incrementally, i.e. kanggo entuk nilai anyar, cukup ngerti rong nomer - nilai lawas lan mlebu. Contone, fungsi max, rata-rata, jumlah duwe properti iki, nanging fungsi median ora.

Kita uga bakal nganggep yen aliran data sing mlebu wis diurutake wektu. Iki bakal menehi kita kesempatan kanggo bisa mung karo menit pungkasan. Ing praktik, cukup kanggo bisa nggarap menit saiki lan sadurunge yen sawetara nganyari telat. Kanggo gamblang, kita ora bakal nimbang kasus iki.

Fungsi agregasi

Fungsi agregasi sing dibutuhake kapacak ing ngisor iki. Aku njupuk akeh sing bisa kanggo nambah beban ing layanan:

  • dhuwur - rega max - rega maksimum saben menit.
  • murah - rega min - rega minimal saben menit.
  • firstPrice - rega pisanan - rega pisanan saben menit.
  • lastPrice - rega pungkasan - rega pungkasan saben menit.
  • firstSize - ukuran pisanan - ukuran perdagangan pisanan saben menit.
  • lastSize - ukuran pungkasan - ukuran perdagangan pungkasan ing menit.
  • numTrades - count i - jumlah dagang saben menit.
  • volume - jumlah ukuran - jumlah saka ukuran perdagangan saben menit.
  • pvolume - jumlah rega - jumlah rega saben menit, dibutuhake kanggo avgPrice.
  • - jumlah rega turnover * ukuran - total volume transaksi saben menit.
  • avgPrice – pvolume%numTrades – rega rata-rata saben menit.
  • avgSize – volume%numTrades – ukuran perdagangan rata-rata saben menit.
  • vwap - volume turnover - rega rata-rata saben menit sing ditimbang miturut ukuran transaksi.
  • cumVolume - jumlah volume - ukuran akumulasi transaksi ing kabeh wektu.

Ayo langsung ngrembug siji titik sing ora jelas - carane miwiti kolom kasebut kanggo pisanan lan saben menit sabanjure. Sawetara kolom saka jinis FirstPrice kudu diinisialisasi dadi null saben wektu; regane ora ditemtokake. Jinis volume liyane kudu disetel dadi 0. Ana uga kolom sing mbutuhake pendekatan gabungan - contone, cumVolume kudu disalin saka menit sadurunge, lan kanggo sing pisanan disetel dadi 0. Ayo nyetel kabeh parameter kasebut nggunakake data kamus jinis (analog karo rekaman):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Aku nambahake sym lan wektu ing kamus kanggo penak, saiki initWith minangka baris sing wis siap saka tabel gabungan pungkasan, ing ngendi iku tetep nyetel sym lan wektu sing bener. Sampeyan bisa nggunakake kanggo nambah baris anyar menyang meja.

Kita butuh aggCols nalika nggawe fungsi agregasi. Dhaptar kasebut kudu diwalik amarga urutan ekspresi ing Q dievaluasi (saka tengen ngiwa). Tujuane kanggo mesthekake pitungan dadi saka dhuwur nganti cumVolume, amarga sawetara kolom gumantung saka sing sadurunge.

Kolom sing kudu disalin menyang menit anyar saka sing sadurunge, kolom sym ditambahake kanggo penak:

rollColumns:`sym`cumVolume;

Saiki ayo dibagi kolom dadi klompok miturut cara sing kudu dianyari. Telung jinis bisa dibedakake:

  1. Akumulator (volume, turnover, ..) - kita kudu nambah nilai sing mlebu menyang sing sadurunge.
  2. Kanthi titik khusus (dhuwur, kurang, ..) - nilai pisanan ing menit dijupuk saka data sing mlebu, liyane diwilang nggunakake fungsi kasebut.
  3. Ngaso. Tansah diwilang nggunakake fungsi.

Ayo nemtokake variabel kanggo kelas kasebut:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Urutan pitungan

Kita bakal nganyari tabel agregat ing rong tahap. Kanggo efisiensi, kita nyilikake tabel mlebu supaya mung siji baris kanggo saben karakter lan menit. Kasunyatan manawa kabeh fungsi kita minangka tambahan lan asosiatif njamin yen asil langkah tambahan iki ora bakal owah. Sampeyan bisa nyilikake tabel nggunakake pilih:

select high:max price, low:min price … by sym,time.minute from table

Cara iki duwe kerugian - set kolom sing wis ditemtokake wis ditemtokake. Untunge, ing Q, pilih uga dileksanakake minangka fungsi ing ngendi sampeyan bisa ngganti argumen sing digawe kanthi dinamis:

?[table;whereClause;byClause;selectClause]

Aku ora bakal njlèntrèhaké kanthi rinci babagan format argumen; ing kasus kita, mung dening lan pilih ekspresi sing ora pati penting lan kudu dadi kamus saka wangun kolom!ekspresi. Dadi, fungsi nyusut bisa ditetepake kaya ing ngisor iki:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Kanggo gamblang, Aku digunakake fungsi parse, kang dadi senar karo expression Q menyang Nilai sing bisa liwati kanggo fungsi eval lan kang dibutuhake ing fungsi pilih. Wigati uga yen preprocess ditetepake minangka proyeksi (yaiku, fungsi kanthi argumen sing ditemtokake sebagian) saka fungsi pilih, siji argumen (tabel) ilang. Yen kita ngetrapake preprocess menyang tabel, kita bakal entuk tabel sing dikompres.

Tahap kapindho yaiku nganyari tabel agregat. Ayo pisanan nulis algoritma ing pseudocode:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Ing Q, iku umum nggunakake map / ngurangi fungsi tinimbang puteran. Nanging amarga Q minangka basa vektor lan kita bisa kanthi gampang ngetrapake kabeh operasi menyang kabeh simbol bebarengan, mula kanggo perkiraan pisanan kita bisa nindakake tanpa loop, nindakake operasi ing kabeh simbol bebarengan:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Nanging kita bisa pindhah luwih, Q wis operator unik lan banget kuat - operator assignment umum. Iki ngidini sampeyan ngganti sakumpulan nilai ing struktur data kompleks nggunakake dhaptar indeks, fungsi lan argumen. Ing kasus kita katon kaya iki:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Sayange, kanggo nemtokake tabel sampeyan kudu dhaptar baris, ora kolom, lan sampeyan kudu transpose matriks (dhaftar kolom kanggo dhaptar baris) nggunakake fungsi flip. Iki larang kanggo meja gedhe, mula kita ngetrapake tugas umum kanggo saben kolom kanthi kapisah, nggunakake fungsi peta (sing katon kaya apostrof):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Kita maneh nggunakake proyeksi fungsi. Uga elinga yen ing Q, nggawe dhaptar uga fungsi lan kita bisa nyebataken nggunakake saben (peta) fungsi kanggo njaluk dhaptar dhaptar.

Kanggo mesthekake yen set kolom sing diwilang ora tetep, kita bakal nggawe ekspresi ing ndhuwur kanthi dinamis. Pisanan kita nemtokake fungsi kanggo ngetung saben kolom, nggunakake variabel baris lan inp kanggo ngrujuk data sing dikumpulake lan input:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Sawetara kolom khusus; nilai sing sepisanan ora kudu diitung kanthi fungsi kasebut. Kita bisa nemtokake manawa iku sing pisanan kanthi kolom [`numTrades] - yen ngemot 0, mula nilai kasebut pisanan. Q nduweni fungsi pilih - ?[Boolean list;list1;list2] - sing milih nilai saka dhaptar 1 utawa 2 gumantung saka kondisi ing argumen pisanan:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Ing kene aku nelpon tugas umum karo fungsiku (ekspresi ing kurung kriting). Nampa nilai saiki (argumen pisanan) lan argumen tambahan, sing dakliwati ing parameter 4.

Tambahake speaker baterei kanthi kapisah, amarga fungsine padha:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Iki minangka tugas normal miturut standar Q, nanging aku menehi dhaptar nilai sekaligus. Pungkasan, ayo nggawe fungsi utama:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Kanthi ekspresi iki, aku nggawe fungsi kanthi dinamis saka senar sing ngemot ekspresi sing dakwenehake ing ndhuwur. Asil bakal katon kaya iki:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Urutan evaluasi kolom diwalik amarga ing Q urutan evaluasi saka tengen ngiwa.

Saiki kita duwe rong fungsi utama sing dibutuhake kanggo petungan, kita mung kudu nambah infrastruktur sethithik lan layanan wis siyap.

Langkah pungkasan

Kita duwe fungsi preprocess lan updateAgg sing nindakake kabeh karya. Nanging isih perlu kanggo mesthekake transisi sing bener liwat menit lan ngetung indeks kanggo agregasi. Kaping pisanan, ayo nemtokake fungsi init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Kita uga bakal nemtokake fungsi gulung, sing bakal ngganti menit saiki:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Kita butuh fungsi kanggo nambah karakter anyar:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Lan pungkasane, fungsi upd (jeneng tradisional kanggo fungsi iki kanggo layanan Q), sing diarani klien kanggo nambah data:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Mekaten. Iki kode lengkap layanan kita, kaya sing dijanjekake, mung sawetara baris:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Tes

Ayo priksa kinerja layanan kasebut. Kanggo nindakake iki, ayo mbukak ing proses kapisah (sijine kode ing file service.q) lan nelpon fungsi init:

q service.q –p 5566

q)init[]

Ing console liyane, miwiti proses Q kapindho lan sambungake menyang pisanan:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Pisanan, ayo nggawe dhaptar simbol - 10000 potongan lan nambah fungsi kanggo nggawe tabel acak. Ing konsol kapindho:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Aku ditambahakΓ© telung simbol nyata kanggo dhaftar make iku luwih gampang kanggo katon ing meja. Fungsi rnd nggawe tabel acak karo n larik, ngendi wektu beda-beda gumantung saka t kanggo t + 25 milliseconds.

Saiki sampeyan bisa nyoba ngirim data menyang layanan (nambah sepuluh jam pisanan):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Sampeyan bisa mriksa ing layanan sing tabel wis dianyari:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Asil:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Saiki ayo nindakake tes beban kanggo ngerteni jumlah data sing bisa diproses saben menit. Ayo kula ngelingake yen kita nyetel interval nganyari dadi 25 milidetik. Patut, layanan kasebut kudu (rata-rata) pas karo paling sethithik 20 milidetik saben nganyari kanggo menehi wektu pangguna kanggo njaluk data. Ketik ing ngisor iki ing proses kapindho:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 iku rong menit. Sampeyan bisa nyoba mlaku dhisik kanggo 1000 baris saben 25 milidetik:

start 1000

Ing kasusku, asile watara saperangan milliseconds saben nganyari. Dadi aku bakal langsung nambah jumlah baris dadi 10.000:

start 10000

Asil:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Maneh, ora ana sing khusus, nanging iki 24 yuta garis saben menit, 400 ewu per detik. Kanggo luwih saka 25 milliseconds, nganyari mung kalem 5 kaping, ketoke nalika menit diganti. Ayo tambah dadi 100.000:

start 100000

Asil:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Kaya sing sampeyan ngerteni, layanan kasebut meh ora bisa ditindakake, nanging tetep bisa tetep. Volume data kasebut (240 yuta larik saben menit) gedhe banget; ing kasus kaya mengkono, umume ngluncurake sawetara klon (utawa malah puluhan klon) layanan kasebut, sing saben-saben mung ngolah bagean saka karakter. Isih, asil nyengsemaken kanggo basa sing diinterpretasikake sing fokus utamane ing panyimpenan data.

Pitakonan bisa uga muncul kenapa wektu tuwuh non-linear kanthi ukuran saben nganyari. Alesane yaiku fungsi shrink iku sejatine fungsi C, sing luwih efisien tinimbang updateAgg. Miwiti saka ukuran nganyari tartamtu (sekitar 10.000), updateAgg tekan langit-langit lan wektu eksekusi ora gumantung saka ukuran nganyari. Amarga langkah awal Q, layanan kasebut bisa nyerna volume data kasebut. Iki nyorot pentinge milih algoritma sing bener nalika nggarap data gedhe. Titik liyane yaiku panyimpenan data sing bener ing memori. Yen data ora disimpen columnarly utawa padha ora dhawuh dening wektu, kita bakal dadi menowo karo bab kaya TLB cache miss - ananΓ© alamat kaca memori ing cache alamat prosesor. Nelusuri alamat njupuk kira-kira 30 kaping maneh yen ora kasil, lan yen data kasebar, iku bisa alon mudhun layanan kaping pirang-pirang.

kesimpulan

Ing artikel iki, aku nuduhake yen database KDB + lan Q cocok ora mung kanggo nyimpen data gedhe lan gampang diakses liwat pilih, nanging uga kanggo nggawe layanan pangolahan data sing bisa nyerna atusan yuta larik / gigabyte data malah ing siji proses Q. Basa Q dhewe ngidini implementasine algoritma sing ringkes lan efisien sing ana gandhengane karo pangolahan data amarga sifat vektor, interpreter dialek SQL sing dibangun lan fungsi perpustakaan sing sukses banget.

Aku bakal Wigati sing ndhuwur mung bagean saka apa Q bisa nindakake, iku uga fitur unik liyane. Contone, protokol IPC sing gampang banget sing mbusak wates antarane proses Q individu lan ngidini sampeyan nggabungake atusan proses kasebut dadi jaringan siji, sing bisa ditemokake ing puluhan server ing macem-macem bagean ing saindenging jagad.

Source: www.habr.com

Add a comment