Ciri bahasa Q dan KDB+ menggunakan contoh perkhidmatan masa nyata

Anda boleh membaca tentang asas KDB+, bahasa pengaturcaraan Q, kekuatan dan kelemahan mereka dalam saya sebelum ini artikel dan secara ringkas dalam pengenalan. Dalam artikel tersebut, kami akan melaksanakan perkhidmatan pada Q yang akan memproses aliran data masuk dan mengira pelbagai fungsi pengagregatan setiap minit dalam mod "masa nyata" (iaitu, ia akan mempunyai masa untuk mengira semuanya sebelum bahagian data seterusnya). Ciri utama Q ialah ia adalah bahasa vektor yang membolehkan anda beroperasi bukan dengan objek tunggal, tetapi dengan tatasusunannya, tatasusunan tatasusunan dan objek kompleks lain. Bahasa seperti Q dan kerabatnya K, J, APL terkenal dengan singkatannya. Selalunya, program yang menggunakan beberapa skrin kod dalam bahasa biasa seperti Java boleh ditulis padanya dalam beberapa baris. Inilah yang saya ingin tunjukkan dalam artikel ini.

Ciri bahasa Q dan KDB+ menggunakan contoh perkhidmatan masa nyata

Pengenalan

KDB+ ialah pangkalan data kolumnar yang memfokuskan pada jumlah data yang sangat besar, disusun mengikut cara tertentu (terutamanya mengikut masa). Ia digunakan terutamanya dalam institusi kewangan - bank, dana pelaburan, syarikat insurans. Bahasa Q ialah bahasa dalaman KDB+ yang membolehkan anda bekerja dengan data ini dengan berkesan. Ideologi Q adalah ringkas dan kecekapan, manakala kejelasan dikorbankan. Ini dibenarkan oleh fakta bahawa bahasa vektor akan sukar difahami dalam apa jua keadaan, dan keringkasan dan kekayaan rakaman membolehkan anda melihat sebahagian besar program pada satu skrin, yang akhirnya menjadikannya lebih mudah untuk difahami.

Dalam artikel ini kami melaksanakan program lengkap dalam Q dan anda mungkin ingin mencubanya. Untuk melakukan ini, anda memerlukan Q sebenar. Anda boleh memuat turun versi 32-bit percuma di laman web syarikat kx – www.kx.com. Di sana, jika anda berminat, anda akan mendapat maklumat rujukan tentang Q, buku itu Q Untuk Manusia dan pelbagai artikel mengenai topik ini.

Pernyataan masalah

Terdapat sumber yang menghantar jadual dengan data setiap 25 milisaat. Oleh kerana KDB+ digunakan terutamanya dalam kewangan, kami akan menganggap bahawa ini ialah jadual urus niaga (dagangan), yang mempunyai lajur berikut: masa (masa dalam milisaat), sym (penetapan syarikat di bursa saham - IBM, AAPL,…), harga (harga di mana saham dibeli), saiz (saiz urus niaga). Selang 25 milisaat adalah sewenang-wenangnya, tidak terlalu kecil dan tidak terlalu lama. Kehadirannya bermakna bahawa data datang ke perkhidmatan yang telah ditimbal. Adalah mudah untuk melaksanakan penimbalan pada bahagian perkhidmatan, termasuk penimbalan dinamik bergantung pada beban semasa, tetapi untuk kesederhanaan, kami akan menumpukan pada selang tetap.

Perkhidmatan mesti mengira setiap minit untuk setiap simbol masuk dari lajur sym satu set fungsi pengagregatan - harga maks, harga purata, saiz jumlah, dsb. informasi berguna. Untuk kesederhanaan, kami akan menganggap bahawa semua fungsi boleh dikira secara berperingkat, i.e. untuk mendapatkan nilai baru, cukup untuk mengetahui dua nombor - nilai lama dan masuk. Sebagai contoh, fungsi maks, purata, jumlah mempunyai sifat ini, tetapi fungsi median tidak.

Kami juga akan menganggap bahawa aliran data masuk adalah masa yang ditempah. Ini akan memberi kita peluang untuk bekerja hanya pada saat-saat akhir. Dalam amalan, sudah cukup untuk dapat bekerja dengan minit semasa dan sebelumnya sekiranya beberapa kemas kini lewat. Untuk kesederhanaan, kami tidak akan mempertimbangkan kes ini.

Fungsi pengagregatan

