The book "Kafka Streams in action. Applications and microservices for real-time work"

The book "Kafka Streams in action. Applications and microservices for real-time work" Hello Habrites! This book is suitable for any developer who wants to understand streaming. Understanding distributed programming will help you better understand Kafka and Kafka Streams. It would be nice to know the Kafka framework itself, but this is not necessary: ​​I will tell you everything you need. Experienced Kafka developers as well as beginners will learn how to build interesting streaming applications using the Kafka Streams library with this book. Intermediate to advanced Java developers, already familiar with concepts like serialization, will learn how to apply their skills to building Kafka Streams applications. The book's source code is written in Java 8 and heavily uses the Java 8 lambda expression syntax, so knowing how to work with lambda functions (even in a different programming language) will come in handy.

Excerpt. 5.3. Aggregation and window operations

In this section, we will move on to exploring the most promising parts of Kafka Streams. So far we have covered the following aspects of Kafka Streams:

  • creating a processing topology;
  • using state in streaming applications;
  • joining data streams;
  • differences between event streams (KStream) and update streams (KTable).

In the examples that follow, we will bring all of these elements together. You'll also learn about windowing operations, another great feature of streaming applications. Our first example will be a simple aggregation.

5.3.1. Aggregating share sales volume by industry

Aggregation and grouping are vital tools when working with streaming data. Examining individual records as they arrive is often not enough. To extract additional information from the data, you need to group and combine it.

In this example, imagine yourself as a day trader who needs to track the sales volume of company shares across several industries. Specifically, you are interested in the five companies with the largest share sales in each industry.

Such an aggregation requires the following steps to get the data into the desired form (in general terms).

  1. Create a source based on the topic that publishes raw stock trading information. We will have to map objects of type StockTransaction to objects of type ShareVolume, because the StockTransaction object contains trade metadata while we only need the number of shares being sold.
  2. Group the ShareVolume data by stock symbol. Once grouped by symbol, this data can be rolled up to running subtotals of share sales volume. Note that the KStream.groupBy method returns an instance of KGroupedStream; you then get a KTable instance by calling the KGroupedStream.reduce method.

What is the KGroupedStream interface

The KStream.groupBy and KStream.groupByKey methods return a KGroupedStream instance. KGroupedStream is an intermediate representation of the event stream after grouping by key. It is not meant to be worked with directly at all. Instead, KGroupedStream is used for aggregation operations, which always result in a KTable. And since the result of aggregation operations is a KTable backed by a state store, it is possible that not all updates in the result are sent further down the pipeline.

The KTable.groupBy method returns a similar KGroupedTable - an intermediate representation of a stream of updates regrouped by key.

Let's take a short break and look at Fig. 5.9, which shows what we have achieved. This topology should be familiar to you by now.

The book "Kafka Streams in action. Applications and microservices for real-time work"
Let's now take a look at the code for this topology (it can be found in src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.2).

The book "Kafka Streams in action. Applications and microservices for real-time work"
The code above is notable for its brevity and for how much it accomplishes in just a few lines. You may notice something new in the first parameter of the builder.stream method: a value of the AutoOffsetReset enumerated type, EARLIEST (there is also LATEST), which is set using the Consumed.withOffsetResetPolicy method. This enumerated type can be used to specify an offset reset strategy for each individual KStream or KTable, and it takes precedence over the offset reset setting from the configuration.
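
A minimal sketch of what this topology can look like with the 1.0-era API is shown below. The topic name, the Serde instances (stockTransactionSerde, shareVolumeSerde) and the ShareVolume helpers are assumptions standing in for the book's model code rather than the exact contents of Listing 5.2:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Serialized;

    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, ShareVolume> shareVolumeKTable = builder
        .stream("stock-transactions",
                Consumed.with(stringSerde, stockTransactionSerde)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST))
        // keep only the share-volume part of each transaction (hypothetical helper)
        .mapValues(txn -> ShareVolume.fromTransaction(txn))
        // group by stock symbol (assumed getter on ShareVolume)
        .groupBy((key, shareVolume) -> shareVolume.getSymbol(),
                 Serialized.with(stringSerde, shareVolumeSerde))
        // roll the volumes up into a running subtotal per symbol
        .reduce(ShareVolume::sum);

The imports shown here are reused by the later sketches in this excerpt.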

groupByKey and groupBy

The KStream interface has two methods for grouping records: groupByKey and groupBy. Both return a KGroupedStream, so you might be wondering what the difference between them is and when to use which.

The groupByKey method is used when your KStream already has non-null keys and, more importantly, the "requires repartitioning" flag was never set.

The groupBy method assumes that you have changed the keys used for grouping, so the repartition flag is set to true. Joins, aggregations, and other operations performed after groupBy will trigger automatic repartitioning.
In short: you should use groupByKey rather than groupBy whenever possible.

What the mapValues and groupBy methods do is clear, so let's take a look at the sum() method (found in src/main/java/bbejeck/model/ShareVolume.java) (Listing 5.3).

The book "Kafka Streams in action. Applications and microservices for real-time work"
The ShareVolume.sum method returns a running subtotal of the share sales volume, and the result of the whole chain of computations is a KTable object. Now you can see the role KTable plays. When ShareVolume objects arrive, the corresponding KTable keeps the latest up-to-date update. It is important to remember that all updates are reflected in shareVolumeKTable, but not all of them are sent further downstream.
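
The ShareVolume class itself is not reproduced here; a minimal sketch of a sum method with the behavior described above might look like this (the field names are assumptions, not the book's exact model class):

    public class ShareVolume {
        private String symbol;
        private String industry;
        private int shares;

        public static ShareVolume sum(ShareVolume first, ShareVolume second) {
            ShareVolume total = new ShareVolume();
            total.symbol = first.symbol;
            total.industry = first.industry;
            total.shares = first.shares + second.shares; // running subtotal per symbol
            return total;
        }
        // getters omitted for brevity
    }

A static two-argument method like this fits the Reducer<ShareVolume> signature, which is why it can be passed to reduce as a method reference.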

We then use this KTable to aggregate (by number of shares traded) to get the five companies with the highest volumes of stock sales in each industry. Our actions in this case will be similar to the actions during the first aggregation.

  1. Perform another groupBy operation to group individual ShareVolume objects by industry.
  2. Proceed to summing up the ShareVolume objects. This time the aggregation object is a fixed-size priority queue; only the five companies with the most shares sold are kept in it.
  3. Map the queues from the previous step to a string value and return the five top-selling stocks per industry.
  4. Write the results in string form to a topic.

Figure 5.10 shows the data flow topology graph. As you can see, the second round of processing is quite simple.

The book "Kafka Streams in action. Applications and microservices for real-time work"
Now that we clearly understand the structure of this second round of processing, we can turn to its source code (you will find it in src/main/java/bbejeck/chapter_5/AggregationsAndReducingExample.java) (Listing 5.4).

The initializer here is the fixedQueue variable. It is a custom adapter over java.util.TreeSet used to keep track of the top N results in descending order of the number of shares sold.

The book "Kafka Streams in action. Applications and microservices for real-time work"
You've already seen calls to groupBy and mapValues, so we won't dwell on them (we're calling the KTable.toStream method because the KTable.print method is deprecated). But you haven't seen the KTable version of aggregate() yet, so we'll spend a little time discussing it.

As you remember, KTable is different in that records with the same key are considered updates. KTable replaces the old entry with the new one. Aggregation works in a similar way: the latest records with the same key are aggregated. When an entry arrives, it is added to an instance of the FixedSizePriorityQueue class using an adder (the second parameter in the call to the aggregate method), but if another entry with the same key already exists, then the old entry is removed using the subtractor (the third parameter in the call to the aggregate method).

All this means that our aggregator, FixedSizePriorityQueue, does not aggregate all values with one key at all; instead it keeps a running top N of the most traded kinds of stocks. Each incoming entry contains the total number of shares sold so far, so the KTable will tell you which stocks are currently selling the most without requiring a rolling aggregation of every update.
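
As a rough sketch of this second round, again under the assumptions of the earlier sketches (FixedSizePriorityQueue and its Serde come from the book's repository; the getters, the string-rendering mapper, and the output topic name are assumed names):

    import java.util.Comparator;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    // largest number of shares sold first
    Comparator<ShareVolume> comparator =
            (sv1, sv2) -> sv2.getShares() - sv1.getShares();
    FixedSizePriorityQueue<ShareVolume> fixedQueue =
            new FixedSizePriorityQueue<>(comparator, 5);

    shareVolumeKTable
        // regroup the per-symbol running totals by industry
        .groupBy((symbol, shareVolume) ->
                     KeyValue.pair(shareVolume.getIndustry(), shareVolume),
                 Serialized.with(stringSerde, shareVolumeSerde))
        // adder puts the latest total into the queue, subtractor evicts the stale one;
        // add/remove are assumed to return the queue itself
        .aggregate(() -> fixedQueue,
                   (industry, shareVolume, queue) -> queue.add(shareVolume),
                   (industry, shareVolume, queue) -> queue.remove(shareVolume),
                   Materialized.with(stringSerde, fixedSizePriorityQueueSerde))
        // render the top five per industry as a readable string (hypothetical mapper)
        .mapValues(queue -> queue.toTopFiveString())
        .toStream()
        .to("stock-volume-by-industry", Produced.with(stringSerde, stringSerde));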

We have learned to do two important things:

  • group values in a KTable by their common key;
  • perform useful operations on these grouped values, such as reduce and aggregate.

Knowing how to perform these operations is important for understanding the meaning of the data moving through the Kafka Streams application and understanding what information it conveys.

We have also brought together some of the key concepts discussed earlier in this book. In Chapter 4, we discussed the importance of fault-tolerant, local state in a streaming application. The first example in this chapter showed why local state is so important: it gives you the ability to keep track of the information you have already seen. Local access avoids network latency, making the application more performant and resilient to errors.

When performing any reduce or aggregation operation, you must specify the name of the state store. The reduce and aggregate operations return a KTable instance, and the KTable uses the state store to replace old results with new ones. As you have seen, not all updates are sent further down the pipeline, and this matters because aggregation operations are meant to produce summary information. If you do not apply local state, KTable will forward every aggregation and reduce result downstream.

Next, we will look at performing operations such as aggregation within a specific period of time - the so-called window operations (windowing operations).

5.3.2. Window Operations

In the previous section, we got acquainted with rolling reduce and aggregation. The application continuously rolled up the stock sales volume and then aggregated the five best-selling stocks on the exchange.

Sometimes such continuous aggregation and reduction of results is exactly what you need. And sometimes you need to perform operations only over a given period of time: for example, to calculate how many exchange transactions were made with the shares of a particular company in the last 10 minutes, or how many users clicked on a new banner ad in the last 15 minutes. An application can perform such operations repeatedly, but with results that relate only to specified intervals of time (time windows).

Counting exchange transactions by buyer

In the following example, we will track stock exchange transactions for several traders, be they large organizations or savvy individual financiers.

There are two possible reasons for this tracking. One of them is the need to know what the market leaders are buying/selling. If these big players and sophisticated investors see an opportunity for themselves, it makes sense to follow their strategy. The second reason is the desire to notice any possible signs of illegal transactions using inside information. To do this, you will need to analyze the correlation of large sales spikes with important press releases.

This tracking consists of the following steps:

  • creating a stream that reads from the stock-transactions topic;
  • grouping incoming records by customer ID and stock symbol. The groupBy call returns an instance of the KGroupedStream class;
  • using the KGroupedStream.windowedBy method to get a stream of data bounded by a time window, which allows windowed aggregation. Depending on the window type, either a TimeWindowedKStream or a SessionWindowedKStream is returned;
  • counting transactions as the aggregation operation. The windowed data flow determines whether a particular record is included in the count;
  • writing results to a topic, or outputting them to the console during development.

The topology of this application is simple, but a visual picture of it will not hurt. Let's look at Fig. 5.11.

Next, we will look at the functionality of window operations and the corresponding code.

The book "Kafka Streams in action. Applications and microservices for real-time work"

Window types

There are three types of windows in Kafka Streams:

  • session windows;
  • tumbling windows;
  • sliding/hopping windows.

Which one to choose depends on your business requirements. Tumbling and hopping windows are bounded by time, while session windows are bounded by user activity: the duration of a session is determined solely by how active the user is. The main thing to remember is that all window types are based on the timestamps of the records, not on the system time.

Next, we will implement our topology with each of the window types. The full code will be shown only in the first example; for the other window types nothing changes except the kind of window operation.

Session windows

Session windows are very different from all other types of windows. They are limited not so much by time as by user activity (or the activity of the entity that you would like to track). Session windows are delimited by periods of inactivity.

Figure 5.12 illustrates the concept of session windows. The smaller session will merge with the session to its left, while the session on the right will remain separate because it follows a long period of inactivity. Session windows are based on user actions, but they use the timestamps from the records to determine which session a record belongs to.

The book "Kafka Streams in action. Applications and microservices for real-time work"

Using session windows to track exchange transactions

Let's use session windows to capture information about exchange transactions. The implementation of session windows is shown in Listing 5.5 (which can be found in src/main/java/bbejeck/chapter_5/CountingWindowingAndKTableJoinExample.java).

The book "Kafka Streams in action. Applications and microservices for real-time work"
You have already seen most of the operations of this topology, so there is no need to review them here again. But there are also a few new elements here, which we will now discuss.

A groupBy operation is usually followed by some kind of aggregation operation (aggregate, reduce, or count). You can perform either a cumulative, running-total aggregation or a windowed aggregation that considers records within a given time window.

The code in Listing 5.5 counts the number of transactions within session windows. Figure 5.13 breaks these actions down step by step.

By calling windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)) we create a session window with an inactivity gap of 20 seconds and a retention period of 15 minutes. An inactivity gap of 20 seconds means that the application will include any record that arrives within 20 seconds of the end or start of the current session in the current (active) session.

The book "Kafka Streams in action. Applications and microservices for real-time work"
Next, we specify which aggregation operation to perform in the session window, in this case count. If an incoming record falls outside the inactivity gap (on either side of its timestamp), the application creates a new session. The retention period keeps a session alive for a certain amount of time and admits late-arriving data that falls outside the session's inactivity gap but can still be attached to it. In addition, the start and end of a new session produced by a merge correspond to the earliest and latest timestamps involved.
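
A sketch of the session-window variant of this topology follows, reusing the builder, serdes, and imports from the earlier sketches; the TransactionSummary factory method and the transactionSummarySerde are assumed stand-ins for the book's model code:

    import org.apache.kafka.streams.kstream.Printed;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    long twentySeconds = 1000 * 20;
    long fifteenMinutes = 1000 * 60 * 15;

    KTable<Windowed<TransactionSummary>, Long> customerTransactionCounts =
        builder.stream("stock-transactions",
                       Consumed.with(stringSerde, stockTransactionSerde))
            // key each record by customer ID plus stock symbol (hypothetical factory method)
            .groupBy((noKey, transaction) -> TransactionSummary.from(transaction),
                     Serialized.with(transactionSummarySerde, stockTransactionSerde))
            // 20-second inactivity gap, 15-minute retention period
            .windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes))
            .count();

    // print the windowed counts to the console during development
    customerTransactionCounts.toStream()
        .print(Printed.<Windowed<TransactionSummary>, Long>toSysOut()
                      .withLabel("Customer Transaction Counts"));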

Let's walk through a few records arriving at the count operation to see how sessions work (Table 5.1).

The book "Kafka Streams in action. Applications and microservices for real-time work"
When records arrive, we look for existing sessions with the same key, an end time no earlier than the current record's timestamp minus the inactivity gap, and a start time no later than the current record's timestamp plus the inactivity gap. With this in mind, the four records from Table 5.1 merge into sessions as follows.

1. Record 1 arrives first, so the start time is equal to the end time and is 00:00:00.

2. Next comes record 2, and we look for sessions that end no earlier than 23:59:55 and start no later than 00:00:35. We find record 1 and merge sessions 1 and 2. We take the start time of session 1 (earlier) and the end time of session 2 (later), so our new session starts at 00:00:00 and ends at 00:00:15.

3. Record 3 arrives, we look for sessions between 00:00:30 and 00:01:10 and find none. We add a second session for the key 123-345-654,FFBE, starting and ending at 00:00:50.

4. Record 4 arrives, and we look for sessions between 23:59:45 and 00:00:25. This time both sessions 1 and 2 are found, and all three are merged into one session with a start time of 00:00:00 and an end time of 00:00:15.

Here are some important things to keep in mind from this section:

  • sessions are not fixed-size windows; the duration of a session is determined by activity within a given period of time;
  • the timestamps in the data determine whether an event falls into an existing session or into an inactivity gap.

Next, we will discuss the next type of window: tumbling windows.

"Tumbling" windows

"Tumbling" windows capture events that fall within a certain period of time. Imagine that you need to capture all the exchange transactions of a certain company every 20 seconds, so you collect all the events for this period of time. At the end of the 20-second interval, the window "tumbles" and switches to a new 20-second observation interval. Figure 5.14 illustrates this situation.

The book "Kafka Streams in action. Applications and microservices for real-time work"
As you can see, all events received in the last 20 seconds are included in the window. After this period of time, a new window is created.

Listing 5.6 shows the code that demonstrates the use of tumbling windows to capture exchange transactions every 20 seconds (found in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

The book "Kafka Streams in action. Applications and microservices for real-time work"
This small change, a call to the TimeWindows.of method, is all it takes to use a tumbling window. There is no call to the until() method in this example, so the default retention period of 24 hours is used.
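
Under the same assumptions as the session-window sketch (grouped here stands for the KGroupedStream produced by that sketch's groupBy call), the tumbling-window variant might look like this:

    import org.apache.kafka.streams.kstream.TimeWindows;

    long twentySeconds = 1000 * 20;

    KTable<Windowed<TransactionSummary>, Long> tumblingCounts =
        grouped.windowedBy(TimeWindows.of(twentySeconds))  // tumbling 20-second windows
               .count();                                   // no until(): default 24-hour retention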

Finally, it's time to move on to the last of the window options, hopping windows.

Sliding ("jumping") windows

Sliding/hopping windows are similar to tumbling windows, but with a small difference: they do not wait for the whole time interval to elapse before creating a new window to handle recent events. Instead, they start new calculations after a wait interval that is shorter than the window's duration.

To illustrate the difference between tumbling and hopping windows, let's return to the example of counting exchange transactions. Our goal is still to count the number of transactions, but we don't want to wait the full interval before updating the counter. Instead, we will update the counter at shorter intervals. For example, we will still count the number of transactions over 20-second windows, but update the counter every 5 seconds, as shown in Fig. 5.15. In this case, we end up with three result windows with overlapping data.

The book "Kafka Streams in action. Applications and microservices for real-time work"
Listing 5.7 shows the code for setting sliding windows (found in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java).

The book "Kafka Streams in action. Applications and microservices for real-time work"
A tumbling window is turned into a hopping window by adding a call to the advanceBy() method. In the example shown, the retention period is 15 minutes.
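
Again assuming the same grouped stream as in the previous sketches, the hopping-window variant differs only in the window definition:

    long twentySeconds = 1000 * 20;
    long fiveSeconds = 1000 * 5;
    long fifteenMinutes = 1000 * 60 * 15;

    KTable<Windowed<TransactionSummary>, Long> hoppingCounts =
        grouped.windowedBy(TimeWindows.of(twentySeconds)      // each window covers 20 seconds
                                      .advanceBy(fiveSeconds) // a new window starts every 5 seconds
                                      .until(fifteenMinutes)) // 15-minute retention period
               .count();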

You saw in this section how to limit the results of an aggregation to time windows. In particular, I would like you to remember the following three things from this section:

  • the size of session windows is limited not by a period of time but by user activity;
  • tumbling windows give a picture of events within a set period of time;
  • hopping windows have a fixed duration but are updated frequently, and individual windows may contain overlapping records.

Next, we'll learn how to convert a KTable back into a KStream for a join.

5.3.3. Joining KStream and KTable Objects

In Chapter 4, we discussed joining two KStream objects. Now we have to learn how to join a KTable and a KStream. This may be needed for the following simple reason: a KStream is a stream of records and a KTable is a stream of record updates, but sometimes you need to add extra context to the record stream using the updates from a KTable.

Let's take data on the number of exchange transactions and combine them with exchange news for the relevant industries. Here's what you need to do to achieve this, given the code you already have.

  1. Convert the KTable object holding the exchange transaction counts into a KStream, then replace the key with a key denoting the industry corresponding to the given stock symbol.
  2. Create a KTable object that reads data from an exchange news topic. This new KTable will be keyed by industry.
  3. Join the news updates with the transaction-count information by industry.

Now let's see how to implement this action plan.

Converting KTable to KStream

To convert KTable to KStream you need to do the following.

  1. Call the KTable.toStream() method.
  2. By calling the KStream.map method, replace the key with the name of the industry, and then retrieve the TransactionSummary object from the Windowed instance.

We will chain these operations as follows (the code can be found in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.8).

The book "Kafka Streams in action. Applications and microservices for real-time work"
Since we are performing a KStream.map operation, the returned KStream instance is automatically repartitioned when it is used in a join.
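
A sketch of this conversion, under the assumption that TransactionSummary exposes the getter and setter used below (they are not necessarily the book's exact names):

    import org.apache.kafka.streams.kstream.KStream;

    KStream<String, TransactionSummary> countStream =
        customerTransactionCounts.toStream()
            .map((windowedKey, count) -> {
                // pull the TransactionSummary back out of the Windowed wrapper
                TransactionSummary summary = windowedKey.key();
                summary.setSummaryCount(count);             // assumed setter
                // re-key by industry so the stream can be joined with industry news
                return KeyValue.pair(summary.getIndustry(), summary);
            });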

With the conversion complete, the next step is to create a KTable object for reading stock news.

Creating a KTable for stock news

Fortunately, one line of code is enough to create a KTable object (this code can be found in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.9).

The book "Kafka Streams in action. Applications and microservices for real-time work"
It is worth noting that no Serde objects have to be specified, since string Serdes are set in the configuration. Also, thanks to the EARLIEST offset reset policy, the table is filled with records from the very beginning.
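
A one-line sketch of this step (the topic name is an assumption):

    // no Serdes passed here: the default string Serdes from the configuration apply
    KTable<String, String> financialNews =
        builder.table("financial-news", Consumed.with(Topology.AutoOffsetReset.EARLIEST));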

Now we can move on to the final step: the join.

Joining news updates with transaction count data

Creating the join is not difficult. We'll use a left join in case there is no stock news for the relevant industry (you can find the code in src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) (Listing 5.10).

The book "Kafka Streams in action. Applications and microservices for real-time work"
This leftJoin operator is quite simple. Unlike the joins in Chapter 4, no JoinWindows is used, because in a KStream-KTable join there is only one entry in the KTable per key. Such a join is not limited in time: the record either exists in the KTable or it does not. The key takeaway: using KTable objects, you can enrich a KStream with less frequently updated reference data.
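
A sketch of the left join, with an illustrative ValueJoiner (the formatting of the joined value and the getter name are assumptions):

    KStream<String, String> transactionsWithNews =
        countStream.leftJoin(financialNews,
            (transactionSummary, newsItem) ->
                String.format("%d transactions, related news: %s",
                              transactionSummary.getSummaryCount(),   // assumed getter
                              newsItem == null ? "none" : newsItem)); // null when no news exists

    transactionsWithNews.print(Printed.<String, String>toSysOut().withLabel("Transactions and News"));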

And now we will look at a more efficient way to enrich events from KStream.

5.3.4. GlobalKTable objects

As you have seen, event streams sometimes need to be enriched or given additional context. In Chapter 4 you saw the join of two KStream objects, and in the previous section the join of a KStream and a KTable. In all of these cases, the data stream must be repartitioned when the keys are mapped to a new type or value. Sometimes the repartitioning is done explicitly, and sometimes Kafka Streams does it automatically. Repartitioning is necessary because the keys have changed and the records must end up in the new partitions, otherwise the join would be impossible (this was discussed in Chapter 4, in the "Data repartitioning" part of Section 4.2.4).

Re-partitioning has a price

Repartitioning comes at a cost: there is the extra overhead of creating intermediate topics and storing duplicate data in another topic, and it also means increased latency from writing to and reading from that topic. In addition, if you want to join on more than one aspect or dimension, you need to chain the joins, map the records to new keys, and run the repartitioning process again.

Joining with smaller datasets

In some cases, the reference data you plan to join against is relatively small, so complete copies of it can easily fit locally on each of the nodes. For such situations, Kafka Streams provides the GlobalKTable class.

GlobalKTable instances are unique in that the application replicates all of the data to every node. And since each node has all of the data, there is no need to partition the event stream by the reference data's key to make it available to all partitions. GlobalKTable objects also let you perform keyless joins. Let's go back to one of the earlier examples to demonstrate this capability.

Joining KStream Objects with GlobalKTable Objects

In subsection 5.3.2, we performed a windowed aggregation of exchange transactions by buyers. The results of this aggregation looked something like this:

{customerId='074-09-3705', stockTicker='GUTM'}, 17
{customerId='037-34-5184', stockTicker='CORK'}, 16

While these results served the intended purpose, it would have been more convenient if the customer's name and full company name were also displayed. To add a customer name and a company name, you can do normal joins, but you will need to do two key mappings and re-partition. With GlobalKTable, you can avoid the cost of such operations.

To do this, we'll take the countStream object from Listing 5.11 (the corresponding code can be found in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) and join it with two GlobalKTable objects.

The book "Kafka Streams in action. Applications and microservices for real-time work"
We've discussed this before, so I won't repeat myself. But note that the code in the toStream().map function is abstracted into a function object for readability instead of an inline lambda expression.

The next step is to declare two GlobalKTable instances (this code can be found in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.12).

The book "Kafka Streams in action. Applications and microservices for real-time work"

Note that topic names are described using enumerated types.
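
A sketch of the declarations; the topic names are written as plain strings here rather than the enum constants the book uses, and the value types are simplified to strings:

    import org.apache.kafka.streams.kstream.GlobalKTable;

    GlobalKTable<String, String> publicCompanies = builder.globalTable("companies");
    GlobalKTable<String, String> clients = builder.globalTable("clients");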

Now that we've got all the components ready, the only thing left to do is write the join code (which can be found in src/main/java/bbejeck/chapter_5/GlobalKTableExample.java) (Listing 5.13).

The book "Kafka Streams in action. Applications and microservices for real-time work"
Although there are two joins in this code, they are organized as a chain because neither of their results is used separately. The results are displayed at the end of the entire operation.
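
A sketch of the chained joins, assuming TransactionSummary carries the customer ID and ticker symbol and exposes with...-style copy methods (illustrative names rather than the book's exact code):

    countStream
        // first join: look up the customer's name by customer ID
        .leftJoin(clients,
                  (key, summary) -> summary.getCustomerId(),
                  (summary, customerName) -> summary.withCustomer(customerName))
        // second join: look up the full company name by ticker symbol
        .leftJoin(publicCompanies,
                  (key, summary) -> summary.getStockTicker(),
                  (summary, companyName) -> summary.withCompany(companyName))
        // display the fully enriched results at the end of the chain
        .print(Printed.<String, TransactionSummary>toSysOut().withLabel("Resolved Transaction Summaries"));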

When you run the above join operation, you will get results like this:

{customer='Barney, Smith' company="Exxon", transactions= 17}

The substance has not changed, but these results are easier to read.

Counting Chapter 4, you have already seen several types of joins in action; they are listed in Table 5.2. The table reflects the join capabilities as of version 1.0.0 of Kafka Streams; things may change in future releases.

The book "Kafka Streams in action. Applications and microservices for real-time work"
In conclusion, let me remind you of the main point: you can join event streams (KStream) and update streams (KTable) using local state. In addition, if the reference data is not too large, you can use a GlobalKTable object. A GlobalKTable replicates all partitions to every node of the Kafka Streams application, ensuring that all of the data is available regardless of which partition a key maps to.

Next, we will look at a Kafka Streams feature that lets you observe state changes without consuming data from a Kafka topic.

5.3.5. Queryable state

We have already performed several stateful operations and have always output the results to the console (during development) or written them to a topic (in production). Writing the results to a topic means you have to use a Kafka consumer to view them.

Reading data from these topics can be considered a kind of materialized view. For our purposes, the Wikipedia definition of a materialized view will do: "... a physical database object containing the results of a query. For example, it could be a local copy of remote data, or a subset of the rows and/or columns of a table or join result, or a pivot table obtained by aggregation" (https://en.wikipedia.org/wiki/Materialized_view).

Kafka Streams also allows interactive queries against its state stores, which makes it possible to read these materialized views directly. It is important to note that a query against a state store is a read-only operation, so you don't have to worry about accidentally making the state inconsistent while the application is processing data.

The ability to query state stores directly matters. It means you can build dashboard applications without first fetching the data through a Kafka consumer. It also makes the application more efficient, because the data does not need to be written out again:

  • because the data is local, it can be accessed quickly;
  • duplication of data is eliminated, since it is not written to external storage.

The main thing I would like you to remember is that you can query state directly from the application. The possibilities this opens up cannot be overstated. Instead of consuming data from Kafka and storing records in a database for the application, you can query the state stores and get the same result. Querying state stores directly means less code (no consumer) and less software (no database table to hold the results).
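
A minimal sketch of such an interactive query; the store name used here is an assumption, and config stands for the usual Properties used to start the application:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
    kafkaStreams.start();

    // read-only view of the materialized aggregation, looked up by store name
    ReadOnlyKeyValueStore<String, Long> store =
        kafkaStreams.store("transactions-by-industry", QueryableStoreTypes.keyValueStore());

    Long energyCount = store.get("energy");   // direct lookup: no consumer, no external database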

We've covered a lot of information in this chapter, so we'll stop our discussion of interactive queries on state stores for a moment. But don't worry: in Chapter 9, we'll create a simple dashboard application with interactive queries. It will use some of the examples in this and previous chapters to demonstrate interactive queries and how to add them to Kafka Streams applications.

Summary

  • KStream objects represent streams of events, comparable to inserts into a database. KTable objects represent streams of updates, more like updates to a database. A KTable does not keep growing; old records are replaced by new ones.
  • KTable objects are required for aggregation operations.
  • Windowing allows you to break down aggregated data into time bins.
  • Thanks to GlobalKTable objects, reference data can be accessed anywhere in the application, regardless of partitioning.
  • KStream, KTable, and GlobalKTable objects can be joined with one another.

So far, we have focused on building Kafka Streams applications using the high-level KStream DSL. Although the high-level approach lets you create neat and concise programs, using it is a trade-off: working with the KStream DSL means making the code more concise at the cost of less control. In the next chapter, we'll look at the low-level Processor API and make different trade-offs. The programs will become longer than they have been so far, but we will be able to create almost any processor node we might need.

β†’ More details about the book can be found on the publisher's website

β†’ For khabrozhiteli, a 25% discount with the coupon code: Kafka Streams

β†’ When you pay for the paper version of the book, an e-book is sent to your e-mail.

Source: habr.com
