For this example, we need access to the Aviasales API. Access is free and unrestricted; you only need to register in the "Developers" section to receive an API token for the data.
The main purpose of this article is to give a general understanding of how streaming data is used in AWS. We take into account that the data returned by this API is not strictly up to date: it is served from a cache built from searches made by users of the Aviasales.ru and Jetradar.com sites over the last 48 hours.
The Kinesis Agent, installed on the producer machine, automatically parses the data received from the API and sends it to the desired stream via Kinesis Data Analytics. The raw version of this stream is written directly to the data store. The raw data stored in DynamoDB allows deeper ticket analysis with BI tools such as Amazon QuickSight.
We will consider two options for deploying the entire infrastructure:
Manual - via the AWS Management Console;
Infrastructure from Terraform code - for lazy automators.
Architecture of the developed system
Components used:
Aviasales API - the data returned by this API is used for all subsequent work;
EC2 Producer Instance - a regular virtual machine in the cloud on which the input data stream is generated:
Kinesis Agent - a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent monitors a set of files in the specified directories and sends new data to Kinesis;
API Caller Script - a Python script that makes requests to the API and puts the responses into a folder monitored by the Kinesis Agent;
Kinesis Data Streams - a real-time data streaming service with broad scaling capabilities.
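To make the agent's role more concrete, here is a minimal sketch of what its configuration could look like, built as a Python dict and printed as JSON. The keys (flows, filePattern, kinesisStream, dataProcessingOptions, CSVTOJSON, customFieldNames) are standard Kinesis Agent settings, and the agent reads its configuration from /etc/aws-kinesis/agent.json. The watched directory and the order of the field names are assumptions made for illustration; the field names themselves are the ones used by the Collector lambda later in this article.

```python
import json

# Hypothetical directory that the API caller script writes to.
WATCHED_FILES = "/var/log/airline_tickets/*log"

# Field names are taken from the Collector lambda in this article;
# their order within a CSV line is an assumption.
FIELD_NAMES = [
    "cost", "trip_class", "show_to_affiliates", "origin",
    "number_of_changes", "gate", "found_at", "duration",
    "distance", "destination", "depart_date", "actual", "record_id",
]


def build_agent_config(stream_name, file_pattern, field_names):
    """Return a Kinesis Agent configuration as a plain dict."""
    return {
        "cloudwatch.emitMetrics": True,
        "flows": [
            {
                # Files to tail and the stream that receives new lines.
                "filePattern": file_pattern,
                "kinesisStream": stream_name,
                # Convert each CSV line to a JSON object before sending.
                "dataProcessingOptions": [
                    {
                        "optionName": "CSVTOJSON",
                        "customFieldNames": field_names,
                    }
                ],
            }
        ],
    }


config = build_agent_config("airline_tickets", WATCHED_FILES, FIELD_NAMES)
agent_json = json.dumps(config, indent=2)
print(agent_json)
```

The printed JSON is what would be saved as the agent configuration file; with CSVTOJSON the script only has to append plain CSV lines, and the agent turns them into JSON records.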
To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a few different methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you don't specify the search month in the request, data is returned for the month following the current one.
So, let's register and get our token.
The method above, which receives data from the API by specifying the token in the request, will work, but I prefer to pass the access token through a header, so we will use that approach in the api_caller.py script.
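Passing the token through a header can be sketched as follows. This is not the actual api_caller.py; the endpoint URL, the X-Access-Token header name and the query parameter names are assumptions based on the public API documentation and should be checked against it. The request is only built here, not sent.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Assumed endpoint of the monthly price calendar method; check the API docs.
API_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(token, origin, destination, currency="rub"):
    """Build (but do not send) a request that carries the token in a header."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    request = Request(f"{API_URL}?{query}")
    # The token travels in a header instead of the query string.
    request.add_header("X-Access-Token", token)
    return request


# LED is St. Petersburg and KZN is Kazan; the token is a placeholder.
request = build_request("YOUR_TOKEN", "LED", "KZN")
print(request.full_url)
```

Sending it is then a matter of passing the object to urllib.request.urlopen. Keeping the token out of the URL also keeps it out of most access logs.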
The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I'm from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.
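Before a ticket can reach the stream, the script has to write it to a file that the Kinesis Agent watches. The sketch below turns one ticket into a CSV line and adds a record_id. The field names come from the Collector lambda later in this article; their order, the sample values and the use of a UUID as the identifier are assumptions made for illustration.

```python
import csv
import io
import uuid

# Assumed column order; it has to match the agent's customFieldNames.
FIELD_NAMES = [
    "cost", "trip_class", "show_to_affiliates", "origin",
    "number_of_changes", "gate", "found_at", "duration",
    "distance", "destination", "depart_date", "actual", "record_id",
]


def ticket_to_csv_line(ticket, record_id=None):
    """Serialize one ticket dict as a single CSV line, adding a record_id."""
    row = dict(ticket, record_id=record_id or str(uuid.uuid4()))
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    # Missing fields become empty cells so the column count stays fixed.
    writer.writerow([row.get(name, "") for name in FIELD_NAMES])
    return buffer.getvalue()


# Hypothetical ticket from St. Petersburg (LED) to Kazan (KZN).
sample_ticket = {
    "cost": 4500, "trip_class": 0, "show_to_affiliates": True,
    "origin": "LED", "number_of_changes": 0, "gate": "Aeroflot",
    "found_at": "2019-12-01T10:00:00", "duration": 150,
    "distance": 1200, "destination": "KZN",
    "depart_date": "2019-12-20", "actual": True,
}
line = ticket_to_csv_line(sample_ticket, record_id="demo-1")
print(line, end="")
```

Appending such lines to a file in the watched folder is enough; the agent picks up new lines on its own.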
It is assumed that you already have an AWS account. I want to point out right away that Kinesis and sending SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars to spare it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.
The more shards in your stream, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
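As a quick sanity check of these prices, here is a small cost estimate. The two rates are the ones quoted above; the workload (one shard, 24 hours, two million PUT payload units) is a made-up example, and real prices depend on the region and change over time.

```python
SHARD_HOUR_USD = 0.015         # 1.5 cents per shard-hour, as quoted above
PUT_MILLION_UNITS_USD = 0.014  # 1.4 cents per million PUT payload units


def stream_cost_usd(shards, hours, put_payload_units):
    """Estimate the cost of a stream from shard-hours and PUT payload units."""
    shard_cost = shards * hours * SHARD_HOUR_USD
    put_cost = put_payload_units / 1_000_000 * PUT_MILLION_UNITS_USD
    return shard_cost + put_cost


# Hypothetical workload: one shard for a day, two million payload units.
daily = stream_cost_usd(shards=1, hours=24, put_payload_units=2_000_000)
print(f"{daily:.3f} USD")
```

For this workload the estimate comes to roughly 0.39 USD per day, almost all of it shard-hours, which is why deleting idle streams matters more than reducing traffic.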
Let's create a new stream named airline_tickets; 1 shard will be quite enough for it:
Now let's create another stream named special_stream:
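The same two streams can also be created from code instead of the console. The sketch below uses the boto3 Kinesis call create_stream with its StreamName and ShardCount parameters. So that it runs without AWS credentials, the example passes in a small stand-in client; RecordingClient and create_streams are hypothetical helpers, not part of boto3.

```python
def create_streams(kinesis_client, names, shard_count=1):
    """Create one Kinesis data stream per name and return the names."""
    for name in names:
        kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    return list(names)


class RecordingClient:
    """Stand-in for boto3.client('kinesis') so the sketch runs offline."""

    def __init__(self):
        self.calls = []

    def create_stream(self, **kwargs):
        # Remember the arguments instead of calling AWS.
        self.calls.append(kwargs)


client = RecordingClient()
created = create_streams(client, ["airline_tickets", "special_stream"])
print(client.calls)

# With real credentials the call would look like this:
#   import boto3
#   create_streams(boto3.client("kinesis"), ["airline_tickets", "special_stream"])
```

Stream creation is asynchronous on the AWS side, so a real script would wait until each stream becomes active before writing to it.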
Producer setup
For this task, a regular EC2 instance is enough as the data producer. It does not have to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.
Important note: for this example, use the Amazon Linux AMI 2018.03.0 image; it requires fewer settings to get the Kinesis Agent running quickly.
Go to the EC2 service, create a new virtual machine, and select the desired AMI with the t2.micro type, which is included in the Free Tier:
For the newly created virtual machine to interact with the Kinesis service, it must be given permission to do so. The best way to do this is to assign an IAM role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM role:
Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:
Since this is a training example, we don't need to go into all the intricacies of fine-grained resource permissions, so we will select the policies pre-configured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.
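Behind the console wizard, a role like this consists of a trust policy (who may assume the role) plus the attached managed policies. The sketch below builds both pieces as plain data. The policy ARNs follow the standard form for AWS managed policies; the trust_policy helper is a hypothetical name used only for illustration.

```python
import json

# AWS managed policies selected in the console step above.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
]


def trust_policy(service):
    """Return a trust policy that lets the given AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service},
                "Action": "sts:AssumeRole",
            }
        ],
    }


ec2_trust = trust_policy("ec2.amazonaws.com")
print(json.dumps(ec2_trust, indent=2))

# With boto3 the same role could be created roughly like this:
#   iam = boto3.client("iam")
#   iam.create_role(RoleName="EC2-KinesisStreams-FullAccess",
#                   AssumeRolePolicyDocument=json.dumps(ec2_trust))
#   for arn in MANAGED_POLICY_ARNS:
#       iam.attach_role_policy(RoleName="EC2-KinesisStreams-FullAccess",
#                              PolicyArn=arn)
```

The same helper works for the lambda roles later on by passing "lambda.amazonaws.com" as the service. Outside the console, an EC2 role also needs an instance profile, which the console creates automatically.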
Let's give this role a meaningful name, for example: EC2-KinesisStreams-FullAccess. The result should be the same as shown in the picture below:
After creating this new role, don't forget to attach it to the virtual machine being created:
We don't change anything else on this screen and move on to the next windows.
The hard drive settings can be left at their defaults, as can the tags (although it is good practice to use tags: at least give the instance a name and indicate the environment).
To work with the Kinesis Agent, after successfully connecting to the machine, enter the following commands in the terminal:
As you can see, everything works and the Kinesis Agent successfully sends data to the stream. Now let's configure the consumer.
Setting up Kinesis Data Analytics
Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:
Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. It is a fully autoscaling service (unlike Kinesis Streams) that:
allows you to create new streams (Output Stream) based on queries against the source data;
provides a stream with the errors that occurred while the application was running (Error Stream).
Next, you need to attach a new IAM role so that the application can read from the source stream and write to the output stream. To do this, it is enough not to change anything in the Access permissions block:
Now let's request discovery of the data schema in the source stream; to do this, click the "Discover schema" button. As a result, the IAM role is updated (a new one is created) and schema discovery is run on the data that has already arrived in the stream:
Now you need to go to the SQL editor. When you click this button, a window appears asking you to start the application; choose what you want to start:
Insert the following simple query into the SQL editor window and click Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
The SQL query above searches for Aeroflot tickets that cost less than five thousand rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
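For readers less familiar with streaming SQL, the same condition can be written as an ordinary Python filter. This is only an illustration of the WHERE clause, not how Kinesis Data Analytics runs it; the sample tickets are made up.

```python
def is_special(ticket, max_cost=5000, gate="Aeroflot"):
    """Mirror the WHERE clause: cost below the limit and a matching gate."""
    return float(ticket["cost"]) < max_cost and ticket["gate"] == gate


# Hypothetical records as they might arrive from the source stream.
tickets = [
    {"cost": 4200.0, "gate": "Aeroflot"},
    {"cost": 6100.0, "gate": "Aeroflot"},
    {"cost": 3900.0, "gate": "Pobeda"},
]

special = [ticket for ticket in tickets if is_special(ticket)]
print(special)
```

Only the first ticket passes both conditions. The difference in the real service is that the query never finishes: the pump keeps applying the filter to every new record as it arrives.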
Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:
Create a table in DynamoDB
To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:
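In code, the table definition is small. The sketch below builds the arguments for the boto3 DynamoDB create_table call. The string key type and on-demand billing are assumptions made for this example, and table_definition is a hypothetical helper name.

```python
def table_definition(table_name, key_name):
    """Return create_table arguments for a table with a single hash key."""
    return {
        "TableName": table_name,
        # record_id is the partition (hash) key; there is no sort key.
        "KeySchema": [{"AttributeName": key_name, "KeyType": "HASH"}],
        # Assumption: the identifier is stored as a string ("S").
        "AttributeDefinitions": [
            {"AttributeName": key_name, "AttributeType": "S"}
        ],
        # Assumption: on-demand billing, so no capacity planning is needed.
        "BillingMode": "PAY_PER_REQUEST",
    }


definition = table_definition("airline_tickets", "record_id")
print(definition)

# With real credentials:
#   import boto3
#   boto3.client("dynamodb").create_table(**definition)
```

Only the key attribute has to be declared up front; all other ticket fields are stored without a schema.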
Creating the collector lambda function
Let's create a lambda function called Collector, whose job is to poll the airline_tickets stream and, when new records appear there, insert them into the DynamoDB table. Obviously, in addition to the default permissions, this lambda must have read access to the Kinesis data stream and write access to DynamoDB.
Creating an IAM role for the lambda function
First, let's create a new IAM role for the lambda named Lambda-TicketsProcessingRole:
For this test example, the pre-configured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:
This lambda should be launched by a Kinesis trigger when new records enter the airline_tickets stream, so we need to add a new trigger:
All that remains is to insert the code and save the lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'


class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)
        print('Has been added ', len(self.json_data), 'items')


def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
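To try the decoding step locally without deploying anything, it helps to see what a Kinesis event looks like to the lambda. The sketch below builds a simplified event and decodes it the same way the collector does. Real events carry more fields per record (sequence number, partition key and so on); only the part that the code above reads is reproduced here, and both helper names are hypothetical.

```python
import base64
import json


def make_kinesis_event(items):
    """Wrap JSON-serializable items the way Kinesis delivers them to Lambda."""
    return {
        "Records": [
            {
                "kinesis": {
                    # Kinesis hands the payload over base64-encoded.
                    "data": base64.b64encode(
                        json.dumps(item).encode("utf-8")
                    ).decode("ascii")
                }
            }
            for item in items
        ]
    }


def decode_kinesis_event(event):
    """Reverse the encoding: base64-decode each record, then parse the JSON."""
    return [
        json.loads(base64.b64decode(record["kinesis"]["data"]))
        for record in event["Records"]
    ]


# Hypothetical ticket used only to exercise the round trip.
items = [{"record_id": "demo-1", "cost": "4500", "gate": "Aeroflot"}]
event = make_kinesis_event(items)
print(decode_kinesis_event(event))
```

An event built this way can also be pasted into the test-event dialog of the Lambda console to run the handler against known data.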
Creating the notifier lambda function
The second lambda function, which monitors the second stream (special_stream) and sends a notification to SNS, is created in a similar way. This lambda must therefore be able to read from Kinesis and to send messages to a given SNS topic; the SNS service then delivers them to all subscribers of that topic (email, SMS, etc.).
Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this lambda, and then assign this role to the alarm_notifier lambda being created:
This lambda should be triggered by new records entering special_stream, so you need to configure the trigger in the same way as we did for the Collector lambda.
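Both triggers are Kinesis event source mappings, so their settings differ only in the stream and the function name. The sketch below builds the arguments for the boto3 Lambda call create_event_source_mapping. The region, the account ID in the ARNs, the batch size and the starting position are assumptions made for illustration.

```python
# Hypothetical region and account ID; replace with your own.
ARN_PREFIX = "arn:aws:kinesis:us-east-1:123456789012:stream/"


def trigger_settings(stream_arn, function_name, batch_size=100):
    """Return create_event_source_mapping arguments for a Kinesis trigger."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        # Assumption: only records that arrive after the trigger is created.
        "StartingPosition": "LATEST",
        "BatchSize": batch_size,
    }


triggers = [
    trigger_settings(ARN_PREFIX + "airline_tickets", "Collector"),
    trigger_settings(ARN_PREFIX + "special_stream", "alarm_notifier"),
]
for settings in triggers:
    print(settings)

# With real credentials:
#   import boto3
#   lambda_client = boto3.client("lambda")
#   for settings in triggers:
#       lambda_client.create_event_source_mapping(**settings)
```

Using TRIM_HORIZON instead of LATEST would replay records that are already in the stream, which can be handy while testing.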
To make this lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, which holds the ARN (Amazon Resource Name) of the SNS topic:
import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
It seems that this completes the manual configuration of the system. All that remains is to test it and make sure that we have configured everything correctly.
Deploy from Terraform code
Required preparation
Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax, which is easy to learn, and there are many examples of what to deploy and how. The Atom editor and Visual Studio Code both have plenty of plugins that make working with Terraform easier.
The full code of the project is in my repository. We clone the repository to our own machine. Before starting, make sure that the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
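A quick way to confirm that the credentials file is in place before running Terraform is to parse it. The sketch below checks that a profile contains both key fields; has_credentials is a hypothetical helper, and the example runs against a temporary file with placeholder values so it does not touch real credentials.

```python
import configparser
import os
import tempfile


def has_credentials(path, profile="default"):
    """Return True if the file has the profile with both AWS key fields."""
    parser = configparser.ConfigParser()
    parser.read(path)  # a missing file is silently ignored
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return ("aws_access_key_id" in section
            and "aws_secret_access_key" in section)


# Demonstration on a temporary file with placeholder values.
with tempfile.TemporaryDirectory() as folder:
    demo_path = os.path.join(folder, "credentials")
    with open(demo_path, "w") as handle:
        handle.write(
            "[default]\n"
            "aws_access_key_id = EXAMPLE\n"
            "aws_secret_access_key = EXAMPLE\n"
        )
    found = has_credentials(demo_path)
    missing = has_credentials(os.path.join(folder, "absent"))

print(found, missing)

# For the real file:
#   has_credentials(os.path.expanduser("~/.aws/credentials"))
```

This only checks that the fields exist, not that the keys are valid; running an AWS CLI command against the account remains the real test.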
It is good practice to run the plan command before deploying the entire infrastructure, to see what Terraform is about to create for us in the cloud:
terraform.exe plan
You will be prompted to enter a phone number to send notifications to. It is not necessary to enter it at this stage.
After reviewing the execution plan, we can start creating the resources:
terraform.exe apply
After running this command, you will again be asked to enter a phone number; type "yes" when asked to confirm that the actions should actually be performed. This sets up the entire infrastructure, carries out all the necessary configuration of EC2, deploys the lambda functions, and so on.
After all resources have been successfully created from the Terraform code, you need to open the details of the Kinesis Analytics application (unfortunately, I did not find a way to do this directly from the code).
Launch the application:
After that, you must explicitly set the in-application stream name by selecting it from the drop-down list:
Now everything is ready to go.
Testing the application
Regardless of how you deployed the system, manually or from Terraform code, it will work the same way.
We log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:
sudo ./api_caller.py TOKEN
All you have to do is wait for an SMS to arrive on your number: