Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Ey Habr!

Uçak uçurmayı sever misiniz? Bunu seviyorum ama kişisel izolasyon sırasında tanınmış bir kaynak olan Aviasales'ten gelen uçak biletlerine ilişkin verileri analiz etmeye de aşık oldum.

Bugün Amazon Kinesis'in çalışmalarını analiz edeceğiz, gerçek zamanlı analizlerle bir akış sistemi oluşturacağız, ana veri depolama alanı olarak Amazon DynamoDB NoSQL veritabanını kuracağız ve ilginç biletler için SMS bildirimleri ayarlayacağız.

Tüm detaylar kesim altında! Gitmek!

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Giriş

Örneğin, erişime ihtiyacımız var Aviasales API'si. Erişim ücretsiz ve kısıtlama olmaksızın sağlanır; verilere erişmek için API jetonunuzu almak üzere "Geliştiriciler" bölümüne kaydolmanız yeterlidir.

Bu makalenin temel amacı, AWS'de bilgi akışının kullanımına ilişkin genel bir anlayış kazandırmaktır; kullanılan API tarafından döndürülen verilerin tam olarak güncel olmadığını ve önbellekten aktarıldığını dikkate alıyoruz. Aviasales.ru ve Jetradar.com sitelerinin kullanıcılarının son 48 saatteki aramaları temel alınarak oluşturulmuştur.

Üretim makinesine yüklenen ve API aracılığıyla alınan Kinesis-agent, verileri otomatik olarak ayrıştıracak ve Kinesis Data Analytics aracılığıyla istenen akışa iletecektir. Bu akışın ham sürümü doğrudan mağazaya yazılacaktır. DynamoDB'de dağıtılan ham veri depolama, AWS Quick Sight gibi BI araçları aracılığıyla daha derin bildirim analizi yapılmasına olanak tanıyacak.

Altyapının tamamını dağıtmak için iki seçeneği ele alacağız:

  • Manuel - AWS Yönetim Konsolu aracılığıyla;
  • Terraform kodunun altyapısı tembel otomatörler içindir;

Geliştirilen sistemin mimarisi

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Kullanılan bileşenler:

  • Aviasales API'si — bu API tarafından döndürülen veriler sonraki tüm çalışmalar için kullanılacaktır;
  • EC2 Üretici Örneği — bulutta, giriş veri akışının oluşturulacağı normal bir sanal makine:
    • Kinesis Ajanı makineye yerel olarak yüklenen ve Kinesis'e (Kinesis Data Streams veya Kinesis Firehose) veri toplamanın ve göndermenin kolay bir yolunu sağlayan bir Java uygulamasıdır. Aracı, belirtilen dizinlerdeki bir dizi dosyayı sürekli olarak izler ve yeni verileri Kinesis'e gönderir;
    • API Arayan Komut Dosyası — API'ye istekte bulunan ve yanıtı Kinesis Agent tarafından izlenen bir klasöre koyan bir Python betiği;
  • Kinesis Veri Akışları — geniş ölçeklendirme yeteneklerine sahip gerçek zamanlı veri akışı hizmeti;
  • Kinesis Analizi gerçek zamanlı olarak akış verilerinin analizini kolaylaştıran sunucusuz bir hizmettir. Amazon Kinesis Data Analytics, uygulama kaynaklarını yapılandırır ve gelen her türlü veriyi işleyecek şekilde otomatik olarak ölçeklenir;
  • AWS Lambda — sunucuları yedeklemeden veya ayarlamadan kod çalıştırmanıza olanak tanıyan bir hizmet. Tüm bilgi işlem gücü her arama için otomatik olarak ölçeklendirilir;
  • Amazon DinamoDB - Herhangi bir ölçekte çalışırken 10 milisaniyeden daha az gecikme sağlayan anahtar/değer çiftleri ve belgelerden oluşan bir veritabanı. DynamoDB'yi kullanırken herhangi bir sunucuyu sağlamaya, yama yapmaya veya yönetmeye gerek yoktur. DynamoDB, kullanılabilir kaynak miktarını ayarlamak ve yüksek performansı sürdürmek için tabloları otomatik olarak ölçeklendirir. Hiçbir sistem yönetimine gerek yoktur;
  • Amazon SNS'si - mikro hizmetleri, dağıtılmış sistemleri ve sunucusuz uygulamaları izole edebileceğiniz yayıncı-abone (Pub/Sub) modelini kullanarak mesaj göndermeye yönelik tam olarak yönetilen bir hizmet. SNS, mobil anlık bildirimler, SMS mesajları ve e-postalar aracılığıyla son kullanıcılara bilgi göndermek için kullanılabilir.

