Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Ehi Habr!

Ti piace l'aviò volante? Mi piace, ma durante l'autoisolamentu sò ancu innamuratu di l'analisi di dati nantu à i biglietti aerei da una risorsa ben cunnisciuta - Aviasales.

Oghje analizzeremu u travagliu di Amazon Kinesis, custruiscenu un sistema di streaming cù analitiche in tempu reale, installate a basa di dati Amazon DynamoDB NoSQL cum'è u almacenamentu di dati principale, è cunfigurà notificazioni SMS per biglietti interessanti.

Tutti i dettagli sò sottu à u tagliu! Vai !

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Introduzione

Per esempiu, avemu bisognu di accessu à Aviasales API. L'accessu à questu hè furnitu gratuitamente è senza restrizioni; basta à registrà in a sezione "Sviluppatori" per riceve u vostru token API per accede à e dati.

U scopu principale di questu articulu hè di dà una cunniscenza generale di l'usu di l'informazioni in streaming in AWS; avemu cunsideratu chì e dati restituiti da l'API utilizata ùn sò micca strettamente aghjurnati è sò trasmessi da a cache, chì hè furmatu basatu nantu à e ricerche da l'utilizatori di i siti Aviasales.ru è Jetradar.com per l'ultime 48 ore.

Kinesis-agent, stallatu nantu à a macchina produttrice, ricivutu via l'API automaticamente analizà è trasmette dati à u flussu desideratu via Kinesis Data Analytics. A versione cruda di stu flussu serà scritta direttamente à a tenda. L'almacenamiento di dati grezzi implementatu in DynamoDB permetterà un'analisi più profonda di u bigliettu attraversu strumenti di BI, cum'è AWS Quick Sight.

Cunsidereremu duie opzioni per implementà tutta l'infrastruttura:

  • Manuale - via AWS Management Console;
  • L'infrastruttura da u codice Terraform hè per l'automatichi lazy;

L'architettura di u sistema sviluppatu

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Cumpunenti utilizati:

  • Aviasales API - i dati restituiti da questa API seranu utilizati per tutti i travaglii successivi;
  • EC2 Produttore Istanza - una macchina virtuale regulare in u nuvulu nantu à quale u flussu di dati di input serà generatu:
    • Agente Kinesis hè una applicazione Java installata in u locu nantu à a macchina chì furnisce un modu faciule per cullà è mandà dati à Kinesis (Kinesis Data Streams o Kinesis Firehose). L'agente monitoreghja constantemente un inseme di schedari in i repertorii specificati è manda novi dati à Kinesis;
    • API Caller Script - Un script Python chì face richieste à l'API è mette a risposta in un cartulare chì hè monitoratu da l'Agente Kinesis;
  • Kinesis Data Streams - serviziu di streaming di dati in tempu reale cù capacità di scala larga;
  • Kinesis Analytics hè un serviziu senza servitore chì simplifica l'analisi di dati in streaming in tempu reale. Amazon Kinesis Data Analytics cunfigurà e risorse di l'applicazione è scala automaticamente per trattà ogni volume di dati entranti;
  • AWS Lambda - un serviziu chì vi permette di eseguisce codice senza fà una copia di salvezza o stallà i servitori. Tuttu u putere di computing hè automaticamente scalatu per ogni chjama;
  • Amazon DynamoDB - Una basa di dati di coppie chjave-valore è documenti chì furnisce una latenza di menu di 10 millisecondi quandu funziona à qualsiasi scala. Quandu aduprate DynamoDB, ùn avete micca bisognu di furnisce, patch, o gestisce qualsiasi servitori. DynamoDB scala automaticamente e tavule per aghjustà a quantità di risorse dispunibili è mantene un altu rendiment. Nisuna amministrazione di u sistema hè necessariu;
  • Amazon SNS - un serviziu cumpletamente gestitu per l'invio di missaghji cù u mudellu editore-abbonatu (Pub/Sub), cù quale pudete isolà i microservizi, sistemi distribuiti è applicazioni senza servitore. SNS pò esse usatu per mandà infurmazioni à l'utilizatori finali per mezu di notificazioni push mobili, SMS è email.

Formazione iniziale

Per emulà u flussu di dati, aghju decisu d'utilizà l'infurmazione di u bigliettu di l'aereo restituita da l'API Aviasales. IN ducumentazione una lista abbastanza larga di metudi diffirenti, pigliemu unu di elli - "Calendariu di Prezzi Mensili", chì torna i prezzi per ogni ghjornu di u mese, raggruppati da u numeru di trasferimenti. Se ùn specificate micca u mese di ricerca in a dumanda, l'infurmazione serà restituita per u mesi dopu à u currente.

Allora, registremu è uttene u nostru token.

Un esempiu di dumanda hè quì sottu:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

U metudu di sopra di riceve dati da l'API specificendu un token in a dumanda hà da travaglià, ma preferimu passà u token d'accessu per l'intestazione, cusì avemu aduprà stu metudu in u script api_caller.py.

Esempiu di risposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L'esempiu di risposta API sopra mostra un bigliettu da San Petruburgu à Phuk... Oh, chì sognu...
Siccomu sò da Kazan, è Phuket hè avà "solu un sognu", andemu à circà i biglietti da San Petruburgu à Kazan.

Si assume chì avete digià un contu AWS. Vogliu attirà immediatamente una attenzione particulari à u fattu chì Kinesis è l'inviu di notificazioni via SMS ùn sò micca inclusi in l'annu. Free Tier (usu gratuitu). Ma ancu questu, cù un paru di dollari in mente, hè abbastanza pussibule di custruisce u sistema prupostu è ghjucà cun ellu. E, sicuru, ùn vi scurdate di sguassà tutte e risorse dopu chì ùn sò più necessarii.

Fortunatamente, e funzioni DynamoDb è lambda seranu libere per noi se scuntrate i limiti gratuiti mensili. Per esempiu, per DynamoDB: 25 GB di almacenamiento, 25 WCU/RCU è 100 milioni di dumande. È un milione di chjama di funzione lambda per mese.

Implementazione manuale di u sistema

Configurazione di Kinesis Data Streams

Andemu à u serviziu Kinesis Data Streams è creanu dui novi flussi, un shard per ognunu.

Cosa hè un shard?
Un shard hè l'unità di trasferimentu di dati di basa di un flussu Amazon Kinesis. Un segmentu furnisce trasferimentu di dati di input à una velocità di 1 MB/s è trasferimentu di dati di output à una velocità di 2 MB/s. Un segmentu supporta finu à 1000 entrate PUT per seconda. Quandu crea un flussu di dati, avete bisognu di specificà u numeru necessariu di segmenti. Per esempiu, pudete creà un flussu di dati cù dui segmenti. Stu flussu di dati furnisce u trasferimentu di dati di input à 2 MB/s è u trasferimentu di dati di output à 4 MB/s, supportendu finu à 2000 record PUT per seconda.

Quantu più frammenti in u vostru flussu, più grande u so throughput. In principiu, questu hè cumu i flussi sò scalati - aghjunghjendu frammenti. Ma più frammenti avete, più altu hè u prezzu. Ogni shard costa 1,5 centesimi per ora è 1.4 centesimi supplementari per ogni milione di unità di carichi PUT.

Creemu un novu flussu cù u nome biglietti aerei, 1 shard serà abbastanza per ellu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Avà criemu un altru filu cù u nome Special_stream:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Configurazione di u pruduttore

Per analizà un compitu, hè abbastanza à utilizà una istanza EC2 regulare cum'è pruduttore di dati. Ùn deve esse micca una macchina virtuale putente è caru; un spot t2.micro farà bè.

Nota impurtante: per esempiu, duvete aduprà l'imagine - Amazon Linux AMI 2018.03.0, hà menu paràmetri per lancià rapidamente l'Agente Kinesis.

Andà à u serviziu EC2, crea una nova macchina virtuale, selezziunate l'AMI desiderata cù u tipu t2.micro, chì hè inclusu in u Tier Gratuitu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Per chì a nova macchina virtuale creata possa interagisce cù u serviziu Kinesis, deve esse datu i diritti per fà. U megliu modu per fà questu hè di assignà un Role IAM. Per quessa, nantu à u Passu 3: Configurate l'Instance Details screen, duvete selezziunate Crea un novu rolu IAM:

