ProHoster > blog > administrasi > Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Integrasi API Aviasales dengan Amazon Kinesis dan kesederhanaan tanpa server
Hei Habr!
Apakah Anda suka menerbangkan pesawat? Saya menyukainya, tetapi selama isolasi diri saya juga jatuh cinta dengan menganalisis data tiket pesawat dari salah satu sumber terkenal - Aviasales.
Hari ini kita akan menganalisis kerja Amazon Kinesis, membangun sistem streaming dengan analitik real-time, menginstal database Amazon DynamoDB NoSQL sebagai penyimpanan data utama, dan mengatur notifikasi SMS untuk tiket menarik.
Semua detailnya sedang dipotong! Pergi!
pengenalan
Misalnya, kita memerlukan akses ke API Aviasales. Akses ke sana disediakan gratis dan tanpa batasan; Anda hanya perlu mendaftar di bagian “Pengembang” untuk menerima token API Anda untuk mengakses data.
Tujuan utama artikel ini adalah untuk memberikan pemahaman umum tentang penggunaan streaming informasi di AWS; kami memperhitungkan bahwa data yang dikembalikan oleh API yang digunakan tidak sepenuhnya mutakhir dan dikirimkan dari cache, yaitu dibentuk berdasarkan pencarian pengguna situs Aviasales.ru dan Jetradar.com selama 48 jam terakhir.
Kinesis-agent, yang diinstal pada mesin produksi, diterima melalui API akan secara otomatis mengurai dan mengirimkan data ke aliran yang diinginkan melalui Kinesis Data Analytics. Versi mentah aliran ini akan ditulis langsung ke toko. Penyimpanan data mentah yang diterapkan di DynamoDB akan memungkinkan analisis tiket lebih mendalam melalui alat BI, seperti AWS Quick Sight.
Kami akan mempertimbangkan dua opsi untuk menerapkan seluruh infrastruktur:
Manual - melalui AWS Management Console;
Infrastruktur dari kode Terraform ditujukan untuk automator yang malas;
Arsitektur sistem yang dikembangkan
Komponen yang digunakan:
API Aviasales — data yang dikembalikan oleh API ini akan digunakan untuk semua pekerjaan selanjutnya;
Agen Kinesis adalah aplikasi Java yang diinstal secara lokal di mesin yang menyediakan cara mudah untuk mengumpulkan dan mengirim data ke Kinesis (Kinesis Data Streams atau Kinesis Firehose). Agen terus-menerus memantau sekumpulan file di direktori yang ditentukan dan mengirimkan data baru ke Kinesis;
Skrip Pemanggil API — Skrip Python yang membuat permintaan ke API dan memasukkan respons ke dalam folder yang dipantau oleh Agen Kinesis;
Aliran Data Kinesis — layanan streaming data real-time dengan kemampuan skala luas;
Analisis Kinesis adalah layanan tanpa server yang menyederhanakan analisis data streaming secara real time. Amazon Kinesis Data Analytics mengonfigurasi sumber daya aplikasi dan secara otomatis melakukan penskalaan untuk menangani volume data masuk apa pun;
AWS Lambda — layanan yang memungkinkan Anda menjalankan kode tanpa membuat cadangan atau menyiapkan server. Semua daya komputasi secara otomatis diskalakan untuk setiap panggilan;
Amazon DynamoDB - Basis data pasangan nilai kunci dan dokumen yang memberikan latensi kurang dari 10 milidetik saat dijalankan pada skala apa pun. Saat menggunakan DynamoDB, Anda tidak perlu menyediakan, melakukan patch, atau mengelola server apa pun. DynamoDB secara otomatis menskalakan tabel untuk menyesuaikan jumlah sumber daya yang tersedia dan mempertahankan kinerja tinggi. Tidak diperlukan administrasi sistem;
Amazon SNS - layanan terkelola sepenuhnya untuk mengirim pesan menggunakan model penerbit-pelanggan (Pub/Sub), yang dengannya Anda dapat mengisolasi layanan mikro, sistem terdistribusi, dan aplikasi tanpa server. SNS dapat digunakan untuk mengirimkan informasi kepada pengguna akhir melalui notifikasi push seluler, pesan SMS, dan email.
Pelatihan awal
Untuk meniru aliran data, saya memutuskan untuk menggunakan informasi tiket pesawat yang dikembalikan oleh Aviasales API. DI DALAM dokumentasi daftar metode yang berbeda cukup lengkap, mari kita ambil salah satunya - “Kalender Harga Bulanan”, yang mengembalikan harga untuk setiap hari dalam sebulan, dikelompokkan berdasarkan jumlah transfer. Jika Anda tidak menentukan bulan pencarian dalam permintaan, informasi akan dikembalikan untuk bulan setelah bulan sekarang.
Metode penerimaan data dari API di atas dengan menentukan token dalam permintaan akan berfungsi, tetapi saya lebih suka meneruskan token akses melalui header, jadi kami akan menggunakan metode ini dalam skrip api_caller.py.
Contoh respons API di atas menunjukkan tiket dari St. Petersburg ke Phuk... Oh, sungguh mimpi...
Karena saya dari Kazan, dan Phuket sekarang “hanya mimpi”, yuk cari tiket dari St. Petersburg ke Kazan.
Ini mengasumsikan bahwa Anda sudah memiliki akun AWS. Saya ingin segera menarik perhatian khusus pada fakta bahwa Kinesis dan pengiriman notifikasi melalui SMS tidak termasuk dalam acara tahunan Tingkat Gratis (penggunaan gratis). Namun meskipun demikian, dengan mempertimbangkan beberapa dolar, sangat mungkin untuk membangun sistem yang diusulkan dan memainkannya. Dan tentunya jangan lupa untuk menghapus semua sumber daya setelah tidak diperlukan lagi.
Untungnya, fungsi DynamoDb dan lambda akan gratis bagi kami jika kami memenuhi batas gratis bulanan. Misalnya, untuk DynamoDB: penyimpanan 25 GB, 25 WCU/RCU, dan 100 juta kueri. Dan satu juta panggilan fungsi lambda per bulan.
Penerapan sistem manual
Menyiapkan Kinesis Data Streams
Mari buka layanan Kinesis Data Streams dan buat dua aliran baru, masing-masing satu pecahan.
Apa itu pecahan?
Pecahan adalah unit transfer data dasar aliran Amazon Kinesis. Satu segmen menyediakan transfer data masukan dengan kecepatan 1 MB/s dan transfer data keluaran dengan kecepatan 2 MB/s. Satu segmen mendukung hingga 1000 entri PUT per detik. Saat membuat aliran data, Anda perlu menentukan jumlah segmen yang diperlukan. Misalnya, Anda bisa membuat aliran data dengan dua segmen. Aliran data ini akan menyediakan transfer data input pada 2 MB/s dan transfer data output pada 4 MB/s, mendukung hingga 2000 catatan PUT per detik.
Semakin banyak pecahan di aliran Anda, semakin besar throughputnya. Pada prinsipnya, ini adalah bagaimana aliran diskalakan - dengan menambahkan pecahan. Namun semakin banyak pecahan yang Anda miliki, semakin tinggi harganya. Setiap pecahan berharga 1,5 sen per jam dan tambahan 1.4 sen untuk setiap juta unit muatan PUT.
Mari kita buat aliran baru dengan nama tersebut tiket pesawat, 1 pecahan sudah cukup untuknya:
Sekarang mari kita buat thread lain dengan nama tersebut aliran_khusus:
Pengaturan produser
Untuk menganalisis suatu tugas, cukup menggunakan instans EC2 biasa sebagai penghasil data. Itu tidak harus berupa mesin virtual yang kuat dan mahal; tempat t2.micro sudah cukup.
Catatan penting: misalnya, Anda harus menggunakan image - Amazon Linux AMI 2018.03.0, pengaturannya lebih sedikit untuk meluncurkan Agen Kinesis dengan cepat.
Masuk ke layanan EC2, buat mesin virtual baru, pilih AMI yang diinginkan dengan tipe t2.micro, yang termasuk dalam Tingkat Gratis:
Agar mesin virtual yang baru dibuat dapat berinteraksi dengan layanan Kinesis, mesin virtual tersebut harus diberikan hak untuk melakukannya. Cara terbaik untuk melakukannya adalah dengan menetapkan Peran IAM. Oleh karena itu, pada layar Langkah 3: Konfigurasikan Detail Instans, Anda harus memilih Buat Peran IAM baru:
Membuat peran IAM untuk EC2
Di jendela yang terbuka, pilih bahwa kami sedang membuat peran baru untuk EC2 dan buka bagian Izin:
Dengan menggunakan contoh pelatihan, kita tidak perlu membahas semua seluk-beluk konfigurasi granular hak sumber daya, jadi kita akan memilih kebijakan yang telah dikonfigurasi sebelumnya oleh Amazon: AmazonKinesisFullAccess dan CloudWatchFullAccess.
Mari kita beri nama yang bermakna untuk peran ini, misalnya: EC2-KinesisStreams-FullAccess. Hasilnya akan sama seperti yang ditunjukkan pada gambar di bawah ini:
Setelah membuat peran baru ini, jangan lupa untuk melampirkannya ke instance mesin virtual yang dibuat:
Kami tidak mengubah apa pun di layar ini dan beralih ke jendela berikutnya.
Pengaturan hard drive dapat dibiarkan sebagai default, begitu juga dengan tagnya (walaupun merupakan praktik yang baik untuk menggunakan tag, setidaknya beri nama instance dan tunjukkan lingkungannya).
Sekarang kita berada di tab Langkah 6: Konfigurasikan Grup Keamanan, di mana Anda perlu membuat yang baru atau menentukan grup Keamanan yang ada, yang memungkinkan Anda terhubung melalui ssh (port 22) ke instance. Pilih Sumber -> IP Saya di sana dan Anda dapat meluncurkan instance.
Segera setelah beralih ke status berjalan, Anda dapat mencoba menyambungkannya melalui ssh.
Untuk dapat bekerja dengan Kinesis Agent, setelah berhasil terhubung ke mesin, Anda harus memasukkan perintah berikut di terminal:
Seperti yang dapat dilihat dari file konfigurasi, agen akan memantau file dengan ekstensi .log di direktori /var/log/airline_tickets/, menguraikannya dan mentransfernya ke aliran airline_tickets.
Kami memulai ulang layanan dan memastikan layanan aktif dan berjalan:
sudo service aws-kinesis-agent restart
Sekarang mari kita unduh skrip Python yang akan meminta data dari API:
Skrip api_caller.py meminta data dari Aviasales dan menyimpan respons yang diterima di direktori yang dipindai oleh agen Kinesis. Implementasi skrip ini cukup standar, ada kelas TicketsApi, yang memungkinkan Anda menarik API secara asinkron. Kami meneruskan header dengan token dan parameter permintaan ke kelas ini:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Untuk menguji pengaturan dan fungsionalitas agen yang benar, mari kita uji jalankan skrip api_caller.py:
sudo ./api_caller.py TOKEN
Dan kami melihat hasil pekerjaan di log Agen dan pada tab Pemantauan di aliran data tiket_maskapai:
Seperti yang Anda lihat, semuanya berfungsi dan Agen Kinesis berhasil mengirimkan data ke aliran. Sekarang mari kita konfigurasikan konsumen.
Menyiapkan Kinesis Data Analytics
Mari beralih ke komponen utama keseluruhan sistem - buat aplikasi baru di Kinesis Data Analytics bernama kinesis_analytics_airlines_app:
Kinesis Data Analytics memungkinkan Anda melakukan analisis data real-time dari Kinesis Streams menggunakan bahasa SQL. Ini adalah layanan penskalaan otomatis sepenuhnya (tidak seperti Kinesis Streams) yang:
memungkinkan Anda membuat aliran baru (Output Stream) berdasarkan permintaan ke sumber data;
menyediakan aliran dengan kesalahan yang terjadi saat aplikasi sedang berjalan (Error Stream);
dapat secara otomatis menentukan skema data masukan (dapat didefinisikan ulang secara manual jika perlu).
Ini bukan layanan murah - 0.11 USD per jam kerja, jadi Anda harus menggunakannya dengan hati-hati dan menghapusnya setelah selesai.
Mari sambungkan aplikasi ke sumber data:
Pilih aliran yang akan kita sambungkan (airline_tickets):
Selanjutnya, Anda perlu melampirkan IAM Role baru agar aplikasi dapat membaca dari aliran dan menulis ke aliran. Untuk melakukan ini, cukup dengan tidak mengubah apa pun di blok Izin akses:
Sekarang mari kita meminta penemuan skema data di aliran; untuk melakukan ini, klik tombol “Temukan skema”. Akibatnya, peran IAM akan diperbarui (yang baru akan dibuat) dan deteksi skema akan diluncurkan dari data yang telah masuk ke aliran:
Sekarang Anda harus pergi ke editor SQL. Ketika Anda mengklik tombol ini, sebuah jendela akan muncul meminta Anda untuk meluncurkan aplikasi - pilih apa yang ingin Anda luncurkan:
Masukkan kueri sederhana berikut ke jendela editor SQL dan klik Simpan dan Jalankan SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
Dalam database relasional, Anda bekerja dengan tabel menggunakan pernyataan INSERT untuk menambahkan catatan dan pernyataan SELECT untuk mengkueri data. Di Amazon Kinesis Data Analytics, Anda bekerja dengan aliran (STREAM) dan pompa (PUMP)—permintaan penyisipan berkelanjutan yang memasukkan data dari satu aliran dalam aplikasi ke aliran lain.
Kueri SQL yang disajikan di atas mencari tiket Aeroflot dengan biaya di bawah lima ribu rubel. Semua catatan yang memenuhi ketentuan ini akan ditempatkan di aliran DESTINATION_SQL_STREAM.
Di blok Tujuan, pilih aliran_khusus, dan di daftar tarik-turun Nama aliran dalam aplikasi DESTINATION_SQL_STREAM:
Hasil dari semua manipulasi akan terlihat seperti gambar di bawah ini:
Membuat dan berlangganan topik SNS
Buka Layanan Pemberitahuan Sederhana dan buat topik baru di sana dengan nama Maskapai:
Berlangganan topik ini dan tunjukkan nomor ponsel tujuan pengiriman notifikasi SMS:
Buat tabel di DynamoDB
Untuk menyimpan data mentah dari aliran airline_tickets, mari buat tabel di DynamoDB dengan nama yang sama. Kami akan menggunakan record_id sebagai kunci utama:
Membuat kolektor fungsi lambda
Mari kita buat fungsi lambda yang disebut Collector, yang tugasnya adalah melakukan polling aliran airline_tickets dan, jika catatan baru ditemukan di sana, masukkan catatan ini ke dalam tabel DynamoDB. Tentunya, selain hak default, lambda ini harus memiliki akses baca ke aliran data Kinesis dan akses tulis ke DynamoDB.
Membuat peran IAM untuk fungsi lambda kolektor
Pertama, mari buat IAM role baru untuk lambda bernama Lambda-TicketsProcessingRole:
Untuk contoh pengujian, kebijakan AmazonKinesisReadOnlyAccess dan AmazonDynamoDBFullAccess yang telah dikonfigurasi sebelumnya cukup sesuai, seperti yang ditunjukkan pada gambar di bawah ini:
Lambda ini harus diluncurkan oleh pemicu dari Kinesis ketika entri baru memasuki aliran_maskapai, jadi kita perlu menambahkan pemicu baru:
Yang tersisa hanyalah memasukkan kode dan menyimpan lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Membuat pemberi notifikasi fungsi lambda
Fungsi lambda kedua, yang akan memantau aliran kedua (special_stream) dan mengirimkan pemberitahuan ke SNS, dibuat dengan cara yang sama. Oleh karena itu, lambda ini harus memiliki akses untuk membaca dari Kinesis dan mengirim pesan ke topik SNS tertentu, yang kemudian akan dikirim oleh layanan SNS ke semua pelanggan topik ini (email, SMS, dll.).
Membuat peran IAM
Pertama, kita membuat peran IAM Lambda-KinesisAlarm untuk lambda ini, lalu menetapkan peran ini ke lambda alarm_notifier yang sedang dibuat:
Lambda ini harus bekerja pada pemicu catatan baru untuk memasukkan aliran_khusus, jadi Anda perlu mengonfigurasi pemicu dengan cara yang sama seperti yang kami lakukan untuk lambda Kolektor.
Untuk mempermudah konfigurasi lambda ini, mari perkenalkan variabel lingkungan baru - TOPIC_ARN, tempat kita menempatkan ANR (Amazon Recourse Names) dari topik Maskapai Penerbangan:
Dan masukkan kode lambda, tidak ribet sama sekali:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Tampaknya di sinilah konfigurasi sistem manual selesai. Yang tersisa hanyalah menguji dan memastikan bahwa kami telah mengkonfigurasi semuanya dengan benar.
Terapkan dari kode Terraform
Persiapan yang diperlukan
Terraform adalah alat sumber terbuka yang sangat nyaman untuk menerapkan infrastruktur dari kode. Ia memiliki sintaksisnya sendiri yang mudah dipelajari dan memiliki banyak contoh tentang bagaimana dan apa yang harus diterapkan. Editor Atom atau Visual Studio Code memiliki banyak plugin praktis yang membuat bekerja dengan Terraform lebih mudah.
Anda dapat mengunduh distribusinya karenanya. Analisis mendetail tentang semua kemampuan Terraform berada di luar cakupan artikel ini, jadi kami akan membatasi diri pada poin utama.
Bagaimana memulainya
Kode lengkap proyek ini adalah di repositori saya. Kami mengkloning repositori untuk diri kami sendiri. Sebelum memulai, Anda perlu memastikan bahwa Anda telah menginstal dan mengonfigurasi AWS CLI, karena... Terraform akan mencari kredensial di file ~/.aws/credentials.
Praktik yang baik adalah menjalankan perintah plan sebelum menerapkan seluruh infrastruktur untuk melihat apa yang sedang dibuat Terraform untuk kita di cloud:
terraform.exe plan
Anda akan diminta memasukkan nomor telepon untuk mengirim pemberitahuan. Tidak perlu memasukkannya pada tahap ini.
Setelah menganalisis rencana operasi program, kita dapat mulai membuat sumber daya:
terraform.exe apply
Setelah mengirimkan perintah ini, Anda akan diminta lagi untuk memasukkan nomor telepon; tekan “ya” ketika pertanyaan tentang benar-benar melakukan tindakan ditampilkan. Ini akan memungkinkan Anda untuk mengatur seluruh infrastruktur, melakukan semua konfigurasi EC2 yang diperlukan, menerapkan fungsi lambda, dll.
Setelah semua sumber daya berhasil dibuat melalui kode Terraform, Anda perlu masuk ke detail aplikasi Kinesis Analytics (sayangnya, saya tidak menemukan cara melakukannya langsung dari kode).
Luncurkan aplikasi:
Setelah ini, Anda harus secara eksplisit menetapkan nama aliran dalam aplikasi dengan memilih dari daftar drop-down:
Sekarang semuanya siap untuk berangkat.
Menguji aplikasi
Terlepas dari cara Anda menerapkan sistem, secara manual atau melalui kode Terraform, cara kerjanya akan sama.
Kami masuk melalui SSH ke mesin virtual EC2 tempat Kinesis Agent diinstal dan menjalankan skrip api_caller.py
sudo ./api_caller.py TOKEN
Anda tinggal menunggu SMS ke nomor Anda:
SMS - pesan masuk ke ponsel Anda dalam waktu hampir 1 menit:
Masih harus dilihat apakah catatan telah disimpan dalam database DynamoDB untuk analisis selanjutnya yang lebih rinci. Tabelairline_tickets berisi kira-kira data berikut:
Kesimpulan
Selama pekerjaan yang dilakukan, sistem pemrosesan data online dibangun berdasarkan Amazon Kinesis. Opsi untuk menggunakan Agen Kinesis bersama dengan Kinesis Data Streams dan analitik real-time Kinesis Analytics menggunakan perintah SQL, serta interaksi Amazon Kinesis dengan layanan AWS lainnya telah dipertimbangkan.
Kami menerapkan sistem di atas dengan dua cara: cara manual yang agak panjang dan cara cepat dari kode Terraform.
Semua kode sumber proyek tersedia di repositori GitHub saya, saya sarankan Anda membiasakan diri dengannya.
Saya senang membahas artikel ini, saya menantikan komentar Anda. Saya mengharapkan kritik yang membangun.