Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Hey Habr!

Apa sampeyan seneng pesawat mabur? Aku seneng, nanging sajrone mandhiri, aku uga seneng nganalisa data babagan tiket pesawat saka sumber sing kondhang - Aviasales.

Dina iki kita bakal nganalisa karya Amazon Kinesis, mbangun sistem streaming kanthi analytics wektu nyata, nginstal basis data Amazon DynamoDB NoSQL minangka panyimpenan data utama, lan nyetel kabar SMS kanggo tiket sing menarik.

Kabeh rincian ana ing sangisore potong! Tindak!

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Pambuka

Contone, kita butuh akses menyang Aviasales API. Akses kasebut diwenehake kanthi gratis lan tanpa watesan; sampeyan mung kudu ndhaptar ing bagean "Pengembang" kanggo nampa token API kanggo ngakses data.

Tujuan utama artikel iki yaiku kanggo menehi pangerten umum babagan panggunaan streaming informasi ing AWS; kita nganggep manawa data sing dibalekake dening API sing digunakake ora strictly up-to-date lan ditularake saka cache, yaiku dibentuk adhedhasar telusuran dening pangguna situs Aviasales.ru lan Jetradar.com suwene 48 jam.

Kinesis-agen, diinstal ing mesin prodhuksi, ditampa liwat API bakal otomatis parse lan ngirim data menyang stream dikarepake liwat Kinesis Data Analytics. Versi mentah saka stream iki bakal ditulis langsung menyang toko. Panyimpenan data mentah sing dipasang ing DynamoDB bakal ngidini analisis tiket sing luwih jero liwat alat BI, kayata AWS Quick Sight.

Kita bakal nimbang rong pilihan kanggo nyebarake kabeh infrastruktur:

  • Manual - liwat AWS Management Console;
  • Infrastruktur saka kode Terraform kanggo automators puguh;

Arsitektur sistem sing dikembangake

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Komponen sing digunakake:

  • Aviasales API - data bali dening API iki bakal digunakake kanggo kabeh karya sakteruse;
  • EC2 Produser Instance - mesin virtual biasa ing méga ing ngendi aliran data input bakal diasilake:
    • Agen Kinesis minangka aplikasi Java sing diinstal sacara lokal ing mesin sing nyedhiyakake cara sing gampang kanggo ngumpulake lan ngirim data menyang Kinesis (Kinesis Data Streams utawa Kinesis Firehose). Agen terus-terusan ngawasi sakumpulan file ing direktori sing ditemtokake lan ngirim data anyar menyang Kinesis;
    • API Caller Script - A script Python sing nggawe panjalukan kanggo API lan nempatno respon menyang folder sing teliti dening Agen Kinesis;
  • Aliran Data Kinesis - layanan streaming data wektu nyata kanthi kapabilitas skala lebar;
  • Analisis Kinesis minangka layanan tanpa server sing nyederhanakake analisis data streaming ing wektu nyata. Amazon Kinesis Data Analytics ngatur sumber daya aplikasi lan kanthi otomatis skala kanggo nangani volume data sing mlebu;
  • AWAK Lambda — layanan sing ngidini sampeyan mbukak kode tanpa gawe serep utawa nyetel server. Kabeh daya komputasi otomatis scaled kanggo saben telpon;
  • Amazon DynamoDB - Basis data pasangan kunci-nilai lan dokumen sing nyedhiyakake latensi kurang saka 10 milidetik nalika mlaku ing skala apa wae. Nalika nggunakake DynamoDB, sampeyan ora perlu nyedhiyakake, tembelan, utawa ngatur server apa wae. DynamoDB kanthi otomatis ngukur tabel kanggo nyetel jumlah sumber daya sing kasedhiya lan njaga kinerja dhuwur. Ora ana administrasi sistem sing dibutuhake;
  • Amazon SNS minangka layanan olahpesen sing dikelola kanthi lengkap adhedhasar model publisher-subscriber (Pub/Sub), sing bisa digunakake kanggo ngisolasi layanan mikro, sistem sing disebarake, lan aplikasi tanpa server. SNS bisa digunakake kanggo ngirim informasi menyang pangguna pungkasan liwat kabar push seluler, pesen SMS lan email.

Latihan wiwitan

Kanggo niru aliran data, aku mutusake nggunakake informasi tiket maskapai sing bali dening Aviasales API. ING dokumentasi dhaftar cukup ekstensif saka macem-macem cara, ayo kang njupuk salah siji saka wong-wong mau - "Tanggalan Price Monthly", kang ngasilake prices kanggo saben dina sasi, diklompokaké dening nomer transfer. Yen sampeyan ora nemtokake sasi panelusuran ing panyuwunan, informasi bakal bali kanggo sasi sawise sing saiki.

Dadi, ayo ndhaptar lan entuk token.

Conto panyuwunan ing ngisor iki:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Cara ing ndhuwur kanggo nampa data saka API kanthi nemtokake token ing panyuwunan bakal bisa digunakake, nanging aku luwih seneng ngliwati token akses liwat header, supaya kita bakal nggunakake metode iki ing skrip api_caller.py.

Tuladha wangsulan:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Conto tanggapan API ing ndhuwur nuduhake tiket saka St. Petersburg menyang Phuk... Oh, ngimpi ...
Awit aku saka Kazan, lan Phuket saiki "mung ngimpi", ayo goleki tiket saka St.. Petersburg kanggo Kazan.

Iku nganggep yen sampeyan wis duwe akun AWS. Aku kaya kanggo langsung narik kawigaten manungsa waé khusus kanggo kasunyatan sing Kinesis lan ngirim kabar liwat SMS ora klebu ing taunan Tingkat Gratis (gratis nggunakake). Nanging sanajan iki, karo sawetara dolar ing atine, iku cukup bisa kanggo mbangun sistem ngajokaken lan muter karo. Lan, mesthi, aja lali mbusak kabeh sumber daya sawise ora dibutuhake maneh.

Untunge, fungsi DynamoDb lan lambda bakal gratis kanggo kita yen kita ketemu watesan gratis saben wulan. Contone, kanggo DynamoDB: panyimpenan 25 GB, 25 WCU/RCU lan 100 yuta pitakon. Lan yuta fungsi lambda telpon saben sasi.

Penyebaran sistem manual

Nyetel Kinesis Data Streams

Ayo pindhah menyang layanan Kinesis Data Streams lan nggawe loro stream anyar, siji shard kanggo saben.

Apa iku beling?
Shard minangka unit transfer data dhasar saka aliran Amazon Kinesis. Siji segmen nyedhiyakake transfer data input kanthi kacepetan 1 MB/s lan transfer data output kanthi kecepatan 2 MB/s. Siji segmen ndhukung nganti 1000 entri PUT per detik. Nalika nggawe aliran data, sampeyan kudu nemtokake jumlah segmen sing dibutuhake. Contone, sampeyan bisa nggawe stream data kanthi rong segmen. Aliran data iki bakal nyedhiyakake transfer data input ing 2 MB / s lan transfer data output ing 4 MB / s, ndhukung nganti 2000 cathetan PUT per detik.

Luwih akeh pecahan ing stream sampeyan, luwih akeh throughpute. Ing asas, iki carane mili skala - kanthi nambah shards. Nanging luwih akeh pecahan sampeyan, luwih dhuwur regane. Saben beling regane 1,5 sen saben jam lan tambahan 1.4 sen kanggo saben yuta unit muatan PUT.

Ayo nggawe stream anyar kanthi jeneng tiket_maskapai, 1 shard bakal cukup kanggo dheweke:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Saiki ayo nggawe thread liyane kanthi jeneng khusus_stream:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Setelan produser

Kanggo nganalisa tugas, cukup nggunakake conto EC2 biasa minangka produser data. Ora kudu mesin virtual sing kuat lan larang; titik t2.micro bakal apik.

Cathetan penting: contone, sampeyan kudu nggunakake gambar - Amazon Linux AMI 2018.03.0, nduweni setelan sing luwih sithik kanggo ngluncurake Agen Kinesis kanthi cepet.

Pindhah menyang layanan EC2, nggawe mesin virtual anyar, pilih AMI sing dikarepake kanthi jinis t2.micro, sing kalebu ing Free Tier:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Supaya mesin virtual sing mentas digawe bisa sesambungan karo layanan Kinesis, kudu diwenehi hak kanggo nglakoni. Cara paling apik kanggo nindakake iki yaiku nemtokake Peran IAM. Mulane, ing Langkah 3: Konfigurasi layar Rincian Instance, sampeyan kudu milih Nggawe Peran IAM anyar:

Nggawe peran IAM kanggo EC2
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Ing jendhela sing mbukak, pilih manawa kita nggawe peran anyar kanggo EC2 lan pindhah menyang bagean Idin:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Nggunakake conto latihan, kita ora kudu mbukak kabeh seluk-beluk konfigurasi granular hak sumber daya, supaya kita bakal milih kawicaksanan sing wis diatur dening Amazon: AmazonKinesisFullAccess lan CloudWatchFullAccess.

Ayo menehi sawetara jeneng migunani kanggo peran iki, contone: EC2-KinesisStreams-FullAccess. Asil kudu padha karo gambar ing ngisor iki:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Sawise nggawe peran anyar iki, aja lali masangake menyang conto mesin virtual sing digawe:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Kita ora ngganti apa-apa liyane ing layar iki lan pindhah menyang jendhela sabanjuré.

Setelan hard drive bisa ditinggalake minangka standar, uga minangka tag (sanajan praktik apik nggunakake tag, paling ora menehi jeneng conto lan nuduhake lingkungan).

Saiki kita ana ing Langkah 6: Konfigurasi tab Grup Keamanan, ing ngendi sampeyan kudu nggawe sing anyar utawa nemtokake grup Keamanan sing wis ana, sing ngidini sampeyan nyambungake liwat ssh (port 22) menyang conto kasebut. Pilih Sumber -> IPku ing kana lan sampeyan bisa miwiti conto kasebut.

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Sanalika ngalih menyang status mlaku, sampeyan bisa nyoba kanggo nyambung menyang liwat ssh.

Kanggo bisa nggarap Agen Kinesis, sawise kasil nyambungake menyang mesin, sampeyan kudu ngetik printah ing ngisor iki ing terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Ayo nggawe folder kanggo nyimpen respon API:

sudo mkdir /var/log/airline_tickets

Sadurunge miwiti agen, sampeyan kudu ngatur konfigurasi:

sudo vim /etc/aws-kinesis/agent.json

Isi file agent.json kudu katon kaya iki:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Kaya sing bisa dideleng saka file konfigurasi, agen bakal ngawasi file kanthi ekstensi .log ing direktori /var/log/airline_tickets/, parse lan transfer menyang stream airline_tickets.

Kita miwiti maneh layanan lan priksa manawa wis aktif:

sudo service aws-kinesis-agent restart

Saiki ayo ndownload skrip Python sing bakal njaluk data saka API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrip api_caller.py njaluk data saka Aviasales lan nyimpen respon sing ditampa ing direktori sing dipindai agen Kinesis. Implementasine script iki cukup standar, ana kelas TicketsApi, ngijini sampeyan kanggo asynchronously narik API. Kita ngliwati header kanthi token lan njaluk parameter menyang kelas iki:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Kanggo nguji setelan lan fungsi agen sing bener, ayo nyoba skrip api_caller.py:

sudo ./api_caller.py TOKEN

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Lan kita ndeleng asil karya ing log Agen lan ing tab Ngawasi ing stream data airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Nalika sampeyan bisa ndeleng, kabeh bisa lan Agen Kinesis kasil ngirim data menyang stream. Saiki ayo konfigurasi konsumen.

Nggawe Kinesis Data Analytics

Ayo pindhah menyang komponen tengah kabeh sistem - nggawe aplikasi anyar ing Kinesis Data Analytics jenenge kinesis_analytics_airlines_app:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Kinesis Data Analytics ngidini sampeyan nindakake analytics data wektu nyata saka Kinesis Streams nggunakake basa SQL. Iki minangka layanan autoscaling kanthi lengkap (ora kaya Kinesis Streams) sing:

  1. ngijini sampeyan kanggo nggawe stream anyar (Output Stream) adhedhasar panjalukan kanggo sumber data;
  2. nyedhiyakake stream kanthi kasalahan sing kedadeyan nalika aplikasi lagi mlaku (Error Stream);
  3. bisa kanthi otomatis nemtokake skema data input (bisa kanthi manual redefined yen perlu).

Iki dudu layanan sing murah - 0.11 USD saben jam kerja, dadi sampeyan kudu nggunakake kanthi ati-ati lan mbusak yen wis rampung.

Ayo sambungake aplikasi menyang sumber data:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Pilih stream sing arep kita sambungake (airline_tickets):

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Sabanjure, sampeyan kudu masang Peran IAM anyar supaya aplikasi bisa maca saka stream lan nulis menyang stream. Kanggo nindakake iki, cukup ora ngganti apa wae ing blok ijin Akses:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Saiki ayo njaluk panemuan skema data ing stream; kanggo nindakake iki, klik tombol "Temokake skema". Akibaté, peran IAM bakal dianyari (sing anyar bakal digawe) lan deteksi skema bakal diluncurake saka data sing wis teka ing stream:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Saiki sampeyan kudu pindhah menyang editor SQL. Nalika sampeyan ngeklik tombol iki, bakal katon jendhela sing njaluk sampeyan miwiti aplikasi - pilih apa sing pengin diluncurake:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Lebokake pitakon prasaja ing ngisor iki menyang jendela editor SQL banjur klik Simpen lan Jalanake SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Ing basis data relasional, sampeyan bisa nggarap tabel nggunakake statement INSERT kanggo nambah cathetan lan statement SELECT kanggo query data. Ing Amazon Kinesis Data Analytics, sampeyan nggarap stream (STREAMs) lan pumps (PUMPs) - panjalukan sisipan terus-terusan sing nglebokake data saka siji stream ing aplikasi menyang stream liyane.

Pitakonan SQL sing ditampilake ing ndhuwur nggoleki tiket Aeroflot kanthi biaya kurang saka limang ewu rubel. Kabeh cathetan sing cocog karo kahanan kasebut bakal diselehake ing aliran DESTINATION_SQL_STREAM.

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Ing blok Tujuan, pilih stream special_stream, lan ing dhaptar gulung mudhun jeneng stream ing aplikasi DESTINATION_SQL_STREAM:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Asil saka kabeh manipulasi kudu kaya gambar ing ngisor iki:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Nggawe lan langganan topik SNS

Pindhah menyang Layanan Notifikasi Sederhana lan gawe topik anyar ing kana kanthi jeneng Maskapai:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Langganan topik iki lan tulisake nomer ponsel sing bakal dikirim kabar SMS:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Nggawe tabel ing DynamoDB

Kanggo nyimpen data mentah saka stream airline_tickets, ayo nggawe tabel ing DynamoDB kanthi jeneng sing padha. Kita bakal nggunakake record_id minangka kunci utama:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Nggawe kolektor fungsi lambda

Ayo nggawe fungsi lambda disebut Collector, sing tugas bakal polling stream airline_tickets lan, yen cathetan anyar ditemokake ana, lebokake cathetan iki menyang tabel DynamoDB. Temenan, saliyane hak standar, lambda iki kudu duwe akses maca menyang aliran data Kinesis lan akses nulis menyang DynamoDB.

Nggawe peran IAM kanggo fungsi lambda kolektor
Pisanan, ayo nggawe peran IAM anyar kanggo lambda sing jenenge Lambda-TicketsProcessingRole:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Kanggo conto tes, kabijakan AmazonKinesisReadOnlyAccess lan AmazonDynamoDBFullAccess sing wis dikonfigurasi cukup cocog, kaya sing ditampilake ing gambar ing ngisor iki:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Lambda iki kudu diluncurake kanthi pemicu saka Kinesis nalika entri anyar mlebu ing airline_stream, mula kita kudu nambah pemicu anyar:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Kabeh sing isih ana yaiku masang kode lan nyimpen lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Nggawe notifier fungsi lambda

Fungsi lambda kapindho, sing bakal ngawasi aliran kapindho (special_stream) lan ngirim kabar menyang SNS, digawe kanthi cara sing padha. Mulane, lambda iki kudu duwe akses kanggo maca saka Kinesis lan ngirim pesen menyang topik SNS tartamtu, kang banjur bakal dikirim dening layanan SNS kanggo kabeh pelanggan topik iki (email, SMS, etc.).

Nggawe peran IAM
Pisanan, kita nggawe peran IAM Lambda-KinesisAlarm kanggo lambda iki, banjur nemtokake peran iki menyang alarm_notifier lambda sing digawe:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

Lambda iki kudu bisa ing pemicu kanggo cathetan anyar kanggo ngetik special_stream, supaya sampeyan kudu ngatur pemicu ing cara sing padha kanggo Collector lambda.

Kanggo luwih gampang ngatur lambda iki, ayo ngenalake variabel lingkungan anyar - TOPIC_ARN, ing ngendi kita nyelehake ANR (Amazon Recourse Names) saka topik Airlines:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Lan lebokake kode lambda, ora rumit:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Iku misale jek sing iki ngendi konfigurasi sistem manual wis rampung. Kabeh sing isih ana yaiku nyoba lan priksa manawa kita wis ngatur kabeh kanthi bener.

Nyebar saka kode Terraform

Persiapan sing dibutuhake

Terraform minangka alat open-source sing trep banget kanggo nyebarake infrastruktur saka kode. Nduwe sintaks dhewe sing gampang disinaoni lan duwe akeh conto babagan carane lan apa sing bakal ditindakake. Editor Atom utawa Visual Studio Code nduweni akeh plugin sing bisa digunakake kanggo nggarap Terraform.

Sampeyan bisa ngundhuh distribusi saka kene. Analisis rinci babagan kabeh kemampuan Terraform ora ana ing ruang lingkup artikel iki, mula kita bakal mbatesi titik utama.

Cara mbukak

Kode lengkap proyek kasebut yaiku ing repositoriku. Kita kloning repositori kanggo awake dhewe. Sadurunge miwiti, sampeyan kudu nggawe manawa sampeyan wis nginstal lan ngatur AWS CLI, amarga ... Terraform bakal nggoleki kredensial ing file ~/.aws/credentials.

Praktek sing apik yaiku nglakokake prentah rencana sadurunge nggunakake kabeh infrastruktur kanggo ndeleng apa sing saiki digawe Terraform kanggo kita ing awan:

terraform.exe plan

Sampeyan bakal dijaluk ngetik nomer telpon kanggo ngirim kabar. Ora perlu mlebu ing tahap iki.

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Sawise nganalisa rencana operasi program, kita bisa miwiti nggawe sumber daya:

terraform.exe apply

Sawise ngirim printah iki, sampeyan bakal dijaluk ngetik nomer telpon maneh; nelpon "ya" yen ana pitakonan babagan tumindak bener ditampilake. Iki bakal ngidini sampeyan nyiyapake kabeh infrastruktur, nindakake kabeh konfigurasi EC2 sing dibutuhake, masang fungsi lambda, lsp.

Sawise kabeh sumber daya wis kasil digawe liwat kode Terraform, sampeyan kudu pindhah menyang rincian aplikasi Kinesis Analytics (sayangé, aku ora nemokake carane nindakake iki langsung saka kode).

Bukak aplikasi:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Sawise iki, sampeyan kudu kanthi tegas nyetel jeneng stream ing aplikasi kanthi milih saka dhaptar gulung mudhun:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Saiki kabeh wis siyap.

Nguji aplikasi

Preduli saka carane sampeyan masang sistem, kanthi manual utawa liwat kode Terraform, iku bakal bisa digunakake padha.

Kita mlebu liwat SSH menyang mesin virtual EC2 ing ngendi Agen Kinesis diinstal lan mbukak skrip api_caller.py

sudo ./api_caller.py TOKEN

Sampeyan mung kudu ngenteni SMS menyang nomer sampeyan:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
SMS - pesen teka ing telpon meh 1 menit:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server
Iku tetep kanggo ndeleng apa cathetan wis disimpen ing database DynamoDB kanggo sakteruse, analisis luwih rinci. Tabel airline_tickets ngemot kira-kira data ing ngisor iki:

Integrasi API Aviasales karo Amazon Kinesis lan kesederhanaan tanpa server

kesimpulan

Sajrone karya rampung, sistem pangolahan data online dibangun adhedhasar Amazon Kinesis. Pilihan kanggo nggunakake Agen Kinesis magepokan karo Kinesis Data Streams lan analytics wektu nyata Kinesis Analytics nggunakake printah SQL, uga interaksi Amazon Kinesis karo layanan AWS liyane dianggep.

Kita nyebarake sistem ing ndhuwur kanthi rong cara: manual sing rada dawa lan cepet saka kode Terraform.

Kabeh kode sumber proyek kasedhiya ing repositori GitHubku, Aku saranake sampeyan familiarize dhewe karo.

Aku seneng ngrembug artikel kasebut, aku ngenteni komentar sampeyan. Mugi-mugi kritik ingkang mbangun.

Muga-muga sampeyan sukses!

Source: www.habr.com

Add a comment