Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Oi, Habr!

Você gosta de pilotar aviões? Adoro, mas durante o auto-isolamento também me apaixonei pela análise de dados de passagens aéreas de um recurso conhecido - Aviasales.

Hoje analisaremos o trabalho do Amazon Kinesis, construiremos um sistema de streaming com análises em tempo real, instalaremos o banco de dados NoSQL Amazon DynamoDB como principal armazenamento de dados e configuraremos notificações por SMS para tickets interessantes.

Todos os detalhes estão sob o corte! Ir!

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Introdução

Por exemplo, precisamos de acesso a API Aviasales. O acesso é gratuito e sem restrições, bastando se cadastrar na seção “Desenvolvedores” para receber seu token API para acessar os dados.

O objetivo principal deste artigo é dar uma compreensão geral do uso de streaming de informações na AWS; levamos em consideração que os dados retornados pela API utilizada não estão estritamente atualizados e são transmitidos do cache, que é formado com base em pesquisas feitas por usuários dos sites Aviasales.ru e Jetradar.com nas últimas 48 horas.

O agente Kinesis, instalado na máquina produtora, recebido por meio da API, analisará e transmitirá automaticamente os dados para o fluxo desejado por meio do Kinesis Data Analytics. A versão bruta deste fluxo será gravada diretamente na loja. O armazenamento de dados brutos implantado no DynamoDB permitirá análises mais profundas de tickets por meio de ferramentas de BI, como o AWS Quick Sight.

Consideraremos duas opções para implantar toda a infraestrutura:

  • Manual - via AWS Management Console;
  • A infraestrutura do código Terraform é para automatizadores preguiçosos;

Arquitetura do sistema desenvolvido

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Componentes usados:

  • API Aviasales — os dados retornados por esta API serão utilizados para todos os trabalhos subsequentes;
  • Instância de Produtor EC2 — uma máquina virtual normal na nuvem na qual o fluxo de dados de entrada será gerado:
    • Agente Kinesis é um aplicativo Java instalado localmente na máquina que fornece uma maneira fácil de coletar e enviar dados para o Kinesis (Kinesis Data Streams ou Kinesis Firehose). O agente monitora constantemente um conjunto de arquivos nos diretórios especificados e envia novos dados ao Kinesis;
    • Script do chamador de API — Um script Python que faz solicitações à API e coloca a resposta em uma pasta monitorada pelo Kinesis Agent;
  • Streams de dados Kinesis — serviço de streaming de dados em tempo real com amplas capacidades de escala;
  • Kinesis Analytics é um serviço sem servidor que simplifica a análise de streaming de dados em tempo real. O Amazon Kinesis Data Analytics configura recursos de aplicativos e escala automaticamente para lidar com qualquer volume de dados recebidos;
  • AWS Lambda — um serviço que permite executar código sem fazer backup ou configurar servidores. Todo o poder de computação é dimensionado automaticamente para cada chamada;
  • Amazon DynamoDB - Um banco de dados de pares de valores-chave e documentos que fornece latência inferior a 10 milissegundos ao executar em qualquer escala. Ao usar o DynamoDB, você não precisa provisionar, corrigir ou gerenciar nenhum servidor. O DynamoDB dimensiona tabelas automaticamente para ajustar a quantidade de recursos disponíveis e manter o alto desempenho. Nenhuma administração do sistema é necessária;
  • Amazon SNS - um serviço totalmente gerenciado para envio de mensagens usando o modelo editor-assinante (Pub/Sub), com o qual você pode isolar microsserviços, sistemas distribuídos e aplicações serverless. O SNS pode ser usado para enviar informações aos usuários finais por meio de notificações push móveis, mensagens SMS e e-mails.

Treinamento inicial

Para emular o fluxo de dados, decidi utilizar as informações das passagens aéreas retornadas pela API Aviasales. EM documentação uma lista bastante extensa de métodos diferentes, vamos pegar um deles - “Calendário de Preços Mensal”, que retorna os preços de cada dia do mês, agrupados pelo número de transferências. Caso não especifique o mês de busca na solicitação, serão retornadas informações referentes ao mês seguinte ao atual.

Então, vamos nos cadastrar e pegar nosso token.

Um exemplo de solicitação está abaixo:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

O método acima de receber dados da API especificando um token na solicitação funcionará, mas prefiro passar o token de acesso pelo cabeçalho, então usaremos esse método no script api_caller.py.

Exemplo de resposta:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

O exemplo de resposta da API acima mostra uma passagem de São Petersburgo para Phuk... Ah, que sonho...
Como sou de Kazan e Phuket agora é “apenas um sonho”, vamos procurar passagens de São Petersburgo para Kazan.

