Anda bisa membaca tentang apa itu basis KDB+, bahasa pemrograman Q, apa kelebihan dan kekurangannya pada saya sebelumnya
pengenalan
KDB+ adalah database kolom yang berfokus pada data dalam jumlah sangat besar, diurutkan dengan cara tertentu (terutama berdasarkan waktu). Ini digunakan terutama di lembaga keuangan - bank, dana investasi, perusahaan asuransi. Bahasa Q adalah bahasa internal KDB+ yang memungkinkan Anda bekerja secara efektif dengan data ini. Ideologi Q adalah keringkasan dan efisiensi, sedangkan kejelasan dikorbankan. Hal ini dibenarkan oleh fakta bahwa bahasa vektor akan sulit untuk dipahami, dan singkatnya serta kekayaan rekaman memungkinkan Anda melihat lebih banyak bagian program pada satu layar, yang pada akhirnya membuatnya lebih mudah untuk dipahami.
Pada artikel ini kami menerapkan program lengkap di Q dan Anda mungkin ingin mencobanya. Untuk melakukan ini, Anda memerlukan Q yang sebenarnya. Anda dapat mengunduh versi 32-bit gratis di situs web perusahaan kx β
Pernyataan masalah
Ada sumber yang mengirimkan tabel dengan data setiap 25 milidetik. Karena KDB+ terutama digunakan di bidang keuangan, kita asumsikan bahwa ini adalah tabel transaksi (perdagangan), yang memiliki kolom berikut: time (waktu dalam milidetik), sym (sebutan perusahaan di bursa saham - IBM, AAPL,β¦), price (harga pembelian saham), size (ukuran transaksi). Interval 25 milidetik itu sewenang-wenang, tidak terlalu kecil dan tidak terlalu panjang. Kehadirannya berarti data yang masuk ke layanan sudah di-buffer. Akan mudah untuk menerapkan buffering di sisi layanan, termasuk buffering dinamis tergantung pada beban saat ini, namun untuk kesederhanaan, kami akan fokus pada interval tetap.
Layanan harus menghitung setiap menit untuk setiap simbol yang masuk dari kolom sym serangkaian fungsi agregasi - harga maksimal, harga rata-rata, ukuran jumlah, dll. informasi berguna. Untuk mempermudah, kita asumsikan bahwa semua fungsi dapat dihitung secara bertahap, yaitu. untuk mendapatkan nilai baru, cukup mengetahui dua angka - nilai lama dan nilai masuk. Misalnya, fungsi maks, rata-rata, jumlah mempunyai sifat ini, tetapi fungsi median tidak.
Kami juga akan berasumsi bahwa aliran data yang masuk diurutkan berdasarkan waktu. Ini akan memberi kita kesempatan untuk bekerja hanya pada menit-menit terakhir. Dalam praktiknya, cukup bekerja dengan menit saat ini dan sebelumnya jika ada pembaruan yang terlambat. Untuk mempermudah, kami tidak akan mempertimbangkan kasus ini.
Fungsi agregasi
Fungsi agregasi yang diperlukan tercantum di bawah ini. Saya mengambil sebanyak mungkin untuk menambah beban layanan:
- tinggi β harga maksimal β harga maksimal per menit.
- rendah β harga minimum β harga minimum per menit.
- Harga pertama β harga pertama β harga pertama per menit.
- lastPrice β harga terakhir β harga terakhir per menit.
- firstSize β ukuran pertama β ukuran perdagangan pertama per menit.
- lastSize β ukuran terakhir β ukuran perdagangan terakhir dalam satu menit.
- numTrades β hitung i β jumlah perdagangan per menit.
- volume β jumlah ukuran β jumlah ukuran perdagangan per menit.
- pvolume β jumlah harga β jumlah harga per menit, diperlukan untuk harga rata-rata.
- β jumlah harga turnover*ukuran β total volume transaksi per menit.
- avgPrice β pvolume%numTrades β harga rata-rata per menit.
- avgSize β volume%numTrades β rata-rata ukuran perdagangan per menit.
- vwap β turnover%volume β harga rata-rata per menit berdasarkan ukuran transaksi.
- cumVolume β jumlah volume β akumulasi ukuran transaksi sepanjang waktu.
Mari kita segera membahas satu hal yang tidak jelas - cara menginisialisasi kolom ini untuk pertama kalinya dan untuk setiap menit berikutnya. Beberapa kolom tipe firstPrice harus diinisialisasi ke null setiap kali; nilainya tidak ditentukan. Tipe volume lainnya harus selalu disetel ke 0. Ada juga kolom yang memerlukan pendekatan gabungan - misalnya, cumVolume harus disalin dari menit sebelumnya, dan untuk menit pertama disetel ke 0. Mari atur semua parameter ini menggunakan data kamus ketik (analog dengan catatan):
// list ! list β ΡΠΎΠ·Π΄Π°ΡΡ ΡΠ»ΠΎΠ²Π°ΡΡ, 0n β float null, 0N β long null, `sym β ΡΠΈΠΏ ΡΠΈΠΌΠ²ΠΎΠ», `sym1`sym2 β ΡΠΏΠΈΡΠΎΠΊ ΡΠΈΠΌΠ²ΠΎΠ»ΠΎΠ²
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // ΡΠΏΠΈΡΠΎΠΊ Π²ΡΠ΅Ρ
Π²ΡΡΠΈΡΠ»ΡΠ΅ΠΌΡΡ
ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, reverse ΠΎΠ±ΡΡΡΠ½Π΅Π½ Π½ΠΈΠΆΠ΅
Saya menambahkan sim dan waktu ke kamus untuk kenyamanan, sekarang initWith adalah baris siap pakai dari tabel agregat terakhir, yang tersisa untuk menyetel sim dan waktu yang benar. Anda dapat menggunakannya untuk menambahkan baris baru ke tabel.
Kita memerlukan aggCols saat membuat fungsi agregasi. Daftar harus dibalik karena urutan evaluasi ekspresi dalam Q (dari kanan ke kiri). Tujuannya adalah untuk memastikan penghitungan beralih dari tinggi ke cumVolume, karena beberapa kolom bergantung pada kolom sebelumnya.
Kolom yang perlu disalin ke menit baru dari menit sebelumnya, kolom simbol ditambahkan untuk kenyamanan:
rollColumns:`sym`cumVolume;
Sekarang mari kita bagi kolom menjadi beberapa kelompok sesuai dengan cara memperbaruinya. Tiga jenis dapat dibedakan:
- Akumulator (volume, omset,..) β kita harus menambahkan nilai yang masuk ke nilai sebelumnya.
- Dengan titik khusus (tinggi, rendah, ..) β nilai pertama dalam satu menit diambil dari data yang masuk, sisanya dihitung menggunakan fungsi.
- Istirahat. Selalu dihitung menggunakan suatu fungsi.
Mari kita definisikan variabel untuk kelas-kelas ini:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Urutan perhitungan
Kami akan memperbarui tabel gabungan dalam dua tahap. Untuk efisiensi, pertama-tama kita mengecilkan tabel masuk sehingga hanya ada satu baris untuk setiap karakter dan menit. Fakta bahwa semua fungsi kami bersifat inkremental dan asosiatif menjamin bahwa hasil dari langkah tambahan ini tidak akan berubah. Anda dapat mengecilkan tabel menggunakan pilih:
select high:max price, low:min price β¦ by sym,time.minute from table
Metode ini memiliki kelemahan - kumpulan kolom terhitung sudah ditentukan sebelumnya. Untungnya, di Q, pilih juga diimplementasikan sebagai fungsi di mana Anda dapat mengganti argumen yang dibuat secara dinamis:
?[table;whereClause;byClause;selectClause]
Saya tidak akan menjelaskan secara rinci format argumen; dalam kasus kami, hanya ekspresi by dan select yang bersifat nontrivial dan harus berupa kamus kolom formulir!ekspresi. Dengan demikian, fungsi penyusutan dapat didefinisikan sebagai berikut:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each ΡΡΠΎ ΡΡΠ½ΠΊΡΠΈΡ map Π² Q Π΄Π»Ρ ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠΏΠΈΡΠΊΠ°
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Untuk kejelasan, saya menggunakan fungsi parse, yang mengubah string dengan ekspresi Q menjadi nilai yang dapat diteruskan ke fungsi eval dan diperlukan dalam fungsi pemilihan. Perhatikan juga bahwa praproses didefinisikan sebagai proyeksi (yaitu, fungsi dengan argumen yang ditentukan sebagian) dari fungsi pilih, satu argumen (tabel) hilang. Jika kita menerapkan preprocess ke sebuah tabel, kita akan mendapatkan tabel terkompresi.
Tahap kedua adalah memperbarui tabel agregat. Mari kita tulis dulu algoritmanya dalam pseudocode:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
β¦
Di Q, biasanya menggunakan fungsi map/reduce daripada loop. Tapi karena Q adalah bahasa vektor dan kita dapat dengan mudah menerapkan semua operasi ke semua simbol sekaligus, maka pada pendekatan pertama kita dapat melakukannya tanpa loop sama sekali, melakukan operasi pada semua simbol sekaligus:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
β¦
Tapi kita bisa melangkah lebih jauh, Q memiliki operator yang unik dan sangat kuat - operator penugasan yang digeneralisasi. Ini memungkinkan Anda mengubah sekumpulan nilai dalam struktur data yang kompleks menggunakan daftar indeks, fungsi, dan argumen. Dalam kasus kami, tampilannya seperti ini:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;β¦]: function[target[idx 0;idx 1;β¦];argument], Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΡΡΠ½ΠΊΡΠΈΡ β ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Sayangnya, untuk menetapkan ke tabel Anda memerlukan daftar baris, bukan kolom, dan Anda harus mengubah urutan matriks (daftar kolom ke daftar baris) menggunakan fungsi flip. Ini mahal untuk tabel besar, jadi kami menerapkan penugasan umum ke setiap kolom secara terpisah, menggunakan fungsi peta (yang terlihat seperti tanda kutip):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;β¦)];
Kami kembali menggunakan proyeksi fungsi. Perhatikan juga bahwa di Q, membuat daftar juga merupakan sebuah fungsi dan kita dapat memanggilnya menggunakan fungsi every(map) untuk mendapatkan daftar daftar.
Untuk memastikan bahwa kumpulan kolom terhitung tidak tetap, kami akan membuat ekspresi di atas secara dinamis. Pertama-tama mari kita definisikan fungsi untuk menghitung setiap kolom, menggunakan variabel baris dan input untuk merujuk pada data gabungan dan input:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Beberapa kolom bersifat khusus; nilai pertamanya tidak boleh dihitung oleh fungsi. Kita dapat menentukan bahwa ini adalah yang pertama dengan kolom baris[`numTrades] - jika berisi 0, maka nilainya adalah yang pertama. Q memiliki fungsi pilih - ?[Boolean list;list1;list2] - yang memilih nilai dari daftar 1 atau 2 bergantung pada kondisi di argumen pertama:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - ΡΠΎΠΆΠ΅ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΠΎΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π΄Π»Ρ ΡΠ»ΡΡΠ°Ρ ΠΊΠΎΠ³Π΄Π° ΠΈΠ½Π΄Π΅ΠΊΡ Π½Π΅Π³Π»ΡΠ±ΠΎΠΊΠΈΠΉ
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Di sini saya memanggil tugas umum dengan fungsi saya (ekspresi dalam kurung kurawal). Ia menerima nilai saat ini (argumen pertama) dan argumen tambahan, yang saya sampaikan pada parameter ke-4.
Mari kita tambahkan speaker baterai secara terpisah, karena fungsinya sama:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Ini adalah tugas normal menurut standar Q, tetapi saya menetapkan daftar nilai sekaligus. Terakhir, mari buat fungsi utama:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" ΠΏΡΠΈΡΠ²ΠΎΠΈΠΌ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΠΎΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΠΎΠΉ, ΠΏΠΎΡΠΎΠΌΡ ΡΡΠΎ Π½Π΅ΠΊΠΎΡΠΎΡΡΠ΅ ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ Π·Π°Π²ΠΈΡΡΡ ΠΎΡ ΡΠΆΠ΅ Π²ΡΡΠΈΡΠ»Π΅Π½Π½ΡΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" Π·Π°Π²Π΅ΡΡΠΈΠΌ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΡ. ,β ΡΠ°ΡΡΠΈΡΡΠΎΠ²ΡΠ²Π°Π΅ΡΡΡ ΠΊΠ°ΠΊ map[concat]
// ";" sv exprs β String from Vector (sv), ΡΠΎΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΡΡΡΠΎΠΊ Π²ΡΡΠ°Π²Π»ΡΡ β;β ΠΏΠΎΡΡΠ΅Π΄ΠΈΠ½Π΅
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Dengan ekspresi ini, saya secara dinamis membuat fungsi dari string yang berisi ekspresi yang saya berikan di atas. Hasilnya akan terlihat seperti ini:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;β¦ ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
Urutan evaluasi kolom dibalik karena pada Q urutan evaluasinya dari kanan ke kiri.
Sekarang kita memiliki dua fungsi utama yang diperlukan untuk perhitungan, kita hanya perlu menambahkan sedikit infrastruktur dan layanan siap.
Langkah terakhir
Kami memiliki fungsi praproses dan updateAgg yang melakukan semua pekerjaan. Namun tetap perlu memastikan transisi yang benar melalui menit dan menghitung indeks untuk agregasi. Pertama-tama, mari kita definisikan fungsi init:
init:{
tradeAgg:: 0#enlist[initWith]; // ΡΠΎΠ·Π΄Π°Π΅ΠΌ ΠΏΡΡΡΡΡ ΡΠΈΠΏΠΈΠ·ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ, enlist ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠ»ΠΎΠ²Π°ΡΡ Π² ΡΠ°Π±Π»ΠΈΡΡ, Π° 0# ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ Π²Π·ΡΡΡ 0 ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΠΈΠ· Π½Π΅Π΅
currTime::00:00; // Π½Π°ΡΠ½Π΅ΠΌ Ρ 0, :: ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ Π² Π³Π»ΠΎΠ±Π°Π»ΡΠ½ΡΡ ΠΏΠ΅ΡΠ΅ΠΌΠ΅Π½Π½ΡΡ
currSyms::`u#`symbol$(); // `u# - ΠΏΡΠ΅Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ Π² Π΄Π΅ΡΠ΅Π²ΠΎ, Π΄Π»Ρ ΡΡΠΊΠΎΡΠ΅Π½ΠΈΡ ΠΏΠΎΠΈΡΠΊΠ° ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ²
offset::0; // ΠΈΠ½Π΄Π΅ΠΊΡ Π² tradeAgg, Π³Π΄Π΅ Π½Π°ΡΠΈΠ½Π°Π΅ΡΡΡ ΡΠ΅ΠΊΡΡΠ°Ρ ΠΌΠΈΠ½ΡΡΠ°
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // ΠΊΡΡ Π΄Π»Ρ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΡ
Π·Π½Π°ΡΠ΅Π½ΠΈΠΉ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, ΡΠ°Π±Π»ΠΈΡΠ° Ρ ΠΊΠ»ΡΡΠΎΠΌ sym
}
Kami juga akan mendefinisikan fungsi roll, yang akan mengubah menit saat ini:
roll:{[tm]
if[currTime>tm; :init[]]; // Π΅ΡΠ»ΠΈ ΠΏΠ΅ΡΠ΅Π²Π°Π»ΠΈΠ»ΠΈ Π·Π° ΠΏΠΎΠ»Π½ΠΎΡΡ, ΡΠΎ ΠΏΡΠΎΡΡΠΎ Π²ΡΠ·ΠΎΠ²Π΅ΠΌ init
rollCache,::offset _ rollColumns#tradeAgg; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ ΠΊΡΡ β Π²Π·ΡΡΡ roll ΠΊΠΎΠ»ΠΎΠ½ΠΊΠΈ ΠΈΠ· aggTable, ΠΎΠ±ΡΠ΅Π·Π°ΡΡ, Π²ΡΡΠ°Π²ΠΈΡΡ Π² rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Kita memerlukan fungsi untuk menambahkan karakter baru:
addSyms:{[syms]
currSyms,::syms; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠΏΠΈΡΠΎΠΊ ΠΈΠ·Π²Π΅ΡΡΠ½ΡΡ
// Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π² ΡΠ°Π±Π»ΠΈΡΡ sym, time ΠΈ rollColumns Π²ΠΎΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π²ΡΠΈΡΡ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΡΠΌ ΠΏΡΠΈΡΠ²Π°ΠΈΠ²Π°Π½ΠΈΠ΅ΠΌ.
// Π€ΡΠ½ΠΊΡΠΈΡ ^ ΠΏΠΎΠ΄ΡΡΠ°Π²Π»ΡΠ΅Ρ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ Π΄Π»Ρ roll ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ, Π΅ΡΠ»ΠΈ ΡΠΈΠΌΠ²ΠΎΠ»Π° Π½Π΅Ρ Π² ΠΊΡΡΠ΅. value flip table Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅Ρ ΡΠΏΠΈΡΠΎΠΊ ΠΊΠΎΠ»ΠΎΠ½ΠΎΠΊ Π² ΡΠ°Π±Π»ΠΈΡΠ΅.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
Dan terakhir, fungsi upd (nama tradisional fungsi ini untuk layanan Q), yang dipanggil oleh klien untuk menambahkan data:
upd:{[tblName;data] // tblName Π½Π°ΠΌ Π½Π΅ Π½ΡΠΆΠ½ΠΎ, Π½ΠΎ ΠΎΠ±ΡΡΠ½ΠΎ ΡΠ΅ΡΠ²ΠΈΡ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π΅Ρ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ°Π±Π»ΠΈΡ
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // Π΄ΠΎΠ±Π°Π²ΠΈΠΌ Π΄Π°Π½Π½ΡΠ΅ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠΉ ΠΌΠΈΠ½ΡΡΡ
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // ΠΏΠΎΠΌΠ΅Π½ΡΠ΅ΠΌ ΠΌΠΈΠ½ΡΡΡ, Π΅ΡΠ»ΠΈ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ
data:select from data where time=tm; // ΡΠΈΠ»ΡΡΡΠ°ΡΠΈΡ
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // Π½ΠΎΠ²ΡΠ΅ ΡΠΈΠΌΠ²ΠΎΠ»Ρ
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // ΠΎΠ±Π½ΠΎΠ²ΠΈΠΌ Π°Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ ΡΠ°Π±Π»ΠΈΡΡ. Π€ΡΠ½ΠΊΡΠΈΡ ? ΠΈΡΠ΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡ ΡΠ»Π΅ΠΌΠ΅Π½ΡΠΎΠ² ΡΠΏΠΈΡΠΊΠ° ΡΠΏΡΠ°Π²Π° Π² ΡΠΏΠΈΡΠΊΠ΅ ΡΠ»Π΅Π²Π°.
};
Itu saja. Berikut kode lengkap layanan kami, seperti yang dijanjikan, hanya beberapa baris:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Pengujian
Mari kita periksa kinerja layanan. Untuk melakukan ini, jalankan dalam proses terpisah (letakkan kode di file service.q) dan panggil fungsi init:
q service.q βp 5566
q)init[]
Di konsol lain, mulai proses Q kedua dan sambungkan ke yang pertama:
h:hopen `:host:5566
h:hopen 5566 // Π΅ΡΠ»ΠΈ ΠΎΠ±Π° Π½Π° ΠΎΠ΄Π½ΠΎΠΌ Ρ
ΠΎΡΡΠ΅
Pertama, mari buat daftar simbol - 10000 buah dan tambahkan fungsi untuk membuat tabel acak. Di konsol kedua:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Saya menambahkan tiga simbol nyata ke dalam daftar untuk memudahkan mencarinya di tabel. Fungsi rnd membuat tabel acak dengan n baris, dengan waktu bervariasi dari t hingga t+25 milidetik.
Sekarang Anda dapat mencoba mengirim data ke layanan (tambahkan sepuluh jam pertama):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Anda dapat memeriksa di layanan bahwa tabel telah diperbarui:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Hasilnya:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
Sekarang mari kita lakukan pengujian beban untuk mengetahui berapa banyak data yang dapat diproses oleh layanan per menit. Izinkan saya mengingatkan Anda bahwa kami menyetel interval pembaruan menjadi 25 milidetik. Oleh karena itu, layanan (rata-rata) harus memenuhi setidaknya 20 milidetik per pembaruan agar pengguna memiliki waktu untuk meminta data. Masukkan yang berikut ini pada proses kedua:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 adalah dua menit. Anda dapat mencoba menjalankannya terlebih dahulu selama 1000 baris setiap 25 milidetik:
start 1000
Dalam kasus saya, hasilnya sekitar beberapa milidetik per pembaruan. Jadi saya akan segera menambah jumlah baris menjadi 10.000:
start 10000
Hasilnya:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Sekali lagi, tidak ada yang istimewa, tapi ini 24 juta baris per menit, 400 ribu per detik. Selama lebih dari 25 milidetik, pembaruan hanya melambat 5 kali, tampaknya ketika menit berganti. Ayo tingkatkan menjadi 100.000:
start 100000
Hasilnya:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Seperti yang Anda lihat, layanan ini hampir tidak dapat mengatasinya, namun tetap berhasil bertahan. Volume data seperti itu (240 juta baris per menit) sangat besar; dalam kasus seperti itu, biasanya meluncurkan beberapa klon (atau bahkan lusinan klon) layanan, yang masing-masing hanya memproses sebagian karakter. Namun, hasilnya mengesankan untuk bahasa interpretasi yang berfokus terutama pada penyimpanan data.
Pertanyaan yang mungkin muncul adalah mengapa waktu bertambah secara non-linear seiring dengan besarnya setiap pembaruan. Pasalnya, fungsi shrink sebenarnya merupakan fungsi C yang jauh lebih efisien dibandingkan updateAgg. Dimulai dari ukuran pembaruan tertentu (sekitar 10.000), updateAgg mencapai batas maksimumnya dan kemudian waktu eksekusinya tidak bergantung pada ukuran pembaruan. Hal ini disebabkan oleh langkah awal Q sehingga layanan mampu mencerna data dalam jumlah besar. Hal ini menyoroti betapa pentingnya memilih algoritma yang tepat ketika bekerja dengan data besar. Poin lainnya adalah penyimpanan data yang benar dalam memori. Jika data tidak disimpan secara kolom atau tidak diurutkan berdasarkan waktu, maka kita akan terbiasa dengan hal seperti cache TLB yang hilang - tidak adanya alamat halaman memori di cache alamat prosesor. Pencarian alamat memakan waktu sekitar 30 kali lebih lama jika tidak berhasil, dan jika data tersebar dapat memperlambat layanan beberapa kali.
Kesimpulan
Pada artikel ini, saya menunjukkan bahwa database KDB+ dan Q cocok tidak hanya untuk menyimpan data berukuran besar dan mudah diakses melalui pilihan, tetapi juga untuk membuat layanan pemrosesan data yang mampu mencerna ratusan juta baris/gigabyte data bahkan dalam jumlah besar. satu proses Q tunggal. Bahasa Q sendiri memungkinkan implementasi algoritme yang terkait dengan pemrosesan data dengan sangat ringkas dan efisien karena sifat vektornya, penerjemah dialek SQL bawaan, dan serangkaian fungsi perpustakaan yang sangat sukses.
Saya perhatikan bahwa hal di atas hanyalah sebagian dari apa yang dapat dilakukan Q, ia juga memiliki fitur unik lainnya. Misalnya, protokol IPC yang sangat sederhana yang menghapus batas antara masing-masing proses Q dan memungkinkan Anda menggabungkan ratusan proses ini ke dalam satu jaringan, yang dapat ditempatkan di lusinan server di berbagai belahan dunia.
Sumber: www.habr.com