Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Ehi Habr!

Ti piace volare? Io lo adoro, ma durante l'autoisolamento mi sono anche appassionato all'analisi dei dati dei biglietti aerei tramite una fonte molto nota: Aviasales.

Oggi analizzeremo il funzionamento di Amazon Kinesis, creeremo un sistema di streaming con analisi in tempo reale, installeremo il database NoSQL Amazon DynamoDB come principale archivio dati e configureremo le notifiche SMS per i ticket interessanti.

Tutti i dettagli sotto il taglio! Andiamo!

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Introduzione

Ad esempio, avremo bisogno di accedere a API di AviasalesL'accesso è gratuito e senza restrizioni, è sufficiente registrarsi nella sezione "Sviluppatori" per ottenere il token API per accedere ai dati.

Lo scopo principale di questo articolo è fornire una comprensione generale dell'uso delle informazioni in streaming in AWS; teniamo presente che i dati restituiti dall'API utilizzata non sono strettamente aggiornati e vengono trasmessi da una cache formata in base alle ricerche degli utenti su Aviasales.ru e Jetradar.com nelle ultime 48 ore.

L'agente Kinesis installato sulla macchina del produttore analizzerà automaticamente i dati dei biglietti aerei ricevuti tramite l'API e li trasferirà al flusso richiesto tramite Kinesis Data Analytics. La versione non elaborata di questo flusso verrà scritta direttamente nello storage. L'archiviazione dei dati grezzi implementata in DynamoDB consentirà un'analisi più approfondita dei biglietti tramite strumenti di BI, come AWS Quick Sight.

Prenderemo in considerazione due opzioni per l'implementazione dell'intera infrastruttura:

  • Manuale - tramite AWS Management Console;
  • Infrastruttura di codice Terraform per ingegneri dell'automazione pigri;

Architettura del sistema sviluppato

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Componenti utilizzati:

  • API di Aviasales — i dati restituiti da questa API verranno utilizzati per tutti i lavori successivi;
  • Istanza del produttore EC2 — una normale macchina virtuale nel cloud, sulla quale verrà generato il flusso di dati di input:
    • Agente Kinesis — è un'applicazione Java installata localmente sulla macchina che fornisce un modo semplice per raccogliere e inviare dati a Kinesis (Kinesis Data Streams o Kinesis Firehose). L'agente monitora costantemente un set di file in directory specifiche e invia nuovi dati a Kinesis;
    • Script chiamante API — Uno script Python che invia richieste all'API e inserisce la risposta in una cartella monitorata da Kinesis Agent;
  • Flussi di dati Kinesis — un servizio di streaming di dati in tempo reale con ampia scalabilità;
  • Analisi cinetica — un servizio serverless che semplifica l'analisi dei dati in streaming in tempo reale. Amazon Kinesis Data Analytics configura le risorse per l'esecuzione delle applicazioni e si ridimensiona automaticamente per gestire qualsiasi volume di dati in entrata;
  • AWS Lambda — un servizio che consente di eseguire codice senza dover prenotare e configurare server. Tutta la potenza di calcolo viene automaticamente scalata a ogni chiamata;
  • Amazon DynamoDB — un database di chiavi-valori e documenti che offre una latenza inferiore a 10 millisecondi a qualsiasi scala. Con DynamoDB, non ci sono server da predisporre, applicare patch o gestire. DynamoDB scala automaticamente le tabelle in base alle risorse disponibili, mantenendo al contempo prestazioni elevate. Non è richiesta alcuna amministrazione di sistema.
  • AmazonSNS — un servizio di messaggistica Pub/Sub completamente gestito che può essere utilizzato per isolare microservizi, sistemi distribuiti e applicazioni serverless. SNS può essere utilizzato per distribuire informazioni agli utenti finali tramite notifiche push mobili, SMS ed e-mail.

Formazione iniziale

Per emulare il flusso di dati, ho deciso di utilizzare le informazioni sui biglietti aerei restituite dall'API Aviasales. documentazione Tra i metodi disponibili, ne prendiamo in considerazione uno piuttosto lungo: "Calendario prezzi per un mese", che restituisce i prezzi per ogni giorno del mese, raggruppati in base al numero di trasferimenti. Se non si specifica il mese di ricerca nella richiesta, verranno restituite le informazioni per il mese successivo a quello corrente.

Quindi, registriamoci e riceviamo il nostro token.

Di seguito un esempio di richiesta:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Il metodo sopra descritto per ottenere dati dall'API specificando il token nella richiesta funzionerà, ma preferisco passare il token di accesso tramite l'intestazione, quindi nello script api_caller.py useremo questo metodo.

Esempio di risposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L'esempio di risposta API qui sopra mostra un biglietto da San Pietroburgo a Phuk... Oh, che senso ha sognare...
Dato che sono di Kazan e Phuket ormai è "solo un sogno per noi", cerchiamo i biglietti da San Pietroburgo a Kazan.

Si presume che tu abbia già un account AWS. Vorrei attirare la tua attenzione sul fatto che Kinesis e l'invio di notifiche tramite SMS non sono inclusi nel piano annuale. Livello gratuito (gratuito)Ma nonostante questo, avendo a disposizione un paio di dollari, è possibile costruire il sistema proposto e sperimentarlo. E, naturalmente, non dimenticare di eliminare tutte le risorse quando non sono più necessarie.

Fortunatamente, DynamoDB e le funzioni lambda saranno gratuite a determinate condizioni se rispettiamo i limiti mensili gratuiti. Ad esempio, per DynamoDB: 25 GB di spazio di archiviazione, 25 WCU/RCU e 100 milioni di query. E un milione di chiamate alle funzioni lambda al mese.

Distribuzione manuale del sistema

Impostazione dei flussi di dati Kinesis

Andiamo al servizio Kinesis Data Streams e creiamo due nuovi flussi, uno shard per ciascuno.

Cos'è uno shard?
Uno shard è l'unità di base per il trasferimento dati in un flusso di dati Amazon Kinesis. Uno shard fornisce 1 MB/sec di dati in input e 2 MB/sec di dati in output. Uno shard supporta fino a 1000 record PUT al secondo. Quando si crea un flusso di dati, è necessario specificare il numero di shard desiderati. Ad esempio, è possibile creare un flusso di dati con due shard. Questo flusso di dati fornirà 2 MB/sec di dati in input e 4 MB/sec di dati in output, supportando fino a 2000 record PUT al secondo.

Più shard ci sono nel tuo stream, maggiore è la sua capacità di elaborazione. In linea di principio, è così che gli stream scalano: aggiungendo shard. Ma più shard hai, più alto è il prezzo. Ogni shard costa 1,5 centesimi all'ora e 1.4 centesimi aggiuntivi per ogni milione di operazioni di aggiunta allo stream (unità di payload PUT).

Creiamo un nuovo thread con il nome biglietti_aerei, gli basterà 1 frammento:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora creiamo un altro thread denominato flusso speciale:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Impostazione del produttore

Come produttore di dati per l'analisi delle attività, è sufficiente utilizzare una normale istanza EC2. Non è necessario che si tratti di una macchina virtuale potente e costosa: una istanza t2.micro spot andrà benissimo.

Важное замечание: для примера следует использовать image — Amazon Linux AMI 2018.03.0, с ним меньше настроек для быстрого запуска Kinesis Agent.

Andiamo al servizio EC2, creiamo una nuova macchina virtuale, selezioniamo l'AMI richiesta con il tipo t2.micro, che è incluso nel livello gratuito:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Affinché la macchina virtuale appena creata possa interagire con il servizio Kinesis, è necessario assegnarle l'autorizzazione necessaria. Il modo migliore per farlo è assegnare un ruolo IAM. Pertanto, nella schermata "Passaggio 3: Configura dettagli istanza", è necessario selezionare Crea un nuovo ruolo IAM:

Crea ruolo IAM per EC2
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nella finestra che si apre, seleziona che stiamo creando un nuovo ruolo per EC2 e vai alla sezione Autorizzazioni:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nell'esempio, non è necessario entrare nei dettagli delle impostazioni granulari dei diritti sulle risorse, quindi sceglieremo le policy preconfigurate di Amazon: AmazonKinesisFullAccess e CloudWatchFullAccess.

Diamo a questo ruolo un nome significativo, ad esempio: EC2-KinesisStreams-FullAccess. Il risultato dovrebbe essere quello mostrato nell'immagine seguente:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Dopo aver creato questo nuovo ruolo, non dimenticare di associarlo all'istanza della macchina virtuale che stai creando:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non modifichiamo nient'altro in questa schermata e passiamo alle finestre successive.

I parametri del disco rigido possono essere lasciati ai valori predefiniti, così come i tag (anche se è buona norma utilizzare i tag, almeno per dare un nome all'istanza e specificare l'ambiente).

Ora siamo alla scheda "Passaggio 6: Configura gruppo di sicurezza", dove è necessario crearne uno nuovo o specificarne uno esistente che consenta di connettersi all'istanza tramite SSH (porta 22). Selezionare "Sorgente" -> "Il mio IP" e avviare l'istanza.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non appena passa allo stato di esecuzione, puoi provare a connetterti tramite ssh.

Per poter lavorare con Kinesis Agent, dopo aver effettuato correttamente la connessione alla macchina, è necessario immettere i seguenti comandi nel terminale:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Creiamo una cartella in cui salvare le risposte API:

sudo mkdir /var/log/airline_tickets

Prima di avviare l'agente, è necessario configurarne la configurazione:

sudo vim /etc/aws-kinesis/agent.json

Il contenuto del file agent.json dovrebbe apparire così:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Come si può vedere dal file di configurazione, l'agente monitorerà i file con estensione .log nella directory /var/log/airline_tickets/, li analizzerà e li passerà al flusso airline_tickets.

Riavviamo il servizio e ci assicuriamo che sia avviato e funzioni:

sudo service aws-kinesis-agent restart

Ora scarichiamo uno script Python che richiederà i dati dall'API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Lo script api_caller.py richiede dati ad Aviasales e salva la risposta ricevuta nella directory scansionata dall'agente Kinesis. L'implementazione di questo script è piuttosto standard: esiste una classe TicketsApi, che consente di estrarre l'API in modo asincrono. Passiamo l'intestazione con il token e i parametri di richiesta a questa classe:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Per testare la correttezza delle impostazioni e la funzionalità dell'agente, eseguiamo un'esecuzione di prova dello script api_caller.py:

sudo ./api_caller.py TOKEN

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
E osserviamo il risultato del lavoro nei registri dell'agente e nella scheda Monitoraggio nel flusso di dati airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Come puoi vedere, tutto funziona e Kinesis Agent invia correttamente i dati allo stream. Ora configuriamo il consumer.

Impostazione di Kinesis Data Analytics

Passiamo ora al componente centrale dell'intero sistema: creiamo una nuova applicazione in Kinesis Data Analytics denominata kinesis_analytics_airlines_app:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Kinesis Data Analytics consente l'analisi in tempo reale dei dati di Kinesis Streams utilizzando SQL. Si tratta di un servizio completamente scalabile automaticamente (a differenza di Kinesis Streams) che:

  1. consente di creare nuovi flussi (Output Stream) in base alle richieste ai dati di origine;
  2. fornisce un flusso di errori verificatisi durante il funzionamento delle applicazioni (Error Stream);
  3. può rilevare automaticamente lo schema dei dati di input (può essere ridefinito manualmente se necessario).

Non si tratta di un servizio economico: costa 0.11 USD all'ora di lavoro, quindi dovresti usarlo con cautela ed eliminarlo una volta terminato.

Colleghiamo l'applicazione alla sorgente dati:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Selezioniamo il flusso a cui vogliamo connetterci (airline_tickets):

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Successivamente, è necessario associare un nuovo ruolo IAM in modo che l'applicazione possa leggere e scrivere dal flusso. Per farlo, è sufficiente lasciare invariato il blocco "Autorizzazioni di accesso":

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora richiederemo la scoperta dello schema dati nel flusso, per farlo premiamo il pulsante "Scopri schema". Di conseguenza, il ruolo IAM verrà aggiornato (ne verrà creato uno nuovo) e verrà avviata la scoperta dello schema dai dati già arrivati nel flusso:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora devi andare all'editor SQL. Cliccando su questo pulsante, apparirà una finestra che ti chiederà se vuoi eseguire l'applicazione: scegli cosa vuoi eseguire:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nella finestra dell'editor SQL, inserisci la seguente query semplice e fai clic su Salva ed esegui SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Nei database relazionali, si lavora con le tabelle utilizzando istruzioni INSERT per aggiungere record e istruzioni SELECT per interrogare i dati. In Amazon Kinesis Data Analytics, si lavora con STREAMS e PUMP, ovvero richieste di inserimento continue che spingono i dati da un flusso dell'applicazione a un altro.

La query SQL sopra cerca i biglietti Aeroflot con un prezzo inferiore a cinquemila rubli. Tutti i record che soddisfano queste condizioni verranno inseriti nel flusso DESTINATION_SQL_STREAM.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Nel blocco Destinazione, seleziona il flusso special_stream e nell'elenco a discesa Nome flusso in-applicazione, seleziona DESTINATION_SQL_STREAM:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Come risultato di tutte le manipolazioni dovresti ottenere qualcosa di simile all'immagine qui sotto:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Crea e iscriviti a un argomento SNS

Andiamo al Simple Notification Service e creiamo un nuovo argomento con il nome Airlines:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ci iscriviamo a questo argomento, in esso indichiamo il numero di cellulare a cui verranno inviate le notifiche SMS:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Creazione di una tabella in DynamoDB

Per memorizzare i dati grezzi dal flusso airline_tickets, creiamo una tabella in DynamoDB con lo stesso nome. Useremo record_id come chiave primaria:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Creazione di un collettore di funzioni lambda

Creiamo una funzione lambda chiamata Collector, il cui compito è interrogare il flusso airline_tickets e, se vengono trovati nuovi record, inserirli nella tabella DynamoDB. Ovviamente, oltre ai permessi predefiniti, questa funzione lambda deve avere accesso in lettura al flusso di dati Kinesis e accesso in scrittura a DynamoDB.

Crea ruolo IAM per la funzione lambda del collettore
Per prima cosa, creiamo un nuovo ruolo IAM per la lambda denominato Lambda-TicketsProcessingRole:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Per l'esempio di test, le policy preconfigurate AmazonKinesisReadOnlyAccess e AmazonDynamoDBFullAccess sono molto adatte, come mostrato nell'immagine seguente:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Questa lambda dovrebbe essere attivata da un trigger Kinesis quando vengono aggiunti nuovi record al flusso airline_stream, quindi dobbiamo aggiungere un nuovo trigger:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Non resta che inserire il codice e salvare la lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creazione di una funzione lambda di notifica

La seconda funzione lambda, che monitorerà il secondo flusso (special_stream) e invierà una notifica a SNS, viene creata in modo simile. Pertanto, questa lambda deve avere accesso alla lettura da Kinesis e inviare messaggi all'argomento SNS specificato, che verranno poi inviati dal servizio SNS a tutti gli abbonati a questo argomento (e-mail, SMS, ecc.).

Creazione di un ruolo IAM
Per prima cosa, creiamo il ruolo IAM Lambda-KinesisAlarm per questa lambda, quindi assegniamo questo ruolo alla lambda alarm_notifier creata:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

Questa lambda dovrebbe funzionare su un trigger quando nuovi record entrano nel flusso special_stream, quindi è necessario configurare il trigger nello stesso modo in cui abbiamo fatto per la lambda Collector.

Per semplificare la configurazione di questa lambda, introdurremo una nuova variabile d'ambiente, TOPIC_ARN, in cui inseriremo gli ANR (Amazon Recourse Names) dell'argomento Airlines:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
E inseriamo il codice lambda, non è affatto complicato:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Sembra che la configurazione manuale del sistema sia completata. Non resta che testare e accertarsi di aver impostato tutto correttamente.

Distribuisci dal codice Terraform

Preparazione necessaria

Terraform — uno strumento open source molto pratico per la distribuzione di infrastrutture da codice. Ha una propria sintassi, facile da apprendere, e molti esempi su come e cosa distribuire. L'editor Atom o Visual Studio Code offrono molti plugin utili che semplificano l'utilizzo di Terraform.

Puoi scaricare la distribuzione quindiUn'analisi dettagliata di tutte le funzionalità di Terraform esula dallo scopo di questo articolo, pertanto ci limiteremo ai punti principali.

Come correre

Il codice completo del progetto è qui nel mio archivioClona il repository per te stesso. Prima di iniziare, assicurati di aver installato e configurato AWS CLI, perché Terraform cercherà le credenziali nel file ~/.aws/credentials.

Una buona pratica è quella di eseguire il comando plan prima di distribuire l'intera infrastruttura per vedere cosa Terraform creerà per noi nel cloud:

terraform.exe plan

Ti verrà chiesto di inserire un numero di telefono a cui verranno inviate le notifiche. L'inserimento è facoltativo in questa fase.

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Dopo aver analizzato il piano di lavoro del programma, possiamo iniziare a creare risorse:

terraform.exe apply

Dopo aver inviato questo comando, ti verrà nuovamente chiesto di inserire il tuo numero di telefono; digita "Sì" quando verrà visualizzata la domanda sull'effettiva esecuzione delle azioni. Questo ti consentirà di avviare l'intera infrastruttura, eseguire tutte le impostazioni EC2 necessarie, distribuire le funzioni lambda, ecc.

Dopo aver creato correttamente tutte le risorse tramite il codice Terraform, è necessario accedere ai dettagli dell'applicazione Kinesis Analytics (purtroppo non ho trovato come farlo direttamente dal codice).

Avviare l'applicazione:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Dopodiché, è necessario impostare esplicitamente il nome del flusso nell'applicazione selezionandolo dall'elenco a discesa:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Ora tutto è pronto per funzionare.

Test dell'applicazione

Indipendentemente da come hai distribuito il sistema, manualmente o tramite il codice Terraform, funzionerà sempre allo stesso modo.

Andiamo tramite SSH alla macchina virtuale EC2 dove è installato Kinesis Agent ed eseguiamo lo script api_caller.py

sudo ./api_caller.py TOKEN

Non resta che attendere un SMS al tuo numero:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
SMS - il messaggio arriva sul telefono in circa 1 minuto:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless
Resta da vedere se i record vengono salvati nel database DynamoDB per un'analisi più approfondita. La tabella airline_tickets contiene dati più o meno come questi:

Integrazione dell'API Aviasales con Amazon Kinesis e semplicità serverless

conclusione

Nel corso del lavoro svolto, è stato sviluppato un sistema di elaborazione dati online basato su Amazon Kinesis. Sono state valutate le opzioni per l'utilizzo di Kinesis Agent in combinazione con Kinesis Data Streams e l'analisi in tempo reale di Kinesis Analytics tramite comandi SQL, nonché l'interazione di Amazon Kinesis con altri servizi AWS.

Abbiamo implementato il sistema sopra descritto in due modi: un metodo manuale piuttosto lungo e un metodo rapido basato sul codice Terraform.

L'intero codice sorgente del progetto è disponibile nel mio repository GitHub, ti consiglio di leggerlo.

Sono felice di discutere l'articolo, in attesa dei vostri commenti. Spero in critiche costruttive.

Vi auguro successo!

Fonte: habr.com

Acquista hosting affidabile per siti con protezione DDoS, server VPS VDS 🔥 Acquista un hosting web affidabile con protezione DDoS, server VPS e VDS | ProHoster