Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Ehi Habr!

Ti piacciono gli aerei in volo? Lo adoro, ma durante l'autoisolamento mi sono anche innamorato dell'analisi dei dati sui biglietti aerei da una nota risorsa: Aviasales.

Oggi analizzeremo il lavoro di Amazon Kinesis, costruiremo un sistema di streaming con analisi in tempo reale, installeremo il database NoSQL di Amazon DynamoDB come archivio dati principale e configureremo notifiche SMS per ticket interessanti.

Tutti i dettagli sono sotto il taglio! Andare!

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Introduzione

Per l'esempio, abbiamo bisogno dell'accesso a API Aviasales. L'accesso è gratuito e senza restrizioni; basta registrarsi nella sezione “Sviluppatori” per ricevere il token API per accedere ai dati.

Lo scopo principale di questo articolo è quello di fornire una comprensione generale dell'utilizzo dello streaming di informazioni in AWS; teniamo conto che i dati restituiti dall'API utilizzata non sono strettamente aggiornati e vengono trasmessi dalla cache, che è formato sulla base delle ricerche effettuate dagli utenti dei siti Aviasales.ru e Jetradar.com nelle ultime 48 ore.

L'agente Kinesis, installato sulla macchina di produzione, ricevuto tramite l'API analizzerà e trasmetterà automaticamente i dati al flusso desiderato tramite Kinesis Data Analytics. La versione grezza di questo flusso verrà scritta direttamente nello store. Lo storage dei dati grezzi distribuito in DynamoDB consentirà un'analisi più approfondita dei ticket tramite strumenti di BI, come AWS Quick Sight.

Considereremo due opzioni per la distribuzione dell'intera infrastruttura:

  • Manuale: tramite Console di gestione AWS;
  • L'infrastruttura del codice Terraform è per automatismi pigri;

Architettura del sistema sviluppato

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Componenti utilizzati:

  • API Aviasales — i dati restituiti da questa API verranno utilizzati per tutto il lavoro successivo;
  • Istanza del produttore EC2 — una normale macchina virtuale nel cloud su cui verrà generato il flusso di dati di input:
    • Agente cinetico è un'applicazione Java installata localmente sulla macchina che fornisce un modo semplice per raccogliere e inviare dati a Kinesis (Kinesis Data Streams o Kinesis Firehose). L'agente monitora costantemente un insieme di file nelle directory specificate e invia nuovi dati a Kinesis;
    • Script chiamante API — Uno script Python che effettua richieste all'API e inserisce la risposta in una cartella monitorata dall'agente Kinesis;
  • Flussi di dati Kinesis — servizio di streaming di dati in tempo reale con ampie capacità di scalabilità;
  • Kinesis Analytics è un servizio serverless che semplifica l'analisi dei dati in streaming in tempo reale. Amazon Kinesis Data Analytics configura le risorse dell'applicazione e si dimensiona automaticamente per gestire qualsiasi volume di dati in entrata;
  • AWS Lambda — un servizio che consente di eseguire codice senza eseguire il backup o configurare i server. Tutta la potenza di calcolo viene ridimensionata automaticamente per ogni chiamata;
  • Amazon DynamoDB - Un database di coppie chiave-valore e documenti che fornisce una latenza inferiore a 10 millisecondi durante l'esecuzione su qualsiasi scala. Quando utilizzi DynamoDB, non è necessario effettuare il provisioning, applicare patch o gestire alcun server. DynamoDB ridimensiona automaticamente le tabelle per regolare la quantità di risorse disponibili e mantenere prestazioni elevate. Non è richiesta alcuna amministrazione del sistema;
  • AmazonSNS - un servizio completamente gestito per l'invio di messaggi secondo il modello editore-abbonato (Pub/Sub), con il quale è possibile isolare microservizi, sistemi distribuiti e applicazioni serverless. SNS può essere utilizzato per inviare informazioni agli utenti finali tramite notifiche push mobili, messaggi SMS ed e-mail.

Formazione iniziale

Per emulare il flusso di dati, ho deciso di utilizzare le informazioni sui biglietti aerei restituite dall'API Aviasales. IN documentazione un elenco piuttosto ampio di metodi diversi, prendiamone uno: "Calendario mensile dei prezzi", che restituisce i prezzi per ogni giorno del mese, raggruppati in base al numero di trasferimenti. Se nella richiesta non si specifica il mese di ricerca, verranno restituite le informazioni relative al mese successivo a quello corrente.

Quindi, registriamoci e otteniamo il nostro token.

Di seguito è riportato un esempio di richiesta:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Il metodo sopra descritto per ricevere dati dall'API specificando un token nella richiesta funzionerà, ma preferisco passare il token di accesso attraverso l'intestazione, quindi utilizzeremo questo metodo nello script api_caller.py.

Esempio di risposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

La risposta API di esempio qui sopra mostra un biglietto da San Pietroburgo a Phuk... Oh, che sogno...
Dato che sono di Kazan e Phuket ormai è “solo un sogno”, cerchiamo i biglietti da San Pietroburgo a Kazan.

Si presuppone che tu abbia già un account AWS. Vorrei subito attirare l'attenzione sul fatto che Kinesis e l'invio di notifiche tramite SMS non sono inclusi nell'annuale Livello gratuito (uso gratuito). Ma nonostante ciò, con un paio di dollari in mente, è del tutto possibile costruire il sistema proposto e giocarci. E, naturalmente, non dimenticare di eliminare tutte le risorse quando non sono più necessarie.

Fortunatamente, le funzioni DynamoDb e lambda saranno gratuite per noi se raggiungiamo i nostri limiti mensili gratuiti. Ad esempio, per DynamoDB: 25 GB di storage, 25 WCU/RCU e 100 milioni di query. E un milione di chiamate di funzioni lambda al mese.

Distribuzione manuale del sistema

Configurazione di Kinesis Data Streams

Andiamo al servizio Kinesis Data Streams e creiamo due nuovi flussi, uno shard per ciascuno.

Cos'è un frammento?
Uno shard è l'unità di trasferimento dati di base di un flusso Amazon Kinesis. Un segmento fornisce il trasferimento dei dati in ingresso ad una velocità di 1 MB/s e il trasferimento dei dati in uscita ad una velocità di 2 MB/s. Un segmento supporta fino a 1000 voci PUT al secondo. Quando si crea un flusso di dati, è necessario specificare il numero richiesto di segmenti. Ad esempio, puoi creare uno stream di dati con due segmenti. Questo flusso di dati fornirà il trasferimento dei dati in ingresso a 2 MB/s e il trasferimento dei dati in uscita a 4 MB/s, supportando fino a 2000 record PUT al secondo.

Maggiore è il numero di shard nel tuo flusso, maggiore sarà il suo throughput. In linea di principio, questo è il modo in cui i flussi vengono ridimensionati: aggiungendo frammenti. Ma più frammenti hai, più alto sarà il prezzo. Ogni frammento costa 1,5 centesimi all'ora e ulteriori 1.4 centesimi per ogni milione di unità di carico utile PUT.

Creiamo un nuovo stream con il nome biglietti aerei, gli basterà 1 frammento:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora creiamo un altro thread con il nome flusso_speciale:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Configurazione del produttore

Per analizzare un'attività è sufficiente utilizzare una normale istanza EC2 come produttore di dati. Non deve essere una macchina virtuale potente e costosa; un t2.micro spot andrà benissimo.

Nota importante: ad esempio, dovresti utilizzare l'immagine: Amazon Linux AMI 2018.03.0, ha meno impostazioni per l'avvio rapido dell'agente Kinesis.

Vai al servizio EC2, crea una nuova macchina virtuale, seleziona l'AMI desiderata di tipo t2.micro, che è inclusa nel Piano Gratuito:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Affinché la macchina virtuale appena creata possa interagire con il servizio Kinesis, è necessario concederle i diritti per farlo. Il modo migliore per farlo è assegnare un ruolo IAM. Pertanto, nella schermata Passaggio 3: Configura dettagli istanza, è necessario selezionare Crea un nuovo ruolo IAM:

Creazione di un ruolo IAM per EC2
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nella finestra che si apre, seleziona che stiamo creando un nuovo ruolo per EC2 e vai alla sezione Autorizzazioni:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Utilizzando l'esempio di formazione, non dobbiamo addentrarci in tutte le complessità della configurazione granulare dei diritti sulle risorse, quindi selezioneremo le policy preconfigurate da Amazon: AmazonKinesisFullAccess e CloudWatchFullAccess.

Diamo un nome significativo a questo ruolo, ad esempio: EC2-KinesisStreams-FullAccess. Il risultato dovrebbe essere lo stesso mostrato nell'immagine qui sotto:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Dopo aver creato questo nuovo ruolo, non dimenticare di allegarlo all'istanza della macchina virtuale creata:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non cambiamo nient'altro in questa schermata e passiamo alle finestre successive.

Le impostazioni del disco rigido possono essere lasciate come predefinite, così come i tag (anche se è buona pratica utilizzare i tag, almeno dare un nome all'istanza e indicare l'ambiente).

Ora siamo al Passaggio 6: scheda Configura gruppo di sicurezza, dove devi crearne uno nuovo o specificare il gruppo di sicurezza esistente, che ti consente di connetterti tramite ssh (porta 22) all'istanza. Seleziona Sorgente -> Il mio IP lì e puoi avviare l'istanza.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non appena passa allo stato di esecuzione, puoi provare a connetterti tramite ssh.

Per poter lavorare con Kinesis Agent, dopo essersi connessi con successo alla macchina, è necessario inserire i seguenti comandi nel terminale:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Creiamo una cartella per salvare le risposte API:

sudo mkdir /var/log/airline_tickets

Prima di avviare l'agente, è necessario configurarne la configurazione:

sudo vim /etc/aws-kinesis/agent.json

Il contenuto del file agent.json dovrebbe assomigliare a questo:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Come si può vedere dal file di configurazione, l'agente monitorerà i file con estensione .log nella directory /var/log/airline_tickets/, li analizzerà e li trasferirà nello stream Airlines_tickets.

Riavviamo il servizio e ci assicuriamo che sia attivo e funzionante:

sudo service aws-kinesis-agent restart

Ora scarichiamo lo script Python che richiederà i dati dall'API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Lo script api_caller.py richiede i dati da Aviasales e salva la risposta ricevuta nella directory scansionata dall'agente Kinesis. L'implementazione di questo script è abbastanza standard, esiste una classe TicketsApi, che consente di estrarre l'API in modo asincrono. Passiamo un'intestazione con un token e richiediamo i parametri a questa classe:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Per testare le corrette impostazioni e funzionalità dell'agente, eseguiamo il test dello script api_caller.py:

sudo ./api_caller.py TOKEN

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
E guardiamo il risultato del lavoro nei log dell'agente e nella scheda Monitoraggio nel flusso di dati Airlines_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Come puoi vedere, tutto funziona e Kinesis Agent invia correttamente i dati al flusso. Ora configuriamo consumer.

Configurazione di Kinesis Data Analytics

Passiamo al componente centrale dell'intero sistema: crea una nuova applicazione in Kinesis Data Analytics denominata kinesis_analytics_airlines_app:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Kinesis Data Analytics ti consente di eseguire analisi dei dati in tempo reale da Kinesis Streams utilizzando il linguaggio SQL. È un servizio completamente scalabile (a differenza di Kinesis Streams) che:

  1. permette di creare nuovi flussi (Output Stream) in base alle richieste di origine dati;
  2. fornisce un flusso con gli errori che si sono verificati durante l'esecuzione delle applicazioni (Error Stream);
  3. può determinare automaticamente lo schema dei dati di input (può essere ridefinito manualmente se necessario).

Questo non è un servizio economico: 0.11 USD per ora di lavoro, quindi dovresti usarlo con attenzione ed eliminarlo quando hai finito.