İlk eğitim

Veri akışını taklit etmek için Aviasales API'sinin döndürdüğü uçak bileti bilgilerini kullanmaya karar verdim. İÇİNDE belgeleme Farklı yöntemlerden oluşan oldukça kapsamlı bir liste var; bunlardan birini ele alalım: Ayın her günü için transfer sayısına göre gruplanmış fiyatları veren "Aylık Fiyat Takvimi". Talepte arama ayını belirtmezseniz mevcut ayı takip eden aya ait bilgiler döndürülecektir.

O halde kayıt olalım ve jetonumuzu alalım.

Örnek bir istek aşağıdadır:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Yukarıdaki, istekte bir belirteç belirterek API'den veri alma yöntemi işe yarayacaktır, ancak ben erişim belirtecini başlıktan geçirmeyi tercih ediyorum, bu nedenle api_caller.py betiğinde bu yöntemi kullanacağız.

Cevap örneği:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Yukarıdaki örnek API yanıtı, St. Petersburg'dan Phuk'a bir bileti göstermektedir... Ah, ne rüya...
Kazanlı olduğum ve Phuket artık "sadece bir rüya" olduğuna göre, St. Petersburg'dan Kazan'a bilet arayalım.

Zaten bir AWS hesabınızın olduğu varsayılır. Kinesis ve SMS ile bildirim göndermenin yıllık tarifeye dahil olmadığını hemen belirtmek isterim. Ücretsiz Katman (ücretsiz kullanım). Ancak buna rağmen, birkaç doları göz önünde bulundurarak önerilen sistemi kurmak ve onunla oynamak oldukça mümkün. Ve elbette, artık ihtiyaç duyulmadığında tüm kaynakları silmeyi unutmayın.

Neyse ki aylık ücretsiz limitlerimizi karşıladığımız takdirde DynamoDb ve lambda fonksiyonları bizim için ücretsiz olacak. Örneğin DynamoDB için: 25 GB depolama, 25 WCU/RCU ve 100 milyon sorgu. Ve ayda bir milyon lambda işlev çağrısı.

Manuel sistem dağıtımı

Kinesis Veri Akışlarını Ayarlama

Kinesis Veri Akışları hizmetine gidelim ve her biri için bir parça olmak üzere iki yeni akış oluşturalım.

Kırık nedir?
Parça, Amazon Kinesis akışının temel veri aktarım birimidir. Bir segment, 1 MB/s hızında giriş veri aktarımı ve 2 MB/s hızında çıkış veri aktarımı sağlar. Bir segment saniyede 1000'e kadar PUT girişini destekler. Bir veri akışı oluştururken gerekli sayıda segmenti belirtmeniz gerekir. Örneğin iki segmentli bir veri akışı oluşturabilirsiniz. Bu veri akışı, saniyede 2'e kadar PUT kaydını destekleyecek şekilde 4 MB/s'de giriş veri aktarımı ve 2000 MB/s'de çıkış veri aktarımı sağlayacak.

Akışınızda ne kadar çok parça varsa aktarım hızı da o kadar artar. Prensip olarak, akışlar bu şekilde ölçeklendirilir - parçalar eklenerek. Ancak ne kadar çok parçanız varsa, fiyat da o kadar yüksek olur. Her bir parçanın maliyeti saat başına 1,5 sent ve her milyon PUT yük birimi için ilave 1.4 senttir.

Adıyla yeni bir akış oluşturalım uçak bileti, 1 parça onun için yeterli olacaktır:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Şimdi bu isimde başka bir konu oluşturalım özel_akım:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Üretici kurulumu

Bir görevi analiz etmek için veri üreticisi olarak normal bir EC2 örneğini kullanmak yeterlidir. Güçlü ve pahalı bir sanal makine olmasına gerek yok; spot t2.micro işinizi görecektir.

Önemli not: örneğin, image - Amazon Linux AMI 2018.03.0 kullanmalısınız; Kinesis Agent'ı hızlı bir şekilde başlatmak için daha az ayara sahiptir.

EC2 hizmetine gidin, yeni bir sanal makine oluşturun, Ücretsiz Kullanıma dahil olan t2.micro türüyle istediğiniz AMI'yi seçin:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Yeni oluşturulan sanal makinenin Kinesis hizmetiyle etkileşime girebilmesi için kendisine bu yönde haklar verilmesi gerekir. Bunu yapmanın en iyi yolu bir IAM Rolü atamaktır. Bu nedenle, 3. Adım: Bulut Sunucusu Ayrıntılarını Yapılandır ekranında şunu seçmelisiniz: Yeni IAM Rolü oluştur:

EC2 için IAM rolü oluşturma
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Açılan pencerede EC2 için yeni bir rol oluşturduğumuzu seçin ve İzinler bölümüne gidin:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Eğitim örneğini kullanarak, kaynak haklarının ayrıntılı yapılandırmasının tüm inceliklerine girmemize gerek yok; bu nedenle Amazon tarafından önceden yapılandırılmış politikaları seçeceğiz: AmazonKinesisFullAccess ve CloudWatchFullAccess.

Bu role anlamlı bir isim verelim, örneğin: EC2-KinesisStreams-FullAccess. Sonuç aşağıdaki resimde gösterilenle aynı olmalıdır:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Bu yeni rolü oluşturduktan sonra, oluşturulan sanal makine örneğine eklemeyi unutmayın:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Bu ekranda başka bir değişiklik yapmıyoruz ve sonraki pencerelere geçiyoruz.

Etiketlerin yanı sıra sabit sürücü ayarları da varsayılan olarak bırakılabilir (her ne kadar etiketleri kullanmak iyi bir uygulama olsa da, en azından örneğe bir ad verin ve ortamı belirtin).

Şimdi 6. Adım: Güvenlik Grubunu Yapılandır sekmesindeyiz; burada yeni bir tane oluşturmanız veya ssh (bağlantı noktası 22) aracılığıyla örneğe bağlanmanıza olanak tanıyan mevcut Güvenlik grubunuzu belirtmeniz gerekir. Orada Kaynak -> IP'm'i seçin ve örneği başlatabilirsiniz.

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Çalışma durumuna geçer geçmez ssh üzerinden bağlanmayı deneyebilirsiniz.

Kinesis Agent ile çalışabilmek için makineye başarıyla bağlandıktan sonra terminale aşağıdaki komutları girmelisiniz:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API yanıtlarını kaydetmek için bir klasör oluşturalım:

sudo mkdir /var/log/airline_tickets

Aracıyı başlatmadan önce yapılandırmasını yapılandırmanız gerekir:

sudo vim /etc/aws-kinesis/agent.json

Agent.json dosyasının içeriği şu şekilde görünmelidir:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Konfigürasyon dosyasından da görülebileceği gibi aracı, /var/log/airline_tickets/ dizinindeki .log uzantılı dosyaları izleyecek, ayrıştıracak ve havayolu_tickets akışına aktaracaktır.

Hizmeti yeniden başlatıyoruz ve çalışır durumda olduğundan emin oluyoruz:

sudo service aws-kinesis-agent restart

Şimdi API’den veri isteyecek Python betiğini indirelim:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py betiği, Aviasales'ten veri ister ve alınan yanıtı Kinesis aracısının taradığı dizine kaydeder. Bu betiğin uygulanması oldukça standarttır, bir TicketsApi sınıfı vardır, API'yi eşzamansız olarak çekmenize olanak tanır. Bu sınıfa belirteç ve istek parametreleri içeren bir başlık iletiyoruz:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Aracının doğru ayarlarını ve işlevselliğini test etmek için api_caller.py betiğini çalıştırmayı test edelim:

sudo ./api_caller.py TOKEN

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Ayrıca, Acente günlüklerinde ve havayolu_tickets veri akışındaki İzleme sekmesinde çalışmanın sonucuna bakıyoruz:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Gördüğünüz gibi her şey çalışıyor ve Kinesis Agent, verileri akışa başarıyla gönderiyor. Şimdi tüketiciyi yapılandıralım.

Kinesis Veri Analizini Kurma

Tüm sistemin merkezi bileşenine geçelim - Kinesis Data Analytics'te kinesis_analytics_airlines_app adında yeni bir uygulama oluşturun:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Kinesis Data Analytics, SQL dilini kullanarak Kinesis Streams'ten gerçek zamanlı veri analitiği gerçekleştirmenize olanak tanır. Tamamen otomatik ölçeklendirme hizmetidir (Kinesis Streams'in aksine):

  1. kaynak verilerine yönelik isteklere dayalı olarak yeni akışlar (Çıktı Akışı) oluşturmanıza olanak tanır;
  2. uygulamalar çalışırken oluşan hataları içeren bir akış sağlar (Hata Akışı);
  3. giriş veri şemasını otomatik olarak belirleyebilir (gerekirse manuel olarak yeniden tanımlanabilir).

Bu ucuz bir hizmet değil - çalışma saati başına 0.11 USD, bu yüzden dikkatli kullanmalı ve işiniz bittiğinde silmelisiniz.

Uygulamayı veri kaynağına bağlayalım:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Bağlanacağımız akışı seçin (airline_tickets):

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Daha sonra, uygulamanın akıştan okuyabilmesi ve akışa yazabilmesi için yeni bir IAM Rolü eklemeniz gerekir. Bunu yapmak için Erişim izinleri bloğundaki hiçbir şeyi değiştirmemeniz yeterlidir:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Şimdi akıştaki veri şemasının keşfedilmesini talep edelim; bunun için “Şemayı keşfet” butonuna tıklayın. Sonuç olarak, IAM rolü güncellenecek (yeni bir tane oluşturulacak) ve akışa zaten ulaşmış olan verilerden şema algılama başlatılacak:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Şimdi SQL editörüne gitmeniz gerekiyor. Bu düğmeye tıkladığınızda, uygulamayı başlatmanızı isteyen bir pencere açılacaktır - neyi başlatmak istediğinizi seçin:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Aşağıdaki basit sorguyu SQL düzenleyici penceresine ekleyin ve SQL'i Kaydet ve Çalıştır'a tıklayın:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

İlişkisel veritabanlarında, kayıtları eklemek için INSERT deyimlerini ve verileri sorgulamak için SELECT deyimini kullanarak tablolarla çalışırsınız. Amazon Kinesis Data Analytics'te akışlarla (STREAM'ler) ve pompalarla (PUMP'lar) çalışırsınız; bunlar, bir uygulamadaki bir akıştan başka bir akışa veri ekleyen sürekli ekleme istekleridir.

Yukarıda sunulan SQL sorgusu, maliyeti beş bin rublenin altında olan Aeroflot biletlerini arar. Bu koşulları karşılayan tüm kayıtlar DESTINATION_SQL_STREAM akışına yerleştirilecektir.

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Hedef bloğunda, özel_akım akışını seçin ve Uygulama içi akış adı DESTINATION_SQL_STREAM açılır listesinde:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Tüm manipülasyonların sonucu aşağıdaki resme benzer olmalıdır:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

SNS konusu oluşturma ve bu konuya abone olma

Basit Bildirim Hizmetine gidin ve orada Havayolları adıyla yeni bir konu oluşturun:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Bu konuya abone olun ve SMS bildirimlerinin gönderileceği cep telefonu numarasını belirtin:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

DynamoDB'de bir tablo oluşturun

Airlines_tickets akışındaki ham verileri depolamak için DynamoDB'de aynı adda bir tablo oluşturalım. Record_id'yi birincil anahtar olarak kullanacağız:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Lambda işlev toplayıcısı oluşturma

Collector adında, görevi havayolu_tickets akışını yoklamak ve orada yeni kayıtlar bulunursa bu kayıtları DynamoDB tablosuna eklemek olacak bir lambda işlevi oluşturalım. Açıkçası, varsayılan haklara ek olarak bu lambdanın Kinesis veri akışına okuma erişimine ve DynamoDB'ye yazma erişimine sahip olması gerekir.

Toplayıcı lambda işlevi için IAM rolü oluşturma
Öncelikle lambda için Lambda-TicketsProcessingRole adında yeni bir IAM rolü oluşturalım:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Test örneği için, aşağıdaki resimde gösterildiği gibi, önceden yapılandırılmış AmazonKinesisReadOnlyAccess ve AmazonDynamoDBFullAccess politikaları oldukça uygundur:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Bu lambda, yeni girişler havayolu_akışına girdiğinde Kinesis'ten bir tetikleyici tarafından başlatılmalıdır, bu nedenle yeni bir tetikleyici eklememiz gerekir:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Geriye kalan tek şey kodu girip lambdayı kaydetmek.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Lambda işlevi bildirimcisi oluşturma

İkinci akışı (special_stream) izleyip SNS'ye bildirim gönderecek olan ikinci lambda işlevi de benzer şekilde oluşturulur. Bu nedenle, bu lambdanın Kinesis'ten okuma ve belirli bir SNS konusuna mesaj gönderme erişimi olması gerekir; bu mesaj daha sonra SNS hizmeti tarafından bu konunun tüm abonelerine (e-posta, SMS vb.) gönderilecektir.

IAM rolü oluşturma
Öncelikle bu lambda için Lambda-KinesisAlarm IAM rolünü oluşturuyoruz ve ardından bu rolü oluşturulan alarm_notifier lambda'ya atadık:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Bu lambda, yeni kayıtların özel_akıma girmesi için bir tetikleyici üzerinde çalışmalıdır, bu nedenle tetikleyiciyi, Collector lambda için yaptığımız gibi yapılandırmanız gerekir.

Bu lambdayı yapılandırmayı kolaylaştırmak için, Havayolları konusunun ANR'sini (Amazon Başvuru Adlarını) yerleştirdiğimiz yeni bir ortam değişkeni olan TOPIC_ARN'yi tanıtalım:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Ve lambda kodunu girin, hiç de karmaşık değil:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Manuel sistem konfigürasyonunun tamamlandığı yer burası gibi görünüyor. Geriye kalan tek şey test etmek ve her şeyi doğru yapılandırdığımızdan emin olmaktır.

Terraform kodundan dağıtma

Gerekli hazırlık

Terraform altyapıyı koddan dağıtmak için çok kullanışlı bir açık kaynaklı araçtır. Öğrenmesi kolay olan ve nasıl ve neyin dağıtılacağına dair birçok örnek içeren kendi sözdizimine sahiptir. Atom editörü veya Visual Studio Code, Terraform ile çalışmayı kolaylaştıran birçok kullanışlı eklentiye sahiptir.

Dağıtımı indirebilirsiniz bundan dolayı. Tüm Terraform yeteneklerinin ayrıntılı bir analizi bu makalenin kapsamı dışındadır, bu nedenle kendimizi ana noktalarla sınırlayacağız.

nasıl başlamalı

Projenin tam kodu şu şekildedir benim depomda. Depoyu kendimize klonlarız. Başlamadan önce AWS CLI'nin kurulu ve yapılandırılmış olduğundan emin olmanız gerekir, çünkü... Terraform ~/.aws/credentials dosyasındaki kimlik bilgilerini arayacaktır.

Terraform'un şu anda bulutta bizim için ne oluşturduğunu görmek için tüm altyapıyı dağıtmadan önce plan komutunu çalıştırmak iyi bir uygulamadır:

terraform.exe plan

Bildirimlerin gönderileceği telefon numarasını girmeniz istenecektir. Bu aşamada girmenize gerek yoktur.

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Programın operasyon planını analiz ettikten sonra kaynak oluşturmaya başlayabiliriz:

terraform.exe apply

Bu komutu gönderdikten sonra tekrar bir telefon numarası girmeniz istenecektir; eylemlerin gerçekten gerçekleştirilmesiyle ilgili bir soru gösterildiğinde "evet"i çevirin. Bu, tüm altyapıyı kurmanıza, EC2'nin gerekli tüm yapılandırmasını gerçekleştirmenize, lambda işlevlerini dağıtmanıza vb. olanak tanır.

Tüm kaynaklar Terraform kodu üzerinden başarılı bir şekilde oluşturulduktan sonra Kinesis Analytics uygulamasının detaylarına girmeniz gerekiyor (maalesef bunu doğrudan koddan nasıl yapacağımı bulamadım).

Uygulamayı başlat:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Bundan sonra, açılır listeden seçim yaparak uygulama içi akış adını açıkça ayarlamanız gerekir:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Artık her şey yola çıkmaya hazır.

Uygulamanın test edilmesi

Sistemi manuel olarak veya Terraform kodu aracılığıyla nasıl kurduğunuzdan bağımsız olarak aynı şekilde çalışacaktır.

Kinesis Agent'ın kurulu olduğu EC2 sanal makinesine SSH üzerinden giriş yapıp api_caller.py scriptini çalıştırıyoruz.

sudo ./api_caller.py TOKEN

Tek yapmanız gereken numaranıza SMS gelmesini beklemek:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
SMS - neredeyse 1 dakika içinde telefona bir mesaj gelir:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik
Geriye kayıtların daha sonraki, daha ayrıntılı analiz için DynamoDB veritabanına kaydedilip kaydedilmediğinin görülmesi kalıyor. Airlines_tickets tablosu yaklaşık olarak aşağıdaki verileri içerir:

Amazon Kinesis ile Aviasales API entegrasyonu ve sunucusuz basitlik

Sonuç

Yapılan çalışmalar kapsamında Amazon Kinesis tabanlı çevrimiçi veri işleme sistemi kuruldu. Kinesis Agent'ı Kinesis Veri Akışları ve SQL komutlarını kullanan gerçek zamanlı analiz Kinesis Analytics ile birlikte kullanma seçeneklerinin yanı sıra Amazon Kinesis'in diğer AWS hizmetleriyle etkileşimi de değerlendirildi.

Yukarıdaki sistemi iki şekilde devreye aldık: oldukça uzun bir manuel sistem ve Terraform kodundan hızlı bir sistem.

Tüm proje kaynak kodu mevcuttur GitHub depomda, buna alışmanızı öneririm.

Makaleyi tartışmaktan mutluluk duyuyorum, yorumlarınızı bekliyorum. Yapıcı eleştiriler bekliyorum.

Sana başarılar diliyorum!

Kaynak: habr.com

Yorum ekle