Інтеграція Aviasales API з Amazon Kinesis та простота serverless
Привіт, Хабре!
А ви любите літати літаками? Я обожнюю, але на самоізоляції полюбив ще й аналізувати дані про авіаквитки одного відомого ресурсу - Aviasales.
Сьогодні ми розберемо роботу Amazon Kinesis, побудуємо стримінгову систему з реал-тайм аналітикою, поставимо NoSQL базу даних Amazon DynamoDB як основне сховище даних та налаштуємо сповіщення через SMS за цікавими квитками.
Усі подробиці під катом! Поїхали!
Запровадження
Для прикладу нам потрібний доступ до API Aviasales. Доступ до нього надається безкоштовно і без обмежень, необхідно лише зареєструватися в розділі «Розробникам», щоб отримати свій API токен для доступу до даних.
Основна мета цієї статті - дати загальне розуміння використання потокової передачі інформації в AWS, ми виносимо за дужки, що дані, що повертаються використовуваним API не є строго актуальними і передаються з кешу, який формується на основі пошуків користувачів сайтів Aviasales.ru та Jetradar.com останні 48 годин.
Отримані через API дані про авіаквитки Kinesis-agent, встановлений на машині-продюсері, автоматично парсить і передаватиме в потрібний потік через Kinesis Data Analytics. Необроблена версія цього потоку писатиметься безпосередньо у сховищі. Розгорнуте в DynamoDB сховище «сирих» даних дозволить проводити більш глибокий аналіз квитків через BI інструменти, наприклад, AWS Quick Sight.
Ми розглянемо два варіанти деплою всієї інфраструктури:
Ручний через AWS Management Console;
Інфраструктура з коду Terraform - для лінивих автоматизаторів;
Архітектура системи, що розробляється
Використовувані компоненти:
Aviasales API — дані, що повертаються цим API, будуть використовуватися для подальшої роботи;
EC2 Producer Instance - звичайна віртуальна машина в хмарі, на якій буде генеруватися вхідний потік даних:
Kinesis Agent — це Java-додаток, що встановлюється локально на машину, який надає простий спосіб збирання та відправлення даних у Kinesis (Kinesis Data Streams або Kinesis Firehose). Агент постійно відстежує набір файлів у зазначених директоріях та надсилає нові дані до Kinesis;
Скрипт API Caller - Python-скрипт, що робить запити до API і складає відповідь у папку, яку моніторить Kinesis Agent;
Потоки даних Kinesis - сервіс потокової передачі в режимі реального часу з широкими можливостями масштабування;
Kinesis Analytics - Безсерверний сервіс, що спрощує аналіз потокових даних у режимі реального часу. Amazon Kinesis Data Analytics налаштовує ресурси для роботи програм і автоматично масштабується для обробки будь-яких обсягів вхідних даних;
AWS Lambda — сервіс, що дозволяє запускати код без резервування та налаштування серверів. Усі обчислювальні потужності автоматично масштабуються під кожний виклик;
Amazon DynamoDB — база даних пар «ключ-значення» та документів, що забезпечує затримку менше 10 мілісекунд під час роботи в будь-якому масштабі. При використанні DynamoDB не потрібно розподіляти сервери, встановлювати на них виправлення або керувати ними. DynamoDB автоматично масштабує таблиці, коригуючи обсяг доступних ресурсів та зберігаючи високу продуктивність. Жодні дії з адміністрування системи не потрібні;
Amazon SNS — повністю керований сервіс надсилання повідомлень за моделлю «видавець — передплатник» (Pub/Sub), за допомогою якого можна ізолювати мікросервіси, розподілені системи та безсерверні програми. SNS можна використовувати для розсилки інформації кінцевим користувачам за допомогою мобільних push-повідомлень, SMS-повідомлень та електронних листів.
Початкова підготовка
Для емуляції потоку даних я вирішив використовувати інформацію про авіаквитки, що повертається API Aviasales. У документації досить великий список різних методів, візьмемо один із них — «Календар цін на місяць», який повертає ціни за кожен день місяця, згруповані за кількістю пересадок. Якщо не передавати в запиті місяць пошуку, то буде повернуто інформацію за місяць, що настає за поточним.
Вищеописаний спосіб отримання даних від API із зазначенням токена в запиті працюватиме, але мені більше подобається передавати токен доступу через заголовок, тому в скрипті api_caller.py будемо користуватися саме цим способом.
У прикладі відповіді API вище показаний квиток із Санкт-Петербурга до Пхука… Ех, та що мріяти…
Так як я з Казані, а Пхукет зараз нам тільки сниться, пошукаємо квитки з Санкт-Петербурга в Казань.
Передбачається, що у вас вже є обліковий запис в AWS. Відразу хочу звернути особливу увагу, що Kinesis та відправлення повідомлень через SMS не входять до річного Free Tier (безкоштовне використання). Але навіть незважаючи на це, заклавши в думці пару доларів, цілком можна побудувати запропоновану систему і пограти з нею. І, звичайно, не варто забувати видаляти всі ресурси після того, як вони стали не потрібні.
На щастя, DynamoDb та лямбда-функції будуть для нас умовно безкоштовними, якщо вкластися у місячні безкоштовні ліміти. Наприклад, для DynamoDB: 25 Гб сховища, 25 WCU/RCU та 100 млн. запитів. І мільйон викликів лямбда функцій на місяць.
Ручний деплой системи
Налаштування Kinesis Data Streams
Перейдемо в сервіс Kinesis Data Streams і створюємо два нових потоки по одному шарду на кожен.
Що таке шард?
Шард – це основна одиниця передачі даних потоку Amazon Kinesis. Один сегмент забезпечує передачу вхідних даних зі швидкістю 1 МБ/с та передачу вихідних даних зі швидкістю 2 МБ/с. Один сегмент підтримує до 1000 записів PUT за секунду. Під час створення потоку даних потрібно вказати необхідну кількість сегментів. Наприклад, можна створити потік даних із двома сегментами. Цей потік даних забезпечить передачу вхідних даних зі швидкістю 2 МБ/с та передачу вихідних даних зі швидкістю 4 МБ/с із підтримкою до 2000 записів PUT на секунду.
Чим більше шардів у вашому потоці — тим більша його пропускна спроможність. В принципі, так і масштабуються потоки шляхом додавання шардів. Але чим більше у вас шардів, тим вища і ціна. Кожен шард коштує 1,5 цента на годину та додатково 1.4 цента за кожні мільйон операцій додавання до потоку (PUT payload units).
Створимо новий потік з ім'ям airline_tickets, йому цілком достатньо буде 1 шарда:
Тепер створимо ще один потік із ім'ям special_stream:
Налаштування продюсера
Як продюсер даних для аналізу завдання досить використовувати звичайний EC2 інстанс. Це не має бути потужна дорога віртуальна машина, цілком підійде спотовий t2.micro.
Важливо: Для прикладу слід використовувати image — Amazon Linux AMI 2018.03.0, з ним менше налаштувань для швидкого запуску Kinesis Agent.
Переходимо на обслуговування EC2, створюємо нову віртуальну машину, вибираємо необхідний AMI з типом t2.micro, що входить у Free Tier:
Для того щоб новостворена віртуальна машина змогла взаємодіяти з сервісом Kinesis, необхідно дати їй на це права. Найкращий спосіб це зробити – призначити IAM Role. Тому на екрані Step 3: Configure Instance Details слід вибрати Create new IAM Role:
Створення IAM ролі для EC2
У вікні, вибираємо, що нову роль створюємо для EC2 і переходимо в розділ Permissions:
На навчальному прикладі можна не вдаватися в усі тонкощі гранулярного налаштування прав на ресурси, тому виберемо поліси, що налаштовані Амазоном: AmazonKinesisFullAccess і CloudWatchFullAccess.
Дамо якесь осмислене ім'я для цієї ролі, наприклад: EC2-KinesisStreams-FullAccess. В результаті, має вийти те саме, що вказано на малюнку нижче:
Після створення цієї нової ролі, не забуваємо причепити її до створюваного інстансу віртуальної машини:
Більше на цьому екрані нічого не змінюємо та переходимо до наступних вікон.
Параметри жорсткого диска можна залишити за замовчуванням, теги теж (хоча, гарною практикою є теги використовувати, хоча б давати ім'я інстансу та вказувати енвайронмент).
Тепер ми на закладці Step 6: Configure Security Group, де необхідно створити новий або вказати Security group, що має у вас, що дозволяє робити коннект через ssh (порт 22) на інстанс. Виберіть там Source -> My IP і можете запускати інстанс.
Як тільки він перейде в статус running, можна намагатися законнектитися на нього через ssh.
Щоб отримати можливість роботи з Kinesis Agent після успішного коннекту до машини, необхідно ввести наступні команди в терміналі:
Як видно з файлу конфігурації, агент буде моніторити в директорії /var/log/airline_tickets/ файли з розширенням .log, парс їх і передавати в потік airline_tickets.
Перезапускаємо сервіс і переконуємося, що він запустився та працює:
sudo service aws-kinesis-agent restart
Тепер скачаємо Python-скрипт, який запитуватиме дані у API:
Скрипт api_caller.py запитує дані у Aviasales та зберігає отриману відповідь у директорії, яку сканує Kinesis agent. Реалізація цього скрипта досить стандартна, є клас TicketsApi, він дозволяє асинхронно смикати API. У цей клас передаємо заголовок з токеном та параметри запиту:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Для тестування правильності налаштувань та працездатності агента зробимо тестовий запуск скрипту api_caller.py:
sudo ./api_caller.py TOKEN
І дивимося результат роботи в логах Агента та на закладці Monitoring у потоці даних airline_tickets:
Як видно, все працює і Kinesis Agent успішно надсилає дані в потік. Тепер налаштуємо consumer.
Налаштування Kinesis Data Analytics
Перейдемо до центрального компоненту всієї системи — створимо нову програму в Kinesis Data Analytics з іменем kinesis_analytics_airlines_app:
Kinesis Data Analytics дозволяє виконувати аналітику даних у реальному часі з Kinesis Streams за допомогою мови SQL. Це повністю автомасштабований сервіс (на відміну від Kinesis Streams), який:
дозволяє створювати нові потоки (Output Stream) на основі запитів до вихідних даних;
надає потік з помилками, що виникли під час роботи програм (Error Stream);
вміє автоматично визначати схему вхідних даних (її можна вручну перевизначити за потреби).
Це недешевий сервіс - 0.11 USD за годину роботи, тому користуватися ним слід акуратно та видаляти при завершенні роботи.
Підключимо додаток до джерела даних:
Вибираємо потік, до якого збираємося підключитися (airline_tickets):
Далі, необхідно приточити нову IAM Роль для того, щоб програма могла читати з потоку і писати в потік. Для цього достатньо нічого не міняти в блоці Access permissions:
Тепер запитаємо виявлення схеми даних у потоці, при цьому натискаємо кнопку «Discover schema». В результаті оновиться (створеться нова) роль IAM і буде запущено виявлення схеми даних, які вже прилетіли в потік:
Тепер потрібно перейти до редактора SQL. При натисканні на цю кнопку, вийде вікно з питанням про запуск програми - вибираємо що хочемо запустити:
У вікно редактора SQL вставимо такий простий запит і натискаємо Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
У реляційних базах даних ви працюєте з таблицями, використовуючи оператори INSERT для додавання записів та оператор SELECT для запиту даних. В Amazon Kinesis Data Analytics ви працюєте з потоками (STREAM) та «насосами» (PUMP) — безперервними запитами вставки, які вставляють дані з одного потоку в додатку до іншого потоку.
У представленому вище SQL запит відбувається пошук квитків Аерофлоту за вартістю нижче п'яти тисяч рублів. Всі записи, які підпадають під ці умови, будуть поміщені до потоку DESTINATION_SQL_STREAM.
У блоці Destination вибираємо потік special_stream, а в списку, що розкривається In-application stream name DESTINATION_SQL_STREAM:
В результаті всіх маніпуляцій має вийти щось схоже на картинку нижче:
Створення та передплата на топік SNS
Переходимо в сервіс Simple Notification Service і створюємо там новий топік з ім'ям Airlines:
Оформляємо підписку на цей топік, в ній вказуємо номер мобільного телефону, на який надходитимуть SMS-повідомлення:
Створення таблиці у DynamoDB
Для зберігання необроблених даних їх потоку airline_tickets створимо таблицю в DynamoDB з таким же ім'ям. Як первинний ключ будемо використовувати record_id:
Створення лямбда-функції collector
Створимо лямбда-функцію під назвою Collector, завданням якої буде опитування потоку airline_tickets і, у разі знаходження нових записів, вставка цих записів в таблицю DynamoDB. Очевидно, що крім стандартних прав, ця лямбда повинна мати доступ до читання потоку даних Kinesis і запису в DynamoDB.
Створення IAM ролі для лямбда-функції collector
Для початку створимо нову IAM роль для лямбди з ім'ям Lambda-TicketsProcessingRole:
Для тестового прикладу цілком підійдуть встановлені поліси AmazonKinesisReadOnlyAccess і AmazonDynamoDBFullAccess, як показано на малюнку нижче:
Дана лямбда повинна запускатись по тригеру від Kinesis при попаданні нових записів у потік airline_stream, тому треба додати новий тригер:
Залишилося вставити код та зберегти лямбду.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Створення лямбда-функції notifier
Друга лямбда-функція, яка моніторитиме другий потік (special_stream) і відправлятиме повідомлення в SNS, створюється аналогічно. Отже, ця лямбда повинна мати доступ на читання з Kinesis та відправлення повідомлень у заданий SNS-топік, який далі сервісом SNS буде надіслано всім передплатникам цього топіка (email, SMS тощо).
Створення IAM ролі
Спочатку створюємо IAM роль Lambda-KinesisAlarm для цієї лямбди, а потім призначаємо цю роль для створюваної лямбди alarm_notifier:
Ця лямбда повинна працювати по тригеру для потрапляння нових записів у потік special_stream, тому необхідно налаштувати тригер аналогічно тому, як ми це робили для лямбди Collector.
Для зручності налаштування цієї лямбди, введемо нову змінну оточення – TOPIC_ARN, куди поміщаємо ANR (Amazon Recourse Names) топіка Airlines:
І вставляємо код лямбди, він зовсім нескладний:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Здається, на цьому ручне налаштування системи завершено. Залишається тільки протестувати і переконатися, що ми налаштували все правильно.
Деплой із коду Terraform
Необхідна підготовка
Terraform - Дуже зручний open-source інструмент для розгортання інфраструктури з коду. Він має свій синтаксис, який легко освоїти і безліч прикладів, як і що розгорнути. У редакторі Atom або Visual Studio Code є багато зручних плагінів, що дозволяють полегшити роботу з Terraform.
Дистрибутив можна скачати звідси. Докладний аналіз всіх можливостей Terraform виходить за рамки цієї статті, тому обмежимося основними моментами.
як запустити
Повний код проекту лежить у моєму репозиторії. Клонуємо себе репозиторій. Перед запуском необхідно переконатися, що у вас встановлений та налаштований AWS CLI. Terraform шукатиме облікові дані у файлі ~/.aws/credentials.
Хорошою практикою є перед деплом всієї інфраструктури, запускати команду plan, щоб подивитися, що Terraform нам зараз натворює у хмарі:
terraform.exe plan
Буде запропоновано ввести номер телефону для надсилання на нього повідомлень. На цьому етапі його запроваджувати необов'язково.
Проаналізувавши план роботи програми, можемо запускати створення ресурсів:
terraform.exe apply
Після відправки цієї команди знову з'явиться запит на введення номера телефону, набираємо "yes", коли буде показано питання реального виконання дій. Це дозволить підняти всю інфраструктуру, провести необхідне налаштування EC2, розгорнути лямбда-функції і т.д.
Після того, як всі ресурси будуть успішно створені через код Terraform, необхідно зайти до деталей програми Kinesis Analytics (на жаль, я не знайшов як це зробити відразу з коду).
Запускаємо додаток:
Після цього необхідно явно задати in-application stream name, вибравши зі списку, що розкривається:
Наразі все готове до роботи.
Тестування роботи програми
Незалежно від того, як ви деплоїли систему, вручну або через код Terraform, працюватиме вона однаково.
Заходимо SSH на віртуальну машину EC2, де встановлений Kinesis Agent і запускаємо скрипт api_caller.py
sudo ./api_caller.py TOKEN
Залишилось дочекатися SMS на ваш номер:
SMS – повідомлення надходить на телефон практично через 1 хвилину:
Залишилося подивитися, чи збереглися записи в базі даних DynamoDB для подальшого, детальнішого аналізу. Таблиця airline_tickets містить приблизно такі дані:
Висновок
У ході виконаної роботи було побудовано систему онлайн-обробки даних на базі Amazon Kinesis. Були розглянуті варіанти використання Kinesis Agent у зв'язку з Kinesis Data Streams та реал-тайм аналітикою Kinesis Analytics за допомогою SQL команд, а також взаємодія Amazon Kinesis з іншими сервісами AWS.
Вищеописану систему ми розгорнули двома способами: досить довгим ручним та швидким із коду Terraform.