คุณสามารถอ่านเกี่ยวกับว่าฐาน KDB+ คืออะไร ภาษาการเขียนโปรแกรม Q คืออะไร จุดแข็งและจุดอ่อนของพวกเขาในครั้งก่อนของฉัน
การแนะนำ
KDB+ เป็นฐานข้อมูลแบบเรียงเป็นแนวที่เน้นไปที่ข้อมูลจำนวนมาก โดยเรียงลำดับในลักษณะเฉพาะ (โดยหลักตามเวลา) ส่วนใหญ่จะใช้ในสถาบันการเงิน - ธนาคาร กองทุนรวมที่ลงทุน บริษัทประกันภัย ภาษา Q คือภาษาภายในของ KDB+ ที่ช่วยให้คุณทำงานกับข้อมูลนี้ได้อย่างมีประสิทธิภาพ อุดมการณ์ Q คือความกะทัดรัดและมีประสิทธิภาพ ในขณะที่ความชัดเจนถูกละทิ้งไป นี่เป็นข้อพิสูจน์ด้วยความจริงที่ว่าภาษาเวกเตอร์จะเข้าใจยากไม่ว่าในกรณีใด และความกะทัดรัดและความสมบูรณ์ของการบันทึกช่วยให้คุณเห็นส่วนที่ใหญ่กว่ามากของโปรแกรมบนหน้าจอเดียว ซึ่งท้ายที่สุดแล้วจะทำให้เข้าใจได้ง่ายขึ้น
ในบทความนี้ เราได้ใช้โปรแกรมเต็มรูปแบบใน Q และคุณอาจต้องการลองใช้ดู ในการดำเนินการนี้ คุณจะต้องมี Q จริง คุณสามารถดาวน์โหลดเวอร์ชัน 32 บิตฟรีได้ที่เว็บไซต์บริษัท kx -
คำแถลงปัญหา
มีแหล่งที่มาที่ส่งตารางพร้อมข้อมูลทุกๆ 25 มิลลิวินาที เนื่องจาก KDB+ ใช้ในด้านการเงินเป็นหลัก เราจะถือว่านี่คือตารางธุรกรรม (การซื้อขาย) ซึ่งมีคอลัมน์ต่อไปนี้: เวลา (เวลาเป็นมิลลิวินาที), Sym (การกำหนดบริษัทในตลาดหลักทรัพย์ - ไอบีเอ็ม, AAPL,…), ราคา (ราคาที่ซื้อหุ้น), ขนาด (ขนาดของธุรกรรม) ช่วงเวลา 25 มิลลิวินาทีนั้นเป็นไปตามอำเภอใจ ไม่น้อยเกินไปและไม่ยาวเกินไป การมีอยู่หมายความว่าข้อมูลมาถึงบริการที่ถูกบัฟเฟอร์แล้ว คงจะเป็นเรื่องง่ายที่จะใช้การบัฟเฟอร์ในฝั่งบริการ รวมถึงการบัฟเฟอร์แบบไดนามิกขึ้นอยู่กับโหลดปัจจุบัน แต่เพื่อความง่าย เราจะมุ่งเน้นไปที่ช่วงเวลาที่คงที่
บริการจะต้องนับทุกนาทีสำหรับแต่ละสัญลักษณ์ที่เข้ามาจากคอลัมน์ sym ชุดของฟังก์ชันการรวม - ราคาสูงสุด ราคาเฉลี่ย ขนาดผลรวม ฯลฯ ข้อมูลที่เป็นประโยชน์. เพื่อความง่าย เราจะถือว่าฟังก์ชันทั้งหมดสามารถคำนวณแบบเพิ่มทีละน้อยได้ เช่น เพื่อให้ได้ค่าใหม่ก็เพียงพอที่จะทราบตัวเลขสองตัว - ค่าเก่าและค่าขาเข้า ตัวอย่างเช่น ฟังก์ชันสูงสุด ค่าเฉลี่ย ผลรวมมีคุณสมบัตินี้ แต่ฟังก์ชันมัธยฐานไม่มี
นอกจากนี้เรายังจะถือว่าสตรีมข้อมูลที่เข้ามานั้นเป็นไปตามลำดับเวลา นี่จะทำให้เรามีโอกาสที่จะทำงานเฉพาะในนาทีสุดท้ายเท่านั้น ในทางปฏิบัติ การทำงานกับนาทีปัจจุบันและนาทีก่อนหน้าก็เพียงพอแล้ว ในกรณีที่การอัปเดตบางอย่างล่าช้า เพื่อความง่าย เราจะไม่พิจารณากรณีนี้
ฟังก์ชันการรวมกลุ่ม
ฟังก์ชันการรวมที่จำเป็นแสดงอยู่ด้านล่างนี้ ฉันใช้มันให้มากที่สุดเท่าที่จะเป็นไปได้เพื่อเพิ่มภาระในบริการ:
- สูง – ราคาสูงสุด – ราคาสูงสุดต่อนาที
- ราคาต่ำ – ราคาต่ำสุด – ราคาขั้นต่ำต่อนาที
- ราคาแรก – ราคาแรก – ราคาแรกต่อนาที
- LastPrice – ราคาสุดท้าย – ราคาสุดท้ายต่อนาที
- firstSize – ขนาดแรก – ขนาดการซื้อขายครั้งแรกต่อนาที
- LastSize – ขนาดสุดท้าย – ขนาดการซื้อขายสุดท้ายในหนึ่งนาที
- numTrades – นับ i – จำนวนการซื้อขายต่อนาที
- ปริมาณ – ขนาดผลรวม – ผลรวมของขนาดการซื้อขายต่อนาที
- pvolume – ราคารวม – ผลรวมของราคาต่อนาที ที่จำเป็นสำหรับ avgPrice
- – ราคาหมุนเวียนรวม*ขนาด – ปริมาณธุรกรรมทั้งหมดต่อนาที
- ราคาเฉลี่ย – pvolume%numTrades – ราคาเฉลี่ยต่อนาที
- avgSize – ปริมาณ%numTrades – ขนาดการซื้อขายเฉลี่ยต่อนาที
- vwap – มูลค่าการซื้อขาย% ปริมาณ – ราคาเฉลี่ยต่อนาทีถ่วงน้ำหนักด้วยขนาดธุรกรรม
- cumVolume – ปริมาณรวม – ขนาดธุรกรรมสะสมตลอดเวลา
เรามาพูดถึงประเด็นที่ไม่ชัดเจนจุดหนึ่งทันที - วิธีเริ่มต้นคอลัมน์เหล่านี้ในครั้งแรกและในแต่ละนาทีต่อ ๆ ไป บางคอลัมน์ของประเภท firstPrice จะต้องเริ่มต้นเป็น null ในแต่ละครั้ง โดยไม่ได้กำหนดค่าคอลัมน์เหล่านั้น ประเภทวอลุ่มอื่นๆ จะต้องตั้งค่าเป็น 0 เสมอ นอกจากนี้ยังมีคอลัมน์ที่ต้องใช้วิธีรวม เช่น ต้องคัดลอก cumVolume จากนาทีก่อนหน้า และสำหรับอันแรกตั้งค่าเป็น 0 มาตั้งค่าพารามิเตอร์เหล่านี้ทั้งหมดโดยใช้ข้อมูลพจนานุกรม ประเภท (คล้ายกับบันทึก):
// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
ฉันเพิ่ม Sym และเวลาลงในพจนานุกรมเพื่อความสะดวก ตอนนี้ initWith เป็นบรรทัดสำเร็จรูปจากตารางรวมสุดท้าย ซึ่งยังคงต้องตั้งค่า Sym และเวลาที่ถูกต้อง คุณสามารถใช้มันเพื่อเพิ่มแถวใหม่ลงในตารางได้
เราจะต้องมี aggCols เมื่อสร้างฟังก์ชันการรวมกลุ่ม รายการจะต้องกลับด้านตามลำดับการประเมินนิพจน์ใน Q (จากขวาไปซ้าย) เป้าหมายคือเพื่อให้แน่ใจว่าการคำนวณเปลี่ยนจากสูงไปจนถึง cumVolume เนื่องจากบางคอลัมน์ขึ้นอยู่กับคอลัมน์ก่อนหน้า
คอลัมน์ที่ต้องคัดลอกไปยังนาทีใหม่จากก่อนหน้า คอลัมน์ Sym จะถูกเพิ่มเพื่อความสะดวก:
rollColumns:`sym`cumVolume;
ตอนนี้เรามาแบ่งคอลัมน์ออกเป็นกลุ่มตามวิธีที่ควรอัปเดต สามารถแยกแยะได้สามประเภท:
- ตัวสะสม (ปริมาณ, มูลค่าการซื้อขาย,..) – เราต้องเพิ่มมูลค่าที่เข้ามาให้กับค่าก่อนหน้า
- ด้วยจุดพิเศษ (สูง, ต่ำ, ..) – ค่าแรกในนาทีนำมาจากข้อมูลที่เข้ามา ส่วนที่เหลือจะถูกคำนวณโดยใช้ฟังก์ชัน
- พักผ่อน. คำนวณโดยใช้ฟังก์ชันเสมอ
มากำหนดตัวแปรสำหรับคลาสเหล่านี้กันดีกว่า:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
ลำดับการคำนวณ
เราจะอัปเดตตารางรวมเป็นสองขั้นตอน เพื่อประสิทธิภาพ ก่อนอื่นเราจะย่อตารางขาเข้าเพื่อให้มีเพียงแถวเดียวสำหรับแต่ละอักขระและนาที ความจริงที่ว่าฟังก์ชันทั้งหมดของเรามีแบบเพิ่มหน่วยและเชื่อมโยงรับประกันว่าผลลัพธ์ของขั้นตอนเพิ่มเติมนี้จะไม่เปลี่ยนแปลง คุณสามารถย่อตารางโดยใช้ตัวเลือก:
select high:max price, low:min price … by sym,time.minute from table
วิธีนี้มีข้อเสีย - ชุดของคอลัมน์จากการคำนวณถูกกำหนดไว้ล่วงหน้าแล้ว โชคดีที่ใน Q นั้น select ยังถูกนำมาใช้เป็นฟังก์ชันที่คุณสามารถแทนที่อาร์กิวเมนต์ที่สร้างขึ้นแบบไดนามิก:
?[table;whereClause;byClause;selectClause]
ฉันจะไม่อธิบายรายละเอียดเกี่ยวกับรูปแบบของข้อโต้แย้ง ในกรณีของเรา เฉพาะนิพจน์โดยและที่เลือกเท่านั้นที่จะไม่สำคัญ และควรเป็นพจนานุกรมของคอลัมน์แบบฟอร์ม!นิพจน์ ดังนั้นฟังก์ชันการย่อขนาดสามารถกำหนดได้ดังนี้:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
เพื่อความชัดเจน ฉันใช้ฟังก์ชัน parse ซึ่งเปลี่ยนสตริงที่มีนิพจน์ Q เป็นค่าที่สามารถส่งผ่านไปยังฟังก์ชัน eval และจำเป็นในการเลือกฟังก์ชัน โปรดทราบว่าการประมวลผลล่วงหน้าถูกกำหนดให้เป็นเส้นโครง (เช่น ฟังก์ชันที่มีอาร์กิวเมนต์ที่กำหนดบางส่วน) ของฟังก์ชันเลือก โดยอาร์กิวเมนต์หนึ่งตัว (ตาราง) หายไป หากเราใช้การประมวลผลล่วงหน้ากับตาราง เราจะได้ตารางที่บีบอัด
ขั้นตอนที่สองกำลังอัปเดตตารางรวม ขั้นแรกให้เขียนอัลกอริทึมเป็นรหัสเทียม:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
…
ใน Q เป็นเรื่องปกติที่จะใช้ฟังก์ชัน map/reduce แทนการวนซ้ำ แต่เนื่องจาก Q เป็นภาษาเวกเตอร์ และเราสามารถใช้การดำเนินการทั้งหมดกับสัญลักษณ์ทั้งหมดในคราวเดียวได้อย่างง่ายดาย จากนั้นจึงทำการประมาณครั้งแรกโดยไม่ต้องวนซ้ำเลย โดยดำเนินการกับสัญลักษณ์ทั้งหมดในคราวเดียว:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…
แต่เราสามารถไปไกลกว่านี้ได้ Q มีโอเปอเรเตอร์ที่มีเอกลักษณ์และทรงพลังอย่างยิ่ง - โอเปอเรเตอร์ที่ได้รับมอบหมายทั่วไป ช่วยให้คุณสามารถเปลี่ยนชุดของค่าในโครงสร้างข้อมูลที่ซับซ้อนโดยใช้รายการดัชนี ฟังก์ชัน และอาร์กิวเมนต์ ในกรณีของเราดูเหมือนว่านี้:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
ขออภัย หากต้องการกำหนดให้กับตาราง คุณต้องมีรายการแถว ไม่ใช่คอลัมน์ และคุณต้องย้ายเมทริกซ์ (รายการคอลัมน์เป็นรายการแถว) โดยใช้ฟังก์ชันพลิก ซึ่งมีราคาแพงสำหรับตารางขนาดใหญ่ ดังนั้นเราจึงใช้การกำหนดทั่วไปกับแต่ละคอลัมน์แยกกัน โดยใช้ฟังก์ชันแผนที่ (ซึ่งดูเหมือนเครื่องหมายอะพอสทรอฟี่):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
เราใช้การฉายภาพฟังก์ชันอีกครั้ง โปรดทราบว่าใน Q การสร้างรายการก็เป็นฟังก์ชันเช่นกัน และเราสามารถเรียกมันได้โดยใช้ฟังก์ชัน Each(map) เพื่อรับรายการ
เพื่อให้แน่ใจว่าชุดของคอลัมน์จากการคำนวณไม่ได้รับการแก้ไข เราจะสร้างนิพจน์ข้างต้นแบบไดนามิก ขั้นแรก เรามากำหนดฟังก์ชันเพื่อคำนวณแต่ละคอลัมน์ โดยใช้ตัวแปรแถวและ inp เพื่ออ้างถึงข้อมูลที่รวบรวมและป้อนข้อมูล:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
บางคอลัมน์มีลักษณะพิเศษ ฟังก์ชันไม่ควรคำนวณค่าแรก เราสามารถระบุได้ว่าเป็นคอลัมน์แรกในคอลัมน์แถว[`numTrades] - หากมี 0 แสดงว่าค่านั้นเป็นอันดับแรก Q มีฟังก์ชัน select - ?[Boolean list;list1;list2] - ซึ่งเลือกค่าจากรายการ 1 หรือ 2 ขึ้นอยู่กับเงื่อนไขในอาร์กิวเมนต์แรก:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
ที่นี่ฉันเรียกการมอบหมายทั่วไปด้วยฟังก์ชันของฉัน (นิพจน์ในวงเล็บปีกกา) ได้รับค่าปัจจุบัน (อาร์กิวเมนต์แรก) และอาร์กิวเมนต์เพิ่มเติม ซึ่งฉันส่งผ่านในพารามิเตอร์ที่ 4
มาเพิ่มลำโพงแบตเตอรี่แยกกันเนื่องจากฟังก์ชั่นจะเหมือนกันสำหรับลำโพงเหล่านี้:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
นี่เป็นการกำหนดปกติตามมาตรฐาน Q แต่ฉันกำลังกำหนดรายการค่าในคราวเดียว สุดท้ายนี้ มาสร้างฟังก์ชันหลักกันดีกว่า:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
ด้วยนิพจน์นี้ ฉันจะสร้างฟังก์ชันแบบไดนามิกจากสตริงที่มีนิพจน์ที่ฉันให้ไว้ข้างต้น ผลลัพธ์จะมีลักษณะดังนี้:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
ลำดับการประเมินคอลัมน์กลับด้าน เนื่องจากใน Q ลำดับการประเมินจะเป็นจากขวาไปซ้าย
ตอนนี้เรามีฟังก์ชันหลักสองฟังก์ชันที่จำเป็นสำหรับการคำนวณ เราเพียงแค่ต้องเพิ่มโครงสร้างพื้นฐานเพียงเล็กน้อยและบริการก็พร้อมใช้งาน
ขั้นตอนสุดท้าย
เรามีฟังก์ชัน preprocess และ updateAgg ที่ทำหน้าที่ทั้งหมด แต่ยังจำเป็นต้องตรวจสอบให้แน่ใจว่ามีการเปลี่ยนผ่านนาทีที่ถูกต้องและคำนวณดัชนีสำหรับการรวมกลุ่ม ก่อนอื่น เรามานิยามฟังก์ชัน init กันก่อน:
init:{
tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
offset::0; // индекс в tradeAgg, где начинается текущая минута
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
}
นอกจากนี้เรายังจะกำหนดฟังก์ชันม้วนซึ่งจะเปลี่ยนนาทีปัจจุบัน:
roll:{[tm]
if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
เราจะต้องมีฟังก์ชั่นเพื่อเพิ่มตัวละครใหม่:
addSyms:{[syms]
currSyms,::syms; // добавим в список известных
// добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
// Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
และสุดท้ายคือฟังก์ชัน upd (ชื่อดั้งเดิมของฟังก์ชันนี้สำหรับบริการ Q) ซึ่งไคลเอ็นต์เรียกเพื่อเพิ่มข้อมูล:
upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
data:select from data where time=tm; // фильтрация
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
};
นั่นคือทั้งหมดที่ นี่คือรหัสที่สมบูรณ์ของบริการของเราตามที่สัญญาไว้ เพียงไม่กี่บรรทัด:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
การทดสอบ
เรามาตรวจสอบประสิทธิภาพของบริการกันดีกว่า หากต้องการทำสิ่งนี้ ให้รันมันในกระบวนการแยกต่างหาก (ใส่โค้ดในไฟล์ service.q) แล้วเรียกใช้ฟังก์ชัน init:
q service.q –p 5566
q)init[]
ในคอนโซลอื่น ให้เริ่มกระบวนการ Q ที่สองและเชื่อมต่อกับกระบวนการแรก:
h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте
ขั้นแรก เรามาสร้างรายการสัญลักษณ์ 10000 ชิ้นและเพิ่มฟังก์ชันเพื่อสร้างตารางสุ่ม ในคอนโซลที่สอง:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
ฉันเพิ่มสัญลักษณ์จริงสามตัวลงในรายการเพื่อให้ง่ายต่อการค้นหาในตาราง ฟังก์ชัน rnd จะสร้างตารางสุ่มที่มี n แถว โดยที่เวลาจะแตกต่างกันไปตั้งแต่ t ถึง t+25 มิลลิวินาที
ตอนนี้คุณสามารถลองส่งข้อมูลไปยังบริการได้ (เพิ่มสิบชั่วโมงแรก):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
คุณสามารถตรวจสอบบริการว่าตารางได้รับการอัปเดตแล้ว:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
ผล:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919
ตอนนี้เรามาทำการทดสอบโหลดเพื่อดูว่าบริการสามารถประมวลผลข้อมูลได้มากเพียงใดต่อนาที ฉันขอเตือนคุณว่าเราตั้งค่าช่วงเวลาการอัปเดตเป็น 25 มิลลิวินาที ดังนั้น บริการจะต้อง (โดยเฉลี่ย) มีขนาดพอดีอย่างน้อย 20 มิลลิวินาทีต่อการอัปเดต เพื่อให้ผู้ใช้มีเวลาในการขอข้อมูล ป้อนข้อมูลต่อไปนี้ในกระบวนการที่สอง:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 คือสองนาที คุณสามารถลองรันก่อนเป็นจำนวน 1000 แถวทุกๆ 25 มิลลิวินาที:
start 1000
ในกรณีของฉัน ผลลัพธ์จะอยู่ที่ประมาณสองสามมิลลิวินาทีต่อการอัพเดต ฉันจะเพิ่มจำนวนแถวเป็น 10.000 ทันที:
start 10000
ผล:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
ขอย้ำอีกครั้งว่าไม่มีอะไรพิเศษ แต่นี่คือ 24 ล้านบรรทัดต่อนาที 400 ต่อวินาที เป็นเวลานานกว่า 25 มิลลิวินาที การอัปเดตช้าลงเพียง 5 ครั้ง ซึ่งเห็นได้ชัดเมื่อเปลี่ยนนาที เพิ่มเป็น 100.000:
start 100000
ผล:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
อย่างที่คุณเห็น บริการนี้แทบจะไม่สามารถรับมือได้ แต่ถึงกระนั้นก็สามารถจัดการให้ลอยไปได้ ปริมาณข้อมูลดังกล่าว (240 ล้านแถวต่อนาที) มีขนาดใหญ่มาก ในกรณีเช่นนี้ เป็นเรื่องปกติที่จะเปิดตัวบริการหลายโคลน (หรือหลายสิบโคลน) ซึ่งแต่ละโคลนจะประมวลผลอักขระเพียงบางส่วนเท่านั้น อย่างไรก็ตาม ผลลัพธ์ที่ได้ก็น่าประทับใจสำหรับภาษาที่แปลซึ่งเน้นไปที่การจัดเก็บข้อมูลเป็นหลัก
คำถามอาจเกิดขึ้นว่าทำไมเวลาจึงไม่เป็นเชิงเส้นตามขนาดของการอัปเดตแต่ละครั้ง เหตุผลก็คือ จริงๆ แล้วฟังก์ชันย่อขนาดนั้นเป็นฟังก์ชัน C ซึ่งมีประสิทธิภาพมากกว่า updateAgg มาก เริ่มต้นจากขนาดการอัปเดตที่กำหนด (ประมาณ 10.000) updateAgg ถึงขีดจำกัดแล้ว เวลาในการดำเนินการจะไม่ขึ้นอยู่กับขนาดการอัปเดต เนื่องจากขั้นตอน Q เบื้องต้นที่บริการสามารถแยกแยะข้อมูลปริมาณดังกล่าวได้ สิ่งนี้เน้นย้ำถึงความสำคัญของการเลือกอัลกอริธึมที่เหมาะสมเมื่อทำงานกับข้อมูลขนาดใหญ่ อีกจุดหนึ่งคือการจัดเก็บข้อมูลในหน่วยความจำที่ถูกต้อง หากข้อมูลไม่ได้ถูกจัดเก็บแบบเรียงเป็นแนวหรือไม่ได้เรียงลำดับตามเวลา เราจะคุ้นเคยกับสิ่งที่ขาดหายไปของแคช TLB - การไม่มีที่อยู่หน้าหน่วยความจำในแคชที่อยู่ของโปรเซสเซอร์ การค้นหาที่อยู่จะใช้เวลานานขึ้นประมาณ 30 เท่าหากไม่สำเร็จ และหากข้อมูลกระจัดกระจาย ก็อาจทำให้บริการช้าลงได้หลายครั้ง
ข้อสรุป
ในบทความนี้ ฉันแสดงให้เห็นว่าฐานข้อมูล KDB+ และ Q ไม่เพียงเหมาะสมสำหรับการจัดเก็บข้อมูลขนาดใหญ่และเข้าถึงได้ง่ายผ่านการเลือกเท่านั้น แต่ยังสำหรับการสร้างบริการประมวลผลข้อมูลที่สามารถย่อยข้อมูลหลายร้อยล้านแถว/กิกะไบต์ได้แม้ใน กระบวนการ Q เดียว ภาษา Q ช่วยให้สามารถใช้งานอัลกอริธึมที่เกี่ยวข้องกับการประมวลผลข้อมูลได้อย่างกระชับและมีประสิทธิภาพ เนื่องจากลักษณะของเวกเตอร์ มีล่ามภาษา SQL ในตัว และชุดฟังก์ชันไลบรารีที่ประสบความสำเร็จอย่างมาก
ฉันจะทราบว่าข้างต้นเป็นเพียงส่วนหนึ่งของสิ่งที่ Q สามารถทำได้ แต่ยังมีคุณสมบัติพิเศษอื่นๆ อีกด้วย ตัวอย่างเช่น โปรโตคอล IPC ที่เรียบง่ายอย่างยิ่งซึ่งจะลบขอบเขตระหว่างกระบวนการ Q แต่ละรายการ และช่วยให้คุณสามารถรวมกระบวนการเหล่านี้หลายร้อยกระบวนการไว้ในเครือข่ายเดียว ซึ่งสามารถตั้งอยู่บนเซิร์ฟเวอร์หลายสิบเครื่องในส่วนต่างๆ ของโลก
ที่มา: will.com