Apache Kafka and Data Streaming with Spark Streaming
Hey Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the processing results to a cloud database on AWS RDS.
Imagine that a credit institution asks us to process incoming transactions from all of its branches "on the fly". This could be used, for example, to promptly calculate the treasury's open currency position, limits, or the financial result on transactions.
How can this be implemented without any magic or spells? Read on to find out. Let's go!
Real-time processing of large volumes of data opens up many possibilities in modern systems. One of the most popular combinations for this is Apache Kafka together with Spark Streaming: Kafka provides a stream of incoming messages, and Spark Streaming processes them in batches at a specified time interval.
To improve the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming module needs to recover lost data, it only has to return to the last checkpoint and resume the computation from there.
Architecture of the system
Components used:
Apache Kafka is a distributed publish-subscribe messaging system, suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
Apache Spark Streaming is the Spark component for processing streaming data. It uses a micro-batch architecture, in which a data stream is treated as a continuous sequence of small batches. Spark Streaming receives data from various sources and groups it into small batches, which are created at regular intervals: at the start of each interval a new batch is created, any data received during that interval is added to it, and at the end of the interval the batch stops growing. The length of the interval is set by a parameter called the batch interval;
Apache Spark SQL combines relational processing with Spark's functional programming. Structured data is data that has a schema, that is, a single set of fields shared by all records. Spark SQL supports input from a variety of structured data sources and, thanks to the schema information, can efficiently retrieve only the required fields of records. It also provides the DataFrame API;
AWS RDS is a relatively inexpensive cloud-based relational database: a web service, administered directly by Amazon, that simplifies setup, operation, and scaling.
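The batching rule described for Spark Streaming above can be modelled in a few lines of plain Python. This is only an illustration of how records are assigned to batches by arrival time, not how Spark implements it internally. The function name split_into_batches and the sample timestamps are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def split_into_batches(
    events: Iterable[Tuple[float, str]], batch_interval: float
) -> Dict[int, List[str]]:
    """Group (arrival_time_in_seconds, payload) pairs into micro-batches.

    A record arriving at time t belongs to batch number floor(t / batch_interval),
    i.e. to the interval [k * batch_interval, (k + 1) * batch_interval).
    """
    batches: Dict[int, List[str]] = defaultdict(list)
    for arrival_time, payload in events:
        batches[int(arrival_time // batch_interval)].append(payload)
    return dict(batches)


if __name__ == "__main__":
    sample = [(0.5, "a"), (1.9, "b"), (2.0, "c"), (3.1, "d"), (4.0, "e")]
    print(split_into_batches(sample, batch_interval=2.0))
    # {0: ['a', 'b'], 1: ['c', 'd'], 2: ['e']}
```

With a 2-second batch interval, a record arriving exactly at t = 2.0 already belongs to the second batch, because each interval is closed on the left and open on the right.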
Installing and running the Kafka server
Kafka runs on the JVM, so before using it, make sure Java is installed. Then unpack the Kafka distribution and create a symbolic link to it:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
The next step is optional. Some Kafka features are controlled by configuration flags; for example, whether you can delete a topic (the category to which messages are published) depends on delete.topic.enable. In Kafka 1.0 and later this flag is already true by default, but in older versions topic deletion was disabled. To set it explicitly, edit the configuration file:
vim ~/kafka/config/server.properties
Add the following to the end of the file:
delete.topic.enable = true
Before starting the Kafka server, you need to start ZooKeeper. We will use the helper script that ships with the Kafka distribution:
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
Once ZooKeeper has started successfully, start the Kafka server in a separate terminal:
bin/kafka-server-start.sh config/server.properties
We will skip testing the producer and consumer for the newly created topic. How to send and receive test messages is described in the official documentation, in the "Send some messages" step. Now let's move on to writing a producer in Python with the KafkaProducer API.
Producer writing
The producer will generate random data: 100 messages every second. Each message is a dictionary with three fields:
Branch - the name of the credit institution's point of sale;
Currency - the transaction currency;
Amount - the transaction amount: positive if the Bank buys the currency and negative if it sells it.
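The producer code below calls get_random_value(), whose body is not shown. A minimal sketch of such a generator follows, assuming lowercase keys (matching t.branch, t.currency and amount in the Spark SQL query later on), hypothetical branch names and currency codes, and an arbitrary amount range. Only the three fields and the sign convention come from the description above.

```python
import random
from typing import Dict, Union

# Hypothetical reference data: the article does not list the real branches or currencies.
BRANCHES = ["Branch_1", "Branch_2", "Branch_3"]
CURRENCIES = ["USD", "EUR", "GBP"]


def get_random_value() -> Dict[str, Union[str, float]]:
    """Build one random transaction with the fields branch, currency and amount.

    A positive amount means the bank bought the currency, a negative one that it sold it.
    """
    amount = round(random.uniform(1, 1000), 2)  # arbitrary range, for illustration only
    if random.random() < 0.5:
        amount = -amount  # a sale of currency by the bank
    return {
        "branch": random.choice(BRANCHES),
        "currency": random.choice(CURRENCIES),
        "amount": amount,
    }


if __name__ == "__main__":
    print(get_random_value())
```

Returning a plain dict keeps the function independent of Kafka: the producer's value_serializer takes care of turning it into JSON bytes.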
Next, we use the send method to send the message in JSON format to the required topic on the server:
from json import dumps

from kafka import KafkaProducer

# Serialize each message to JSON bytes and compress batches with gzip
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
# get_random_value() returns one random transaction dict (its definition is not shown here)
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    # Block until the broker acknowledges the message, for at most 10 seconds
    record_metadata = future.get(timeout=10)
    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))
finally:
    producer.flush()
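The snippet above sends a single message, while the text states that the producer emits 100 messages per second. One way to reach that rate is to send a burst and then sleep for the rest of the second. The sketch below is an assumption, not the author's code: the send step is passed in as a callable so that the loop can be tried without a running broker, and the function name produce_at_rate is hypothetical.

```python
import time
from typing import Callable, Dict, List


def produce_at_rate(
    make_message: Callable[[], Dict],
    send: Callable[[Dict], None],
    messages_per_second: int = 100,
    seconds: int = 1,
) -> int:
    """Send `messages_per_second` messages in each of `seconds` one-second windows.

    After each burst the loop sleeps for whatever is left of the current second,
    so the average rate stays close to messages_per_second.
    """
    sent = 0
    for _ in range(seconds):
        window_start = time.monotonic()
        for _ in range(messages_per_second):
            send(make_message())
            sent += 1
        elapsed = time.monotonic() - window_start
        time.sleep(max(0.0, 1.0 - elapsed))  # never sleep for a negative duration
    return sent


if __name__ == "__main__":
    outbox: List[Dict] = []  # stands in for the Kafka topic
    count = produce_at_rate(
        lambda: {"branch": "B1", "currency": "USD", "amount": 10.0},
        outbox.append,
        messages_per_second=100,
        seconds=1,
    )
    print(count, len(outbox))
```

In the real script, make_message could be get_random_value and send could be lambda m: producer.send(my_topic, value=m). The producer's value_serializer then converts each dict to JSON, so the loop itself stays free of serialization details.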
When running the script, we get the following messages in the terminal:
This means everything works as intended: the producer generates messages and sends them to the required topic.
The next step is to install Spark and process this message stream.
Installing Apache Spark
Apache Spark is a versatile, high-performance cluster computing platform.
Spark outperforms popular implementations of the MapReduce model while supporting a wider range of computation types, including interactive queries and stream processing. Speed matters when processing large amounts of data, because it lets you work interactively instead of waiting minutes or hours for results. One of the main sources of Spark's speed is its ability to perform computations in memory.
Spark is written in Scala. The pre-built Spark packages already include the Scala libraries they need, so a separate Scala installation is not strictly required, but here we install it first:
sudo apt-get install scala
Download the Spark distribution from the official website:
After making the necessary changes to ~/.bashrc, apply them by running:
source ~/.bashrc
Deploying AWS PostgreSQL
Now we need to deploy the database where we will store the processed information from the streams. For this, we will use the AWS RDS service.
Go to the AWS console -> AWS RDS -> Databases -> Create database:
Select PostgreSQL and click the Next button:
Since this example is for educational purposes only, we will use a minimal free server (Free Tier):
Next, check the Free Tier box. We are then automatically offered a t2.micro instance, which is weak but free and quite sufficient for our task:
Next come some very important settings: the DB instance name, the master user name, and its password. Let's call the instance myHabrTest, set the master user to there will and the password to habr12345, and click Next:
The next page contains the parameters that control whether our database server is accessible from outside (Public accessibility) and which ports are available:
Let's create a new VPC security group that allows external access to our database server on port 5432 (PostgreSQL).
In a separate browser window, open the AWS console and go to VPC Dashboard -> Security Groups -> Create security group:
Set the Security group name to PostgreSQL, add a description, specify the VPC the group should be associated with, and click Create:
For the new group, add an Inbound rule for port 5432, as shown in the picture below. Instead of entering the port manually, you can select PostgreSQL from the Type drop-down list.
Strictly speaking, the value ::/0 opens the server to incoming traffic from anywhere in the world, which is not good practice, but for this example we will use this approach:
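The source of an inbound rule is a CIDR block. To see how permissive ::/0 is, Python's standard ipaddress module can test which client addresses a block covers: ::/0 matches every IPv6 address (0.0.0.0/0 is its IPv4 counterpart), while a /32 block matches exactly one IPv4 address. The helper name rule_allows is hypothetical and the addresses come from documentation-reserved ranges; this only illustrates CIDR matching, AWS evaluates security group rules on its own side.

```python
import ipaddress


def rule_allows(cidr: str, client_ip: str) -> bool:
    """Return True if client_ip falls inside the CIDR block of an inbound rule."""
    network = ipaddress.ip_network(cidr)
    address = ipaddress.ip_address(client_ip)
    # An IPv4 address never matches an IPv6 block and vice versa.
    return address in network


if __name__ == "__main__":
    print(rule_allows("::/0", "2001:db8::1"))               # True: any IPv6 client
    print(rule_allows("0.0.0.0/0", "203.0.113.7"))          # True: any IPv4 client
    print(rule_allows("198.51.100.10/32", "203.0.113.7"))   # False: only 198.51.100.10
    print(rule_allows("::/0", "203.0.113.7"))               # False: IPv4 client, IPv6 rule
```

Outside of a throwaway example, a narrow block such as your own address with /32 is the safer choice for the Source field.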
Return to the browser page with “Configure advanced settings” open and, in the VPC security groups section, select Choose existing VPC security groups -> PostgreSQL:
Next, in Database options -> Database name, set the name habrDB.
Leave the remaining parameters at their defaults, except that we disable backups (backup retention period: 0 days), monitoring, and Performance Insights. Click Create database:
Stream Handler
The final stage is developing a Spark job that processes new data from Kafka every two seconds and writes the result to the database.
As noted above, checkpoints are the key Spark Streaming mechanism that must be configured to provide fault tolerance. If the job fails, Spark Streaming only needs to return to the last checkpoint and resume the computation from there to recover the lost data.
A checkpoint is enabled by setting a directory on a fault-tolerant, reliable file system (e.g. HDFS or S3) where the checkpoint information will be stored, for example:
streamingContext.checkpoint(checkpointDirectory)
In our example we use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (i.e. the job is running for the first time), the functionToCreateContext function is called to create a new context and set up the DStreams:
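from pyspark.streaming import StreamingContext

# Recover the context from checkpointDirectory if it exists,
# otherwise build a new one with functionToCreateContext()
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)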
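The getOrCreate logic described above boils down to "restore if a checkpoint exists, otherwise build from scratch". The plain-Python analogue below stores a small JSON state in a checkpoint directory and mirrors only that control flow. It is not Spark's checkpoint format, and the names get_or_create, save_checkpoint and state.json are hypothetical.

```python
import json
import os
import tempfile
from typing import Callable, Dict

CHECKPOINT_FILE = "state.json"  # hypothetical file name inside the checkpoint directory


def save_checkpoint(checkpoint_dir: str, state: Dict) -> None:
    """Persist the current state so that a restarted job can resume from it."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(os.path.join(checkpoint_dir, CHECKPOINT_FILE), "w", encoding="utf-8") as f:
        json.dump(state, f)


def get_or_create(checkpoint_dir: str, create: Callable[[], Dict]) -> Dict:
    """Restore state from checkpoint_dir if a checkpoint exists, otherwise call create()."""
    path = os.path.join(checkpoint_dir, CHECKPOINT_FILE)
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            return json.load(f)  # recovery path: resume from the last checkpoint
    state = create()  # first run: build everything from scratch
    save_checkpoint(checkpoint_dir, state)
    return state


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ckpt = os.path.join(tmp, "checkpoint")
        state = get_or_create(ckpt, lambda: {"batches_processed": 0})  # first run
        state["batches_processed"] += 3
        save_checkpoint(ckpt, state)
        # Simulated restart: the factory is not called, the saved state is returned
        print(get_or_create(ckpt, lambda: {"batches_processed": 0}))
```

The important property is that the factory function is only used on the very first run; after a restart the saved state wins, which is also why Spark expects all DStream setup to happen inside functionToCreateContext.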
We create a direct stream for the "transaction" topic using the createDirectStream method of KafkaUtils:
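With the direct approach there is no long-running receiver. For every batch, Spark queries Kafka for the latest offset of each partition, defines the range of offsets to read, and tracks the consumed offsets itself (they are saved in checkpoints). The pure-Python model below illustrates only that bookkeeping. BatchRange and next_batch are hypothetical names, and the partitions and offsets are made up.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class BatchRange:
    """Offsets [from_offset, until_offset) to read from one partition in one batch."""
    partition: int
    from_offset: int
    until_offset: int

    @property
    def count(self) -> int:
        return self.until_offset - self.from_offset


def next_batch(consumed: Dict[int, int], latest: Dict[int, int]) -> List[BatchRange]:
    """Plan one batch: read everything between the last consumed and the latest offset."""
    ranges = [
        # A partition seen for the first time is assumed to start at offset 0
        BatchRange(p, consumed.get(p, 0), latest[p])
        for p in sorted(latest)
    ]
    for r in ranges:
        consumed[r.partition] = r.until_offset  # the next batch starts here
    return ranges


if __name__ == "__main__":
    consumed: Dict[int, int] = {}
    print(next_batch(consumed, {0: 120, 1: 80}))  # first batch
    print(next_batch(consumed, {0: 150, 1: 80}))  # partition 1 has no new data
```

Because each batch is described by explicit offset ranges, replaying a failed batch after recovery means re-reading exactly the same ranges.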
Using Spark SQL, we perform a simple grouping and print the result to the console:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
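For each batch, the query sums amount per (branch, currency) pair and stamps every row with the current time. The same aggregation over a list of already parsed messages looks like this in plain Python; the function name aggregate_batch and the sample data are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple


def aggregate_batch(messages: List[Dict]) -> List[Tuple[str, str, str, float]]:
    """Python equivalent of the GROUP BY branch, currency / SUM(amount) query.

    Returns (curr_time, branch_name, currency_code, batch_value) rows.
    """
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    for m in messages:
        totals[(m["branch"], m["currency"])] += m["amount"]
    # from_unixtime(unix_timestamp()) yields the current time as 'yyyy-MM-dd HH:mm:ss'
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return [(now, branch, currency, value)
            for (branch, currency), value in sorted(totals.items())]


if __name__ == "__main__":
    batch = [
        {"branch": "B1", "currency": "USD", "amount": 100.0},
        {"branch": "B1", "currency": "USD", "amount": -40.0},
        {"branch": "B2", "currency": "EUR", "amount": 25.5},
    ]
    for row in aggregate_batch(batch):
        print(row)
```

Since purchases are positive and sales negative, batch_value is the net position change of each branch in each currency during one batch interval.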
We take the query text and run it through Spark SQL:
Then we save the aggregated data to a table in AWS RDS using the write method of the DataFrame object:
A few words about configuring the connection to AWS RDS. We created the user and password at the “Deploying AWS PostgreSQL” step. As the database server URL, use the Endpoint shown in the Connectivity & security section:
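The connection settings can be assembled as sketched below. The endpoint in the example is a hypothetical placeholder (use the Endpoint from the Connectivity & security section), habrDB is the database name chosen earlier, and the user name and password are read from environment variables with hypothetical names instead of being hard-coded. The resulting url and properties match the arguments of PySpark's DataFrameWriter.jdbc(url, table, mode, properties); org.postgresql.Driver is the driver class of the PostgreSQL JDBC artifact.

```python
import os
from typing import Dict, Tuple


def rds_jdbc_settings(endpoint: str, database: str, port: int = 5432) -> Tuple[str, Dict[str, str]]:
    """Build the JDBC URL and connection properties for a PostgreSQL instance on AWS RDS."""
    url = f"jdbc:postgresql://{endpoint}:{port}/{database}"
    properties = {
        "user": os.environ.get("RDS_USER", ""),  # hypothetical variable names
        "password": os.environ.get("RDS_PASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }
    return url, properties


if __name__ == "__main__":
    url, props = rds_jdbc_settings("your-instance-endpoint.rds.amazonaws.com", "habrDB")
    print(url)
    # In the Spark job this could be used as, e.g.:
    # df.write.jdbc(url=url, table="transaction_flow", mode="append", properties=props)
```

The "append" mode in the comment is an assumption: it adds each batch's rows to the table instead of replacing its contents.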
To connect Spark and Kafka correctly, run the job with spark-submit using the spark-streaming-kafka-0-8_2.11 artifact. We also need an artifact for working with the PostgreSQL database; both are passed via --packages.
To make the script more flexible, we also pass the message broker address and the topic to read from as input parameters.
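A minimal sketch of reading these two parameters with argparse follows. The argument names are hypothetical, and the article's script may parse them differently (for example through sys.argv). The broker list ends up in kafkaParams under the metadata.broker.list key, which is what the Kafka 0.8 direct stream expects.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the broker address and topic; argv=None means 'use sys.argv'."""
    parser = argparse.ArgumentParser(description="Kafka -> Spark Streaming -> PostgreSQL job")
    parser.add_argument("brokers", help="Kafka broker list, e.g. localhost:9092")
    parser.add_argument("topic", help="topic to consume, e.g. transaction")
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args(["localhost:9092", "transaction"])
    kafka_params = {"metadata.broker.list": args.brokers}
    print(args.topic, kafka_params)
```

Inside the job the call would simply be parse_args(), so the values come from the arguments given after the script name on the spark-submit command line.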
Everything works! As the picture below shows, while the application is running, new aggregation results are printed every 2 seconds, because we set the batch interval to 2 seconds when creating the StreamingContext object:
Next, we run a simple query against the database to check for records in the transaction_flow table:
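Assuming the job writes in append mode, every batch adds one row per (branch, currency) pair to transaction_flow. The sketch below imitates this with Python's built-in sqlite3 module as a local stand-in for PostgreSQL. The column names follow the aliases in the Spark SQL query; the column types, helper names and sample rows are assumptions.

```python
import sqlite3
from typing import Iterable, List, Tuple

Row = Tuple[str, str, str, float]  # (curr_time, branch_name, currency_code, batch_value)


def create_table(conn: sqlite3.Connection) -> None:
    """Create the results table; the column types are assumptions."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS transaction_flow ("
        "curr_time TEXT, branch_name TEXT, currency_code TEXT, batch_value REAL)"
    )


def append_batch(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Append one batch of aggregates, like a write in append mode."""
    conn.executemany(
        "INSERT INTO transaction_flow (curr_time, branch_name, currency_code, batch_value) "
        "VALUES (?, ?, ?, ?)",
        rows,
    )
    conn.commit()


def totals_by_pair(conn: sqlite3.Connection) -> List[Tuple[str, str, float, int]]:
    """A check query: running total and number of batch rows per (branch, currency)."""
    return conn.execute(
        "SELECT branch_name, currency_code, SUM(batch_value), COUNT(*) "
        "FROM transaction_flow GROUP BY branch_name, currency_code "
        "ORDER BY branch_name, currency_code"
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_table(conn)
    append_batch(conn, [("2020-01-01 10:00:00", "B1", "USD", 60.0)])  # made-up rows
    append_batch(conn, [("2020-01-01 10:00:02", "B1", "USD", -15.0)])
    print(totals_by_pair(conn))
```

Summing batch_value across rows gives the running position per branch and currency, which is the kind of figure the treasury use case from the introduction needs.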
Conclusion
In this article, we walked through an example of streaming data processing with Spark Streaming, Apache Kafka, and PostgreSQL. As data volumes from various sources keep growing, the practical value of Spark Streaming for building real-time streaming applications is hard to overestimate.
You can find the full source code in my GitHub repository.
I am happy to discuss this article and look forward to your comments, as well as constructive criticism from all interested readers.
I wish you success!
P.S. I originally planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above on AWS using AWS Kinesis and AWS EMR. Stay tuned!