A Q és a KDB+ nyelv jellemzői egy valós idejű szolgáltatás példáján keresztül

Arról, hogy mi a KDB+ alap, a Q programozási nyelv, mik azok erősségei és gyengeségei, az előző oldalamban olvashattok. cikk és röviden a bevezetőben. A cikkben olyan szolgáltatást fogunk megvalósítani a Q-n, amely feldolgozza a bejövő adatfolyamot, és percenként különféle aggregációs függvényeket számol ki „valós idejű” módban (azaz lesz ideje mindent kiszámítani a következő adatrész előtt). A Q fő jellemzője, hogy egy vektornyelv, amely lehetővé teszi, hogy ne egyedi objektumokkal, hanem azok tömbjeivel, tömbök tömbjeivel és egyéb összetett objektumokkal dolgozzon. Az olyan nyelvek, mint a Q és rokonai K, J, APL, híresek rövidségükről. Gyakran előfordul, hogy egy program, amely több képernyőnyi kódot foglal el egy ismerős nyelven, például a Java nyelven, néhány sorban ráírható. Ezt szeretném bemutatni ebben a cikkben.

A Q és a KDB+ nyelv jellemzői egy valós idejű szolgáltatás példáján keresztül

Bevezetés

A KDB+ egy oszlopos adatbázis, amely nagyon nagy mennyiségű adatra összpontosít, meghatározott módon (elsősorban idő szerint) rendezve. Elsősorban pénzintézetekben – bankokban, befektetési alapokban, biztosítótársaságokban – használják. A Q nyelv a KDB+ belső nyelve, amely lehetővé teszi az adatok hatékony kezelését. A Q ideológiája a rövidség és a hatékonyság, miközben a világosság feláldozott. Ezt az indokolja, hogy a vektornyelv minden esetben nehezen lesz érthető, a felvétel rövidsége és gazdagsága pedig lehetővé teszi, hogy egy képernyőn jóval nagyobb részt lássunk a programból, ami végső soron megkönnyíti a megértést.

Ebben a cikkben egy teljes értékű programot valósítunk meg a Q-ban, és érdemes lehet kipróbálni. Ehhez szüksége lesz a tényleges Q-ra. Az ingyenes 32 bites verziót letöltheti a kx cég webhelyéről – www.kx.com. Ott, ha érdekel, referencia információkat talál a Q-ról, a könyvről Q Halandóknak és különféle cikkek ebben a témában.

Probléma nyilatkozat

Van olyan forrás, amely 25 ezredmásodpercenként küld egy táblázatot az adatokkal. Mivel a KDB+-t elsősorban a pénzügyekben használják, feltételezzük, hogy ez a tranzakciók (ügyletek) táblázata, amelynek a következő oszlopai vannak: idő (idő ezredmásodpercben), sym (cég megnevezése a tőzsdén - IBM, AAPL,…), ár (az az ár, amelyen a részvényeket megvásárolták), méret (az ügylet nagysága). A 25 ezredmásodperces intervallum tetszőleges, nem túl kicsi és nem túl hosszú. Jelenléte azt jelenti, hogy az adatok már pufferelve érkeznek a szolgáltatáshoz. Könnyű lenne a szervizoldali pufferelést megvalósítani, beleértve az aktuális terheléstől függő dinamikus pufferelést is, de az egyszerűség kedvéért fix intervallumra koncentrálunk.

A szolgáltatásnak percenként számolnia kell minden egyes bejövő szimbólumra a Sym oszlopból egy összesítő függvénykészletet - max ár, átlagos ár, összeg mérete stb. hasznos információ. Az egyszerűség kedvéért feltételezzük, hogy minden függvény növekményesen számítható, azaz. egy új érték megszerzéséhez elegendő két szám ismerete - a régi és a bejövő érték. Például a max, átlag, sum függvények rendelkeznek ezzel a tulajdonsággal, de a medián függvény nem.

Azt is feltételezzük, hogy a bejövő adatfolyam időrendű. Ez lehetőséget ad arra, hogy csak az utolsó pillanatban dolgozzunk. Gyakorlatilag elég az aktuális és az előző percekkel dolgozni arra az esetre, ha egyes frissítések késnek. Az egyszerűség kedvéért ezt az esetet nem vesszük figyelembe.

Aggregációs függvények

