คุณสมบัติของภาษา Q และ KDB+ โดยใช้ตัวอย่างการบริการแบบเรียลไทม์

คุณสามารถอ่านเกี่ยวกับว่าฐาน KDB+ คืออะไร ภาษาการเขียนโปรแกรม Q คืออะไร จุดแข็งและจุดอ่อนของพวกเขาในครั้งก่อนของฉัน статье และในบทนำโดยย่อ ในบทความ เราจะใช้บริการบน Q ที่จะประมวลผลสตรีมข้อมูลที่เข้ามาและคำนวณฟังก์ชันการรวมต่างๆ ทุกนาทีในโหมด "เรียลไทม์" (นั่นคือ จะมีเวลาในการคำนวณทุกอย่างก่อนส่วนถัดไปของข้อมูล) คุณสมบัติหลักของ Q คือเป็นภาษาเวกเตอร์ที่ช่วยให้คุณทำงานได้ไม่ใช่กับวัตถุเดี่ยว แต่กับอาร์เรย์ อาร์เรย์ของอาร์เรย์ และวัตถุที่ซับซ้อนอื่นๆ ภาษาเช่น Q และญาติ K, J, APL มีชื่อเสียงในด้านความกะทัดรัด บ่อยครั้งที่โปรแกรมที่รับโค้ดหลายหน้าจอในภาษาที่คุ้นเคย เช่น Java สามารถเขียนลงในโค้ดได้เพียงไม่กี่บรรทัด นี่คือสิ่งที่ฉันต้องการแสดงให้เห็นในบทความนี้

คุณสมบัติของภาษา Q และ KDB+ โดยใช้ตัวอย่างการบริการแบบเรียลไทม์

การแนะนำ

KDB+ เป็นฐานข้อมูลแบบเรียงเป็นแนวที่เน้นไปที่ข้อมูลจำนวนมาก โดยเรียงลำดับในลักษณะเฉพาะ (โดยหลักตามเวลา) ส่วนใหญ่จะใช้ในสถาบันการเงิน - ธนาคาร กองทุนรวมที่ลงทุน บริษัทประกันภัย ภาษา Q คือภาษาภายในของ KDB+ ที่ช่วยให้คุณทำงานกับข้อมูลนี้ได้อย่างมีประสิทธิภาพ อุดมการณ์ Q คือความกะทัดรัดและมีประสิทธิภาพ ในขณะที่ความชัดเจนถูกละทิ้งไป นี่เป็นข้อพิสูจน์ด้วยความจริงที่ว่าภาษาเวกเตอร์จะเข้าใจยากไม่ว่าในกรณีใด และความกะทัดรัดและความสมบูรณ์ของการบันทึกช่วยให้คุณเห็นส่วนที่ใหญ่กว่ามากของโปรแกรมบนหน้าจอเดียว ซึ่งท้ายที่สุดแล้วจะทำให้เข้าใจได้ง่ายขึ้น

ในบทความนี้ เราได้ใช้โปรแกรมเต็มรูปแบบใน Q และคุณอาจต้องการลองใช้ดู ในการดำเนินการนี้ คุณจะต้องมี Q จริง คุณสามารถดาวน์โหลดเวอร์ชัน 32 บิตฟรีได้ที่เว็บไซต์บริษัท kx - www.kx.com. หากคุณสนใจคุณจะพบข้อมูลอ้างอิงเกี่ยวกับ Q หนังสือ ถามสำหรับมนุษย์ และบทความต่าง ๆ ในหัวข้อนี้

คำแถลงปัญหา

มีแหล่งที่มาที่ส่งตารางพร้อมข้อมูลทุกๆ 25 มิลลิวินาที เนื่องจาก KDB+ ใช้ในด้านการเงินเป็นหลัก เราจะถือว่านี่คือตารางธุรกรรม (การซื้อขาย) ซึ่งมีคอลัมน์ต่อไปนี้: เวลา (เวลาเป็นมิลลิวินาที), Sym (การกำหนดบริษัทในตลาดหลักทรัพย์ - ไอบีเอ็ม, AAPL,…), ราคา (ราคาที่ซื้อหุ้น), ขนาด (ขนาดของธุรกรรม) ช่วงเวลา 25 มิลลิวินาทีนั้นเป็นไปตามอำเภอใจ ไม่น้อยเกินไปและไม่ยาวเกินไป การมีอยู่หมายความว่าข้อมูลมาถึงบริการที่ถูกบัฟเฟอร์แล้ว คงจะเป็นเรื่องง่ายที่จะใช้การบัฟเฟอร์ในฝั่งบริการ รวมถึงการบัฟเฟอร์แบบไดนามิกขึ้นอยู่กับโหลดปัจจุบัน แต่เพื่อความง่าย เราจะมุ่งเน้นไปที่ช่วงเวลาที่คงที่

บริการจะต้องนับทุกนาทีสำหรับแต่ละสัญลักษณ์ที่เข้ามาจากคอลัมน์ sym ชุดของฟังก์ชันการรวม - ราคาสูงสุด ราคาเฉลี่ย ขนาดผลรวม ฯลฯ ข้อมูลที่เป็นประโยชน์. เพื่อความง่าย เราจะถือว่าฟังก์ชันทั้งหมดสามารถคำนวณแบบเพิ่มทีละน้อยได้ เช่น เพื่อให้ได้ค่าใหม่ก็เพียงพอที่จะทราบตัวเลขสองตัว - ค่าเก่าและค่าขาเข้า ตัวอย่างเช่น ฟังก์ชันสูงสุด ค่าเฉลี่ย ผลรวมมีคุณสมบัตินี้ แต่ฟังก์ชันมัธยฐานไม่มี

นอกจากนี้เรายังจะถือว่าสตรีมข้อมูลที่เข้ามานั้นเป็นไปตามลำดับเวลา นี่จะทำให้เรามีโอกาสที่จะทำงานเฉพาะในนาทีสุดท้ายเท่านั้น ในทางปฏิบัติ การทำงานกับนาทีปัจจุบันและนาทีก่อนหน้าก็เพียงพอแล้ว ในกรณีที่การอัปเดตบางอย่างล่าช้า เพื่อความง่าย เราจะไม่พิจารณากรณีนี้

ฟังก์ชันการรวมกลุ่ม

ฟังก์ชันการรวมที่จำเป็นแสดงอยู่ด้านล่างนี้ ฉันใช้มันให้มากที่สุดเท่าที่จะเป็นไปได้เพื่อเพิ่มภาระในบริการ:

  • สูง – ราคาสูงสุด – ราคาสูงสุดต่อนาที
  • ราคาต่ำ – ราคาต่ำสุด – ราคาขั้นต่ำต่อนาที
  • ราคาแรก – ราคาแรก – ราคาแรกต่อนาที
  • LastPrice – ราคาสุดท้าย – ราคาสุดท้ายต่อนาที
  • firstSize – ขนาดแรก – ขนาดการซื้อขายครั้งแรกต่อนาที
  • LastSize – ขนาดสุดท้าย – ขนาดการซื้อขายสุดท้ายในหนึ่งนาที
  • numTrades – นับ i – จำนวนการซื้อขายต่อนาที
  • ปริมาณ – ขนาดผลรวม – ผลรวมของขนาดการซื้อขายต่อนาที
  • pvolume – ราคารวม – ผลรวมของราคาต่อนาที ที่จำเป็นสำหรับ avgPrice
  • – ราคาหมุนเวียนรวม*ขนาด – ปริมาณธุรกรรมทั้งหมดต่อนาที
  • ราคาเฉลี่ย – pvolume%numTrades – ราคาเฉลี่ยต่อนาที
  • avgSize – ปริมาณ%numTrades – ขนาดการซื้อขายเฉลี่ยต่อนาที
  • vwap – มูลค่าการซื้อขาย% ปริมาณ – ราคาเฉลี่ยต่อนาทีถ่วงน้ำหนักด้วยขนาดธุรกรรม
  • cumVolume – ปริมาณรวม – ขนาดธุรกรรมสะสมตลอดเวลา

เรามาพูดถึงประเด็นที่ไม่ชัดเจนจุดหนึ่งทันที - วิธีเริ่มต้นคอลัมน์เหล่านี้ในครั้งแรกและในแต่ละนาทีต่อ ๆ ไป บางคอลัมน์ของประเภท firstPrice จะต้องเริ่มต้นเป็น null ในแต่ละครั้ง โดยไม่ได้กำหนดค่าคอลัมน์เหล่านั้น ประเภทวอลุ่มอื่นๆ จะต้องตั้งค่าเป็น 0 เสมอ นอกจากนี้ยังมีคอลัมน์ที่ต้องใช้วิธีรวม เช่น ต้องคัดลอก cumVolume จากนาทีก่อนหน้า และสำหรับอันแรกตั้งค่าเป็น 0 มาตั้งค่าพารามิเตอร์เหล่านี้ทั้งหมดโดยใช้ข้อมูลพจนานุกรม ประเภท (คล้ายกับบันทึก):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

ฉันเพิ่ม Sym และเวลาลงในพจนานุกรมเพื่อความสะดวก ตอนนี้ initWith เป็นบรรทัดสำเร็จรูปจากตารางรวมสุดท้าย ซึ่งยังคงต้องตั้งค่า Sym และเวลาที่ถูกต้อง คุณสามารถใช้มันเพื่อเพิ่มแถวใหม่ลงในตารางได้

เราจะต้องมี aggCols เมื่อสร้างฟังก์ชันการรวมกลุ่ม รายการจะต้องกลับด้านตามลำดับการประเมินนิพจน์ใน Q (จากขวาไปซ้าย) เป้าหมายคือเพื่อให้แน่ใจว่าการคำนวณเปลี่ยนจากสูงไปจนถึง cumVolume เนื่องจากบางคอลัมน์ขึ้นอยู่กับคอลัมน์ก่อนหน้า

คอลัมน์ที่ต้องคัดลอกไปยังนาทีใหม่จากก่อนหน้า คอลัมน์ Sym จะถูกเพิ่มเพื่อความสะดวก:

rollColumns:`sym`cumVolume;

ตอนนี้เรามาแบ่งคอลัมน์ออกเป็นกลุ่มตามวิธีที่ควรอัปเดต สามารถแยกแยะได้สามประเภท:

  1. ตัวสะสม (ปริมาณ, มูลค่าการซื้อขาย,..) – เราต้องเพิ่มมูลค่าที่เข้ามาให้กับค่าก่อนหน้า
  2. ด้วยจุดพิเศษ (สูง, ต่ำ, ..) – ค่าแรกในนาทีนำมาจากข้อมูลที่เข้ามา ส่วนที่เหลือจะถูกคำนวณโดยใช้ฟังก์ชัน
  3. พักผ่อน. คำนวณโดยใช้ฟังก์ชันเสมอ

มากำหนดตัวแปรสำหรับคลาสเหล่านี้กันดีกว่า:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

ลำดับการคำนวณ

เราจะอัปเดตตารางรวมเป็นสองขั้นตอน เพื่อประสิทธิภาพ ก่อนอื่นเราจะย่อตารางขาเข้าเพื่อให้มีเพียงแถวเดียวสำหรับแต่ละอักขระและนาที ความจริงที่ว่าฟังก์ชันทั้งหมดของเรามีแบบเพิ่มหน่วยและเชื่อมโยงรับประกันว่าผลลัพธ์ของขั้นตอนเพิ่มเติมนี้จะไม่เปลี่ยนแปลง คุณสามารถย่อตารางโดยใช้ตัวเลือก:

select high:max price, low:min price … by sym,time.minute from table

วิธีนี้มีข้อเสีย - ชุดของคอลัมน์จากการคำนวณถูกกำหนดไว้ล่วงหน้าแล้ว โชคดีที่ใน Q นั้น select ยังถูกนำมาใช้เป็นฟังก์ชันที่คุณสามารถแทนที่อาร์กิวเมนต์ที่สร้างขึ้นแบบไดนามิก:

?[table;whereClause;byClause;selectClause]

ฉันจะไม่อธิบายรายละเอียดเกี่ยวกับรูปแบบของข้อโต้แย้ง ในกรณีของเรา เฉพาะนิพจน์โดยและที่เลือกเท่านั้นที่จะไม่สำคัญ และควรเป็นพจนานุกรมของคอลัมน์แบบฟอร์ม!นิพจน์ ดังนั้นฟังก์ชันการย่อขนาดสามารถกำหนดได้ดังนี้:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

เพื่อความชัดเจน ฉันใช้ฟังก์ชัน parse ซึ่งเปลี่ยนสตริงที่มีนิพจน์ Q เป็นค่าที่สามารถส่งผ่านไปยังฟังก์ชัน eval และจำเป็นในการเลือกฟังก์ชัน โปรดทราบว่าการประมวลผลล่วงหน้าถูกกำหนดให้เป็นเส้นโครง (เช่น ฟังก์ชันที่มีอาร์กิวเมนต์ที่กำหนดบางส่วน) ของฟังก์ชันเลือก โดยอาร์กิวเมนต์หนึ่งตัว (ตาราง) หายไป หากเราใช้การประมวลผลล่วงหน้ากับตาราง เราจะได้ตารางที่บีบอัด

ขั้นตอนที่สองกำลังอัปเดตตารางรวม ขั้นแรกให้เขียนอัลกอริทึมเป็นรหัสเทียม:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

ใน Q เป็นเรื่องปกติที่จะใช้ฟังก์ชัน map/reduce แทนการวนซ้ำ แต่เนื่องจาก Q เป็นภาษาเวกเตอร์ และเราสามารถใช้การดำเนินการทั้งหมดกับสัญลักษณ์ทั้งหมดในคราวเดียวได้อย่างง่ายดาย จากนั้นจึงทำการประมาณครั้งแรกโดยไม่ต้องวนซ้ำเลย โดยดำเนินการกับสัญลักษณ์ทั้งหมดในคราวเดียว:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

แต่เราสามารถไปไกลกว่านี้ได้ Q มีโอเปอเรเตอร์ที่มีเอกลักษณ์และทรงพลังอย่างยิ่ง - โอเปอเรเตอร์ที่ได้รับมอบหมายทั่วไป ช่วยให้คุณสามารถเปลี่ยนชุดของค่าในโครงสร้างข้อมูลที่ซับซ้อนโดยใช้รายการดัชนี ฟังก์ชัน และอาร์กิวเมนต์ ในกรณีของเราดูเหมือนว่านี้:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

ขออภัย หากต้องการกำหนดให้กับตาราง คุณต้องมีรายการแถว ไม่ใช่คอลัมน์ และคุณต้องย้ายเมทริกซ์ (รายการคอลัมน์เป็นรายการแถว) โดยใช้ฟังก์ชันพลิก ซึ่งมีราคาแพงสำหรับตารางขนาดใหญ่ ดังนั้นเราจึงใช้การกำหนดทั่วไปกับแต่ละคอลัมน์แยกกัน โดยใช้ฟังก์ชันแผนที่ (ซึ่งดูเหมือนเครื่องหมายอะพอสทรอฟี่):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

เราใช้การฉายภาพฟังก์ชันอีกครั้ง โปรดทราบว่าใน Q การสร้างรายการก็เป็นฟังก์ชันเช่นกัน และเราสามารถเรียกมันได้โดยใช้ฟังก์ชัน Each(map) เพื่อรับรายการ

เพื่อให้แน่ใจว่าชุดของคอลัมน์จากการคำนวณไม่ได้รับการแก้ไข เราจะสร้างนิพจน์ข้างต้นแบบไดนามิก ขั้นแรก เรามากำหนดฟังก์ชันเพื่อคำนวณแต่ละคอลัมน์ โดยใช้ตัวแปรแถวและ inp เพื่ออ้างถึงข้อมูลที่รวบรวมและป้อนข้อมูล:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

บางคอลัมน์มีลักษณะพิเศษ ฟังก์ชันไม่ควรคำนวณค่าแรก เราสามารถระบุได้ว่าเป็นคอลัมน์แรกในคอลัมน์แถว[`numTrades] - หากมี 0 แสดงว่าค่านั้นเป็นอันดับแรก Q มีฟังก์ชัน select - ?[Boolean list;list1;list2] - ซึ่งเลือกค่าจากรายการ 1 หรือ 2 ขึ้นอยู่กับเงื่อนไขในอาร์กิวเมนต์แรก:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

ที่นี่ฉันเรียกการมอบหมายทั่วไปด้วยฟังก์ชันของฉัน (นิพจน์ในวงเล็บปีกกา) ได้รับค่าปัจจุบัน (อาร์กิวเมนต์แรก) และอาร์กิวเมนต์เพิ่มเติม ซึ่งฉันส่งผ่านในพารามิเตอร์ที่ 4

มาเพิ่มลำโพงแบตเตอรี่แยกกันเนื่องจากฟังก์ชั่นจะเหมือนกันสำหรับลำโพงเหล่านี้:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

นี่เป็นการกำหนดปกติตามมาตรฐาน Q แต่ฉันกำลังกำหนดรายการค่าในคราวเดียว สุดท้ายนี้ มาสร้างฟังก์ชันหลักกันดีกว่า:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

ด้วยนิพจน์นี้ ฉันจะสร้างฟังก์ชันแบบไดนามิกจากสตริงที่มีนิพจน์ที่ฉันให้ไว้ข้างต้น ผลลัพธ์จะมีลักษณะดังนี้:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

ลำดับการประเมินคอลัมน์กลับด้าน เนื่องจากใน Q ลำดับการประเมินจะเป็นจากขวาไปซ้าย

ตอนนี้เรามีฟังก์ชันหลักสองฟังก์ชันที่จำเป็นสำหรับการคำนวณ เราเพียงแค่ต้องเพิ่มโครงสร้างพื้นฐานเพียงเล็กน้อยและบริการก็พร้อมใช้งาน

ขั้นตอนสุดท้าย

เรามีฟังก์ชัน preprocess และ updateAgg ที่ทำหน้าที่ทั้งหมด แต่ยังจำเป็นต้องตรวจสอบให้แน่ใจว่ามีการเปลี่ยนผ่านนาทีที่ถูกต้องและคำนวณดัชนีสำหรับการรวมกลุ่ม ก่อนอื่น เรามานิยามฟังก์ชัน init กันก่อน:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

นอกจากนี้เรายังจะกำหนดฟังก์ชันม้วนซึ่งจะเปลี่ยนนาทีปัจจุบัน:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

เราจะต้องมีฟังก์ชั่นเพื่อเพิ่มตัวละครใหม่:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

และสุดท้ายคือฟังก์ชัน upd (ชื่อดั้งเดิมของฟังก์ชันนี้สำหรับบริการ Q) ซึ่งไคลเอ็นต์เรียกเพื่อเพิ่มข้อมูล:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

นั่นคือทั้งหมดที่ นี่คือรหัสที่สมบูรณ์ของบริการของเราตามที่สัญญาไว้ เพียงไม่กี่บรรทัด:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

การทดสอบ

เรามาตรวจสอบประสิทธิภาพของบริการกันดีกว่า หากต้องการทำสิ่งนี้ ให้รันมันในกระบวนการแยกต่างหาก (ใส่โค้ดในไฟล์ service.q) แล้วเรียกใช้ฟังก์ชัน init:

q service.q –p 5566

q)init[]

ในคอนโซลอื่น ให้เริ่มกระบวนการ Q ที่สองและเชื่อมต่อกับกระบวนการแรก:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

ขั้นแรก เรามาสร้างรายการสัญลักษณ์ 10000 ชิ้นและเพิ่มฟังก์ชันเพื่อสร้างตารางสุ่ม ในคอนโซลที่สอง:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

ฉันเพิ่มสัญลักษณ์จริงสามตัวลงในรายการเพื่อให้ง่ายต่อการค้นหาในตาราง ฟังก์ชัน rnd จะสร้างตารางสุ่มที่มี n แถว โดยที่เวลาจะแตกต่างกันไปตั้งแต่ t ถึง t+25 มิลลิวินาที

ตอนนี้คุณสามารถลองส่งข้อมูลไปยังบริการได้ (เพิ่มสิบชั่วโมงแรก):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

คุณสามารถตรวจสอบบริการว่าตารางได้รับการอัปเดตแล้ว:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

ผล:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

ตอนนี้เรามาทำการทดสอบโหลดเพื่อดูว่าบริการสามารถประมวลผลข้อมูลได้มากเพียงใดต่อนาที ฉันขอเตือนคุณว่าเราตั้งค่าช่วงเวลาการอัปเดตเป็น 25 มิลลิวินาที ดังนั้น บริการจะต้อง (โดยเฉลี่ย) มีขนาดพอดีอย่างน้อย 20 มิลลิวินาทีต่อการอัปเดต เพื่อให้ผู้ใช้มีเวลาในการขอข้อมูล ป้อนข้อมูลต่อไปนี้ในกระบวนการที่สอง:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 คือสองนาที คุณสามารถลองรันก่อนเป็นจำนวน 1000 แถวทุกๆ 25 มิลลิวินาที:

start 1000

ในกรณีของฉัน ผลลัพธ์จะอยู่ที่ประมาณสองสามมิลลิวินาทีต่อการอัพเดต ฉันจะเพิ่มจำนวนแถวเป็น 10.000 ทันที:

start 10000

ผล:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

ขอย้ำอีกครั้งว่าไม่มีอะไรพิเศษ แต่นี่คือ 24 ล้านบรรทัดต่อนาที 400 ต่อวินาที เป็นเวลานานกว่า 25 มิลลิวินาที การอัปเดตช้าลงเพียง 5 ครั้ง ซึ่งเห็นได้ชัดเมื่อเปลี่ยนนาที เพิ่มเป็น 100.000:

start 100000

ผล:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

อย่างที่คุณเห็น บริการนี้แทบจะไม่สามารถรับมือได้ แต่ถึงกระนั้นก็สามารถจัดการให้ลอยไปได้ ปริมาณข้อมูลดังกล่าว (240 ล้านแถวต่อนาที) มีขนาดใหญ่มาก ในกรณีเช่นนี้ เป็นเรื่องปกติที่จะเปิดตัวบริการหลายโคลน (หรือหลายสิบโคลน) ซึ่งแต่ละโคลนจะประมวลผลอักขระเพียงบางส่วนเท่านั้น อย่างไรก็ตาม ผลลัพธ์ที่ได้ก็น่าประทับใจสำหรับภาษาที่แปลซึ่งเน้นไปที่การจัดเก็บข้อมูลเป็นหลัก

คำถามอาจเกิดขึ้นว่าทำไมเวลาจึงไม่เป็นเชิงเส้นตามขนาดของการอัปเดตแต่ละครั้ง เหตุผลก็คือ จริงๆ แล้วฟังก์ชันย่อขนาดนั้นเป็นฟังก์ชัน C ซึ่งมีประสิทธิภาพมากกว่า updateAgg มาก เริ่มต้นจากขนาดการอัปเดตที่กำหนด (ประมาณ 10.000) updateAgg ถึงขีดจำกัดแล้ว เวลาในการดำเนินการจะไม่ขึ้นอยู่กับขนาดการอัปเดต เนื่องจากขั้นตอน Q เบื้องต้นที่บริการสามารถแยกแยะข้อมูลปริมาณดังกล่าวได้ สิ่งนี้เน้นย้ำถึงความสำคัญของการเลือกอัลกอริธึมที่เหมาะสมเมื่อทำงานกับข้อมูลขนาดใหญ่ อีกจุดหนึ่งคือการจัดเก็บข้อมูลในหน่วยความจำที่ถูกต้อง หากข้อมูลไม่ได้ถูกจัดเก็บแบบเรียงเป็นแนวหรือไม่ได้เรียงลำดับตามเวลา เราจะคุ้นเคยกับสิ่งที่ขาดหายไปของแคช TLB - การไม่มีที่อยู่หน้าหน่วยความจำในแคชที่อยู่ของโปรเซสเซอร์ การค้นหาที่อยู่จะใช้เวลานานขึ้นประมาณ 30 เท่าหากไม่สำเร็จ และหากข้อมูลกระจัดกระจาย ก็อาจทำให้บริการช้าลงได้หลายครั้ง

ข้อสรุป

ในบทความนี้ ฉันแสดงให้เห็นว่าฐานข้อมูล KDB+ และ Q ไม่เพียงเหมาะสมสำหรับการจัดเก็บข้อมูลขนาดใหญ่และเข้าถึงได้ง่ายผ่านการเลือกเท่านั้น แต่ยังสำหรับการสร้างบริการประมวลผลข้อมูลที่สามารถย่อยข้อมูลหลายร้อยล้านแถว/กิกะไบต์ได้แม้ใน กระบวนการ Q เดียว ภาษา Q ช่วยให้สามารถใช้งานอัลกอริธึมที่เกี่ยวข้องกับการประมวลผลข้อมูลได้อย่างกระชับและมีประสิทธิภาพ เนื่องจากลักษณะของเวกเตอร์ มีล่ามภาษา SQL ในตัว และชุดฟังก์ชันไลบรารีที่ประสบความสำเร็จอย่างมาก

ฉันจะทราบว่าข้างต้นเป็นเพียงส่วนหนึ่งของสิ่งที่ Q สามารถทำได้ แต่ยังมีคุณสมบัติพิเศษอื่นๆ อีกด้วย ตัวอย่างเช่น โปรโตคอล IPC ที่เรียบง่ายอย่างยิ่งซึ่งจะลบขอบเขตระหว่างกระบวนการ Q แต่ละรายการ และช่วยให้คุณสามารถรวมกระบวนการเหล่านี้หลายร้อยกระบวนการไว้ในเครือข่ายเดียว ซึ่งสามารถตั้งอยู่บนเซิร์ฟเวอร์หลายสิบเครื่องในส่วนต่างๆ ของโลก

ที่มา: will.com

เพิ่มความคิดเห็น