Fungsi pengagregatan yang diperlukan disenaraikan di bawah. Saya mengambil sebanyak mungkin daripada mereka untuk meningkatkan beban pada perkhidmatan:

  • tinggi – harga maks – harga maksimum seminit.
  • rendah – harga min – harga minimum seminit.
  • Harga pertama – harga pertama – harga pertama seminit.
  • harga terakhir – harga terakhir – harga terakhir seminit.
  • firstSize – saiz pertama – saiz dagangan pertama seminit.
  • lastSize – saiz terakhir β€” saiz dagangan terakhir dalam satu minit.
  • numTrades – kira i – bilangan dagangan seminit.
  • volum – saiz jumlah – jumlah saiz dagangan seminit.
  • pvolume – jumlah harga – jumlah harga seminit, diperlukan untuk avgPrice.
  • – jumlah harga pusing ganti*saiz – jumlah volum urus niaga seminit.
  • avgPrice – pvolume%numTrades – harga purata seminit.
  • avgSize – volum%numTrades – purata saiz dagangan seminit.
  • vwap – jumlah pusing ganti – harga purata seminit ditimbang mengikut saiz transaksi.
  • cumVolume – jumlah volum – saiz terkumpul transaksi sepanjang masa.

Mari kita bincangkan dengan segera satu perkara yang tidak jelas - bagaimana untuk memulakan lajur ini untuk kali pertama dan untuk setiap minit berikutnya. Sesetengah lajur jenis FirstPrice mesti dimulakan kepada null setiap kali; nilainya tidak ditentukan. Jenis volum lain mesti sentiasa ditetapkan kepada 0. Terdapat juga lajur yang memerlukan pendekatan gabungan - contohnya, cumVolume mesti disalin dari minit sebelumnya dan untuk yang pertama ditetapkan kepada 0. Mari kita tetapkan semua parameter ini menggunakan data kamus jenis (sama dengan rekod):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Saya menambah sym dan masa pada kamus untuk kemudahan, kini initWith ialah baris siap dari jadual agregat akhir, di mana ia kekal untuk menetapkan sym dan masa yang betul. Anda boleh menggunakannya untuk menambah baris baharu pada jadual.

Kami memerlukan aggCols apabila mencipta fungsi pengagregatan. Senarai mesti diterbalikkan kerana susunan ungkapan dalam Q dinilai (dari kanan ke kiri). Matlamatnya adalah untuk memastikan pengiraan pergi dari tinggi kepada cumVolume, kerana beberapa lajur bergantung pada yang sebelumnya.

Lajur yang perlu disalin ke minit baharu daripada yang sebelumnya, lajur sym ditambah untuk kemudahan:

rollColumns:`sym`cumVolume;

Sekarang mari bahagikan lajur kepada kumpulan mengikut cara ia perlu dikemas kini. Tiga jenis boleh dibezakan:

  1. Akumulator (isipadu, pusing ganti,..) – kita mesti menambah nilai masuk kepada yang sebelumnya.
  2. Dengan titik khas (tinggi, rendah, ..) – nilai pertama dalam minit diambil daripada data masuk, selebihnya dikira menggunakan fungsi.
  3. Rehat. Sentiasa dikira menggunakan fungsi.

Mari kita tentukan pembolehubah untuk kelas ini:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Urutan pengiraan

Kami akan mengemas kini jadual agregat dalam dua peringkat. Untuk kecekapan, kami mula-mula mengecilkan jadual masuk supaya hanya terdapat satu baris untuk setiap aksara dan minit. Hakikat bahawa semua fungsi kami adalah tambahan dan bersekutu menjamin bahawa hasil daripada langkah tambahan ini tidak akan berubah. Anda boleh mengecilkan jadual menggunakan pilih:

select high:max price, low:min price … by sym,time.minute from table

Kaedah ini mempunyai kelemahan - set lajur yang dikira dipratakrifkan. Nasib baik, dalam Q, pilih juga dilaksanakan sebagai fungsi di mana anda boleh menggantikan argumen yang dibuat secara dinamik:

?[table;whereClause;byClause;selectClause]

Saya tidak akan menerangkan secara terperinci format hujah; dalam kes kami, hanya oleh dan pilih ungkapan yang tidak penting dan ia harus menjadi kamus bagi bentuk lajur! ungkapan. Oleh itu, fungsi pengecutan boleh ditakrifkan seperti berikut:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Untuk kejelasan, saya menggunakan fungsi parse, yang menukar rentetan dengan ungkapan Q menjadi nilai yang boleh dihantar ke fungsi eval dan yang diperlukan dalam fungsi pilih. Juga ambil perhatian bahawa praproses ditakrifkan sebagai unjuran (iaitu, fungsi dengan argumen yang ditakrifkan separa) bagi fungsi pilih, satu argumen (jadual) tiada. Jika kita menggunakan praproses pada jadual, kita akan mendapat jadual termampat.

