Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Hey Habr!

Uçan təyyarələri sevirsən? Mən bunu sevirəm, amma özünütəcrid zamanı mən də bir tanınmış resursdan - Aviasalesdən aviabiletlər haqqında məlumatları təhlil etməyə aşiq oldum.

Bu gün biz Amazon Kinesis-in işini təhlil edəcəyik, real vaxt analitikası ilə axın sistemi quracağıq, əsas məlumat yaddaşı kimi Amazon DynamoDB NoSQL verilənlər bazasını quraşdıracaq və maraqlı biletlər üçün SMS bildirişləri quraşdıracağıq.

Bütün detallar kəsik altındadır! Get!

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Giriş

Məsələn, bizə giriş lazımdır Aviasales API. Ona giriş pulsuz və məhdudiyyətsiz təmin edilir; məlumatlara daxil olmaq üçün API nişanınızı almaq üçün sadəcə olaraq “Tərtibatçılar” bölməsində qeydiyyatdan keçməlisiniz.

Bu məqalənin əsas məqsədi AWS-də məlumat axınının istifadəsi haqqında ümumi anlayış verməkdir; biz nəzərə alırıq ki, istifadə olunan API tərəfindən qaytarılan məlumatlar ciddi şəkildə yeni deyil və keşdən ötürülür. Aviasales.ru və Jetradar.com saytlarının istifadəçilərinin son 48 saat ərzində apardıqları axtarışlar əsasında formalaşıb.

İstehsalçı maşında quraşdırılmış, API vasitəsilə qəbul edilən Kinesis-agent Kinesis Data Analytics vasitəsilə məlumatları avtomatik olaraq təhlil edəcək və istənilən axına ötürəcək. Bu axının xam versiyası birbaşa mağazaya yazılacaq. DynamoDB-də yerləşdirilmiş xam məlumat yaddaşı AWS Quick Sight kimi BI alətləri vasitəsilə daha dərin bilet təhlilinə imkan verəcək.

Bütün infrastrukturu yerləşdirmək üçün iki variantı nəzərdən keçirəcəyik:

  • Manual - AWS Management Console vasitəsilə;
  • Terraform kodundan infrastruktur tənbəl avtomatlar üçündür;

İnkişaf etmiş sistemin arxitekturası

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İstifadə olunan komponentlər:

  • Aviasales API — bu API tərəfindən qaytarılan məlumatlar bütün sonrakı işlər üçün istifadə olunacaq;
  • EC2 İstehsalçı Nümunəsi — giriş məlumat axınının yaradılacağı buludda müntəzəm virtual maşın:
    • Kinesis agenti Kinesis (Kinesis Data Streams və ya Kinesis Firehose)-a məlumat toplamaq və göndərmək üçün asan yol təqdim edən maşında yerli olaraq quraşdırılmış Java proqramıdır. Agent müəyyən edilmiş kataloqlardakı bir sıra faylları daim izləyir və Kinesis-ə yeni məlumatlar göndərir;
    • API Zəng edən Skript — API-yə sorğu göndərən və cavabı Kinesis Agenti tərəfindən nəzarət edilən qovluğa qoyan Python skripti;
  • Kinesis məlumat axınları — geniş miqyaslama imkanlarına malik real vaxt rejimində məlumat axını xidməti;
  • Kinesis Analytics real vaxt rejimində axın məlumatlarının təhlilini asanlaşdıran serversiz xidmətdir. Amazon Kinesis Data Analytics proqram resurslarını konfiqurasiya edir və daxil olan məlumatların istənilən həcmini idarə etmək üçün avtomatik miqyaslaşdırır;
  • AWS Lambda — serverlərin ehtiyat nüsxəsini çıxarmadan və ya quraşdırmadan kodu işlətməyə imkan verən xidmət. Bütün hesablama gücü hər zəng üçün avtomatik olaraq ölçülür;
  • Amazon DynamoDB - İstənilən miqyasda işləyərkən 10 millisaniyədən az gecikmə təmin edən açar-dəyər cütləri və sənədlər verilənlər bazası. DynamoDB-dən istifadə edərkən hər hansı serveri təmin etmək, yamaq və ya idarə etmək lazım deyil. DynamoDB mövcud resursların miqdarını tənzimləmək və yüksək performansı qorumaq üçün cədvəlləri avtomatik olaraq miqyaslandırır. Sistem idarəçiliyinə ehtiyac yoxdur;
  • Amazon SNS - naşir-abunəçi (Pub/Sub) modelindən istifadə edərək mesajların göndərilməsi üçün tam idarə olunan xidmət, onun köməyi ilə mikroservisləri, paylanmış sistemləri və serversiz proqramları təcrid edə bilərsiniz. SNS mobil push bildirişləri, SMS mesajları və e-poçt vasitəsilə son istifadəçilərə məlumat göndərmək üçün istifadə edilə bilər.

İlkin məşq

Məlumat axınını təqlid etmək üçün Aviasales API tərəfindən qaytarılan aviabilet məlumatından istifadə etmək qərarına gəldim. IN sənədləşdirmə müxtəlif üsulların olduqca geniş siyahısı, onlardan birini götürək - köçürmələrin sayına görə ayın hər günü üçün qiymətləri qaytaran "Aylıq Qiymət Təqvimi". Sorğuda axtarış ayını göstərməsəniz, məlumat cari aydan sonrakı ay üçün qaytarılacaq.

Beləliklə, gəlin qeydiyyatdan keçək və nişanımızı əldə edək.

Nümunə sorğu aşağıdadır:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Sorğuda token göstərməklə API-dən məlumatların qəbulunun yuxarıdakı üsulu işləyəcək, lakin mən giriş işarəsini başlıqdan keçirməyi üstün tuturam, ona görə də bu metoddan api_caller.py skriptində istifadə edəcəyik.

Cavab nümunəsi:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Yuxarıdakı nümunə API cavabı Sankt-Peterburqdan Phuka bileti göstərir... Oh, nə yuxudur...
Mən Kazanlı olduğum üçün və Phuket indi “yalnız bir yuxu” olduğundan, gəlin Sankt-Peterburqdan Kazana bilet axtaraq.

Bu, artıq bir AWS hesabınızın olduğunu güman edir. Dərhal xüsusi diqqəti cəlb etmək istərdim ki, Kinesis və SMS vasitəsilə bildirişlər göndərmək illik hesaba daxil deyil. Pulsuz səviyyə (pulsuz istifadə). Ancaq buna baxmayaraq, bir neçə dollar nəzərə alınmaqla, təklif olunan sistemi qurmaq və onunla oynamaq tamamilə mümkündür. Və təbii ki, bütün resurslara ehtiyac qalmadıqdan sonra onları silməyi unutmayın.

Xoşbəxtlikdən, aylıq pulsuz limitlərimizə əməl etsək, DynamoDb və lambda funksiyaları bizim üçün pulsuz olacaq. Məsələn, DynamoDB üçün: 25 GB yaddaş, 25 WCU/RCU və 100 milyon sorğu. Və ayda bir milyon lambda funksiyası çağırışı.

Sistemin əl ilə yerləşdirilməsi

Kinesis Data Streams qurulması

Gəlin Kinesis Data Streams xidmətinə keçək və hər biri üçün bir parça olmaqla iki yeni axın yaradaq.

Qırıntı nədir?
Parça, Amazon Kinesis axınının əsas məlumat ötürmə vahididir. Bir seqment 1 MB/s sürətlə daxilolma məlumatlarının ötürülməsini və 2 MB/s sürətlə çıxış məlumatlarının ötürülməsini təmin edir. Bir seqment saniyədə 1000-ə qədər PUT girişini dəstəkləyir. Məlumat axını yaratarkən, lazımi sayda seqmentləri təyin etməlisiniz. Məsələn, iki seqmentli məlumat axını yarada bilərsiniz. Bu məlumat axını saniyədə 2-ə qədər PUT qeydini dəstəkləyərək, 4 MB/s sürətlə daxilolma məlumatlarının ötürülməsini və 2000 MB/s sürətlə çıxış məlumat ötürülməsini təmin edəcək.

Yayımınızda nə qədər çox fraqment varsa, onun ötürmə qabiliyyəti bir o qədər çox olur. Prinsipcə, axınlar belə ölçülür - qırıqlar əlavə etməklə. Ancaq nə qədər çox parça varsa, qiymət bir o qədər yüksəkdir. Hər bir parça saatda 1,5 sent və hər milyon PUT faydalı yük vahidi üçün əlavə 1.4 sentə başa gəlir.

Gəlin adı ilə yeni axın yaradaq aviabiletlər, 1 qəlpə ona kifayət edəcək:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İndi adı ilə başqa mövzu yaradaq xüsusi_axın:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

İstehsalçı quraşdırma

Tapşırığı təhlil etmək üçün məlumat istehsalçısı kimi adi EC2 instansiyasından istifadə etmək kifayətdir. Bunun güclü, bahalı virtual maşın olması lazım deyil; bir spot t2.micro yaxşı işləyəcək.

Vacib qeyd: məsələn, təsvirdən istifadə etməlisiniz - Amazon Linux AMI 2018.03.0, onun Kinesis Agentini tez işə salmaq üçün daha az parametrləri var.

EC2 xidmətinə keçin, yeni virtual maşın yaradın, Pulsuz Tierə daxil olan t2.micro növü ilə istədiyiniz AMI seçin:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Yeni yaradılmış virtual maşının Kinesis xidməti ilə qarşılıqlı əlaqə qura bilməsi üçün ona bunu etmək hüququ verilməlidir. Bunu etməyin ən yaxşı yolu IAM Rolu təyin etməkdir. Buna görə də, Addım 3: Nümunə Təfərrüatlarını Konfiqurasiya ekranında siz seçməlisiniz Yeni IAM Rolu yaradın:

EC2 üçün IAM rolunun yaradılması
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Açılan pəncərədə EC2 üçün yeni rol yaratdığımızı seçin və İcazələr bölməsinə keçin:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Təlim nümunəsindən istifadə edərək, biz resurs hüquqlarının dənəvər konfiqurasiyasının bütün incəliklərinə daxil olmaq məcburiyyətində deyilik, ona görə də Amazon tərəfindən əvvəlcədən konfiqurasiya edilmiş siyasətləri seçəcəyik: AmazonKinesisFullAccess və CloudWatchFullAccess.

Bu rol üçün bəzi mənalı ad verək, məsələn: EC2-KinesisStreams-FullAccess. Nəticə aşağıdakı şəkildə göstərildiyi kimi olmalıdır:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Bu yeni rolu yaratdıqdan sonra onu yaradılmış virtual maşın nümunəsinə əlavə etməyi unutmayın:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Bu ekranda başqa heç nəyi dəyişmirik və növbəti pəncərələrə keçirik.

Sərt disk parametrləri, həmçinin teqlər kimi standart olaraq buraxıla bilər (baxmayaraq ki, teqlərdən istifadə etmək yaxşı təcrübədir, heç olmasa nümunəyə ad verin və mühiti göstərin).

İndi biz 6-cı Addım: Təhlükəsizlik Qrupunu Konfiqurasiya et sekmesindeyik, burada yenisini yaratmalı və ya ssh (port 22) vasitəsilə instansiyaya qoşulmağa imkan verən mövcud Təhlükəsizlik qrupunu təyin etməlisiniz. Orada Mənbə -> Mənim IP-ni seçin və nümunəni işə sala bilərsiniz.

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İşləyən vəziyyətə keçən kimi ona ssh vasitəsilə qoşulmağa cəhd edə bilərsiniz.

Kinesis Agent ilə işləyə bilmək üçün maşına uğurla qoşulduqdan sonra terminalda aşağıdakı əmrləri daxil etməlisiniz:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API cavablarını saxlamaq üçün qovluq yaradaq:

sudo mkdir /var/log/airline_tickets

Agenti işə salmazdan əvvəl onun konfiqurasiyasını konfiqurasiya etməlisiniz:

sudo vim /etc/aws-kinesis/agent.json

agent.json faylının məzmunu belə görünməlidir:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Konfiqurasiya faylından göründüyü kimi, agent /var/log/airline_tickets/ kataloqunda .log uzantılı fayllara nəzarət edəcək, onları təhlil edəcək və aviabiletlər axınına köçürəcək.

Xidməti yenidən işə salırıq və onun işlək vəziyyətdə olduğundan əmin oluruq:

sudo service aws-kinesis-agent restart

İndi API-dən məlumat tələb edəcək Python skriptini yükləyək:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py skripti Aviasales-dən məlumat tələb edir və alınan cavabı Kinesis agentinin skan etdiyi kataloqda saxlayır. Bu skriptin tətbiqi olduqca standartdır, TicketsApi sinfi var, o, API-ni asinxron şəkildə çəkməyə imkan verir. Bu sinifə işarə və sorğu parametrləri olan başlığı ötürürük:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Agentin düzgün parametrlərini və funksionallığını yoxlamaq üçün api_caller.py skriptini sınaqdan keçirək:

sudo ./api_caller.py TOKEN

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Və biz Agent jurnallarında və airline_tickets məlumat axınındakı Monitorinq sekmesinde işin nəticəsinə baxırıq:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Gördüyünüz kimi, hər şey işləyir və Kinesis Agent məlumat axınına uğurla göndərir. İndi istehlakçı konfiqurasiya edək.

Kinesis Data Analytics-in qurulması

Gəlin bütün sistemin mərkəzi komponentinə keçək - Kinesis Data Analytics-də kinesis_analytics_airlines_app adlı yeni proqram yaradın:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Kinesis Data Analytics sizə SQL dilindən istifadə edərək Kinesis Streams-dən real vaxt rejimində məlumat analitikasını həyata keçirməyə imkan verir. Bu, tam avtomatik ölçmə xidmətidir (Kinesis Streams-dən fərqli olaraq):

  1. mənbə məlumatlarına sorğular əsasında yeni axınlar (Çıxış axını) yaratmağa imkan verir;
  2. proqramlar işləyərkən baş verən xətalarla axını təmin edir (Error Stream);
  3. giriş məlumatlarının sxemini avtomatik müəyyən edə bilər (lazım olduqda əl ilə yenidən təyin oluna bilər).

Bu, ucuz xidmət deyil - iş saatı üçün 0.11 ABŞ dolları, ona görə də ondan ehtiyatla istifadə etməli və işiniz bitdikdən sonra onu silməlisiniz.

Proqramı məlumat mənbəyinə bağlayaq:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Qoşulacağımız axını seçin (havayolu_biletləri):

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Sonra, yeni IAM Rolu əlavə etməlisiniz ki, proqram axından oxuya və axına yaza bilsin. Bunun üçün Giriş icazələri blokunda heç nəyi dəyişməmək kifayətdir:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İndi axındakı məlumat sxeminin aşkar edilməsini tələb edək; bunun üçün “Sxemi kəşf et” düyməsini klikləyin. Nəticədə, IAM rolu yenilənəcək (yenisi yaradılacaq) və axına artıq daxil olan məlumatlardan sxem aşkarlanması işə salınacaq:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İndi SQL redaktoruna keçməlisiniz. Bu düyməni kliklədiyiniz zaman proqramı işə salmağı xahiş edən bir pəncərə görünəcək - başlamaq istədiyinizi seçin:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Aşağıdakı sadə sorğunu SQL redaktoru pəncərəsinə daxil edin və SQL-i Saxla və Çalıştır düyməsini basın:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Əlaqəli verilənlər bazalarında siz qeydlər əlavə etmək üçün INSERT ifadələrindən və sorğu məlumatlarına SELECT ifadəsindən istifadə edərək cədvəllərlə işləyirsiniz. Amazon Kinesis Data Analytics-də siz axınlarla (STREAM-lar) və nasoslarla (PUMP-lar) işləyirsiniz - proqramdakı bir axından məlumatları digər axına daxil edən davamlı daxiletmə sorğuları.

Yuxarıda təqdim olunan SQL sorğusu beş min rubldan aşağı qiymətə Aeroflot biletlərini axtarır. Bu şərtlərə cavab verən bütün qeydlər DESTINATION_SQL_STREAM axınına yerləşdiriləcək.

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Təyinat blokunda xüsusi_axın axını və tətbiqdaxili axın adı DESTINATION_SQL_STREAM açılan siyahısında seçin:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Bütün manipulyasiyaların nəticəsi aşağıdakı şəkildəki kimi bir şey olmalıdır:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

SNS mövzusu yaratmaq və ona abunə olmaq

Sadə Bildiriş Xidmətinə gedin və orada Hava Yolları adı ilə yeni mövzu yaradın:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Bu mövzuya abunə olun və SMS bildirişlərinin göndəriləcəyi mobil telefon nömrəsini göstərin:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

DynamoDB-də cədvəl yaradın

Onların airline_tickets axınından xam məlumatları saxlamaq üçün DynamoDB-də eyni adlı cədvəl yaradacağıq. Əsas açar kimi record_id istifadə edəcəyik:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Lambda funksiyası kollektorunun yaradılması

Gəlin Kollektor adlı lambda funksiyasını yaradaq, onun vəzifəsi havayolu_tickets axınını sorğulamaq və orada yeni qeydlər aşkar edilərsə, bu qeydləri DynamoDB cədvəlinə daxil etmək olacaq. Aydındır ki, standart hüquqlara əlavə olaraq, bu lambda Kinesis məlumat axınına oxumaq və DynamoDB-yə yazma imkanı olmalıdır.

Kollektor lambda funksiyası üçün IAM rolunun yaradılması
Əvvəlcə Lambda-TicketsProcessingRole adlı lambda üçün yeni IAM rolu yaradaq:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Test nümunəsi üçün əvvəlcədən konfiqurasiya edilmiş AmazonKinesisReadOnlyAccess və AmazonDynamoDBFullAccess siyasətləri aşağıdakı şəkildə göstərildiyi kimi olduqca uyğundur:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Yeni girişlər airline_stream-ə daxil olduqda, bu lambda Kinesis-dən bir tətiklə işə salınmalıdır, ona görə də yeni bir tətik əlavə etməliyik:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Qalan şey kodu daxil etmək və lambda saxlamaqdır.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda funksiyası bildirişinin yaradılması

İkinci axını (special_stream) izləyəcək və SNS-ə bildiriş göndərəcək ikinci lambda funksiyası da oxşar şəkildə yaradılmışdır. Buna görə də, bu lambda Kinesis-dən oxumaq və müəyyən bir SNS mövzusuna mesaj göndərmək imkanına malik olmalıdır, sonra SNS xidməti tərəfindən bu mövzunun bütün abunəçilərinə (e-poçt, SMS və s.) göndəriləcəkdir.

IAM rolunun yaradılması
Əvvəlcə bu lambda üçün IAM rolunu Lambda-KinesisAlarm yaradırıq və sonra bu rolu yaradılan alarm_notifier lambdasına təyin edirik:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Bu lambda, xüsusi_axına daxil olmaq üçün yeni qeydlər üçün trigger üzərində işləməlidir, ona görə də siz Kollektor lambdası üçün etdiyimiz kimi tətiyi konfiqurasiya etməlisiniz.

Bu lambdanın konfiqurasiyasını asanlaşdırmaq üçün gəlin yeni mühit dəyişənini təqdim edək - TOPIC_ARN, burada Hava Yolları mövzusunun ANR (Amazon Resurs Adları) yerləşdiririk:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Və lambda kodunu daxil edin, heç də mürəkkəb deyil:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Görünür, burada əl ilə sistem konfiqurasiyası tamamlanır. Yalnız test etmək və hər şeyi düzgün konfiqurasiya etdiyimizə əmin olmaq qalır.

Terraform kodundan yerləşdirin

Lazımi hazırlıq

Terraform koddan infrastrukturun yerləşdirilməsi üçün çox rahat açıq mənbə alətidir. Onun öyrənilməsi asan olan öz sintaksisi var və necə və nəyi yerləşdirməyə dair bir çox nümunə var. Atom redaktoru və ya Visual Studio Kodunda Terraform ilə işi asanlaşdıran çoxlu lazımlı plaginlər var.

