Wiederaufbereitung der von Kafka erhaltenen Ereignisse

Wiederaufbereitung der von Kafka erhaltenen Ereignisse

Hallo Habr.

Kürzlich habe ich teilte seine Erfahrungen darüber, welche Parameter wir als Team am häufigsten für Kafka Producer und Consumer verwenden, um der garantierten Lieferung näher zu kommen. In diesem Artikel möchte ich Ihnen erzählen, wie wir die erneute Verarbeitung eines von Kafka erhaltenen Ereignisses aufgrund der vorübergehenden Nichtverfügbarkeit des externen Systems organisiert haben.

Moderne Anwendungen arbeiten in einer sehr komplexen Umgebung. Geschäftslogik verpackt in einem modernen Technologie-Stack, läuft in einem Docker-Image, das von einem Orchestrator wie Kubernetes oder OpenShift verwaltet wird, und kommuniziert mit anderen Anwendungen oder Unternehmenslösungen über eine Kette physischer und virtueller Router. In einer solchen Umgebung kann immer etwas kaputt gehen. Daher ist die erneute Verarbeitung von Ereignissen, wenn eines der externen Systeme nicht verfügbar ist, ein wichtiger Teil unserer Geschäftsprozesse.

Wie es vor Kafka war

Zu Beginn des Projekts haben wir IBM MQ für die asynchrone Nachrichtenübermittlung verwendet. Wenn während des Betriebs des Dienstes ein Fehler auftritt, kann die empfangene Nachricht zur weiteren manuellen Analyse in eine Warteschlange für unzustellbare Nachrichten (DLQ) gestellt werden. Die DLQ wurde neben der Eingangswarteschlange erstellt, die Nachricht wurde innerhalb von IBM MQ übertragen.

Wenn der Fehler vorübergehend war und wir ihn feststellen konnten (z. B. eine ResourceAccessException bei einem HTTP-Aufruf oder eine MongoTimeoutException bei einer MongoDb-Anfrage), würde die Wiederholungsstrategie wirksam. Unabhängig von der Verzweigungslogik der Anwendung wurde die ursprüngliche Nachricht entweder in die Systemwarteschlange zum verzögerten Senden oder in eine separate Anwendung verschoben, die vor langer Zeit zum erneuten Senden von Nachrichten erstellt wurde. Dazu gehört eine Wiederholungsnummer im Nachrichtenheader, die an das Verzögerungsintervall oder das Ende der Strategie auf Anwendungsebene gebunden ist. Wenn wir das Ende der Strategie erreicht haben, das externe System jedoch immer noch nicht verfügbar ist, wird die Nachricht zur manuellen Analyse in die DLQ gestellt.

Suchlösungen

Suche im Internetfinden Sie Folgendes Entscheidung. Kurz gesagt wird vorgeschlagen, für jedes Verzögerungsintervall ein Thema zu erstellen und nebenbei Verbraucheranwendungen zu implementieren, die Nachrichten mit der erforderlichen Verzögerung lesen.

Wiederaufbereitung der von Kafka erhaltenen Ereignisse

Trotz der Vielzahl positiver Rezensionen scheint es mir nicht ganz gelungen zu sein. Erstens, weil der Entwickler neben der Umsetzung der Geschäftsanforderungen auch viel Zeit damit verbringen muss, den beschriebenen Mechanismus zu implementieren.

Wenn die Zugriffskontrolle auf dem Kafka-Cluster aktiviert ist, müssen Sie außerdem einige Zeit damit verbringen, Themen zu erstellen und den erforderlichen Zugriff darauf bereitzustellen. Darüber hinaus müssen Sie für jedes Wiederholungsthema den richtigen Parameter „retention.ms“ auswählen, damit Nachrichten Zeit zum erneuten Senden haben und nicht daraus verschwinden. Die Implementierung und Zugriffsanfrage muss für jeden bestehenden oder neuen Dienst wiederholt werden.

Schauen wir uns nun an, welche Mechanismen Spring im Allgemeinen und Spring-Kafka im Besonderen uns für die Nachrichtenverarbeitung zur Verfügung stellen. Spring-kafka hat eine transitive Abhängigkeit von spring-retry, das Abstraktionen für die Verwaltung verschiedener BackOffPolicies bereitstellt. Dies ist ein recht flexibles Tool, dessen wesentlicher Nachteil jedoch darin besteht, dass Nachrichten zum erneuten Senden im Anwendungsspeicher gespeichert werden. Dies bedeutet, dass ein Neustart der Anwendung aufgrund eines Updates oder eines Betriebsfehlers zum Verlust aller zur erneuten Verarbeitung anstehenden Nachrichten führt. Da dieser Punkt für unser System kritisch ist, haben wir ihn nicht weiter betrachtet.

