A Q és a KDB+ nyelv jellemzői egy valós idejű szolgáltatás példáján keresztül

Az előző bejegyzésemben olvashatsz arról, hogy mi is az a KDB+ adatbázis és a Q programozási nyelv, valamint mik az erősségeik és gyengeségeik. cikk ...és röviden a bevezetőben. Ebben a cikkben egy olyan szolgáltatást fogunk megvalósítani Q-ban, amely feldolgozza a bejövő adatfolyamot, és percről percre "valós időben" kiszámítja a különböző aggregációs függvényeket (azaz lesz ideje mindent kiszámítani a következő adatköteg előtt). A Q fő jellemzője, hogy vektoros nyelv, így nem egyes objektumokon, hanem azok tömbjein, tömbök tömbjein és más összetett objektumokon tudunk dolgozni. Az olyan nyelvek, mint a Q és rokon nyelvei, a K, J és az APL, rövidségükről híresek. Gyakran egy olyan program, amely egy ismerős nyelven, például a Java-ban több képernyőnyi kódot foglalna el, néhány sorban megírható. Pontosan ezt szeretném bemutatni ebben a cikkben.

A Q és a KDB+ nyelv jellemzői egy valós idejű szolgáltatás példáján keresztül

Bevezetés

A KDB+ egy oszlopos adatbázis, amelyet nagyon nagy mennyiségű, meghatározott módon (elsősorban idő szerint) rendszerezett adathoz terveztek. Elsősorban pénzügyi intézményekben, például bankokban, befektetési alapokban és biztosítótársaságokban használják. A Q a KDB+ belső nyelve, amely lehetővé teszi az adatokkal való hatékony munkát. A Q filozófiája a tömörség és a hatékonyság, feláldozva az érthetőséget. Ez azért van, mert egy vektor alapú nyelv nehezen lenne érthető, míg a tömörség és a gazdagság lehetővé teszi, hogy a program sokkal nagyobb részét egyetlen képernyőn jelenítsük meg, ami végső soron megkönnyíti a megértést.

Ebben a cikkben egy teljes értékű programot fogunk megvalósítani Q-ban, és érdemes lehet kipróbálnod. Ehhez szükséged lesz magára a Q-ra. Az ingyenes 32 bites verziót letöltheted a kx weboldaláról – www.kx.comOtt, ha érdekel, referenciaanyagokat is találsz a Q című könyvről Q a halandóknak és különféle cikkek ebben a témában.

Probléma nyilatkozat

Van egy forrás, amely 25 milliszekundumként küld egy adattáblát. Mivel a KDB+-t elsősorban a pénzügyekben használják, feltételezzük, hogy ez egy kereskedési tábla a következő oszlopokkal: time (idő milliszekundumban), sym (cégjelzés a tőzsdén – IBM, AAPL,…), ár (az az ár, amelyen a részvényeket megvásárolták), és méret (a tranzakció mérete). A 25 milliszekundumos intervallumot önkényesen választottuk meg; se nem túl kicsi, se nem túl nagy. Jelenléte azt jelenti, hogy a szolgáltatásba érkező adatok már pufferelve vannak. Könnyű lenne pufferelést megvalósítani a szolgáltatás oldalán, beleértve az aktuális terhelésen alapuló dinamikus pufferelést is, de az egyszerűség kedvéért egy fix intervallumnál maradunk.

A szolgáltatásnak percenként kell kiszámítania egy összesítő függvénykészletet – max. árat, avg árat, összegzett méretet és egyéb hasznos információkat – a sym oszlopból bejövő minden egyes szimbólumra. Az egyszerűség kedvéért feltételezzük, hogy minden függvény inkrementálisan kiszámítható, ami azt jelenti, hogy egy új érték megszerzéséhez két szám – a régi érték és a bejövő érték – elegendő. Például a max, az average és az sum függvények rendelkeznek ezzel a tulajdonsággal, de a medián függvény nem.

Azt is feltételezzük, hogy a bejövő adatfolyam idő szerint van rendezve. Ez lehetővé teszi számunkra, hogy csak a legutóbbi perccel dolgozzunk. A gyakorlatban elegendő, ha a jelenlegi és az előző percekkel is tudunk dolgozni, arra az esetre, ha bármilyen frissítés késik. Az egyszerűség kedvéért ezt az esetet nem vesszük figyelembe.

Aggregációs függvények

