ویژگی های زبان Q و KDB+ با استفاده از مثال یک سرویس بلادرنگ

می‌توانید در پست قبلی من در مورد اینکه پایگاه داده KDB+ و زبان برنامه‌نویسی Q چیستند و نقاط قوت و ضعف آنها چیست، مطالعه کنید. مقاله و به طور خلاصه در مقدمه. در این مقاله، ما سرویسی را در Q پیاده‌سازی خواهیم کرد که یک جریان داده ورودی را پردازش کرده و توابع مختلف تجمیع را دقیقه به دقیقه در "زمان واقعی" محاسبه می‌کند (یعنی، زمان کافی برای محاسبه همه چیز قبل از دسته داده بعدی را خواهد داشت). ویژگی اصلی Q این است که یک زبان برداری است و به ما امکان می‌دهد نه روی اشیاء منفرد، بلکه روی آرایه‌هایی از آنها، آرایه‌هایی از آرایه‌ها و سایر اشیاء پیچیده عمل کنیم. زبان‌هایی مانند Q و زبان‌های مرتبط با آن، K، J و APL، به دلیل اختصارشان مشهور هستند. اغلب، برنامه‌ای که چندین صفحه کد را در یک زبان آشنا مانند جاوا اشغال می‌کند، می‌تواند در چند خط نوشته شود. این دقیقاً همان چیزی است که می‌خواهم در این مقاله نشان دهم.

ویژگی های زبان Q و KDB+ با استفاده از مثال یک سرویس بلادرنگ

معرفی

KDB+ یک پایگاه داده ستونی است که برای حجم بسیار زیادی از داده‌ها که به شیوه‌ای خاص (عمدتاً بر اساس زمان) سازماندهی شده‌اند، طراحی شده است. این پایگاه داده عمدتاً در موسسات مالی مانند بانک‌ها، صندوق‌های سرمایه‌گذاری و شرکت‌های بیمه استفاده می‌شود. Q زبان داخلی KDB+ است که امکان کار کارآمد با این داده‌ها را فراهم می‌کند. فلسفه Q اختصار و کارایی است و وضوح را فدا می‌کند. دلیل این امر این است که یک زبان مبتنی بر بردار درک آن دشوار خواهد بود، در حالی که اختصار و غنای آن امکان نمایش بخش بسیار بیشتری از برنامه را در یک صفحه واحد فراهم می‌کند و در نهایت درک آن را آسان‌تر می‌کند.

در این مقاله، ما یک برنامه کامل را در Q پیاده‌سازی خواهیم کرد و شاید بخواهید آن را امتحان کنید. برای انجام این کار، به خود Q نیاز دارید. می‌توانید نسخه ۳۲ بیتی رایگان را از وب‌سایت kx دانلود کنید – www.kx.comدر آنجا، اگر علاقه‌مند باشید، اطلاعات مرجعی در مورد کتاب Q نیز خواهید یافت. Q برای فانی‌ها و مقالات مختلف در این زمینه.

بیانیه مشکل

منبعی وجود دارد که هر ۲۵ میلی‌ثانیه یک جدول داده ارسال می‌کند. از آنجایی که KDB+ عمدتاً در امور مالی استفاده می‌شود، فرض می‌کنیم که یک جدول معاملات با ستون‌های زیر است: time (زمان بر حسب میلی‌ثانیه)، sym (نماد شرکت در بورس اوراق بهادار -) آی بی ام, AAPL،...)، قیمت (قیمتی که سهام با آن خریداری شده است) و اندازه (اندازه تراکنش). فاصله زمانی ۲۵ میلی‌ثانیه به صورت دلخواه انتخاب شده است؛ نه خیلی کوچک است و نه خیلی بزرگ. وجود آن به این معنی است که داده‌هایی که به سرویس می‌رسند از قبل بافر شده‌اند. پیاده‌سازی بافرینگ در سمت سرویس، از جمله بافرینگ پویا بر اساس بار فعلی، آسان خواهد بود، اما برای سادگی، ما به یک فاصله زمانی ثابت پایبند خواهیم بود.

این سرویس باید مجموعه‌ای از توابع تجمیع - حداکثر قیمت، میانگین قیمت، اندازه مجموع و سایر اطلاعات مفید - را در هر دقیقه برای هر نماد ورودی از ستون sym محاسبه کند. برای سادگی، فرض می‌کنیم که همه توابع را می‌توان به صورت افزایشی محاسبه کرد، به این معنی که برای به دست آوردن یک مقدار جدید، دو عدد - مقدار قدیمی و مقدار ورودی - کافی است. به عنوان مثال، توابع max، average و sum این ویژگی را دارند، اما تابع median این ویژگی را ندارد.

همچنین فرض می‌کنیم که جریان داده‌های ورودی بر اساس زمان مرتب شده‌اند. این به ما اجازه می‌دهد فقط با آخرین دقیقه کار کنیم. در عمل، کافی است که بتوانیم با دقیقه‌های فعلی و قبلی کار کنیم تا در صورت تأخیر در به‌روزرسانی‌ها، بتوانیم از آنها استفاده کنیم. برای سادگی، این مورد را در نظر نمی‌گیریم.

توابع تجمیع

در زیر توابع تجمیع مورد نیاز آمده است. من تا حد امکان برای افزایش بار سرویس، تعداد بیشتری را در نظر گرفتم:

  • بالاترین – حداکثر قیمت – حداکثر قیمت در هر دقیقه.
  • پایین - حداقل قیمت - حداقل قیمت در هر دقیقه.
  • قیمت اول – قیمت اول – قیمت اول در هر دقیقه.
  • آخرین قیمت – آخرین قیمت – آخرین قیمت در هر دقیقه.
  • firstSize – اندازه اول – اندازه اولین تراکنش در هر دقیقه.
  • lastSize – آخرین اندازه — آخرین اندازه تراکنش در هر دقیقه.
  • numTrades – تعداد معاملات در دقیقه – تعداد معاملات در دقیقه.
  • حجم - مجموع اندازه - مجموع اندازه تراکنش‌ها در هر دقیقه.
  • pvolume – sum price – مجموع قیمت‌ها در هر دقیقه، مورد نیاز برای avgPrice.
  • گردش مالی - مجموع قیمت * اندازه - حجم کل تراکنش‌ها در هر دقیقه.
  • میانگین قیمت - pvolume%numTrades - میانگین قیمت در هر دقیقه.
  • avgSize – volume%numTrades – میانگین حجم معامله در هر دقیقه.
  • vwap – حجم معاملات – میانگین قیمت در هر دقیقه، وزن‌دهی شده با اندازه معامله.
  • cumVolume – مجموع حجم – حجم انباشته تراکنش‌ها در کل دوره.

بیایید همین الان یک نکته‌ی نه چندان واضح را بررسی کنیم: نحوه‌ی مقداردهی اولیه‌ی این ستون‌ها برای اولین بار و برای هر دقیقه‌ی بعدی. برخی از ستون‌ها، مانند firstPrice، باید هر بار با مقدار null مقداردهی اولیه شوند؛ مقدار آنها تعریف نشده است. برخی دیگر، مانند volume، باید همیشه روی 0 تنظیم شوند. همچنین ستون‌هایی وجود دارند که نیاز به یک رویکرد ترکیبی دارند - برای مثال، cumVolume باید از دقیقه‌ی قبل کپی شود و برای دقیقه‌ی اول روی 0 تنظیم شود. ما تمام این پارامترها را با استفاده از نوع داده‌ی دیکشنری (شبیه به یک رکورد) تعریف خواهیم کرد:

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

برای راحتی کار، sym و time را به دیکشنری اضافه کردم. حالا initWith یک ردیف آماده از جدول تجمیع‌شده‌ی نهایی است که sym و time صحیح آن هنوز مشخص نشده‌اند. می‌توانید از آن برای اضافه کردن ردیف‌های جدید به جدول استفاده کنید.

هنگام ایجاد تابع تجمیعی به aggCols نیاز خواهیم داشت. به دلیل ترتیب ارزیابی عبارت در Q (از راست به چپ)، لیست باید معکوس شود. هدف این است که اطمینان حاصل شود که ارزیابی از high به cumVolume ادامه می‌یابد، زیرا برخی ستون‌ها به ستون‌های قبلی وابسته هستند.

ستون‌هایی که باید از دقیقه قبلی به دقیقه جدید کپی شوند، ستون sym برای راحتی اضافه می‌شود:

rollColumns:`sym`cumVolume;

حالا بیایید ستون‌ها را بر اساس نحوه‌ی به‌روزرسانی‌شان به گروه‌هایی تقسیم کنیم. سه نوع ستون قابل تشخیص است:

  1. انباره‌ها (حجم، گردش مالی، ..) - ما باید مقدار ورودی را به مقدار قبلی اضافه کنیم.
  2. با یک نقطه خاص (بالا، پایین، ..) - اولین مقدار در دقیقه از داده‌های ورودی گرفته می‌شود، بقیه با استفاده از تابع محاسبه می‌شوند.
  3. بقیه همیشه با استفاده از تابع محاسبه می‌شوند.

بیایید برای این کلاس‌ها متغیر تعریف کنیم:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

ترتیب محاسبه

ما جدول تجمیعی را در دو مرحله به‌روزرسانی خواهیم کرد. برای کارایی بیشتر، ابتدا جدول ورودی را کوچک می‌کنیم تا برای هر نماد و دقیقه یک ردیف داشته باشد. این واقعیت که تمام توابع ما افزایشی و انجمنی هستند، تضمین می‌کند که نتیجه با این مرحله اضافی تغییر نخواهد کرد. می‌توانیم جدول را با استفاده از یک دستور select کوچک کنیم:

select high:max price, low:min price … by sym,time.minute from table

این روش یک اشکال دارد: مجموعه ستون‌های محاسبه‌شده از پیش تعریف‌شده است. خوشبختانه، Q همچنین select را به عنوان تابعی پیاده‌سازی می‌کند که آرگومان‌های تولیدشده به صورت پویا را می‌پذیرد:

?[table;whereClause;byClause;selectClause]

من قالب آرگومان را با جزئیات شرح نمی‌دهم؛ در مورد ما، تنها عبارات غیربدیهی، عبارات by و select هستند و باید دیکشنری‌هایی از فرم columns!expressions باشند. بنابراین، تابع فشرده‌سازی را می‌توان به صورت زیر تعریف کرد:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

برای وضوح بیشتر، من از تابع parse استفاده کردم که رشته‌ای حاوی عبارت Q را به مقداری تبدیل می‌کند که می‌تواند به تابع eval ارسال شود و در تابع select مورد نیاز است. همچنین توجه داشته باشید که preprocess به عنوان یک projection (یعنی تابعی با آرگومان‌های نیمه تعریف شده) از تابع select تعریف می‌شود؛ یک آرگومان (جدول) وجود ندارد. اگر preprocess را روی یک جدول اعمال کنیم، یک جدول فشرده دریافت می‌کنیم.

مرحله دوم، به‌روزرسانی جدول تجمیع‌شده است. ابتدا الگوریتم را در شبه‌کد می‌نویسیم:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

در Q، استفاده از توابع نگاشت/کاهش به جای حلقه‌ها رایج است. اما از آنجایی که Q یک زبان برداری است و می‌توانیم با خیال راحت همه عملیات را روی همه نمادها به طور همزمان اعمال کنیم، می‌توانیم، به عنوان یک تقریب اول، با انجام عملیات روی همه نمادها به طور همزمان، حلقه را به طور کامل حذف کنیم:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

اما می‌توانیم حتی فراتر برویم. Q یک عملگر منحصر به فرد و فوق‌العاده قدرتمند دارد - عملگر انتساب تعمیم‌یافته. این عملگر به شما امکان می‌دهد مجموعه‌ای از مقادیر را در یک ساختار داده پیچیده با استفاده از لیستی از شاخص‌ها، توابع و آرگومان‌ها تغییر دهید. در مورد ما، این به این شکل است:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

متأسفانه، انتساب به یک جدول نیاز به لیستی از ردیف‌ها دارد، نه ستون‌ها، و نیاز به جابجایی ماتریس (لیست ستون‌ها به لیستی از ردیف‌ها) با استفاده از تابع flip دارد. برای یک جدول بزرگ، این کار پرهزینه است، بنابراین در عوض، ما یک انتساب عمومی را به صورت جداگانه با استفاده از تابع map (که شبیه آپاستروف است) به هر ستون اعمال می‌کنیم:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

ما دوباره از تابع projection استفاده می‌کنیم. همچنین توجه داشته باشید که در Q، ایجاد لیست نیز یک تابع است و می‌توانیم آن را با استفاده از each(map) فراخوانی کنیم تا لیستی از لیست‌ها را دریافت کنیم.

برای جلوگیری از ایجاد مجموعه‌ای ثابت از ستون‌های محاسبه‌شده، بیایید عبارت فوق را به صورت پویا ایجاد کنیم. ابتدا، توابعی را برای محاسبه هر ستون تعریف می‌کنیم که از متغیرهای row و inp برای ارجاع به داده‌های تجمیع‌شده و ورودی استفاده می‌کنند:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

بعضی از ستون‌ها خاص هستند؛ مقدار اول آنها نباید توسط تابع محاسبه شود. می‌توانیم با استفاده از ستون row[`numTrades] تعیین کنیم که اولین مقدار است - اگر 0 باشد، مقدار اول است. Q یک تابع select دارد -?[Boolean list;list1;list2] - که بسته به شرط موجود در آرگومان اول، مقداری را از لیست 1 یا 2 انتخاب می‌کند:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

در اینجا من یک انتساب عمومی را با تابع خودم (عبارت داخل آکولاد) فراخوانی کرده‌ام. مقدار فعلی (اولین آرگومان) و یک آرگومان اضافی که در پارامتر چهارم ارسال می‌کنم، به آن ارسال شده است.

بیایید بلندگوهای باتری‌دار را جداگانه اضافه کنیم، زیرا آنها عملکرد مشابهی دارند:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

این یک انتساب معمولی طبق استانداردهای Q است، اما من لیستی از مقادیر را به طور همزمان انتساب می‌دهم. در نهایت، بیایید تابع main را ایجاد کنیم:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

با این عبارت، من به صورت پویا یک تابع از رشته‌ای که شامل عبارتی است که در بالا ارائه دادم، ایجاد می‌کنم. نتیجه به این شکل خواهد بود:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

ترتیب ارزیابی ستون‌ها معکوس است، زیرا در Q ترتیب ارزیابی از راست به چپ است.

حالا ما دو تابع اصلی مورد نیاز برای محاسبات داریم، تنها چیزی که باقی مانده اضافه کردن کمی زیرساخت است و سرویس آماده است.

مراحل پایانی

ما توابع پیش‌پردازش و به‌روزرسانیAgg را داریم که تمام کارها را انجام می‌دهند. اما هنوز باید از انتقال صحیح بین دقایق اطمینان حاصل کنیم و شاخص‌ها را برای تجمیع محاسبه کنیم. ابتدا، بیایید تابع init را تعریف کنیم:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

همچنین یک تابع roll تعریف خواهیم کرد که دقیقه فعلی را تغییر می‌دهد:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

برای اضافه کردن کاراکترهای جدید به یک تابع نیاز داریم:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

و در نهایت، تابع upd (نام سنتی این تابع برای سرویس‌های Q)، که توسط کلاینت برای اضافه کردن داده‌ها فراخوانی می‌شود:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

همین. همانطور که قول داده بودیم، کد کامل سرویس ما در اینجا آمده است، فقط چند خط:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

آزمایش

بیایید عملکرد سرویس را آزمایش کنیم. برای انجام این کار، آن را در یک فرآیند جداگانه اجرا کنید (کد را در فایل service.q قرار دهید) و تابع init را فراخوانی کنید:

q service.q –p 5566

q)init[]

در یک کنسول دیگر، یک فرآیند Q دوم را شروع کنید و به اولی متصل شوید:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

ابتدا، بیایید لیستی از کاراکترها - ۱۰،۰۰۰ کاراکتر - ایجاد کنیم و یک تابع برای تولید یک جدول تصادفی اضافه کنیم. در کنسول دوم:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

من سه نماد واقعی به لیست اضافه کردم تا جستجوی آنها در جدول آسان‌تر شود. تابع rnd یک جدول تصادفی با n ردیف ایجاد می‌کند که در آن زمان‌ها از t تا t+25 میلی‌ثانیه متغیر هستند.

اکنون می‌توانید ارسال داده به سرویس را امتحان کنید (بیایید ده ساعت اول را اضافه کنیم):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

می‌توانید در سرویس بررسی کنید که جدول به‌روزرسانی شده است:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

یافته ها:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

حالا بیایید یک تست بار اجرا کنیم تا مشخص شود سرویس در هر دقیقه چه مقدار داده می‌تواند پردازش کند. به عنوان یادآوری، فاصله به‌روزرسانی را روی ۲۵ میلی‌ثانیه تنظیم می‌کنیم. بنابراین، سرویس (به طور متوسط) باید حداقل ظرف ۲۰ میلی‌ثانیه به‌روزرسانی شود تا به کاربران زمان برای درخواست داده بدهد. در فرآیند دوم موارد زیر را وارد کنید:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

۴۸۰۰ معادل دو دقیقه است. می‌توانید ابتدا آن را برای ۱۰۰۰ سطر در هر ۲۵ میلی‌ثانیه اجرا کنید:

start 1000

در مورد من، نتیجه حدود چند میلی‌ثانیه برای هر به‌روزرسانی است. بنابراین بلافاصله تعداد ردیف‌ها را به ۱۰۰۰۰ افزایش می‌دهم:

start 10000

یافته ها:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

باز هم، چیز خاصی نیست، اما این ۲۴ میلیون ردیف در دقیقه، ۴۰۰۰۰۰ در ثانیه است. به‌روزرسانی فقط پنج بار بیش از ۲۵ میلی‌ثانیه کند شد، ظاهراً به دلیل تغییر دقیقه. بیایید آن را به ۱۰۰۰۰۰ افزایش دهیم:

start 100000

یافته ها:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

همانطور که می‌بینیم، این سرویس به سختی از پس مشکلات برمی‌آید، اما همچنان می‌تواند سرپا بماند. این حجم از داده‌ها (۲۴۰ میلیون ردیف در دقیقه) بسیار زیاد است؛ در چنین مواردی، راه‌اندازی چندین کلون (یا حتی ده‌ها کلون) از سرویس که هر کدام فقط زیرمجموعه‌ای از کاراکترها را پردازش می‌کنند، رایج است. با این وجود، نتیجه برای یک زبان تفسیری که در درجه اول بر ذخیره‌سازی داده‌ها متمرکز است، چشمگیر است.

ممکن است این سوال پیش بیاید که چرا زمان با افزایش اندازه هر به‌روزرسانی به صورت غیرخطی افزایش می‌یابد. دلیل آن این است که تابع فشرده‌سازی اساساً یک تابع C است که بسیار کارآمدتر از updateAgg است. با شروع از یک اندازه به‌روزرسانی خاص (حدود 10.000)، updateAgg به سقف خود می‌رسد و پس از آن، زمان اجرای آن مستقل از اندازه به‌روزرسانی است. دقیقاً به دلیل پیش‌مرحله Q است که سرویس قادر به هضم چنین حجمی از داده‌ها است. این موضوع اهمیت انتخاب الگوریتم مناسب هنگام کار با کلان‌داده را برجسته می‌کند. نکته دیگر، ذخیره‌سازی مناسب داده‌ها در حافظه است. اگر داده‌ها به صورت ستونی یا بر اساس ترتیب زمانی ذخیره نشده باشند، با چیزی به نام خطای حافظه نهان TLB مواجه می‌شویم - عدم موفقیت در یافتن آدرس صفحه حافظه در حافظه نهان آدرس پردازنده. جستجوی آدرس در صورت عدم موفقیت تقریباً 30 برابر بیشتر طول می‌کشد و در مورد داده‌های پراکنده، این می‌تواند سرویس را چندین بار کند کند.

نتیجه

در این مقاله، نشان دادم که KDB+ و Q نه تنها برای ذخیره مجموعه داده‌های بزرگ و دسترسی آسان به آنها از طریق دستورات select مناسب هستند، بلکه برای ایجاد سرویس‌های پردازش داده که قادر به پردازش صدها میلیون ردیف/گیگابایت داده حتی در یک فرآیند Q واحد هستند نیز مناسب هستند. خود زبان Q به دلیل ماهیت برداری، مفسر SQL داخلی و مجموعه‌ای بسیار موفق از توابع کتابخانه‌ای، امکان پیاده‌سازی فوق‌العاده مختصر و کارآمد الگوریتم‌های پردازش داده را فراهم می‌کند.

می‌خواهم اشاره کنم که موارد فوق فقط نمونه‌ای از قابلیت‌های Q است؛ این سیستم ویژگی‌های منحصر به فرد دیگری نیز دارد. به عنوان مثال، یک پروتکل IPC بسیار ساده که مرزهای بین فرآیندهای Q را از بین می‌برد و به صدها فرآیند اجازه می‌دهد تا به یک شبکه واحد متصل شوند، که می‌تواند ده‌ها سرور را در سراسر جهان پوشش دهد.

منبع: www.habr.com

خرید هاست قابل اعتماد برای سایت های دارای حفاظت DDoS، سرورهای VPS VDS 🔥 خرید هاستینگ معتبر با محافظت در برابر حملات DDoS، سرورهای VPS و VDS | ProHoster