Événements de retraitement reçus de Kafka

Événements de retraitement reçus de Kafka

Hé Habr.

Récemment, je a partagé son expérience sur les paramètres que nous, en tant qu'équipe, utilisons le plus souvent pour Kafka Producer et Consumer afin de nous rapprocher de la livraison garantie. Dans cet article je souhaite vous raconter comment nous avons organisé le retraitement d'un événement reçu de Kafka suite à une indisponibilité temporaire du système externe.

Les applications modernes fonctionnent dans un environnement très complexe. Logique métier enveloppée dans une pile technologique moderne, exécutée dans une image Docker gérée par un orchestrateur tel que Kubernetes ou OpenShift, et communiquant avec d'autres applications ou solutions d'entreprise via une chaîne de routeurs physiques et virtuels. Dans un tel environnement, quelque chose peut toujours tomber en panne, c'est pourquoi le retraitement des événements si l'un des systèmes externes n'est pas disponible constitue une partie importante de nos processus métier.

Comment c'était avant Kafka

Plus tôt dans le projet, nous avons utilisé IBM MQ pour la livraison asynchrone de messages. Si une erreur se produisait pendant le fonctionnement du service, le message reçu pourrait être placé dans une file d'attente de lettres mortes (DLQ) pour une analyse manuelle ultérieure. Le DLQ a été créé à côté de la file d'attente entrante, le message a été transféré dans IBM MQ.

Si l'erreur était temporaire et que nous pouvions la déterminer (par exemple, une ResourceAccessException sur un appel HTTP ou une MongoTimeoutException sur une requête MongoDb), alors la stratégie de nouvelle tentative prendrait effet. Quelle que soit la logique de branchement de l'application, le message d'origine a été déplacé soit vers la file d'attente du système pour un envoi différé, soit vers une application distincte conçue il y a longtemps pour renvoyer les messages. Cela inclut un numéro de renvoi dans l'en-tête du message, qui est lié à l'intervalle de délai ou à la fin de la stratégie au niveau de l'application. Si nous avons atteint la fin de la stratégie mais que le système externe est toujours indisponible, alors le message sera placé dans le DLQ pour analyse manuelle.

solutions de recherche

Recherche sur Internet, vous pouvez trouver ce qui suit décision. En bref, il est proposé de créer un sujet pour chaque intervalle de délai et d'implémenter des applications grand public qui liront les messages avec le délai requis.

Événements de retraitement reçus de Kafka

Malgré le grand nombre de critiques positives, cela ne me semble pas entièrement réussi. Tout d'abord, parce que le développeur, en plus de mettre en œuvre les exigences métier, devra consacrer beaucoup de temps à la mise en œuvre du mécanisme décrit.

De plus, si le contrôle d'accès est activé sur le cluster Kafka, vous devrez passer du temps à créer des sujets et à y fournir l'accès nécessaire. En plus de cela, vous devrez sélectionner le bon paramètre retention.ms pour chacun des sujets de nouvelle tentative afin que les messages aient le temps d'être renvoyés et n'en disparaissent pas. La mise en œuvre et la demande d'accès devront être répétées pour chaque service existant ou nouveau.

Voyons maintenant quels mécanismes spring en général et spring-kafka en particulier nous fournissent pour le retraitement des messages. Spring-kafka a une dépendance transitive sur spring-retry, qui fournit des abstractions pour gérer différentes BackOffPolicies. Il s'agit d'un outil assez flexible, mais son inconvénient majeur est le stockage des messages à renvoyer dans la mémoire de l'application. Cela signifie que le redémarrage de l'application suite à une mise à jour ou à une erreur de fonctionnement entraînera la perte de tous les messages en attente de retraitement. Ce point étant critique pour notre système, nous ne l’avons pas approfondi.

spring-kafka lui-même fournit plusieurs implémentations de ContainerAwareErrorHandler, par exemple SeekToCurrentErrorHandler, avec lequel vous pouvez traiter le message ultérieurement sans décaler le décalage en cas d'erreur. À partir de la version spring-kafka 2.3, il est devenu possible de définir BackOffPolicy.

Cette approche permet aux messages retraités de survivre aux redémarrages de l'application, mais il n'existe toujours pas de mécanisme DLQ. Nous avons choisi cette option début 2019, pensant avec optimisme que DLQ ne serait pas nécessaire (nous avons eu de la chance et n'en avons effectivement pas eu besoin après plusieurs mois d'exploitation de l'application avec un tel système de retraitement). Des erreurs temporaires ont provoqué le déclenchement de SeekToCurrentErrorHandler. Les erreurs restantes ont été imprimées dans le journal, entraînant un décalage, et le traitement s'est poursuivi avec le message suivant.

Décision finale

L'implémentation basée sur SeekToCurrentErrorHandler nous a incité à développer notre propre mécanisme de renvoi de messages.

Tout d’abord, nous souhaitions utiliser l’expérience existante et l’étendre en fonction de la logique applicative. Pour une application de logique linéaire, il serait optimal d’arrêter la lecture des nouveaux messages pendant une courte période spécifiée par la stratégie de nouvelle tentative. Pour les autres applications, je voulais avoir un seul point qui appliquerait la stratégie de nouvelle tentative. De plus, ce point unique doit disposer de la fonctionnalité DLQ pour les deux approches.

La stratégie de nouvelle tentative elle-même doit être stockée dans l'application, qui est chargée de récupérer l'intervalle suivant lorsqu'une erreur temporaire se produit.

Arrêter le consommateur pour une application logique linéaire

Lorsque vous travaillez avec spring-kafka, le code pour arrêter le consommateur peut ressembler à ceci :

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

Dans l'exemple, retryAt est l'heure de redémarrage du MessageListenerContainer s'il est toujours en cours d'exécution. La relance aura lieu dans un thread distinct lancé dans TaskScheduler, dont la mise en œuvre est également prévue d'ici Spring.

Nous trouvons la valeur retryAt de la manière suivante :

  1. La valeur du compteur de rappel est recherchée.
  2. Sur la base de la valeur du compteur, l'intervalle de retard actuel dans la stratégie de nouvelle tentative est recherché. La stratégie est déclarée dans l'application elle-même ; nous avons choisi le format JSON pour la stocker.
  3. L'intervalle trouvé dans le tableau JSON contient le nombre de secondes après lequel le traitement devra être répété. Ce nombre de secondes est ajouté à l'heure actuelle pour former la valeur de retryAt.
  4. Si l'intervalle n'est pas trouvé, alors la valeur de retryAt est nulle et le message sera envoyé à DLQ pour analyse manuelle.

Avec cette approche, il ne reste plus qu'à sauvegarder le nombre d'appels répétés pour chaque message en cours de traitement, par exemple dans la mémoire de l'application. Conserver le nombre de tentatives en mémoire n'est pas essentiel pour cette approche, car une application de logique linéaire ne peut pas gérer le traitement dans son ensemble. Contrairement au spring-retry, le redémarrage de l'application n'entraînera pas la perte du retraitement de tous les messages, mais redémarrera simplement la stratégie.

Cette approche permet de soulager le système externe, qui peut être indisponible en raison d'une charge très importante. Autrement dit, en plus du retraitement, nous avons réalisé la mise en œuvre du patron Disjoncteur.

Dans notre cas, le seuil d'erreur n'est que de 1, et pour minimiser les temps d'arrêt du système dus à des pannes temporaires du réseau, nous utilisons une stratégie de nouvelle tentative très granulaire avec de petits intervalles de latence. Cela peut ne pas convenir à toutes les applications de groupe, c'est pourquoi la relation entre le seuil d'erreur et la valeur d'intervalle doit être sélectionnée en fonction des caractéristiques du système.

Une application distincte pour traiter les messages provenant d'applications avec une logique non déterministe

Voici un exemple de code qui envoie un message à une telle application (Retryer), qui le renverra au sujet DESTINATION lorsque l'heure RETRY_AT sera atteinte :


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

L'exemple montre que de nombreuses informations sont transmises dans les en-têtes. La valeur de RETRY_AT est trouvée de la même manière que pour le mécanisme de nouvelle tentative via l'arrêt Consommateur. En plus de DESTINATION et RETRY_AT on passe :

  • GROUP_ID, par lequel nous regroupons les messages pour une analyse manuelle et une recherche simplifiée.
  • ORIGINAL_PARTITION pour essayer de conserver le même consommateur pour le retraitement. Ce paramètre peut être nul, auquel cas la nouvelle partition sera obtenue à l'aide de la clé record.key() du message d'origine.
  • Valeur COUNTER mise à jour pour suivre la stratégie de nouvelle tentative.
  • SEND_TO est une constante indiquant si le message est envoyé pour retraitement lorsqu'il atteint RETRY_AT ou placé dans DLQ.
  • RAISON - la raison pour laquelle le traitement du message a été interrompu.

Retryer stocke les messages à renvoyer et à analyser manuellement dans PostgreSQL. Un timer démarre une tâche qui recherche les messages avec RETRY_AT et les renvoie à la partition ORIGINAL_PARTITION du sujet DESTINATION avec la clé record.key().

Une fois envoyés, les messages sont supprimés de PostgreSQL. L'analyse manuelle des messages s'effectue dans une interface utilisateur simple qui interagit avec Retryer via l'API REST. Ses principales fonctionnalités sont le renvoi ou la suppression de messages de DLQ, l'affichage des informations sur les erreurs et la recherche de messages, par exemple par nom d'erreur.

Le contrôle d'accès étant activé sur nos clusters, il est nécessaire de demander en plus l'accès au sujet que Retryer écoute et de permettre à Retryer d'écrire dans le sujet DESTINATION. Ceci n’est pas pratique, mais contrairement à l’approche thématique par intervalles, nous disposons d’un DLQ et d’une interface utilisateur à part entière pour le gérer.

Il existe des cas où un sujet entrant est lu par plusieurs groupes de consommateurs différents, dont les applications implémentent une logique différente. Le retraitement d'un message via Retryer pour l'une de ces applications entraînera un doublon pour l'autre. Pour nous protéger contre cela, nous créons un sujet distinct pour le retraitement. Les sujets entrants et de nouvelle tentative peuvent être lus par le même consommateur sans aucune restriction.

Événements de retraitement reçus de Kafka

Par défaut, cette approche ne fournit pas de fonctionnalité de disjoncteur, mais elle peut être ajoutée à l'application en utilisant printemps-nuage-netflix ou neuf disjoncteur nuage de printemps, enveloppant les endroits où les services externes sont appelés dans des abstractions appropriées. De plus, il devient possible de choisir une stratégie pour cloison modèle, qui peut également être utile. Par exemple, dans spring-cloud-netflix, cela pourrait être un pool de threads ou un sémaphore.

conclusion

En conséquence, nous disposons d'une application distincte qui nous permet de répéter le traitement des messages si un système externe est temporairement indisponible.

L’un des principaux avantages de l’application est qu’elle peut être utilisée par des systèmes externes fonctionnant sur le même cluster Kafka, sans modifications significatives de leur part ! Une telle application n'aura qu'à accéder au sujet de nouvelle tentative, à remplir quelques en-têtes Kafka et à envoyer un message au Retryer. Il n’est pas nécessaire de construire des infrastructures supplémentaires. Et afin de réduire le nombre de messages transférés de l'application vers Retryer et inversement, nous avons identifié les applications avec une logique linéaire et les avons retraitées via l'arrêt Consumer.

Source: habr.com

Ajouter un commentaire