Redis Stream - αξιοπιστία και επεκτασιμότητα των συστημάτων ανταλλαγής μηνυμάτων σας

Redis Stream - αξιοπιστία και επεκτασιμότητα των συστημάτων ανταλλαγής μηνυμάτων σας

Το Redis Stream είναι ένας νέος αφηρημένος τύπος δεδομένων που εισήχθη στο Redis με την έκδοση 5.0
Εννοιολογικά, το Redis Stream είναι μια λίστα στην οποία μπορείτε να προσθέσετε καταχωρήσεις. Κάθε καταχώρηση έχει ένα μοναδικό αναγνωριστικό. Από προεπιλογή, το αναγνωριστικό δημιουργείται αυτόματα και περιλαμβάνει μια χρονική σήμανση. Επομένως, μπορείτε να υποβάλετε ερωτήματα σε εύρη εγγραφών με την πάροδο του χρόνου ή να λάβετε νέα δεδομένα καθώς φτάνουν στη ροή, όπως ακριβώς η εντολή "tail -f" του Unix διαβάζει ένα αρχείο καταγραφής και παγώνει ενώ περιμένει νέα δεδομένα. Σημειώστε ότι πολλοί πελάτες μπορούν να ακούσουν ένα νήμα ταυτόχρονα, όπως και πολλές διεργασίες "tail -f" μπορούν να διαβάσουν ένα αρχείο ταυτόχρονα χωρίς να έρχονται σε διένεξη μεταξύ τους.

Για να κατανοήσουμε όλα τα πλεονεκτήματα του νέου τύπου δεδομένων, ας ρίξουμε μια γρήγορη ματιά στις ήδη υπάρχουσες δομές Redis που αντιγράφουν εν μέρει τη λειτουργικότητα του Redis Stream.

Redis PUB/SUB

Το Redis Pub/Sub είναι ένα απλό σύστημα ανταλλαγής μηνυμάτων που είναι ήδη ενσωματωμένο στο κατάστημά σας με αξία-κλειδιά. Ωστόσο, η απλότητα έχει ένα τίμημα:

  • Εάν ο εκδότης για κάποιο λόγο αποτύχει, τότε χάνει όλους τους συνδρομητές του
  • Ο εκδότης πρέπει να γνωρίζει την ακριβή διεύθυνση όλων των συνδρομητών του
  • Ένας εκδότης μπορεί να υπερφορτώσει τους συνδρομητές του με εργασία εάν τα δεδομένα δημοσιεύονται ταχύτερα από ό,τι υποβάλλονται σε επεξεργασία
  • Το μήνυμα διαγράφεται από το buffer του εκδότη αμέσως μετά τη δημοσίευση, ανεξάρτητα από το πόσους συνδρομητές παραδόθηκε και πόσο γρήγορα μπόρεσαν να επεξεργαστούν αυτό το μήνυμα.
  • Όλοι οι συνδρομητές θα λάβουν το μήνυμα ταυτόχρονα. Οι ίδιοι οι συνδρομητές πρέπει κατά κάποιο τρόπο να συμφωνήσουν μεταξύ τους για τη σειρά επεξεργασίας του ίδιου μηνύματος.
  • Δεν υπάρχει ενσωματωμένος μηχανισμός που να επιβεβαιώνει ότι ένας συνδρομητής έχει επεξεργαστεί με επιτυχία ένα μήνυμα. Εάν ένας συνδρομητής λάβει ένα μήνυμα και διακοπεί κατά τη διάρκεια της επεξεργασίας, ο εκδότης δεν θα το γνωρίζει.

Λίστα Redis

Η λίστα Redis είναι μια δομή δεδομένων που υποστηρίζει τον αποκλεισμό εντολών ανάγνωσης. Μπορείτε να προσθέσετε και να διαβάσετε μηνύματα από την αρχή ή το τέλος της λίστας. Με βάση αυτή τη δομή, μπορείτε να δημιουργήσετε μια καλή στοίβα ή ουρά για το κατανεμημένο σύστημά σας και στις περισσότερες περιπτώσεις αυτό θα είναι αρκετό. Κύριες διαφορές από το Redis Pub/Sub:

  • Το μήνυμα παραδίδεται σε έναν πελάτη. Ο πρώτος πελάτης που έχει αποκλειστεί από την ανάγνωση θα λάβει πρώτα τα δεδομένα.
  • Ο Κλιντ πρέπει να ξεκινήσει ο ίδιος τη λειτουργία ανάγνωσης για κάθε μήνυμα. Ο List δεν γνωρίζει τίποτα για τους πελάτες.
  • Τα μηνύματα αποθηκεύονται μέχρι να τα διαβάσει κάποιος ή να τα διαγράψει ρητά. Εάν ρυθμίσετε τις παραμέτρους του διακομιστή Redis ώστε να ξεπλένει δεδομένα στο δίσκο, τότε η αξιοπιστία του συστήματος αυξάνεται δραματικά.

Εισαγωγή στο Stream

Προσθήκη καταχώρισης σε ροή

Ομάδα XADD προσθέτει μια νέα καταχώρηση στη ροή. Μια εγγραφή δεν είναι απλώς μια συμβολοσειρά, αποτελείται από ένα ή περισσότερα ζεύγη κλειδιών-τιμών. Έτσι, κάθε καταχώρηση είναι ήδη δομημένη και μοιάζει με τη δομή ενός αρχείου CSV.

> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0

Στο παραπάνω παράδειγμα, προσθέτουμε δύο πεδία στη ροή με το όνομα (κλειδί) "mystream": "sensor-id" και "temperature" με τις τιμές "1234" και "19.8", αντίστοιχα. Ως δεύτερο όρισμα, η εντολή παίρνει ένα αναγνωριστικό που θα εκχωρηθεί στην καταχώρηση - αυτό το αναγνωριστικό προσδιορίζει μοναδικά κάθε καταχώρηση στη ροή. Ωστόσο, σε αυτήν την περίπτωση περάσαμε * επειδή θέλουμε το Redis να δημιουργήσει ένα νέο αναγνωριστικό για εμάς. Κάθε νέα ταυτότητα θα αυξάνεται. Επομένως, κάθε νέα καταχώρηση θα έχει υψηλότερο αναγνωριστικό σε σχέση με προηγούμενες εγγραφές.

Μορφή αναγνωριστικού

Το αναγνωριστικό καταχώρισης που επιστρέφεται από την εντολή XADD, αποτελείται από δύο μέρη:

{millisecondsTime}-{sequenceNumber}

χιλιοστά του δευτερολέπτουΧρόνος — Χρόνος Unix σε χιλιοστά του δευτερολέπτου (χρόνος διακομιστή Redis). Ωστόσο, εάν η τρέχουσα ώρα είναι ίδια ή μικρότερη από την ώρα της προηγούμενης εγγραφής, τότε χρησιμοποιείται η χρονική σήμανση της προηγούμενης εγγραφής. Επομένως, εάν ο χρόνος διακομιστή επιστρέψει στο χρόνο, το νέο αναγνωριστικό θα εξακολουθεί να διατηρεί την ιδιότητα αύξησης.

αριθμός ακολουθίας χρησιμοποιείται για εγγραφές που δημιουργούνται στο ίδιο χιλιοστό του δευτερολέπτου. αριθμός ακολουθίας θα αυξηθεί κατά 1 σε σχέση με την προηγούμενη καταχώριση. Επειδή η αριθμός ακολουθίας έχει μέγεθος 64 bit, τότε στην πράξη δεν θα πρέπει να αντιμετωπίζετε όριο στον αριθμό των εγγραφών που μπορούν να δημιουργηθούν μέσα σε ένα χιλιοστό του δευτερολέπτου.

Η μορφή τέτοιων αναγνωριστικών μπορεί να φαίνεται περίεργη με την πρώτη ματιά. Ένας δύσπιστος αναγνώστης μπορεί να αναρωτηθεί γιατί ο χρόνος είναι μέρος του αναγνωριστικού. Ο λόγος είναι ότι οι ροές Redis υποστηρίζουν ερωτήματα εύρους κατά ID. Εφόσον το αναγνωριστικό συσχετίζεται με την ώρα δημιουργίας της εγγραφής, αυτό καθιστά δυνατή την υποβολή ερωτημάτων σε χρονικά εύρη. Θα δούμε ένα συγκεκριμένο παράδειγμα όταν δούμε την εντολή XRANGE.

Εάν για κάποιο λόγο ο χρήστης χρειαστεί να καθορίσει το δικό του αναγνωριστικό, το οποίο, για παράδειγμα, σχετίζεται με κάποιο εξωτερικό σύστημα, τότε μπορούμε να το περάσουμε στην εντολή XADD αντί για * όπως φαίνεται παρακάτω:

> XADD somestream 0-1 field value
0-1
> XADD somestream 0-2 foo bar
0-2

Λάβετε υπόψη ότι σε αυτήν την περίπτωση πρέπει να παρακολουθείτε μόνοι σας την αύξηση της ταυτότητας. Στο παράδειγμά μας, το ελάχιστο αναγνωριστικό είναι "0-1", επομένως η εντολή δεν θα δεχτεί άλλο αναγνωριστικό που είναι ίσο ή μικρότερο από "0-1".

> XADD somestream 0-1 foo bar
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Αριθμός εγγραφών ανά ροή

Μπορείτε να λάβετε τον αριθμό των εγγραφών σε μια ροή απλά χρησιμοποιώντας την εντολή XLEN. Για το παράδειγμά μας, αυτή η εντολή θα επιστρέψει την ακόλουθη τιμή:

> XLEN somestream
(integer) 2

Ερωτήματα εύρους - XRANGE και XREVRANGE

Για να ζητήσουμε δεδομένα ανά περιοχή, πρέπει να καθορίσουμε δύο αναγνωριστικά - την αρχή και το τέλος του εύρους. Το επιστρεφόμενο εύρος θα περιλαμβάνει όλα τα στοιχεία, συμπεριλαμβανομένων των ορίων. Υπάρχουν επίσης δύο ειδικά αναγνωριστικά "-" και "+", αντίστοιχα που σημαίνουν το μικρότερο (πρώτη εγγραφή) και το μεγαλύτερο (τελευταία εγγραφή) αναγνωριστικό στη ροή. Το παρακάτω παράδειγμα θα αναφέρει όλες τις καταχωρήσεις ροής.

> XRANGE mystream - +
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1518951482479-0
   2) 1) "sensor-id"
      2) "9999"
      3) "temperature"
      4) "18.2"

