Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Héy Habr!

Naha anjeun resep pesawat terbang? Abdi resep éta, tapi nalika ngasingkeun diri kuring ogé resep nganalisa data dina tiket pesawat tina hiji sumber anu terkenal - Aviasales.

Dinten ieu kami bakal nganalisis karya Amazon Kinesis, ngawangun sistem streaming sareng analytics real-time, masang database Amazon DynamoDB NoSQL salaku panyimpen data utama, sareng nyetél béwara SMS pikeun tiket anu pikaresepeun.

Sadaya detil aya di handapeun cut! indit!

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

perkenalan

Salaku conto, urang peryogi aksés ka Aviasales API. Aksés ka éta disayogikeun gratis sareng tanpa larangan; anjeun ngan ukur kedah ngadaptar dina bagian "Pamekar" pikeun nampi token API anjeun pikeun ngaksés data.

Tujuan utama tulisan ieu nyaéta pikeun masihan pamahaman umum ngeunaan pamakean inpormasi streaming dina AWS; kami ngémutan yén data anu dipulangkeun ku API anu dianggo henteu leres-leres énggal sareng dikirimkeun tina cache, nyaéta dibentuk dumasar kana panéangan ku pangguna situs Aviasales.ru sareng Jetradar.com salami 48 jam terakhir.

Kinesis-agén, dipasang dina mesin ngahasilkeun, nampi via API bakal otomatis parse sarta ngirimkeun data kana aliran dipikahoyong via Kinesis Data Analytics. Versi atah aliran ieu bakal ditulis langsung ka toko. Panyimpen data atah anu dipasang dina DynamoDB bakal ngamungkinkeun analisa tikét anu langkung jero ngaliwatan alat BI, sapertos AWS Quick Sight.

Kami bakal nganggap dua pilihan pikeun nyebarkeun sakumna infrastruktur:

  • Manual - via Konsol Manajemén AWS;
  • Infrastruktur tina kode Terraform kanggo automators puguh;

Arsitéktur sistem dimekarkeun

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Komponén dipaké:

  • Aviasales API - data anu dipulangkeun ku API ieu bakal dianggo pikeun sadaya padamelan anu salajengna;
  • EC2 Produsén Instance - mesin virtual biasa dina awan dimana aliran data input bakal dihasilkeun:
    • Agén Kinesis nyaéta aplikasi Java dipasang lokal dina mesin nu nyadiakeun cara gampang pikeun ngumpulkeun sarta ngirim data ka Kinesis (Kinesis Data Aliran atanapi Kinesis Firehose). Agén terus ngawas sakumpulan file dina diréktori anu ditangtukeun sareng ngirim data énggal ka Kinesis;
    • API Panelepon Script - A Aksara Python nu ngajadikeun requests ka API tur nyimpen respon kana folder nu diawaskeun ku Agen Kinesis;
  • Aliran Data Kinesis - Ladenan streaming data real-time kalayan kamampuan skala lega;
  • Kinesis Analytics nyaéta layanan tanpa server anu nyederhanakeun analisa streaming data sacara real waktos. Amazon Kinesis Data Analytics ngonpigurasikeun sumberdaya aplikasi tur otomatis skala pikeun nanganan sagala volume data asup;
  • AWS Lambda — jasa anu ngamungkinkeun anjeun ngajalankeun kode tanpa nyadangkeun atanapi nyetél server. Kabéh kakuatan komputasi otomatis diskalakeun pikeun tiap panggero;
  • Amazon DynamoDB - Basis data pasangan konci-nilai sareng dokumén anu nyayogikeun latén kirang ti 10 milidetik nalika ngajalankeun dina skala naon waé. Nalika nganggo DynamoDB, anjeun henteu kedah nyayogikeun, nambal, atanapi ngatur server naon waé. DynamoDB otomatis skala tabel pikeun nyaluyukeun jumlah sumberdaya sadia tur ngajaga kinerja luhur. Taya administrasi sistem diperlukeun;
  • Amazon SNS - ladenan anu diurus sapinuhna pikeun ngirim pesen nganggo modél penerbit-palanggan (Pub/Sub), dimana anjeun tiasa ngasingkeun jasa mikro, sistem anu disebarkeun sareng aplikasi tanpa server. SNS tiasa dianggo pikeun ngirim inpormasi ka pangguna akhir ngalangkungan béwara push mobile, pesen SMS sareng email.

Latihan awal

