Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Hola Habr!

T'agrada volar avions? M'encanta, però durant l'autoaïllament també em vaig enamorar d'analitzar dades sobre bitllets d'avió d'un recurs conegut: Aviasales.

Avui analitzarem el treball d'Amazon Kinesis, construirem un sistema de streaming amb anàlisi en temps real, instal·larem la base de dades NoSQL d'Amazon DynamoDB com a emmagatzematge de dades principal i configurarem notificacions per SMS per a entrades interessants.

Tots els detalls estan sota el tall! Va!

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Introducció

Per exemple, necessitem accedir a API Aviasales. L'accés s'hi ofereix de manera gratuïta i sense restriccions; només cal que us registreu a la secció "Desenvolupadors" per rebre el vostre testimoni API per accedir a les dades.

L'objectiu principal d'aquest article és donar una comprensió general de l'ús del streaming d'informació a AWS; tenim en compte que les dades que retorna l'API utilitzada no estan estrictament actualitzades i es transmeten des de la memòria cau, que és format a partir de les cerques dels usuaris dels llocs Aviasales.ru i Jetradar.com durant les últimes 48 hores.

Kinesis-agent, instal·lat a la màquina productora, rebut mitjançant l'API analitzarà i transmetrà automàticament les dades al flux desitjat mitjançant Kinesis Data Analytics. La versió en brut d'aquest flux s'escriurà directament a la botiga. L'emmagatzematge de dades en brut desplegat a DynamoDB permetrà una anàlisi més profunda dels bitllets mitjançant eines de BI, com ara AWS Quick Sight.

Considerarem dues opcions per desplegar tota la infraestructura:

  • Manual - mitjançant AWS Management Console;
  • La infraestructura del codi Terraform és per a automatistes mandrosos;

L'arquitectura del sistema desenvolupat

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Components utilitzats:

  • API Aviasales — les dades retornades per aquesta API s'utilitzaran per a tots els treballs posteriors;
  • Instància del productor EC2 — una màquina virtual normal al núvol on es generarà el flux de dades d'entrada:
    • Agent de Kinesis és una aplicació Java instal·lada localment a la màquina que proporciona una manera senzilla de recopilar i enviar dades a Kinesis (Kinesis Data Streams o Kinesis Firehose). L'agent supervisa constantment un conjunt de fitxers als directoris especificats i envia dades noves a Kinesis;
    • API Caller Script — Un script de Python que fa sol·licituds a l'API i posa la resposta en una carpeta supervisada per l'agent de Kinesis;
  • Kinesis Data Streams — Servei de transmissió de dades en temps real amb capacitats d'escala àmplia;
  • Kinesis Analytics és un servei sense servidor que simplifica l'anàlisi de dades de streaming en temps real. Amazon Kinesis Data Analytics configura els recursos de l'aplicació i escala automàticament per gestionar qualsevol volum de dades entrants;
  • AWS Lambda — un servei que us permet executar codi sense fer una còpia de seguretat ni configurar servidors. Tota la potència de càlcul s'escala automàticament per a cada trucada;
  • Amazon DynamoDB - Una base de dades de parells clau-valor i documents que proporciona una latència de menys de 10 mil·lisegons quan s'executa a qualsevol escala. Quan utilitzeu DynamoDB, no cal que proveïu, apliqueu o gestioneu cap servidor. DynamoDB escala automàticament les taules per ajustar la quantitat de recursos disponibles i mantenir un alt rendiment. No es requereix cap administració del sistema;
  • Amazon SNS - un servei totalment gestionat per enviar missatges mitjançant el model editor-subscriptor (Pub/Sub), amb el qual podeu aïllar microserveis, sistemes distribuïts i aplicacions sense servidor. SNS es pot utilitzar per enviar informació als usuaris finals mitjançant notificacions push mòbils, missatges SMS i correus electrònics.

Formació inicial

