Understanding message brokers. Learning the mechanics of messaging with ActiveMQ and Kafka. Chapter 3. Kafka

Continuation of the translation of a small book:
Understanding Message Brokers
author: Jakub Korab, publisher: O'Reilly Media, Inc., date of publication: June 2017, ISBN: 9781492049296.

Previous translated part: Understanding message brokers. Learning the mechanics of messaging with ActiveMQ and Kafka. Chapter 1 Introduction

CHAPTER 3

Kafka

Kafka was developed at LinkedIn to get around some of the limitations of traditional message brokers and to avoid having to set up multiple message brokers for different point-to-point interactions, which is described in this book under "Scaling up and out" on page 28. LinkedIn's use cases largely relied on one-way ingestion of very large volumes of data, such as page clicks and access logs, while still allowing that data to be consumed by multiple systems without affecting the performance of producers or other consumers. In effect, the reason Kafka exists is to provide the kind of messaging architecture that the Universal Data Pipeline describes.

Given this ultimate goal, other requirements naturally arose. Kafka should:

  • Be extremely fast
  • Provide huge message throughput
  • Support Publisher-Subscriber and Point-to-Point models
  • Not slow down as consumers are added. For example, the performance of both queues and topics in ActiveMQ degrades as the number of consumers on a destination grows.
  • Be horizontally scalable; if a single broker persisting messages can only do so at the maximum speed of its disks, it makes sense to go beyond one broker instance to increase capacity
  • Allow messages to be stored and re-read (replayed)

To achieve all this, Kafka adopted an architecture that redefined the roles and responsibilities of clients and messaging brokers. The JMS model is very broker-centric: the broker is responsible for distributing messages, and clients only have to worry about sending and receiving them. Kafka, by contrast, is client-centric: the client takes on many of the functions of a traditional broker, such as fairly distributing related messages among consumers, in exchange for an extremely fast and scalable broker. For people who have worked with traditional messaging systems, working with Kafka requires a fundamental shift in mindset.
This engineering direction led to a messaging infrastructure capable of increasing throughput by many orders of magnitude compared to a conventional broker. As we will see, this approach comes with trade-offs that make Kafka unsuitable for certain types of workloads and existing installations.

Unified Destination Model

To satisfy the requirements described above, Kafka combined publish-subscribe and point-to-point messaging under a single kind of destination: the topic. This is confusing to people who have worked with messaging systems where the word "topic" refers to a broadcast mechanism from which reads are non-durable. Kafka topics should be considered a hybrid destination type, as defined in the introduction to this book.

For the remainder of this chapter, unless we explicitly state otherwise, the term "topic" will refer to a Kafka topic.

To fully understand how topics behave and what guarantees they provide, we need to first look at how they are implemented in Kafka.
Each topic in Kafka has its own log.
Producers sending messages to Kafka write to this log, and consumers read from the log using pointers that constantly move forward. Periodically, Kafka deletes the oldest parts of the log, whether the messages in those parts have been read or not. A central part of Kafka's design is that the broker doesn't care if messages are read or not - that's the client's responsibility.

The terms "log" and "pointer" do not appear in Kafka documentation. These well-known terms are used here to aid understanding.

This model is completely different from ActiveMQ, where messages from all queues are stored in the same log, and the broker marks the messages as deleted after they have been read.
Let's now dig a little deeper and look at the topic log in more detail.
The Kafka log consists of several partitions (Figure 3-1). Kafka guarantees strict ordering within each partition: messages written to a partition in a particular order will be read in that same order. Each partition is implemented as a rolling log file that contains a subset of all the messages sent to the topic by its producers. A newly created topic contains one partition by default. Partitioning is the central idea behind Kafka's horizontal scaling.

Figure 3-1. Kafka Partitions

When a producer sends a message to a Kafka topic, it decides which partition to send the message to. We will look at this in more detail later.

Reading messages

A client that wants to read messages manages a named pointer, called a consumer group, which points to an offset within a partition. An offset is an incrementing position that starts at 0 at the beginning of the partition. This consumer group, referenced in the API via a user-defined group_id, corresponds to a single logical consumer or system.

Most messaging systems read data from the destination using multiple instances and threads to process messages in parallel. Thus, there will usually be many consumer instances sharing the same consumer group.

The problem of reading can be represented as follows:

  • Topic has multiple partitions
  • Multiple groups of consumers can use a topic at the same time
  • A group of consumers can have multiple separate instances

This is a non-trivial many-to-many problem. To understand how Kafka handles relationships between consumer groups, consumer instances, and partitions, let's look at a series of progressively more complex reading scenarios.

Consumers and consumer groups

Let's take as a starting point a topic with one partition (Figure 3-2).

Figure 3-2. Consumer reads from partition

When a consumer instance connects to this topic with its own group_id, it is assigned a partition to read from and an offset within that partition. The starting position of this offset is configurable in the client as either the latest position (the newest message) or the earliest position (the oldest message). The consumer requests (polls) messages from the topic, which causes them to be read sequentially from the log.
The offset position is regularly committed back to Kafka and stored as messages on the internal topic __consumer_offsets. Unlike in a conventional broker, messages that have been read are not deleted, and the client can rewind the offset to re-process messages it has already seen.
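
To make this concrete, here is a minimal sketch of a consumer using the Java client API; the broker address, group_id, topic name, and the choice of "earliest" for auto.offset.reset are illustrative assumptions, not values from the book:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PageClickReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "reporting-system");          // the logical consumer (consumer group)
        props.put("auto.offset.reset", "earliest");         // start from the oldest message if no offset is committed yet
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("page-clicks")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100); // poll, as described above
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}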

When a second logical consumer connects using a different group_id, it manages a second pointer that is independent of the first (Figure 3-3). A Kafka topic therefore acts both like a queue, where there is a single consumer, and like a normal publish-subscribe (pub-sub) topic that multiple consumers subscribe to, with the added benefit that all messages are stored and can be processed multiple times.

Figure 3-3. Two consumers in different consumer groups read from the same partition

Consumers in a consumer group

When one consumer instance reads data from a partition, it has full control of the pointer and processes messages as described in the previous section.
If several consumer instances connect with the same group_id to a topic with one partition, the instance that connected last is given control of the pointer, and from that moment on it receives all the messages (Figure 3-4).

Figure 3-4. Two consumers in the same consumer group read from the same partition

This mode of processing, in which the number of consumer instances exceeds the number of partitions, can be thought of as a kind of exclusive consumer. It can be useful if you need "active-passive" (or "hot-warm") clustering of your consumer instances, although running multiple consumers in parallel ("active-active" or "hot-hot") is much more typical than keeping consumers on standby.

The message distribution behavior described above can be surprising compared to how a normal JMS queue behaves, where messages sent to the queue would be distributed evenly between the two consumers.

Most often, when we create multiple instances of consumers, we do this either to process messages in parallel, or to increase the speed of reading, or to increase the stability of the reading process. Since only one consumer instance can read data from a partition at a time, how is this achieved in Kafka?

One way to do this is to use a single consumer instance to read all the messages and hand them off to a pool of threads. While this approach increases processing throughput, it adds complexity to the consumer logic and does nothing to improve the robustness of the reading system. If the single consumer instance goes down due to a power failure or a similar event, consumption stops entirely.

The canonical way to solve this problem in Kafka is to use more partitions.

Partitioning

Partitions are the main mechanism for parallelizing reading and scaling a topic beyond the bandwidth of a single broker instance. To better understand this, let's consider a situation where there is a topic with two partitions and one consumer subscribes to this topic (Figure 3-5).

Figure 3-5. One consumer reads from multiple partitions

In this scenario, the consumer is given control over the pointers corresponding to its group_id in both partitions and starts reading messages from both partitions.
When an additional consumer for the same group_id is added to this topic, Kafka reallocates one of the partitions from the first to the second consumer. After that, each instance of the consumer will read from one partition of the topic (Figure 3-6).

To ensure that messages are processed in parallel in 20 threads, you need at least 20 partitions. If there are fewer partitions, you will be left with consumers that have nothing to work on, as described earlier in the discussion of exclusive consumers.

Figure 3-6. Two consumers in the same consumer group read from different partitions

This scheme greatly reduces the complexity of the Kafka broker compared to the message distribution required to serve a JMS queue. The broker does not need to worry about the following:

  • Which consumer should receive the next message, based on round-robin allocation, current capacity of prefetch buffers, or previous messages (as for JMS message groups).
  • Which messages are sent to which consumers and whether they should be re-delivered in case of failure.

All the Kafka broker has to do is pass messages sequentially to the consumer when the latter requests them.

However, the requirements for parallelizing consumption and for re-delivering failed messages do not go away; responsibility for them simply passes from the broker to the client. This means they must be taken into account in your code.

Sending messages

It is the producer's responsibility to decide which partition a message is sent to. To understand the mechanism by which this is done, we first need to consider what exactly it is that we are sending.

Whereas in JMS we use a message structure with metadata (headers and properties) and a body containing the payload, in Kafka a message is a key-value pair. The payload of the message is sent as the value. The key, on the other hand, is used mainly for partitioning and should contain a business-specific key so that related messages end up in the same partition.

In Chapter 2, we discussed the online betting scenario where related events need to be processed in order by a single consumer:

  1. The user account is configured.
  2. Money is credited to the account.
  3. A bet is made that withdraws money from the account.

If each event is a message posted to a topic, then the natural key would be the account ID.
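
As an illustrative sketch (the topic name and payload below are assumptions, not taken from the book), using the account ID as the key is simply a matter of putting it in the key position of the ProducerRecord:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AccountEventSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String accountId = "10007865234";                              // the key: groups related events together
            String payload = "{\"event\":\"DEPOSIT\",\"amount\":\"100\"}"; // the value: the business payload
            producer.send(new ProducerRecord<>("account-events", accountId, payload));
        }
    }
}
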
When a message is sent using the Kafka Producer API, it is passed to a partition function which, given the message and the current state of the Kafka cluster, returns the ID of the partition the message should be sent to. This function is implemented in Java through the Partitioner interface.

This interface looks like this:

interface Partitioner {
    int partition(String topic,
        Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
}

The default Partitioner implementation uses a general-purpose hashing algorithm over the key to determine the partition, or round-robin if no key is specified. This default works well in most cases. There are, however, cases where you will want to write your own.

Writing your own partitioning strategy

Let's look at an example where you want to send metadata along with the message payload. The payload in our example is an instruction to make a deposit to a gaming account. An instruction is something we want a guarantee will not be modified in transit, and we want to be sure that only a trusted upstream system could have initiated it. In this case, the sending and receiving systems agree on the use of a signature to authenticate the message.
In normal JMS we would simply define a "message signature" property and add it to the message. However, Kafka does not provide a mechanism for passing metadata, only a key and a value.

Since the value is the bank transfer payload whose integrity we want to preserve, we have no choice but to define a data structure to use in the key. Assuming that we need an account ID for partitioning, since all messages related to an account must be processed in order, we come up with the following JSON structure for the key:

{
  "signature": "541661622185851c248b41bf0cea7ad0",
  "accountId": "10007865234"
}

Because the value of the signature will vary depending on the payload, the default hashing strategy of the Partitioner interface will not reliably group related messages. We therefore need to write our own strategy that parses this key and partitions on the accountId value.
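
A minimal sketch of such a strategy, under the assumption that the key arrives as the JSON string shown above, might look like the following; the class name and the regular-expression parsing are illustrative choices, and the producer would be pointed at the class through its partitioner.class configuration property:

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class AccountIdPartitioner implements Partitioner {

    private static final Pattern ACCOUNT_ID =
            Pattern.compile("\"accountId\"\\s*:\\s*\"([^\"]+)\"");

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        Matcher matcher = ACCOUNT_ID.matcher(String.valueOf(key));
        if (!matcher.find()) {
            return 0; // fall back to partition 0 if the key is not in the expected format
        }
        // hash only the accountId, so all messages for one account share a partition
        return (matcher.group(1).hashCode() & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release
    }
}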

Kafka includes checksums to detect corruption of messages in the store and has a full set of security features. Even so, industry-specific requirements, such as the one above, sometimes appear.

A custom partitioning strategy must ensure that all related messages end up in the same partition. While this seems simple, the requirement can be complicated by how important the ordering of related messages is and by how fixed the number of partitions in a topic is.

The number of partitions in a topic can change over time, since more can be added if traffic exceeds initial expectations. As a result, it may be necessary to associate message keys with the partition they were originally sent to, which implies a piece of state that has to be shared between producer instances.

Another factor to consider is the even distribution of messages across partitions. Typically, keys are not distributed evenly across messages, and hash functions do not guarantee a fair distribution of messages for a small set of keys.
It is important to note that however you choose to partition messages, the partitioner itself may need to be reused.

Consider the requirement to replicate data between Kafka clusters in different geographic locations. For this purpose, Kafka comes with a command line tool called MirrorMaker, which is used to read messages from one cluster and transfer them to another.

MirrorMaker must understand the keys of the replicated topic in order to maintain relative order between messages when replicating between clusters, since the number of partitions for that topic may not be the same in two clusters.

Custom partitioning strategies are relatively rare, as default hashing or round robin works well in most scenarios. However, if you require strong ordering guarantees or need to extract metadata from payloads, then partitioning is something you should take a closer look at.

The scalability and performance benefits of Kafka come from shifting some of the responsibilities of a traditional broker onto the client. In this case, it is the decision about how to distribute potentially related messages among multiple consumers working in parallel.

JMS brokers also have to deal with such requirements. Interestingly, the mechanism for sending related messages to the same consumer, implemented through JMS Message Groups (a variation on the sticky load balancing (SLB) strategy), also requires the sender to mark messages as related. In the JMS case, the broker is responsible for sending this group of related messages to one consumer out of many, and for transferring ownership of the group if that consumer fails.

Producer Agreements

Partitioning is not the only thing to consider when sending messages. Let's take a look at the send() methods of the Producer class in the Java API:

Future<RecordMetadata> send(ProducerRecord<K, V> record);
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

It should be noted immediately that both methods return a Future, which indicates that the send operation is not performed right away. What happens is that the message (ProducerRecord) is written into a send buffer for each active partition and transmitted to the broker by a background thread in the Kafka client library. While this makes things incredibly fast, it means that a naively written application can lose messages if its process is stopped.

As always, there is a way to make the send operation more reliable at the cost of performance. The size of this buffer can be set to 0, and the sending application thread will be forced to wait until the message transfer to the broker is completed, as follows:

RecordMetadata metadata = producer.send(record).get();
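
Alternatively, the second send() overload shown earlier takes a Callback that the client library invokes once the broker acknowledges or rejects the write, keeping the send asynchronous. A minimal sketch, reusing the producer and record from the snippet above (Callback and RecordMetadata live in org.apache.kafka.clients.producer):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the send failed; log it, retry, or route to an error handler
            exception.printStackTrace();
        } else {
            // the broker accepted the record; metadata holds its partition and offset
            System.out.printf("written to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    }
});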

More about reading messages

Reading messages has additional complexities that need to be reasoned about. Unlike the JMS API, which can run a message listener in response to a message, the Kafka Consumer only polls. Let's take a closer look at the poll() method used for this purpose:

ConsumerRecords<K, V> poll(long timeout);

The return value of the method is a container structure holding multiple ConsumerRecord objects from potentially several partitions. A ConsumerRecord is itself a holder object for a key-value pair with associated metadata, such as the partition it came from.

As discussed in Chapter 2, we must keep in mind what happens to messages after they have been processed successfully or unsuccessfully, for example if the client is unable to process a message or if it aborts. In JMS this was handled through an acknowledgment mode: the broker would either delete a successfully processed message or re-deliver an unprocessed or failed one (assuming transactions were used).
Kafka works very differently. Messages are not deleted on the broker after they have been read, and what happens on failure is the responsibility of the consuming code itself.

As we have said, a consumer group is associated with an offset in the log. The log position associated with this offset corresponds to the next message to be handed out in response to poll(). The point in time at which this offset is advanced is decisive for how reading behaves.

Returning to the reading model discussed earlier, message processing consists of three stages:

  1. Retrieve a message for reading.
  2. Process the message.
  3. Acknowledge the message.

The Kafka consumer comes with a configuration option enable.auto.commit. This is a frequently used default setting, as is common with settings containing the word "auto".

Prior to Kafka 0.10, a client using this option would send the offset of the last message read on the next call to poll() after processing. This meant that any messages that had already been fetched could be reprocessed if the client had processed them but was unexpectedly terminated before calling poll(). Since the broker keeps no state about how many times a message has been read, the next consumer that retrieves that message has no way of knowing that something bad happened. This behavior was pseudo-transactional: an offset was committed only if the messages had been processed successfully, but if the client aborted, the broker would hand the same messages out again to another client. This behavior was consistent with the "at least once" delivery guarantee.

In Kafka 0.10, the client code was changed so that the commit is triggered periodically by the client library, as configured by auto.commit.interval.ms. This behavior sits somewhere between the JMS AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE modes. When using auto-commit, messages can be committed regardless of whether they were actually processed; this could happen, for example, with a slow consumer. If a consumer aborted, messages would be fetched by the next consumer starting at the committed position, which could result in missed messages. In this case Kafka did not lose the messages; the reading code simply did not process them.

This mode carries the same caveat as in version 0.9: messages may have been processed, but if processing failed, the offset may not have been committed, potentially causing delivery to be duplicated. The more messages you fetch in a single poll(), the bigger this problem becomes.
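
For reference, the settings discussed above correspond to consumer configuration properties along these lines; the interval shown is illustrative, not a recommendation:

Properties props = new Properties();
props.put("enable.auto.commit", "true");       // the default: the client library commits offsets in the background
props.put("auto.commit.interval.ms", "5000");  // how often those background commits happen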

As discussed in "Reading Messages from a Queue" on page 21, there is no such thing as once-only delivery of a message in a messaging system once failure modes are taken into account.

In Kafka there are two ways to commit an offset: automatically and manually. In both cases, messages may be processed multiple times if a message was processed but the consumer failed before the commit. You may also end up not processing a message at all if the commit happened in the background and your code terminated before the message could be processed (a possibility in Kafka 0.9 and earlier).

You can control the manual offset commit process in the Kafka consumer API by setting the parameter enable.auto.commit to false and explicitly calling one of the following methods:

void commitSync();
void commitAsync();

If you want to process messages "at least once", you must commit the offset manually with commitSync(), calling it immediately after processing the messages.
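
A minimal sketch of that pattern, continuing the consumer configuration shown earlier; the topic name and the process() method are hypothetical stand-ins for your own code:

props.put("enable.auto.commit", "false"); // take over responsibility for committing offsets

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("payments")); // hypothetical topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical business logic; may throw on failure
        }
        consumer.commitSync(); // committed only after everything fetched by this poll() has been processed
    }
}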

These methods reduce the chance that messages will be acknowledged before they are processed, but they do nothing to eliminate the potential for duplicate processing, while giving the appearance of transactionality. There are no transactions in Kafka. The client has no ability to do the following:

  • Automatically roll back a failed message. Consumers must themselves handle exceptions arising from problematic payloads and backend outages, as they cannot rely on the broker to re-deliver messages.
  • Send messages to multiple topics in one atomic operation. As we will see shortly, control over different topics and partitions can reside on different machines in the Kafka cluster, and these do not coordinate transactions when sending. At the time of writing, some work had been done to make this possible through KIP-98.
  • Tie the consumption of a message from one topic to the sending of another message to a different topic. Again, the architecture of Kafka depends on many independent machines working as one bus, and no attempt is made to hide this. For example, there are no API constructs that would let you tie a Consumer and a Producer together in a transaction. In JMS this is provided by the Session object, from which MessageProducers and MessageConsumers are created.

If we can't rely on transactions, how can we provide semantics closer to those provided by traditional messaging systems?

If there is a possibility that the consumer's offset may increase before the message has been processed, such as during a consumer crash, then the consumer has no way of knowing if its consumer group missed the message when it is assigned a partition. So one strategy is to rewind the offset to the previous position. The Kafka consumer API provides the following methods for this:

void seek(TopicPartition partition, long offset);
void seekToBeginning(Collection<TopicPartition> partitions);

The seek() method can be used in combination with the offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) method to rewind to a state at some specific point in the past.
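
A minimal sketch of rewinding a single partition to the first offset at or after a point in time; the topic name and timestamp are illustrative, and the consumer must already own the partition (through assignment or a rebalance) before seeking:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

TopicPartition partition = new TopicPartition("payments", 0);   // hypothetical topic, partition 0
long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;

Map<TopicPartition, Long> query = new HashMap<>();
query.put(partition, oneHourAgo);

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
OffsetAndTimestamp target = offsets.get(partition);
if (target != null) {
    consumer.seek(partition, target.offset()); // the next poll() starts from this offset
}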

Implicitly, using this approach means that it is very likely that some messages that were previously processed will be read and processed again. To avoid this, we can use idempotent reading, as described in Chapter 4, to keep track of previously viewed messages and eliminate duplicates.

Alternatively, your consumer code can be kept simple, as long as message loss or duplication is acceptable. When we consider the use cases for which Kafka is commonly used, such as handling log events, metrics, click tracking, etc., we understand that the loss of individual messages is unlikely to have a significant impact on surrounding applications. In such cases, the default values are perfectly acceptable. On the other hand, if your application needs to send payments, you must carefully take care of each individual message. It all comes down to context.

Personal observation suggests that as message volume increases, the value of each individual message decreases. High-volume messages tend to be valuable when viewed in aggregate form.

High Availability

Kafka's approach to high availability is very different from ActiveMQ's approach. Kafka is designed around scale-out clusters where all broker instances receive and distribute messages at the same time.

A Kafka cluster consists of multiple broker instances running on different servers. Kafka was designed to run on ordinary standalone hardware, where each node has its own dedicated storage. The use of storage area networks (SANs) is not recommended, because multiple compute nodes can compete for time slices of the storage and create contention.

Kafka is an always-on system. Many large Kafka users never shut their clusters down, and the software is always upgraded via a rolling restart. This is achieved by guaranteeing compatibility with the previous version for messages and for interactions between brokers.

Brokers are connected to a cluster of ZooKeeper servers, which acts as a registry of configuration data and is used to coordinate the roles of each broker. ZooKeeper is itself a distributed system that provides high availability by replicating information and establishing a quorum.

In the base case, a topic is created in a Kafka cluster with the following properties:

  • The number of partitions. As discussed earlier, the exact value used here depends on the desired level of parallel reading.
  • The replication factor, which determines how many broker instances in the cluster should hold the log for each partition.

Using ZooKeeper for coordination, Kafka attempts to distribute new partitions fairly among the brokers in the cluster. This is done by a single instance that acts as the Controller.

At runtime, for each topic partition, the Controller assigns one broker the role of leader and other brokers the role of followers. The broker acting as leader for a partition is responsible for receiving all the messages sent to it by producers and for distributing messages to consumers. When messages are sent to a topic partition, they are replicated to all broker nodes acting as followers for that partition. Each node holding the log for a partition is called a replica. A broker can act as a leader for some partitions and as a follower for others.

A follower that holds all the messages held by the leader is called an in-sync replica (a replica that is in a synchronized state). If the broker acting as leader for a partition goes down, any broker that is up to date, that is, in sync, for that partition can take over the leader role. It is an incredibly resilient design.

Part of the producer configuration is the acks parameter, which determines how many replicas must acknowledge receipt of a message before the application thread continues sending: 0, 1, or all. If set to all, then upon receiving a message, the leader will send an acknowledgment back to the producer as soon as it has received acknowledgments of the write from the number of replicas (including itself) defined by the topic setting min.insync.replicas (1 by default). If the message cannot be successfully replicated, the producer throws an exception to the application (NotEnoughReplicas or NotEnoughReplicasAfterAppend).

A typical configuration creates a topic with a replication factor of 3 (1 leader and 2 followers for each partition) and sets min.insync.replicas to 2. In this case, the cluster will tolerate one of the brokers managing a topic partition going down with no impact on client applications.
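
As an illustration, such a topic could be created with the Java AdminClient available in newer client versions (the kafka-topics command line tool accepts the same settings); the topic name and partition count below are assumptions. The matching producer-side setting is acks=all.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("payments", 6, (short) 3) // 6 partitions, replication factor 3
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}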

This brings us back to the now familiar trade-off between performance and reliability. Replication comes at the cost of additional time spent waiting for acknowledgments from followers. That said, because it happens in parallel, replication to a minimum of three nodes performs about the same as replication to two (ignoring the increase in network bandwidth usage).

By using this replication scheme, Kafka cleverly avoids the need to physically write every message to disk with a sync() operation. Each message sent by a producer is written to the partition log, but, as discussed in Chapter 2, a write to a file initially lands in the operating system's buffer. If the message has been replicated to another Kafka instance and is held in its memory, the loss of the leader does not mean that the message itself is lost; an in-sync replica can take over.
Declining to sync() means that Kafka can accept messages as fast as it can write them to memory. Conversely, the longer it can avoid flushing memory to disk, the better. For this reason, it is not uncommon for Kafka brokers to be allocated 64 GB of memory or more. This use of memory means that a single Kafka instance can easily operate at speeds many thousands of times faster than a traditional message broker.

Kafka can also be configured to sync() batches of messages. Since everything in Kafka is batch-oriented, this actually works quite well for many use cases and is a useful tool for users who require very strong guarantees. Much of Kafka's raw performance comes from messages being sent to the broker in batches, and from those messages being read from the broker in sequential blocks using zero-copy operations (operations that avoid copying data from one memory area to another). The latter is a big performance and resource win and is possible only thanks to the underlying log data structure, which defines the partition layout.

Much better performance is possible in a Kafka cluster than with a single Kafka broker, because topic partitions can scale out across many separate machines.

Summary

In this chapter, we looked at how the Kafka architecture reimagines the relationship between clients and brokers to provide an incredibly robust messaging pipeline, with throughput many times greater than that of a conventional message broker. We have discussed the functionality it uses to achieve this and briefly looked at the architecture of applications that provide this functionality. In the next chapter, we'll look at common problems messaging-based applications need to solve and discuss strategies for dealing with them. We'll end the chapter by outlining how to talk about messaging technologies in general so you can evaluate their suitability for your use cases.

Previous translated part: Understanding message brokers. Learning the mechanics of messaging with ActiveMQ and Kafka. Chapter 1

Translation done: tele.gg/middle_java

To be continued ...


Source: habr.com
