Anda boleh membaca tentang asas KDB+, bahasa pengaturcaraan Q, kekuatan dan kelemahan mereka dalam saya sebelum ini
Pengenalan
KDB+ ialah pangkalan data kolumnar yang memfokuskan pada jumlah data yang sangat besar, disusun mengikut cara tertentu (terutamanya mengikut masa). Ia digunakan terutamanya dalam institusi kewangan - bank, dana pelaburan, syarikat insurans. Bahasa Q ialah bahasa dalaman KDB+ yang membolehkan anda bekerja dengan data ini dengan berkesan. Ideologi Q adalah ringkas dan kecekapan, manakala kejelasan dikorbankan. Ini dibenarkan oleh fakta bahawa bahasa vektor akan sukar difahami dalam apa jua keadaan, dan keringkasan dan kekayaan rakaman membolehkan anda melihat sebahagian besar program pada satu skrin, yang akhirnya menjadikannya lebih mudah untuk difahami.
Dalam artikel ini kami melaksanakan program lengkap dalam Q dan anda mungkin ingin mencubanya. Untuk melakukan ini, anda memerlukan Q sebenar. Anda boleh memuat turun versi 32-bit percuma di laman web syarikat kx β
Pernyataan masalah
Terdapat sumber yang menghantar jadual dengan data setiap 25 milisaat. Oleh kerana KDB+ digunakan terutamanya dalam kewangan, kami akan menganggap bahawa ini ialah jadual urus niaga (dagangan), yang mempunyai lajur berikut: masa (masa dalam milisaat), sym (penetapan syarikat di bursa saham - IBM, AAPL,β¦), harga (harga di mana saham dibeli), saiz (saiz urus niaga). Selang 25 milisaat adalah sewenang-wenangnya, tidak terlalu kecil dan tidak terlalu lama. Kehadirannya bermakna bahawa data datang ke perkhidmatan yang telah ditimbal. Adalah mudah untuk melaksanakan penimbalan pada bahagian perkhidmatan, termasuk penimbalan dinamik bergantung pada beban semasa, tetapi untuk kesederhanaan, kami akan menumpukan pada selang tetap.
Perkhidmatan mesti mengira setiap minit untuk setiap simbol masuk dari lajur sym satu set fungsi pengagregatan - harga maks, harga purata, saiz jumlah, dsb. informasi berguna. Untuk kesederhanaan, kami akan menganggap bahawa semua fungsi boleh dikira secara berperingkat, i.e. untuk mendapatkan nilai baru, cukup untuk mengetahui dua nombor - nilai lama dan masuk. Sebagai contoh, fungsi maks, purata, jumlah mempunyai sifat ini, tetapi fungsi median tidak.
Kami juga akan menganggap bahawa aliran data masuk adalah masa yang ditempah. Ini akan memberi kita peluang untuk bekerja hanya pada saat-saat akhir. Dalam amalan, sudah cukup untuk dapat bekerja dengan minit semasa dan sebelumnya sekiranya beberapa kemas kini lewat. Untuk kesederhanaan, kami tidak akan mempertimbangkan kes ini.
Fungsi pengagregatan
Fungsi pengagregatan yang diperlukan disenaraikan di bawah. Saya mengambil sebanyak mungkin daripada mereka untuk meningkatkan beban pada perkhidmatan:
- tinggi β harga maks β harga maksimum seminit.
- rendah β harga min β harga minimum seminit.
- Harga pertama β harga pertama β harga pertama seminit.
- harga terakhir β harga terakhir β harga terakhir seminit.
- firstSize β saiz pertama β saiz dagangan pertama seminit.
- lastSize β saiz terakhir β saiz dagangan terakhir dalam satu minit.
- numTrades β kira i β bilangan dagangan seminit.
- volum β saiz jumlah β jumlah saiz dagangan seminit.
- pvolume β jumlah harga β jumlah harga seminit, diperlukan untuk avgPrice.
- β jumlah harga pusing ganti*saiz β jumlah volum urus niaga seminit.
- avgPrice β pvolume%numTrades β harga purata seminit.
- avgSize β volum%numTrades β purata saiz dagangan seminit.
- vwap β jumlah pusing ganti β harga purata seminit ditimbang mengikut saiz transaksi.
- cumVolume β jumlah volum β saiz terkumpul transaksi sepanjang masa.
Mari kita bincangkan dengan segera satu perkara yang tidak jelas - bagaimana untuk memulakan lajur ini untuk kali pertama dan untuk setiap minit berikutnya. Sesetengah lajur jenis FirstPrice mesti dimulakan kepada null setiap kali; nilainya tidak ditentukan. Jenis volum lain mesti sentiasa ditetapkan kepada 0. Terdapat juga lajur yang memerlukan pendekatan gabungan - contohnya, cumVolume mesti disalin dari minit sebelumnya dan untuk yang pertama ditetapkan kepada 0. Mari kita tetapkan semua parameter ini menggunakan data kamus jenis (sama dengan rekod):
// list ! list β ΡΠΎΠ·Π΄Π°ΡΡ ΡΠ»ΠΎΠ²Π°ΡΡ, 0n β float null, 0N β long null, `sym β ΡΠΈΠΏ ΡΠΈΠΌΠ²ΠΎΠ», `sym1`sym2 β ΡΠΏΠΈΡΠΎΠΊ ΡΠΈΠΌΠ²ΠΎΠ»ΠΎΠ²
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // ΡΠΏΠΈΡΠΎΠΊ Π²ΡΠ΅Ρ
Π²ΡΡΠΈΡΠ»ΡΠ΅ΠΌΡΡ
ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse ΠΎΠ±ΡΡΡΠ½Π΅Π½ Π½ΠΈΠΆΠ΅
Saya menambah sym dan masa pada kamus untuk kemudahan, kini initWith ialah baris siap dari jadual agregat akhir, di mana ia kekal untuk menetapkan sym dan masa yang betul. Anda boleh menggunakannya untuk menambah baris baharu pada jadual.
Kami memerlukan aggCols apabila mencipta fungsi pengagregatan. Senarai mesti diterbalikkan kerana susunan ungkapan dalam Q dinilai (dari kanan ke kiri). Matlamatnya adalah untuk memastikan pengiraan pergi dari tinggi kepada cumVolume, kerana beberapa lajur bergantung pada yang sebelumnya.
Lajur yang perlu disalin ke minit baharu daripada yang sebelumnya, lajur sym ditambah untuk kemudahan:
rollColumns:`sym`cumVolume;
Sekarang mari bahagikan lajur kepada kumpulan mengikut cara ia perlu dikemas kini. Tiga jenis boleh dibezakan:
- Akumulator (isipadu, pusing ganti,..) β kita mesti menambah nilai masuk kepada yang sebelumnya.
- Dengan titik khas (tinggi, rendah, ..) β nilai pertama dalam minit diambil daripada data masuk, selebihnya dikira menggunakan fungsi.
- Rehat. Sentiasa dikira menggunakan fungsi.
Mari kita tentukan pembolehubah untuk kelas ini:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Urutan pengiraan
Kami akan mengemas kini jadual agregat dalam dua peringkat. Untuk kecekapan, kami mula-mula mengecilkan jadual masuk supaya hanya terdapat satu baris untuk setiap aksara dan minit. Hakikat bahawa semua fungsi kami adalah tambahan dan bersekutu menjamin bahawa hasil daripada langkah tambahan ini tidak akan berubah. Anda boleh mengecilkan jadual menggunakan pilih:
select high:max price, low:min price β¦ by sym,time.minute from table
Kaedah ini mempunyai kelemahan - set lajur yang dikira dipratakrifkan. Nasib baik, dalam Q, pilih juga dilaksanakan sebagai fungsi di mana anda boleh menggantikan argumen yang dibuat secara dinamik:
?[table;whereClause;byClause;selectClause]
Saya tidak akan menerangkan secara terperinci format hujah; dalam kes kami, hanya oleh dan pilih ungkapan yang tidak penting dan ia harus menjadi kamus bagi bentuk lajur! ungkapan. Oleh itu, fungsi pengecutan boleh ditakrifkan seperti berikut:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each ΡΡΠΎ ΡΡΠ½ΠΊΡΠΈΡ map Π² Q Π΄Π»Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠΏΠΈΡΠΊΠ°
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Untuk kejelasan, saya menggunakan fungsi parse, yang menukar rentetan dengan ungkapan Q menjadi nilai yang boleh dihantar ke fungsi eval dan yang diperlukan dalam fungsi pilih. Juga ambil perhatian bahawa praproses ditakrifkan sebagai unjuran (iaitu, fungsi dengan argumen yang ditakrifkan separa) bagi fungsi pilih, satu argumen (jadual) tiada. Jika kita menggunakan praproses pada jadual, kita akan mendapat jadual termampat.
Peringkat kedua ialah mengemas kini jadual agregat. Mari kita mula-mula menulis algoritma dalam pseudokod:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
β¦
Dalam Q, adalah perkara biasa untuk menggunakan fungsi peta/kurangkan dan bukannya gelung. Tetapi oleh kerana Q ialah bahasa vektor dan kita boleh menggunakan semua operasi dengan mudah pada semua simbol sekaligus, maka pada anggaran pertama kita boleh lakukan tanpa gelung sama sekali, melakukan operasi pada semua simbol sekaligus:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
β¦
Tetapi kita boleh pergi lebih jauh, Q mempunyai pengendali yang unik dan sangat berkuasa - pengendali tugasan umum. Ia membolehkan anda menukar satu set nilai dalam struktur data yang kompleks menggunakan senarai indeks, fungsi dan hujah. Dalam kes kami ia kelihatan seperti ini:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;β¦]: function[target[idx 0;idx 1;β¦];argument], Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΡΡΠ½ΠΊΡΠΈΡ β ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Malangnya, untuk menetapkan kepada jadual anda memerlukan senarai baris, bukan lajur, dan anda perlu menukar matriks (senarai lajur kepada senarai baris) menggunakan fungsi flip. Ini mahal untuk jadual yang besar, jadi sebaliknya kami menggunakan tugasan umum pada setiap lajur secara berasingan, menggunakan fungsi peta (yang kelihatan seperti apostrof):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Kami sekali lagi menggunakan unjuran fungsi. Juga ambil perhatian bahawa dalam Q, mencipta senarai juga merupakan fungsi dan kita boleh memanggilnya menggunakan fungsi each(map) untuk mendapatkan senarai senarai.
Untuk memastikan set lajur yang dikira tidak tetap, kami akan mencipta ungkapan di atas secara dinamik. Mari kita tentukan dahulu fungsi untuk mengira setiap lajur, menggunakan pembolehubah baris dan inp untuk merujuk kepada data agregat dan input:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Sesetengah lajur adalah istimewa; nilai pertamanya tidak boleh dikira oleh fungsi. Kita boleh menentukan bahawa ia adalah yang pertama dengan lajur baris[`numTrades] - jika ia mengandungi 0, maka nilainya adalah yang pertama. Q mempunyai fungsi pilih - ?[Boolean list;list1;list2] - yang memilih nilai daripada senarai 1 atau 2 bergantung pada syarat dalam hujah pertama:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - ΡΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΠΎΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π΄Π»Ρ ΡΠ»ΡΡΠ°Ρ ΠΊΠΎΠ³Π΄Π° ΠΈΠ½Π΄Π΅ΠΊΡ Π½Π΅Π³Π»ΡΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Di sini saya memanggil tugasan umum dengan fungsi saya (ungkapan dalam pendakap kerinting). Ia menerima nilai semasa (hujah pertama) dan hujah tambahan, yang saya luluskan dalam parameter ke-4.
Mari tambah pembesar suara bateri secara berasingan, kerana fungsinya adalah sama untuk mereka:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Ini adalah tugasan biasa mengikut piawaian Q, tetapi saya memberikan senarai nilai sekaligus. Akhirnya, mari buat fungsi utama:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" ΠΏΡΠΈΡΠ²ΠΎΠΈΠΌ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΠΎΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ Π·Π°Π²ΠΈΡΡΡ ΠΎΡ ΡΠΆΠ΅ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΡΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅ΡΡΠΈΠΌ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΡ. ,β ΡΠ°ΡΡΠΈΡΡΠΎΠ²ΡΠ²Π°Π΅ΡΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs β String from Vector (sv), ΡΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΡΡΡΠΎΠΊ Π²ΡΡΠ°Π²Π»ΡΡ β;β ΠΏΠΎΡΡΠ΅Π΄ΠΈΠ½Π΅
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Dengan ungkapan ini, saya mencipta fungsi secara dinamik daripada rentetan yang mengandungi ungkapan yang saya berikan di atas. Hasilnya akan kelihatan seperti ini:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;β¦ ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
Susunan penilaian lajur disongsangkan kerana dalam Q susunan penilaian adalah dari kanan ke kiri.
Sekarang kita mempunyai dua fungsi utama yang diperlukan untuk pengiraan, kita hanya perlu menambah sedikit infrastruktur dan perkhidmatan sudah sedia.
Langkah akhir
Kami mempunyai fungsi praproses dan kemas kiniAgg yang melakukan semua kerja. Tetapi masih perlu untuk memastikan peralihan yang betul melalui minit dan mengira indeks untuk pengagregatan. Pertama sekali, mari kita tentukan fungsi init:
init:{
tradeAgg:: 0#enlist[initWith]; // ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΠΏΡΡΡΡΡ ΡΠΈΠΏΠΈΠ·ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ, enlist ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠ»ΠΎΠ²Π°ΡΡ Π² ΡΠ°Π±Π»ΠΈΡΡ, Π° 0# ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ Π²Π·ΡΡΡ 0 ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΠΈΠ· Π½Π΅Π΅
currTime::00:00; // Π½Π°ΡΠ½Π΅ΠΌ Ρ 0, :: ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π² Π³Π»ΠΎΠ±Π°Π»ΡΠ½ΡΡ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ
currSyms::`u#`symbol$(); // `u# - ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ Π² Π΄Π΅ΡΠ΅Π²ΠΎ, Π΄Π»Ρ ΡΡΠΊΠΎΡΠ΅Π½ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ²
offset::0; // ΠΈΠ½Π΄Π΅ΠΊΡ Π² tradeAgg, Π³Π΄Π΅ Π½Π°ΡΠΈΠ½Π°Π΅ΡΡΡ ΡΠ΅ΠΊΡΡΠ°Ρ ΠΌΠΈΠ½ΡΡΠ°
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // ΠΊΡΡ Π΄Π»Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, ΡΠ°Π±Π»ΠΈΡΠ° Ρ ΠΊΠ»ΡΡΠΎΠΌ sym
}
Kami juga akan menentukan fungsi roll, yang akan mengubah minit semasa:
roll:{[tm]
if[currTime>tm; :init[]]; // Π΅ΡΠ»ΠΈ ΠΏΠ΅ΡΠ΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡΡ, ΡΠΎ ΠΏΡΠΎΡΡΠΎ Π²ΡΠ·ΠΎΠ²Π΅ΠΌ init
rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ ΠΊΡΡ β Π²Π·ΡΡΡ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±ΡΠ΅Π·Π°ΡΡ, Π²ΡΡΠ°Π²ΠΈΡΡ Π² rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Kami memerlukan fungsi untuk menambah aksara baharu:
addSyms:{[syms]
currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠΏΠΈΡΠΎΠΊ ΠΈΠ·Π²Π΅ΡΡΠ½ΡΡ
// Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠ°Π±Π»ΠΈΡΡ sym, time ΠΈ rollColumns Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π²ΡΠΈΡΡ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΡΠΌ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ΠΌ.
// Π€ΡΠ½ΠΊΡΠΈΡ ^ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΠ΅Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ Π΄Π»Ρ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Π΅ΡΠ»ΠΈ ΡΠΈΠΌΠ²ΠΎΠ»Π° Π½Π΅Ρ Π² ΠΊΡΡΠ΅. value flip table Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² ΡΠ°Π±Π»ΠΈΡΠ΅.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
Dan akhirnya, fungsi upd (nama tradisional untuk fungsi ini untuk perkhidmatan Q), yang dipanggil oleh pelanggan untuk menambah data:
upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±ΡΡΠ½ΠΎ ΡΠ΅ΡΠ²ΠΈΡ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π΅Ρ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ°Π±Π»ΠΈΡ
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½ΡΠ΅ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡΡΡ
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // ΠΏΠΎΠΌΠ΅Π½ΡΠ΅ΠΌ ΠΌΠΈΠ½ΡΡΡ, Π΅ΡΠ»ΠΈ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ
data:select from data where time=tm; // ΡΠΈΠ»ΡΡΡΠ°ΡΠΈΡ
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²ΡΠ΅ ΡΠΈΠΌΠ²ΠΎΠ»Ρ
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ. Π€ΡΠ½ΠΊΡΠΈΡ ? ΠΈΡΠ΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡ ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΡΠΏΠΈΡΠΊΠ° ΡΠΏΡΠ°Π²Π° Π² ΡΠΏΠΈΡΠΊΠ΅ ΡΠ»Π΅Π²Π°.
};
Itu sahaja. Berikut ialah kod lengkap perkhidmatan kami, seperti yang dijanjikan, hanya beberapa baris:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Ujian
Mari kita semak prestasi perkhidmatan. Untuk melakukan ini, mari jalankannya dalam proses yang berasingan (letakkan kod dalam fail service.q) dan panggil fungsi init:
q service.q βp 5566
q)init[]
Dalam konsol lain, mulakan proses Q kedua dan sambung ke yang pertama:
h:hopen `:host:5566
h:hopen 5566 // Π΅ΡΠ»ΠΈ ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ Ρ
ΠΎΡΡΠ΅
Mula-mula, mari buat senarai simbol - 10000 keping dan tambah fungsi untuk mencipta jadual rawak. Dalam konsol kedua:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Saya menambah tiga simbol sebenar pada senarai untuk memudahkan mencarinya dalam jadual. Fungsi rnd mencipta jadual rawak dengan n baris, di mana masa berbeza dari t hingga t+25 milisaat.
Kini anda boleh cuba menghantar data ke perkhidmatan (tambah sepuluh jam pertama):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Anda boleh menyemak dalam perkhidmatan bahawa jadual telah dikemas kini:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Keputusan:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
Sekarang mari kita jalankan ujian beban untuk mengetahui jumlah data yang boleh diproses oleh perkhidmatan setiap minit. Biar saya ingatkan anda bahawa kami menetapkan selang kemas kini kepada 25 milisaat. Sehubungan itu, perkhidmatan mesti (secara purata) muat sekurang-kurangnya 20 milisaat setiap kemas kini untuk memberi masa kepada pengguna meminta data. Masukkan yang berikut dalam proses kedua:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 ialah dua minit. Anda boleh cuba jalankan dahulu untuk 1000 baris setiap 25 milisaat:
start 1000
Dalam kes saya, hasilnya adalah sekitar beberapa milisaat setiap kemas kini. Jadi saya akan segera meningkatkan bilangan baris kepada 10.000:
start 10000
Keputusan:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Sekali lagi, tiada yang istimewa, tetapi ini adalah 24 juta baris seminit, 400 ribu sesaat. Selama lebih daripada 25 milisaat, kemas kini perlahan hanya 5 kali, nampaknya apabila minit berubah. Mari meningkat kepada 100.000:
start 100000
Keputusan:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Seperti yang anda lihat, perkhidmatan itu hampir tidak dapat mengatasinya, tetapi ia berjaya untuk terus bertahan. Jumlah data sedemikian (240 juta baris seminit) adalah sangat besar; dalam kes sedemikian, adalah perkara biasa untuk melancarkan beberapa klon (atau bahkan berpuluh-puluh klon) perkhidmatan, yang setiap satunya memproses hanya sebahagian daripada aksara. Namun, hasilnya mengagumkan untuk bahasa yang ditafsirkan yang memberi tumpuan terutamanya pada penyimpanan data.
Persoalan mungkin timbul tentang mengapa masa berkembang secara tidak linear dengan saiz setiap kemas kini. Sebabnya ialah fungsi shrink sebenarnya adalah fungsi C, yang jauh lebih cekap daripada updateAgg. Bermula dari saiz kemas kini tertentu (sekitar 10.000), updateAgg mencapai silingnya dan kemudian masa pelaksanaannya tidak bergantung pada saiz kemas kini. Ia disebabkan oleh langkah awal Q bahawa perkhidmatan dapat mencerna volum data sedemikian. Ini menyerlahkan betapa pentingnya memilih algoritma yang betul apabila bekerja dengan data besar. Perkara lain ialah penyimpanan data yang betul dalam ingatan. Jika data tidak disimpan secara kolumnar atau tidak dipesan mengikut masa, maka kita akan menjadi biasa dengan perkara seperti kehilangan cache TLB - ketiadaan alamat halaman memori dalam cache alamat pemproses. Mencari alamat mengambil masa kira-kira 30 kali lebih lama jika tidak berjaya, dan jika data berselerak, ia boleh memperlahankan perkhidmatan beberapa kali.
Kesimpulan
Dalam artikel ini, saya menunjukkan bahawa pangkalan data KDB+ dan Q sesuai bukan sahaja untuk menyimpan data yang besar dan mengaksesnya dengan mudah melalui pilihan, tetapi juga untuk mencipta perkhidmatan pemprosesan data yang mampu mencerna ratusan juta baris/gigabait data walaupun dalam satu proses Q tunggal. Bahasa Q itu sendiri membenarkan pelaksanaan algoritma yang sangat ringkas dan cekap berkaitan pemprosesan data kerana sifat vektornya, penterjemah dialek SQL terbina dalam dan set fungsi perpustakaan yang sangat berjaya.
Saya akan ambil perhatian bahawa perkara di atas hanyalah sebahagian daripada apa yang boleh dilakukan oleh Q, ia juga mempunyai ciri unik yang lain. Sebagai contoh, protokol IPC yang sangat mudah yang memadamkan sempadan antara proses Q individu dan membolehkan anda menggabungkan beratus-ratus proses ini ke dalam satu rangkaian, yang boleh terletak pada berpuluh-puluh pelayan di bahagian dunia yang berbeza.
Sumber: www.habr.com