Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

¡Hola, Habr!

¿Te gusta volar aviones? Me encanta, pero durante el autoaislamiento también me enamoré del análisis de datos sobre boletos aéreos de un recurso muy conocido: Aviasales.

Hoy analizaremos el trabajo de Amazon Kinesis, crearemos un sistema de transmisión con análisis en tiempo real, instalaremos la base de datos NoSQL de Amazon DynamoDB como almacenamiento de datos principal y configuraremos notificaciones por SMS para tickets interesantes.

¡Todos los detalles están debajo del corte! ¡Ir!

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

introducción

Por ejemplo, necesitamos acceso a API de Aviasales. El acceso al mismo es gratuito y sin restricciones, sólo necesitas registrarte en la sección “Desarrolladores” para recibir tu token API para acceder a los datos.

El objetivo principal de este artículo es dar una comprensión general del uso del streaming de información en AWS; tomamos en cuenta que los datos que devuelve la API utilizada no están estrictamente actualizados y se transmiten desde el caché, el cual es formado en base a búsquedas realizadas por usuarios de los sitios Aviasales.ru y Jetradar.com durante las últimas 48 horas.

El agente Kinesis, instalado en la máquina productora, recibido a través de la API, analizará y transmitirá automáticamente los datos al flujo deseado a través de Kinesis Data Analytics. La versión sin formato de esta transmisión se escribirá directamente en la tienda. El almacenamiento de datos sin procesar implementado en DynamoDB permitirá un análisis de tickets más profundo a través de herramientas de BI, como AWS Quick Sight.

Consideraremos dos opciones para implementar toda la infraestructura:

  • Manual: a través de la Consola de administración de AWS;
  • La infraestructura del código Terraform es para automatizadores perezosos;

Arquitectura del sistema desarrollado.

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Componentes utilizados:

  • API de Aviasales — los datos devueltos por esta API se utilizarán para todos los trabajos posteriores;
  • Instancia de productor EC2 — una máquina virtual normal en la nube en la que se generará el flujo de datos de entrada:
    • Agente de Kinesis es una aplicación Java instalada localmente en la máquina que proporciona una manera sencilla de recopilar y enviar datos a Kinesis (Kinesis Data Streams o Kinesis Firehose). El agente monitorea constantemente un conjunto de archivos en los directorios especificados y envía nuevos datos a Kinesis;
    • Script de llamada API — Un script de Python que realiza solicitudes a la API y coloca la respuesta en una carpeta supervisada por Kinesis Agent;
  • Flujos de datos de Kinesis — servicio de transmisión de datos en tiempo real con amplias capacidades de escala;
  • Análisis de Kinesis es un servicio sin servidor que simplifica el análisis de datos en streaming en tiempo real. Amazon Kinesis Data Analytics configura los recursos de la aplicación y los escala automáticamente para manejar cualquier volumen de datos entrantes;
  • AWS Lambda — un servicio que le permite ejecutar código sin realizar copias de seguridad ni configurar servidores. Toda la potencia informática se escala automáticamente para cada llamada;
  • Amazon DynamoDB - Una base de datos de documentos y pares clave-valor que proporciona una latencia de menos de 10 milisegundos cuando se ejecuta a cualquier escala. Al utilizar DynamoDB, no es necesario aprovisionar, parchear ni administrar ningún servidor. DynamoDB escala automáticamente las tablas para ajustar la cantidad de recursos disponibles y mantener un alto rendimiento. No se requiere administración del sistema;
  • redes sociales de amazon - un servicio totalmente administrado para enviar mensajes utilizando el modelo editor-suscriptor (Pub/Sub), con el que puede aislar microservicios, sistemas distribuidos y aplicaciones sin servidor. SNS se puede utilizar para enviar información a los usuarios finales a través de notificaciones push móviles, mensajes SMS y correos electrónicos.

Entrenamiento inicial

Para emular el flujo de datos, decidí utilizar la información del billete de avión devuelta por la API de Aviasales. EN documentación Hay una lista bastante extensa de diferentes métodos, tomemos uno de ellos: el "Calendario de precios mensuales", que devuelve precios para cada día del mes, agrupados por el número de transferencias. Si no especifica el mes de búsqueda en la solicitud, se devolverá información del mes siguiente al actual.

Entonces, registrémonos y obtengamos nuestro token.

A continuación se muestra un ejemplo de solicitud:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

El método anterior para recibir datos de la API especificando un token en la solicitud funcionará, pero prefiero pasar el token de acceso a través del encabezado, por lo que usaremos este método en el script api_caller.py.

Ejemplo de respuesta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

El ejemplo de respuesta API anterior muestra un billete de San Petersburgo a Phuk... Oh, qué sueño...
Como soy de Kazán y Phuket ahora es “sólo un sueño”, busquemos billetes de San Petersburgo a Kazán.

Se supone que ya tiene una cuenta de AWS. Me gustaría llamar inmediatamente la atención sobre el hecho de que Kinesis y el envío de notificaciones por SMS no están incluidos en el presupuesto anual. Nivel gratuito (uso gratuito). Pero incluso a pesar de esto, con un par de dólares en mente, es muy posible construir el sistema propuesto y jugar con él. Y, por supuesto, no olvide eliminar todos los recursos cuando ya no sean necesarios.

Afortunadamente, las funciones de DynamoDb y lambda serán gratuitas para nosotros si cumplimos con nuestros límites gratuitos mensuales. Por ejemplo, para DynamoDB: 25 GB de almacenamiento, 25 WCU/RCU y 100 millones de consultas. Y un millón de llamadas a funciones lambda al mes.

Implementación manual del sistema

Configuración de flujos de datos de Kinesis

Vayamos al servicio Kinesis Data Streams y creemos dos transmisiones nuevas, un fragmento para cada una.

¿Qué es un fragmento?
Un fragmento es la unidad básica de transferencia de datos de una transmisión de Amazon Kinesis. Un segmento proporciona transferencia de datos de entrada a una velocidad de 1 MB/s y transferencia de datos de salida a una velocidad de 2 MB/s. Un segmento admite hasta 1000 entradas PUT por segundo. Al crear un flujo de datos, debe especificar la cantidad requerida de segmentos. Por ejemplo, puede crear un flujo de datos con dos segmentos. Este flujo de datos proporcionará una transferencia de datos de entrada a 2 MB/s y una transferencia de datos de salida a 4 MB/s, admitiendo hasta 2000 registros PUT por segundo.

Cuantos más fragmentos haya en su flujo, mayor será su rendimiento. En principio, así es como se escalan los flujos: añadiendo fragmentos. Pero cuantos más fragmentos tengas, mayor será el precio. Cada fragmento cuesta 1,5 centavos por hora y 1.4 centavos adicionales por cada millón de unidades de carga útil PUT.

Creemos una nueva secuencia con el nombre. tickets de avión, 1 fragmento le bastará:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Ahora creemos otro hilo con el nombre flujo_especial:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Configuración del productor

Para analizar una tarea, basta con utilizar una instancia EC2 normal como productor de datos. No tiene por qué ser una máquina virtual potente y costosa; un t2.micro spot funcionará bien.

Nota importante: por ejemplo, debe usar la imagen: Amazon Linux AMI 2018.03.0, tiene menos configuraciones para iniciar rápidamente Kinesis Agent.

Vaya al servicio EC2, cree una nueva máquina virtual, seleccione la AMI deseada con el tipo t2.micro, que está incluida en la capa gratuita:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Para que la máquina virtual recién creada pueda interactuar con el servicio Kinesis, se le deben otorgar derechos para hacerlo. La mejor manera de hacerlo es asignar una función de IAM. Por lo tanto, en la pantalla Paso 3: Configurar detalles de la instancia, debe seleccionar Crear un nuevo rol de IAM:

Crear un rol de IAM para EC2
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
En la ventana que se abre seleccionamos que estamos creando un nuevo rol para EC2 y vamos a la sección Permisos:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Usando el ejemplo de capacitación, no tenemos que entrar en todas las complejidades de la configuración granular de los derechos de recursos, por lo que seleccionaremos las políticas preconfiguradas por Amazon: AmazonKinesisFullAccess y CloudWatchFullAccess.

Démosle un nombre significativo a esta función, por ejemplo: EC2-KinesisStreams-FullAccess. El resultado debería ser el mismo que se muestra en la siguiente imagen:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Después de crear este nuevo rol, no olvide adjuntarlo a la instancia de máquina virtual creada:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
No cambiamos nada más en esta pantalla y pasamos a las siguientes ventanas.

La configuración del disco duro se puede dejar por defecto, así como las etiquetas (aunque es una buena práctica usar etiquetas, al menos darle un nombre a la instancia e indicar el entorno).

Ahora estamos en la pestaña Paso 6: Configurar grupo de seguridad, donde necesita crear uno nuevo o especificar su grupo de seguridad existente, que le permite conectarse a través de ssh (puerto 22) a la instancia. Seleccione Fuente -> Mi IP allí y podrá iniciar la instancia.

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Tan pronto como cambie al estado de ejecución, puede intentar conectarse a través de ssh.

Para poder trabajar con Kinesis Agent, después de conectarse exitosamente a la máquina, debe ingresar los siguientes comandos en la terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Creemos una carpeta para guardar las respuestas de la API:

sudo mkdir /var/log/airline_tickets

Antes de iniciar el agente, debe configurar su configuración:

sudo vim /etc/aws-kinesis/agent.json

El contenido del archivo agent.json debería verse así:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Como se puede ver en el archivo de configuración, el agente monitoreará los archivos con la extensión .log en el directorio /var/log/airline_tickets/, los analizará y los transferirá a la secuencia aerolínea_tickets.

Reiniciamos el servicio y nos aseguramos de que está en funcionamiento:

sudo service aws-kinesis-agent restart

Ahora descarguemos el script de Python que solicitará datos de la API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

El script api_caller.py solicita datos de Aviasales y guarda la respuesta recibida en el directorio que escanea el agente de Kinesis. La implementación de este script es bastante estándar, hay una clase TicketsApi que le permite extraer la API de forma asincrónica. Pasamos un encabezado con un token y solicitamos parámetros a esta clase:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Para probar la configuración y funcionalidad correctas del agente, probemos ejecutar el script api_caller.py:

sudo ./api_caller.py TOKEN

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Y observamos el resultado del trabajo en los registros del Agente y en la pestaña Monitoreo en el flujo de datos de billetes_aviones:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Como puede ver, todo funciona y Kinesis Agent envía datos a la transmisión con éxito. Ahora configuremos el consumidor.

Configurar el análisis de datos de Kinesis

Pasemos al componente central de todo el sistema: creemos una nueva aplicación en Kinesis Data Analytics llamada kinesis_analytics_airlines_app:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Kinesis Data Analytics le permite realizar análisis de datos en tiempo real desde Kinesis Streams utilizando el lenguaje SQL. Es un servicio totalmente de escalado automático (a diferencia de Kinesis Streams) que:

  1. le permite crear nuevas secuencias (Output Stream) basadas en solicitudes de datos de origen;
  2. proporciona una secuencia con errores que ocurrieron mientras se ejecutaban las aplicaciones (Error Stream);
  3. puede determinar automáticamente el esquema de datos de entrada (se puede redefinir manualmente si es necesario).

Este no es un servicio barato: 0.11 USD por hora de trabajo, por lo que debes utilizarlo con cuidado y eliminarlo cuando hayas terminado.

Conectemos la aplicación a la fuente de datos:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Seleccione la transmisión a la que nos vamos a conectar (airline_tickets):

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
A continuación, debe adjuntar una nueva función de IAM para que la aplicación pueda leer y escribir en la secuencia. Para ello basta con no cambiar nada en el bloque de permisos de acceso:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Ahora solicitemos el descubrimiento del esquema de datos en la secuencia; para hacer esto, haga clic en el botón "Descubrir esquema". Como resultado, se actualizará el rol de IAM (se creará uno nuevo) y se iniciará la detección de esquemas a partir de los datos que ya llegaron al flujo:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Ahora necesitas ir al editor SQL. Cuando haga clic en este botón, aparecerá una ventana pidiéndole que inicie la aplicación; seleccione lo que desea iniciar:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Inserte la siguiente consulta simple en la ventana del editor SQL y haga clic en Guardar y ejecutar SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

En las bases de datos relacionales, se trabaja con tablas utilizando declaraciones INSERT para agregar registros y una declaración SELECT para consultar datos. En Amazon Kinesis Data Analytics, trabaja con transmisiones (STREAM) y bombas (PUMP): solicitudes de inserción continuas que insertan datos de una secuencia en una aplicación en otra secuencia.

La consulta SQL presentada anteriormente busca billetes de Aeroflot con un coste inferior a cinco mil rublos. Todos los registros que cumplan estas condiciones se colocarán en la secuencia DESTINATION_SQL_STREAM.

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
En el bloque Destino, seleccione la secuencia special_stream y en la lista desplegable Nombre de secuencia en la aplicación DESTINATION_SQL_STREAM:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
El resultado de todas las manipulaciones debería ser algo similar a la siguiente imagen:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Crear y suscribirse a un tema de SNS

Vaya al Servicio de notificación simple y cree allí un nuevo tema con el nombre Aerolíneas:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Suscríbete a este tema e indica el número de teléfono móvil al que se enviarán las notificaciones por SMS:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Crear una tabla en DynamoDB

Para almacenar los datos sin procesar de su flujo de billetes de avión, creemos una tabla en DynamoDB con el mismo nombre. Usaremos record_id como clave principal:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Creando un recopilador de funciones lambda

Creemos una función lambda llamada Collector, cuya tarea será sondear el flujo de billetes de avión y, si se encuentran nuevos registros allí, insertarlos en la tabla de DynamoDB. Obviamente, además de los derechos predeterminados, esta lambda debe tener acceso de lectura al flujo de datos de Kinesis y acceso de escritura a DynamoDB.

Crear un rol de IAM para la función lambda del recopilador
Primero, creemos una nueva función de IAM para la lambda denominada Lambda-TicketsProcessingRole:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Para el ejemplo de prueba, las políticas preconfiguradas de AmazonKinesisReadOnlyAccess y AmazonDynamoDBFullAccess son bastante adecuadas, como se muestra en la siguiente imagen:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Esta lambda debe iniciarse mediante un disparador de Kinesis cuando nuevas entradas ingresan a aerolínea_stream, por lo que debemos agregar un nuevo disparador:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Ya sólo queda insertar el código y guardar la lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creando un notificador de función lambda

La segunda función lambda, que monitoreará la segunda transmisión (special_stream) y enviará una notificación al SNS, se crea de manera similar. Por lo tanto, esta lambda debe tener acceso para leer desde Kinesis y enviar mensajes a un tema SNS determinado, que luego será enviado por el servicio SNS a todos los suscriptores de este tema (correo electrónico, SMS, etc.).

Crear un rol de IAM
Primero, creamos la función de IAM Lambda-KinesisAlarm para esta lambda y luego asignamos esta función a la lambda alarm_notifier que se está creando:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Esta lambda debería funcionar en un disparador para que nuevos registros ingresen al flujo_especial, por lo que debe configurar el disparador de la misma manera que lo hicimos para la lambda del recopilador.

Para facilitar la configuración de esta lambda, introduzcamos una nueva variable de entorno: TOPIC_ARN, donde colocamos el ANR (Nombres de recursos de Amazon) del tema Aerolíneas:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
E inserta el código lambda, no es nada complicado:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Parece que aquí es donde se completa la configuración manual del sistema. Ya sólo queda probar y asegurarnos de que hemos configurado todo correctamente.

Implementar desde el código Terraform

Preparación necesaria

Terraform es una herramienta de código abierto muy conveniente para implementar infraestructura a partir de código. Tiene su propia sintaxis que es fácil de aprender y tiene muchos ejemplos de cómo y qué implementar. El editor Atom o Visual Studio Code tiene muchos complementos útiles que facilitan el trabajo con Terraform.

Puedes descargar la distribución. por lo tanto. Un análisis detallado de todas las capacidades de Terraform está fuera del alcance de este artículo, por lo que nos limitaremos a los puntos principales.

Como correr

El código completo del proyecto es en mi repositorio. Clonamos el repositorio para nosotros mismos. Antes de comenzar, debe asegurarse de tener AWS CLI instalada y configurada, porque... Terraform buscará credenciales en el archivo ~/.aws/credentials.

Una buena práctica es ejecutar el comando plan antes de implementar toda la infraestructura para ver qué está creando Terraform actualmente para nosotros en la nube:

terraform.exe plan

Se le pedirá que ingrese un número de teléfono al que enviar notificaciones. No es necesario ingresarlo en esta etapa.

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Habiendo analizado el plan operativo del programa, podemos comenzar a crear recursos:

terraform.exe apply

Después de enviar este comando, se le pedirá nuevamente que ingrese un número de teléfono; marque "sí" cuando se muestre una pregunta sobre cómo realizar las acciones. Esto le permitirá configurar toda la infraestructura, realizar toda la configuración necesaria de EC2, implementar funciones lambda, etc.

Una vez que todos los recursos se hayan creado correctamente a través del código Terraform, debe entrar en los detalles de la aplicación Kinesis Analytics (desafortunadamente, no encontré cómo hacerlo directamente desde el código).

Iniciar la aplicacion:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Después de esto, debe establecer explícitamente el nombre de la secuencia en la aplicación seleccionando en la lista desplegable:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Ahora todo está listo para funcionar.

Probando la aplicación

Independientemente de cómo implementó el sistema, manualmente o mediante código Terraform, funcionará igual.

Iniciamos sesión vía SSH en la máquina virtual EC2 donde está instalado Kinesis Agent y ejecutamos el script api_caller.py

sudo ./api_caller.py TOKEN

Todo lo que tienes que hacer es esperar un SMS a tu número:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
SMS: el mensaje llega al teléfono en casi 1 minuto:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor
Queda por ver si los registros se guardaron en la base de datos de DynamoDB para un análisis posterior más detallado. La tabla de billetes de avión contiene aproximadamente los siguientes datos:

Integración de la API de Aviasales con Amazon Kinesis y simplicidad sin servidor

Conclusión

Durante el trabajo realizado, se construyó un sistema de procesamiento de datos en línea basado en Amazon Kinesis. Se consideraron opciones para usar Kinesis Agent junto con Kinesis Data Streams y análisis en tiempo real Kinesis Analytics mediante comandos SQL, así como la interacción de Amazon Kinesis con otros servicios de AWS.

Implementamos el sistema anterior de dos maneras: una manual bastante larga y otra rápida del código Terraform.

Todo el código fuente del proyecto está disponible. en mi repositorio de GitHub, Te sugiero que te familiarices con él.

Estoy feliz de discutir el artículo, espero sus comentarios. Espero críticas constructivas.

Les deseo mucho éxito!

Fuente: habr.com

Añadir un comentario