Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Hei Habr!

Apakah Anda suka menerbangkan pesawat? Saya menyukainya, tetapi selama isolasi diri saya juga jatuh cinta dengan menganalisis data tiket pesawat dari salah satu sumber terkenal - Aviasales.

Hari ini kita akan menganalisis kerja Amazon Kinesis, membangun sistem streaming dengan analitik real-time, menginstal database Amazon DynamoDB NoSQL sebagai penyimpanan data utama, dan mengatur notifikasi SMS untuk tiket menarik.

Semua detailnya sedang dipotong! Pergi!

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

pengenalan

Misalnya, kita memerlukan akses ke API Aviasales. Akses ke sana disediakan gratis dan tanpa batasan; Anda hanya perlu mendaftar di bagian “Pengembang” untuk menerima token API Anda untuk mengakses data.

Tujuan utama artikel ini adalah untuk memberikan pemahaman umum tentang penggunaan streaming informasi di AWS; kami memperhitungkan bahwa data yang dikembalikan oleh API yang digunakan tidak sepenuhnya mutakhir dan dikirimkan dari cache, yaitu dibentuk berdasarkan pencarian pengguna situs Aviasales.ru dan Jetradar.com selama 48 jam terakhir.

Kinesis-agent, yang diinstal pada mesin produksi, diterima melalui API akan secara otomatis mengurai dan mengirimkan data ke aliran yang diinginkan melalui Kinesis Data Analytics. Versi mentah aliran ini akan ditulis langsung ke toko. Penyimpanan data mentah yang diterapkan di DynamoDB akan memungkinkan analisis tiket lebih mendalam melalui alat BI, seperti AWS Quick Sight.

Kami akan mempertimbangkan dua opsi untuk menerapkan seluruh infrastruktur:

  • Manual - melalui AWS Management Console;
  • Infrastruktur dari kode Terraform ditujukan untuk automator yang malas;

Arsitektur sistem yang dikembangkan

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Komponen yang digunakan:

  • API Aviasales — data yang dikembalikan oleh API ini akan digunakan untuk semua pekerjaan selanjutnya;
  • Mesin Virtual Produser EC2 — mesin virtual biasa di cloud tempat aliran data masukan akan dihasilkan:
    • Agen Kinesis adalah aplikasi Java yang diinstal secara lokal di mesin yang menyediakan cara mudah untuk mengumpulkan dan mengirim data ke Kinesis (Kinesis Data Streams atau Kinesis Firehose). Agen terus-menerus memantau sekumpulan file di direktori yang ditentukan dan mengirimkan data baru ke Kinesis;
    • Skrip Pemanggil API — Skrip Python yang membuat permintaan ke API dan memasukkan respons ke dalam folder yang dipantau oleh Agen Kinesis;
  • Aliran Data Kinesis — layanan streaming data real-time dengan kemampuan skala luas;
  • Analisis Kinesis adalah layanan tanpa server yang menyederhanakan analisis data streaming secara real time. Amazon Kinesis Data Analytics mengonfigurasi sumber daya aplikasi dan secara otomatis melakukan penskalaan untuk menangani volume data masuk apa pun;
  • AWS Lambda — layanan yang memungkinkan Anda menjalankan kode tanpa membuat cadangan atau menyiapkan server. Semua daya komputasi secara otomatis diskalakan untuk setiap panggilan;
  • Amazon DynamoDB - Basis data pasangan nilai kunci dan dokumen yang memberikan latensi kurang dari 10 milidetik saat dijalankan pada skala apa pun. Saat menggunakan DynamoDB, Anda tidak perlu menyediakan, melakukan patch, atau mengelola server apa pun. DynamoDB secara otomatis menskalakan tabel untuk menyesuaikan jumlah sumber daya yang tersedia dan mempertahankan kinerja tinggi. Tidak diperlukan administrasi sistem;
  • Amazon SNS - layanan terkelola sepenuhnya untuk mengirim pesan menggunakan model penerbit-pelanggan (Pub/Sub), yang dengannya Anda dapat mengisolasi layanan mikro, sistem terdistribusi, dan aplikasi tanpa server. SNS dapat digunakan untuk mengirimkan informasi kepada pengguna akhir melalui notifikasi push seluler, pesan SMS, dan email.

Pelatihan awal

