میتوانید در پست قبلی من در مورد اینکه پایگاه داده KDB+ و زبان برنامهنویسی Q چیستند و نقاط قوت و ضعف آنها چیست، مطالعه کنید. و به طور خلاصه در مقدمه. در این مقاله، ما سرویسی را در Q پیادهسازی خواهیم کرد که یک جریان داده ورودی را پردازش کرده و توابع مختلف تجمیع را دقیقه به دقیقه در "زمان واقعی" محاسبه میکند (یعنی، زمان کافی برای محاسبه همه چیز قبل از دسته داده بعدی را خواهد داشت). ویژگی اصلی Q این است که یک زبان برداری است و به ما امکان میدهد نه روی اشیاء منفرد، بلکه روی آرایههایی از آنها، آرایههایی از آرایهها و سایر اشیاء پیچیده عمل کنیم. زبانهایی مانند Q و زبانهای مرتبط با آن، K، J و APL، به دلیل اختصارشان مشهور هستند. اغلب، برنامهای که چندین صفحه کد را در یک زبان آشنا مانند جاوا اشغال میکند، میتواند در چند خط نوشته شود. این دقیقاً همان چیزی است که میخواهم در این مقاله نشان دهم.

معرفی
KDB+ یک پایگاه داده ستونی است که برای حجم بسیار زیادی از دادهها که به شیوهای خاص (عمدتاً بر اساس زمان) سازماندهی شدهاند، طراحی شده است. این پایگاه داده عمدتاً در موسسات مالی مانند بانکها، صندوقهای سرمایهگذاری و شرکتهای بیمه استفاده میشود. Q زبان داخلی KDB+ است که امکان کار کارآمد با این دادهها را فراهم میکند. فلسفه Q اختصار و کارایی است و وضوح را فدا میکند. دلیل این امر این است که یک زبان مبتنی بر بردار درک آن دشوار خواهد بود، در حالی که اختصار و غنای آن امکان نمایش بخش بسیار بیشتری از برنامه را در یک صفحه واحد فراهم میکند و در نهایت درک آن را آسانتر میکند.
در این مقاله، ما یک برنامه کامل را در Q پیادهسازی خواهیم کرد و شاید بخواهید آن را امتحان کنید. برای انجام این کار، به خود Q نیاز دارید. میتوانید نسخه ۳۲ بیتی رایگان را از وبسایت kx دانلود کنید – در آنجا، اگر علاقهمند باشید، اطلاعات مرجعی در مورد کتاب Q نیز خواهید یافت. و مقالات مختلف در این زمینه.
بیانیه مشکل
منبعی وجود دارد که هر ۲۵ میلیثانیه یک جدول داده ارسال میکند. از آنجایی که KDB+ عمدتاً در امور مالی استفاده میشود، فرض میکنیم که یک جدول معاملات با ستونهای زیر است: time (زمان بر حسب میلیثانیه)، sym (نماد شرکت در بورس اوراق بهادار -) آی بی ام, AAPL،...)، قیمت (قیمتی که سهام با آن خریداری شده است) و اندازه (اندازه تراکنش). فاصله زمانی ۲۵ میلیثانیه به صورت دلخواه انتخاب شده است؛ نه خیلی کوچک است و نه خیلی بزرگ. وجود آن به این معنی است که دادههایی که به سرویس میرسند از قبل بافر شدهاند. پیادهسازی بافرینگ در سمت سرویس، از جمله بافرینگ پویا بر اساس بار فعلی، آسان خواهد بود، اما برای سادگی، ما به یک فاصله زمانی ثابت پایبند خواهیم بود.
این سرویس باید مجموعهای از توابع تجمیع - حداکثر قیمت، میانگین قیمت، اندازه مجموع و سایر اطلاعات مفید - را در هر دقیقه برای هر نماد ورودی از ستون sym محاسبه کند. برای سادگی، فرض میکنیم که همه توابع را میتوان به صورت افزایشی محاسبه کرد، به این معنی که برای به دست آوردن یک مقدار جدید، دو عدد - مقدار قدیمی و مقدار ورودی - کافی است. به عنوان مثال، توابع max، average و sum این ویژگی را دارند، اما تابع median این ویژگی را ندارد.
همچنین فرض میکنیم که جریان دادههای ورودی بر اساس زمان مرتب شدهاند. این به ما اجازه میدهد فقط با آخرین دقیقه کار کنیم. در عمل، کافی است که بتوانیم با دقیقههای فعلی و قبلی کار کنیم تا در صورت تأخیر در بهروزرسانیها، بتوانیم از آنها استفاده کنیم. برای سادگی، این مورد را در نظر نمیگیریم.
توابع تجمیع
در زیر توابع تجمیع مورد نیاز آمده است. من تا حد امکان برای افزایش بار سرویس، تعداد بیشتری را در نظر گرفتم:
- بالاترین – حداکثر قیمت – حداکثر قیمت در هر دقیقه.
- پایین - حداقل قیمت - حداقل قیمت در هر دقیقه.
- قیمت اول – قیمت اول – قیمت اول در هر دقیقه.
- آخرین قیمت – آخرین قیمت – آخرین قیمت در هر دقیقه.
- firstSize – اندازه اول – اندازه اولین تراکنش در هر دقیقه.
- lastSize – آخرین اندازه — آخرین اندازه تراکنش در هر دقیقه.
- numTrades – تعداد معاملات در دقیقه – تعداد معاملات در دقیقه.
- حجم - مجموع اندازه - مجموع اندازه تراکنشها در هر دقیقه.
- pvolume – sum price – مجموع قیمتها در هر دقیقه، مورد نیاز برای avgPrice.
- گردش مالی - مجموع قیمت * اندازه - حجم کل تراکنشها در هر دقیقه.
- میانگین قیمت - pvolume%numTrades - میانگین قیمت در هر دقیقه.
- avgSize – volume%numTrades – میانگین حجم معامله در هر دقیقه.
- vwap – حجم معاملات – میانگین قیمت در هر دقیقه، وزندهی شده با اندازه معامله.
- cumVolume – مجموع حجم – حجم انباشته تراکنشها در کل دوره.
بیایید همین الان یک نکتهی نه چندان واضح را بررسی کنیم: نحوهی مقداردهی اولیهی این ستونها برای اولین بار و برای هر دقیقهی بعدی. برخی از ستونها، مانند firstPrice، باید هر بار با مقدار null مقداردهی اولیه شوند؛ مقدار آنها تعریف نشده است. برخی دیگر، مانند volume، باید همیشه روی 0 تنظیم شوند. همچنین ستونهایی وجود دارند که نیاز به یک رویکرد ترکیبی دارند - برای مثال، cumVolume باید از دقیقهی قبل کپی شود و برای دقیقهی اول روی 0 تنظیم شود. ما تمام این پارامترها را با استفاده از نوع دادهی دیکشنری (شبیه به یک رکورد) تعریف خواهیم کرد:
// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
برای راحتی کار، sym و time را به دیکشنری اضافه کردم. حالا initWith یک ردیف آماده از جدول تجمیعشدهی نهایی است که sym و time صحیح آن هنوز مشخص نشدهاند. میتوانید از آن برای اضافه کردن ردیفهای جدید به جدول استفاده کنید.
هنگام ایجاد تابع تجمیعی به aggCols نیاز خواهیم داشت. به دلیل ترتیب ارزیابی عبارت در Q (از راست به چپ)، لیست باید معکوس شود. هدف این است که اطمینان حاصل شود که ارزیابی از high به cumVolume ادامه مییابد، زیرا برخی ستونها به ستونهای قبلی وابسته هستند.
ستونهایی که باید از دقیقه قبلی به دقیقه جدید کپی شوند، ستون sym برای راحتی اضافه میشود:
rollColumns:`sym`cumVolume;
حالا بیایید ستونها را بر اساس نحوهی بهروزرسانیشان به گروههایی تقسیم کنیم. سه نوع ستون قابل تشخیص است:
- انبارهها (حجم، گردش مالی، ..) - ما باید مقدار ورودی را به مقدار قبلی اضافه کنیم.
- با یک نقطه خاص (بالا، پایین، ..) - اولین مقدار در دقیقه از دادههای ورودی گرفته میشود، بقیه با استفاده از تابع محاسبه میشوند.
- بقیه همیشه با استفاده از تابع محاسبه میشوند.
بیایید برای این کلاسها متغیر تعریف کنیم:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
ترتیب محاسبه
ما جدول تجمیعی را در دو مرحله بهروزرسانی خواهیم کرد. برای کارایی بیشتر، ابتدا جدول ورودی را کوچک میکنیم تا برای هر نماد و دقیقه یک ردیف داشته باشد. این واقعیت که تمام توابع ما افزایشی و انجمنی هستند، تضمین میکند که نتیجه با این مرحله اضافی تغییر نخواهد کرد. میتوانیم جدول را با استفاده از یک دستور select کوچک کنیم:
select high:max price, low:min price … by sym,time.minute from table
این روش یک اشکال دارد: مجموعه ستونهای محاسبهشده از پیش تعریفشده است. خوشبختانه، Q همچنین select را به عنوان تابعی پیادهسازی میکند که آرگومانهای تولیدشده به صورت پویا را میپذیرد:
?[table;whereClause;byClause;selectClause]
من قالب آرگومان را با جزئیات شرح نمیدهم؛ در مورد ما، تنها عبارات غیربدیهی، عبارات by و select هستند و باید دیکشنریهایی از فرم columns!expressions باشند. بنابراین، تابع فشردهسازی را میتوان به صورت زیر تعریف کرد:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
برای وضوح بیشتر، من از تابع parse استفاده کردم که رشتهای حاوی عبارت Q را به مقداری تبدیل میکند که میتواند به تابع eval ارسال شود و در تابع select مورد نیاز است. همچنین توجه داشته باشید که preprocess به عنوان یک projection (یعنی تابعی با آرگومانهای نیمه تعریف شده) از تابع select تعریف میشود؛ یک آرگومان (جدول) وجود ندارد. اگر preprocess را روی یک جدول اعمال کنیم، یک جدول فشرده دریافت میکنیم.
مرحله دوم، بهروزرسانی جدول تجمیعشده است. ابتدا الگوریتم را در شبهکد مینویسیم:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
…
در Q، استفاده از توابع نگاشت/کاهش به جای حلقهها رایج است. اما از آنجایی که Q یک زبان برداری است و میتوانیم با خیال راحت همه عملیات را روی همه نمادها به طور همزمان اعمال کنیم، میتوانیم، به عنوان یک تقریب اول، با انجام عملیات روی همه نمادها به طور همزمان، حلقه را به طور کامل حذف کنیم:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…
اما میتوانیم حتی فراتر برویم. Q یک عملگر منحصر به فرد و فوقالعاده قدرتمند دارد - عملگر انتساب تعمیمیافته. این عملگر به شما امکان میدهد مجموعهای از مقادیر را در یک ساختار داده پیچیده با استفاده از لیستی از شاخصها، توابع و آرگومانها تغییر دهید. در مورد ما، این به این شکل است:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
متأسفانه، انتساب به یک جدول نیاز به لیستی از ردیفها دارد، نه ستونها، و نیاز به جابجایی ماتریس (لیست ستونها به لیستی از ردیفها) با استفاده از تابع flip دارد. برای یک جدول بزرگ، این کار پرهزینه است، بنابراین در عوض، ما یک انتساب عمومی را به صورت جداگانه با استفاده از تابع map (که شبیه آپاستروف است) به هر ستون اعمال میکنیم:
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
ما دوباره از تابع projection استفاده میکنیم. همچنین توجه داشته باشید که در Q، ایجاد لیست نیز یک تابع است و میتوانیم آن را با استفاده از each(map) فراخوانی کنیم تا لیستی از لیستها را دریافت کنیم.
برای جلوگیری از ایجاد مجموعهای ثابت از ستونهای محاسبهشده، بیایید عبارت فوق را به صورت پویا ایجاد کنیم. ابتدا، توابعی را برای محاسبه هر ستون تعریف میکنیم که از متغیرهای row و inp برای ارجاع به دادههای تجمیعشده و ورودی استفاده میکنند:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
بعضی از ستونها خاص هستند؛ مقدار اول آنها نباید توسط تابع محاسبه شود. میتوانیم با استفاده از ستون row[`numTrades] تعیین کنیم که اولین مقدار است - اگر 0 باشد، مقدار اول است. Q یک تابع select دارد -?[Boolean list;list1;list2] - که بسته به شرط موجود در آرگومان اول، مقداری را از لیست 1 یا 2 انتخاب میکند:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
در اینجا من یک انتساب عمومی را با تابع خودم (عبارت داخل آکولاد) فراخوانی کردهام. مقدار فعلی (اولین آرگومان) و یک آرگومان اضافی که در پارامتر چهارم ارسال میکنم، به آن ارسال شده است.
بیایید بلندگوهای باتریدار را جداگانه اضافه کنیم، زیرا آنها عملکرد مشابهی دارند:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
این یک انتساب معمولی طبق استانداردهای Q است، اما من لیستی از مقادیر را به طور همزمان انتساب میدهم. در نهایت، بیایید تابع main را ایجاد کنیم:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
با این عبارت، من به صورت پویا یک تابع از رشتهای که شامل عبارتی است که در بالا ارائه دادم، ایجاد میکنم. نتیجه به این شکل خواهد بود:
{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
ترتیب ارزیابی ستونها معکوس است، زیرا در Q ترتیب ارزیابی از راست به چپ است.
حالا ما دو تابع اصلی مورد نیاز برای محاسبات داریم، تنها چیزی که باقی مانده اضافه کردن کمی زیرساخت است و سرویس آماده است.
مراحل پایانی
ما توابع پیشپردازش و بهروزرسانیAgg را داریم که تمام کارها را انجام میدهند. اما هنوز باید از انتقال صحیح بین دقایق اطمینان حاصل کنیم و شاخصها را برای تجمیع محاسبه کنیم. ابتدا، بیایید تابع init را تعریف کنیم:
init:{
tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
offset::0; // индекс в tradeAgg, где начинается текущая минута
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
}
همچنین یک تابع roll تعریف خواهیم کرد که دقیقه فعلی را تغییر میدهد:
roll:{[tm]
if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
برای اضافه کردن کاراکترهای جدید به یک تابع نیاز داریم:
addSyms:{[syms]
currSyms,::syms; // добавим в список известных
// добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
// Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
و در نهایت، تابع upd (نام سنتی این تابع برای سرویسهای Q)، که توسط کلاینت برای اضافه کردن دادهها فراخوانی میشود:
upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
data:select from data where time=tm; // фильтрация
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
};
همین. همانطور که قول داده بودیم، کد کامل سرویس ما در اینجا آمده است، فقط چند خط:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
آزمایش
بیایید عملکرد سرویس را آزمایش کنیم. برای انجام این کار، آن را در یک فرآیند جداگانه اجرا کنید (کد را در فایل service.q قرار دهید) و تابع init را فراخوانی کنید:
q service.q –p 5566
q)init[]
در یک کنسول دیگر، یک فرآیند Q دوم را شروع کنید و به اولی متصل شوید:
h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте
ابتدا، بیایید لیستی از کاراکترها - ۱۰،۰۰۰ کاراکتر - ایجاد کنیم و یک تابع برای تولید یک جدول تصادفی اضافه کنیم. در کنسول دوم:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
من سه نماد واقعی به لیست اضافه کردم تا جستجوی آنها در جدول آسانتر شود. تابع rnd یک جدول تصادفی با n ردیف ایجاد میکند که در آن زمانها از t تا t+25 میلیثانیه متغیر هستند.
اکنون میتوانید ارسال داده به سرویس را امتحان کنید (بیایید ده ساعت اول را اضافه کنیم):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
میتوانید در سرویس بررسی کنید که جدول بهروزرسانی شده است:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
یافته ها:
sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919حالا بیایید یک تست بار اجرا کنیم تا مشخص شود سرویس در هر دقیقه چه مقدار داده میتواند پردازش کند. به عنوان یادآوری، فاصله بهروزرسانی را روی ۲۵ میلیثانیه تنظیم میکنیم. بنابراین، سرویس (به طور متوسط) باید حداقل ظرف ۲۰ میلیثانیه بهروزرسانی شود تا به کاربران زمان برای درخواست داده بدهد. در فرآیند دوم موارد زیر را وارد کنید:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
۴۸۰۰ معادل دو دقیقه است. میتوانید ابتدا آن را برای ۱۰۰۰ سطر در هر ۲۵ میلیثانیه اجرا کنید:
start 1000
در مورد من، نتیجه حدود چند میلیثانیه برای هر بهروزرسانی است. بنابراین بلافاصله تعداد ردیفها را به ۱۰۰۰۰ افزایش میدهم:
start 10000
یافته ها:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
باز هم، چیز خاصی نیست، اما این ۲۴ میلیون ردیف در دقیقه، ۴۰۰۰۰۰ در ثانیه است. بهروزرسانی فقط پنج بار بیش از ۲۵ میلیثانیه کند شد، ظاهراً به دلیل تغییر دقیقه. بیایید آن را به ۱۰۰۰۰۰ افزایش دهیم:
start 100000
یافته ها:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
همانطور که میبینیم، این سرویس به سختی از پس مشکلات برمیآید، اما همچنان میتواند سرپا بماند. این حجم از دادهها (۲۴۰ میلیون ردیف در دقیقه) بسیار زیاد است؛ در چنین مواردی، راهاندازی چندین کلون (یا حتی دهها کلون) از سرویس که هر کدام فقط زیرمجموعهای از کاراکترها را پردازش میکنند، رایج است. با این وجود، نتیجه برای یک زبان تفسیری که در درجه اول بر ذخیرهسازی دادهها متمرکز است، چشمگیر است.
ممکن است این سوال پیش بیاید که چرا زمان با افزایش اندازه هر بهروزرسانی به صورت غیرخطی افزایش مییابد. دلیل آن این است که تابع فشردهسازی اساساً یک تابع C است که بسیار کارآمدتر از updateAgg است. با شروع از یک اندازه بهروزرسانی خاص (حدود 10.000)، updateAgg به سقف خود میرسد و پس از آن، زمان اجرای آن مستقل از اندازه بهروزرسانی است. دقیقاً به دلیل پیشمرحله Q است که سرویس قادر به هضم چنین حجمی از دادهها است. این موضوع اهمیت انتخاب الگوریتم مناسب هنگام کار با کلانداده را برجسته میکند. نکته دیگر، ذخیرهسازی مناسب دادهها در حافظه است. اگر دادهها به صورت ستونی یا بر اساس ترتیب زمانی ذخیره نشده باشند، با چیزی به نام خطای حافظه نهان TLB مواجه میشویم - عدم موفقیت در یافتن آدرس صفحه حافظه در حافظه نهان آدرس پردازنده. جستجوی آدرس در صورت عدم موفقیت تقریباً 30 برابر بیشتر طول میکشد و در مورد دادههای پراکنده، این میتواند سرویس را چندین بار کند کند.
نتیجه
در این مقاله، نشان دادم که KDB+ و Q نه تنها برای ذخیره مجموعه دادههای بزرگ و دسترسی آسان به آنها از طریق دستورات select مناسب هستند، بلکه برای ایجاد سرویسهای پردازش داده که قادر به پردازش صدها میلیون ردیف/گیگابایت داده حتی در یک فرآیند Q واحد هستند نیز مناسب هستند. خود زبان Q به دلیل ماهیت برداری، مفسر SQL داخلی و مجموعهای بسیار موفق از توابع کتابخانهای، امکان پیادهسازی فوقالعاده مختصر و کارآمد الگوریتمهای پردازش داده را فراهم میکند.
میخواهم اشاره کنم که موارد فوق فقط نمونهای از قابلیتهای Q است؛ این سیستم ویژگیهای منحصر به فرد دیگری نیز دارد. به عنوان مثال، یک پروتکل IPC بسیار ساده که مرزهای بین فرآیندهای Q را از بین میبرد و به صدها فرآیند اجازه میدهد تا به یک شبکه واحد متصل شوند، که میتواند دهها سرور را در سراسر جهان پوشش دهد.
منبع: www.habr.com
