Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Эй Хабр!

Ұшқан ұшақтарды ұнатасыз ба? Мен оны жақсы көремін, бірақ оқшаулану кезінде мен бір белгілі ресурстан - Aviasales-тен әуе билеттері туралы деректерді талдауды ұнатамын.

Бүгін біз Amazon Kinesis жұмысын талдаймыз, нақты уақыттағы аналитикасы бар ағындық жүйені құрастырамыз, негізгі деректерді сақтау орны ретінде Amazon DynamoDB NoSQL дерекқорын орнатамыз және қызықты билеттер үшін SMS хабарламаларын орнатамыз.

Барлық мәліметтер кесудің астында! Бар!

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Кіріспе

Мысалы, бізге қол жеткізу керек Aviasales API. Оған кіру тегін және шектеусіз беріледі, деректерге қол жеткізу үшін API таңбалауышын алу үшін «Әзірлеушілер» бөлімінде тіркелу жеткілікті.

Бұл мақаланың негізгі мақсаты - AWS жүйесінде ақпарат ағынын пайдалану туралы жалпы түсінік беру; біз пайдаланылатын API қайтаратын деректердің қатаң жаңартылмайтынын және кэштен тасымалданатынын ескереміз. Aviasales.ru және Jetradar.com сайттарын пайдаланушылардың соңғы 48 сағаттағы іздеулері негізінде қалыптастырылды.

API арқылы алынған, өндіруші машинада орнатылған Kinesis-агент Kinesis Data Analytics арқылы деректерді автоматты түрде талдайды және қажетті ағынға жібереді. Бұл ағынның өңделмеген нұсқасы тікелей дүкенге жазылады. DynamoDB жүйесінде орналастырылған бастапқы деректер қоймасы AWS Quick Sight сияқты BI құралдары арқылы билеттерді тереңірек талдауға мүмкіндік береді.

Біз бүкіл инфрақұрылымды орналастырудың екі нұсқасын қарастырамыз:

  • Қолмен - AWS басқару консолі арқылы;
  • Terraform кодының инфрақұрылымы жалқау автоматтарға арналған;

Әзірленген жүйенің архитектурасы

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Қолданылатын компоненттер:

  • Aviasales API — осы API қайтарған деректер барлық келесі жұмыстар үшін пайдаланылады;
  • EC2 продюсер данасы — кіріс деректер ағыны жасалатын бұлттағы кәдімгі виртуалды машина:
    • Kinesis агенті Kinesis (Kinesis Data Streams немесе Kinesis Firehose) жүйесіне деректерді жинаудың және жіберудің оңай жолын қамтамасыз ететін құрылғыда жергілікті түрде орнатылған Java қолданбасы. Агент көрсетілген каталогтардағы файлдар жинағын үнемі бақылайды және Kinesis-ке жаңа деректерді жібереді;
    • API қоңырау шалушы сценарийі — API сұрауларын жасайтын және жауапты Kinesis агенті бақылайтын қалтаға салатын Python сценарийі;
  • Kinesis деректер ағындары — масштабтаудың кең мүмкіндіктері бар нақты уақыт режиміндегі деректер ағыны қызметі;
  • Kinesis Analytics нақты уақыт режимінде ағындық деректерді талдауды жеңілдететін серверсіз қызмет болып табылады. Amazon Kinesis Data Analytics қолданба ресурстарын конфигурациялайды және кіріс деректердің кез келген көлемін өңдеу үшін автоматты түрде масштабтайды;
  • AWS Lambda — серверлердің сақтық көшірмесін жасамай немесе орнатусыз кодты іске қосуға мүмкіндік беретін қызмет. Барлық есептеу қуаты әрбір қоңырау үшін автоматты түрде масштабталады;
  • Amazon DynamoDB - Кез келген масштабта іске қосылғанда 10 миллисекундтан аз кідірісті қамтамасыз ететін кілт-мән жұптары мен құжаттардың дерекқоры. DynamoDB пайдалану кезінде кез келген серверлерді қамтамасыз ету, түзету немесе басқару қажет емес. DynamoDB қол жетімді ресурстардың көлемін реттеу және жоғары өнімділікті қолдау үшін кестелерді автоматты түрде масштабтайды. Жүйені басқару қажет емес;
  • Amazon SNS - микросервистерді, бөлінген жүйелерді және серверсіз қолданбаларды оқшаулауға болатын баспагер-жазылу (Pub/Sub) үлгісін пайдаланып хабарларды жіберуге арналған толық басқарылатын қызмет. SNS мобильді push хабарландырулары, SMS хабарламалар және электрондық пошта арқылы соңғы пайдаланушыларға ақпаратты жіберу үшін пайдаланылуы мүмкін.

Бастапқы дайындық

Деректер ағынын эмуляциялау үшін мен Aviasales API қайтарған әуе билеті туралы ақпаратты пайдалануды шештім. IN құжаттама әртүрлі әдістердің өте кең тізімі, олардың бірін алайық - аударымдар саны бойынша топтастырылған айдың әрбір күні үшін бағаларды қайтаратын «Айлық баға күнтізбесі». Сұрауда іздеу айын көрсетпесеңіз, ақпарат ағымдағы айдан кейінгі айға қайтарылады.

Ендеше тіркеліп, жетонымызды алайық.

Төмендегі мысал сұрау:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Сұраудағы таңбалауышты көрсету арқылы API-ден деректерді алудың жоғарыдағы әдісі жұмыс істейді, бірақ мен қол жеткізу таңбалауышын тақырып арқылы өткізгенді қалаймын, сондықтан біз бұл әдісті api_caller.py сценарийінде қолданамыз.

Жауаптың үлгісі:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Жоғарыдағы мысал API жауабы Санкт-Петербургтен Пукқа билетті көрсетеді... О, қандай арман...
Мен Қазандық болғандықтан және Пхукет қазір «тек арман» болғандықтан, Санкт-Петербургтен Қазанға билеттерді іздеп көрейік.

Ол сізде AWS тіркелгісі бар деп болжайды. Мен бірден ерекше назар аударғым келеді, бұл Kinesis және SMS арқылы хабарлама жіберу жыл сайынғы бағдарламаға кірмейді. Тегін деңгей (тегін пайдалану). Бірақ бұған қарамастан, бір-екі долларды ескере отырып, ұсынылған жүйені құрастырып, онымен ойнауға әбден болады. Және, әрине, қажет болмай қалғаннан кейін барлық ресурстарды жоюды ұмытпаңыз.

Бақытымызға орай, егер біз ай сайынғы тегін шектеулерімізді орындасақ, DynamoDb және lambda функциялары біз үшін тегін болады. Мысалы, DynamoDB үшін: 25 ГБ жады, 25 WCU/RCU және 100 миллион сұрау. Және айына миллион лямбда функциясы шақырылады.

Жүйені қолмен орналастыру

Kinesis деректер ағындарын орнату

Kinesis Data Streams қызметіне өтіп, әрқайсысы үшін бір бөліктен тұратын екі жаңа ағын жасайық.

Сынық дегеніміз не?
Бөлшек Amazon Kinesis ағынының негізгі деректерді тасымалдау бірлігі болып табылады. Бір сегмент 1 МБ/с жылдамдықпен кіріс деректерін беруді және 2 МБ/с жылдамдықпен деректерді беруді қамтамасыз етеді. Бір сегмент секундына 1000 PUT жазбасын қолдайды. Деректер ағынын жасау кезінде сегменттердің қажетті санын көрсету қажет. Мысалы, екі сегменті бар деректер ағынын жасауға болады. Бұл деректер ағыны секундына 2 PUT жазбасына дейін қолдау көрсете отырып, кіріс деректерін 4 МБ/с және шығыс деректерін 2000 МБ/с жылдамдықпен тасымалдауды қамтамасыз етеді.

Ағыныңыздағы үзінділер неғұрлым көп болса, оның өткізу қабілеті соғұрлым жоғары болады. Негізінде, ағындар осылайша масштабталады - үзінділерді қосу арқылы. Бірақ сізде сынықтар неғұрлым көп болса, соғұрлым баға жоғары болады. Әрбір сынық сағатына 1,5 цент және әрбір миллион PUT пайдалы жүктеме бірлігі үшін қосымша 1.4 цент тұрады.

Аты бар жаңа ағын жасайық әуе_билеттері, оған 1 сынық жеткілікті болады:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Енді атаумен тағы бір ағын құрайық арнайы_ағын:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Продюсерді орнату

Тапсырманы талдау үшін деректерді өндіруші ретінде кәдімгі EC2 данасын пайдалану жеткілікті. Бұл қуатты, қымбат виртуалды машина болуы міндетті емес; spot t2.micro өте жақсы жұмыс істейді.

Маңызды ескерту: мысалы, суретті пайдалану керек - Amazon Linux AMI 2018.03.0, оның Kinesis агентін жылдам іске қосу үшін параметрлері аз.

EC2 қызметіне өтіңіз, жаңа виртуалды машина жасаңыз, Тегін деңгейге кіретін t2.micro түрімен қалаған AMI таңдаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Жаңадан жасалған виртуалды машина Kinesis қызметімен өзара әрекеттесе алуы үшін оған құқықтар берілуі керек. Мұны істеудің ең жақсы жолы - IAM рөлін тағайындау. Сондықтан 3-қадам: Дана мәліметтерін конфигурациялау экранында таңдау керек Жаңа IAM рөлін жасаңыз:

EC2 үшін IAM рөлін жасау
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Ашылған терезеде EC2 үшін жаңа рөл жасап жатқанымызды таңдап, Рұқсаттар бөліміне өтіңіз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Жаттығу мысалын пайдалана отырып, біз ресурс құқықтарының түйіршікті конфигурациясының барлық қыр-сырын меңгерудің қажеті жоқ, сондықтан Amazon алдын ала конфигурациялаған саясаттарды таңдаймыз: AmazonKinesisFullAccess және CloudWatchFullAccess.

Бұл рөлге мағыналы атау берейік, мысалы: EC2-KinesisStreams-FullAccess. Нәтиже төмендегі суретте көрсетілгендей болуы керек:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Осы жаңа рөлді жасағаннан кейін оны жасалған виртуалды машина данасына тіркеуді ұмытпаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Біз бұл экранда басқа ештеңені өзгертпейміз және келесі терезелерге көшеміз.

Қатты диск параметрлерін тегтермен қатар әдепкі ретінде қалдыруға болады (тегтерді пайдалану жақсы тәжірибе болса да, кем дегенде данаға атау беріңіз және ортаны көрсетіңіз).

Енді біз 6-қадам: Қауіпсіздік тобын конфигурациялау қойындысында тұрмыз, мұнда жаңасын жасау керек немесе данаға ssh (порт 22) арқылы қосылуға мүмкіндік беретін бар Қауіпсіздік тобын көрсету керек. Сол жерде Source -> My IP таңдаңыз, сонда сіз дананы іске қоса аласыз.

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Ол жұмыс күйіне ауысқаннан кейін оған ssh арқылы қосылуға болады.

Kinesis Agent-пен жұмыс істеу үшін құрылғыға сәтті қосылғаннан кейін терминалға келесі пәрмендерді енгізу керек:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API жауаптарын сақтау үшін қалтаны жасайық:

sudo mkdir /var/log/airline_tickets

Агентті іске қоспас бұрын оның конфигурациясын конфигурациялау қажет:

sudo vim /etc/aws-kinesis/agent.json

