Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Еј Хабр!

Дали сакате да летате со авиони? Го сакам тоа, но за време на самоизолацијата се заљубив и во анализирањето на податоците за авионските билети од еден добро познат извор - Aviasales.

Денес ќе ја анализираме работата на Amazon Kinesis, ќе изградиме систем за стриминг со аналитика во реално време, ќе ја инсталираме базата на податоци на Amazon DynamoDB NoSQL како главно складирање на податоци и ќе поставиме СМС известувања за интересни билети.

Сите детали се под рез! Оди!

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Вовед

На пример, ни треба пристап до Aviasales API. Пристапот до него е обезбеден бесплатно и без ограничувања; само треба да се регистрирате во делот „Програмери“ за да го добиете вашиот API токен за пристап до податоците.

Главната цел на овој напис е да даде општо разбирање за употребата на пренос на информации во AWS; ние земаме предвид дека податоците вратени од користениот API не се строго ажурирани и се пренесуваат од кешот, што е формирана врз основа на пребарувања на корисниците на страниците Aviasales.ru и Jetradar.com во последните 48 часа.

Kinesis-agent, инсталиран на машината за производство, примен преку API автоматски ќе ги анализира и пренесува податоците до саканиот поток преку Kinesis Data Analytics. Суровата верзија на овој пренос ќе биде напишана директно во продавницата. Складирањето необработени податоци распоредено во DynamoDB ќе овозможи подлабока анализа на билетите преку алатките за БИ, како што е AWS Quick Sight.

Ќе разгледаме две опции за распоредување на целата инфраструктура:

  • Прирачник - преку AWS Management Console;
  • Инфраструктурата од Terraform кодот е за мрзливи автомати;

Архитектура на развиениот систем

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Користени компоненти:

  • Aviasales API — податоците вратени од овој API ќе се користат за целата наредна работа;
  • EC2 продуцент пример — обична виртуелна машина во облакот на која ќе се генерира влезниот проток на податоци:
    • Кинезис агент е Java апликација инсталирана локално на машината која обезбедува лесен начин за собирање и испраќање податоци до Kinesis (Kinesis Data Streams или Kinesis Firehose). Агентот постојано следи збир на датотеки во наведените директориуми и испраќа нови податоци до Kinesis;
    • Скрипта за повикувач на API — Python скрипта што испраќа барања до API и го става одговорот во папка што е надгледувана од Kinesis Agent;
  • Кинезис текови на податоци — услуга за пренос на податоци во реално време со широки можности за скалирање;
  • Анализа на кинезис е услуга без сервер која ја поедноставува анализата на стриминг податоци во реално време. Amazon Kinesis Data Analytics ги конфигурира ресурсите на апликацијата и автоматски се зголемува за да се справи со кој било обем на дојдовни податоци;
  • AWS Ламбда — услуга која ви овозможува да извршувате код без да правите резервна копија или да поставувате сервери. Целата компјутерска моќ автоматски се намалува за секој повик;
  • Amazon DynamoDB - База на податоци со парови и документи со клучеви и вредности што обезбедува латентност помала од 10 милисекунди кога работи во која било скала. Кога користите DynamoDB, не треба да обезбедувате, закрпите или управувате со какви било сервери. DynamoDB автоматски ги зголемува табелите за да ја прилагоди количината на достапни ресурси и да одржува високи перформанси. Не е потребна системска администрација;
  • Амазон СНС - целосно управувана услуга за испраќање пораки со користење на моделот издавач-претплатник (Pub/Sub), со кој можете да изолирате микросервиси, дистрибуирани системи и апликации без сервер. SNS може да се користи за испраќање информации до крајните корисници преку мобилни притисни известувања, СМС пораки и е-пошта.

Почетна обука

За да го имитирам протокот на податоци, решив да ги користам информациите за авионски билети вратени од API-то на Aviasales. ВО документација доста обемна листа на различни методи, ајде да земеме еден од нив - „Месечен календар на цени“, кој ги враќа цените за секој ден од месецот, групирани според бројот на трансфери. Ако не го наведете месецот на пребарување во барањето, информациите ќе бидат вратени за месецот што следи по тековниот.

Значи, ајде да се регистрираме и да го добиеме нашиот токен.

Пример барање е подолу:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Горенаведениот метод за примање податоци од API со назначување токен во барањето ќе работи, но јас претпочитам да го префрлам токенот за пристап преку заглавието, така што ќе го користиме овој метод во скриптата api_caller.py.

Пример за одговор:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Примерот API одговор погоре покажува билет од Санкт Петербург до Пук... О, каков сон...
Бидејќи јас сум од Казан, а Пукет сега е „само сон“, ајде да бараме билети од Санкт Петербург до Казан.

Претпоставува дека веќе имате сметка AWS. Би сакал веднаш да привлечам посебно внимание на фактот дека Kinesis и испраќањето известувања преку СМС не се вклучени во годишниот Бесплатно ниво (бесплатна употреба). Но, и покрај ова, со неколку долари на ум, сосема е можно да се изгради предложениот систем и да се игра со него. И, се разбира, не заборавајте да ги избришете сите ресурси откако повеќе не се потребни.

За среќа, функциите DynamoDb и ламбда ќе ни бидат бесплатни доколку ги исполниме нашите месечни бесплатни граници. На пример, за DynamoDB: 25 GB простор, 25 WCU/RCU и 100 милиони барања. И милион повици со ламбда функција месечно.

Рачно распоредување на системот

Поставување на Kinesis податочни текови

Ајде да одиме на услугата Kinesis Data Streams и да создадеме два нови текови, по еден фрагмент за секој.

Што е фрагмент?
Шард е основната единица за пренос на податоци на потокот на Amazon Kinesis. Еден сегмент обезбедува пренос на влезни податоци со брзина од 1 MB/s и пренос на излезни податоци со брзина од 2 MB/s. Еден сегмент поддржува до 1000 PUT записи во секунда. Кога креирате поток на податоци, треба да го наведете потребниот број на сегменти. На пример, можете да креирате поток на податоци со два сегменти. Овој поток на податоци ќе обезбеди влезни пренос на податоци со 2 MB/s и излезни пренос на податоци со 4 MB/s, поддржувајќи до 2000 PUT записи во секунда.

Колку повеќе парчиња во вашиот поток, толку е поголема неговата пропусност. Во принцип, вака се скалираат тековите - со додавање на парчиња. Но, колку повеќе парчиња имате, толку е поголема цената. Секој фрагмент чини 1,5 центи на час и дополнителни 1.4 центи за секој милион PUT единици на носивост.

Ајде да создадеме нов поток со името авиобилети, 1 парче ќе му биде доволно:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Сега ајде да создадеме друга нишка со името special_stream:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Поставување на производителот

За да се анализира задачата, доволно е да се користи обична EC2 инстанца како продуцент на податоци. Не мора да биде моќна, скапа виртуелна машина; spot t2.micro ќе биде добро.

Важна забелешка: на пример, треба да користите слика - Amazon Linux AMI 2018.03.0, има помалку поставки за брзо стартување на Kinesis Agent.

Одете во услугата EC2, креирајте нова виртуелна машина, изберете го саканиот AMI со тип t2.micro, кој е вклучен во Free Tier:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
За да може новосоздадената виртуелна машина да има интеракција со услугата Kinesis, мора да и се дадат права за тоа. Најдобар начин да го направите ова е да доделите улога на IAM. Затоа, на екранот Чекор 3: Конфигурирај ги деталите за примерот, треба да изберете Создадете нова улога на IAM:

Креирање на улога на IAM за EC2
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Во прозорецот што се отвора, изберете дека создаваме нова улога за EC2 и одете во делот Дозволи:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Користејќи го примерот за обука, не мора да навлегуваме во сите сложености на грануларна конфигурација на правата на ресурсите, па затоа ќе ги избереме политиките претходно конфигурирани од Amazon: AmazonKinesisFullAccess и CloudWatchFullAccess.

Ајде да дадеме некое значајно име за оваа улога, на пример: EC2-KinesisStreams-FullAccess. Резултатот треба да биде ист како што е прикажан на сликата подолу:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Откако ќе ја креирате оваа нова улога, не заборавајте да ја прикачите на примерот на креираната виртуелна машина:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Ние не менуваме ништо друго на овој екран и преминуваме на следните прозорци.

Поставките на хард дискот може да се остават како стандардни, како и ознаките (иако е добра практика да се користат ознаки, барем да се даде име на примерокот и да се означи околината).

Сега сме на табулаторот Чекор 6: Конфигурирај група за безбедност, каде што треба да креирате нова или да ја наведете постоечката група за безбедност, која ви овозможува да се поврзете преку ssh (порта 22) со примерокот. Изберете Source -> My IP таму и можете да ја стартувате инстанцата.

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Веднаш штом ќе се префрли на статусот на работа, може да се обидете да се поврзете со него преку ssh.

За да можете да работите со Kinesis Agent, по успешно поврзување со машината, мора да ги внесете следните команди во терминалот:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ајде да создадеме папка за да ги зачуваме одговорите на API:

sudo mkdir /var/log/airline_tickets

Пред да го стартувате агентот, треба да ја конфигурирате неговата конфигурација:

sudo vim /etc/aws-kinesis/agent.json

Содржината на датотеката agent.json треба да изгледа вака:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Како што може да се види од конфигурациската датотека, агентот ќе ги следи датотеките со наставката .log во директориумот /var/log/airline_tickets/, ќе ги анализира и ќе ги префрли во протокот на airline_tickets.

Ја рестартираме услугата и се уверуваме дека таа е вклучена и работи:

sudo service aws-kinesis-agent restart

Сега да ја преземеме Python скриптата што ќе бара податоци од API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Скриптата api_caller.py бара податоци од Aviasales и го зачувува добиениот одговор во директориумот што го скенира агентот Kinesis. Имплементацијата на оваа скрипта е сосема стандардна, постои класа TicketsApi, ви овозможува асинхроно да го повлечете API. Подаваме заглавие со токен и бараме параметри во оваа класа:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

За да ги тестираме точните поставки и функционалноста на агентот, ајде да ја тестираме скриптата api_caller.py:

sudo ./api_caller.py TOKEN

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
И го гледаме резултатот од работата во дневниците на агентите и на картичката Мониторинг во протокот на податоци за авионски билети:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Како што можете да видите, сè работи и Kinesis Agent успешно испраќа податоци до потокот. Сега ајде да го конфигурираме потрошувачот.

Поставување на кинезис анализа на податоци

Ајде да преминеме на централната компонента на целиот систем - креирајте нова апликација во Kinesis Data Analytics со име kinesis_analytics_airlines_app:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Kinesis Data Analytics ви овозможува да вршите аналитика на податоци во реално време од Kinesis Streams користејќи го јазикот SQL. Тоа е услуга за целосно автоматско скалирање (за разлика од Kinesis Streams) која:

  1. ви овозможува да креирате нови преноси (Излезен тек) врз основа на барања за извор на податоци;
  2. обезбедува пренос со грешки што се случиле додека се извршувале апликациите (Проток на грешка);
  3. може автоматски да ја одреди шемата за влезни податоци (може рачно да се редефинира доколку е потребно).

Ова не е евтина услуга - 0.11 американски долари за час работа, затоа треба внимателно да ја користите и да ја избришете кога ќе завршите.