Κάθε επιστρεφόμενη εγγραφή είναι ένας πίνακας δύο στοιχείων: ένα αναγνωριστικό και μια λίστα ζευγών κλειδιού-τιμής. Είπαμε ήδη ότι τα αναγνωριστικά ρεκόρ σχετίζονται με το χρόνο. Επομένως, μπορούμε να ζητήσουμε ένα εύρος μιας συγκεκριμένης χρονικής περιόδου. Ωστόσο, μπορούμε να καθορίσουμε στο αίτημα όχι το πλήρες αναγνωριστικό, αλλά μόνο τον χρόνο Unix, παραλείποντας το τμήμα που σχετίζεται με αριθμός ακολουθίας. Το τμήμα του αναγνωριστικού που παραλείφθηκε θα μηδενιστεί αυτόματα στην αρχή του εύρους και στη μέγιστη δυνατή τιμή στο τέλος του εύρους. Παρακάτω είναι ένα παράδειγμα για το πώς μπορείτε να ζητήσετε εύρος δύο χιλιοστών του δευτερολέπτου.

> XRANGE mystream 1518951480106 1518951480107
1) 1) 1518951480106-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

Έχουμε μόνο μία καταχώρηση σε αυτό το εύρος, ωστόσο σε πραγματικά σύνολα δεδομένων το αποτέλεσμα που επιστρέφεται μπορεί να είναι τεράστιο. Γι 'αυτό το λόγο XRANGE υποστηρίζει την επιλογή COUNT. Καθορίζοντας την ποσότητα, μπορούμε απλά να πάρουμε τις πρώτες N εγγραφές. Εάν πρέπει να λάβουμε τις επόμενες N εγγραφές (σελιδοποίηση), μπορούμε να χρησιμοποιήσουμε το τελευταίο αναγνωριστικό που λάβαμε, να το αυξήσουμε αριθμός ακολουθίας ένα και ξαναρωτήστε. Ας το δούμε αυτό στο παρακάτω παράδειγμα. Αρχίζουμε να προσθέτουμε 10 στοιχεία με XADD (υποθέτοντας ότι το mystream είχε ήδη γεμίσει με 10 στοιχεία). Για να ξεκινήσει η επανάληψη λαμβάνοντας 2 στοιχεία ανά εντολή, ξεκινάμε με το πλήρες εύρος αλλά με COUNT ίσο με 2.

> XRANGE mystream - + COUNT 2
1) 1) 1519073278252-0
   2) 1) "foo"
      2) "value_1"
2) 1) 1519073279157-0
   2) 1) "foo"
      2) "value_2"

Για να συνεχίσουμε την επανάληψη με τα επόμενα δύο στοιχεία, πρέπει να επιλέξουμε το τελευταίο αναγνωριστικό που λάβαμε, δηλαδή 1519073279157-0, και να προσθέσουμε 1 στο αριθμός ακολουθίας.
Το αναγνωριστικό που προκύπτει, σε αυτήν την περίπτωση 1519073279157-1, μπορεί τώρα να χρησιμοποιηθεί ως το νέο όρισμα έναρξης εύρους για την επόμενη κλήση XRANGE:

> XRANGE mystream 1519073279157-1 + COUNT 2
1) 1) 1519073280281-0
   2) 1) "foo"
      2) "value_3"
2) 1) 1519073281432-0
   2) 1) "foo"
      2) "value_4"

Και ούτω καθεξής. Επειδή πολυπλοκότητα XRANGE είναι O(log(N)) για αναζήτηση και μετά O(M) για επιστροφή M στοιχείων, τότε κάθε βήμα επανάληψης είναι γρήγορο. Έτσι, χρησιμοποιώντας XRANGE Οι ροές μπορούν να επαναληφθούν αποτελεσματικά.

Ομάδα XREVRANGE είναι το αντίστοιχο XRANGE, αλλά επιστρέφει τα στοιχεία με αντίστροφη σειρά:

> XREVRANGE mystream + - COUNT 1
1) 1) 1519073287312-0
   2) 1) "foo"
      2) "value_10"

Σημειώστε ότι η εντολή XREVRANGE παίρνει ορίσματα εμβέλειας που ξεκινούν και σταματούν με αντίστροφη σειρά.

Ανάγνωση νέων καταχωρήσεων χρησιμοποιώντας το XREAD

Συχνά προκύπτει το καθήκον της εγγραφής σε μια ροή και της λήψης μόνο νέων μηνυμάτων. Αυτή η ιδέα μπορεί να φαίνεται παρόμοια με το Redis Pub/Sub ή τον αποκλεισμό της λίστας Redis, αλλά υπάρχουν θεμελιώδεις διαφορές στον τρόπο χρήσης του Redis Stream:

  1. Κάθε νέο μήνυμα παραδίδεται σε κάθε συνδρομητή από προεπιλογή. Αυτή η συμπεριφορά διαφέρει από μια λίστα αποκλεισμού Redis, όπου ένα νέο μήνυμα θα διαβάζεται μόνο από έναν συνδρομητή.
  2. Ενώ στο Redis Pub/Sub όλα τα μηνύματα ξεχνιούνται και δεν παραμένουν ποτέ, στο Stream όλα τα μηνύματα διατηρούνται επ' αόριστον (εκτός εάν ο πελάτης προκαλέσει ρητά τη διαγραφή).
  3. Το Redis Stream σάς επιτρέπει να διαφοροποιείτε την πρόσβαση σε μηνύματα σε μία ροή. Ένας συγκεκριμένος συνδρομητής μπορεί να δει μόνο το προσωπικό του ιστορικό μηνυμάτων.

Μπορείτε να εγγραφείτε σε ένα νήμα και να λαμβάνετε νέα μηνύματα χρησιμοποιώντας την εντολή XREAD. Είναι λίγο πιο περίπλοκο από XRANGE, οπότε θα ξεκινήσουμε πρώτα με τα πιο απλά παραδείγματα.

> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1519073278252-0
         2) 1) "foo"
            2) "value_1"
      2) 1) 1519073279157-0
         2) 1) "foo"
            2) "value_2"

Το παραπάνω παράδειγμα δείχνει μια φόρμα χωρίς αποκλεισμό XREAD. Σημειώστε ότι η επιλογή COUNT είναι προαιρετική. Στην πραγματικότητα, η μόνη απαιτούμενη επιλογή εντολής είναι η επιλογή STREAMS, η οποία καθορίζει μια λίστα ροών μαζί με το αντίστοιχο μέγιστο αναγνωριστικό. Γράψαμε "STREAMS mystream 0" - θέλουμε να λαμβάνουμε όλες τις εγγραφές της ροής mystream με αναγνωριστικό μεγαλύτερο από "0-0". Όπως μπορείτε να δείτε από το παράδειγμα, η εντολή επιστρέφει το όνομα του νήματος επειδή μπορούμε να εγγραφούμε σε πολλά νήματα ταυτόχρονα. Θα μπορούσαμε να γράψουμε, για παράδειγμα, "STREAMS mystream otherstream 0 0". Λάβετε υπόψη ότι μετά την επιλογή STREAMS πρέπει πρώτα να παρέχουμε τα ονόματα όλων των απαιτούμενων ροών και μόνο μετά μια λίστα αναγνωριστικών.

Σε αυτή την απλή μορφή η εντολή δεν κάνει τίποτα ιδιαίτερο σε σύγκριση με XRANGE. Ωστόσο, το ενδιαφέρον είναι ότι μπορούμε εύκολα να στρίψουμε XREAD σε μια εντολή αποκλεισμού, καθορίζοντας το όρισμα BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Στο παραπάνω παράδειγμα, ορίζεται μια νέα επιλογή BLOCK με χρονικό όριο 0 χιλιοστών του δευτερολέπτου (αυτό σημαίνει απεριόριστη αναμονή). Επιπλέον, αντί να περάσει το συνηθισμένο αναγνωριστικό για τη ροή mystream, πέρασε ένα ειδικό αναγνωριστικό $. Αυτό το ειδικό αναγνωριστικό σημαίνει ότι XREAD πρέπει να χρησιμοποιεί το μέγιστο αναγνωριστικό στο mystream ως αναγνωριστικό. Έτσι θα λαμβάνουμε νέα μηνύματα μόνο από τη στιγμή που αρχίσαμε να ακούμε. Κατά κάποιο τρόπο αυτό είναι παρόμοιο με την εντολή "tail -f" του Unix.

Σημειώστε ότι όταν χρησιμοποιούμε την επιλογή BLOCK δεν χρειάζεται απαραίτητα να χρησιμοποιούμε το ειδικό αναγνωριστικό $. Μπορούμε να χρησιμοποιήσουμε οποιοδήποτε αναγνωριστικό υπάρχει στη ροή. Εάν η ομάδα μπορεί να εξυπηρετήσει το αίτημά μας αμέσως χωρίς αποκλεισμό, θα το κάνει, διαφορετικά θα αποκλείσει.

Μπλοκάρισμα XREAD μπορεί επίσης να ακούσει πολλά νήματα ταυτόχρονα, απλά πρέπει να καθορίσετε τα ονόματά τους. Σε αυτήν την περίπτωση, η εντολή θα επιστρέψει μια εγγραφή της πρώτης ροής που έλαβε δεδομένα. Ο πρώτος συνδρομητής που έχει αποκλειστεί για ένα δεδομένο νήμα θα λάβει πρώτα δεδομένα.

Ομάδες Καταναλωτών

Σε ορισμένες εργασίες, θέλουμε να περιορίσουμε την πρόσβαση των συνδρομητών σε μηνύματα εντός ενός νήματος. Ένα παράδειγμα όπου αυτό θα μπορούσε να είναι χρήσιμο είναι μια ουρά μηνυμάτων με εργαζόμενους που θα λαμβάνουν διαφορετικά μηνύματα από ένα νήμα, επιτρέποντας την επεξεργασία μηνυμάτων να κλιμακώνεται.

Αν φανταστούμε ότι έχουμε τρεις συνδρομητές C1, C2, C3 και ένα νήμα που περιέχει τα μηνύματα 1, 2, 3, 4, 5, 6, 7, τότε τα μηνύματα θα προβάλλονται όπως στο παρακάτω διάγραμμα:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Για να επιτύχει αυτό το αποτέλεσμα, το Redis Stream χρησιμοποιεί μια έννοια που ονομάζεται Ομάδα Καταναλωτών. Αυτή η ιδέα είναι παρόμοια με έναν ψευδο-συνδρομητή, ο οποίος λαμβάνει δεδομένα από μια ροή, αλλά στην πραγματικότητα εξυπηρετείται από πολλούς συνδρομητές σε μια ομάδα, παρέχοντας ορισμένες εγγυήσεις:

  1. Κάθε μήνυμα παραδίδεται σε διαφορετικό συνδρομητή εντός της ομάδας.
  2. Μέσα σε μια ομάδα, οι συνδρομητές προσδιορίζονται με το όνομά τους, το οποίο είναι μια συμβολοσειρά με διάκριση πεζών-κεφαλαίων. Εάν ένας συνδρομητής αποχωρήσει προσωρινά από την ομάδα, μπορεί να επαναφερθεί στην ομάδα χρησιμοποιώντας το δικό του μοναδικό όνομα.
  3. Κάθε Ομάδα Καταναλωτών ακολουθεί την έννοια του «πρώτου μη αναγνωσμένου μηνύματος». Όταν ένας συνδρομητής ζητά νέα μηνύματα, μπορεί να λάβει μόνο μηνύματα που δεν έχουν παραδοθεί ποτέ στο παρελθόν σε κανένα συνδρομητή εντός της ομάδας.
  4. Υπάρχει μια εντολή για ρητή επιβεβαίωση ότι η επεξεργασία του μηνύματος έγινε με επιτυχία από τον συνδρομητή. Μέχρι να καλέσετε αυτή την εντολή, το ζητούμενο μήνυμα θα παραμείνει σε κατάσταση "εκκρεμότητα".
  5. Εντός της Ομάδας Καταναλωτών, κάθε συνδρομητής μπορεί να ζητήσει ιστορικό μηνυμάτων που του παραδόθηκαν, αλλά δεν έχουν ακόμη υποβληθεί σε επεξεργασία (σε κατάσταση «εκκρεμεί»)

