Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Хей Хабр!

Харесвате ли да летите със самолети? Обичам го, но по време на самоизолацията се влюбих и в анализирането на данни за самолетни билети от един добре познат ресурс - Aviasales.

Днес ще анализираме работата на Amazon Kinesis, ще изградим система за стрийминг с анализи в реално време, ще инсталираме базата данни Amazon DynamoDB NoSQL като основно хранилище за данни и ще настроим SMS известия за интересни билети.

Всички детайли са под кройката! Отивам!

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

въведение

За примера се нуждаем от достъп до API на Aviasales. Достъпът до него се предоставя безплатно и без ограничения, просто трябва да се регистрирате в секцията „Разработчици“, за да получите своя API токен за достъп до данните.

Основната цел на тази статия е да даде общо разбиране за използването на поточно предаване на информация в AWS; ние вземаме предвид, че данните, върнати от използвания API, не са строго актуални и се предават от кеша, който е формирана въз основа на търсения от потребители на сайтовете Aviasales.ru и Jetradar.com за последните 48 часа.

Kinesis-agent, инсталиран на машината за производство, получен чрез API автоматично ще анализира и предава данни към желания поток чрез Kinesis Data Analytics. Необработената версия на този поток ще бъде записана директно в магазина. Съхранението на необработени данни, внедрено в DynamoDB, ще позволи по-задълбочен анализ на билети чрез BI инструменти, като AWS Quick Sight.

Ще разгледаме два варианта за разполагане на цялата инфраструктура:

  • Ръчно - през AWS Management Console;
  • Инфраструктурата от кода на Terraform е за мързеливи автоматизатори;

Архитектурата на разработената система

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Използвани компоненти:

  • API на Aviasales — данните, върнати от този API, ще бъдат използвани за цялата последваща работа;
  • Екземпляр на производителя на EC2 — обикновена виртуална машина в облака, на която ще се генерира входният поток от данни:
    • Kinesis агент е Java приложение, инсталирано локално на машината, което предоставя лесен начин за събиране и изпращане на данни към Kinesis (Kinesis Data Streams или Kinesis Firehose). Агентът постоянно следи набор от файлове в посочените директории и изпраща нови данни на Kinesis;
    • API Caller скрипт — Python скрипт, който прави заявки към API и поставя отговора в папка, която се наблюдава от Kinesis Agent;
  • Потоци данни Kinesis — услуга за поточно предаване на данни в реално време с широки възможности за мащабиране;
  • Kinesis Analytics е услуга без сървър, която опростява анализа на поточни данни в реално време. Amazon Kinesis Data Analytics конфигурира ресурсите на приложението и автоматично се мащабира, за да обработва всякакъв обем входящи данни;
  • AWS Lambda — услуга, която ви позволява да изпълнявате код без архивиране или настройка на сървъри. Цялата изчислителна мощност се мащабира автоматично за всяко повикване;
  • Amazon DynamoDB - База данни от двойки ключ-стойност и документи, която осигурява латентност от по-малко от 10 милисекунди, когато се изпълнява във всякакъв мащаб. Когато използвате DynamoDB, не е необходимо да предоставяте, кръпвате или управлявате каквито и да било сървъри. DynamoDB автоматично мащабира таблиците, за да коригира количеството налични ресурси и да поддържа висока производителност. Не е необходима системна администрация;
  • Amazon SNS - напълно управлявана услуга за изпращане на съобщения чрез модела издател-абонат (Pub/Sub), с която можете да изолирате микроуслуги, разпределени системи и безсървърни приложения. SNS може да се използва за изпращане на информация до крайни потребители чрез мобилни насочени известия, SMS съобщения и имейли.

Първоначално обучение

За да емулирам потока от данни, реших да използвам информацията за самолетните билети, върната от API на Aviasales. IN документация доста обширен списък от различни методи, нека вземем един от тях - „Календар на месечните цени“, който връща цените за всеки ден от месеца, групирани по броя на трансферите. Ако не посочите месеца на търсене в заявката, ще бъде върната информация за месеца, следващ текущия.

Така че, нека да се регистрираме и да вземем нашия жетон.

Примерна заявка е по-долу:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Горният метод за получаване на данни от API чрез посочване на токен в заявката ще работи, но предпочитам да предам токена за достъп през заглавката, така че ще използваме този метод в скрипта api_caller.py.

Примерен отговор:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Примерният отговор на API по-горе показва билет от Санкт Петербург до Пук... О, каква мечта...
Тъй като съм от Казан и Пукет сега е „само мечта“, нека потърсим билети от Санкт Петербург до Казан.

Предполага се, че вече имате акаунт в AWS. Бих искал веднага да обърна специално внимание на факта, че Kinesis и изпращането на известия чрез SMS не са включени в годишния Безплатно ниво (безплатно използване). Но дори въпреки това, имайки предвид няколко долара, е напълно възможно да се изгради предложената система и да се играе с нея. И, разбира се, не забравяйте да изтриете всички ресурси, след като вече не са необходими.

За щастие функциите DynamoDb и lambda ще бъдат безплатни за нас, ако спазим месечните си безплатни лимити. Например за DynamoDB: 25 GB памет, 25 WCU/RCU и 100 милиона заявки. И милион извиквания на ламбда функция на месец.

Ръчно внедряване на системата

Настройване на Kinesis Data Streams

Нека да отидем в услугата Kinesis Data Streams и да създадем два нови потока, по един шард за всеки.

Какво е шард?
Шардът е основната единица за пренос на данни на поток на Amazon Kinesis. Един сегмент осигурява входен трансфер на данни със скорост 1 MB/s и изходен трансфер на данни със скорост 2 MB/s. Един сегмент поддържа до 1000 PUT записа в секунда. Когато създавате поток от данни, трябва да посочите необходимия брой сегменти. Например, можете да създадете поток от данни с два сегмента. Този поток от данни ще осигури трансфер на входни данни при 2 MB/s и трансфер на изходни данни при 4 MB/s, като поддържа до 2000 PUT записа в секунда.

Колкото повече фрагменти във вашия поток, толкова по-голяма е неговата пропускателна способност. Принципно така се мащабират потоците - чрез добавяне на шардове. Но колкото повече шардове имате, толкова по-висока е цената. Всеки шард струва 1,5 цента на час и допълнителни 1.4 цента за всеки милион единици полезен товар PUT.

Нека създадем нов поток с името самолетни билети, 1 шард ще му е достатъчен:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Сега нека създадем друга нишка с името специален_поток:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Настройка на производителя

За да анализирате задача, е достатъчно да използвате обикновен EC2 екземпляр като производител на данни. Не е задължително да е мощна, скъпа виртуална машина; спот t2.micro ще се справи добре.

Важна забележка: например трябва да използвате изображение - Amazon Linux AMI 2018.03.0, има по-малко настройки за бързо стартиране на Kinesis Agent.

Отидете на услугата EC2, създайте нова виртуална машина, изберете желания AMI с тип t2.micro, който е включен в Free Tier:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
За да може новосъздадената виртуална машина да взаимодейства с услугата Kinesis, трябва да й бъдат дадени права за това. Най-добрият начин да направите това е да присвоите IAM роля. Следователно на екрана Стъпка 3: Конфигуриране на подробности за екземпляра трябва да изберете Създайте нова IAM роля:

Създаване на IAM роля за EC2
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
В прозореца, който се отваря, изберете, че създаваме нова роля за EC2 и отидете в секцията Разрешения:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Използвайки примера за обучение, не е нужно да навлизаме във всички тънкости на детайлната конфигурация на правата за ресурси, така че ще изберем политиките, предварително конфигурирани от Amazon: AmazonKinesisFullAccess и CloudWatchFullAccess.

Нека дадем някакво смислено име за тази роля, например: EC2-KinesisStreams-FullAccess. Резултатът трябва да е същият, както е показано на снимката по-долу:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
След като създадете тази нова роля, не забравяйте да я прикачите към създадения екземпляр на виртуална машина:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Не променяме нищо друго на този екран и преминаваме към следващите прозорци.

Настройките на твърдия диск могат да бъдат оставени по подразбиране, както и таговете (въпреки че е добра практика да използвате тагове, поне дайте име на екземпляра и посочете средата).

Сега сме в раздела Стъпка 6: Конфигуриране на група за сигурност, където трябва да създадете нова или да посочите съществуващата група за сигурност, която ви позволява да се свържете чрез ssh (порт 22) към екземпляра. Изберете Източник -> Моят IP там и можете да стартирате екземпляра.

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Веднага щом превключи в състояние на работа, можете да опитате да се свържете с него чрез ssh.

За да можете да работите с Kinesis Agent, след успешно свързване с машината, трябва да въведете следните команди в терминала:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Нека създадем папка за запазване на отговорите на API:

sudo mkdir /var/log/airline_tickets

Преди да стартирате агента, трябва да конфигурирате неговата конфигурация:

sudo vim /etc/aws-kinesis/agent.json

Съдържанието на файла agent.json трябва да изглежда така:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Както може да се види от конфигурационния файл, агентът ще наблюдава файлове с разширение .log в директорията /var/log/airline_tickets/, ще ги анализира и ще ги прехвърля в потока airline_tickets.

Рестартираме услугата и се уверяваме, че тя работи:

sudo service aws-kinesis-agent restart

Сега нека изтеглим скрипта на Python, който ще изисква данни от API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Скриптът api_caller.py изисква данни от Aviasales и записва получения отговор в директорията, която агентът Kinesis сканира. Изпълнението на този скрипт е доста стандартно, има клас TicketsApi, който ви позволява да изтеглите API асинхронно. Предаваме заглавка с токен и изискваме параметри към този клас:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

За да тествате правилните настройки и функционалност на агента, нека изпробваме скрипта api_caller.py:

sudo ./api_caller.py TOKEN

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
И разглеждаме резултата от работата в регистрационните файлове на агента и в раздела Мониторинг в потока от данни airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Както можете да видите, всичко работи и Kinesis Agent успешно изпраща данни към потока. Сега нека конфигурираме потребителя.

Настройване на Kinesis Data Analytics

Нека да преминем към централния компонент на цялата система - създайте ново приложение в Kinesis Data Analytics с име kinesis_analytics_airlines_app:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Kinesis Data Analytics ви позволява да извършвате анализ на данни в реално време от Kinesis Streams, като използвате езика SQL. Това е услуга с напълно автоматично мащабиране (за разлика от Kinesis Streams), която:

  1. позволява ви да създавате нови потоци (Output Stream) въз основа на заявки към източник на данни;
  2. осигурява поток с грешки, възникнали по време на изпълнение на приложения (Error Stream);
  3. може автоматично да определи схемата на входните данни (може да бъде предефинирана ръчно, ако е необходимо).

Това не е евтина услуга - 0.11 USD на час работа, така че трябва да я използвате внимателно и да я изтриете, когато приключите.

Нека свържем приложението към източника на данни:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Изберете потока, към който ще се свържем (airline_tickets):

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
След това трябва да прикачите нова IAM роля, така че приложението да може да чете от потока и да пише в потока. За да направите това, достатъчно е да не променяте нищо в блока Разрешения за достъп:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Сега нека поискаме откриване на схемата за данни в потока; за да направите това, щракнете върху бутона „Откриване на схема“. В резултат на това IAM ролята ще бъде актуализирана (ще бъде създадена нова) и откриването на схема ще бъде стартирано от данните, които вече са пристигнали в потока:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Сега трябва да отидете в SQL редактора. Когато щракнете върху този бутон, ще се появи прозорец с молба да стартирате приложението - изберете какво искате да стартирате:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Вмъкнете следната проста заявка в прозореца на SQL редактора и щракнете върху Запазване и изпълнение на SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

В релационни бази данни вие работите с таблици, като използвате оператори INSERT за добавяне на записи и оператор SELECT за запитване към данни. В Amazon Kinesis Data Analytics работите с потоци (STREAMs) и помпи (PUMPs) – непрекъснати заявки за вмъкване, които вмъкват данни от един поток в приложение в друг поток.

Представената по-горе SQL заявка търси билети на Aeroflot на цена под пет хиляди рубли. Всички записи, които отговарят на тези условия, ще бъдат поставени в потока DESTINATION_SQL_STREAM.

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
В блока Destination изберете потока special_stream и в падащия списък с име на поток в приложението DESTINATION_SQL_STREAM:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Резултатът от всички манипулации трябва да бъде нещо подобно на снимката по-долу:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Създаване и абониране за SNS тема

Отидете в Simple Notification Service и създайте там нова тема с името Airlines:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Абонирайте се за тази тема и посочете мобилния телефонен номер, на който ще се изпращат SMS известия:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Създайте таблица в DynamoDB

За да съхраним необработените данни от техния поток airline_tickets, нека създадем таблица в DynamoDB със същото име. Ще използваме record_id като първичен ключ:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Създаване на колектор на ламбда функция

Нека създадем ламбда функция, наречена Collector, чиято задача ще бъде да анкетира потока airline_tickets и, ако бъдат намерени нови записи там, да вмъкне тези записи в таблицата DynamoDB. Очевидно, в допълнение към правата по подразбиране, тази ламбда трябва да има достъп за четене до потока от данни Kinesis и достъп за запис до DynamoDB.

Създаване на IAM роля за колекторната ламбда функция
Първо, нека създадем нова IAM роля за ламбда с име Lambda-TicketsProcessingRole:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
За тестовия пример предварително конфигурираните правила AmazonKinesisReadOnlyAccess и AmazonDynamoDBFullAccess са доста подходящи, както е показано на снимката по-долу:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Тази ламбда трябва да бъде стартирана от тригер от Kinesis, когато нови записи влязат в airline_stream, така че трябва да добавим нов тригер:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Остава само да поставите кода и да запазите ламбдата.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Създаване на уведомител за ламбда функция

Втората ламбда функция, която ще наблюдава втория поток (special_stream) и ще изпрати известие до SNS, се създава по подобен начин. Следователно тази ламбда трябва да има достъп за четене от Kinesis и изпращане на съобщения до дадена SNS тема, които след това ще бъдат изпратени от SNS услугата до всички абонати на тази тема (имейл, SMS и т.н.).

Създаване на IAM роля
Първо създаваме IAM ролята Lambda-KinesisAlarm за тази ламбда и след това присвояваме тази роля на създаваната ламбда alarm_notifier:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Тази ламбда трябва да работи на тригер за влизане на нови записи в special_stream, така че трябва да конфигурирате тригера по същия начин, както направихме за ламбда Collector.

За да улесним конфигурирането на тази ламбда, нека въведем нова променлива на средата - TOPIC_ARN, където поставяме ANR (имена на ресурси на Amazon) на темата Airlines:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
И вмъкнете ламбда кода, не е никак сложно:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Изглежда, че това е мястото, където ръчната конфигурация на системата е завършена. Остава само да тестваме и да се уверим, че сме конфигурирали всичко правилно.

Внедрете от кода на Terraform

Необходима подготовка

тераформира е много удобен инструмент с отворен код за внедряване на инфраструктура от код. Той има свой собствен синтаксис, който е лесен за научаване и има много примери за това как и какво да се внедри. Редакторът Atom или Visual Studio Code има много удобни добавки, които улесняват работата с Terraform.

Можете да изтеглите дистрибуцията следователно. Подробният анализ на всички възможности на Terraform е извън обхвата на тази статия, така че ще се ограничим до основните точки.

Как да започнем

Пълният код на проекта е в моето хранилище. Ние клонираме хранилището към себе си. Преди да започнете, трябва да се уверите, че имате инсталиран и конфигуриран AWS CLI, защото... Terraform ще търси идентификационни данни във файла ~/.aws/credentials.

Добра практика е да стартирате командата за планиране, преди да разположите цялата инфраструктура, за да видите какво създава Terraform в момента за нас в облака:

terraform.exe plan

Ще бъдете подканени да въведете телефонен номер, на който да изпращате известия. Не е необходимо да го въвеждате на този етап.

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
След като анализирахме плана за работа на програмата, можем да започнем да създаваме ресурси:

terraform.exe apply

След като изпратите тази команда, отново ще бъдете помолени да въведете телефонен номер; наберете „да“, когато се покаже въпрос относно действителното извършване на действията. Това ще ви позволи да настроите цялата инфраструктура, да извършите цялата необходима конфигурация на EC2, да разположите ламбда функции и т.н.

След като всички ресурси са създадени успешно чрез кода на Terraform, трябва да влезете в подробностите на приложението Kinesis Analytics (за съжаление не намерих как да направя това директно от кода).

Стартирайте приложението:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
След това трябва изрично да зададете името на потока в приложението, като изберете от падащия списък:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Сега всичко е готово за работа.

Тестване на приложението

Независимо как сте разположили системата, ръчно или чрез кода на Terraform, тя ще работи по същия начин.

Влизаме чрез SSH във виртуалната машина EC2, където е инсталиран Kinesis Agent и стартираме скрипта api_caller.py

sudo ./api_caller.py TOKEN

Всичко, което трябва да направите, е да изчакате SMS на вашия номер:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
SMS - съобщението пристига на телефона за почти 1 минута:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър
Остава да видим дали записите са били запазени в базата данни DynamoDB за последващ, по-подробен анализ. Таблицата airline_tickets съдържа приблизително следните данни:

Интегриране на API на Aviasales с Amazon Kinesis и простота без сървър

Заключение

В хода на извършената работа беше изградена онлайн система за обработка на данни, базирана на Amazon Kinesis. Бяха разгледани опции за използване на Kinesis Agent във връзка с Kinesis Data Streams и анализи в реално време Kinesis Analytics с помощта на SQL команди, както и взаимодействието на Amazon Kinesis с други AWS услуги.

Ние внедрихме горната система по два начина: доста дълъг ръчен и бърз от кода на Terraform.

Целият изходен код на проекта е наличен в моето хранилище на GitHub, предлагам ви да се запознаете с него.

Радвам се да обсъдя статията, очаквам вашите коментари. Надявам се на градивна критика.

Желая ви успех!

Източник: www.habr.com

Добавяне на нов коментар