Peringkat kedua ialah mengemas kini jadual agregat. Mari kita mula-mula menulis algoritma dalam pseudokod:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Dalam Q, adalah perkara biasa untuk menggunakan fungsi peta/kurangkan dan bukannya gelung. Tetapi oleh kerana Q ialah bahasa vektor dan kita boleh menggunakan semua operasi dengan mudah pada semua simbol sekaligus, maka pada anggaran pertama kita boleh lakukan tanpa gelung sama sekali, melakukan operasi pada semua simbol sekaligus:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Tetapi kita boleh pergi lebih jauh, Q mempunyai pengendali yang unik dan sangat berkuasa - pengendali tugasan umum. Ia membolehkan anda menukar satu set nilai dalam struktur data yang kompleks menggunakan senarai indeks, fungsi dan hujah. Dalam kes kami ia kelihatan seperti ini:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Malangnya, untuk menetapkan kepada jadual anda memerlukan senarai baris, bukan lajur, dan anda perlu menukar matriks (senarai lajur kepada senarai baris) menggunakan fungsi flip. Ini mahal untuk jadual yang besar, jadi sebaliknya kami menggunakan tugasan umum pada setiap lajur secara berasingan, menggunakan fungsi peta (yang kelihatan seperti apostrof):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Kami sekali lagi menggunakan unjuran fungsi. Juga ambil perhatian bahawa dalam Q, mencipta senarai juga merupakan fungsi dan kita boleh memanggilnya menggunakan fungsi each(map) untuk mendapatkan senarai senarai.

Untuk memastikan set lajur yang dikira tidak tetap, kami akan mencipta ungkapan di atas secara dinamik. Mari kita tentukan dahulu fungsi untuk mengira setiap lajur, menggunakan pembolehubah baris dan inp untuk merujuk kepada data agregat dan input:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Sesetengah lajur adalah istimewa; nilai pertamanya tidak boleh dikira oleh fungsi. Kita boleh menentukan bahawa ia adalah yang pertama dengan lajur baris[`numTrades] - jika ia mengandungi 0, maka nilainya adalah yang pertama. Q mempunyai fungsi pilih - ?[Boolean list;list1;list2] - yang memilih nilai daripada senarai 1 atau 2 bergantung pada syarat dalam hujah pertama:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Di sini saya memanggil tugasan umum dengan fungsi saya (ungkapan dalam pendakap kerinting). Ia menerima nilai semasa (hujah pertama) dan hujah tambahan, yang saya luluskan dalam parameter ke-4.

Mari tambah pembesar suara bateri secara berasingan, kerana fungsinya adalah sama untuk mereka:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Ini adalah tugasan biasa mengikut piawaian Q, tetapi saya memberikan senarai nilai sekaligus. Akhirnya, mari buat fungsi utama:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Dengan ungkapan ini, saya mencipta fungsi secara dinamik daripada rentetan yang mengandungi ungkapan yang saya berikan di atas. Hasilnya akan kelihatan seperti ini:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Susunan penilaian lajur disongsangkan kerana dalam Q susunan penilaian adalah dari kanan ke kiri.

Sekarang kita mempunyai dua fungsi utama yang diperlukan untuk pengiraan, kita hanya perlu menambah sedikit infrastruktur dan perkhidmatan sudah sedia.

Langkah akhir

Kami mempunyai fungsi praproses dan kemas kiniAgg yang melakukan semua kerja. Tetapi masih perlu untuk memastikan peralihan yang betul melalui minit dan mengira indeks untuk pengagregatan. Pertama sekali, mari kita tentukan fungsi init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Kami juga akan menentukan fungsi roll, yang akan mengubah minit semasa:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Kami memerlukan fungsi untuk menambah aksara baharu:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Dan akhirnya, fungsi upd (nama tradisional untuk fungsi ini untuk perkhidmatan Q), yang dipanggil oleh pelanggan untuk menambah data:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Itu sahaja. Berikut ialah kod lengkap perkhidmatan kami, seperti yang dijanjikan, hanya beberapa baris:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Ujian

Mari kita semak prestasi perkhidmatan. Untuk melakukan ini, mari jalankannya dalam proses yang berasingan (letakkan kod dalam fail service.q) dan panggil fungsi init:

q service.q –p 5566

q)init[]

Dalam konsol lain, mulakan proses Q kedua dan sambung ke yang pertama:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Mula-mula, mari buat senarai simbol - 10000 keping dan tambah fungsi untuk mencipta jadual rawak. Dalam konsol kedua:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Saya menambah tiga simbol sebenar pada senarai untuk memudahkan mencarinya dalam jadual. Fungsi rnd mencipta jadual rawak dengan n baris, di mana masa berbeza dari t hingga t+25 milisaat.

Kini anda boleh cuba menghantar data ke perkhidmatan (tambah sepuluh jam pertama):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Anda boleh menyemak dalam perkhidmatan bahawa jadual telah dikemas kini:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Keputusan:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Sekarang mari kita jalankan ujian beban untuk mengetahui jumlah data yang boleh diproses oleh perkhidmatan setiap minit. Biar saya ingatkan anda bahawa kami menetapkan selang kemas kini kepada 25 milisaat. Sehubungan itu, perkhidmatan mesti (secara purata) muat sekurang-kurangnya 20 milisaat setiap kemas kini untuk memberi masa kepada pengguna meminta data. Masukkan yang berikut dalam proses kedua:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 ialah dua minit. Anda boleh cuba jalankan dahulu untuk 1000 baris setiap 25 milisaat:

start 1000

Dalam kes saya, hasilnya adalah sekitar beberapa milisaat setiap kemas kini. Jadi saya akan segera meningkatkan bilangan baris kepada 10.000:

start 10000

Keputusan:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Sekali lagi, tiada yang istimewa, tetapi ini adalah 24 juta baris seminit, 400 ribu sesaat. Selama lebih daripada 25 milisaat, kemas kini perlahan hanya 5 kali, nampaknya apabila minit berubah. Mari meningkat kepada 100.000:

start 100000

Keputusan:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Seperti yang anda lihat, perkhidmatan itu hampir tidak dapat mengatasinya, tetapi ia berjaya untuk terus bertahan. Jumlah data sedemikian (240 juta baris seminit) adalah sangat besar; dalam kes sedemikian, adalah perkara biasa untuk melancarkan beberapa klon (atau bahkan berpuluh-puluh klon) perkhidmatan, yang setiap satunya memproses hanya sebahagian daripada aksara. Namun, hasilnya mengagumkan untuk bahasa yang ditafsirkan yang memberi tumpuan terutamanya pada penyimpanan data.

Persoalan mungkin timbul tentang mengapa masa berkembang secara tidak linear dengan saiz setiap kemas kini. Sebabnya ialah fungsi shrink sebenarnya adalah fungsi C, yang jauh lebih cekap daripada updateAgg. Bermula dari saiz kemas kini tertentu (sekitar 10.000), updateAgg mencapai silingnya dan kemudian masa pelaksanaannya tidak bergantung pada saiz kemas kini. Ia disebabkan oleh langkah awal Q bahawa perkhidmatan dapat mencerna volum data sedemikian. Ini menyerlahkan betapa pentingnya memilih algoritma yang betul apabila bekerja dengan data besar. Perkara lain ialah penyimpanan data yang betul dalam ingatan. Jika data tidak disimpan secara kolumnar atau tidak dipesan mengikut masa, maka kita akan menjadi biasa dengan perkara seperti kehilangan cache TLB - ketiadaan alamat halaman memori dalam cache alamat pemproses. Mencari alamat mengambil masa kira-kira 30 kali lebih lama jika tidak berjaya, dan jika data berselerak, ia boleh memperlahankan perkhidmatan beberapa kali.

Kesimpulan

Dalam artikel ini, saya menunjukkan bahawa pangkalan data KDB+ dan Q sesuai bukan sahaja untuk menyimpan data yang besar dan mengaksesnya dengan mudah melalui pilihan, tetapi juga untuk mencipta perkhidmatan pemprosesan data yang mampu mencerna ratusan juta baris/gigabait data walaupun dalam satu proses Q tunggal. Bahasa Q itu sendiri membenarkan pelaksanaan algoritma yang sangat ringkas dan cekap berkaitan pemprosesan data kerana sifat vektornya, penterjemah dialek SQL terbina dalam dan set fungsi perpustakaan yang sangat berjaya.

Saya akan ambil perhatian bahawa perkara di atas hanyalah sebahagian daripada apa yang boleh dilakukan oleh Q, ia juga mempunyai ciri unik yang lain. Sebagai contoh, protokol IPC yang sangat mudah yang memadamkan sempadan antara proses Q individu dan membolehkan anda menggabungkan beratus-ratus proses ini ke dalam satu rangkaian, yang boleh terletak pada berpuluh-puluh pelayan di bahagian dunia yang berbeza.

Sumber: www.habr.com

Tambah komen