Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο" Γεια σας, κάτοικοι του Khabro! Αυτό το βιβλίο είναι κατάλληλο για κάθε προγραμματιστή που θέλει να κατανοήσει την επεξεργασία νημάτων. Η κατανόηση του κατανεμημένου προγραμματισμού θα σας βοηθήσει να κατανοήσετε καλύτερα το Kafka και το Kafka Streams. Θα ήταν ωραίο να γνωρίζετε το ίδιο το πλαίσιο του Κάφκα, αλλά αυτό δεν είναι απαραίτητο: Θα σας πω όλα όσα χρειάζεστε. Οι έμπειροι προγραμματιστές του Kafka και οι αρχάριοι θα μάθουν πώς να δημιουργούν ενδιαφέρουσες εφαρμογές επεξεργασίας ροής χρησιμοποιώντας τη βιβλιοθήκη Kafka Streams σε αυτό το βιβλίο. Ενδιάμεσοι και προχωρημένοι προγραμματιστές Java που είναι ήδη εξοικειωμένοι με έννοιες όπως η σειριοποίηση θα μάθουν να εφαρμόζουν τις δεξιότητές τους για τη δημιουργία εφαρμογών Kafka Streams. Ο πηγαίος κώδικας του βιβλίου είναι γραμμένος σε Java 8 και κάνει σημαντική χρήση της σύνταξης έκφρασης λάμδα Java 8, επομένως θα σας φανεί χρήσιμο να γνωρίζετε πώς να δουλεύετε με συναρτήσεις λάμδα (ακόμη και σε άλλη γλώσσα προγραμματισμού).

Απόσπασμα. 5.3. Λειτουργίες συνάθροισης και παραθύρων

Σε αυτήν την ενότητα, θα προχωρήσουμε στην εξερεύνηση των πιο υποσχόμενων τμημάτων του Kafka Streams. Μέχρι στιγμής έχουμε καλύψει τις ακόλουθες πτυχές του Kafka Streams:

  • δημιουργία μιας τοπολογίας επεξεργασίας.
  • χρήση κατάστασης σε εφαρμογές ροής.
  • εκτέλεση συνδέσεων ροής δεδομένων.
  • διαφορές μεταξύ των ροών συμβάντων (KStream) και των ροών ενημέρωσης (KTable).

Στα ακόλουθα παραδείγματα θα φέρουμε όλα αυτά τα στοιχεία μαζί. Θα μάθετε επίσης για το παράθυρο, ένα άλλο εξαιρετικό χαρακτηριστικό των εφαρμογών ροής. Το πρώτο μας παράδειγμα θα είναι μια απλή συνάθροιση.

5.3.1. Συνάθροιση πωλήσεων μετοχών ανά κλάδο βιομηχανίας

Η συγκέντρωση και η ομαδοποίηση είναι ζωτικής σημασίας εργαλεία κατά την εργασία με δεδομένα ροής. Η εξέταση μεμονωμένων αρχείων όπως λαμβάνονται είναι συχνά ανεπαρκής. Για την εξαγωγή πρόσθετων πληροφοριών από δεδομένα, είναι απαραίτητο να ομαδοποιηθούν και να συνδυαστούν.

Σε αυτό το παράδειγμα, θα φορέσετε το κοστούμι ενός ημερήσιου εμπόρου που πρέπει να παρακολουθεί τον όγκο πωλήσεων των μετοχών εταιρειών σε διάφορους κλάδους. Συγκεκριμένα, ενδιαφέρεστε για τις πέντε εταιρείες με τις μεγαλύτερες πωλήσεις μετοχών σε κάθε κλάδο.

Μια τέτοια συγκέντρωση θα απαιτήσει τα ακόλουθα αρκετά βήματα για τη μετάφραση των δεδομένων στην επιθυμητή μορφή (μιλώντας με γενικούς όρους).

  1. Δημιουργήστε μια θεματική πηγή που δημοσιεύει ακατέργαστες πληροφορίες συναλλαγών μετοχών. Θα πρέπει να αντιστοιχίσουμε ένα αντικείμενο τύπου StockTransaction σε ένα αντικείμενο τύπου ShareVolume. Το θέμα είναι ότι το αντικείμενο StockTransaction περιέχει μεταδεδομένα πωλήσεων, αλλά χρειαζόμαστε μόνο δεδομένα σχετικά με τον αριθμό των μετοχών που πωλούνται.
  2. Ομαδοποιήστε δεδομένα ShareVolume κατά σύμβολο μετοχής. Αφού ομαδοποιηθούν ανά σύμβολο, μπορείτε να συμπτύξετε αυτά τα δεδομένα σε υποσύνολα των όγκων πωλήσεων μετοχών. Αξίζει να σημειωθεί ότι η μέθοδος KStream.groupBy επιστρέφει μια παρουσία του τύπου KGroupedStream. Και μπορείτε να λάβετε μια παρουσία KTable καλώντας περαιτέρω τη μέθοδο KGroupedStream.reduce.

Τι είναι η διεπαφή KGroupedStream

Οι μέθοδοι KStream.groupBy και KStream.groupByKey επιστρέφουν μια παρουσία του KGroupedStream. Το KGroupedStream είναι μια ενδιάμεση αναπαράσταση μιας ροής γεγονότων μετά από ομαδοποίηση κατά πλήκτρα. Δεν προορίζεται καθόλου για άμεση εργασία με αυτό. Αντίθετα, το KGroupedStream χρησιμοποιείται για λειτουργίες συγκέντρωσης, οι οποίες πάντα καταλήγουν σε έναν KTable. Και δεδομένου ότι το αποτέλεσμα των λειτουργιών συγκέντρωσης είναι ένας KTable και χρησιμοποιούν ένα κρατικό κατάστημα, είναι πιθανό να μην αποστέλλονται όλες οι ενημερώσεις ως αποτέλεσμα πιο κάτω.

Η μέθοδος KTable.groupBy επιστρέφει ένα παρόμοιο KGroupedTable - μια ενδιάμεση αναπαράσταση της ροής ενημερώσεων, που ομαδοποιείται εκ νέου κατά κλειδί.

Ας κάνουμε ένα μικρό διάλειμμα και ας δούμε το Σχ. 5.9, που δείχνει τι έχουμε πετύχει. Αυτή η τοπολογία θα πρέπει να σας είναι ήδη πολύ οικεία.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Ας δούμε τώρα τον κώδικα για αυτήν την τοπολογία (μπορεί να βρεθεί στο αρχείο src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Λίστα 5.2).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Ο συγκεκριμένος κώδικας διακρίνεται από τη συντομία του και τον μεγάλο όγκο ενεργειών που εκτελούνται σε πολλές γραμμές. Ενδέχεται να παρατηρήσετε κάτι νέο στην πρώτη παράμετρο της μεθόδου builder.stream: μια τιμή του τύπου enum AutoOffsetReset.EARLIEST (υπάρχει επίσης ένα LATEST), που ορίζεται χρησιμοποιώντας τη μέθοδο Consumed.withOffsetResetPolicy. Αυτός ο τύπος απαρίθμησης μπορεί να χρησιμοποιηθεί για τον καθορισμό μιας στρατηγικής επαναφοράς μετατόπισης για κάθε KStream ή KTable και έχει προτεραιότητα έναντι της επιλογής επαναφοράς μετατόπισης από τη διαμόρφωση.

GroupByKey και GroupBy

Η διεπαφή KStream έχει δύο μεθόδους για την ομαδοποίηση εγγραφών: GroupByKey και GroupBy. Και οι δύο επιστρέφουν έναν KGroupedTable, οπότε ίσως αναρωτιέστε ποια είναι η διαφορά μεταξύ τους και πότε να χρησιμοποιήσετε ποιο;

Η μέθοδος GroupByKey χρησιμοποιείται όταν τα κλειδιά στο KStream δεν είναι ήδη άδεια. Και το πιο σημαντικό, η σημαία "απαιτείται εκ νέου κατάτμηση" δεν ορίστηκε ποτέ.

Η μέθοδος GroupBy προϋποθέτει ότι έχετε αλλάξει τα κλειδιά ομαδοποίησης, επομένως η σημαία αναδιαμέρισης ορίζεται σε true. Η εκτέλεση ενώσεων, συναθροίσεων κ.λπ. μετά τη μέθοδο GroupBy θα έχει ως αποτέλεσμα την αυτόματη εκ νέου κατάτμηση.
Περίληψη: Όποτε είναι δυνατόν, θα πρέπει να χρησιμοποιείτε το GroupByKey αντί για το GroupBy.

Είναι ξεκάθαρο τι κάνουν οι μέθοδοι mapValues ​​και groupBy, οπότε ας ρίξουμε μια ματιά στη μέθοδο sum() (που βρίσκεται στο src/main/java/bbejeck/model/ShareVolume.java) (Λίστα 5.3).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Η μέθοδος ShareVolume.sum επιστρέφει το τρέχον σύνολο του όγκου πωλήσεων αποθεμάτων και το αποτέλεσμα ολόκληρης της αλυσίδας υπολογισμών είναι ένα αντικείμενο KTable . Τώρα καταλαβαίνετε τον ρόλο που παίζει το KTable. Όταν φτάνουν τα αντικείμενα ShareVolume, το αντίστοιχο αντικείμενο KTable αποθηκεύει την πιο πρόσφατη τρέχουσα ενημέρωση. Είναι σημαντικό να θυμάστε ότι όλες οι ενημερώσεις αντικατοπτρίζονται στον προηγούμενο shareVolumeKTable, αλλά δεν αποστέλλονται όλες περαιτέρω.

Στη συνέχεια, χρησιμοποιούμε αυτόν τον KTable για να συγκεντρώσουμε (ανά αριθμό μετοχών που διαπραγματεύονται) για να καταλήξουμε στις πέντε εταιρείες με τον υψηλότερο όγκο μετοχών που διαπραγματεύονται σε κάθε κλάδο. Οι ενέργειές μας σε αυτή την περίπτωση θα είναι παρόμοιες με αυτές της πρώτης συγκέντρωσης.

  1. Εκτελέστε μια άλλη λειτουργία groupBy για να ομαδοποιήσετε μεμονωμένα αντικείμενα ShareVolume ανά κλάδο.
  2. Ξεκινήστε τη σύνοψη των αντικειμένων ShareVolume. Αυτή τη φορά το αντικείμενο συγκέντρωσης είναι μια ουρά προτεραιότητας σταθερού μεγέθους. Σε αυτήν την ουρά σταθερού μεγέθους, διατηρούνται μόνο οι πέντε εταιρείες με τα μεγαλύτερα ποσά μετοχών που πωλήθηκαν.
  3. Αντιστοιχίστε τις ουρές από την προηγούμενη παράγραφο σε μια τιμή συμβολοσειράς και επιστρέψτε τις πέντε πρώτες μετοχές με τις περισσότερες συναλλαγές ανά αριθμό ανά κλάδο.
  4. Γράψτε τα αποτελέσματα σε μορφή συμβολοσειράς στο θέμα.

Στο Σχ. Το σχήμα 5.10 δείχνει το γράφημα τοπολογίας ροής δεδομένων. Όπως μπορείτε να δείτε, ο δεύτερος γύρος επεξεργασίας είναι αρκετά απλός.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Τώρα που έχουμε κατανοήσει ξεκάθαρα τη δομή αυτού του δεύτερου γύρου επεξεργασίας, μπορούμε να στραφούμε στον πηγαίο κώδικα του (θα τον βρείτε στο αρχείο src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Λίστα 5.4) .

Αυτός ο αρχικοποιητής περιέχει μια μεταβλητή fixedQueue. Αυτό είναι ένα προσαρμοσμένο αντικείμενο που είναι ένας προσαρμογέας για το java.util.TreeSet που χρησιμοποιείται για την παρακολούθηση των κορυφαίων N αποτελεσμάτων με φθίνουσα σειρά μετοχών που διαπραγματεύονται.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Έχετε ήδη δει τις κλήσεις groupBy και mapValues, επομένως δεν θα μπούμε σε αυτές (καλούμε τη μέθοδο KTable.toStream επειδή η μέθοδος KTable.print έχει καταργηθεί). Αλλά δεν έχετε δει ακόμα την έκδοση KTable του aggregate(), οπότε θα αφιερώσουμε λίγο χρόνο για να το συζητήσουμε.

Όπως θυμάστε, αυτό που κάνει το KTable διαφορετικό είναι ότι οι εγγραφές με τα ίδια κλειδιά θεωρούνται ενημερώσεις. Το KTable αντικαθιστά την παλιά καταχώρηση με μια νέα. Η συγκέντρωση πραγματοποιείται με παρόμοιο τρόπο: οι πιο πρόσφατες εγγραφές με το ίδιο κλειδί συγκεντρώνονται. Όταν φτάσει μια εγγραφή, προστίθεται στην παρουσία της κλάσης FixedSizePriorityQueue χρησιμοποιώντας έναν αθροιστή (δεύτερη παράμετρος στην κλήση συγκεντρωτικής μεθόδου), αλλά εάν υπάρχει ήδη άλλη εγγραφή με το ίδιο κλειδί, τότε η παλιά εγγραφή αφαιρείται χρησιμοποιώντας έναν αφαιρετήρα (τρίτη παράμετρος στο η κλήση της συγκεντρωτικής μεθόδου).

Όλα αυτά σημαίνουν ότι ο αθροιστής μας, FixedSizePriorityQueue, δεν συγκεντρώνει όλες τις τιμές με ένα κλειδί, αλλά αποθηκεύει ένα κινούμενο άθροισμα των ποσοτήτων των N τύπων μετοχών με τις περισσότερες συναλλαγές. Κάθε εισερχόμενη εγγραφή περιέχει τον συνολικό αριθμό των μετοχών που έχουν πωληθεί μέχρι στιγμής. Το KTable θα σας δώσει πληροφορίες σχετικά με τις μετοχές των εταιρειών που είναι αυτή τη στιγμή οι περισσότερες διαπραγματεύσεις, χωρίς να απαιτείται κυλιόμενη συγκέντρωση για κάθε ενημέρωση.

Μάθαμε να κάνουμε δύο σημαντικά πράγματα:

  • ομαδοποιήστε τις τιμές στον KTable με ένα κοινό κλειδί.
  • εκτελέστε χρήσιμες λειτουργίες όπως η συλλογή και η συγκέντρωση σε αυτές τις ομαδοποιημένες τιμές.

Η γνώση του τρόπου εκτέλεσης αυτών των λειτουργιών είναι σημαντική για την κατανόηση της σημασίας των δεδομένων που διακινούνται μέσω μιας εφαρμογής Kafka Streams και για την κατανόηση των πληροφοριών που μεταφέρει.

Συγκεντρώσαμε επίσης μερικές από τις βασικές έννοιες που συζητήθηκαν νωρίτερα σε αυτό το βιβλίο. Στο Κεφάλαιο 4, συζητήσαμε πόσο σημαντική είναι η τοπική κατάσταση ανοχής σε σφάλματα για μια εφαρμογή ροής. Το πρώτο παράδειγμα σε αυτό το κεφάλαιο έδειξε γιατί η τοπική πολιτεία είναι τόσο σημαντική—σας δίνει τη δυνατότητα να παρακολουθείτε ποιες πληροφορίες έχετε ήδη δει. Η τοπική πρόσβαση αποφεύγει τις καθυστερήσεις δικτύου, καθιστώντας την εφαρμογή πιο αποτελεσματική και ανθεκτική στα σφάλματα.

Όταν εκτελείτε οποιαδήποτε λειτουργία συνάθροισης ή συγκέντρωσης, πρέπει να καθορίσετε το όνομα της κατάστασης αποθήκευσης. Οι λειτουργίες συνάθροισης και συνάθροισης επιστρέφουν μια παρουσία KTable και ο KTable χρησιμοποιεί αποθήκευση κατάστασης για να αντικαταστήσει τα παλιά αποτελέσματα με νέα. Όπως είδατε, δεν αποστέλλονται όλες οι ενημερώσεις κατ' αρχήν, και αυτό είναι σημαντικό επειδή οι λειτουργίες συγκέντρωσης έχουν σχεδιαστεί για να παράγουν συνοπτικές πληροφορίες. Εάν δεν εφαρμόσετε τοπική κατάσταση, το KTable θα προωθήσει όλα τα αποτελέσματα συνάθροισης και συνάθροισης.

Στη συνέχεια, θα εξετάσουμε την εκτέλεση λειτουργιών όπως η συγκέντρωση μέσα σε μια συγκεκριμένη χρονική περίοδο - οι λεγόμενες λειτουργίες παραθύρου.

5.3.2. Λειτουργίες παραθύρων

Στην προηγούμενη ενότητα, παρουσιάσαμε τη συνέλιξη και τη συνάθροιση ολίσθησης. Η εφαρμογή πραγματοποίησε μια συνεχή ανανέωση του όγκου πωλήσεων μετοχών, ακολουθούμενη από τη συγκέντρωση των πέντε μετοχών με τις περισσότερες συναλλαγές στο χρηματιστήριο.

Μερικές φορές είναι απαραίτητη μια τέτοια συνεχής συγκέντρωση και συνάθροιση αποτελεσμάτων. Και μερικές φορές χρειάζεται να εκτελέσετε λειτουργίες μόνο σε μια δεδομένη χρονική περίοδο. Για παράδειγμα, υπολογίστε πόσες συναλλαγές ανταλλαγής έγιναν με μετοχές μιας συγκεκριμένης εταιρείας τα τελευταία 10 λεπτά. Ή πόσοι χρήστες έκαναν κλικ σε ένα νέο διαφημιστικό banner τα τελευταία 15 λεπτά. Μια εφαρμογή μπορεί να εκτελέσει τέτοιες λειτουργίες πολλές φορές, αλλά με αποτελέσματα που ισχύουν μόνο για καθορισμένες χρονικές περιόδους (χρονικά παράθυρα).

Καταμέτρηση συναλλαγών ανά αγοραστή

Στο επόμενο παράδειγμα, θα παρακολουθούμε τις συναλλαγές μετοχών σε πολλούς εμπόρους—είτε μεγάλους οργανισμούς είτε έξυπνους μεμονωμένους χρηματοδότες.

Υπάρχουν δύο πιθανοί λόγοι για αυτήν την παρακολούθηση. Ένα από αυτά είναι η ανάγκη να γνωρίζουμε τι αγοράζουν/πουλούν οι ηγέτες της αγοράς. Εάν αυτοί οι μεγάλοι παίκτες και οι εξελιγμένοι επενδυτές βλέπουν ευκαιρίες, είναι λογικό να ακολουθήσουν τη στρατηγική τους. Ο δεύτερος λόγος είναι η επιθυμία να εντοπιστούν τυχόν σημάδια παράνομης διαπραγμάτευσης εμπιστευτικών πληροφοριών. Για να γίνει αυτό, θα χρειαστεί να αναλύσετε τη συσχέτιση των μεγάλων εκτινάξεων πωλήσεων με σημαντικά δελτία τύπου.

Αυτή η παρακολούθηση αποτελείται από τα ακόλουθα βήματα:

  • δημιουργία ροής για ανάγνωση από το θέμα των συναλλαγών μετοχών.
  • ομαδοποίηση εισερχόμενων εγγραφών κατά αναγνωριστικό αγοραστή και σύμβολο μετοχής. Η κλήση της μεθόδου groupBy επιστρέφει μια παρουσία της κλάσης KGroupedStream.
  • Η μέθοδος KGroupedStream.windowedBy επιστρέφει μια ροή δεδομένων που περιορίζεται σε ένα χρονικό παράθυρο, το οποίο επιτρέπει τη συγκέντρωση παραθύρων. Ανάλογα με τον τύπο του παραθύρου, επιστρέφεται είτε ένα TimeWindowedKStream είτε ένα SessionWindowedKStream.
  • πλήθος συναλλαγών για τη λειτουργία συγκέντρωσης. Η ροή δεδομένων με παράθυρο καθορίζει εάν μια συγκεκριμένη εγγραφή λαμβάνεται υπόψη σε αυτήν την καταμέτρηση.
  • εγγραφή αποτελεσμάτων σε ένα θέμα ή εξαγωγή τους στην κονσόλα κατά την ανάπτυξη.

Η τοπολογία αυτής της εφαρμογής είναι απλή, αλλά μια σαφής εικόνα της θα ήταν χρήσιμη. Ας ρίξουμε μια ματιά στο Σχ. 5.11.

Στη συνέχεια, θα εξετάσουμε τη λειτουργικότητα των λειτουργιών του παραθύρου και τον αντίστοιχο κώδικα.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"

Τύποι παραθύρων

Υπάρχουν τρεις τύποι παραθύρων στο Kafka Streams:

  • συνεδριακός;
  • “Tumbling” (Tumbling)
  • ολίσθηση/πηδώντας.

Ποιο να επιλέξετε εξαρτάται από τις απαιτήσεις της επιχείρησής σας. Τα παράθυρα ανατροπής και μεταπήδησης είναι χρονικά περιορισμένα, ενώ τα παράθυρα συνεδρίας περιορίζονται από τη δραστηριότητα του χρήστη—η διάρκεια της(των) περιόδου(ων) καθορίζεται αποκλειστικά από το πόσο ενεργός είναι ο χρήστης. Το κύριο πράγμα που πρέπει να θυμάστε είναι ότι όλοι οι τύποι παραθύρων βασίζονται στις σφραγίδες ημερομηνίας/ώρας των καταχωρήσεων και όχι στην ώρα του συστήματος.

Στη συνέχεια, υλοποιούμε την τοπολογία μας με κάθε έναν από τους τύπους παραθύρων. Ο πλήρης κωδικός θα δοθεί μόνο στο πρώτο παράδειγμα· για άλλους τύπους παραθύρων τίποτα δεν θα αλλάξει εκτός από τον τύπο λειτουργίας του παραθύρου.

Παράθυρα συνεδρίας

Τα παράθυρα συνεδρίας είναι πολύ διαφορετικά από όλους τους άλλους τύπους παραθύρων. Περιορίζονται όχι τόσο χρονικά όσο από τη δραστηριότητα του χρήστη (ή τη δραστηριότητα της οντότητας που θα θέλατε να παρακολουθήσετε). Τα παράθυρα συνεδρίας οριοθετούνται από περιόδους αδράνειας.

Το σχήμα 5.12 απεικονίζει την έννοια των παραθύρων συνεδρίας. Η μικρότερη περίοδος σύνδεσης θα συγχωνευθεί με την περίοδο λειτουργίας στα αριστερά της. Και η συνεδρία στα δεξιά θα είναι ξεχωριστή γιατί ακολουθεί μια μακρά περίοδο αδράνειας. Τα παράθυρα συνεδρίας βασίζονται στη δραστηριότητα του χρήστη, αλλά χρησιμοποιούν σφραγίδες ημερομηνίας/ώρας από καταχωρήσεις για να καθορίσουν σε ποια περίοδο λειτουργίας ανήκει η καταχώρηση.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"

Χρήση παραθύρων συνεδρίας για την παρακολούθηση συναλλαγών μετοχών

Ας χρησιμοποιήσουμε τα παράθυρα συνεδρίας για να καταγράψουμε πληροφορίες σχετικά με τις συναλλαγές ανταλλαγής. Η υλοποίηση των παραθύρων περιόδου λειτουργίας εμφανίζεται στη Λίστα 5.5 (η οποία βρίσκεται στο src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Έχετε ήδη δει τις περισσότερες από τις λειτουργίες σε αυτήν την τοπολογία, επομένως δεν χρειάζεται να τις αναθεωρήσετε ξανά εδώ. Υπάρχουν όμως και αρκετά νέα στοιχεία εδώ, τα οποία θα συζητήσουμε τώρα.

Οποιαδήποτε λειτουργία groupBy εκτελεί συνήθως κάποιο είδος λειτουργίας συγκέντρωσης (συγκέντρωση, συνάθροιση ή καταμέτρηση). Μπορείτε να εκτελέσετε είτε αθροιστική συγκέντρωση με ένα τρέχον σύνολο είτε συνάθροιση παραθύρων, η οποία λαμβάνει υπόψη τις εγγραφές εντός ενός καθορισμένου χρονικού παραθύρου.

Ο κωδικός στη Λίστα 5.5 μετράει τον αριθμό των συναλλαγών εντός των παραθύρων συνεδρίας. Στο Σχ. 5.13 αυτές οι ενέργειες αναλύονται βήμα προς βήμα.

Καλώντας το windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) δημιουργούμε ένα παράθυρο συνεδρίας με διάστημα αδράνειας 20 δευτερόλεπτα και διάστημα παραμονής 15 λεπτών. Ένα μεσοδιάστημα αδράνειας 20 δευτερολέπτων σημαίνει ότι η εφαρμογή θα περιλαμβάνει κάθε καταχώρηση που φτάνει εντός 20 δευτερολέπτων από το τέλος ή την έναρξη της τρέχουσας συνεδρίας στην τρέχουσα (ενεργή) περίοδο λειτουργίας.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Στη συνέχεια, καθορίζουμε ποια λειτουργία συγκέντρωσης πρέπει να εκτελεστεί στο παράθυρο συνεδρίας - σε αυτήν την περίπτωση, μέτρηση. Εάν μια εισερχόμενη καταχώριση πέσει έξω από το παράθυρο αδράνειας (κάθε πλευρά της σφραγίδας ημερομηνίας/ώρας), η εφαρμογή δημιουργεί μια νέα περίοδο λειτουργίας. Το διάστημα διατήρησης σημαίνει τη διατήρηση μιας περιόδου σύνδεσης για ορισμένο χρονικό διάστημα και επιτρέπει καθυστερημένα δεδομένα που εκτείνονται πέρα ​​από την περίοδο αδράνειας της περιόδου σύνδεσης, αλλά μπορούν ακόμα να επισυναφθούν. Επιπλέον, η έναρξη και το τέλος της νέας περιόδου σύνδεσης που προκύπτει από τη συγχώνευση αντιστοιχούν στην παλαιότερη και πιο πρόσφατη σφραγίδα ημερομηνίας/ώρας.

Ας δούμε μερικές εγγραφές από τη μέθοδο μέτρησης για να δούμε πώς λειτουργούν οι περίοδοι λειτουργίας (Πίνακας 5.1).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Όταν φτάνουν οι εγγραφές, αναζητούμε υπάρχουσες περιόδους σύνδεσης με το ίδιο κλειδί, ώρα λήξης μικρότερη από την τρέχουσα σήμανση ημερομηνίας/ώρας - διάστημα αδράνειας και ώρα έναρξης μεγαλύτερη από την τρέχουσα σφραγίδα ημερομηνίας/ώρας + διάστημα αδράνειας. Λαμβάνοντας αυτό υπόψη, τέσσερις καταχωρήσεις από τον πίνακα. 5.1 συγχωνεύονται σε μία μόνο περίοδο λειτουργίας ως εξής.

1. Η εγγραφή 1 φτάνει πρώτη, οπότε η ώρα έναρξης είναι ίση με την ώρα λήξης και είναι 00:00:00.

2. Στη συνέχεια, φτάνει η καταχώριση 2 και αναζητούμε συνεδρίες που τελειώνουν όχι νωρίτερα από τις 23:59:55 και ξεκινούν το αργότερο στις 00:00:35. Βρίσκουμε την εγγραφή 1 και συνδυάζουμε τις συνεδρίες 1 και 2. Λαμβάνουμε την ώρα έναρξης της συνεδρίας 1 (νωρίτερα) και την ώρα λήξης της συνεδρίας 2 (αργότερα), έτσι ώστε η νέα μας συνεδρία να ξεκινά στις 00:00:00 και να τελειώνει στις 00: 00:15.

3. Η εγγραφή 3 φτάνει, αναζητούμε συνεδρίες μεταξύ 00:00:30 και 00:01:10 και δεν βρίσκουμε καμία. Προσθέστε μια δεύτερη συνεδρία για το κλειδί 123-345-654,FFBE, που αρχίζει και τελειώνει στις 00:00:50.

4. Η εγγραφή 4 φτάνει και αναζητούμε συνεδρίες μεταξύ 23:59:45 και 00:00:25. Αυτή τη φορά βρίσκονται και οι δύο συνεδρίες 1 και 2. Και οι τρεις συνεδρίες συνδυάζονται σε μία, με ώρα έναρξης 00:00:00 και ώρα λήξης 00:00:15.

Από όσα περιγράφονται σε αυτήν την ενότητα, αξίζει να θυμάστε τις ακόλουθες σημαντικές αποχρώσεις:

  • Οι συνεδρίες δεν είναι παράθυρα σταθερού μεγέθους. Η διάρκεια μιας συνεδρίας καθορίζεται από τη δραστηριότητα σε μια δεδομένη χρονική περίοδο.
  • Οι σφραγίδες ημερομηνίας/ώρας στα δεδομένα καθορίζουν εάν το συμβάν εμπίπτει σε μια υπάρχουσα περίοδο λειτουργίας ή σε μια περίοδο αδράνειας.

Στη συνέχεια θα συζητήσουμε τον επόμενο τύπο παραθύρων - "αναποδογυρισμένα" παράθυρα.

"Περιπτόμενα" παράθυρα

Τα αναδιπλούμενα παράθυρα καταγράφουν γεγονότα που εμπίπτουν σε μια συγκεκριμένη χρονική περίοδο. Φανταστείτε ότι πρέπει να καταγράφετε όλες τις συναλλαγές μετοχών μιας συγκεκριμένης εταιρείας κάθε 20 δευτερόλεπτα, ώστε να συλλέγετε όλα τα συμβάντα κατά τη διάρκεια αυτής της χρονικής περιόδου. Στο τέλος του διαστήματος των 20 δευτερολέπτων, το παράθυρο κυλάει και μετακινείται σε ένα νέο διάστημα παρατήρησης 20 δευτερολέπτων. Το Σχήμα 5.14 απεικονίζει αυτήν την κατάσταση.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Όπως μπορείτε να δείτε, όλα τα συμβάντα που ελήφθησαν τα τελευταία 20 δευτερόλεπτα περιλαμβάνονται στο παράθυρο. Στο τέλος αυτής της χρονικής περιόδου, δημιουργείται ένα νέο παράθυρο.

Η καταχώριση 5.6 εμφανίζει κώδικα που δείχνει τη χρήση αναδιπλούμενων παραθύρων για την καταγραφή συναλλαγών μετοχών κάθε 20 δευτερόλεπτα (βρίσκεται στο src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Με αυτήν τη μικρή αλλαγή στην κλήση της μεθόδου TimeWindows.of, μπορείτε να χρησιμοποιήσετε ένα αναδιπλούμενο παράθυρο. Αυτό το παράδειγμα δεν καλεί τη μέθοδο while(), επομένως θα χρησιμοποιηθεί το προεπιλεγμένο διάστημα διατήρησης των 24 ωρών.

Επιτέλους, ήρθε η ώρα να προχωρήσουμε στην τελευταία από τις επιλογές παραθύρων - «πηδώντας» τα παράθυρα.

Συρόμενα ("πηδώντας") παράθυρα

Τα συρόμενα παράθυρα είναι παρόμοια με τα αναδιπλούμενα παράθυρα, αλλά με μια μικρή διαφορά. Τα συρόμενα παράθυρα δεν περιμένουν μέχρι το τέλος του χρονικού διαστήματος πριν δημιουργήσουν ένα νέο παράθυρο για την επεξεργασία πρόσφατων συμβάντων. Ξεκινούν νέους υπολογισμούς μετά από ένα διάστημα αναμονής μικρότερο από τη διάρκεια του παραθύρου.

Για να δείξουμε τις διαφορές μεταξύ των παραθύρων ανατροπής και άλματος, ας επιστρέψουμε στο παράδειγμα της καταμέτρησης των χρηματιστηριακών συναλλαγών. Στόχος μας εξακολουθεί να είναι να μετράμε τον αριθμό των συναλλαγών, αλλά δεν θέλουμε να περιμένουμε ολόκληρο το χρονικό διάστημα πριν ενημερώσουμε το μετρητή. Αντίθετα, θα ενημερώνουμε τον μετρητή σε μικρότερα χρονικά διαστήματα. Για παράδειγμα, θα εξακολουθούμε να μετράμε τον αριθμό των συναλλαγών κάθε 20 δευτερόλεπτα, αλλά ενημερώνουμε τον μετρητή κάθε 5 δευτερόλεπτα, όπως φαίνεται στην Εικ. 5.15. Σε αυτήν την περίπτωση, καταλήγουμε σε τρία παράθυρα αποτελεσμάτων με επικαλυπτόμενα δεδομένα.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Η λίστα 5.7 δείχνει τον κώδικα για τον ορισμό των συρόμενων παραθύρων (βρίσκεται στο src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Ένα αναδιπλούμενο παράθυρο μπορεί να μετατραπεί σε παράθυρο αναπήδησης προσθέτοντας μια κλήση στη μέθοδο advanceBy(). Στο παράδειγμα που φαίνεται, το διάστημα αποθήκευσης είναι 15 λεπτά.

Είδατε σε αυτήν την ενότητα πώς να περιορίσετε τα αποτελέσματα συγκέντρωσης σε χρονικά παράθυρα. Συγκεκριμένα, θέλω να θυμάστε τα ακόλουθα τρία πράγματα από αυτήν την ενότητα:

  • το μέγεθος των παραθύρων συνεδρίας δεν περιορίζεται από τη χρονική περίοδο, αλλά από τη δραστηριότητα του χρήστη.
  • Τα παράθυρα που «πέφτουν» παρέχουν μια επισκόπηση των γεγονότων μέσα σε μια δεδομένη χρονική περίοδο.
  • Η διάρκεια των jumping windows είναι σταθερή, αλλά ενημερώνονται συχνά και ενδέχεται να περιέχουν επικαλυπτόμενες καταχωρήσεις σε όλα τα παράθυρα.

Στη συνέχεια, θα μάθουμε πώς να μετατρέψουμε ένα KTable ξανά σε KStream για σύνδεση.

5.3.3. Σύνδεση αντικειμένων KStream και KTable

Στο Κεφάλαιο 4, συζητήσαμε τη σύνδεση δύο αντικειμένων KStream. Τώρα πρέπει να μάθουμε πώς να συνδέουμε το KTable και το KStream. Αυτό μπορεί να χρειαστεί για τον ακόλουθο απλό λόγο. Το KStream είναι μια ροή εγγραφών και το KTable είναι μια ροή ενημερώσεων εγγραφών, αλλά μερικές φορές μπορεί να θέλετε να προσθέσετε πρόσθετο πλαίσιο στη ροή εγγραφών χρησιμοποιώντας ενημερώσεις από το KTable.

Ας πάρουμε δεδομένα για τον αριθμό των χρηματιστηριακών συναλλαγών και ας τα συνδυάσουμε με χρηματιστηριακά νέα για τους σχετικούς κλάδους. Δείτε τι πρέπει να κάνετε για να το πετύχετε, δεδομένου του κωδικού που έχετε ήδη.

  1. Μετατρέψτε ένα αντικείμενο KTable με δεδομένα σχετικά με τον αριθμό των συναλλαγών μετοχών σε KStream και στη συνέχεια αντικαταστήστε το κλειδί με το κλειδί που υποδεικνύει τον κλάδο του κλάδου που αντιστοιχεί σε αυτό το σύμβολο μετοχών.
  2. Δημιουργήστε ένα αντικείμενο KTable που διαβάζει δεδομένα από ένα θέμα με χρηματιστηριακά νέα. Αυτό το νέο KTable θα κατηγοριοποιηθεί ανά κλάδο βιομηχανίας.
  3. Συνδέστε τις ενημερώσεις ειδήσεων με πληροφορίες σχετικά με τον αριθμό των χρηματιστηριακών συναλλαγών ανά κλάδο κλάδου.

Τώρα ας δούμε πώς να εφαρμόσουμε αυτό το σχέδιο δράσης.

Μετατροπή KTable σε KStream

Για να μετατρέψετε το KTable σε KStream πρέπει να κάνετε τα εξής.

  1. Καλέστε τη μέθοδο KTable.toStream().
  2. Καλώντας τη μέθοδο KStream.map, αντικαταστήστε το κλειδί με το όνομα του κλάδου και, στη συνέχεια, ανακτήστε το αντικείμενο TransactionSummary από την παρουσία Windowed.

Θα συνδέσουμε αυτές τις λειτουργίες μεταξύ τους ως εξής (ο κώδικας βρίσκεται στο αρχείο src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Λίστα 5.8).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Επειδή εκτελούμε μια λειτουργία KStream.map, η επιστρεφόμενη παρουσία KStream διαχωρίζεται εκ νέου αυτόματα όταν χρησιμοποιείται σε μια σύνδεση.

Ολοκληρώσαμε τη διαδικασία μετατροπής, στη συνέχεια πρέπει να δημιουργήσουμε ένα αντικείμενο KTable για την ανάγνωση ειδήσεων μετοχών.

Δημιουργία KTable για ειδήσεις μετοχών

Ευτυχώς, η δημιουργία ενός αντικειμένου KTable χρειάζεται μόνο μία γραμμή κώδικα (ο κώδικας βρίσκεται στο src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Λίστα 5.9).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Αξίζει να σημειωθεί ότι δεν απαιτείται ο καθορισμός αντικειμένων Serde, καθώς στις ρυθμίσεις χρησιμοποιούνται Serdes συμβολοσειρών. Επίσης, χρησιμοποιώντας την ΠΡΩΙΤΕΡΗ απαρίθμηση, ο πίνακας γεμίζει με εγγραφές στην αρχή.

Τώρα μπορούμε να προχωρήσουμε στο τελευταίο βήμα - σύνδεση.

Σύνδεση ενημερώσεων ειδήσεων με δεδομένα καταμέτρησης συναλλαγών

Η δημιουργία μιας σύνδεσης δεν είναι δύσκολη. Θα χρησιμοποιήσουμε ένα αριστερό σύνδεσμο σε περίπτωση που δεν υπάρχουν νέα μετοχών για τον σχετικό κλάδο (τον απαραίτητο κωδικό μπορείτε να βρείτε στο αρχείο src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Λίστα 5.10).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Αυτός ο τελεστής leftJoin είναι αρκετά απλός. Σε αντίθεση με τις συνδέσεις στο Κεφάλαιο 4, η μέθοδος JoinWindow δεν χρησιμοποιείται επειδή κατά την εκτέλεση μιας σύνδεσης KStream-KTable, υπάρχει μόνο μία καταχώρηση στον KTable για κάθε κλειδί. Μια τέτοια σύνδεση δεν περιορίζεται χρονικά: η εγγραφή είναι είτε στον KTable είτε απουσιάζει. Το κύριο συμπέρασμα: χρησιμοποιώντας αντικείμενα KTable μπορείτε να εμπλουτίσετε το KStream με δεδομένα αναφοράς που ενημερώνονται λιγότερο συχνά.

Τώρα θα εξετάσουμε έναν πιο αποτελεσματικό τρόπο εμπλουτισμού εκδηλώσεων από το KStream.

5.3.4. Αντικείμενα GlobalKTable

Όπως μπορείτε να δείτε, υπάρχει ανάγκη να εμπλουτιστούν οι ροές συμβάντων ή να προσθέσετε περιεχόμενο σε αυτές. Στο Κεφάλαιο 4 είδατε τις συνδέσεις μεταξύ δύο αντικειμένων KStream και στην προηγούμενη ενότητα είδατε τη σύνδεση μεταξύ ενός KStream και ενός KTable. Σε όλες αυτές τις περιπτώσεις, είναι απαραίτητο να γίνει εκ νέου διαχωρισμός της ροής δεδομένων κατά την αντιστοίχιση των κλειδιών σε έναν νέο τύπο ή τιμή. Μερικές φορές ο επαναδιαμερισμός γίνεται ρητά και μερικές φορές το Kafka Streams το κάνει αυτόματα. Η επαναδιαμέριση είναι απαραίτητη επειδή τα κλειδιά έχουν αλλάξει και οι εγγραφές πρέπει να καταλήγουν σε νέες ενότητες, διαφορετικά η σύνδεση θα είναι αδύνατη (αυτό συζητήθηκε στο Κεφάλαιο 4, στην ενότητα «Δεδομένα εκ νέου κατάτμησης» στην υποενότητα 4.2.4).

Η επαναδιαμέριση έχει κόστος

Η επαναδιαμέριση απαιτεί κόστος - πρόσθετο κόστος πόρων για τη δημιουργία ενδιάμεσων θεμάτων, την αποθήκευση διπλών δεδομένων σε άλλο θέμα. σημαίνει επίσης αυξημένη καθυστέρηση λόγω γραφής και ανάγνωσης από αυτό το θέμα. Επιπλέον, εάν χρειάζεται να συνδέσετε περισσότερες από μία πτυχές ή διαστάσεις, πρέπει να συνδέσετε τις συνδέσεις, να αντιστοιχίσετε τις εγγραφές με νέα κλειδιά και να εκτελέσετε ξανά τη διαδικασία διαμερίσματος.

Σύνδεση σε μικρότερα σύνολα δεδομένων

Σε ορισμένες περιπτώσεις, ο όγκος των δεδομένων αναφοράς που πρόκειται να συνδεθούν είναι σχετικά μικρός, επομένως τα πλήρη αντίγραφά τους μπορούν εύκολα να χωρέσουν τοπικά σε κάθε κόμβο. Για καταστάσεις όπως αυτή, το Kafka Streams παρέχει την κλάση GlobalKTable.

Οι παρουσίες του GlobalKTable είναι μοναδικές επειδή η εφαρμογή αναπαράγει όλα τα δεδομένα σε κάθε κόμβο. Και δεδομένου ότι όλα τα δεδομένα υπάρχουν σε κάθε κόμβο, δεν χρειάζεται να χωρίσετε τη ροή συμβάντων με κλειδί δεδομένων αναφοράς, ώστε να είναι διαθέσιμη σε όλα τα διαμερίσματα. Μπορείτε επίσης να κάνετε συνδέσεις χωρίς κλειδί χρησιμοποιώντας αντικείμενα GlobalKTable. Ας επιστρέψουμε σε ένα από τα προηγούμενα παραδείγματα για να δείξουμε αυτό το χαρακτηριστικό.

Σύνδεση αντικειμένων KStream με αντικείμενα GlobalKTable

Στην υποενότητα 5.3.2, πραγματοποιήσαμε συνάθροιση παραθύρων των συναλλαγών ανταλλαγής από αγοραστές. Τα αποτελέσματα αυτής της συνάθροισης έμοιαζαν κάπως έτσι:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

Αν και αυτά τα αποτελέσματα εξυπηρετούσαν τον σκοπό, θα ήταν πιο χρήσιμο να εμφανίζονταν επίσης το όνομα του πελάτη και το πλήρες όνομα της εταιρείας. Για να προσθέσετε το όνομα πελάτη και την επωνυμία της εταιρείας, μπορείτε να κάνετε κανονικές συνδέσεις, αλλά θα χρειαστεί να κάνετε δύο αντιστοιχίσεις κλειδιών και επαναδιαμέριση. Με το GlobalKTable μπορείτε να αποφύγετε το κόστος τέτοιων λειτουργιών.

Για να γίνει αυτό, θα χρησιμοποιήσουμε το αντικείμενο countStream από τη Λίστα 5.11 (ο αντίστοιχος κώδικας βρίσκεται στο src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) και θα το συνδέσουμε με δύο αντικείμενα GlobalKTable.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Το έχουμε ήδη ξανασυζητήσει, οπότε δεν θα το επαναλάβω. Αλλά σημειώνω ότι ο κώδικας στη συνάρτηση toStream().map αφαιρείται σε ένα αντικείμενο συνάρτησης αντί για μια ενσωματωμένη έκφραση λάμδα για λόγους αναγνωσιμότητας.

Το επόμενο βήμα είναι να δηλώσετε δύο παρουσίες του GlobalKTable (ο κώδικας που εμφανίζεται βρίσκεται στο αρχείο src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Λίστα 5.12).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"

Λάβετε υπόψη ότι τα ονόματα των θεμάτων περιγράφονται χρησιμοποιώντας απαριθμημένους τύπους.

Τώρα που έχουμε όλα τα στοιχεία έτοιμα, το μόνο που μένει είναι να γράψουμε τον κώδικα για τη σύνδεση (ο οποίος βρίσκεται στο αρχείο src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Λίστα 5.13).

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Αν και υπάρχουν δύο ενώσεις σε αυτόν τον κώδικα, είναι αλυσιδωτές επειδή κανένα από τα αποτελέσματά τους δεν χρησιμοποιείται ξεχωριστά. Τα αποτελέσματα εμφανίζονται στο τέλος ολόκληρης της λειτουργίας.

Όταν εκτελείτε την παραπάνω λειτουργία σύνδεσης, θα λάβετε αποτελέσματα όπως αυτό:

{customer='Barney, Smith' company="Exxon", transactions= 17}

Η ουσία δεν έχει αλλάξει, αλλά αυτά τα αποτελέσματα φαίνονται πιο ξεκάθαρα.

Αν μετρήσετε αντίστροφα μέχρι το Κεφάλαιο 4, έχετε ήδη δει πολλούς τύπους συνδέσεων σε δράση. Παρατίθενται στον πίνακα. 5.2. Αυτός ο πίνακας αντικατοπτρίζει τις δυνατότητες συνδεσιμότητας από την έκδοση 1.0.0 του Kafka Streams. Κάτι μπορεί να αλλάξει σε μελλοντικές εκδόσεις.

Το βιβλίο «Ο Kafka Streams in Action. Εφαρμογές και μικροϋπηρεσίες για εργασία σε πραγματικό χρόνο"
Για να ολοκληρώσουμε τα πράγματα, ας ανακεφαλαιώσουμε τα βασικά: μπορείτε να συνδέσετε ροές συμβάντων (KStream) και να ενημερώσετε ροές (KTable) χρησιμοποιώντας τοπική κατάσταση. Εναλλακτικά, εάν το μέγεθος των δεδομένων αναφοράς δεν είναι πολύ μεγάλο, μπορείτε να χρησιμοποιήσετε το αντικείμενο GlobalKTable. Τα GlobalKTables αναπαράγουν όλες τις κατατμήσεις σε κάθε κόμβο εφαρμογής Kafka Streams, διασφαλίζοντας ότι όλα τα δεδομένα είναι διαθέσιμα ανεξάρτητα από το διαμέρισμα σε ποιο διαμέρισμα αντιστοιχεί το κλειδί.

Στη συνέχεια θα δούμε τη λειτουργία Kafka Streams, χάρη στην οποία μπορούμε να παρατηρήσουμε αλλαγές κατάστασης χωρίς να καταναλώνουμε δεδομένα από ένα θέμα Kafka.

5.3.5. Αμφισβητούμενη κατάσταση

Έχουμε ήδη εκτελέσει αρκετές λειτουργίες που περιλαμβάνουν κατάσταση και πάντα εξαγωγή των αποτελεσμάτων στην κονσόλα (για λόγους ανάπτυξης) ή εγγραφή τους σε ένα θέμα (για λόγους παραγωγής). Όταν γράφετε αποτελέσματα σε ένα θέμα, πρέπει να χρησιμοποιήσετε έναν καταναλωτή Kafka για να τα δείτε.

Η ανάγνωση δεδομένων από αυτά τα θέματα μπορεί να θεωρηθεί ως ένας τύπος υλοποιημένων απόψεων. Για τους σκοπούς μας, μπορούμε να χρησιμοποιήσουμε τον ορισμό μιας υλοποιημένης προβολής από τη Wikipedia: «...ένα φυσικό αντικείμενο βάσης δεδομένων που περιέχει τα αποτελέσματα ενός ερωτήματος. Για παράδειγμα, θα μπορούσε να είναι ένα τοπικό αντίγραφο απομακρυσμένων δεδομένων ή ένα υποσύνολο των γραμμών ή/και στηλών ενός πίνακα ή αποτελεσμάτων ένωσης ή ένας συνοπτικός πίνακας που λαμβάνεται μέσω συγκέντρωσης» (https://en.wikipedia.org/wiki /Materialized_view).

Το Kafka Streams σάς επιτρέπει επίσης να εκτελείτε διαδραστικά ερωτήματα σε κρατικά καταστήματα, επιτρέποντάς σας να διαβάζετε απευθείας αυτές τις υλοποιημένες απόψεις. Είναι σημαντικό να σημειωθεί ότι το ερώτημα στο κρατικό κατάστημα είναι μια λειτουργία μόνο για ανάγνωση. Αυτό διασφαλίζει ότι δεν χρειάζεται να ανησυχείτε μήπως γίνει τυχαία η κατάσταση ασυνεπής ενώ η εφαρμογή σας επεξεργάζεται δεδομένα.

Η δυνατότητα άμεσης υποβολής ερωτημάτων στα καταστήματα κατάστασης είναι σημαντική. Αυτό σημαίνει ότι μπορείτε να δημιουργήσετε εφαρμογές πίνακα εργαλείων χωρίς να χρειάζεται να λάβετε πρώτα δεδομένα από τον καταναλωτή Kafka. Αυξάνει επίσης την αποτελεσματικότητα της εφαρμογής, λόγω του γεγονότος ότι δεν χρειάζεται να γράψετε ξανά δεδομένα:

  • χάρη στην εντοπιότητα των δεδομένων, είναι δυνατή η γρήγορη πρόσβαση σε αυτά.
  • Η αντιγραφή δεδομένων εξαλείφεται, καθώς δεν εγγράφεται σε εξωτερικό χώρο αποθήκευσης.

Το κύριο πράγμα που θέλω να θυμάστε είναι ότι μπορείτε να ρωτήσετε απευθείας την κατάσταση μέσα από την εφαρμογή σας. Οι ευκαιρίες που σας δίνει αυτό δεν μπορούν να υπερεκτιμηθούν. Αντί να καταναλώνετε δεδομένα από τον Κάφκα και να αποθηκεύετε εγγραφές σε μια βάση δεδομένων για την εφαρμογή, μπορείτε να ρωτήσετε τα καταστήματα κατάστασης με το ίδιο αποτέλεσμα. Απευθείας ερωτήματα σε κρατικά καταστήματα σημαίνουν λιγότερο κώδικα (χωρίς καταναλωτή) και λιγότερο λογισμικό (δεν χρειάζεται πίνακας βάσης δεδομένων για την αποθήκευση των αποτελεσμάτων).

Έχουμε καλύψει αρκετά σημεία σε αυτό το κεφάλαιο, επομένως θα αφήσουμε τη συζήτησή μας για τα διαδραστικά ερωτήματα έναντι των κρατικών καταστημάτων προς το παρόν. Αλλά μην ανησυχείτε: στο Κεφάλαιο 9, θα δημιουργήσουμε μια απλή εφαρμογή πίνακα εργαλείων με διαδραστικά ερωτήματα. Θα χρησιμοποιήσει μερικά από τα παραδείγματα αυτού και των προηγούμενων κεφαλαίων για να δείξει διαδραστικά ερωτήματα και πώς μπορείτε να τα προσθέσετε στις εφαρμογές Kafka Streams.

Περίληψη

  • Τα αντικείμενα KStream αντιπροσωπεύουν ροές γεγονότων, συγκρίσιμες με τις εισαγωγές σε μια βάση δεδομένων. Τα αντικείμενα KTable αντιπροσωπεύουν ροές ενημέρωσης, περισσότερο σαν ενημερώσεις σε μια βάση δεδομένων. Το μέγεθος του αντικειμένου KTable δεν μεγαλώνει, οι παλιές εγγραφές αντικαθίστανται από νέες.
  • Τα αντικείμενα KTable απαιτούνται για λειτουργίες συνάθροισης.
  • Χρησιμοποιώντας τις λειτουργίες παραθύρου, μπορείτε να χωρίσετε συγκεντρωτικά δεδομένα σε κουβάδες χρόνου.
  • Χάρη στα αντικείμενα GlobalKTable, μπορείτε να αποκτήσετε πρόσβαση σε δεδομένα αναφοράς οπουδήποτε στην εφαρμογή, ανεξάρτητα από την κατάτμηση.
  • Είναι δυνατές οι συνδέσεις μεταξύ αντικειμένων KStream, KTable και GlobalKTable.

Μέχρι στιγμής, έχουμε επικεντρωθεί στη δημιουργία εφαρμογών Kafka Streams χρησιμοποιώντας το υψηλού επιπέδου KStream DSL. Αν και η προσέγγιση υψηλού επιπέδου σάς επιτρέπει να δημιουργείτε τακτοποιημένα και συνοπτικά προγράμματα, η χρήση της αντιπροσωπεύει ένα συμβιβασμό. Η συνεργασία με το DSL KStream σημαίνει αύξηση της συνοπτικότητας του κώδικά σας μειώνοντας τον βαθμό ελέγχου. Στο επόμενο κεφάλαιο, θα εξετάσουμε το API κόμβου χειριστή χαμηλού επιπέδου και θα δοκιμάσουμε άλλους συμβιβασμούς. Τα προγράμματα θα είναι μεγαλύτερα από ό,τι πριν, αλλά θα μπορούμε να δημιουργήσουμε σχεδόν οποιονδήποτε κόμβο χειριστή που μπορεί να χρειαστούμε.

→ Περισσότερες λεπτομέρειες για το βιβλίο μπορείτε να βρείτε στο ιστοσελίδα του εκδότη

→ Για Habrozhiteli 25% έκπτωση με χρήση κουπονιού - Ρεύματα Κάφκα

→ Με την πληρωμή της έντυπης έκδοσης του βιβλίου, θα αποσταλεί ηλεκτρονικό βιβλίο με e-mail.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο