Apache Kafka 和 Spark Streaming 的流数据处理

你好,哈布尔! 今天我们将构建一个系统,使用 Spark Streaming 处理 Apache Kafka 消息流,并将处理结果写入 AWS RDS 云数据库。

让我们想象一下,某个信贷机构给我们安排了在其所有分支机构“即时”处理传入交易的任务。 这样做的目的是为了及时计算财政部的未平仓货币头寸、交易的限额或财务结果等。

如何在不使用魔法和魔法咒语的情况下实现这种情况——阅读下切! 去!

Apache Kafka 和 Spark Streaming 的流数据处理
(图片来源)

介绍

当然,实时处理大量数据为现代系统提供了充足的使用机会。 最流行的组合之一是 Apache Kafka 和 Spark Streaming 的串联,其中 Kafka 创建传入消息数据包的流,而 Spark Streaming 以给定的时间间隔处理这些数据包。

为了提高应用程序的容错能力,我们将使用检查点。 通过这种机制,当Spark Streaming引擎需要恢复丢失的数据时,只需要返回到上一个检查点并从那里恢复计算即可。

开发系统的架构

Apache Kafka 和 Spark Streaming 的流数据处理

使用的组件:

  • 阿帕奇卡夫卡 是一个分布式发布订阅消息系统。 适合离线和在线消息消费。 为了防止数据丢失,Kafka 消息存储在磁盘上并在集群内进行复制。 Kafka系统构建在ZooKeeper同步服务之上;
  • Apache Spark 流 - 用于处理流数据的 Spark 组件。 Spark Streaming 模块使用微批量架构构建,其中数据流被解释为小数据包的连续序列。 Spark Streaming 从不同来源获取数据并将其组合成小包。 定期创建新包。 在每个时间间隔开始时,都会创建一个新数据包,并且在该时间间隔期间接收到的任何数据都包含在该数据包中。 在间隔结束时,数据包增长停止。 间隔的大小由一个称为批次间隔的参数决定;
  • 阿帕奇火花 SQL - 将关系处理与 Spark 函数式编程相结合。 结构化数据是指具有模式的数据,即所有记录的一组字段。 Spark SQL支持来自各种结构化数据源的输入,并且由于模式信息的可用性,它可以高效地仅检索记录的所需字段,并且还提供DataFrame API;
  • 云数据库 是一种相对便宜的基于云的关系数据库、Web 服务,可简化设置、操作和扩展,并由 Amazon 直接管理。

安装并运行 Kafka 服务器

在直接使用Kafka之前,你需要确保你有Java,因为...... JVM用于工作:

sudo apt-get update 
sudo apt-get install default-jre
java -version

让我们创建一个新用户来使用 Kafka:

sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo

接下来,从 Apache Kafka 官方网站下载发行版:

wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"

解压下载的存档:

tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka

下一步是可选的。 事实上,默认设置不允许您充分使用 Apache Kafka 的所有功能。 例如,删除可以发布消息的主题、类别、组。 要更改此设置,让我们编辑配置文件:

vim ~/kafka/config/server.properties

将以下内容添加到文件末尾:

delete.topic.enable = true

在启动Kafka服务器之前,需要启动ZooKeeper服务器;我们将使用Kafka发行版附带的辅助脚本:

Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

ZooKeeper 成功启动后,在单独的终端中启动 Kafka 服务器:

bin/kafka-server-start.sh config/server.properties

让我们创建一个名为 Transaction 的新主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction

让我们确保已创建具有所需数量的分区和复制的主题:

bin/kafka-topics.sh --describe --zookeeper localhost:2181

Apache Kafka 和 Spark Streaming 的流数据处理

让我们错过为新创建的主题测试生产者和消费者的时刻。 有关如何测试发送和接收消息的更多详细信息,请参阅官方文档 - 发送一些消息。 好吧,我们继续使用 KafkaProducer API 用 Python 编写生产者。

制片人写作

生产者将生成随机数据 - 每秒 100 条消息。 随机数据是指由三个字段组成的字典:

  • 分支机构 — 信贷机构销售点的名称;
  • 货币 ——交易货币;
  • 金额 ——交易金额。 如果是银行购买货币,则金额将为正数;如果是出售,则金额将为负数。

生产者的代码如下所示:

from numpy.random import choice, randint

def get_random_value():
    new_dict = {}

    branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
    currency_list = ['RUB', 'USD', 'EUR', 'GBP']

    new_dict['branch'] = choice(branch_list)
    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = randint(-100, 100)

    return new_dict

接下来,使用 send 方法,我们以 JSON 格式向服务器发送一条消息到我们需要的主题:

from kafka import KafkaProducer    

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:dumps(x).encode('utf-8'),
                             compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()

try:
    future = producer.send(topic = my_topic, value = data)
    record_metadata = future.get(timeout=10)
    
    print('--> The message has been sent to a topic: 
            {}, partition: {}, offset: {}' 
            .format(record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset ))   
                             
except Exception as e:
    print('--> It seems an Error occurred: {}'.format(e))

finally:
    producer.flush()

运行脚本时,我们在终端中收到以下消息:

Apache Kafka 和 Spark Streaming 的流数据处理

这意味着一切都按照我们想要的方式进行 - 生产者生成消息并将其发送到我们需要的主题。
下一步是安装 Spark 并处理该消息流。

安装 Apache Spark

Apache Spark 是一个通用、高性能的集群计算平台。

Spark 的性能优于 MapReduce 模型的流行实现,同时支持更广泛的计算类型,包括交互式查询和流处理。 在处理大量数据时,速度起着重要作用,因为正是速度使您能够交互式地工作,而无需花费几分钟或几小时的等待。 Spark 之所以如此之快,最大的优势之一是它能够执行内存计算。

这个框架是用Scala编写的,所以你需要先安装它:

sudo apt-get install scala

从官方网站下载 Spark 发行版:

wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"

解压存档:

sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark

将 Spark 的路径添加到 bash 文件中:

vim ~/.bashrc

通过编辑器添加以下行:

SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH

更改 bashrc 后运行以下命令:

source ~/.bashrc

部署 AWS PostgreSQL

剩下的就是部署数据库,我们将从流中上传处理后的信息。 为此,我们将使用 AWS RDS 服务。

转到 AWS 控制台 -> AWS RDS -> 数据库 -> 创建数据库:
Apache Kafka 和 Spark Streaming 的流数据处理

选择 PostgreSQL 并单击下一步:
Apache Kafka 和 Spark Streaming 的流数据处理

因为此示例仅用于教育目的;我们“至少”将使用免费服务器(免费套餐):
Apache Kafka 和 Spark Streaming 的流数据处理

接下来,我们在“免费层”块中打勾,之后我们将自动提供 t2.micro 类的实例 - 虽然很弱,但它是免费的并且非常适合我们的任务:
Apache Kafka 和 Spark Streaming 的流数据处理

接下来是非常重要的事情:数据库实例的名称、主用户的名称及其密码。 让我们将实例命名为:myHabrTest,主用户: 哈布尔, 密码: 哈布尔12345 然后单击“下一步”按钮:
Apache Kafka 和 Spark Streaming 的流数据处理

下一页上有一些参数负责从外部访问我们的数据库服务器(公共可访问性)和端口可用性:

Apache Kafka 和 Spark Streaming 的流数据处理

让我们为 VPC 安全组创建一个新设置,该设置将允许通过端口 5432 (PostgreSQL) 从外部访问我们的数据库服务器。
让我们在单独的浏览器窗口中转到 AWS 控制台的 VPC 仪表板 -> 安全组 -> 创建安全组部分:
Apache Kafka 和 Spark Streaming 的流数据处理

我们设置安全组的名称 - PostgreSQL、描述,指示该组应与哪个 VPC 关联,然后单击创建按钮:
Apache Kafka 和 Spark Streaming 的流数据处理

为新创建的组填写端口5432的入站规则,如下图所示。 您不能手动指定端口,但可以从类型下拉列表中选择 PostgreSQL。

严格来说,值 ::/0 表示来自世界各地的服务器传入流量的可用性,这在规范上并不完全正确,但为了分析示例,让我们允许自己使用这种方法:
Apache Kafka 和 Spark Streaming 的流数据处理

我们返回到浏览器页面,打开“配置高级设置”并在 VPC 安全组部分中选择 -> 选择现有 VPC 安全组 -> PostgreSQL:
Apache Kafka 和 Spark Streaming 的流数据处理

接下来,在数据库选项->数据库名称->设置名称- 哈伯数据库.