Colleghiamo l'applicazione all'origine dati:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Seleziona lo streaming a cui ci collegheremo (airline_tickets):

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Successivamente, devi collegare un nuovo ruolo IAM in modo che l'applicazione possa leggere dal flusso e scrivere nel flusso. Per fare ciò è sufficiente non modificare nulla nel blocco Permessi di accesso:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Richiediamo ora il discovery dello schema dati presente nello stream; per farlo clicchiamo sul pulsante “Discover schema”. Di conseguenza, il ruolo IAM verrà aggiornato (creato) e il rilevamento dello schema verrà avviato dai dati già arrivati ​​nel flusso:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora devi andare all'editor SQL. Quando fai clic su questo pulsante, verrà visualizzata una finestra che ti chiede di avviare l'applicazione: seleziona ciò che desideri avviare:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Inserisci la seguente semplice query nella finestra dell'editor SQL e fai clic su Salva ed esegui SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Nei database relazionali, lavori con le tabelle utilizzando le istruzioni INSERT per aggiungere record e un'istruzione SELECT per eseguire query sui dati. In Amazon Kinesis Data Analytics, lavori con flussi (STREAM) e pompe (PUMP): richieste di inserimento continue che inseriscono dati da un flusso in un'applicazione in un altro flusso.

La query SQL presentata sopra cerca i biglietti Aeroflot con un costo inferiore a cinquemila rubli. Tutti i record che soddisfano queste condizioni verranno inseriti nel flusso DESTINATION_SQL_STREAM.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nel blocco Destinazione, seleziona il flusso special_stream e nell'elenco a discesa Nome flusso in-applicazione DESTINATION_SQL_STREAM:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Il risultato di tutte le manipolazioni dovrebbe essere qualcosa di simile all'immagine qui sotto:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Creazione e sottoscrizione a un argomento SNS

Vai al Servizio di notifica semplice e crea lì un nuovo argomento con il nome Compagnie aeree:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Iscriviti a questo argomento e indica il numero di cellulare a cui verranno inviate le notifiche SMS:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Crea una tabella in DynamoDB

Per archiviare i dati grezzi dal flusso Airlines_tickets, creiamo una tabella in DynamoDB con lo stesso nome. Utilizzeremo record_id come chiave primaria:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Creazione di un raccoglitore di funzioni lambda

Creiamo una funzione lambda chiamata Collector, il cui compito sarà quello di eseguire il polling del flusso Airlines_tickets e, se vengono trovati nuovi record, inserirli nella tabella DynamoDB. Ovviamente, oltre ai diritti predefiniti, questa lambda deve avere accesso in lettura al flusso di dati Kinesis e accesso in scrittura a DynamoDB.

Creazione di un ruolo IAM per la funzione lambda del raccoglitore
Innanzitutto, creiamo un nuovo ruolo IAM per la lambda denominato Lambda-TicketsProcessingRole:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Per l'esempio di test, le policy preconfigurate AmazonKinesisReadOnlyAccess e AmazonDynamoDBFullAccess sono abbastanza adatte, come mostrato nella figura seguente:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Questo lambda dovrebbe essere avviato da un trigger di Kinesis quando nuove voci entrano in Airlines_stream, quindi dobbiamo aggiungere un nuovo trigger:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non resta che inserire il codice e salvare la lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creazione di un notificatore di funzione lambda

La seconda funzione lambda, che monitorerà il secondo flusso (special_stream) e invierà una notifica a SNS, viene creata in modo simile. Pertanto, questo lambda deve avere accesso in lettura da Kinesis e inviare messaggi a un determinato argomento SNS, che verranno poi inviati dal servizio SNS a tutti gli abbonati di questo argomento (e-mail, SMS, ecc.).

Creazione di un ruolo IAM
Per prima cosa creiamo il ruolo IAM Lambda-KinesisAlarm per questa lambda, quindi assegniamo questo ruolo alla lambda alert_notifier in fase di creazione:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Questa lambda dovrebbe funzionare su un trigger affinché i nuovi record entrino in special_stream, quindi è necessario configurare il trigger nello stesso modo in cui abbiamo fatto per la lambda Collector.

Per semplificare la configurazione di questo lambda, introduciamo una nuova variabile di ambiente - TOPIC_ARN, dove inseriamo l'ANR (Amazon Recourse Names) dell'argomento Airlines:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
E inserisci il codice lambda, non è affatto complicato:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Sembra che sia qui che viene completata la configurazione manuale del sistema. Non resta che testare e assicurarsi di aver configurato tutto correttamente.

Distribuisci dal codice Terraform

Preparazione necessaria

Terraform è uno strumento open source molto conveniente per la distribuzione dell'infrastruttura dal codice. Ha una propria sintassi facile da imparare e contiene molti esempi di come e cosa distribuire. L'editor Atom o Visual Studio Code dispone di numerosi plug-in utili che semplificano il lavoro con Terraform.

Puoi scaricare la distribuzione quindi. Un'analisi dettagliata di tutte le funzionalità di Terraform va oltre lo scopo di questo articolo, quindi ci limiteremo ai punti principali.

Come correre

Il codice completo del progetto è nel mio archivio. Cloniamo il repository per noi stessi. Prima di iniziare, devi assicurarti di avere AWS CLI installato e configurato, perché... Terraform cercherà le credenziali nel file ~/.aws/credentials.

Una buona pratica è eseguire il comando plan prima di distribuire l'intera infrastruttura per vedere cosa Terraform sta attualmente creando per noi nel cloud:

terraform.exe plan

Ti verrà richiesto di inserire un numero di telefono a cui inviare le notifiche. Non è necessario inserirlo in questa fase.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Dopo aver analizzato il piano operativo del programma, possiamo iniziare a creare risorse:

terraform.exe apply

Dopo aver inviato questo comando, ti verrà nuovamente chiesto di inserire un numero di telefono; componi "sì" quando viene visualizzata una domanda sull'effettiva esecuzione delle azioni. Ciò ti consentirà di impostare l'intera infrastruttura, eseguire tutta la configurazione necessaria di EC2, implementare funzioni lambda, ecc.

Dopo che tutte le risorse sono state create con successo tramite il codice Terraform, è necessario entrare nei dettagli dell'applicazione Kinesis Analytics (purtroppo non ho trovato come farlo direttamente dal codice).

Avvia l'applicazione:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Successivamente, è necessario impostare esplicitamente il nome del flusso nell'applicazione selezionando dall'elenco a discesa:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora tutto è pronto per partire.

Testare l'applicazione

Indipendentemente da come hai distribuito il sistema, manualmente o tramite il codice Terraform, funzionerà allo stesso modo.

Effettuiamo l'accesso tramite SSH alla macchina virtuale EC2 su cui è installato Kinesis Agent ed eseguiamo lo script api_caller.py

sudo ./api_caller.py TOKEN

Non devi fare altro che attendere un SMS al tuo numero:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
SMS - il messaggio arriva al telefono in quasi 1 minuto:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Resta da vedere se i record sono stati salvati nel database DynamoDB per successive analisi più dettagliate. La tabella Airlines_tickets contiene approssimativamente i seguenti dati:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

conclusione

Nel corso del lavoro svolto è stato realizzato un sistema di elaborazione dati online basato su Amazon Kinesis. Sono state prese in considerazione le opzioni per l'utilizzo dell'agente Kinesis insieme a Kinesis Data Streams e l'analisi in tempo reale Kinesis Analytics utilizzando comandi SQL, nonché l'interazione di Amazon Kinesis con altri servizi AWS.

Abbiamo distribuito il sistema di cui sopra in due modi: uno manuale piuttosto lungo e uno veloce dal codice Terraform.

Tutto il codice sorgente del progetto è disponibile nel mio repository GitHub, ti suggerisco di familiarizzare con esso.

Sono felice di discutere l'articolo, aspetto con ansia i vostri commenti. Spero in critiche costruttive.

Vi auguro successo!

Fonte: habr.com

Aggiungi un commento