Ајде да ја поврземе апликацијата со изворот на податоци:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Изберете го потокот на кој ќе се поврземе (авиокомпанија_тикети):

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Следно, треба да прикачите нова улога на IAM за да може апликацијата да чита од преносот и да пишува на преносот. За да го направите ова, доволно е да не менувате ништо во блокот за дозволи за пристап:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Сега да побараме откривање на шемата за податоци во потокот; за да го направите ова, кликнете на копчето „Откриј шема“. Како резултат на тоа, улогата на IAM ќе се ажурира (ќе се создаде нова) и ќе се стартува откривањето шема од податоците што веќе пристигнале во преносот:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Сега треба да отидете во SQL уредникот. Кога ќе кликнете на ова копче, ќе се појави прозорец со барање да ја стартувате апликацијата - изберете што сакате да стартувате:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Вметнете го следното едноставно барање во прозорецот на уредувачот SQL и кликнете Зачувај и Стартувај SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Во релационите бази на податоци, работите со табели користејќи INSERT изјави за додавање записи и SELECT изјава за барање податоци. Во Amazon Kinesis Data Analytics, работите со потоци (STREAMs) и пумпи (PUMPs) - континуирани барања за вметнување кои вметнуваат податоци од еден поток во апликација во друг пренос.

Барањето SQL претставено погоре бара билети за Аерофлот по цена под пет илјади рубли. Сите записи што ги исполнуваат овие услови ќе бидат ставени во преносот DESTINATION_SQL_STREAM.

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Во блокот Destination, изберете го преносот special_stream и во паѓачката листа DESTINATION_SQL_STREAM името на преносот во апликација:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Резултатот од сите манипулации треба да биде нешто слично на сликата подолу:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Креирање и претплата на тема за SNS

Одете во услугата за едноставни известувања и креирајте нова тема таму со името Авиокомпании:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Претплатете се на оваа тема и наведете го бројот на мобилниот телефон на кој ќе се испраќаат СМС известувања:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Направете табела во DynamoDB

За да ги зачувате необработените податоци од нивниот тек на airline_tickets, ајде да создадеме табела во DynamoDB со истото име. Ќе користиме record_id како примарен клуч:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Создавање колектор на ламбда функции

Ајде да создадеме ламбда функција наречена Колектор, чија задача ќе биде да го анкетира протокот на авионски билети и, доколку се најдат нови записи таму, да ги вметне овие записи во табелата DynamoDB. Очигледно, покрај стандардните права, оваа ламбда мора да има пристап за читање до протокот на податоци на Kinesis и пристап за пишување до DynamoDB.

Креирање IAM улога за колекторската ламбда функција
Прво, ајде да создадеме нова IAM улога за ламбда наречена Lambda-TicketsProcessingRole:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
За примерот за тестирање, претходно конфигурираните политики на AmazonKinesisReadOnlyAccess и AmazonDynamoDBFullAccess се сосема соодветни, како што е прикажано на сликата подолу:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Оваа ламбда треба да се активира со активирач од Kinesis кога новите записи ќе влезат во airline_stream, па затоа треба да додадеме нов активирач:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Останува само да се вметне кодот и да се зачува ламбда.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Креирање известувач за ламбда функција

На сличен начин е креирана и втората ламбда функција, која ќе го следи вториот поток (special_stream) и ќе испрати известување до SNS. Затоа, оваа ламбда мора да има пристап за читање од Kinesis и испраќање пораки до дадена тема на SNS, кои потоа ќе бидат испратени од SNS сервисот до сите претплатници на оваа тема (email, SMS итн.).

Креирање на улога на IAM
Прво, ја креираме улогата IAM Lambda-KinesisAlarm за оваа ламбда, а потоа ја доделуваме оваа улога на alarm_notifier ламбда што се создава:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Оваа ламбда треба да работи на активирањето за новите записи да влезат во special_stream, така што треба да го конфигурирате активирањето на ист начин како што направивме за колекторската ламбда.

За да го олесниме конфигурирањето на оваа ламбда, да воведеме нова променлива на животната средина - TOPIC_ARN, каде што го ставаме ANR (Имиња на ресурси на Амазон) на темата на авиокомпаниите:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
И вметнете го ламбда кодот, воопшто не е комплицирано:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Се чини дека тука е завршена рачната конфигурација на системот. Останува само да тестираме и да се увериме дека сме конфигурирале сè правилно.

Распоредете од кодот Terraform

Неопходна подготовка

Terraform е многу удобна алатка со отворен код за распоредување на инфраструктура од код. Има своја синтакса која е лесна за учење и има многу примери за тоа како и што да се распореди. Уредникот Atom или Visual Studio Code има многу практични приклучоци што ја олеснуваат работата со Terraform.

Можете да ја преземете дистрибуцијата оттука. Деталната анализа на сите способности на Terraform е надвор од опсегот на овој напис, па затоа ќе се ограничиме на главните точки.

Како да започнете

Целосниот код на проектот е во моето складиште. Ние го клонираме складиштето за себе. Пред да започнете, треба да бидете сигурни дека имате инсталирано и конфигурирано AWS CLI, бидејќи ... Terraform ќе бара ингеренции во датотеката ~/.aws/credentials.

Добра практика е да ја извршите командата за план пред да ја распоредите целата инфраструктура за да видите што Тераформ моментално создава за нас во облакот:

terraform.exe plan

Ќе ви биде побарано да внесете телефонски број на кој ќе испраќате известувања. Не е неопходно да се внесе во оваа фаза.

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Откако го анализиравме оперативниот план на програмата, можеме да започнеме да создаваме ресурси:

terraform.exe apply

По испраќањето на оваа команда, повторно ќе биде побарано да внесете телефонски број; бирајте „да“ кога ќе се прикаже прашање за навистина извршување на дејствата. Ова ќе ви овозможи да ја поставите целата инфраструктура, да ја извршите целата потребна конфигурација на EC2, да распоредите ламбда функции итн.

Откако сите ресурси се успешно креирани преку кодот Terraform, треба да влезете во деталите за апликацијата Kinesis Analytics (за жал, не најдов како да го направам тоа директно од кодот).

Стартувајте ја апликацијата:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
После ова, мора експлицитно да го поставите името на преносот во апликацијата со избирање од паѓачката листа:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Сега сè е подготвено да тргне.

Тестирање на апликацијата

Без оглед на тоа како сте го распоредиле системот, рачно или преку Terraform кодот, тој ќе работи исто.

Се најавуваме преку SSH на виртуелната машина EC2 каде што е инсталиран Kinesis Agent и ја извршуваме скриптата api_caller.py

sudo ./api_caller.py TOKEN

Сè што треба да направите е да чекате СМС на вашиот број:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
СМС - пораката пристигнува на телефонот за скоро 1 минута:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер
Останува да видиме дали записите се зачувани во базата на податоци DynamoDB за последователна, подетална анализа. Табелата за авионски билети ги содржи приближно следните податоци:

Интеграција на Aviasales API со Amazon Kinesis и едноставност без сервер

Заклучок

Во текот на сработеното, беше изграден систем за обработка на податоци преку Интернет базиран на Amazon Kinesis. Беа разгледани опциите за користење на Kinesis Agent во врска со Kinesis Data Streams и аналитиката во реално време Kinesis Analytics со помош на SQL команди, како и интеракцијата на Amazon Kinesis со други услуги AWS.

Го употребивме горенаведениот систем на два начина: прилично долг рачен и брз од кодот Terraform.

Целиот изворен код на проектот е достапен во моето складиште на GitHub, Ви предлагам да се запознаете со него.

Среќен сум да разговарам за статијата, со нетрпение ги очекувам вашите коментари. Се надевам на конструктивна критика.

Ви посакувам успех!

Извор: www.habr.com

Додадете коментар