Aviasales API integration with Amazon Kinesis and serverless simplicity

Hey Habr!

Do you like flying? I do, but during self-isolation I also fell in love with analyzing air ticket data from a well-known resource - Aviasales.

Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, use the Amazon DynamoDB NoSQL database as the main data store, and set up SMS notifications for interesting tickets.

All the details under the cut! Go!


Introduction

For this example, we will need access to the Aviasales API. Access to it is provided free of charge and without restrictions; you just need to register in the "Developers" section to get an API token for accessing the data.

The main purpose of this article is to give a general understanding of working with streaming data in AWS, so we leave aside the fact that the data returned by this API is not strictly up-to-date: it is served from a cache built from searches by users of the Aviasales.ru and Jetradar.com sites over the last 48 hours.

The Kinesis Agent installed on the producer machine will automatically parse the ticket data received via the API and pass it to the required stream via Kinesis Data Analytics. The raw version of this stream will be written directly to the store. The raw data storage deployed in DynamoDB will allow deeper analysis of tickets with BI tools such as AWS QuickSight.

We will consider two options for deploying the entire infrastructure:

  • Manual - via AWS Management Console;
  • Infrastructure as code with Terraform - for lazy automators.

The architecture of the developed system

Components used:

  • Aviasales API - the data returned by this API will be used for all subsequent work;
  • EC2 Producer Instance - a regular virtual machine in the cloud, on which the input data stream will be generated:
    • Kinesis Agent is a Java application installed locally on a machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent constantly monitors a set of files in the specified directories and sends new data to Kinesis;
    • API Caller Script - A Python script that makes requests to the API and adds the response to a folder monitored by the Kinesis Agent;
  • Kinesis Data Streams - a highly scalable real-time data streaming service;
  • Kinesis Analytics is a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures resources to run applications and automatically scales to handle any amount of incoming data;
  • AWS Lambda - a service that lets you run code without provisioning or configuring servers. All computing power is automatically scaled for each call;
  • Amazon DynamoDB - a key-value and document database that delivers latency of less than 10 milliseconds at any scale. With DynamoDB, you don't need to provision, patch, or manage any servers. DynamoDB automatically scales tables to adjust capacity while maintaining high performance. No system administration is required;
  • Amazon SNS is a fully managed publish-subscribe (Pub/Sub) messaging service that can isolate microservices, distributed systems, and serverless applications. SNS can be used to distribute information to end users through mobile push notifications, SMS messages, and emails.

Initial preparation

To emulate the data flow, I decided to use the airfare information returned by the Aviasales API. The documentation lists quite an extensive set of methods; let's take one of them, "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you do not specify the search month in the request, prices for the month following the current one are returned.

So, register, get your token.

An example request is below:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

The above way of getting data from the API, with the token in the query string, will work, but I prefer to pass the access token in a header, so that is the approach used in the api_caller.py script.
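
For a quick sanity check of the token and request parameters, you can call the same endpoint with the token in a header, for example with requests (just a sketch; the full asynchronous version lives in api_caller.py below):

import requests

headers = {'X-Access-Token': 'TOKEN_API', 'Accept-Encoding': 'gzip'}
params = {'currency': 'rub', 'origin': 'LED', 'destination': 'HKT',
          'show_to_affiliates': 'true'}

response = requests.get('http://api.travelpayouts.com/v2/prices/month-matrix',
                        headers=headers, params=params)
response.raise_for_status()
print(response.json()['data'][:1])  # peek at the first ticket in the reply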

Example response:

{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

The example API response above shows a ticket from St. Petersburg to Phuk… Oh, what a dream…
Since I am from Kazan, and Phuket is now "we are only dreaming", we will look for tickets from St. Petersburg to Kazan.

It is assumed that you already have an AWS account. I want to draw special attention right away to the fact that Kinesis and sending notifications via SMS are not included in the annual Free Tier. But even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once you no longer need them.

Fortunately, DynamoDB and the Lambda functions will be effectively free for us as long as we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU, and 100 million queries. Plus a million Lambda function invocations per month.

Manual system deployment

Setting up Kinesis Data Streams

Let's go to the Kinesis Data Streams service and create two new streams, one shard each.

What is a shard?
A shard is the basic throughput unit of an Amazon Kinesis data stream. One shard provides 1 MB/s of input and 2 MB/s of output and supports up to 1,000 PUT records per second. When creating a data stream, you must specify the required number of shards. For example, you can create a data stream with two shards: it will provide 2 MB/s of input and 4 MB/s of output and support up to 2,000 PUT records per second.

The more shards in your stream, the greater its throughput. In essence, this is how streams are scaled - by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour plus an additional 1.4 cents per million PUT payload units.

Let's create a new stream named airline_tickets; 1 shard will be enough for it:

Now let's create another stream named special_stream:

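If you prefer code to the console, roughly the same two streams can be created with boto3 (a sketch; it assumes your AWS credentials and region are already configured):

import boto3

kinesis = boto3.client('kinesis')

# One shard per stream is enough for this demo
for stream_name in ('airline_tickets', 'special_stream'):
    kinesis.create_stream(StreamName=stream_name, ShardCount=1)
    # Wait until the stream becomes ACTIVE before writing to it
    kinesis.get_waiter('stream_exists').wait(StreamName=stream_name)
    print(stream_name, 'is active')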

Producer setup

For this task, an ordinary EC2 instance is enough as the data producer. It does not have to be a powerful, expensive VM; a spot t2.micro will do just fine.

Important note: for this example you should use the Amazon Linux AMI 2018.03.0 image - with it, fewer settings are needed to launch Kinesis Agent quickly.

We go to the EC2 service, create a new virtual machine, select the desired AMI with the t2.micro type, which is included in the Free Tier:

In order for the newly created virtual machine to be able to interact with the Kinesis service, it must be given permission to do so. The best way to do this is to assign an IAM Role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM Role:

Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:

Since this is a training example, we won't go into all the intricacies of granular resource permissions, so we will use the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.

Let's give this role a meaningful name, for example EC2-KinesisStreams-FullAccess. The result should look like the picture below:

After creating this new role, do not forget to attach it to the created virtual machine instance:

We do not change anything else on this screen and move on to the next windows.

You can leave the hard disk parameters as default, tags too (although it is good practice to use tags, at least name the instance and specify the environment).

Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows ssh connections (port 22) to the instance. Select Source -> My IP there, and you can launch the instance.

As soon as it changes to the running status, you can try to connect to it via ssh.

To be able to work with Kinesis Agent, after successfully connecting to the machine, you must enter the following commands in the terminal:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Let's create a folder to save the API responses:

sudo mkdir /var/log/airline_tickets

Before starting the agent, you need to configure its config:

sudo vim /etc/aws-kinesis/agent.json

The content of the agent.json file should look like this:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

As can be seen from the configuration file, the agent will monitor files with the .log extension in the /var/log/airline_tickets/ directory, parse them and pass them to the airline_tickets stream.
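
To make the CSVTOJSON conversion more tangible, here is a small sketch of how one CSV line from the monitored log file maps to the JSON record the agent ends up sending (the sample values are made up for illustration):

import csv
import json
from io import StringIO

# Field names exactly as listed in customFieldNames above
FIELD_NAMES = ["cost", "trip_class", "show_to_affiliates", "return_date",
               "origin", "number_of_changes", "gate", "found_at", "duration",
               "distance", "destination", "depart_date", "actual", "record_id"]

# A hypothetical line as api_caller.py might write it
csv_line = '21494,0,True,,LED,1,Aeroflot,2020-06-27T03:01:36+05:00,0,1294,KZN,2020-07-12,True,777-123'

row = next(csv.reader(StringIO(csv_line)))
record = dict(zip(FIELD_NAMES, row))
print(json.dumps(record, indent=2))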

Restart the service and make sure it is up and running:

sudo service aws-kinesis-agent restart

Now let's download a Python script that will request data from the API:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

The api_caller.py script requests data from Aviasales and stores the response in the directory that the Kinesis Agent watches. The implementation is fairly standard: the TicketsApi class calls the API asynchronously, and we pass it a header with the token and the request parameters:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)
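
The log_maker helper called above is not shown here; in the repository it turns the JSON response into CSV lines in the directory watched by the agent (presumably it also maps API field names such as value onto the column names expected by the agent config). Purely as an illustration, with the file path and record_id generation as my assumptions, it does roughly the following:

import csv
import uuid

# Hypothetical re-implementation of log_maker; the real one is in the repository
TARGET_FILE = '/var/log/airline_tickets/tickets.log'
FIELD_NAMES = ['cost', 'trip_class', 'show_to_affiliates', 'return_date',
               'origin', 'number_of_changes', 'gate', 'found_at', 'duration',
               'distance', 'destination', 'depart_date', 'actual', 'record_id']

def log_maker(response):
    """Append the API response as CSV rows that the Kinesis Agent will pick up."""
    count_rows = 0
    with open(TARGET_FILE, 'a') as log_file:
        writer = csv.writer(log_file)
        for item in response['data']:
            item['record_id'] = str(uuid.uuid4())  # synthetic key for DynamoDB
            writer.writerow([item.get(name, '') for name in FIELD_NAMES])
            count_rows += 1
    return count_rows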

To check that the settings are correct and the agent works, let's do a test run of the api_caller.py script:

sudo ./api_caller.py TOKEN

And we look at the results in the agent logs and on the Monitoring tab of the airline_tickets data stream:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

As you can see, everything works and Kinesis Agent successfully sends data to the stream. Now let's set up the consumer.

Setting up Kinesis Data Analytics

Let's move on to the central component of the entire system - let's create a new application in Kinesis Data Analytics called kinesis_analytics_airlines_app:

Kinesis Data Analytics allows you to perform real-time data analytics from Kinesis Streams using the SQL language. It is a fully autoscaling service (unlike Kinesis Streams) that:

  1. allows you to create new streams (Output Stream) based on queries to the source data;
  2. provides a stream with errors that occurred during the operation of applications (Error Stream);
  3. can automatically determine the schema of the input data (it can be manually overridden if necessary).

This is an expensive service - 0.11 USD per hour of work, so you should use it carefully and delete it when you finish working.

Let's connect the application to the data source:

Select the stream to which we are going to connect (airline_tickets):

Next, you need to attach a new IAM Role so that the application can read from the stream and write to the stream. To do this, it is enough not to change anything in the Access permissions block:

Now we will request the discovery of the data schema in the stream, for this we click on the “Discover schema” button. As a result, the IAM role will be updated (created) and schema discovery will be started from the data that has already arrived in the stream:

Now you need to open the SQL editor. When you click this button, a window will appear asking whether to launch the application - confirm that you want to launch it:

Insert the following simple query into the SQL editor window and click Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relational databases, you work with tables using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAM) and "pumps" (PUMPs), continuous insert queries that insert data from one stream in your application into another stream.

The above SQL query searches for Aeroflot tickets at a cost below five thousand rubles. All records that match these conditions will be placed on the DESTINATION_SQL_STREAM stream.

In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list, select DESTINATION_SQL_STREAM:

As a result of all the manipulations, you should get something similar to the picture below:


Create and subscribe to an SNS topic

Go to the Simple Notification Service and create a new topic there with the name Airlines:

Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:

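For reference, the same topic and SMS subscription can also be created from code; a minimal boto3 sketch (the phone number is a placeholder, use your own in E.164 format):

import boto3

sns = boto3.client('sns')

topic = sns.create_topic(Name='Airlines')
sns.subscribe(
    TopicArn=topic['TopicArn'],
    Protocol='sms',
    Endpoint='+70000000000',  # placeholder phone number
)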

Creating a table in DynamoDB

To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:

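The same table can be created from code as well; a boto3 sketch (the provisioned 5/5 capacity here is my choice and fits into the free tier):

import boto3

dynamodb = boto3.client('dynamodb')

dynamodb.create_table(
    TableName='airline_tickets',
    KeySchema=[{'AttributeName': 'record_id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'record_id', 'AttributeType': 'S'}],
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5},
)
# Wait until the table is ready before writing to it
dynamodb.get_waiter('table_exists').wait(TableName='airline_tickets')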

Creating the Collector lambda function

Let's create a lambda function called Collector, whose task is to poll the airline_tickets stream and, if new records are found there, insert these records into the DynamoDB table. Obviously, in addition to the default permissions, this lambda must have access to read the Kinesis data stream and write to DynamoDB.

Creating an IAM role for the collector lambda function
First, let's create a new IAM lambda role named Lambda-TicketsProcessingRole:

For a test example, the pre-configured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:


This lambda should be triggered by Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:

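If you are scripting this step instead of clicking through the console, the trigger is an event source mapping; roughly like this (the batch size and the placeholder ARN are my choices):

import boto3

lambda_client = boto3.client('lambda')

lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:REGION:ACCOUNT_ID:stream/airline_tickets',  # placeholder ARN
    FunctionName='Collector',
    StartingPosition='LATEST',
    BatchSize=100,
)
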
It remains to paste the code and save the lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Added', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Creating the notifier lambda function

The second lambda function, which will monitor the second stream (special_stream) and send notifications to SNS, is created in the same way. Accordingly, this lambda must have permission to read from Kinesis and to publish messages to the given SNS topic, which the SNS service will then deliver to all subscribers of that topic (email, SMS, etc.).

Creating an IAM role
First, we create the Lambda-KinesisAlarm IAM role for this lambda, and then we assign this role to the created alarm_notifier lambda:


This lambda must be triggered when new records arrive in the special_stream stream, so configure the trigger in the same way as we did for the Collector lambda.

For the convenience of configuring this lambda, let's introduce a new environment variable - TOPIC_ARN, where we put the ARN (Amazon Resource Name) of the Airlines topic:

And insert the lambda code - it is quite simple:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
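
The lambda above sends a fixed text. If you want the SMS to include ticket details, the Kinesis records in the event can be decoded the same way as in the Collector lambda; a possible extension (a sketch, not part of the original code, assuming the Kinesis Analytics output format is JSON):

import base64
import json

def build_message(event):
    """Compose a message listing the cost and gate of each matched ticket."""
    lines = []
    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        lines.append('{} RUB via {}'.format(payload.get('cost'), payload.get('gate')))
    return 'Cheap tickets found:\n' + '\n'.join(lines)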

That completes the manual configuration of the system. All that remains is to test it and make sure that everything is set up correctly.

Deploy from Terraform code

Necessary preparation

Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are many examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.

The distribution can be downloaded here. A detailed analysis of all Terraform features is beyond the scope of this article, so we will limit ourselves to the main points.

How to start

The complete project code is in my repository. Clone it. Before starting, make sure that you have the AWS CLI installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.

It is good practice to run the plan command before deploying the entire infrastructure to see what Terraform is going to create for us in the cloud:

terraform.exe plan

You will be prompted to enter a phone number to send notifications to. At this stage, it is not necessary to enter it.

After reviewing the planned changes, we can start creating the resources:

terraform.exe apply

After running this command, you will again be prompted for a phone number; type "yes" when asked to confirm the actions. This will bring up the entire infrastructure, carry out all the necessary EC2 configuration, deploy the lambda functions, and so on.

After all resources have been successfully created through the Terraform code, you need to go into the details of the Kinesis Analytics application (unfortunately, I did not find how to do this directly from code).

We start the application:

After that, you must explicitly set the in-application stream name by selecting from the drop-down list:

Now everything is ready to go.

Application testing

Regardless of how you deployed the system, manually or through the Terraform code, it will work the same way.

We connect via SSH to the EC2 virtual machine where the Kinesis Agent is installed and run the api_caller.py script:

sudo ./api_caller.py TOKEN

It remains to wait for an SMS to your number:

An SMS message arrives on the phone in about a minute:

It remains to check that the records are stored in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table contains something like this:

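Besides the console, a quick way to peek at the stored records is a small scan from boto3 (Limit=5 just keeps the output short):

import boto3

table = boto3.resource('dynamodb').Table('airline_tickets')
response = table.scan(Limit=5)
for item in response['Items']:
    print(item['depart_date'], item['origin'], '->',
          item['destination'], item['cost'])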

Conclusion

In the course of this work, an online data processing system based on Amazon Kinesis was built. We looked at using Kinesis Agent together with Kinesis Data Streams, real-time analytics in Kinesis Analytics using SQL commands, and the interaction of Amazon Kinesis with other AWS services.

We deployed the above system in two ways: a rather long manual one and a quick one from the Terraform code.

All project source code is available in my GitHub repository; I suggest you take a look at it.

I will be happy to discuss the article and look forward to your comments. I hope for constructive criticism.

I wish you success!

Source: habr.com
