How Kafka became a reality

Hey Habr!

I work on the Tinkoff team that is developing its own notification center. For the most part, I develop in Java using Spring Boot and solve the various technical problems that come up in the project.

Most of our microservices communicate with each other asynchronously through a message broker. Previously, we used IBM MQ as the broker: it offered strong delivery guarantees, but it could no longer cope with the load.

As a replacement, we were offered Apache Kafka, which scales well but, unfortunately, requires an almost individual approach to configuration for each scenario. In addition, the at least once delivery mechanism that Kafka uses by default did not provide the required level of consistency out of the box. Below, I will share our experience configuring Kafka and, in particular, how to set up and live with exactly once delivery.

Guaranteed delivery and more

The options discussed below will help prevent a number of problems with the default connection settings. But first, I would like to draw attention to one parameter that makes debugging easier.

That parameter is client.id for the Producer and the Consumer. At first glance, the application name looks like a suitable value, and in most cases this will work. However, if an application contains several Consumers and you give them the same client.id, you get the following warning:

org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0

If you want to use JMX in an application that works with Kafka, this becomes a problem. In that case, it is best to use a combination of the application name and, for example, the topic name as the client.id value. The result of our configuration can be seen in the output of the kafka-consumer-groups command from the Confluent utilities:

[Figure: kafka-consumer-groups output showing the configured client.id values]
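For illustration, here is a minimal sketch of how such a client.id might be assembled with the plain Java client. The application name, topic name, and broker address are made-up placeholders:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClientIdExample {
    public static void main(String[] args) {
        String applicationName = "notification-center"; // hypothetical application name
        String topic = "user-events";                   // hypothetical topic name

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, applicationName);
        // Application name + topic name gives every Consumer in the
        // application a unique client.id and avoids the JMX mbean clash:
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, applicationName + "-" + topic);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of(topic));
            // ...poll loop as usual...
        }
    }
}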

Now let's look at the scenario of guaranteed message delivery. The Kafka Producer has an acks parameter, which configures how many acknowledgements the cluster leader needs before it considers a message successfully written. This parameter can take the following values:

  • 0 — acknowledgements are not waited for at all.
  • 1 — the default; only the leader's acknowledgement is required.
  • −1 — acknowledgements are required from all in-sync replicas (see the cluster setting min.insync.replicas).

From these values, it is clear that acks equal to −1 gives the strongest guarantee that the message will not be lost.

As we all know, distributed systems are unreliable. To guard against transient failures, the Kafka Producer provides the retries parameter, which sets the number of resend attempts within delivery.timeout.ms. Since retries defaults to Integer.MAX_VALUE (2147483647), the number of resends can in practice be adjusted by changing delivery.timeout.ms alone, as in the sketch below.
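Put together, a guaranteed-delivery Producer configuration might look like the following sketch; the broker address, client.id, and timeout value are illustrative, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableProducer {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "notification-center-producer"); // see above
        // Wait for acknowledgement from all in-sync replicas:
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // retries already defaults to Integer.MAX_VALUE, so the retry budget
        // is bounded by the delivery timeout instead (value is illustrative):
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}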

Moving towards exactly once delivery

These settings allow our Producer to deliver messages with a high guarantee. Now let's talk about how to ensure that only one copy of a message is written to a Kafka topic. In the simplest case, you set the Producer parameter enable.idempotence to true. Idempotence guarantees that no more than one copy of a message is written to a particular partition of a topic. The preconditions for enabling idempotence are acks = all, retries > 0, and max.in.flight.requests.per.connection ≤ 5. If the developer does not set these parameters, the values above are applied automatically.
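Continuing the Properties sketch above, enabling idempotence is one line; the other three lines only spell out the values the client would apply automatically:

// Inside the configuration method from the sketch above:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// The following are implied by idempotence; set explicitly here for clarity:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);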

Once idempotence is configured, you need to make sure that the same messages always end up in the same partitions. This is done by setting a message key and the partitioner.class parameter on the Producer. Let's start with the key. It must be the same for every resend, which is easy to achieve by using a business ID from the original message. The partitioner.class parameter defaults to DefaultPartitioner, whose partitioning strategy works as follows:

  • If a partition is explicitly specified when sending the message, it is used.
  • If no partition is specified but a key is, the partition is chosen by a hash of the key.
  • If neither a partition nor a key is specified, partitions are chosen in turn (round-robin).

In addition, using a key and idempotent sends with max.in.flight.requests.per.connection = 1 gives you ordered message processing on the Consumer. Separately, it is worth remembering that if access control is configured on your cluster, you will need permissions for idempotent writes to the topic.
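Returning to the key: a keyed send might look like this sketch, where the topic name and the business ID are made up:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

static void sendKeyed(KafkaProducer<String, String> producer,
                      String businessId, String payload) {
    // The business ID serves as the key, so DefaultPartitioner hashes every
    // resend of the same message to the same partition:
    producer.send(new ProducerRecord<>("notifications", businessId, payload));
}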

If keyed idempotent sends are not enough, or the logic on the Producer side requires maintaining data consistency across different partitions, transactions come to the rescue. In addition, a transaction lets you conditionally synchronize a write to Kafka with, for example, a write to a database. To enable transactional sending, the Producer must be idempotent, and transactional.id must additionally be set. If access control is configured on your Kafka cluster, a transactional write, like an idempotent one, requires write permissions, which can be granted by mask using the value stored in transactional.id.

Formally, any string, such as the application name, can be used as the transaction identifier. But if you start several instances of the same application with the same transactional.id, the first instance launched will be stopped with an error, because Kafka will consider it a zombie process:

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

To solve this problem, we append to the application name a hostname suffix, which we get from environment variables.
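A sketch of the resulting setup, continuing the Properties and names from the earlier sketches; the HOSTNAME environment variable and the application name are assumptions, and the error handling follows the pattern from the KafkaProducer documentation:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Suffix the application name with the hostname so that every instance
// gets its own transactional.id:
String hostname = System.getenv().getOrDefault("HOSTNAME", "unknown-host");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "notification-center-" + hostname);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("notifications", businessId, payload));
    // ...a database write could be conditionally synchronized here...
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another instance with the same transactional.id fenced us off;
    // this producer can no longer be used and must be closed:
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so the Consumer never sees the messages
    producer.abortTransaction();
}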

The Producer is now set up, but transactions in Kafka only control the visibility of a message. Regardless of the transaction's status, a message gets into the topic immediately, but it carries additional system attributes.

To prevent such messages from being read by a Consumer ahead of time, the Consumer needs its isolation.level parameter set to read_committed. Such a Consumer can read non-transactional messages as before, and transactional messages only after they are committed.
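On the Consumer side this is a single property; a sketch continuing the consumer configuration from the beginning of the article:

// Only messages from committed transactions will be returned by poll():
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");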
If you have applied all the settings listed above, then you have configured exactly once delivery. Congratulations!

But there is one more nuance. The transactional.id we configured above is actually a transaction prefix: on the transaction manager side, a sequence number is appended to it. The resulting identifier expires after transactional.id.expiration.ms, which is configured on the Kafka cluster and defaults to 7 days. If the application has not sent anything during this time, the next transactional send will fail with an InvalidPidMappingException. The transaction coordinator will then issue a new sequence number for the next transaction, but the message may be lost if the InvalidPidMappingException is not handled correctly.
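Exactly how to handle it depends on your client version and delivery semantics. One conservative sketch, assuming the send can safely be repeated and using two hypothetical helpers, sendInTransaction (begin/send/commit) and createTransactionalProducer (the factory from the sketches above):

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InvalidPidMappingException;

try {
    sendInTransaction(producer, record);
} catch (KafkaException e) {
    if (e instanceof InvalidPidMappingException
            || e.getCause() instanceof InvalidPidMappingException) {
        // The identifier expired while the application was idle: rebuild
        // the producer to get a fresh mapping and retry once, so the
        // message is not silently lost.
        producer.close();
        producer = createTransactionalProducer(); // hypothetical factory
        producer.initTransactions();
        sendInTransaction(producer, record);
    } else {
        throw e;
    }
}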

Instead of a conclusion

As you can see, it is not enough to simply send messages to Kafka. You need to choose a combination of parameters and be ready to make quick changes. In this article, I have tried to show the exactly once delivery setup in detail and described several problems with the client.id and transactional.id configurations that we ran into. The Producer and Consumer settings are summarized below.

Producer:

  1. acks = all
  2. retries > 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 for ordered send)
  5. transactional.id = ${application-name}-${hostname}

Consumer:

  1. isolation.level = read_committed

In order to minimize errors in future applications, we have made our own wrapper over the Spring configuration, where the values of some of the listed parameters are already set.

