Fitur bahasa Q dan KDB+ menggunakan contoh layanan real-time

Anda bisa membaca tentang apa itu basis KDB+, bahasa pemrograman Q, apa kelebihan dan kekurangannya pada saya sebelumnya Artikel dan secara singkat dalam pendahuluan. Dalam artikel ini, kami akan mengimplementasikan layanan pada Q yang akan memproses aliran data masuk dan menghitung berbagai fungsi agregasi setiap menit dalam mode "waktu nyata" (yaitu, ia memiliki waktu untuk menghitung semuanya sebelum bagian data berikutnya). Fitur utama Q adalah bahasa vektor yang memungkinkan Anda untuk beroperasi tidak dengan objek tunggal, tetapi dengan arraynya, array dari array, dan objek kompleks lainnya. Bahasa seperti Q dan kerabatnya K, J, APL terkenal dengan singkatnya. Seringkali, sebuah program yang mengambil beberapa layar kode dalam bahasa familiar seperti Java dapat ditulis dalam beberapa baris. Inilah yang ingin saya tunjukkan dalam artikel ini.

Fitur bahasa Q dan KDB+ menggunakan contoh layanan real-time

pengenalan

KDB+ adalah database kolom yang berfokus pada data dalam jumlah sangat besar, diurutkan dengan cara tertentu (terutama berdasarkan waktu). Ini digunakan terutama di lembaga keuangan - bank, dana investasi, perusahaan asuransi. Bahasa Q adalah bahasa internal KDB+ yang memungkinkan Anda bekerja secara efektif dengan data ini. Ideologi Q adalah keringkasan dan efisiensi, sedangkan kejelasan dikorbankan. Hal ini dibenarkan oleh fakta bahwa bahasa vektor akan sulit untuk dipahami, dan singkatnya serta kekayaan rekaman memungkinkan Anda melihat lebih banyak bagian program pada satu layar, yang pada akhirnya membuatnya lebih mudah untuk dipahami.

Pada artikel ini kami menerapkan program lengkap di Q dan Anda mungkin ingin mencobanya. Untuk melakukan ini, Anda memerlukan Q yang sebenarnya. Anda dapat mengunduh versi 32-bit gratis di situs web perusahaan kx – www.kx.com. Di sana, jika Anda tertarik, Anda akan menemukan informasi referensi tentang Q, bukunya Q Untuk Manusia dan berbagai artikel tentang topik ini.

Pernyataan masalah

Ada sumber yang mengirimkan tabel dengan data setiap 25 milidetik. Karena KDB+ terutama digunakan di bidang keuangan, kita asumsikan bahwa ini adalah tabel transaksi (perdagangan), yang memiliki kolom berikut: time (waktu dalam milidetik), sym (sebutan perusahaan di bursa saham - IBM, AAPL,…), price (harga pembelian saham), size (ukuran transaksi). Interval 25 milidetik itu sewenang-wenang, tidak terlalu kecil dan tidak terlalu panjang. Kehadirannya berarti data yang masuk ke layanan sudah di-buffer. Akan mudah untuk menerapkan buffering di sisi layanan, termasuk buffering dinamis tergantung pada beban saat ini, namun untuk kesederhanaan, kami akan fokus pada interval tetap.

Layanan harus menghitung setiap menit untuk setiap simbol yang masuk dari kolom sym serangkaian fungsi agregasi - harga maksimal, harga rata-rata, ukuran jumlah, dll. informasi berguna. Untuk mempermudah, kita asumsikan bahwa semua fungsi dapat dihitung secara bertahap, yaitu. untuk mendapatkan nilai baru, cukup mengetahui dua angka - nilai lama dan nilai masuk. Misalnya, fungsi maks, rata-rata, jumlah mempunyai sifat ini, tetapi fungsi median tidak.

Kami juga akan berasumsi bahwa aliran data yang masuk diurutkan berdasarkan waktu. Ini akan memberi kita kesempatan untuk bekerja hanya pada menit-menit terakhir. Dalam praktiknya, cukup bekerja dengan menit saat ini dan sebelumnya jika ada pembaruan yang terlambat. Untuk mempermudah, kami tidak akan mempertimbangkan kasus ini.

Fungsi agregasi

Fungsi agregasi yang diperlukan tercantum di bawah ini. Saya mengambil sebanyak mungkin untuk menambah beban layanan:

  • tinggi – harga maksimal – harga maksimal per menit.
  • rendah – harga minimum – harga minimum per menit.
  • Harga pertama – harga pertama – harga pertama per menit.
  • lastPrice – harga terakhir – harga terakhir per menit.
  • firstSize – ukuran pertama – ukuran perdagangan pertama per menit.
  • lastSize – ukuran terakhir β€” ukuran perdagangan terakhir dalam satu menit.
  • numTrades – hitung i – jumlah perdagangan per menit.
  • volume – jumlah ukuran – jumlah ukuran perdagangan per menit.
  • pvolume – jumlah harga – jumlah harga per menit, diperlukan untuk harga rata-rata.
  • – jumlah harga turnover*ukuran – total volume transaksi per menit.
  • avgPrice – pvolume%numTrades – harga rata-rata per menit.
  • avgSize – volume%numTrades – rata-rata ukuran perdagangan per menit.
  • vwap – turnover%volume – harga rata-rata per menit berdasarkan ukuran transaksi.
  • cumVolume – jumlah volume – akumulasi ukuran transaksi sepanjang waktu.

Mari kita segera membahas satu hal yang tidak jelas - cara menginisialisasi kolom ini untuk pertama kalinya dan untuk setiap menit berikutnya. Beberapa kolom tipe firstPrice harus diinisialisasi ke null setiap kali; nilainya tidak ditentukan. Tipe volume lainnya harus selalu disetel ke 0. Ada juga kolom yang memerlukan pendekatan gabungan - misalnya, cumVolume harus disalin dari menit sebelumnya, dan untuk menit pertama disetel ke 0. Mari atur semua parameter ini menggunakan data kamus ketik (analog dengan catatan):

// list ! list – ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ, 0n – float null, 0N – long null, `sym – Ρ‚ΠΈΠΏ символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всСх вычисляСмых ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse объяснСн Π½ΠΈΠΆΠ΅

Saya menambahkan sim dan waktu ke kamus untuk kenyamanan, sekarang initWith adalah baris siap pakai dari tabel agregat terakhir, yang tersisa untuk menyetel sim dan waktu yang benar. Anda dapat menggunakannya untuk menambahkan baris baru ke tabel.

Kita memerlukan aggCols saat membuat fungsi agregasi. Daftar harus dibalik karena urutan evaluasi ekspresi dalam Q (dari kanan ke kiri). Tujuannya adalah untuk memastikan penghitungan beralih dari tinggi ke cumVolume, karena beberapa kolom bergantung pada kolom sebelumnya.

Kolom yang perlu disalin ke menit baru dari menit sebelumnya, kolom simbol ditambahkan untuk kenyamanan:

rollColumns:`sym`cumVolume;

Sekarang mari kita bagi kolom menjadi beberapa kelompok sesuai dengan cara memperbaruinya. Tiga jenis dapat dibedakan:

  1. Akumulator (volume, omset,..) – kita harus menambahkan nilai yang masuk ke nilai sebelumnya.
  2. Dengan titik khusus (tinggi, rendah, ..) – nilai pertama dalam satu menit diambil dari data yang masuk, sisanya dihitung menggunakan fungsi.
  3. Istirahat. Selalu dihitung menggunakan suatu fungsi.

Mari kita definisikan variabel untuk kelas-kelas ini:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Urutan perhitungan

Kami akan memperbarui tabel gabungan dalam dua tahap. Untuk efisiensi, pertama-tama kita mengecilkan tabel masuk sehingga hanya ada satu baris untuk setiap karakter dan menit. Fakta bahwa semua fungsi kami bersifat inkremental dan asosiatif menjamin bahwa hasil dari langkah tambahan ini tidak akan berubah. Anda dapat mengecilkan tabel menggunakan pilih:

select high:max price, low:min price … by sym,time.minute from table

Metode ini memiliki kelemahan - kumpulan kolom terhitung sudah ditentukan sebelumnya. Untungnya, di Q, pilih juga diimplementasikan sebagai fungsi di mana Anda dapat mengganti argumen yang dibuat secara dinamis:

?[table;whereClause;byClause;selectClause]

Saya tidak akan menjelaskan secara rinci format argumen; dalam kasus kami, hanya ekspresi by dan select yang bersifat nontrivial dan harus berupa kamus kolom formulir!ekspresi. Dengan demikian, fungsi penyusutan dapat didefinisikan sebagai berikut:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map Π² Q для ΠΎΠ΄Π½ΠΎΠ³ΠΎ списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Untuk kejelasan, saya menggunakan fungsi parse, yang mengubah string dengan ekspresi Q menjadi nilai yang dapat diteruskan ke fungsi eval dan diperlukan dalam fungsi pemilihan. Perhatikan juga bahwa praproses didefinisikan sebagai proyeksi (yaitu, fungsi dengan argumen yang ditentukan sebagian) dari fungsi pilih, satu argumen (tabel) hilang. Jika kita menerapkan preprocess ke sebuah tabel, kita akan mendapatkan tabel terkompresi.

Tahap kedua adalah memperbarui tabel agregat. Mari kita tulis dulu algoritmanya dalam pseudocode:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Di Q, biasanya menggunakan fungsi map/reduce daripada loop. Tapi karena Q adalah bahasa vektor dan kita dapat dengan mudah menerapkan semua operasi ke semua simbol sekaligus, maka pada pendekatan pertama kita dapat melakukannya tanpa loop sama sekali, melakukan operasi pada semua simbol sekaligus:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Tapi kita bisa melangkah lebih jauh, Q memiliki operator yang unik dan sangat kuat - operator penugasan yang digeneralisasi. Ini memungkinkan Anda mengubah sekumpulan nilai dalam struktur data yang kompleks menggunakan daftar indeks, fungsi, dan argumen. Dalam kasus kami, tampilannya seperti ini:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], Π² нашСм случаС функция – это присваиваниС
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Sayangnya, untuk menetapkan ke tabel Anda memerlukan daftar baris, bukan kolom, dan Anda harus mengubah urutan matriks (daftar kolom ke daftar baris) menggunakan fungsi flip. Ini mahal untuk tabel besar, jadi kami menerapkan penugasan umum ke setiap kolom secara terpisah, menggunakan fungsi peta (yang terlihat seperti tanda kutip):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Kami kembali menggunakan proyeksi fungsi. Perhatikan juga bahwa di Q, membuat daftar juga merupakan sebuah fungsi dan kita dapat memanggilnya menggunakan fungsi every(map) untuk mendapatkan daftar daftar.

Untuk memastikan bahwa kumpulan kolom terhitung tidak tetap, kami akan membuat ekspresi di atas secara dinamis. Pertama-tama mari kita definisikan fungsi untuk menghitung setiap kolom, menggunakan variabel baris dan input untuk merujuk pada data gabungan dan input:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Beberapa kolom bersifat khusus; nilai pertamanya tidak boleh dihitung oleh fungsi. Kita dapat menentukan bahwa ini adalah yang pertama dengan kolom baris[`numTrades] - jika berisi 0, maka nilainya adalah yang pertama. Q memiliki fungsi pilih - ?[Boolean list;list1;list2] - yang memilih nilai dari daftar 1 atau 2 bergantung pada kondisi di argumen pertama:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - Ρ‚ΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½ΠΎΠ΅ присваиваниС для случая ΠΊΠΎΠ³Π΄Π° индСкс Π½Π΅Π³Π»ΡƒΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Di sini saya memanggil tugas umum dengan fungsi saya (ekspresi dalam kurung kurawal). Ia menerima nilai saat ini (argumen pertama) dan argumen tambahan, yang saya sampaikan pada parameter ke-4.

Mari kita tambahkan speaker baterai secara terpisah, karena fungsinya sama:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Ini adalah tugas normal menurut standar Q, tetapi saya menetapkan daftar nilai sekaligus. Terakhir, mari buat fungsi utama:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычислСнноС Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡ‚ΠΎΠΌΡƒ Ρ‡Ρ‚ΠΎ Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ зависят ΠΎΡ‚ ΡƒΠΆΠ΅ вычислСнных Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅Ρ€ΡˆΠΈΠΌ созданиС присваивания. ,’ Ρ€Π°ΡΡˆΠΈΡ„Ρ€ΠΎΠ²Ρ‹Π²Π°Π΅Ρ‚ΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs – String from Vector (sv), соСдиняСт список строк вставляя β€œ;” посрСдинС
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Dengan ekspresi ini, saya secara dinamis membuat fungsi dari string yang berisi ekspresi yang saya berikan di atas. Hasilnya akan terlihat seperti ini:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Urutan evaluasi kolom dibalik karena pada Q urutan evaluasinya dari kanan ke kiri.

Sekarang kita memiliki dua fungsi utama yang diperlukan untuk perhitungan, kita hanya perlu menambahkan sedikit infrastruktur dan layanan siap.

Langkah terakhir

Kami memiliki fungsi praproses dan updateAgg yang melakukan semua pekerjaan. Namun tetap perlu memastikan transisi yang benar melalui menit dan menghitung indeks untuk agregasi. Pertama-tama, mari kita definisikan fungsi init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаСм ΠΏΡƒΡΡ‚ΡƒΡŽ Ρ‚ΠΈΠΏΠΈΠ·ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, enlist ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ ΡΠ»ΠΎΠ²Π°Ρ€ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ, Π° 0# ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ Π²Π·ΡΡ‚ΡŒ 0 элСмСнтов ΠΈΠ· Π½Π΅Π΅
  currTime::00:00; // Π½Π°Ρ‡Π½Π΅ΠΌ с 0, :: ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ присваиваниС Π² Π³Π»ΠΎΠ±Π°Π»ΡŒΠ½ΡƒΡŽ ΠΏΠ΅Ρ€Π΅ΠΌΠ΅Π½Π½ΡƒΡŽ
  currSyms::`u#`symbol$(); // `u# - ΠΏΡ€Π΅Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список Π² Π΄Π΅Ρ€Π΅Π²ΠΎ, для ускорСния поиска элСмСнтов
  offset::0; // индСкс Π² tradeAgg, Π³Π΄Π΅ начинаСтся тСкущая ΠΌΠΈΠ½ΡƒΡ‚Π° 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для послСдних Π·Π½Π°Ρ‡Π΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ρ‚Π°Π±Π»ΠΈΡ†Π° с ΠΊΠ»ΡŽΡ‡ΠΎΠΌ sym
 }