Az alábbiakban a szükséges aggregációs függvények láthatók. A lehető legtöbbet beépítettem a szolgáltatás terhelésének növelése érdekében:

  • magas – max. ár – percenkénti maximális ár.
  • alacsony – minimum ár – percenkénti minimumár.
  • firstPrice – első ár – az első percenkénti ár.
  • lastPrice – last price – az utolsó percenkénti ár.
  • firstSize – first size – az első tranzakció percenkénti mérete.
  • lastSize – last size — az utolsó tranzakció percenkénti mérete.
  • numTrades – count i – percenkénti kereskedések száma.
  • volumen – összegméret – a percenkénti tranzakcióméretek összege.
  • pvolume – sum price – a percenkénti árak összege, az avgPrice-hez szükséges.
  • forgalom – összegár*méret – percenként lebonyolított tranzakciók teljes mennyisége.
  • avgPrice – pvolume%numTrades – átlagos percenkénti ár.
  • avgSize – volumen%numTrades – átlagos kereskedési méret percenként.
  • vwap – forgalom%volumen – percenkénti átlagos ár súlyozva a kereskedési mérettel.
  • cumVolume – összegzett volumen – a tranzakciók kumulált volumene a teljes időszak alatt.

Rögtön egy nem nyilvánvaló pontot vitassunk meg: hogyan kell inicializálni ezeket az oszlopokat az első alkalommal és minden további percben. Néhány oszlopot, például a firstPrice-t, minden alkalommal nullra kell inicializálni; az értékük nincs meghatározva. Másokat, például a volume-ot, mindig 0-ra kell állítani. Vannak olyan oszlopok is, amelyek kombinált megközelítést igényelnek – például a cumVolume-ot az előző percből kell másolni, és az első percre 0-ra kell állítani. Mindezeket a paramétereket a szótár adattípusával fogjuk definiálni (hasonlóan egy rekordhoz):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

A könnyebb áttekinthetőség kedvéért hozzáadtam a sym és a time paramétereket a szótárhoz. Az initWith most egy kész sor a végső összesített táblázatból, ahol már csak a helyes sym és time paramétereket kell megadni. Ezzel új sorokat adhatunk hozzá a táblázathoz.

Az aggregátumfüggvény létrehozásakor szükségünk lesz az aggCols változóra. A listát a Q-ban a kifejezések kiértékelésének sorrendje (jobbról balra) miatt meg kell invertálni. A cél az, hogy a kiértékelés a magastól a cumVolume-ig haladjon, mivel egyes oszlopok az előző oszlopoktól függenek.

Az új percbe az előzőből átmásolandó oszlopok esetében a sym oszlopot a kényelem kedvéért adjuk hozzá:

rollColumns:`sym`cumVolume;

Most osszuk az oszlopokat csoportokba aszerint, hogy hogyan kell frissíteni őket. Három típust különböztethetünk meg:

  1. Akkumulátorok (mennyiség, forgalom,..) – a bejövő értéket hozzá kell adnunk az előzőhöz.
  2. Egy speciális ponttal (magas, alacsony, ..) - a perc első értékét a bejövő adatokból vesszük, a többit a függvény segítségével számítjuk ki.
  3. A többit mindig a függvény segítségével számítjuk ki.

Definiáljuk a változókat ezekhez az osztályokhoz:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Számítási sorrend

Az összesített táblázatot két lépésben frissítjük. A hatékonyság érdekében először zsugorítjuk a bemeneti táblázatot úgy, hogy minden szimbólumhoz és perchez külön sor tartozzon. Az a tény, hogy minden függvényünk inkrementális és asszociatív, garantálja, hogy az eredmény nem fog változni ezzel a további lépéssel. A táblázatot egy SELECT utasítással zsugoríthatjuk:

select high:max price, low:min price … by sym,time.minute from table

Ennek a módszernek van egy hátránya: a számított oszlopok halmaza előre definiált. Szerencsére a Q a select függvényt dinamikusan generált argumentumok elfogadására szolgáló függvényként is megvalósítja:

?[table;whereClause;byClause;selectClause]

Nem fogom részletesen leírni az argumentumformátumot; esetünkben az egyetlen nemtriviális kifejezés a by és a select kifejezés, és ezeknek a columns!expressions űrlap szótárainak kell lenniük. Így a tömörítőfüggvény a következőképpen definiálható:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Az érthetőség kedvéért a parse függvényt használtam, amely a Q kifejezést tartalmazó karakterláncot egy olyan értékké alakítja, amely átadható az eval függvénynek, és amely szükséges a select függvényben. Azt is vegyük figyelembe, hogy az előfeldolgozás a select függvény egy vetületeként (azaz részben definiált argumentumokkal rendelkező függvényként) van definiálva; egy argumentum (a tábla) hiányzik. Ha az előfeldolgozást egy táblázatra alkalmazzuk, akkor egy tömörített táblázatot kapunk.

