Not only processing: How we made a distributed database from Kafka Streams, and what came of it

Hey Habr!

A reminder: following our book on Kafka, we have released an equally interesting work on the Kafka Streams API library.


The community is still only discovering the limits of this powerful tool. An article was recently published that we would like to share with you in translation. Drawing on his own experience, the author explains how to turn Kafka Streams into a distributed data store. Enjoy!

The Apache Kafka Streams library is used in enterprises worldwide for distributed stream processing on top of Apache Kafka. One underappreciated aspect of this framework is that it lets you store local state derived from stream processing.

In this article, I will tell you how our company managed to use this opportunity to its advantage when developing a cloud application security product. Using Kafka Streams, we have created shared state microservices, each of which serves as a fault-tolerant and highly available source of reliable information about the state of objects in the system. For us, this is a step forward both in terms of reliability and ease of support.

If you are interested in an alternative to keeping a single central database as the authoritative source for the state of your objects, read on; it should be interesting...

Why we thought it was time to change how we deal with shared state

We needed to maintain the state of various objects based on agent reports (for example: is the site under attack?). Before switching to Kafka Streams, we often relied on a single central database (plus a service API) for state management. This approach has its drawbacks: in data-intensive situations, maintaining consistency and synchronization becomes a real challenge. The database can turn into a bottleneck, fall prey to race conditions, and suffer from unpredictability.


Figure 1: A typical shared-state scenario before the transition to Kafka and Kafka Streams: agents report their observations via an API, and the updated state is computed through a central database

Meet Kafka Streams - it's now easy to create microservices with shared state

About a year ago, we decided to take a hard look at our shared-state scenarios to address these issues. We immediately decided to try Kafka Streams: we knew how scalable, highly available, and fault-tolerant it is, and how rich its streaming functionality is (transformations, including stateful ones). Just what we needed, not to mention how mature and robust Kafka itself is as a messaging system.

Each of the stateful microservices we created was built on top of a Kafka Streams instance with a fairly simple topology. It consisted of (1) a source, (2) a processor with a persistent key-value store, and (3) a sink:


Figure 2: The default topology of our streaming instances for stateful microservices. Note that it also includes a state store holding scheduling metadata.
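
By way of illustration, here is a minimal sketch of such a topology using the Processor API. The topic, store, and class names (agent-reports, computed-state, shared-state-store, SharedStateProcessor, which is sketched further below) are our own placeholders, not names from the article:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.state.Stores;

    public class SharedStateTopology {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-state-service");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // 1) source -> 2) processor with a persistent key-value store -> 3) sink
            Topology topology = new Topology();
            topology.addSource("reports-source", "agent-reports");
            topology.addProcessor("state-processor", SharedStateProcessor::new, "reports-source");
            topology.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("shared-state-store"),
                    Serdes.String(), Serdes.String()),
                "state-processor");
            topology.addSink("state-sink", "computed-state", "state-processor");

            new KafkaStreams(topology, props).start();
        }
    }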

With this new approach, agents compose messages that are fed into the source topic, and consumers—say, a mail notification service—receive the computed shared state via the sink (output topic).


Figure 3: An example of the new workflow for a shared-state scenario: 1) an agent produces a message that arrives in the Kafka source topic; 2) a shared-state microservice (using Kafka Streams) processes it and writes the computed state to the Kafka sink topic; after which 3) consumers receive the new state

Hey, that built-in key-value store is really useful!

As mentioned above, our shared-state topology includes a key-value store. We have found several uses for it; two of them are described below.

Option #1: Using the Key-Value Store in Calculations

Our first key-value store held the auxiliary data we needed for our calculations. For example, in some cases the shared state was determined by "majority vote": the store could hold all of the agents' latest reports on the state of some object. Then, whenever a new report arrived from some agent, we could save it, pull the other agents' reports on the same object from the store, and rerun the calculation.
Figure 4 below shows how we exposed the key-value store to the processor's process method so that each new message could then be handled.


Figure 4: Accessing the key-value store from the processor's process method (each shared-state scenario then implements its own doProcess method)
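
As a rough sketch (our own reconstruction, not the author's listing, using the pre-2.7 Processor API), such a processor might look like this, with doProcess standing in for the scenario-specific logic:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class SharedStateProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            // The store attached to this processor in the topology above
            this.store = (KeyValueStore<String, String>) context.getStateStore("shared-state-store");
        }

        @Override
        public void process(String key, String report) {
            // Scenario-specific logic: e.g. save the new report, read the other
            // agents' reports on the same object, and recompute a majority vote.
            String newState = doProcess(key, report);
            store.put(key, newState);
            context.forward(key, newState); // emit the computed state to the sink
        }

        private String doProcess(String key, String report) {
            return report; // placeholder for the real computation
        }

        @Override
        public void close() {}
    }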

Option #2: Building a CRUD API on top of Kafka Streams

With our basic workflow in place, we set out to write a RESTful CRUD API for our shared-state microservices. We wanted to be able to retrieve the state of some or all objects, as well as set or delete an object's state (handy for backend support).

To support the various GET state endpoints, whenever we recomputed state during processing we persisted it in the built-in key-value store. Such an API then becomes quite simple to implement with a single Kafka Streams instance, as shown in the listing below:


Figure 5: Using the built-in key-value store to get the precomputed state of an object
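
In sketch form, the lookup boils down to an interactive query against the built-in store (store name as assumed above; on recent Kafka Streams versions the store is fetched via StoreQueryParameters instead):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // GET /state/{objectId}: serve the precomputed state straight from the
    // local key-value store via an interactive query.
    String getState(KafkaStreams streams, String objectId) {
        ReadOnlyKeyValueStore<String, String> view =
            streams.store("shared-state-store", QueryableStoreTypes.keyValueStore());
        return view.get(objectId); // null if the object is unknown
    }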

Updating the state of an object via the API is also easy to implement. Essentially, all you need to do is create a Kafka producer and use it to publish a record containing the new state. This guarantees that messages generated via the API are treated exactly like those coming from other producers (e.g. agents).


Figure 6: Setting the state of an object using a Kafka producer
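
A minimal sketch of that producer path (the topic name and JSON payload are our assumptions; the key point is writing to the same source topic the agents use):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // PUT /state/{objectId}: publish the new state to the agents' source topic,
    // so the update flows through exactly the same processing path.
    void setState(String objectId, String newStateJson) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("agent-reports", objectId, newStateJson));
        }
    }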

Minor complication: Kafka has many partitions

Next, we wanted to distribute the processing load and improve availability by running a cluster of shared-state microservices per scenario. Setup was a breeze: once we configured all the instances to run with the same application ID (and the same bootstrap servers), pretty much everything else happened automatically. We also specified that each source topic would consist of several partitions, so that each instance could be assigned a subset of them.

It is worth mentioning that backing up the state store comes out of the box here, so that, for example, after a failure the backup can be moved to another instance. For each state store in Kafka Streams, a replicated changelog topic is created that tracks local updates. Kafka thus continuously backs up the state store. So if one Kafka Streams instance fails, its state store can be quickly restored on another instance, to which the corresponding partitions migrate. Our tests showed that this takes a matter of seconds, even with millions of records in the store.
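
For reference, the cluster behavior described above hinges on a couple of standard Kafka Streams settings (continuing the props object from the earlier topology sketch); the standby-replica line is an optional knob we are adding for illustration, not something the article mentions:

    // Every instance runs with the same application.id, so Kafka assigns each
    // one a subset of the source topic's partitions (and the matching shards
    // of the state store).
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shared-state-service");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092");

    // Optional: keep warm standby copies of each store to shorten failover,
    // on top of the changelog-based restoration described above.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);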

Once we moved from a single shared-state microservice to a cluster of them, implementing the GET state API became less trivial. Now each microservice's state store holds only part of the overall picture (the objects whose keys were mapped to its partitions). We had to determine which instance held the state of the object we needed, and we did this using the streams metadata, as shown below:


Figure 7: Using streams metadata, we determine which instance to query for the state of the desired object; a similar approach was taken with the GET ALL API
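
A sketch of that routing logic, using the metadataForKey API available at the time (since deprecated in favor of queryMetadataForKey); getLocalState and fetchRemote are hypothetical helpers of our own:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    String getState(KafkaStreams streams, HostInfo thisHost, String objectId) {
        // Which instance owns the partition this key maps to?
        StreamsMetadata meta = streams.metadataForKey(
            "shared-state-store", objectId, Serdes.String().serializer());

        if (thisHost.equals(meta.hostInfo())) {
            return getLocalState(streams, objectId);        // the key lives here
        }
        // Otherwise forward the request (e.g. over HTTP) to the owning instance.
        return fetchRemote(meta.host(), meta.port(), objectId);
    }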

Main conclusions

State stores in Kafka Streams can de facto serve as a distributed database that is continuously replicated in Kafka.

  • A CRUD API is easily built on top of such a system.
  • Handling multiple partitions is a bit tricky.
  • One or more state stores can also be added to the streaming topology to hold auxiliary data. This option can be used for:
      • long-term storage of data needed for calculations during stream processing;
      • long-term storage of data that can be useful the next time a streaming instance is initialized;
      • much more...

These and other advantages make Kafka Streams great for maintaining global state in a distributed system like ours. Kafka Streams has proven to be very reliable in production (we've lost virtually no messages since it was deployed), and we're sure it doesn't stop there!

Source: habr.com