Az alábbiakban felsoroljuk a szükséges összesítő függvényeket. A lehető legtöbbet vettem be közülük, hogy növeljem a szolgáltatás terhelését:

  • magas – max ár – maximális percdíj.
  • alacsony – min ár – minimális percdíj.
  • firstPrice – first price – első ár percenként.
  • lastPrice – last price – utolsó ár percenként.
  • firstSize – első méret – első kereskedési méret percenként.
  • lastSize – utolsó méret – utolsó kereskedési méret egy perc alatt.
  • numTrades – count i – ügyletek száma percenként.
  • volume – Sum size – A percenkénti kereskedési méretek összege.
  • pvolume – összeg ár – percenkénti árak összege, az avgPrice-hez szükséges.
  • – összesített forgalmi ár*méret – a tranzakciók teljes mennyisége percenként.
  • avgPrice – pvolume%numTrades – átlagos percár.
  • avgSize – volume%numTrades – átlagos kereskedési méret percenként.
  • vwap – forgalom%volume – átlagos percár a tranzakció méretével súlyozva.
  • cumVolume – összeg mennyiség – tranzakciók halmozott mérete a teljes idő alatt.

Azonnal beszéljünk meg egy nem nyilvánvaló pontot – hogyan inicializáljuk ezeket az oszlopokat először és minden további percben. A firstPrice típusú egyes oszlopokat minden alkalommal nullára kell inicializálni, értékük nincs meghatározva. Más kötettípusokat mindig 0-ra kell állítani. Vannak olyan oszlopok is, amelyek kombinált megközelítést igényelnek - például a cumVolume-ot az előző percről kell átmásolni, az elsőnél pedig 0-ra. Állítsuk be ezeket a paramétereket a szótári adatok segítségével típus (a rekordhoz hasonló):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

A kényelem kedvéért hozzáadtam a szótárhoz a sym-et és az időt, most az initWith egy kész sor a végső összesített táblázatból, ahol a helyes sym-et és időt kell beállítani. Használhatja új sorok hozzáadására a táblázathoz.

Aggregációs függvény létrehozásakor szükségünk lesz aggCols-ra. A listát meg kell fordítani a Q-beli kifejezések kiértékelési sorrendje miatt (jobbról balra). A cél az, hogy a számítás magasról cumVolume-ra menjen, mivel egyes oszlopok a korábbiaktól függenek.

Azok az oszlopok, amelyeket az előzőből egy új percbe kell másolni, a kényelem kedvéért hozzáadjuk a szimbolikus oszlopot:

rollColumns:`sym`cumVolume;

Most osszuk fel csoportokra az oszlopokat aszerint, hogy hogyan kell frissíteni őket. Három típust különböztethetünk meg:

  1. Akkumulátorok (volumen, forgalom,...) – a beérkező értéket hozzá kell adnunk az előzőhöz.
  2. Speciális ponttal (magas, alacsony, ..) – a perc első értéket a beérkező adatokból veszik, a többit a függvény segítségével számítja ki.
  3. Pihenés. Mindig függvény segítségével számítják ki.

Határozzuk meg a változókat ezekhez az osztályokhoz:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Számítási sorrend

Az összesített táblázatot két lépésben frissítjük. A hatékonyság érdekében először összezsugorítjuk a bejövő táblázatot úgy, hogy minden karakternek és percnek csak egy sora legyen. Az a tény, hogy minden funkciónk inkrementális és asszociatív, garantálja, hogy ennek a további lépésnek az eredménye nem változik. A táblázatot a Select segítségével csökkentheti:

select high:max price, low:min price … by sym,time.minute from table

Ennek a módszernek van egy hátránya - a számított oszlopok halmaza előre meghatározott. Szerencsére a Q-ban a select olyan függvényként is implementálva van, ahol helyettesítheti a dinamikusan létrehozott argumentumokat:

?[table;whereClause;byClause;selectClause]

Az argumentumok formátumát nem írom le részletesen, esetünkben csak a by and select kifejezések lesznek nem triviálisak, és ezek az űrlaposzlopok!kifejezések szótárai legyenek. Így a zsugorító függvény a következőképpen definiálható:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Az áttekinthetőség kedvéért a parse függvényt használtam, amely egy Q kifejezést tartalmazó karakterláncot az eval függvénynek átadható és a select függvényben szükséges értékké alakít. Vegye figyelembe azt is, hogy az előfeldolgozás a select függvény vetületeként (vagyis részben meghatározott argumentumokkal rendelkező függvényként) van definiálva, egy argumentum (a táblázat) hiányzik. Ha előfeldolgozást alkalmazunk egy táblára, akkor egy tömörített táblát kapunk.

A második lépés az összesített táblázat frissítése. Először írjuk meg az algoritmust pszeudokódban:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q-ban gyakori a leképezés/kicsinyítés függvények használata ciklusok helyett. De mivel a Q vektornyelv, és minden műveletet könnyedén alkalmazhatunk az összes szimbólumra egyszerre, így első közelítéssel megtehetjük ciklus nélkül is, egyszerre hajtva végre az összes szimbólumon a műveleteket:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

De mehetünk tovább, a Q-nak van egy egyedülálló és rendkívül erős operátora - az általános hozzárendelés operátora. Lehetővé teszi egy összetett adatstruktúra értékkészletének megváltoztatását indexek, függvények és argumentumok listájának segítségével. A mi esetünkben így néz ki:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Sajnos a táblázathoz való hozzárendeléshez sorok listájára van szükség, nem oszlopokra, és a mátrixot (az oszlopok listáját a sorok listájára) kell transzponálnia a flip funkcióval. Ez drága egy nagy táblánál, ezért ehelyett minden oszlophoz külön-külön általánosított hozzárendelést alkalmazunk a térkép funkcióval (ami aposztrófnak tűnik):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ismét függvényvetítést használunk. Vegye figyelembe azt is, hogy a Q-ban a lista létrehozása is egy függvény, és ezt az Each(map) függvény segítségével hívhatjuk meg, hogy listát kapjunk.

