Reprocesando eventos recibidos de Kafka

Reprocesando eventos recibidos de Kafka

Ola Habr.

Recentemente eu compartiu a súa experiencia sobre os parámetros que usamos como equipo para que o produtor e o consumidor de Kafka se acheguen á entrega garantida. Neste artigo quero contarvos como organizamos o reprocesamento dun evento recibido de Kafka como consecuencia da indisponibilidade temporal do sistema externo.

As aplicacións modernas funcionan nun ambiente moi complexo. Lóxica empresarial envolta nunha pila de tecnoloxía moderna, que se executa nunha imaxe de Docker xestionada por un orquestrador como Kubernetes ou OpenShift e que se comunica con outras aplicacións ou solucións empresariais a través dunha cadea de enrutadores físicos e virtuais. Neste ambiente, algo sempre pode romper, polo que reprocesar eventos se un dos sistemas externos non está dispoñible é unha parte importante dos nosos procesos de negocio.

Como era antes de Kafka

Anteriormente no proxecto utilizamos IBM MQ para a entrega de mensaxes asíncronas. Se se produciu algún erro durante o funcionamento do servizo, a mensaxe recibida podería colocarse nunha cola de cartas mortas (DLQ) para unha posterior análise manual. O DLQ creouse xunto á cola de entrada, a mensaxe foi transferida dentro de IBM MQ.

Se o erro fose temporal e puidésemos determinalo (por exemplo, unha ResourceAccessException nunha chamada HTTP ou unha MongoTimeoutException nunha solicitude MongoDb), a estratexia de reintento entraría en vigor. Independentemente da lóxica de ramificación da aplicación, a mensaxe orixinal moveuse á cola do sistema para o envío atrasado ou a unha aplicación separada que se fixo hai moito tempo para reenviar mensaxes. Isto inclúe un número de reenvío na cabeceira da mensaxe, que está ligado ao intervalo de atraso ou ao final da estratexia a nivel de aplicación. Se chegamos ao final da estratexia pero o sistema externo aínda non está dispoñible, entón a mensaxe colocarase no DLQ para a análise manual.

Atopar unha solución

Buscando en Internet, podes atopar o seguinte decisión. En definitiva, proponse crear un tema para cada intervalo de atraso e implementar ao lado aplicacións Consumer, que lerán as mensaxes coa demora requirida.

Reprocesando eventos recibidos de Kafka

A pesar da gran cantidade de críticas positivas, paréceme que non ten éxito. En primeiro lugar, porque o desenvolvedor, ademais de implementar os requisitos comerciais, terá que pasar moito tempo implementando o mecanismo descrito.

Ademais, se o control de acceso está activado no clúster de Kafka, terás que dedicar algún tempo a crear temas e proporcionar o acceso necesario a eles. Ademais disto, terás que seleccionar o parámetro retention.ms correcto para cada un dos temas de reintento para que as mensaxes teñan tempo de reenviarse e non desaparezan del. A implantación e solicitude de acceso terá que repetirse para cada servizo existente ou novo.

Vexamos agora que mecanismos nos proporcionan spring en xeral e spring-kafka en particular para o reprocesamento de mensaxes. Spring-kafka ten unha dependencia transitiva de spring-retry, que proporciona abstraccións para xestionar diferentes BackOffPolicies. Esta é unha ferramenta bastante flexible, pero o seu inconveniente importante é almacenar mensaxes para reenviar na memoria da aplicación. Isto significa que reiniciar a aplicación debido a unha actualización ou un erro operativo provocará a perda de todas as mensaxes pendentes de reprocesar. Dado que este punto é fundamental para o noso sistema, non o consideramos máis.

Spring-kafka proporciona varias implementacións de ContainerAwareErrorHandler, por exemplo SeekToCurrentErrorHandler, co que pode procesar a mensaxe máis tarde sen desprazar a compensación en caso de erro. Comezando coa versión de spring-kafka 2.3, fíxose posible establecer BackOffPolicy.

Este enfoque permite que as mensaxes reprocesadas sobrevivan aos reinicios das aplicacións, pero aínda non hai mecanismo DLQ. Escollemos esta opción a principios de 2019, crendo con optimismo que DLQ non sería necesario (tivemos sorte e, de feito, non o necesitamos despois de varios meses de operar a aplicación cun sistema de reprocesamento deste tipo). Erros temporais provocaron que SeekToCurrentErrorHandler se disparase. Os erros restantes foron impresos no rexistro, o que resultou nunha compensación, e o procesamento continuou coa seguinte mensaxe.

Decisión final

A implementación baseada en SeekToCurrentErrorHandler levounos a desenvolver o noso propio mecanismo para reenviar mensaxes.

En primeiro lugar, queriamos utilizar a experiencia existente e ampliala dependendo da lóxica da aplicación. Para unha aplicación de lóxica lineal, sería óptimo deixar de ler novas mensaxes durante un curto período de tempo especificado pola estratexia de reintento. Para outras aplicacións, quería ter un único punto que aplicase a estratexia de reintento. Ademais, este único punto debe ter funcionalidade DLQ para ambos enfoques.

A propia estratexia de reintento debe almacenarse na aplicación, que se encarga de recuperar o seguinte intervalo cando se produza un erro temporal.

Deter o consumidor para unha aplicación de lóxica lineal

Cando se traballa con spring-kafka, o código para deter o Consumidor pode verse así:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

No exemplo, retryAt é o momento de reiniciar o MessageListenerContainer se aínda está en execución. O reinicio producirase nun fío separado lanzado en TaskScheduler, cuxa implementación tamén se proporciona en primavera.

