Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Прывітанне, Хабр!

А вы любіце лятаць на самалётах? Я люблю, але на самаізаляцыі палюбіў яшчэ і аналізаваць дадзеныя аб авіябілетах аднаго вядомага рэсурсу – Aviasales.

Сёння мы разбяром працу Amazon Kinesis, пабудуем стрыммінгавую сістэму з рэал-тайм аналітыкай, паставім NoSQL базу дадзеных Amazon DynamoDB у якасці асноўнага сховішчы дадзеных і наладзім апавяшчэнне праз SMS па цікавых квітках.

Усе падрабязнасці пад катом! Паехалі!

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Увядзенне

Для прыкладу нам спатрэбіцца доступ да API Aviasales. Доступ да яго прадастаўляецца бясплатна і без абмежаванняў, неабходна толькі зарэгістравацца ў раздзеле "Распрацоўшчыкам", каб атрымаць свой API токен для доступу да дадзеных.

Асноўная мэта дадзенага артыкула – даць агульнае разуменне выкарыстання струменевай перадачы інфармацыі ў AWS, мы выносім за дужкі, што дадзеныя, якія вяртаюцца выкарыстоўваным API не з'яўляюцца строга актуальнымі і перадаюцца з кэша, які фармуецца на падставе пошукаў карыстачоў сайтаў Aviasales.ru і Jetradar.com за апошнія 48 гадзін.

Атрыманыя праз API дадзеныя аб авіябілетах Kinesis-agent, усталяваны на машыне-прадзюсары, будзе аўтаматам парсіць і перадаваць у патрэбны струмень праз Kinesis Data Analytics. Неапрацаваная версія гэтага патоку будзе пісацца напрамую ў сховішча. Разгорнутае ў DynamoDB сховішча "волкіх" дадзеных дазволіць праводзіць глыбейшы ​​аналіз квіткоў праз BI прылады, напрыклад, AWS Quick Sight.

Мы разгледзім два варыянты дэплою ўсёй інфраструктуры:

  • Ручны - праз AWS Management Console;
  • Інфраструктура з кода Terraform – для лянівых аўтаматызатараў;

Архітэктура распрацоўванай сістэмы

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Выкарыстоўваныя кампаненты:

  • Aviasales API - дадзеныя, якія вяртаюцца гэтым API, будуць выкарыстоўвацца для ўсёй наступнай працы;
  • EC2 Producer Instance - звычайная віртуальная машына ў воблаку, на якой будзе генеравацца ўваходны паток дадзеных:
    • Kinesis Agent - гэта Java-дадатак, якое ўстанаўліваецца лакальна на машыну, якое дае просты спосаб збору і адпраўкі дадзеных у Kinesis (Kinesis Data Streams або Kinesis Firehose). Агент увесь час адсочвае набор файлаў у паказаных дырэкторыях і адпраўляе новыя дадзеныя ў Kinesis;
    • Скрыпт API Caller - Python-скрыпт, які робіць запыты да API і які складае адказ у тэчку, якую маніторыць Kinesis Agent;
  • Kinesis Data Streams - сэрвіс струменевай перадачы дадзеных у рэжыме рэальнага часу з шырокімі магчымасцямі маштабавання;
  • Kinesis Analytics - бессерверны сэрвіс, які спрашчае аналіз струменевых дадзеных у рэжыме рэальнага часу. Amazon Kinesis Data Analytics наладжвае рэсурсы для працы прыкладанняў і аўтаматычна маштабуецца для апрацоўкі любых аб'ёмаў уваходных дадзеных;
  • AWS лямбда - сэрвіс, які дазваляе запускаць код без рэзервавання і налады сервераў. Усе вылічальныя магутнасці аўтаматычна маштабуюцца пад кожны выклік;
  • Amazon DynamoDB - база дадзеных пар "ключ-значэнне" і дакументаў, якая забяспечвае затрымку менш за 10 мілісекунд пры працы ў любым маштабе. Пры выкарыстанні DynamoDB не патрабуецца размяркоўваць якія-небудзь серверы, усталёўваць на іх выпраўленні ці кіраваць імі. DynamoDB аўтаматычна маштабуе табліцы, карэктуючы аб'ём даступных рэсурсаў і захоўваючы высокую прадукцыйнасць. Ніякія дзеянні па адміністраванні сістэмы не патрабуюцца;
  • Amazon SNS – цалкам кіраваны сэрвіс адпраўкі паведамленняў па мадэлі «выдавец – падпісант» (Pub/Sub), з дапамогай якога можна ізаляваць мікрасэрвісы, размеркаваныя сістэмы і бессерверныя прыкладанні. SNS можна выкарыстоўваць для рассылання інфармацыі канчатковым карыстальнікам з дапамогай мабільных push-паведамленняў, SMS-паведамленняў і электронных лістоў.

Пачатковая падрыхтоўка

Для эмуляцыі патоку дадзеных я вырашыў выкарыстоўваць інфармацыю аб авіябілетах, якая вяртаецца API Aviasales. У дакументацыі даволі шырокі спіс розных метадаў, возьмем адзін з іх - «Каляндар коштаў на месяц», які вяртае кошты за кожны дзень месяца, згрупаваныя па колькасці перасадак. Калі не перадаваць у запыце месяц пошуку, то будзе вернутая інфармацыя за месяц, наступны за бягучым.

Такім чынам, рэгіструемся, атрымліваем свой токен.

Прыклад запыту ніжэй:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Вышэйапісаны спосаб атрымання дадзеных ад API з указаннем токена ў запыце будзе працаваць, але мне больш падабаецца перадаваць токен доступу праз загаловак, таму ў скрыпце api_caller.py будзем карыстацца менавіта гэтым спосабам.

Прыклад адказу:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

У прыкладзе адказу API вышэй паказаны білет з Санкт-Пецярбурга ў Пхук… Эх, ды што марыць…
Так як я з Казані, а Пхукет зараз «нам толькі сніцца», пашукаем квіткі з Санкт-Пецярбурга ў Казань.

Мяркуецца, што ў вас ужо ёсць рахунак на AWS. Адразу хачу звярнуць асаблівую ўвагу, што Kinesis і адпраўка апавяшчэнняў праз SMS не ўваходзяць у гадавы Free Tier (бясплатнае выкарыстанне). Але нават нягледзячы на ​​гэта, заклаўшы ў розуме пары даляраў, суцэль можна пабудаваць прапанаваную сістэму і пагуляць з ёй. І, вядома ж, не варта забываць выдаляць усе рэсурсы пасля таго, як яны сталі не патрэбны.

На шчасце, DynamoDb і лямбда-функцыі будуць для нас умоўна бясплатнымі, калі ўкласціся ў месячныя бясплатныя ліміты. Напрыклад, для DynamoDB: 25 Гб сховішчы, 25 WCU/RCU і 100 млн. запытаў. І мільён выклікаў лямбда функцый у месяц.

Ручны дэплой сістэмы

Настройка Kinesis Data Streams

Пяройдзем у сэрвіс Kinesis Data Streams і ствараем два новых патоку па адным шардзе на кожны.

Што такое шард?
Шард - гэта асноўная адзінка перадачы дадзеных патоку Amazon Kinesis. Адзін сегмент забяспечвае перадачу ўваходных дадзеных са хуткасцю 1 МБ/С і перадачу выходных дадзеных са хуткасцю 2 МБ/С. Адзін сегмент падтрымлівае да 1000 запісаў PUT у секунду. Пры стварэнні струменя дадзеных патрабуецца паказаць патрэбную колькасць сегментаў. Напрыклад, можна стварыць струмень дадзеных з двума сегментамі. Гэты струмень дадзеных забяспечыць перадачу ўваходных дадзеных са хуткасцю 2 МБ/С і перадачу выходных дадзеных са хуткасцю 4 МБ/С з падтрымкай да 2000 запісаў PUT у секунду.

Чым больш шардов у вашым струмені - тым больш яго прапускная здольнасць. У прынцыпе, так і маштабуюцца патокі - шляхам дадання шардаў. Але чым больш у вас шардаў, тым вышэй і кошт. Кожны шард каштуе 1,5 цэнта ў гадзіну і дадаткова 1.4/XNUMX цэнта за кожныя мільён аперацый дадання ў струмень (PUT payload units).

Створым новы паток з імем airline_tickets, яму цалкам дастаткова будзе 1 шарда:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Цяпер створым яшчэ адзін паток з імем special_stream:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Настройка прадзюсара

У якасці прадзюсара дадзеных для разбору задачы дастаткова выкарыстоўваць звычайны EC2 інстанс. Гэта не павінна быць магутная дарагая віртуальная машына, суцэль падыдзе спотавых t2.micro.

Важная заўвага: для прыкладу варта выкарыстоўваць image – Amazon Linux AMI 2018.03.0, з ім менш налад для хуткага запуску Kinesis Agent.

Пераходзім у сэрвіс EC2, ствараем новую віртуальную машыну, выбіраемы патрэбны AMI з тыпам t2.micro, які ўваходзіць ва Free Tier:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Для таго, каб ізноў створаная віртуальная машына змагла ўзаемадзейнічаць з сэрвісам Kinesis, неабходна даць ёй на гэтае права. Лепшы спосаб гэта зрабіць - прызначыць IAM Role. Таму, на экране Step 3: Configure Instance Details трэба абраць Create new IAM Role:

Стварэнне IAM ролі для EC2
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
У якое адкрылася акне, выбіраемы, што новую ролю ствараем для EC2 і пераходзім у падзел Permissions:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
На вучэбным прыкладзе можна не ўдавацца ва ўсе тонкасці гранулярнай наладкі правоў на рэсурсы, таму выберам праднастроеныя Амазонам полісі: AmazonKinesisFullAccess і CloudWatchFullAccess.

Дамо якое-небудзь асэнсаванае імя для гэтай ролі, напрыклад: EC2-KinesisStreams-FullAccess. У выніку, павінна атрымацца тое ж самае, што паказана на малюнку ніжэй:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Пасля стварэння гэтай новай ролі, не забываем прычапіць яе да стваранага інстанса віртуальнай машыны:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Больш на гэтым экране нічога не мяняем і пераходзім да наступных вокнаў.

Параметры жорсткага дыска можна пакінуць па змаўчанні, тэгі таксама (хоць, добрай практыкай з'яўляецца тэгі выкарыстоўваць, хаця б даваць імя інстансу і ўказваць энвайранмент).

Цяпер мы на закладцы Step 6: Configure Security Group, дзе неабходна стварыць новы або паказаць наяўны ў вас Sеcurity group, які дазваляе рабіць канэкт праз ssh (порт 22) на інстанс. Абярыце там Source —> My IP і можаце запускаць інстанс.

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Як толькі ён пяройдзе ў статут running, можна спрабаваць законнектіться на яго праз ssh.

Каб атрымаць магчымасць працы з Kinesis Agent, пасля паспяховага канэкту да машыны, неабходна ўвесці наступныя каманды ў тэрмінале:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Створым тэчку для захавання адказаў API:

sudo mkdir /var/log/airline_tickets

Перад запускам агента, неабходна наладзіць яго канфіг:

sudo vim /etc/aws-kinesis/agent.json

Утрыманне файла agent.json павінна мець наступны выгляд:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Як відаць з файла канфігурацыі, агент будзе маніторыць у дырэкторыі /var/log/airline_tickets/ файлы з пашырэннем .log, парсіць іх і перадаваць у струмень airline_tickets.

Перазапускаем сэрвіс і пераконваемся, што ён запусціўся і працуе:

sudo service aws-kinesis-agent restart

Цяпер спампуем Python-скрыпт, які будзе запытваць дадзеныя ў API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Скрыпт api_caller.py запытвае дадзеныя ў Aviasales і захоўвае атрыманы адказ у дырэкторыі, якую скануе Kinesis agent. Рэалізацыя гэтага скрыпту дастаткова стандартная, ёсць клас TicketsApi, ён дазваляе асінхронна тузаць API. У гэты клас перадаем загаловак з токенам і параметры запыту:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Для тэставання правільнасці налад і працаздольнасці агента зробім тэставы запуск скрыпту api_caller.py:

sudo ./api_caller.py TOKEN

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
І глядзім вынік працы ў логах Агента і на закладцы Monitoring у струмені дадзеных airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Як відаць, усё працуе і Kinesis Agent паспяхова адпраўляе дадзеныя ў струмень. Цяпер наладзім consumer.

Настройка Kinesis Data Analytics

Пяройдзем да цэнтральнага кампанента ўсёй сістэмы - створым новае прыкладанне ў Kinesis Data Analytics з імем kinesis_analytics_airlines_app:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Kinesis Data Analytics дазваляе выконваць аналітыку дадзеных у рэальным часе з Kinesis Streams з дапамогай мовы SQL. Гэта цалкам аўтамаштабаваны сэрвіс (у адрозненне ад Kinesis Streams), які:

  1. дазваляе ствараць новыя плыні (Output Stream) на аснове запытаў да зыходных дадзеных;
  2. падае струмень з памылкамі, якія ўзніклі падчас працы прыкладанняў (Error Stream);
  3. умее аўтаматычна вызначаць схему ўваходных дадзеных (яе можна ўручную перавызначыць пры неабходнасці).

Гэта нятанны сэрвіс - 0.11 USD за гадзіну працы, таму карыстацца ім варта акуратна і выдаляць пры завяршэнні працы.

Падключым дадатак да крыніцы дадзеных:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Выбіраемы струмень, да якога збіраемся падлучыцца (airline_tickets):

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Далей, неабходна прынадаць новую IAM Роля для таго, каб прыкладанне магло чытаць з патоку і пісаць у паток. Для гэтага дастаткова нічога не мяняць у блоку Access permissions:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Цяпер запытаем выяўленне схемы дадзеных у струмені, для гэтага націскаем на кнопку "Discover schema". У выніку абновіцца (створыцца новая) роля IAM і будзе запушчана выяўленне схемы з дадзеных, якія ўжо прыляцелі ў струмень:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Цяпер неабходна перайсці ў рэдактар ​​SQL. Пры націску на гэтую кнопку, выйдзе акно з пытаннем аб запуску прыкладання - выбіраемы што жадаем запусціць:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
У акно рэдактара SQL уставім такі просты запыт і націсканы Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

У рэляцыйных базах дадзеных вы працуеце з табліцамі, выкарыстоўваючы аператары INSERT для дадання запісаў і аператар SELECT для запыту даных. У Amazon Kinesis Data Analytics вы працуеце з струменямі (STREAM) і "помпамі" (PUMP) - бесперапыннымі запытамі ўстаўкі, якія ўстаўляюць дадзеныя з аднаго патоку ў дадатку ў іншы паток.

У прадстаўленым вышэй SQL запыце адбываецца пошук квіткоў Аэрафлота па кошце ніжэй пяці тысяч рублёў. Усе запісы, якія трапляюць пад гэтыя ўмовы, будуць змешчаныя ў струмень DESTINATION_SQL_STREAM.

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
У блоку Destination выбіраемы струмень special_stream, а ў які расчыняецца спісе In-application stream name DESTINATION_SQL_STREAM:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
У выніку ўсіх маніпуляцый павінна атрымацца нешта падобнае на карцінку ніжэй:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Стварэнне і падпіска на топік SNS

Пераходзім у сэрвіс Simple Notification Service і ствараем тамака новы топік з імем Airlines:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Афармляем падпіску на гэты топік, у ёй паказваем нумар мабільнага тэлефона, на які будуць прыходзіць СМС-паведамленні:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Стварэнне табліцы ў DynamoDB

Для захоўвання неапрацаваных дадзеных іх струменя airline_tickets, створым табліцу ў DynamoDB з такім жа імем. У якасці першаснага ключа будзем выкарыстоўваць record_id:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Стварэнне лямбда-функцыі collector

Створым лямбда-функцыю пад назовам Collector, задачай якой будзе апытанне струменя airline_tickets і, у выпадку знаходжання тамака новых запісаў, устаўка гэтых запісаў у табліцу DynamoDB. Відавочна, што апроч правоў па змаўчанні, гэтая лямбда павінна мець доступ да чытання струменя дадзеных Kinesis і запісы ў DynamoDB.

Стварэнне IAM ролі для лямбда-функцыі collector
Для пачатку створым новую IAM роля для лямбды з імем Lambda-TicketsProcessingRole:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Для тэставага прыкладу цалкам падыдуць пераднастроеныя полісі AmazonKinesisReadOnlyAccess і AmazonDynamoDBFullAccess, як паказана на малюнку ніжэй:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Дадзеная лямбда павінна запускацца па трыгеру ад Kinesis пры трапленні новых запісаў у струмень airline_stream, таму трэба дадаць новы трыгер:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Засталося ўставіць код і захаваць лямбду.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Стварэнне лямбда-функцыі notifier

Другая лямбда-функцыя, якая будзе маніторыць другі струмень (special_stream) і адпраўляць апавяшчэнне ў SNS, ствараецца аналагічна. Такім чынам, гэтая лямбда павінна мець доступ на чытанне з Kinesis і адпраўку паведамленняў у зададзены SNS-топік, які далей сервісам SNS будзе адпраўлены ўсім падпісчыкам гэтага топіка (email, SMS і г.д).

Стварэнне IAM ролі
Спачатку ствараем IAM роля Lambda-KinesisAlarm для гэтай лямбды, а потым прызначаем гэтую ролю для стваранай лямбды alarm_notifier:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Гэтая лямбда павінна працаваць па трыгеры на трапленне новых запісаў у струмень special_stream, таму неабходна наладзіць трыгер аналагічна таму, як мы гэта рабілі для лямбды Collector.

Для выгоды налады гэтай лямбды, увядзем новую зменную асяроддзі - TOPIC_ARN, куды змяшчаем ANR (Amazon Recourse Names) топіка Airlines:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
І ўстаўляемы код лямбды, ён зусім нескладаны:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Здаецца, на гэтым ручная настройка сістэмы завершана. Застаецца толькі пратэставаць і пераканацца ў тым, што мы настроілі ўсё правільна.

Дэплой з кода Terraform

Неабходная падрыхтоўка

Terraform - Вельмі зручны open-source інструмент для разгортвання інфраструктуры з кода. У яго свой сінтаксіс, які лёгка асвоіць і мноства прыкладаў, як і што разгарнуць. У рэдактары Atom або Visual Studio Code шмат зручных убудоў, якія дазваляюць палегчыць працу з Terraform.

Дыстрыбутыў спампаваць можна адсюль. Падрабязны разбор усіх магчымасцяў Terraform выходзіць за рамкі дадзенага артыкула, таму абмяжуемся асноўнымі момантамі.

як запусціць

Поўны код праекта ляжыць у маім рэпазітары. Клануем да сябе рэпазітар. Перад запускам неабходна пераканацца, што ў вас усталяваны і наладжаны AWS CLI, т.я. Terraform будзе шукаць уліковыя дадзеныя ў файле ~/.aws/credentials.

Добрай практыкай з'яўляецца перад дэплоем усёй інфраструктуры, запускаць каманду plan, каб паглядзець, што Terraform нам зараз стварае ў воблаку:

terraform.exe plan

Будзе прапанавана ўвесці нумар тэлефона для адпраўкі на яго апавяшчэнняў. На гэтым этапе яго ўводзіць неабавязкова.

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Прааналізаваўшы план працы праграмы, можам запускаць стварэнне рэсурсаў:

terraform.exe apply

Пасля адпраўкі гэтай каманды зноў з'явіцца запыт на ўвядзенне нумара тэлефона, набіраем "yes", калі будзе паказана пытанне аб рэальным выкананні дзеянняў. Гэта дазволіць падняць усю інфраструктуру, правесці ўсю неабходную настройку EC2, разгарнуць лямбда-функцыі і г.д.

Пасля таго, як усе рэсурсы будуць паспяхова створаны праз код Terraform, неабходна зайсці ў дэталі дадатку Kinesis Analytics (на жаль, я не знайшоў як гэта зрабіць адразу з кода).

Запускаем дадатак:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Пасля гэтага неабходна відавочна задаць in-application stream name, абраўшы з які расчыняецца спісу:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Цяпер усё гатова да працы.

Тэставанне працы прыкладання

Незалежна, як вы дэплоілі сістэму, уручную ці праз код Terraform, працаваць яна будзе аднолькава.

Заходзім па SSH на віртуальную машыну EC2, дзе ўсталяваны Kinesis Agent і запускаем скрыпт api_caller.py

sudo ./api_caller.py TOKEN

Засталося дачакацца SMS на ваш нумар:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
SMS - паведамленне прыходзіць на тэлефон практычна праз 1 хвіліну:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless
Засталося паглядзець, ці захаваліся запісы ў базе дадзеных DynamoDB для наступнага, больш дэталёвага аналізу. Табліца airline_tickets змяшчае прыкладна такія дадзеныя:

Інтэграцыя Aviasales API з Amazon Kinesis і прастата serverless

Заключэнне

Падчас праведзенай працы была пабудавана сістэма анлайн-апрацоўкі дадзеных на базе Amazon Kinesis. Былі разгледжаны варыянты выкарыстання Kinesis Agent у звязку з Kinesis Data Streams і рэал-тайм аналітыкай Kinesis Analytics пры дапамозе SQL каманд, а таксама ўзаемадзеянне Amazon Kinesis з іншымі сэрвісамі AWS.

Вышэйапісаную сістэму мы разгарнулі двума спосабамі: досыць доўгім ручным і хуткім з кода Terraform.

Увесь зыходны код праекта даступны у маім рэпазітары на GitHub, прапаную з ім азнаёміцца.

З задавальненнем гатовы абмеркаваць артыкул, чакаю Вашых каментароў. Спадзяюся на канструктыўную крытыку.

Жадаю поспехаў!

Крыніца: habr.com

Дадаць каментар