spring-kafka selbst stellt beispielsweise mehrere Implementierungen von ContainerAwareErrorHandler bereit SeekToCurrentErrorHandler, mit dem Sie im Fehlerfall die Nachricht später ohne Verschiebung des Offsets weiterverarbeiten können. Ab der Version von Spring-Kafka 2.3 war es möglich, BackOffPolicy festzulegen.

Dieser Ansatz ermöglicht, dass erneut verarbeitete Nachrichten Anwendungsneustarts überstehen, es gibt jedoch immer noch keinen DLQ-Mechanismus. Wir haben uns Anfang 2019 für diese Option entschieden und waren optimistisch davon überzeugt, dass DLQ nicht benötigt werden würde (wir hatten Glück und brauchten es tatsächlich nicht, nachdem wir die Anwendung mehrere Monate lang mit einem solchen Aufbereitungssystem betrieben hatten). Vorübergehende Fehler führten zum Auslösen von SeekToCurrentErrorHandler. Die verbleibenden Fehler wurden im Protokoll gedruckt, was zu einem Offset führte und die Verarbeitung mit der nächsten Meldung fortgesetzt wurde.

Endgültige Entscheidung

Die auf SeekToCurrentErrorHandler basierende Implementierung veranlasste uns, einen eigenen Mechanismus zum erneuten Senden von Nachrichten zu entwickeln.

Zunächst wollten wir die vorhandenen Erfahrungen nutzen und je nach Anwendungslogik erweitern. Für eine Anwendung mit linearer Logik wäre es optimal, das Lesen neuer Nachrichten für einen kurzen, durch die Wiederholungsstrategie festgelegten Zeitraum anzuhalten. Für andere Anwendungen wollte ich einen einzigen Punkt haben, der die Wiederholungsstrategie durchsetzt. Darüber hinaus muss dieser einzelne Punkt für beide Ansätze über DLQ-Funktionalität verfügen.

Die Wiederholungsstrategie selbst muss in der Anwendung hinterlegt sein, die für das Abrufen des nächsten Intervalls verantwortlich ist, wenn ein vorübergehender Fehler auftritt.

Stoppen des Verbrauchers für eine lineare Logikanwendung

Bei der Arbeit mit Spring-Kafka könnte der Code zum Stoppen des Consumers etwa so aussehen:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Im Beispiel ist retryAt die Zeit zum Neustarten des MessageListenerContainer, wenn er noch ausgeführt wird. Der Neustart erfolgt in einem separaten Thread, der im TaskScheduler gestartet wird und dessen Implementierung ebenfalls von Spring bereitgestellt wird.

Wir finden den retryAt-Wert auf folgende Weise:

  1. Der Wert des Rückrufzählers wird nachgeschlagen.
  2. Basierend auf dem Zählerwert wird das aktuelle Verzögerungsintervall in der Wiederholungsstrategie gesucht. Die Strategie wird in der Anwendung selbst deklariert; wir haben das JSON-Format zum Speichern gewählt.
  3. Das im JSON-Array gefundene Intervall enthält die Anzahl der Sekunden, nach denen die Verarbeitung wiederholt werden muss. Diese Sekundenzahl wird zur aktuellen Zeit addiert, um den Wert für retryAt zu bilden.
  4. Wenn das Intervall nicht gefunden wird, ist der Wert von retryAt null und die Nachricht wird zur manuellen Analyse an DLQ gesendet.

Bei diesem Ansatz bleibt nur noch, die Anzahl der wiederholten Aufrufe für jede Nachricht, die gerade verarbeitet wird, beispielsweise im Anwendungsspeicher zu speichern. Für diesen Ansatz ist es nicht entscheidend, die Anzahl der Wiederholungen im Speicher zu belassen, da eine Anwendung mit linearer Logik die Verarbeitung nicht als Ganzes bewältigen kann. Im Gegensatz zum Spring-Retry führt ein Neustart der Anwendung nicht dazu, dass alle verlorenen Nachrichten erneut verarbeitet werden, sondern die Strategie wird einfach neu gestartet.

Dieser Ansatz trägt dazu bei, das externe System zu entlasten, das aufgrund einer sehr hohen Auslastung möglicherweise nicht verfügbar ist. Mit anderen Worten: Zusätzlich zur Wiederaufbereitung haben wir die Umsetzung des Musters erreicht Schutzschalter.

In unserem Fall beträgt die Fehlerschwelle nur 1, und um Systemausfallzeiten aufgrund vorübergehender Netzwerkausfälle zu minimieren, verwenden wir eine sehr granulare Wiederholungsstrategie mit kleinen Latenzintervallen. Dies ist möglicherweise nicht für alle Gruppenanwendungen geeignet, daher muss die Beziehung zwischen der Fehlerschwelle und dem Intervallwert basierend auf den Eigenschaften des Systems ausgewählt werden.

Eine separate Anwendung zur Verarbeitung von Nachrichten von Anwendungen mit nicht deterministischer Logik

Hier ist ein Beispiel für Code, der eine Nachricht an eine solche Anwendung (Retryer) sendet, die bei Erreichen der RETRY_AT-Zeit erneut an das DESTINATION-Thema gesendet wird:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

Das Beispiel zeigt, dass viele Informationen in Headern übertragen werden. Der Wert von RETRY_AT wird auf die gleiche Weise wie für den Wiederholungsmechanismus über den Consumer-Stopp ermittelt. Zusätzlich zu DESTINATION und RETRY_AT übergeben wir:

  • GROUP_ID, nach der wir Nachrichten zur manuellen Analyse und vereinfachten Suche gruppieren.
  • ORIGINAL_PARTITION, um zu versuchen, denselben Verbraucher für die erneute Verarbeitung beizubehalten. Dieser Parameter kann null sein. In diesem Fall wird die neue Partition mithilfe des Schlüssels „record.key()“ der ursprünglichen Nachricht abgerufen.
  • Der COUNTER-Wert wurde aktualisiert, um der Wiederholungsstrategie zu folgen.
  • SEND_TO ist eine Konstante, die angibt, ob die Nachricht bei Erreichen von RETRY_AT zur erneuten Verarbeitung gesendet oder in DLQ platziert wird.
  • REASON – der Grund, warum die Nachrichtenverarbeitung unterbrochen wurde.

Retryer speichert Nachrichten zum erneuten Senden und manuellen Parsen in PostgreSQL. Ein Timer startet eine Aufgabe, die Nachrichten mit RETRY_AT findet und sie mit dem Schlüssel record.key() an die ORIGINAL_PARTITION-Partition des DESTINATION-Themas zurücksendet.

Nach dem Senden werden Nachrichten aus PostgreSQL gelöscht. Das manuelle Parsen von Nachrichten erfolgt in einer einfachen Benutzeroberfläche, die über die REST-API mit Retryer interagiert. Seine Hauptfunktionen sind das erneute Senden oder Löschen von Nachrichten aus DLQ, das Anzeigen von Fehlerinformationen und das Suchen nach Nachrichten, beispielsweise anhand des Fehlernamens.

Da die Zugriffskontrolle auf unseren Clustern aktiviert ist, ist es notwendig, zusätzlich Zugriff auf das Thema anzufordern, das Retryer abhört, und Retryer zu erlauben, in das DESTINATION-Thema zu schreiben. Dies ist unpraktisch, aber im Gegensatz zum Intervall-Topic-Ansatz verfügen wir über einen vollwertigen DLQ und eine Benutzeroberfläche, um dies zu verwalten.

Es gibt Fälle, in denen ein eingehendes Thema von mehreren verschiedenen Verbrauchergruppen gelesen wird, deren Anwendungen unterschiedliche Logik implementieren. Die erneute Verarbeitung einer Nachricht über Retryer für eine dieser Anwendungen führt zu einem Duplikat in der anderen. Um uns davor zu schützen, erstellen wir ein eigenes Thema zur Nachbearbeitung. Die eingehenden und erneuten Versuchsthemen können vom selben Verbraucher ohne Einschränkungen gelesen werden.

Wiederaufbereitung der von Kafka erhaltenen Ereignisse

Standardmäßig bietet dieser Ansatz keine Leistungsschalterfunktion, kann jedoch mithilfe von zur Anwendung hinzugefügt werden Frühlingswolken-Netflix oder neu Spring Cloud-Leistungsschalter, indem es die Stellen, an denen externe Dienste aufgerufen werden, in entsprechende Abstraktionen einschließt. Darüber hinaus wird es möglich, eine Strategie zu wählen Schott Muster, das auch nützlich sein kann. In Spring-Cloud-Netflix könnte dies beispielsweise ein Thread-Pool oder ein Semaphor sein.

Abschluss

Daher verfügen wir über eine separate Anwendung, die es uns ermöglicht, die Nachrichtenverarbeitung zu wiederholen, wenn ein externes System vorübergehend nicht verfügbar ist.

Einer der Hauptvorteile der Anwendung besteht darin, dass sie von externen Systemen, die auf demselben Kafka-Cluster laufen, ohne nennenswerte Änderungen ihrerseits verwendet werden kann! Eine solche Anwendung muss lediglich auf das Retry-Thema zugreifen, einige Kafka-Header ausfüllen und eine Nachricht an den Retryer senden. Es besteht keine Notwendigkeit, zusätzliche Infrastruktur aufzubauen. Und um die Anzahl der von der Anwendung zum Retryer und zurück übertragenen Nachrichten zu reduzieren, haben wir Anwendungen mit linearer Logik identifiziert und sie über den Consumer-Stopp erneut verarbeitet.

Source: habr.com

Kommentar hinzufügen