Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

Hey Xabr!

Sizga uchadigan samolyotlar yoqadimi? Men buni yaxshi ko'raman, lekin o'zimni izolyatsiya qilish paytida men bitta taniqli resursdan - Aviasalesdan aviachiptalar haqidagi ma'lumotlarni tahlil qilishni yaxshi ko'raman.

Bugun biz Amazon Kinesis ishini tahlil qilamiz, real vaqt tahlili bilan oqim tizimini quramiz, asosiy ma'lumotlarni saqlash joyi sifatida Amazon DynamoDB NoSQL ma'lumotlar bazasini o'rnatamiz va qiziqarli chiptalar uchun SMS-xabarnomalarni o'rnatamiz.

Barcha tafsilotlar kesma ostida! Bor!

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

kirish

Misol uchun, bizga kirish kerak Aviasales API. Unga kirish bepul va cheklovlarsiz taqdim etiladi, ma'lumotlarga kirish uchun API tokenini olish uchun "Ishlab chiquvchilar" bo'limida ro'yxatdan o'tishingiz kifoya.

Ushbu maqolaning asosiy maqsadi AWS-da axborot oqimidan foydalanish haqida umumiy tushuncha berishdir; biz foydalanilgan API tomonidan qaytarilgan ma'lumotlar qat'iy yangilanmaganligini va keshdan uzatilishini hisobga olamiz. Aviasales.ru va Jetradar.com saytlari foydalanuvchilarining oxirgi 48 soat davomida qilgan qidiruvlari asosida tuzilgan.

API orqali qabul qilingan ishlab chiqaruvchi mashinaga o'rnatilgan Kinesis-agent Kinesis Data Analytics orqali ma'lumotlarni avtomatik ravishda tahlil qiladi va kerakli oqimga uzatadi. Ushbu oqimning xom versiyasi to'g'ridan-to'g'ri do'konga yoziladi. DynamoDB-da o'rnatilgan xom ma'lumotlarni saqlash AWS Quick Sight kabi BI vositalari orqali chiptalarni chuqurroq tahlil qilish imkonini beradi.

Biz butun infratuzilmani joylashtirishning ikkita variantini ko'rib chiqamiz:

  • Qo'llanma - AWS Management Console orqali;
  • Terraform kodidagi infratuzilma dangasa avtomatlar uchun;

Rivojlangan tizimning arxitekturasi

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ishlatilgan komponentlar:

  • Aviasales API — ushbu API tomonidan qaytarilgan maʼlumotlar barcha keyingi ishlar uchun ishlatiladi;
  • EC2 ishlab chiqaruvchi namunasi - kirish ma'lumotlar oqimi yaratiladigan bulutdagi oddiy virtual mashina:
    • Kinesis agenti Kinesis (Kinesis Data Streams yoki Kinesis Firehose) ga maʼlumotlarni toʻplash va joʻnatishning oson yoʻlini taʼminlovchi kompyuterda mahalliy oʻrnatilgan Java ilovasi. Agent ko'rsatilgan kataloglardagi fayllar to'plamini doimiy ravishda kuzatib boradi va Kinesis-ga yangi ma'lumotlarni yuboradi;
    • API Caller skripti — API ga so‘rovlar yuboradigan va javobni Kinesis agenti tomonidan nazorat qilinadigan papkaga joylashtiradigan Python skripti;
  • Kinesis ma'lumotlar oqimlari — keng masshtablash imkoniyatlariga ega real vaqt rejimida maʼlumotlarni uzatish xizmati;
  • Kinesis tahlili real vaqtda oqim ma'lumotlarini tahlil qilishni soddalashtiradigan serversiz xizmatdir. Amazon Kinesis Data Analytics ilova resurslarini sozlaydi va kiruvchi maʼlumotlarning istalgan hajmini boshqarish uchun avtomatik ravishda masshtablaydi;
  • AWS Lambda — serverlarning zaxira nusxasini yaratmasdan yoki o‘rnatmasdan kodni ishga tushirish imkonini beruvchi xizmat. Barcha hisoblash quvvati har bir qo'ng'iroq uchun avtomatik ravishda o'lchanadi;
  • Amazon DynamoDB - Har qanday masshtabda ishlaganda 10 millisekunddan kam kechikishni ta'minlovchi kalit-qiymat juftliklari va hujjatlar ma'lumotlar bazasi. DynamoDB dan foydalanganda serverlarni ta'minlash, tuzatish yoki boshqarishga hojat yo'q. DynamoDB mavjud resurslar miqdorini sozlash va yuqori samaradorlikni saqlash uchun jadvallarni avtomatik ravishda o'lchaydi. Tizim boshqaruvi talab qilinmaydi;
  • Amazon SNS - nashriyotchi-obunachi (Pub/Sub) modelidan foydalangan holda xabarlarni yuborish uchun to'liq boshqariladigan xizmat, uning yordamida siz mikroservislarni, taqsimlangan tizimlarni va serversiz ilovalarni izolyatsiya qilishingiz mumkin. SNS mobil push-bildirishnomalar, SMS xabarlar va elektron pochta orqali oxirgi foydalanuvchilarga ma'lumot yuborish uchun ishlatilishi mumkin.

Dastlabki mashg'ulot

Ma'lumotlar oqimiga taqlid qilish uchun men Aviasales API tomonidan qaytarilgan aviachipta ma'lumotlaridan foydalanishga qaror qildim. IN hujjatlar Turli usullarning juda keng ro'yxati, keling, ulardan birini olaylik - "Oylik narxlar taqvimi", oyning har bir kuni uchun narxlarni o'tkazmalar soni bo'yicha guruhlangan holda qaytaradi. Agar so'rovda qidiruv oyini ko'rsatmasangiz, ma'lumot joriy oydan keyingi oy uchun qaytariladi.

Shunday qilib, keling, ro'yxatdan o'tamiz va tokenimizni olamiz.

Misol so'rovi quyida keltirilgan:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

So'rovda tokenni ko'rsatish orqali API'dan ma'lumotlarni olishning yuqoridagi usuli ishlaydi, lekin men kirish tokenini sarlavha orqali o'tkazishni afzal ko'raman, shuning uchun biz bu usuldan api_caller.py skriptida foydalanamiz.

Javob misoli:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Yuqoridagi misol API javobida Sankt-Peterburgdan Phukga chipta ko'rsatilgan ... Oh, qanday tush ...
Men Qozondan bo'lganim uchun va Phuket endi "faqat orzu" bo'lib, keling, Sankt-Peterburgdan Qozonga chiptalarni izlaylik.

Sizda allaqachon AWS hisob qaydnomangiz bor deb taxmin qilinadi. Men darhol alohida e'tiborni qaratmoqchimanki, Kinesis va SMS orqali bildirishnomalarni yuborish yillik hisobotga kiritilmagan. Bepul daraja (bepul foydalanish). Ammo shunga qaramay, bir necha dollarni hisobga olgan holda, taklif qilingan tizimni qurish va u bilan o'ynash mumkin. Va, albatta, kerak bo'lmaganidan keyin barcha resurslarni o'chirishni unutmang.

Yaxshiyamki, agar biz oylik bepul limitlarimizni bajarsak, DynamoDb va lambda funksiyalari biz uchun bepul bo'ladi. Masalan, DynamoDB uchun: 25 GB saqlash, 25 WCU/RCU va 100 million so'rov. Va oyiga bir million lambda funktsiyasi qo'ng'iroqlari.

Tizimni qo'lda joylashtirish

Kinesis ma'lumotlar oqimlarini sozlash

Keling, Kinesis Data Streams xizmatiga o'tamiz va ikkita yangi oqim yaratamiz, har biri uchun bitta parcha.

Shard nima?
Shard Amazon Kinesis oqimining asosiy ma'lumotlarni uzatish birligidir. Bitta segment 1 MB/s tezlikda kirish ma'lumotlarini uzatishni va 2 MB / s tezlikda ma'lumotlarni uzatishni ta'minlaydi. Bitta segment sekundiga 1000 tagacha PUT yozuvlarini qo'llab-quvvatlaydi. Ma'lumotlar oqimini yaratishda siz segmentlarning kerakli sonini ko'rsatishingiz kerak. Masalan, siz ikkita segmentli ma'lumotlar oqimini yaratishingiz mumkin. Ushbu ma'lumotlar oqimi sekundiga 2 tagacha PUT yozuvlarini qo'llab-quvvatlovchi 4 MB/s tezlikda kirish ma'lumotlarini uzatishni va 2000 MB/s tezlikda ma'lumotlarni uzatishni ta'minlaydi.

Oqimdagi parchalar qancha ko'p bo'lsa, uning o'tkazuvchanligi shunchalik katta bo'ladi. Asosan, oqimlar shunday miqyosda - parchalarni qo'shish orqali amalga oshiriladi. Lekin sizda qancha ko'p parcha bo'lsa, narx shunchalik yuqori bo'ladi. Har bir parcha soatiga 1,5 sent va har bir million PUT foydali yuk birligi uchun qo'shimcha 1.4 sent turadi.

Nomi bilan yangi oqim yarataylik aviachiptalar, unga 1 ta parcha kifoya qiladi:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Endi nom bilan boshqa mavzu yaratamiz maxsus_oqim:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

Ishlab chiqaruvchini sozlash

Vazifani tahlil qilish uchun ma'lumotlar ishlab chiqaruvchisi sifatida oddiy EC2 nusxasidan foydalanish kifoya. Bu kuchli, qimmat virtual mashina bo'lishi shart emas; spot t2.micro juda yaxshi ishlaydi.

Muhim eslatma: masalan, siz tasvirdan foydalanishingiz kerak - Amazon Linux AMI 2018.03.0, u Kinesis agentini tezda ishga tushirish uchun kamroq sozlamalarga ega.

EC2 xizmatiga o'ting, yangi virtual mashina yarating, Bepul darajaga kiritilgan t2.micro turi bilan kerakli AMI ni tanlang:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Yangi yaratilgan virtual mashina Kinesis xizmati bilan o'zaro aloqada bo'lishi uchun unga huquqlar berilishi kerak. Buning eng yaxshi usuli IAM rolini belgilashdir. Shuning uchun, 3-qadam: Namuna tafsilotlarini sozlash ekranida siz tanlashingiz kerak Yangi IAM rolini yarating:

EC2 uchun IAM rolini yaratish
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ochilgan oynada biz EC2 uchun yangi rol yaratayotganimizni tanlang va Ruxsatlar bo'limiga o'ting:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Trening misolidan foydalanib, biz resurs huquqlarining batafsil konfiguratsiyasining barcha nozikliklarini o'rganishimiz shart emas, shuning uchun biz Amazon tomonidan oldindan sozlangan siyosatlarni tanlaymiz: AmazonKinesisFullAccess va CloudWatchFullAccess.

Keling, ushbu rolga ma'noli nom beraylik, masalan: EC2-KinesisStreams-FullAccess. Natija quyidagi rasmda ko'rsatilganidek bo'lishi kerak:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ushbu yangi rolni yaratgandan so'ng, uni yaratilgan virtual mashina namunasiga qo'shishni unutmang:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Biz ushbu ekranda boshqa hech narsani o'zgartirmaymiz va keyingi oynalarga o'tamiz.

Qattiq disk sozlamalari, shuningdek, teglar sifatida sukut bo'yicha qoldirilishi mumkin (garchi teglardan foydalanish yaxshi amaliyot bo'lsa-da, hech bo'lmaganda misolga nom bering va muhitni ko'rsating).

Endi biz 6-qadam: Xavfsizlik guruhini sozlash yorlig'ida turibmiz, bu erda siz yangisini yaratishingiz yoki mavjud xavfsizlik guruhini belgilashingiz kerak, bu sizga ssh (port 22) orqali misolga ulanish imkonini beradi. U erda Manba -> Mening IP-ni tanlang va siz misolni ishga tushirishingiz mumkin.

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ishlash holatiga o'tishi bilan siz unga ssh orqali ulanishga harakat qilishingiz mumkin.

Kinesis Agent bilan ishlash uchun, mashinaga muvaffaqiyatli ulangandan so'ng, terminalda quyidagi buyruqlarni kiritishingiz kerak:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API javoblarini saqlash uchun papka yarataylik:

sudo mkdir /var/log/airline_tickets

Agentni ishga tushirishdan oldin uning konfiguratsiyasini sozlashingiz kerak:

sudo vim /etc/aws-kinesis/agent.json

