Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Hai Habr!

Adakah anda suka kapal terbang terbang? Saya menyukainya, tetapi semasa pengasingan diri saya juga jatuh cinta dengan menganalisis data pada tiket penerbangan daripada satu sumber terkenal - Aviasales.

Hari ini kami akan menganalisis kerja Amazon Kinesis, membina sistem penstriman dengan analisis masa nyata, memasang pangkalan data Amazon DynamoDB NoSQL sebagai storan data utama dan menyediakan pemberitahuan SMS untuk tiket yang menarik.

Semua butiran adalah di bawah potongan! Pergi!

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Pengenalan

Sebagai contoh, kita memerlukan akses kepada API Aviasales. Akses kepadanya disediakan secara percuma dan tanpa sekatan; anda hanya perlu mendaftar di bahagian "Pembangun" untuk menerima token API anda untuk mengakses data.

Tujuan utama artikel ini adalah untuk memberikan pemahaman umum tentang penggunaan penstriman maklumat dalam AWS; kami mengambil kira bahawa data yang dikembalikan oleh API yang digunakan tidak dikemas kini sepenuhnya dan dihantar daripada cache, iaitu dibentuk berdasarkan carian oleh pengguna tapak Aviasales.ru dan Jetradar.com selama 48 jam terakhir.

Agen Kinesis, dipasang pada mesin penghasil, yang diterima melalui API akan menghuraikan dan menghantar data secara automatik ke aliran yang dikehendaki melalui Analitis Data Kinesis. Versi mentah strim ini akan ditulis terus ke kedai. Storan data mentah yang digunakan dalam DynamoDB akan membolehkan analisis tiket yang lebih mendalam melalui alatan BI, seperti AWS Quick Sight.

Kami akan mempertimbangkan dua pilihan untuk menggunakan keseluruhan infrastruktur:

  • Manual - melalui AWS Management Console;
  • Infrastruktur daripada kod Terraform adalah untuk automator yang malas;

Seni bina sistem yang dibangunkan

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Komponen yang digunakan:

  • API Aviasales — data yang dikembalikan oleh API ini akan digunakan untuk semua kerja seterusnya;
  • Contoh Pengeluar EC2 — mesin maya biasa dalam awan di mana aliran data input akan dijana:
    • Ejen Kinesis ialah aplikasi Java yang dipasang secara tempatan pada mesin yang menyediakan cara mudah untuk mengumpul dan menghantar data ke Kinesis (Kinesis Data Streams atau Kinesis Firehose). Ejen sentiasa memantau satu set fail dalam direktori yang ditentukan dan menghantar data baharu kepada Kinesis;
    • Skrip Pemanggil API — Skrip Python yang membuat permintaan kepada API dan meletakkan respons ke dalam folder yang dipantau oleh Agen Kinesis;
  • Aliran Data Kinesis — perkhidmatan penstriman data masa nyata dengan keupayaan penskalaan yang luas;
  • Analisis Kinesis ialah perkhidmatan tanpa pelayan yang memudahkan analisis penstriman data dalam masa nyata. Amazon Kinesis Data Analytics mengkonfigurasi sumber aplikasi dan menskala secara automatik untuk mengendalikan sebarang volum data masuk;
  • AWS Lambda — perkhidmatan yang membolehkan anda menjalankan kod tanpa membuat sandaran atau menyediakan pelayan. Semua kuasa pengkomputeran diskalakan secara automatik untuk setiap panggilan;
  • Amazon DynamoDB - Pangkalan data pasangan nilai kunci dan dokumen yang menyediakan kependaman kurang daripada 10 milisaat apabila dijalankan pada sebarang skala. Apabila menggunakan DynamoDB, tidak perlu menyediakan, menampal atau mengurus mana-mana pelayan. DynamoDB secara automatik menskalakan jadual untuk melaraskan jumlah sumber yang tersedia dan mengekalkan prestasi tinggi. Tiada pentadbiran sistem diperlukan;
  • Amazon SNS - perkhidmatan terurus sepenuhnya untuk menghantar mesej menggunakan model pelanggan-penerbit (Pub/Sub), yang dengannya anda boleh mengasingkan perkhidmatan mikro, sistem teragih dan aplikasi tanpa pelayan. SNS boleh digunakan untuk menghantar maklumat kepada pengguna akhir melalui pemberitahuan tolak mudah alih, mesej SMS dan e-mel.

Latihan awal

