د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

اې حبره!

ایا تاسو الوتنې الوتکې خوښوي؟ زه دا خوښوم ، مګر د ځان انزوا پرمهال زه د یوې پیژندل شوې سرچینې - Aviasales څخه د هوایی ټیکټونو ډیټا تحلیل کولو سره هم مینه لرم.

نن ورځ به موږ د ایمیزون کاینسیس کار تحلیل کړو ، د ریښتیني وخت تحلیلونو سره د سټیمینګ سیسټم رامینځته کړو ، د اصلي ډیټا ذخیره کولو په توګه د Amazon DynamoDB NoSQL ډیټابیس نصب کړو ، او په زړه پورې ټیکټونو لپاره د SMS خبرتیاوې تنظیم کړو.

ټول توضیحات د کټ لاندې دي! لاړ شه!

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

پېژندنه

د مثال په توګه، موږ ته اړتیا لرو Aviasales API. دې ته لاسرسی وړیا او پرته له محدودیتونو چمتو شوی؛ تاسو اړتیا لرئ یوازې د "پراختیا کونکو" برخې کې راجسټر شئ ترڅو ډیټا ته لاسرسي لپاره خپل API نښه ترلاسه کړئ.

د دې مقالې اصلي هدف په AWS کې د معلوماتو سټینګ کارولو عمومي پوهه ورکول دي؛ موږ په پام کې نیسو چې د API لخوا کارول شوي ډاټا په کلکه تازه ندي او د کیچ څخه لیږدول کیږي، کوم چې د تیرو 48 ساعتونو لپاره د Aviasales.ru او Jetradar.com سایټونو کاروونکو لخوا د لټونونو پراساس رامینځته شوی.

د Kinesis-Agent، د تولید په ماشین کې نصب شوی، د API له لارې ترلاسه شوی به په اوتومات ډول د Kinesis Data Analytics له لارې مطلوب جریان ته ډاټا پارس او لیږدوي. د دې جریان خام نسخه به مستقیم پلورنځي ته ولیکل شي. په DynamoDB کې ګمارل شوي خام ډیټا ذخیره به د BI وسیلو له لارې د ټیکټ ژور تحلیل ته اجازه ورکړي ، لکه د AWS Quick Sight.

موږ به د ټول زیربنا د ځای پرځای کولو لپاره دوه اختیارونه په پام کې ونیسو:

  • لارښود - د AWS مدیریت کنسول له لارې؛
  • د Terraform کوډ څخه زیربنا د سست اتوماتیکانو لپاره ده؛

د پرمختللي سیسټم جوړښت

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
کارول شوي اجزا:

  • Aviasales API - د دې API لخوا بیرته راستانه شوي معلومات به د ټولو راتلونکو کارونو لپاره وکارول شي؛
  • د EC2 تولیدونکي مثال - په بادل کې یو منظم مجازی ماشین چې په هغې کې به د ان پټ ډیټا جریان تولید شي:
    • د کینیسیس ایجنټ د جاوا اپلیکیشن دی چې په محلي ډول په ماشین کې نصب شوی چې کاینسیس ته د ډیټا راټولولو او لیږلو لپاره اسانه لار وړاندې کوي (د کاینسیس ډیټا سټریمونه یا کیینیسس فایر هوز). اجنټ په دوامداره توګه په ټاکل شوي لارښود کې د فایلونو سیټ نظارت کوي او کینیسیس ته نوي معلومات لیږي؛
    • د API کالر سکریپټ - د Python سکریپټ چې API ته غوښتنې کوي او ځواب په فولډر کې اچوي چې د کیینسیس اجنټ لخوا څارل کیږي؛
  • د Kinesis ډیټا جریانونه - د ریښتیني وخت ډیټا سټیمینګ خدمت د پراخه اندازه کولو وړتیاو سره؛
  • Kinesis Analytics یو بې سرور خدمت دی چې په ریښتیني وخت کې د سټیمینګ ډیټا تحلیل ساده کوي. د Amazon Kinesis Data Analytics د غوښتنلیک سرچینې تنظیموي او په اتوماتيک ډول د راتلونکو معلوماتو هر مقدار اداره کولو لپاره پیمانه کوي؛
  • AWS لامبده - یو خدمت چې تاسو ته اجازه درکوي د سرورونو بیک اپ یا تنظیم کولو پرته کوډ چل کړئ. د کمپیوټر ټول ځواک په اتوماتيک ډول د هر کال لپاره اندازه کیږي.
  • Amazon DynamoDB - د کلیدي ارزښت جوړو او اسنادو ډیټابیس چې په هره پیمانه چلولو پرمهال له 10 ملی ثانیو څخه لږ ځنډ چمتو کوي. کله چې د DynamoDB کاروئ، تاسو اړتیا نلرئ کوم سرور چمتو کړئ، پیچ یا اداره کړئ. DynamoDB په اوتومات ډول میزونه اندازه کوي ترڅو د شته سرچینو مقدار تنظیم کړي او لوړ فعالیت وساتي. د سیسټم مدیریت ته اړتیا نشته؛
  • ایمیزون SNS - د خپرونکي - پیرودونکي (Pub/Sub) ماډل په کارولو سره د پیغامونو لیږلو لپاره په بشپړ ډول اداره شوی خدمت ، د کوم سره چې تاسو کولی شئ مایکرو خدمتونه ، توزیع شوي سیسټمونه او بې سرور غوښتنلیکونه جلا کړئ. SNS د ګرځنده فشار خبرتیاو، SMS پیغامونو او بریښنالیکونو له لارې پای کاروونکو ته د معلوماتو لیږلو لپاره کارول کیدی شي.

لومړنۍ روزنه

د معلوماتو جریان تقلید کولو لپاره ، ما پریکړه وکړه چې د هوایی ډګر ټیکټ معلومات وکاروم چې د Aviasales API لخوا بیرته راستانه شوي. IN اسناد د مختلف میتودونو خورا پراخه لیست ، راځئ چې یو له دوی څخه واخلو - "د میاشتنۍ نرخ کیلنڈر" ، کوم چې د میاشتې د هرې ورځې نرخونه بیرته راګرځوي ، د لیږدونو شمیر سره ګروپ شوي. که تاسو په غوښتنه کې د لټون میاشت مشخص نه کړئ، معلومات به د روانې میاشتې وروسته بیرته راستانه شي.

نو، راځئ چې راجستر کړو او زموږ نښه ترلاسه کړو.

یوه بیلګه غوښتنه په لاندې ډول ده:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

په غوښتنه کې د نښه په ټاکلو سره د API څخه د معلوماتو ترلاسه کولو پورته میتود به کار وکړي ، مګر زه ترجیح ورکوم چې د لاسرسي نښه د سرلیک له لارې تیر کړم ، نو موږ به دا میتود په api_caller.py سکریپټ کې وکاروو.

ځواب بېلګه:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

پورته د API ځواب مثال د سینټ پیټرزبورګ څخه فوک ته ټیکټ ښیې ... اوه ، څه خوب ...
څرنګه چې زه د کازان څخه یم، او فوکیټ اوس "یوازې یو خوب" دی، راځئ چې د سینټ پیټرزبورګ څخه کازان ته ټکټونه وګورو.

دا فرض کوي چې تاسو دمخه د AWS حساب لرئ. زه غواړم سمدلاسه دې حقیقت ته ځانګړې پاملرنه راجلب کړم چې کینیسیس او د SMS له لارې د خبرتیاو لیږل په کلنۍ کې شامل ندي وړیا درجه (وړیا کارول). مګر حتی د دې سره سره، په ذهن کې د څو ډالرو سره، دا د وړاندیز شوي سیسټم رامینځته کول او د هغې سره لوبې کول خورا ممکن دي. او البته، مه هیروئ چې ټولې سرچینې حذف کړئ وروسته له دې چې دوی نور اړتیا نلري.

خوشبختانه، د DynamoDb او lambda فعالیتونه به زموږ لپاره وړیا وي که موږ خپل میاشتني وړیا محدودیتونه پوره کړو. د مثال په توګه، د DynamoDB لپاره: 25 GB ذخیره، 25 WCU/RCU او 100 ملیون پوښتنې. او په میاشت کې یو ملیون لامبډا فنکشن زنګونه.

د لاسي سیسټم پلي کول

د Kinesis ډیټا سټریمونو تنظیم کول

راځئ چې د کیینیسس ډیټا سټریمونو خدمت ته لاړ شو او دوه نوي جریانونه رامینځته کړو ، د هر یو لپاره یو شارډ.

شارډ څه شی دی؟
A شارډ د ایمیزون کینیسیس جریان د معلوماتو لیږد لومړني واحد دی. یوه برخه د 1 MB/s په سرعت سره د ان پټ ډیټا لیږد چمتو کوي او د 2 MB / s په سرعت سره د معلوماتو ډیټا لیږد چمتو کوي. یوه برخه په هره ثانیه کې تر 1000 PUT ننوتلو ملاتړ کوي. کله چې د ډیټا جریان رامینځته کړئ ، تاسو اړتیا لرئ د اړین شمیر برخو مشخص کړئ. د مثال په توګه، تاسو کولی شئ د دوو برخو سره د معلوماتو جریان جوړ کړئ. دا ډیټا جریان به په 2 MB / s کې د ان پټ ډیټا لیږد چمتو کړي او په 4 MB / s کې د محصول ډیټا لیږد چمتو کړي ، په هر ثانیه کې تر 2000 PUT ریکارډونو ملاتړ کوي.

هرڅومره چې ستاسو په جریان کې شارډونه ډیر وي ، هومره د هغې ټروپټ ډیریږي. په اصل کې، دا څنګه جریان اندازه کیږي - د شارډونو په اضافه کولو سره. مګر څومره چې تاسو ډیر شارډونه لرئ، قیمت به لوړ وي. هر شارډ په ساعت کې 1,5 سینټ او د هر ملیون PUT تادیاتو واحدونو لپاره اضافي 1.4 سینټ لګښت لري.

راځئ چې د نوم سره یو نوی جریان جوړ کړو هوايي_ټکيټونهد هغه لپاره به 1 شارډ کافی وي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
اوس راځئ چې د نوم سره بل تار جوړ کړو ځانګړی_سړی:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

د تولید کونکي تنظیم کول

د یوې دندې تحلیل کولو لپاره ، دا کافي دي چې د ډیټا تولید کونکي په توګه د منظم EC2 مثال وکاروئ. دا باید یو پیاوړی، ګران مجازی ماشین نه وي؛ یو ځای t2.micرو به ښه کار وکړي.

مهم یادونه: د مثال په توګه ، تاسو باید عکس وکاروئ - د ایمیزون لینکس AMI 2018.03.0 ، دا د کاینسیس ایجنټ ګړندي پیل کولو لپاره لږ تنظیمات لري.

د EC2 خدمت ته لاړ شئ، یو نوی مجازی ماشین جوړ کړئ، مطلوب AMI د t2.micro ډول سره وټاکئ، کوم چې په وړیا ټیر کې شامل دی:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د دې لپاره چې نوي رامینځته شوي مجازی ماشین د کینیسیس خدمت سره متقابل عمل کولو وړ وي ، دا باید د دې کولو حق ورکړل شي. د دې کولو غوره لاره د IAM رول ټاکل دي. له همدې امله ، په 3 مرحله کې: د مثال توضیحاتو سکرین تنظیم کړئ ، تاسو باید غوره کړئ د IAM نوی رول جوړ کړئ:

د EC2 لپاره د IAM رول رامینځته کول
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
په هغه کړکۍ کې چې خلاصیږي، غوره کړئ چې موږ د EC2 لپاره نوی رول رامینځته کوو او د اجازې برخې ته لاړ شئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د روزنې مثال په کارولو سره ، موږ اړتیا نلرو د سرچینو حقونو دانه تنظیم کولو ټولو پیچلتیاو ته لاړ شو ، نو موږ به د ایمیزون لخوا دمخه تنظیم شوي پالیسۍ غوره کړو: AmazonKinesisFullAccess او CloudWatchFullAccess.

راځئ چې د دې رول لپاره یو څه معنی لرونکي نوم ورکړو، د بیلګې په توګه: EC2-KinesisStreams-FullAccess. پایله باید ورته وي لکه څنګه چې لاندې عکس کې ښودل شوي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د دې نوي رول رامینځته کولو وروسته ، مه هیروئ چې دا د رامینځته شوي مجازی ماشین مثال سره ضمیمه کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
موږ په دې سکرین کې بل څه نه بدلوو او راتلونکي کړکۍ ته ځو.