Presume-se que você já tenha uma conta AWS. Gostaria de chamar desde já especial atenção para o facto de o Kinesis e o envio de notificações por SMS não estarem incluídos na tarifa anual Nível gratuito (uso gratuito). Mas mesmo apesar disso, com alguns dólares em mente, é perfeitamente possível construir o sistema proposto e brincar com ele. E, claro, não se esqueça de excluir todos os recursos depois que eles não forem mais necessários.

Felizmente, as funções DynamoDb e lambda serão gratuitas para nós se atingirmos nossos limites mensais gratuitos. Por exemplo, para DynamoDB: 25 GB de armazenamento, 25 WCU/RCU e 100 milhões de consultas. E um milhão de chamadas de função lambda por mês.

Implantação manual do sistema

Configurando fluxos de dados do Kinesis

Vamos para o serviço Kinesis Data Streams e criar dois novos streams, um fragmento para cada.

O que é um fragmento?
Um fragmento é a unidade básica de transferência de dados de um stream do Amazon Kinesis. Um segmento fornece transferência de dados de entrada a uma velocidade de 1 MB/s e transferência de dados de saída a uma velocidade de 2 MB/s. Um segmento suporta até 1000 entradas PUT por segundo. Ao criar um fluxo de dados, você precisa especificar o número necessário de segmentos. Por exemplo, você pode criar um fluxo de dados com dois segmentos. Este fluxo de dados fornecerá transferência de dados de entrada a 2 MB/s e transferência de dados de saída a 4 MB/s, suportando até 2000 registros PUT por segundo.

Quanto mais fragmentos houver no seu stream, maior será o rendimento. Em princípio, é assim que os fluxos são dimensionados - adicionando fragmentos. Mas quanto mais fragmentos você tiver, maior será o preço. Cada fragmento custa 1,5 centavos por hora e 1.4 centavos adicionais para cada milhão de unidades de carga útil PUT.

Vamos criar um novo stream com o nome passagens aéreas, 1 fragmento será suficiente para ele:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Agora vamos criar outro tópico com o nome stream_especial:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Configuração do produtor

Para analisar uma tarefa, basta utilizar uma instância regular do EC2 como produtora de dados. Não precisa ser uma máquina virtual poderosa e cara; um spot t2.micro servirá perfeitamente.

Observação importante: por exemplo, você deve usar imagem - Amazon Linux AMI 2018.03.0, ela tem menos configurações para iniciar rapidamente o Kinesis Agent.

Acesse o serviço EC2, crie uma nova máquina virtual, selecione a AMI desejada com tipo t2.micro, que está incluída no Free Tier:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Para que a máquina virtual recém-criada possa interagir com o serviço Kinesis, ela deve receber direitos para fazê-lo. A melhor maneira de fazer isso é atribuir uma função IAM. Portanto, na tela Etapa 3: Configurar detalhes da instância, você deve selecionar Criar nova função IAM:

Criando uma função IAM para EC2
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Na janela que se abre, selecione que estamos criando uma nova função para EC2 e vá para a seção Permissões:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Usando o exemplo de treinamento, não precisamos entrar em todos os meandros da configuração granular de direitos de recursos, então selecionaremos as políticas pré-configuradas pela Amazon: AmazonKinesisFullAccess e CloudWatchFullAccess.

Vamos dar um nome significativo para esta função, por exemplo: EC2-KinesisStreams-FullAccess. O resultado deve ser o mesmo mostrado na imagem abaixo:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Após criar esta nova função, não esqueça de anexá-la à instância de máquina virtual criada:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Não alteramos mais nada nesta tela e passamos para as próximas janelas.

As configurações do disco rígido podem ser deixadas como padrão, assim como as tags (embora seja uma boa prática usar tags, pelo menos dê um nome à instância e indique o ambiente).

Agora estamos na etapa 6: guia Configurar grupo de segurança, onde você precisa criar um novo ou especificar seu grupo de segurança existente, que permite conectar-se via ssh (porta 22) à instância. Selecione Fonte -> Meu IP e você poderá iniciar a instância.

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Assim que ele mudar para o status de execução, você pode tentar conectar-se a ele via ssh.

Para poder trabalhar com o Kinesis Agent, após conectar-se com sucesso à máquina, você deve inserir os seguintes comandos no terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Vamos criar uma pasta para salvar as respostas da API:

sudo mkdir /var/log/airline_tickets

Antes de iniciar o agente, você precisa configurar sua configuração:

sudo vim /etc/aws-kinesis/agent.json

