Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Ola Habr!

Gústache voar avións? Encántame, pero durante o autoillamento tamén me namorei de analizar os datos dos billetes de avión dun recurso coñecido: Aviasales.

Hoxe analizaremos o traballo de Amazon Kinesis, construiremos un sistema de transmisión en tempo real con análise en tempo real, instalaremos a base de datos NoSQL de Amazon DynamoDB como principal almacenamento de datos e configuraremos notificacións por SMS para tickets interesantes.

Todos os detalles están baixo o corte! Vaia!

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Introdución

Por exemplo, necesitamos acceso a Aviasales API. O acceso a ela é gratuíto e sen restricións; só tes que rexistrarte na sección "Desenvolvedores" para recibir o teu token API para acceder aos datos.

O obxectivo principal deste artigo é dar unha comprensión xeral do uso da transmisión de información en AWS; temos en conta que os datos devoltos pola API utilizada non están estritamente actualizados e transmítense desde a caché, que é formado en base a buscas realizadas por usuarios dos sitios Aviasales.ru e Jetradar.com durante as últimas 48 horas.

Kinesis-agent, instalado na máquina produtora, recibido a través da API analizará e transmitirá automaticamente os datos ao fluxo desexado a través de Kinesis Data Analytics. A versión en bruto deste fluxo escribirase directamente na tenda. O almacenamento de datos en bruto despregado en DynamoDB permitirá unha análise máis profunda de tickets a través de ferramentas de BI, como AWS Quick Sight.

Consideraremos dúas opcións para implantar toda a infraestrutura:

  • Manual: mediante AWS Management Console;
  • A infraestrutura do código Terraform é para autómatas preguiceiros;

Arquitectura do sistema desenvolvido

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Compoñentes utilizados:

  • Aviasales API — os datos devoltos por esta API serán utilizados para todos os traballos posteriores;
  • Instancia de produtor EC2 — unha máquina virtual normal na nube na que se xerará o fluxo de datos de entrada:
    • Axente Kinesis é unha aplicación Java instalada localmente na máquina que ofrece un xeito sinxelo de recoller e enviar datos a Kinesis (Kinesis Data Streams ou Kinesis Firehose). O axente supervisa constantemente un conxunto de ficheiros nos directorios especificados e envía novos datos a Kinesis;
    • API Caller Script — Un script de Python que fai solicitudes á API e coloca a resposta nun cartafol supervisado polo axente de Kinesis;
  • Kinesis Data Streams — servizo de transmisión de datos en tempo real con amplas capacidades de escalado;
  • Kinesis Analytics é un servizo sen servidor que simplifica a análise de datos de transmisión en tempo real. Amazon Kinesis Data Analytics configura os recursos das aplicacións e escala automaticamente para xestionar calquera volume de datos entrantes;
  • AWS Lambda — un servizo que che permite executar código sen facer unha copia de seguranza nin configurar servidores. Toda a potencia de computación escala automaticamente para cada chamada;
  • Amazon DynamoDB - Unha base de datos de pares clave-valor e documentos que proporciona unha latencia inferior a 10 milisegundos cando se executa a calquera escala. Ao usar DynamoDB, non precisa aprovisionar, parchear ou xestionar ningún servidor. DynamoDB escala automaticamente as táboas para axustar a cantidade de recursos dispoñibles e manter un alto rendemento. Non é necesaria a administración do sistema;
  • Amazon SNS - un servizo totalmente xestionado para o envío de mensaxes mediante o modelo editor-subscrito (Pub/Sub), co que pode illar microservizos, sistemas distribuídos e aplicacións sen servidor. O SNS pódese usar para enviar información aos usuarios finais a través de notificacións automáticas móbiles, mensaxes SMS e correos electrónicos.

Formación inicial

Para emular o fluxo de datos, decidín utilizar a información do billete de avión que devolveu a API de Aviasales. EN documentación unha lista bastante extensa de diferentes métodos, tomemos un deles: "Calendario de prezos mensuales", que devolve os prezos para cada día do mes, agrupados polo número de transferencias. Se non especifica o mes de busca na solicitude, devolverase a información para o mes seguinte ao actual.

Entón, imos rexistrarte e conseguir o noso token.

A continuación móstrase un exemplo de solicitude:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

O método anterior de recibir datos da API especificando un token na solicitude funcionará, pero prefiro pasar o token de acceso a través da cabeceira, polo que usaremos este método no script api_caller.py.

Exemplo de resposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

O exemplo de resposta da API anterior mostra un billete de San Petersburgo a Phuk... Oh, que soño...
Xa que son de Kazán e Phuket é agora "só un soño", busquemos billetes de San Petersburgo a Kazán.