Creazione di un rolu IAM per EC2
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
In a finestra chì si apre, selezziunate chì creemu un novu rolu per EC2 è andate à a sezione Permissions:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Utilizendu l'esempiu di furmazione, ùn avemu micca da andà in tutte l'intricacies di a cunfigurazione granulare di i diritti di risorse, cusì selezziunà e pulitiche pre-configurate da Amazon: AmazonKinesisFullAccess è CloudWatchFullAccess.

Demu un nome significativu per stu rolu, per esempiu: EC2-KinesisStreams-FullAccess. U risultatu deve esse u listessu cum'è mostra in a stampa sottu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Dopu avè creatu stu novu rolu, ùn vi scurdate di attaccà à l'istanza di a macchina virtuale creata:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Ùn cambiamu nunda d'altru nantu à sta schermu è andemu à a prossima finestra.

I paràmetri di u discu duru pò esse lasciatu in modu predeterminatu, è ancu i tags (ancu se hè una bona pratica per aduprà tag, almenu dà à l'istanza un nome è indicà l'ambiente).

Avà simu nantu à u Passu 6: Configurate a tabulazione Gruppu di Sicurezza, induve avete bisognu di creà un novu o specificà u vostru gruppu di Sicurezza esistente, chì vi permette di cunnette via ssh (port 22) à l'istanza. Selezziunà Fonte -> U mo IP quì è pudete lancià l'istanza.

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Appena si passa à u statu di esecuzione, pudete pruvà à cunnette vi via ssh.

Per pudè travaglià cù Kinesis Agent, dopu avè cunnessu bè cù a macchina, duvete inserisce i seguenti cumandamenti in u terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Creemu un cartulare per salvà e risposte API:

sudo mkdir /var/log/airline_tickets

Prima di inizià l'agente, avete bisognu di cunfigurà a so cunfigurazione:

sudo vim /etc/aws-kinesis/agent.json

U cuntenutu di u schedariu agent.json deve esse cusì:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Comu pò esse vistu da u schedariu di cunfigurazione, l'agente monitorerà i schedari cù l'estensione .log in u cartulare /var/log/airline_tickets/, analizà è i trasfiriu à u flussu airline_tickets.

Riavviamu u serviziu è assicuratemu chì hè in funzione:

sudo service aws-kinesis-agent restart

Avà scarichemu u script Python chì richiederà dati da l'API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

U script api_caller.py dumanda dati da Aviasales è salva a risposta ricevuta in u cartulare chì l'agente Kinesis scans. L'implementazione di stu script hè abbastanza standard, ci hè una classe TicketsApi, vi permette di tirà asincronamente l'API. Passemu un header cù un token è dumandemu parametri à sta classa:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Per pruvà i paràmetri curretti è e funziunalità di l'agente, pruvemu u script api_caller.py:

sudo ./api_caller.py TOKEN

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
E fighjemu u risultatu di u travagliu in i logs di l'Agent è in a tabulazione Monitoring in u flussu di dati airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Comu pudete vede, tuttu u travagliu è l'Agente Kinesis manda cù successu dati à u flussu. Avà cunfiguremu u cunsumadore.

Configurazione di Kinesis Data Analytics

Passemu à u cumpunente centrale di tuttu u sistema - crea una nova applicazione in Kinesis Data Analytics chjamatu kinesis_analytics_airlines_app:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Kinesis Data Analytics permette di realizà analitiche di dati in tempu reale da Kinesis Streams utilizendu a lingua SQL. Hè un serviziu cumpletamente autoscaling (cuntrariu di Kinesis Streams) chì:

  1. permette di creà novi flussi (Output Stream) basatu nantu à e dumande à a fonte di dati;
  2. furnisce un flussu cù errori chì sò accaduti mentre l'applicazioni eranu in esecuzione (Error Stream);
  3. pò determinà automaticamente u schema di dati di input (pò esse ridefinitu manualmente se ne necessariu).

Questu ùn hè micca un serviziu prezzu - 0.11 USD per ora di travagliu, cusì duvete aduprà cù cura è sguassate quandu avete finitu.