agent.json файлының мазмұны келесідей болуы керек:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Конфигурация файлынан көрініп тұрғандай, агент /var/log/airline_tickets/ каталогындағы .log кеңейтімі бар файлдарды бақылайды, оларды талдайды және оларды airline_tickets ағынына тасымалдайды.

Біз қызметті қайта іске қосып, оның жұмыс істеп тұрғанына көз жеткіземіз:

sudo service aws-kinesis-agent restart

Енді API деректерін сұрайтын Python сценарийін жүктеп алайық:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py сценарийі Aviasales қызметінен деректерді сұрайды және алынған жауапты Kinesis агенті сканерлейтін каталогта сақтайды. Бұл сценарийді іске асыру өте стандартты, TicketsApi класы бар, ол API-ны асинхронды түрде тартуға мүмкіндік береді. Біз таңбалауышы бар тақырыпты жібереміз және осы сыныпқа параметрлерді сұраймыз:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Агенттің дұрыс параметрлері мен функционалдығын тексеру үшін api_caller.py сценарийін іске қосуды сынап көрейік:

sudo ./api_caller.py TOKEN

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Біз жұмыс нәтижесін Агент журналдарында және airline_tickets деректер ағынындағы Мониторинг қойындысында қарастырамыз:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Көріп отырғаныңыздай, бәрі жұмыс істейді және Kinesis агенті деректерді ағынға сәтті жібереді. Енді тұтынушыны конфигурациялайық.

Kinesis Data Analytics орнату

Бүкіл жүйенің орталық құрамдас бөлігіне көшейік - Kinesis Data Analytics жүйесінде kinesis_analytics_airlines_app атты жаңа қолданба жасаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Kinesis Data Analytics SQL тілін пайдаланып Kinesis Streams-тен нақты уақыттағы деректер талдауын орындауға мүмкіндік береді. Бұл толық автомасштабтау қызметі (Kinesis Streams-тен айырмашылығы):

  1. бастапқы деректерге сұраныстар негізінде жаңа ағындарды (Output Stream) жасауға мүмкіндік береді;
  2. қолданбаларды іске қосу кезінде орын алған қателері бар ағынмен қамтамасыз етеді (Error Stream);
  3. кіріс деректер схемасын автоматты түрде анықтай алады (қажет болған жағдайда оны қолмен қайта анықтауға болады).

Бұл арзан қызмет емес - жұмыс сағатына 0.11 АҚШ доллары, сондықтан оны мұқият пайдаланып, жұмыс аяқталғаннан кейін жою керек.

Қолданбаны деректер көзіне қосайық:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Біз қосылатын ағынды таңдаңыз (әуе билеттері):

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Әрі қарай, қолданба ағыннан оқып, ағынға жаза алатындай жаңа IAM рөлін тіркеу керек. Ол үшін Access рұқсаттар блогында ештеңені өзгертпеу жеткілікті:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Енді ағындағы деректер схемасын табуды сұрайық; ол үшін «Схеманы табу» түймесін басыңыз. Нәтижесінде IAM рөлі жаңартылады (жаңасы жасалады) және схеманы анықтау ағынға әлдеқашан келген деректерден іске қосылады:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Енді SQL редакторына өту керек. Осы түймені басқан кезде қолданбаны іске қосуды сұрайтын терезе пайда болады - іске қосқыңыз келетін нәрсені таңдаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
SQL өңдегішінің терезесіне келесі қарапайым сұрауды енгізіп, SQL-ді сақтау және іске қосу түймешігін басыңыз:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Реляциялық дерекқорларда жазбаларды қосу үшін INSERT мәлімдемелерін және сұрау деректеріне SELECT операторын пайдаланып кестелермен жұмыс істейсіз. Amazon Kinesis Data Analytics бағдарламасында сіз ағындармен (STREAMs) және сорғылармен (PUMPs) жұмыс істейсіз — қолданбадағы бір ағыннан деректерді басқа ағынға кірістіретін үздіксіз кірістіру сұраулары.