Annak érdekében, hogy a számított oszlopok halmaza ne legyen rögzített, a fenti kifejezést dinamikusan hozzuk létre. Először definiáljunk függvényeket az egyes oszlopok kiszámításához, a sor és inp változók segítségével az összesített és bemeneti adatokra hivatkozva:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Egyes oszlopok speciálisak, első értéküket a függvény nem számíthatja ki. A row[`numTrades] oszlopból megállapíthatjuk, hogy ez az első - ha 0-t tartalmaz, akkor az érték az első. A Q-nak van egy kiválasztó függvénye - ?[Logiai lista;lista1;lista2] -, amely az 1. vagy 2. listából választ ki egy értéket az első argumentum feltételétől függően:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Itt egy általánosított hozzárendelést hívtam meg a függvényemmel (egy kifejezés göndör kapcsos zárójelben). Megkapja az aktuális értéket (az első argumentumot) és egy további argumentumot, amit a 4. paraméterben adok át.

Az akkumulátoros hangszórókat külön adjuk hozzá, mivel a funkciójuk ugyanaz:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Ez egy normális hozzárendelés a Q szabványok szerint, de én egyszerre rendelek hozzá egy értéklistát. Végül hozzuk létre a fő függvényt:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Ezzel a kifejezéssel dinamikusan létrehozok egy függvényt a fent megadott kifejezést tartalmazó karakterláncból. Az eredmény így fog kinézni:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Az oszlop kiértékelési sorrendje fordított, mert a Q-ban a kiértékelési sorrend jobbról balra halad.

Most két fő funkciónk van a számításokhoz, csak kell hozzá egy kis infrastruktúra és kész is a szolgáltatás.

Utolsó lépések

Vannak előfeldolgozási és updateAgg függvényeink, amelyek minden munkát elvégeznek. De továbbra is biztosítani kell a helyes átmenetet a perceken keresztül, és ki kell számítani az összesítéshez szükséges indexeket. Először is definiáljuk az init függvényt:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Meghatározzuk a dobás funkciót is, amely megváltoztatja az aktuális percet:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Új karakterek hozzáadásához szükségünk lesz egy függvényre:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

És végül az upd függvény (a Q szolgáltatások hagyományos neve ennek a függvénynek), amelyet az ügyfél hív meg adatok hozzáadására:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Ez minden. Itt van szolgáltatásunk teljes kódja, ahogy ígértük, csak néhány sor:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

tesztelés

Ellenőrizzük a szolgáltatás teljesítményét. Ehhez futtassuk le egy külön folyamatban (tegyük a kódot a service.q fájlba), és hívjuk meg az init függvényt:

q service.q –p 5566

q)init[]

Egy másik konzolon indítsa el a második Q folyamatot, és csatlakozzon az elsőhöz:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Először hozzunk létre egy szimbólumlistát - 10000 XNUMX darabot, és adjunk hozzá egy függvényt egy véletlenszerű táblázat létrehozásához. A második konzolon:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Három valódi szimbólumot adtam hozzá a listához, hogy könnyebben megtaláljam őket a táblázatban. Az rnd függvény egy véletlenszerű táblázatot hoz létre n sorral, ahol az idő t és t+25 ezredmásodperc között változik.

Most megpróbálhatja adatokat küldeni a szolgáltatásnak (adja hozzá az első tíz órát):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

A szolgáltatásban ellenőrizheti, hogy a táblázat frissült:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Eredmény:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Most végezzünk terhelési tesztet, hogy megtudjuk, mennyi adatot tud feldolgozni a szolgáltatás percenként. Hadd emlékeztesselek arra, hogy a frissítési időközt 25 ezredmásodpercre állítottuk be. Ennek megfelelően a szolgáltatásnak (átlagosan) legalább 20 ezredmásodpercnek kell beleférnie frissítésenként, hogy a felhasználóknak legyen idejük adatkérésre. A második folyamatban írja be a következőket:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 két perc. Megpróbálhatja először 1000 ezredmásodpercenként 25 sorban futni:

start 1000

Az én esetemben az eredmény körülbelül néhány milliszekundum frissítésenként. Tehát azonnal növelem a sorok számát 10.000 XNUMX-re:

start 10000

Eredmény:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Megint semmi különös, de ez 24 millió sor percenként, 400 ezer másodpercenként. Több mint 25 ezredmásodpercig a frissítés csak 5-ször lassult le, nyilvánvalóan a perc változásakor. Növeljük 100.000-re:

start 100000

Eredmény:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Amint látható, a szolgáltatás alig bírja, de ennek ellenére képes talpon maradni. Egy ilyen adatmennyiség (240 millió sor percenként) rendkívül nagy, ilyenkor gyakran előfordul, hogy a szolgáltatás több klónját (vagy akár több tucat klónját) indítják el, amelyek mindegyike csak a karakterek egy részét dolgozza fel. Ennek ellenére az eredmény lenyűgöző egy olyan értelmezett nyelv esetében, amely elsősorban az adattárolásra összpontosít.

Felmerülhet a kérdés, hogy miért nő az idő nem lineárisan az egyes frissítések méretével. Ennek az az oka, hogy a shrink függvény valójában egy C függvény, ami sokkal hatékonyabb, mint az updateAgg. Egy bizonyos frissítési mérettől (körülbelül 10.000 30) indulva az updateAgg eléri a plafont, majd a végrehajtási ideje nem függ a frissítés méretétől. A Q előzetes lépésnek köszönhető, hogy a szolgáltatás ilyen mennyiségű adatot képes feldolgozni. Ez rávilágít arra, mennyire fontos a megfelelő algoritmus kiválasztása a nagy adatokkal való munka során. Egy másik szempont az adatok helyes tárolása a memóriában. Ha az adatokat nem oszlopszerűen tárolnák, vagy nem időrendben lennének, akkor megismerkednénk egy olyan dologgal, mint a TLB cache miss - a memóriaoldal címének hiánya a processzor cím-gyorsítótárában. A címkeresés sikertelensége esetén körülbelül XNUMX-szor tovább tart, és ha az adatok szétszóródnak, az többszörösen lelassíthatja a szolgáltatást.

Következtetés

Ebben a cikkben bemutattam, hogy a KDB+ és Q adatbázis nemcsak nagy adatok tárolására és selecten keresztül történő könnyű elérésére alkalmas, hanem olyan adatfeldolgozó szolgáltatások létrehozására is, amelyek akár több száz millió sor/gigabájt adat megemésztésére is alkalmasak. egyetlen Q folyamat. Maga a Q nyelv az adatfeldolgozáshoz kapcsolódó algoritmusok rendkívül tömör és hatékony megvalósítását teszi lehetővé vektorjellegének, beépített SQL dialektus értelmezőjének és nagyon sikeres könyvtári funkcióinak köszönhetően.

Megjegyzem, a fentiek csak egy részét képezik a Q képességeinek, más egyedi tulajdonságokkal is rendelkezik. Például egy rendkívül egyszerű IPC-protokoll, amely eltörli az egyes Q-folyamatok közötti határvonalat, és lehetővé teszi több száz ilyen folyamat egyetlen hálózatba való összevonását, amely a világ különböző pontjain több tucat szerveren helyezkedhet el.

Forrás: will.com

Hozzászólás