Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Эй Ҳабр!

Оё шумо ҳавопаймоҳои парвозкунандаро дӯст медоред? Ман онро дӯст медорам, аммо ҳангоми ҷудошавӣ ман инчунин ба таҳлили маълумот дар бораи чиптаҳои ҳавопаймо аз як манбаи маъруф - Aviasales ошиқ шудам.

Имрӯз мо кори Amazon Kinesis-ро таҳлил мекунем, системаи ҷараёнро бо таҳлили вақти воқеӣ бунёд мекунем, базаи Amazon DynamoDB NoSQL-ро ҳамчун нигаҳдории асосии маълумот насб мекунем ва барои чиптаҳои ҷолиб огоҳиномаҳои SMS насб мекунем.

Ҳама тафсилотҳо дар зери бурида ҳастанд! Бирав!

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Муқаддима

Масалан, ба мо дастрасӣ лозим аст API Aviasales. Дастрасӣ ба он ройгон ва бидуни маҳдудият пешниҳод карда мешавад; ба шумо танҳо лозим аст, ки дар бахши "Таҳиягарон" сабти ном шавед, то ки API-и худро барои дастрасӣ ба маълумот дастрас кунед.

Мақсади асосии ин мақола додани фаҳмиши умумӣ дар бораи истифодаи ҷараёни иттилоот дар AWS мебошад; мо ба назар мегирем, ки маълумоте, ки аз ҷониби API истифода мешавад, ба таври қатъӣ навсозӣ нест ва аз кэш интиқол дода мешавад, ки дар асоси ҷустуҷӯҳои корбарони сайтҳои Aviasales.ru ва Jetradar.com дар давоми 48 соати охир ташаккул ёфтааст.

Kinesis-агент, ки дар мошини истеҳсолкунанда насб шудааст, тавассути API гирифта мешавад, ба таври худкор маълумотро ба ҷараёни дилхоҳ тавассути Kinesis Data Analytics таҳлил ва интиқол медиҳад. Варианти хоми ин ҷараён мустақиман ба мағоза навишта мешавад. Нигоҳдории маълумоти хом, ки дар DynamoDB ҷойгир шудааст, имкон медиҳад, ки тавассути абзорҳои BI, ба монанди AWS Quick Sight таҳлили амиқтари чиптаҳоро фароҳам оварад.

Мо ду вариантро барои ҷойгиркунии тамоми инфрасохтор баррасӣ хоҳем кард:

  • Дастӣ - тавассути Console Management AWS;
  • Инфрасохтор аз рамзи Terraform барои автоматҳои танбал аст;

Архитектураи системаи таҳияшуда

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Компонентҳои истифодашуда:

  • API Aviasales — маълумоте, ки ин API баргардонида мешавад, барои ҳама корҳои минбаъда истифода мешавад;
  • Намунаи истеҳсолкунандаи EC2 - як мошини муқаррарии виртуалӣ дар абр, ки дар он ҷараёни додаҳои воридотӣ тавлид мешавад:
    • Agent Kinesis як барномаи Java аст, ки ба таври маҳаллӣ дар мошин насб шудааст, ки роҳи осони ҷамъоварӣ ва ирсоли маълумотро ба Kinesis (Kinesis Data Streams ё Kinesis Firehose) таъмин мекунад. Агент пайваста маҷмӯи файлҳоро дар директорияҳои муайяншуда назорат мекунад ва ба Kinesis маълумоти нав мефиристад;
    • Скрипти занги API — Скрипти Python, ки ба API дархостҳо медиҳад ва посухро ба ҷузвдоне мегузорад, ки аз ҷониби Agent Kinesis назорат мешавад;
  • Ҷараёни маълумот Kinesis — хидмати ҷараёнии маълумот дар вақти воқеӣ бо имкониятҳои васеъи миқёс;
  • Kinesis Analytics як хидмати бе сервер аст, ки таҳлили ҷараёнро дар вақти воқеӣ осон мекунад. Amazon Kinesis Data Analytics захираҳои барномаро танзим мекунад ва ба таври худкор барои коркарди ҳама гуна ҳаҷми маълумоти воридотӣ миқёс мекунад;
  • AWS Lambda — хидмате, ки ба шумо имкон медиҳад кодро бидуни нусхабардорӣ ё насби серверҳо иҷро кунед. Ҳама қудрати ҳисоббарорӣ ба таври худкор барои ҳар як занг андоза карда мешавад;
  • Amazon DynamoDB - Махзани маълумот оид ба ҷуфтҳои арзишҳои калидӣ ва ҳуҷҷатҳое, ки таъхири камтар аз 10 миллисонияро ҳангоми кор дар ҳама гуна миқёс таъмин мекунанд. Ҳангоми истифодаи DynamoDB, ба шумо лозим нест, ки ягон серверро таъмин кунед, часпонед ё идора кунед. DynamoDB ба таври худкор ҷадвалҳоро миқёс мекунад, то миқдори захираҳои мавҷударо танзим кунад ва иҷрои баландро нигоҳ дорад. Маъмурияти система талаб карда намешавад;
  • Amazon SNS - хидмати пурра идорашаванда барои фиристодани паёмҳо бо истифода аз модели муштарии нашркунанда (Pub/Sub), ки бо он шумо метавонед хидматҳои хурд, системаҳои тақсимшуда ва барномаҳои бе серверро ҷудо кунед. SNS метавонад барои фиристодани маълумот ба корбарони ниҳоӣ тавассути огоҳиномаҳои мобилӣ, паёмҳои SMS ва почтаи электронӣ истифода шавад.

Омӯзиши ибтидоӣ

Барои тақлид кардани ҷараёни маълумот, ман тасмим гирифтам, ки маълумоти чиптаи ҳавопайморо, ки аз ҷониби Aviasales API баргардонида шудааст, истифода барам. ДАР хуччатхо Рӯйхати хеле васеи усулҳои гуногун, биёед яке аз онҳоро гирем - "Тақвими нархҳои моҳона", ки нархҳоро барои ҳар як рӯзи моҳ, гурӯҳбандишуда аз рӯи шумораи интиқолҳо бармегардонад. Агар шумо дар дархост моҳи ҷустуҷӯро нишон надиҳед, маълумот барои моҳи ояндаи ҷорӣ баргардонида мешавад.

Пас, биёед сабти ном шавем ва аломати худро гирем.

Намунаи дархост дар зер оварда шудааст:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Усули дар боло зикршудаи қабули маълумот аз API тавассути нишон додани нишона дар дархост кор хоҳад кард, аммо ман бартарӣ медиҳам, ки аломати дастрасиро тавассути сарлавҳа гузаронам, аз ин рӯ мо ин усулро дар скрипти api_caller.py истифода хоҳем кард.

Намунаи ҷавоб:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Намунаи посухи API дар боло чипта аз Санкт-Петербург ба Фукро нишон медиҳад... Оҳ, чӣ орзуе буд...
Азбаски ман аз Қазон ҳастам ва Пхукет ҳоло "танҳо орзу" аст, биёед чиптаҳоро аз Санкт-Петербург ба Қазон ҷустуҷӯ кунем.

Он тахмин мекунад, ки шумо аллакай ҳисоби AWS доред. Ман мехоҳам фавран таваҷҷӯҳи махсусро ба он ҷалб намоям, ки Kinesis ва фиристодани огоҳиномаҳо тавассути SMS ба ҳисоби солона дохил карда намешаванд. Сатҳи ройгон (истифодаи ройгон). Аммо бо вуҷуди ин, бо назардошти як ду доллар, системаи пешниҳодшударо сохтан ва бо он бозӣ кардан комилан имконпазир аст. Ва, албатта, фаромӯш накунед, ки ҳамаи захираҳо пас аз он ки онҳо дигар лозим нестанд, нест кунед.

Хушбахтона, функсияҳои DynamoDb ва lambda барои мо ройгон хоҳанд буд, агар мо маҳдудиятҳои ҳармоҳаи ройгони худро иҷро кунем. Масалан, барои DynamoDB: 25 ГБ нигоҳдорӣ, 25 WCU/RCU ва 100 миллион дархост. Ва дар як моҳ як миллион занги функсияи лямбда.

Ҷойгиркунии дастӣ система

Танзими ҷараёни маълумот Kinesis

Биёед ба хидмати Kinesis Data Streams равем ва ду ҷараёнҳои нав, барои ҳар яки онҳо як пора эҷод кунем.

Шард чист?
Шард воҳиди асосии интиқоли маълумот дар ҷараёни Amazon Kinesis мебошад. Як сегмент интиқоли додаҳои вурудро бо суръати 1 МБ/с ва интиқоли маълумотро бо суръати 2 МБ/с таъмин мекунад. Як сегмент то 1000 вурудоти PUT дар як сонияро дастгирӣ мекунад. Ҳангоми эҷоди ҷараёни маълумот шумо бояд миқдори зарурии сегментҳоро муайян кунед. Масалан, шумо метавонед ҷараёни маълумотро бо ду сегмент эҷод кунед. Ин ҷараёни додаҳо интиқоли додаҳои вурудро дар 2 МБ/с ва интиқоли маълумотро дар 4 МБ/с таъмин мекунад ва то 2000 сабти PUT дар як сонияро дастгирӣ мекунад.

Чӣ қадаре ки пораҳо дар ҷараёни шумо зиёдтар бошанд, гузариши он ҳамон қадар зиёдтар мешавад. Аслан, ҷараёнҳо ҳамин тавр миқёс карда мешаванд - бо илова кардани пораҳо. Аммо чӣ қадаре ки шумо пораҳои зиёд дошта бошед, ҳамон қадар нарх баландтар мешавад. Ҳар як пора дар як соат 1,5 сент ва барои ҳар як миллион воҳиди пурборкунандаи PUT 1.4 сенти иловагӣ арзиш дорад.

Биёед ҷараёнҳои навро бо ном эҷод кунем чиптаҳои_ҳавоӣ, 1 пора барои ӯ басанда хоҳад буд:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Акнун биёед риштаи дигареро бо ном эҷод кунем ҷараёни_ махсус:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Танзимоти истеҳсолкунанда

Барои таҳлили вазифа, истифодаи намунаи муқаррарии EC2 ҳамчун истеҳсолкунандаи маълумот кифоя аст. Он набояд як мошини пуриқтидор ва гаронбаҳои виртуалӣ бошад; Spot t2.micro хуб кор мекунад.

Қайд кардани муҳим: масалан, шумо бояд тасвирро истифода баред - Amazon Linux AMI 2018.03.0, он барои зуд оғоз кардани Kinesis Agent танзимоти камтар дорад.

Ба хидмати EC2 гузаред, як мошини нави виртуалӣ эҷод кунед, AMI-и дилхоҳро бо навъи t2.micro, ки ба сатҳи ройгон дохил карда шудааст, интихоб кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Барои он ки мошини маҷозии навтаъсис қодир бошад, ки бо хидмати Kinesis муошират кунад, ба он ҳуқуқ барои ин кор дода шавад. Беҳтарин роҳи ин таъин кардани Нақши IAM мебошад. Аз ин рӯ, дар Қадами 3: Экрани конфигуратсияи тафсилоти мисол, шумо бояд интихоб кунед Нақши нави IAM эҷод кунед:

Эҷоди нақши IAM барои EC2
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Дар равзанаи кушодашуда, интихоб кунед, ки мо барои EC2 нақши нав эҷод карда истодаем ва ба бахши Иҷозатҳо гузаред:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Бо истифода аз мисоли омӯзиш, ба мо лозим нест, ки ба ҳама нозукиҳои конфигуратсияи гранулярии ҳуқуқҳои захираҳо дохил шавем, аз ин рӯ мо сиёсатҳоеро, ки Amazon пешакӣ танзим кардааст, интихоб мекунем: AmazonKinesisFullAccess ва CloudWatchFullAccess.

Биёед барои ин нақш чанд номи пурмазмунро диҳем, масалан: EC2-KinesisStreams-FullAccess. Натиҷа бояд ҳамон тавре бошад, ки дар расми зер нишон дода шудааст:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Пас аз сохтани ин нақши нав, фаромӯш накунед, ки онро ба мисоли мошини виртуалии сохташуда замима кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Мо дар ин экран ҳеҷ чизи дигареро тағир намедиҳем ва ба тирезаҳои оянда мегузарем.

Танзимоти диски сахтро ҳамчун пешфарз ва инчунин тегҳо гузоштан мумкин аст (гарчанде ки истифодаи тегҳо таҷрибаи хуб аст, ҳадди аққал ба мисол ном диҳед ва муҳити атрофро нишон диҳед).

Ҳоло мо дар Қадами 6 ҳастем: Ҷадвали Гурӯҳи Амниятро танзим кунед, ки дар он шумо бояд як гурӯҳи нав эҷод кунед ё гурӯҳи Амнияти мавҷудаи худро муайян кунед, ки ба шумо имкон медиҳад тавассути ssh (порти 22) ба инстансия пайваст шавед. Дар он ҷо Сарчашма -> IP-и манро интихоб кунед ва шумо метавонед мисолро оғоз кунед.

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ҳамин ки он ба ҳолати корӣ мегузарад, шумо метавонед кӯшиш кунед, ки ба он тавассути ssh пайваст шавед.

Барои он ки бо Kinesis Agent кор карда тавонед, пас аз бомуваффақият пайваст шудан ба мошин, шумо бояд фармонҳои зеринро дар терминал ворид кунед:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Биёед барои захира кардани посухҳои API ҷузвдон эҷод кунем:

sudo mkdir /var/log/airline_tickets

Пеш аз оғози агент, шумо бояд конфигуратсияи онро танзим кунед:

sudo vim /etc/aws-kinesis/agent.json

Мундариҷаи файли agent.json бояд чунин бошад:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Тавре ки аз файли конфигуратсия дида мешавад, агент файлҳоро бо тамдиди .log дар директорияи /var/log/airline_tickets/ назорат мекунад, онҳоро таҳлил мекунад ва ба ҷараёни airline_tickets интиқол медиҳад.

Мо хидматро бозоғоз мекунем ва боварӣ ҳосил мекунем, ки он фаъол аст:

sudo service aws-kinesis-agent restart

Акнун биёед скрипти Python-ро зеркашӣ кунем, ки маълумотро аз API дархост мекунад:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Скрипти api_caller.py маълумотро аз Aviasales дархост мекунад ва посухи гирифташударо дар директорияе, ки агенти Kinesis скан мекунад, захира мекунад. Татбиқи ин скрипт комилан стандартӣ аст, синфи TicketsApi мавҷуд аст, он ба шумо имкон медиҳад, ки API-ро асинхронӣ кашед. Мо сарлавҳаро бо аломат мегузорем ва ба ин синф параметрҳоро дархост мекунем:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Барои санҷидани танзимоти дуруст ва фаъолияти агент, биёед скрипти api_caller.py -ро санҷем:

sudo ./api_caller.py TOKEN

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ва мо ба натиҷаи кор дар гузоришҳои агент ва ҷадвали Мониторинг дар ҷараёни маълумотҳои airline_tickets назар мекунем:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Тавре ки шумо мебинед, ҳама чиз кор мекунад ва агенти Kinesis маълумотро ба ҷараён бомуваффақият мефиристад. Акнун биёед истеъмолкунандаро танзим кунем.

Насб кардани Kinesis Data Analytics

Биёед ба ҷузъи марказии тамоми система гузарем - як барномаи нав дар Kinesis Data Analytics бо номи kinesis_analytics_airlines_app эҷод кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Kinesis Data Analytics ба шумо имкон медиҳад, ки дар вақти воқеӣ аз Kinesis Streams бо истифода аз забони SQL таҳлили маълумотро анҷом диҳед. Ин як хидмати пурраи автоматӣ аст (баръакси Kinesis Streams), ки:

  1. ба шумо имкон медиҳад, ки ҷараёнҳои нав (Ҷараёни Натиҷа) дар асоси дархостҳо ба маълумоти манбаъ эҷод кунед;
  2. ҷараёнро бо хатогиҳое, ки ҳангоми кор кардани барномаҳо рух додаанд, таъмин мекунад (Ҷараёни хато);
  3. метавонад схемаи маълумоти воридшударо ба таври худкор муайян кунад (дар ҳолати зарурӣ онро дастӣ аз нав муайян кардан мумкин аст).

Ин хидмати арзон нест - 0.11 USD дар як соати кор, аз ин рӯ шумо бояд онро бодиққат истифода баред ва пас аз анҷоми кор онро нест кунед.

Биёед барномаро ба манбаи маълумот пайваст кунем:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ҷараеро интихоб кунед, ки мо ба он пайваст мешавем (airline_tickets):

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Баъдан, шумо бояд Нақши нави IAM-ро замима кунед, то барнома тавонад аз ҷараён хонад ва ба ҷараён нависад. Барои ин кифоя аст, ки чизеро дар блоки иҷозатҳои дастрасӣ тағир надиҳед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Акнун биёед пайдо кардани схемаи маълумотро дар ҷараён дархост кунем; Барои ин тугмаи "Кашф кардани схема" -ро клик кунед. Дар натиҷа, нақши IAM нав карда мешавад (наси нав эҷод карда мешавад) ва муайянкунии схема аз маълумоте, ки аллакай ба ҷараён омадааст, оғоз карда мешавад:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Акнун шумо бояд ба муҳаррири SQL равед. Вақте ки шумо ин тугмаро пахш мекунед, тирезае пайдо мешавад, ки аз шумо хоҳиш мекунад, ки барномаро оғоз кунед - он чизеро, ки мехоҳед оғоз кунед, интихоб кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Дар равзанаи муҳаррири SQL дархости оддии зеринро ворид кунед ва Захира ва иҷро кардани SQL-ро пахш кунед:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Дар пойгоҳи додаҳои релятсионӣ, шумо бо ҷадвалҳо бо истифода аз изҳороти INSERT барои илова кардани сабтҳо ва изҳороти SELECT барои дархости додаҳо кор мекунед. Дар Amazon Kinesis Data Analytics, шумо бо ҷараёнҳо (STREAMs) ва насосҳо (PUMPs) кор мекунед - дархостҳои пайваста ворид кунед, ки маълумотро аз як ҷараён дар барнома ба ҷараёни дигар ворид мекунанд.

Дархости SQL, ки дар боло оварда шудааст, чиптаҳои Аэрофлотро бо арзиши камтар аз панҷ ҳазор рубл ҷустуҷӯ мекунад. Ҳама сабтҳое, ки ба ин шартҳо мувофиқанд, дар ҷараёни DESTINATION_SQL_STREAM ҷойгир карда мешаванд.

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Дар блоки Destination, ҷараёни special_stream-ро интихоб кунед ва дар рӯйхати афтанда номи ҷараёни дохили барнома DESTINATION_SQL_STREAM:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Натиҷаи ҳама амалҳо бояд чизе монанд ба расми дар поён овардашуда бошад:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Эҷод ва обуна ба мавзӯи SNS

Ба Хадамоти огоҳии оддӣ равед ва дар он ҷо мавзӯи навро бо номи Airlines эҷод кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ба ин мавзӯъ обуна шавед ва рақами телефони мобилиро нишон диҳед, ки ба он SMS огоҳиномаҳо фиристода мешаванд:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Дар DynamoDB ҷадвал эҷод кунед

Барои нигоҳ доштани маълумоти хом аз ҷараёни airline_tickets, биёед дар DynamoDB ҷадвал бо ҳамон ном эҷод кунем. Мо record_id-ро ҳамчун калиди асосӣ истифода хоҳем кард:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Эҷоди коллектори функсияи ламбда

