Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Hé Habr !

Aimez-vous voler des avions? J'adore ça, mais pendant l'auto-isolement, je suis également tombé amoureux de l'analyse des données sur les billets d'avion provenant d'une ressource bien connue - Aviasales.

Aujourd'hui, nous allons analyser le travail d'Amazon Kinesis, créer un système de streaming avec des analyses en temps réel, installer la base de données Amazon DynamoDB NoSQL comme stockage de données principal et configurer des notifications SMS pour les tickets intéressants.

Tous les détails sont sous la coupe ! Aller!

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

introduction

Pour l'exemple, nous avons besoin d'accéder à API Aviasales. L'accès à celui-ci est gratuit et sans restrictions, il vous suffit de vous inscrire dans la rubrique « Développeurs » pour recevoir votre token API pour accéder aux données.

L'objectif principal de cet article est de donner une compréhension générale de l'utilisation du streaming d'informations dans AWS ; nous tenons compte du fait que les données renvoyées par l'API utilisée ne sont pas strictement à jour et sont transmises depuis le cache, qui est formé sur la base des recherches effectuées par les utilisateurs des sites Aviasales.ru et Jetradar.com au cours des dernières 48 heures.

L'agent Kinesis, installé sur la machine de production, reçu via l'API analysera et transmettra automatiquement les données au flux souhaité via Kinesis Data Analytics. La version brute de ce flux sera écrite directement dans le magasin. Le stockage de données brutes déployé dans DynamoDB permettra une analyse plus approfondie des tickets via des outils de BI, tels qu'AWS Quick Sight.

Nous considérerons deux options pour déployer l'ensemble de l'infrastructure :

  • Manuel - via AWS Management Console ;
  • L'infrastructure du code Terraform est destinée aux automates paresseux ;

Architecture du système développé

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Composants utilisés :

  • API Aviasales — les données renvoyées par cette API seront utilisées pour tous les travaux ultérieurs ;
  • Instance de producteur EC2 — une machine virtuelle classique dans le cloud sur laquelle le flux de données d'entrée sera généré :
    • Agent Kinesis est une application Java installée localement sur la machine qui offre un moyen simple de collecter et d'envoyer des données à Kinesis (Kinesis Data Streams ou Kinesis Firehose). L'agent surveille en permanence un ensemble de fichiers dans les répertoires spécifiés et envoie de nouvelles données à Kinesis ;
    • Script de l'appelant API — Un script Python qui envoie des requêtes à l'API et place la réponse dans un dossier surveillé par l'agent Kinesis ;
  • Flux de données Kinesis — service de streaming de données en temps réel avec de larges capacités d'évolutivité ;
  • Analyse Kinesis est un service sans serveur qui simplifie l'analyse des données en streaming en temps réel. Amazon Kinesis Data Analytics configure les ressources de l'application et évolue automatiquement pour gérer n'importe quel volume de données entrantes ;
  • AWS Lambda — un service qui vous permet d'exécuter du code sans sauvegarder ni configurer de serveurs. Toute la puissance de calcul est automatiquement adaptée à chaque appel ;
  • Amazon DynamoDB - Une base de données de paires clé-valeur et de documents offrant une latence inférieure à 10 millisecondes lors d'une exécution à n'importe quelle échelle. Lorsque vous utilisez DynamoDB, vous n'avez pas besoin de provisionner, de corriger ou de gérer des serveurs. DynamoDB met automatiquement à l'échelle les tables pour ajuster la quantité de ressources disponibles et maintenir des performances élevées. Aucune administration système n’est requise ;
  • Amazon SNS - un service entièrement géré d'envoi de messages selon le modèle éditeur-abonné (Pub/Sub), avec lequel vous pouvez isoler les microservices, les systèmes distribués et les applications sans serveur. SNS peut être utilisé pour envoyer des informations aux utilisateurs finaux via des notifications push mobiles, des messages SMS et des e-mails.

Formation initiale

Pour émuler le flux de données, j'ai décidé d'utiliser les informations sur les billets d'avion renvoyées par l'API Aviasales. DANS documentation une liste assez longue de méthodes différentes, prenons l'une d'entre elles - "Monthly Price Calendar", qui renvoie les prix pour chaque jour du mois, regroupés par nombre de transferts. Si vous ne précisez pas le mois de recherche dans la demande, les informations seront renvoyées pour le mois suivant celui en cours.

Alors, inscrivons-nous et récupérons notre jeton.

Un exemple de demande est ci-dessous :

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

La méthode ci-dessus pour recevoir des données de l'API en spécifiant un jeton dans la requête fonctionnera, mais je préfère transmettre le jeton d'accès via l'en-tête, nous utiliserons donc cette méthode dans le script api_caller.py.

Exemple de réponse :

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

L'exemple de réponse API ci-dessus montre un billet de Saint-Pétersbourg à Phuk... Oh, quel rêve...
Puisque je viens de Kazan et que Phuket n’est désormais « qu’un rêve », cherchons des billets de Saint-Pétersbourg à Kazan.

Cela suppose que vous disposez déjà d'un compte AWS. Je voudrais tout de suite attirer une attention particulière sur le fait que Kinesis et l'envoi de notifications par SMS ne sont pas inclus dans le forfait annuel. Niveau gratuit (utilisation gratuite). Mais même malgré cela, avec quelques dollars en tête, il est tout à fait possible de construire le système proposé et de jouer avec. Et bien sûr, n’oubliez pas de supprimer toutes les ressources lorsqu’elles ne sont plus nécessaires.

Heureusement, les fonctions DynamoDb et lambda seront gratuites pour nous si nous respectons nos limites mensuelles gratuites. Par exemple, pour DynamoDB : 25 Go de stockage, 25 WCU/RCU et 100 millions de requêtes. Et un million d'appels de fonction lambda par mois.

Déploiement manuel du système

Configuration des flux de données Kinesis

Allons au service Kinesis Data Streams et créons deux nouveaux flux, une partition pour chacun.

Qu'est-ce qu'un fragment ?
Une partition est l'unité de transfert de données de base d'un flux Amazon Kinesis. Un segment fournit un transfert de données d'entrée à une vitesse de 1 Mo/s et un transfert de données de sortie à une vitesse de 2 Mo/s. Un segment prend en charge jusqu'à 1000 2 entrées PUT par seconde. Lors de la création d'un flux de données, vous devez spécifier le nombre requis de segments. Par exemple, vous pouvez créer un flux de données avec deux segments. Ce flux de données fournira un transfert de données d'entrée à 4 Mo/s et un transfert de données de sortie à 2000 Mo/s, prenant en charge jusqu'à XNUMX XNUMX enregistrements PUT par seconde.

Plus votre flux contient de fragments, plus son débit est élevé. En principe, c'est ainsi que les flux sont mis à l'échelle : en ajoutant des fragments. Mais plus vous possédez de fragments, plus le prix est élevé. Chaque fragment coûte 1,5 cents par heure et 1.4 cents supplémentaires pour chaque million d'unités de charge utile PUT.

Créons un nouveau flux avec le nom billets d'avion, 1 fragment lui suffira :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Créons maintenant un autre fil avec le nom flux_spécial:

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Configuration du producteur

Pour analyser une tâche, il suffit d'utiliser une instance EC2 classique comme producteur de données. Il n’est pas nécessaire qu’il s’agisse d’une machine virtuelle puissante et coûteuse ; un spot t2.micro fera très bien l’affaire.

Remarque importante : par exemple, vous devez utiliser image - Amazon Linux AMI 2018.03.0, elle a moins de paramètres pour lancer rapidement l'agent Kinesis.

Accédez au service EC2, créez une nouvelle machine virtuelle, sélectionnez l'AMI souhaitée avec le type t2.micro, qui est incluse dans l'offre gratuite :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Pour que la machine virtuelle nouvellement créée puisse interagir avec le service Kinesis, elle doit disposer des droits nécessaires pour le faire. La meilleure façon de procéder consiste à attribuer un rôle IAM. Par conséquent, sur l'écran Étape 3 : Configurer les détails de l'instance, vous devez sélectionner Créer un nouveau rôle IAM:

Création d'un rôle IAM pour EC2
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Dans la fenêtre qui s'ouvre, sélectionnez que nous créons un nouveau rôle pour EC2 et accédez à la section Autorisations :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
En utilisant l'exemple de formation, nous n'avons pas besoin d'entrer dans toutes les subtilités de la configuration granulaire des droits sur les ressources, nous sélectionnerons donc les politiques préconfigurées par Amazon : AmazonKinesisFullAccess et CloudWatchFullAccess.

Donnons un nom significatif à ce rôle, par exemple : EC2-KinesisStreams-FullAccess. Le résultat devrait être le même que celui indiqué dans l’image ci-dessous :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Après avoir créé ce nouveau rôle, n'oubliez pas de l'attacher à l'instance de machine virtuelle créée :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Nous ne modifions rien d’autre sur cet écran et passons aux fenêtres suivantes.

Les paramètres du disque dur peuvent être laissés par défaut, ainsi que les balises (bien qu'il soit recommandé d'utiliser des balises, donnez au moins un nom à l'instance et indiquez l'environnement).

Nous sommes maintenant à l'onglet Étape 6 : Configurer le groupe de sécurité, où vous devez en créer un nouveau ou spécifier votre groupe de sécurité existant, qui vous permet de vous connecter via ssh (port 22) à l'instance. Sélectionnez Source -> Mon IP là-bas et vous pourrez lancer l'instance.

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Dès qu'il passe en état d'exécution, vous pouvez essayer de vous y connecter via ssh.

Pour pouvoir travailler avec Kinesis Agent, après vous être connecté avec succès à la machine, vous devez saisir les commandes suivantes dans le terminal :

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Créons un dossier pour enregistrer les réponses de l'API :

sudo mkdir /var/log/airline_tickets

Avant de démarrer l'agent, vous devez configurer sa configuration :

sudo vim /etc/aws-kinesis/agent.json

Le contenu du fichier agent.json devrait ressembler à ceci :

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Comme le montre le fichier de configuration, l'agent surveillera les fichiers avec l'extension .log dans le répertoire /var/log/airline_tickets/, les analysera et les transférera vers le flux Airlines_tickets.

Nous redémarrons le service et nous assurons qu'il est opérationnel :

sudo service aws-kinesis-agent restart

Téléchargeons maintenant le script Python qui demandera des données à l'API :

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Le script api_caller.py demande des données à Aviasales et enregistre la réponse reçue dans le répertoire analysé par l'agent Kinesis. L'implémentation de ce script est assez standard, il existe une classe TicketsApi, elle permet d'extraire l'API de manière asynchrone. Nous transmettons un en-tête avec un jeton et des paramètres de requête à cette classe :

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Pour tester les paramètres et fonctionnalités corrects de l'agent, testons l'exécution du script api_caller.py :

sudo ./api_caller.py TOKEN

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Et nous regardons le résultat du travail dans les journaux des agents et dans l'onglet Surveillance du flux de données Airlines_tickets :

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Comme vous pouvez le constater, tout fonctionne et l'agent Kinesis envoie avec succès les données au flux. Configurons maintenant le consommateur.

Configuration de Kinesis Data Analytics

Passons au composant central de l'ensemble du système : créez une nouvelle application dans Kinesis Data Analytics nommée kinesis_analytics_airlines_app :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Kinesis Data Analytics vous permet d'effectuer des analyses de données en temps réel à partir de Kinesis Streams à l'aide du langage SQL. Il s'agit d'un service entièrement autoscaling (contrairement à Kinesis Streams) qui :

  1. vous permet de créer de nouveaux flux (Output Stream) en fonction des demandes de données source ;
  2. fournit un flux avec les erreurs survenues lors de l'exécution des applications (Error Stream) ;
  3. peut déterminer automatiquement le schéma des données d'entrée (il peut être redéfini manuellement si nécessaire).

Ce service n'est pas bon marché - 0.11 USD par heure de travail, vous devez donc l'utiliser avec précaution et le supprimer lorsque vous avez terminé.

Connectons l'application à la source de données :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Sélectionnez le flux auquel nous allons nous connecter (airline_tickets) :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Ensuite, vous devez attacher un nouveau rôle IAM afin que l'application puisse lire à partir du flux et écrire dans le flux. Pour cela, il suffit de ne rien changer dans le bloc Autorisations d'accès :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Demandons maintenant la découverte du schéma de données dans le flux ; pour cela, cliquez sur le bouton « Découvrir le schéma ». De ce fait, le rôle IAM sera mis à jour (un nouveau sera créé) et la détection de schéma sera lancée à partir des données déjà arrivées dans le flux :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Vous devez maintenant accéder à l'éditeur SQL. Lorsque vous cliquez sur ce bouton, une fenêtre apparaîtra vous demandant de lancer l'application - sélectionnez ce que vous souhaitez lancer :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Insérez la requête simple suivante dans la fenêtre de l'éditeur SQL et cliquez sur Enregistrer et exécuter SQL :

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

Dans les bases de données relationnelles, vous travaillez avec des tables à l'aide d'instructions INSERT pour ajouter des enregistrements et d'une instruction SELECT pour interroger des données. Dans Amazon Kinesis Data Analytics, vous travaillez avec des flux (STREAM) et des pompes (PUMP) : des requêtes d'insertion continues qui insèrent les données d'un flux d'une application dans un autre flux.

La requête SQL présentée ci-dessus recherche des billets Aeroflot d'un coût inférieur à cinq mille roubles. Tous les enregistrements qui remplissent ces conditions seront placés dans le flux DESTINATION_SQL_STREAM.

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Dans le bloc Destination, sélectionnez le flux special_stream et dans la liste déroulante Nom du flux intégré à l'application DESTINATION_SQL_STREAM :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Le résultat de toutes les manipulations devrait ressembler à l’image ci-dessous :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Créer et s'abonner à un sujet SNS

Accédez au service de notification simple et créez-y un nouveau sujet avec le nom Airlines :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Abonnez-vous à ce sujet et indiquez le numéro de téléphone mobile sur lequel les notifications SMS seront envoyées :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Créer une table dans DynamoDB

Pour stocker les données brutes de leur flux skyline_tickets, créons une table dans DynamoDB avec le même nom. Nous utiliserons record_id comme clé primaire :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Création d'un collecteur de fonctions lambda

Créons une fonction lambda appelée Collector, dont la tâche sera d'interroger le flux Airlines_tickets et, si de nouveaux enregistrements y sont trouvés, d'insérer ces enregistrements dans la table DynamoDB. Évidemment, en plus des droits par défaut, ce lambda doit avoir un accès en lecture au flux de données Kinesis et un accès en écriture à DynamoDB.

Création d'un rôle IAM pour la fonction lambda du collecteur
Tout d'abord, créons un nouveau rôle IAM pour le lambda nommé Lambda-TicketsProcessingRole :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Pour l'exemple de test, les stratégies AmazonKinesisReadOnlyAccess et AmazonDynamoDBFullAccess préconfigurées conviennent parfaitement, comme le montre l'image ci-dessous :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Ce lambda doit être lancé par un déclencheur de Kinesis lorsque de nouvelles entrées entrent dans le skyline_stream, nous devons donc ajouter un nouveau déclencheur :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Il ne reste plus qu'à insérer le code et à sauvegarder le lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Création d'un notificateur de fonction lambda

La deuxième fonction lambda, qui surveillera le deuxième flux (special_stream) et enverra une notification à SNS, est créée de la même manière. Ce lambda doit donc avoir accès pour lire depuis Kinesis et envoyer des messages vers un sujet SNS donné, qui seront ensuite envoyés par le service SNS à tous les abonnés de ce sujet (email, SMS, etc.).

Création d'un rôle IAM
Tout d'abord, nous créons le rôle IAM Lambda-KinesisAlarm pour ce lambda, puis attribuons ce rôle au lambda alarm_notifier en cours de création :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Ce lambda devrait fonctionner sur un déclencheur pour que les nouveaux enregistrements entrent dans le flux_spécial, vous devez donc configurer le déclencheur de la même manière que nous l'avons fait pour le lambda du collecteur.

Pour faciliter la configuration de ce lambda, introduisons une nouvelle variable d'environnement - TOPIC_ARN, où nous plaçons l'ANR (Amazon Recourse Names) du sujet Airlines :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Et insérez le code lambda, ce n’est pas compliqué du tout :

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Il semble que c'est ici que la configuration manuelle du système soit terminée. Il ne reste plus qu'à tester et s'assurer que nous avons tout configuré correctement.

Déployer à partir du code Terraform

Préparation requise

Terraform est un outil open source très pratique pour déployer une infrastructure à partir de code. Il possède sa propre syntaxe facile à apprendre et contient de nombreux exemples de comment et quoi déployer. L'éditeur Atom ou Visual Studio Code dispose de nombreux plugins pratiques qui facilitent le travail avec Terraform.

Vous pouvez télécharger la distribution par conséquent,. Une analyse détaillée de toutes les fonctionnalités de Terraform dépasse le cadre de cet article, nous nous limiterons donc aux points principaux.

Comment courir

Le code complet du projet est dans mon dépôt. Nous clonons le référentiel sur nous-mêmes. Avant de commencer, vous devez vous assurer qu'AWS CLI est installé et configuré, car... Terraform recherchera les informations d'identification dans le fichier ~/.aws/credentials.

Une bonne pratique consiste à exécuter la commande plan avant de déployer l'intégralité de l'infrastructure pour voir ce que Terraform crée actuellement pour nous dans le cloud :

terraform.exe plan

Vous serez invité à saisir un numéro de téléphone auquel envoyer des notifications. Il n’est pas nécessaire de le saisir à ce stade.

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Après avoir analysé le plan opérationnel du programme, nous pouvons commencer à créer des ressources :

terraform.exe apply

Après avoir envoyé cette commande, il vous sera à nouveau demandé de saisir un numéro de téléphone ; composez « oui » lorsqu'une question sur l'exécution réelle des actions s'affiche. Cela vous permettra de mettre en place toute l'infrastructure, de réaliser toute la configuration nécessaire d'EC2, de déployer des fonctions lambda, etc.

Une fois que toutes les ressources ont été créées avec succès via le code Terraform, vous devez entrer dans les détails de l'application Kinesis Analytics (malheureusement, je n'ai pas trouvé comment faire cela directement à partir du code).

Lancez l'application :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Après cela, vous devez définir explicitement le nom du flux dans l'application en sélectionnant dans la liste déroulante :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Maintenant, tout est prêt.

Tester l'application

Quelle que soit la manière dont vous avez déployé le système, manuellement ou via le code Terraform, il fonctionnera de la même manière.

Nous nous connectons via SSH à la machine virtuelle EC2 sur laquelle Kinesis Agent est installé et exécutons le script api_caller.py

sudo ./api_caller.py TOKEN

Il ne vous reste plus qu'à attendre un SMS sur votre numéro :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
SMS - un message arrive sur votre téléphone en presque 1 minute :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur
Il reste à voir si les enregistrements ont été enregistrés dans la base de données DynamoDB pour une analyse ultérieure plus détaillée. La table Airlines_tickets contient approximativement les données suivantes :

Intégration de l'API Aviasales avec Amazon Kinesis et simplicité sans serveur

Conclusion

Au cours des travaux effectués, un système de traitement de données en ligne basé sur Amazon Kinesis a été construit. Les options d'utilisation de Kinesis Agent conjointement avec Kinesis Data Streams et l'analyse en temps réel Kinesis Analytics à l'aide de commandes SQL, ainsi que l'interaction d'Amazon Kinesis avec d'autres services AWS ont été envisagées.

Nous avons déployé le système ci-dessus de deux manières : une manuelle assez longue et une rapide à partir du code Terraform.

Tout le code source du projet est disponible dans mon dépôt GitHub, je vous propose de vous y familiariser.

Je suis heureux de discuter de l’article, j’attends avec impatience vos commentaires. J'espère des critiques constructives.

Je vous souhaite du succès!

Source: habr.com

Ajouter un commentaire