Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Привіт, Хабре!

А ви любите літати літаками? Я обожнюю, але на самоізоляції полюбив ще й аналізувати дані про авіаквитки одного відомого ресурсу - Aviasales.

Сьогодні ми розберемо роботу Amazon Kinesis, побудуємо стримінгову систему з реал-тайм аналітикою, поставимо NoSQL базу даних Amazon DynamoDB як основне сховище даних та налаштуємо сповіщення через SMS за цікавими квитками.

Усі подробиці під катом! Поїхали!

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Запровадження

Для прикладу нам потрібний доступ до API Aviasales. Доступ до нього надається безкоштовно і без обмежень, необхідно лише зареєструватися в розділі «Розробникам», щоб отримати свій API токен для доступу до даних.

Основна мета цієї статті - дати загальне розуміння використання потокової передачі інформації в AWS, ми виносимо за дужки, що дані, що повертаються використовуваним API не є строго актуальними і передаються з кешу, який формується на основі пошуків користувачів сайтів Aviasales.ru та Jetradar.com останні 48 годин.

Отримані через API дані про авіаквитки Kinesis-agent, встановлений на машині-продюсері, автоматично парсить і передаватиме в потрібний потік через Kinesis Data Analytics. Необроблена версія цього потоку писатиметься безпосередньо у сховищі. Розгорнуте в DynamoDB сховище «сирих» даних дозволить проводити більш глибокий аналіз квитків через BI інструменти, наприклад, AWS Quick Sight.

Ми розглянемо два варіанти деплою всієї інфраструктури:

  • Ручний через AWS Management Console;
  • Інфраструктура з коду Terraform - для лінивих автоматизаторів;

Архітектура системи, що розробляється

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Використовувані компоненти:

  • Aviasales API — дані, що повертаються цим API, будуть використовуватися для подальшої роботи;
  • EC2 Producer Instance - звичайна віртуальна машина в хмарі, на якій буде генеруватися вхідний потік даних:
    • Kinesis Agent — це Java-додаток, що встановлюється локально на машину, який надає простий спосіб збирання та відправлення даних у Kinesis (Kinesis Data Streams або Kinesis Firehose). Агент постійно відстежує набір файлів у зазначених директоріях та надсилає нові дані до Kinesis;
    • Скрипт API Caller - Python-скрипт, що робить запити до API і складає відповідь у папку, яку моніторить Kinesis Agent;
  • Потоки даних Kinesis - сервіс потокової передачі в режимі реального часу з широкими можливостями масштабування;
  • Kinesis Analytics - Безсерверний сервіс, що спрощує аналіз потокових даних у режимі реального часу. Amazon Kinesis Data Analytics налаштовує ресурси для роботи програм і автоматично масштабується для обробки будь-яких обсягів вхідних даних;
  • AWS Lambda — сервіс, що дозволяє запускати код без резервування та налаштування серверів. Усі обчислювальні потужності автоматично масштабуються під кожний виклик;
  • Amazon DynamoDB — база даних пар «ключ-значення» та документів, що забезпечує затримку менше 10 мілісекунд під час роботи в будь-якому масштабі. При використанні DynamoDB не потрібно розподіляти сервери, встановлювати на них виправлення або керувати ними. DynamoDB автоматично масштабує таблиці, коригуючи обсяг доступних ресурсів та зберігаючи високу продуктивність. Жодні дії з адміністрування системи не потрібні;
  • Amazon SNS — повністю керований сервіс надсилання повідомлень за моделлю «видавець — передплатник» (Pub/Sub), за допомогою якого можна ізолювати мікросервіси, розподілені системи та безсерверні програми. SNS можна використовувати для розсилки інформації кінцевим користувачам за допомогою мобільних push-повідомлень, SMS-повідомлень та електронних листів.

Початкова підготовка

Для емуляції потоку даних я вирішив використовувати інформацію про авіаквитки, що повертається API Aviasales. У документації досить великий список різних методів, візьмемо один із них — «Календар цін на місяць», який повертає ціни за кожен день місяця, згруповані за кількістю пересадок. Якщо не передавати в запиті місяць пошуку, то буде повернуто інформацію за місяць, що настає за поточним.

Отже, реєструємось, отримуємо свій токен.

Приклад запиту нижче:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Вищеописаний спосіб отримання даних від API із зазначенням токена в запиті працюватиме, але мені більше подобається передавати токен доступу через заголовок, тому в скрипті api_caller.py будемо користуватися саме цим способом.

Приклад відповіді:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

У прикладі відповіді API вище показаний квиток із Санкт-Петербурга до Пхука… Ех, та що мріяти…
Так як я з Казані, а Пхукет зараз нам тільки сниться, пошукаємо квитки з Санкт-Петербурга в Казань.

Передбачається, що у вас вже є обліковий запис в AWS. Відразу хочу звернути особливу увагу, що Kinesis та відправлення повідомлень через SMS не входять до річного Free Tier (безкоштовне використання). Але навіть незважаючи на це, заклавши в думці пару доларів, цілком можна побудувати запропоновану систему і пограти з нею. І, звичайно, не варто забувати видаляти всі ресурси після того, як вони стали не потрібні.

На щастя, DynamoDb та лямбда-функції будуть для нас умовно безкоштовними, якщо вкластися у місячні безкоштовні ліміти. Наприклад, для DynamoDB: 25 Гб сховища, 25 WCU/RCU та 100 млн. запитів. І мільйон викликів лямбда функцій на місяць.

Ручний деплой системи

Налаштування Kinesis Data Streams

Перейдемо в сервіс Kinesis Data Streams і створюємо два нових потоки по одному шарду на кожен.

Що таке шард?
Шард – це основна одиниця передачі даних потоку Amazon Kinesis. Один сегмент забезпечує передачу вхідних даних зі швидкістю 1 МБ/с та передачу вихідних даних зі швидкістю 2 МБ/с. Один сегмент підтримує до 1000 записів PUT за секунду. Під час створення потоку даних потрібно вказати необхідну кількість сегментів. Наприклад, можна створити потік даних із двома сегментами. Цей потік даних забезпечить передачу вхідних даних зі швидкістю 2 МБ/с та передачу вихідних даних зі швидкістю 4 МБ/с із підтримкою до 2000 записів PUT на секунду.

Чим більше шардів у вашому потоці — тим більша його пропускна спроможність. В принципі, так і масштабуються потоки шляхом додавання шардів. Але чим більше у вас шардів, тим вища і ціна. Кожен шард коштує 1,5 цента на годину та додатково 1.4 цента за кожні мільйон операцій додавання до потоку (PUT payload units).

Створимо новий потік з ім'ям airline_tickets, йому цілком достатньо буде 1 шарда:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Тепер створимо ще один потік із ім'ям special_stream:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Налаштування продюсера

Як продюсер даних для аналізу завдання досить використовувати звичайний EC2 інстанс. Це не має бути потужна дорога віртуальна машина, цілком підійде спотовий t2.micro.

Важливо: Для прикладу слід використовувати image — Amazon Linux AMI 2018.03.0, з ним менше налаштувань для швидкого запуску Kinesis Agent.

Переходимо на обслуговування EC2, створюємо нову віртуальну машину, вибираємо необхідний AMI з типом t2.micro, що входить у Free Tier:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Для того щоб новостворена віртуальна машина змогла взаємодіяти з сервісом Kinesis, необхідно дати їй на це права. Найкращий спосіб це зробити – призначити IAM Role. Тому на екрані Step 3: Configure Instance Details слід вибрати Create new IAM Role:

Створення IAM ролі для EC2
Інтеграція Aviasales API з Amazon Kinesis та простота serverless
У вікні, вибираємо, що нову роль створюємо для EC2 і переходимо в розділ Permissions:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
На навчальному прикладі можна не вдаватися в усі тонкощі гранулярного налаштування прав на ресурси, тому виберемо поліси, що налаштовані Амазоном: AmazonKinesisFullAccess і CloudWatchFullAccess.

Дамо якесь осмислене ім'я для цієї ролі, наприклад: EC2-KinesisStreams-FullAccess. В результаті, має вийти те саме, що вказано на малюнку нижче:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Після створення цієї нової ролі, не забуваємо причепити її до створюваного інстансу віртуальної машини:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Більше на цьому екрані нічого не змінюємо та переходимо до наступних вікон.