Биёед функсияи ламбдаро бо номи Коллектор созем, ки вазифаи он пурсиши ҷараёни airline_tickets мебошад ва агар дар он ҷо сабтҳои нав пайдо шаванд, ин сабтҳоро ба ҷадвали DynamoDB дохил кунед. Аён аст, ки ба ғайр аз ҳуқуқҳои пешфарз, ин ламбда бояд дастрасии хондан ба ҷараёни додаҳои Kinesis ва дастрасии навиштан ба DynamoDB дошта бошад.

Эҷоди нақши IAM барои функсияи лямбда коллектор
Аввалан, биёед нақши нави IAM-ро барои ламбда бо номи Lambda-TicketsProcessingRole эҷод кунем:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Барои мисоли санҷиш, сиёсатҳои қаблан танзимшудаи AmazonKinesisReadOnlyAccess ва AmazonDynamoDBFullAccess хеле мувофиқанд, тавре ки дар расми зер нишон дода шудааст:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Ин ламбда бояд тавассути триггер аз Kinesis ҳангоми ворид шудан ба airline_stream вурудоти нав оғоз шавад, бинобар ин мо бояд триггери нав илова кунем:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ҳама чизи боқимонда ворид кардани код ва захира кардани ламбда аст.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Эҷоди огоҳкунандаи функсияи lambda

Функсияи дуюми ламбда, ки ҷараёни дуюмро (special_stream) назорат мекунад ва ба SNS огоҳинома мефиристад, ба ҳамин тариқ сохта шудааст. Аз ин рӯ, ин ламбда бояд дастрасии хондан аз Kinesis дошта бошад ва ба мавзӯи додаи SNS паём фиристад, ки баъдан аз ҷониби хадамоти SNS ба ҳамаи муштариёни ин мавзӯъ (почтаи электронӣ, SMS ва ғайра) фиристода мешавад.

Эҷоди нақши IAM
Аввалан, мо нақши IAM Lambda-KinesisAlarm-ро барои ин лямбда эҷод мекунем ва сипас ин нақшро ба ламбдаҳои alarm_notifier таъсис медиҳем:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

Ин ламбда бояд дар триггер барои сабтҳои нав барои ворид шудан ба special_stream кор кунад, бинобар ин шумо бояд триггерро ба ҳамон тарзе танзим кунед, ки мо барои lambda Collector кардаем.

Барои осонтар кардани конфигуратсияи ин ламбда, биёед як тағирёбандаи нави муҳити зистро ҷорӣ кунем - TOPIC_ARN, ки дар он мо ANR (Номҳои Amazon Recourse) -и мавзӯи Airlines -ро ҷойгир мекунем:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Ва рамзи лямбдаро ворид кунед, он тамоман мушкил нест:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Чунин ба назар мерасад, ки дар ин ҷо конфигуратсияи дастӣ система ба анҷом мерасад. Танҳо санҷиш ва боварӣ ҳосил кардан аст, ки мо ҳама чизро дуруст танзим кардаем.

Аз рамзи Terraform ҷойгир кунед

Тайёрии зарурӣ

Терафром як воситаи хеле қулайи кушодаасос барои густариши инфрасохтор аз код мебошад. Он синтаксиси худро дорад, ки омӯхтан осон аст ва мисолҳои зиёде дорад, ки чӣ гуна ва чӣ кор кардан лозим аст. Муҳаррири Atom ё Visual Studio Code дорои бисёр плагинҳои қулай мебошад, ки кор бо Terraformро осон мекунанд.

Шумо метавонед тақсимотро зеркашӣ кунед аз ин ҷо. Таҳлили муфассали тамоми қобилиятҳои Terraform аз доираи ин мақола берун аст, аз ин рӯ мо худро бо нуктаҳои асосӣ маҳдуд хоҳем кард.

Чӣ гуна бояд оғоз кард

Рамзи пурраи лоиҳа ин аст дар анбори ман. Мо анборро ба худамон клон мекунем. Пеш аз оғоз, шумо бояд боварӣ ҳосил кунед, ки шумо AWS CLI насб ва танзим кардаед, зеро... Terraform дар файли ~/.aws/credentials маълумотномаҳоро ҷустуҷӯ мекунад.

Таҷрибаи хуб иҷро кардани фармони нақша пеш аз ҷойгиркунии тамоми инфрасохтор аст, то бубинед, ки Terraform ҳоло барои мо дар абр чӣ эҷод мекунад:

terraform.exe plan

Аз шумо хоҳиш карда мешавад, ки рақами телефонро барои фиристодани огоҳинома ворид кунед. Дар ин марҳила ворид кардани он шарт нест.

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Нақшаи амалиётии барномаро таҳлил карда, мо метавонем ба эҷоди захираҳо шурӯъ кунем:

terraform.exe apply

Пас аз фиристодани ин фармон, аз шумо дубора дархост карда мешавад, ки рақами телефонро ворид кунед; вақте ки савол дар бораи иҷро кардани амалҳо пайдо мешавад, "ҳа" -ро гиред. Ин ба шумо имкон медиҳад, ки тамоми инфрасохторро насб кунед, ҳама конфигуратсияҳои зарурии EC2-ро иҷро кунед, функсияҳои ламбда ва ғайраро ҷойгир кунед.

Пас аз он ки тамоми захираҳо тавассути рамзи Terraform бомуваффақият сохта шудаанд, шумо бояд ба тафсилоти замимаи Kinesis Analytics ворид шавед (мутаассифона, ман ин корро мустақиман аз код наёфтам).

Барномаро оғоз кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Пас аз ин, шумо бояд ба таври возеҳ номи ҷараёни дохили барномаро тавассути интихоб аз рӯйхати афтанда таъин кунед:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Акнун ҳама чиз барои рафтан омода аст.

Санҷиши барнома

Новобаста аз он ки шумо системаро ба таври дастӣ ё тавассути рамзи Terraform ҷойгир кардаед, он ҳамон тавр кор хоҳад кард.

Мо тавассути SSH ба мошини виртуалии EC2, ки дар он Kinesis Agent насб шудааст, ворид мешавем ва скрипти api_caller.py -ро иҷро мекунем

sudo ./api_caller.py TOKEN

Шумо бояд танҳо интизори SMS ба рақами шумост:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
SMS - паём ба телефон тақрибан дар 1 дақиқа мерасад:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер
Барои дидани он, ки оё сабтҳо дар пойгоҳи додаҳои DynamoDB барои таҳлили минбаъда ва муфассал захира карда шудаанд, боқӣ мемонад. Ҷадвали авиабилетҳо тақрибан маълумоти зеринро дар бар мегирад:

Интегратсияи Aviasales API бо Amazon Kinesis ва соддагии бе сервер

хулоса

Дар ҷараёни кори анҷомдодашуда дар асоси Amazon Kinesis системаи коркарди маълумот дар интернет сохта шуд. Вариантҳои истифодаи Kinesis Agent дар якҷоягӣ бо Kinesis Data Streams ва таҳлили вақти воқеӣ Kinesis Analytics бо истифода аз фармонҳои SQL, инчунин ҳамкории Amazon Kinesis бо дигар хидматҳои AWS баррасӣ шуданд.

Мо системаи дар боло зикршударо бо ду роҳ ҷойгир кардем: як дастӣ хеле дароз ва системаи зуд аз рамзи Terraform.

Ҳама рамзи сарчашмаи лоиҳа дастрасанд дар анбори GitHub ман, Ман тавсия медиҳам, ки шумо бо он шинос шавед.

Ман шодам, ки мақоларо баррасӣ кунам, шарҳҳои шуморо интизорам. Ман ба танқиди созанда умедворам.

Ба шумо муваффақият орзумандам!

Манбаъ: will.com

Илова Эзоҳ