Como Kafka se fixo realidade

Como Kafka se fixo realidade

Ola Habr!

Traballo no equipo de Tinkoff, que está a desenvolver o seu propio centro de notificacións. Desenvolvo principalmente en Java usando Spring boot e resolvo varios problemas técnicos que xorden nun proxecto.

A maioría dos nosos microservizos comunícanse entre si de forma asíncrona a través dun intermediario de mensaxes. Anteriormente, utilizabamos IBM MQ como intermediario, que xa non podía facer fronte á carga, pero ao mesmo tempo tiña altas garantías de entrega.

Como substitución, ofrecéronnos Apache Kafka, que ten un alto potencial de escalado, pero, por desgraza, require un enfoque case individual da configuración para diferentes escenarios. Ademais, o mecanismo de entrega polo menos unha vez que funciona en Kafka por defecto non permitía manter o nivel de coherencia necesario fóra da caixa. A continuación, compartirei a nosa experiencia na configuración de Kafka, en particular, contarei como configurar e vivir con exactamente unha vez entrega.

Entrega garantida e moito máis

A configuración que se comenta a continuación axudará a evitar unha serie de problemas coa configuración de conexión predeterminada. Pero primeiro gustaríame prestar atención a un parámetro que facilitará unha posible depuración.

Isto axudará cliente.id para o produtor e o consumidor. A primeira vista, pode usar o nome da aplicación como valor e, na maioría dos casos, funcionará. Aínda que a situación na que unha aplicación usa varios Consumidores e lles dás o mesmo client.id, dá como resultado o seguinte aviso:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Se queres usar JMX nunha aplicación con Kafka, isto pode ser un problema. Para este caso, é mellor usar unha combinación do nome da aplicación e, por exemplo, o nome do tema como valor client.id. O resultado da nosa configuración pódese ver na saída do comando grupos de consumidores kafka dos servizos públicos de Confluent:

Como Kafka se fixo realidade

Agora vexamos o escenario para a entrega de mensaxes garantida. Kafka Producer ten un parámetro acos, que lle permite configurar despois de cantos recoñecementos precisa o líder do clúster para considerar a mensaxe escrita correctamente. Este parámetro pode tomar os seguintes valores:

  • 0: non se considerará o recoñecemento.
  • 1 é o parámetro predeterminado, só se precisa 1 réplica para recoñecer.
  • −1 — Requírese o recoñecemento de todas as réplicas sincronizadas (configuración do clúster min.insync.réplicas).

Dos valores enumerados está claro que acks igual a −1 dá a garantía máis forte de que a mensaxe non se perderá.

Como todos sabemos, os sistemas distribuídos non son fiables. Para protexerse contra fallos transitorios, Kafka Producer ofrece a opción reintentos, que che permite definir o número de intentos de reenvío dentro tempo de entrega.ms. Dado que o parámetro de reintentos ten un valor predeterminado de Integer.MAX_VALUE (2147483647), o número de reintentos de mensaxe pódese axustar cambiando só delivery.timeout.ms.

Estamos avanzando cara a unha única entrega

A configuración indicada permite que o noso produtor envíe mensaxes cunha alta garantía. Falemos agora de como garantir que só se escribe unha copia dunha mensaxe nun tema de Kafka? No caso máis sinxelo, para facelo, cómpre configurar o parámetro en Producer habilitar.idempotencia a verdade. Idempotencia garante que só se escribe unha mensaxe nunha partición específica dun tema. A condición previa para habilitar a idempotencia son os valores acks = todos, reintento > 0, max.in.flight.requests.per.connection ≤ 5. Se o programador non especifica estes parámetros, os valores anteriores estableceranse automaticamente.

Cando se configura a idempotencia, é necesario asegurarse de que as mesmas mensaxes acaben nas mesmas particións cada vez. Isto pódese facer configurando a clave e o parámetro partitioner.class en Producer. Imos comezar coa chave. Debe ser o mesmo para cada envío. Isto pódese conseguir facilmente utilizando calquera dos ID de empresa da publicación orixinal. O parámetro partitioner.class ten un valor predeterminado − Particionador predeterminado. Con esta estratexia de partición, por defecto actuamos así:

  • Se a partición se especifica explícitamente ao enviar a mensaxe, entón usámola.
  • Se non se especifica a partición, pero se especifica a chave, seleccione a partición polo hash da chave.
  • Se non se especifican a partición e a clave, seleccione as particións unha por unha (round-robin).