Параметри жорсткого диска можна залишити за замовчуванням, теги теж (хоча, гарною практикою є теги використовувати, хоча б давати ім'я інстансу та вказувати енвайронмент).

Тепер ми на закладці Step 6: Configure Security Group, де необхідно створити новий або вказати Security group, що має у вас, що дозволяє робити коннект через ssh (порт 22) на інстанс. Виберіть там Source -> My IP і можете запускати інстанс.

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Як тільки він перейде в статус running, можна намагатися законнектитися на нього через ssh.

Щоб отримати можливість роботи з Kinesis Agent після успішного коннекту до машини, необхідно ввести наступні команди в терміналі:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Створимо папку для збереження відповідей API:

sudo mkdir /var/log/airline_tickets

Перед запуском агента необхідно налаштувати його конфіг:

sudo vim /etc/aws-kinesis/agent.json

Зміст файлу agent.json повинен мати такий вигляд:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Як видно з файлу конфігурації, агент буде моніторити в директорії /var/log/airline_tickets/ файли з розширенням .log, парс їх і передавати в потік airline_tickets.

Перезапускаємо сервіс і переконуємося, що він запустився та працює:

sudo service aws-kinesis-agent restart

Тепер скачаємо Python-скрипт, який запитуватиме дані у API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Скрипт api_caller.py запитує дані у Aviasales та зберігає отриману відповідь у директорії, яку сканує Kinesis agent. Реалізація цього скрипта досить стандартна, є клас TicketsApi, він дозволяє асинхронно смикати API. У цей клас передаємо заголовок з токеном та параметри запиту:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Для тестування правильності налаштувань та працездатності агента зробимо тестовий запуск скрипту api_caller.py:

sudo ./api_caller.py TOKEN

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
І дивимося результат роботи в логах Агента та на закладці Monitoring у потоці даних airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Як видно, все працює і Kinesis Agent успішно надсилає дані в потік. Тепер налаштуємо consumer.

Налаштування Kinesis Data Analytics

Перейдемо до центрального компоненту всієї системи — створимо нову програму в Kinesis Data Analytics з іменем kinesis_analytics_airlines_app:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Kinesis Data Analytics дозволяє виконувати аналітику даних у реальному часі з Kinesis Streams за допомогою мови SQL. Це повністю автомасштабований сервіс (на відміну від Kinesis Streams), який:

  1. дозволяє створювати нові потоки (Output Stream) на основі запитів до вихідних даних;
  2. надає потік з помилками, що виникли під час роботи програм (Error Stream);
  3. вміє автоматично визначати схему вхідних даних (її можна вручну перевизначити за потреби).

Це недешевий сервіс - 0.11 USD за годину роботи, тому користуватися ним слід акуратно та видаляти при завершенні роботи.

Підключимо додаток до джерела даних:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Вибираємо потік, до якого збираємося підключитися (airline_tickets):

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Далі, необхідно приточити нову IAM Роль для того, щоб програма могла читати з потоку і писати в потік. Для цього достатньо нічого не міняти в блоці Access permissions:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Тепер запитаємо виявлення схеми даних у потоці, при цьому натискаємо кнопку «Discover schema». В результаті оновиться (створеться нова) роль IAM і буде запущено виявлення схеми даних, які вже прилетіли в потік:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Тепер потрібно перейти до редактора SQL. При натисканні на цю кнопку, вийде вікно з питанням про запуск програми - вибираємо що хочемо запустити:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
У вікно редактора SQL вставимо такий простий запит і натискаємо Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

У реляційних базах даних ви працюєте з таблицями, використовуючи оператори INSERT для додавання записів та оператор SELECT для запиту даних. В Amazon Kinesis Data Analytics ви працюєте з потоками (STREAM) та «насосами» (PUMP) — безперервними запитами вставки, які вставляють дані з одного потоку в додатку до іншого потоку.

У представленому вище SQL запит відбувається пошук квитків Аерофлоту за вартістю нижче п'яти тисяч рублів. Всі записи, які підпадають під ці умови, будуть поміщені до потоку DESTINATION_SQL_STREAM.

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
У блоці Destination вибираємо потік special_stream, а в списку, що розкривається In-application stream name DESTINATION_SQL_STREAM:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
В результаті всіх маніпуляцій має вийти щось схоже на картинку нижче:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Створення та передплата на топік SNS

Переходимо в сервіс Simple Notification Service і створюємо там новий топік з ім'ям Airlines:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Оформляємо підписку на цей топік, в ній вказуємо номер мобільного телефону, на який надходитимуть SMS-повідомлення:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Створення таблиці у DynamoDB

Для зберігання необроблених даних їх потоку airline_tickets створимо таблицю в DynamoDB з таким же ім'ям. Як первинний ключ будемо використовувати record_id:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Створення лямбда-функції collector

Створимо лямбда-функцію під назвою Collector, завданням якої буде опитування потоку airline_tickets і, у разі знаходження нових записів, вставка цих записів в таблицю DynamoDB. Очевидно, що крім стандартних прав, ця лямбда повинна мати доступ до читання потоку даних Kinesis і запису в DynamoDB.

Створення IAM ролі для лямбда-функції collector
Для початку створимо нову IAM роль для лямбди з ім'ям Lambda-TicketsProcessingRole:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Для тестового прикладу цілком підійдуть встановлені поліси AmazonKinesisReadOnlyAccess і AmazonDynamoDBFullAccess, як показано на малюнку нижче:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Дана лямбда повинна запускатись по тригеру від Kinesis при попаданні нових записів у потік airline_stream, тому треба додати новий тригер:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Залишилося вставити код та зберегти лямбду.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Створення лямбда-функції notifier

Друга лямбда-функція, яка моніторитиме другий потік (special_stream) і відправлятиме повідомлення в SNS, створюється аналогічно. Отже, ця лямбда повинна мати доступ на читання з Kinesis та відправлення повідомлень у заданий SNS-топік, який далі сервісом SNS буде надіслано всім передплатникам цього топіка (email, SMS тощо).

Створення IAM ролі
Спочатку створюємо IAM роль Lambda-KinesisAlarm для цієї лямбди, а потім призначаємо цю роль для створюваної лямбди alarm_notifier:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Ця лямбда повинна працювати по тригеру для потрапляння нових записів у потік special_stream, тому необхідно налаштувати тригер аналогічно тому, як ми це робили для лямбди Collector.

Для зручності налаштування цієї лямбди, введемо нову змінну оточення – TOPIC_ARN, куди поміщаємо ANR (Amazon Recourse Names) топіка Airlines:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
І вставляємо код лямбди, він зовсім нескладний:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Здається, на цьому ручне налаштування системи завершено. Залишається тільки протестувати і переконатися, що ми налаштували все правильно.

Деплой із коду Terraform

Необхідна підготовка

Terraform - Дуже зручний open-source інструмент для розгортання інфраструктури з коду. Він має свій синтаксис, який легко освоїти і безліч прикладів, як і що розгорнути. У редакторі Atom або Visual Studio Code є багато зручних плагінів, що дозволяють полегшити роботу з Terraform.

Дистрибутив можна скачати звідси. Докладний аналіз всіх можливостей Terraform виходить за рамки цієї статті, тому обмежимося основними моментами.

як запустити

Повний код проекту лежить у моєму репозиторії. Клонуємо себе репозиторій. Перед запуском необхідно переконатися, що у вас встановлений та налаштований AWS CLI. Terraform шукатиме облікові дані у файлі ~/.aws/credentials.

Хорошою практикою є перед деплом всієї інфраструктури, запускати команду plan, щоб подивитися, що Terraform нам зараз натворює у хмарі:

terraform.exe plan

Буде запропоновано ввести номер телефону для надсилання на нього повідомлень. На цьому етапі його запроваджувати необов'язково.

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Проаналізувавши план роботи програми, можемо запускати створення ресурсів:

terraform.exe apply

Після відправки цієї команди знову з'явиться запит на введення номера телефону, набираємо "yes", коли буде показано питання реального виконання дій. Це дозволить підняти всю інфраструктуру, провести необхідне налаштування EC2, розгорнути лямбда-функції і т.д.

Після того, як всі ресурси будуть успішно створені через код Terraform, необхідно зайти до деталей програми Kinesis Analytics (на жаль, я не знайшов як це зробити відразу з коду).

Запускаємо додаток:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Після цього необхідно явно задати in-application stream name, вибравши зі списку, що розкривається:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Наразі все готове до роботи.

Тестування роботи програми

Незалежно від того, як ви деплоїли систему, вручну або через код Terraform, працюватиме вона однаково.

Заходимо SSH на віртуальну машину EC2, де встановлений Kinesis Agent і запускаємо скрипт api_caller.py

sudo ./api_caller.py TOKEN

Залишилось дочекатися SMS на ваш номер:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
SMS – повідомлення надходить на телефон практично через 1 хвилину:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Залишилося подивитися, чи збереглися записи в базі даних DynamoDB для подальшого, детальнішого аналізу. Таблиця airline_tickets містить приблизно такі дані:

Інтеграція Aviasales API з Amazon Kinesis та простота serverless

Висновок

У ході виконаної роботи було побудовано систему онлайн-обробки даних на базі Amazon Kinesis. Були розглянуті варіанти використання Kinesis Agent у зв'язку з Kinesis Data Streams та реал-тайм аналітикою Kinesis Analytics за допомогою SQL команд, а також взаємодія Amazon Kinesis з іншими сервісами AWS.

Вищеописану систему ми розгорнули двома способами: досить довгим ручним та швидким із коду Terraform.

Весь вихідний код проекту доступний у моєму репозиторії на GitHub, пропоную з ним ознайомитись.

Із задоволенням готовий обговорити статтю, чекаю на Ваші коментарі. Сподіваюся на конструктивну критику.

Бажаю успіхів!

Джерело: habr.com

Додати коментар або відгук