我们可以保留其余参数,但默认情况下禁用备份(备份保留期 - 0 天)、监控和 Performance Insights 除外。 点击按钮 创建数据库:
Apache Kafka 和 Spark Streaming 的流数据处理

线程处理程序

最后阶段将开发 Spark 作业,该作业将每两秒处理来自 Kafka 的新数据并将结果输入数据库。

如上所述,检查点是 SparkStreaming 中的核心机制,必须对其进行配置以确保容错。 我们将使用检查点,如果过程失败,Spark Streaming 模块只需返回到最后一个检查点并从中恢复计算即可恢复丢失的数据。

可以通过在容错、可靠的文件系统(例如 HDFS、S3 等)上设置一个用于存储检查点信息的目录来启用检查点。 例如,这是使用以下方法完成的:

streamingContext.checkpoint(checkpointDirectory)

在我们的示例中,我们将使用以下方法,即,如果 checkpointDirectory 存在,则将从检查点数据重新创建上下文。 如果该目录不存在(即第一次执行),则调用 functionToCreateContext 来创建新的上下文并配置 DStreams:

from pyspark.streaming import StreamingContext

context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

我们使用 KafkaUtils 库的 createDirectStream 方法创建一个 DirectStream 对象来连接到“事务”主题:

from pyspark.streaming.kafka import KafkaUtils
    
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)

broker_list = 'localhost:9092'
topic = 'transaction'

directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                [topic],
                                {"metadata.broker.list": broker_list})

解析 JSON 格式的传入数据:

rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")

使用 Spark SQL,我们进行简单的分组并将结果显示在控制台中:

select 
    from_unixtime(unix_timestamp()) as curr_time,
    t.branch                        as branch_name,
    t.currency                      as currency_code,
    sum(amount)                     as batch_value
from treasury_stream t
group by
    t.branch,
    t.currency

获取查询文本并通过 Spark SQL 运行它:

sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)

然后我们将生成的聚合数据保存到 AWS RDS 中的表中。 要将聚合结果保存到数据库表中,我们将使用 DataFrame 对象的 write 方法:

testResultDataFrame.write 
    .format("jdbc") 
    .mode("append") 
    .option("driver", 'org.postgresql.Driver') 
    .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") 
    .option("dbtable", "transaction_flow") 
    .option("user", "habr") 
    .option("password", "habr12345") 
    .save()

关于设置与 AWS RDS 的连接的几句话。 我们在“部署 AWS PostgreSQL”步骤中为其创建了用户和密码。 您应该使用 Endpoint 作为数据库服务器 URL,该 URL 显示在连接和安全部分中:

Apache Kafka 和 Spark Streaming 的流数据处理

为了正确连接 Spark 和 Kafka,您应该使用工件通过 smark-submit 运行作业 火花流-kafka-0-8_2.11。 此外,我们还将使用一个工件与 PostgreSQL 数据库交互;我们将通过 --packages 传输它们。

为了脚本的灵活性,我们还将包括消息服务器的名称和我们想要从中接收数据的主题作为输入参数。

因此,是时候启动并检查系统的功能了:

spark-submit 
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207 
spark_job.py localhost:9092 transaction

一切顺利! 如下图所示,当应用程序运行时,每 2 秒输出一次新的聚合结果,因为我们在创建 StreamingContext 对象时将批处理间隔设置为 2 秒:

Apache Kafka 和 Spark Streaming 的流数据处理

接下来,我们对数据库进行一个简单的查询来检查表中是否存在记录 交易流程:

Apache Kafka 和 Spark Streaming 的流数据处理

结论

本文介绍了使用 Spark Streaming 结合 Apache Kafka 和 PostgreSQL 进行信息流处理的示例。 随着各种来源数据的增长,很难高估 Spark Streaming 对于创建流式和实时应用程序的实用价值。

您可以在我的存储库中找到完整的源代码: GitHub上.

很高兴讨论这篇文章,期待大家的评论,也希望所有有爱心的读者提出建设性的批评。

我祝你成功!

PS。 最初计划使用本地 PostgreSQL 数据库,但考虑到我对 AWS 的热爱,我决定将数据库迁移到云端。 在关于该主题的下一篇文章中,我将展示如何使用 AWS Kinesis 和 AWS EMR 在 AWS 中实施上述整个系统。 关注新闻!

来源: habr.com

添加评论