Κατά μία έννοια, η κατάσταση της ομάδας μπορεί να εκφραστεί ως εξής:

+----------------------------------------+
| consumer_group_name: mygroup          
| consumer_group_stream: somekey        
| last_delivered_id: 1292309234234-92    
|                                                           
| consumers:                                          
|    "consumer-1" with pending messages  
|       1292309234234-4                          
|       1292309234232-8                          
|    "consumer-42" with pending messages 
|       ... (and so forth)                             
+----------------------------------------+

Τώρα ήρθε η ώρα να εξοικειωθείτε με τις κύριες εντολές για την Ομάδα Καταναλωτών, δηλαδή:

  • XGROUP χρησιμοποιείται για τη δημιουργία, την καταστροφή και τη διαχείριση ομάδων
  • XREADGROUP χρησιμοποιείται για την ανάγνωση ροής μέσω ομάδας
  • XACK - αυτή η εντολή επιτρέπει στον συνδρομητή να επισημάνει το μήνυμα ως επιτυχώς επεξεργασμένο

Δημιουργία Ομάδας Καταναλωτών

Ας υποθέσουμε ότι το mystream υπάρχει ήδη. Τότε η εντολή δημιουργίας ομάδας θα μοιάζει με αυτό:

> XGROUP CREATE mystream mygroup $
OK

Κατά τη δημιουργία μιας ομάδας, πρέπει να περάσουμε ένα αναγνωριστικό, από το οποίο ξεκινάει η ομάδα να λαμβάνει μηνύματα. Εάν θέλουμε απλώς να λαμβάνουμε όλα τα νέα μηνύματα, τότε μπορούμε να χρησιμοποιήσουμε το ειδικό αναγνωριστικό $ (όπως στο παραπάνω παράδειγμά μας). Εάν καθορίσετε 0 αντί για ειδικό αναγνωριστικό, τότε όλα τα μηνύματα στο νήμα θα είναι διαθέσιμα στην ομάδα.

Τώρα που δημιουργήθηκε η ομάδα, μπορούμε να ξεκινήσουμε αμέσως την ανάγνωση μηνυμάτων χρησιμοποιώντας την εντολή XREADGROUP. Αυτή η εντολή μοιάζει πολύ με XREAD και υποστηρίζει την προαιρετική επιλογή BLOCK. Ωστόσο, υπάρχει μια απαιτούμενη επιλογή GROUP που πρέπει πάντα να καθορίζεται με δύο ορίσματα: το όνομα της ομάδας και το όνομα του συνδρομητή. Υποστηρίζεται επίσης η επιλογή COUNT.

Πριν διαβάσετε το νήμα, ας βάλουμε μερικά μηνύματα εκεί:

> XADD mystream * message apple
1526569495631-0
> XADD mystream * message orange
1526569498055-0
> XADD mystream * message strawberry
1526569506935-0
> XADD mystream * message apricot
1526569535168-0
> XADD mystream * message banana
1526569544280-0

Τώρα ας προσπαθήσουμε να διαβάσουμε αυτήν τη ροή μέσω της ομάδας:

> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Η παραπάνω εντολή διαβάζεται αυτολεξεί ως εξής:

"Εγώ, η συνδρομητής Alice, μέλος του mygroup, θέλω να διαβάσω ένα μήνυμα από το mystream που δεν έχει παραδοθεί ποτέ σε κανέναν στο παρελθόν."

Κάθε φορά που ένας συνδρομητής εκτελεί μια λειτουργία σε μια ομάδα, πρέπει να παρέχει το όνομά του, προσδιορίζοντας μοναδικά τον εαυτό του εντός της ομάδας. Υπάρχει μια ακόμη πολύ σημαντική λεπτομέρεια στην παραπάνω εντολή - το ειδικό αναγνωριστικό ">". Αυτό το ειδικό αναγνωριστικό φιλτράρει τα μηνύματα, αφήνοντας μόνο εκείνα που δεν έχουν παραδοθεί ποτέ στο παρελθόν.

Επίσης, σε ειδικές περιπτώσεις, μπορείτε να καθορίσετε ένα πραγματικό αναγνωριστικό όπως 0 ή οποιοδήποτε άλλο έγκυρο αναγνωριστικό. Σε αυτή την περίπτωση η εντολή XREADGROUP θα σας επιστρέψει ένα ιστορικό μηνυμάτων με κατάσταση "σε εκκρεμότητα" που παραδόθηκαν στον καθορισμένο συνδρομητή (Alice) αλλά δεν έχουν ακόμη επιβεβαιωθεί χρησιμοποιώντας την εντολή XACK.

Μπορούμε να ελέγξουμε αυτήν τη συμπεριφορά καθορίζοντας αμέσως το ID 0, χωρίς την επιλογή COUNT. Θα δούμε απλώς ένα μήνυμα σε εκκρεμότητα, δηλαδή το μήνυμα της Apple:

> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1526569495631-0
         2) 1) "message"
            2) "apple"

Ωστόσο, εάν επιβεβαιώσουμε ότι το μήνυμα επεξεργάστηκε επιτυχώς, τότε δεν θα εμφανίζεται πλέον:

> XACK mystream mygroup 1526569495631-0
(integer) 1
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

Τώρα είναι η σειρά του Μπομπ να διαβάσει κάτι:

> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1526569498055-0
         2) 1) "message"
            2) "orange"
      2) 1) 1526569506935-0
         2) 1) "message"
            2) "strawberry"

Ο Μπομπ, μέλος του mygroup, δεν ζήτησε περισσότερα από δύο μηνύματα. Η εντολή αναφέρει μόνο μηνύματα που δεν έχουν παραδοθεί λόγω του ειδικού αναγνωριστικού ">". Όπως μπορείτε να δείτε, το μήνυμα "μήλο" δεν θα εμφανιστεί αφού έχει ήδη παραδοθεί στην Αλίκη, οπότε ο Μπομπ λαμβάνει "πορτοκαλί" και "φράουλα".

Με αυτόν τον τρόπο, η Alice, ο Bob και οποιοσδήποτε άλλος συνδρομητής της ομάδας μπορούν να διαβάσουν διαφορετικά μηνύματα από την ίδια ροή. Μπορούν επίσης να διαβάσουν το ιστορικό των μη επεξεργασμένων μηνυμάτων τους ή να επισημάνουν μηνύματα ως επεξεργασμένα.

Υπάρχουν μερικά πράγματα που πρέπει να έχετε κατά νου:

  • Μόλις ο συνδρομητής θεωρήσει ότι το μήνυμα είναι εντολή XREADGROUP, αυτό το μήνυμα μεταβαίνει στην κατάσταση "εκκρεμότητα" και εκχωρείται στον συγκεκριμένο συνδρομητή. Οι άλλοι συνδρομητές της ομάδας δεν θα μπορούν να διαβάσουν αυτό το μήνυμα.
  • Οι συνδρομητές δημιουργούνται αυτόματα με την πρώτη αναφορά, δεν χρειάζεται να δημιουργηθούν ρητά.
  • Με XREADGROUP μπορείτε να διαβάσετε μηνύματα από πολλά διαφορετικά νήματα ταυτόχρονα, ωστόσο για να λειτουργήσει αυτό θα πρέπει πρώτα να δημιουργήσετε ομάδες με το ίδιο όνομα για κάθε νήμα χρησιμοποιώντας XGROUP

Αποκατάσταση ανακαίνισης

Ο συνδρομητής μπορεί να ανακάμψει από την αποτυχία και να ξαναδιαβάσει τη λίστα των μηνυμάτων του με την κατάσταση «σε εκκρεμότητα». Ωστόσο, στον πραγματικό κόσμο, οι συνδρομητές μπορεί τελικά να αποτύχουν. Τι συμβαίνει με τα κολλημένα μηνύματα ενός συνδρομητή εάν ο συνδρομητής δεν είναι σε θέση να συνέλθει από μια αποτυχία;
Η Ομάδα Καταναλωτών προσφέρει μια δυνατότητα που χρησιμοποιείται για τέτοιες περιπτώσεις - όταν πρέπει να αλλάξετε τον κάτοχο των μηνυμάτων.

Το πρώτο πράγμα που πρέπει να κάνετε είναι να καλέσετε την εντολή ΕΚΠΤΩΣΗ, το οποίο εμφανίζει όλα τα μηνύματα στην ομάδα με την κατάσταση "σε εκκρεμότητα". Στην απλούστερη μορφή της, η εντολή καλείται με δύο μόνο ορίσματα: το όνομα του νήματος και το όνομα της ομάδας:

> XPENDING mystream mygroup
1) (integer) 2
2) 1526569498055-0
3) 1526569506935-0
4) 1) 1) "Bob"
      2) "2"

Η ομάδα εμφάνισε τον αριθμό των μη επεξεργασμένων μηνυμάτων για ολόκληρη την ομάδα και για κάθε συνδρομητή. Έχουμε μόνο τον Μπομπ με δύο εκκρεμή μηνύματα επειδή το μόνο μήνυμα που ζήτησε η Αλίκη επιβεβαιώθηκε XACK.

Μπορούμε να ζητήσουμε περισσότερες πληροφορίες χρησιμοποιώντας περισσότερα επιχειρήματα:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - εύρος αναγνωριστικών (μπορείτε να χρησιμοποιήσετε "-" και "+")
{count} — αριθμός προσπαθειών παράδοσης
{consumer-name} - όνομα ομάδας

> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1
2) 1) 1526569506935-0
   2) "Bob"
   3) (integer) 74170458
   4) (integer) 1

Τώρα έχουμε λεπτομέρειες για κάθε μήνυμα: ID, όνομα συνδρομητή, χρόνο αδράνειας σε χιλιοστά του δευτερολέπτου και τέλος τον αριθμό των προσπαθειών παράδοσης. Έχουμε δύο μηνύματα από τον Bob και έχουν μείνει σε αδράνεια για 74170458 χιλιοστά του δευτερολέπτου, περίπου 20 ώρες.

Λάβετε υπόψη ότι κανείς δεν μας εμποδίζει να ελέγξουμε ποιο ήταν το περιεχόμενο του μηνύματος απλά χρησιμοποιώντας XRANGE.

> XRANGE mystream 1526569498055-0 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Απλώς πρέπει να επαναλάβουμε το ίδιο αναγνωριστικό δύο φορές στα ορίσματα. Τώρα που έχουμε κάποια ιδέα, η Alice μπορεί να αποφασίσει ότι μετά από 20 ώρες διακοπής λειτουργίας, ο Bob πιθανότατα δεν θα αναρρώσει και είναι καιρός να ρωτήσετε αυτά τα μηνύματα και να συνεχίσετε την επεξεργασία τους για τον Bob. Για αυτό χρησιμοποιούμε την εντολή XCLAIM:

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Χρησιμοποιώντας αυτήν την εντολή, μπορούμε να λάβουμε ένα "ξένο" μήνυμα που δεν έχει υποστεί ακόμη επεξεργασία αλλάζοντας τον ιδιοκτήτη σε {consumer}. Ωστόσο, μπορούμε επίσης να παρέχουμε έναν ελάχιστο χρόνο αδράνειας {min-idle-time}. Αυτό βοηθά στην αποφυγή μιας κατάστασης όπου δύο πελάτες προσπαθούν να αλλάξουν ταυτόχρονα τον κάτοχο των ίδιων μηνυμάτων:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Ο πρώτος πελάτης θα επαναφέρει το χρόνο διακοπής λειτουργίας και θα αυξήσει τον μετρητή παράδοσης. Έτσι ο δεύτερος πελάτης δεν θα μπορεί να το ζητήσει.

> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
   2) 1) "message"
      2) "orange"

Το μήνυμα διεκδικήθηκε με επιτυχία από την Alice, η οποία μπορεί πλέον να επεξεργαστεί το μήνυμα και να το αναγνωρίσει.

Από το παραπάνω παράδειγμα, μπορείτε να δείτε ότι ένα επιτυχημένο αίτημα επιστρέφει τα περιεχόμενα του ίδιου του μηνύματος. Ωστόσο, αυτό δεν είναι απαραίτητο. Η επιλογή JUSTID μπορεί να χρησιμοποιηθεί μόνο για την επιστροφή αναγνωριστικών μηνυμάτων. Αυτό είναι χρήσιμο εάν δεν σας ενδιαφέρουν οι λεπτομέρειες του μηνύματος και θέλετε να αυξήσετε την απόδοση του συστήματος.

μετρητής παράδοσης

Ο μετρητής που βλέπετε στην έξοδο ΕΚΠΤΩΣΗ είναι ο αριθμός των παραδόσεων κάθε μηνύματος. Ένας τέτοιος μετρητής αυξάνεται με δύο τρόπους: όταν ένα μήνυμα ζητείται επιτυχώς μέσω XCLAIM ή όταν χρησιμοποιείται μια κλήση XREADGROUP.

Είναι φυσιολογικό ορισμένα μηνύματα να παραδίδονται πολλές φορές. Το κύριο πράγμα είναι ότι όλα τα μηνύματα τελικά υποβάλλονται σε επεξεργασία. Μερικές φορές παρουσιάζονται προβλήματα κατά την επεξεργασία ενός μηνύματος επειδή το ίδιο το μήνυμα είναι κατεστραμμένο ή η επεξεργασία του μηνύματος προκαλεί σφάλμα στον κώδικα χειρισμού. Σε αυτήν την περίπτωση, μπορεί να αποδειχθεί ότι κανείς δεν θα μπορέσει να επεξεργαστεί αυτό το μήνυμα. Εφόσον διαθέτουμε μετρητή προσπάθειας παράδοσης, μπορούμε να χρησιμοποιήσουμε αυτόν τον μετρητή για να εντοπίσουμε τέτοιες καταστάσεις. Επομένως, μόλις ο αριθμός παραδόσεων φτάσει στον υψηλό αριθμό που έχετε καθορίσει, θα ήταν πιθανώς πιο συνετό να τοποθετήσετε ένα τέτοιο μήνυμα σε άλλο νήμα και να στείλετε μια ειδοποίηση στον διαχειριστή του συστήματος.

Κατάσταση νήματος

Ομάδα XINFO χρησιμοποιείται για να ζητήσει διάφορες πληροφορίες σχετικά με ένα νήμα και τις ομάδες του. Για παράδειγμα, μια βασική εντολή μοιάζει με αυτό:

> XINFO STREAM mystream
 1) length
 2) (integer) 13
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1524494395530-0
    2) 1) "a"
       2) "1"
       3) "b"
       4) "2"
11) last-entry
12) 1) 1526569544280-0
    2) 1) "message"
       2) "banana"

Η παραπάνω εντολή εμφανίζει γενικές πληροφορίες σχετικά με την καθορισμένη ροή. Τώρα ένα λίγο πιο σύνθετο παράδειγμα:

> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 2
   5) pending
   6) (integer) 2
2) 1) name
   2) "some-other-group"
   3) consumers
   4) (integer) 1
   5) pending
   6) (integer) 0

Η παραπάνω εντολή εμφανίζει γενικές πληροφορίες για όλες τις ομάδες του καθορισμένου νήματος

> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 9104628
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 1
   5) idle
   6) (integer) 83841983

Η παραπάνω εντολή εμφανίζει πληροφορίες για όλους τους συνδρομητές της καθορισμένης ροής και της ομάδας.
Εάν ξεχάσετε τη σύνταξη της εντολής, απλώς ζητήστε βοήθεια από την ίδια την εντολή:

> XINFO HELP
1) XINFO {subcommand} arg arg ... arg. Subcommands are:
2) CONSUMERS {key} {groupname}  -- Show consumer groups of group {groupname}.
3) GROUPS {key}                 -- Show the stream consumer groups.
4) STREAM {key}                 -- Show information about the stream.
5) HELP                         -- Print this help.

Όριο μεγέθους ροής

Πολλές εφαρμογές δεν θέλουν να συλλέγουν δεδομένα σε μια ροή για πάντα. Συχνά είναι χρήσιμο να υπάρχει ένας μέγιστος επιτρεπόμενος αριθμός μηνυμάτων ανά νήμα. Σε άλλες περιπτώσεις, είναι χρήσιμο να μετακινούνται όλα τα μηνύματα από ένα νήμα σε άλλο μόνιμο χώρο αποθήκευσης όταν επιτευχθεί το καθορισμένο μέγεθος νήματος. Μπορείτε να περιορίσετε το μέγεθος μιας ροής χρησιμοποιώντας την παράμετρο MAXLEN στην εντολή XADD:

> XADD mystream MAXLEN 2 * value 1
1526654998691-0
> XADD mystream MAXLEN 2 * value 2
1526654999635-0
> XADD mystream MAXLEN 2 * value 3
1526655000369-0
> XLEN mystream
(integer) 2
> XRANGE mystream - +
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Όταν χρησιμοποιείτε το MAXLEN, οι παλιές εγγραφές διαγράφονται αυτόματα όταν φτάσουν σε ένα καθορισμένο μήκος, επομένως η ροή έχει σταθερό μέγεθος. Ωστόσο, το κλάδεμα σε αυτή την περίπτωση δεν συμβαίνει με τον πιο αποτελεσματικό τρόπο στη μνήμη Redis. Μπορείτε να βελτιώσετε την κατάσταση ως εξής:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

Το όρισμα ~ στο παραπάνω παράδειγμα σημαίνει ότι δεν χρειάζεται απαραίτητα να περιορίσουμε το μήκος ροής σε μια συγκεκριμένη τιμή. Στο παράδειγμά μας, αυτός θα μπορούσε να είναι οποιοσδήποτε αριθμός μεγαλύτερος ή ίσος με 1000 (για παράδειγμα, 1000, 1010 ή 1030). Απλώς προσδιορίσαμε ρητά ότι θέλουμε η ροή μας να αποθηκεύει τουλάχιστον 1000 εγγραφές. Αυτό κάνει τη διαχείριση μνήμης πολύ πιο αποτελεσματική στο Redis.

Υπάρχει και ξεχωριστή ομάδα XTRIM, που κάνει το ίδιο πράγμα:

> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Μόνιμη αποθήκευση και αναπαραγωγή

Το Redis Stream αναπαράγεται ασύγχρονα σε slave κόμβους και αποθηκεύεται σε αρχεία όπως το AOF (στιγμιότυπο όλων των δεδομένων) και το RDB (ημερολόγιο όλων των λειτουργιών εγγραφής). Υποστηρίζεται επίσης η αναπαραγωγή της κατάστασης των Ομάδων Καταναλωτών. Επομένως, εάν ένα μήνυμα βρίσκεται σε κατάσταση «σε εκκρεμότητα» στον κύριο κόμβο, τότε στους υποτελείς κόμβους αυτό το μήνυμα θα έχει την ίδια κατάσταση.

Αφαίρεση μεμονωμένων στοιχείων από μια ροή

Υπάρχει μια ειδική εντολή για τη διαγραφή μηνυμάτων XDEL. Η εντολή λαμβάνει το όνομα του νήματος ακολουθούμενο από τα αναγνωριστικά μηνυμάτων που πρόκειται να διαγραφούν:

> XRANGE mystream - + COUNT 2
1) 1) 1526654999635-0
   2) 1) "value"
      2) "2"
2) 1) 1526655000369-0
   2) 1) "value"
      2) "3"
> XDEL mystream 1526654999635-0
(integer) 1
> XRANGE mystream - + COUNT 2
1) 1) 1526655000369-0
   2) 1) "value"
      2) "3"

Όταν χρησιμοποιείτε αυτήν την εντολή, πρέπει να λάβετε υπόψη ότι η πραγματική μνήμη δεν θα απελευθερωθεί αμέσως.

Ρεύματα μηδενικού μήκους

Η διαφορά μεταξύ των ροών και άλλων δομών δεδομένων Redis είναι ότι όταν άλλες δομές δεδομένων δεν έχουν πλέον στοιχεία μέσα τους, ως παρενέργεια, η ίδια η δομή δεδομένων θα αφαιρεθεί από τη μνήμη. Έτσι, για παράδειγμα, το ταξινομημένο σύνολο θα αφαιρεθεί εντελώς όταν η κλήση ZREM αφαιρέσει το τελευταίο στοιχείο. Αντίθετα, τα νήματα επιτρέπεται να παραμείνουν στη μνήμη ακόμη και χωρίς να υπάρχουν στοιχεία μέσα.

Συμπέρασμα

Το Redis Stream είναι ιδανικό για τη δημιουργία μεσιτών μηνυμάτων, ουρών μηνυμάτων, ενοποιημένης καταγραφής και διατήρησης ιστορικού συστημάτων συνομιλίας.

Όπως είπα κάποτε Νίκλαους Βιρθ, τα προγράμματα είναι αλγόριθμοι συν δομές δεδομένων και το Redis σας δίνει ήδη και τα δύο.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο