Como Kafka se tornou realidade

Como Kafka se tornou realidade

Oi, Habr!

Trabalho na equipe Tinkoff, que está desenvolvendo sua própria central de notificações. Desenvolvo principalmente em Java usando Spring boot e resolvo diversos problemas técnicos que surgem em um projeto.

A maioria dos nossos microsserviços se comunicam entre si de forma assíncrona por meio de um agente de mensagens. Anteriormente, usávamos o IBM MQ como corretor, que não conseguia mais lidar com a carga, mas ao mesmo tempo tinha altas garantias de entrega.

Como substituto, foi oferecido o Apache Kafka, que tem alto potencial de escalonamento, mas, infelizmente, requer uma abordagem quase individual de configuração para diferentes cenários. Além disso, o mecanismo de entrega pelo menos uma vez que funciona no Kafka por padrão não permitiu manter o nível de consistência necessário imediatamente. A seguir, compartilharei nossa experiência na configuração do Kafka, em particular, direi como configurar e conviver exatamente com a entrega única.

Entrega garantida e muito mais

As configurações discutidas abaixo ajudarão a evitar vários problemas com as configurações de conexão padrão. Mas primeiro gostaria de prestar atenção a um parâmetro que facilitará uma possível depuração.

Isso vai ajudar ID do Cliente para Produtor e Consumidor. À primeira vista, você pode usar o nome do aplicativo como valor e, na maioria dos casos, isso funcionará. Embora a situação em que um aplicativo usa vários Consumidores e você fornece a eles o mesmo client.id, resulta no seguinte aviso:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Se você quiser usar JMX em um aplicativo com Kafka, isso pode ser um problema. Nesse caso, é melhor usar uma combinação do nome do aplicativo e, por exemplo, o nome do tópico como valor client.id. O resultado da nossa configuração pode ser visto na saída do comando grupos de consumidores kafka dos utilitários do Confluent:

Como Kafka se tornou realidade

Agora vamos examinar o cenário de entrega garantida de mensagens. Kafka Producer tem um parâmetro reconhecer, que permite configurar após quantas confirmações o líder do cluster precisa para considerar a mensagem escrita com sucesso. Este parâmetro pode assumir os seguintes valores:

  • 0 — o reconhecimento não será considerado.
  • 1 é o parâmetro padrão, apenas 1 réplica é necessária para confirmar.
  • −1 — é necessária a confirmação de todas as réplicas sincronizadas (configuração do cluster min.insync.replicas).

A partir dos valores listados, fica claro que acks iguais a -1 fornecem a garantia mais forte de que a mensagem não será perdida.

Como todos sabemos, os sistemas distribuídos não são confiáveis. Para proteger contra falhas transitórias, o Kafka Producer oferece a opção novas tentativas, que permite definir o número de tentativas de reenvio dentro entrega.timeout.ms. Como o parâmetro retries tem um valor padrão de Integer.MAX_VALUE (2147483647), o número de novas tentativas de mensagem pode ser ajustado alterando apenas delivery.timeout.ms.

Estamos avançando para entrega exatamente única

As configurações listadas permitem que nosso Produtor entregue mensagens com alta garantia. Vamos agora falar sobre como garantir que apenas uma cópia de uma mensagem seja gravada em um tópico Kafka. No caso mais simples, para fazer isso, você precisa definir o parâmetro no Produtor habilitar.idempotência para verdadeiro. A idempotência garante que apenas uma mensagem seja gravada em uma partição específica de um tópico. A pré-condição para ativar a idempotência são os valores acks = todos, tentar novamente > 0, max.in.flight.requests.per.connection ≤ 5. Caso esses parâmetros não sejam especificados pelo desenvolvedor, os valores acima serão definidos automaticamente.

Quando a idempotência é configurada, é necessário garantir que as mesmas mensagens acabem sempre nas mesmas partições. Isso pode ser feito configurando a chave e o parâmetro partitioner.class como Produtor. Vamos começar com a chave. Deve ser o mesmo para cada envio. Isso pode ser facilmente conseguido usando qualquer um dos IDs comerciais da postagem original. O parâmetro partitioner.class tem um valor padrão - Particionador padrão. Com esta estratégia de particionamento, por padrão agimos assim:

  • Se a partição for especificada explicitamente ao enviar a mensagem, então a usaremos.
  • Se a partição não for especificada, mas a chave for especificada, selecione a partição pelo hash da chave.
  • Se a partição e a chave não forem especificadas, selecione as partições uma por uma (round-robin).

Além disso, usando uma chave e envio idempotente com um parâmetro max.in.flight.requests.per.connection = 1 oferece processamento simplificado de mensagens no Consumidor. Também vale lembrar que se o controle de acesso estiver configurado em seu cluster, você precisará de direitos para gravar de forma idempotente em um tópico.

Se de repente você não tiver os recursos de envio idempotente por chave ou a lógica do lado do Produtor exigir a manutenção da consistência dos dados entre diferentes partições, as transações virão em seu socorro. Além disso, usando uma transação em cadeia, você pode sincronizar condicionalmente um registro no Kafka, por exemplo, com um registro no banco de dados. Para permitir o envio transacional ao Produtor, ele deve ser idempotente e configurado adicionalmente transacional.id. Se o seu cluster Kafka tiver controle de acesso configurado, um registro transacional, como um registro idempotente, precisará de permissões de gravação, que podem ser concedidas por máscara usando o valor armazenado em transactional.id.

Formalmente, qualquer string, como o nome do aplicativo, pode ser usada como identificador de transação. Mas se você iniciar várias instâncias do mesmo aplicativo com o mesmo transactional.id, a primeira instância iniciada será interrompida com um erro, pois Kafka o considerará um processo zumbi.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Para resolver esse problema, adicionamos um sufixo ao nome do aplicativo na forma de nome do host, que obtemos das variáveis ​​de ambiente.

O produtor está configurado, mas as transações no Kafka controlam apenas o escopo da mensagem. Independentemente do status da transação, a mensagem vai imediatamente para o tópico, mas possui atributos adicionais do sistema.

Para evitar que tais mensagens sejam lidas antecipadamente pelo Consumidor, ele precisa definir o parâmetro nível de isolamento para o valor read_commited. Tal Consumidor poderá ler mensagens não transacionais como antes, e mensagens transacionais somente após um commit.
Se você definiu todas as configurações listadas anteriormente, então você configurou exatamente uma vez a entrega. Parabéns!

Mas há mais uma nuance. Transactional.id, que configuramos acima, é na verdade o prefixo da transação. No gerenciador de transações, um número de sequência é adicionado a ele. O identificador recebido é emitido para transacional.id.expiração.ms, que está configurado em um cluster Kafka e tem um valor padrão de “7 dias”. Se durante esse período o aplicativo não tiver recebido nenhuma mensagem, quando você tentar o próximo envio transacional você receberá InvalidPidMappingException. O coordenador da transação emitirá então um novo número de sequência para a próxima transação. No entanto, a mensagem poderá ser perdida se InvalidPidMappingException não for tratada corretamente.

Em vez de totais

Como você pode ver, não basta simplesmente enviar mensagens para Kafka. Você precisa escolher uma combinação de parâmetros e estar preparado para fazer alterações rápidas. Neste artigo, tentei mostrar em detalhes a configuração de entrega exatamente uma vez e descrevi vários problemas com as configurações client.id e transactional.id que encontramos. Abaixo está um resumo das configurações do Produtor e do Consumidor.

Produtor:

  1. acks = todos
  2. novas tentativas > 0
  3. habilitar.idempotência = verdadeiro
  4. max.in.flight.requests.per.connection ≤ 5 (1 para envio ordenado)
  5. transactional.id = ${nome do aplicativo}-${nome do host}

Consumidor:

  1. isolamento.nível=read_comprometido

Para minimizar erros em aplicações futuras, criamos nosso próprio wrapper sobre a configuração spring, onde já estão definidos valores para alguns dos parâmetros listados.

Aqui estão alguns materiais para auto-estudo:

Fonte: habr.com

Adicionar um comentário