ProHoster > блог > Администрација > Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Еј Хабр!
Дали сакате да летате со авиони? Го сакам тоа, но за време на самоизолацијата се заљубив и во анализирањето на податоците за авионските билети од еден добро познат извор - Aviasales.
Денес ќе ја анализираме работата на Amazon Kinesis, ќе изградиме систем за стриминг со аналитика во реално време, ќе ја инсталираме базата на податоци на Amazon DynamoDB NoSQL како главно складирање на податоци и ќе поставиме СМС известувања за интересни билети.
Сите детали се под рез! Оди!
Вовед
На пример, ни треба пристап до Aviasales API. Пристапот до него е обезбеден бесплатно и без ограничувања; само треба да се регистрирате во делот „Програмери“ за да го добиете вашиот API токен за пристап до податоците.
Главната цел на овој напис е да даде општо разбирање за употребата на пренос на информации во AWS; ние земаме предвид дека податоците вратени од користениот API не се строго ажурирани и се пренесуваат од кешот, што е формирана врз основа на пребарувања на корисниците на страниците Aviasales.ru и Jetradar.com во последните 48 часа.
Kinesis-agent, инсталиран на машината за производство, примен преку API автоматски ќе ги анализира и пренесува податоците до саканиот поток преку Kinesis Data Analytics. Суровата верзија на овој пренос ќе биде напишана директно во продавницата. Складирањето необработени податоци распоредено во DynamoDB ќе овозможи подлабока анализа на билетите преку алатките за БИ, како што е AWS Quick Sight.
Ќе разгледаме две опции за распоредување на целата инфраструктура:
Прирачник - преку AWS Management Console;
Инфраструктурата од Terraform кодот е за мрзливи автомати;
Архитектура на развиениот систем
Користени компоненти:
Aviasales API — податоците вратени од овој API ќе се користат за целата наредна работа;
EC2 продуцент пример — обична виртуелна машина во облакот на која ќе се генерира влезниот проток на податоци:
Кинезис агент е Java апликација инсталирана локално на машината која обезбедува лесен начин за собирање и испраќање податоци до Kinesis (Kinesis Data Streams или Kinesis Firehose). Агентот постојано следи збир на датотеки во наведените директориуми и испраќа нови податоци до Kinesis;
Скрипта за повикувач на API — Python скрипта што испраќа барања до API и го става одговорот во папка што е надгледувана од Kinesis Agent;
Кинезис текови на податоци — услуга за пренос на податоци во реално време со широки можности за скалирање;
Анализа на кинезис е услуга без сервер која ја поедноставува анализата на стриминг податоци во реално време. Amazon Kinesis Data Analytics ги конфигурира ресурсите на апликацијата и автоматски се зголемува за да се справи со кој било обем на дојдовни податоци;
AWS Ламбда — услуга која ви овозможува да извршувате код без да правите резервна копија или да поставувате сервери. Целата компјутерска моќ автоматски се намалува за секој повик;
Amazon DynamoDB - База на податоци со парови и документи со клучеви и вредности што обезбедува латентност помала од 10 милисекунди кога работи во која било скала. Кога користите DynamoDB, не треба да обезбедувате, закрпите или управувате со какви било сервери. DynamoDB автоматски ги зголемува табелите за да ја прилагоди количината на достапни ресурси и да одржува високи перформанси. Не е потребна системска администрација;
Амазон СНС - целосно управувана услуга за испраќање пораки со користење на моделот издавач-претплатник (Pub/Sub), со кој можете да изолирате микросервиси, дистрибуирани системи и апликации без сервер. SNS може да се користи за испраќање информации до крајните корисници преку мобилни притисни известувања, СМС пораки и е-пошта.
Почетна обука
За да го имитирам протокот на податоци, решив да ги користам информациите за авионски билети вратени од API-то на Aviasales. ВО документација доста обемна листа на различни методи, ајде да земеме еден од нив - „Месечен календар на цени“, кој ги враќа цените за секој ден од месецот, групирани според бројот на трансфери. Ако не го наведете месецот на пребарување во барањето, информациите ќе бидат вратени за месецот што следи по тековниот.
Значи, ајде да се регистрираме и да го добиеме нашиот токен.
Горенаведениот метод за примање податоци од API со назначување токен во барањето ќе работи, но јас претпочитам да го префрлам токенот за пристап преку заглавието, така што ќе го користиме овој метод во скриптата api_caller.py.
Примерот API одговор погоре покажува билет од Санкт Петербург до Пук... О, каков сон...
Бидејќи јас сум од Казан, а Пукет сега е „само сон“, ајде да бараме билети од Санкт Петербург до Казан.
Претпоставува дека веќе имате сметка AWS. Би сакал веднаш да привлечам посебно внимание на фактот дека Kinesis и испраќањето известувања преку СМС не се вклучени во годишниот Бесплатно ниво (бесплатна употреба). Но, и покрај ова, со неколку долари на ум, сосема е можно да се изгради предложениот систем и да се игра со него. И, се разбира, не заборавајте да ги избришете сите ресурси откако повеќе не се потребни.
За среќа, функциите DynamoDb и ламбда ќе ни бидат бесплатни доколку ги исполниме нашите месечни бесплатни граници. На пример, за DynamoDB: 25 GB простор, 25 WCU/RCU и 100 милиони барања. И милион повици со ламбда функција месечно.
Рачно распоредување на системот
Поставување на Kinesis податочни текови
Ајде да одиме на услугата Kinesis Data Streams и да создадеме два нови текови, по еден фрагмент за секој.
Што е фрагмент?
Шард е основната единица за пренос на податоци на потокот на Amazon Kinesis. Еден сегмент обезбедува пренос на влезни податоци со брзина од 1 MB/s и пренос на излезни податоци со брзина од 2 MB/s. Еден сегмент поддржува до 1000 PUT записи во секунда. Кога креирате поток на податоци, треба да го наведете потребниот број на сегменти. На пример, можете да креирате поток на податоци со два сегменти. Овој поток на податоци ќе обезбеди влезни пренос на податоци со 2 MB/s и излезни пренос на податоци со 4 MB/s, поддржувајќи до 2000 PUT записи во секунда.
Колку повеќе парчиња во вашиот поток, толку е поголема неговата пропусност. Во принцип, вака се скалираат тековите - со додавање на парчиња. Но, колку повеќе парчиња имате, толку е поголема цената. Секој фрагмент чини 1,5 центи на час и дополнителни 1.4 центи за секој милион PUT единици на носивост.
Ајде да создадеме нов поток со името авиобилети, 1 парче ќе му биде доволно:
Сега ајде да создадеме друга нишка со името special_stream:
Поставување на производителот
За да се анализира задачата, доволно е да се користи обична EC2 инстанца како продуцент на податоци. Не мора да биде моќна, скапа виртуелна машина; spot t2.micro ќе биде добро.
Важна забелешка: на пример, треба да користите слика - Amazon Linux AMI 2018.03.0, има помалку поставки за брзо стартување на Kinesis Agent.
Одете во услугата EC2, креирајте нова виртуелна машина, изберете го саканиот AMI со тип t2.micro, кој е вклучен во Free Tier:
За да може новосоздадената виртуелна машина да има интеракција со услугата Kinesis, мора да и се дадат права за тоа. Најдобар начин да го направите ова е да доделите улога на IAM. Затоа, на екранот Чекор 3: Конфигурирај ги деталите за примерот, треба да изберете Создадете нова улога на IAM:
Креирање на улога на IAM за EC2
Во прозорецот што се отвора, изберете дека создаваме нова улога за EC2 и одете во делот Дозволи:
Користејќи го примерот за обука, не мора да навлегуваме во сите сложености на грануларна конфигурација на правата на ресурсите, па затоа ќе ги избереме политиките претходно конфигурирани од Amazon: AmazonKinesisFullAccess и CloudWatchFullAccess.
Ајде да дадеме некое значајно име за оваа улога, на пример: EC2-KinesisStreams-FullAccess. Резултатот треба да биде ист како што е прикажан на сликата подолу:
Откако ќе ја креирате оваа нова улога, не заборавајте да ја прикачите на примерот на креираната виртуелна машина:
Ние не менуваме ништо друго на овој екран и преминуваме на следните прозорци.
Поставките на хард дискот може да се остават како стандардни, како и ознаките (иако е добра практика да се користат ознаки, барем да се даде име на примерокот и да се означи околината).
Сега сме на табулаторот Чекор 6: Конфигурирај група за безбедност, каде што треба да креирате нова или да ја наведете постоечката група за безбедност, која ви овозможува да се поврзете преку ssh (порта 22) со примерокот. Изберете Source -> My IP таму и можете да ја стартувате инстанцата.
Веднаш штом ќе се префрли на статусот на работа, може да се обидете да се поврзете со него преку ssh.
За да можете да работите со Kinesis Agent, по успешно поврзување со машината, мора да ги внесете следните команди во терминалот:
Како што може да се види од конфигурациската датотека, агентот ќе ги следи датотеките со наставката .log во директориумот /var/log/airline_tickets/, ќе ги анализира и ќе ги префрли во протокот на airline_tickets.
Ја рестартираме услугата и се уверуваме дека таа е вклучена и работи:
sudo service aws-kinesis-agent restart
Сега да ја преземеме Python скриптата што ќе бара податоци од API:
Скриптата api_caller.py бара податоци од Aviasales и го зачувува добиениот одговор во директориумот што го скенира агентот Kinesis. Имплементацијата на оваа скрипта е сосема стандардна, постои класа TicketsApi, ви овозможува асинхроно да го повлечете API. Подаваме заглавие со токен и бараме параметри во оваа класа:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
За да ги тестираме точните поставки и функционалноста на агентот, ајде да ја тестираме скриптата api_caller.py:
sudo ./api_caller.py TOKEN
И го гледаме резултатот од работата во дневниците на агентите и на картичката Мониторинг во протокот на податоци за авионски билети:
Како што можете да видите, сè работи и Kinesis Agent успешно испраќа податоци до потокот. Сега ајде да го конфигурираме потрошувачот.
Поставување на кинезис анализа на податоци
Ајде да преминеме на централната компонента на целиот систем - креирајте нова апликација во Kinesis Data Analytics со име kinesis_analytics_airlines_app:
Kinesis Data Analytics ви овозможува да вршите аналитика на податоци во реално време од Kinesis Streams користејќи го јазикот SQL. Тоа е услуга за целосно автоматско скалирање (за разлика од Kinesis Streams) која:
ви овозможува да креирате нови преноси (Излезен тек) врз основа на барања за извор на податоци;
обезбедува пренос со грешки што се случиле додека се извршувале апликациите (Проток на грешка);
може автоматски да ја одреди шемата за влезни податоци (може рачно да се редефинира доколку е потребно).
Ова не е евтина услуга - 0.11 американски долари за час работа, затоа треба внимателно да ја користите и да ја избришете кога ќе завршите.
Ајде да ја поврземе апликацијата со изворот на податоци:
Изберете го потокот на кој ќе се поврземе (авиокомпанија_тикети):
Следно, треба да прикачите нова улога на IAM за да може апликацијата да чита од преносот и да пишува на преносот. За да го направите ова, доволно е да не менувате ништо во блокот за дозволи за пристап:
Сега да побараме откривање на шемата за податоци во потокот; за да го направите ова, кликнете на копчето „Откриј шема“. Како резултат на тоа, улогата на IAM ќе се ажурира (ќе се создаде нова) и ќе се стартува откривањето шема од податоците што веќе пристигнале во преносот:
Сега треба да отидете во SQL уредникот. Кога ќе кликнете на ова копче, ќе се појави прозорец со барање да ја стартувате апликацијата - изберете што сакате да стартувате:
Вметнете го следното едноставно барање во прозорецот на уредувачот SQL и кликнете Зачувај и Стартувај SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
Во релационите бази на податоци, работите со табели користејќи INSERT изјави за додавање записи и SELECT изјава за барање податоци. Во Amazon Kinesis Data Analytics, работите со потоци (STREAMs) и пумпи (PUMPs) - континуирани барања за вметнување кои вметнуваат податоци од еден поток во апликација во друг пренос.
Барањето SQL претставено погоре бара билети за Аерофлот по цена под пет илјади рубли. Сите записи што ги исполнуваат овие услови ќе бидат ставени во преносот DESTINATION_SQL_STREAM.
Во блокот Destination, изберете го преносот special_stream и во паѓачката листа DESTINATION_SQL_STREAM името на преносот во апликација:
Резултатот од сите манипулации треба да биде нешто слично на сликата подолу:
Креирање и претплата на тема за SNS
Одете во услугата за едноставни известувања и креирајте нова тема таму со името Авиокомпании:
Претплатете се на оваа тема и наведете го бројот на мобилниот телефон на кој ќе се испраќаат СМС известувања:
Направете табела во DynamoDB
За да ги зачувате необработените податоци од нивниот тек на airline_tickets, ајде да создадеме табела во DynamoDB со истото име. Ќе користиме record_id како примарен клуч:
Создавање колектор на ламбда функции
Ајде да создадеме ламбда функција наречена Колектор, чија задача ќе биде да го анкетира протокот на авионски билети и, доколку се најдат нови записи таму, да ги вметне овие записи во табелата DynamoDB. Очигледно, покрај стандардните права, оваа ламбда мора да има пристап за читање до протокот на податоци на Kinesis и пристап за пишување до DynamoDB.
Креирање IAM улога за колекторската ламбда функција
Прво, ајде да создадеме нова IAM улога за ламбда наречена Lambda-TicketsProcessingRole:
За примерот за тестирање, претходно конфигурираните политики на AmazonKinesisReadOnlyAccess и AmazonDynamoDBFullAccess се сосема соодветни, како што е прикажано на сликата подолу:
Оваа ламбда треба да се активира со активирач од Kinesis кога новите записи ќе влезат во airline_stream, па затоа треба да додадеме нов активирач:
Останува само да се вметне кодот и да се зачува ламбда.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Креирање известувач за ламбда функција
На сличен начин е креирана и втората ламбда функција, која ќе го следи вториот поток (special_stream) и ќе испрати известување до SNS. Затоа, оваа ламбда мора да има пристап за читање од Kinesis и испраќање пораки до дадена тема на SNS, кои потоа ќе бидат испратени од SNS сервисот до сите претплатници на оваа тема (email, SMS итн.).
Креирање на улога на IAM
Прво, ја креираме улогата IAM Lambda-KinesisAlarm за оваа ламбда, а потоа ја доделуваме оваа улога на alarm_notifier ламбда што се создава:
Оваа ламбда треба да работи на активирањето за новите записи да влезат во special_stream, така што треба да го конфигурирате активирањето на ист начин како што направивме за колекторската ламбда.
За да го олесниме конфигурирањето на оваа ламбда, да воведеме нова променлива на животната средина - TOPIC_ARN, каде што го ставаме ANR (Имиња на ресурси на Амазон) на темата на авиокомпаниите:
И вметнете го ламбда кодот, воопшто не е комплицирано:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Се чини дека тука е завршена рачната конфигурација на системот. Останува само да тестираме и да се увериме дека сме конфигурирале сè правилно.
Распоредете од кодот Terraform
Неопходна подготовка
Terraform е многу удобна алатка со отворен код за распоредување на инфраструктура од код. Има своја синтакса која е лесна за учење и има многу примери за тоа како и што да се распореди. Уредникот Atom или Visual Studio Code има многу практични приклучоци што ја олеснуваат работата со Terraform.
Можете да ја преземете дистрибуцијата оттука. Деталната анализа на сите способности на Terraform е надвор од опсегот на овој напис, па затоа ќе се ограничиме на главните точки.
Како да започнете
Целосниот код на проектот е во моето складиште. Ние го клонираме складиштето за себе. Пред да започнете, треба да бидете сигурни дека имате инсталирано и конфигурирано AWS CLI, бидејќи ... Terraform ќе бара ингеренции во датотеката ~/.aws/credentials.
Добра практика е да ја извршите командата за план пред да ја распоредите целата инфраструктура за да видите што Тераформ моментално создава за нас во облакот:
terraform.exe plan
Ќе ви биде побарано да внесете телефонски број на кој ќе испраќате известувања. Не е неопходно да се внесе во оваа фаза.
Откако го анализиравме оперативниот план на програмата, можеме да започнеме да создаваме ресурси:
terraform.exe apply
По испраќањето на оваа команда, повторно ќе биде побарано да внесете телефонски број; бирајте „да“ кога ќе се прикаже прашање за навистина извршување на дејствата. Ова ќе ви овозможи да ја поставите целата инфраструктура, да ја извршите целата потребна конфигурација на EC2, да распоредите ламбда функции итн.
Откако сите ресурси се успешно креирани преку кодот Terraform, треба да влезете во деталите за апликацијата Kinesis Analytics (за жал, не најдов како да го направам тоа директно од кодот).
Стартувајте ја апликацијата:
После ова, мора експлицитно да го поставите името на преносот во апликацијата со избирање од паѓачката листа:
Сега сè е подготвено да тргне.
Тестирање на апликацијата
Без оглед на тоа како сте го распоредиле системот, рачно или преку Terraform кодот, тој ќе работи исто.
Се најавуваме преку SSH на виртуелната машина EC2 каде што е инсталиран Kinesis Agent и ја извршуваме скриптата api_caller.py
sudo ./api_caller.py TOKEN
Сè што треба да направите е да чекате СМС на вашиот број:
СМС - пораката пристигнува на телефонот за скоро 1 минута:
Останува да видиме дали записите се зачувани во базата на податоци DynamoDB за последователна, подетална анализа. Табелата за авионски билети ги содржи приближно следните податоци:
Заклучок
Во текот на сработеното, беше изграден систем за обработка на податоци преку Интернет базиран на Amazon Kinesis. Беа разгледани опциите за користење на Kinesis Agent во врска со Kinesis Data Streams и аналитиката во реално време Kinesis Analytics со помош на SQL команди, како и интеракцијата на Amazon Kinesis со други услуги AWS.
Го употребивме горенаведениот систем на два начина: прилично долг рачен и брз од кодот Terraform.