Untuk meniru aliran data, saya memutuskan untuk menggunakan informasi tiket pesawat yang dikembalikan oleh Aviasales API. DI DALAM dokumentasi daftar metode yang berbeda cukup lengkap, mari kita ambil salah satunya - “Kalender Harga Bulanan”, yang mengembalikan harga untuk setiap hari dalam sebulan, dikelompokkan berdasarkan jumlah transfer. Jika Anda tidak menentukan bulan pencarian dalam permintaan, informasi akan dikembalikan untuk bulan setelah bulan sekarang.

Jadi, ayo daftar dan dapatkan token kita.

Contoh permintaannya ada di bawah ini:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metode penerimaan data dari API di atas dengan menentukan token dalam permintaan akan berfungsi, tetapi saya lebih suka meneruskan token akses melalui header, jadi kami akan menggunakan metode ini dalam skrip api_caller.py.

Contoh jawaban:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Contoh respons API di atas menunjukkan tiket dari St. Petersburg ke Phuk... Oh, sungguh mimpi...
Karena saya dari Kazan, dan Phuket sekarang “hanya mimpi”, yuk cari tiket dari St. Petersburg ke Kazan.

Ini mengasumsikan bahwa Anda sudah memiliki akun AWS. Saya ingin segera menarik perhatian khusus pada fakta bahwa Kinesis dan pengiriman notifikasi melalui SMS tidak termasuk dalam acara tahunan Tingkat Gratis (penggunaan gratis). Namun meskipun demikian, dengan mempertimbangkan beberapa dolar, sangat mungkin untuk membangun sistem yang diusulkan dan memainkannya. Dan tentunya jangan lupa untuk menghapus semua sumber daya setelah tidak diperlukan lagi.

Untungnya, fungsi DynamoDb dan lambda akan gratis bagi kami jika kami memenuhi batas gratis bulanan. Misalnya, untuk DynamoDB: penyimpanan 25 GB, 25 WCU/RCU, dan 100 juta kueri. Dan satu juta panggilan fungsi lambda per bulan.

Penerapan sistem manual

Menyiapkan Kinesis Data Streams

Mari buka layanan Kinesis Data Streams dan buat dua aliran baru, masing-masing satu pecahan.

Apa itu pecahan?
Pecahan adalah unit transfer data dasar aliran Amazon Kinesis. Satu segmen menyediakan transfer data masukan dengan kecepatan 1 MB/s dan transfer data keluaran dengan kecepatan 2 MB/s. Satu segmen mendukung hingga 1000 entri PUT per detik. Saat membuat aliran data, Anda perlu menentukan jumlah segmen yang diperlukan. Misalnya, Anda bisa membuat aliran data dengan dua segmen. Aliran data ini akan menyediakan transfer data input pada 2 MB/s dan transfer data output pada 4 MB/s, mendukung hingga 2000 catatan PUT per detik.

Semakin banyak pecahan di aliran Anda, semakin besar throughputnya. Pada prinsipnya, ini adalah bagaimana aliran diskalakan - dengan menambahkan pecahan. Namun semakin banyak pecahan yang Anda miliki, semakin tinggi harganya. Setiap pecahan berharga 1,5 sen per jam dan tambahan 1.4 sen untuk setiap juta unit muatan PUT.

Mari kita buat aliran baru dengan nama tersebut tiket pesawat, 1 pecahan sudah cukup untuknya:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Sekarang mari kita buat thread lain dengan nama tersebut aliran_khusus:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Pengaturan produser

Untuk menganalisis suatu tugas, cukup menggunakan instans EC2 biasa sebagai penghasil data. Itu tidak harus berupa mesin virtual yang kuat dan mahal; tempat t2.micro sudah cukup.

Catatan penting: misalnya, Anda harus menggunakan image - Amazon Linux AMI 2018.03.0, pengaturannya lebih sedikit untuk meluncurkan Agen Kinesis dengan cepat.

Masuk ke layanan EC2, buat mesin virtual baru, pilih AMI yang diinginkan dengan tipe t2.micro, yang termasuk dalam Tingkat Gratis:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Agar mesin virtual yang baru dibuat dapat berinteraksi dengan layanan Kinesis, mesin virtual tersebut harus diberikan hak untuk melakukannya. Cara terbaik untuk melakukannya adalah dengan menetapkan Peran IAM. Oleh karena itu, pada layar Langkah 3: Konfigurasikan Detail Instans, Anda harus memilih Buat Peran IAM baru:

Membuat peran IAM untuk EC2
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Di jendela yang terbuka, pilih bahwa kami sedang membuat peran baru untuk EC2 dan buka bagian Izin:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Dengan menggunakan contoh pelatihan, kita tidak perlu membahas semua seluk-beluk konfigurasi granular hak sumber daya, jadi kita akan memilih kebijakan yang telah dikonfigurasi sebelumnya oleh Amazon: AmazonKinesisFullAccess dan CloudWatchFullAccess.

Mari kita beri nama yang bermakna untuk peran ini, misalnya: EC2-KinesisStreams-FullAccess. Hasilnya akan sama seperti yang ditunjukkan pada gambar di bawah ini:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Setelah membuat peran baru ini, jangan lupa untuk melampirkannya ke instance mesin virtual yang dibuat:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Kami tidak mengubah apa pun di layar ini dan beralih ke jendela berikutnya.

Pengaturan hard drive dapat dibiarkan sebagai default, begitu juga dengan tagnya (walaupun merupakan praktik yang baik untuk menggunakan tag, setidaknya beri nama instance dan tunjukkan lingkungannya).

Sekarang kita berada di tab Langkah 6: Konfigurasikan Grup Keamanan, di mana Anda perlu membuat yang baru atau menentukan grup Keamanan yang ada, yang memungkinkan Anda terhubung melalui ssh (port 22) ke instance. Pilih Sumber -> IP Saya di sana dan Anda dapat meluncurkan instance.

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Segera setelah beralih ke status berjalan, Anda dapat mencoba menyambungkannya melalui ssh.

Untuk dapat bekerja dengan Kinesis Agent, setelah berhasil terhubung ke mesin, Anda harus memasukkan perintah berikut di terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Mari buat folder untuk menyimpan respons API:

sudo mkdir /var/log/airline_tickets

Sebelum memulai agen, Anda perlu mengonfigurasi konfigurasinya:

sudo vim /etc/aws-kinesis/agent.json

Isi file agent.json akan terlihat seperti ini:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Seperti yang dapat dilihat dari file konfigurasi, agen akan memantau file dengan ekstensi .log di direktori /var/log/airline_tickets/, menguraikannya dan mentransfernya ke aliran airline_tickets.

Kami memulai ulang layanan dan memastikan layanan aktif dan berjalan:

sudo service aws-kinesis-agent restart

Sekarang mari kita unduh skrip Python yang akan meminta data dari API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrip api_caller.py meminta data dari Aviasales dan menyimpan respons yang diterima di direktori yang dipindai oleh agen Kinesis. Implementasi skrip ini cukup standar, ada kelas TicketsApi, yang memungkinkan Anda menarik API secara asinkron. Kami meneruskan header dengan token dan parameter permintaan ke kelas ini:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Untuk menguji pengaturan dan fungsionalitas agen yang benar, mari kita uji jalankan skrip api_caller.py:

sudo ./api_caller.py TOKEN

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Dan kami melihat hasil pekerjaan di log Agen dan pada tab Pemantauan di aliran data tiket_maskapai:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Seperti yang Anda lihat, semuanya berfungsi dan Agen Kinesis berhasil mengirimkan data ke aliran. Sekarang mari kita konfigurasikan konsumen.

Menyiapkan Kinesis Data Analytics

Mari beralih ke komponen utama keseluruhan sistem - buat aplikasi baru di Kinesis Data Analytics bernama kinesis_analytics_airlines_app:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Kinesis Data Analytics memungkinkan Anda melakukan analisis data real-time dari Kinesis Streams menggunakan bahasa SQL. Ini adalah layanan penskalaan otomatis sepenuhnya (tidak seperti Kinesis Streams) yang:

  1. memungkinkan Anda membuat aliran baru (Output Stream) berdasarkan permintaan ke sumber data;
  2. menyediakan aliran dengan kesalahan yang terjadi saat aplikasi sedang berjalan (Error Stream);
  3. dapat secara otomatis menentukan skema data masukan (dapat didefinisikan ulang secara manual jika perlu).

Ini bukan layanan murah - 0.11 USD per jam kerja, jadi Anda harus menggunakannya dengan hati-hati dan menghapusnya setelah selesai.

Mari sambungkan aplikasi ke sumber data:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Pilih aliran yang akan kita sambungkan (airline_tickets):

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Selanjutnya, Anda perlu melampirkan IAM Role baru agar aplikasi dapat membaca dari aliran dan menulis ke aliran. Untuk melakukan ini, cukup dengan tidak mengubah apa pun di blok Izin akses:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Sekarang mari kita meminta penemuan skema data di aliran; untuk melakukan ini, klik tombol “Temukan skema”. Akibatnya, peran IAM akan diperbarui (yang baru akan dibuat) dan deteksi skema akan diluncurkan dari data yang telah masuk ke aliran:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Sekarang Anda harus pergi ke editor SQL. Ketika Anda mengklik tombol ini, sebuah jendela akan muncul meminta Anda untuk meluncurkan aplikasi - pilih apa yang ingin Anda luncurkan:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Masukkan kueri sederhana berikut ke jendela editor SQL dan klik Simpan dan Jalankan SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Dalam database relasional, Anda bekerja dengan tabel menggunakan pernyataan INSERT untuk menambahkan catatan dan pernyataan SELECT untuk mengkueri data. Di Amazon Kinesis Data Analytics, Anda bekerja dengan aliran (STREAM) dan pompa (PUMP)—permintaan penyisipan berkelanjutan yang memasukkan data dari satu aliran dalam aplikasi ke aliran lain.

Kueri SQL yang disajikan di atas mencari tiket Aeroflot dengan biaya di bawah lima ribu rubel. Semua catatan yang memenuhi ketentuan ini akan ditempatkan di aliran DESTINATION_SQL_STREAM.

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Di blok Tujuan, pilih aliran_khusus, dan di daftar tarik-turun Nama aliran dalam aplikasi DESTINATION_SQL_STREAM:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Hasil dari semua manipulasi akan terlihat seperti gambar di bawah ini:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Membuat dan berlangganan topik SNS

Buka Layanan Pemberitahuan Sederhana dan buat topik baru di sana dengan nama Maskapai:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Berlangganan topik ini dan tunjukkan nomor ponsel tujuan pengiriman notifikasi SMS:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Buat tabel di DynamoDB

Untuk menyimpan data mentah dari aliran airline_tickets, mari buat tabel di DynamoDB dengan nama yang sama. Kami akan menggunakan record_id sebagai kunci utama:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Membuat kolektor fungsi lambda

Mari kita buat fungsi lambda yang disebut Collector, yang tugasnya adalah melakukan polling aliran airline_tickets dan, jika catatan baru ditemukan di sana, masukkan catatan ini ke dalam tabel DynamoDB. Tentunya, selain hak default, lambda ini harus memiliki akses baca ke aliran data Kinesis dan akses tulis ke DynamoDB.

Membuat peran IAM untuk fungsi lambda kolektor
Pertama, mari buat IAM role baru untuk lambda bernama Lambda-TicketsProcessingRole:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Untuk contoh pengujian, kebijakan AmazonKinesisReadOnlyAccess dan AmazonDynamoDBFullAccess yang telah dikonfigurasi sebelumnya cukup sesuai, seperti yang ditunjukkan pada gambar di bawah ini:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Lambda ini harus diluncurkan oleh pemicu dari Kinesis ketika entri baru memasuki aliran_maskapai, jadi kita perlu menambahkan pemicu baru:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Yang tersisa hanyalah memasukkan kode dan menyimpan lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Membuat pemberi notifikasi fungsi lambda

Fungsi lambda kedua, yang akan memantau aliran kedua (special_stream) dan mengirimkan pemberitahuan ke SNS, dibuat dengan cara yang sama. Oleh karena itu, lambda ini harus memiliki akses untuk membaca dari Kinesis dan mengirim pesan ke topik SNS tertentu, yang kemudian akan dikirim oleh layanan SNS ke semua pelanggan topik ini (email, SMS, dll.).

Membuat peran IAM
Pertama, kita membuat peran IAM Lambda-KinesisAlarm untuk lambda ini, lalu menetapkan peran ini ke lambda alarm_notifier yang sedang dibuat:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Lambda ini harus bekerja pada pemicu catatan baru untuk memasukkan aliran_khusus, jadi Anda perlu mengonfigurasi pemicu dengan cara yang sama seperti yang kami lakukan untuk lambda Kolektor.

Untuk mempermudah konfigurasi lambda ini, mari perkenalkan variabel lingkungan baru - TOPIC_ARN, tempat kita menempatkan ANR (Amazon Recourse Names) dari topik Maskapai Penerbangan:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Dan masukkan kode lambda, tidak ribet sama sekali:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Tampaknya di sinilah konfigurasi sistem manual selesai. Yang tersisa hanyalah menguji dan memastikan bahwa kami telah mengkonfigurasi semuanya dengan benar.

Terapkan dari kode Terraform

Persiapan yang diperlukan

Terraform adalah alat sumber terbuka yang sangat nyaman untuk menerapkan infrastruktur dari kode. Ia memiliki sintaksisnya sendiri yang mudah dipelajari dan memiliki banyak contoh tentang bagaimana dan apa yang harus diterapkan. Editor Atom atau Visual Studio Code memiliki banyak plugin praktis yang membuat bekerja dengan Terraform lebih mudah.

Anda dapat mengunduh distribusinya karenanya. Analisis mendetail tentang semua kemampuan Terraform berada di luar cakupan artikel ini, jadi kami akan membatasi diri pada poin utama.

Bagaimana memulainya

Kode lengkap proyek ini adalah di repositori saya. Kami mengkloning repositori untuk diri kami sendiri. Sebelum memulai, Anda perlu memastikan bahwa Anda telah menginstal dan mengonfigurasi AWS CLI, karena... Terraform akan mencari kredensial di file ~/.aws/credentials.

Praktik yang baik adalah menjalankan perintah plan sebelum menerapkan seluruh infrastruktur untuk melihat apa yang sedang dibuat Terraform untuk kita di cloud:

terraform.exe plan

Anda akan diminta memasukkan nomor telepon untuk mengirim pemberitahuan. Tidak perlu memasukkannya pada tahap ini.

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Setelah menganalisis rencana operasi program, kita dapat mulai membuat sumber daya:

terraform.exe apply

Setelah mengirimkan perintah ini, Anda akan diminta lagi untuk memasukkan nomor telepon; tekan “ya” ketika pertanyaan tentang benar-benar melakukan tindakan ditampilkan. Ini akan memungkinkan Anda untuk mengatur seluruh infrastruktur, melakukan semua konfigurasi EC2 yang diperlukan, menerapkan fungsi lambda, dll.

Setelah semua sumber daya berhasil dibuat melalui kode Terraform, Anda perlu masuk ke detail aplikasi Kinesis Analytics (sayangnya, saya tidak menemukan cara melakukannya langsung dari kode).

Luncurkan aplikasi:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Setelah ini, Anda harus secara eksplisit menetapkan nama aliran dalam aplikasi dengan memilih dari daftar drop-down:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Sekarang semuanya siap untuk berangkat.

Menguji aplikasi

Terlepas dari cara Anda menerapkan sistem, secara manual atau melalui kode Terraform, cara kerjanya akan sama.

Kami masuk melalui SSH ke mesin virtual EC2 tempat Kinesis Agent diinstal dan menjalankan skrip api_caller.py

sudo ./api_caller.py TOKEN

Anda tinggal menunggu SMS ke nomor Anda:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
SMS - pesan masuk ke ponsel Anda dalam waktu hampir 1 menit:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Masih harus dilihat apakah catatan telah disimpan dalam database DynamoDB untuk analisis selanjutnya yang lebih rinci. Tabelairline_tickets berisi kira-kira data berikut:

Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server

Kesimpulan

Selama pekerjaan yang dilakukan, sistem pemrosesan data online dibangun berdasarkan Amazon Kinesis. Opsi untuk menggunakan Agen Kinesis bersama dengan Kinesis Data Streams dan analitik real-time Kinesis Analytics menggunakan perintah SQL, serta interaksi Amazon Kinesis dengan layanan AWS lainnya telah dipertimbangkan.

Kami menerapkan sistem di atas dengan dua cara: cara manual yang agak panjang dan cara cepat dari kode Terraform.

Semua kode sumber proyek tersedia di repositori GitHub saya, saya sarankan Anda membiasakan diri dengannya.

Saya senang membahas artikel ini, saya menantikan komentar Anda. Saya mengharapkan kritik yang membangun.

Saya berharap Anda sukses!

Sumber: www.habr.com

Tambah komentar