Untuk meniru aliran data, saya memutuskan untuk menggunakan maklumat tiket penerbangan yang dikembalikan oleh Aviasales API. DALAM dokumentasi senarai kaedah berbeza yang agak luas, mari kita ambil salah satu daripadanya - "Kalendar Harga Bulanan", yang mengembalikan harga untuk setiap hari dalam bulan itu, dikumpulkan mengikut bilangan pemindahan. Jika anda tidak menyatakan bulan carian dalam permintaan, maklumat akan dikembalikan untuk bulan selepas bulan semasa.

Jadi, mari daftar dan dapatkan token kami.

Contoh permintaan adalah di bawah:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Kaedah di atas untuk menerima data daripada API dengan menyatakan token dalam permintaan akan berfungsi, tetapi saya lebih suka menghantar token akses melalui pengepala, jadi kami akan menggunakan kaedah ini dalam skrip api_caller.py.

Contoh jawapan:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Contoh respons API di atas menunjukkan tiket dari St. Petersburg ke Phuk... Oh, sungguh mimpi...
Memandangkan saya dari Kazan, dan Phuket kini "hanya mimpi", mari cari tiket dari St. Petersburg ke Kazan.

Ia menganggap bahawa anda sudah mempunyai akaun AWS. Saya ingin segera menarik perhatian khusus kepada fakta bahawa Kinesis dan menghantar pemberitahuan melalui SMS tidak termasuk dalam tahunan Peringkat Percuma (penggunaan percuma). Tetapi walaupun ini, dengan beberapa dolar dalam fikiran, adalah agak mungkin untuk membina sistem yang dicadangkan dan bermain dengannya. Dan, sudah tentu, jangan lupa untuk memadam semua sumber selepas ia tidak diperlukan lagi.

Nasib baik, fungsi DynamoDb dan lambda akan percuma untuk kami jika anda memenuhi had percuma bulanan. Contohnya, untuk DynamoDB: 25 GB storan, 25 WCU/RCU dan 100 juta pertanyaan. Dan satu juta panggilan fungsi lambda setiap bulan.

Penggunaan sistem manual

Menyediakan Aliran Data Kinesis

Mari pergi ke perkhidmatan Strim Data Kinesis dan buat dua strim baharu, satu serpihan untuk setiap satu.

Apa itu serpihan?
Shard ialah unit pemindahan data asas bagi aliran Amazon Kinesis. Satu segmen menyediakan pemindahan data input pada kelajuan 1 MB/s dan pemindahan data output pada kelajuan 2 MB/s. Satu segmen menyokong sehingga 1000 entri PUT sesaat. Apabila membuat aliran data, anda perlu menentukan bilangan segmen yang diperlukan. Sebagai contoh, anda boleh membuat aliran data dengan dua segmen. Strim data ini akan menyediakan pemindahan data input pada 2 MB/s dan pemindahan data output pada 4 MB/s, menyokong sehingga 2000 rekod PUT sesaat.

Lebih banyak serpihan dalam strim anda, lebih besar daya pemprosesannya. Pada dasarnya, ini adalah cara aliran diskalakan - dengan menambah serpihan. Tetapi semakin banyak serpihan yang anda miliki, semakin tinggi harganya. Setiap serpihan berharga 1,5 sen sejam dan tambahan 1.4 sen untuk setiap juta unit muatan PUT.

Mari buat strim baharu dengan nama itu tiket kapal terbang, 1 serpihan akan cukup untuknya:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Sekarang mari buat benang lain dengan nama aliran_istimewa:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Persediaan pengeluar

Untuk menganalisis tugasan, cukup menggunakan contoh EC2 biasa sebagai pengeluar data. Ia tidak semestinya mesin maya yang berkuasa dan mahal; spot t2.micro akan berfungsi dengan baik.

Nota penting: sebagai contoh, anda harus menggunakan imej - Amazon Linux AMI 2018.03.0, ia mempunyai lebih sedikit tetapan untuk melancarkan Agen Kinesis dengan cepat.

Pergi ke perkhidmatan EC2, cipta mesin maya baharu, pilih AMI yang dikehendaki dengan jenis t2.micro, yang disertakan dalam Peringkat Percuma:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Agar mesin maya yang baru dicipta dapat berinteraksi dengan perkhidmatan Kinesis, ia mesti diberi hak untuk berbuat demikian. Cara terbaik untuk melakukan ini ialah dengan menetapkan Peranan IAM. Oleh itu, pada skrin Langkah 3: Konfigurasi Butiran Contoh, anda harus memilih Cipta Peranan IAM baharu:

Mencipta peranan IAM untuk EC2
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Dalam tetingkap yang terbuka, pilih bahawa kami sedang mencipta peranan baharu untuk EC2 dan pergi ke bahagian Kebenaran:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Menggunakan contoh latihan, kami tidak perlu pergi ke semua selok-belok konfigurasi butiran hak sumber, jadi kami akan memilih dasar yang diprakonfigurasikan oleh Amazon: AmazonKinesisFullAccess dan CloudWatchFullAccess.

Mari berikan beberapa nama yang bermakna untuk peranan ini, contohnya: EC2-KinesisStreams-FullAccess. Hasilnya hendaklah sama seperti yang ditunjukkan dalam gambar di bawah:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Selepas mencipta peranan baharu ini, jangan lupa untuk melampirkannya pada contoh mesin maya yang dibuat:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Kami tidak mengubah apa-apa lagi pada skrin ini dan beralih ke tetingkap seterusnya.

Tetapan cakera keras boleh dibiarkan sebagai lalai, serta teg (walaupun adalah amalan yang baik untuk menggunakan teg, sekurang-kurangnya beri nama contoh dan tunjukkan persekitaran).

Sekarang kita berada di Langkah 6: tab Konfigurasi Kumpulan Keselamatan, di mana anda perlu membuat yang baharu atau menentukan kumpulan Keselamatan sedia ada anda, yang membolehkan anda menyambung melalui ssh (port 22) ke contoh. Pilih Sumber -> IP saya di sana dan anda boleh melancarkan contoh itu.

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Sebaik sahaja ia bertukar kepada status berjalan, anda boleh cuba menyambungkannya melalui ssh.

Untuk dapat bekerja dengan Agen Kinesis, selepas berjaya menyambung ke mesin, anda mesti memasukkan arahan berikut dalam terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Mari buat folder untuk menyimpan respons API:

sudo mkdir /var/log/airline_tickets

Sebelum memulakan ejen, anda perlu mengkonfigurasi konfigurasinya:

sudo vim /etc/aws-kinesis/agent.json

Kandungan fail agent.json sepatutnya kelihatan seperti ini:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Seperti yang dapat dilihat daripada fail konfigurasi, ejen akan memantau fail dengan sambungan .log dalam direktori /var/log/airline_tickets/, menghuraikannya dan memindahkannya ke strim airline_tickets.

Kami memulakan semula perkhidmatan dan memastikan ia berfungsi dan berjalan:

sudo service aws-kinesis-agent restart

Sekarang mari muat turun skrip Python yang akan meminta data daripada API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrip api_caller.py meminta data daripada Aviasales dan menyimpan respons yang diterima dalam direktori yang diimbas oleh ejen Kinesis. Pelaksanaan skrip ini agak standard, terdapat kelas TicketsApi, ia membolehkan anda menarik API secara tidak segerak. Kami menghantar pengepala dengan token dan meminta parameter ke kelas ini:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Untuk menguji tetapan dan kefungsian ejen yang betul, mari uji jalankan skrip api_caller.py:

sudo ./api_caller.py TOKEN

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Dan kami melihat hasil kerja dalam log Ejen dan pada tab Pemantauan dalam aliran data airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Seperti yang anda lihat, semuanya berfungsi dan Ejen Kinesis berjaya menghantar data ke strim. Sekarang mari kita konfigurasikan pengguna.

Menyediakan Analitis Data Kinesis

Mari kita beralih kepada komponen pusat keseluruhan sistem - buat aplikasi baharu dalam Kinesis Data Analytics bernama kinesis_analytics_airlines_app:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Kinesis Data Analytics membolehkan anda melakukan analisis data masa nyata daripada Kinesis Streams menggunakan bahasa SQL. Ia adalah perkhidmatan autoscaling sepenuhnya (tidak seperti Kinesis Streams) yang:

  1. membolehkan anda membuat strim baharu (Strim Keluaran) berdasarkan permintaan kepada sumber data;
  2. menyediakan strim dengan ralat yang berlaku semasa aplikasi sedang berjalan (Strim Ralat);
  3. boleh secara automatik menentukan skema data input (ia boleh ditakrifkan semula secara manual jika perlu).

Ini bukan perkhidmatan yang murah - 0.11 USD sejam kerja, jadi anda harus menggunakannya dengan berhati-hati dan memadamkannya apabila anda selesai.

Mari sambungkan aplikasi ke sumber data:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Pilih strim yang akan kami sambungkan (airline_tickets):

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Seterusnya, anda perlu melampirkan Peranan IAM baharu supaya aplikasi boleh membaca daripada strim dan menulis ke strim. Untuk melakukan ini, cukup untuk tidak mengubah apa-apa dalam blok kebenaran Akses:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Sekarang mari kita minta penemuan skema data dalam strim; untuk melakukan ini, klik pada butang "Temui skema". Akibatnya, peranan IAM akan dikemas kini (yang baharu akan dibuat) dan pengesanan skema akan dilancarkan daripada data yang telah pun tiba dalam strim:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Sekarang anda perlu pergi ke editor SQL. Apabila anda mengklik pada butang ini, tetingkap akan muncul meminta anda melancarkan aplikasi - pilih perkara yang anda mahu lancarkan:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Masukkan pertanyaan mudah berikut ke dalam tetingkap editor SQL dan klik Simpan dan Jalankan SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Dalam pangkalan data hubungan, anda bekerja dengan jadual menggunakan pernyataan INSERT untuk menambah rekod dan pernyataan SELECT untuk pertanyaan data. Dalam Analitis Data Amazon Kinesis, anda bekerja dengan strim (STREAM) dan pam (PUMP)—permintaan sisipan berterusan yang memasukkan data daripada satu aliran dalam aplikasi ke strim lain.

Pertanyaan SQL yang dibentangkan di atas mencari tiket Aeroflot pada kos di bawah lima ribu rubel. Semua rekod yang memenuhi syarat ini akan diletakkan dalam strim DESTINATION_SQL_STREAM.

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Dalam blok Destinasi, pilih strim special_stream dan dalam senarai juntai bawah nama Strim Dalam aplikasi DESTINATION_SQL_STREAM:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Hasil daripada semua manipulasi sepatutnya serupa dengan gambar di bawah:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Mencipta dan melanggan topik SNS

Pergi ke Perkhidmatan Pemberitahuan Mudah dan buat topik baharu di sana dengan nama Syarikat Penerbangan:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Langgan topik ini dan nyatakan nombor telefon mudah alih yang akan dihantar pemberitahuan SMS:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Buat jadual dalam DynamoDB

Untuk menyimpan data mentah daripada strim airline_tickets mereka, mari buat jadual dalam DynamoDB dengan nama yang sama. Kami akan menggunakan record_id sebagai kunci utama:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Mencipta pengumpul fungsi lambda

Mari kita cipta fungsi lambda yang dipanggil Collector, yang tugasnya adalah untuk meninjau strim airline_tickets dan, jika rekod baharu ditemui di sana, masukkan rekod ini ke dalam jadual DynamoDB. Jelas sekali, sebagai tambahan kepada hak lalai, lambda ini mesti mempunyai akses baca kepada aliran data Kinesis dan akses tulis kepada DynamoDB.

Mencipta peranan IAM untuk fungsi lambda pengumpul
Mula-mula, mari kita cipta peranan IAM baharu untuk lambda bernama Lambda-TicketsProcessingRole:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Untuk contoh ujian, dasar AmazonKinesisReadOnlyAccess dan AmazonDynamoDBFullAccess yang diprakonfigurasikan adalah agak sesuai, seperti yang ditunjukkan dalam gambar di bawah:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Lambda ini harus dilancarkan oleh pencetus daripada Kinesis apabila entri baharu memasuki airline_stream, jadi kami perlu menambah pencetus baharu:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Apa yang tinggal ialah memasukkan kod dan menyimpan lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Mencipta pemberitahuan fungsi lambda

Fungsi lambda kedua, yang akan memantau aliran kedua (special_stream) dan menghantar pemberitahuan kepada SNS, dibuat dengan cara yang sama. Oleh itu, lambda ini mesti mempunyai akses untuk membaca daripada Kinesis dan menghantar mesej ke topik SNS tertentu, yang kemudiannya akan dihantar oleh perkhidmatan SNS kepada semua pelanggan topik ini (e-mel, SMS, dll.).

Mencipta peranan IAM
Mula-mula, kami mencipta peranan IAM Lambda-KinesisAlarm untuk lambda ini, dan kemudian menetapkan peranan ini kepada alarm_notifier lambda yang sedang dibuat:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Lambda ini harus berfungsi pada pencetus untuk rekod baharu untuk memasuki aliran_khusus, jadi anda perlu mengkonfigurasi pencetus dengan cara yang sama seperti yang kami lakukan untuk lambda Pengumpul.

