Hi, Habr!
Do you like flying on airplanes? I love it, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource: Aviasales.
Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, use the Amazon DynamoDB NoSQL database as the main data store, and set up SMS notifications for interesting tickets.
All the details are under the cut! Let's go!
Introduction
For this example we need access to the Aviasales API.
The main purpose of this article is to give a general understanding of how streaming data is used in AWS. Keep in mind that the data returned by the API used here is not strictly up to date: it is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.
The Kinesis-agent installed on the producer machine will automatically parse the data received through the API and send it to the required stream, where Kinesis Data Analytics picks it up. The raw version of this stream will be written directly to the data store. The raw data store deployed in DynamoDB will allow deeper ticket analysis through BI tools such as AWS Quick Sight.
We will consider two options for deploying the entire infrastructure:
- Manual: through the AWS Management Console;
- Infrastructure from Terraform code: for lazy automation engineers.
Architecture of the developed system
Components used:
- Aviasales API: the data returned by this API will be used for all subsequent work.
- EC2 Producer Instance: a regular virtual machine in the cloud on which the input data stream will be generated.
  - Kinesis Agent: a Java application installed locally on the machine that offers an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis.
  - API Caller Script: a Python script that sends requests to the API and puts the response into a folder monitored by the Kinesis Agent.
- Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities.
- Kinesis Analytics: a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures application resources and scales automatically to handle any volume of incoming data.
- AWS Lambda: a service that lets you run code without provisioning or configuring servers. All computing capacity is scaled automatically for each call.
- Amazon DynamoDB: a key-value and document database that provides latency of less than 10 milliseconds at any scale. With DynamoDB you do not need to provision, patch, or manage any servers. DynamoDB scales tables automatically to adjust the amount of available resources and maintain high performance. No system administration is required.
- Amazon SNS: a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model, with which you can decouple microservices, distributed systems, and serverless applications. SNS can be used to deliver information to end users through mobile push notifications, SMS messages, and email.
Initial preparation
To emulate the data stream, I decided to use the airline ticket information returned by the Aviasales API.
So, let's register and get our token.
An example request is below:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
The way of getting data from the API shown above, with the token passed in the request, will work, but I prefer to pass the access token in a header, so we will use that approach in the api_caller.py script.
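As a rough illustration of the header approach, the sketch below builds the same request with the token moved out of the query string and into the X-Access-Token header (the header name used in api_caller.py). It relies only on the standard library; the function name build_request and its defaults are hypothetical, and the actual script uses aiohttp instead.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token, origin="LED", destination="HKT", currency="rub"):
    """Build a GET request that carries the token in a header, not in the URL."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # Header name taken from api_caller.py; everything else is illustrative.
    headers = {"X-Access-Token": api_token}
    return Request("{}?{}".format(BASE_URL, query), headers=headers)


if __name__ == "__main__":
    request = build_request("TOKEN_API")
    # The token is absent from the printed URL because it travels in the header.
    print(request.full_url)
```

Keeping the token out of the URL means it is less likely to end up in access logs or browser history.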
Example response:
{
    "success": true,
    "data": [{
        "show_to_affiliates": true,
        "trip_class": 0,
        "origin": "LED",
        "destination": "HKT",
        "depart_date": "2015-10-01",
        "return_date": "",
        "number_of_changes": 1,
        "value": 29127,
        "found_at": "2015-09-24T00:06:12+04:00",
        "distance": 8015,
        "actual": true
    }]
}
The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.
It is assumed that you already have an AWS account. I want to point out right away that Kinesis and SMS notifications are not included in the annual Free Tier (free usage). Even so, with a couple of dollars in hand it is quite possible to build the proposed system and play with it. And, of course, don't forget to delete all resources once they are no longer needed.
Fortunately, DynamoDB and lambda functions will be effectively free for us as long as we stay within the monthly free limits. For DynamoDB, for example: 25 GB of storage, 25 WCU/RCU, and 100 million requests. And a million lambda function calls per month.
Manual system deployment
Setting up Kinesis Data Streams
Go to the Kinesis Data Streams service and create two new streams, with one shard each.
What is a shard?
A shard is the basic data transfer unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1000 PUT records per second.
When creating a data stream, you need to specify the required number of shards. For example, you can create a data stream with two shards. That stream will provide input data transfer at 2 MB/s and output data transfer at 4 MB/s, supporting up to 2000 PUT records per second.
The more shards in your stream, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents for every million PUT payload units.
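The shard arithmetic above can be sketched in a few lines. This is only an illustration built from the per-shard figures quoted in the text (1 MB/s in, 2 MB/s out, 1000 PUT records per second, 1.5 cents per shard-hour, 1.4 cents per million PUT payload units); the function names are hypothetical and current AWS prices may differ.

```python
from dataclasses import dataclass

# Per-shard limits and prices exactly as quoted in the text (may be outdated).
INGEST_MB_PER_SEC = 1
EGRESS_MB_PER_SEC = 2
PUT_RECORDS_PER_SEC = 1000
SHARD_HOUR_CENTS = 1.5
PUT_MILLION_UNITS_CENTS = 1.4


@dataclass
class StreamCapacity:
    ingest_mb_per_sec: int
    egress_mb_per_sec: int
    put_records_per_sec: int


def stream_capacity(shards):
    """Capacity grows linearly with the number of shards."""
    return StreamCapacity(
        ingest_mb_per_sec=shards * INGEST_MB_PER_SEC,
        egress_mb_per_sec=shards * EGRESS_MB_PER_SEC,
        put_records_per_sec=shards * PUT_RECORDS_PER_SEC,
    )


def stream_cost_usd(shards, hours, put_units_millions):
    """Estimated cost in US dollars for the given usage."""
    cents = (shards * hours * SHARD_HOUR_CENTS
             + put_units_millions * PUT_MILLION_UNITS_CENTS)
    return cents / 100


if __name__ == "__main__":
    print(stream_capacity(2))
    print(stream_cost_usd(shards=2, hours=24, put_units_millions=1))
```

For the two single-shard streams used in this article, the shard-hour charge dominates unless the producer sends millions of records.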
Let's create a new stream named airline_tickets; 1 shard will be enough for it:
Now let's create another stream named special_stream:
Producer setup
To work through the task, a regular EC2 instance is enough as a data producer. It does not need to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.
Important note: you should use, for example, the Amazon Linux AMI 2018.03.0 image, since it requires fewer settings to get the Kinesis Agent running quickly.
Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro type, which is included in the Free Tier:
For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted permission to do so. The best way to do this is to assign an IAM role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM role:
Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:
For this training example we do not need to go into all the intricacies of granular permission settings, so we will select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.
Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should be the same as shown in the picture below:
After creating this new role, don't forget to attach it to the virtual machine instance being created:
We do not change anything else on this screen and move on to the next windows.
The hard drive settings can be left at their defaults, and so can the tags (although it is good practice to use tags, at least to give the instance a name and indicate the environment).
Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there and you can launch the instance.
As soon as it switches to the running state, you can try to connect to it via ssh.
To be able to work with the Kinesis Agent, after successfully connecting to the machine you need to enter the following commands in the terminal:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
Let's create a folder to store the API responses:
sudo mkdir /var/log/airline_tickets
Before starting the agent, you need to configure it:
sudo vim /etc/aws-kinesis/agent.json
The contents of the agent.json file should look like this:
{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",
  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "CSVTOJSON",
          "customFieldNames": ["cost","trip_class","show_to_affiliates",
            "return_date","origin","number_of_changes","gate","found_at",
            "duration","distance","destination","depart_date","actual","record_id"]
        }
      ]
    }
  ]
}
As you can see from the configuration file, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them, and send them to the airline_tickets stream.
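To make the CSVTOJSON option more tangible, here is a minimal Python sketch of the idea: each comma-separated value is paired with the field name at the same position in customFieldNames. It is only an approximation for illustration; the real agent is a Java application and may treat quoting and value types differently. The sample line and the function name are made up.

```python
import csv
import io
import json

# Same order as "customFieldNames" in agent.json.
FIELD_NAMES = [
    "cost", "trip_class", "show_to_affiliates", "return_date", "origin",
    "number_of_changes", "gate", "found_at", "duration", "distance",
    "destination", "depart_date", "actual", "record_id",
]


def csv_line_to_json(line, field_names=FIELD_NAMES):
    """Pair each CSV value with the field name at the same position."""
    values = next(csv.reader(io.StringIO(line)))
    return json.dumps(dict(zip(field_names, values)))


if __name__ == "__main__":
    # Hypothetical log line, not real API output.
    sample = ("4500,0,True,,LED,0,Aeroflot,2020-04-20T10:00:00,"
              "150,1200,KZN,2020-05-10,True,abc-123")
    print(csv_line_to_json(sample))
```

The practical consequence is that the script writing the log files must emit the columns in exactly the order listed in the agent configuration.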
Restart the service and make sure it is up and running:
sudo service aws-kinesis-agent restart
Now let's download the Python script that will request data from the API:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis agent scans. The implementation of this script is fairly standard: there is a TicketsApi class that lets you call the API asynchronously. We pass a header with the token and the request parameters to this class:
# Excerpt from api_caller.py. BASE_URL, LOGGER, TARGET_FILE, log_maker,
# ClientSession, FormData, HTTPError and the request constants are imported
# or defined at the top of the script.
class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}
    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)
    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)
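The excerpt calls log_maker(response) without showing it. The sketch below is a guess at what such a helper might do, not the author's code: it appends one CSV row per ticket in the column order expected by agent.json and adds a generated record_id. The name log_maker_sketch, the explicit target_file argument, and the assumption that the price arrives under "value" and is stored as "cost" are all hypothetical.

```python
import csv
import os
import tempfile
import uuid

# Column order from "customFieldNames" in agent.json, without record_id,
# which is generated per row below.
COLUMNS = [
    "cost", "trip_class", "show_to_affiliates", "return_date", "origin",
    "number_of_changes", "gate", "found_at", "duration", "distance",
    "destination", "depart_date", "actual",
]


def log_maker_sketch(response, target_file):
    """Append one CSV row per ticket and return the number of rows written."""
    rows = 0
    with open(target_file, "a", newline="") as handle:
        writer = csv.writer(handle)
        for ticket in response.get("data", []):
            # Assumption: the API reports the price as "value".
            cost = ticket.get("cost", ticket.get("value", ""))
            row = [cost] + [ticket.get(column, "") for column in COLUMNS[1:]]
            row.append(str(uuid.uuid4()))  # record_id, unique per row
            writer.writerow(row)
            rows += 1
    return rows


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "tickets.log")
    demo = {"success": True, "data": [{"value": 4500, "gate": "Aeroflot"}]}
    print(log_maker_sketch(demo, path), "row(s) written to", path)
```

Generating record_id on the producer side gives every record a unique key before it reaches the DynamoDB table described later.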
To check that the settings are correct and that the agent works, let's do a test run of the api_caller.py script:
sudo ./api_caller.py TOKEN
Then look at the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.
Setting up Kinesis Data Analytics
Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:
Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:
- lets you create new streams (Output Stream) based on queries against the source data;
- provides a stream with the errors that occurred while the application was running (Error Stream);
- can determine the input data schema automatically (it can be redefined manually if necessary).
This is not a cheap service: 0.11 USD per hour of work, so use it carefully and delete it when you are finished.
Let's connect the application to the data source:
Select the stream we are going to connect to (airline_tickets):
Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough to leave the Access permissions block unchanged:
Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema detection will run on the data that has already arrived in the stream:
Now you need to go to the SQL editor. When you click this button, a window will appear asking you to start the application; choose to start it:
Insert the following simple query into the SQL editor window and click Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "cost", "gate"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE "cost" < 5000
      AND "gate" = 'Aeroflot';
In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that insert data from one in-application stream into another in-application stream.
The SQL query above looks for Aeroflot tickets priced below five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
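For readers more at home in Python than in streaming SQL, the pump above behaves roughly like a generator that filters an endless sequence of records. This is only an analogy under simplifying assumptions (records as plain dictionaries, no windowing or time semantics), and the function names are made up.

```python
def matches_pump_filter(record, max_cost=5000, gate="Aeroflot"):
    """Mirror of the WHERE clause: cost below the limit and a matching gate."""
    return float(record["cost"]) < max_cost and record["gate"] == gate


def pump(source_records):
    """Continuously forward the selected columns of matching records."""
    for record in source_records:
        if matches_pump_filter(record):
            yield {"cost": float(record["cost"]), "gate": record["gate"]}


if __name__ == "__main__":
    source = [
        {"cost": "4500", "gate": "Aeroflot", "origin": "LED"},
        {"cost": "7200", "gate": "Aeroflot", "origin": "LED"},
        {"cost": "3900", "gate": "S7", "origin": "LED"},
    ]
    for row in pump(source):
        print(row)
```

Only the first sample record passes both conditions, which is exactly the kind of record that ends up in DESTINATION_SQL_STREAM.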
In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:
The result of all these manipulations should look something like the picture below:
Creating an SNS topic and subscribing to it
Go to the Simple Notification Service and create a new topic there named Airlines:
Subscribe to this topic and specify the mobile phone number to which the SMS notifications will be sent:
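The same two console steps can be expressed with boto3. The sketch below is illustrative only: the helper name is hypothetical, the topic name and phone number are placeholders, and the client is passed in as an argument so the function can be exercised without AWS credentials. The create_topic and subscribe calls are standard SNS client methods.

```python
def create_topic_with_sms_subscription(sns_client, topic_name, phone_number):
    """Create (or fetch) an SNS topic and subscribe a phone number to it."""
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="sms",         # deliver messages as SMS
        Endpoint=phone_number,  # phone number in E.164 format, e.g. +10000000000
    )
    return topic_arn


if __name__ == "__main__":
    # Requires boto3 and configured AWS credentials; sending SMS costs money.
    import boto3

    arn = create_topic_with_sms_subscription(
        boto3.client("sns"), "Airlines", "+10000000000")
    print(arn)
```

The returned ARN is the value that later goes into the notifier lambda's TOPIC_ARN environment variable.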
Creating a table in DynamoDB
To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:
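For reference, a table like this could also be described in code. The sketch below only builds the keyword arguments for the DynamoDB create_table call; the string key type and the 5/5 provisioned capacity are assumptions chosen for illustration, not values taken from the article.

```python
def table_definition(table_name="airline_tickets", key_name="record_id"):
    """Return create_table arguments for a table with a single hash key."""
    return {
        "TableName": table_name,
        "KeySchema": [
            {"AttributeName": key_name, "KeyType": "HASH"},  # partition key
        ],
        "AttributeDefinitions": [
            # Assumption: record_id is stored as a string.
            {"AttributeName": key_name, "AttributeType": "S"},
        ],
        # Assumption: small provisioned capacity, within the free limits.
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5,
        },
    }


if __name__ == "__main__":
    # Requires boto3 and configured AWS credentials.
    import boto3

    boto3.client("dynamodb").create_table(**table_definition())
```

Only the key attribute has to be declared up front; the remaining ticket fields are added freely with each item.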
Creating the collector lambda function
Let's create a lambda function called Collector, whose job is to poll the airline_tickets stream and, if new records are found there, insert them into the DynamoDB table. Obviously, in addition to the default permissions, this lambda must have read access to the Kinesis data stream and write access to DynamoDB.
Creating an IAM role for the collector lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:
For the test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:
This lambda should be launched by a trigger from Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:
All that remains is to insert the code and save the lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
from decimal import Decimal

import boto3

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'


class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        # Kinesis delivers each payload base64-encoded.
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            # Going through str() keeps the Decimal exact for float input.
            'cost': Decimal(str(json_item.get('cost'))),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(Item=dynamodb_item)
        print('Has been added ', len(self.json_data), 'items')


def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
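Before wiring the trigger, it can help to check the decoding step locally. The sketch below builds a fake event in the shape the handler reads (Records, then kinesis, then base64-encoded data) and decodes it the same way get_json_data does. The helper names and the sample ticket are hypothetical, and a real Kinesis event carries more fields than shown here.

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap dicts the way Lambda delivers Kinesis records: base64-encoded JSON."""
    records = []
    for item in items:
        payload = json.dumps(item).encode("utf-8")
        encoded = base64.b64encode(payload).decode("ascii")
        records.append({"kinesis": {"data": encoded}})
    return {"Records": records}


def decode_kinesis_event(event):
    """Reverse the wrapping: base64-decode every record and parse the JSON."""
    return [json.loads(base64.b64decode(record["kinesis"]["data"]))
            for record in event["Records"]]


if __name__ == "__main__":
    ticket = {"record_id": "abc-123", "cost": "4500", "gate": "Aeroflot"}
    event = make_kinesis_event([ticket])
    print(decode_kinesis_event(event))
```

An event produced this way can also be pasted into the Lambda console as a test event to exercise the handler without waiting for real data.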
Creating the notifier lambda function
The second lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in a similar way. Accordingly, this lambda must have permission to read from Kinesis and to send messages to the given SNS topic, which the SNS service will then deliver to all subscribers of that topic (email, SMS, and so on).
Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda, and then assign this role to the alarm_notifier lambda being created:
This lambda should be triggered when new records arrive in special_stream, so you need to configure the trigger in the same way as we did for the Collector lambda.
To make this lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, which holds the ARN (Amazon Resource Name) of the Airlines topic:
And insert the lambda code; it is not complicated at all:
import os

import boto3

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish a fixed alarm message to the SNS topic."""
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
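The handler above always sends the same text. If you would like the SMS to mention the tickets themselves, one possible extension is sketched below. It assumes, hypothetically, that the records arriving in special_stream are JSON objects with the "cost" and "gate" columns selected by the SQL pump; the function name and message format are made up.

```python
import base64
import json


def build_alarm_message(event):
    """Compose a short text listing the gate and cost of every record."""
    lines = []
    for record in event["Records"]:
        # Kinesis payloads arrive base64-encoded inside the Lambda event.
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append("{}: {}".format(ticket.get("gate"), ticket.get("cost")))
    return "Interesting tickets found:\n" + "\n".join(lines)


if __name__ == "__main__":
    payload = base64.b64encode(
        json.dumps({"cost": 4500, "gate": "Aeroflot"}).encode("utf-8")
    ).decode("ascii")
    print(build_alarm_message({"Records": [{"kinesis": {"data": payload}}]}))
```

The resulting string could be passed as the Message argument of publish; keep it short, since long SMS texts are split into several messages.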
It seems that this completes the manual system configuration. All that remains is to test it and make sure we have configured everything correctly.
Deploying from Terraform code
Required preparation
The distribution can be downloaded from the Terraform website.
How to start
The full project code is in the repository https://github.com/igorgorbenko/aviasales_kinesis.
A good practice is to run the plan command before deploying the entire infrastructure, to see what Terraform is about to create for us in the cloud:
terraform.exe plan
You will be prompted to enter a phone number to send notifications to. It is not necessary to enter it at this stage.
Having analyzed the execution plan, we can start creating the resources:
terraform.exe apply
After sending this command, you will again be asked to enter a phone number; type "yes" when the question about actually performing the actions is displayed. This will bring up the entire infrastructure, perform all the necessary EC2 configuration, deploy the lambda functions, and so on.
After all resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from the code).
Launch the application:
After that, you must explicitly set the in-application stream name by selecting it from the drop-down list:
Now everything is ready to go.
Testing the application
Regardless of how you deployed the system, manually or through Terraform code, it will work the same way.
Log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:
sudo ./api_caller.py TOKEN
All you have to do now is wait for an SMS to your number:
The SMS arrives on the phone in about 1 minute:
It remains to check whether the records were saved in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table contains data roughly like this:
Conclusion
In the course of this work, an online data processing system based on Amazon Kinesis was built. We looked at options for using the Kinesis Agent together with Kinesis Data Streams, at real-time analytics with Kinesis Analytics using SQL commands, and at how Amazon Kinesis interacts with other AWS services.
We deployed the system described above in two ways: a fairly long manual one and a quick one from Terraform code.
All the project source code is available in the repository.
I am happy to discuss the article and look forward to your comments. I hope for constructive criticism.
I wish you success!
Source: www.habr.com