Atopamos o valor retryAt do seguinte xeito:

  1. Búscase o valor do contador de recuperación.
  2. En función do valor do contador, búscase o intervalo de atraso actual na estratexia de reintento. A estratexia declárase na propia aplicación; escollemos o formato JSON para almacenala.
  3. O intervalo atopado na matriz JSON contén o número de segundos despois dos cales o procesamento deberá repetirse. Este número de segundos engádese ao tempo actual para formar o valor para retryAt.
  4. Se non se atopa o intervalo, entón o valor de retryAt é nulo e a mensaxe enviarase a DLQ para a análise manual.

Con este enfoque, só resta gardar o número de chamadas repetidas por cada mensaxe que se está a procesar actualmente, por exemplo na memoria da aplicación. Manter o reconto de reintentos na memoria non é fundamental para este enfoque, xa que unha aplicación de lóxica lineal non pode xestionar o procesamento no seu conxunto. A diferenza do spring-retry, o reinicio da aplicación non fará que se reprocesen todas as mensaxes que se perdan, senón que simplemente reiniciará a estratexia.

Este enfoque axuda a quitar a carga do sistema externo, que pode non estar dispoñible debido a unha carga moi pesada. Noutras palabras, ademais do reprocesamento, conseguimos a implantación do patrón interruptor.

No noso caso, o limiar de erro é só 1, e para minimizar o tempo de inactividade do sistema debido ás interrupcións temporais da rede, utilizamos unha estratexia de reintento moi granular con pequenos intervalos de latencia. Isto pode non ser adecuado para todas as aplicacións de grupo, polo que a relación entre o limiar de erro e o valor do intervalo debe seleccionarse en función das características do sistema.

Unha aplicación separada para procesar mensaxes de aplicacións con lóxica non determinista

Aquí tes un exemplo de código que envía unha mensaxe a unha aplicación deste tipo (Reintento), que se reenviará ao tema DESTINATION cando se alcance a hora RETRY_AT:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

O exemplo mostra que se transmite moita información nas cabeceiras. O valor de RETRY_AT atópase do mesmo xeito que para o mecanismo de reintento a través da parada do consumidor. Ademais de DESTINATION e RETRY_AT pasamos:

  • GROUP_ID, mediante o cal agrupamos mensaxes para a análise manual e a busca simplificada.
  • ORIGINAL_PARTITION para tentar manter o mesmo consumidor para volver procesar. Este parámetro pode ser nulo, nese caso a nova partición obterase mediante a clave record.key() da mensaxe orixinal.
  • Actualizouse o valor COUNTER para seguir a estratexia de reintento.
  • SEND_TO é unha constante que indica se a mensaxe se envía para reprocesar ao chegar a RETRY_AT ou se coloca en DLQ.
  • REASON: o motivo polo que se interrompeu o procesamento da mensaxe.

Retryer almacena mensaxes para reenviar e analizar manualmente en PostgreSQL. Un temporizador inicia unha tarefa que busca mensaxes con RETRY_AT e envíaas de volta á partición ORIGINAL_PARTITION do tema DESTINATION coa clave record.key().

Unha vez enviadas, as mensaxes elimínanse de PostgreSQL. A análise manual de mensaxes prodúcese nunha IU sinxela que interactúa con Retryer a través da API REST. As súas principais características son reenviar ou eliminar mensaxes de DLQ, ver información de erros e buscar mensaxes, por exemplo polo nome do erro.

Dado que o control de acceso está activado nos nosos clústeres, é necesario solicitar adicionalmente acceso ao tema que está escoitando Retryer e permitir que Retryer escriba no tema DESTINO. Isto é un inconveniente, pero, a diferenza do enfoque de temas de intervalo, temos un DLQ e unha interface de usuario completos para xestionalo.

Hai casos nos que un tema entrante é lido por varios grupos de consumidores diferentes, cuxas aplicacións implementan unha lóxica diferente. Reprocesar unha mensaxe a través de Retryer para unha destas aplicacións producirá un duplicado na outra. Para protexernos contra isto, creamos un tema separado para o reprocesamento. O mesmo Consumidor pode ler os temas entrantes e reintentos sen ningunha restrición.

Reprocesando eventos recibidos de Kafka

De forma predeterminada, este enfoque non proporciona funcionalidade de interruptor de circuito, pero pódese engadir á aplicación mediante primavera-nube-netflix ou novo disyuntor de nube de primavera, envolvendo os lugares onde se chaman servizos externos en abstraccións axeitadas. Ademais, faise posible escoller unha estratexia para mamparo patrón, que tamén pode ser útil. Por exemplo, en spring-cloud-netflix isto podería ser un grupo de fíos ou un semáforo.

Saída

Como resultado, temos unha aplicación separada que nos permite repetir o procesamento de mensaxes se algún sistema externo non está dispoñible temporalmente.

Unha das principais vantaxes da aplicación é que pode ser usada por sistemas externos que se executan no mesmo clúster de Kafka, sen modificacións significativas do seu lado. Esta aplicación só terá que acceder ao tema de reintento, cubrir algunhas cabeceiras de Kafka e enviar unha mensaxe ao reintento. Non hai necesidade de aumentar ningunha infraestrutura adicional. E para reducir o número de mensaxes transferidas desde a aplicación a Retryer e viceversa, identificamos aplicacións con lóxica lineal e procesámolas de novo a través da parada Consumer.

Fonte: www.habr.com

Engadir un comentario