Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Hej Habr!

Kan du lide at flyve med fly? Jeg elsker det, men under selvisolering blev jeg også forelsket i at analysere data om flybilletter fra én velkendt ressource - Aviasales.

I dag vil vi analysere Amazon Kinesis' arbejde, bygge et streamingsystem med realtidsanalyse, installere Amazon DynamoDB NoSQL-databasen som hoveddatalageret og opsætte SMS-beskeder for interessante billetter.

Alle detaljer er under snittet! Gå!

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Indledning

For eksempel skal vi have adgang til Aviasales API. Adgang til det er gratis og uden begrænsninger; du skal blot registrere dig i afsnittet "Udviklere" for at modtage dit API-token for at få adgang til dataene.

Hovedformålet med denne artikel er at give en generel forståelse af brugen af ​​informationsstreaming i AWS; vi tager højde for, at de data, der returneres af den anvendte API, ikke er strengt opdaterede og transmitteres fra cachen, som er dannet baseret på søgninger foretaget af brugere af Aviasales.ru og Jetradar.com webstederne i de sidste 48 timer.

Kinesis-agent, installeret på den producerende maskine, modtaget via API'et vil automatisk parse og transmittere data til den ønskede strøm via Kinesis Data Analytics. Råversionen af ​​denne stream vil blive skrevet direkte til butikken. Den rå datalagring, der er implementeret i DynamoDB, giver mulighed for dybere billetanalyse gennem BI-værktøjer, såsom AWS Quick Sight.

Vi vil overveje to muligheder for at implementere hele infrastrukturen:

  • Manual - via AWS Management Console;
  • Infrastruktur fra Terraform-kode er til dovne automatister;

Arkitekturen af ​​det udviklede system

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Brugte komponenter:

  • Aviasales API — de data, der returneres af denne API, vil blive brugt til alt efterfølgende arbejde;
  • EC2 Producer Forekomst — en almindelig virtuel maskine i skyen, hvorpå inputdatastrømmen vil blive genereret:
    • Kinesis Agent er en Java-applikation installeret lokalt på maskinen, som giver en nem måde at indsamle og sende data til Kinesis (Kinesis Data Streams eller Kinesis Firehose). Agenten overvåger konstant et sæt filer i de angivne mapper og sender nye data til Kinesis;
    • API Caller Script — Et Python-script, der sender anmodninger til API'et og lægger svaret i en mappe, der overvåges af Kinesis Agent;
  • Kinesis datastrømme — datastreaming i realtid med brede skaleringsmuligheder;
  • Kinesis Analytics er en serverløs tjeneste, der forenkler analysen af ​​streamingdata i realtid. Amazon Kinesis Data Analytics konfigurerer applikationsressourcer og skalerer automatisk til at håndtere enhver mængde af indgående data;
  • AWS Lambda — en tjeneste, der giver dig mulighed for at køre kode uden at sikkerhedskopiere eller opsætte servere. Al computerkraft skaleres automatisk for hvert opkald;
  • Amazon DynamoDB - En database med nøgleværdi-par og dokumenter, der giver en ventetid på mindre end 10 millisekunder, når den kører i enhver skala. Når du bruger DynamoDB, behøver du ikke at klargøre, patche eller administrere nogen servere. DynamoDB skalerer automatisk tabeller for at justere mængden af ​​tilgængelige ressourcer og opretholde høj ydeevne. Ingen systemadministration er påkrævet;
  • Amazon SNS - en fuldt administreret tjeneste til afsendelse af beskeder ved hjælp af udgiver-abonnent-modellen (Pub/Sub), hvormed du kan isolere mikrotjenester, distribuerede systemer og serverløse applikationer. SNS kan bruges til at sende information til slutbrugere gennem mobile push-beskeder, SMS-beskeder og e-mails.

Indledende træning

For at efterligne datastrømmen besluttede jeg at bruge flybilletoplysningerne returneret af Aviasales API. I dokumentation en ret omfattende liste over forskellige metoder, lad os tage en af ​​dem - "Månedlig priskalender", som returnerer priser for hver dag i måneden, grupperet efter antallet af overførsler. Hvis du ikke angiver søgemåneden i anmodningen, returneres oplysninger for måneden efter den aktuelle.

Så lad os registrere og få vores token.

Et eksempel på anmodning er nedenfor:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Ovenstående metode til at modtage data fra API'et ved at angive et token i anmodningen vil fungere, men jeg foretrækker at sende adgangstokenet gennem headeren, så vi vil bruge denne metode i api_caller.py scriptet.

Eksempel på svar:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Eksempel-API-svaret ovenfor viser en billet fra St. Petersborg til Phuk... Åh, hvilken drøm...
Da jeg er fra Kazan, og Phuket nu "kun er en drøm", lad os kigge efter billetter fra St. Petersborg til Kazan.

Det forudsætter, at du allerede har en AWS-konto. Jeg vil straks gøre særligt opmærksom på, at Kinesis og afsendelse af notifikationer via SMS ikke er inkluderet i den årlige Gratis niveau (gratis brug). Men selv på trods af dette, med et par dollars i tankerne, er det ganske muligt at bygge det foreslåede system og lege med det. Og, selvfølgelig, glem ikke at slette alle ressourcer, efter at de ikke længere er nødvendige.

Heldigvis vil DynamoDb og lambda funktioner være gratis for os, hvis vi overholder vores månedlige gratis grænser. For eksempel til DynamoDB: 25 GB lagerplads, 25 WCU/RCU og 100 millioner forespørgsler. Og en million lambdafunktionsopkald om måneden.

Manuel systemimplementering

Opsætning af Kinesis Data Streams

Lad os gå til Kinesis Data Streams-tjenesten og oprette to nye streams, et shard til hver.

Hvad er en skærv?
Et shard er den grundlæggende dataoverførselsenhed i en Amazon Kinesis-strøm. Et segment giver input dataoverførsel med en hastighed på 1 MB/s og output dataoverførsel med en hastighed på 2 MB/s. Et segment understøtter op til 1000 PUT-indgange i sekundet. Når du opretter en datastrøm, skal du angive det nødvendige antal segmenter. For eksempel kan du oprette en datastrøm med to segmenter. Denne datastrøm vil give input-dataoverførsel ved 2 MB/s og output-dataoverførsel ved 4 MB/s, hvilket understøtter op til 2000 PUT-poster pr. sekund.

Jo flere skår i din strøm, jo ​​større er dens gennemstrømning. I princippet er det sådan, strømme skaleres - ved at tilføje skår. Men jo flere skår du har, jo højere er prisen. Hvert skår koster 1,5 cent i timen og yderligere 1.4 cent for hver million PUT nyttelastenheder.

Lad os oprette en ny strøm med navnet flybilletter, 1 skår vil være nok til ham:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Lad os nu oprette endnu en tråd med navnet special_stream:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Opsætning af producent

For at analysere en opgave er det nok at bruge en almindelig EC2-instans som dataproducent. Det behøver ikke at være en kraftfuld, dyr virtuel maskine; en spot t2.micro vil klare sig fint.

Vigtig bemærkning: for eksempel skal du bruge billede - Amazon Linux AMI 2018.03.0, det har færre indstillinger til hurtigt at starte Kinesis Agent.

Gå til EC2-tjenesten, opret en ny virtuel maskine, vælg den ønskede AMI med typen t2.micro, som er inkluderet i Free Tier:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
For at den nyoprettede virtuelle maskine skal kunne interagere med Kinesis-tjenesten, skal den have rettigheder til det. Den bedste måde at gøre dette på er at tildele en IAM-rolle. Derfor skal du på skærmen Trin 3: Konfigurer instansdetaljer vælge Opret ny IAM-rolle:

Oprettelse af en IAM-rolle for EC2
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
I vinduet, der åbnes, skal du vælge, at vi opretter en ny rolle for EC2 og gå til sektionen Tilladelser:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Ved at bruge træningseksemplet behøver vi ikke at gå ind i alle forviklingerne ved granulær konfiguration af ressourcerettigheder, så vi vælger de politikker, der er forudkonfigureret af Amazon: AmazonKinesisFullAccess og CloudWatchFullAccess.

Lad os give denne rolle et meningsfuldt navn, for eksempel: EC2-KinesisStreams-FullAccess. Resultatet skal være det samme som vist på billedet nedenfor:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Efter at have oprettet denne nye rolle, glem ikke at vedhæfte den til den oprettede virtuelle maskine-instans:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Vi ændrer ikke andet på denne skærm og går videre til de næste vinduer.

Harddiskindstillingerne kan stå som standard, såvel som tags (selvom det er god praksis at bruge tags, giv i det mindste forekomsten et navn og angiv miljøet).

Nu er vi på fanen Trin 6: Konfigurer sikkerhedsgruppe, hvor du skal oprette en ny eller angive din eksisterende sikkerhedsgruppe, som giver dig mulighed for at oprette forbindelse via ssh (port 22) til instansen. Vælg Kilde -> Min IP der, og du kan starte forekomsten.

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Så snart den skifter til kørestatus, kan du prøve at oprette forbindelse til den via ssh.

For at kunne arbejde med Kinesis Agent skal du, efter at have oprettet forbindelse til maskinen, indtaste følgende kommandoer i terminalen:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Lad os oprette en mappe for at gemme API-svar:

sudo mkdir /var/log/airline_tickets

Før du starter agenten, skal du konfigurere dens konfiguration:

sudo vim /etc/aws-kinesis/agent.json

Indholdet af filen agent.json skal se sådan ud:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Som det kan ses af konfigurationsfilen, vil agenten overvåge filer med .log-udvidelsen i mappen /var/log/airline_tickets/, parse dem og overføre dem til airline_tickets-strømmen.

Vi genstarter tjenesten og sikrer, at den er oppe og køre:

sudo service aws-kinesis-agent restart

Lad os nu downloade Python-scriptet, der vil anmode om data fra API'en:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Scriptet api_caller.py anmoder om data fra Aviasales og gemmer det modtagne svar i den mappe, som Kinesis-agenten scanner. Implementeringen af ​​dette script er ret standard, der er en TicketsApi-klasse, den giver dig mulighed for asynkront at trække API'en. Vi sender en header med et token og anmoder om parametre til denne klasse:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

For at teste agentens korrekte indstillinger og funktionalitet, lad os prøvekøre api_caller.py-scriptet:

sudo ./api_caller.py TOKEN

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Og vi ser på resultatet af arbejdet i agentlogfilerne og på fanen Overvågning i airline_tickets-datastrømmen:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Som du kan se, fungerer alt, og Kinesis Agent sender med succes data til strømmen. Lad os nu konfigurere forbrugeren.

Opsætning af Kinesis Data Analytics

Lad os gå videre til den centrale komponent i hele systemet - opret en ny applikation i Kinesis Data Analytics ved navn kinesis_analytics_airlines_app:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Kinesis Data Analytics giver dig mulighed for at udføre dataanalyse i realtid fra Kinesis Streams ved hjælp af SQL-sproget. Det er en fuld autoskaleringstjeneste (i modsætning til Kinesis Streams), der:

  1. giver dig mulighed for at oprette nye streams (Output Stream) baseret på anmodninger om kildedata;
  2. giver en strøm med fejl, der opstod, mens programmer kørte (fejlstrøm);
  3. kan automatisk bestemme inputdataskemaet (det kan omdefineres manuelt om nødvendigt).

Dette er ikke en billig service - 0.11 USD per time arbejde, så du bør bruge den forsigtigt og slette den, når du er færdig.

Lad os forbinde applikationen til datakilden:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Vælg den strøm, vi vil oprette forbindelse til (flybilletter):

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Dernæst skal du vedhæfte en ny IAM-rolle, så applikationen kan læse fra streamen og skrive til streamen. For at gøre dette er det nok ikke at ændre noget i adgangstilladelsesblokken:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Lad os nu anmode om opdagelse af dataskemaet i strømmen; for at gøre dette skal du klikke på knappen "Opdag skema". Som et resultat heraf vil IAM-rollen blive opdateret (en ny oprettes), og skemadetektion vil blive lanceret fra de data, der allerede er ankommet i strømmen:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Nu skal du gå til SQL-editoren. Når du klikker på denne knap, vises et vindue, hvor du bliver bedt om at starte programmet - vælg hvad du vil starte:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Indsæt følgende enkle forespørgsel i SQL-editorvinduet, og klik på Gem og kør SQL:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

I relationelle databaser arbejder du med tabeller ved hjælp af INSERT-sætninger til at tilføje poster og en SELECT-sætning til at forespørge data. I Amazon Kinesis Data Analytics arbejder du med strømme (STREAMs) og pumper (PUMP'er) – kontinuerlige indsættelsesanmodninger, der indsætter data fra én strøm i en applikation i en anden strøm.

SQL-forespørgslen præsenteret ovenfor søger efter Aeroflot-billetter til en pris på under fem tusind rubler. Alle poster, der opfylder disse betingelser, vil blive placeret i DESTINATION_SQL_STREAM-strømmen.

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
I Destinationsblokken skal du vælge special_stream-strømmen og på rullelisten DESTINATION_SQL_STREAM i rullelisten In-application stream name:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Resultatet af alle manipulationer skulle være noget, der ligner billedet nedenfor:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Oprettelse og abonnement på et SNS-emne

Gå til Simple Notification Service og opret et nyt emne der med navnet Airlines:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Abonner på dette emne og angiv det mobiltelefonnummer, som SMS-beskeder vil blive sendt til:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Opret en tabel i DynamoDB

Lad os oprette en tabel i DynamoDB med samme navn for at gemme rådataene fra deres flyrejsestrøm. Vi vil bruge record_id som den primære nøgle:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Oprettelse af en lambdafunktionsopsamler

Lad os oprette en lambda-funktion kaldet Collector, hvis opgave vil være at polle airline_tickets-strømmen og, hvis nye poster findes der, indsætte disse poster i DynamoDB-tabellen. Ud over standardrettighederne skal denne lambda naturligvis have læseadgang til Kinesis-datastrømmen og skriveadgang til DynamoDB.

Oprettelse af en IAM-rolle for collector lambda-funktionen
Lad os først oprette en ny IAM-rolle for lambdaen ved navn Lambda-TicketsProcessingRole:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Til testeksemplet er de forudkonfigurerede AmazonKinesisReadOnlyAccess- og AmazonDynamoDBFullAccess-politikker ganske velegnede, som vist på billedet nedenfor:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Denne lambda bør lanceres af en trigger fra Kinesis, når nye poster kommer ind i airline_stream, så vi skal tilføje en ny trigger:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Tilbage er blot at indsætte koden og gemme lambdaen.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Oprettelse af en lambdafunktionsmeddelelse

Den anden lambda-funktion, som vil overvåge den anden strøm (special_stream) og sende en notifikation til SNS, er oprettet på lignende måde. Derfor skal denne lambda have adgang til at læse fra Kinesis og sende beskeder til et givet SNS-emne, som så sendes af SNS-tjenesten til alle abonnenter af dette emne (e-mail, SMS, etc.).

Oprettelse af en IAM-rolle
Først opretter vi IAM-rollen Lambda-KinesisAlarm for denne lambda, og tildeler derefter denne rolle til den alarm_notifier lambda, der oprettes:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Denne lambda skulle fungere på en trigger for nye poster for at komme ind i special_stream, så du skal konfigurere triggeren på samme måde som vi gjorde for Collector lambdaen.

For at gøre det nemmere at konfigurere denne lambda, lad os introducere en ny miljøvariabel - TOPIC_ARN, hvor vi placerer ANR (Amazon Recourse Names) for Airlines-emnet:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Og indsæt lambdakoden, det er slet ikke kompliceret:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Det ser ud til, at det er her den manuelle systemkonfiguration er afsluttet. Det eneste, der er tilbage, er at teste og sikre, at vi har konfigureret alt korrekt.

Implementer fra Terraform-kode

Nødvendig forberedelse

terraform er et meget praktisk open source-værktøj til at implementere infrastruktur fra kode. Det har sin egen syntaks, der er let at lære og har mange eksempler på, hvordan og hvad der skal implementeres. Atom-editoren eller Visual Studio Code har mange praktiske plugins, der gør arbejdet med Terraform lettere.

Du kan downloade distributionen dermed. En detaljeret analyse af alle Terraform-kapaciteter ligger uden for rammerne af denne artikel, så vi vil begrænse os til hovedpunkterne.

Sådan starter du

Den fulde kode for projektet er i mit depot. Vi kloner depotet til os selv. Før du starter, skal du sikre dig, at du har AWS CLI installeret og konfigureret, fordi... Terraform vil lede efter legitimationsoplysninger i filen ~/.aws/credentials.

En god praksis er at køre plankommandoen, før du implementerer hele infrastrukturen for at se, hvad Terraform i øjeblikket skaber for os i skyen:

terraform.exe plan

Du bliver bedt om at indtaste et telefonnummer, du vil sende meddelelser til. Det er ikke nødvendigt at indtaste det på dette tidspunkt.

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Efter at have analyseret programmets driftsplan kan vi begynde at oprette ressourcer:

terraform.exe apply

Efter at have sendt denne kommando, bliver du igen bedt om at indtaste et telefonnummer; tast "ja", når et spørgsmål om faktisk udførelse af handlingerne vises. Dette giver dig mulighed for at opsætte hele infrastrukturen, udføre al den nødvendige konfiguration af EC2, implementere lambda-funktioner osv.

Efter at alle ressourcer er blevet oprettet gennem Terraform-koden, skal du gå ind i detaljerne i Kinesis Analytics-applikationen (desværre fandt jeg ikke, hvordan man gør dette direkte fra koden).

Start applikationen:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Herefter skal du udtrykkeligt angive navnet på streaming i applikationen ved at vælge fra rullelisten:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Nu er alt klar til at gå.

Test af applikationen

Uanset hvordan du implementerede systemet, manuelt eller gennem Terraform-kode, vil det fungere på samme måde.

Vi logger ind via SSH på den virtuelle EC2-maskine, hvor Kinesis Agent er installeret og kører scriptet api_caller.py

sudo ./api_caller.py TOKEN

Alt du skal gøre er at vente på en SMS til dit nummer:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
SMS - beskeden kommer på telefonen om næsten 1 minut:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Det er tilbage at se, om posterne blev gemt i DynamoDB-databasen til efterfølgende, mere detaljeret analyse. Flybilletter-tabellen indeholder omtrent følgende data:

Aviasales API-integration med Amazon Kinesis og serverløs enkelhed

Konklusion

I løbet af det udførte arbejde blev der bygget et online databehandlingssystem baseret på Amazon Kinesis. Muligheder for at bruge Kinesis Agent i forbindelse med Kinesis Data Streams og realtidsanalyse Kinesis Analytics ved hjælp af SQL-kommandoer samt interaktionen af ​​Amazon Kinesis med andre AWS-tjenester blev overvejet.

Vi implementerede ovenstående system på to måder: en ret lang manuel og en hurtig fra Terraform-koden.

Al projektkildekode er tilgængelig i mit GitHub-lager, jeg foreslår, at du gør dig bekendt med det.

Jeg er glad for at diskutere artiklen, jeg ser frem til dine kommentarer. Jeg håber på konstruktiv kritik.

Jeg ønsker dig succes!

Kilde: www.habr.com

Tilføj en kommentar