Reprocesamiento de eventos recibidos de Kafka

Reprocesamiento de eventos recibidos de Kafka

Hola Habr.

Recientemente, yo compartió su experiencia sobre qué parámetros utilizamos con más frecuencia como equipo para que Kafka Producer and Consumer se acerque a la entrega garantizada. En este artículo quiero contarles cómo organizamos el reprocesamiento de un evento recibido de Kafka como resultado de la indisponibilidad temporal del sistema externo.

Las aplicaciones modernas operan en un entorno muy complejo. Lógica empresarial envuelta en una pila de tecnología moderna, ejecutándose en una imagen de Docker administrada por un orquestador como Kubernetes u OpenShift, y comunicándose con otras aplicaciones o soluciones empresariales a través de una cadena de enrutadores físicos y virtuales. En un entorno así, algo siempre puede romperse, por lo que reprocesar eventos si uno de los sistemas externos no está disponible es una parte importante de nuestros procesos comerciales.

Cómo era antes de Kafka

Al principio del proyecto utilizamos IBM MQ para la entrega de mensajes asíncronos. Si se produjera algún error durante la operación del servicio, el mensaje recibido podría colocarse en una cola de mensajes fallidos (DLQ) para su posterior análisis manual. El DLQ se creó junto a la cola entrante y el mensaje se transfirió dentro de IBM MQ.

Si el error fuera temporal y pudiéramos determinarlo (por ejemplo, una ResourceAccessException en una llamada HTTP o una MongoTimeoutException en una solicitud de MongoDb), entonces la estrategia de reintento tendría efecto. Independientemente de la lógica de bifurcación de la aplicación, el mensaje original se movió a la cola del sistema para envío retrasado o a una aplicación separada que se creó hace mucho tiempo para reenviar mensajes. Esto incluye un número de reenvío en el encabezado del mensaje, que está vinculado al intervalo de retraso o al final de la estrategia a nivel de aplicación. Si hemos llegado al final de la estrategia pero el sistema externo aún no está disponible, el mensaje se colocará en el DLQ para su análisis manual.

Las soluciones de búsqueda

Buscando en Internet, puedes encontrar lo siguiente decisión. En resumen, se propone crear un tema para cada intervalo de retraso e implementar aplicaciones de consumo en el lateral, que leerán los mensajes con el retraso requerido.

Reprocesamiento de eventos recibidos de Kafka

A pesar de la gran cantidad de críticas positivas, no me parece del todo exitoso. En primer lugar, porque el desarrollador, además de implementar los requisitos comerciales, tendrá que dedicar mucho tiempo a implementar el mecanismo descrito.

Además, si el control de acceso está habilitado en el clúster de Kafka, tendrá que dedicar algún tiempo a crear temas y proporcionarles el acceso necesario. Además de esto, deberá seleccionar el parámetro retención.ms correcto para cada uno de los temas de reintento para que los mensajes tengan tiempo de reenviarse y no desaparezcan. La implementación y solicitud de acceso deberá repetirse para cada servicio nuevo o existente.

Veamos ahora qué mecanismos nos proporciona spring en general y spring-kafka en particular para el reprocesamiento de mensajes. Spring-kafka tiene una dependencia transitiva de spring-retry, que proporciona abstracciones para gestionar diferentes BackOffPolicies. Esta es una herramienta bastante flexible, pero su inconveniente importante es almacenar mensajes para reenviarlos en la memoria de la aplicación. Esto significa que reiniciar la aplicación debido a una actualización o un error operativo resultará en la pérdida de todos los mensajes pendientes de reprocesamiento. Dado que este punto es crítico para nuestro sistema, no lo consideramos más a fondo.

spring-kafka proporciona varias implementaciones de ContainerAwareErrorHandler, por ejemplo SeekToCurrentErrorHandler, con el que podrás procesar el mensaje más tarde sin cambiar el desplazamiento en caso de error. A partir de la versión de spring-kafka 2.3, fue posible configurar BackOffPolicy.

Este enfoque permite que los mensajes reprocesados ​​sobrevivan a los reinicios de la aplicación, pero todavía no existe un mecanismo DLQ. Elegimos esta opción a principios de 2019, creyendo con optimismo que DLQ no sería necesario (tuvimos suerte y, de hecho, no lo necesitábamos después de varios meses de operar la aplicación con dicho sistema de reprocesamiento). Los errores temporales provocaron que SeekToCurrentErrorHandler se activara. Los errores restantes se imprimieron en el registro, lo que generó una compensación y el procesamiento continuó con el siguiente mensaje.

Decisión definitiva

La implementación basada en SeekToCurrentErrorHandler nos impulsó a desarrollar nuestro propio mecanismo para reenviar mensajes.

En primer lugar, queríamos utilizar la experiencia existente y ampliarla según la lógica de la aplicación. Para una aplicación de lógica lineal, sería óptimo dejar de leer mensajes nuevos durante un breve período de tiempo especificado por la estrategia de reintento. Para otras aplicaciones, quería tener un punto único que aplicara la estrategia de reintento. Además, este único punto debe tener funcionalidad DLQ para ambos enfoques.

La estrategia de reintento en sí debe almacenarse en la aplicación, que es responsable de recuperar el siguiente intervalo cuando ocurre un error temporal.

Detener al consumidor para una aplicación de lógica lineal

Cuando se trabaja con spring-kafka, el código para detener al consumidor podría verse así:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

En el ejemplo, retryAt es el momento de reiniciar MessageListenerContainer si aún se está ejecutando. El relanzamiento se producirá en un hilo separado iniciado en TaskScheduler, cuya implementación también se proporciona en Spring.

Encontramos el valor retryAt de la siguiente manera:

  1. Se busca el valor del contador de recuperación.
  2. Según el valor del contador, se busca el intervalo de retraso actual en la estrategia de reintento. La estrategia se declara en la propia aplicación, elegimos el formato JSON para almacenarla.
  3. El intervalo que se encuentra en la matriz JSON contiene la cantidad de segundos después de los cuales será necesario repetir el procesamiento. Esta cantidad de segundos se agrega a la hora actual para formar el valor de retryAt.
  4. Si no se encuentra el intervalo, entonces el valor de retryAt es nulo y el mensaje se enviará a DLQ para su análisis manual.

Con este enfoque, lo único que queda es guardar el número de llamadas repetidas para cada mensaje que se está procesando actualmente, por ejemplo en la memoria de la aplicación. Mantener el recuento de reintentos en la memoria no es fundamental para este enfoque, ya que una aplicación de lógica lineal no puede manejar el procesamiento en su totalidad. A diferencia del reintento de primavera, reiniciar la aplicación no hará que todos los mensajes se pierdan para volver a procesarse, sino que simplemente reiniciará la estrategia.

Este enfoque ayuda a aliviar la carga del sistema externo, que puede no estar disponible debido a una carga muy pesada. Es decir, además del reprocesamiento, logramos la implementación del patrón cortacircuitos.

En nuestro caso, el umbral de error es solo 1 y, para minimizar el tiempo de inactividad del sistema debido a cortes temporales de la red, utilizamos una estrategia de reintento muy granular con pequeños intervalos de latencia. Es posible que esto no sea adecuado para todas las aplicaciones grupales, por lo que la relación entre el umbral de error y el valor del intervalo debe seleccionarse en función de las características del sistema.

Una aplicación separada para procesar mensajes de aplicaciones con lógica no determinista

A continuación se muestra un ejemplo de código que envía un mensaje a dicha aplicación (Retryer), que se reenviará al tema DESTINATION cuando se alcance el tiempo RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

El ejemplo muestra que mucha información se transmite en los encabezados. El valor de RETRY_AT se encuentra de la misma manera que para el mecanismo de reintento mediante la parada del Consumidor. Además de DESTINATION y RETRY_AT pasamos:

  • GROUP_ID, mediante el cual agrupamos mensajes para análisis manual y búsqueda simplificada.
  • ORIGINAL_PARTITION para intentar conservar el mismo consumidor para volver a procesarlo. Este parámetro puede ser nulo, en cuyo caso la nueva partición se obtendrá utilizando la clave record.key() del mensaje original.
  • Valor CONTADOR actualizado para seguir la estrategia de reintento.
  • SEND_TO es una constante que indica si el mensaje se envía para reprocesamiento al llegar a RETRY_AT o se coloca en DLQ.
  • MOTIVO: el motivo por el que se interrumpió el procesamiento del mensaje.

Retryer almacena mensajes para reenviarlos y analizarlos manualmente en PostgreSQL. Un temporizador inicia una tarea que encuentra mensajes con RETRY_AT y los envía de regreso a la partición ORIGINAL_PARTITION del tema DESTINATION con la clave record.key().

Una vez enviados, los mensajes se eliminan de PostgreSQL. El análisis manual de mensajes se produce en una interfaz de usuario simple que interactúa con Retryer a través de la API REST. Sus características principales son reenviar o eliminar mensajes de DLQ, ver información de errores y buscar mensajes, por ejemplo por nombre de error.

Dado que el control de acceso está habilitado en nuestros clústeres, es necesario solicitar adicionalmente acceso al tema que Retryer está escuchando y permitir que Retryer escriba en el tema de DESTINO. Esto es un inconveniente, pero, a diferencia del enfoque de temas de intervalo, tenemos un DLQ y una interfaz de usuario completos para administrarlo.

Hay casos en los que un tema entrante es leído por varios grupos de consumidores diferentes, cuyas aplicaciones implementan una lógica diferente. Reprocesar un mensaje a través de Retryer para una de estas aplicaciones dará como resultado un duplicado en la otra. Para protegernos contra esto, creamos un tema separado para el reprocesamiento. El mismo consumidor puede leer los temas entrantes y de reintento sin ninguna restricción.

Reprocesamiento de eventos recibidos de Kafka

De forma predeterminada, este enfoque no proporciona la funcionalidad de disyuntor; sin embargo, se puede agregar a la aplicación usando nube-de-primavera-netflix o nuevo disyuntor de nube de primavera, envolviendo los lugares donde se llaman los servicios externos en abstracciones apropiadas. Además, es posible elegir una estrategia para mamparo patrón, que también puede ser útil. Por ejemplo, en spring-cloud-netflix esto podría ser un grupo de subprocesos o un semáforo.

conclusión

Como resultado, tenemos una aplicación separada que nos permite repetir el procesamiento de mensajes si algún sistema externo no está disponible temporalmente.

Una de las principales ventajas de la aplicación es que puede ser utilizada por sistemas externos que se ejecutan en el mismo clúster de Kafka, ¡sin modificaciones significativas por su parte! Una aplicación de este tipo sólo necesitará acceder al tema de reintento, completar algunos encabezados de Kafka y enviar un mensaje al reintento. No es necesario levantar ninguna infraestructura adicional. Y para reducir la cantidad de mensajes transferidos desde la aplicación a Retryer y viceversa, identificamos aplicaciones con lógica lineal y las reprocesamos a través de la parada del Consumidor.

Fuente: habr.com

Añadir un comentario