Q և KDB+ լեզվի առանձնահատկությունները՝ օգտագործելով իրական ժամանակի ծառայության օրինակը

Այն մասին, թե որոնք են KDB+ բազան, Q ծրագրավորման լեզուն, որոնք են դրանց ուժեղ և թույլ կողմերը, կարող եք կարդալ իմ նախորդ գրքում Հոդված և հակիրճ՝ ներածության մեջ։ Հոդվածում մենք կիրականացնենք ծառայություն Q-ի վրա, որը կմշակի մուտքային տվյալների հոսքը և ամեն րոպե «իրական ժամանակի» ռեժիմում կհաշվարկի տարբեր ագրեգացիոն գործառույթներ (այսինքն՝ ժամանակ կունենա ամեն ինչ հաշվարկելու մինչև տվյալների հաջորդ մասը): Q-ի հիմնական առանձնահատկությունն այն է, որ այն վեկտորային լեզու է, որը թույլ է տալիս գործել ոչ թե առանձին օբյեկտների, այլ դրանց զանգվածների, զանգվածների զանգվածների և այլ բարդ օբյեկտների հետ։ Լեզուները, ինչպիսիք են Q-ն և նրա հարազատները K, J, APL, հայտնի են իրենց հակիրճությամբ: Հաճախ, Java-ի նման ծանոթ լեզվով կոդերի մի քանի էկրաններ վերցնող ծրագիր կարող է գրվել դրանց վրա մի քանի տողով: Սա այն է, ինչ ես ուզում եմ ցույց տալ այս հոդվածում:

Q և KDB+ լեզվի առանձնահատկությունները՝ օգտագործելով իրական ժամանակի ծառայության օրինակը

Ներածություն

KDB+-ը սյունակային տվյալների բազա է, որը կենտրոնացած է շատ մեծ քանակությամբ տվյալների վրա՝ պատվիրված հատուկ ձևով (հիմնականում ըստ ժամանակի): Այն օգտագործվում է հիմնականում ֆինանսական հաստատություններում՝ բանկերում, ներդրումային հիմնադրամներում, ապահովագրական ընկերություններում: Q լեզուն KDB+-ի ներքին լեզուն է, որը թույլ է տալիս արդյունավետորեն աշխատել այս տվյալների հետ: Q գաղափարախոսությունը հակիրճությունն ու արդյունավետությունն է, մինչդեռ պարզությունը զոհաբերվում է: Դա հիմնավորվում է նրանով, որ վեկտորային լեզուն ամեն դեպքում դժվար կլինի հասկանալ, իսկ ձայնագրության հակիրճությունն ու հարստությունը թույլ են տալիս մեկ էկրանին տեսնել ծրագրի շատ ավելի մեծ մասը, ինչը, ի վերջո, հեշտացնում է այն:

Այս հոդվածում մենք իրականացնում ենք Q-ի լիարժեք ծրագիր, և գուցե ցանկանաք փորձել այն: Դա անելու համար ձեզ անհրաժեշտ կլինի իրական Q: Դուք կարող եք ներբեռնել անվճար 32-բիթանոց տարբերակը kx ընկերության կայքում. www.kx.com. Այնտեղ, եթե հետաքրքրված եք, դուք կգտնեք տեղեկատու տեղեկատվություն Q-ի, գրքի վերաբերյալ Q Մահկանացուների համար և այս թեմայով տարբեր հոդվածներ:

Խնդրի ձևակերպում

Կա մի աղբյուր, որը յուրաքանչյուր 25 միլիվայրկանում ուղարկում է աղյուսակ՝ տվյալների հետ: Քանի որ KDB+-ն օգտագործվում է հիմնականում ֆինանսների մեջ, մենք կենթադրենք, որ սա գործարքների (առևտրի) աղյուսակ է, որն ունի հետևյալ սյունակները՝ ժամանակ (ժամանակը միլիվայրկյաններով), sym (ընկերության անվանումը ֆոնդային բորսայում - IBM, AAPL,…), գինը (գինը, որով ձեռք են բերվել բաժնետոմսերը), չափը (գործարքի չափը): 25 միլիվայրկյան միջակայքը կամայական է, ոչ շատ փոքր և ոչ շատ երկար: Դրա առկայությունը նշանակում է, որ տվյալները ծառայության են գալիս արդեն բուֆերացված: Հեշտ կլինի բուֆերավորում իրականացնել սպասարկման կողմում, ներառյալ դինամիկ բուֆերացումը՝ կախված ընթացիկ բեռից, բայց պարզության համար մենք կկենտրոնանանք ֆիքսված ընդմիջման վրա:

Ծառայությունը պետք է ամեն րոպե հաշվի sym սյունակից յուրաքանչյուր մուտքային նշանի համար մի շարք ագրեգացիոն ֆունկցիաներ՝ առավելագույն գին, միջին գին, գումարի չափ և այլն: օգտակար տեղեկատվություն. Պարզության համար մենք կենթադրենք, որ բոլոր գործառույթները կարող են հաշվարկվել աստիճանաբար, այսինքն. Նոր արժեք ստանալու համար բավական է իմանալ երկու թիվ՝ հին և մուտքային արժեքները: Օրինակ, max, average, sum ֆունկցիաները ունեն այս հատկությունը, իսկ մեդիանային ֆունկցիան՝ ոչ:

Մենք նաև կենթադրենք, որ մուտքային տվյալների հոսքը ժամանակին պատվիրված է: Սա մեզ հնարավորություն կտա աշխատել միայն վերջին րոպեի հետ։ Գործնականում բավական է կարողանալ աշխատել ընթացիկ և նախորդ րոպեների հետ, եթե որոշ թարմացումներ ուշանան։ Պարզության համար մենք չենք քննարկի այս դեպքը:

Ագրեգացման գործառույթներ

Պահանջվող ագրեգացման գործառույթները թվարկված են ստորև: Ես վերցրեցի դրանցից որքան հնարավոր է շատ, որպեսզի մեծացնեմ ծառայության բեռը.

  • բարձր – առավելագույն գին – րոպեի առավելագույն գինը:
  • ցածր – նվազագույն գին – րոպեի նվազագույն գինը:
  • firstPrice – առաջին գին – առաջին գինը րոպեի համար:
  • lastPrice – վերջին գին – րոպեի վերջին գինը:
  • firstSize – առաջին չափ – առաջին առևտրի չափը րոպեում:
  • lastSize – վերջին չափը – վերջին առևտրի չափը մեկ րոպեում:
  • numTrades – հաշվում i – գործարքների քանակը րոպեում:
  • ծավալ – գումարի չափ – առևտրի չափերի գումարը րոպեում:
  • pvolume – գումարային գին – գների գումարը րոպեում, պահանջվում է avgPrice-ի համար:
  • – գումարային շրջանառության գին*չափ – գործարքների ընդհանուր ծավալը րոպեում:
  • avgPrice – pvolume%numԱռևտուր – միջին գինը մեկ րոպեի համար:
  • avgSize – ծավալը%numԱռևտուր – միջին առևտրի չափը րոպեում:
  • vwap – շրջանառություն%ծավալ – րոպեի միջին գինը՝ ըստ գործարքի չափի:
  • cumVolume – գումարային ծավալ – ամբողջ ժամանակի ընթացքում գործարքների կուտակված չափը:

Եկեք անմիջապես քննարկենք մեկ ոչ ակնհայտ կետ՝ ինչպես նախաստորագրել այս սյունակները առաջին անգամ և յուրաքանչյուր հաջորդ րոպեի համար: FirstPrice տիպի որոշ սյունակներ պետք է ամեն անգամ զրոյացվեն, դրանց արժեքը որոշված ​​չէ: Ձայնի մյուս տեսակները միշտ պետք է սահմանվեն 0-ի վրա: Կան նաև սյունակներ, որոնք պահանջում են համակցված մոտեցում. օրինակ, cumVolume-ը պետք է պատճենվի նախորդ րոպեից, իսկ առաջինի համար սահմանենք 0: Եկեք սահմանենք այս բոլոր պարամետրերը՝ օգտագործելով բառարանի տվյալները: տեսակը (գրառման անալոգը).

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Հարմարության համար ես բառարանում ավելացրել եմ sym և ժամանակ, այժմ initWith-ը պատրաստի տող է վերջնական ագրեգացված աղյուսակից, որտեղ մնում է սահմանել ճիշտ սիմվոլը և ժամանակը: Դուք կարող եք օգտագործել այն աղյուսակում նոր տողեր ավելացնելու համար:

Ագրեգացման ֆունկցիա ստեղծելիս մեզ պետք կգա aggCols: Ցանկը պետք է շրջվի Q-ի արտահայտությունների գնահատման հերթականությամբ (աջից ձախ): Նպատակն է ապահովել, որ հաշվարկը գնա բարձրից մինչև cumVolume, քանի որ որոշ սյունակներ կախված են նախորդներից:

Սյունակներ, որոնք պետք է պատճենվեն նախորդից նոր րոպեի վրա, հարմարության համար ավելացվում է sym սյունակը.

rollColumns:`sym`cumVolume;

Այժմ եկեք բաժանենք սյունակները խմբերի, ըստ այն մասին, թե ինչպես պետք է դրանք թարմացվեն: Կարելի է առանձնացնել երեք տեսակ.

  1. Կուտակիչներ (ծավալ, շրջանառություն,..) – մուտքային արժեքը պետք է ավելացնենք նախորդին։
  2. Հատուկ կետով (բարձր, ցածր, ..) – րոպեի առաջին արժեքը վերցվում է մուտքային տվյալներից, մնացածը հաշվարկվում են ֆունկցիայի միջոցով:
  3. Հանգիստ. Միշտ հաշվարկվում է ֆունկցիայի միջոցով:

Եկեք սահմանենք փոփոխականներ այս դասերի համար.

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Հաշվարկի կարգը

Մենք կթարմացնենք ընդհանուր աղյուսակը երկու փուլով: Արդյունավետության համար մենք նախ փոքրացնում ենք մուտքային աղյուսակը, որպեսզի յուրաքանչյուր նիշի և րոպեի համար մնա միայն մեկ տող: Այն փաստը, որ մեր բոլոր գործառույթները աստիճանական և ասոցիատիվ են, երաշխիք է, որ այս լրացուցիչ քայլի արդյունքը չի փոխվի։ Դուք կարող եք փոքրացնել աղյուսակը՝ ընտրելով.

select high:max price, low:min price … by sym,time.minute from table

Այս մեթոդը ունի մի թերություն՝ հաշվարկված սյունակների հավաքածուն նախապես սահմանված է: Բարեբախտաբար, Q-ում select-ն իրականացվում է նաև որպես ֆունկցիա, որտեղ կարող եք փոխարինել դինամիկ ստեղծված փաստարկները.

?[table;whereClause;byClause;selectClause]

Ես մանրամասն չեմ նկարագրի փաստարկների ձևաչափը, մեր դեպքում միայն by and select արտահայտությունները կլինեն ոչ տրիվիալ, և դրանք պետք է լինեն սյունակներ!արտահայտությունների բառարաններ: Այսպիսով, կրճատման գործառույթը կարող է սահմանվել հետևյալ կերպ.

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Պարզության համար ես օգտագործեցի վերլուծական ֆունկցիան, որը Q արտահայտությունով տողը վերածում է արժեքի, որը կարող է փոխանցվել eval ֆունկցիային և որը պահանջվում է ֆունկցիայի ընտրության մեջ: Նկատի ունեցեք նաև, որ նախնական գործընթացը սահմանվում է որպես ընտրված ֆունկցիայի պրոյեկցիա (այսինքն՝ մասնակիորեն սահմանված արգումենտներով ֆունկցիա), մեկ արգումենտ (աղյուսակը) բացակայում է։ Եթե ​​մենք կիրառենք նախնական գործընթացը սեղանի վրա, մենք կստանանք սեղմված աղյուսակ:

Երկրորդ փուլը համախմբված աղյուսակի թարմացումն է: Եկեք նախ գրենք ալգորիթմը կեղծ կոդով.

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q-ում սովորական է loops-ի փոխարեն օգտագործել map/reduce ֆունկցիաներ: Բայց քանի որ Q-ն վեկտորային լեզու է, և մենք կարող ենք հեշտությամբ կիրառել բոլոր գործողությունները միանգամից բոլոր նշանների վրա, ապա առաջին մոտավորությունը մենք կարող ենք անել ընդհանրապես առանց հանգույցի, կատարելով գործողություններ բոլոր նշանների վրա միանգամից.

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Բայց մենք կարող ենք ավելի հեռուն գնալ, Q-ն ունի եզակի և չափազանց հզոր օպերատոր՝ ընդհանրացված նշանակման օպերատորը։ Այն թույլ է տալիս փոխել արժեքների մի շարք բարդ տվյալների կառուցվածքում՝ օգտագործելով ինդեքսների, գործառույթների և փաստարկների ցանկը: Մեր դեպքում դա այսպիսի տեսք ունի.

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ցավոք սրտի, աղյուսակին վերագրելու համար անհրաժեշտ է տողերի ցանկ, ոչ թե սյունակներ, և դուք պետք է փոխադրեք մատրիցը (սյունակների ցանկը տողերի ցանկին) օգտագործելով flip ֆունկցիան: Սա թանկ է մեծ աղյուսակի համար, ուստի փոխարենը մենք կիրառում ենք ընդհանրացված հանձնարարություն յուրաքանչյուր սյունակի համար առանձին՝ օգտագործելով քարտեզի ֆունկցիան (որը նման է ապաստրոֆի).

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Մենք կրկին օգտագործում ենք ֆունկցիայի պրոյեկցիան: Նկատի ունեցեք նաև, որ Q-ում ցուցակ ստեղծելը նույնպես ֆունկցիա է, և մենք կարող ենք այն անվանել յուրաքանչյուր(քարտեզ) ֆունկցիայի միջոցով՝ ցուցակների ցուցակ ստանալու համար:

Ապահովելու համար, որ հաշվարկված սյունակների հավաքածուն ամրագրված չէ, մենք դինամիկ կերպով կստեղծենք վերը նշված արտահայտությունը: Եկեք նախ սահմանենք յուրաքանչյուր սյունակ հաշվարկելու գործառույթներ՝ օգտագործելով տող և inp փոփոխականները՝ ագրեգացված և մուտքագրված տվյալներին անդրադառնալու համար.

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Որոշ սյունակներ հատուկ են, դրանց առաջին արժեքը չպետք է հաշվարկվի ֆունկցիայի միջոցով: Մենք կարող ենք որոշել, որ այն առաջինն է [`numTrades] սյունակով, եթե այն պարունակում է 0, ապա արժեքը առաջինն է: Q-ն ունի ընտրելու ֆունկցիա՝ ?[Boolean list;list1;list2] - որն ընտրում է արժեք 1-ից կամ 2-ից՝ կախված առաջին արգումենտի պայմանից.

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Այստեղ ես կանչեցի ընդհանրացված հանձնարարություն իմ ֆունկցիայով (արտահայտություն գանգուր փակագծերում): Այն ստանում է ընթացիկ արժեքը (առաջին փաստարկը) և լրացուցիչ արգումենտ, որը ես փոխանցում եմ 4-րդ պարամետրով։

Եկեք առանձին-առանձին ավելացնենք մարտկոցի բարձրախոսները, քանի որ նրանց համար գործառույթը նույնն է.

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Սա սովորական հանձնարարություն է Q ստանդարտներով, բայց ես միանգամից արժեքների ցուցակ եմ տալիս: Ի վերջո, եկեք ստեղծենք հիմնական գործառույթը.

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Այս արտահայտությամբ ես դինամիկ կերպով ստեղծում եմ ֆունկցիա մի տողից, որը պարունակում է իմ վերը նշված արտահայտությունը: Արդյունքը կունենա հետևյալ տեսքը.

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Սյունակի գնահատման կարգը շրջված է, քանի որ Q-ում գնահատման կարգը աջից ձախ է:

Այժմ մենք ունենք հաշվարկների համար անհրաժեշտ երկու հիմնական գործառույթ, պարզապես պետք է մի փոքր ենթակառուցվածք ավելացնել, և ծառայությունը պատրաստ է։

Վերջնական քայլեր

Մենք ունենք preprocess and updateAgg ֆունկցիաներ, որոնք կատարում են ամբողջ աշխատանքը: Բայց դեռևս անհրաժեշտ է ապահովել րոպեների ճիշտ անցումը և հաշվարկել ինդեքսները ագրեգացման համար: Նախ, եկեք սահմանենք init ֆունկցիան.

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Մենք նաև կսահմանենք roll ֆունկցիան, որը կփոխի ընթացիկ րոպեն.

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Նոր նիշեր ավելացնելու համար մեզ անհրաժեշտ կլինի ֆունկցիա.

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Եվ վերջապես, upd ֆունկցիան (այս ֆունկցիայի ավանդական անվանումը Q ծառայությունների համար), որը հաճախորդը կանչում է տվյալներ ավելացնելու համար.

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Այսքանը: Ահա մեր ծառայության ամբողջական կոդը, ինչպես խոստացել էինք, ընդամենը մի քանի տող.

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Փորձարկում

Եկեք ստուգենք ծառայության կատարումը: Դա անելու համար եկեք այն գործարկենք առանձին գործընթացով (կոդը տեղադրենք service.q ֆայլում) և կանչենք init ֆունկցիան.

q service.q –p 5566

q)init[]

Մեկ այլ վահանակում սկսեք երկրորդ Q գործընթացը և միացեք առաջինին.

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Նախ ստեղծենք սիմվոլների ցանկ՝ 10000 կտոր և ավելացնենք ֆունկցիա՝ պատահական աղյուսակ ստեղծելու համար։ Երկրորդ վահանակում.

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Ցուցակում ավելացրի երեք իրական սիմվոլներ, որպեսզի ավելի հեշտ լինի դրանք փնտրել աղյուսակում: rnd ֆունկցիան ստեղծում է պատահական աղյուսակ n տողով, որտեղ ժամանակը տատանվում է t-ից մինչև t+25 միլիվայրկյան։

Այժմ կարող եք փորձել տվյալներ ուղարկել ծառայությանը (ավելացրեք առաջին տասը ժամը).

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Ծառայության մեջ կարող եք ստուգել, ​​որ աղյուսակը թարմացվել է.

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Արդյունքը:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Այժմ կատարենք բեռնվածության թեստավորում՝ պարզելու, թե ծառայությունը որքան տվյալներ կարող է մշակել րոպեում: Հիշեցնեմ, որ թարմացման միջակայքը սահմանել ենք 25 միլիվայրկյան։ Համապատասխանաբար, ծառայությունը (միջինում) պետք է տեղավորվի առնվազն 20 միլիվայրկյան յուրաքանչյուր թարմացման համար, որպեսզի օգտվողներին ժամանակ տա տվյալներ պահանջելու համար: Երկրորդ գործընթացում մուտքագրեք հետևյալը.

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800-ը երկու րոպե է։ Կարող եք առաջինը փորձել վազել 1000 տողով յուրաքանչյուր 25 միլիվայրկյան:

start 1000

Իմ դեպքում, արդյունքը մոտավորապես մի քանի միլիվայրկյան է մեկ թարմացման համար: Այսպիսով, ես անմիջապես կավելացնեմ տողերի թիվը մինչև 10.000:

start 10000

Արդյունքը:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Կրկին առանձնահատուկ ոչինչ, բայց սա րոպեում 24 միլիոն տող է, վայրկյանում 400 հազար: Ավելի քան 25 միլիվայրկյան, թարմացումը դանդաղեցրեց ընդամենը 5 անգամ, ըստ երևույթին, երբ րոպեն փոխվեց: Եկեք հասնենք 100.000-ի.

start 100000

Արդյունքը:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Ինչպես տեսնում եք, ծառայությունը հազիվ է գլուխ հանում, բայց, այնուամենայնիվ, կարողանում է ջրի երեսին մնալ։ Տվյալների նման ծավալը (րոպեում 240 միլիոն տող) չափազանց մեծ է, նման դեպքերում սովորական է գործարկել ծառայության մի քանի կլոններ (կամ նույնիսկ տասնյակ կլոններ), որոնցից յուրաքանչյուրը մշակում է նիշերի միայն մի մասը: Այնուամենայնիվ, արդյունքը տպավորիչ է մեկնաբանված լեզվի համար, որը հիմնականում կենտրոնանում է տվյալների պահպանման վրա:

Հարց կարող է առաջանալ, թե ինչու է ժամանակը ոչ գծային աճում յուրաքանչյուր թարմացման չափով: Պատճառն այն է, որ shrink ֆունկցիան իրականում C ֆունկցիա է, որը շատ ավելի արդյունավետ է, քան updateAgg-ը։ Թարմացման որոշակի չափից սկսած (մոտ 10.000), updateAgg-ը հասնում է իր առաստաղին, և այնուհետև դրա կատարման ժամանակը կախված չէ թարմացման չափից: Նախնական Q քայլի շնորհիվ է, որ ծառայությունը կարողանում է մարսել տվյալների նման ծավալները։ Սա ընդգծում է, թե որքան կարևոր է մեծ տվյալների հետ աշխատելիս ճիշտ ալգորիթմ ընտրելը: Մյուս կետը հիշողության մեջ տվյալների ճիշտ պահպանումն է: Եթե ​​տվյալները չպահվեին սյունակային կարգով կամ չպատվիրվեին ժամանակի ընթացքում, ապա մենք կծանոթանայինք այնպիսի բանի, ինչպիսին է TLB քեշի բացթողումը` պրոցեսորի հասցեների քեշում հիշողության էջի հասցեի բացակայությունը: Հասցեի որոնումը անհաջողության դեպքում մոտ 30 անգամ ավելի երկար է տևում, իսկ եթե տվյալները ցրված են, այն կարող է մի քանի անգամ դանդաղեցնել ծառայությունը:

Ամփոփում

Այս հոդվածում ես ցույց տվեցի, որ KDB+ և Q տվյալների բազան հարմար է ոչ միայն մեծ տվյալներ պահելու և ընտրվածի միջոցով հեշտությամբ մուտք գործելու համար, այլ նաև տվյալների մշակման ծառայություններ ստեղծելու համար, որոնք ունակ են մարսելու հարյուրավոր միլիոնավոր տողեր/գիգաբայթ տվյալներ նույնիսկ մեկ Q գործընթաց: Q լեզուն ինքնին թույլ է տալիս չափազանց հակիրճ և արդյունավետ իրականացնել տվյալների մշակման հետ կապված ալգորիթմներ՝ շնորհիվ իր վեկտորային բնույթի, ներկառուցված SQL բարբառային թարգմանչի և գրադարանային գործառույթների շատ հաջող հավաքածուի:

Նկատեմ, որ վերը նշվածը ընդամենը մի մասն է այն ամենի, ինչ կարող է անել Q-ն, այն ունի նաև այլ յուրահատուկ առանձնահատկություններ: Օրինակ, չափազանց պարզ IPC արձանագրություն, որը ջնջում է Q-ի առանձին պրոցեսների սահմանը և թույլ է տալիս միավորել հարյուրավոր պրոցեսներ մեկ ցանցի մեջ, որը կարող է տեղակայվել աշխարհի տարբեր ծայրերում գտնվող տասնյակ սերվերների վրա:

Source: www.habr.com

Добавить комментарий