Asume que xa tes unha conta de AWS. Gustaríame chamar inmediatamente a atención especial sobre o feito de que Kinesis e o envío de notificacións por SMS non están incluídos no Nivel gratuíto (uso gratuíto). Pero aínda a pesar diso, con un par de dólares en mente, é moi posible construír o sistema proposto e xogar con el. E, por suposto, non esquezas eliminar todos os recursos despois de que xa non sexan necesarios.

Afortunadamente, as funcións DynamoDb e lambda serán gratuítas para nós se cumprimos os nosos límites mensuais gratuítos. Por exemplo, para DynamoDB: 25 GB de almacenamento, 25 WCU/RCU e 100 millóns de consultas. E un millón de chamadas á función lambda ao mes.

Implementación manual do sistema

Configurar Kinesis Data Streams

Imos ao servizo Kinesis Data Streams e creemos dous novos fluxos, un fragmento para cada un.

Que é un fragmento?
Un fragmento é a unidade básica de transferencia de datos dun fluxo de Amazon Kinesis. Un segmento proporciona transferencia de datos de entrada a unha velocidade de 1 MB/s e transferencia de datos de saída a unha velocidade de 2 MB/s. Un segmento admite ata 1000 entradas PUT por segundo. Ao crear un fluxo de datos, cómpre especificar o número necesario de segmentos. Por exemplo, pode crear un fluxo de datos con dous segmentos. Este fluxo de datos proporcionará transferencia de datos de entrada a 2 MB/s e transferencia de datos de saída a 4 MB/s, admitindo ata 2000 rexistros PUT por segundo.

Cantos máis fragmentos teña o teu fluxo, maior será o seu rendemento. En principio, así é como se escalan os fluxos: engadindo fragmentos. Pero cantos máis fragmentos teñas, maior será o prezo. Cada fragmento custa 1,5 céntimos por hora e 1.4 céntimos adicionais por cada millón de unidades de carga útil PUT.

Imos crear unha nova emisión co nome billetes_avións, 1 fragmento será suficiente para el:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Agora imos crear outro fío co nome fluxo_especial:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Configuración do produtor

Para analizar unha tarefa, abonda con usar unha instancia EC2 normal como produtor de datos. Non ten que ser unha máquina virtual potente e cara; un spot t2.micro funcionará ben.

Nota importante: por exemplo, debes usar imaxe: Amazon Linux AMI 2018.03.0, ten menos opcións para iniciar rapidamente o axente Kinesis.

Vaia ao servizo EC2, cree unha nova máquina virtual, seleccione a AMI desexada co tipo t2.micro, que se inclúe no Nivel gratuíto:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Para que a máquina virtual recén creada poida interactuar co servizo Kinesis, hai que darlle dereitos para facelo. A mellor forma de facelo é asignar un rol de IAM. Polo tanto, na pantalla Paso 3: Configurar detalles da instancia, debes seleccionar Crear un novo rol de IAM:

Creación dun rol IAM para EC2
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Na xanela que se abre, seleccione que estamos a crear un novo rol para EC2 e vai á sección Permisos:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Usando o exemplo de formación, non temos que entrar en todas as complexidades da configuración granular dos dereitos de recursos, polo que seleccionaremos as políticas preconfiguradas por Amazon: AmazonKinesisFullAccess e CloudWatchFullAccess.

Poñamos algún nome significativo para este rol, por exemplo: EC2-KinesisStreams-FullAccess. O resultado debe ser o mesmo que se mostra na seguinte imaxe:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Despois de crear este novo rol, non esquezas anexalo á instancia de máquina virtual creada:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Non cambiamos nada máis nesta pantalla e pasamos ás seguintes ventás.

A configuración do disco duro pódese deixar por defecto, así como as etiquetas (aínda que é unha boa práctica usar etiquetas, polo menos darlle un nome á instancia e indicar o entorno).

Agora estamos na pestana Paso 6: Configurar Grupo de Seguridade, onde debes crear un novo ou especificar o teu grupo de Seguridade existente, o que che permite conectarte mediante ssh (porto 22) á instancia. Seleccione Fonte -> A miña IP alí e pode iniciar a instancia.

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
En canto cambie ao estado de execución, pode tentar conectarse a el a través de ssh.

Para poder traballar con Kinesis Agent, despois de conectarse correctamente á máquina, debes introducir os seguintes comandos no terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Imos crear un cartafol para gardar as respostas da API:

sudo mkdir /var/log/airline_tickets

Antes de iniciar o axente, cómpre configurar a súa configuración:

sudo vim /etc/aws-kinesis/agent.json

