Γεγονότα επανεπεξεργασίας που ελήφθησαν από τον Κάφκα

Γεγονότα επανεπεξεργασίας που ελήφθησαν από τον Κάφκα

Γεια σου Χαμπρ.

Πρόσφατα εγώ μοιράστηκε την εμπειρία του σχετικά με το ποιες παραμέτρους χρησιμοποιούμε ως ομάδα συχνότερα για τον Παραγωγό και τον Καταναλωτή του Kafka για να πλησιάσουν την εγγυημένη παράδοση. Σε αυτό το άρθρο θέλω να σας πω πώς οργανώσαμε την εκ νέου επεξεργασία ενός συμβάντος που ελήφθη από τον Κάφκα ως αποτέλεσμα της προσωρινής μη διαθεσιμότητας του εξωτερικού συστήματος.

Οι σύγχρονες εφαρμογές λειτουργούν σε ένα πολύ περίπλοκο περιβάλλον. Η επιχειρηματική λογική τυλιγμένη σε μια στοίβα σύγχρονης τεχνολογίας, που εκτελείται σε μια εικόνα Docker που διαχειρίζεται ένας ενορχηστρωτής όπως ο Kubernetes ή το OpenShift και επικοινωνεί με άλλες εφαρμογές ή επιχειρηματικές λύσεις μέσω μιας αλυσίδας φυσικών και εικονικών δρομολογητών. Σε ένα τέτοιο περιβάλλον, κάτι μπορεί πάντα να σπάσει, επομένως η επανεπεξεργασία συμβάντων εάν ένα από τα εξωτερικά συστήματα δεν είναι διαθέσιμο είναι σημαντικό μέρος των επιχειρηματικών διαδικασιών μας.

Πώς ήταν πριν τον Κάφκα

Νωρίτερα στο έργο χρησιμοποιήσαμε το IBM MQ για ασύγχρονη παράδοση μηνυμάτων. Εάν παρουσιαστεί οποιοδήποτε σφάλμα κατά τη λειτουργία της υπηρεσίας, το ληφθέν μήνυμα θα μπορούσε να τοποθετηθεί σε μια ουρά νεκρών γραμμάτων (DLQ) για περαιτέρω χειροκίνητη ανάλυση. Το DLQ δημιουργήθηκε δίπλα στην εισερχόμενη ουρά, το μήνυμα μεταφέρθηκε μέσα στο IBM MQ.

Εάν το σφάλμα ήταν προσωρινό και μπορούσαμε να το προσδιορίσουμε (για παράδειγμα, ένα ResourceAccessException σε μια κλήση HTTP ή ένα MongoTimeoutException σε ένα αίτημα MongoDb), τότε η στρατηγική επανάληψης θα τεθεί σε ισχύ. Ανεξάρτητα από τη λογική διακλάδωσης της εφαρμογής, το αρχικό μήνυμα μεταφέρθηκε είτε στην ουρά του συστήματος για καθυστερημένη αποστολή, είτε σε μια ξεχωριστή εφαρμογή που είχε γίνει εδώ και πολύ καιρό για να στείλει ξανά μηνύματα. Αυτό περιλαμβάνει έναν αριθμό εκ νέου αποστολής στην κεφαλίδα του μηνύματος, ο οποίος συνδέεται με το διάστημα καθυστέρησης ή το τέλος της στρατηγικής σε επίπεδο εφαρμογής. Εάν έχουμε φτάσει στο τέλος της στρατηγικής αλλά το εξωτερικό σύστημα δεν είναι ακόμα διαθέσιμο, τότε το μήνυμα θα τοποθετηθεί στο DLQ για χειροκίνητη ανάλυση.

Λύση αναζήτησης

Αναζήτηση στο Διαδίκτυο, μπορείτε να βρείτε τα παρακάτω απόφαση. Εν ολίγοις, προτείνεται η δημιουργία θέματος για κάθε διάστημα καθυστέρησης και η εφαρμογή εφαρμογών Καταναλωτών στο πλάι, οι οποίες θα διαβάζουν τα μηνύματα με την απαιτούμενη καθυστέρηση.

Γεγονότα επανεπεξεργασίας που ελήφθησαν από τον Κάφκα

Παρά τον μεγάλο αριθμό θετικών κριτικών, μου φαίνεται ότι δεν είναι απολύτως επιτυχημένο. Πρώτα απ 'όλα, επειδή ο προγραμματιστής, εκτός από την υλοποίηση επιχειρηματικών απαιτήσεων, θα πρέπει να αφιερώσει πολύ χρόνο στην εφαρμογή του μηχανισμού που περιγράφεται.

