嘿哈布尔!
你喜欢开飞机吗? 我喜欢它,但在自我隔离期间,我也爱上了分析来自一个知名资源 - Aviasales 的机票数据。
今天我们将分析 Amazon Kinesis 的工作,构建具有实时分析功能的流系统,安装 Amazon DynamoDB NoSQL 数据库作为主要数据存储,并为感兴趣的工单设置短信通知。
所有的细节都在切割之下! 去!
介绍
例如,我们需要访问
本文的主要目的是让大家对AWS中信息流的使用有一个大致的了解;我们考虑到所使用的API返回的数据并不是严格最新的,并且是从缓存中传输的,这是根据过去 48 小时内 Aviasales.ru 和 Jetradar.com 网站的用户搜索而形成。
Kinesis-agent 安装在生产机器上,通过 API 接收,将通过 Kinesis Data Analytics 自动解析数据并将数据传输到所需的流。 该流的原始版本将直接写入存储。 DynamoDB 中部署的原始数据存储将允许通过 BI 工具(例如 AWS Quick Sight)进行更深入的票证分析。
我们将考虑部署整个基础设施的两种选择:
- 手动 - 通过 AWS 管理控制台;
- Terraform 代码的基础设施适用于懒惰的自动化人员;
开发系统的架构
使用的组件:
航空销售API ——该API返回的数据将用于后续所有工作;EC2 生产者实例 — 云中的常规虚拟机,将在其中生成输入数据流:Kinesis 数据流 — 具有广泛扩展能力的实时数据流服务;运动分析 是一种无服务器服务,可简化实时流数据分析。 Amazon Kinesis Data Analytics 配置应用程序资源并自动扩展以处理任意数量的传入数据;AWS Lambda — 一项允许您运行代码而无需备份或设置服务器的服务。 所有计算能力都会针对每次调用自动缩放;Amazon DynamoDB - 键值对和文档的数据库,在任何规模运行时延迟均小于 10 毫秒。 使用 DynamoDB 时,您无需预置、修补或管理任何服务器。 DynamoDB 自动扩展表以调整可用资源量并保持高性能。 无需系统管理;亚马逊SNS - 使用发布者-订阅者 (Pub/Sub) 模型发送消息的完全托管服务,您可以使用该模型隔离微服务、分布式系统和无服务器应用程序。 SNS 可用于通过移动推送通知、短信和电子邮件向最终用户发送信息。
初步培训
为了模拟数据流,我决定使用 Aviasales API 返回的机票信息。 在
那么,让我们注册并获取我们的令牌。
请求示例如下:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
上述通过在请求中指定令牌来从 API 接收数据的方法是可行的,但我更喜欢通过标头传递访问令牌,因此我们将在 api_caller.py 脚本中使用此方法。
回答示例:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
上面的示例 API 响应显示了从圣彼得堡到 Phuk 的机票...哦,多棒的梦想...
由于我来自喀山,而普吉岛现在“只是一个梦想”,让我们寻找从圣彼得堡到喀山的机票。
它假设您已经拥有一个 AWS 账户。 我想立即特别注意以下事实:Kinesis 和通过 SMS 发送通知不包含在年度报告中
免费套餐(免费使用) 。 但即便如此,只要花几美元,就很有可能构建提议的系统并使用它。 当然,不要忘记删除不再需要的所有资源。
幸运的是,如果我们达到每月的免费限制,DynamoDb 和 lambda 函数将对我们免费。 例如,对于 DynamoDB:25 GB 存储、25 WCU/RCU 和 100 亿个查询。 每月有一百万次 lambda 函数调用。
手动系统部署
设置 Kinesis 数据流
让我们转到 Kinesis Data Streams 服务并创建两个新流,每个流一个分片。
什么是分片?
分片是 Amazon Kinesis 流的基本数据传输单元。 一个段以 1 MB/s 的速度提供输入数据传输,以 2 MB/s 的速度提供输出数据传输。 一个段每秒最多支持 1000 个 PUT 条目。 创建数据流时,需要指定所需的段数。 例如,您可以创建包含两个段的数据流。 该数据流将以 2 MB/s 的速度提供输入数据传输,以 4 MB/s 的速度提供输出数据传输,支持每秒多达 2000 个 PUT 记录。
流中的分片越多,其吞吐量就越大。 原则上,这就是通过添加分片来扩展流量的方式。 但拥有的碎片越多,价格就越高。 每个分片每小时花费 1,5 美分,每百万 PUT 有效负载单位额外花费 1.4 美分。
让我们创建一个名为的新流 机票,1 个碎片对他来说就足够了:
现在让我们创建另一个线程,名称为 特殊流:
制作人设置
要分析任务,使用常规 EC2 实例作为数据生产者就足够了。 它不一定是功能强大、昂贵的虚拟机;t2.micro 就可以了。
重要提示:例如,您应该使用镜像 - Amazon Linux AMI 2018.03.0,它用于快速启动 Kinesis Agent 的设置较少。
转到 EC2 服务,创建一个新虚拟机,选择类型为 t2.micro 的所需 AMI,该 AMI 包含在免费套餐中:
为了使新创建的虚拟机能够与 Kinesis 服务交互,必须为其授予执行此操作的权限。 执行此操作的最佳方法是分配 IAM 角色。 因此,在步骤 3:配置实例详细信息屏幕上,您应该选择 创建新的 IAM 角色:
为 EC2 创建 IAM 角色
在打开的窗口中,选择我们正在为 EC2 创建新角色,然后转到权限部分:
使用训练示例,我们不必深入了解资源权限的精细配置的所有复杂性,因此我们将选择 Amazon 预先配置的策略:AmazonKinesisFullAccess 和 CloudWatchFullAccess。
让我们为此角色指定一些有意义的名称,例如:EC2-KinesisStreams-FullAccess。 结果应该与下图所示相同:
创建这个新角色后,不要忘记将其附加到创建的虚拟机实例:
我们不更改此屏幕上的任何其他内容,然后转到下一个窗口。
硬盘驱动器设置以及标签可以保留为默认值(尽管使用标签是一种很好的做法,但至少为实例指定一个名称并指示环境)。
现在我们进入第 6 步:配置安全组选项卡,您需要在其中创建一个新安全组或指定现有安全组,这样您就可以通过 ssh(端口 22)连接到实例。 在那里选择“源”->“我的 IP”,您可以启动该实例。
一旦它切换到运行状态,您就可以尝试通过 ssh 连接到它。
为了能够使用 Kinesis Agent,成功连接到计算机后,您必须在终端中输入以下命令:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
让我们创建一个文件夹来保存 API 响应:
sudo mkdir /var/log/airline_tickets
在启动代理之前,您需要配置其配置:
sudo vim /etc/aws-kinesis/agent.json
agent.json 文件的内容应如下所示:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
从配置文件中可以看出,代理将监视/var/log/airline_tickets/目录中扩展名为.log的文件,解析它们并将它们传输到airline_tickets流。
我们重新启动服务并确保它已启动并运行:
sudo service aws-kinesis-agent restart
现在让我们下载将从 API 请求数据的 Python 脚本:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
api_caller.py 脚本从 Aviasales 请求数据,并将收到的响应保存在 Kinesis 代理扫描的目录中。 这个脚本的实现非常标准,有一个 TicketsApi 类,它允许您异步拉取 API。 我们将带有令牌的标头和请求参数传递给此类:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
为了测试代理的正确设置和功能,让我们测试运行 api_caller.py 脚本:
sudo ./api_caller.py TOKEN
我们在 Agent 日志和 Airlines_tickets 数据流的“Monitoring”选项卡中查看工作结果:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
如您所见,一切正常,Kinesis Agent 成功将数据发送到流。 现在让我们配置消费者。
设置 Kinesis Data Analytics
让我们继续讨论整个系统的核心组件 - 在 Kinesis Data Analytics 中创建一个名为 kinesis_analytics_airlines_app 的新应用程序:
Kinesis Data Analytics 允许您使用 SQL 语言从 Kinesis Streams 执行实时数据分析。 它是一项完全自动扩展的服务(与 Kinesis Streams 不同):
- 允许您根据对源数据的请求创建新的流(输出流);
- 提供应用程序运行时发生的错误的流(错误流);
- 可以自动确定输入数据方案(必要时可以手动重新定义)。
这不是一项便宜的服务 - 每小时 0.11 美元,因此您应该谨慎使用它,并在完成后将其删除。
让我们将应用程序连接到数据源:
选择我们要连接的流(airline_tickets):
接下来,您需要附加一个新的 IAM 角色,以便应用程序可以从流中读取数据并向流中写入数据。 为此,无需更改访问权限块中的任何内容就足够了:
现在让我们请求发现流中的数据模式;为此,请单击“发现模式”按钮。 因此,IAM 角色将被更新(将创建一个新角色),并且将从已到达流中的数据启动模式检测:
现在您需要转到 SQL 编辑器。 当您单击此按钮时,将出现一个窗口,要求您启动应用程序 - 选择您要启动的内容:
将以下简单查询插入 SQL 编辑器窗口,然后单击“保存并运行 SQL”:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
在关系数据库中,您可以使用 INSERT 语句来添加记录并使用 SELECT 语句来查询数据。 在 Amazon Kinesis Data Analytics 中,您可以使用流 (STREAM) 和泵 (PUMP),即连续插入请求,将应用程序中的一个流中的数据插入到另一流中。
上面提供的 SQL 查询搜索价格低于 XNUMX 卢布的 Aeroflot 机票。 所有满足这些条件的记录都将放置在 DESTINATION_SQL_STREAM 流中。
在 Destination 块中,选择special_stream 流,然后在应用程序内流名称 DESTINATION_SQL_STREAM 下拉列表中:
所有操作的结果应该类似于下图:
创建并订阅 SNS 主题
转到简单通知服务并在那里创建一个名为 Airlines 的新主题:
订阅此主题并指定将向其发送短信通知的手机号码:
在 DynamoDB 中创建表
为了存储 Airlines_tickets 流中的原始数据,我们在 DynamoDB 中创建一个具有相同名称的表。 我们将使用 record_id 作为主键:
创建 lambda 函数收集器
让我们创建一个名为 Collector 的 lambda 函数,其任务是轮询airline_tickets 流,如果在那里找到新记录,则将这些记录插入到 DynamoDB 表中。 显然,除了默认权限之外,此 lambda 还必须具有对 Kinesis 数据流的读取访问权限和对 DynamoDB 的写入访问权限。
为收集器 lambda 函数创建 IAM 角色
首先,我们为 lambda 创建一个名为 Lambda-TicketsProcessingRole 的新 IAM 角色:
对于测试示例,预先配置的 AmazonKinesisReadOnlyAccess 和 AmazonDynamoDBFullAccess 策略非常合适,如下图所示:
当新条目进入airline_stream 时,该 lambda 应该由 Kinesis 的触发器启动,因此我们需要添加一个新触发器:
剩下的就是插入代码并保存 lambda。
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
创建 lambda 函数通知程序
第二个 lambda 函数将监视第二个流 (special_stream) 并向 SNS 发送通知,以类似的方式创建。 因此,此 lambda 必须有权从 Kinesis 读取消息并将消息发送到给定的 SNS 主题,然后 SNS 服务将这些消息发送给该主题的所有订阅者(电子邮件、短信等)。
创建 IAM 角色
首先,我们为此 lambda 创建 IAM 角色 Lambda-KinesisAlarm,然后将此角色分配给正在创建的 Alarm_notifier lambda:
此 lambda 应该作用于新记录进入special_stream 的触发器,因此您需要以与 Collector lambda 相同的方式配置触发器。
为了更轻松地配置此 lambda,我们引入一个新的环境变量 - TOPIC_ARN,其中放置 Airlines 主题的 ANR(Amazon 资源名称):
然后插入 lambda 代码,一点也不复杂:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
看来手动系统配置到此就完成了。 剩下的就是测试并确保我们已正确配置所有内容。
从 Terraform 代码部署
所需准备
您可以下载发行版
如何运行
项目完整代码为
一个好的做法是在部署整个基础设施之前运行 plan 命令,以查看 Terraform 当前在云中为我们创建的内容:
terraform.exe plan
系统将提示您输入用于发送通知的电话号码。 此阶段无需进入。
分析完程序的运行计划,我们就可以开始创建资源了:
terraform.exe apply
发送此命令后,系统将再次要求您输入电话号码;当显示有关实际执行操作的问题时,请拨打“是”。 这将允许您设置整个基础设施、执行 EC2 的所有必要配置、部署 lambda 函数等。
通过 Terraform 代码成功创建所有资源后,您需要深入了解 Kinesis Analytics 应用程序的详细信息(不幸的是,我没有找到如何直接从代码中执行此操作)。
启动应用程序:
此后,您必须通过从下拉列表中选择来显式设置应用程序内流名称:
现在一切准备就绪。
测试应用程序
无论您如何部署系统,手动还是通过 Terraform 代码,它的工作原理都是一样的。
我们通过 SSH 登录安装了 Kinesis Agent 的 EC2 虚拟机并运行 api_caller.py 脚本
sudo ./api_caller.py TOKEN
您所要做的就是等待短信发送到您的号码:
短信 - 一条消息会在大约 1 分钟内到达您的手机:
仍有待观察这些记录是否保存在 DynamoDB 数据库中,以便进行后续更详细的分析。 Airlines_tickets 表大约包含以下数据:
结论
在工作过程中,基于Amazon Kinesis构建了在线数据处理系统。 考虑了将 Kinesis Agent 与 Kinesis Data Streams 和使用 SQL 命令的实时分析 Kinesis Analytics 结合使用的选项,以及 Amazon Kinesis 与其他 AWS 服务的交互。
我们通过两种方式部署上述系统:一种是相当长的手动方式,另一种是通过 Terraform 代码快速部署。
所有项目源代码均可用
我很高兴讨论这篇文章,期待您的评论。 我希望得到建设性的批评。
我祝你成功!
来源: habr.com