Hello!
Do you like flying? I love it, but during self-isolation I also got hooked on analyzing air ticket data from one well-known resource, Aviasales.
Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up an Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.
All the details are below. Let's go!
Introduction
For this example, we need access to the Aviasales API.
The main goal of this article is to give a general understanding of how to use data streaming in AWS. Keep in mind that the data returned by the API we use is not strictly up to date. It is served from a cache built from searches made by users of Aviasales.ru and Jetradar.com over the last 48 hours.
The Kinesis Agent installed on the producer machine automatically parses the data received from the API and sends it to Kinesis Data Streams. From there, Kinesis Data Analytics routes the matching records to the target stream. The raw version of this stream is written directly to the data store. Keeping the raw data in DynamoDB allows deeper ticket analysis with BI tools such as AWS QuickSight.
We will look at two ways to deploy the whole infrastructure:
- manually, via the AWS Management Console;
- as infrastructure from Terraform code, for lazy automators.
Architecture of the system
䜿çšããã³ã³ããŒãã³ã:
- Aviasales API: the data returned by this API is used for all subsequent work;
- EC2 producer instance: an ordinary virtual machine in the cloud that generates the input data stream:
  - Kinesis Agent: a Java application installed locally on the machine that provides an easy way to collect data and send it to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
  - API caller script: a Python script that makes requests to the API and puts the responses into the folder monitored by the Kinesis Agent;
- Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities;
- Kinesis Analytics: a serverless service that simplifies real-time analysis of streaming data. Amazon Kinesis Data Analytics configures the application resources and scales automatically to handle any volume of incoming data;
- AWS Lambda: a service that runs code without provisioning or managing servers. All computing power scales automatically with each call;
- Amazon DynamoDB: a key-value and document database that delivers latency under 10 milliseconds at any scale. With DynamoDB there are no servers to provision, patch, or manage. DynamoDB automatically scales tables to adjust capacity and maintain high performance, with no administration required;
- Amazon SNS: a fully managed publish/subscribe (Pub/Sub) messaging service that lets you decouple microservices, distributed systems, and serverless applications. SNS can deliver information to end users through mobile push notifications, SMS messages, and email.
Initial preparation
To emulate the data flow, I decided to use the airline ticket information returned by the Aviasales API.
So, let's register and get our token.
Here is an example request:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
Passing the token in the request, as above, works. However, I prefer to pass the access token in a header, so that is the method used in the api_caller.py script.
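Here is a minimal sketch of the header-based variant, using only Python's standard library. The function name build_request is hypothetical, and this is not the code from api_caller.py. Only the X-Access-Token header name and the query parameters come from this article.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Endpoint from the example request above.
BASE_URL = "http://api.travelpayouts.com/v2/prices/month-matrix"


def build_request(api_token: str, origin: str, destination: str) -> Request:
    """Build a GET request that sends the token in a header instead of the URL."""
    query = urlencode({
        "currency": "rub",
        "origin": origin,
        "destination": destination,
        "show_to_affiliates": "true",
    })
    # Keeping the token out of the query string keeps it out of URLs and logs.
    return Request(f"{BASE_URL}?{query}", headers={"X-Access-Token": api_token})


if __name__ == "__main__":
    req = build_request("TOKEN_API", "LED", "KZN")
    print(req.full_url)
```

You can send the request with urllib.request.urlopen(req) and read the body with json.load. This should return a document shaped like the example response below.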
Example response:
{
    "success": true,
    "data": [{
        "show_to_affiliates": true,
        "trip_class": 0,
        "origin": "LED",
        "destination": "HKT",
        "depart_date": "2015-10-01",
        "return_date": "",
        "number_of_changes": 1,
        "value": 29127,
        "found_at": "2015-09-24T00:06:12+04:00",
        "distance": 8015,
        "actual": true
    }]
}
The example API response above shows a ticket from St. Petersburg to Phuket... What a dream...
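The following small sketch makes the structure of the response concrete by picking the cheapest offer from it. As in the example, it assumes that the price is in the value field, in rubles because the request used currency=rub. The helper name cheapest is hypothetical.

```python
import json

# The example response from above, abridged to the fields used here.
SAMPLE_RESPONSE = """
{"success": true,
 "data": [{"origin": "LED", "destination": "HKT", "depart_date": "2015-10-01",
           "number_of_changes": 1, "value": 29127, "actual": true}]}
"""


def cheapest(response_text):
    """Return (price, depart_date) of the cheapest offer, or None if there is none."""
    doc = json.loads(response_text)
    offers = doc.get("data") or []
    if not doc.get("success") or not offers:
        return None
    best = min(offers, key=lambda offer: offer["value"])
    return best["value"], best["depart_date"]


print(cheapest(SAMPLE_RESPONSE))  # (29127, '2015-10-01')
```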
I am from Kazan, and Phuket is now "just a dream", so let's look for tickets from St. Petersburg to Kazan.
I assume you already have an AWS account. Note right away that Kinesis and SMS notifications are not included in the annual Free Tier. Even so, with a couple of dollars to spare, it is quite possible to build the proposed system and play with it. And, of course, don't forget to delete all resources once you no longer need them.
幞ããªããšã«ãæ¯æã®ç¡æå¶éãæºãããŠããã°ãDynamoDb ãš lambda é¢æ°ã¯ç¡æã«ãªããŸãã ããšãã°ãDynamoDB ã®å Žå: 25 GB ã®ã¹ãã¬ãŒãžã25 WCU/RCUãããã³ 100 åã¯ãšãªã ãããŠãXNUMX ãæããã XNUMX äžä»¶ã®ã©ã ãé¢æ°åŒã³åºãããããŸãã
Manual system deployment
Setting up Kinesis Data Streams
Go to the Kinesis Data Streams service and create two new streams, with one shard each.
What is a shard?
A shard is the basic unit of data transfer in an Amazon Kinesis stream. One shard provides 1 MB/s of input and 2 MB/s of output data transfer, and it supports up to 1,000 PUT records per second. When you create a data stream, you must specify how many shards it needs. For example, you can create a data stream with two shards. That stream provides 2 MB/s of input and 4 MB/s of output data transfer and supports up to 2,000 PUT records per second.
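The shard arithmetic scales linearly. The sketch below shows this using the documented per-shard limits: 1 MB/s in, 2 MB/s out, and 1,000 PUT records/s. The read limit is shared by all consumers of a shard unless enhanced fan-out is used. The helper names are hypothetical.

```python
import math

# Per-shard limits of a provisioned Kinesis data stream (AWS documentation).
WRITE_MB_PER_SEC = 1
READ_MB_PER_SEC = 2
PUT_RECORDS_PER_SEC = 1000


def stream_capacity(shards):
    """Aggregate limits of a stream with the given number of shards."""
    if shards < 1:
        raise ValueError("a stream needs at least one shard")
    return {
        "write_mb_per_sec": shards * WRITE_MB_PER_SEC,
        "read_mb_per_sec": shards * READ_MB_PER_SEC,
        "put_records_per_sec": shards * PUT_RECORDS_PER_SEC,
    }


def shards_needed(write_mb_per_sec, records_per_sec):
    """Smallest shard count that satisfies both write-side limits."""
    by_bytes = math.ceil(write_mb_per_sec / WRITE_MB_PER_SEC)
    by_records = math.ceil(records_per_sec / PUT_RECORDS_PER_SEC)
    return max(1, by_bytes, by_records)


print(stream_capacity(2))
# {'write_mb_per_sec': 2, 'read_mb_per_sec': 4, 'put_records_per_sec': 2000}
```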
The more shards a stream has, the higher its throughput. In principle, this is how a stream is scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus 1.4 cents for every million PUT payload units.
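Here is a rough monthly estimate based on these rates. It is a sketch, assuming 730 hours per month and the AWS rule that a PUT payload unit covers up to 25 KB of a record. Actual prices vary by region and over time, and the function name is hypothetical.

```python
import math

# Rates quoted in the text, in US cents; real prices depend on region and date.
SHARD_HOUR_CENTS = 1.5
CENTS_PER_MILLION_PUT_UNITS = 1.4
PUT_UNIT_BYTES = 25 * 1024  # one PUT payload unit covers up to 25 KB of a record


def monthly_cost_usd(shards, records, record_bytes, hours=730):
    """Rough bill for a provisioned stream: shard-hours plus PUT payload units."""
    units_per_record = math.ceil(record_bytes / PUT_UNIT_BYTES)
    put_units = records * units_per_record
    cents = shards * hours * SHARD_HOUR_CENTS
    cents += put_units / 1_000_000 * CENTS_PER_MILLION_PUT_UNITS
    return round(cents / 100, 2)


# Two streams with one shard each, a million small records a month.
print(monthly_cost_usd(2, 1_000_000, 1024))  # 21.91
```

The shard-hours dominate, so for a toy project the cheapest option is to delete the streams as soon as you are done.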
次ã®ååã§æ°ããã¹ããªãŒã ãäœæããŸããã èªç©ºåž_ãã±ããã1 ã€ã®ã·ã£ãŒãã§ååã§ãã
次ã®ååã§å¥ã®ã¹ã¬ãããäœæããŸããã ç¹å¥ãªã¹ããªãŒã :
Setting up the producer
For this task, a regular EC2 instance acting as the data producer is enough. It doesn't need to be a powerful, expensive virtual machine; a spot t2.micro will do just fine.
Important note: use an image such as Amazon Linux AMI 2018.03.0. It needs fewer settings to get the Kinesis Agent running quickly.
Go to the EC2 service, create a new virtual machine, and choose the desired AMI with the free-tier t2.micro instance type.
For the new virtual machine to interact with the Kinesis service, it must be given permission to do so. The best way to do this is to assign it an IAM role. So, on the "Step 3: Configure Instance Details" screen, choose to create a new IAM role:
Creating an IAM role for EC2
In the window that opens, choose to create a new role for EC2 and go to the Permissions section:
Since this is a training example, we don't have to work through all the intricacies of fine-grained resource permissions. Instead, we choose the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.
Give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should match the figure below:
Once the new role is created, don't forget to attach it to the virtual machine instance you are creating:
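For reference, the following boto3 sketch is roughly what the console does here. The IAM calls and the AWS managed policy ARNs are real. The helper name is hypothetical, and this is an illustration rather than the article's method.

```python
import json

# AWS managed policies chosen in the console above.
MANAGED_POLICY_ARNS = (
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
)

# Trust policy that allows EC2 instances to assume the role.
EC2_TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "ec2.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}


def create_ec2_kinesis_role(iam, role_name="EC2-KinesisStreams-FullAccess"):
    """Create the role, attach the policies and wrap it in an instance profile."""
    iam.create_role(RoleName=role_name,
                    AssumeRolePolicyDocument=json.dumps(EC2_TRUST_POLICY))
    for arn in MANAGED_POLICY_ARNS:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=arn)
    # EC2 receives a role through an instance profile; the console wizard
    # creates one with the same name automatically.
    iam.create_instance_profile(InstanceProfileName=role_name)
    iam.add_role_to_instance_profile(InstanceProfileName=role_name,
                                     RoleName=role_name)
    return role_name


if __name__ == "__main__":
    import boto3  # requires credentials allowed to manage IAM

    print(create_ec2_kinesis_role(boto3.client("iam")))
```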
Don't change anything else on this screen and move on to the next windows.
You can leave the storage settings and tags at their defaults. Using tags is good practice, though: at the very least, give the instance a name and indicate the environment.
Next comes the "Step 6: Configure Security Group" tab. Here you need to create a new security group or specify an existing one that lets you connect to the instance via SSH (port 22). Select Source -> My IP, and you can launch the instance.
As soon as the instance switches to the running state, you can try to connect to it via SSH.
To be able to work with the Kinesis Agent, connect to the machine and then enter the following commands in the terminal:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
Let's create a folder to store the API responses:
sudo mkdir /var/log/airline_tickets
Before starting the agent, we need to configure it:
sudo vim /etc/aws-kinesis/agent.json
The agent.json file should contain the following:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
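As the configuration file shows, the agent monitors files with the .log extension (the *log pattern) in the /var/log/airline_tickets/ directory. It parses them, with the CSVTOJSON option turning each CSV line into a JSON object with the listed field names, and sends the result to the airline_tickets stream.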
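To see what the records in the stream will look like, here is a rough local approximation of the CSVTOJSON step. It is not the agent's code. It assumes comma-delimited lines and keeps every value as a string, and the sample line in the usage is hypothetical.

```python
import csv
import io
import json

# customFieldNames from agent.json, in the same order.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def csv_line_to_json(line):
    """Map the columns of one log line onto the configured field names."""
    values = next(csv.reader(io.StringIO(line)))
    return json.dumps(dict(zip(FIELD_NAMES, values)))


sample = ("4210.0,0,true,,LED,0,Aeroflot,2020-05-01T10:00:00,90,1200,"
          "KZN,2020-05-10,true,abc-1")
print(csv_line_to_json(sample))
```

Because the column order is positional, the script that writes the log lines must emit the fields in exactly this order.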
Restart the service and make sure it is up and running:
sudo service aws-kinesis-agent restart
次ã«ãAPI ããããŒã¿ããªã¯ãšã¹ããã Python ã¹ã¯ãªãããããŠã³ããŒãããŸãããã
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
The api_caller.py script requests data from Aviasales and saves the response in the directory that the Kinesis Agent scans. The implementation is fairly standard. A TicketsApi class lets us call the API asynchronously, and we pass it the headers with the token and the request parameters:
import asyncio
import sys

from aiohttp import ClientResponseError, ClientSession

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# TARGET_FILE, LOGGER and log_maker() are defined elsewhere in api_caller.py
# and are not shown here.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, params):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                # GET parameters belong in the query string, not in a request body.
                async with session.get(self.base_url, params=params) as response:
                    response.raise_for_status()
                    LOGGER.info('Response status %s: %s',
                                self.base_url, response.status)
                    response_json = await response.json()
            except ClientResponseError as http_err:
                # raise_for_status() raises ClientResponseError for 4xx/5xx.
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
        return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    # The token travels in a header rather than in the URL.
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}
    params = {'currency': CURRENCY,
              'origin': ORIGIN,
              'destination': DESTINATION,
              'show_to_affiliates': SHOW_TO_AFFILIATES,
              'trip_duration': TRIP_DURATION}
    return headers, params


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, params = prepare_request(api_token)
    api = TicketsApi(headers)
    response = await api.get_data(params)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)


if __name__ == '__main__':
    # Python 3.6 (installed above) has no asyncio.run(), so drive the loop directly.
    asyncio.get_event_loop().run_until_complete(main())
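The article does not show log_maker(). The following is a hypothetical stand-in that illustrates what such a function has to do for the agent configuration above. It appends one CSV line per offer, in customFieldNames order, with a generated record_id. The mapping of the API's value field to cost is an assumption, and any field missing from the response becomes an empty column.

```python
import csv
import uuid

# Column order must match customFieldNames in agent.json.
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]


def log_maker_sketch(response, target_file):
    """Hypothetical stand-in for log_maker(): append one CSV line per offer."""
    count = 0
    with open(target_file, "a", newline="") as handle:
        # Plain "\n" endings so no stray "\r" ends up in the last field.
        writer = csv.writer(handle, lineterminator="\n")
        for offer in response.get("data", []):
            row = dict(offer)
            row["cost"] = offer.get("value")      # the API reports the price as "value"
            row["record_id"] = str(uuid.uuid4())  # unique primary key for DynamoDB
            # Fields missing from the response become empty columns.
            writer.writerow([row.get(name, "") for name in FIELD_NAMES])
            count += 1
    return count
```

The target file must sit in /var/log/airline_tickets/ and end in "log" for the agent's filePattern to pick it up.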
To check that the agent is configured correctly and working, let's do a test run of the api_caller.py script:
sudo ./api_caller.py TOKEN
Then check the result in the agent's log and on the Monitoring tab of the airline_tickets data stream:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
As you can see, everything works, and the Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.
Setting up Kinesis Data Analytics
Let's move on to the central component of the whole system. Create a new application named kinesis_analytics_airlines_app in Kinesis Data Analytics:
Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. Unlike Kinesis Streams, it is a fully autoscaling service, and it:
- lets you create new streams (output streams) based on queries over the source data;
- provides a stream with the errors that occurred while the application was running (error stream);
- can detect the input data schema automatically (you can redefine it manually if needed).
At 0.11 USD per hour of operation, this is not a cheap service, so use it carefully and delete it when you are done.
Let's connect the application to the data source:
Select the stream to connect to (airline_tickets):
次ã«ãã¢ããªã±ãŒã·ã§ã³ãã¹ããªãŒã ããèªã¿åããã¹ããªãŒã ã«æžã蟌ãããšãã§ããããã«ãæ°ãã IAM ããŒã«ãã¢ã¿ããããå¿
èŠããããŸãã ãããè¡ãã«ã¯ãã¢ã¯ã»ã¹èš±å¯ãããã¯ãäœãå€æŽããªãã ãã§ååã§ãã
次ã«ãã¹ããªãŒã å
ã®ããŒã¿ ã¹ããŒãã®æ€åºããªã¯ãšã¹ãããŸãããããããè¡ãã«ã¯ã[ã¹ããŒãã®æ€åº] ãã¿ã³ãã¯ãªãã¯ããŸãã ãã®çµæãIAM ããŒã«ãæŽæ°ãã (æ°ããããŒã«ãäœæãã)ãã¹ããªãŒã ã«ãã§ã«å°çããŠããããŒã¿ããã¹ããŒãæ€åºãéå§ãããŸãã
次ã«ãSQL ãšãã£ã¿ãŒã«ç§»åããå¿
èŠããããŸãã ãã®ãã¿ã³ãã¯ãªãã¯ãããšãã¢ããªã±ãŒã·ã§ã³ã®èµ·åãæ±ãããŠã£ã³ããŠã衚瀺ãããŸããèµ·åãããã®ãéžæããŠãã ããã
次ã®åçŽãªã¯ãšãªã SQL ãšãã£ã¿ãŒ ãŠã£ã³ããŠã«æ¿å
¥ãã[SQL ãä¿åããŠå®è¡] ãã¯ãªãã¯ããŸãã
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
In relational databases, you work with tables: INSERT statements add records and SELECT statements query data. In Amazon Kinesis Data Analytics, you work with streams (STREAM) and pumps (PUMP). A pump is a continuous insert query that moves data from one in-application stream into another.
The SQL query above looks for Aeroflot tickets costing less than 5,000 rubles. All records that meet these conditions are placed in the DESTINATION_SQL_STREAM stream.
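As a rough analogy only, a Python generator shows the same semantics. Like the pump, it looks at each record as it arrives and emits only the matching (cost, gate) pairs. It has none of the windowing or delivery guarantees of Kinesis Data Analytics, and the sample records are hypothetical.

```python
def cheap_aeroflot(records, max_cost=5000, gate="Aeroflot"):
    """Yield (cost, gate) for matching records, like the STREAM_PUMP query."""
    for record in records:
        cost = float(record["cost"])
        if cost < max_cost and record["gate"] == gate:
            yield cost, record["gate"]


incoming = [
    {"cost": "4210", "gate": "Aeroflot"},
    {"cost": "6150", "gate": "Aeroflot"},
    {"cost": "3900", "gate": "S7"},
]
print(list(cheap_aeroflot(incoming)))  # [(4210.0, 'Aeroflot')]
```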
In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:
The result of all these steps should look like the figure below:
Creating and subscribing to an SNS topic
Go to Simple Notification Service and create a new topic named Airlines there:
Subscribe to this topic and specify the mobile phone number that SMS notifications will be sent to:
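The same topic and subscription can be created from code. This boto3 sketch uses the standard create_topic and subscribe calls. The helper name and the phone number in the usage line are hypothetical.

```python
def subscribe_phone(sns, phone_number, topic_name="Airlines"):
    """Create (or look up) the topic and subscribe a phone number to it by SMS."""
    # create_topic is idempotent: an existing topic with this name is returned.
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    # SMS endpoints are phone numbers in E.164 format, e.g. +79001234567.
    sns.subscribe(TopicArn=topic_arn, Protocol="sms", Endpoint=phone_number)
    return topic_arn


if __name__ == "__main__":
    import boto3  # needs AWS credentials and a region where SNS SMS is available

    print(subscribe_phone(boto3.client("sns"), "+79001234567"))
```

The returned ARN is the value the notifier Lambda will need later.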
Creating a table in DynamoDB
To store the raw data from the airline_tickets stream, let's create a DynamoDB table with the same name. We will use record_id as the primary key:
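Here is a boto3 sketch of the same table. The assumptions are that record_id is stored as a string and that a small provisioned capacity (5 RCU/WCU, within the free-tier 25 RCU/WCU mentioned above) is enough. create_table and the table_exists waiter are standard boto3 calls; the helper name is hypothetical.

```python
def create_tickets_table(dynamodb, table_name="airline_tickets"):
    """Create the raw-data table keyed by record_id and wait until it exists."""
    dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{"AttributeName": "record_id", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "record_id", "AttributeType": "S"}],
        # Small provisioned capacity, well inside the free-tier limits.
        ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
    )
    dynamodb.get_waiter("table_exists").wait(TableName=table_name)
    return table_name


if __name__ == "__main__":
    import boto3  # needs AWS credentials and a default region

    print(create_tickets_table(boto3.client("dynamodb")))
```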
Creating the Collector Lambda function
Let's create a Lambda function named Collector. Its job is to poll the airline_tickets stream and, when it finds new records there, insert them into the DynamoDB table. In addition to the default permissions, this Lambda obviously needs read access to the Kinesis data stream and write access to DynamoDB.
Creating an IAM role for the Collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:
For this test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the figure below:
This Lambda must be launched by a trigger from Kinesis whenever new records enter airline_tickets, so we need to add a new trigger:
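Under the hood, a Kinesis trigger is an event source mapping. The following boto3 sketch uses the real create_event_source_mapping call. The helper name, the batch size, and the ARN in the usage line are illustrative assumptions. StartingPosition="LATEST" means only records that arrive after the mapping is created are delivered.

```python
def add_kinesis_trigger(lambda_client, function_name, stream_arn, batch_size=100):
    """Subscribe a Lambda function to new records in a Kinesis stream."""
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",  # skip records written before the trigger existed
        BatchSize=batch_size,
    )
    return response["UUID"]


if __name__ == "__main__":
    import boto3  # needs AWS credentials and a default region

    arn = "arn:aws:kinesis:eu-west-1:123456789012:stream/airline_tickets"
    print(add_kinesis_trigger(boto3.client("lambda"), "Collector", arn))
```

The same call, pointed at special_stream, works for the notifier function described below.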
All that remains is to paste in the code and save the Lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
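To try the decoding logic locally without AWS, you can build a minimal Kinesis event by hand. Real events carry more keys per record (partition key, sequence number, and so on). This sketch keeps only the field the handler reads, and both helper names are hypothetical. decode_records repeats the steps of TicketsParser.get_json_data.

```python
import base64
import json


def make_kinesis_event(items):
    """Build a minimal Kinesis event in the shape lambda_handler reads."""
    return {"Records": [
        {"kinesis": {"data": base64.b64encode(json.dumps(item).encode()).decode()}}
        for item in items
    ]}


def decode_records(event):
    """Same decoding steps as TicketsParser.get_json_data."""
    return [json.loads(base64.b64decode(r["kinesis"]["data"]))
            for r in event["Records"]]


event = make_kinesis_event([{"record_id": "abc-1", "cost": "4210"}])
print(decode_records(event))  # [{'record_id': 'abc-1', 'cost': '4210'}]
```

Saving such an event as a test event in the Lambda console is a quick way to exercise the Collector before the trigger is wired up.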
Creating the notifier Lambda function
A second Lambda function, which monitors the second stream (special_stream) and sends notifications to SNS, is created in the same way. This Lambda therefore needs access to read from Kinesis and to publish messages to a specific SNS topic. SNS then delivers each message to all subscribers of that topic (email, SMS, and so on).
Creating an IAM role
First, create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign it to the alarm_notifier Lambda being created:
This Lambda must run on a trigger whenever new records enter special_stream, so configure its trigger the same way as for the Collector Lambda.
To make this Lambda easier to configure, let's introduce a new environment variable, TOPIC_ARN, and put the ARN (Amazon Resource Name) of the Airlines topic in it:
Then paste in the Lambda code. It is not complicated at all:
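import os

import boto3

SNS_CLIENT = boto3.client('sns')
# ARN of the Airlines topic, set as a Lambda environment variable.
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish an alert for every batch delivered from special_stream."""
    try:
        # Subject is used by email subscriptions; SMS shows only the Message.
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found something interesting!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))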
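If you want the SMS to say what was found, you could build the message from the records in the event. This is a hypothetical extension, not part of the original function. It assumes the Kinesis Data Analytics destination writes JSON rows with the cost and gate columns defined in DESTINATION_SQL_STREAM.

```python
import base64
import json


def build_alarm_message(event, limit=5):
    """Summarise up to `limit` (gate, cost) rows from a special_stream batch."""
    lines = []
    for record in event["Records"][:limit]:
        # Each Kinesis payload is base64-encoded; assume KDA wrote it as JSON.
        row = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append(f"{row.get('gate')}: {row.get('cost')} RUB")
    return "Cheap tickets found:\n" + "\n".join(lines)
```

The result would replace the fixed Message= string in SNS_CLIENT.publish(...). Capping the number of rows keeps the text short, since long SMS messages are split and billed in parts.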
That seems to complete the manual setup of the system. All that remains is to test it and make sure everything is configured correctly.
Deploying from Terraform code
Required preparation
You can download the Terraform distribution.
Getting started
The complete code of the project is available in the repository.
Before deploying the whole infrastructure, it is worth running the plan command to see what Terraform is going to create in the cloud:
terraform.exe plan
You will be asked for the phone number to send notifications to. You don't have to enter it at this stage.
Once you have reviewed the execution plan, you can start creating the resources:
terraform.exe apply
After you run this command, you will again be asked for the phone number. When asked whether to actually perform the actions, type "yes". Terraform then sets up the entire infrastructure, performs all the necessary EC2 configuration, deploys the Lambda functions, and so on.
Once the Terraform code has created all the resources, you need to go into the details of the Kinesis Analytics application. Unfortunately, I didn't find a way to do this step directly from code.
Start the application:
After that, explicitly set the in-application stream name by selecting it from the drop-down list:
Now everything is ready to go.
Testing the application
Whether you deployed the system manually or with the Terraform code, it works the same way.
Log in via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:
sudo ./api_caller.py TOKEN
All that remains is to wait for an SMS on your phone:
SMS: the message arrives on the phone in about a minute:
It remains to check whether the records were saved in the DynamoDB database for later, more detailed analysis. The airline_tickets table contains roughly the following data:
Conclusion
In the course of this work, we built an online data processing system based on Amazon Kinesis. We looked at using the Kinesis Agent with Kinesis Data Streams, at real-time analytics with Kinesis Analytics using SQL commands, and at how Amazon Kinesis interacts with other AWS services.
We deployed the system described above in two ways: a rather long manual one and a quick one from Terraform code.
The full source code of the project is available.
I'm happy to discuss the article and look forward to your comments. Constructive criticism is welcome.
I wish you success!
Source: habr.com