اې حبره!
ایا تاسو الوتنې الوتکې خوښوي؟ زه دا خوښوم ، مګر د ځان انزوا پرمهال زه د یوې پیژندل شوې سرچینې - Aviasales څخه د هوایی ټیکټونو ډیټا تحلیل کولو سره هم مینه لرم.
نن ورځ به موږ د ایمیزون کاینسیس کار تحلیل کړو ، د ریښتیني وخت تحلیلونو سره د سټیمینګ سیسټم رامینځته کړو ، د اصلي ډیټا ذخیره کولو په توګه د Amazon DynamoDB NoSQL ډیټابیس نصب کړو ، او په زړه پورې ټیکټونو لپاره د SMS خبرتیاوې تنظیم کړو.
ټول توضیحات د کټ لاندې دي! لاړ شه!
پېژندنه
د مثال په توګه، موږ ته اړتیا لرو
د دې مقالې اصلي هدف په AWS کې د معلوماتو سټینګ کارولو عمومي پوهه ورکول دي؛ موږ په پام کې نیسو چې د API لخوا کارول شوي ډاټا په کلکه تازه ندي او د کیچ څخه لیږدول کیږي، کوم چې د تیرو 48 ساعتونو لپاره د Aviasales.ru او Jetradar.com سایټونو کاروونکو لخوا د لټونونو پراساس رامینځته شوی.
د Kinesis-Agent، د تولید په ماشین کې نصب شوی، د API له لارې ترلاسه شوی به په اوتومات ډول د Kinesis Data Analytics له لارې مطلوب جریان ته ډاټا پارس او لیږدوي. د دې جریان خام نسخه به مستقیم پلورنځي ته ولیکل شي. په DynamoDB کې ګمارل شوي خام ډیټا ذخیره به د BI وسیلو له لارې د ټیکټ ژور تحلیل ته اجازه ورکړي ، لکه د AWS Quick Sight.
موږ به د ټول زیربنا د ځای پرځای کولو لپاره دوه اختیارونه په پام کې ونیسو:
- لارښود - د AWS مدیریت کنسول له لارې؛
- د Terraform کوډ څخه زیربنا د سست اتوماتیکانو لپاره ده؛
د پرمختللي سیسټم جوړښت
کارول شوي اجزا:
Aviasales API - د دې API لخوا بیرته راستانه شوي معلومات به د ټولو راتلونکو کارونو لپاره وکارول شي؛د EC2 تولیدونکي مثال - په بادل کې یو منظم مجازی ماشین چې په هغې کې به د ان پټ ډیټا جریان تولید شي:د کینیسیس ایجنټ د جاوا اپلیکیشن دی چې په محلي ډول په ماشین کې نصب شوی چې کاینسیس ته د ډیټا راټولولو او لیږلو لپاره اسانه لار وړاندې کوي (د کاینسیس ډیټا سټریمونه یا کیینیسس فایر هوز). اجنټ په دوامداره توګه په ټاکل شوي لارښود کې د فایلونو سیټ نظارت کوي او کینیسیس ته نوي معلومات لیږي؛د API کالر سکریپټ - د Python سکریپټ چې API ته غوښتنې کوي او ځواب په فولډر کې اچوي چې د کیینسیس اجنټ لخوا څارل کیږي؛
د Kinesis ډیټا جریانونه - د ریښتیني وخت ډیټا سټیمینګ خدمت د پراخه اندازه کولو وړتیاو سره؛Kinesis Analytics یو بې سرور خدمت دی چې په ریښتیني وخت کې د سټیمینګ ډیټا تحلیل ساده کوي. د Amazon Kinesis Data Analytics د غوښتنلیک سرچینې تنظیموي او په اتوماتيک ډول د راتلونکو معلوماتو هر مقدار اداره کولو لپاره پیمانه کوي؛AWS لامبده - یو خدمت چې تاسو ته اجازه درکوي د سرورونو بیک اپ یا تنظیم کولو پرته کوډ چل کړئ. د کمپیوټر ټول ځواک په اتوماتيک ډول د هر کال لپاره اندازه کیږي.Amazon DynamoDB - د کلیدي ارزښت جوړو او اسنادو ډیټابیس چې په هره پیمانه چلولو پرمهال له 10 ملی ثانیو څخه لږ ځنډ چمتو کوي. کله چې د DynamoDB کاروئ، تاسو اړتیا نلرئ کوم سرور چمتو کړئ، پیچ یا اداره کړئ. DynamoDB په اوتومات ډول میزونه اندازه کوي ترڅو د شته سرچینو مقدار تنظیم کړي او لوړ فعالیت وساتي. د سیسټم مدیریت ته اړتیا نشته؛ایمیزون SNS - د خپرونکي - پیرودونکي (Pub/Sub) ماډل په کارولو سره د پیغامونو لیږلو لپاره په بشپړ ډول اداره شوی خدمت ، د کوم سره چې تاسو کولی شئ مایکرو خدمتونه ، توزیع شوي سیسټمونه او بې سرور غوښتنلیکونه جلا کړئ. SNS د ګرځنده فشار خبرتیاو، SMS پیغامونو او بریښنالیکونو له لارې پای کاروونکو ته د معلوماتو لیږلو لپاره کارول کیدی شي.
لومړنۍ روزنه
د معلوماتو جریان تقلید کولو لپاره ، ما پریکړه وکړه چې د هوایی ډګر ټیکټ معلومات وکاروم چې د Aviasales API لخوا بیرته راستانه شوي. IN
نو، راځئ چې راجستر کړو او زموږ نښه ترلاسه کړو.
یوه بیلګه غوښتنه په لاندې ډول ده:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
په غوښتنه کې د نښه په ټاکلو سره د API څخه د معلوماتو ترلاسه کولو پورته میتود به کار وکړي ، مګر زه ترجیح ورکوم چې د لاسرسي نښه د سرلیک له لارې تیر کړم ، نو موږ به دا میتود په api_caller.py سکریپټ کې وکاروو.
ځواب بېلګه:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
پورته د API ځواب مثال د سینټ پیټرزبورګ څخه فوک ته ټیکټ ښیې ... اوه ، څه خوب ...
څرنګه چې زه د کازان څخه یم، او فوکیټ اوس "یوازې یو خوب" دی، راځئ چې د سینټ پیټرزبورګ څخه کازان ته ټکټونه وګورو.
دا فرض کوي چې تاسو دمخه د AWS حساب لرئ. زه غواړم سمدلاسه دې حقیقت ته ځانګړې پاملرنه راجلب کړم چې کینیسیس او د SMS له لارې د خبرتیاو لیږل په کلنۍ کې شامل ندي
وړیا درجه (وړیا کارول) . مګر حتی د دې سره سره، په ذهن کې د څو ډالرو سره، دا د وړاندیز شوي سیسټم رامینځته کول او د هغې سره لوبې کول خورا ممکن دي. او البته، مه هیروئ چې ټولې سرچینې حذف کړئ وروسته له دې چې دوی نور اړتیا نلري.
خوشبختانه، د DynamoDb او lambda فعالیتونه به زموږ لپاره وړیا وي که موږ خپل میاشتني وړیا محدودیتونه پوره کړو. د مثال په توګه، د DynamoDB لپاره: 25 GB ذخیره، 25 WCU/RCU او 100 ملیون پوښتنې. او په میاشت کې یو ملیون لامبډا فنکشن زنګونه.
د لاسي سیسټم پلي کول
د Kinesis ډیټا سټریمونو تنظیم کول
راځئ چې د کیینیسس ډیټا سټریمونو خدمت ته لاړ شو او دوه نوي جریانونه رامینځته کړو ، د هر یو لپاره یو شارډ.
شارډ څه شی دی؟
A شارډ د ایمیزون کینیسیس جریان د معلوماتو لیږد لومړني واحد دی. یوه برخه د 1 MB/s په سرعت سره د ان پټ ډیټا لیږد چمتو کوي او د 2 MB / s په سرعت سره د معلوماتو ډیټا لیږد چمتو کوي. یوه برخه په هره ثانیه کې تر 1000 PUT ننوتلو ملاتړ کوي. کله چې د ډیټا جریان رامینځته کړئ ، تاسو اړتیا لرئ د اړین شمیر برخو مشخص کړئ. د مثال په توګه، تاسو کولی شئ د دوو برخو سره د معلوماتو جریان جوړ کړئ. دا ډیټا جریان به په 2 MB / s کې د ان پټ ډیټا لیږد چمتو کړي او په 4 MB / s کې د محصول ډیټا لیږد چمتو کړي ، په هر ثانیه کې تر 2000 PUT ریکارډونو ملاتړ کوي.
هرڅومره چې ستاسو په جریان کې شارډونه ډیر وي ، هومره د هغې ټروپټ ډیریږي. په اصل کې، دا څنګه جریان اندازه کیږي - د شارډونو په اضافه کولو سره. مګر څومره چې تاسو ډیر شارډونه لرئ، قیمت به لوړ وي. هر شارډ په ساعت کې 1,5 سینټ او د هر ملیون PUT تادیاتو واحدونو لپاره اضافي 1.4 سینټ لګښت لري.
راځئ چې د نوم سره یو نوی جریان جوړ کړو هوايي_ټکيټونهد هغه لپاره به 1 شارډ کافی وي:
اوس راځئ چې د نوم سره بل تار جوړ کړو ځانګړی_سړی:
د تولید کونکي تنظیم کول
د یوې دندې تحلیل کولو لپاره ، دا کافي دي چې د ډیټا تولید کونکي په توګه د منظم EC2 مثال وکاروئ. دا باید یو پیاوړی، ګران مجازی ماشین نه وي؛ یو ځای t2.micرو به ښه کار وکړي.
مهم یادونه: د مثال په توګه ، تاسو باید عکس وکاروئ - د ایمیزون لینکس AMI 2018.03.0 ، دا د کاینسیس ایجنټ ګړندي پیل کولو لپاره لږ تنظیمات لري.
د EC2 خدمت ته لاړ شئ، یو نوی مجازی ماشین جوړ کړئ، مطلوب AMI د t2.micro ډول سره وټاکئ، کوم چې په وړیا ټیر کې شامل دی:
د دې لپاره چې نوي رامینځته شوي مجازی ماشین د کینیسیس خدمت سره متقابل عمل کولو وړ وي ، دا باید د دې کولو حق ورکړل شي. د دې کولو غوره لاره د IAM رول ټاکل دي. له همدې امله ، په 3 مرحله کې: د مثال توضیحاتو سکرین تنظیم کړئ ، تاسو باید غوره کړئ د IAM نوی رول جوړ کړئ:
د EC2 لپاره د IAM رول رامینځته کول
په هغه کړکۍ کې چې خلاصیږي، غوره کړئ چې موږ د EC2 لپاره نوی رول رامینځته کوو او د اجازې برخې ته لاړ شئ:
د روزنې مثال په کارولو سره ، موږ اړتیا نلرو د سرچینو حقونو دانه تنظیم کولو ټولو پیچلتیاو ته لاړ شو ، نو موږ به د ایمیزون لخوا دمخه تنظیم شوي پالیسۍ غوره کړو: AmazonKinesisFullAccess او CloudWatchFullAccess.
راځئ چې د دې رول لپاره یو څه معنی لرونکي نوم ورکړو، د بیلګې په توګه: EC2-KinesisStreams-FullAccess. پایله باید ورته وي لکه څنګه چې لاندې عکس کې ښودل شوي:
د دې نوي رول رامینځته کولو وروسته ، مه هیروئ چې دا د رامینځته شوي مجازی ماشین مثال سره ضمیمه کړئ:
موږ په دې سکرین کې بل څه نه بدلوو او راتلونکي کړکۍ ته ځو.
د هارډ ډرایو تنظیمات د ډیفالټ په توګه پاتې کیدی شي ، په بیله بیا ټاګونه (که څه هم د ټاګونو کارول ښه عمل دی ، لږترلږه مثال ته نوم ورکړئ او چاپیریال په ګوته کړئ).
اوس موږ په 6 مرحله کې یو: د امنیت ګروپ ټب تنظیم کړئ ، چیرې چې تاسو اړتیا لرئ یو نوی رامینځته کړئ یا خپل موجود امنیت ګروپ مشخص کړئ ، کوم چې تاسو ته اجازه درکوي د مثال سره د ssh (پورټ 22) له لارې وصل شئ. سرچینه وټاکئ -> زما IP هلته او تاسو کولی شئ مثال پیل کړئ.
هرڅومره ژر چې دا د چلولو حالت ته واړوي ، تاسو کولی شئ د ssh له لارې دې سره وصل کیدو هڅه وکړئ.
د کینیسیس اجنټ سره د کار کولو وړ کیدو لپاره ، د ماشین سره په بریالیتوب سره وصل کیدو وروسته ، تاسو باید لاندې کمانډونه په ترمینل کې دننه کړئ:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
راځئ چې د API ځوابونو خوندي کولو لپاره فولډر جوړ کړو:
sudo mkdir /var/log/airline_tickets
د ایجنټ پیل کولو دمخه، تاسو اړتیا لرئ چې د هغې ترتیب ترتیب کړئ:
sudo vim /etc/aws-kinesis/agent.json
د agent.json فایل منځپانګې باید داسې ښکاري:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
لکه څنګه چې د ترتیب کولو فایل څخه لیدل کیدی شي، اجنټ به فایلونه په /var/log/airline_tickets/ ډایرکټر کې د .log توسیع سره وڅاري، دوی یې پارس کړي او د airline_tickets جریان ته یې لیږدوي.
موږ خدمت بیا پیل کوو او ډاډ ترلاسه کوو چې دا پورته او روان دی:
sudo service aws-kinesis-agent restart
اوس راځئ چې د Python سکریپټ ډاونلوډ کړو چې د API څخه به د معلوماتو غوښتنه وکړي:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
api_caller.py سکریپټ د Aviasales څخه د معلوماتو غوښتنه کوي او ترلاسه شوي ځواب په ډایرکټر کې خوندي کوي چې د کینیسیس اجنټ سکین کوي. د دې سکریپټ پلي کول خورا معیاري دي ، د TicketsApi ټولګي شتون لري ، دا تاسو ته اجازه درکوي په غیر متزلزل ډول API راوباسي. موږ یو سرلیک د نښه سره لیږدوو او دې ټولګي ته د پیرامیټونو غوښتنه کوو:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
د اجنټ د سم تنظیماتو او فعالیت ازموینې لپاره ، راځئ چې د api_caller.py سکریپټ چلولو ازموینه وکړو:
sudo ./api_caller.py TOKEN
او موږ د کار پایلې د اجنټ لاګونو کې او د هوایی_ټیکټس ډیټا جریان کې د څارنې ټب کې ګورو:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
لکه څنګه چې تاسو لیدلی شئ، هرڅه کار کوي او د کیینیسس ایجنټ په بریالیتوب سره جریان ته ډاټا لیږي. اوس راځئ چې مصرف کونکي تنظیم کړو.
د کیینیسس ډیټا تحلیل تنظیم کول
راځئ چې د ټول سیسټم مرکزي برخې ته لاړ شو - د Kinesis_analytics_airlines_app په نوم د Kinesis Data Analytics کې یو نوی غوښتنلیک جوړ کړئ:
د Kinesis ډیټا تحلیل تاسو ته اجازه درکوي د ریښتیني وخت ډیټا تحلیلونه د کیینیسس سټریمونو څخه د SQL ژبې په کارولو سره ترسره کړئ. دا په بشپړ ډول د اتوماتیک کولو خدمت دی (د کیینیسس جریانونو برعکس) چې:
- تاسو ته اجازه درکوي نوي جریانونه رامینځته کړئ (آؤټ پټ سټریم) د سرچینې ډیټا غوښتنې پراساس؛
- د تیروتنو سره یو جریان چمتو کوي چې د غوښتنلیکونو د چلولو پرمهال پیښ شوي (د تېروتنې جریان)؛
- کولی شي په اتوماتيک ډول د ان پټ ډیټا سکیم وټاکي (دا د اړتیا په صورت کې په لاسي ډول بیا تعریف کیدی شي).
دا یو ارزانه خدمت ندی - د کار په هر ساعت کې 0.11 USD، نو تاسو باید دا په احتیاط سره وکاروئ او کله چې تاسو پای ته ورسیږئ حذف کړئ.
راځئ چې غوښتنلیک د معلوماتو سرچینې سره وصل کړو:
هغه جریان غوره کړئ چې موږ ورسره وصل یو (ایرلاین_ټیکټونه):
بیا، تاسو اړتیا لرئ یو نوی IAM رول ضمیمه کړئ ترڅو غوښتنلیک د جریان څخه لوستل شي او جریان ته ولیکئ. د دې کولو لپاره، دا بسیا ده چې د لاسرسي اجازه بلاک کې هیڅ شی بدل نه کړئ:
اوس راځئ چې په جریان کې د ډیټا سکیما کشف غوښتنه وکړو؛ د دې کولو لپاره ، د "سکیما کشف" تڼۍ کلیک وکړئ. د پایلې په توګه ، د IAM رول به تازه شي (یو نوی به رامینځته شي) او د سکیما کشف به د هغه ډیټا څخه پیل شي چې دمخه جریان ته رسیدلي:
اوس تاسو اړتیا لرئ د SQL مدیر ته لاړ شئ. کله چې تاسو په دې تڼۍ کلیک وکړئ، یوه کړکۍ به راښکاره شي چې تاسو څخه غوښتنه کوي چې اپلیکیشن پیل کړئ - هغه څه غوره کړئ چې تاسو یې پیل کول غواړئ:
لاندې ساده پوښتنه د SQL مدیر کړکۍ کې دننه کړئ او د SQL خوندي کولو او چلولو کلیک وکړئ:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
په اړونده ډیټابیسونو کې، تاسو د جدولونو سره کار کوئ د INSERT بیاناتو په کارولو سره د ریکارډونو اضافه کولو لپاره او د پوښتنو ډیټا لپاره د SELECT بیان. د ایمیزون کینیسیس ډیټا انلایټیکونو کې ، تاسو د جریانونو (سټریمونو) او پمپونو (PUMPs) سره کار کوئ - د دوامداره داخلولو غوښتنې چې په غوښتنلیک کې له یو جریان څخه بل جریان ته ډیټا داخلوي.
د SQL پوښتنې پورته وړاندې شوي د ایروفلوټ ټیکټونو لټون د پنځه زره روبلو څخه ښکته لګښت کې. ټول ریکارډونه چې دا شرایط پوره کوي په DESTINATION_SQL_STREAM جریان کې به ځای په ځای شي.
د منزل بلاک کې، د ځانګړي_سټریم جریان غوره کړئ، او د غوښتنلیک دننه جریان نوم DESTINATION_SQL_STREAM ډراپ-ډاون لیست کې:
د ټولو لاسوهنو پایله باید لاندې عکس ته ورته وي:
د SNS موضوع جوړول او ګډون کول
د ساده خبرتیا خدماتو ته لاړشئ او هلته د ایرلاین په نوم نوې موضوع جوړه کړئ:
دې موضوع ته ګډون وکړئ او د ګرځنده تلیفون شمیره په ګوته کړئ چې د SMS خبرتیاوې به لیږل کیږي:
په DynamoDB کې یو میز جوړ کړئ
د دوی د ایرلاین_ټیکټ جریان څخه د خام ډیټا ذخیره کولو لپاره ، موږ به په ورته نوم DynamoDB کې جدول جوړ کړو. موږ به ریکارډ_id د لومړني کیلي په توګه وکاروو:
د لامبډا فنکشن راټولونکی رامینځته کول
راځئ چې د کلکټر په نوم د لامبډا فنکشن رامینځته کړو ، چې دنده به یې د ایرلاین_ټیکټس جریان رایه اچونه وي او که چیرې نوي ریکارډونه وموندل شي ، دا ریکارډونه د DynamoDB جدول کې دننه کړئ. په ښکاره ډول ، د ډیفالټ حقونو سربیره ، دا لامبډا باید د کیینیسس ډیټا جریان ته لاسرسی لوستلی وي او ډینامو ډی بی ته لاسرسی لیکلی وي.
د راټولونکي لامبډا فنکشن لپاره د IAM رول رامینځته کول
لومړی، راځئ چې د Lambda-TicketsProcessingRole په نوم د لامبډا لپاره نوی IAM رول رامینځته کړو:
د ازموینې مثال لپاره، مخکې ترتیب شوي AmazonKinesisReadOnlyAccess او AmazonDynamoDBFullAccess پالیسۍ خورا مناسب دي، لکه څنګه چې په لاندې انځور کې ښودل شوي:
دا لامبډا باید د کینیسیس څخه د محرک په واسطه پیل شي کله چې نوي ننوتل ایرلاین_سټریم ته ننوځي ، نو موږ اړتیا لرو یو نوی محرک اضافه کړو:
ټول هغه څه چې پاتې دي د کوډ دننه کول او لامبډا خوندي کول دي.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
د لامبډا فنکشن نوټیفایر رامینځته کول
د دوهم لامبډا فنکشن ، کوم چې به دوهم جریان وڅاري (special_stream) او SNS ته خبرتیا واستوي ، په ورته ډول رامینځته شوی. له همدې امله، دا لامبډا باید د Kinesis څخه لوستلو ته لاسرسی ولري او د ورکړل شوي SNS موضوع ته پیغامونه واستوي، چې بیا به د SNS خدمت لخوا د دې موضوع ټولو پیرودونکو ته لیږل کیږي (بریښنالیک، SMS، او نور).
د IAM رول رامینځته کول
لومړی، موږ د دې لامبډا لپاره د IAM رول Lambda-KinesisAlarm جوړوو، او بیا دا رول د الارم_notifier لامبډا ته وټاکو:
دا لامبډا باید ځانګړي_سټریم ته د ننوتلو لپاره د نوي ریکارډونو لپاره په محرک باندې کار وکړي ، نو تاسو اړتیا لرئ محرک په ورته ډول تنظیم کړئ لکه څنګه چې موږ د کلکټر لیمبډا لپاره کړی.
د دې لپاره چې د دې لامبډا تنظیم کول اسانه کړي، راځئ چې یو نوی چاپیریال متغیر معرفي کړو - TOPIC_ARN، چیرته چې موږ د ایرلاین موضوع ANR (Amazon Recourse Names) ځای په ځای کوو:
او د لامبډا کوډ دننه کړئ ، دا په هیڅ ډول پیچلي ندي:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
داسې ښکاري چې دا هغه ځای دی چې د لارښود سیسټم ترتیب بشپړ شوی. ټول هغه څه چې پاتې دي د ازموینې او ډاډ ترلاسه کول دي چې موږ هرڅه په سمه توګه تنظیم کړي دي.
د Terraform کوډ څخه ځای په ځای کړئ
اړین تیاری
تاسو کولی شئ توزیع ډاونلوډ کړئ
څنګه پیل وکړو
د پروژې بشپړ کوډ دی
یو ښه تمرین دا دی چې د ټول زیربنا له مینځه وړلو دمخه د پلان قوماندې پرمخ بوځي ترڅو وګورو چې Terraform اوس مهال زموږ لپاره په بادل کې څه رامینځته کوي:
terraform.exe plan
تاسو ته به د خبرتیا لیږلو لپاره د تلیفون شمیره دننه کولو ته وهڅول شي. دا اړینه نده چې په دې مرحله کې داخل شي.
د برنامه د عملیاتو پلان تحلیل کولو سره ، موږ کولی شو د سرچینو رامینځته کول پیل کړو:
terraform.exe apply
د دې قوماندې لیږلو وروسته ، تاسو به بیا وغوښتل شي چې د تلیفون شمیره دننه کړئ؛ "هو" ډایل کړئ کله چې د واقعیا عملونو ترسره کولو په اړه پوښتنه ښودل کیږي. دا به تاسو ته اجازه درکړي چې ټول زیربناوې تنظیم کړئ، د EC2 ټول اړین ترتیبات ترسره کړئ، د لامبډا دندې ځای په ځای کړئ، او داسې نور.
وروسته له دې چې ټولې سرچینې په بریالیتوب سره د Terraform کوډ له لارې رامینځته شوې ، تاسو اړتیا لرئ د کینیسیس تحلیلي غوښتنلیک توضیحاتو ته لاړشئ (له بده مرغه ، ما ونه موندل چې دا مستقیم د کوډ څخه څنګه ترسره کړم).
اپلیکیشن پیل کړئ:
له دې وروسته، تاسو باید د ډراپ-ډاون لیست څخه په غوره کولو سره په ښکاره ډول د غوښتنلیک دننه جریان نوم تنظیم کړئ:
اوس هرڅه د تګ لپاره چمتو دي.
د غوښتنلیک ازموینه
پرته لدې چې تاسو سیسټم په لاسي ډول یا د Terraform کوډ له لارې ځای په ځای کړی ، دا به ورته کار وکړي.
موږ د SSH له لارې د EC2 مجازی ماشین ته ننوتل چیرې چې کینیسیس ایجنټ نصب شوی او api_caller.py سکریپټ چلوي
sudo ./api_caller.py TOKEN
ټول هغه څه چې تاسو یې باید وکړئ ستاسو شمیرې ته د SMS انتظار وکړئ:
SMS - ستاسو په تلیفون کې نږدې 1 دقیقو کې یو پیغام راځي:
دا د لیدلو لپاره پاتې ده چې ایا ریکارډونه د راتلونکو، نورو مفصلو تحلیلونو لپاره د DynamoDB ډیټابیس کې خوندي شوي. د هوایی_ټیکټونو جدول نږدې لاندې معلومات لري:
پایلې
د ترسره شوي کار په جریان کې، د آنلاین ډیټا پروسس کولو سیسټم د ایمیزون کینیسیس پراساس جوړ شو. د Kinesis ډیټا سټریمونو او د ریښتیني وخت تحلیلونو Kinesis Analytics سره په ګډه د کیینیسس ایجنټ کارولو اختیارونه د SQL کمانډونو په کارولو سره ، او همدارنګه د نورو AWS خدماتو سره د ایمیزون کینیسیس تعامل په پام کې نیول شوی.
موږ پورتني سیسټم په دوه لارو ځای په ځای کړی: یو پر ځای اوږد لارښود او یو ګړندی د Terraform کوډ څخه.
د پروژې ټولې سرچینې کوډ شتون لري
زه خوشحاله یم چې د مقالې په اړه بحث وکړم، زه ستاسو نظرونو ته سترګې په لار یم. د رغنده انتقاد په هیله.
بریالیتوب مو غواړم!
سرچینه: www.habr.com