Ademais, usando unha clave e envío idempotente cun parámetro max.in.flight.requests.per.connection = 1 ofrécelle un procesamento de mensaxes simplificado en Consumer. Tamén vale a pena lembrar que se o control de acceso está configurado no seu clúster, entón necesitará dereitos para escribir de forma idempotente nun tema.

Se de súpeto carece das capacidades de envío idempotente por clave ou a lóxica do lado do Produtor require manter a coherencia dos datos entre as diferentes particións, entón as transaccións acudirán ao rescate. Ademais, usando unha transacción en cadea, pode sincronizar condicionalmente un rexistro en Kafka, por exemplo, cun rexistro na base de datos. Para habilitar o envío transaccional ao Produtor, debe ser idempotente e adicionalmente configurado transaccional.id. Se o seu clúster de Kafka ten o control de acceso configurado, entón un rexistro transaccional, como un rexistro idempotente, necesitará permisos de escritura, que se poden conceder mediante máscara usando o valor almacenado en transactional.id.

Formalmente, calquera cadea, como o nome da aplicación, pode usarse como identificador de transacción. Pero se inicias varias instancias da mesma aplicación co mesmo transactional.id, a primeira instancia iniciada deterase cun erro, xa que Kafka considerará que é un proceso zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Para solucionar este problema, engadimos un sufixo ao nome da aplicación en forma de nome de host, que obtemos das variables de ambiente.

O produtor está configurado, pero as transaccións en Kafka só controlan o alcance da mensaxe. Independentemente do estado da transacción, a mensaxe vai inmediatamente ao tema, pero ten atributos adicionais do sistema.

Para evitar que tales mensaxes sexan lidas polo consumidor con antelación, debe configurar o parámetro illamento.nivel ao valor read_committed. Este consumidor poderá ler mensaxes non transacionais como antes, e as mensaxes transacionais só despois dunha confirmación.
Se estableceu todas as opcións listadas anteriormente, configuraches exactamente unha vez entrega. Parabéns!

Pero hai un matiz máis. Transactional.id, que configuramos anteriormente, é en realidade o prefixo da transacción. No xestor de transaccións, engádeselle un número de secuencia. O identificador recibido emítese a transaccional.id.expiration.ms, que está configurado nun clúster de Kafka e ten un valor predeterminado de "7 días". Se durante este tempo a aplicación non recibiu ningunha mensaxe, entón cando tente o seguinte envío transaccional recibirás InvalidPidMappingException. O coordinador da transacción emitirá entón un novo número de secuencia para a seguinte transacción. Non obstante, a mensaxe pode perderse se a InvalidPidMappingException non se xestiona correctamente.

En lugar de resultados

Como podes ver, non basta con enviar mensaxes a Kafka. Debe escoller unha combinación de parámetros e estar preparado para facer cambios rápidos. Neste artigo, tentei mostrar en detalle a configuración de entrega exactamente unha vez e describín varios problemas coas configuracións client.id e transactional.id que atopamos. A continuación móstrase un resumo da configuración do produtor e do consumidor.

Produtor:

  1. acks = todos
  2. reintentos > 0
  3. habilitar.idempotencia = verdadeiro
  4. máx.en.voo.solicitudes.por.conexión ≤ 5 (1 para envío ordenado)
  5. transactional.id = ${nome-aplicación}-${nome de host}

Consumidor:

  1. isolation.level = read_committed

Para minimizar os erros en futuras aplicacións, fixemos o noso propio envoltorio sobre a configuración do resorte, onde xa están establecidos os valores para algúns dos parámetros enumerados.

Aquí tes un par de materiais para o autoestudo:

Fonte: www.habr.com

Engadir un comentario