A második lépés az összesített tábla frissítése. Először írjuk meg az algoritmust pszeudokódban:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

A Q-ban gyakori, hogy ciklusok helyett map/reduce függvényeket használunk. De mivel a Q egy vektornyelv, és minden műveletet biztonságosan alkalmazhatunk egyszerre az összes szimbólumra, első közelítésben teljesen kiküszöbölhetjük a ciklust, ha egyszerre hajtunk végre műveleteket az összes szimbólumon:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

De ennél is tovább mehetünk. A Q-nak van egy egyedi és kivételesen hatékony operátora – az általánosított értékadó operátor. Ez lehetővé teszi egy összetett adatszerkezetben lévő értékhalmaz módosítását indexek, függvények és argumentumok listájának használatával. Esetünkben ez így néz ki:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Sajnos egy táblázathoz való hozzárendeléshez sorok listája szükséges, nem oszlopoké, és a mátrixot (oszlopok listáját sorok listájává) kell transzponálni a flip függvény segítségével. Egy nagy táblázat esetén ez költséges, ezért ehelyett egy általánosított hozzárendelést alkalmazunk minden oszlopra külön-külön a map függvény segítségével (ami aposztrófnak tűnik):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ismét függvényvetítést használunk. Azt is vegyük észre, hogy Q-ban a listakészítés szintén egy függvény, és az each(map) használatával meghívhatjuk listák listájának előállítására.

