Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Hey Habr!

Hutt Dir gär Fligeren ze fléien? Ech hunn et gär, awer wärend der Selbstisolatioun sinn ech och verléift gefall fir Daten iwwer Fluchbilljeeën aus enger bekannter Ressource ze analyséieren - Aviasales.

Haut wäerte mir d'Aarbecht vun Amazon Kinesis analyséieren, e Streamingsystem mat Echtzäitanalyse bauen, d'Amazon DynamoDB NoSQL Datebank als Haaptdatenlager installéieren an SMS Notifikatiounen fir interessant Ticketen opstellen.

All Detailer stinn ënnert dem Schnëtt! Gitt!

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Aféierung

Zum Beispill brauche mir Zougang zu Aviasales API. Zougang zu et gëtt gratis an ouni Restriktiounen zur Verfügung gestallt; Dir musst Iech just an der Rubrik "Entwéckler" registréieren fir Ären API Token ze kréien fir Zougang zu den Donnéeën ze kréien.

Den Haaptzweck vun dësem Artikel ass en allgemengt Verständnis vun der Notzung vun Informatiounsstreamen an AWS ze ginn; mir berécksiichtegen datt d'Donnéeën, déi vun der benotzt API zréckginn, net strikt aktuell sinn a vum Cache iwwerdroen ginn, wat ass geformt baséiert op Recherchen vun de Benotzer vun den Aviasales.ru an Jetradar.com Siten fir déi lescht 48 Stonnen.

Kinesis-Agent, op der produzéierter Maschinn installéiert, iwwer d'API kritt, wäert automatesch Daten op de gewënschten Stream iwwer Kinesis Data Analytics analyséieren an iwwerdroen. Déi raw Versioun vun dësem Stream gëtt direkt an de Buttek geschriwwe ginn. Déi raw Datelagerung, déi an DynamoDB ofgesat gëtt, erlaabt eng méi déif Ticketanalyse duerch BI Tools, wéi AWS Quick Sight.

Mir betruechten zwou Méiglechkeeten fir déi ganz Infrastruktur z'installéieren:

  • Manuell - iwwer AWS Management Console;
  • Infrastruktur vum Terraform Code ass fir faul Automater;

Architektur vum entwéckelte System

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Benotzt Komponenten:

  • Aviasales API - d'Donnéeën, déi vun dëser API zréckginn, gi fir all spéider Aarbecht benotzt;
  • EC2 Produzent Instanz - eng regulär virtuell Maschinn an der Wollek op där den Inputdatenstroum generéiert gëtt:
    • Kinesis Agent ass eng Java Applikatioun déi lokal op der Maschinn installéiert ass, déi en einfache Wee gëtt fir Daten op Kinesis (Kinesis Data Streams oder Kinesis Firehose) ze sammelen an ze schécken. Den Agent iwwerwaacht stänneg eng Rei vu Dateien an de spezifizéierte Verzeichnisser a schéckt nei Daten op Kinesis;
    • API Caller Script - E Python Skript dat Ufroe fir d'API mécht an d'Äntwert an en Dossier setzt deen vum Kinesis Agent iwwerwaacht gëtt;
  • Kinesis Data Streams - Echtzäit Daten Streaming Service mat breet Skaléierungsfäegkeeten;
  • Kinesis Analytics ass e Serverlosen Service deen d'Analyse vu Streamingdaten an Echtzäit vereinfacht. Amazon Kinesis Data Analytics konfiguréiert Applikatiounsressourcen an automatesch skaléiert fir all Volumen vun erakommen Daten ze handhaben;
  • AWS Lambda - e Service deen Iech erlaabt Code ze lafen ouni Backup ze maachen oder Serveren opzestellen. All Rechenkraaft gëtt automatesch fir all Uruff skaléiert;
  • Amazon DynamoDB - Eng Datebank vu Schlësselwäertpaaren an Dokumenter déi Latenz vu manner wéi 10 Millisekonnen ubitt wann se op all Skala lafen. Wann Dir DynamoDB benotzt, musst Dir keng Servere versuergen, patchen oder verwalten. DynamoDB skaléiert automatesch Dëscher fir d'Quantitéit u verfügbare Ressourcen unzepassen an héich Leeschtung ze halen. Keng Systemverwaltung ass erfuerderlech;
  • Amazon SNS - e komplett verwalteten Service fir Messagen ze schécken mat dem Publisher-Abonnent (Pub/Sub) Modell, mat deem Dir Mikroservicer, verdeelt Systemer a Serverlos Uwendungen isoléiere kënnt. SNS ka benotzt ginn fir Informatioun un Endbenotzer duerch mobil Push Notifikatiounen, SMSen an E-Mailen ze schécken.

Éischt Formatioun

Fir den Datefloss ze emuléieren, hunn ech beschloss d'Airline Ticket Informatioun ze benotzen déi vun der Aviasales API zréckkomm ass. IN Dokumentatioun zimmlech eng extensiv Lëscht vu verschiddene Methoden, loosst eis ee vun hinnen huelen - "Monthly Price Calendar", deen d'Präisser fir all Dag vum Mount zréckkënnt, gruppéiert no der Zuel vun den Transferen. Wann Dir net de Sich Mount an der Ufro uginn, gëtt Informatiounen fir de Mount no der aktueller zréckginn.

Also, loosst eis aschreiwen an eis Token kréien.

E Beispill Ufro ass hei ënnen:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Déi uewe genannte Method fir Daten vun der API ze kréien andeems en Token an der Ufro spezifizéiert gëtt funktionnéiert, awer ech léiwer den Zougangstoken duerch den Header ze passéieren, sou datt mir dës Method am api_caller.py Skript benotzen.

Äntwert Beispill:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

D'Beispill API Äntwert hei uewen weist en Ticket vu St. Petersburg op Phuk ... Oh, wat en Dram ...
Well ech aus Kazan sinn, a Phuket ass elo "nëmmen en Dram", loosst eis Tickete vu St. Petersburg op Kazan kucken.

Et gëtt ugeholl datt Dir schonn en AWS Kont hutt. Ech wëll direkt besonnesch opmierksam maachen datt d'Kinesis an d'Notifikatiounen per SMS net am jährlechen abegraff sinn Gratis Tier (gratis Notzung). Awer och trotz dësem, mat e puer Dollar am Kapp, ass et ganz méiglech de proposéierte System ze bauen a mat him ze spillen. An, natierlech, vergiesst net all Ressourcen ze läschen nodeems se net méi gebraucht ginn.

Glécklecherweis sinn DynamoDb a Lambda Funktiounen gratis fir eis wa mir eis monatlecht gratis Limiten treffen. Zum Beispill, fir DynamoDB: 25 GB Späichere, 25 WCU / RCU an 100 Milliounen Ufroen. An eng Millioun Lambda Funktioun Appellen pro Mount.

Manuell System Deployment

Kinesis Data Streams opsetzen

Loosst eis op de Kinesis Data Streams Service goen an zwee nei Streamen erstellen, ee Schnëtt fir all.

Wat ass e Schnéi?
E Shard ass d'Basisdatentransfer Eenheet vun engem Amazon Kinesis Stream. Ee Segment bitt Inputdatentransfer mat enger Geschwindegkeet vun 1 MB / s an Ausgangsdatentransfer mat enger Geschwindegkeet vun 2 MB / s. Ee Segment ënnerstëtzt bis zu 1000 PUT Entréen pro Sekonn. Wann Dir en Datestroum erstellt, musst Dir déi erfuerderlech Unzuel u Segmenter spezifizéieren. Zum Beispill kënnt Dir en Datestroum mat zwee Segmenter erstellen. Dësen Datestroum liwwert Inputdatentransfer mat 2 MB / s an Ausgangsdatentransfer bei 4 MB / s, ënnerstëtzt bis zu 2000 PUT-Records pro Sekonn.

