你好,哈布爾! 今天我們將建立一個系統,使用 Spark Streaming 處理 Apache Kafka 訊息流,並將處理結果寫入 AWS RDS 雲端資料庫。
讓我們想像一下,某個信貸機構給我們安排了在其所有分支機構「即時」處理傳入交易的任務。 這樣做的目的是為了及時計算國庫的未平倉貨幣部位、交易的限額或財務結果等。
如何在不使用魔法和魔法咒語的情況下實現這種情況——閱讀下切! 去!
介紹
當然,即時處理大量數據為現代系統提供了充足的使用機會。 最受歡迎的組合之一是 Apache Kafka 和 Spark Streaming 的串聯,其中 Kafka 創建傳入訊息資料包的流,而 Spark Streaming 以給定的時間間隔處理這些資料包。
為了提高應用程式的容錯能力,我們將使用檢查點。 透過這種機制,當Spark Streaming引擎需要恢復遺失的資料時,只需要返回到上一個檢查點並從那裡恢復計算。
開發系統的架構
使用的組件:
阿帕奇卡夫卡 是一個分散式發布訂閱訊息系統。 適合離線和線上訊息消費。 為了防止資料遺失,Kafka 訊息儲存在磁碟上並在叢集內進行複製。 Kafka系統建構在ZooKeeper同步服務之上;Apache Spark 串流 - 用於處理串流資料的 Spark 元件。 Spark Streaming 模組使用微批量架構構建,其中資料流被解釋為小資料包的連續序列。 Spark Streaming 從不同來源取得資料並將其組合成小包。 定期建立新包。 在每個時間間隔開始時,都會建立一個新資料包,並且在該時間間隔期間接收到的任何資料都包含在該資料包中。 在間隔結束時,資料包增長停止。 間隔的大小由一個稱為批次間隔的參數決定;Apache Spark SQL - 將關係處理與 Spark 函數式程式設計結合。 結構化資料是指具有模式的數據,即所有記錄的一組欄位。 Spark SQL支援來自各種結構化資料來源的輸入,由於模式資訊的可用性,它可以有效率地僅檢索記錄的所需字段,並且還提供DataFrame API;AWS RDS 是一種相對便宜的基於雲端的關聯式資料庫、Web 服務,可簡化設定、操作和擴展,並由 Amazon 直接管理。
安裝並運行 Kafka 伺服器
在直接使用Kafka之前,你需要確保你有Java,因為... JVM用於工作:
sudo apt-get update
sudo apt-get install default-jre
java -version
讓我們建立一個新使用者來使用 Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
接下來,從 Apache Kafka 官方網站下載發行版:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
解壓縮下載的檔案:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
下一步是可選的。 事實上,預設設定不允許您充分使用 Apache Kafka 的所有功能。 例如,刪除可以發佈訊息的主題、類別、群組。 要更改此設置,讓我們編輯配置檔案:
vim ~/kafka/config/server.properties
將以下內容新增至文件末尾:
delete.topic.enable = true
在啟動Kafka伺服器之前,需要啟動ZooKeeper伺服器;我們將使用Kafka發行版附帶的輔助腳本:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper 成功啟動後,在單獨的終端機中啟動 Kafka 伺服器:
bin/kafka-server-start.sh config/server.properties
讓我們建立一個名為 Transaction 的新主題:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
讓我們確保已建立具有所需數量的分區和複製的主題:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
讓我們錯過為新創建的主題測試生產者和消費者的時刻。 有關如何測試發送和接收訊息的更多詳細信息,請參閱官方文件 -
製片寫作
生產者將產生隨機資料 - 每秒 100 則訊息。 隨機資料是指由三個欄位組成的字典:
- 支 — 信貸機構銷售點的名稱;
- 貨幣 ——交易貨幣;
- 金額 ——交易金額。 如果是銀行購買貨幣,則金額將為正數;如果是出售,則金額將為負數。
生產者的程式碼如下所示:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
接下來,使用 send 方法,我們以 JSON 格式向伺服器發送一條訊息到我們需要的主題:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
運行腳本時,我們在終端機中收到以下訊息:
這意味著一切都按照我們想要的方式進行 - 生產者產生訊息並將其發送到我們需要的主題。
下一步是安裝 Spark 並處理該訊息流。
安裝 Apache Spark
Apache Spark 是一個通用、高效能的叢集運算平台。
Spark 的效能優於 MapReduce 模型的流行實現,同時支援更廣泛的運算類型,包括互動式查詢和串流處理。 在處理大量數據時,速度起著重要作用,因為正是速度使您能夠互動式地工作,而無需花費幾分鐘或幾小時的等待。 Spark 之所以如此之快,最大的優勢之一是它能夠執行記憶體計算。
這個框架是用Scala寫的,所以你需要先安裝它:
sudo apt-get install scala
從官方網站下載 Spark 發行版:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
解壓縮存檔:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
將 Spark 的路徑加入 bash 檔案:
vim ~/.bashrc
透過編輯器新增以下行:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
更改 bashrc 後執行以下命令:
source ~/.bashrc
部署 AWS PostgreSQL
剩下的就是部署資料庫,我們將從流中上傳處理後的資訊。 為此,我們將使用 AWS RDS 服務。
前往 AWS 主控台 -> AWS RDS -> 資料庫 -> 建立資料庫:
選擇 PostgreSQL 並點擊下一步:
因為此範例僅用於教育目的;我們「至少」將使用免費伺服器(免費套餐):
接下來,我們在「免費層」區塊中打勾,之後我們將自動提供 t2.micro 類別的實例 - 雖然很弱,但它是免費的並且非常適合我們的任務:
接下來是非常重要的事情:資料庫實例的名稱、主用戶的名稱及其密碼。 讓我們將實例命名為:myHabrTest,主用戶: 哈布爾, 密碼: 哈布爾12345 然後點選“下一步”按鈕:
下一頁有一些參數負責從外部存取我們的資料庫伺服器(公共可訪問性)和連接埠可用性:
讓我們為 VPC 安全群組建立一個新設置,該設定將允許透過連接埠 5432 (PostgreSQL) 從外部存取我們的資料庫伺服器。
讓我們在單獨的瀏覽器視窗中前往 AWS 控制台的 VPC 儀表板 -> 安全性群組 -> 建立安全群組部分:
我們設定安全群組的名稱 - PostgreSQL、描述,指示該群組應與哪個 VPC 關聯,然後按一下建立按鈕:
為新建立的群組填寫連接埠5432的入站規則,如下圖所示。 您不能手動指定端口,但可以從類型下拉清單中選擇 PostgreSQL。
嚴格來說,值 ::/0 表示來自世界各地的伺服器傳入流量的可用性,這在規範上並不完全正確,但為了分析範例,讓我們允許自己使用這種方法:
我們會回到瀏覽器頁面,開啟「設定進階設定」並在 VPC 安全群組部分中選擇 -> 選擇現有 VPC 安全群組 -> PostgreSQL:
接下來,在資料庫選項->資料庫名稱->設定名稱- 哈伯資料庫.
我們可以保留其餘參數,但預設情況下停用備份(備份保留期 - 0 天)、監控和 Performance Insights 除外。 點擊按鈕 建立資料庫:
線程處理程序
最後階段將是開發 Spark 作業,該作業將每兩秒處理來自 Kafka 的新資料並將結果輸入資料庫。
如上所述,檢查點是 SparkStreaming 中的核心機制,必須對其進行配置以確保容錯。 我們將使用檢查點,如果過程失敗,Spark Streaming 模組只需返回到最後一個檢查點並從中恢復計算即可恢復丟失的資料。
可以透過在容錯、可靠的檔案系統(例如 HDFS、S3 等)上設定一個用於儲存檢查點資訊的目錄來啟用檢查點。 例如,這是使用以下方法完成的:
streamingContext.checkpoint(checkpointDirectory)
在我們的範例中,我們將使用以下方法,即,如果 checkpointDirectory 存在,則將從檢查點資料重新建立上下文。 如果目錄不存在(即第一次執行),則呼叫 functionToCreateContext 來建立新的上下文並配置 DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
我們使用 KafkaUtils 函式庫的 createDirectStream 方法建立一個 DirectStream 物件來連接到「事務」主題:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
解析 JSON 格式的傳入資料:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
使用 Spark SQL,我們進行簡單的分組並將結果顯示在控制台中:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
取得查詢文字並透過 Spark SQL 運行它:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
然後我們將產生的聚合資料儲存到 AWS RDS 中的表中。 要將聚合結果儲存到資料庫表中,我們將使用 DataFrame 物件的 write 方法:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
關於設定與 AWS RDS 的連接的幾句話。 我們在「部署 AWS PostgreSQL」步驟中為其建立了使用者和密碼。 您應該使用 Endpoint 作為資料庫伺服器 URL,該 URL 顯示在連線和安全性部分:
為了正確連接 Spark 和 Kafka,您應該使用工件透過 smark-submit 來執行作業 火花流-kafka-0-8_2.11。 此外,我們還將使用一個工件與 PostgreSQL 資料庫互動;我們將透過 --packages 傳輸它們。
為了腳本的靈活性,我們還將包括訊息伺服器的名稱和我們想要從中接收資料的主題作為輸入參數。
因此,是時候啟動並檢查系統的功能了:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
一切順利! 如下圖所示,當應用程式運行時,每 2 秒輸出一次新的聚合結果,因為我們在建立 StreamingContext 物件時將批次間隔設為 2 秒:
接下來,我們對資料庫進行一個簡單的查詢來檢查表中是否存在記錄 交易流程:
結論
本文介紹了使用 Spark Streaming 結合 Apache Kafka 和 PostgreSQL 進行資訊流處理的範例。 隨著各種來源資料的成長,很難高估 Spark Streaming 對於創建串流和即時應用程式的實用價值。
您可以在我的儲存庫中找到完整的原始程式碼:
很高興討論這篇文章,期待大家的評論,也希望所有有愛心的讀者提出建設性的批評。
祝你成功!
詩。 最初計劃使用本地 PostgreSQL 資料庫,但考慮到我對 AWS 的熱愛,我決定將資料庫遷移到雲端。 在關於這個主題的下一篇文章中,我將展示如何使用 AWS Kinesis 和 AWS EMR 在 AWS 中實作上述整個系統。 關注新聞!
來源: www.habr.com