Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Hej Habr!

Gillar du att flyga flygplan? Jag älskar det, men under självisolering blev jag också kär i att analysera data om flygbiljetter från en välkänd resurs - Aviasales.

Idag kommer vi att analysera Amazon Kinesis arbete, bygga ett streamingsystem med realtidsanalys, installera Amazon DynamoDB NoSQL-databasen som huvuddatalagring och ställa in SMS-aviseringar för intressanta biljetter.

Alla detaljer är under klippet! Gå!

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Inledning

För exemplet behöver vi tillgång till Aviasales API. Åtkomst till det tillhandahålls gratis och utan begränsningar; du behöver bara registrera dig i avsnittet "Utvecklare" för att få din API-token för att komma åt data.

Huvudsyftet med denna artikel är att ge en allmän förståelse för användningen av informationsströmning i AWS; vi tar hänsyn till att data som returneras av API:et som används inte är strikt uppdaterade och överförs från cachen, vilket är bildas baserat på sökningar av användare av Aviasales.ru och Jetradar.com webbplatser under de senaste 48 timmarna.

Kinesis-agent, installerad på den producerande maskinen, mottagen via API:et kommer automatiskt att analysera och överföra data till den önskade strömmen via Kinesis Data Analytics. Råversionen av denna stream kommer att skrivas direkt till butiken. Den rådatalagring som distribueras i DynamoDB kommer att möjliggöra djupare biljettanalys genom BI-verktyg, såsom AWS Quick Sight.

Vi kommer att överväga två alternativ för att distribuera hela infrastrukturen:

  • Manual - via AWS Management Console;
  • Infrastruktur från Terraform-kod är för lata automatörer;

Det utvecklade systemets arkitektur

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Komponenter som används:

  • Aviasales API — data som returneras av detta API kommer att användas för allt efterföljande arbete;
  • EC2 Producer Instance — en vanlig virtuell maskin i molnet på vilken indataströmmen kommer att genereras:
    • Kinesis Agent är en Java-applikation installerad lokalt på maskinen som ger ett enkelt sätt att samla in och skicka data till Kinesis (Kinesis Data Streams eller Kinesis Firehose). Agenten övervakar hela tiden en uppsättning filer i de angivna katalogerna och skickar ny data till Kinesis;
    • API-anropsskript — Ett Python-skript som gör förfrågningar till API:t och lägger svaret i en mapp som övervakas av Kinesis Agent;
  • Kinesis dataströmmar — Dataströmningstjänst i realtid med breda skalningsmöjligheter.
  • Kinesis Analytics är en serverlös tjänst som förenklar analysen av strömmande data i realtid. Amazon Kinesis Data Analytics konfigurerar applikationsresurser och skalar automatiskt för att hantera alla volymer av inkommande data;
  • AWS Lambda — en tjänst som låter dig köra kod utan att säkerhetskopiera eller ställa in servrar. All datorkraft skalas automatiskt för varje samtal;
  • Amazon DynamoDB - En databas med nyckel-värdepar och dokument som ger en latens på mindre än 10 millisekunder när den körs i valfri skala. När du använder DynamoDB behöver du inte tillhandahålla, korrigera eller hantera några servrar. DynamoDB skalar automatiskt tabeller för att justera mängden tillgängliga resurser och bibehålla hög prestanda. Ingen systemadministration krävs;
  • Amazon SNS - en helt hanterad tjänst för att skicka meddelanden med utgivare-abonnentmodellen (Pub/Sub), med vilken du kan isolera mikrotjänster, distribuerade system och serverlösa applikationer. SNS kan användas för att skicka information till slutanvändare genom mobila push-meddelanden, SMS och e-post.

Grundutbildning

För att efterlikna dataflödet bestämde jag mig för att använda flygbiljettinformationen som returneras av Aviasales API. I dokumentation en ganska omfattande lista över olika metoder, låt oss ta en av dem - "Månadspriskalender", som returnerar priser för varje dag i månaden, grupperade efter antalet överföringar. Om du inte anger sökmånad i begäran kommer information att returneras för månaden efter den aktuella.

Så låt oss registrera oss och få vår token.

Ett exempel på begäran är nedan:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ovanstående metod för att ta emot data från API:t genom att ange en token i begäran kommer att fungera, men jag föredrar att skicka åtkomsttoken genom rubriken, så vi kommer att använda den här metoden i api_caller.py-skriptet.

Svarsexempel:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Exemplet på API-svaret ovan visar en biljett från St. Petersburg till Phuk... Åh, vilken dröm...
Eftersom jag är från Kazan och Phuket nu "bara är en dröm", låt oss leta efter biljetter från St. Petersburg till Kazan.

Det förutsätter att du redan har ett AWS-konto. Jag vill genast uppmärksamma att Kinesis och att skicka aviseringar via SMS inte ingår i den årliga Gratis nivå (fri användning). Men trots detta, med ett par dollar i åtanke, är det fullt möjligt att bygga det föreslagna systemet och leka med det. Och, naturligtvis, glöm inte att ta bort alla resurser efter att de inte längre behövs.

Som tur är kommer DynamoDb och lambda-funktionerna att vara gratis för oss om vi når våra månatliga gratisgränser. Till exempel för DynamoDB: 25 GB lagringsutrymme, 25 WCU/RCU och 100 miljoner frågor. Och en miljon lambdafunktionssamtal per månad.

Manuell systeminstallation

Konfigurera Kinesis Dataströmmar

Låt oss gå till tjänsten Kinesis Data Streams och skapa två nya strömmar, en shard för varje.

Vad är en skärva?
En skärva är den grundläggande dataöverföringsenheten i en Amazon Kinesis-ström. Ett segment ger indataöverföring med en hastighet av 1 MB/s och utdataöverföring med en hastighet av 2 MB/s. Ett segment stöder upp till 1000 PUT-poster per sekund. När du skapar en dataström måste du ange det antal segment som krävs. Du kan till exempel skapa en dataström med två segment. Denna dataström ger indataöverföring med 2 MB/s och utdataöverföring med 4 MB/s, vilket stöder upp till 2000 PUT-poster per sekund.

Ju fler skärvor i din ström, desto större genomströmning. I princip är det så flöden skalas - genom att lägga till skärvor. Men ju fler skärvor du har, desto högre pris. Varje skärva kostar 1,5 cent per timme och ytterligare 1.4 cent för varje miljon PUT-nyttolastenheter.

Låt oss skapa en ny ström med namnet flyg biljetter, 1 skärva räcker för honom:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Låt oss nu skapa en annan tråd med namnet special_ström:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Inställning av producent

För att analysera en uppgift räcker det att använda en vanlig EC2-instans som dataproducent. Det behöver inte vara en kraftfull, dyr virtuell maskin; en spot t2.micro kommer att fungera bra.

Viktig notering: till exempel bör du använda bild - Amazon Linux AMI 2018.03.0, den har färre inställningar för att snabbt starta Kinesis Agent.

Gå till EC2-tjänsten, skapa en ny virtuell maskin, välj önskad AMI med typen t2.micro, som ingår i Free Tier:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
För att den nyskapade virtuella maskinen ska kunna interagera med Kinesis-tjänsten måste den ges rättigheter att göra det. Det bästa sättet att göra detta är att tilldela en IAM-roll. På skärmen Steg 3: Konfigurera instansdetaljer bör du därför välja Skapa ny IAM-roll:

Skapa en IAM-roll för EC2
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
I fönstret som öppnas väljer du att vi skapar en ny roll för EC2 och går till avsnittet Behörigheter:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Med hjälp av utbildningsexemplet behöver vi inte gå in på alla krångligheterna med detaljerad konfiguration av resursrättigheter, så vi väljer policyerna som förkonfigurerats av Amazon: AmazonKinesisFullAccess och CloudWatchFullAccess.

Låt oss ge den här rollen ett meningsfullt namn, till exempel: EC2-KinesisStreams-FullAccess. Resultatet bör vara detsamma som visas på bilden nedan:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Efter att ha skapat den här nya rollen, glöm inte att bifoga den till den skapade virtuella maskininstansen:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Vi ändrar inget annat på den här skärmen och går vidare till nästa fönster.

Hårddiskinställningarna kan lämnas som standard, liksom taggarna (även om det är bra att använda taggar, ge åtminstone instansen ett namn och ange miljön).

Nu är vi på fliken Steg 6: Konfigurera säkerhetsgrupp, där du behöver skapa en ny eller ange din befintliga säkerhetsgrupp, som låter dig ansluta via ssh (port 22) till instansen. Välj Källa -> Min IP där så kan du starta instansen.

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Så fort den växlar till körstatus kan du försöka ansluta till den via ssh.

För att kunna arbeta med Kinesis Agent, efter att ha lyckats ansluta till maskinen, måste du ange följande kommandon i terminalen:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Låt oss skapa en mapp för att spara API-svar:

sudo mkdir /var/log/airline_tickets

Innan du startar agenten måste du konfigurera dess konfiguration:

sudo vim /etc/aws-kinesis/agent.json

Innehållet i filen agent.json bör se ut så här:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Som framgår av konfigurationsfilen kommer agenten att övervaka filer med filtillägget .log i katalogen /var/log/airline_tickets/, analysera dem och överföra dem till airline_tickets-strömmen.

Vi startar om tjänsten och ser till att den är igång:

sudo service aws-kinesis-agent restart