Wat méi Stécker an Ärem Stroum, dest méi grouss ass säin Duerchsatz. Prinzipiell ass dëst wéi d'Flows scaléiert ginn - andeems d'Schärft bäigefüügt ginn. Awer wat méi Stécker Dir hutt, dest méi héich ass de Präis. All Shard kascht 1,5 Cent pro Stonn an zousätzlech 1.4 Cent fir all Millioun PUT Notzlaaschtunitéiten.

Loosst eis en neie Stream mam Numm erstellen Flugbilljeeën, 1 Schnéi geet him duer:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Loosst eis elo en anere Fuedem mam Numm erstellen speziell_stream:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Produzent Setup

Fir eng Aufgab ze analyséieren, ass et genuch fir eng regulär EC2 Instanz als Dateproduzent ze benotzen. Et muss net eng mächteg, deier virtuell Maschinn sinn; e Punkt t2.micro wäert et gutt maachen.

Wichteg Notiz: zum Beispill, Dir sollt Bild benotzen - Amazon Linux AMI 2018.03.0, et huet manner Astellunge fir séier de Kinesis Agent ze starten.

Gitt op den EC2 Service, erstellt eng nei virtuell Maschinn, wielt de gewënschte AMI mam Typ t2.micro, deen am Free Tier abegraff ass:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Fir datt déi nei erstallt virtuell Maschinn fäeg ass mat dem Kinesis Service ze interagéieren, muss et Rechter kréien fir dat ze maachen. De beschte Wee fir dëst ze maachen ass eng IAM Roll ze ginn. Dofir, op der Schrëtt 3: Configure Instance Details Écran, sollt Dir wielen Erstellt eng nei IAM Roll:

Schafen eng IAM Roll fir EC2
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
An der Fënster déi opmaacht, wielt datt mir eng nei Roll fir EC2 erstellen a gitt op d'Permissiounen Sektioun:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Mat Hëllef vum Trainingsbeispill musse mir net an all d'Intricacies vun der granulärer Konfiguratioun vu Ressourcerechter goen, also wäerte mir d'Politik auswielen, déi vun Amazon virkonfiguréiert sinn: AmazonKinesisFullAccess an CloudWatchFullAccess.

Loosst eis e sënnvoll Numm fir dës Roll ginn, zum Beispill: EC2-KinesisStreams-FullAccess. D'Resultat sollt d'selwecht sinn wéi op der Foto hei ënnen:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Nodeems Dir dës nei Roll erstallt hutt, vergiesst net et un déi erstallt virtuell Maschinn Instanz ze befestigen:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Mir änneren näischt anescht op dësem Écran a réckelen op déi nächst Fënsteren.

D'Harddisk-Astellunge kënnen als Standard hannerlooss ginn, souwéi d'Tags (obwuel et gutt ass fir Tags ze benotzen, gitt op d'mannst d'Instanz en Numm an d'Ëmfeld uginn).

Elo si mir op de Schrëtt 6: Konfiguréieren Sécherheetsgrupp Tab, wou Dir eng nei erstellen musst oder Är existent Sécherheetsgrupp spezifizéieren, wat Iech erlaabt Iech iwwer ssh (Port 22) mat der Instanz ze verbannen. Wielt Quell -> Meng IP do an Dir kënnt d'Instanz starten.

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Soubal et op de Lafenstatus wiesselt, kënnt Dir probéieren mat ssh ze verbannen.

Fir mat Kinesis Agent ze schaffen, nodeems Dir erfollegräich mat der Maschinn verbënnt, musst Dir déi folgend Kommandoen am Terminal aginn:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Loosst eis en Dossier erstellen fir API Äntwerten ze späicheren:

sudo mkdir /var/log/airline_tickets

Ier Dir den Agent starten, musst Dir seng Configuratioun konfiguréieren:

sudo vim /etc/aws-kinesis/agent.json

Den Inhalt vun der agent.json Datei soll esou ausgesinn:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Wéi aus der Konfiguratiounsdatei gesi ka ginn, iwwerwaacht den Agent d'Dateien mat der .log Extensioun am /var/log/airline_tickets/ Verzeechnes, parse se an iwwerdroen se an de airline_tickets Stream.

Mir starten de Service nei a stellen sécher datt et op a leeft:

sudo service aws-kinesis-agent restart

Loosst eis elo de Python Skript eroflueden deen Daten vun der API ufroen:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

De Skript api_caller.py freet Daten vun Aviasales a späichert déi kritt Äntwert am Verzeechnes, deen de Kinesis Agent scannt. D'Ëmsetzung vun dësem Skript ass zimmlech Standard, et gëtt eng TicketsApi Klass, et erlaabt Iech asynchron d'API ze zéien. Mir passéieren en Header mat engem Token an froen Parameteren un dës Klass:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Fir déi richteg Astellungen a Funktionalitéit vum Agent ze testen, loosst eis den api_caller.py Skript testen:

sudo ./api_caller.py TOKEN

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
A mir kucken d'Resultat vun der Aarbecht an den Agent Logbicher an op der Iwwerwaachungs Tab am airline_tickets Datestroum:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Wéi Dir gesitt, funktionnéiert alles an de Kinesis Agent schéckt erfollegräich Daten op de Stroum. Loosst eis elo de Konsument konfiguréieren.

Kinesis Data Analytics opsetzen

Loosst eis op den zentrale Bestanddeel vum ganze System weidergoen - erstellt eng nei Applikatioun an Kinesis Data Analytics mam Numm kinesis_analytics_airlines_app:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Kinesis Data Analytics erlaabt Iech Echtzäit Datenanalyse vu Kinesis Streams mat der SQL Sprooch auszeféieren. Et ass e komplett Autoscaling Service (am Géigesaz zu Kinesis Streams) deen:

  1. erlaabt Iech nei Baachen ze kreéieren (Output Stream) baséiert op Ufroe fir Quelldaten;
  2. stellt e Stroum mat Fehler, déi geschitt sinn während d'Applikatioune lafen (Error Stream);
  3. kann den Inputdatenschema automatesch bestëmmen (et kann manuell nei definéiert ginn wann néideg).

Dëst ass net e bëllege Service - 0.11 USD pro Stonn Aarbecht, also sollt Dir et virsiichteg benotzen an et läschen wann Dir fäerdeg sidd.

Loosst eis d'Applikatioun mat der Datequell verbannen:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Wielt de Stream mat deem mir verbannen (airline_tickets):

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Als nächst musst Dir eng nei IAM Roll befestigen, sou datt d'Applikatioun aus dem Stream liest a schreift an de Stream. Fir dëst ze maachen, ass et genuch näischt am Access Permissiounen Block ze änneren:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Loosst eis elo d'Entdeckung vum Dateschema am Stream ufroen; fir dëst ze maachen, klickt op de "Discover Schema" Knäppchen. Als Resultat gëtt d'IAM Roll aktualiséiert (en neie gëtt erstallt) a Schema Detektioun gëtt vun den Donnéeën gestart, déi schonn am Stream ukomm sinn:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Elo musst Dir op de SQL Editor goen. Wann Dir op dëse Knäppchen klickt, erschéngt eng Fënster déi Iech freet d'Applikatioun ze starten - wielt wat Dir wëllt starten:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Füügt déi folgend einfach Ufro an d'SQL Editor-Fënster a klickt op Save and Run SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

A relational Datenbanken schafft Dir mat Dëscher mat INSERT Aussoen fir Rekorder ze addéieren an eng SELECT Ausso fir Daten ze froen. An Amazon Kinesis Data Analytics schafft Dir mat Streamen (STREAMs) a Pompelen (PUMPs) - kontinuéierlech Insert Ufroen déi Daten vun engem Stream an enger Applikatioun an en anere Stream setzen.

D'SQL Ufro uewe presentéiert sicht no Aeroflot Ticketen zu enger Käschte vu manner wéi fënnef dausend Rubel. All Opzeechnungen, déi dëse Konditiounen erfëllen, ginn am DESTINATION_SQL_STREAM Stream gesat.

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Am Destinatiounsblock, wielt de special_stream Stream, an an der In-Applikatioun Stream Numm DESTINATION_SQL_STREAM Dropdown Lëscht:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
D'Resultat vun all Manipulatioune sollt eppes ähnlech wéi d'Bild hei drënner sinn:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Erstellen an abonnéieren op en SNS Thema

Gitt op den Simple Notification Service a erstellt en neit Thema do mam Numm Airlines:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Abonnéiert Iech op dëst Thema a gitt d'Handynummer un, op déi d'SMS-Notifikatioune geschéckt ginn:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Erstellt en Dësch an DynamoDB

Fir déi rau Daten aus hirem Airline_tickets Stream ze späicheren, loosst eis en Dësch an DynamoDB mam selwechten Numm erstellen. Mir benotze record_id als primäre Schlëssel:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Schafen eng Lambda Funktioun Sammelstécker

Loosst eis eng Lambda Funktioun mam Numm Collector erstellen, deem seng Aufgab ass den Airline_tickets Stream ze pollen an, wann nei Opzeechnungen do fonnt ginn, dës Opzeechnungen an d'DynamoDB Tabelle setzen. Natierlech, zousätzlech zu de Standardrechter, muss dës Lambda Lieszougang zum Kinesis Datestroum hunn a Schreifzougang op DynamoDB hunn.

Schafen eng IAM Roll fir d'Sammler Lambda Funktioun
Als éischt, loosst eis eng nei IAM Roll erstellen fir d'Lambda mam Numm Lambda-TicketsProcessingRole:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Fir den Testbeispill sinn déi virkonfiguréiert AmazonKinesisReadOnlyAccess an AmazonDynamoDBFullAccess Politik ganz gëeegent, wéi am Bild hei ënnen gewisen:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Dës Lambda soll vun engem Ausléiser vu Kinesis gestart ginn wann nei Entréen an d'Airline_stream erakommen, also musse mir en neien Ausléiser derbäi:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Alles wat bleift ass de Code anzeginn an d'Lambda späicheren.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Schafen eng Lambda Funktioun Notifier

Déi zweet Lambda Funktioun, déi den zweete Stream (special_stream) iwwerwaacht an eng Notifikatioun un SNS schéckt, gëtt op eng ähnlech Manéier erstallt. Dofir muss dës Lambda Zougang hunn fir aus Kinesis ze liesen a Messagen un e bestëmmten SNS Thema ze schécken, deen dann vum SNS Service un all Abonnente vun dësem Thema geschéckt gëtt (E-Mail, SMS, etc.).

Schafen eng IAM Roll
Als éischt kreéiere mir d'IAM Roll Lambda-KinesisAlarm fir dës Lambda, an ginn dës Roll dann un den Alarm_notifier lambda zou:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Dës Lambda soll un engem Ausléiser schaffen fir nei Opzeechnunge fir an de special_stream anzeginn, also musst Dir den Ausléiser op déiselwecht Manéier konfiguréieren wéi mir fir de Collector lambda gemaach hunn.

Fir et méi einfach ze maachen dës Lambda ze konfiguréieren, loosst eis eng nei Ëmfeldvariabel aféieren - TOPIC_ARN, wou mir den ANR (Amazon Recourse Names) vum Airlines Thema placéieren:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
An setzt de Lambda Code an, et ass guer net komplizéiert:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Et schéngt, datt dëst ass wou déi manuell Systemkonfiguratioun fäerdeg ass. Alles wat bleift ass ze testen a sécherzestellen datt mir alles richteg konfiguréiert hunn.

Deploy vum Terraform Code

Noutwendeg Virbereedung

Terraform ass e ganz prakteschen Open-Source Tool fir Infrastruktur aus Code z'installéieren. Et huet seng eege Syntax déi einfach ze léieren ass an huet vill Beispiller vu wéi a wat fir z'installéieren. Den Atom Editor oder Visual Studio Code huet vill praktesch Plugins déi d'Aarbecht mat Terraform méi einfach maachen.

Dir kënnt d'Verdeelung eroflueden vun hei. Eng detailléiert Analyse vun all Terraform Fäegkeeten ass iwwer den Ëmfang vun dësem Artikel, also wäerte mir eis op d'Haaptpunkte limitéieren.

Wéi ufänken

De komplette Code vum Projet ass a mengem Repository. Mir klonen de Repository fir eis selwer. Ier Dir ufänkt, musst Dir sécher sinn datt Dir AWS CLI installéiert a konfiguréiert hutt, well ... Terraform sicht Umeldungsinformatiounen an der ~/.aws/credentials Datei.

Eng gutt Praxis ass de Plang Kommando auszeféieren ier Dir déi ganz Infrastruktur ofsetzt fir ze kucken wat Terraform am Moment fir eis an der Wollek erstellt:

terraform.exe plan

Dir wäert gefrot ginn eng Telefonsnummer unzeginn fir Notifikatiounen ze schécken. Et ass net néideg et op dëser Etapp anzeginn.

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Nodeems mir den Operatiounsplang vum Programm analyséiert hunn, kënne mir ufänken Ressourcen ze kreéieren:

terraform.exe apply

Nodeems Dir dëse Kommando geschéckt hutt, gitt Dir nach eng Kéier gefrot eng Telefonsnummer anzeginn; wielt "Jo" wann eng Fro iwwer d'Aktivitéit vun den Aktiounen ugewise gëtt. Dëst erlaabt Iech déi ganz Infrastruktur opzestellen, all déi néideg Konfiguratioun vun EC2 auszeféieren, Lambda Funktiounen z'installéieren, asw.

Nodeems all Ressourcen erfollegräich duerch den Terraform Code erstallt goufen, musst Dir an d'Detailer vun der Kinesis Analytics Applikatioun goen (leider hunn ech net fonnt wéi ech dat direkt aus dem Code maachen).

Start der Applikatioun:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Duerno musst Dir explizit den In-Applikatioun Stream Numm setzen andeems Dir aus der Dropdown-Lëscht auswielt:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Elo ass alles prett fir ze goen.

Testen der Applikatioun

Egal wéi Dir de System ofgebaut hutt, manuell oder duerch Terraform Code, et funktionnéiert d'selwecht.

Mir aloggen iwwer SSH op d'EC2 virtuell Maschinn wou Kinesis Agent installéiert ass a lafen de api_caller.py Skript

sudo ./api_caller.py TOKEN

Alles wat Dir maache musst ass op eng SMS op Är Nummer ze waarden:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
SMS - e Message kënnt a bal 1 Minutt um Telefon:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet
Et bleift ze gesinn ob d'Records an der DynamoDB Datebank gespäichert goufen fir spéider, méi detailléiert Analyse. D'Airline_tickets Tabell enthält ongeféier déi folgend Donnéeën:

Aviasales API Integratioun mat Amazon Kinesis a Serverlos Einfachheet

Konklusioun

Am Laf vun der Aarbecht ass en Online Dateveraarbechtungssystem baséiert op Amazon Kinesis gebaut. Optiounen fir de Kinesis Agent a Verbindung mat Kinesis Data Streams an Echtzäitanalyse Kinesis Analytics mat SQL Kommandoen ze benotzen, souwéi d'Interaktioun vun Amazon Kinesis mat aneren AWS Servicer goufen berücksichtegt.

Mir hunn de uewe genannte System op zwou Weeër ofgebaut: e zimlech laange manuelle an e séiere vum Terraform Code.

All Projet Quellcode ass verfügbar a mengem GitHub Repository, Ech proposéieren Iech mat et vertraut.

Ech si frou den Artikel ze diskutéieren, ech freeën eis op Är Kommentaren. Ech hoffen op konstruktiv Kritik.

Ech wënschen Iech Erfolleg!

Source: will.com

Setzt e Commentaire