O conteúdo do arquivo agent.json deve ser semelhante a este:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Como pode ser visto no arquivo de configuração, o agente irá monitorar arquivos com extensão .log no diretório /var/log/airline_tickets/, analisá-los e transferi-los para o fluxoairline_tickets.

Reiniciamos o serviço e verificamos se ele está funcionando:

sudo service aws-kinesis-agent restart

Agora vamos baixar o script Python que solicitará os dados da API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

O script api_caller.py solicita dados do Aviasales e salva a resposta recebida no diretório que o agente Kinesis verifica. A implementação deste script é bastante padrão, existe uma classe TicketsApi, que permite extrair a API de forma assíncrona. Passamos um cabeçalho com um token e solicitamos parâmetros para esta classe:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Para testar as configurações corretas e a funcionalidade do agente, vamos testar a execução do script api_caller.py:

sudo ./api_caller.py TOKEN

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
E observamos o resultado do trabalho nos logs do Agente e na guia Monitoramento no fluxo de dados da companhia aérea_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Como você pode ver, tudo funciona e o Kinesis Agent envia dados para o stream com sucesso. Agora vamos configurar o consumidor.

Configurando o Kinesis Data Analytics

Vamos passar para o componente central de todo o sistema: crie um novo aplicativo no Kinesis Data Analytics chamado kinesis_analytics_airlines_app:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
O Kinesis Data Analytics permite realizar análises de dados em tempo real do Kinesis Streams usando a linguagem SQL. É um serviço de escalonamento totalmente automático (ao contrário do Kinesis Streams) que:

  1. permite criar novos fluxos (Output Stream) com base em solicitações de dados de origem;
  2. fornece um fluxo com erros que ocorreram durante a execução dos aplicativos (Error Stream);
  3. pode determinar automaticamente o esquema de dados de entrada (pode ser redefinido manualmente, se necessário).

Este não é um serviço barato - 0.11 USD por hora de trabalho, portanto você deve usá-lo com cuidado e excluí-lo quando terminar.

Vamos conectar o aplicativo à fonte de dados:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Selecione o stream ao qual nos conectaremos (airline_tickets):

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Em seguida, você precisa anexar uma nova função do IAM para que o aplicativo possa ler e gravar no stream. Para isso, basta não alterar nada no bloco Permissões de acesso:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Agora vamos solicitar a descoberta do esquema de dados no stream; para isso, clique no botão “Descobrir esquema”. Como resultado, a função IAM será atualizada (uma nova será criada) e a detecção de esquema será iniciada a partir dos dados que já chegaram ao fluxo:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Agora você precisa ir para o editor SQL. Ao clicar neste botão, aparecerá uma janela solicitando que você inicie o aplicativo - selecione o que deseja iniciar:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Insira a seguinte consulta simples na janela do editor SQL e clique em Salvar e executar SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Em bancos de dados relacionais, você trabalha com tabelas usando instruções INSERT para adicionar registros e uma instrução SELECT para consultar dados. No Amazon Kinesis Data Analytics, você trabalha com streams (STREAMs) e bombas (PUMPs) — solicitações de inserção contínua que inserem dados de um stream de uma aplicação em outro stream.

A consulta SQL apresentada acima procura bilhetes da Aeroflot com custo inferior a cinco mil rublos. Todos os registros que atenderem a essas condições serão colocados no fluxo DESTINATION_SQL_STREAM.

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
No bloco Destino, selecione o fluxo special_stream e na lista suspensa Nome do fluxo no aplicativo DESTINATION_SQL_STREAM:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
O resultado de todas as manipulações deve ser algo semelhante à imagem abaixo:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Criando e assinando um tópico SNS

Acesse o Serviço de Notificação Simples e crie um novo tópico lá com o nome Companhias Aéreas:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Assine este tópico e indique o número do celular para o qual serão enviadas as notificações por SMS:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Crie uma tabela no DynamoDB

Para armazenar os dados brutos de seu streamairline_tickets, vamos criar uma tabela no DynamoDB com o mesmo nome. Usaremos record_id como chave primária:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Criando um coletor de função lambda

Vamos criar uma função lambda chamada Collector, cuja tarefa será sondar o fluxoairline_tickets e, caso sejam encontrados novos registros lá, inserir esses registros na tabela DynamoDB. Obviamente, além dos direitos padrão, esse lambda deve ter acesso de leitura ao fluxo de dados do Kinesis e acesso de gravação ao DynamoDB.

Criando uma função IAM para a função lambda do coletor
Primeiro, vamos criar uma nova função IAM para o lambda chamada Lambda-TicketsProcessingRole:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Para o exemplo de teste, as políticas pré-configuradas AmazonKinesisReadOnlyAccess e AmazonDynamoDBFullAccess são bastante adequadas, conforme mostrado na imagem abaixo:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Este lambda deve ser iniciado por um gatilho do Kinesis quando novas entradas entrarem noairline_stream, portanto, precisamos adicionar um novo gatilho:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Resta inserir o código e salvar o lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Criando um notificador de função lambda

A segunda função lambda, que irá monitorar o segundo stream (special_stream) e enviar uma notificação ao SNS, é criada de forma semelhante. Portanto, este lambda deve ter acesso para ler do Kinesis e enviar mensagens para um determinado tópico do SNS, que serão então enviadas pelo serviço SNS a todos os assinantes deste tópico (e-mail, SMS, etc.).

Criando uma função do IAM
Primeiro, criamos a função IAM Lambda-KinesisAlarm para este lambda e, em seguida, atribuímos essa função ao lambda alarm_notifier que está sendo criado:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Este lambda deve funcionar em um trigger para que novos registros entrem no special_stream, então você precisa configurar o trigger da mesma forma que fizemos para o lambda do Coletor.

Para facilitar a configuração deste lambda, vamos introduzir uma nova variável de ambiente - TOPIC_ARN, onde colocamos o ANR (Amazon Recourse Names) do tópico Airlines:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
E insira o código lambda, não é nada complicado:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Parece que é aqui que a configuração manual do sistema é concluída. Resta testar e ter certeza de que configuramos tudo corretamente.

Implantar a partir do código Terraform

Preparação necessária

Terraform é uma ferramenta de código aberto muito conveniente para implantar infraestrutura a partir de código. Possui sintaxe própria, fácil de aprender e muitos exemplos de como e o que implantar. O editor Atom ou Visual Studio Code possui muitos plug-ins úteis que facilitam o trabalho com o Terraform.

Você pode baixar a distribuição por isso. Uma análise detalhada de todas as capacidades do Terraform está além do escopo deste artigo, portanto nos limitaremos aos pontos principais.

Como correr

O código completo do projeto é no meu repositório. Clonamos o repositório para nós mesmos. Antes de começar, você precisa ter certeza de que possui o AWS CLI instalado e configurado, porque... O Terraform procurará credenciais no arquivo ~/.aws/credentials.

Uma boa prática é executar o comando plan antes de implantar toda a infraestrutura para ver o que o Terraform está criando atualmente para nós na nuvem:

terraform.exe plan

Você será solicitado a inserir um número de telefone para o qual enviar notificações. Não é necessário inseri-lo nesta fase.

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Analisado o plano de funcionamento do programa, podemos começar a criar recursos:

terraform.exe apply

Após enviar este comando, você será solicitado novamente a inserir um número de telefone; disque “sim” quando for exibida uma pergunta sobre a execução efetiva das ações. Isso permitirá que você configure toda a infraestrutura, realize todas as configurações necessárias do EC2, implante funções lambda, etc.

Depois que todos os recursos foram criados com sucesso por meio do código Terraform, você precisa entrar nos detalhes do aplicativo Kinesis Analytics (infelizmente não encontrei como fazer isso diretamente no código).

Inicie o aplicativo:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Depois disso, você deve definir explicitamente o nome do stream no aplicativo selecionando na lista suspensa:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Agora tudo está pronto para funcionar.

Testando o aplicativo

Independentemente de como você implantou o sistema, manualmente ou por meio do código Terraform, ele funcionará da mesma forma.

Efetuamos login via SSH na máquina virtual EC2 onde o Kinesis Agent está instalado e executamos o script api_caller.py

sudo ./api_caller.py TOKEN

Basta aguardar um SMS para o seu número:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
SMS - uma mensagem chega ao telefone em quase 1 minuto:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor
Resta saber se os registros foram salvos no banco de dados DynamoDB para posterior análise mais detalhada. A tabelaairline_tickets contém aproximadamente os seguintes dados:

Integração da API Aviasales com Amazon Kinesis e simplicidade sem servidor

Conclusão

No decorrer do trabalho realizado foi construído um sistema de processamento de dados online baseado no Amazon Kinesis. Foram consideradas opções de uso do Kinesis Agent em conjunto com Kinesis Data Streams e análise em tempo real do Kinesis Analytics usando comandos SQL, bem como a interação do Amazon Kinesis com outros serviços da AWS.

Implantamos o sistema acima de duas maneiras: uma manual bastante longa e uma rápida a partir do código Terraform.

Todo o código-fonte do projeto está disponível no meu repositório GitHub, sugiro que você se familiarize com ele.

Fico feliz em discutir o artigo, aguardo seus comentários. Espero críticas construtivas.

Desejo-lhe sucesso!

Fonte: habr.com

Adicionar um comentário