Cómo Kafka se hizo realidad

Cómo Kafka se hizo realidad

¡Hola, Habr!

Trabajo en el equipo de Tinkoff, que está desarrollando su propio centro de notificaciones. Principalmente desarrollo en Java usando Spring boot y resuelvo varios problemas técnicos que surgen en un proyecto.

La mayoría de nuestros microservicios se comunican entre sí de forma asincrónica a través de un intermediario de mensajes. Anteriormente utilizábamos IBM MQ como intermediario, que ya no podía hacer frente a la carga, pero al mismo tiempo tenía altas garantías de entrega.

Como reemplazo, nos ofrecieron Apache Kafka, que tiene un alto potencial de escalabilidad, pero, desafortunadamente, requiere un enfoque de configuración casi individual para diferentes escenarios. Además, el mecanismo de entrega al menos una vez que funciona en Kafka de forma predeterminada no permitía mantener el nivel requerido de coherencia desde el primer momento. A continuación, compartiré nuestra experiencia en la configuración de Kafka, en particular, les diré cómo configurar y vivir con la entrega exactamente una vez.

Entrega garantizada y más

Las configuraciones que se describen a continuación ayudarán a evitar una serie de problemas con la configuración de conexión predeterminada. Pero primero me gustaría prestar atención a un parámetro que facilitará una posible depuración.

Esto ayudará Identificación del cliente para Productor y Consumidor. A primera vista, puede utilizar el nombre de la aplicación como valor y, en la mayoría de los casos, esto funcionará. Aunque la situación cuando una aplicación utiliza varios Consumidores y les das el mismo client.id, resulta en la siguiente advertencia:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

Si desea utilizar JMX en una aplicación con Kafka, esto podría ser un problema. En este caso, es mejor utilizar una combinación del nombre de la aplicación y, por ejemplo, el nombre del tema como valor client.id. El resultado de nuestra configuración se puede ver en la salida del comando. grupos-de-consumidores-de-kafka de servicios públicos de Confluent:

Cómo Kafka se hizo realidad

Ahora veamos el escenario de entrega de mensajes garantizada. Kafka Producer tiene un parámetro acusaciones, que le permite configurar después de cuántos reconocimientos necesita el líder del clúster para considerar que el mensaje se escribió correctamente. Este parámetro puede tomar los siguientes valores:

  • 0: no se considerará el reconocimiento.
  • 1 es el parámetro predeterminado, solo se requiere 1 réplica para confirmar.
  • −1: se requiere reconocimiento de todas las réplicas sincronizadas (configuración del clúster min.insync.réplicas).

De los valores enumerados queda claro que los reconocimientos iguales a −1 ofrecen la garantía más sólida de que el mensaje no se perderá.

Como todos sabemos, los sistemas distribuidos no son confiables. Para protegerse contra fallas transitorias, Kafka Producer ofrece la opción reintentos, que le permite establecer el número de intentos de reenvío dentro entrega.tiempo de espera.ms. Dado que el parámetro de reintentos tiene un valor predeterminado de Integer.MAX_VALUE (2147483647), el número de reintentos de mensajes se puede ajustar cambiando solo delivery.timeout.ms.

Estamos avanzando hacia la entrega exactamente una vez.

Las configuraciones enumeradas permiten a nuestro Productor entregar mensajes con una alta garantía. Hablemos ahora sobre cómo garantizar que solo se escriba una copia de un mensaje en un tema de Kafka. En el caso más simple, para hacer esto, debe configurar el parámetro en Productor habilitar.idempotencia a verdadero. La idempotencia garantiza que solo se escriba un mensaje en una partición específica de un tema. La condición previa para permitir la idempotencia son los valores acks = todos, reintentar > 0, max.in.flight.requests.per.connection ≤ 5. Si el desarrollador no especifica estos parámetros, los valores anteriores se establecerán automáticamente.

Cuando se configura la idempotencia, es necesario asegurarse de que los mismos mensajes terminen en las mismas particiones cada vez. Esto se puede hacer configurando la clave y el parámetro de particionador.class en Productor. Empecemos por la clave. Debe ser el mismo para cada envío. Esto se puede lograr fácilmente utilizando cualquiera de las identificaciones comerciales de la publicación original. El parámetro particionador.class tiene un valor predeterminado: Particionador predeterminado. Con esta estrategia de partición, por defecto actuamos así:

  • Si la partición se especifica explícitamente al enviar el mensaje, entonces la usamos.
  • Si no se especifica la partición, pero sí la clave, seleccione la partición por el hash de la clave.
  • Si no se especifican la partición ni la clave, seleccione las particiones una por una (por turnos).

Además, usando una clave y envío idempotente con un parámetro solicitudes.máximas.en.vuelo.por.conexión = 1 le brinda un procesamiento de mensajes optimizado en el Consumidor. También vale la pena recordar que si el control de acceso está configurado en su clúster, necesitará derechos para escribir de forma idempotente en un tema.

Si de repente le faltan las capacidades de envío idempotente por clave o la lógica del lado del Productor requiere mantener la coherencia de los datos entre diferentes particiones, entonces las transacciones vendrán al rescate. Además, utilizando una transacción en cadena, puede sincronizar condicionalmente un registro en Kafka, por ejemplo, con un registro en la base de datos. Para permitir el envío transaccional al Productor, debe ser idempotente y además configurarse transaccional.id. Si su clúster Kafka tiene configurado el control de acceso, entonces un registro transaccional, como un registro idempotente, necesitará permisos de escritura, que se pueden otorgar mediante máscara utilizando el valor almacenado en transaccional.id.

Formalmente, cualquier cadena, como el nombre de la aplicación, se puede utilizar como identificador de transacción. Pero si inicia varias instancias de la misma aplicación con el mismo transaccional.id, la primera instancia iniciada se detendrá con un error, ya que Kafka lo considerará un proceso zombie.

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Para resolver este problema, agregamos un sufijo al nombre de la aplicación en forma de nombre de host, que obtenemos de las variables de entorno.

El productor está configurado, pero las transacciones en Kafka solo controlan el alcance del mensaje. Independientemente del estado de la transacción, el mensaje pasa inmediatamente al tema, pero tiene atributos adicionales del sistema.

Para evitar que el consumidor lea dichos mensajes con anticipación, debe configurar el parámetro nivel.de.aislamiento al valor read_committed. Dicho consumidor podrá leer mensajes no transaccionales como antes y mensajes transaccionales solo después de una confirmación.
Si ha establecido todas las configuraciones enumeradas anteriormente, entonces habrá configurado la entrega exactamente una vez. ¡Felicidades!

Pero hay un matiz más. Transactional.id, que configuramos anteriormente, es en realidad el prefijo de transacción. En el administrador de transacciones, se le agrega un número de secuencia. El identificador recibido se emite a transaccional.id.expiración.ms, que está configurado en un clúster Kafka y tiene un valor predeterminado de "7 días". Si durante este tiempo la aplicación no ha recibido ningún mensaje, cuando intente el siguiente envío transaccional recibirá Excepción de asignación de Pid no válida. Luego, el coordinador de transacciones emitirá un nuevo número de secuencia para la siguiente transacción. Sin embargo, el mensaje puede perderse si InvalidPidMappingException no se maneja correctamente.

En lugar de totales

Como puede ver, no basta con enviar mensajes a Kafka. Debe elegir una combinación de parámetros y estar preparado para realizar cambios rápidos. En este artículo, intenté mostrar en detalle la configuración de entrega exactamente una vez y describí varios problemas con las configuraciones client.id y transaccional.id que encontramos. A continuación se muestra un resumen de la configuración de Productor y Consumidor.

Productor:

  1. ataques = todos
  2. reintentos > 0
  3. enable.idempotencia = verdadero
  4. max.in.flight.solicitudes.por.conexión ≤ 5 (1 para envío ordenado)
  5. transaccional.id = ${nombre-aplicación}-${nombrehost}

Consumidor:

  1. aislamiento.nivel = lectura_committed

Para minimizar errores en aplicaciones futuras, creamos nuestro propio contenedor sobre la configuración del resorte, donde ya están establecidos los valores para algunos de los parámetros enumerados.

Aquí hay un par de materiales para el autoestudio:

Fuente: habr.com

Añadir un comentario