Dağıtım yükləyə bilərsiniz buradan. Bütün Terraform imkanlarının ətraflı təhlili bu məqalənin əhatə dairəsi xaricindədir, ona görə də biz özümüzü əsas məqamlarla məhdudlaşdıracağıq.

Necə başlamaq lazımdır

Layihənin tam kodu mənim depomda. Biz anbarı özümüzə klonlayırıq. Başlamazdan əvvəl AWS CLI-nin quraşdırıldığına və konfiqurasiya olunduğuna əmin olmalısınız, çünki... Terraform ~/.aws/credentials faylında etimadnamələri axtaracaq.

Terraformun buludda hazırda bizim üçün nə yaratdığını görmək üçün bütün infrastrukturu yerləşdirməzdən əvvəl plan əmrini yerinə yetirmək yaxşı təcrübədir:

terraform.exe plan

Sizdən bildiriş göndərmək üçün telefon nömrəsini daxil etməyiniz xahiş olunacaq. Bu mərhələdə ona daxil olmaq lazım deyil.

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Proqramın əməliyyat planını təhlil etdikdən sonra resurslar yaratmağa başlaya bilərik:

terraform.exe apply

Bu əmri göndərdikdən sonra sizdən yenidən telefon nömrəsini daxil etməyiniz istəniləcək, hərəkətlərin həqiqətən yerinə yetirilməsi ilə bağlı sual göstərildikdə “bəli” yığın. Bu, bütün infrastrukturu qurmağa, EC2-nin bütün lazımi konfiqurasiyasını həyata keçirməyə, lambda funksiyalarını yerləşdirməyə və s.

Terraform kodu vasitəsilə bütün resurslar uğurla yaradıldıqdan sonra Kinesis Analytics tətbiqinin təfərrüatlarına daxil olmalısınız (təəssüf ki, bunu birbaşa koddan necə edəcəyimi tapa bilmədim).

Tətbiqi işə salın:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Bundan sonra, açılan siyahıdan seçməklə tətbiqdaxili axın adını açıq şəkildə təyin etməlisiniz:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
İndi hər şey getməyə hazırdır.

Tətbiqin sınaqdan keçirilməsi

Sistemi əl ilə və ya Terraform kodu vasitəsilə necə yerləşdirməyinizdən asılı olmayaraq, o, eyni şəkildə işləyəcək.

Kinesis Agentin quraşdırıldığı EC2 virtual maşınına SSH vasitəsilə daxil oluruq və api_caller.py skriptini işlədirik.

sudo ./api_caller.py TOKEN

Bunun üçün nömrənizə SMS gəlməsini gözləmək kifayətdir:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
SMS - demək olar ki, 1 dəqiqə ərzində telefonunuza mesaj gəlir:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik
Sonrakı, daha ətraflı təhlil üçün qeydlərin DynamoDB verilənlər bazasında saxlanıb saxlanmadığını görmək qalır. Aviaşirkətlərin_biletləri cədvəli təxminən aşağıdakı məlumatları ehtiva edir:

Amazon Kinesis ilə Aviasales API inteqrasiyası və serversiz sadəlik

Nəticə

Görülən işlərin gedişində Amazon Kinesis əsasında onlayn məlumat emalı sistemi quruldu. Kinesis Agentindən Kinesis Data Streams və SQL əmrlərindən istifadə edən real vaxt analitikası Kinesis Analytics ilə birlikdə istifadə variantları, həmçinin Amazon Kinesis-in digər AWS xidmətləri ilə qarşılıqlı əlaqəsi nəzərdən keçirilib.

Yuxarıdakı sistemi iki şəkildə yerləşdirdik: kifayət qədər uzun bir əl sistemi və Terraform kodundan sürətli.

Bütün layihə mənbə kodu mövcuddur GitHub depomda, Mən sizə onunla tanış olmağı təklif edirəm.

Məqaləni müzakirə etməkdən məmnunam, şərhlərinizi gözləyirəm. Konstruktiv tənqidə ümid edirəm.

Uğurlar diləyirəm!

Mənbə: www.habr.com

Добавить комментарий