Cunnetteremu l'applicazione à a fonte di dati:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Selezziunate u flussu à quale avemu da cunnette (airline_tickets):

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Dopu, avete bisognu di aghjunghje un novu Role IAM per chì l'applicazione pò leghje da u flussu è scrive à u flussu. Per fà questu, hè abbastanza per ùn cambià nunda in u bloccu di permessi di Accessu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Avà dumandemu a scuperta di u schema di dati in u flussu; per fà questu, cliccate nantu à u buttone "Scopri schema". In u risultatu, u rolu IAM serà aghjurnatu (un novu serà creatu) è a deteczione di schema serà lanciata da i dati chì sò digià ghjunti in u flussu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Avà avete bisognu à andà à l'editore SQL. Quandu cliccate nant'à stu buttone, apparirà una finestra chì vi dumanda di lancià l'applicazione - selezziunate ciò chì vulete lancià:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Inserite a seguente dumanda simplice in a finestra di l'editor SQL è cliccate Salvà è Eseguite SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In e basa di dati relazionale, travagliate cù tavule chì utilizanu dichjarazioni INSERT per aghjunghje registri è una dichjarazione SELECT per dumandà dati. In Amazon Kinesis Data Analytics, travagliate cù flussi (STREAM) è pumps (PUMPs) - richieste di inserimentu cuntinuu chì inserisce dati da un flussu in una applicazione in un altru flussu.

A dumanda SQL presentata sopra cerca i biglietti Aeroflot à un costu sottu à cinque mila rubli. Tutti i registri chì rispondenu à sti cundizioni seranu posti in u flussu DESTINATION_SQL_STREAM.

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
In u bloccu di Destinazione, selezziunate u flussu speciale_stream, è in u nome di u flussu in l'applicazione DESTINATION_SQL_STREAM lista drop-down:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
U risultatu di tutte e manipulazioni deve esse qualcosa simili à a figura sottu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Creazione è abbunamentu à un tema SNS

Andà à u Serviziu di Notificazione Simple è crea un novu tema quì cù u nome Airlines:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Abbonate à stu tema è indicà u numeru di telefuninu à quale seranu mandate e notificazioni SMS:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Crea una tabella in DynamoDB

Per almacenà e dati crudi da u so flussu airline_tickets, creemu una tavola in DynamoDB cù u listessu nome. Useremu record_id cum'è chjave primaria:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Creazione di un cullettore di funzioni lambda

Creemu una funzione lambda chjamata Collector, chì u so compitu serà di sondaghju u flussu di airline_tickets è, se ci sò novi registri, inserisci questi registri in a tavola DynamoDB. Ovviamente, in più di i diritti predeterminati, sta lambda deve avè l'accessu di lettura à u flussu di dati Kinesis è l'accessu di scrittura à DynamoDB.

Creazione di un rolu IAM per a funzione lambda di u cullettore
Prima, creemu un novu rolu IAM per a lambda chjamata Lambda-TicketsProcessingRole:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Per l'esempiu di prova, i pulitiche AmazonKinesisReadOnlyAccess è AmazonDynamoDBFullAccess pre-configurati sò abbastanza adattati, cum'è mostra in a stampa sottu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Questa lambda deve esse lanciata da un trigger da Kinesis quandu e novi entrate entranu in airline_stream, cusì avemu bisognu di aghjunghje un novu trigger:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Tuttu ciò chì resta hè di inserisce u codice è salvà a lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creazione di un notificatore di funzione lambda

A seconda funzione lambda, chì monitorerà u sicondu flussu (special_stream) è mandà una notificazione à SNS, hè creata in modu simili. Per quessa, sta lambda deve avè accessu à leghje da Kinesis è mandà missaghji à un determinatu tema SNS, chì poi sarà mandatu da u serviziu SNS à tutti l'abbonati di stu tema (email, SMS, etc.).

Creazione di un rolu IAM
Prima, creemu u rolu IAM Lambda-KinesisAlarm per questa lambda, è poi assignà stu rolu à u alarm_notifier lambda chì hè creatu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

Questa lambda duveria travaglià nantu à un trigger per novi registri per entra in u special_stream, cusì avete bisognu di cunfigurà u trigger in a listessa manera chì avemu fattu per u Collector lambda.

Per fà più faciule per cunfigurà sta lambda, introducemu una nova variabile d'ambiente - TOPIC_ARN, induve situemu l'ANR (Amazon Recourse Names) di u tema Airlines:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
È inserisci u codice lambda, ùn hè micca cumplicatu à tutti:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Sembra chì questu hè induve a cunfigurazione manuale di u sistema hè cumpletata. Tuttu ciò chì resta hè di pruvà è assicuratevi chì avemu cunfiguratu tuttu bè.

Implementa da u codice Terraform

Preparazione necessaria

Terraform hè un strumentu open-source assai convenientu per implementà l'infrastruttura da u codice. Havi a so propria sintassi chì hè faciule d'amparà è hà assai esempi di cumu è ciò chì implementà. L'editore Atom o Visual Studio Code hà assai plugins utili chì facenu u travagliu cù Terraform più faciule.

Pudete scaricà a distribuzione da quì. Un analisi detallatu di tutte e capacità di Terraform hè fora di u scopu di stu articulu, cusì ci limiteremu à i punti principali.

Cumu principià

U codice sanu di u prugettu hè in u mo repository. Clonemu u repository per noi stessi. Prima di inizià, avete bisognu di assicurà chì avete AWS CLI installatu è cunfiguratu, perchè ... Terraform cercà e credenziali in u schedariu ~/.aws/credentials.

Una bona pratica hè di eseguisce u cumandamentu di u pianu prima di implementà tutta l'infrastruttura per vede ciò chì Terraform crea attualmente per noi in u nuvulu:

terraform.exe plan

Vi sarà dumandatu à inserisce un numeru di telefunu per mandà notificazioni. Ùn hè micca necessariu di entre in questu stadiu.

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Dopu avè analizatu u pianu di operazione di u prugramma, pudemu cumincià à creà risorse:

terraform.exe apply

Dopu avè mandatu stu cumandamentu, vi sarà di novu dumandatu à inserisce un numeru di telefunu; marcate "sì" quandu una dumanda nantu à realizà l'azzioni hè mostrata. Questu permetterà di stallà tutta l'infrastruttura, eseguisce tutte a cunfigurazione necessaria di EC2, implementà e funzioni lambda, etc.

Dopu chì tutti i risorsi sò stati creati bè cù u codice Terraform, avete bisognu à andà in i dettagli di l'applicazione Kinesis Analytics (sfurtunatamente, ùn aghju micca truvatu cumu fà questu direttamente da u codice).

Lanciate l'applicazione:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Dopu questu, duvete stabilisce esplicitamente u nome di u flussu in l'applicazione selezziunendu da a lista a tendina:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Avà tuttu hè prontu per andà.

Pruvate l'applicazione

Indipendentemente da cumu avete implementatu u sistema, manualmente o attraversu u codice Terraform, hà da travaglià u listessu.

Entramu via SSH à a macchina virtuale EC2 induve Kinesis Agent hè stallatu è eseguite u script api_caller.py

sudo ./api_caller.py TOKEN

Tuttu ciò chì duvete fà hè aspittà un SMS à u vostru numeru:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
SMS - un messagiu arriva in u vostru telefunu in quasi 1 minutu:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless
Resta da vede s'ellu i registri sò stati salvati in a basa di dati DynamoDB per un analisi più detallatu dopu. A tabella airline_tickets cuntene circa i seguenti dati:

Integrazione API Aviasales cù Amazon Kinesis è simplicità serverless

cunchiusioni

In u cursu di u travagliu fattu, un sistema di trattamentu di dati in linea hè statu custruitu basatu annantu à Amazon Kinesis. Opzioni per aduprà l'Agente Kinesis in cunjunzione cù Kinesis Data Streams è analytics in tempu reale Kinesis Analytics utilizendu cumandamenti SQL, è ancu l'interazzione di Amazon Kinesis cù altri servizii AWS sò stati cunsiderati.

Avemu implementatu u sistema di sopra in dui maneri: un manuale abbastanza longu è un rapidu da u codice Terraform.

Tuttu u codice fonte di u prughjettu hè dispunibule in u mo repository GitHub, Vi cunsigliu di familiarizàvi cun ellu.

Sò cuntentu di discutiri l'articulu, aghju aspittatu i vostri cumenti. Spergu una critica constructiva.

Ju ti vògliu successu!

Source: www.habr.com

Add a comment