嘿哈布爾!
你喜歡開飛機嗎? 我喜歡它,但在自我隔離期間,我也愛上了分析來自一個知名資源 - Aviasales 的機票數據。
今天我們將分析 Amazon Kinesis 的工作,建立具有即時分析功能的串流系統,安裝 Amazon DynamoDB NoSQL 資料庫作為主要資料存儲,並為有興趣的工單設定簡訊通知。
所有的細節都在切割之下! 去!
介紹
例如,我們需要訪問
本文的主要目的是讓大家對AWS中資訊流的使用有一個大致的了解;我們考慮到所使用的API返回的資料並不是嚴格最新的,並且是從快取中傳輸的,這是根據過去48 小時內Aviasales.ru 和Jetradar.com 網站的使用者搜尋而形成。
Kinesis-agent 安裝在生產機器上,透過 API 接收,將透過 Kinesis Data Analytics 自動解析資料並將資料傳輸到所需的流。 該流的原始版本將直接寫入儲存。 DynamoDB 中部署的原始資料儲存將允許透過 BI 工具(例如 AWS Quick Sight)進行更深入的票證分析。
我們將考慮部署整個基礎架構的兩種選擇:
- 手動 - 透過 AWS 管理主控台;
- Terraform 程式碼的基礎架構適用於懶惰的自動化人員;
開發系統的架構
使用的組件:
航空銷售 API ——該API傳回的資料將用於後續所有工作;EC2 生產者實例 — 雲端中的常規虛擬機,將在其中產生輸入資料流:Kinesis 數據流 — 具有廣泛擴展能力的即時資料流服務;運動分析 是一種無伺服器服務,可簡化即時串流資料分析。 Amazon Kinesis Data Analytics 配置應用程式資源並自動擴展以處理任意數量的傳入資料;AWS Lambda — 一項可讓您執行程式碼而無需備份或設定伺服器的服務。 所有運算能力都會針對每次呼叫自動縮放;亞馬遜DynamoDB - 鍵值對和文件的資料庫,在任何規模運行時延遲均小於 10 毫秒。 使用 DynamoDB 時,您無需預置、修補或管理任何伺服器。 DynamoDB 自動擴充表以調整可用資源量並保持高效能。 無需系統管理;亞馬遜SNS - 使用發布者-訂閱者 (Pub/Sub) 模型發送訊息的完全託管服務,您可以使用該模型隔離微服務、分散式系統和無伺服器應用程式。 SNS 可用於透過行動推播通知、簡訊和電子郵件向最終用戶發送訊息。
初步培訓
為了模擬資料流,我決定使用 Aviasales API 傳回的機票資訊。 在
那麼,讓我們註冊並獲取我們的令牌。
請求示例如下:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
上述透過在請求中指定令牌來從 API 接收資料的方法是可行的,但我更喜歡透過標頭傳遞存取令牌,因此我們將在 api_caller.py 腳本中使用此方法。
回答範例:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
上面的範例 API 回應顯示了從聖彼得堡到 Phuk 的機票...哦,多棒的夢想...
由於我來自喀山,而普吉島現在“只是一個夢想”,讓我們尋找從聖彼得堡到喀山的機票。
它假設您已經擁有一個 AWS 帳戶。 我想立即特別注意以下事實:Kinesis 和透過 SMS 發送通知不包含在年度報告中
免費套餐(免費使用) 。 但即便如此,只要花幾美元,就很有可能建立提議的系統並使用它。 當然,不要忘記刪除不再需要的所有資源。
幸運的是,如果我們達到每月的免費限制,DynamoDb 和 lambda 函數將對我們免費。 例如,對於 DynamoDB:25 GB 儲存、25 WCU/RCU 和 100 億個查詢。 每月有一百萬次 lambda 函數呼叫。
手動系統部署
設定 Kinesis 資料流
讓我們轉到 Kinesis Data Streams 服務並建立兩個新流,每個流一個分片。
什麼是分片?
分片是 Amazon Kinesis 流的基本資料傳輸單元。 一個段以 1 MB/s 的速度提供輸入資料傳輸,以 2 MB/s 的速度提供輸出資料傳輸。 一個段落每秒最多支援 1000 個 PUT 條目。 建立資料流時,需要指定所需的段數。 例如,您可以建立包含兩個段的資料流。 此資料流將以 2 MB/s 的速度提供輸入資料傳輸,以 4 MB/s 的速度提供輸出資料傳輸,支援每秒多達 2000 個 PUT 記錄。
流中的分片越多,其吞吐量就越大。 原則上,這就是透過添加分片來擴展流量的方式。 但擁有的碎片越多,價格就越高。 每個分片每小時花費 1,5 美分,每百萬 PUT 有效負載單位額外花費 1.4 美分。
讓我們建立一個名為的新串流 機票,1 個碎片對他來說就足夠了:
現在讓我們建立另一個線程,名稱為 特殊串流:
製作人設定
要分析任務,使用常規 EC2 執行個體作為資料生產者就足夠了。 它不一定是功能強大、昂貴的虛擬機器;t2.micro 就可以了。
重要提示:例如,您應該使用鏡像 - Amazon Linux AMI 2018.03.0,它用於快速啟動 Kinesis Agent 的設定較少。
前往 EC2 服務,建立一個新虛擬機,選擇類型為 t2.micro 的所需 AMI,該 AMI 包含在免費套餐中:
為了使新建立的虛擬機器能夠與 Kinesis 服務交互,必須為其授予執行此操作的權限。 執行此操作的最佳方法是指派 IAM 角色。 因此,在步驟 3:設定實例詳細資料畫面上,您應該選擇 建立新的 IAM 角色:
為 EC2 建立 IAM 角色
在開啟的視窗中,選擇我們正在為 EC2 建立新角色,然後轉到權限部分:
使用訓練範例,我們不必深入了解資源權限的精細配置的所有複雜性,因此我們將選擇 Amazon 預先配置的策略:AmazonKinesisFullAccess 和 CloudWatchFullAccess。
讓我們為此角色指定一些有意義的名稱,例如:EC2-KinesisStreams-FullAccess。 結果應該與下圖所示相同:
創建這個新角色後,不要忘記將其附加到創建的虛擬機器實例:
我們不更改此畫面上的任何其他內容,然後轉到下一個視窗。
硬碟設定以及標籤可以保留為預設值(儘管使用標籤是一種很好的做法,但至少為實例指定一個名稱並指示環境)。
現在我們進入第 6 步:設定安全群組標籤,您需要在其中建立一個新安全性群組或指定現有安全性群組,這樣您就可以透過 ssh(連接埠 22)連線到執行個體。 在那裡選擇“來源”->“我的 IP”,您可以啟動該執行個體。
一旦它切換到運行狀態,您就可以嘗試透過 ssh 連線到它。
為了能夠使用 Kinesis Agent,成功連接到電腦後,您必須在終端機中輸入以下命令:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
讓我們建立一個資料夾來保存 API 回應:
sudo mkdir /var/log/airline_tickets
在啟動代理之前,您需要配置其配置:
sudo vim /etc/aws-kinesis/agent.json
agent.json 檔案的內容應如下所示:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
從設定檔中可以看出,代理程式將監視/var/log/airline_tickets/目錄中擴展名為.log的文件,解析它們並將它們傳輸到airline_tickets流。
我們重新啟動服務並確保它已啟動並運行:
sudo service aws-kinesis-agent restart
現在讓我們下載將從 API 請求資料的 Python 腳本:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
api_caller.py 腳本從 Aviasales 請求數據,並將收到的回應保存在 Kinesis 代理掃描的目錄中。 這個腳本的實作非常標準,有一個 TicketsApi 類,它允許您非同步拉取 API。 我們將帶有令牌的標頭和請求參數傳遞給此類:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
為了測試代理程式的正確設定和功能,讓我們測試執行 api_caller.py 腳本:
sudo ./api_caller.py TOKEN
我們在 Agent 日誌和 Airlines_tickets 資料流的「Monitoring」標籤中查看工作結果:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
如您所見,一切正常,Kinesis Agent 成功將資料傳送到串流。 現在讓我們來配置消費者。
設定 Kinesis Data Analytics
讓我們繼續討論整個系統的核心元件 - 在 Kinesis Data Analytics 中建立一個名為 kinesis_analytics_airlines_app 的新應用程式:
Kinesis Data Analytics 可讓您使用 SQL 語言從 Kinesis Streams 執行即時數據分析。 它是一項完全自動擴充的服務(與 Kinesis Streams 不同):
- 允許您根據對來源資料的請求建立新的流(輸出流);
- 提供應用程式運行時發生的錯誤的流(錯誤流);
- 可自動確定輸入資料方案(必要時可手動重新定義)。
這不是一項便宜的服務 - 每小時 0.11 美元,因此您應該謹慎使用它,並在完成後將其刪除。
讓我們將應用程式連接到資料來源:
選擇我們要連接的流(airline_tickets):
接下來,您需要附加一個新的 IAM 角色,以便應用程式可以從流中讀取資料並向流寫入資料。 為此,無需更改存取權限區塊中的任何內容就足夠了:
現在讓我們請求發現流中的資料模式;為此,請點擊「發現模式」按鈕。 因此,IAM 角色將被更新(將創建一個新角色),並且將從已到達流中的資料啟動模式檢測:
現在您需要轉到 SQL 編輯器。 當您單擊此按鈕時,將出現一個窗口,要求您啟動應用程式 - 選擇您要啟動的內容:
將以下簡單查詢插入 SQL 編輯器窗口,然後按一下「儲存並執行 SQL」:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
在關聯式資料庫中,您可以使用 INSERT 語句來新增記錄並使用 SELECT 語句來查詢資料。 在 Amazon Kinesis Data Analytics 中,您可以使用流 (STREAM) 和幫浦 (PUMP),即連續插入請求,將應用程式中的一個流中的資料插入到另一個流中。
上面提供的 SQL 查詢搜尋價格低於 XNUMX 盧布的 Aeroflot 機票。 所有符合這些條件的記錄都將放置在 DESTINATION_SQL_STREAM 流中。
在 Destination 區塊中,選擇special_stream 流,然後在應用程式內流名稱 DESTINATION_SQL_STREAM 下拉清單中:
所有操作的結果應該類似下圖:
建立並訂閱 SNS 主題
前往簡單通知服務並在那裡建立一個名為 Airlines 的新主題:
訂閱此主題並指定將向其發送簡訊通知的手機號碼:
在 DynamoDB 中建立表
為了儲存 Airlines_tickets 流中的原始數據,我們在 DynamoDB 中建立一個具有相同名稱的表。 我們將使用 record_id 作為主鍵:
建立 lambda 函數收集器
讓我們建立一個名為 Collector 的 lambda 函數,其任務是輪詢airline_tickets 流,如果在那裡找到新記錄,則將這些記錄插入 DynamoDB 表中。 顯然,除了預設權限之外,此 lambda 還必須具有對 Kinesis 資料流的讀取存取權和對 DynamoDB 的寫入存取權。
為收集器 lambda 函數建立 IAM 角色
首先,我們為 lambda 建立一個名為 Lambda-TicketsProcessingRole 的新 IAM 角色:
對於測試範例,預先設定的 AmazonKinesisReadOnlyAccess 和 AmazonDynamoDBFullAccess 策略非常合適,如下圖所示:
當新條目進入airline_stream 時,該 lambda 應該由 Kinesis 的觸發器啟動,因此我們需要新增一個觸發器:
剩下的就是插入程式碼並保存 lambda。
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
建立 lambda 函數通知程序
第二個 lambda 函數將監視第二個流 (special_stream) 並向 SNS 發送通知,以類似的方式建立。 因此,該 lambda 必須有權從 Kinesis 讀取資料並將訊息發送到給定的 SNS 主題,然後 SNS 服務將這些訊息發送給該主題的所有訂閱者(電子郵件、簡訊等)。
建立 IAM 角色
首先,我們為此 lambda 建立 IAM 角色 Lambda-KinesisAlarm,然後將此角色指派給正在建立的 Alarm_notifier lambda:
此 lambda 應該會作用於新記錄進入special_stream 的觸發器,因此您需要以與 Collector lambda 相同的方式配置觸發器。
為了更輕鬆地配置此 lambda,我們引入一個新的環境變數 - TOPIC_ARN,其中放置 Airlines 主題的 ANR(Amazon 資源名稱):
然後插入 lambda 程式碼,一點也不複雜:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
看來手動系統配置到此就完成了。 剩下的就是測試並確保我們已正確配置所有內容。
從 Terraform 程式碼部署
所需準備
您可以下載發行版
如何開始
項目完整代碼為
一個好的做法是在部署整個基礎架構之前執行 plan 命令,以查看 Terraform 目前在雲端中為我們創建的內容:
terraform.exe plan
系統將提示您輸入用於發送通知的電話號碼。 此階段無需進入。
分析完程式的運行計劃,我們就可以開始建立資源了:
terraform.exe apply
發送此命令後,系統將再次要求您輸入電話號碼;當顯示有關實際執行操作的問題時,請撥打「是」。 這將允許您設定整個基礎架構、執行 EC2 的所有必要配置、部署 lambda 函數等。
透過 Terraform 程式碼成功建立所有資源後,您需要深入了解 Kinesis Analytics 應用程式的詳細資訊(不幸的是,我沒有找到如何直接從程式碼中執行此操作)。
啟動應用程式:
此後,您必須透過從下拉清單中選擇來明確設定應用程式內流名稱:
現在一切準備就緒。
測試應用程式
無論您如何部署系統,手動還是透過 Terraform 程式碼,它的工作原理都是一樣的。
我們透過 SSH 登入安裝了 Kinesis Agent 的 EC2 虛擬機器並執行 api_caller.py 腳本
sudo ./api_caller.py TOKEN
您所要做的就是等待簡訊發送到您的號碼:
SMS - 一則訊息在大約 1 分鐘內到達手機:
仍有待觀察這些記錄是否保存在 DynamoDB 資料庫中,以便進行後續更詳細的分析。 Airlines_tickets 表大約包含以下資料:
結論
在工作過程中,基於Amazon Kinesis建立了線上資料處理系統。 考慮了將 Kinesis Agent 與 Kinesis Data Streams 和使用 SQL 命令的即時分析 Kinesis Analytics 結合使用的選項,以及 Amazon Kinesis 與其他 AWS 服務的交互作用。
我們透過兩種方式部署上述系統:一種是相當長的手動方式,另一種是透過 Terraform 程式碼快速部署。
所有項目原始碼均可用
我很高興討論這篇文章,期待您的評論。 我希望得到建設性的批評。
祝你成功!
來源: www.habr.com