Pikeun niru aliran data, kuring mutuskeun pikeun ngagunakeun inpormasi tikét maskapai anu dipulangkeun ku Aviasales API. DI dokuméntasi daptar cukup éksténsif ngeunaan métode béda, hayu urang nyandak salah sahijina - "Harga Bulanan Calendar", nu mulih harga pikeun tiap poé dina bulan, dikelompokeun ku Jumlah mindahkeun. Upami anjeun henteu netepkeun bulan milarian dina pamundut, inpormasi bakal dipulangkeun pikeun sasih saatos anu ayeuna.

Janten, hayu urang ngadaptar sareng kéngingkeun token kami.

Hiji conto pamundut ieu di handap:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Metodeu di luhur narima data ti API ku nangtukeun hiji token dina pamundut bakal jalan, tapi kuring leuwih resep lulus token aksés ngaliwatan lulugu, sangkan bakal ngagunakeun metoda ieu dina aksara api_caller.py.

conto jawaban:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Conto réspon API di luhur nunjukkeun tikét ti St. Petersburg ka Phuk... Duh, ngimpi...
Kusabab Abdi ti Kazan, sarta Phuket kiwari "ukur ngimpi", hayu urang néangan tiket ti St.. Petersburg ka Kazan.

Éta nganggap yén anjeun parantos gaduh akun AWS. Abdi hoyong langsung ngagambar perhatian husus kana kanyataan yén Kinesis jeung ngirim bewara via SMS teu kaasup kana taunan Tingkat Gratis (gratis panggunaan). Tapi sanajan ieu, kalawan sababaraha dollar dina pikiran, éta rada mungkin pikeun ngawangun sistem diusulkeun sarta maénkeun kalayan eta. Sareng, tangtosna, tong hilap mupus sadaya sumber saatos henteu diperyogikeun deui.

Untungna, fungsi DynamoDb sareng lambda bakal gratis kanggo urang upami urang nyumponan wates gratis bulanan. Contona, pikeun DynamoDB: 25 GB gudang, 25 WCU/RCU jeung 100 juta queries. Sareng sajuta telepon fungsi lambda per bulan.

Panyebaran sistem manual

Nyetél Kinesis Data Streams

Hayu urang angkat kana jasa Kinesis Data Streams sareng jieun dua aliran énggal, masing-masing hiji beling.

Naon ari beling?
Beling mangrupikeun unit transfer data dasar tina aliran Amazon Kinesis. Hiji bagéan nyadiakeun mindahkeun data input dina laju 1 MB/s jeung mindahkeun data kaluaran dina laju 2 MB/s. Hiji bagéan ngarojong nepi ka 1000 éntri PUT per detik. Nalika nyieun aliran data, Anjeun kudu nangtukeun jumlah diperlukeun bagéan. Contona, anjeun tiasa ngadamel aliran data sareng dua bagéan. Aliran data ieu bakal nyadiakeun transfer data input dina 2 MB/s jeung mindahkeun data kaluaran dina 4 MB/s, ngarojong nepi ka 2000 PUT rékaman per detik.

Beuki beling dina aliran anjeun, langkung ageung throughputna. Sacara prinsip, ieu kumaha aliran skala - ku nambahkeun shards. Tapi beuki beling anjeun gaduh, langkung luhur hargana. Unggal beling hargana 1,5 cents per jam sareng tambahan 1.4 cents pikeun unggal juta unit payload PUT.

Hayu urang nyieun aliran anyar kalawan ngaran airline_tickets, 1 beling bakal cukup pikeun anjeunna:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ayeuna hayu urang nyieun thread sejen kalawan ngaran special_stream:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Setélan produser

Pikeun nganalisis tugas, cukup ngagunakeun conto EC2 biasa salaku produser data. Henteu kedah janten mesin virtual anu kuat sareng mahal; titik t2.micro tiasa leres-leres.

Catetan penting: contona, anjeun kedah nganggo gambar - Amazon Linux AMI 2018.03.0, éta gaduh setélan pangsaeutikna pikeun gancang ngaluncurkeun Agen Kinesis.

Pindah ka jasa EC2, jieun mesin virtual anyar, pilih AMI anu dipikahoyong kalayan jinis t2.micro, anu kalebet dina Free Tier:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Supados mesin virtual anu nembé diciptakeun tiasa berinteraksi sareng jasa Kinesis, éta kedah dipasihan hak pikeun ngalakukeunana. Cara anu pangsaéna pikeun ngalakukeun ieu nyaéta netepkeun Peran IAM. Ku alatan éta, dina Lengkah 3: Konpigurasikeun layar Rincian Instance, anjeun kedah milih Jieun Peran IAM anyar:

Nyiptakeun peran IAM pikeun EC2
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Dina jandéla anu muka, pilih yén urang nyiptakeun peran énggal pikeun EC2 sareng angkat ka bagian Idin:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ngagunakeun conto latihan, urang teu kudu lebet kana sagala intricacies konfigurasi granular hak sumberdaya, jadi urang bakal milih kawijakan tos ngonpigurasi ku Amazon: AmazonKinesisFullAccess na CloudWatchFullAccess.

Hayu urang masihan sababaraha ngaran bermakna pikeun peran ieu, contona: EC2-KinesisStreams-FullAccess. Hasilna kedah sami sareng anu dipidangkeun dina gambar di handap ieu:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Sanggeus nyieun peran anyar ieu, ulah poho pikeun ngagantelkeun kana conto mesin virtual dijieun:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Urang teu ngarobah nanaon sejenna dina layar ieu sarta ngaléngkah ka jandéla salajengna.

Setélan hard drive tiasa ditinggalkeun salaku standar, kitu ogé tag (sanaos prakték anu saé pikeun ngagunakeun tag, sahenteuna masihan nami conto sareng nunjukkeun lingkunganana).

Ayeuna kami aya dina Lengkah 6: tab Konpigurasikeun Grup Kaamanan, dimana anjeun kedah nyiptakeun anu énggal atanapi netepkeun grup Kaamanan anjeun anu tos aya, anu ngamungkinkeun anjeun nyambungkeun via ssh (port 22) kana conto. Pilih Sumber -> IP abdi aya sareng anjeun tiasa ngaluncurkeun conto.

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Pas ngalih ka status ngajalankeun, anjeun tiasa nyobian nyambung ka éta via ssh.

Pikeun tiasa damel sareng Agen Kinesis, saatos suksés nyambungkeun kana mesin, anjeun kedah ngalebetkeun paréntah di handap ieu dina terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Hayu urang jieun polder pikeun nyimpen réspon API:

sudo mkdir /var/log/airline_tickets

Sateuacan ngamimitian agén, anjeun kedah ngonpigurasikeun konfigurasi na:

sudo vim /etc/aws-kinesis/agent.json

Eusi file agent.json kedah siga kieu:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Salaku bisa ditempo ti file konfigurasi, agén bakal ngawas file kalawan extension .log dina /var/log/airline_tickets/ diréktori, parse aranjeunna sarta mindahkeun kana stream airline_tickets.

Urang ngabalikan deui jasa sareng pastikeun yén éta jalan sareng jalan:

sudo service aws-kinesis-agent restart

Ayeuna hayu urang unduh naskah Python anu bakal nyuhunkeun data tina API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skrip api_caller.py nyuhunkeun data ti Aviasales sareng nyimpen réspon anu ditampi dina diréktori anu di-scan ku agén Kinesis. Palaksanaan naskah ieu rada baku, aya kelas TicketsApi, eta ngidinan Anjeun pikeun asynchronously narik API. Urang lulus lulugu sareng token sareng menta parameter ka kelas ieu:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Pikeun nguji setélan sareng fungsionalitas agén anu leres, hayu urang uji ngajalankeun skrip api_caller.py:

sudo ./api_caller.py TOKEN

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Sareng urang ningali hasil padamelan dina log Agen sareng dina tab Monitoring dina aliran data airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Sakumaha anjeun tiasa tingali, sadayana jalan sareng Agen Kinesis suksés ngirim data kana aliran. Ayeuna hayu urang ngonpigurasikeun konsumen.

Nyetél Kinesis Data Analytics

Hayu urang ngalih ka komponén sentral sakabéh sistem - nyieun aplikasi anyar dina Kinesis Data Analytics ngaranna kinesis_analytics_airlines_app:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Kinesis Data Analytics ngamungkinkeun anjeun ngalakukeun analitik data sacara real-time tina Kinesis Streams nganggo basa SQL. Éta mangrupikeun jasa autoscaling pinuh (teu sapertos Kinesis Streams) yén:

  1. ngidinan Anjeun pikeun nyieun aliran anyar (Kaluaran Stream) dumasar kana pamundut ka sumber data;
  2. nyadiakeun aliran kalawan kasalahan anu lumangsung nalika aplikasi keur ngajalankeun (Error Stream);
  3. tiasa sacara otomatis nangtukeun skéma data input (tiasa didefinisikeun sacara manual upami diperyogikeun).

Ieu sanes jasa mirah - 0.11 USD per jam gawé, jadi Anjeun kudu make eta taliti tur ngahapus lamun geus rengse.

Hayu urang sambungkeun aplikasi ka sumber data:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Pilih aliran anu urang badé sambungkeun (airline_tickets):

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Salajengna, anjeun kedah ngagantelkeun Peran IAM énggal supados aplikasi tiasa maca tina aliran sareng nyerat kana aliran. Jang ngalampahkeun ieu, cukup teu ngarobah nanaon dina blok idin Aksés:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ayeuna hayu urang nyuhunkeun mendakan skéma data dina aliran; pikeun ngalakukeun ieu, klik tombol "Papanggihan skéma". Hasilna, peran IAM bakal diropéa (dijieun) sareng deteksi skéma bakal diluncurkeun tina data anu parantos sumping dina aliran:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ayeuna anjeun kedah angkat ka pangropéa SQL. Nalika anjeun ngaklik tombol ieu, jandela bakal muncul anu naroskeun anjeun ngaluncurkeun aplikasi - pilih naon anu anjeun hoyong jalankeun:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Selapkeun pamundut basajan ieu kana jandela pangropéa SQL teras klik Simpen sareng Jalankeun SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Dina basis data relasional, anjeun damel sareng tabel nganggo pernyataan INSERT pikeun nambihan rékaman sareng pernyataan SELECT pikeun naroskeun data. Dina Amazon Kinesis Data Analytics, anjeun damel sareng aliran (STREAMs) sareng pompa (PUMPs) - paménta sisipan kontinyu anu ngalebetkeun data tina hiji aliran dina aplikasi kana aliran anu sanés.

Paménta SQL anu dipidangkeun di luhur milarian tikét Aeroflot kalayan biaya sahandapeun lima rébu rubles. Sadaya rékaman anu minuhan kaayaan ieu bakal disimpen dina aliran DESTINATION_SQL_STREAM.

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Dina blok Tujuan, pilih aliran special_stream, sarta dina daptar turun-handap ngaran aliran Dina-aplikasi DESTINATION_SQL_STREAM:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Hasil tina sagala manipulasi kedah sami sareng gambar di handap ieu:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Nyiptakeun sareng ngalanggan topik SNS

Pindah ka Layanan Bewara Basajan sareng jieun topik anyar di dinya kalayan nami Airlines:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ngalanggan topik ieu sareng nunjukkeun nomer telepon sélulér anu bakal dikirimkeun bewara SMS:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Jieun tabel dina DynamoDB

Pikeun nyimpen data atah tina stream airline_tickets maranéhanana, hayu urang nyieun tabel di DynamoDB kalawan ngaran nu sarua. Urang bakal ngagunakeun record_id salaku konci primér:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Nyiptakeun kolektor fungsi lambda

Hayu urang nyieun fungsi lambda disebut Kolektor, anu tugas bakal polling stream airline_tickets na, upami rékaman anyar kapanggih aya, selapkeun rékaman ieu kana tabel DynamoDB. Jelas, salian hak standar, lambda ieu kedah gaduh aksés maca kana aliran data Kinesis sareng aksés nulis kana DynamoDB.

Nyiptakeun peran IAM pikeun fungsi kolektor lambda
Kahiji, hayu urang nyieun peran IAM anyar pikeun lambda ngaranna Lambda-TicketsProcessingRole:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Pikeun conto tés, kabijakan AmazonKinesisReadOnlyAccess sareng AmazonDynamoDBFullAccess anu tos dikonpigurasi cocog, sapertos anu dipidangkeun dina gambar di handap ieu:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Lambda ieu kedah diluncurkeun ku pemicu ti Kinesis nalika éntri énggal asup ka airline_stream, janten urang kedah nambihan pemicu énggal:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Sadaya anu tetep nyaéta ngalebetkeun kode sareng simpen lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Nyiptakeun béwara fungsi lambda

Fungsi lambda kadua, anu bakal ngawas aliran kadua (special_stream) sareng ngirim bewara ka SNS, didamel ku cara anu sami. Ku alatan éta, lambda ieu kudu boga aksés ka maca tina Kinesis sarta ngirim pesen ka topik SNS dibikeun, nu lajeng bakal dikirim ku layanan SNS ka sadaya palanggan topik ieu (email, SMS, jsb).

Nyiptakeun peran IAM
Kahiji, urang nyieun peran IAM Lambda-KinesisAlarm pikeun lambda ieu, lajeng nangtukeun peran ieu ka alarm_notifier lambda keur dijieun:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

Lambda ieu kedah dianggo dina pemicu pikeun rékaman anyar asup ka special_stream, jadi Anjeun kudu ngonpigurasikeun pemicu dina cara nu sarua salaku urang ngalakukeun pikeun kolektor lambda.

Sangkan leuwih gampang pikeun ngonpigurasikeun lambda ieu, hayu urang ngenalkeun variabel lingkungan anyar - TOPIC_ARN, dimana urang nempatkeun ANR (Amazon Recourse Names) tina topik Airlines:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Sareng selapkeun kode lambda, éta henteu rumit pisan:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Sigana yén ieu téh dimana konfigurasi sistem manual geus réngsé. Anu tetep nyaéta pikeun nguji sareng mastikeun yén kami parantos ngonpigurasi sadayana leres.

Nyebarkeun tina kode Terraform

Persiapan anu diperlukeun

Terraform mangrupakeun alat open-source pisan merenah pikeun deploying infrastruktur tina kode. Éta ngagaduhan sintaksis sorangan anu gampang diajar sareng seueur conto kumaha sareng naon anu bakal disebarkeun. Éditor Atom atanapi Visual Studio Code gaduh seueur plugins gunana anu ngajantenkeun damel sareng Terraform langkung gampang.

Anjeun tiasa ngundeur distribusi di dieu. Analisis detil sadaya kamampuan Terraform di luar ruang lingkup tulisan ieu, ku kituna urang bakal ngawatesan diri kana titik-titik utama.

Kumaha ngamimitian

Kodeu pinuh ku proyék nyaéta dina gudang kuring. Urang clone gudang ka diri urang sorangan. Sateuacan ngamimitian, anjeun kedah mastikeun yén anjeun parantos dipasang sareng ngonpigurasi AWS CLI, sabab ... Terraform bakal milarian kredensial dina file ~/.aws/credentials.

Praktek anu saé nyaéta ngajalankeun paréntah rencana sateuacan nyebarkeun sakumna infrastruktur pikeun ningali naon anu ayeuna didamel Terraform pikeun urang dina méga:

terraform.exe plan

Anjeun bakal dipenta pikeun nuliskeun nomer telepon pikeun ngirim bewara. Teu perlu ngasupkeun eta dina tahap ieu.

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Saatos nganalisa rencana operasi program, urang tiasa ngamimitian nyiptakeun sumber:

terraform.exe apply

Saatos ngirim paréntah ieu, anjeun bakal dipenta deui pikeun nuliskeun nomer telepon; pencét "enya" nalika patarosan ngeunaan leres-leres ngalakukeun tindakan ditampilkeun. Ieu bakal ngamungkinkeun anjeun nyetél sadaya prasarana, ngalaksanakeun sagala konfigurasi anu diperyogikeun EC2, nyebarkeun fungsi lambda, jsb.

Saatos sadaya sumber parantos suksés didamel ngalangkungan kode Terraform, anjeun kedah lebet kana detil aplikasi Kinesis Analytics (hanjakalna, kuring henteu mendakan cara ngalakukeun ieu langsung tina kode).

Jalankeun aplikasi:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Saatos ieu, anjeun kedah sacara eksplisit nyetél nami aliran dina aplikasi ku milih tina daptar turun-handap:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Ayeuna sagalana geus siap.

Nguji aplikasi

Henteu paduli kumaha anjeun nyebarkeun sistem, sacara manual atanapi ngalangkungan kode Terraform, éta bakal tiasa dianggo sami.

Kami log in via SSH ka mesin virtual EC2 dimana Agen Kinesis dipasang sareng ngajalankeun skrip api_caller.py

sudo ./api_caller.py TOKEN

Sadaya anu anjeun kedah laksanakeun nyaéta ngantosan SMS ka nomer anjeun:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
SMS - pesen dugi ka telepon anjeun ampir 1 menit:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server
Tetep katingal naha rékaman disimpen dina database DynamoDB pikeun salajengna, analisis leuwih lengkep. Tabel airline_tickets ngandung kira-kira data ieu:

Integrasi API Aviasales sareng Amazon Kinesis sareng kesederhanaan tanpa server

kacindekan

Dina kursus gawé dipigawé, hiji sistem ngolah data online diwangun dumasar kana Amazon Kinesis. Pilihan pikeun ngagunakeun Agen Kinesis ditéang sareng Kinesis Data Streams sareng analytics real-time Kinesis Analytics nganggo paréntah SQL, ogé interaksi Amazon Kinesis sareng jasa AWS anu sanés dianggap.

Kami nyebarkeun sistem di luhur ku dua cara: manual anu rada panjang sareng anu gancang tina kode Terraform.

Sadaya kode sumber proyék sayogi dina gudang GitHub kuring, Kuring nyarankeun Anjeun familiarize diri jeung eta.

Abdi bagja ngabahas tulisan éta, kuring ngarepkeun koméntar anjeun. Abdi ngarepkeun kritik anu ngawangun.

Kuring miharep anjeun sukses!

sumber: www.habr.com

Tambahkeun komentar