Жоғарыда ұсынылған SQL сұрауы құны бес мың рубльден төмен Аэрофлот билеттерін іздейді. Осы шарттарға сәйкес келетін барлық жазбалар DESTINATION_SQL_STREAM ағынында орналастырылады.

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Destination блогында special_stream ағынын және In-application stream name DESTINATION_SQL_STREAM ашылмалы тізімінде таңдаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Барлық манипуляциялардың нәтижесі төмендегі суретке ұқсас болуы керек:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

SNS тақырыбын жасау және оған жазылу

Қарапайым хабарландыру қызметіне өтіңіз және сол жерде авиакомпания атауымен жаңа тақырып жасаңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Осы тақырыпқа жазылыңыз және SMS хабарламалар жіберілетін ұялы телефон нөмірін көрсетіңіз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

DynamoDB бағдарламасында кесте жасаңыз

Олардың airline_tickets ағынындағы бастапқы деректерді сақтау үшін DynamoDB жүйесінде бірдей атпен кестені жасайық. Негізгі кілт ретінде record_id пайдаланамыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Ламбда функциясының коллекторын жасау

Коллектор деп аталатын лямбда функциясын жасайық, оның міндеті airline_tickets ағынын сұрау және егер ол жерден жаңа жазбалар табылса, осы жазбаларды DynamoDB кестесіне енгізіңіз. Әлбетте, әдепкі құқықтардан басқа, бұл лямбда Kinesis деректер ағынына оқуға және DynamoDB жүйесіне жазу рұқсатына ие болуы керек.

Коллектор ламбда функциясы үшін IAM рөлін жасау
Алдымен, Lambda-TicketsProcessingRole деп аталатын лямбда үшін жаңа IAM рөлін жасайық:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Сынақ мысалы үшін алдын ала конфигурацияланған AmazonKinesisReadOnlyAccess және AmazonDynamoDBFullAccess саясаттары төмендегі суретте көрсетілгендей өте қолайлы:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Бұл лямбда Kinesis триггерімен airline_stream ішіне жаңа жазбалар енген кезде іске қосылуы керек, сондықтан бізге жаңа триггерді қосу керек:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Тек кодты енгізу және ламбданы сақтау ғана қалады.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda функциясы хабарландырушысын жасау

Екінші ағынды (арнайы_ағын) бақылайтын және SNS хабарландыруын жіберетін екінші лямбда функциясы ұқсас жолмен жасалған. Сондықтан, бұл лямбда Kinesis-тен оқуға және берілген SNS тақырыбына хабарламалар жіберуге рұқсаты болуы керек, содан кейін оны SNS қызметі осы тақырыптың барлық жазылушыларына (электрондық пошта, SMS және т.б.) жібереді.

IAM рөлін жасау
Алдымен, осы ламбда үшін Lambda-KinesisAlarm IAM рөлін жасаймыз, содан кейін бұл рөлді жасалып жатқан alarm_notifier лямбдаға тағайындаймыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

Бұл лямбда special_stream ішіне кіру үшін жаңа жазбалар үшін триггерде жұмыс істеуі керек, сондықтан триггерді коллектор ламбда үшін жасағандай конфигурациялау қажет.

Бұл лямбданы конфигурациялауды жеңілдету үшін жаңа орта айнымалысын енгізейік - TOPIC_ARN, онда біз авиакомпаниялар тақырыбының ANR (Amazon Recourse Names) орналастырамыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Ламбда кодын енгізіңіз, бұл мүлдем күрделі емес:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Бұл жерде жүйені қолмен конфигурациялау аяқталған сияқты. Тек тестілеу және барлығын дұрыс конфигурациялағанымызға көз жеткізу ғана қалады.

Terraform кодынан орналастыру

Қажетті дайындық

Terraform кодтан инфрақұрылымды орналастыруға арналған өте ыңғайлы ашық бастапқы құрал. Оның үйренуге оңай өзіндік синтаксисі бар және қалай және нені орналастыру керектігі туралы көптеген мысалдар бар. Atom редакторында немесе Visual Studio кодында Terraform-пен жұмыс істеуді жеңілдететін көптеген ыңғайлы плагиндер бар.

Сіз таратуды жүктей аласыз мұнда. Барлық Terraform мүмкіндіктерін егжей-тегжейлі талдау осы мақаланың аясынан тыс, сондықтан біз негізгі ойлармен шектелеміз.

Қалай іске қосу керек

Жобаның толық коды менің репозиторийімде. Біз репозиторийді өзімізге клондаймыз. Бастамас бұрын, сізде AWS CLI орнатылғанына және конфигурацияланғанына көз жеткізу керек, себебі... Terraform тіркелгі деректерін ~/.aws/credentials файлынан іздейді.

Терраформ қазір бұлтта біз үшін не жасап жатқанын көру үшін бүкіл инфрақұрылымды орналастырмас бұрын жоспар пәрменін іске қосу жақсы тәжірибе болып табылады:

terraform.exe plan

Хабарландырулар жіберу үшін телефон нөмірін енгізу сұралады. Бұл кезеңде оны енгізу қажет емес.

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Бағдарламаның жұмыс жоспарын талдай отырып, біз ресурстарды құруға кірісе аламыз:

terraform.exe apply

Осы пәрменді жібергеннен кейін сізден телефон нөмірін енгізу сұралады, әрекеттерді нақты орындау туралы сұрақ көрсетілгенде «иә» теріңіз. Бұл сізге бүкіл инфрақұрылымды орнатуға, EC2 барлық қажетті конфигурациясын орындауға, lambda функцияларын орналастыруға және т.б. мүмкіндік береді.

Барлық ресурстар Terraform коды арқылы сәтті жасалғаннан кейін, Kinesis Analytics қолданбасының егжей-тегжейіне өту керек (өкінішке орай, мен мұны тікелей кодтан қалай жасау керектігін таппадым).

Қолданбаны іске қосыңыз:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Осыдан кейін, ашылмалы тізімнен таңдау арқылы қолданбадағы ағын атауын нақты орнатуыңыз керек:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Енді бәрі жүруге дайын.

Қолданбаны сынау

Жүйені қолмен немесе Terraform коды арқылы қалай орналастырғаныңызға қарамастан, ол бірдей жұмыс істейді.

Kinesis Agent орнатылған EC2 виртуалды машинасына SSH арқылы кіріп, api_caller.py сценарийін іске қосамыз.

sudo ./api_caller.py TOKEN

Сізге тек нөміріңізге SMS күту жеткілікті:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
SMS – телефоныңызға хабарлама 1 минутта келеді:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық
Кейінгі егжей-тегжейлі талдау үшін жазбалардың DynamoDB дерекқорында сақталғанын көру керек. Airline_tickets кестесі шамамен келесі деректерді қамтиды:

Aviasales API біріктіру Amazon Kinesis және серверсіз қарапайымдылық

қорытынды

Атқарылған жұмыс барысында Amazon Kinesis негізінде деректерді онлайн өңдеу жүйесі құрылды. Kinesis агентін Kinesis Data Streams және SQL пәрмендерін қолданатын нақты уақыттағы Kinesis Analytics аналитикасымен бірге пайдалану опциялары, сондай-ақ Amazon Kinesis басқа AWS қызметтерімен өзара әрекеттесуі қарастырылды.

Біз жоғарыда аталған жүйені екі жолмен орналастырдық: өте ұзын қолмен және Terraform кодынан жылдам.

Барлық жобаның бастапқы коды қолжетімді менің GitHub репозиторийімде, Мен сізге онымен танысуды ұсынамын.

Мақаланы талқылауға қуаныштымын, пікірлеріңізді күтемін. Мен сындарлы сынға үміттенемін.

Сәттілік тілеймін!

Ақпарат көзі: www.habr.com

пікір қалдыру