Gerçek zamanlı hizmet örneğini kullanan Q ve KDB+ dilinin özellikleri

KDB+ tabanının, Q programlama dilinin ne olduğunu, güçlü ve zayıf yönlerinin neler olduğunu önceki yazımda okuyabilirsiniz. Makale ve kısaca giriş bölümünde. Makalede, gelen veri akışını işleyecek ve "gerçek zamanlı" modda her dakika çeşitli toplama işlevlerini hesaplayacak (yani, verilerin bir sonraki bölümünden önce her şeyi hesaplamak için zamanı olacak) Q üzerinde bir hizmet uygulayacağız. Q'nun ana özelliği, tek nesnelerle değil, onların dizileri, dizi dizileri ve diğer karmaşık nesnelerle işlem yapmanıza olanak tanıyan bir vektör dili olmasıdır. Q ve onun akrabaları K, J, APL gibi diller kısalıklarıyla ünlüdür. Çoğu zaman, Java gibi tanıdık bir dilde birkaç ekran kod alan bir program, bunların üzerine birkaç satırda yazılabilir. Bu makalede bunu göstermek istiyorum.

Gerçek zamanlı hizmet örneğini kullanan Q ve KDB+ dilinin özellikleri

Giriş

KDB+, belirli bir şekilde (öncelikle zamana göre) sıralanan çok büyük miktardaki verilere odaklanan sütunlu bir veritabanıdır. Öncelikle finansal kurumlarda - bankalar, yatırım fonları, sigorta şirketleri - kullanılır. Q dili, KDB+'ın bu verilerle etkili bir şekilde çalışmanıza olanak tanıyan dahili dilidir. Q ideolojisi kısalık ve verimliliktir, netlik feda edilir. Bu, vektör dilinin her durumda anlaşılmasının zor olacağı gerçeğiyle haklı çıkar ve kaydın kısalığı ve zenginliği, programın çok daha büyük bir bölümünü tek ekranda görmenize olanak tanır ve bu da sonuçta anlaşılmasını kolaylaştırır.

Bu yazıda Q'da tam teşekküllü bir program uyguluyoruz ve siz de denemek isteyebilirsiniz. Bunu yapmak için gerçek Q'ya ihtiyacınız olacak. Ücretsiz 32 bit sürümü kx şirketinin web sitesinden indirebilirsiniz – www.kx.com. Eğer ilgileniyorsanız, orada Q kitabı hakkında referans bilgileri bulacaksınız. Ölümlüler İçin Q ve bu konuyla ilgili çeşitli makaleler.

Sorunun formüle edilmesi

Her 25 milisaniyede bir veri içeren tablo gönderen bir kaynak var. KDB+ öncelikle finans alanında kullanıldığından, bunun aşağıdaki sütunlara sahip bir işlemler (işlemler) tablosu olduğunu varsayacağız: zaman (milisaniye cinsinden süre), sym (borsadaki şirket adı - IBM, AAPL,…), fiyat (hisselerin satın alındığı fiyat), büyüklük (işlemin büyüklüğü). 25 milisaniyelik aralık keyfidir; ne çok küçük ne de çok uzundur. Varlığı, verilerin hizmete zaten ara belleğe alınmış olarak geldiği anlamına gelir. Mevcut yüke bağlı olarak dinamik ara belleğe alma da dahil olmak üzere hizmet tarafında arabelleğe alma uygulamak kolay olacaktır, ancak basitlik açısından sabit bir aralığa odaklanacağız.

Hizmetin, sym sütunundan gelen her sembol için her dakikayı bir dizi toplama işlevi (maksimum fiyat, ortalama fiyat, toplam boyut vb.) sayması gerekir. kullanışlı bilgi. Basitlik açısından, tüm fonksiyonların artımlı olarak hesaplanabileceğini varsayacağız; yeni bir değer elde etmek için iki sayıyı (eski ve gelen değerler) bilmek yeterlidir. Örneğin, max, ortalama, toplam işlevleri bu özelliğe sahiptir, ancak medyan işlevi yoktur.

Ayrıca gelen veri akışının zaman sıralı olduğunu da varsayacağız. Bu bize sadece son dakikayla çalışma fırsatı verecek. Uygulamada bazı güncellemelerin geç gelmesi durumunda güncel ve önceki dakikalarla çalışabilmek yeterlidir. Basitlik açısından bu durumu dikkate almayacağız.

Toplama işlevleri

Gerekli toplama işlevleri aşağıda listelenmiştir. Hizmetin yükünü artırmak için mümkün olduğunca çoğunu aldım:

  • yüksek – maksimum fiyat – dakika başına maksimum fiyat.
  • düşük – minimum fiyat – dakika başına minimum fiyat.
  • ilkFiyat – ilk fiyat – dakika başına ilk fiyat.
  • lastPrice – son fiyat – dakika başına son fiyat.
  • FirstSize – ilk boyut – dakika başına ilk işlem boyutu.
  • lastSize – son boyut — bir dakika içindeki son işlem boyutu.
  • numTrades – count i – dakikadaki işlem sayısı.
  • hacim – toplam büyüklük – dakika başına işlem büyüklüklerinin toplamı.
  • pvolume – toplam fiyat – avgPrice için gerekli olan dakika başına fiyatların toplamı.
  • – toplam ciro fiyatı*büyüklük – dakika başına toplam işlem hacmi.
  • avgPrice – pvolume%numTrades – dakika başına ortalama fiyat.
  • avgSize – hacim%numTrades – dakika başına ortalama işlem boyutu.
  • vwap – ciro%hacim – işlem boyutuna göre ağırlıklandırılmış dakika başına ortalama fiyat.
  • cumVolume – toplam hacim – tüm zaman boyunca işlemlerin birikmiş boyutu.

Hemen açık olmayan bir noktayı tartışalım - bu sütunların ilk kez ve sonraki her dakika için nasıl başlatılacağı. FirstPrice türünün bazı sütunlarının her seferinde null olarak başlatılması gerekir; değerleri tanımsızdır. Diğer birim türleri her zaman 0'a ayarlanmalıdır. Ayrıca birleşik bir yaklaşım gerektiren sütunlar da vardır - örneğin, cumVolume önceki dakikadan kopyalanmalı ve ilki için 0'a ayarlanmalıdır. Tüm bu parametreleri sözlük verilerini kullanarak ayarlayalım. tür (bir kayda benzer):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Kolaylık sağlamak için sözlüğe sym ve time ekledim, şimdi initWith son toplu tablodan hazır bir satırdır ve burada doğru sym ve zamanı ayarlamaya devam eder. Bir tabloya yeni satırlar eklemek için bunu kullanabilirsiniz.

Bir toplama işlevi oluştururken aggCols'a ihtiyacımız olacak. Q'daki ifadelerin değerlendirilme sırası (sağdan sola) nedeniyle listenin ters çevrilmesi gerekir. Bazı sütunlar öncekilere bağlı olduğundan, amaç hesaplamanın yüksekten cumVolume'a doğru gitmesini sağlamaktır.

Bir önceki dakikadan yeni bir dakikaya kopyalanması gereken sütunlar, kolaylık olması açısından sym sütunu eklenmiştir:

rollColumns:`sym`cumVolume;

Şimdi sütunları nasıl güncellenmesi gerektiğine göre gruplara ayıralım. Üç tür ayırt edilebilir:

  1. Akümülatörler (hacim, ciro,..) – gelen değeri bir öncekine eklemeliyiz.
  2. Özel bir nokta ile (yüksek, düşük, ..) – dakikadaki ilk değer gelen verilerden alınır, geri kalanı fonksiyon kullanılarak hesaplanır.
  3. Dinlenmek. Her zaman bir fonksiyon kullanılarak hesaplanır.

Bu sınıflar için değişkenleri tanımlayalım:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Hesaplama sırası

Toplu tabloyu iki aşamada güncelleyeceğiz. Verimlilik adına, öncelikle gelen tabloyu her karakter ve dakika için yalnızca bir satır olacak şekilde küçültüyoruz. Tüm fonksiyonlarımızın artımlı ve ilişkisel olması, bu ek adımın sonucunun değişmeyeceğini garanti eder. Select'i kullanarak tabloyu küçültebilirsiniz:

select high:max price, low:min price … by sym,time.minute from table

Bu yöntemin bir dezavantajı vardır - hesaplanan sütunlar kümesi önceden tanımlanmıştır. Neyse ki Q'da select, dinamik olarak oluşturulan argümanları değiştirebileceğiniz bir işlev olarak da uygulanır:

?[table;whereClause;byClause;selectClause]

Argümanların formatını ayrıntılı olarak açıklamayacağım; bizim durumumuzda sadece by ve select ifadeleri önemsiz olmayacak ve sütunlar!ifadeler formunun sözlükleri olmalıdır. Böylece küçültme fonksiyonu şu şekilde tanımlanabilir:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Açıklık sağlamak için, Q ifadeli bir dizeyi eval işlevine aktarılabilecek ve işlev seçiminde gerekli olan bir değere dönüştüren ayrıştırma işlevini kullandım. Ayrıca ön işlemin, seçme fonksiyonunun bir yansıması (yani, kısmen tanımlanmış argümanlara sahip bir fonksiyon) olarak tanımlandığını, bir argümanın (tablo) eksik olduğunu unutmayın. Bir tabloya önişlem uygularsak sıkıştırılmış bir tablo elde ederiz.

İkinci aşama, toplu tablonun güncellenmesidir. Önce algoritmayı sözde kodla yazalım:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Q'da döngüler yerine harita/azaltma işlevlerini kullanmak yaygındır. Ancak Q bir vektör dili olduğundan ve tüm işlemleri tüm sembollere aynı anda kolayca uygulayabildiğimizden, ilk yaklaşımda, hiçbir döngü olmadan tüm semboller üzerinde işlemleri aynı anda gerçekleştirerek yapabiliriz:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Ancak daha da ileri gidebiliriz; Q'nun benzersiz ve son derece güçlü bir operatörü vardır: genelleştirilmiş atama operatörü. Endekslerin, işlevlerin ve argümanların bir listesini kullanarak karmaşık bir veri yapısındaki bir dizi değeri değiştirmenize olanak tanır. Bizim durumumuzda şöyle görünüyor:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Ne yazık ki, bir tabloya atama yapmak için sütunların değil satırların bir listesine ihtiyacınız vardır ve çevirme işlevini kullanarak matrisi (sütun listesinden satır listesine) aktarmanız gerekir. Bu, büyük bir tablo için pahalıdır, bu nedenle harita işlevini (kesme işaretine benzeyen) kullanarak her sütuna ayrı ayrı genelleştirilmiş bir atama uygularız:

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Yine fonksiyon projeksiyonunu kullanıyoruz. Ayrıca Q'da liste oluşturmanın da bir fonksiyon olduğunu ve listelerin bir listesini almak için her(map) fonksiyonunu kullanarak onu çağırabileceğimizi unutmayın.

Hesaplanan sütun kümesinin sabit olmadığından emin olmak için yukarıdaki ifadeyi dinamik olarak oluşturacağız. Öncelikle, toplanan ve giriş verilerine atıfta bulunmak için satır ve inp değişkenlerini kullanarak her bir sütunu hesaplamak için işlevler tanımlayalım:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Bazı sütunlar özeldir; ilk değerleri fonksiyon tarafından hesaplanmamalıdır. Bunun ilk olduğunu satır[`numTrades] sütununa göre belirleyebiliriz - eğer 0 içeriyorsa değer ilk olur. Q'nun, ilk argümandaki duruma bağlı olarak liste 1 veya 2'den bir değer seçen - ?[Boolean list;list1;list2] - seçme işlevi vardır:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Burada fonksiyonumla (küme parantezleri içindeki bir ifade) genelleştirilmiş bir atama çağırdım. Geçerli değeri (ilk argüman) ve 4. parametrede ilettiğim ek bir argümanı alır.

İşlevi onlar için aynı olduğundan pil hoparlörlerini ayrı ayrı ekleyelim:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Bu, Q standartlarına göre normal bir atamadır ancak ben hemen bir değerler listesi atıyorum. Son olarak ana fonksiyonu oluşturalım:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Bu ifade ile yukarıda verdiğim ifadeyi içeren bir string'den dinamik olarak bir fonksiyon oluşturuyorum. Sonuç şöyle görünecek:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Sütun değerlendirme sırası ters çevrilmiştir çünkü Q'da değerlendirme sırası sağdan sola doğrudur.

Artık hesaplamalar için gerekli olan iki ana fonksiyonumuz var, sadece biraz altyapı eklememiz gerekiyor ve hizmet hazır.

Son adımlar

Tüm işi yapan ön işleme ve güncellemeAgg işlevlerimiz var. Ancak dakikalar arasında doğru geçişin sağlanması ve toplama için endekslerin hesaplanması yine de gereklidir. Öncelikle init fonksiyonunu tanımlayalım:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Ayrıca mevcut dakikayı değiştirecek yuvarlama fonksiyonunu da tanımlayacağız:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Yeni karakterler eklemek için bir fonksiyona ihtiyacımız olacak:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Ve son olarak, istemci tarafından veri eklemek için çağrılan upd işlevi (Q hizmetleri için bu işlevin geleneksel adı):

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Bu kadar. İşte söz verdiğimiz gibi hizmetimizin tam kodu, sadece birkaç satır:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Test

Hizmetin performansını kontrol edelim. Bunu yapmak için ayrı bir işlemde çalıştıralım (kodu service.q dosyasına koyalım) ve init fonksiyonunu çağıralım:

q service.q –p 5566

q)init[]

Başka bir konsolda ikinci Q işlemini başlatın ve birinciye bağlanın:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Öncelikle 10000 adetlik bir sembol listesi oluşturalım ve rastgele bir tablo oluşturacak bir fonksiyon ekleyelim. İkinci konsolda:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Tabloda aramayı kolaylaştırmak için listeye üç gerçek sembol ekledim. Rnd işlevi, zamanın t ila t+25 milisaniye arasında değiştiği, n satırlı rastgele bir tablo oluşturur.

Artık hizmete veri göndermeyi deneyebilirsiniz (ilk on saati ekleyin):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Tablonun güncellendiğini hizmette kontrol edebilirsiniz:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Sonuç:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Şimdi servisin dakikada ne kadar veri işleyebildiğini öğrenmek için yük testi yapalım. Güncelleme aralığını 25 milisaniyeye ayarladığımızı hatırlatayım. Buna göre, kullanıcılara veri istemeleri için zaman tanımak amacıyla hizmetin (ortalama olarak) güncelleme başına en az 20 milisaniyeye sığması gerekiyor. İkinci işleme aşağıdakileri girin:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 iki dakikadır. Her 1000 milisaniyede bir 25 satır boyunca ilk önce koşmayı deneyebilirsiniz:

start 1000

Benim durumumda sonuç, güncelleme başına birkaç milisaniye civarındadır. Bu yüzden satır sayısını hemen 10.000'e çıkaracağım:

start 10000

Sonuç:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Yine özel bir şey yok ama bu dakikada 24 milyon satır, saniyede 400 bin satır. 25 milisaniyeden fazla bir süre boyunca güncelleme yalnızca 5 kez yavaşladı; görünüşe göre dakika değiştiğinde. 100.000'e çıkaralım:

start 100000

Sonuç:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Gördüğünüz gibi hizmet zorlukla başa çıkabiliyor ancak yine de ayakta kalmayı başarıyor. Böyle bir veri hacmi (dakikada 240 milyon satır) son derece büyüktür; bu gibi durumlarda, her biri karakterlerin yalnızca bir kısmını işleyen hizmetin birkaç klonunun (hatta düzinelerce klonunun) başlatılması yaygındır. Yine de sonuç, öncelikle veri depolamaya odaklanan yorumlanmış bir dil için etkileyici.

Her güncellemenin boyutuyla birlikte zamanın neden doğrusal olmayan bir şekilde büyüdüğü sorusu ortaya çıkabilir. Bunun nedeni, küçültme işlevinin aslında bir C işlevi olmasıdır ve bu, updateAgg'den çok daha verimlidir. Belirli bir güncelleme boyutundan (yaklaşık 10.000) başlayarak, updateAgg tavanına ulaşır ve ardından yürütme süresi güncelleme boyutuna bağlı değildir. Hizmetin bu kadar büyük miktarda veriyi sindirebilmesi, Q ön adımı sayesinde mümkün olmaktadır. Bu, büyük verilerle çalışırken doğru algoritmayı seçmenin ne kadar önemli olduğunu vurgulamaktadır. Bir diğer nokta ise verilerin hafızaya doğru şekilde saklanmasıdır. Veriler sütun halinde depolanmasaydı veya zamana göre sıralanmasaydı, TLB önbellek kaçırması gibi bir şeye - işlemci adresi önbelleğinde bir bellek sayfası adresinin bulunmamasına - aşina olurduk. Bir adresin aranması, başarısız olması durumunda yaklaşık 30 kat daha uzun sürer ve veriler dağınıksa hizmetin birkaç kez yavaşlamasına neden olabilir.

Sonuç

Bu makalede, KDB+ ve Q veritabanının yalnızca büyük verileri depolamak ve seçme yoluyla bu verilere kolayca erişmek için değil, aynı zamanda yüz milyonlarca satırı/gigabayt veriyi en üst düzeyde bile sindirebilen veri işleme hizmetleri oluşturmak için de uygun olduğunu gösterdim. tek bir Q süreci. Q dilinin kendisi, vektör yapısı, yerleşik SQL lehçesi yorumlayıcısı ve çok başarılı kütüphane işlevleri kümesi nedeniyle veri işlemeyle ilgili algoritmaların son derece kısa ve verimli bir şekilde uygulanmasına olanak tanır.

Yukarıdakilerin Q'nun yapabileceklerinin sadece bir kısmı olduğunu, başka benzersiz özelliklerinin de olduğunu not edeceğim. Örneğin, bireysel Q işlemleri arasındaki sınırı silen ve bu işlemlerin yüzlercesini, dünyanın farklı yerlerindeki düzinelerce sunucuda bulunabilen tek bir ağda birleştirmenize olanak tanıyan son derece basit bir IPC protokolü.

Kaynak: habr.com

Yorum ekle