Cechy języka Q i KDB+ na przykładzie usługi czasu rzeczywistego

O tym czym jest baza KDB+, język programowania Q, jakie są ich mocne i słabe strony, możecie przeczytać w moich poprzednich Artykuł i krótko we wstępie. W artykule zaimplementujemy na Q usługę, która będzie przetwarzać przychodzący strumień danych i co minutę obliczać różne funkcje agregujące w trybie „czasu rzeczywistego” (czyli zdąży wszystko przeliczyć przed kolejną porcją danych). Główną cechą Q jest to, że jest to język wektorowy, który pozwala operować nie pojedynczymi obiektami, ale ich tablicami, tablicami tablic i innymi złożonymi obiektami. Języki takie jak Q i jego krewni K, J, APL słyną ze swojej zwięzłości. Często program zajmujący kilka ekranów kodu w znanym języku, takim jak Java, można na nich napisać w kilku linijkach. To właśnie chcę wykazać w tym artykule.

Cechy języka Q i KDB+ na przykładzie usługi czasu rzeczywistego

Wprowadzenie

KDB+ to baza kolumnowa skupiająca bardzo duże ilości danych, uporządkowanych w określony sposób (przede wszystkim czasowo). Stosowany jest przede wszystkim w instytucjach finansowych – bankach, funduszach inwestycyjnych, towarzystwach ubezpieczeniowych. Język Q jest wewnętrznym językiem KDB+, który pozwala efektywnie pracować z tymi danymi. Ideologia Q zakłada zwięzłość i skuteczność, podczas gdy jasność jest poświęcana. Jest to uzasadnione faktem, że język wektorowy i tak będzie trudny do zrozumienia, a zwięzłość i bogactwo nagrania pozwala zobaczyć na jednym ekranie znacznie większą część programu, co ostatecznie ułatwia jego zrozumienie.

W tym artykule wdrażamy pełnoprawny program Q i być może zechcesz go wypróbować. Aby to zrobić, będziesz potrzebować samego Q. Możesz pobrać bezpłatną wersję 32-bitową na stronie firmy kx - www.kx.com. Tam, jeśli jesteś zainteresowany, znajdziesz informacje referencyjne na temat książki Q Q Dla śmiertelników oraz różne artykuły na ten temat.

Stwierdzenie problemu

Istnieje źródło, które wysyła tabelę z danymi co 25 milisekund. Ponieważ KDB+ wykorzystywany jest przede wszystkim w finansach, założymy, że jest to tabela transakcji (transakcji), która posiada następujące kolumny: time (czas w milisekundach), sym (oznaczenie spółki na giełdzie - IBM, AAPL,…), cena (cena, po której nabyto akcje), wielkość (wielkość transakcji). Odstęp 25 milisekund jest dowolny, niezbyt mały i niezbyt długi. Jego obecność oznacza, że ​​dane trafiają do usługi już zbuforowane. Łatwo byłoby zaimplementować buforowanie po stronie usługi, w tym buforowanie dynamiczne w zależności od aktualnego obciążenia, ale dla uproszczenia skupimy się na stałym interwale.

Usługa musi zliczać co minutę dla każdego przychodzącego symbolu z kolumny sym zestawem funkcji agregujących - cena maksymalna, cena średnia, wielkość sumy itp. przydatna informacja. Dla uproszczenia założymy, że wszystkie funkcje można obliczać przyrostowo, tj. aby otrzymać nową wartość wystarczy znać dwie liczby – starą i przychodzącą. Na przykład funkcje max, średnia i suma mają tę właściwość, ale funkcja mediany nie.

Założymy również, że przychodzący strumień danych jest uporządkowany czasowo. Dzięki temu będziemy mieli możliwość pracy wyłącznie na ostatnią chwilę. W praktyce wystarczy możliwość pracy z minutami bieżącymi i poprzednimi, na wypadek gdyby niektóre aktualizacje się spóźniały. Dla uproszczenia nie będziemy rozważać tego przypadku.

Funkcje agregujące

Wymagane funkcje agregujące są wymienione poniżej. Wziąłem ich jak najwięcej, aby zwiększyć obciążenie usługi:

  • wysoka – cena maksymalna – maksymalna cena za minutę.
  • niska – cena minimalna – cena minimalna za minutę.
  • FirstPrice – pierwsza cena – pierwsza cena za minutę.
  • lastPrice – ostatnia cena – ostatnia cena za minutę.
  • FirstSize – pierwszy rozmiar – wielkość pierwszej transakcji na minutę.
  • lastSize – ostatni rozmiar – ostatni rozmiar transakcji w ciągu minuty.
  • numTrades – liczba i – liczba transakcji na minutę.
  • wolumen – suma wielkości – suma wielkości transakcji na minutę.
  • pvolume – cena sumaryczna – suma cen za minutę, wymagana dla avgPrice.
  • – suma ceny obrotu*rozmiar – całkowity wolumen transakcji na minutę.
  • avgPrice – pvolume%numTrades – średnia cena za minutę.
  • avgSize – wolumen%numTrades – średnia wielkość transakcji na minutę.
  • vwap – obrót%objętość – średnia cena za minutę ważona wielkością transakcji.
  • cumVolume – suma wolumenu – skumulowana wielkość transakcji w całym czasie.

Omówmy od razu jeden nieoczywisty punkt - jak inicjować te kolumny po raz pierwszy i dla każdej kolejnej minuty. Niektóre kolumny typu FirstPrice muszą za każdym razem zostać zainicjowane na wartość null; ich wartość jest niezdefiniowana. Pozostałe typy woluminów należy zawsze ustawić na 0. Są też kolumny, które wymagają podejścia łączonego - np. cumVolume trzeba skopiować z poprzedniej minuty, a dla pierwszej ustawić na 0. Ustawmy te wszystkie parametry korzystając z danych słownikowych wpisz (analogicznie do rekordu):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Dla wygody dodałem do słownika sym i time, teraz initWith jest gotową linią z końcowej tabeli zagregowanej, gdzie pozostaje ustawić poprawny sym i czas. Możesz go użyć, aby dodać nowe wiersze do tabeli.

Będziemy potrzebować aggCols podczas tworzenia funkcji agregującej. Listę należy odwrócić ze względu na kolejność oceniania wyrażeń w Q (od prawej do lewej). Celem jest zapewnienie, że obliczenia przejdą od wartości high do cumVolume, ponieważ niektóre kolumny zależą od poprzednich.

Kolumny, które należy skopiować do nowej minuty z poprzedniej, dla wygody dodano kolumnę sym:

rollColumns:`sym`cumVolume;

Podzielmy teraz kolumny na grupy według sposobu ich aktualizacji. Można wyróżnić trzy typy:

  1. Akumulatory (wolumen, obrót,..) – wartość przychodzącą musimy dodać do poprzedniej.
  2. Ze specjalnym punktem (górny, najniższy, ..) – z przychodzących danych pobierana jest pierwsza wartość w minucie, pozostałe obliczane są za pomocą funkcji.
  3. Odpoczynek. Zawsze obliczane przy użyciu funkcji.

Zdefiniujmy zmienne dla tych klas:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Kolejność obliczeń

Zaktualizujemy zagregowaną tabelę w dwóch etapach. Aby zwiększyć wydajność, najpierw zmniejszamy tabelę przychodzącą, tak aby na każdy znak i minutę przypadał tylko jeden wiersz. Fakt, że wszystkie nasze funkcje są inkrementalne i asocjacyjne gwarantuje, że wynik tego dodatkowego kroku nie ulegnie zmianie. Możesz zmniejszyć tabelę za pomocą wybierz:

select high:max price, low:min price … by sym,time.minute from table

Metoda ta ma wadę – zestaw kolumn obliczeniowych jest predefiniowany. Na szczęście w Q funkcja Select jest również zaimplementowana jako funkcja, w której można zastąpić dynamicznie tworzone argumenty:

?[table;whereClause;byClause;selectClause]

Nie będę szczegółowo opisywał formatu argumentów, w naszym przypadku jedynie wyrażenia by iselect będą nietrywialne i powinny to być słowniki formularza kolumny!wyrażenia. Zatem funkcję kurczenia można zdefiniować w następujący sposób:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Dla jasności użyłem funkcji parse, która zamienia ciąg znaków z wyrażeniem Q na wartość, którą można przekazać do funkcji eval i która jest wymagana przy wyborze funkcji. Należy również pamiętać, że proces wstępny definiuje się jako projekcję (tj. funkcję z częściowo zdefiniowanymi argumentami) funkcji selekcji, brakuje jednego argumentu (tabeli). Jeśli zastosujemy proces wstępny do tabeli, otrzymamy skompresowaną tabelę.

Drugim etapem jest aktualizacja tabeli zagregowanej. Najpierw napiszmy algorytm w pseudokodzie:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

W Q często używa się funkcji map/reduce zamiast pętli. Ponieważ jednak Q jest językiem wektorowym i możemy łatwo zastosować wszystkie operacje na wszystkich symbolach jednocześnie, to w przypadku pierwszego przybliżenia możemy w ogóle obejść się bez pętli, wykonując operacje na wszystkich symbolach jednocześnie:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Ale możemy pójść dalej, Q ma unikalny i niezwykle potężny operator - uogólniony operator przypisania. Umożliwia zmianę zbioru wartości w złożonej strukturze danych za pomocą listy indeksów, funkcji i argumentów. W naszym przypadku wygląda to tak:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Niestety do przypisania do tabeli potrzebna jest lista wierszy, a nie kolumn i trzeba transponować macierz (listę kolumn na listę wierszy) za pomocą funkcji odwracania. Jest to kosztowne w przypadku dużej tabeli, dlatego zamiast tego stosujemy uogólnione przypisanie do każdej kolumny z osobna, używając funkcji map (która wygląda jak apostrof):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ponownie korzystamy z projekcji funkcji. Zauważ też, że w Q tworzenie listy jest także funkcją i możemy ją wywołać za pomocą funkcji each(map), aby uzyskać listę list.

Aby mieć pewność, że zestaw kolumn obliczeniowych nie jest stały, powyższe wyrażenie utworzymy dynamicznie. Najpierw zdefiniujmy funkcje do obliczania każdej kolumny, używając zmiennych row i inp do odwoływania się do danych zagregowanych i wejściowych:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Niektóre kolumny są specjalne i funkcja nie powinna obliczać ich pierwszej wartości. To, że jest to pierwsza, możemy określić po kolumnie row[`numTrades] - jeśli zawiera 0, to wartość jest pierwsza. Q ma funkcję wyboru - ?[Lista logiczna;lista1;lista2] - która wybiera wartość z listy 1 lub 2 w zależności od warunku zawartego w pierwszym argumencie:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Tutaj nazwałem uogólnione przypisanie z moją funkcją (wyrażenie w nawiasach klamrowych). Otrzymuje aktualną wartość (pierwszy argument) oraz dodatkowy argument, który przekazuję w czwartym parametrze.

Dodajmy osobno głośniki akumulatorowe, ponieważ funkcja jest dla nich taka sama:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Jest to normalne przypisanie według standardów Q, ale przypisuję listę wartości od razu. Na koniec utwórzmy główną funkcję:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Za pomocą tego wyrażenia dynamicznie tworzę funkcję z ciągu znaków zawierającego wyrażenie, które podałem powyżej. Wynik będzie wyglądał następująco:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Kolejność oceny kolumn jest odwrócona, ponieważ w Q kolejność oceny jest od prawej do lewej.

Teraz mamy dwie główne funkcje niezbędne do obliczeń, wystarczy dodać trochę infrastruktury i usługa gotowa.

Ostatnie kroki

Mamy funkcje wstępnego przetwarzania i updateAgg, które wykonują całą pracę. Nadal jednak konieczne jest zapewnienie prawidłowego przejścia przez minuty i obliczenie indeksów do agregacji. Na początek zdefiniujmy funkcję init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Zdefiniujemy także funkcję roll, która zmieni aktualną minutę:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Będziemy potrzebować funkcji dodania nowych znaków:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

I wreszcie funkcja upd (tradycyjna nazwa tej funkcji dla usług Q), która jest wywoływana przez klienta w celu dodania danych:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

To wszystko. Oto pełny kod naszej usługi, zgodnie z obietnicą, tylko kilka linijek:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Testowanie

Sprawdźmy wydajność usługi. Aby to zrobić, uruchommy go w osobnym procesie (umieśćmy kod w pliku service.q) i wywołajmy funkcję init:

q service.q –p 5566

q)init[]

W innej konsoli uruchom drugi proces Q i połącz się z pierwszym:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Najpierw utwórzmy listę symboli - 10000 XNUMX sztuk i dodajmy funkcję tworzącą losową tabelę. W drugiej konsoli:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Do listy dodałem trzy symbole rzeczywiste, żeby łatwiej było ich szukać w tabeli. Funkcja rnd tworzy losową tabelę z n wierszami, gdzie czas waha się od t do t + 25 milisekund.

Teraz możesz spróbować wysłać dane do usługi (dodaj pierwsze dziesięć godzin):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

W serwisie możesz sprawdzić czy tabela została zaktualizowana:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Wynik:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Przeprowadźmy teraz testy obciążenia, aby dowiedzieć się, ile danych usługa może przetworzyć w ciągu minuty. Przypomnę, że interwał aktualizacji ustawiliśmy na 25 milisekund. W związku z tym usługa musi (średnio) zmieścić się w co najmniej 20 milisekundach na aktualizację, aby dać użytkownikom czas na zażądanie danych. W drugim procesie wprowadź następujące dane:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 to dwie minuty. Możesz spróbować najpierw uruchomić 1000 wierszy co 25 milisekund:

start 1000

W moim przypadku wynik wynosi około kilku milisekund na aktualizację. Natychmiast więc zwiększę liczbę wierszy do 10.000 XNUMX:

start 10000

Wynik:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Znowu nic specjalnego, ale to 24 miliony linii na minutę, 400 tysięcy na sekundę. Na ponad 25 milisekund aktualizacja spowolniła tylko 5 razy, najwyraźniej wraz ze zmianą minut. Zwiększmy do 100.000 XNUMX:

start 100000

Wynik:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Jak widać, usługa ledwo sobie radzi, ale mimo to udaje jej się utrzymać na powierzchni. Taka ilość danych (240 milionów wierszy na minutę) jest niezwykle duża, często w takich przypadkach uruchamia się kilka klonów (a nawet kilkadziesiąt klonów) usługi, z których każdy przetwarza tylko część znaków. Mimo to wynik jest imponujący jak na język interpretowany, który koncentruje się przede wszystkim na przechowywaniu danych.

Może pojawić się pytanie, dlaczego czas rośnie nieliniowo wraz z rozmiarem każdej aktualizacji. Powodem jest to, że funkcja zmniejszania jest w rzeczywistości funkcją C, która jest znacznie wydajniejsza niż updateAgg. Począwszy od określonego rozmiaru aktualizacji (około 10.000 30), updateAgg osiąga swój pułap i wtedy czas jego wykonania nie zależy od rozmiaru aktualizacji. To właśnie dzięki wstępnemu etapowi Q usługa jest w stanie przetrawić takie ilości danych. To pokazuje, jak ważny jest wybór odpowiedniego algorytmu podczas pracy z dużymi zbiorami danych. Kolejną kwestią jest prawidłowe przechowywanie danych w pamięci. Gdyby dane nie były przechowywane kolumnowo lub nie były uporządkowane według czasu, poznalibyśmy coś takiego jak brak pamięci podręcznej TLB - brak adresu strony pamięci w pamięci podręcznej adresu procesora. Wyszukiwanie adresu w przypadku niepowodzenia trwa około XNUMX razy dłużej, a w przypadku rozproszenia danych może kilkukrotnie spowolnić usługę.

wniosek

W tym artykule pokazałem, że bazy danych KDB+ i Q nadają się nie tylko do przechowywania dużych ilości danych i łatwego dostępu do nich poprzez selekcję, ale także do tworzenia usług przetwarzania danych, które są w stanie przetrawić setki milionów wierszy/gigabajtów danych nawet w jeden pojedynczy proces Q. Sam język Q pozwala na niezwykle zwięzłą i wydajną implementację algorytmów związanych z przetwarzaniem danych ze względu na jego wektorową naturę, wbudowany interpreter dialektu SQL oraz bardzo udany zestaw funkcji bibliotecznych.

Pragnę zauważyć, że powyższe to tylko część możliwości Q; ma on również inne unikalne cechy. Przykładowo niezwykle prosty protokół IPC, który zaciera granicę pomiędzy poszczególnymi procesami Q i pozwala na połączenie setek tych procesów w jedną sieć, która może być zlokalizowana na kilkudziesięciu serwerach w różnych częściach świata.

Źródło: www.habr.com

Dodaj komentarz