Per emular el flux de dades, vaig decidir utilitzar la informació del bitllet d'avió que retornava l'API d'Aviasales. EN documentació una llista força extensa de diferents mètodes, prenem un d'ells: "Calendari de preus mensuals", que retorna els preus per a cada dia del mes, agrupats pel nombre de transferències. Si no especifiqueu el mes de cerca a la sol·licitud, es retornarà la informació del mes següent a l'actual.

Per tant, registrem-nos i aconseguim el nostre testimoni.

A continuació es mostra un exemple de sol·licitud:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

El mètode anterior per rebre dades de l'API especificant un testimoni a la sol·licitud funcionarà, però prefereixo passar el testimoni d'accés per la capçalera, de manera que utilitzarem aquest mètode a l'script api_caller.py.

Exemple de resposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L'exemple de resposta de l'API anterior mostra un bitllet de Sant Petersburg a Phuk... Oh, quin somni...
Com que sóc de Kazan, i Phuket ara és "només un somni", busquem bitllets de Sant Petersburg a Kazan.

Se suposa que ja teniu un compte d'AWS. M'agradaria cridar immediatament una atenció especial sobre el fet que Kinesis i l'enviament de notificacions per SMS no estan inclosos en el Nivell gratuït (ús gratuït). Però tot i això, amb un parell de dòlars en ment, és molt possible construir el sistema proposat i jugar-hi. I, per descomptat, no us oblideu d'eliminar tots els recursos quan ja no siguin necessaris.

Afortunadament, les funcions DynamoDb i lambda seran gratuïtes per a nosaltres si complim els nostres límits gratuïts mensuals. Per exemple, per a DynamoDB: 25 GB d'emmagatzematge, 25 WCU/RCU i 100 milions de consultes. I un milió de trucades de funció lambda al mes.

Desplegament manual del sistema

Configuració de Kinesis Data Streams

Anem al servei Kinesis Data Streams i creem dos nous fluxos, un fragment per a cadascun.

Què és un fragment?
Un fragment és la unitat bàsica de transferència de dades d'un flux d'Amazon Kinesis. Un segment proporciona la transferència de dades d'entrada a una velocitat d'1 MB/s i la transferència de dades de sortida a una velocitat de 2 MB/s. Un segment admet fins a 1000 entrades PUT per segon. Quan creeu un flux de dades, heu d'especificar el nombre necessari de segments. Per exemple, podeu crear un flux de dades amb dos segments. Aquest flux de dades proporcionarà una transferència de dades d'entrada a 2 MB/s i una transferència de dades de sortida a 4 MB/s, suportant fins a 2000 registres PUT per segon.

Com més fragments hi hagi el vostre flux, més gran serà el rendiment. En principi, així s'escala els fluxos, afegint fragments. Però com més fragments tinguis, més alt serà el preu. Cada fragment costa 1,5 cèntims per hora i 1.4 cèntims addicionals per cada milió d'unitats de càrrega útil PUT.

Creem un nou flux amb el nom bitllets_aeri, 1 fragment serà suficient per a ell:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Ara anem a crear un altre fil amb el nom corrent_especial:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Configuració del productor

Per analitzar una tasca, n'hi ha prou amb utilitzar una instància EC2 normal com a productor de dades. No ha de ser una màquina virtual potent i cara; un spot t2.micro anirà bé.

Nota important: per exemple, hauríeu d'utilitzar la imatge: Amazon Linux AMI 2018.03.0, té menys configuracions per iniciar ràpidament l'agent Kinesis.

Aneu al servei EC2, creeu una màquina virtual nova, seleccioneu l'AMI desitjada amb el tipus t2.micro, que s'inclou al Nivell gratuït:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Perquè la màquina virtual de nova creació pugui interactuar amb el servei Kinesis, s'ha de donar els drets per fer-ho. La millor manera de fer-ho és assignar un rol IAM. Per tant, a la pantalla Pas 3: Configura els detalls de la instància, hauríeu de seleccionar Creeu una nova funció IAM:

Creació d'un rol IAM per a EC2
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
A la finestra que s'obre, seleccioneu que estem creant un rol nou per a EC2 i aneu a la secció Permisos:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Utilitzant l'exemple de formació, no hem d'aprofundir en totes les complexitats de la configuració granular dels drets dels recursos, de manera que seleccionarem les polítiques preconfigurades per Amazon: AmazonKinesisFullAccess i CloudWatchFullAccess.

Donem un nom significatiu per a aquesta funció, per exemple: EC2-KinesisStreams-FullAccess. El resultat hauria de ser el mateix que es mostra a la imatge següent:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Després de crear aquesta nova funció, no oblideu adjuntar-la a la instància de màquina virtual creada:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
No canviem res més en aquesta pantalla i passem a les finestres següents.

La configuració del disc dur es pot deixar per defecte, així com les etiquetes (tot i que és una bona pràctica utilitzar etiquetes, almenys donar un nom a la instància i indicar l'entorn).

Ara estem a la pestanya Pas 6: Configura el grup de seguretat, on n'heu de crear un de nou o especificar el vostre grup de seguretat existent, que us permet connectar-vos mitjançant ssh (port 22) a la instància. Seleccioneu Font -> La meva IP allà i podreu iniciar la instància.

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Tan bon punt canviï a l'estat d'execució, podeu provar de connectar-hi mitjançant ssh.

Per poder treballar amb Kinesis Agent, després de connectar-vos correctament a la màquina, heu d'introduir les ordres següents al terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Creem una carpeta per desar les respostes de l'API:

sudo mkdir /var/log/airline_tickets

Abans d'iniciar l'agent, heu de configurar la seva configuració:

sudo vim /etc/aws-kinesis/agent.json

El contingut del fitxer agent.json hauria de ser així:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Com es pot veure al fitxer de configuració, l'agent supervisarà els fitxers amb l'extensió .log al directori /var/log/airline_tickets/, els analitzarà i els transferirà al flux airline_tickets.

Reiniciem el servei i ens assegurem que estigui en funcionament:

sudo service aws-kinesis-agent restart

Ara descarreguem l'script de Python que sol·licitarà dades de l'API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

L'script api_caller.py sol·licita dades a Aviasales i desa la resposta rebuda al directori que escaneja l'agent de Kinesis. La implementació d'aquest script és bastant estàndard, hi ha una classe TicketsApi, us permet extreure l'API de manera asíncrona. Passem una capçalera amb un testimoni i demanem paràmetres a aquesta classe:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Per provar la configuració correcta i la funcionalitat de l'agent, provem a executar l'script api_caller.py:

sudo ./api_caller.py TOKEN

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
I mirem el resultat del treball als registres de l'agent i a la pestanya Monitorització del flux de dades airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Com podeu veure, tot funciona i l'agent Kinesis envia dades amb èxit al flux. Ara configurem el consumidor.

Configuració de Kinesis Data Analytics

Passem al component central de tot el sistema: creeu una nova aplicació a Kinesis Data Analytics anomenada kinesis_analytics_airlines_app:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Kinesis Data Analytics us permet realitzar anàlisis de dades en temps real des de Kinesis Streams mitjançant el llenguatge SQL. És un servei d'escalat automàtic (a diferència de Kinesis Streams) que:

  1. us permet crear nous fluxos (output Stream) basats en les sol·licituds de dades d'origen;
  2. proporciona un flux amb errors que s'han produït mentre s'executaven les aplicacions (Error Stream);
  3. pot determinar automàticament l'esquema de dades d'entrada (es pot redefinir manualment si cal).

Aquest no és un servei barat: 0.11 USD per hora de treball, per la qual cosa hauríeu d'utilitzar-lo amb cura i eliminar-lo quan hàgiu acabat.

Connectem l'aplicació a la font de dades:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Seleccioneu el flux al qual anem a connectar (airline_tickets):

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
A continuació, heu d'adjuntar un nou rol d'IAM perquè l'aplicació pugui llegir des del flux i escriure al flux. Per fer-ho, n'hi ha prou amb no canviar res al bloc de permisos d'accés:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Ara sol·licitem el descobriment de l'esquema de dades al flux; per fer-ho, feu clic al botó "Descobriu l'esquema". Com a resultat, la funció IAM s'actualitzarà (se'n crearà una de nova) i es llançarà la detecció d'esquemes a partir de les dades que ja han arribat al flux:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Ara heu d'anar a l'editor SQL. Quan feu clic a aquest botó, apareixerà una finestra que us demanarà que inicieu l'aplicació; seleccioneu el que voleu iniciar:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Inseriu la següent consulta senzilla a la finestra de l'editor SQL i feu clic a Desa i executa SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

A les bases de dades relacionals, treballeu amb taules utilitzant sentències INSERT per afegir registres i una sentència SELECT per consultar dades. A Amazon Kinesis Data Analytics, treballeu amb fluxos (STREAM) i bombes (PUMP): sol·licituds d'inserció contínues que insereixen dades d'un flux d'una aplicació a un altre flux.

La consulta SQL presentada anteriorment cerca bitllets d'Aeroflot amb un cost inferior a cinc mil rubles. Tots els registres que compleixin aquestes condicions es col·locaran al flux DESTINATION_SQL_STREAM.

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Al bloc Destinació, seleccioneu el flux special_stream i a la llista desplegable Nom del flux dins de l'aplicació DESTINATION_SQL_STREAM:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
El resultat de totes les manipulacions hauria de ser similar a la imatge següent:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Creació i subscripció a un tema de SNS

Aneu al Servei de notificacions simples i creeu-hi un tema nou amb el nom Aerolínies:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Subscriu-te a aquest tema i indica el número de telèfon mòbil al qual s'enviaran les notificacions per SMS:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Creeu una taula a DynamoDB

Per emmagatzemar les dades en brut del seu flux airline_tickets, creem una taula a DynamoDB amb el mateix nom. Utilitzarem record_id com a clau primària:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Creació d'un col·lector de funcions lambda

Creem una funció lambda anomenada Collector, la tasca de la qual serà enquestar el flux airline_tickets i, si s'hi troben nous registres, inserir aquests registres a la taula DynamoDB. Òbviament, a més dels drets per defecte, aquesta lambda ha de tenir accés de lectura al flux de dades de Kinesis i accés d'escriptura a DynamoDB.

Creació d'un rol IAM per a la funció lambda del col·lector
Primer, creem un nou rol IAM per al lambda anomenat Lambda-TicketsProcessingRole:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Per a l'exemple de prova, les polítiques preconfigurades d'AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess són molt adequades, tal com es mostra a la imatge següent:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Aquest lambda s'hauria de llançar mitjançant un activador de Kinesis quan entrades noves entren a airline_stream, per la qual cosa hem d'afegir un activador nou:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Només queda inserir el codi i desar la lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creació d'un notificador de funció lambda

La segona funció lambda, que supervisarà el segon flux (special_stream) i enviarà una notificació a SNS, es crea de manera similar. Per tant, aquesta lambda ha de tenir accés per llegir des de Kinesis i enviar missatges a un determinat tema SNS, que després seran enviats pel servei SNS a tots els subscriptors d'aquest tema (correu electrònic, SMS, etc.).

Creació d'un rol IAM
Primer, creem la funció IAM Lambda-KinesisAlarm per a aquesta lambda i, a continuació, assignem aquesta funció a la lambda alarm_notifier que s'està creant:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Aquesta lambda hauria de funcionar en un activador perquè els nous registres entrin a l'especial_stream, per la qual cosa cal que configureu l'activador de la mateixa manera que ho vam fer per a la lambda de Collector.

Per facilitar la configuració d'aquesta lambda, introduïm una nova variable d'entorn - TOPIC_ARN, on col·loquem l'ANR (Amazon Recourse Names) del tema Airlines:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
I inseriu el codi lambda, no és gens complicat:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Sembla que aquí és on es completa la configuració manual del sistema. Només queda provar i assegurar-nos que ho hem configurat tot correctament.

Desplega des del codi Terraform

Preparació necessària

Terraform és una eina de codi obert molt convenient per desplegar infraestructura des del codi. Té la seva pròpia sintaxi que és fàcil d'aprendre i té molts exemples de com i què implementar. L'editor Atom o Visual Studio Code té molts complements útils que faciliten el treball amb Terraform.

Podeu descarregar-vos la distribució per tant. Una anàlisi detallada de totes les capacitats de Terraform està fora de l'abast d'aquest article, de manera que ens limitarem als punts principals.

Com començar

El codi complet del projecte és al meu repositori. Clonem el dipòsit per a nosaltres mateixos. Abans de començar, heu d'assegurar-vos que teniu l'AWS CLI instal·lada i configurada, perquè... Terraform buscarà credencials al fitxer ~/.aws/credentials.

Una bona pràctica és executar l'ordre del pla abans de desplegar tota la infraestructura per veure què està creant Terraform actualment per a nosaltres al núvol:

terraform.exe plan

Se us demanarà que introduïu un número de telèfon al qual enviar notificacions. No cal entrar-hi en aquesta etapa.

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Un cop analitzat el pla de funcionament del programa, podem començar a crear recursos:

terraform.exe apply

Després d'enviar aquesta ordre, se us demanarà de nou que introduïu un número de telèfon; marqueu "sí" quan es mostri una pregunta sobre la realització de les accions. Això us permetrà configurar tota la infraestructura, dur a terme tota la configuració necessària de l'EC2, desplegar funcions lambda, etc.

Després que tots els recursos s'hagin creat amb èxit mitjançant el codi Terraform, heu d'entrar en els detalls de l'aplicació Kinesis Analytics (malauradament, no he trobat com fer-ho directament des del codi).

Inicieu l'aplicació:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Després d'això, heu d'establir explícitament el nom del flux dins de l'aplicació seleccionant a la llista desplegable:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Ara tot està llest per marxar.

Prova de l'aplicació

Independentment de com hàgiu desplegat el sistema, manualment o mitjançant el codi Terraform, funcionarà igual.

Iniciem sessió mitjançant SSH a la màquina virtual EC2 on està instal·lat Kinesis Agent i executem l'script api_caller.py

sudo ./api_caller.py TOKEN

Tot el que has de fer és esperar un SMS al teu número:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
SMS: arriba un missatge al telèfon en gairebé 1 minut:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor
Queda per veure si els registres es van desar a la base de dades DynamoDB per a una anàlisi posterior i més detallada. La taula airline_tickets conté aproximadament les dades següents:

Integració de l'API d'Aviasales amb Amazon Kinesis i simplicitat sense servidor

Conclusió

En el transcurs del treball realitzat, es va construir un sistema de tractament de dades en línia basat en Amazon Kinesis. Es van considerar les opcions per utilitzar l'agent Kinesis juntament amb Kinesis Data Streams i l'anàlisi en temps real de Kinesis Analytics mitjançant ordres SQL, així com la interacció d'Amazon Kinesis amb altres serveis d'AWS.

Hem desplegat el sistema anterior de dues maneres: una manual força llarga i una altra ràpida des del codi Terraform.

Tot el codi font del projecte està disponible al meu repositori de GitHub, us suggereixo que us hi familiaritzeu.

Estic encantat de parlar de l'article, espero els vostres comentaris. Espero una crítica constructiva.

Els desitjo molt èxit!

Font: www.habr.com

Afegeix comentari