Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Hé Habr!

Hou je van vliegende vliegtuigen? Ik vind het geweldig, maar tijdens zelfisolatie werd ik ook verliefd op het analyseren van gegevens over vliegtickets van een bekende bron: Aviasales.

Vandaag zullen we het werk van Amazon Kinesis analyseren, een streamingsysteem bouwen met realtime analyses, de Amazon DynamoDB NoSQL-database installeren als de belangrijkste gegevensopslag en sms-meldingen instellen voor interessante tickets.

Alle details zijn onder de snit! Gaan!

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Introductie

Voor het voorbeeld hebben we toegang nodig tot Aviasales-API. Toegang hiertoe wordt gratis en zonder beperkingen verleend; u hoeft zich alleen maar te registreren in de sectie ‘Ontwikkelaars’ om uw API-token te ontvangen voor toegang tot de gegevens.

Het belangrijkste doel van dit artikel is om een ​​algemeen inzicht te geven in het gebruik van informatiestreaming in AWS; we houden er rekening mee dat de gegevens die door de gebruikte API worden geretourneerd niet strikt actueel zijn en worden verzonden vanuit de cache, die gevormd op basis van zoekopdrachten door gebruikers van de sites Aviasales.ru en Jetradar.com gedurende de afgelopen 48 uur.

Kinesis-agent, geïnstalleerd op de producerende machine, ontvangen via de API, zal automatisch gegevens parseren en verzenden naar de gewenste stream via Kinesis Data Analytics. De onbewerkte versie van deze stream wordt rechtstreeks naar de winkel geschreven. De ruwe gegevensopslag die in DynamoDB wordt ingezet, maakt een diepere ticketanalyse mogelijk via BI-tools, zoals AWS Quick Sight.

We zullen twee opties overwegen om de volledige infrastructuur in te zetten:

  • Handmatig - via AWS Management Console;
  • Infrastructuur van Terraform-code is voor luie automatiseerders;

Architectuur van het ontwikkelde systeem

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Gebruikte componenten:

  • Aviasales-API — de door deze API geretourneerde gegevens zullen worden gebruikt voor al het daaropvolgende werk;
  • EC2 Producentinstantie — een gewone virtuele machine in de cloud waarop de invoergegevensstroom wordt gegenereerd:
    • Kinesis-agent is een Java-applicatie die lokaal op de machine is geïnstalleerd en die een eenvoudige manier biedt om gegevens te verzamelen en naar Kinesis te verzenden (Kinesis Data Streams of Kinesis Firehose). De agent controleert voortdurend een reeks bestanden in de opgegeven mappen en stuurt nieuwe gegevens naar Kinesis;
    • API-aanroepscript — Een Python-script dat verzoeken indient bij de API en het antwoord in een map plaatst die wordt gecontroleerd door de Kinesis Agent;
  • Kinesis-gegevensstromen — real-time datastreamingdienst met brede schaalmogelijkheden;
  • Kinesis-analyse is een serverloze service die de analyse van streaminggegevens in realtime vereenvoudigt. Amazon Kinesis Data Analytics configureert applicatiebronnen en schaalt automatisch om elk volume aan binnenkomende gegevens te verwerken;
  • AWS Lambda — een service waarmee u code kunt uitvoeren zonder een back-up te maken of servers in te stellen. Alle rekenkracht wordt voor elk gesprek automatisch geschaald;
  • Amazon DynamoDB - Een database met sleutel-waardeparen en documenten die een latentie van minder dan 10 milliseconden biedt bij uitvoering op elke schaal. Wanneer u DynamoDB gebruikt, hoeft u geen servers in te richten, te patchen of te beheren. DynamoDB schaalt automatisch tabellen om de hoeveelheid beschikbare bronnen aan te passen en hoge prestaties te behouden. Er is geen systeembeheer vereist;
  • Amazon SNS - een volledig beheerde service voor het verzenden van berichten via het uitgever-abonneemodel (Pub/Sub), waarmee u microservices, gedistribueerde systemen en serverloze applicaties kunt isoleren. SNS kan worden gebruikt om informatie naar eindgebruikers te sturen via mobiele pushmeldingen, sms-berichten en e-mails.

Eerste training

Om de gegevensstroom te emuleren, besloot ik de vliegticketinformatie te gebruiken die werd geretourneerd door de Aviasales API. IN documentatie een behoorlijk uitgebreide lijst met verschillende methoden, laten we er een nemen: "Maandelijkse prijskalender", die prijzen voor elke dag van de maand retourneert, gegroepeerd op het aantal overboekingen. Als u de zoekmaand niet opgeeft in de aanvraag, wordt informatie geretourneerd voor de maand die volgt op de huidige maand.

Dus laten we ons registreren en onze token krijgen.

Een voorbeeldverzoek vindt u hieronder:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

De bovenstaande methode om gegevens van de API te ontvangen door een token in het verzoek op te geven zal werken, maar ik geef er de voorkeur aan om het toegangstoken door de header door te geven, dus we zullen deze methode gebruiken in het api_caller.py script.

Antwoord voorbeeld:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Het bovenstaande voorbeeld van een API-antwoord toont een ticket van St. Petersburg naar Phuk... Oh, wat een droom...
Omdat ik uit Kazan kom en Phuket nu “slechts een droom” is, gaan we op zoek naar kaartjes van St. Petersburg naar Kazan.

Er wordt van uitgegaan dat u al een AWS-account heeft. Bijzondere aandacht wil ik er meteen op vestigen dat Kinesis en het versturen van notificaties via SMS niet in de jaarcijfers zijn opgenomen Gratis niveau (gratis gebruik). Maar ondanks dit is het, met een paar dollar in gedachten, heel goed mogelijk om het voorgestelde systeem te bouwen en ermee te spelen. En vergeet natuurlijk niet alle bronnen te verwijderen nadat ze niet langer nodig zijn.

Gelukkig zijn de DynamoDb- en lambda-functies gratis voor ons als we aan onze maandelijkse gratis limieten voldoen. Voor DynamoDB: 25 GB opslag, 25 WCU/RCU en 100 miljoen queries. En een miljoen lambda-functieoproepen per maand.

Handmatige systeemimplementatie

Kinesis-gegevensstromen instellen

Laten we naar de Kinesis Data Streams-service gaan en twee nieuwe streams maken, één scherf voor elk.

Wat is een scherf?
Een scherf is de basiseenheid voor gegevensoverdracht van een Amazon Kinesis-stream. Eén segment biedt invoergegevensoverdracht met een snelheid van 1 MB/s en uitvoergegevensoverdracht met een snelheid van 2 MB/s. Eén segment ondersteunt maximaal 1000 PUT-invoer per seconde. Wanneer u een gegevensstroom maakt, moet u het vereiste aantal segmenten opgeven. U kunt bijvoorbeeld een gegevensstroom maken met twee segmenten. Deze datastroom biedt invoergegevensoverdracht met een snelheid van 2 MB/s en uitvoergegevensoverdracht met 4 MB/s, en ondersteunt maximaal 2000 PUT-records per seconde.

Hoe meer scherven in uw stream, hoe groter de doorvoer. In principe is dit de manier waarop stromen worden geschaald: door scherven toe te voegen. Maar hoe meer scherven je hebt, hoe hoger de prijs. Elke scherf kost 1,5 cent per uur en nog eens 1.4 cent voor elke miljoen PUT-payload-eenheden.

Laten we een nieuwe stream maken met de naam vliegtickets, 1 scherf is genoeg voor hem:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Laten we nu een nieuwe thread maken met de naam speciale_stream:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Producent opstelling

Om een ​​taak te analyseren is het voldoende om een ​​gewone EC2-instance als dataproducent te gebruiken. Het hoeft geen krachtige, dure virtuele machine te zijn; een spot t2.micro voldoet prima.

Belangrijke opmerking: u moet bijvoorbeeld image - Amazon Linux AMI 2018.03.0 gebruiken, deze heeft minder instellingen voor het snel starten van de Kinesis Agent.

Ga naar de EC2-service, maak een nieuwe virtuele machine aan, selecteer de gewenste AMI met type t2.micro, die is opgenomen in de Free Tier:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Om de nieuw gecreëerde virtuele machine te laten communiceren met de Kinesis-service, moeten er rechten voor worden gegeven. De beste manier om dit te doen is door een IAM-rol toe te wijzen. Daarom moet u op het scherm Stap 3: Instancedetails configureren selecteren Maak een nieuwe IAM-rol:

Creëren van een IAM-rol voor EC2
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Selecteer in het geopende venster dat we een nieuwe rol voor EC2 maken en ga naar de sectie Machtigingen:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Met behulp van het trainingsvoorbeeld hoeven we niet in te gaan op alle fijne kneepjes van de gedetailleerde configuratie van bronrechten, dus selecteren we het beleid dat vooraf is geconfigureerd door Amazon: AmazonKinesisFullAccess en CloudWatchFullAccess.

Laten we deze rol een betekenisvolle naam geven, bijvoorbeeld: EC2-KinesisStreams-FullAccess. Het resultaat zou hetzelfde moeten zijn als weergegeven in de onderstaande afbeelding:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Vergeet na het maken van deze nieuwe rol niet om deze aan de gemaakte virtuele machine-instantie te koppelen:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
We veranderen verder niets op dit scherm en gaan verder naar de volgende vensters.

De instellingen van de harde schijf kunnen als standaard blijven staan, evenals de tags (hoewel het een goede gewoonte is om tags te gebruiken, geef de instance in ieder geval een naam en geef de omgeving aan).

Nu bevinden we ons op het tabblad Stap 6: Beveiligingsgroep configureren, waar u een nieuwe moet maken of uw bestaande beveiligingsgroep moet opgeven, waarmee u via ssh (poort 22) verbinding kunt maken met de instantie. Selecteer daar Bron -> Mijn IP en u kunt de instantie starten.

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Zodra het overschakelt naar de actieve status, kunt u proberen er via ssh verbinding mee te maken.

Om met Kinesis Agent te kunnen werken, moet u, nadat u succesvol verbinding heeft gemaakt met de machine, de volgende opdrachten in de terminal invoeren:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Laten we een map maken om API-reacties op te slaan:

sudo mkdir /var/log/airline_tickets

Voordat u de agent start, moet u de configuratie ervan configureren:

sudo vim /etc/aws-kinesis/agent.json

De inhoud van het agent.json-bestand zou er als volgt uit moeten zien:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Zoals u kunt zien in het configuratiebestand, zal de agent bestanden met de extensie .log in de directory /var/log/airline_tickets/ monitoren, parseren en overbrengen naar de stream airline_tickets.

We starten de service opnieuw op en zorgen ervoor dat deze actief is:

sudo service aws-kinesis-agent restart

Laten we nu het Python-script downloaden dat gegevens van de API opvraagt:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Het api_caller.py-script vraagt ​​gegevens op bij Aviasales en slaat het ontvangen antwoord op in de map die de Kinesis-agent scant. De implementatie van dit script is vrij standaard, er is een TicketsApi-klasse, waarmee je de API asynchroon kunt ophalen. We geven een header met een token door en vragen parameters aan deze klasse:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Laten we het script api_caller.py testen om de juiste instellingen en functionaliteit van de agent te testen:

sudo ./api_caller.py TOKEN

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
En we kijken naar het resultaat van het werk in de agentlogboeken en op het tabblad Monitoring in de gegevensstroom airline_tickets:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Zoals u kunt zien, werkt alles en verzendt de Kinesis Agent met succes gegevens naar de stream. Laten we nu de consument configureren.

Kinesis Data Analytics opzetten

Laten we verder gaan met de centrale component van het hele systeem: maak een nieuwe applicatie in Kinesis Data Analytics met de naam kinesis_analytics_airlines_app:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Met Kinesis Data Analytics kunt u realtime gegevensanalyses uitvoeren vanuit Kinesis Streams met behulp van de SQL-taal. Het is een volledig automatische schalingsservice (in tegenstelling tot Kinesis Streams) die:

  1. stelt u in staat nieuwe streams (Output Stream) te creëren op basis van verzoeken om gegevens te verzamelen;
  2. biedt een stream met fouten die zijn opgetreden terwijl applicaties actief waren (Error Stream);
  3. kan automatisch het invoergegevensschema bepalen (het kan indien nodig handmatig opnieuw worden gedefinieerd).

Dit is geen goedkope service - 0.11 USD per uur werk, dus u moet er zorgvuldig mee omgaan en deze verwijderen als u klaar bent.

Laten we de applicatie verbinden met de gegevensbron:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Selecteer de stream waarmee we verbinding willen maken (airline_tickets):

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Vervolgens moet u een nieuwe IAM-rol koppelen, zodat de applicatie uit de stream kan lezen en naar de stream kan schrijven. Om dit te doen, volstaat het om niets te wijzigen in het blok Toegangsrechten:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Laten we nu de ontdekking van het gegevensschema in de stream aanvragen; klik hiervoor op de knop 'Schema ontdekken'. Als gevolg hiervan wordt de IAM-rol bijgewerkt (er wordt een nieuwe aangemaakt) en wordt schemadetectie gestart op basis van de gegevens die al in de stream zijn aangekomen:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Nu moet je naar de SQL-editor gaan. Wanneer u op deze knop klikt, verschijnt er een venster waarin u wordt gevraagd de toepassing te starten. Selecteer wat u wilt starten:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Voeg de volgende eenvoudige query in het SQL-editorvenster in en klik op SQL opslaan en uitvoeren:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relationele databases werkt u met tabellen met behulp van INSERT-instructies om records toe te voegen en een SELECT-instructie om gegevens op te vragen. In Amazon Kinesis Data Analytics werk je met streams (STREAMs) en pompen (PUMPs): continue invoegverzoeken die gegevens uit de ene stream in een applicatie invoegen in een andere stream.

De hierboven gepresenteerde SQL-query zoekt naar Aeroflot-tickets voor minder dan vijfduizend roebel. Alle records die aan deze voorwaarden voldoen, worden in de DESTINATION_SQL_STREAM-stream geplaatst.

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Selecteer in het Destination-blok de special_stream-stream en in de vervolgkeuzelijst In-application stream-naam DESTINATION_SQL_STREAM:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Het resultaat van alle manipulaties zou ongeveer hetzelfde moeten zijn als de onderstaande afbeelding:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Een SNS-onderwerp maken en erop abonneren

Ga naar de Simple Notification Service en maak daar een nieuw onderwerp aan met de naam Airlines:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Abonneer u op dit onderwerp en geef het mobiele telefoonnummer aan waarnaar sms-meldingen worden verzonden:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Maak een tabel in DynamoDB

Om de onbewerkte gegevens van hun airline_tickets-stream op te slaan, maken we een tabel in DynamoDB met dezelfde naam. We gebruiken record_id als de primaire sleutel:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Een lambda-functiecollector maken

Laten we een lambda-functie maken met de naam Collector, wiens taak het is om de airline_tickets-stroom te pollen en, als daar nieuwe records worden gevonden, deze records in de DynamoDB-tabel in te voegen. Uiteraard moet deze lambda, naast de standaardrechten, leestoegang hebben tot de Kinesis-datastroom en schrijftoegang tot DynamoDB.

Een IAM-rol maken voor de collector-lambda-functie
Laten we eerst een nieuwe IAM-rol maken voor de lambda met de naam Lambda-TicketsProcessingRole:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Voor het testvoorbeeld zijn de vooraf geconfigureerde beleidsregels AmazonKinesisReadOnlyAccess en AmazonDynamoDBFullAccess redelijk geschikt, zoals weergegeven in de onderstaande afbeelding:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Deze lambda zou moeten worden gelanceerd door een trigger van Kinesis wanneer nieuwe vermeldingen de airline_stream binnenkomen, dus we moeten een nieuwe trigger toevoegen:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Het enige dat overblijft is het invoegen van de code en het opslaan van de lambda.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Een lambda-functiemelder maken

De tweede lambda-functie, die de tweede stream (special_stream) zal monitoren en een melding naar SNS zal sturen, is op een vergelijkbare manier gemaakt. Daarom moet deze lambda toegang hebben om vanuit Kinesis berichten te lezen en naar een bepaald SNS-onderwerp te verzenden, die vervolgens door de SNS-dienst naar alle abonnees van dit onderwerp worden verzonden (e-mail, sms, enz.).

Een IAM-rol creëren
Eerst creëren we de IAM-rol Lambda-KinesisAlarm voor deze lambda, en wijzen deze rol vervolgens toe aan de alarm_notifier lambda die wordt aangemaakt:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Deze lambda zou moeten werken op een trigger voor nieuwe records om de special_stream binnen te komen, dus je moet de trigger op dezelfde manier configureren als we deden voor de Collector lambda.

Om het configureren van deze lambda gemakkelijker te maken, introduceren we een nieuwe omgevingsvariabele - TOPIC_ARN, waar we de ANR (Amazon Recourse Names) van het Airlines-onderwerp plaatsen:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
En voer de lambdacode in, het is helemaal niet ingewikkeld:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Het lijkt erop dat hier de handmatige systeemconfiguratie is voltooid. Het enige dat overblijft is testen en ervoor zorgen dat we alles correct hebben geconfigureerd.

Implementeren vanuit Terraform-code

Noodzakelijke voorbereiding

Terraform is een zeer handige open-sourcetool voor het implementeren van infrastructuur vanuit code. Het heeft zijn eigen syntaxis die gemakkelijk te leren is en veel voorbeelden bevat van hoe en wat te implementeren. De Atom-editor of Visual Studio Code heeft veel handige plug-ins die het werken met Terraform eenvoudiger maken.

Je kunt de distributie downloaden vandaar. Een gedetailleerde analyse van alle Terraform-mogelijkheden valt buiten het bestek van dit artikel, dus we zullen ons beperken tot de hoofdpunten.

Hoe te beginnen

De volledige code van het project is in mijn repository. We klonen de repository naar onszelf. Voordat u begint, moet u ervoor zorgen dat AWS CLI is geïnstalleerd en geconfigureerd, omdat... Terraform zoekt naar referenties in het bestand ~/.aws/credentials.

Een goede gewoonte is om de opdracht plan uit te voeren voordat u de volledige infrastructuur implementeert, om te zien wat Terraform momenteel voor ons in de cloud creëert:

terraform.exe plan

U wordt gevraagd een telefoonnummer in te voeren waarnaar u meldingen wilt verzenden. Het is in dit stadium niet nodig om dit in te voeren.

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Nadat we het werkingsplan van het programma hebben geanalyseerd, kunnen we beginnen met het creëren van hulpmiddelen:

terraform.exe apply

Na het verzenden van dit commando wordt u opnieuw gevraagd een telefoonnummer in te voeren; kies “ja” wanneer er een vraag verschijnt over het daadwerkelijk uitvoeren van de acties. Hiermee kunt u de volledige infrastructuur opzetten, alle noodzakelijke configuraties van EC2 uitvoeren, lambda-functies inzetten, enz.

Nadat alle bronnen met succes zijn aangemaakt via de Terraform-code, moet je ingaan op de details van de Kinesis Analytics-applicatie (helaas heb ik niet gevonden hoe ik dit rechtstreeks vanuit de code moest doen).

Start de applicatie:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Hierna moet u expliciet de naam van de stream in de applicatie instellen door deze in de vervolgkeuzelijst te selecteren:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Nu is alles klaar om te gaan.

Het testen van de applicatie

Ongeacht hoe u het systeem heeft geïmplementeerd, handmatig of via Terraform-code, het zal hetzelfde werken.

We loggen via SSH in op de virtuele EC2-machine waarop Kinesis Agent is geïnstalleerd en voeren het script api_caller.py uit

sudo ./api_caller.py TOKEN

Het enige dat u hoeft te doen, is wachten op een sms naar uw nummer:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
SMS - het bericht arriveert binnen bijna 1 minuut op de telefoon:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud
Het blijft de vraag of de records zijn opgeslagen in de DynamoDB-database voor daaropvolgende, meer gedetailleerde analyse. De tabel airline_tickets bevat ongeveer de volgende gegevens:

Aviasales API-integratie met Amazon Kinesis en serverloze eenvoud

Conclusie

In de loop van de werkzaamheden werd een online gegevensverwerkingssysteem gebouwd op basis van Amazon Kinesis. Opties voor het gebruik van de Kinesis Agent in combinatie met Kinesis Data Streams en real-time analytics Kinesis Analytics met behulp van SQL-opdrachten, evenals de interactie van Amazon Kinesis met andere AWS-diensten werden overwogen.

We hebben het bovenstaande systeem op twee manieren geïmplementeerd: een vrij lang handmatig systeem en een snel systeem vanuit de Terraform-code.

Alle projectbroncode is beschikbaar in mijn GitHub-repository, Ik stel voor dat u er vertrouwd mee raakt.

Ik bespreek het artikel graag, ik kijk uit naar uw opmerkingen. Ik hoop op opbouwende kritiek.

Ik wens u veel succes!

Bron: www.habr.com

Voeg een reactie