O contido do ficheiro agent.json debería verse así:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Como se pode ver no ficheiro de configuración, o axente supervisará os ficheiros coa extensión .log no directorio /var/log/airline_tickets/, analizalos e transferiráos ao fluxo airline_tickets.

Reiniciamos o servizo e asegurámonos de que estea funcionando:

sudo service aws-kinesis-agent restart

Agora imos descargar o script de Python que solicitará datos da API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

O script api_caller.py solicita datos a Aviasales e garda a resposta recibida no directorio que explora o axente de Kinesis. A implementación deste script é bastante estándar, hai unha clase TicketsApi, permítelle tirar a API de forma asíncrona. Pasamos unha cabeceira cun token e solicitamos parámetros a esta clase:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Para probar a configuración e a funcionalidade correctas do axente, probemos a executar o script api_caller.py:

sudo ./api_caller.py TOKEN

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
E observamos o resultado do traballo nos rexistros do axente e na pestana Monitorización do fluxo de datos airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Como podes ver, todo funciona e o axente Kinesis envía datos con éxito ao fluxo. Agora imos configurar o consumidor.

Configuración de Kinesis Data Analytics

Pasemos ao compoñente central de todo o sistema: cree unha nova aplicación en Kinesis Data Analytics chamada kinesis_analytics_airlines_app:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Kinesis Data Analytics permítelle realizar análises de datos en tempo real desde Kinesis Streams usando a linguaxe SQL. É un servizo de escalado totalmente automático (a diferenza de Kinesis Streams) que:

  1. permítelle crear novos fluxos (Output Stream) en función das solicitudes de datos de orixe;
  2. proporciona un fluxo con erros que se produciron mentres se executaban as aplicacións (Fluxo de erros);
  3. pode determinar automaticamente o esquema de datos de entrada (pódese redefinir manualmente se é necesario).

Este non é un servizo barato: 0.11 USD por hora de traballo, polo que debes usalo con coidado e borralo cando remates.

Conectemos a aplicación á fonte de datos:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Selecciona a emisión á que nos imos conectar (airline_tickets):

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
A continuación, cómpre anexar un novo rol de IAM para que a aplicación poida ler desde o fluxo e escribir no fluxo. Para iso, abonda con non cambiar nada no bloque de permisos de acceso:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Agora imos solicitar o descubrimento do esquema de datos no fluxo; para facelo, faga clic no botón "Descubrir esquema". Como resultado, o rol de IAM actualizarase (crearase un novo) e iniciarase a detección de esquemas a partir dos datos que xa chegaron ao fluxo:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Agora tes que ir ao editor SQL. Cando fai clic neste botón, aparecerá unha xanela na que lle pedirá que inicie a aplicación; seleccione o que quere iniciar:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Insira a seguinte consulta sinxela na xanela do editor SQL e faga clic en Gardar e executar SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Nas bases de datos relacionais, traballa con táboas usando instrucións INSERT para engadir rexistros e unha instrución SELECT para consultar datos. En Amazon Kinesis Data Analytics, traballa con fluxos (STREAM) e bombas (PUMP): solicitudes de inserción continuas que insiren datos dun fluxo dunha aplicación noutro fluxo.

A consulta SQL presentada anteriormente busca billetes de Aeroflot a un custo inferior a cinco mil rublos. Todos os rexistros que cumpran estas condicións colocaranse no fluxo DESTINATION_SQL_STREAM.

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
No bloque Destino, seleccione o fluxo especial_stream e na lista despregable Nome do fluxo na aplicación DESTINATION_SQL_STREAM:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
O resultado de todas as manipulacións debería ser algo semellante ao da seguinte imaxe:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Crear e subscribirse a un tema de SNS

Vaia ao Servizo de notificación simple e crea alí un novo tema co nome Aerolíneas:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Subscríbete a este tema e indica o número de teléfono móbil ao que se enviarán as notificacións por SMS:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Crea unha táboa en DynamoDB

Para almacenar os datos en bruto do seu fluxo airline_tickets, creemos unha táboa en DynamoDB co mesmo nome. Usaremos record_id como clave principal:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Creando un colector de funcións lambda

Imos crear unha función lambda chamada Collector, cuxa tarefa será sondear o fluxo airline_tickets e, se alí se atopan novos rexistros, inserir estes rexistros na táboa DynamoDB. Obviamente, ademais dos dereitos predeterminados, esta lambda debe ter acceso de lectura ao fluxo de datos de Kinesis e acceso de escritura a DynamoDB.

Creando un rol IAM para a función lambda do colector
En primeiro lugar, creemos un novo rol IAM para o lambda chamado Lambda-TicketsProcessingRole:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Para o exemplo de proba, as políticas preconfiguradas de AmazonKinesisReadOnlyAccess e AmazonDynamoDBFullAccess son bastante adecuadas, como se mostra na seguinte imaxe:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Este lambda debería ser iniciado por un disparador de Kinesis cando entran novas entradas no airline_stream, polo que necesitamos engadir un novo disparador:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Só queda inserir o código e gardar a lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creando un notificador de función lambda

A segunda función lambda, que supervisará o segundo fluxo (special_stream) e enviará unha notificación a SNS, créase dun xeito similar. Polo tanto, esta lambda debe ter acceso para ler desde Kinesis e enviar mensaxes a un determinado tema SNS, que despois serán enviados polo servizo SNS a todos os subscritores deste tema (correo electrónico, SMS, etc.).

Creando un rol IAM
En primeiro lugar, creamos o rol IAM Lambda-KinesisAlarm para este lambda e, a continuación, asignamos este rol ao alarm_notifier lambda que se está creando:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Este lambda debería funcionar nun disparador para que os novos rexistros entren no fluxo_special, polo que cómpre configurar o disparador do mesmo xeito que fixemos para o lambda de Collector.

Para facilitar a configuración deste lambda, introduzamos unha nova variable de ambiente - TOPIC_ARN, onde colocamos o ANR (Amazon Recourse Names) do tema Airlines:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
E introduce o código lambda, non é nada complicado:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Parece que aquí é onde se completa a configuración manual do sistema. Só queda probar e asegurarse de que temos todo configurado correctamente.

Implementar desde o código Terraform

Preparación necesaria

Terraform é unha ferramenta de código aberto moi conveniente para implementar infraestrutura desde código. Ten a súa propia sintaxe que é fácil de aprender e ten moitos exemplos de como e que implementar. O editor Atom ou Visual Studio Code ten moitos complementos útiles que facilitan o traballo con Terraform.

Podes descargar a distribución por iso. Unha análise detallada de todas as capacidades de Terraform está fóra do alcance deste artigo, polo que limitarémonos aos puntos principais.

Como comezar

O código completo do proxecto é no meu repositorio. Clonamos o repositorio para nós mesmos. Antes de comezar, debes asegurarte de que tes AWS CLI instalado e configurado, porque... Terraform buscará credenciais no ficheiro ~/.aws/credentials.

Unha boa práctica é executar o comando plan antes de implantar toda a infraestrutura para ver o que Terraform está a crear actualmente para nós na nube:

terraform.exe plan

Solicitarase que introduza un número de teléfono ao que enviar notificacións. Non é necesario ingresalo nesta fase.

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Unha vez analizado o plan de funcionamento do programa, podemos comezar a crear recursos:

terraform.exe apply

Despois de enviar este comando, pediráselle de novo que introduza un número de teléfono; marque "si" cando se amose unha pregunta sobre a realización das accións. Isto permitirá configurar toda a infraestrutura, realizar toda a configuración necesaria do EC2, despregar funcións lambda, etc.

Despois de que todos os recursos se crearon con éxito a través do código Terraform, cómpre entrar nos detalles da aplicación Kinesis Analytics (lamentablemente, non atopei como facelo directamente desde o código).

Inicie a aplicación:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Despois diso, debes establecer explícitamente o nome do fluxo na aplicación seleccionando na lista despregable:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Agora todo está listo para ir.

Probando a aplicación

Independentemente de como implantou o sistema, manualmente ou a través do código Terraform, funcionará igual.

Iniciamos sesión mediante SSH na máquina virtual EC2 onde está instalado Kinesis Agent e executamos o script api_caller.py

sudo ./api_caller.py TOKEN

Todo o que tes que facer é esperar unha SMS ao teu número:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
SMS: chega unha mensaxe ao teléfono en case 1 minuto:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor
Queda por ver se os rexistros foron gardados na base de datos DynamoDB para unha análise posterior e máis detallada. A táboa airline_tickets contén aproximadamente os seguintes datos:

Integración da API de Aviasales con Amazon Kinesis e simplicidade sen servidor

Conclusión

No transcurso do traballo realizado, construíuse un sistema de procesamento de datos en liña baseado en Amazon Kinesis. Consideráronse opcións para usar o axente Kinesis en conxunto con Kinesis Data Streams e análise en tempo real Kinesis Analytics mediante comandos SQL, así como a interacción de Amazon Kinesis con outros servizos de AWS.

Implementamos o sistema anterior de dúas formas: unha manual bastante longa e outra rápida a partir do código Terraform.

Todo o código fonte do proxecto está dispoñible no meu repositorio de GitHub, suxíroche que te familiarices con el.

Estou feliz de discutir o artigo, espero os teus comentarios. Espero unha crítica construtiva.

Desexo vostede éxito!

Fonte: www.habr.com

Engadir un comentario