Apache Kafka and Data Streaming with Spark Streaming

Hey Habr! Today we will build a system that processes Apache Kafka message streams with Spark Streaming and writes the results to an AWS RDS cloud database.

Imagine that a credit institution gives us the task of processing incoming transactions "on the fly" across all of its branches. This could be done, for example, to quickly calculate the treasury's open currency position, transaction limits, or the financial result of trades.

How to implement this case without magic and magic spells - read under the cut. Let's go!


Introduction

Real-time processing of large volumes of data opens up ample opportunities in modern systems. One of the most popular combinations for this is the tandem of Apache Kafka and Spark Streaming: Kafka delivers the stream of incoming messages, and Spark Streaming processes it in batches at a specified interval.

To improve the fault tolerance of the application, we will use checkpoints. With this mechanism, when the Spark Streaming module needs to recover lost data, it only has to go back to the last checkpoint and resume calculations from there.

The architecture of the developed system

(System architecture diagram)

Used components:

  • Apache Kafka is a distributed publish-subscribe messaging system. It is suitable for both offline and online message consumption. To prevent data loss, Kafka messages are stored on disk and replicated within the cluster. Kafka is built on top of the ZooKeeper synchronization service;
  • Apache Spark Streaming is the Spark component for processing streaming data. Spark Streaming is built on a micro-batch architecture, in which a data stream is interpreted as a continuous sequence of small batches. Spark Streaming takes data from different sources and combines it into small batches, created at regular intervals. At the start of each interval a new batch is created, and any data received during that interval is added to it; at the end of the interval the batch stops growing. The length of the interval is determined by a parameter called the batch interval;
  • Apache Spark SQL combines relational processing with Spark's functional programming. Structured data means data that has a schema, that is, a single set of fields for all records. Spark SQL supports input from a variety of structured data sources and, thanks to the schema information, can efficiently retrieve only the required record fields; it also provides the DataFrame APIs;
  • AWS RDS is a relatively inexpensive cloud-based relational database, a web service that simplifies setup, operation, and scaling, and is administered by Amazon.

Installing and running the Kafka server

Before working with Kafka, make sure Java is installed, since Kafka runs on the JVM:

sudo apt-get update 
sudo apt-get install default-jre
java -version

Let's create a new user to work with Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

Next, download the distribution from the official Apache Kafka website:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

Unpack the downloaded archive:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz -C /YOUR_PATH
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

The next step is optional. The default settings do not let you use all of Apache Kafka's features; for example, you cannot delete a topic (the category, or feed, to which messages are published). To change this, let's edit the configuration file:

vim ~/kafka/config/server.properties

Add the following to the end of the file:

delete.topic.enable = true
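
Once the broker is running with this setting, topics can actually be deleted. As an illustration only, deletion can also be triggered from Python via the admin client in the kafka-python package (a hedged sketch: it assumes kafka-python is installed, the broker listens on localhost:9092, and the topic name here is just a placeholder):

from kafka.admin import KafkaAdminClient

# Connect to the local broker; deleting a topic only succeeds
# when delete.topic.enable is set to true on the server.
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.delete_topics(['old_topic'])   # 'old_topic' is a made-up name for the example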

Before starting the Kafka server, you need to start the ZooKeeper server. We will use the helper script that comes with the Kafka distribution:

cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

After ZooKeeper has successfully started, we start the Kafka server in a separate terminal:

bin/kafka-server-start.sh config/server.properties

Let's create a new topic called transaction:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

Let's make sure that a topic with the required number of partitions and replication has been created:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

(Screenshot: output of the kafka-topics.sh --describe command)

We will skip testing the producer and consumer for the newly created topic; details on how to test sending and receiving messages are in the official documentation - Send some messages. Now let's move on to writing a producer in Python using the KafkaProducer API.

Writing the producer

The producer will generate random data - 100 messages every second. By random data we mean a dictionary consisting of three fields:

  • Branch — name of the point of sale of the credit institution;
  • Currency — transaction currency;
  • Amount — transaction amount; positive if the Bank buys currency, negative if it sells it.

The code for the producer looks like this:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    # Pick a random branch, currency and amount for the transaction
    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    # Cast to a plain int so the value stays JSON-serializable
    new_dict['amount'] = int(randint(-100, 100))

    return new_dict

Next, using the send method, we send a message to the required topic on the server in JSON format:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic=my_topic, value=data)
    record_metadata = future.get(timeout=10)

    print('--> The message has been sent to a topic: '
          '{}, partition: {}, offset: {}'
          .format(record_metadata.topic,
                  record_metadata.partition,
                  record_metadata.offset))

except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()
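
The snippet above sends a single message. To reach the 100 messages per second mentioned earlier, the send call can be wrapped in a simple loop; here is a rough sketch (the pacing with time.sleep is an assumption, the exact loop is in the repository):

from time import sleep

# Generate and send 100 random transactions roughly every second
while True:
    for _ in range(100):
        producer.send(topic=my_topic, value=get_random_value())
    producer.flush()
    sleep(1)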

When running the script, we get the following messages in the terminal:

(Screenshot: producer messages in the terminal)

This means that everything works as we wanted - the producer generates and sends messages to the topic we need.
The next step is to install Spark and process this message flow.

Installing Apache Spark

Apache Spark is a versatile and high performance cluster computing platform.

Spark outperforms popular implementations of the MapReduce model in terms of performance, while providing support for a wider range of computation types, including interactive queries and streaming. Speed plays an important role when processing large amounts of data, since it is speed that allows you to work interactively without spending minutes or hours waiting. One of Spark's biggest strengths for delivering this speed is its ability to perform in-memory calculations.

This framework is written in Scala, so you need to install it first:

sudo apt-get install scala

Download the Spark distribution from the official website:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

Unpack the archive:

sudo mkdir -p /usr/local/spark
sudo tar xvf spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark --strip-components=1

Add the Spark path to your .bashrc file:

vim ~/.bashrc

Add the following lines through the editor:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

Run the command below after making changes to bashrc:

source ~/.bashrc

Deploying AWS PostgreSQL

It remains to deploy the database that will receive the processed data from the streams. We will use the AWS RDS service for this.

Go to the AWS console -> AWS RDS -> Databases -> Create database:
(Screenshot: Create database in the AWS RDS console)

Select PostgreSQL and click the Next button:
(Screenshot: selecting the PostgreSQL engine)

Since this example is purely educational, we will use a free server "at the minimum" (Free Tier):
(Screenshot: the Free Tier option)

Next, check the Free Tier box; after that we are automatically offered a t2.micro instance - weak, but free and quite sufficient for our task:
(Screenshot: the t2.micro instance class)

Next come some very important settings: the DB instance name, the master username, and the password. Let's name the instance myHabrTest, set the master user to habr and the password to habr12345, then click the Next button:
(Screenshot: instance name, master user, and password)

The next page contains the parameters responsible for the accessibility of our database server from the outside (Public accessibility) and the availability of ports:

(Screenshot: connectivity settings)

Let's create a new VPC security group setting that will allow external access to our database server through port 5432 (PostgreSQL).
In a separate browser window, go to the AWS console, section VPC Dashboard -> Security Groups -> Create security group:
(Screenshot: Create security group in the VPC Dashboard)

Set the name of the security group to PostgreSQL, add a description, specify which VPC the group should be associated with, and click the Create button:
(Screenshot: security group name, description, and VPC)

For the newly created group, fill in the Inbound rules for port 5432 as shown in the picture below. Instead of specifying the port manually, you can select PostgreSQL from the Type drop-down list.

Strictly speaking, the value ::/0 means that incoming traffic is allowed from anywhere in the world, which is not exactly canonical, but for the purposes of this example let's go with it:
(Screenshot: inbound rule for port 5432)

We return to the browser page where "Configure advanced settings" is open, and in the VPC security groups section select Choose existing VPC security groups -> PostgreSQL:
(Screenshot: choosing the existing PostgreSQL security group)

Next, under Database options -> Database name, set the name to habrDB.

The remaining parameters can be left at their defaults, except for disabling backups (backup retention period: 0 days), monitoring, and Performance Insights. Click the Create database button:
(Screenshot: remaining settings and the Create database button)

Writing the stream handler

The final stage is the development of a Spark job that will process new data from Kafka every two seconds and write the result to the database.

As noted above, checkpoints are the main mechanism in Spark Streaming that must be configured to provide fault tolerance. We will use checkpoints so that, in the event of a failure, the Spark Streaming module only needs to return to the last checkpoint and resume calculations from it to recover the lost data.

Checkpointing is enabled by specifying a directory on a fault-tolerant, reliable file system (e.g. HDFS, S3, etc.) where the checkpoint information will be stored. This is done, for example, like this:

streamingContext.checkpoint(checkpointDirectory)

In our example we use the following approach: if checkpointDirectory exists, the context is recreated from the checkpoint data. If the directory does not exist (i.e., this is the first run), the functionToCreateContext function is called to create a new context and set up the DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
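
For reference, here is a minimal sketch of what functionToCreateContext might look like (the application name is arbitrary, the two-second batch interval matches the one used below, and the full version is in the repository):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

def functionToCreateContext():
    # Called only when no checkpoint data is found in checkpointDirectory
    conf = SparkConf().setAppName("habr_transaction_streaming")  # illustrative name
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 2)        # 2-second batch interval
    # ... create the Kafka DStream and describe the processing here (see below) ...
    ssc.checkpoint(checkpointDirectory)  # where to store the checkpoint data
    return ssc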

We create a DirectStream object to connect to the "transaction" topic using the createDirectStream method of the KafkaUtils library:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# conf is the SparkConf object prepared when the context is created
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)   # 2-second batch interval

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                  [topic],
                                                  {"metadata.broker.list": broker_list})

Inside the handler that processes each micro-batch, we parse the incoming JSON data and convert it into Rows:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                               currency=w['currency'],
                               amount=w['amount']))

testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
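
For context, this fragment lives inside a handler that Spark Streaming applies to every micro-batch via foreachRDD. A minimal sketch of such a handler (the name process_batch and the explicit json.loads step are assumptions; the full version is in the repository):

import json
from pyspark.sql import Row, SparkSession

def process_batch(time, rdd):
    # Skip empty micro-batches
    if rdd.isEmpty():
        return

    spark = SparkSession.builder.getOrCreate()

    # Each Kafka record is a (key, value) pair; the value holds our JSON string
    parsed = rdd.map(lambda record: json.loads(record[1]))
    rowRdd = parsed.map(lambda w: Row(branch=w['branch'],
                                      currency=w['currency'],
                                      amount=w['amount']))

    testDataFrame = spark.createDataFrame(rowRdd)
    testDataFrame.createOrReplaceTempView("treasury_stream")
    # ... the SQL aggregation and the JDBC write shown below go here ...

directKafkaStream.foreachRDD(process_batch)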

Using Spark SQL, we do a simple grouping and output the result to the console:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency
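
In the job itself this query text is returned by a small helper, which appears as get_sql_query in the next snippet (a minimal sketch; the full version is in the repository):

def get_sql_query():
    # Returns the aggregation query above as a plain string for spark.sql()
    return """
        select
            from_unixtime(unix_timestamp()) as curr_time,
            t.branch                        as branch_name,
            t.currency                      as currency_code,
            sum(amount)                     as batch_value
        from treasury_stream t
        group by
            t.branch,
            t.currency
    """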

Getting the query body and running it through Spark SQL:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

Then we save the aggregated data into a table in AWS RDS using the write method of the DataFrame object:

testResultDataFrame.write \
    .format("jdbc") \
    .mode("append") \
    .option("driver", 'org.postgresql.Driver') \
    .option("url", "jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \
    .option("dbtable", "transaction_flow") \
    .option("user", "habr") \
    .option("password", "habr12345") \
    .save()

A few words about the AWS RDS connection settings. We created the user and password in the "Deploying AWS PostgreSQL" step. As the database server URL, use the Endpoint shown in the Connectivity & security section:

(Screenshot: the Endpoint in the Connectivity & security section)

To connect Spark and Kafka correctly, the job should be run via spark-submit with the spark-streaming-kafka-0-8_2.11 artifact. We will also use an artifact for interacting with the PostgreSQL database; both are passed via --packages.

For flexibility, the script also takes the address of the message broker and the topic to read from as input parameters.
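
A rough sketch of what the script's entry point might look like under these assumptions (the argument order matches the spark-submit call below, and the checkpoint path is illustrative):

import sys
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    # Broker address and topic name arrive on the command line;
    # they are used when the Kafka DStream is created
    broker_list, topic = sys.argv[1], sys.argv[2]

    # Any reliable location works for checkpoints; this path is illustrative
    checkpointDirectory = "/tmp/checkpoint"

    context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
    context.start()
    context.awaitTermination()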

So, it's time to run and test the system:

spark-submit \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,org.postgresql:postgresql:9.4.1207 \
    spark_job.py localhost:9092 transaction

Everything worked! As you can see in the picture below, new aggregation results appear every 2 seconds while the application is running, because we set the batch interval to 2 seconds when creating the StreamingContext object:

(Screenshot: aggregation results printed to the console every 2 seconds)

Next, we make a simple query to the database to check for records in the table transaction_flow:

(Screenshot: records in the transaction_flow table)
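
The same check can also be run from Python, for example with psycopg2 (a sketch assuming psycopg2 is installed; the endpoint and credentials are the ones from the "Deploying AWS PostgreSQL" step):

import psycopg2

# Connect to the RDS instance created earlier and count the stored rows
conn = psycopg2.connect(
    host="myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com",
    port=5432,
    dbname="habrDB",
    user="habr",
    password="habr12345",
)
with conn, conn.cursor() as cur:
    cur.execute("select count(*) from transaction_flow")
    print(cur.fetchone()[0])
conn.close()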

Conclusion

In this article we looked at an example of stream processing with Spark Streaming in conjunction with Apache Kafka and PostgreSQL. As data volumes from various sources keep growing, it is hard to overestimate the practical value of Spark Streaming for building streaming and real-time applications.

You can find the full source code in my repository on GitHub.

I will be happy to discuss this article and look forward to your comments and constructive criticism from interested readers.

I wish you success!

P.S. Originally I planned to use a local PostgreSQL database, but given my love for AWS, I decided to move the database to the cloud. In the next article on this topic, I will show how to implement the entire system described above in AWS using AWS Kinesis and AWS EMR. Stay tuned!
