ویژگی های زبان Q و KDB+ با استفاده از مثال یک سرویس بلادرنگ

می توانید در مورد پایه KDB+، زبان برنامه نویسی Q، نقاط قوت و ضعف آنها در مقاله قبلی من بخوانید. مقاله و به اختصار در مقدمه. در این مقاله، ما سرویسی را روی Q پیاده‌سازی می‌کنیم که جریان داده‌های ورودی را پردازش می‌کند و توابع مختلف تجمع را در هر دقیقه در حالت «زمان واقعی» محاسبه می‌کند (یعنی زمان محاسبه همه چیز را قبل از بخش بعدی داده‌ها خواهد داشت). ویژگی اصلی Q این است که یک زبان برداری است که به شما امکان می دهد نه با اشیاء منفرد، بلکه با آرایه ها، آرایه های آرایه ها و سایر اشیاء پیچیده آنها کار کنید. زبان هایی مانند Q و اقوام آن K, J, APL به دلیل کوتاه بودن مشهور هستند. اغلب، برنامه ای که چندین صفحه کد را به زبانی آشنا مانند جاوا می گیرد، می تواند در چند خط روی آن نوشته شود. این چیزی است که من می خواهم در این مقاله نشان دهم.

ویژگی های زبان Q و KDB+ با استفاده از مثال یک سرویس بلادرنگ

معرفی

KDB+ یک پایگاه داده ستونی است که بر روی مقادیر بسیار زیادی داده متمرکز شده است که به روشی خاص (عمدتاً بر اساس زمان) مرتب شده اند. این در درجه اول در موسسات مالی - بانک ها، صندوق های سرمایه گذاری، شرکت های بیمه استفاده می شود. زبان Q زبان داخلی KDB+ است که به شما امکان می دهد به طور موثر با این داده ها کار کنید. ایدئولوژی Q مختصر و کارآمدی است، در حالی که وضوح قربانی می شود. این با این واقعیت توجیه می شود که درک زبان برداری در هر صورت دشوار خواهد بود و مختصر بودن و غنای ضبط به شما امکان می دهد بخش بسیار بیشتری از برنامه را در یک صفحه مشاهده کنید که در نهایت درک آن را آسان تر می کند.

در این مقاله ما یک برنامه کامل را در Q پیاده سازی می کنیم و ممکن است بخواهید آن را امتحان کنید. برای انجام این کار، به Q واقعی نیاز دارید. می توانید نسخه 32 بیتی رایگان را در وب سایت شرکت kx دانلود کنید - www.kx.com. در آنجا، اگر علاقه مند هستید، اطلاعات مرجع کتاب Q را خواهید یافت Q برای فانی ها و مقالات مختلف در این زمینه

بیانیه مشکل

منبعی وجود دارد که هر 25 میلی ثانیه یک جدول با داده ارسال می کند. از آنجایی که KDB+ عمدتاً در امور مالی استفاده می شود، فرض می کنیم که این جدولی از معاملات (معاملات) است که دارای ستون های زیر است: زمان (زمان بر حسب میلی ثانیه)، sym (تعیین شرکت در بورس اوراق بهادار - آی بی ام, AAPL،...)، قیمت (قیمتی که سهام خریداری شده است)، اندازه (اندازه معامله). فاصله 25 میلی ثانیه دلخواه است، نه خیلی کوچک و نه خیلی طولانی. وجود آن به این معنی است که داده ها از قبل بافر به سرویس می آیند. اجرای بافر در سمت سرویس، از جمله بافر پویا بسته به بار فعلی، آسان خواهد بود، اما برای سادگی، ما بر روی یک بازه ثابت تمرکز خواهیم کرد.

سرویس باید در هر دقیقه برای هر نماد ورودی از ستون sym مجموعه ای از توابع جمع آوری - حداکثر قیمت، میانگین قیمت، اندازه مجموع و غیره را شمارش کند. اطلاعات مفید. برای سادگی، فرض می کنیم که همه توابع را می توان به صورت افزایشی محاسبه کرد، به عنوان مثال. برای به دست آوردن یک مقدار جدید، کافی است دو عدد را بدانید - مقادیر قدیمی و ورودی. به عنوان مثال، توابع max، average، sum دارای این ویژگی هستند، اما تابع میانه این ویژگی را ندارد.

همچنین فرض می‌کنیم که جریان داده‌های دریافتی به ترتیب زمان است. این به ما این فرصت را می دهد که فقط با آخرین لحظه کار کنیم. در عمل کافی است در صورت تاخیر در بروز رسانی، بتوان با دقایق جاری و قبلی کار کرد. برای سادگی، ما این مورد را بررسی نمی کنیم.

توابع تجمع

توابع تجمع مورد نیاز در زیر فهرست شده است. من تا حد امکان از آنها استفاده کردم تا بار سرویس را افزایش دهم:

  • قیمت بالا – حداکثر – حداکثر قیمت در دقیقه.
  • قیمت پایین - حداقل قیمت - حداقل قیمت در دقیقه.
  • firstPrice – قیمت اول – اولین قیمت در دقیقه.
  • lastPrice – آخرین قیمت – آخرین قیمت در دقیقه.
  • firstSize – اندازه اول – اندازه تجارت اول در دقیقه.
  • lastSize – آخرین اندازه – آخرین اندازه معامله در یک دقیقه.
  • numTrades – شمارش i – تعداد معاملات در دقیقه.
  • حجم – مجموع اندازه – مجموع اندازه های تجارت در دقیقه.
  • pvolume – مجموع قیمت – مجموع قیمت‌ها در دقیقه، مورد نیاز برای avgPrice.
  • - مجموع قیمت گردش مالی * اندازه - حجم کل معاملات در دقیقه.
  • avgPrice – pvolume%numTrades – میانگین قیمت در دقیقه.
  • avgSize – volume%numTrades – میانگین اندازه تجارت در دقیقه.
  • vwap – گردش % حجم – میانگین قیمت در دقیقه با وزن تراکنش.
  • cumVolume - حجم جمع - اندازه انباشته تراکنش ها در کل زمان.

بیایید فوراً یک نکته غیر واضح را مورد بحث قرار دهیم - چگونگی مقداردهی اولیه این ستون ها برای اولین بار و برای هر دقیقه بعدی. برخی از ستون‌های نوع firstPrice باید هر بار مقداردهی اولیه شوند و مقدار آن‌ها تعریف نشده است. سایر انواع حجم باید همیشه روی 0 تنظیم شوند. همچنین ستون هایی وجود دارند که به رویکرد ترکیبی نیاز دارند - برای مثال، cumVolume باید از دقیقه قبل کپی شود، و برای اولین بار روی 0 تنظیم شود. بیایید همه این پارامترها را با استفاده از داده های فرهنگ لغت تنظیم کنیم. نوع (مشابه با رکورد):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

برای راحتی کار، sym و time را به فرهنگ لغت اضافه کردم، اکنون initWith یک خط آماده از جدول نهایی جمع آوری شده است، که در آن باقی مانده است که سیم و زمان صحیح را تنظیم کنید. می توانید از آن برای اضافه کردن ردیف های جدید به جدول استفاده کنید.

هنگام ایجاد یک تابع تجمع به aggCols نیاز خواهیم داشت. به دلیل ترتیبی که عبارات در Q (از راست به چپ) ارزیابی می شوند، فهرست باید معکوس شود. هدف این است که اطمینان حاصل شود که محاسبه از بالا به cumVolume می‌رود، زیرا برخی از ستون‌ها به ستون‌های قبلی بستگی دارند.

ستون‌هایی که باید در دقیقه جدید کپی شوند، ستون sym برای راحتی اضافه می‌شود:

rollColumns:`sym`cumVolume;

حالا بیایید ستون ها را بر اساس نحوه به روز رسانی به گروه ها تقسیم کنیم. سه نوع قابل تشخیص است:

  1. انباشته ها (حجم، گردش مالی،...) – باید مقدار ورودی را به مقدار قبلی اضافه کنیم.
  2. با یک نقطه خاص (بالا، کم، ..) - اولین مقدار در دقیقه از داده های ورودی گرفته می شود، بقیه با استفاده از تابع محاسبه می شوند.
  3. باقی مانده. همیشه با استفاده از یک تابع محاسبه می شود.

بیایید متغیرهایی را برای این کلاس ها تعریف کنیم:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

ترتیب محاسبه

جدول تجمیع شده را در دو مرحله به روز می کنیم. برای کارایی، ابتدا جدول ورودی را کوچک می کنیم تا برای هر کاراکتر و دقیقه فقط یک ردیف وجود داشته باشد. این واقعیت که همه عملکردهای ما افزایشی و تداعی کننده هستند تضمین می کند که نتیجه این مرحله اضافی تغییر نخواهد کرد. می توانید جدول را با استفاده از انتخاب کوچک کنید:

select high:max price, low:min price … by sym,time.minute from table

این روش یک نقطه ضعف دارد - مجموعه ستون های محاسبه شده از پیش تعریف شده است. خوشبختانه در Q، select نیز به عنوان تابعی پیاده سازی می شود که در آن می توانید آرگومان های ایجاد شده به صورت پویا را جایگزین کنید:

?[table;whereClause;byClause;selectClause]

من فرمت آرگومان‌ها را با جزئیات توضیح نمی‌دهم؛ در مورد ما، فقط عبارات by و select غیر اساسی خواهند بود و باید فرهنگ‌های عبارات ستون‌های فرم باشند. بنابراین، تابع کوچک شدن را می توان به صورت زیر تعریف کرد:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

برای وضوح، من از تابع تجزیه استفاده کردم، که رشته ای با عبارت Q را به مقداری تبدیل می کند که می تواند به تابع eval ارسال شود و در انتخاب تابع لازم است. همچنین توجه داشته باشید که پیش پردازش به عنوان یک طرح (به عنوان مثال، یک تابع با آرگومان های تا حدی تعریف شده) از تابع select تعریف می شود، یک آرگومان (جدول) وجود ندارد. اگر پیش پردازش را روی یک جدول اعمال کنیم، یک جدول فشرده به دست خواهیم آورد.

مرحله دوم به روز رسانی جدول تجمیع است. بیایید ابتدا الگوریتم را در شبه کد بنویسیم:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

در Q، استفاده از توابع map/reduce به جای حلقه ها رایج است. اما از آنجایی که Q یک زبان برداری است و ما می‌توانیم به راحتی همه عملیات را به یکباره روی همه نمادها اعمال کنیم، پس برای اولین تقریب می‌توانیم بدون حلقه انجام دهیم و عملیات را روی همه نمادها به طور همزمان انجام دهیم:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

اما می توانیم جلوتر برویم، Q یک عملگر منحصر به فرد و بسیار قدرتمند دارد - عملگر انتساب تعمیم یافته. این به شما امکان می دهد مجموعه ای از مقادیر را در یک ساختار داده پیچیده با استفاده از لیستی از شاخص ها، توابع و آرگومان ها تغییر دهید. در مورد ما به این صورت است:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

متأسفانه، برای اختصاص دادن به یک جدول، به لیستی از ردیف‌ها نیاز دارید، نه ستون‌ها، و باید ماتریس (فهرست ستون‌ها به فهرست ردیف‌ها) را با استفاده از تابع flip جابه‌جا کنید. این برای یک جدول بزرگ گران است، بنابراین به جای آن یک انتساب تعمیم یافته را به هر ستون به طور جداگانه با استفاده از تابع نقشه (که شبیه آپاستروف است) اعمال می کنیم:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

ما دوباره از طرح ریزی تابع استفاده می کنیم. همچنین توجه داشته باشید که در Q، ایجاد لیست نیز یک تابع است و می‌توانیم آن را با استفاده از تابع every(map) فراخوانی کنیم تا لیستی از لیست‌ها به دست آید.

برای اطمینان از ثابت نبودن مجموعه ستون های محاسبه شده، عبارت فوق را به صورت پویا ایجاد می کنیم. بیایید ابتدا توابعی را برای محاسبه هر ستون با استفاده از متغیرهای ردیف و inp برای ارجاع به داده‌های جمع‌آوری شده و ورودی تعریف کنیم:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

برخی از ستون ها خاص هستند، اولین مقدار آنها نباید توسط تابع محاسبه شود. با ستون [`numTrades] می‌توانیم تعیین کنیم که اولین است - اگر حاوی 0 باشد، مقدار اول است. Q یک تابع انتخاب دارد - ?[Boolean list;list1;list2] - که بسته به شرایط آرگومان اول، مقداری را از لیست 1 یا 2 انتخاب می کند:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

در اینجا من یک تخصیص تعمیم یافته را با تابع خود فراخوانی کردم (یک عبارت در پرانتزهای فرفری). مقدار فعلی (آگومان اول) و یک آرگومان اضافی را دریافت می کند که من آن را در پارامتر چهارم ارسال می کنم.

بیایید بلندگوهای باتری را جداگانه اضافه کنیم، زیرا عملکرد برای آنها یکسان است:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

این یک تخصیص معمولی با استانداردهای Q است، اما من فهرستی از مقادیر را به یکباره اختصاص می دهم. در نهایت، اجازه دهید تابع اصلی را ایجاد کنیم:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

با این عبارت، من به صورت پویا یک تابع از یک رشته ایجاد می کنم که حاوی عبارتی است که در بالا دادم. نتیجه به شکل زیر خواهد بود:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

ترتیب ارزیابی ستون معکوس است زیرا در Q ترتیب ارزیابی از راست به چپ است.

اکنون دو تابع اصلی لازم برای محاسبات داریم، فقط باید کمی زیرساخت اضافه کنیم و سرویس آماده است.

مراحل پایانی

ما توابع preprocess و updateAgg داریم که همه کارها را انجام می دهند. اما هنوز هم لازم است از انتقال صحیح از طریق دقیقه اطمینان حاصل شود و شاخص ها برای تجمیع محاسبه شوند. اول از همه، اجازه دهید تابع init را تعریف کنیم:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

ما همچنین تابع رول را تعریف می کنیم که دقیقه فعلی را تغییر می دهد:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

برای افزودن کاراکترهای جدید به یک تابع نیاز داریم:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

و در نهایت، تابع upd (نام سنتی این تابع برای خدمات Q)، که توسط مشتری برای افزودن داده فراخوانی می شود:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

همین. در اینجا کد کامل خدمات ما، همانطور که قول داده بودیم، فقط چند خط است:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

آزمایش

بیایید عملکرد سرویس را بررسی کنیم. برای انجام این کار، اجازه دهید آن را در یک فرآیند جداگانه اجرا کنیم (کد را در فایل service.q قرار دهید) و تابع init را فراخوانی کنیم:

q service.q –p 5566

q)init[]

در کنسول دیگری، فرآیند Q دوم را شروع کنید و به اولین وصل شوید:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

ابتدا، بیایید یک لیست از نمادها ایجاد کنیم - 10000 قطعه و یک تابع برای ایجاد یک جدول تصادفی اضافه کنیم. در کنسول دوم:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

من سه نماد واقعی را به لیست اضافه کردم تا جستجوی آنها در جدول راحت تر باشد. تابع rnd یک جدول تصادفی با n ردیف ایجاد می کند که زمان آن از t تا t+25 میلی ثانیه متغیر است.

اکنون می توانید داده ها را به سرویس ارسال کنید (ده ساعت اول را اضافه کنید):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

می توانید در سرویس بررسی کنید که جدول به روز شده است:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

یافته ها:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

اکنون بیایید آزمایش بار را انجام دهیم تا بفهمیم سرویس چقدر داده در دقیقه می تواند پردازش کند. یادآوری می کنم که ما فاصله به روز رسانی را روی 25 میلی ثانیه تنظیم کردیم. بر این اساس، این سرویس (به طور متوسط) باید حداقل 20 میلی ثانیه در هر به روز رسانی باشد تا به کاربران زمان برای درخواست داده بدهد. در فرآیند دوم موارد زیر را وارد کنید:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 دو دقیقه است. می توانید ابتدا برای 1000 ردیف هر 25 میلی ثانیه اجرا کنید:

start 1000

در مورد من، نتیجه در حدود چند میلی ثانیه در هر به روز رسانی است. بنابراین من بلافاصله تعداد ردیف ها را به 10.000 افزایش می دهم:

start 10000

یافته ها:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

باز هم چیز خاصی نیست، اما این 24 میلیون خط در دقیقه، 400 هزار در ثانیه است. برای بیش از 25 میلی ثانیه، به‌روزرسانی فقط 5 بار کند شد، ظاهراً هنگام تغییر دقیقه. بیایید به 100.000 افزایش دهیم:

start 100000

یافته ها:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

همانطور که می بینید، این سرویس به سختی می تواند کنار بیاید، اما با این وجود می تواند سرپا بماند. چنین حجمی از داده ها (240 میلیون ردیف در دقیقه) بسیار زیاد است؛ در چنین مواردی، راه اندازی چندین کلون (یا حتی ده ها کلون) از سرویس که هر کدام تنها بخشی از کاراکترها را پردازش می کنند، معمول است. با این حال، نتیجه برای یک زبان تفسیری که در درجه اول بر روی ذخیره سازی داده تمرکز دارد، چشمگیر است.

ممکن است این سوال پیش بیاید که چرا زمان با حجم هر به روز رسانی به صورت غیر خطی رشد می کند. دلیل آن این است که تابع کوچک کردن در واقع یک تابع C است که بسیار کارآمدتر از updateAgg است. با شروع از یک اندازه به روز رسانی خاص (حدود 10.000)، updateAgg به سقف خود می رسد و سپس زمان اجرای آن به اندازه به روز رسانی بستگی ندارد. به دلیل مرحله مقدماتی Q است که سرویس قادر است چنین حجمی از داده ها را هضم کند. این نشان می دهد که انتخاب الگوریتم مناسب هنگام کار با داده های بزرگ چقدر مهم است. نکته دیگر ذخیره سازی صحیح اطلاعات در حافظه است. اگر داده‌ها به صورت ستونی ذخیره نمی‌شدند یا براساس زمان مرتب نمی‌شدند، با چیزی به عنوان از دست دادن حافظه پنهان TLB آشنا می‌شدیم - عدم وجود آدرس صفحه حافظه در حافظه پنهان آدرس پردازنده. جستجوی یک آدرس در صورت عدم موفقیت حدود 30 برابر بیشتر طول می کشد و اگر داده ها پراکنده باشند، می تواند چندین بار سرعت سرویس را کاهش دهد.

نتیجه

در این مقاله، من نشان دادم که پایگاه داده KDB+ و Q نه تنها برای ذخیره داده های بزرگ و دسترسی آسان به آنها از طریق انتخاب مناسب هستند، بلکه برای ایجاد خدمات پردازش داده که قادر به هضم صدها میلیون ردیف/گیگابایت داده حتی در یک فرآیند Q واحد. خود زبان Q به دلیل ماهیت برداری، مفسر گویش SQL داخلی و مجموعه ای بسیار موفق از توابع کتابخانه، امکان اجرای بسیار مختصر و کارآمد الگوریتم های مربوط به پردازش داده ها را فراهم می کند.

متذکر می شوم که موارد فوق فقط بخشی از کاری است که Q می تواند انجام دهد، ویژگی های منحصر به فرد دیگری نیز دارد. به عنوان مثال، یک پروتکل IPC بسیار ساده که مرز بین فرآیندهای Q منفرد را پاک می کند و به شما امکان می دهد صدها مورد از این فرآیندها را در یک شبکه واحد ترکیب کنید که می تواند در ده ها سرور در نقاط مختلف جهان قرار گیرد.

منبع: www.habr.com

اضافه کردن نظر