Reprocessando eventos recebidos do Kafka

Reprocessando eventos recebidos do Kafka

Olá, Habr.

Recentemente, eu compartilhou sua experiência sobre quais parâmetros nós, como equipe, usamos com mais frequência para que o Produtor e Consumidor Kafka se aproxime da entrega garantida. Neste artigo quero contar como organizamos o reprocessamento de um evento recebido do Kafka em decorrência da indisponibilidade temporária do sistema externo.

Os aplicativos modernos operam em um ambiente muito complexo. Lógica de negócios envolvida em uma pilha de tecnologia moderna, executada em uma imagem Docker gerenciada por um orquestrador como Kubernetes ou OpenShift e comunicando-se com outros aplicativos ou soluções empresariais por meio de uma cadeia de roteadores físicos e virtuais. Nesse ambiente, algo sempre pode quebrar, portanto, o reprocessamento de eventos se um dos sistemas externos estiver indisponível é uma parte importante dos nossos processos de negócios.

Como era antes de Kafka

Anteriormente no projeto, usamos o IBM MQ para entrega assíncrona de mensagens. Se ocorrer algum erro durante a operação do serviço, a mensagem recebida poderá ser colocada em uma fila de mensagens mortas (DLQ) para análise manual adicional. O DLQ foi criado próximo à fila de entrada, a mensagem foi transferida dentro do IBM MQ.

Se o erro fosse temporário e pudéssemos determiná-lo (por exemplo, um ResourceAccessException em uma chamada HTTP ou um MongoTimeoutException em uma solicitação MongoDb), a estratégia de nova tentativa entraria em vigor. Independentemente da lógica de ramificação do aplicativo, a mensagem original foi movida para a fila do sistema para envio atrasado ou para um aplicativo separado criado há muito tempo para reenviar mensagens. Isso inclui um número de reenvio no cabeçalho da mensagem, que está vinculado ao intervalo de atraso ou ao final da estratégia no nível do aplicativo. Se chegarmos ao final da estratégia, mas o sistema externo ainda estiver indisponível, a mensagem será colocada no DLQ para análise manual.

Pesquisa de solução

Pesquisando na Internet, você pode encontrar o seguinte decisão. Resumindo, propõe-se criar um tópico para cada intervalo de atraso e implementar paralelamente aplicações Consumer, que irão ler as mensagens com o atraso necessário.

Reprocessando eventos recebidos do Kafka

Apesar do grande número de críticas positivas, não me parece totalmente bem-sucedido. Em primeiro lugar, porque o desenvolvedor, além de implementar os requisitos de negócio, terá que despender muito tempo implementando o mecanismo descrito.

Além disso, se o controle de acesso estiver habilitado no cluster Kafka, você terá que gastar algum tempo criando tópicos e fornecendo o acesso necessário a eles. Além disso, você precisará selecionar o parâmetro retention.ms correto para cada um dos tópicos de nova tentativa para que as mensagens tenham tempo de serem reenviadas e não desapareçam dele. A implementação e solicitação de acesso deverão ser repetidas para cada serviço existente ou novo.

Vamos agora ver quais mecanismos o spring em geral e o spring-kafka em particular nos fornecem para o reprocessamento de mensagens. Spring-kafka tem uma dependência transitiva de spring-retry, que fornece abstrações para gerenciar diferentes BackOffPolicies. Esta é uma ferramenta bastante flexível, mas sua desvantagem significativa é o armazenamento de mensagens para reenvio na memória do aplicativo. Isso significa que reiniciar o aplicativo devido a uma atualização ou erro operacional resultará na perda de todas as mensagens pendentes de reprocessamento. Como este ponto é crítico para o nosso sistema, não o consideramos mais detalhadamente.

O próprio spring-kafka fornece várias implementações de ContainerAwareErrorHandler, por exemplo SeekToCurrentErrorHandler, com o qual você pode processar a mensagem posteriormente sem alterar o deslocamento em caso de erro. A partir da versão spring-kafka 2.3, tornou-se possível definir BackOffPolicy.

Essa abordagem permite que mensagens reprocessadas sobrevivam às reinicializações do aplicativo, mas ainda não há mecanismo DLQ. Escolhemos esta opção no início de 2019, acreditando com otimismo que o DLQ não seria necessário (tivemos sorte e na verdade não precisávamos dele depois de vários meses operando o aplicativo com esse sistema de reprocessamento). Erros temporários causaram o disparo do SeekToCurrentErrorHandler. Os erros restantes foram impressos no log, resultando em um deslocamento, e o processamento continuou com a próxima mensagem.

Decisão final

A implementação baseada em SeekToCurrentErrorHandler nos levou a desenvolver nosso próprio mecanismo para reenvio de mensagens.

Em primeiro lugar, queríamos aproveitar a experiência existente e expandi-la dependendo da lógica da aplicação. Para uma aplicação de lógica linear, seria ideal interromper a leitura de novas mensagens por um curto período de tempo especificado pela estratégia de nova tentativa. Para outras aplicações, eu queria ter um único ponto que reforçasse a estratégia de nova tentativa. Além disso, este único ponto deve ter funcionalidade DLQ para ambas as abordagens.

A própria estratégia de nova tentativa deve ser armazenada na aplicação, que é responsável por recuperar o próximo intervalo quando ocorrer um erro temporário.

Parando o consumidor para uma aplicação de lógica linear

Ao trabalhar com spring-kafka, o código para parar o Consumidor pode ser mais ou menos assim:

public void pauseListenerContainer(MessageListenerContainer listenerContainer, 
                                   Instant retryAt) {
        if (nonNull(retryAt) && listenerContainer.isRunning()) {
            listenerContainer.stop();
            taskScheduler.schedule(() -> listenerContainer.start(), retryAt);
            return;
        }
        // to DLQ
    }

No exemplo, retryAt é o momento de reiniciar o MessageListenerContainer se ele ainda estiver em execução. O relançamento ocorrerá em um thread separado lançado no TaskScheduler, cuja implementação também será fornecida pela primavera.

Encontramos o valor retryAt da seguinte maneira:

  1. O valor do contador de rechamadas é consultado.
  2. Com base no valor do contador, o intervalo de atraso atual na estratégia de nova tentativa é pesquisado. A estratégia é declarada na própria aplicação, escolhemos o formato JSON para armazená-la.
  3. O intervalo encontrado na matriz JSON contém o número de segundos após os quais o processamento precisará ser repetido. Esse número de segundos é adicionado à hora atual para formar o valor de retryAt.
  4. Se o intervalo não for localizado, o valor de retryAt será nulo e a mensagem será enviada ao DLQ para análise manual.

Com esta abordagem, resta apenas salvar o número de chamadas repetidas para cada mensagem que está sendo processada no momento, por exemplo, na memória da aplicação. Manter a contagem de novas tentativas na memória não é crítico para esta abordagem, uma vez que uma aplicação de lógica linear não pode lidar com o processamento como um todo. Ao contrário do spring-retry, reiniciar o aplicativo não fará com que todas as mensagens sejam perdidas para serem reprocessadas, mas simplesmente reiniciará a estratégia.

Essa abordagem ajuda a aliviar a carga do sistema externo, que pode estar indisponível devido a uma carga muito pesada. Ou seja, além do reprocessamento, conseguimos a implementação do padrão disjuntor.

No nosso caso, o limite de erro é de apenas 1 e, para minimizar o tempo de inatividade do sistema devido a interrupções temporárias da rede, usamos uma estratégia de repetição muito granular com pequenos intervalos de latência. Isto pode não ser adequado para todas as aplicações de grupo, portanto a relação entre o limite de erro e o valor do intervalo deve ser selecionada com base nas características do sistema.

Um aplicativo separado para processar mensagens de aplicativos com lógica não determinística

Aqui está um exemplo de código que envia uma mensagem para tal aplicação (Retryer), que será reenviada para o tópico DESTINATION quando o horário RETRY_AT for atingido:


public <K, V> void retry(ConsumerRecord<K, V> record, String retryToTopic, 
                         Instant retryAt, String counter, String groupId, Exception e) {
        Headers headers = ofNullable(record.headers()).orElse(new RecordHeaders());
        List<Header> arrayOfHeaders = 
            new ArrayList<>(Arrays.asList(headers.toArray()));
        updateHeader(arrayOfHeaders, GROUP_ID, groupId::getBytes);
        updateHeader(arrayOfHeaders, DESTINATION, retryToTopic::getBytes);
        updateHeader(arrayOfHeaders, ORIGINAL_PARTITION, 
                     () -> Integer.toString(record.partition()).getBytes());
        if (nonNull(retryAt)) {
            updateHeader(arrayOfHeaders, COUNTER, counter::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "retry"::getBytes);
            updateHeader(arrayOfHeaders, RETRY_AT, retryAt.toString()::getBytes);
        } else {
            updateHeader(arrayOfHeaders, REASON, 
                         ExceptionUtils.getStackTrace(e)::getBytes);
            updateHeader(arrayOfHeaders, SEND_TO, "backout"::getBytes);
        }
        ProducerRecord<K, V> messageToSend =
            new ProducerRecord<>(retryTopic, null, null, record.key(), record.value(), arrayOfHeaders);
        kafkaTemplate.send(messageToSend);
    }

O exemplo mostra que muitas informações são transmitidas em cabeçalhos. O valor de RETRY_AT é encontrado da mesma forma que para o mecanismo de nova tentativa através da parada do Consumidor. Além de DESTINATION e RETRY_AT passamos:

  • GROUP_ID, pelo qual agrupamos mensagens para análise manual e pesquisa simplificada.
  • ORIGINAL_PARTITION para tentar manter o mesmo Consumidor para reprocessamento. Este parâmetro pode ser nulo, neste caso a nova partição será obtida utilizando a chave record.key() da mensagem original.
  • Valor COUNTER atualizado para seguir a estratégia de nova tentativa.
  • SEND_TO é uma constante que indica se a mensagem é enviada para reprocessamento ao atingir RETRY_AT ou colocada em DLQ.
  • REASON - o motivo pelo qual o processamento da mensagem foi interrompido.

O Retryer armazena mensagens para reenvio e análise manual no PostgreSQL. Um cronômetro inicia uma tarefa que encontra mensagens com RETRY_AT e as envia de volta para a partição ORIGINAL_PARTITION do tópico DESTINATION com a chave record.key().

Depois de enviadas, as mensagens são excluídas do PostgreSQL. A análise manual de mensagens ocorre em uma UI simples que interage com o Retryer por meio da API REST. Suas principais funcionalidades são reenviar ou excluir mensagens do DLQ, visualizar informações de erros e pesquisar mensagens, por exemplo, pelo nome do erro.

Como o controle de acesso está habilitado em nossos clusters, é necessário solicitar adicionalmente acesso ao tópico que o Retryer está escutando e permitir que o Retryer grave no tópico DESTINATION. Isso é inconveniente, mas, diferentemente da abordagem de tópico de intervalo, temos um DLQ e uma interface de usuário completos para gerenciá-lo.

Há casos em que um tópico recebido é lido por vários grupos de consumidores diferentes, cujas aplicações implementam lógicas diferentes. O reprocessamento de uma mensagem por meio do Retryer para um desses aplicativos resultará em uma duplicata no outro. Para nos proteger contra isso, criamos um tópico separado para reprocessamento. Os tópicos de entrada e nova tentativa podem ser lidos pelo mesmo Consumidor sem quaisquer restrições.

Reprocessando eventos recebidos do Kafka

Por padrão, esta abordagem não fornece funcionalidade de disjuntor, mas pode ser adicionada à aplicação usando primavera-nuvem-netflix ou novo disjuntor de nuvem de primavera, agrupando os locais onde os serviços externos são chamados em abstrações apropriadas. Além disso, torna-se possível escolher uma estratégia para anteparo padrão, que também pode ser útil. Por exemplo, em spring-cloud-netflix, isso pode ser um pool de threads ou um semáforo.

Jogar aviator online grátis: hack aviator funciona

Como resultado, temos um aplicativo separado que nos permite repetir o processamento de mensagens caso algum sistema externo esteja temporariamente indisponível.

Uma das principais vantagens do aplicativo é que ele pode ser utilizado por sistemas externos rodando no mesmo cluster Kafka, sem modificações significativas de sua parte! Tal aplicativo precisará apenas acessar o tópico de nova tentativa, preencher alguns cabeçalhos Kafka e enviar uma mensagem ao Retryer. Não há necessidade de criar qualquer infra-estrutura adicional. E para reduzir o número de mensagens transferidas da aplicação para o Retryer e vice-versa, identificamos as aplicações com lógica linear e as processamos novamente através da parada do Consumidor.

Fonte: habr.com

Adicionar um comentário