Låt oss nu ladda ner Python-skriptet som kommer att begära data från API:et:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Skriptet api_caller.py begär data från Aviasales och sparar det mottagna svaret i katalogen som Kinesis-agenten skannar. Implementeringen av detta skript är ganska standard, det finns en TicketsApi-klass, den låter dig dra asynkront API:et. Vi skickar en rubrik med en token och begär parametrar till denna klass:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

För att testa korrekta inställningar och funktionalitet hos agenten, låt oss testköra api_caller.py-skriptet:

sudo ./api_caller.py TOKEN

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Och vi tittar på resultatet av arbetet i agentloggarna och på fliken Övervakning i airline_tickets-dataströmmen:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Som du kan se fungerar allt och Kinesis Agent skickar framgångsrikt data till strömmen. Låt oss nu konfigurera konsumenten.

Konfigurera Kinesis Data Analytics

Låt oss gå vidare till den centrala komponenten i hela systemet - skapa en ny applikation i Kinesis Data Analytics med namnet kinesis_analytics_airlines_app:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Kinesis Data Analytics låter dig utföra dataanalys i realtid från Kinesis Streams med hjälp av SQL-språket. Det är en helt automatisk skalningstjänst (till skillnad från Kinesis Streams) som:

  1. låter dig skapa nya strömmar (Output Stream) baserat på förfrågningar om att källdata;
  2. tillhandahåller en ström med fel som uppstod när applikationer kördes (Error Stream);
  3. kan automatiskt bestämma indataschemat (det kan omdefinieras manuellt vid behov).

Det här är ingen billig tjänst - 0.11 USD per timmes arbete, så du bör använda den försiktigt och ta bort den när du är klar.

Låt oss ansluta applikationen till datakällan:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Välj strömmen vi ska ansluta till (airline_tickets):

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Därefter måste du bifoga en ny IAM-roll så att applikationen kan läsa från strömmen och skriva till strömmen. För att göra detta räcker det att inte ändra något i åtkomstbehörighetsblocket:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Låt oss nu begära upptäckt av dataschemat i strömmen; för att göra detta klickar du på knappen "Upptäck schema". Som ett resultat kommer IAM-rollen att uppdateras (en ny kommer att skapas) och schemaidentifiering kommer att startas från data som redan har anlänt i flödet:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Nu måste du gå till SQL-redigeraren. När du klickar på den här knappen visas ett fönster som ber dig att starta programmet - välj vad du vill starta:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Infoga följande enkla fråga i SQL-redigeringsfönstret och klicka på Spara och kör SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

I relationsdatabaser arbetar du med tabeller med hjälp av INSERT-satser för att lägga till poster och en SELECT-sats för att fråga data. I Amazon Kinesis Data Analytics arbetar du med strömmar (STREAMs) och pumpar (PUMPs) – kontinuerliga infogningsbegäranden som infogar data från en ström i en applikation till en annan ström.

SQL-frågan som presenteras ovan söker efter Aeroflot-biljetter till en kostnad under fem tusen rubel. Alla poster som uppfyller dessa villkor kommer att placeras i DESTINATION_SQL_STREAM-strömmen.

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
I Destination-blocket väljer du special_stream-strömmen och i listrutan In-application stream name DESTINATION_SQL_STREAM:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Resultatet av alla manipulationer bör vara något som liknar bilden nedan:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Skapa och prenumerera på ett SNS-ämne

Gå till Simple Notification Service och skapa ett nytt ämne där med namnet Airlines:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Prenumerera på detta ämne och ange vilket mobiltelefonnummer som SMS-aviseringar kommer att skickas till:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Skapa en tabell i DynamoDB

För att lagra rådata från deras airline_tickets-ström, låt oss skapa en tabell i DynamoDB med samma namn. Vi kommer att använda record_id som primärnyckel:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Skapa en lambdafunktionsuppsamlare

Låt oss skapa en lambdafunktion som heter Collector, vars uppgift kommer att vara att polla strömmen airline_tickets och, om nya poster hittas där, infoga dessa poster i DynamoDB-tabellen. Uppenbarligen, förutom standardrättigheterna, måste denna lambda ha läsbehörighet till Kinesis dataström och skrivåtkomst till DynamoDB.

Skapa en IAM-roll för collector lambda-funktionen
Låt oss först skapa en ny IAM-roll för lambdan som heter Lambda-TicketsProcessingRole:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
För testexemplet är de förkonfigurerade AmazonKinesisReadOnlyAccess- och AmazonDynamoDBFullAccess-policyerna ganska lämpliga, som visas på bilden nedan:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Denna lambda bör lanseras av en trigger från Kinesis när nya poster kommer in i airline_stream, så vi måste lägga till en ny trigger:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Allt som återstår är att sätta in koden och spara lambdan.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Skapa en lambdafunktionsmeddelande

Den andra lambdafunktionen, som kommer att övervaka den andra strömmen (special_stream) och skicka ett meddelande till SNS, skapas på liknande sätt. Därför måste denna lambda ha tillgång att läsa från Kinesis och skicka meddelanden till ett givet SNS-ämne, som sedan kommer att skickas av SNS-tjänsten till alla prenumeranter av detta ämne (e-post, SMS, etc.).

Skapa en IAM-roll
Först skapar vi IAM-rollen Lambda-KinesisAlarm för denna lambda och tilldelar sedan denna roll till alarm_notifier lambda som skapas:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Denna lambda bör fungera på en trigger för nya poster att komma in i special_stream, så du måste konfigurera triggern på samma sätt som vi gjorde för Collector lambdan.

För att göra det enklare att konfigurera denna lambda, låt oss introducera en ny miljövariabel - TOPIC_ARN, där vi placerar ANR (Amazon Recourse Names) för flygbolagsämnet:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Och sätt in lambdakoden, det är inte alls komplicerat:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Det verkar som att det är här den manuella systemkonfigurationen är klar. Allt som återstår är att testa och se till att vi har konfigurerat allt korrekt.

Distribuera från Terraform-kod

Krävs förberedelse

Terraform är ett mycket bekvämt verktyg med öppen källkod för att distribuera infrastruktur från kod. Den har sin egen syntax som är lätt att lära sig och har många exempel på hur och vad som ska användas. Atom-redigeraren eller Visual Studio Code har många praktiska plugins som gör det enklare att arbeta med Terraform.

Du kan ladda ner distributionen hence. En detaljerad analys av alla Terraform-funktioner ligger utanför ramen för denna artikel, så vi kommer att begränsa oss till huvudpunkterna.

Hur man börjar

Hela koden för projektet är i mitt förråd. Vi klonar förvaret till oss själva. Innan du börjar måste du se till att du har AWS CLI installerat och konfigurerat, eftersom... Terraform kommer att leta efter referenser i filen ~/.aws/credentials.

En bra praxis är att köra plankommandot innan du distribuerar hela infrastrukturen för att se vad Terraform för närvarande skapar åt oss i molnet:

terraform.exe plan

Du kommer att bli ombedd att ange ett telefonnummer att skicka meddelanden till. Det är inte nödvändigt att ange det i detta skede.

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Efter att ha analyserat programmets verksamhetsplan kan vi börja skapa resurser:

terraform.exe apply

Efter att ha skickat detta kommando kommer du återigen att bli ombedd att ange ett telefonnummer; slå "ja" när en fråga om att faktiskt utföra åtgärderna visas. Detta gör att du kan ställa in hela infrastrukturen, utföra all nödvändig konfiguration av EC2, distribuera lambdafunktioner, etc.

Efter att alla resurser har skapats framgångsrikt genom Terraform-koden måste du gå in i detaljerna för Kinesis Analytics-applikationen (tyvärr hittade jag inte hur man gör detta direkt från koden).

Starta applikationen:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Efter detta måste du uttryckligen ange namnet på strömmen i applikationen genom att välja från rullgardinsmenyn:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Nu är allt redo att gå.

Testar applikationen

Oavsett hur du distribuerade systemet, manuellt eller genom Terraform-kod, kommer det att fungera på samma sätt.

Vi loggar in via SSH till den virtuella EC2-maskinen där Kinesis Agent är installerad och kör skriptet api_caller.py

sudo ./api_caller.py TOKEN

Allt du behöver göra är att vänta på ett SMS till ditt nummer:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
SMS - ett meddelande kommer till din telefon på nästan 1 minut:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet
Det återstår att se om posterna sparades i DynamoDB-databasen för efterföljande, mer detaljerad analys. Tabellen flygbiljetter innehåller ungefär följande data:

Aviasales API-integration med Amazon Kinesis och serverlös enkelhet

Slutsats

Under arbetets gång byggdes ett online databehandlingssystem baserat på Amazon Kinesis. Alternativ för att använda Kinesis Agent i samband med Kinesis Dataströmmar och realtidsanalys Kinesis Analytics med SQL-kommandon, samt interaktionen mellan Amazon Kinesis och andra AWS-tjänster övervägdes.

Vi distribuerade ovanstående system på två sätt: ett ganska långt manuellt och ett snabbt från Terraform-koden.

All källkod för projektet är tillgänglig i mitt GitHub-förråd, jag föreslår att du bekantar dig med det.

Jag diskuterar gärna artikeln, jag ser fram emot dina kommentarer. Jag hoppas på konstruktiv kritik.

Jag önskar dig framgång!

Källa: will.com

Lägg en kommentar