Untuk memudahkan mengkonfigurasi lambda ini, mari perkenalkan pembolehubah persekitaran baharu - TOPIC_ARN, di mana kita meletakkan ANR (Nama Recourse Amazon) bagi topik Syarikat Penerbangan:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Dan masukkan kod lambda, ia tidak rumit sama sekali:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Nampaknya di sinilah konfigurasi sistem manual selesai. Yang tinggal hanyalah untuk menguji dan memastikan bahawa kami telah mengkonfigurasi semuanya dengan betul.

Sebarkan daripada kod Terraform

Persediaan yang diperlukan

Terraform ialah alat sumber terbuka yang sangat mudah untuk menggunakan infrastruktur daripada kod. Ia mempunyai sintaks sendiri yang mudah dipelajari dan mempunyai banyak contoh cara dan perkara yang hendak digunakan. Editor Atom atau Kod Visual Studio mempunyai banyak pemalam berguna yang memudahkan kerja dengan Terraform.

Anda boleh memuat turun pengedaran oleh itu. Analisis terperinci tentang semua keupayaan Terraform berada di luar skop artikel ini, jadi kami akan mengehadkan diri kami kepada perkara utama.

Bagaimana untuk memulakan

Kod penuh projek ialah dalam repositori saya. Kami mengklon repositori kepada diri kita sendiri. Sebelum memulakan, anda perlu memastikan bahawa anda telah memasang dan mengkonfigurasi AWS CLI, kerana... Terraform akan mencari bukti kelayakan dalam fail ~/.aws/credentials.

Amalan yang baik ialah menjalankan arahan pelan sebelum menggunakan keseluruhan infrastruktur untuk melihat perkara yang sedang dicipta oleh Terraform untuk kami di awan:

terraform.exe plan

Anda akan diminta memasukkan nombor telefon untuk menghantar pemberitahuan. Ia tidak perlu memasukinya pada peringkat ini.

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Setelah menganalisis rancangan operasi program, kami boleh mula mencipta sumber:

terraform.exe apply

Selepas menghantar arahan ini, anda sekali lagi akan diminta untuk memasukkan nombor telefon; dail "ya" apabila soalan tentang sebenarnya melakukan tindakan ditunjukkan. Ini akan membolehkan anda menyediakan keseluruhan infrastruktur, menjalankan semua konfigurasi EC2 yang diperlukan, menggunakan fungsi lambda, dsb.

Selepas semua sumber berjaya dibuat melalui kod Terraform, anda perlu pergi ke butiran aplikasi Kinesis Analytics (malangnya, saya tidak menemui cara untuk melakukan ini terus dari kod).

Lancarkan aplikasi:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Selepas ini, anda mesti menetapkan nama strim dalam aplikasi secara eksplisit dengan memilih daripada senarai juntai bawah:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Sekarang semuanya sudah bersedia untuk pergi.

Menguji aplikasi

Tidak kira bagaimana anda menggunakan sistem, secara manual atau melalui kod Terraform, ia akan berfungsi sama.

Kami log masuk melalui SSH ke mesin maya EC2 di mana Ejen Kinesis dipasang dan menjalankan skrip api_caller.py

sudo ./api_caller.py TOKEN

Anda hanya perlu menunggu SMS ke nombor anda:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
SMS - mesej tiba di telefon dalam masa hampir 1 minit:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan
Ia kekal untuk melihat sama ada rekod telah disimpan dalam pangkalan data DynamoDB untuk analisis yang lebih terperinci yang berikutnya. Jadual airline_tickets mengandungi lebih kurang data berikut:

Penyepaduan API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa pelayan

Kesimpulan

Dalam proses kerja yang dilakukan, sistem pemprosesan data dalam talian telah dibina berdasarkan Amazon Kinesis. Pilihan untuk menggunakan Ejen Kinesis bersama-sama dengan Aliran Data Kinesis dan analisis masa nyata Kinesis Analytics menggunakan arahan SQL, serta interaksi Amazon Kinesis dengan perkhidmatan AWS lain telah dipertimbangkan.

Kami menggunakan sistem di atas dalam dua cara: yang manual yang agak panjang dan yang cepat daripada kod Terraform.

Semua kod sumber projek tersedia dalam repositori GitHub saya, saya cadangkan anda membiasakan diri dengannya.

Saya gembira untuk membincangkan artikel itu, saya menantikan komen anda. Saya mengharapkan kritikan yang membina.

Saya berharap kejayaan anda!

Sumber: www.habr.com

Tambah komen