A számított oszlopok rögzített halmazának elkerülése érdekében hozzuk létre dinamikusan a fenti kifejezést. Először definiáljunk függvényeket az egyes oszlopok kiszámításához, a row és az inp változók használatával hivatkozva az összesített és a bemeneti adatokra:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Néhány oszlop speciális; az első értéküket nem szabad a függvénynek kiszámítania. Azt, hogy az első, a row[`numTrades] oszlop alapján állapíthatjuk meg – ha az 0, akkor az érték az első. A Q-nak van egy select függvénye – ?[Boolean list;list1;list2] –, amely az 1. vagy 2. listából választ ki egy értéket az első argumentumban szereplő feltételtől függően:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Itt egy általános értékadást hívtam meg a függvénnyel (a kapcsos zárójelben lévő kifejezés). Átadta az aktuális értéket (az első argumentumot) és egy további argumentumot, amelyet a negyedik paraméterben adok meg.

Adjuk hozzá külön az elemes hangszórókat, mivel ugyanazt a funkciót látják el:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Ez egy tipikus értékadás a Q szabványok szerint, de én egyszerre egy értéklistát rendelek hozzá. Végül hozzuk létre a main függvényt:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Ezzel a kifejezéssel dinamikusan létrehozok egy függvényt egy karakterláncból, amely tartalmazza a fent megadott kifejezést. Az eredmény így fog kinézni:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Az oszlopok kiértékelésének sorrendje megfordul, mivel Q-ban a kiértékelés sorrendje jobbról balra halad.

Most már csak két fő függvényünk van, amire a számítástechnika szüksége van, már csak egy kis infrastruktúrát kell hozzáadni, és a szolgáltatás készen is van.

Utolsó lépések

Rendelkezünk a preprocess és az updateAgg függvényekkel, amelyek elvégzik az összes munkát. De továbbra is biztosítanunk kell a percek közötti megfelelő átmeneteket, és ki kell számolnunk az indexeket az aggregációhoz. Először is definiáljuk az init függvényt:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Definiálunk egy roll függvényt is, amely megváltoztatja az aktuális percet:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Szükségünk lesz egy függvényre, amivel új karaktereket adhatunk hozzá:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Végül pedig az upd függvény (a függvény hagyományos elnevezése Q szolgáltatások esetén), amelyet az ügyfél hív meg az adatok hozzáadásához:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Ennyi. Íme a szolgáltatásunk teljes kódja, ahogy ígértük, csak pár sor:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

tesztelés

Teszteljük a szolgáltatás teljesítményét. Ehhez indítsuk el egy külön folyamatban (helyezzük a kódot a service.q fájlba), és hívjuk meg az init függvényt:

q service.q –p 5566

q)init[]

Egy másik konzolon indíts el egy második Q folyamatot, és csatlakozz az elsőhöz:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Először is hozzunk létre egy karakterekből álló listát – 10 000 darabot –, és adjunk hozzá egy függvényt, amely véletlenszerű táblázatot generál. A második konzolon:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Három valós szimbólumot adtam a listához, hogy könnyebb legyen őket keresni a táblázatban. Az rnd függvény egy n soros véletlenszerű táblázatot hoz létre, ahol az idők t-től t+25 milliszekundumig terjednek.

Most megpróbálhatsz adatokat küldeni a szolgáltatásnak (adjuk hozzá az első tíz órát):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

A szolgáltatásban ellenőrizheti, hogy a tábla frissült-e:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Eredmény:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Most futtassunk egy terheléses tesztet annak meghatározására, hogy a szolgáltatás mennyi adatot képes feldolgozni percenként. Emlékeztetőül, a frissítési intervallumot 25 milliszekundumra állítottuk be. Ezért a szolgáltatásnak (átlagosan) legalább 20 milliszekundumon belül frissülnie kell, hogy a felhasználóknak legyen idejük adatokat kérni. A második folyamatban írd be a következőket:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

A 4800 két percet jelent. Megpróbálhatod először 1000 sorral lefuttatni 25 milliszekundumként:

start 1000

Az én esetemben az eredmény frissítésenként körülbelül néhány milliszekundum. Tehát azonnal növelem a sorok számát 10 000-re:

start 10000

Eredmény:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Ismét semmi különös, de ez 24 millió sor percenként, 400 000 másodpercenként. A frissítés csak ötször lassult le több mint 25 milliszekundumra, nyilvánvalóan a percváltozás miatt. Növeljük ezt 100 000-re:

start 100000

Eredmény:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Amint láthatjuk, a szolgáltatás alig bírja a feladatokat, de azért sikerül talpon maradnia. Ez az adatmennyiség (percenként 240 millió sor) rendkívül nagy; ilyen esetekben gyakori, hogy a szolgáltatás több (vagy akár tucat) klónját indítják el, amelyek mindegyike csak a karakterek egy részhalmazát dolgozza fel. Mindazonáltal az eredmény lenyűgöző egy olyan értelmezett nyelv esetében, amely elsősorban az adattárolásra összpontosít.

Felmerülhet a kérdés, hogy miért nő az idő nemlineárisan az egyes frissítések méretével. Ennek az az oka, hogy a tömörítőfüggvény lényegében egy C függvény, amely sokkal hatékonyabb, mint az updateAgg. Egy bizonyos frissítési mérettől (kb. 10 000) kezdve az updateAgg eléri a felső határát, és ezt követően a végrehajtási ideje független a frissítési mérettől. Pontosan a Q előlépésnek köszönhetően képes a szolgáltatás ilyen adatmennyiségeket feldolgozni. Ez aláhúzza a megfelelő algoritmus kiválasztásának fontosságát a big data kezelésekor. Egy másik szempont a megfelelő adattárolás a memóriában. Ha az adatokat nem oszloposan vagy időrendben tárolnánk, akkor egy úgynevezett TLB gyorsítótár-hibával találkoznánk – a processzor címgyorsítótárában a memóriaoldal címének megtalálásának sikertelenségével. A címkeresések körülbelül 30-szor tovább tartanak, ha sikertelenek, és szétszórt adatok esetén ez többször is lelassíthatja a szolgáltatást.

Következtetés

Ebben a cikkben bemutattam, hogy a KDB+ és a Q nemcsak nagy adathalmazok tárolására és azok egyszerű elérésére alkalmas SELECT utasításokon keresztül, hanem olyan adatfeldolgozó szolgáltatások létrehozására is, amelyek képesek több százmillió sor/gigabájt adat feldolgozására akár egyetlen Q folyamatban is. Maga a Q nyelv vektoros jellegének, beépített SQL interpreterének és nagyon sikeres könyvtári függvénykészletének köszönhetően kivételesen tömör és hatékony megvalósítást tesz lehetővé az adatfeldolgozó algoritmusok számára.

Szeretném megjegyezni, hogy a fentiek csupán ízelítőt jelentenek a Q képességeiből; más egyedi jellemzőkkel is rendelkezik. Például egy rendkívül egyszerű IPC protokoll, amely eltörli a határokat az egyes Q folyamatok között, és lehetővé teszi, hogy több száz ilyen folyamatot egyetlen hálózatba kapcsoljanak, amely világszerte több tucat szervert is lefedhet.

Forrás: will.com

Vásároljon megbízható tárhelyet DDoS védelemmel, VPS VDS szerverekkel rendelkező webhelyekhez 🔥 Vásároljon megbízható weboldal tárhelyet DDoS védelemmel, VPS VDS szerverekkel | ProHoster