agent.json faylining mazmuni quyidagicha ko'rinishi kerak:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Konfiguratsiya faylidan ko'rinib turibdiki, agent /var/log/airline_tickets/ katalogidagi .log kengaytmali fayllarni kuzatib boradi, ularni tahlil qiladi va aviachiptalar oqimiga o'tkazadi.

Xizmatni qayta ishga tushiramiz va uning ishlayotganiga ishonch hosil qilamiz:

sudo service aws-kinesis-agent restart

Endi API dan ma'lumotlarni so'raydigan Python skriptini yuklab olaylik:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py skripti Aviasales'dan ma'lumotlarni so'raydi va olingan javobni Kinesis agenti skanerlaydigan katalogga saqlaydi. Ushbu skriptni amalga oshirish juda standart, TicketsApi klassi mavjud, u APIni asinxron ravishda tortib olishga imkon beradi. Biz ushbu sinfga token va so'rov parametrlari bilan sarlavhani uzatamiz:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Agentning to'g'ri sozlamalari va funksionalligini tekshirish uchun keling, api_caller.py skriptini sinab ko'raylik:

sudo ./api_caller.py TOKEN

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Va biz ish natijalarini Agent jurnallarida va aviakompaniyalar ma'lumotlar oqimidagi Monitoring yorlig'ida ko'rib chiqamiz:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ko'rib turganingizdek, hamma narsa ishlaydi va Kinesis agenti ma'lumotlarni oqimga muvaffaqiyatli yuboradi. Endi iste'molchini sozlaymiz.

Kinesis Data Analytics-ni sozlash

Keling, butun tizimning markaziy komponentiga o'tamiz - Kinesis Data Analytics-da kinesis_analytics_airlines_app nomli yangi dastur yarating:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Kinesis Data Analytics sizga SQL tilidan foydalangan holda Kinesis Streams-dan real vaqt rejimida ma'lumotlar tahlilini amalga oshirish imkonini beradi. Bu to'liq avtomatik o'lchov xizmati (Kinesis Streams-dan farqli o'laroq):

  1. manba ma'lumotlariga so'rovlar asosida yangi oqimlarni (Output Stream) yaratishga imkon beradi;
  2. ilovalar ishlayotganda yuzaga kelgan xatolar bilan oqim beradi (Error Stream);
  3. kirish ma'lumotlari sxemasini avtomatik ravishda aniqlashi mumkin (kerak bo'lsa, uni qo'lda qayta belgilash mumkin).

Bu arzon xizmat emas - ish soatiga 0.11 AQSh dollari, shuning uchun uni ehtiyotkorlik bilan ishlatishingiz va tugatgandan so'ng uni o'chirishingiz kerak.

Ilovani ma'lumotlar manbasiga ulaymiz:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Biz ulanadigan oqimni tanlang (aviakompaniya_chiptalari):

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Keyinchalik, dastur oqimdan o'qishi va oqimga yozishi uchun yangi IAM rolini biriktirishingiz kerak. Buning uchun kirish ruxsatnomalari blokida hech narsani o'zgartirmaslik kifoya:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Endi oqimdagi ma'lumotlar sxemasini topishni so'raymiz; buning uchun "Sxemani kashf qilish" tugmasini bosing. Natijada, IAM roli yangilanadi (yangisi yaratiladi) va oqimga allaqachon kelgan ma'lumotlardan sxemani aniqlash ishga tushiriladi:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Endi siz SQL muharririga o'tishingiz kerak. Ushbu tugmani bosganingizda, dasturni ishga tushirishingizni so'ragan oyna paydo bo'ladi - ishga tushirmoqchi bo'lgan narsani tanlang:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
SQL muharriri oynasiga quyidagi oddiy so'rovni kiriting va SQLni saqlash va ishga tushirish tugmasini bosing:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Relyatsion ma'lumotlar bazalarida siz yozuvlarni qo'shish uchun INSERT va ma'lumotlarni so'rash uchun SELECT iborasidan foydalangan holda jadvallar bilan ishlaysiz. Amazon Kinesis Data Analytics’da siz oqimlar (STREAMlar) va nasoslar (PUMPlar) bilan ishlaysiz — ilovadagi bir oqimdan ma’lumotlarni boshqa oqimga qo‘shadigan doimiy kiritish so‘rovlari.

Yuqorida keltirilgan SQL so'rovi Aeroflot chiptalarini besh ming rubldan past narxda qidiradi. Ushbu shartlarga javob beradigan barcha yozuvlar DESTINATION_SQL_STREAM oqimiga joylashtiriladi.

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Destination blokida maxsus_stream oqimini va ilova ichidagi oqim nomi DESTINATION_SQL_STREAM ochiladigan roʻyxatidan tanlang:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Barcha manipulyatsiyalarning natijasi quyidagi rasmga o'xshash bo'lishi kerak:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

SNS mavzusini yaratish va unga obuna bo'lish

Oddiy bildirishnoma xizmatiga o'ting va u erda Aviakompaniyalar nomi bilan yangi mavzu yarating:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Ushbu mavzuga obuna bo'ling va SMS-xabarnomalar yuboriladigan mobil telefon raqamini ko'rsating:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

DynamoDB da jadval yarating

Ularning airline_tickets oqimidagi dastlabki ma'lumotlarni saqlash uchun keling, DynamoDB-da xuddi shu nomdagi jadval yarataylik. Asosiy kalit sifatida record_id dan foydalanamiz:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

Lambda funksiyasi kollektorini yaratish

Keling, Kollektor deb nomlangan lambda funktsiyasini yarataylik, uning vazifasi aviakompaniyalar oqimini so'rash va agar u erda yangi yozuvlar topilsa, bu yozuvlarni DynamoDB jadvaliga kiriting. Shubhasiz, standart huquqlarga qo'shimcha ravishda, bu lambda Kinesis ma'lumotlar oqimiga o'qish va DynamoDB-ga yozish huquqiga ega bo'lishi kerak.

Kollektor lambda funksiyasi uchun IAM rolini yaratish
Birinchidan, Lambda-TicketsProcessingRole nomli lambda uchun yangi IAM rolini yarataylik:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Sinov misoli uchun, quyidagi rasmda ko'rsatilganidek, oldindan sozlangan AmazonKinesisReadOnlyAccess va AmazonDynamoDBFullAccess siyosatlari juda mos keladi:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

Bu lambda Airline_stream ga yangi yozuvlar kirganda Kinesis triggeri tomonidan ishga tushirilishi kerak, shuning uchun biz yangi trigger qo'shishimiz kerak:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Qolgan narsa kodni kiritish va lambdani saqlashdir.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funksiyasi bildirishnomasini yaratish

Ikkinchi oqimni (special_stream) kuzatadigan va SNS-ga bildirishnoma yuboradigan ikkinchi lambda funktsiyasi xuddi shunday tarzda yaratilgan. Shuning uchun, bu lambda Kinesis-dan o'qish va ma'lum bir SNS mavzusiga xabarlar yuborish huquqiga ega bo'lishi kerak, keyinchalik ular SNS xizmati tomonidan ushbu mavzuning barcha abonentlariga (elektron pochta, SMS va boshqalar) yuboriladi.

IAM rolini yaratish
Birinchidan, biz ushbu lambda uchun Lambda-KinesisAlarm IAM rolini yaratamiz va keyin bu rolni yaratilayotgan alarm_notifier lambdaga tayinlaymiz:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

Bu lambda maxsus_streamga kirish uchun yangi yozuvlar uchun triggerda ishlashi kerak, shuning uchun siz triggerni xuddi biz Kollektor lambda uchun qilganimizdek sozlashingiz kerak.

Ushbu lambdani sozlashni osonlashtirish uchun keling, yangi muhit o'zgaruvchisini kiritamiz - TOPIC_ARN, bu erda biz Aviakompaniyalar mavzusining ANR (Amazon Resurs nomlari) ni joylashtiramiz:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Va lambda kodini kiriting, bu umuman murakkab emas:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Bu erda tizimni qo'lda sozlash tugallanganga o'xshaydi. Qolgan narsa - sinab ko'rish va biz hamma narsani to'g'ri sozlaganimizga ishonch hosil qilish.

Terraform kodidan tarqating

Kerakli tayyorgarlik

Terraform koddan infratuzilmani o'rnatish uchun juda qulay ochiq manbali vositadir. Uning o'z sintaksisi bor, uni o'rganish oson va qanday va nimani joylashtirishga oid ko'plab misollar mavjud. Atom muharriri yoki Visual Studio Code Terraform bilan ishlashni osonlashtiradigan ko'plab qulay plaginlarga ega.

Siz tarqatishni yuklab olishingiz mumkin shu yerda. Terraformning barcha imkoniyatlarini batafsil tahlil qilish ushbu maqola doirasidan tashqarida, shuning uchun biz asosiy fikrlar bilan cheklanamiz.

Qanday ishlash kerak

Loyihaning to'liq kodi mening omborimda. Biz omborni o'zimizga klonlaymiz. Boshlashdan oldin, sizda AWS CLI o'rnatilgan va sozlanganligiga ishonch hosil qilishingiz kerak, chunki... Terraform ~/.aws/credentials faylida hisobga olish ma'lumotlarini qidiradi.

Terraform hozirda bulutda biz uchun nima yaratayotganini ko'rish uchun butun infratuzilmani o'rnatishdan oldin reja buyrug'ini bajarish yaxshi amaliyotdir:

terraform.exe plan

Sizdan bildirishnomalarni yuborish uchun telefon raqamini kiritish so'raladi. Ushbu bosqichda uni kiritish shart emas.

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Dasturning ishlash rejasini tahlil qilib, biz resurslarni yaratishni boshlashimiz mumkin:

terraform.exe apply

Ushbu buyruqni yuborganingizdan so'ng, sizdan yana telefon raqamini kiritish so'raladi, amallarni bajarish haqida savol paydo bo'lganda "ha" ni tering. Bu sizga butun infratuzilmani o'rnatishga, EC2 ning barcha kerakli konfiguratsiyasini bajarishga, lambda funktsiyalarini o'rnatishga va hokazolarga imkon beradi.

Terraform kodi orqali barcha resurslar muvaffaqiyatli yaratilgandan so'ng, siz Kinesis Analytics ilovasining tafsilotlariga o'tishingiz kerak (afsuski, buni to'g'ridan-to'g'ri koddan qanday qilishni topa olmadim).

Ilovani ishga tushiring:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Shundan so'ng, siz ochiladigan ro'yxatdan tanlash orqali ilova ichidagi oqim nomini aniq belgilashingiz kerak:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Endi hamma narsa borishga tayyor.

Ilovani sinovdan o'tkazish

Tizimni qo'lda yoki Terraform kodi orqali qanday joylashtirganingizdan qat'i nazar, u xuddi shunday ishlaydi.

Biz SSH orqali Kinesis Agent o'rnatilgan EC2 virtual mashinasiga kiramiz va api_caller.py skriptini ishga tushiramiz.

sudo ./api_caller.py TOKEN

Raqamingizga SMS kelishini kutish kifoya:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
SMS - telefonga xabar deyarli 1 daqiqada keladi:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi
Yozuvlar keyingi, batafsil tahlil qilish uchun DynamoDB ma'lumotlar bazasida saqlanganmi yoki yo'qligini ko'rish kerak. Airline_tickets jadvali taxminan quyidagi ma'lumotlarni o'z ichiga oladi:

Aviasales API-ning Amazon Kinesis bilan integratsiyasi va serversiz soddaligi

xulosa

Bajarilgan ishlar davomida Amazon Kinesis asosida onlayn ma'lumotlarni qayta ishlash tizimi qurildi. Kinesis agentidan Kinesis Data Streams va real vaqt rejimida SQL buyruqlari yordamida Kinesis Analytics tahlillari bilan birgalikda foydalanish variantlari, shuningdek Amazon Kinesisning boshqa AWS xizmatlari bilan oʻzaro taʼsiri koʻrib chiqildi.

Biz yuqoridagi tizimni ikki usulda joylashtirdik: ancha uzoq qo'lda va Terraform kodidan tezkor.

Loyihaning barcha manba kodi mavjud mening GitHub omborimda, Men sizga u bilan tanishishingizni tavsiya qilaman.

Men maqolani muhokama qilishdan xursandman, sharhlaringizni kutaman. Men konstruktiv tanqidga umid qilaman.

Omad tilayman!

Manba: www.habr.com

a Izoh qo'shish