Kami juga akan mendefinisikan fungsi roll, yang akan mengubah menit saat ini:

roll:{[tm]
  if[currTime>tm; :init[]]; // Ссли ΠΏΠ΅Ρ€Π΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡ‡ΡŒ, Ρ‚ΠΎ просто Π²Ρ‹Π·ΠΎΠ²Π΅ΠΌ init
  rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ кэш – Π²Π·ΡΡ‚ΡŒ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±Ρ€Π΅Π·Π°Ρ‚ΡŒ, Π²ΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π² rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Kita memerlukan fungsi untuk menambahkan karakter baru:

addSyms:{[syms]
  currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² список извСстных
  // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ sym, time ΠΈ rollColumns воспользовавшись ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΌ присваиваниСм.
  // Ѐункция ^ подставляСт значСния ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ для roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Ссли символа Π½Π΅Ρ‚ Π² кэшС. value flip table Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅Ρ‚ список ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Dan terakhir, fungsi upd (nama tradisional fungsi ini untuk layanan Q), yang dipanggil oleh klien untuk menambahkan data:

upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡƒΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎ сСрвис ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅Ρ‚ нСсколько Ρ‚Π°Π±Π»ΠΈΡ† 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½Ρ‹Π΅ для ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡƒΡ‚Ρ‹
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // помСняСм ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Ссли Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ
  data:select from data where time=tm; // Ρ„ΠΈΠ»ΡŒΡ‚Ρ€Π°Ρ†ΠΈΡ
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²Ρ‹Π΅ символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½ΡƒΡŽ Ρ‚Π°Π±Π»ΠΈΡ†Ρƒ. Ѐункция ? ΠΈΡ‰Π΅Ρ‚ индСкс элСмСнтов списка справа Π² спискС слСва.
 };

Itu saja. Berikut kode lengkap layanan kami, seperti yang dijanjikan, hanya beberapa baris:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Pengujian

Mari kita periksa kinerja layanan. Untuk melakukan ini, jalankan dalam proses terpisah (letakkan kode di file service.q) dan panggil fungsi init:

q service.q –p 5566

q)init[]

Di konsol lain, mulai proses Q kedua dan sambungkan ke yang pertama:

h:hopen `:host:5566
h:hopen 5566 // Ссли ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ хостС

Pertama, mari buat daftar simbol - 10000 buah dan tambahkan fungsi untuk membuat tabel acak. Di konsol kedua:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Saya menambahkan tiga simbol nyata ke dalam daftar untuk memudahkan mencarinya di tabel. Fungsi rnd membuat tabel acak dengan n baris, dengan waktu bervariasi dari t hingga t+25 milidetik.

Sekarang Anda dapat mencoba mengirim data ke layanan (tambahkan sepuluh jam pertama):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Anda dapat memeriksa di layanan bahwa tabel telah diperbarui:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Hasilnya:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Sekarang mari kita lakukan pengujian beban untuk mengetahui berapa banyak data yang dapat diproses oleh layanan per menit. Izinkan saya mengingatkan Anda bahwa kami menyetel interval pembaruan menjadi 25 milidetik. Oleh karena itu, layanan (rata-rata) harus memenuhi setidaknya 20 milidetik per pembaruan agar pengguna memiliki waktu untuk meminta data. Masukkan yang berikut ini pada proses kedua:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 adalah dua menit. Anda dapat mencoba menjalankannya terlebih dahulu selama 1000 baris setiap 25 milidetik:

start 1000

Dalam kasus saya, hasilnya sekitar beberapa milidetik per pembaruan. Jadi saya akan segera menambah jumlah baris menjadi 10.000:

start 10000

Hasilnya:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Sekali lagi, tidak ada yang istimewa, tapi ini 24 juta baris per menit, 400 ribu per detik. Selama lebih dari 25 milidetik, pembaruan hanya melambat 5 kali, tampaknya ketika menit berganti. Ayo tingkatkan menjadi 100.000:

start 100000

Hasilnya:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Seperti yang Anda lihat, layanan ini hampir tidak dapat mengatasinya, namun tetap berhasil bertahan. Volume data seperti itu (240 juta baris per menit) sangat besar; dalam kasus seperti itu, biasanya meluncurkan beberapa klon (atau bahkan lusinan klon) layanan, yang masing-masing hanya memproses sebagian karakter. Namun, hasilnya mengesankan untuk bahasa interpretasi yang berfokus terutama pada penyimpanan data.

Pertanyaan yang mungkin muncul adalah mengapa waktu bertambah secara non-linear seiring dengan besarnya setiap pembaruan. Pasalnya, fungsi shrink sebenarnya merupakan fungsi C yang jauh lebih efisien dibandingkan updateAgg. Dimulai dari ukuran pembaruan tertentu (sekitar 10.000), updateAgg mencapai batas maksimumnya dan kemudian waktu eksekusinya tidak bergantung pada ukuran pembaruan. Hal ini disebabkan oleh langkah awal Q sehingga layanan mampu mencerna data dalam jumlah besar. Hal ini menyoroti betapa pentingnya memilih algoritma yang tepat ketika bekerja dengan data besar. Poin lainnya adalah penyimpanan data yang benar dalam memori. Jika data tidak disimpan secara kolom atau tidak diurutkan berdasarkan waktu, maka kita akan terbiasa dengan hal seperti cache TLB yang hilang - tidak adanya alamat halaman memori di cache alamat prosesor. Pencarian alamat memakan waktu sekitar 30 kali lebih lama jika tidak berhasil, dan jika data tersebar dapat memperlambat layanan beberapa kali.

Kesimpulan

Pada artikel ini, saya menunjukkan bahwa database KDB+ dan Q cocok tidak hanya untuk menyimpan data berukuran besar dan mudah diakses melalui pilihan, tetapi juga untuk membuat layanan pemrosesan data yang mampu mencerna ratusan juta baris/gigabyte data bahkan dalam jumlah besar. satu proses Q tunggal. Bahasa Q sendiri memungkinkan implementasi algoritme yang terkait dengan pemrosesan data dengan sangat ringkas dan efisien karena sifat vektornya, penerjemah dialek SQL bawaan, dan serangkaian fungsi perpustakaan yang sangat sukses.

Saya perhatikan bahwa hal di atas hanyalah sebagian dari apa yang dapat dilakukan Q, ia juga memiliki fitur unik lainnya. Misalnya, protokol IPC yang sangat sederhana yang menghapus batas antara masing-masing proses Q dan memungkinkan Anda menggabungkan ratusan proses ini ke dalam satu jaringan, yang dapat ditempatkan di lusinan server di berbagai belahan dunia.

Sumber: www.habr.com

Tambah komentar