د هارډ ډرایو تنظیمات د ډیفالټ په توګه پاتې کیدی شي ، په بیله بیا ټاګونه (که څه هم د ټاګونو کارول ښه عمل دی ، لږترلږه مثال ته نوم ورکړئ او چاپیریال په ګوته کړئ).

اوس موږ په 6 مرحله کې یو: د امنیت ګروپ ټب تنظیم کړئ ، چیرې چې تاسو اړتیا لرئ یو نوی رامینځته کړئ یا خپل موجود امنیت ګروپ مشخص کړئ ، کوم چې تاسو ته اجازه درکوي د مثال سره د ssh (پورټ 22) له لارې وصل شئ. سرچینه وټاکئ -> زما IP هلته او تاسو کولی شئ مثال پیل کړئ.

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
هرڅومره ژر چې دا د چلولو حالت ته واړوي ، تاسو کولی شئ د ssh له لارې دې سره وصل کیدو هڅه وکړئ.

د کینیسیس اجنټ سره د کار کولو وړ کیدو لپاره ، د ماشین سره په بریالیتوب سره وصل کیدو وروسته ، تاسو باید لاندې کمانډونه په ترمینل کې دننه کړئ:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

راځئ چې د API ځوابونو خوندي کولو لپاره فولډر جوړ کړو:

sudo mkdir /var/log/airline_tickets

د ایجنټ پیل کولو دمخه، تاسو اړتیا لرئ چې د هغې ترتیب ترتیب کړئ:

sudo vim /etc/aws-kinesis/agent.json

د agent.json فایل منځپانګې باید داسې ښکاري:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

لکه څنګه چې د ترتیب کولو فایل څخه لیدل کیدی شي، اجنټ به فایلونه په /var/log/airline_tickets/ ډایرکټر کې د .log توسیع سره وڅاري، دوی یې پارس کړي او د airline_tickets جریان ته یې لیږدوي.

موږ خدمت بیا پیل کوو او ډاډ ترلاسه کوو چې دا پورته او روان دی:

sudo service aws-kinesis-agent restart

اوس راځئ چې د Python سکریپټ ډاونلوډ کړو چې د API څخه به د معلوماتو غوښتنه وکړي:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py سکریپټ د Aviasales څخه د معلوماتو غوښتنه کوي او ترلاسه شوي ځواب په ډایرکټر کې خوندي کوي چې د کینیسیس اجنټ سکین کوي. د دې سکریپټ پلي کول خورا معیاري دي ، د TicketsApi ټولګي شتون لري ، دا تاسو ته اجازه درکوي په غیر متزلزل ډول API راوباسي. موږ یو سرلیک د نښه سره لیږدوو او دې ټولګي ته د پیرامیټونو غوښتنه کوو:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

د اجنټ د سم تنظیماتو او فعالیت ازموینې لپاره ، راځئ چې د api_caller.py سکریپټ چلولو ازموینه وکړو:

sudo ./api_caller.py TOKEN

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
او موږ د کار پایلې د اجنټ لاګونو کې او د هوایی_ټیکټس ډیټا جریان کې د څارنې ټب کې ګورو:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
لکه څنګه چې تاسو لیدلی شئ، هرڅه کار کوي او د کیینیسس ایجنټ په بریالیتوب سره جریان ته ډاټا لیږي. اوس راځئ چې مصرف کونکي تنظیم کړو.

د کیینیسس ډیټا تحلیل تنظیم کول

راځئ چې د ټول سیسټم مرکزي برخې ته لاړ شو - د Kinesis_analytics_airlines_app په نوم د Kinesis Data Analytics کې یو نوی غوښتنلیک جوړ کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د Kinesis ډیټا تحلیل تاسو ته اجازه درکوي د ریښتیني وخت ډیټا تحلیلونه د کیینیسس سټریمونو څخه د SQL ژبې په کارولو سره ترسره کړئ. دا په بشپړ ډول د اتوماتیک کولو خدمت دی (د کیینیسس جریانونو برعکس) چې:

  1. تاسو ته اجازه درکوي نوي جریانونه رامینځته کړئ (آؤټ پټ سټریم) د سرچینې ډیټا غوښتنې پراساس؛
  2. د تیروتنو سره یو جریان چمتو کوي چې د غوښتنلیکونو د چلولو پرمهال پیښ شوي (د تېروتنې جریان)؛
  3. کولی شي په اتوماتيک ډول د ان پټ ډیټا سکیم وټاکي (دا د اړتیا په صورت کې په لاسي ډول بیا تعریف کیدی شي).

دا یو ارزانه خدمت ندی - د کار په هر ساعت کې 0.11 USD، نو تاسو باید دا په احتیاط سره وکاروئ او کله چې تاسو پای ته ورسیږئ حذف کړئ.

راځئ چې غوښتنلیک د معلوماتو سرچینې سره وصل کړو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
هغه جریان غوره کړئ چې موږ ورسره وصل یو (ایرلاین_ټیکټونه):

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
بیا، تاسو اړتیا لرئ یو نوی IAM رول ضمیمه کړئ ترڅو غوښتنلیک د جریان څخه لوستل شي او جریان ته ولیکئ. د دې کولو لپاره، دا بسیا ده چې د لاسرسي اجازه بلاک کې هیڅ شی بدل نه کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
اوس راځئ چې په جریان کې د ډیټا سکیما کشف غوښتنه وکړو؛ د دې کولو لپاره ، د "سکیما کشف" تڼۍ کلیک وکړئ. د پایلې په توګه ، د IAM رول به تازه شي (یو نوی به رامینځته شي) او د سکیما کشف به د هغه ډیټا څخه پیل شي چې دمخه جریان ته رسیدلي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
اوس تاسو اړتیا لرئ د SQL مدیر ته لاړ شئ. کله چې تاسو په دې تڼۍ کلیک وکړئ، یوه کړکۍ به راښکاره شي چې تاسو څخه غوښتنه کوي چې اپلیکیشن پیل کړئ - هغه څه غوره کړئ چې تاسو یې پیل کول غواړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
لاندې ساده پوښتنه د SQL مدیر کړکۍ کې دننه کړئ او د SQL خوندي کولو او چلولو کلیک وکړئ:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

په اړونده ډیټابیسونو کې، تاسو د جدولونو سره کار کوئ د INSERT بیاناتو په کارولو سره د ریکارډونو اضافه کولو لپاره او د پوښتنو ډیټا لپاره د SELECT بیان. د ایمیزون کینیسیس ډیټا انلایټیکونو کې ، تاسو د جریانونو (سټریمونو) او پمپونو (PUMPs) سره کار کوئ - د دوامداره داخلولو غوښتنې چې په غوښتنلیک کې له یو جریان څخه بل جریان ته ډیټا داخلوي.

د SQL پوښتنې پورته وړاندې شوي د ایروفلوټ ټیکټونو لټون د پنځه زره روبلو څخه ښکته لګښت کې. ټول ریکارډونه چې دا شرایط پوره کوي په DESTINATION_SQL_STREAM جریان کې به ځای په ځای شي.

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د منزل بلاک کې، د ځانګړي_سټریم جریان غوره کړئ، او د غوښتنلیک دننه جریان نوم DESTINATION_SQL_STREAM ډراپ-ډاون لیست کې:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ټولو لاسوهنو پایله باید لاندې عکس ته ورته وي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

د SNS موضوع جوړول او ګډون کول

د ساده خبرتیا خدماتو ته لاړشئ او هلته د ایرلاین په نوم نوې موضوع جوړه کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
دې موضوع ته ګډون وکړئ او د ګرځنده تلیفون شمیره په ګوته کړئ چې د SMS خبرتیاوې به لیږل کیږي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

په DynamoDB کې یو میز جوړ کړئ

د دوی د ایرلاین_ټیکټ جریان څخه د خام ډیټا ذخیره کولو لپاره ، موږ به په ورته نوم DynamoDB کې جدول جوړ کړو. موږ به ریکارډ_id د لومړني کیلي په توګه وکاروو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

د لامبډا فنکشن راټولونکی رامینځته کول

راځئ چې د کلکټر په نوم د لامبډا فنکشن رامینځته کړو ، چې دنده به یې د ایرلاین_ټیکټس جریان رایه اچونه وي او که چیرې نوي ریکارډونه وموندل شي ، دا ریکارډونه د DynamoDB جدول کې دننه کړئ. په ښکاره ډول ، د ډیفالټ حقونو سربیره ، دا لامبډا باید د کیینیسس ډیټا جریان ته لاسرسی لوستلی وي او ډینامو ډی بی ته لاسرسی لیکلی وي.

د راټولونکي لامبډا فنکشن لپاره د IAM رول رامینځته کول
لومړی، راځئ چې د Lambda-TicketsProcessingRole په نوم د لامبډا لپاره نوی IAM رول رامینځته کړو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ازموینې مثال لپاره، مخکې ترتیب شوي AmazonKinesisReadOnlyAccess او AmazonDynamoDBFullAccess پالیسۍ خورا مناسب دي، لکه څنګه چې په لاندې انځور کې ښودل شوي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

دا لامبډا باید د کینیسیس څخه د محرک په واسطه پیل شي کله چې نوي ننوتل ایرلاین_سټریم ته ننوځي ، نو موږ اړتیا لرو یو نوی محرک اضافه کړو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
ټول هغه څه چې پاتې دي د کوډ دننه کول او لامبډا خوندي کول دي.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

د لامبډا فنکشن نوټیفایر رامینځته کول

د دوهم لامبډا فنکشن ، کوم چې به دوهم جریان وڅاري (special_stream) او SNS ته خبرتیا واستوي ، په ورته ډول رامینځته شوی. له همدې امله، دا لامبډا باید د Kinesis څخه لوستلو ته لاسرسی ولري او د ورکړل شوي SNS موضوع ته پیغامونه واستوي، چې بیا به د SNS خدمت لخوا د دې موضوع ټولو پیرودونکو ته لیږل کیږي (بریښنالیک، SMS، او نور).

د IAM رول رامینځته کول
لومړی، موږ د دې لامبډا لپاره د IAM رول Lambda-KinesisAlarm جوړوو، او بیا دا رول د الارم_notifier لامبډا ته وټاکو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

دا لامبډا باید ځانګړي_سټریم ته د ننوتلو لپاره د نوي ریکارډونو لپاره په محرک باندې کار وکړي ، نو تاسو اړتیا لرئ محرک په ورته ډول تنظیم کړئ لکه څنګه چې موږ د کلکټر لیمبډا لپاره کړی.

د دې لپاره چې د دې لامبډا تنظیم کول اسانه کړي، راځئ چې یو نوی چاپیریال متغیر معرفي کړو - TOPIC_ARN، چیرته چې موږ د ایرلاین موضوع ANR (Amazon Recourse Names) ځای په ځای کوو:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
او د لامبډا کوډ دننه کړئ ، دا په هیڅ ډول پیچلي ندي:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

داسې ښکاري چې دا هغه ځای دی چې د لارښود سیسټم ترتیب بشپړ شوی. ټول هغه څه چې پاتې دي د ازموینې او ډاډ ترلاسه کول دي چې موږ هرڅه په سمه توګه تنظیم کړي دي.

د Terraform کوډ څخه ځای په ځای کړئ

اړین تیاری

تیرافیف د کوډ څخه د زیربنا پلي کولو لپاره خورا اسانه خلاصې سرچینې وسیله ده. دا خپل ترکیب لري چې زده کول یې اسانه دي او ډیری مثالونه لري چې څنګه او څه ځای په ځای کړي. د اتوم ایډیټر یا د بصری سټوډیو کوډ ډیری لاسي پلگ انونه لري چې د Terraform سره کار کول اسانه کوي.

تاسو کولی شئ توزیع ډاونلوډ کړئ له دې ځایه. د ټولو Terraform وړتیاو تفصيلي تحلیل د دې مقالې له دائرې څخه بهر دی، نو موږ به خپل ځان اصلي ټکو ته محدود کړو.

څنګه پیل وکړو

د پروژې بشپړ کوډ دی زما په ذخیره کې. موږ خپل ځان ته ذخیره کلون کوو. د پیل کولو دمخه، تاسو اړتیا لرئ ډاډ ترلاسه کړئ چې تاسو AWS CLI نصب او تنظیم کړی، ځکه چې ... Terraform به د ~/.aws/credentials فایل کې اسناد وګوري.

یو ښه تمرین دا دی چې د ټول زیربنا له مینځه وړلو دمخه د پلان قوماندې پرمخ بوځي ترڅو وګورو چې Terraform اوس مهال زموږ لپاره په بادل کې څه رامینځته کوي:

terraform.exe plan

تاسو ته به د خبرتیا لیږلو لپاره د تلیفون شمیره دننه کولو ته وهڅول شي. دا اړینه نده چې په دې مرحله کې داخل شي.

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د برنامه د عملیاتو پلان تحلیل کولو سره ، موږ کولی شو د سرچینو رامینځته کول پیل کړو:

terraform.exe apply

د دې قوماندې لیږلو وروسته ، تاسو به بیا وغوښتل شي چې د تلیفون شمیره دننه کړئ؛ "هو" ډایل کړئ کله چې د واقعیا عملونو ترسره کولو په اړه پوښتنه ښودل کیږي. دا به تاسو ته اجازه درکړي چې ټول زیربناوې تنظیم کړئ، د EC2 ټول اړین ترتیبات ترسره کړئ، د لامبډا دندې ځای په ځای کړئ، او داسې نور.

وروسته له دې چې ټولې سرچینې په بریالیتوب سره د Terraform کوډ له لارې رامینځته شوې ، تاسو اړتیا لرئ د کینیسیس تحلیلي غوښتنلیک توضیحاتو ته لاړشئ (له بده مرغه ، ما ونه موندل چې دا مستقیم د کوډ څخه څنګه ترسره کړم).

اپلیکیشن پیل کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
له دې وروسته، تاسو باید د ډراپ-ډاون لیست څخه په غوره کولو سره په ښکاره ډول د غوښتنلیک دننه جریان نوم تنظیم کړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
اوس هرڅه د تګ لپاره چمتو دي.

د غوښتنلیک ازموینه

پرته لدې چې تاسو سیسټم په لاسي ډول یا د Terraform کوډ له لارې ځای په ځای کړی ، دا به ورته کار وکړي.

موږ د SSH له لارې د EC2 مجازی ماشین ته ننوتل چیرې چې کینیسیس ایجنټ نصب شوی او api_caller.py سکریپټ چلوي

sudo ./api_caller.py TOKEN

ټول هغه څه چې تاسو یې باید وکړئ ستاسو شمیرې ته د SMS انتظار وکړئ:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
SMS - ستاسو په تلیفون کې نږدې 1 دقیقو کې یو پیغام راځي:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام
دا د لیدلو لپاره پاتې ده چې ایا ریکارډونه د راتلونکو، نورو مفصلو تحلیلونو لپاره د DynamoDB ډیټابیس کې خوندي شوي. د هوایی_ټیکټونو جدول نږدې لاندې معلومات لري:

د ایمیزون کینیسیس او بې سرور سادګۍ سره د Aviasales API ادغام

پایلې

د ترسره شوي کار په جریان کې، د آنلاین ډیټا پروسس کولو سیسټم د ایمیزون کینیسیس پراساس جوړ شو. د Kinesis ډیټا سټریمونو او د ریښتیني وخت تحلیلونو Kinesis Analytics سره په ګډه د کیینیسس ایجنټ کارولو اختیارونه د SQL کمانډونو په کارولو سره ، او همدارنګه د نورو AWS خدماتو سره د ایمیزون کینیسیس تعامل په پام کې نیول شوی.

موږ پورتني سیسټم په دوه لارو ځای په ځای کړی: یو پر ځای اوږد لارښود او یو ګړندی د Terraform کوډ څخه.

د پروژې ټولې سرچینې کوډ شتون لري زما د GitHub ذخیره کې، زه وړاندیز کوم چې تاسو له دې سره ځان وپیژنئ.

زه خوشحاله یم چې د مقالې په اړه بحث وکړم، زه ستاسو نظرونو ته سترګې په لار یم. د رغنده انتقاد په هیله.

بریالیتوب مو غواړم!

سرچینه: www.habr.com

Add a comment