ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

هی هابر!

آیا پرواز هواپیما را دوست دارید؟ من آن را دوست دارم، اما در طول انزوا، عاشق تجزیه و تحلیل داده های بلیط هواپیما از یک منبع شناخته شده - Aviasales نیز شدم.

امروز ما کار Amazon Kinesis را تجزیه و تحلیل خواهیم کرد، یک سیستم استریم با تجزیه و تحلیل زمان واقعی ایجاد می کنیم، پایگاه داده Amazon DynamoDB NoSQL را به عنوان ذخیره اصلی داده نصب می کنیم و اعلان های SMS را برای بلیط های جالب تنظیم می کنیم.

تمام جزئیات زیر برش است! برو!

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

معرفی

برای مثال، ما نیاز به دسترسی داریم Aviasales API. دسترسی به آن به صورت رایگان و بدون محدودیت ارائه می شود؛ فقط باید در بخش «توسعه دهندگان» ثبت نام کنید تا رمز API خود را برای دسترسی به داده ها دریافت کنید.

هدف اصلی این مقاله ارائه یک درک کلی از استفاده از جریان اطلاعات در AWS است؛ ما در نظر داریم که داده های بازگردانده شده توسط API مورد استفاده کاملاً به روز نیستند و از حافظه پنهان منتقل می شوند. بر اساس جستجوهای کاربران سایت های Aviasales.ru و Jetradar.com در 48 ساعت گذشته شکل گرفته است.

Kinesis-agent، نصب شده بر روی دستگاه تولید کننده، دریافت شده از طریق API، به طور خودکار داده ها را از طریق Kinesis Data Analytics تجزیه و به جریان مورد نظر منتقل می کند. نسخه خام این جریان مستقیماً در فروشگاه نوشته می شود. ذخیره‌سازی داده‌های خام مستقر در DynamoDB امکان تحلیل عمیق‌تر بلیط را از طریق ابزارهای BI، مانند AWS Quick Sight، فراهم می‌کند.

ما دو گزینه را برای استقرار کل زیرساخت در نظر خواهیم گرفت:

  • دستی - از طریق کنسول مدیریت AWS؛
  • زیرساخت کد Terraform برای اتوماتورهای تنبل است.

معماری سیستم توسعه یافته

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
اجزای مورد استفاده:

  • Aviasales API - داده های بازگردانده شده توسط این API برای تمام کارهای بعدی استفاده خواهد شد.
  • نمونه تولید کننده EC2 - یک ماشین مجازی معمولی در فضای ابری که جریان داده های ورودی روی آن تولید می شود:
    • عامل Kinesis یک برنامه جاوا است که به صورت محلی روی دستگاه نصب شده است که روشی آسان برای جمع آوری و ارسال داده ها به Kinesis (Kinesis Data Streams یا Kinesis Firehose) ارائه می دهد. عامل به طور مداوم مجموعه ای از فایل ها را در دایرکتوری های مشخص شده نظارت می کند و داده های جدید را به Kinesis ارسال می کند.
    • اسکریپت تماس گیرنده API - یک اسکریپت پایتون که درخواست هایی را به API می دهد و پاسخ را در پوشه ای قرار می دهد که توسط Kinesis Agent نظارت می شود.
  • جریان داده های Kinesis - سرویس جریان داده در زمان واقعی با قابلیت مقیاس بندی گسترده؛
  • تجزیه و تحلیل Kinesis یک سرویس بدون سرور است که تجزیه و تحلیل جریان داده ها را در زمان واقعی ساده می کند. Amazon Kinesis Data Analytics منابع برنامه را پیکربندی می کند و به طور خودکار مقیاس می شود تا هر حجمی از داده های دریافتی را مدیریت کند.
  • AWS لامبدا — سرویسی که به شما امکان می دهد کد را بدون پشتیبان گیری یا راه اندازی سرور اجرا کنید. تمام توان محاسباتی به طور خودکار برای هر تماس مقیاس می شود.
  • آمازون DynamoDB - پایگاه داده‌ای از جفت‌ها و اسناد کلید-مقدار که تأخیر کمتر از 10 میلی‌ثانیه را هنگام اجرا در هر مقیاسی ارائه می‌کند. هنگام استفاده از DynamoDB، نیازی به تهیه، وصله یا مدیریت هیچ سروری ندارید. DynamoDB به طور خودکار جداول را برای تنظیم مقدار منابع موجود و حفظ عملکرد بالا مقیاس می کند. نیازی به مدیریت سیستم نیست.
  • آمازون SNS - یک سرویس کاملا مدیریت شده برای ارسال پیام با استفاده از مدل ناشر-مشترک (Pub/Sub)، که با آن می توانید میکروسرویس ها، سیستم های توزیع شده و برنامه های بدون سرور را جدا کنید. SNS می تواند برای ارسال اطلاعات به کاربران نهایی از طریق اعلان های فشار تلفن همراه، پیام های SMS و ایمیل استفاده شود.

آموزش اولیه

برای تقلید از جریان داده، تصمیم گرفتم از اطلاعات بلیط هواپیما که توسط API Aviasales بازگردانده شده است استفاده کنم. که در مستندات لیست بسیار گسترده ای از روش های مختلف، بیایید یکی از آنها را در نظر بگیریم - "تقویم قیمت ماهانه"، که قیمت ها را برای هر روز از ماه، گروه بندی شده بر اساس تعداد نقل و انتقالات، برمی گرداند. اگر ماه جستجو را در درخواست مشخص نکنید، اطلاعات مربوط به ماه بعد از ماه جاری برگردانده می شود.

بنابراین، بیایید ثبت نام کنیم و توکن خود را دریافت کنیم.

یک نمونه درخواست در زیر آمده است:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

روش فوق برای دریافت داده از API با مشخص کردن یک توکن در درخواست جواب می دهد، اما من ترجیح می دهم رمز دسترسی را از هدر عبور دهم، بنابراین از این روش در اسکریپت api_caller.py استفاده خواهیم کرد.

نمونه پاسخ:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

نمونه پاسخ API بالا یک بلیط از سن پترزبورگ به فوک را نشان می دهد ... اوه ، چه رویایی ...
از آنجایی که من اهل کازان هستم و پوکت اکنون "فقط یک رویا" است، بیایید به دنبال بلیط از سن پترزبورگ به کازان باشیم.

فرض بر این است که شما قبلا یک حساب AWS دارید. من می خواهم بلافاصله توجه ویژه ای را به این واقعیت جلب کنم که Kinesis و ارسال اعلان ها از طریق پیام کوتاه در سالانه گنجانده نشده است. ردیف رایگان (استفاده رایگان). اما حتی با وجود این، با در نظر گرفتن چند دلار، ساخت سیستم پیشنهادی و بازی با آن کاملاً ممکن است. و البته فراموش نکنید که همه منابع را بعد از اینکه دیگر مورد نیاز نیستند حذف کنید.

خوشبختانه، اگر محدودیت های رایگان ماهانه خود را برآورده کنیم، توابع DynamoDb و لامبدا برای ما رایگان خواهند بود. به عنوان مثال، برای DynamoDB: 25 گیگابایت فضای ذخیره سازی، 25 WCU/RCU و 100 میلیون جستجو. و یک میلیون تماس تابع لامبدا در ماه.

استقرار دستی سیستم

راه اندازی Kinesis Data Streams

بیایید به سرویس Kinesis Data Streams برویم و دو جریان جدید، یک قطعه برای هر کدام ایجاد کنیم.

خرده چیست؟
یک خرده واحد اصلی انتقال داده یک جریان آمازون Kinesis است. یک سگمنت انتقال داده های ورودی را با سرعت 1 مگابایت بر ثانیه و انتقال داده های خروجی را با سرعت 2 مگابایت بر ثانیه فراهم می کند. یک بخش از 1000 ورودی PUT در ثانیه پشتیبانی می کند. هنگام ایجاد یک جریان داده، باید تعداد قطعات مورد نیاز را مشخص کنید. به عنوان مثال، می توانید یک جریان داده با دو بخش ایجاد کنید. این جریان داده انتقال داده ورودی را با سرعت 2 مگابایت بر ثانیه و انتقال داده خروجی را با سرعت 4 مگابایت بر ثانیه ارائه می دهد و حداکثر 2000 رکورد PUT در ثانیه را پشتیبانی می کند.

هرچه قطعات در جریان شما بیشتر باشد، توان عملیاتی آن بیشتر است. در اصل، به این صورت است که جریان ها با اضافه کردن خرده ها مقیاس بندی می شوند. اما هرچه خرده های شما بیشتر باشد، قیمت آن بالاتر است. هر خرده 1,5 سنت در ساعت و 1.4 سنت اضافی برای هر میلیون واحد محموله PUT هزینه دارد.

بیایید یک جریان جدید با نام ایجاد کنیم بلیط های هواپیمایی1 قطعه برای او کافی است:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
حالا بیایید تاپیک دیگری با نام ایجاد کنیم جریان_ ویژه:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

راه اندازی تولید کننده

برای تجزیه و تحلیل یک کار، کافی است از یک نمونه EC2 معمولی به عنوان تولید کننده داده استفاده کنید. لازم نیست که یک ماشین مجازی قدرتمند و گران قیمت باشد؛ یک نقطه t2.micro به خوبی انجام خواهد داد.

نکته مهم: به عنوان مثال، شما باید از image - Amazon Linux AMI 2018.03.0 استفاده کنید، تنظیمات کمتری برای راه اندازی سریع Kinesis Agent دارد.

به سرویس EC2 بروید، یک ماشین مجازی جدید ایجاد کنید، AMI مورد نظر را با نوع t2.micro که در Free Tier موجود است انتخاب کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
برای اینکه ماشین مجازی تازه ایجاد شده بتواند با سرویس Kinesis تعامل داشته باشد، باید حقوقی برای این کار به آن داده شود. بهترین راه برای انجام این کار، اختصاص یک نقش IAM است. بنابراین، در صفحه مرحله 3: پیکربندی جزئیات نمونه، باید انتخاب کنید نقش IAM جدید ایجاد کنید:

ایجاد نقش IAM برای EC2
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
در پنجره ای که باز می شود، انتخاب کنید که یک نقش جدید برای EC2 ایجاد می کنیم و به قسمت Permissions بروید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
با استفاده از مثال آموزشی، لازم نیست به تمام پیچیدگی‌های پیکربندی دانه‌بندی حقوق منابع برویم، بنابراین خط‌مشی‌های از پیش پیکربندی‌شده توسط آمازون را انتخاب می‌کنیم: AmazonKinesisFullAccess و CloudWatchFullAccess.

بیایید نام معناداری برای این نقش بگذاریم، به عنوان مثال: EC2-KinesisStreams-FullAccess. نتیجه باید همان چیزی باشد که در تصویر زیر نشان داده شده است:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
پس از ایجاد این نقش جدید، فراموش نکنید که آن را به نمونه ماشین مجازی ایجاد شده متصل کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ما هیچ چیز دیگری را در این صفحه تغییر نمی دهیم و به پنجره های بعدی می رویم.

تنظیمات هارد دیسک و همچنین تگ ها را می توان به عنوان پیش فرض رها کرد (اگرچه استفاده از برچسب ها تمرین خوبی است، حداقل برای نمونه یک نام بگذارید و محیط را مشخص کنید).

اکنون در مرحله 6: پیکربندی تب گروه امنیتی هستیم، جایی که باید یک مورد جدید ایجاد کنید یا گروه امنیتی موجود خود را مشخص کنید، که به شما امکان می دهد از طریق ssh (پورت 22) به نمونه متصل شوید. منبع -> IP من را در آنجا انتخاب کنید و می توانید نمونه را راه اندازی کنید.

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
به محض اینکه به وضعیت در حال اجرا تغییر کرد، می توانید سعی کنید از طریق ssh به آن متصل شوید.

برای اینکه بتوانید با Kinesis Agent کار کنید، پس از اتصال موفقیت آمیز به دستگاه، باید دستورات زیر را در ترمینال وارد کنید:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

بیایید یک پوشه برای ذخیره پاسخ های API ایجاد کنیم:

sudo mkdir /var/log/airline_tickets

قبل از راه اندازی عامل، باید پیکربندی آن را پیکربندی کنید:

sudo vim /etc/aws-kinesis/agent.json

محتویات فایل agent.json باید به شکل زیر باشد:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

همانطور که از فایل پیکربندی مشاهده می شود، عامل فایل های با پسوند log را در فهرست /var/log/airline_tickets/ نظارت می کند، آنها را تجزیه و به جریان airline_tickets منتقل می کند.

ما سرویس را مجددا راه اندازی می کنیم و مطمئن می شویم که آن را راه اندازی و اجرا می کنیم:

sudo service aws-kinesis-agent restart

حالا بیایید اسکریپت پایتون را دانلود کنیم که داده‌ها را از API درخواست می‌کند:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

اسکریپت api_caller.py داده ها را از Aviasales درخواست می کند و پاسخ دریافتی را در فهرستی که عامل Kinesis اسکن می کند ذخیره می کند. اجرای این اسکریپت کاملا استاندارد است، یک کلاس TicketsApi وجود دارد، به شما امکان می دهد API را به طور ناهمزمان بکشید. یک هدر را با یک نشانه ارسال می کنیم و پارامترها را به این کلاس می خواهیم:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

برای آزمایش تنظیمات و عملکرد صحیح عامل، بیایید اسکریپت api_caller.py را آزمایش کنیم:

sudo ./api_caller.py TOKEN

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
و ما به نتیجه کار در گزارش‌های Agent و در تب Monitoring در جریان داده‌های airline_tickets نگاه می‌کنیم:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
همانطور که می بینید، همه چیز کار می کند و Kinesis Agent با موفقیت داده ها را به جریان ارسال می کند. حالا بیایید مصرف کننده را پیکربندی کنیم.

راه اندازی تجزیه و تحلیل داده های Kinesis

بیایید به بخش مرکزی کل سیستم برویم - یک برنامه جدید در Kinesis Data Analytics با نام kinesis_analytics_airlines_app ایجاد کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
تجزیه و تحلیل داده های Kinesis به شما امکان می دهد با استفاده از زبان SQL تجزیه و تحلیل داده ها را در زمان واقعی از Kinesis Streams انجام دهید. این یک سرویس کاملا خودکار است (برخلاف Kinesis Streams) که:

  1. به شما اجازه می دهد تا بر اساس درخواست ها برای منبع داده، جریان های جدید (جریان خروجی) ایجاد کنید.
  2. جریانی را با خطاهایی ارائه می دهد که در حین اجرای برنامه ها رخ داده است (Error Stream).
  3. می تواند به طور خودکار طرح داده های ورودی را تعیین کند (در صورت لزوم می توان آن را به صورت دستی دوباره تعریف کرد).

این یک سرویس ارزان نیست - 0.11 دلار در هر ساعت کار، بنابراین باید با دقت از آن استفاده کنید و پس از پایان کار آن را حذف کنید.

بیایید برنامه را به منبع داده متصل کنیم:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
جریانی را که قرار است به آن متصل شویم (ایرلاین_ بلیط) انتخاب کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
در مرحله بعد، باید یک نقش IAM جدید پیوست کنید تا برنامه بتواند از جریان بخواند و در جریان بنویسد. برای انجام این کار، کافی است چیزی در بلوک دسترسی دسترسی تغییر ندهید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
اکنون بیایید درخواست کشف طرح داده در جریان را بدهیم؛ برای انجام این کار، روی دکمه «Discover schema» کلیک کنید. در نتیجه، نقش IAM به‌روزرسانی می‌شود (یک مورد جدید ایجاد می‌شود) و تشخیص طرح‌واره از داده‌هایی که قبلاً وارد جریان شده‌اند راه‌اندازی می‌شود:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
اکنون باید به ویرایشگر SQL بروید. وقتی روی این دکمه کلیک می کنید، پنجره ای ظاهر می شود که از شما می خواهد برنامه را راه اندازی کنید - آنچه را که می خواهید راه اندازی کنید انتخاب کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
کوئری ساده زیر را در پنجره ویرایشگر SQL وارد کنید و روی Save and Run SQL کلیک کنید:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

در پایگاه داده های رابطه ای، شما با جداول با استفاده از دستورات INSERT برای افزودن رکوردها و دستور SELECT به داده های پرس و جو کار می کنید. در Amazon Kinesis Data Analytics، شما با جریان‌ها (STREAM) و پمپ‌ها (PUMP) کار می‌کنید - درخواست‌های درج پیوسته که داده‌ها را از یک جریان در یک برنامه در یک جریان دیگر وارد می‌کنند.

درخواست SQL ارائه شده در بالا، بلیط های Aeroflot را با هزینه کمتر از پنج هزار روبل جستجو می کند. تمام رکوردهایی که این شرایط را دارند در جریان DESTINATION_SQL_STREAM قرار می گیرند.

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
در بلوک Destination، جریان special_stream و در لیست کشویی نام جریان درون برنامه DESTINATION_SQL_STREAM را انتخاب کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
نتیجه تمام دستکاری ها باید چیزی شبیه به تصویر زیر باشد:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

ایجاد و اشتراک در یک موضوع SNS

به سرویس اطلاع رسانی ساده بروید و یک موضوع جدید با نام خطوط هوایی ایجاد کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
در این موضوع مشترک شوید و شماره تلفن همراهی که اعلان های پیامکی به آن ارسال می شود را مشخص کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

یک جدول در DynamoDB ایجاد کنید

برای ذخیره داده های خام از جریان airline_tickets، بیایید جدولی با همین نام در DynamoDB ایجاد کنیم. ما از record_id به عنوان کلید اصلی استفاده خواهیم کرد:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

ایجاد یک کلکتور تابع لامبدا

بیایید یک تابع لامبدا به نام Collector ایجاد کنیم، که وظیفه آن نظرسنجی جریان بلیط هواپیما و اگر رکوردهای جدیدی در آنجا یافت شد، این رکوردها را در جدول DynamoDB وارد کنید. بدیهی است که علاوه بر حقوق پیش‌فرض، این لامبدا باید دسترسی خواندن به جریان داده‌های Kinesis و دسترسی نوشتن به DynamoDB داشته باشد.

ایجاد یک نقش IAM برای تابع لامبدا کلکتور
ابتدا، اجازه دهید یک نقش IAM جدید برای لامبدا به نام Lambda-TicketsProcessingRole ایجاد کنیم:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
برای مثال آزمایشی، سیاست‌های AmazonKinesisReadOnlyAccess و AmazonDynamoDBFullAccess از پیش پیکربندی شده کاملاً مناسب هستند، همانطور که در تصویر زیر نشان داده شده است:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

هنگامی که ورودی های جدید وارد airline_stream می شوند، این لامبدا باید توسط یک ماشه از Kinesis راه اندازی شود، بنابراین باید یک ماشه جدید اضافه کنیم:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
تنها چیزی که باقی می ماند این است که کد را وارد کنید و لامبدا را ذخیره کنید.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

ایجاد یک اعلان کننده تابع لامبدا

دومین تابع لامبدا، که جریان دوم (special_stream) را کنترل می کند و یک اعلان به SNS ارسال می کند، به روشی مشابه ایجاد شده است. بنابراین، این لامبدا باید دسترسی به خواندن از Kinesis و ارسال پیام به یک موضوع SNS داده شده داشته باشد، که سپس توسط سرویس SNS برای همه مشترکین این موضوع (ایمیل، پیامک و غیره) ارسال می شود.

ایجاد نقش IAM
ابتدا نقش IAM Lambda-KinesisAlarm را برای این lambda ایجاد می کنیم و سپس این نقش را به alarm_notifier lambda در حال ایجاد اختصاص می دهیم:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

این لامبدا باید روی یک ماشه کار کند تا رکوردهای جدید وارد special_stream شوند، بنابراین باید تریگر را به همان روشی که برای لامبدا جمع کننده انجام دادیم پیکربندی کنید.

برای آسان‌تر کردن پیکربندی این لامبدا، اجازه دهید یک متغیر محیطی جدید را معرفی کنیم - TOPIC_ARN، که در آن ANR (نام‌های منابع آمازون) موضوع خطوط هوایی را قرار می‌دهیم:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
و کد لامبدا را وارد کنید، اصلاً پیچیده نیست:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

به نظر می رسد اینجاست که پیکربندی دستی سیستم کامل شده است. تنها چیزی که باقی می ماند این است که آزمایش کنید و مطمئن شوید که همه چیز را به درستی پیکربندی کرده ایم.

استقرار از کد Terraform

آمادگی لازم

Terraform یک ابزار منبع باز بسیار مناسب برای استقرار زیرساخت از کد است. این سینتکس مخصوص به خود را دارد که یادگیری آن آسان است و نمونه های زیادی از نحوه و آنچه که باید به کار گرفته شود را دارد. ویرایشگر اتم یا کد ویژوال استودیو پلاگین های مفید زیادی دارد که کار با Terraform را آسان تر می کند.

می توانید توزیع را دانلود کنید از این رو. تحلیل دقیق تمامی قابلیت های Terraform از حوصله این مقاله خارج است، بنابراین به نکات اصلی اکتفا می کنیم.

چگونه اجرا شود

کد کامل پروژه می باشد در مخزن من. ما مخزن را برای خودمان شبیه سازی می کنیم. قبل از شروع، باید مطمئن شوید که AWS CLI را نصب و پیکربندی کرده اید، زیرا... Terraform به دنبال اعتبارنامه ها در فایل ~/.aws/credentials می گردد.

یک تمرین خوب این است که دستور plan را قبل از استقرار کل زیرساخت اجرا کنید تا ببینید Terraform در حال حاضر چه چیزی را برای ما در فضای ابری ایجاد می کند:

terraform.exe plan

از شما خواسته می شود یک شماره تلفن برای ارسال اعلان ها وارد کنید. ورود آن در این مرحله ضروری نیست.

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
پس از تجزیه و تحلیل برنامه عملیاتی برنامه، می توانیم شروع به ایجاد منابع کنیم:

terraform.exe apply

پس از ارسال این دستور، مجدداً از شما خواسته می شود که یک شماره تلفن وارد کنید؛ زمانی که سؤالی در مورد انجام واقعی اقدامات نمایش داده شد، "بله" را شماره گیری کنید. این به شما امکان می دهد کل زیرساخت را راه اندازی کنید، تمام تنظیمات لازم EC2 را انجام دهید، عملکردهای لامبدا و غیره را اجرا کنید.

پس از اینکه همه منابع با موفقیت از طریق کد Terraform ایجاد شدند، باید به جزئیات برنامه Kinesis Analytics بروید (متاسفانه من نحوه انجام این کار را مستقیماً از روی کد پیدا نکردم).

برنامه را اجرا کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
پس از این، شما باید به صراحت نام جریان درون برنامه را با انتخاب از لیست کشویی تنظیم کنید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
حالا همه چیز برای رفتن آماده است.

تست اپلیکیشن

صرف نظر از نحوه استقرار سیستم، به صورت دستی یا از طریق کد Terraform، به همان صورت عمل می کند.

ما از طریق SSH به ماشین مجازی EC2 که در آن Kinesis Agent نصب شده است وارد می شویم و اسکریپت api_caller.py را اجرا می کنیم.

sudo ./api_caller.py TOKEN

تنها کاری که باید انجام دهید این است که منتظر پیامک به شماره خود باشید:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
پیامک - یک پیام تقریباً در 1 دقیقه به تلفن می رسد:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور
باید دید آیا رکوردها در پایگاه داده DynamoDB برای تجزیه و تحلیل بعدی و دقیق تر ذخیره شده اند یا خیر. جدول بلیط_هواپیما تقریباً حاوی داده های زیر است:

ادغام Aviasales API با Amazon Kinesis و سادگی بدون سرور

نتیجه

در طول کار انجام شده، یک سیستم پردازش داده آنلاین بر اساس Amazon Kinesis ساخته شد. گزینه‌هایی برای استفاده از Kinesis Agent در ارتباط با Kinesis Data Streams و تجزیه و تحلیل زمان واقعی Kinesis Analytics با استفاده از دستورات SQL، و همچنین تعامل Amazon Kinesis با سایر سرویس‌های AWS در نظر گرفته شد.

ما سیستم فوق را به دو صورت مستقر کردیم: یک سیستم دستی نسبتا طولانی و یک سیستم سریع از کد Terraform.

تمام کد منبع پروژه در دسترس است در مخزن GitHub من، پیشنهاد می کنم با آن آشنا شوید.

من خوشحالم که در مورد مقاله بحث می کنم، منتظر نظرات شما هستم. امیدوارم انتقاد سازنده داشته باشم.

برای شما آرزوی موفقیت دارم!

منبع: www.habr.com

اضافه کردن نظر