Επιπλέον, εάν ο έλεγχος πρόσβασης είναι ενεργοποιημένος στο σύμπλεγμα Kafka, θα πρέπει να αφιερώσετε λίγο χρόνο στη δημιουργία θεμάτων και στην παροχή της απαραίτητης πρόσβασης σε αυτά. Επιπλέον, θα χρειαστεί να επιλέξετε τη σωστή παράμετρο retention.ms για καθένα από τα θέματα επανάληψης, ώστε τα μηνύματα να έχουν χρόνο να σταλούν εκ νέου και να μην εξαφανιστούν από αυτό. Η υλοποίηση και το αίτημα πρόσβασης θα πρέπει να επαναλαμβάνεται για κάθε υπάρχουσα ή νέα υπηρεσία.

Ας δούμε τώρα τι μηχανισμούς μας παρέχει το ελατήριο γενικά και το ελατήριο-κάφκα ειδικότερα για την επανεπεξεργασία μηνυμάτων. Το Spring-kafka έχει μια μεταβατική εξάρτηση από το spring-retry, το οποίο παρέχει αφαιρέσεις για τη διαχείριση διαφορετικών BackOffPolicies. Αυτό είναι ένα αρκετά ευέλικτο εργαλείο, αλλά το σημαντικό του μειονέκτημα είναι η αποθήκευση μηνυμάτων για εκ νέου αποστολή στη μνήμη της εφαρμογής. Αυτό σημαίνει ότι η επανεκκίνηση της εφαρμογής λόγω ενημέρωσης ή σφάλματος λειτουργίας θα έχει ως αποτέλεσμα την απώλεια όλων των μηνυμάτων που εκκρεμούν για επανεπεξεργασία. Δεδομένου ότι αυτό το σημείο είναι κρίσιμο για το σύστημά μας, δεν το εξετάσαμε περαιτέρω.

Το ίδιο το spring-kafka παρέχει αρκετές υλοποιήσεις του ContainerAwareErrorHandler, για παράδειγμα SearchToCurrentError Handler, με το οποίο μπορείτε να επεξεργαστείτε το μήνυμα αργότερα χωρίς μετατόπιση μετατόπισης σε περίπτωση σφάλματος. Ξεκινώντας με την έκδοση του spring-kafka 2.3, κατέστη δυνατό να ορίσετε το BackOffPolicy.

Αυτή η προσέγγιση επιτρέπει στα επανεπεξεργασμένα μηνύματα να επιβιώνουν από την επανεκκίνηση της εφαρμογής, αλλά δεν υπάρχει ακόμα μηχανισμός DLQ. Επιλέξαμε αυτή την επιλογή στις αρχές του 2019, πιστεύοντας αισιόδοξα ότι το DLQ δεν θα χρειαζόταν (ήμασταν τυχεροί και ουσιαστικά δεν το χρειαστήκαμε μετά από αρκετούς μήνες λειτουργίας της εφαρμογής με τέτοιο σύστημα επανεπεξεργασίας). Προσωρινά σφάλματα προκάλεσαν την ενεργοποίηση του SeekToCurrentErrorHandler. Τα υπόλοιπα σφάλματα εκτυπώθηκαν στο αρχείο καταγραφής, με αποτέλεσμα τη μετατόπιση και η επεξεργασία συνεχίστηκε με το επόμενο μήνυμα.

Τελική απόφαση

Η υλοποίηση που βασίζεται στο SeekToCurrentErrorHandler μας ώθησε να αναπτύξουμε τον δικό μας μηχανισμό για την εκ νέου αποστολή μηνυμάτων.

Πρώτα από όλα, θέλαμε να χρησιμοποιήσουμε την υπάρχουσα εμπειρία και να την επεκτείνουμε ανάλογα με τη λογική της εφαρμογής. Για μια εφαρμογή γραμμικής λογικής, θα ήταν βέλτιστο να σταματήσετε την ανάγνωση νέων μηνυμάτων για σύντομο χρονικό διάστημα που καθορίζεται από τη στρατηγική επανάληψης. Για άλλες εφαρμογές, ήθελα να έχω ένα μόνο σημείο που θα επέβαλλε τη στρατηγική επανάληψης δοκιμής. Επιπλέον, αυτό το μεμονωμένο σημείο πρέπει να έχει λειτουργικότητα DLQ και για τις δύο προσεγγίσεις.

Η ίδια η στρατηγική επανάληψης πρέπει να αποθηκευτεί στην εφαρμογή, η οποία είναι υπεύθυνη για την ανάκτηση του επόμενου διαστήματος όταν παρουσιαστεί ένα προσωρινό σφάλμα.

Διακοπή του καταναλωτή για μια εφαρμογή γραμμικής λογικής

Όταν εργάζεστε με το spring-kafka, ο κωδικός για να σταματήσετε τον Καταναλωτή μπορεί να μοιάζει κάπως έτσι:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Στο παράδειγμα, το retryAt είναι η ώρα για επανεκκίνηση του MessageListenerContainer, εάν εξακολουθεί να εκτελείται. Η επανεκκίνηση θα πραγματοποιηθεί σε ένα ξεχωριστό νήμα που θα ξεκινήσει στο TaskScheduler, η υλοποίηση του οποίου παρέχεται επίσης μέχρι την άνοιξη.

Βρίσκουμε την τιμή retryAt με τον ακόλουθο τρόπο:

  1. Αναζητείται η τιμή του μετρητή επανάκλησης.
  2. Με βάση την τιμή του μετρητή, αναζητείται το τρέχον διάστημα καθυστέρησης στη στρατηγική επανάληψης. Η στρατηγική δηλώνεται στην ίδια την εφαρμογή· επιλέξαμε τη μορφή JSON για να την αποθηκεύσουμε.
  3. Το διάστημα που βρέθηκε στον πίνακα JSON περιέχει τον αριθμό των δευτερολέπτων μετά τα οποία θα πρέπει να επαναληφθεί η επεξεργασία. Αυτός ο αριθμός των δευτερολέπτων προστίθεται στην τρέχουσα ώρα για να σχηματιστεί η τιμή για το retryAt.
  4. Εάν το διάστημα δεν βρεθεί, τότε η τιμή του retryAt είναι null και το μήνυμα θα σταλεί στο DLQ για μη αυτόματη ανάλυση.

Με αυτήν την προσέγγιση, το μόνο που μένει είναι να αποθηκεύσετε τον αριθμό των επαναλαμβανόμενων κλήσεων για κάθε μήνυμα που υποβάλλεται σε επεξεργασία, για παράδειγμα στη μνήμη της εφαρμογής. Η διατήρηση του αριθμού επανάληψης προσπαθειών στη μνήμη δεν είναι κρίσιμη για αυτήν την προσέγγιση, καθώς μια εφαρμογή γραμμικής λογικής δεν μπορεί να χειριστεί την επεξεργασία ως σύνολο. Σε αντίθεση με την επανάληψη της άνοιξης, η επανεκκίνηση της εφαρμογής δεν θα προκαλέσει την επανεπεξεργασία όλων των μηνυμάτων που χάνονται, αλλά απλώς θα επανεκκινήσει τη στρατηγική.

Αυτή η προσέγγιση βοηθά στην απομάκρυνση του φορτίου από το εξωτερικό σύστημα, το οποίο μπορεί να μην είναι διαθέσιμο λόγω πολύ μεγάλου φορτίου. Με άλλα λόγια, εκτός από την επανεπεξεργασία, πετύχαμε την υλοποίηση του πατρόν διακόπτη.

Στην περίπτωσή μας, το όριο σφάλματος είναι μόνο 1 και για να ελαχιστοποιήσουμε το χρόνο διακοπής λειτουργίας του συστήματος λόγω προσωρινών διακοπών δικτύου, χρησιμοποιούμε μια πολύ λεπτομερή στρατηγική επανάληψης δοκιμής με μικρά διαστήματα καθυστέρησης. Αυτό μπορεί να μην είναι κατάλληλο για όλες τις ομαδικές εφαρμογές, επομένως η σχέση μεταξύ του ορίου σφάλματος και της τιμής διαστήματος πρέπει να επιλέγεται με βάση τα χαρακτηριστικά του συστήματος.

Μια ξεχωριστή εφαρμογή επεξεργασίας μηνυμάτων από εφαρμογές με μη ντετερμινιστική λογική

Ακολουθεί ένα παράδειγμα κώδικα που στέλνει ένα μήνυμα σε μια τέτοια εφαρμογή (Επανάληψης δοκιμής), το οποίο θα σταλεί ξανά στο θέμα DESTINATION όταν φτάσει η ώρα RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Το παράδειγμα δείχνει ότι πολλές πληροφορίες μεταδίδονται σε κεφαλίδες. Η τιμή του RETRY_AT βρίσκεται με τον ίδιο τρόπο όπως και για τον μηχανισμό επανάληψης μέσω της στάσης Καταναλωτή. Εκτός από τα DESTINATION και RETRY_AT περνάμε:

  • GROUP_ID, με το οποίο ομαδοποιούμε μηνύματα για μη αυτόματη ανάλυση και απλοποιημένη αναζήτηση.
  • ORIGINAL_PARTITION για να προσπαθήσετε να διατηρήσετε τον ίδιο καταναλωτή για εκ νέου επεξεργασία. Αυτή η παράμετρος μπορεί να είναι null, οπότε το νέο διαμέρισμα θα ληφθεί χρησιμοποιώντας το κλειδί record.key() του αρχικού μηνύματος.
  • Ενημερώθηκε η τιμή COUNTER για να ακολουθήσετε τη στρατηγική επανάληψης.
  • Το SEND_TO είναι μια σταθερά που υποδεικνύει εάν το μήνυμα αποστέλλεται για επανεπεξεργασία μόλις φτάσει στο RETRY_AT ή τοποθετείται στο DLQ.
  • REASON - ο λόγος για τον οποίο διακόπηκε η επεξεργασία του μηνύματος.

Το Retrier αποθηκεύει μηνύματα για εκ νέου αποστολή και μη αυτόματη ανάλυση στο PostgreSQL. Ένα χρονόμετρο ξεκινά μια εργασία που βρίσκει μηνύματα με το RETRY_AT και τα στέλνει πίσω στο διαμέρισμα ORIGINAL_PARTITION του θέματος DESTINATION με το κλειδί record.key().

Μετά την αποστολή, τα μηνύματα διαγράφονται από την PostgreSQL. Η μη αυτόματη ανάλυση των μηνυμάτων πραγματοποιείται σε μια απλή διεπαφή χρήστη που αλληλεπιδρά με το Retryer μέσω REST API. Τα κύρια χαρακτηριστικά του είναι η εκ νέου αποστολή ή η διαγραφή μηνυμάτων από το DLQ, η προβολή πληροφοριών σφάλματος και η αναζήτηση μηνυμάτων, για παράδειγμα με βάση το όνομα σφάλματος.

Εφόσον ο έλεγχος πρόσβασης είναι ενεργοποιημένος στα συμπλέγματά μας, είναι απαραίτητο να ζητήσετε επιπλέον πρόσβαση στο θέμα που ακούει ο Retrier και να επιτρέψετε στον Retrier να γράψει στο θέμα DESTINATION. Αυτό δεν είναι βολικό, αλλά, σε αντίθεση με την προσέγγιση θεμάτων διαστήματος, έχουμε ένα πλήρες DLQ και UI για να το διαχειριστούμε.

Υπάρχουν περιπτώσεις που ένα εισερχόμενο θέμα διαβάζεται από πολλές διαφορετικές ομάδες καταναλωτών, των οποίων οι εφαρμογές εφαρμόζουν διαφορετική λογική. Η επανεπεξεργασία ενός μηνύματος μέσω του Retryer για μία από αυτές τις εφαρμογές θα έχει ως αποτέλεσμα ένα αντίγραφο από την άλλη. Για προστασία από αυτό, δημιουργούμε ένα ξεχωριστό θέμα για επανεπεξεργασία. Τα εισερχόμενα θέματα και τα θέματα επανάληψης μπορούν να διαβαστούν από τον ίδιο καταναλωτή χωρίς κανέναν περιορισμό.

Γεγονότα επανεπεξεργασίας που ελήφθησαν από τον Κάφκα

Από προεπιλογή αυτή η προσέγγιση δεν παρέχει λειτουργικότητα διακόπτη κυκλώματος, ωστόσο μπορεί να προστεθεί στην εφαρμογή χρησιμοποιώντας Spring-cloud-netflix ή καινούργιο διακόπτης κυκλώματος σύννεφων ελατηρίου, τυλίγοντας τα μέρη όπου καλούνται οι εξωτερικές υπηρεσίες σε κατάλληλες αφαιρέσεις. Επιπλέον, καθίσταται δυνατή η επιλογή μιας στρατηγικής για διάφραγμα μοτίβο, το οποίο μπορεί επίσης να είναι χρήσιμο. Για παράδειγμα, στο spring-cloud-netflix αυτό θα μπορούσε να είναι ένα thread pool ή ένα semaphore.

Παραγωγή

Ως αποτέλεσμα, έχουμε μια ξεχωριστή εφαρμογή που μας επιτρέπει να επαναλαμβάνουμε την επεξεργασία μηνυμάτων εάν κάποιο εξωτερικό σύστημα δεν είναι προσωρινά διαθέσιμο.

Ένα από τα βασικά πλεονεκτήματα της εφαρμογής είναι ότι μπορεί να χρησιμοποιηθεί από εξωτερικά συστήματα που τρέχουν στο ίδιο σύμπλεγμα Kafka, χωρίς σημαντικές τροποποιήσεις από την πλευρά τους! Μια τέτοια εφαρμογή θα χρειαστεί μόνο να έχει πρόσβαση στο θέμα επανάληψης δοκιμής, να συμπληρώσει μερικές κεφαλίδες του Κάφκα και να στείλει ένα μήνυμα στον Επαναληπτικό. Δεν χρειάζεται να δημιουργηθούν πρόσθετες υποδομές. Και για να μειώσουμε τον αριθμό των μηνυμάτων που μεταφέρονται από την εφαρμογή στο Retrier και πίσω, εντοπίσαμε εφαρμογές με γραμμική λογική και τις επεξεργαστήκαμε ξανά μέσω της στάσης Consumer.

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο