Χαρακτηριστικά της γλώσσας Q και KDB+ χρησιμοποιώντας το παράδειγμα μιας υπηρεσίας σε πραγματικό χρόνο

Μπορείτε να διαβάσετε ποια είναι η βάση KDB+, η γλώσσα προγραμματισμού Q, ποια είναι τα δυνατά και τα αδύνατα σημεία τους στο προηγούμενο μου άρθρο και εν συντομία στην εισαγωγή. Στο άρθρο, θα εφαρμόσουμε μια υπηρεσία στο Q που θα επεξεργάζεται την εισερχόμενη ροή δεδομένων και θα υπολογίζει διάφορες συναρτήσεις συνάθροισης κάθε λεπτό σε λειτουργία "πραγματικό χρόνο" (δηλαδή, θα έχει χρόνο να υπολογίσει τα πάντα πριν από το επόμενο τμήμα δεδομένων). Το κύριο χαρακτηριστικό του Q είναι ότι είναι μια διανυσματική γλώσσα που σας επιτρέπει να λειτουργείτε όχι με μεμονωμένα αντικείμενα, αλλά με τους πίνακες, τους πίνακες πινάκων και άλλα πολύπλοκα αντικείμενα. Γλώσσες όπως η Q και οι συγγενείς της K, J, APL φημίζονται για τη συντομία τους. Συχνά, ένα πρόγραμμα που καταλαμβάνει πολλές οθόνες κώδικα σε μια γνωστή γλώσσα όπως η Java μπορεί να γραφτεί σε αυτές σε λίγες γραμμές. Αυτό θέλω να δείξω σε αυτό το άρθρο.

Χαρακτηριστικά της γλώσσας Q και KDB+ χρησιμοποιώντας το παράδειγμα μιας υπηρεσίας σε πραγματικό χρόνο

Εισαγωγή

Το KDB+ είναι μια στήλη βάσης δεδομένων που επικεντρώνεται σε πολύ μεγάλους όγκους δεδομένων, ταξινομημένων με συγκεκριμένο τρόπο (κυρίως με βάση το χρόνο). Χρησιμοποιείται κυρίως σε χρηματοπιστωτικά ιδρύματα - τράπεζες, επενδυτικά κεφάλαια, ασφαλιστικές εταιρείες. Η γλώσσα Q είναι η εσωτερική γλώσσα του KDB+ που σας επιτρέπει να εργάζεστε αποτελεσματικά με αυτά τα δεδομένα. Η ιδεολογία Q είναι η συντομία και η αποτελεσματικότητα, ενώ η σαφήνεια θυσιάζεται. Αυτό δικαιολογείται από το γεγονός ότι η γλώσσα του διανύσματος θα είναι δυσνόητη σε κάθε περίπτωση και η συντομία και ο πλούτος της εγγραφής σάς επιτρέπει να βλέπετε ένα πολύ μεγαλύτερο μέρος του προγράμματος σε μία οθόνη, κάτι που τελικά καθιστά ευκολότερη την κατανόηση.

Σε αυτό το άρθρο υλοποιούμε ένα πλήρες πρόγραμμα στο Q και ίσως θελήσετε να το δοκιμάσετε. Για να το κάνετε αυτό, θα χρειαστείτε το πραγματικό Q. Μπορείτε να κάνετε λήψη της δωρεάν έκδοσης 32 bit στον ιστότοπο της εταιρείας kx - www.kx.com. Εκεί, αν σας ενδιαφέρει, θα βρείτε πληροφορίες αναφοράς για το Q, το βιβλίο Q Για Θνητούς και διάφορα άρθρα για αυτό το θέμα.

Δήλωση προβλήματος

Υπάρχει μια πηγή που στέλνει έναν πίνακα με δεδομένα κάθε 25 χιλιοστά του δευτερολέπτου. Δεδομένου ότι το KDB+ χρησιμοποιείται κυρίως στα χρηματοοικονομικά, θα υποθέσουμε ότι αυτός είναι ένας πίνακας συναλλαγών (συναλλαγές), ο οποίος έχει τις ακόλουθες στήλες: χρόνος (χρόνος σε χιλιοστά του δευτερολέπτου), sym (ονομασία εταιρείας στο χρηματιστήριο - IBM, AAPL,…), τιμή (η τιμή στην οποία αγοράστηκαν οι μετοχές), μέγεθος (μέγεθος της συναλλαγής). Το διάστημα των 25 χιλιοστών του δευτερολέπτου είναι αυθαίρετο, όχι πολύ μικρό και όχι πολύ μεγάλο. Η παρουσία του σημαίνει ότι τα δεδομένα έρχονται στην υπηρεσία ήδη αποθηκευμένα στην προσωρινή μνήμη. Θα ήταν εύκολο να εφαρμοστεί η προσωρινή αποθήκευση στην πλευρά της υπηρεσίας, συμπεριλαμβανομένης της δυναμικής αποθήκευσης, ανάλογα με το τρέχον φορτίο, αλλά για λόγους απλότητας, θα επικεντρωθούμε σε ένα σταθερό διάστημα.

Η υπηρεσία πρέπει να μετράει κάθε λεπτό για κάθε εισερχόμενο σύμβολο από τη στήλη sym ένα σύνολο συναρτήσεων συγκέντρωσης - μέγιστη τιμή, μέση τιμή, μέγεθος αθροίσματος κ.λπ. ΧΡΗΣΙΜΕΣ ΠΛΗΡΟΦΟΡΙΕΣ. Για απλότητα, θα υποθέσουμε ότι όλες οι συναρτήσεις μπορούν να υπολογιστούν σταδιακά, δηλ. για να αποκτήσετε μια νέα τιμή, αρκεί να γνωρίζετε δύο αριθμούς - την παλιά και τις εισερχόμενες τιμές. Για παράδειγμα, οι συναρτήσεις max, average, sum έχουν αυτή την ιδιότητα, αλλά η διάμεσος συνάρτηση όχι.

Θα υποθέσουμε επίσης ότι η εισερχόμενη ροή δεδομένων είναι χρονικά διατεταγμένη. Αυτό θα μας δώσει την ευκαιρία να δουλέψουμε μόνο με την τελευταία στιγμή. Στην πράξη, αρκεί να μπορείς να δουλεύεις με τα τρέχοντα και τα προηγούμενα λεπτά σε περίπτωση που καθυστερήσουν κάποιες ενημερώσεις. Για λόγους απλότητας, δεν θα εξετάσουμε αυτήν την περίπτωση.

Συναρτήσεις συγκέντρωσης

Οι απαιτούμενες συναρτήσεις συγκέντρωσης παρατίθενται παρακάτω. Πήρα όσο το δυνατόν περισσότερα από αυτά για να αυξήσω το φορτίο στην υπηρεσία:

  • υψηλή – μέγιστη τιμή – μέγιστη τιμή ανά λεπτό.
  • χαμηλή – ελάχιστη τιμή – ελάχιστη τιμή ανά λεπτό.
  • firstPrice – πρώτη τιμή – πρώτη τιμή ανά λεπτό.
  • lastPrice – τελευταία τιμή – τελευταία τιμή ανά λεπτό.
  • firstSize – πρώτο μέγεθος – πρώτο μέγεθος συναλλαγής ανά λεπτό.
  • lastSize – τελευταίο μέγεθος – τελευταίο μέγεθος συναλλαγής σε ένα λεπτό.
  • numTrades – count i – αριθμός συναλλαγών ανά λεπτό.
  • όγκος – μέγεθος αθροίσματος – άθροισμα μεγεθών συναλλαγών ανά λεπτό.
  • pvolume – αθροιστική τιμή – άθροισμα τιμών ανά λεπτό, που απαιτείται για τη μέσηΤιμή.
  • – αθροιστική τιμή κύκλου εργασιών*μέγεθος – συνολικός όγκος συναλλαγών ανά λεπτό.
  • avgPrice – pvolume%numTrades – μέση τιμή ανά λεπτό.
  • avgSize – volume%numTrades – μέσο μέγεθος συναλλαγών ανά λεπτό.
  • vwap – κύκλος εργασιών%όγκος – μέση τιμή ανά λεπτό σταθμισμένη με βάση το μέγεθος της συναλλαγής.
  • cumVolume – αθροιστικός όγκος – συσσωρευμένο μέγεθος συναλλαγών καθ’ όλη τη διάρκεια του χρόνου.

Ας συζητήσουμε αμέσως ένα μη προφανές σημείο - πώς να αρχικοποιήσετε αυτές τις στήλες για πρώτη φορά και για κάθε επόμενο λεπτό. Ορισμένες στήλες του τύπου firstPrice πρέπει να αρχικοποιούνται ως μηδενικές κάθε φορά· η τιμή τους δεν είναι καθορισμένη. Οι άλλοι τύποι τόμου πρέπει πάντα να ορίζονται στο 0. Υπάρχουν επίσης στήλες που απαιτούν συνδυαστική προσέγγιση - για παράδειγμα, το cumVolume πρέπει να αντιγραφεί από το προηγούμενο λεπτό και για τον πρώτο να οριστεί στο 0. Ας ορίσουμε όλες αυτές τις παραμέτρους χρησιμοποιώντας τα δεδομένα του λεξικού τύπος (ανάλογος με μια εγγραφή):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Πρόσθεσα sym και χρόνο στο λεξικό για ευκολία, τώρα το initWith είναι μια έτοιμη γραμμή από τον τελικό συγκεντρωτικό πίνακα, όπου απομένει να ορίσετε το σωστό σύμβολο και την ώρα. Μπορείτε να το χρησιμοποιήσετε για να προσθέσετε νέες σειρές σε έναν πίνακα.

Θα χρειαστούμε aggCols κατά τη δημιουργία μιας συνάρτησης συγκέντρωσης. Η λίστα πρέπει να αντιστραφεί λόγω της σειράς με την οποία αξιολογούνται οι εκφράσεις στο Q (από δεξιά προς τα αριστερά). Ο στόχος είναι να διασφαλιστεί ότι ο υπολογισμός πηγαίνει από υψηλό σε cumVolume, καθώς ορισμένες στήλες εξαρτώνται από προηγούμενες.

Στήλες που πρέπει να αντιγραφούν σε ένα νέο λεπτό από το προηγούμενο, η στήλη sym προστίθεται για ευκολία:

rollColumns:`sym`cumVolume;

Τώρα ας χωρίσουμε τις στήλες σε ομάδες ανάλογα με τον τρόπο ενημέρωσης τους. Τρεις τύποι διακρίνονται:

  1. Συσσωρευτές (όγκος, κύκλος εργασιών,..) – πρέπει να προσθέσουμε την εισερχόμενη αξία στην προηγούμενη.
  2. Με ένα ειδικό σημείο (υψηλό, χαμηλό, ..) - η πρώτη τιμή στο λεπτό λαμβάνεται από τα εισερχόμενα δεδομένα, τα υπόλοιπα υπολογίζονται χρησιμοποιώντας τη συνάρτηση.
  3. Υπόλοιπο. Υπολογίζεται πάντα χρησιμοποιώντας μια συνάρτηση.

Ας ορίσουμε μεταβλητές για αυτές τις κλάσεις:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Σειρά υπολογισμού

Θα ενημερώσουμε τον συγκεντρωτικό πίνακα σε δύο στάδια. Για αποτελεσματικότητα, πρώτα συρρικνώνουμε τον εισερχόμενο πίνακα έτσι ώστε να υπάρχει μόνο μία σειρά για κάθε χαρακτήρα και λεπτό. Το γεγονός ότι όλες οι λειτουργίες μας είναι αυξητικές και συνειρμικές εγγυάται ότι το αποτέλεσμα αυτού του πρόσθετου βήματος δεν θα αλλάξει. Θα μπορούσατε να συρρικνώσετε τον πίνακα χρησιμοποιώντας επιλέξτε:

select high:max price, low:min price … by sym,time.minute from table

Αυτή η μέθοδος έχει ένα μειονέκτημα - το σύνολο των υπολογιζόμενων στηλών είναι προκαθορισμένο. Ευτυχώς, στο Q, το select υλοποιείται επίσης ως συνάρτηση όπου μπορείτε να αντικαταστήσετε ορίσματα που δημιουργούνται δυναμικά:

?[table;whereClause;byClause;selectClause]

Δεν θα περιγράψω λεπτομερώς τη μορφή των ορισμάτων· στην περίπτωσή μας, μόνο οι εκφράσεις by και select θα είναι μη τετριμμένες και θα πρέπει να είναι λεξικά των στηλών φόρμας!expressions. Έτσι, η συνάρτηση συρρίκνωσης μπορεί να οριστεί ως εξής:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Για λόγους σαφήνειας, χρησιμοποίησα τη συνάρτηση ανάλυσης, η οποία μετατρέπει μια συμβολοσειρά με έκφραση Q σε τιμή που μπορεί να περάσει στη συνάρτηση eval και η οποία απαιτείται στη συνάρτηση επιλογής. Σημειώστε επίσης ότι η προεπεξεργασία ορίζεται ως προβολή (δηλαδή, μια συνάρτηση με μερικώς καθορισμένα ορίσματα) της συνάρτησης Select, λείπει ένα όρισμα (ο πίνακας). Εάν εφαρμόσουμε προεπεξεργασία σε έναν πίνακα, θα έχουμε έναν συμπιεσμένο πίνακα.

Το δεύτερο στάδιο είναι η ενημέρωση του συγκεντρωτικού πίνακα. Ας γράψουμε πρώτα τον αλγόριθμο σε ψευδοκώδικα:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

Στο Q, είναι σύνηθες να χρησιμοποιούνται συναρτήσεις χάρτη/μείωσης αντί για βρόχους. Αλλά επειδή το Q είναι μια διανυσματική γλώσσα και μπορούμε εύκολα να εφαρμόσουμε όλες τις πράξεις σε όλα τα σύμβολα ταυτόχρονα, τότε σε μια πρώτη προσέγγιση μπορούμε να κάνουμε χωρίς καθόλου βρόχο, εκτελώντας λειτουργίες σε όλα τα σύμβολα ταυτόχρονα:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Αλλά μπορούμε να προχωρήσουμε παραπέρα, το Q έχει έναν μοναδικό και εξαιρετικά ισχυρό τελεστή - τον γενικευμένο τελεστή ανάθεσης. Σας επιτρέπει να αλλάξετε ένα σύνολο τιμών σε μια σύνθετη δομή δεδομένων χρησιμοποιώντας μια λίστα δεικτών, συναρτήσεων και ορισμάτων. Στην περίπτωσή μας μοιάζει με αυτό:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Δυστυχώς, για να αντιστοιχίσετε σε έναν πίνακα χρειάζεστε μια λίστα γραμμών, όχι στηλών, και πρέπει να μεταφέρετε τη μήτρα (λίστα στηλών στη λίστα γραμμών) χρησιμοποιώντας τη συνάρτηση αναστροφής. Αυτό είναι ακριβό για έναν μεγάλο πίνακα, επομένως αντ 'αυτού εφαρμόζουμε μια γενικευμένη ανάθεση σε κάθε στήλη ξεχωριστά, χρησιμοποιώντας τη συνάρτηση χάρτη (η οποία μοιάζει με απόστροφο):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Χρησιμοποιούμε ξανά την προβολή συνάρτησης. Σημειώστε επίσης ότι στο Q, η δημιουργία λίστας είναι επίσης μια συνάρτηση και μπορούμε να την καλέσουμε χρησιμοποιώντας τη συνάρτηση κάθε(χάρτη) για να λάβουμε μια λίστα λιστών.

Για να διασφαλίσουμε ότι το σύνολο των υπολογιζόμενων στηλών δεν είναι σταθερό, θα δημιουργήσουμε την παραπάνω έκφραση δυναμικά. Ας ορίσουμε πρώτα συναρτήσεις για τον υπολογισμό κάθε στήλης, χρησιμοποιώντας τις μεταβλητές row και inp για αναφορά στα συγκεντρωτικά και δεδομένα εισόδου:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Ορισμένες στήλες είναι ειδικές· η πρώτη τους τιμή δεν πρέπει να υπολογίζεται από τη συνάρτηση. Μπορούμε να προσδιορίσουμε ότι είναι το πρώτο από τη στήλη [`numTrades] - αν περιέχει 0, τότε η τιμή είναι πρώτη. Το Q έχει μια συνάρτηση επιλογής - ?[Boolean list;list1;list2] - η οποία επιλέγει μια τιμή από τη λίστα 1 ή 2 ανάλογα με τη συνθήκη στο πρώτο όρισμα:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Εδώ κάλεσα μια γενικευμένη ανάθεση με τη συνάρτησή μου (μια έκφραση σε σγουρά άγκιστρα). Λαμβάνει την τρέχουσα τιμή (το πρώτο όρισμα) και ένα επιπλέον όρισμα, το οποίο περνάω στην 4η παράμετρο.

Ας προσθέσουμε ξεχωριστά ηχεία μπαταρίας, καθώς η λειτουργία είναι ίδια για αυτά:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Αυτή είναι μια κανονική εκχώρηση με τα πρότυπα Q, αλλά εκχωρώ μια λίστα τιμών αμέσως. Τέλος, ας δημιουργήσουμε την κύρια συνάρτηση:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Με αυτήν την έκφραση, δημιουργώ δυναμικά μια συνάρτηση από μια συμβολοσειρά που περιέχει την έκφραση που έδωσα παραπάνω. Το αποτέλεσμα θα μοιάζει με αυτό:

{[aggTable;idx;inp] rows:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Η σειρά αξιολόγησης της στήλης αντιστρέφεται επειδή στο Q η σειρά αξιολόγησης είναι από δεξιά προς τα αριστερά.

Τώρα έχουμε δύο βασικές λειτουργίες που είναι απαραίτητες για τους υπολογισμούς, πρέπει απλώς να προσθέσουμε λίγη υποδομή και η υπηρεσία είναι έτοιμη.

Τελικά βήματα

Έχουμε συναρτήσεις προεπεξεργασίας και ενημέρωσης Agg που κάνουν όλη τη δουλειά. Ωστόσο, εξακολουθεί να είναι απαραίτητο να διασφαλιστεί η σωστή μετάβαση στα λεπτά και να υπολογιστούν οι δείκτες για τη συγκέντρωση. Πρώτα απ 'όλα, ας ορίσουμε τη συνάρτηση init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Θα ορίσουμε επίσης τη συνάρτηση roll, η οποία θα αλλάξει το τρέχον λεπτό:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Θα χρειαστούμε μια συνάρτηση για να προσθέσουμε νέους χαρακτήρες:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

Και τέλος, η συνάρτηση upd (το παραδοσιακό όνομα αυτής της συνάρτησης για τις υπηρεσίες Q), η οποία καλείται από τον πελάτη για να προσθέσει δεδομένα:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Αυτό είναι όλο. Εδώ είναι ο πλήρης κωδικός της υπηρεσίας μας, όπως υποσχεθήκαμε, λίγες μόνο γραμμές:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst_0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Δοκιμές

Ας ελέγξουμε την απόδοση της υπηρεσίας. Για να γίνει αυτό, ας το εκτελέσουμε σε μια ξεχωριστή διαδικασία (βάλτε τον κωδικό στο αρχείο service.q) και καλέστε τη συνάρτηση init:

q service.q –p 5566

q)init[]

Σε μια άλλη κονσόλα, ξεκινήστε τη δεύτερη διαδικασία Q και συνδεθείτε στην πρώτη:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Αρχικά, ας δημιουργήσουμε μια λίστα συμβόλων - 10000 κομμάτια και ας προσθέσουμε μια συνάρτηση για να δημιουργήσουμε έναν τυχαίο πίνακα. Στη δεύτερη κονσόλα:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Πρόσθεσα τρία πραγματικά σύμβολα στη λίστα για να διευκολυνθεί η αναζήτηση τους στον πίνακα. Η συνάρτηση rnd δημιουργεί έναν τυχαίο πίνακα με n σειρές, όπου ο χρόνος ποικίλλει από t έως t+25 χιλιοστά του δευτερολέπτου.

Τώρα μπορείτε να δοκιμάσετε να στείλετε δεδομένα στην υπηρεσία (προσθέστε τις πρώτες δέκα ώρες):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Μπορείτε να ελέγξετε στην υπηρεσία ότι ο πίνακας έχει ενημερωθεί:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Το αποτέλεσμα:

sym|time|high|low|firstPrice|lastPrice|firstSize|lastSize|numTrades|volume|pvolume|turnover|avgPrice|avgSize|vwap|cumVolume
--|--|--|--|--|--------------------------------
AAPL|09:27|9.258904|9.258904|9.258904|9.258904|8|8|1|8|9.258904|74.07123|9.258904|8|9.258904|2888
AAPL|09:28|9.068162|9.068162|9.068162|9.068162|7|7|1|7|9.068162|63.47713|9.068162|7|9.068162|2895
AAPL|09:31|4.680449|0.2011121|1.620827|0.2011121|1|5|4|14|9.569556|36.84342|2.392389|3.5|2.631673|2909
AAPL|09:33|2.812535|2.812535|2.812535|2.812535|6|6|1|6|2.812535|16.87521|2.812535|6|2.812535|2915
AAPL|09:34|5.099025|5.099025|5.099025|5.099025|4|4|1|4|5.099025|20.3961|5.099025|4|5.099025|2919

Ας πραγματοποιήσουμε τώρα δοκιμή φόρτωσης για να μάθουμε πόσα δεδομένα μπορεί να επεξεργαστεί η υπηρεσία ανά λεπτό. Να σας υπενθυμίσω ότι ορίσαμε το διάστημα ενημέρωσης στα 25 χιλιοστά του δευτερολέπτου. Αντίστοιχα, η υπηρεσία πρέπει (κατά μέσο όρο) να χωράει σε τουλάχιστον 20 χιλιοστά του δευτερολέπτου ανά ενημέρωση για να δίνει στους χρήστες χρόνο να ζητήσουν δεδομένα. Εισαγάγετε τα ακόλουθα στη δεύτερη διαδικασία:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 είναι δύο λεπτά. Μπορείτε να δοκιμάσετε να τρέχετε πρώτα για 1000 σειρές κάθε 25 χιλιοστά του δευτερολέπτου:

start 1000

Στην περίπτωσή μου, το αποτέλεσμα είναι περίπου μερικά χιλιοστά του δευτερολέπτου ανά ενημέρωση. Έτσι, θα αυξήσω αμέσως τον αριθμό των σειρών σε 10.000:

start 10000

Το αποτέλεσμα:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Και πάλι, τίποτα το ιδιαίτερο, αλλά αυτό είναι 24 εκατομμύρια γραμμές ανά λεπτό, 400 χιλιάδες ανά δευτερόλεπτο. Για περισσότερα από 25 χιλιοστά του δευτερολέπτου, η ενημέρωση επιβραδύνθηκε μόνο 5 φορές, προφανώς όταν άλλαξε το λεπτό. Ας αυξηθούμε σε 100.000:

start 100000

Το αποτέλεσμα:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Όπως μπορείτε να δείτε, η υπηρεσία μετά βίας μπορεί να αντεπεξέλθει, αλλά παρόλα αυτά καταφέρνει να παραμείνει στη ζωή. Ένας τέτοιος όγκος δεδομένων (240 εκατομμύρια σειρές ανά λεπτό) είναι εξαιρετικά μεγάλος· σε τέτοιες περιπτώσεις, είναι σύνηθες να εκκινούνται αρκετοί κλώνοι (ή ακόμα και δεκάδες κλώνοι) της υπηρεσίας, καθένας από τους οποίους επεξεργάζεται μόνο μέρος των χαρακτήρων. Ωστόσο, το αποτέλεσμα είναι εντυπωσιακό για μια ερμηνευτική γλώσσα που εστιάζει κυρίως στην αποθήκευση δεδομένων.

Μπορεί να προκύψει το ερώτημα γιατί ο χρόνος μεγαλώνει μη γραμμικά με το μέγεθος κάθε ενημέρωσης. Ο λόγος είναι ότι η συνάρτηση shrink είναι στην πραγματικότητα μια συνάρτηση C, η οποία είναι πολύ πιο αποτελεσματική από το updateAgg. Ξεκινώντας από ένα συγκεκριμένο μέγεθος ενημέρωσης (περίπου 10.000), το updateAgg φτάνει στο ανώτατο όριο και, στη συνέχεια, ο χρόνος εκτέλεσής του δεν εξαρτάται από το μέγεθος της ενημέρωσης. Λόγω του προκαταρκτικού βήματος Q η υπηρεσία είναι σε θέση να αφομοιώσει τέτοιους όγκους δεδομένων. Αυτό υπογραμμίζει πόσο σημαντικό είναι να επιλέγετε τον σωστό αλγόριθμο όταν εργάζεστε με μεγάλα δεδομένα. Ένα άλλο σημείο είναι η σωστή αποθήκευση δεδομένων στη μνήμη. Εάν τα δεδομένα δεν ήταν αποθηκευμένα σε στήλη ή δεν είχαν ταξινομηθεί με βάση το χρόνο, τότε θα εξοικειωνόμασταν με κάτι τέτοιο όπως η απώλεια της κρυφής μνήμης TLB - η απουσία διεύθυνσης σελίδας μνήμης στη μνήμη προσωρινής μνήμης διευθύνσεων του επεξεργαστή. Η αναζήτηση μιας διεύθυνσης διαρκεί περίπου 30 φορές περισσότερο εάν δεν είναι επιτυχής και εάν τα δεδομένα είναι διάσπαρτα, μπορεί να επιβραδύνει την υπηρεσία αρκετές φορές.

Συμπέρασμα

Σε αυτό το άρθρο, έδειξα ότι οι βάσεις δεδομένων KDB+ και Q είναι κατάλληλες όχι μόνο για την αποθήκευση μεγάλων δεδομένων και την εύκολη πρόσβαση σε αυτά μέσω επιλογής, αλλά και για τη δημιουργία υπηρεσιών επεξεργασίας δεδομένων που είναι ικανές να αφομοιώσουν εκατοντάδες εκατομμύρια σειρές/gigabyte δεδομένων ακόμη και σε μία μόνο διαδικασία Q. Η ίδια η γλώσσα Q επιτρέπει την εξαιρετικά συνοπτική και αποτελεσματική εφαρμογή αλγορίθμων που σχετίζονται με την επεξεργασία δεδομένων λόγω της διανυσματικής της φύσης, του ενσωματωμένου διερμηνέα διαλέκτου SQL και ενός πολύ επιτυχημένου συνόλου λειτουργιών βιβλιοθήκης.

Θα σημειώσω ότι τα παραπάνω είναι μόνο μέρος αυτού που μπορεί να κάνει η Q, έχει και άλλα μοναδικά χαρακτηριστικά. Για παράδειγμα, ένα εξαιρετικά απλό πρωτόκολλο IPC που διαγράφει τα όρια μεταξύ μεμονωμένων διεργασιών Q και σας επιτρέπει να συνδυάσετε εκατοντάδες από αυτές τις διεργασίες σε ένα ενιαίο δίκτυο, το οποίο μπορεί να βρίσκεται σε δεκάδες διακομιστές σε διαφορετικά μέρη του κόσμου.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο