Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Hey Habr!

Magst du fliegende Flugzeuge? Ich liebe es, aber während der Selbstisolation habe ich mich auch in die Analyse von Flugticketdaten einer bekannten Quelle verliebt – Aviasales.

Heute analysieren wir die Arbeit von Amazon Kinesis, bauen ein Streaming-System mit Echtzeitanalysen auf, installieren die Amazon DynamoDB NoSQL-Datenbank als Hauptdatenspeicher und richten SMS-Benachrichtigungen für interessante Tickets ein.

Alle Details sind unter dem Schnitt! Gehen!

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Einführung

Für das Beispiel benötigen wir Zugriff auf Aviasales-API. Der Zugriff darauf ist kostenlos und ohne Einschränkungen möglich; Sie müssen sich lediglich im Bereich „Entwickler“ registrieren, um Ihr API-Token für den Zugriff auf die Daten zu erhalten.

Der Hauptzweck dieses Artikels besteht darin, ein allgemeines Verständnis der Verwendung von Informations-Streaming in AWS zu vermitteln. Wir berücksichtigen, dass die von der verwendeten API zurückgegebenen Daten nicht unbedingt aktuell sind und aus dem Cache übertragen werden Wird auf der Grundlage von Suchanfragen von Benutzern der Websites Aviasales.ru und Jetradar.com in den letzten 48 Stunden erstellt.

Der Kinesis-Agent, der auf der produzierenden Maschine installiert ist und über die API empfangen wird, analysiert die Daten automatisch und überträgt sie über Kinesis Data Analytics an den gewünschten Stream. Die Rohversion dieses Streams wird direkt in den Store geschrieben. Der in DynamoDB bereitgestellte Rohdatenspeicher ermöglicht eine tiefergehende Ticketanalyse durch BI-Tools wie AWS Quick Sight.

Wir werden zwei Optionen für die Bereitstellung der gesamten Infrastruktur in Betracht ziehen:

  • Manuell – über die AWS-Managementkonsole;
  • Infrastruktur aus Terraform-Code ist für faule Automatisierer;

Die Architektur des entwickelten Systems

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Verwendete Komponenten:

  • Aviasales-API — Die von dieser API zurückgegebenen Daten werden für alle nachfolgenden Arbeiten verwendet.
  • EC2-Produzenteninstanz — eine reguläre virtuelle Maschine in der Cloud, auf der der Eingabedatenstrom generiert wird:
    • Kinesis-Agent ist eine lokal auf dem Computer installierte Java-Anwendung, die eine einfache Möglichkeit bietet, Daten zu sammeln und an Kinesis (Kinesis Data Streams oder Kinesis Firehose) zu senden. Der Agent überwacht ständig eine Reihe von Dateien in den angegebenen Verzeichnissen und sendet neue Daten an Kinesis;
    • API-Aufruferskript – Ein Python-Skript, das Anfragen an die API stellt und die Antwort in einen Ordner legt, der vom Kinesis-Agenten überwacht wird;
  • Kinesis-Datenströme — Echtzeit-Daten-Streaming-Dienst mit breiten Skalierungsfunktionen;
  • Kinesis Analytics ist ein serverloser Dienst, der die Analyse von Streaming-Daten in Echtzeit vereinfacht. Amazon Kinesis Data Analytics konfiguriert Anwendungsressourcen und skaliert automatisch, um jede Menge eingehender Daten zu verarbeiten.
  • AWS Lambda – ein Dienst, der es Ihnen ermöglicht, Code auszuführen, ohne Server zu sichern oder einzurichten. Die gesamte Rechenleistung wird automatisch für jeden Anruf skaliert;
  • Amazon DynamoDB – Eine Datenbank mit Schlüssel-Wert-Paaren und Dokumenten, die bei Ausführung in jedem Maßstab eine Latenz von weniger als 10 Millisekunden bietet. Wenn Sie DynamoDB verwenden, müssen Sie keine Server bereitstellen, patchen oder verwalten. DynamoDB skaliert Tabellen automatisch, um die Menge der verfügbaren Ressourcen anzupassen und eine hohe Leistung aufrechtzuerhalten. Es ist keine Systemadministration erforderlich;
  • AmazonSNS - ein vollständig verwalteter Dienst zum Senden von Nachrichten mithilfe des Publisher-Subscriber-Modells (Pub/Sub), mit dem Sie Microservices, verteilte Systeme und serverlose Anwendungen isolieren können. SNS kann verwendet werden, um Informationen über mobile Push-Benachrichtigungen, SMS-Nachrichten und E-Mails an Endbenutzer zu senden.

Erstausbildung

Um den Datenfluss zu emulieren, habe ich mich entschieden, die von der Aviasales-API zurückgegebenen Flugticketinformationen zu verwenden. IN Dokumentation eine ziemlich umfangreiche Liste verschiedener Methoden, nehmen wir eine davon – den „Monatlichen Preiskalender“, der die Preise für jeden Tag des Monats zurückgibt, gruppiert nach der Anzahl der Überweisungen. Wenn Sie den Suchmonat in der Anfrage nicht angeben, werden Informationen für den Monat zurückgegeben, der auf den aktuellen Monat folgt.

Also, lasst uns registrieren und unseren Token erhalten.

Nachfolgend finden Sie eine Beispielanfrage:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

Die obige Methode zum Empfangen von Daten von der API durch Angabe eines Tokens in der Anfrage funktioniert, aber ich bevorzuge es, das Zugriffstoken über den Header zu übergeben, daher werden wir diese Methode im Skript api_caller.py verwenden.

Antwortbeispiel:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

Die obige Beispiel-API-Antwort zeigt ein Ticket von St. Petersburg nach Phuk ... Oh, was für ein Traum ...
Da ich aus Kasan komme und Phuket jetzt „nur noch ein Traum“ ist, suchen wir nach Tickets von St. Petersburg nach Kasan.

Es wird davon ausgegangen, dass Sie bereits über ein AWS-Konto verfügen. Ich möchte sofort darauf hinweisen, dass Kinesis und der Versand von Benachrichtigungen per SMS nicht im Jahresumfang enthalten sind Kostenloses Kontingent (kostenlose Nutzung). Aber trotzdem ist es mit ein paar Dollar im Hinterkopf durchaus möglich, das vorgeschlagene System zu bauen und damit zu spielen. Und vergessen Sie natürlich nicht, alle Ressourcen zu löschen, wenn sie nicht mehr benötigt werden.

Glücklicherweise sind die DynamoDb- und Lambda-Funktionen für uns kostenlos, wenn wir unsere monatlichen kostenlosen Limits einhalten. Beispiel für DynamoDB: 25 GB Speicher, 25 WCU/RCU und 100 Millionen Abfragen. Und eine Million Lambda-Funktionsaufrufe pro Monat.

Manuelle Systembereitstellung

Einrichten von Kinesis-Datenströmen

Gehen wir zum Kinesis Data Streams-Dienst und erstellen zwei neue Streams, jeweils einen Shard.

Was ist eine Scherbe?
Ein Shard ist die grundlegende Datenübertragungseinheit eines Amazon Kinesis-Streams. Ein Segment ermöglicht die Eingabedatenübertragung mit einer Geschwindigkeit von 1 MB/s und die Ausgabedatenübertragung mit einer Geschwindigkeit von 2 MB/s. Ein Segment unterstützt bis zu 1000 PUT-Einträge pro Sekunde. Beim Erstellen eines Datenstroms müssen Sie die erforderliche Anzahl von Segmenten angeben. Sie können beispielsweise einen Datenstrom mit zwei Segmenten erstellen. Dieser Datenstrom ermöglicht eine Eingabedatenübertragung mit 2 MB/s und eine Ausgabedatenübertragung mit 4 MB/s und unterstützt bis zu 2000 PUT-Datensätze pro Sekunde.

Je mehr Shards Ihr Stream enthält, desto größer ist sein Durchsatz. Im Prinzip werden Flüsse auf diese Weise skaliert – durch das Hinzufügen von Shards. Aber je mehr Scherben Sie haben, desto höher ist der Preis. Jeder Shard kostet 1,5 Cent pro Stunde und zusätzlich 1.4 Cent pro Million PUT-Nutzlasteinheiten.

Erstellen wir einen neuen Stream mit dem Namen Flugtickets, 1 Splitter reicht ihm:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Jetzt erstellen wir einen weiteren Thread mit dem Namen spezieller_stream:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Produzenten-Setup

Um eine Aufgabe zu analysieren, reicht es aus, eine reguläre EC2-Instanz als Datenproduzent zu verwenden. Es muss keine leistungsstarke, teure virtuelle Maschine sein; ein Spot t2.micro reicht vollkommen aus.

Wichtiger Hinweis: Sie sollten beispielsweise das Image Amazon Linux AMI 2018.03.0 verwenden, es verfügt über weniger Einstellungen zum schnellen Starten des Kinesis-Agenten.

Gehen Sie zum EC2-Dienst, erstellen Sie eine neue virtuelle Maschine, wählen Sie das gewünschte AMI mit dem Typ t2.micro aus, das im kostenlosen Kontingent enthalten ist:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Damit die neu erstellte virtuelle Maschine mit dem Kinesis-Dienst interagieren kann, müssen ihr entsprechende Rechte erteilt werden. Der beste Weg, dies zu tun, ist die Zuweisung einer IAM-Rolle. Daher sollten Sie im Bildschirm Schritt 3: Instanzdetails konfigurieren die Option auswählen Erstellen Sie eine neue IAM-Rolle:

Erstellen einer IAM-Rolle für EC2
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Wählen Sie im sich öffnenden Fenster aus, dass wir eine neue Rolle für EC2 erstellen, und gehen Sie zum Abschnitt „Berechtigungen“:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Anhand des Trainingsbeispiels müssen wir nicht auf alle Feinheiten der granularen Konfiguration von Ressourcenrechten eingehen, daher wählen wir die von Amazon vorkonfigurierten Richtlinien aus: AmazonKinesisFullAccess und CloudWatchFullAccess.

Geben wir dieser Rolle einen aussagekräftigen Namen, zum Beispiel: EC2-KinesisStreams-FullAccess. Das Ergebnis sollte das gleiche sein wie im Bild unten:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Vergessen Sie nach dem Erstellen dieser neuen Rolle nicht, sie der erstellten Instanz der virtuellen Maschine anzuhängen:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Wir ändern an diesem Bildschirm nichts weiter und fahren mit den nächsten Fenstern fort.

Die Festplatteneinstellungen können als Standard beibehalten werden, ebenso wie die Tags (obwohl es empfehlenswert ist, Tags zu verwenden, geben Sie der Instanz zumindest einen Namen und geben Sie die Umgebung an).

Jetzt befinden wir uns auf der Registerkarte Schritt 6: Sicherheitsgruppe konfigurieren, wo Sie eine neue erstellen oder Ihre vorhandene Sicherheitsgruppe angeben müssen, die Ihnen die Verbindung über SSH (Port 22) mit der Instanz ermöglicht. Wählen Sie dort Quelle -> Meine IP und Sie können die Instanz starten.

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Sobald es in den Ausführungsstatus wechselt, können Sie versuchen, eine Verbindung per SSH herzustellen.

Um mit Kinesis Agent arbeiten zu können, müssen Sie nach erfolgreicher Verbindung mit der Maschine die folgenden Befehle im Terminal eingeben:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

Erstellen wir einen Ordner zum Speichern von API-Antworten:

sudo mkdir /var/log/airline_tickets

Bevor Sie den Agenten starten, müssen Sie seine Konfiguration konfigurieren:

sudo vim /etc/aws-kinesis/agent.json

Der Inhalt der Datei agent.json sollte wie folgt aussehen:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

Wie aus der Konfigurationsdatei hervorgeht, überwacht der Agent Dateien mit der Erweiterung .log im Verzeichnis /var/log/airline_tickets/, analysiert sie und überträgt sie an den Airline_tickets-Stream.

Wir starten den Dienst neu und stellen sicher, dass er betriebsbereit ist:

sudo service aws-kinesis-agent restart

Laden wir nun das Python-Skript herunter, das Daten von der API anfordert:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

Das Skript api_caller.py fordert Daten von Aviasales an und speichert die empfangene Antwort in dem Verzeichnis, das der Kinesis-Agent scannt. Die Implementierung dieses Skripts ist recht standardmäßig, es gibt eine TicketsApi-Klasse, mit der Sie die API asynchron abrufen können. Wir übergeben einen Header mit einem Token und Anforderungsparametern an diese Klasse:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

Um die korrekten Einstellungen und Funktionen des Agenten zu testen, führen wir das Skript api_caller.py testweise aus:

sudo ./api_caller.py TOKEN

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Und wir sehen uns das Ergebnis der Arbeit in den Agentenprotokollen und auf der Registerkarte „Überwachung“ im Datenstrom „airline_tickets“ an:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Wie Sie sehen, funktioniert alles und der Kinesis Agent sendet erfolgreich Daten an den Stream. Lassen Sie uns nun den Verbraucher konfigurieren.

Einrichten von Kinesis Data Analytics

Kommen wir zur zentralen Komponente des gesamten Systems – erstellen Sie eine neue Anwendung in Kinesis Data Analytics mit dem Namen kinesis_analytics_airlines_app:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Mit Kinesis Data Analytics können Sie Echtzeit-Datenanalysen von Kinesis Streams mithilfe der SQL-Sprache durchführen. Es handelt sich um einen vollständig automatisch skalierenden Dienst (im Gegensatz zu Kinesis Streams), der:

  1. ermöglicht die Erstellung neuer Streams (Output Stream) basierend auf Anfragen zur Datenquelle;
  2. stellt einen Stream mit Fehlern bereit, die während der Ausführung von Anwendungen aufgetreten sind (Error Stream);
  3. kann das Eingabedatenschema automatisch bestimmen (es kann bei Bedarf manuell neu definiert werden).

Dies ist kein billiger Service – 0.11 USD pro Arbeitsstunde, daher sollten Sie ihn vorsichtig verwenden und ihn löschen, wenn Sie fertig sind.

Verbinden wir die Anwendung mit der Datenquelle:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Wählen Sie den Stream aus, mit dem wir eine Verbindung herstellen möchten (airline_tickets):

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Als Nächstes müssen Sie eine neue IAM-Rolle anhängen, damit die Anwendung aus dem Stream lesen und in den Stream schreiben kann. Dazu reicht es aus, im Block Zugriffsberechtigungen nichts zu ändern:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Lassen Sie uns nun die Entdeckung des Datenschemas im Stream anfordern; klicken Sie dazu auf die Schaltfläche „Schema entdecken“. Infolgedessen wird die IAM-Rolle aktualisiert (eine neue wird erstellt) und die Schemaerkennung wird anhand der Daten gestartet, die bereits im Stream angekommen sind:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Jetzt müssen Sie zum SQL-Editor gehen. Wenn Sie auf diese Schaltfläche klicken, erscheint ein Fenster, in dem Sie aufgefordert werden, die Anwendung zu starten. Wählen Sie aus, was Sie starten möchten:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Fügen Sie die folgende einfache Abfrage in das SQL-Editor-Fenster ein und klicken Sie auf „SQL speichern und ausführen“:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

In relationalen Datenbanken arbeiten Sie mit Tabellen, indem Sie INSERT-Anweisungen zum Hinzufügen von Datensätzen und eine SELECT-Anweisung zum Abfragen von Daten verwenden. In Amazon Kinesis Data Analytics arbeiten Sie mit Streams (STREAMs) und Pumpen (PUMPs) – kontinuierlichen Einfügeanforderungen, die Daten aus einem Stream in einer Anwendung in einen anderen Stream einfügen.

Die oben dargestellte SQL-Abfrage sucht nach Aeroflot-Tickets mit einem Preis unter fünftausend Rubel. Alle Datensätze, die diese Bedingungen erfüllen, werden im Stream DESTINATION_SQL_STREAM platziert.

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Wählen Sie im Zielblock den Stream „special_stream“ und in der Dropdown-Liste „Name des anwendungsinternen Streams“ DESTINATION_SQL_STREAM aus:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Das Ergebnis aller Manipulationen sollte in etwa wie im Bild unten aussehen:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Erstellen und Abonnieren eines SNS-Themas

Gehen Sie zum Simple Notification Service und erstellen Sie dort ein neues Thema mit dem Namen Airlines:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Abonnieren Sie dieses Thema und geben Sie die Mobiltelefonnummer an, an die SMS-Benachrichtigungen gesendet werden:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Erstellen Sie eine Tabelle in DynamoDB

Um die Rohdaten aus ihrem Airline_tickets-Stream zu speichern, erstellen wir in DynamoDB eine Tabelle mit demselben Namen. Wir werden record_id als Primärschlüssel verwenden:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Erstellen eines Lambda-Funktionskollektors

Erstellen wir eine Lambda-Funktion namens Collector, deren Aufgabe darin besteht, den Airline_tickets-Stream abzufragen und, wenn dort neue Datensätze gefunden werden, diese Datensätze in die DynamoDB-Tabelle einzufügen. Zusätzlich zu den Standardrechten muss dieses Lambda natürlich Lesezugriff auf den Kinesis-Datenstrom und Schreibzugriff auf DynamoDB haben.

Erstellen einer IAM-Rolle für die Collector-Lambda-Funktion
Erstellen wir zunächst eine neue IAM-Rolle für das Lambda mit dem Namen Lambda-TicketsProcessingRole:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Für das Testbeispiel sind die vorkonfigurierten Richtlinien AmazonKinesisReadOnlyAccess und AmazonDynamoDBFullAccess durchaus geeignet, wie im Bild unten dargestellt:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Dieses Lambda sollte durch einen Trigger von Kinesis gestartet werden, wenn neue Einträge in den Airline_stream eingehen, daher müssen wir einen neuen Trigger hinzufügen:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Es bleibt nur noch, den Code einzufügen und das Lambda zu speichern.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

Erstellen eines Lambda-Funktions-Notifiers

Die zweite Lambda-Funktion, die den zweiten Stream (special_stream) überwacht und eine Benachrichtigung an SNS sendet, wird auf ähnliche Weise erstellt. Daher muss dieses Lambda Zugriff haben, um von Kinesis zu lesen und Nachrichten an ein bestimmtes SNS-Thema zu senden, die dann vom SNS-Dienst an alle Abonnenten dieses Themas gesendet werden (E-Mail, SMS usw.).

Erstellen einer IAM-Rolle
Zuerst erstellen wir die IAM-Rolle Lambda-KinesisAlarm für dieses Lambda und weisen diese Rolle dann dem zu erstellenden alarm_notifier-Lambda zu:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Dieses Lambda sollte an einem Auslöser arbeiten, damit neue Datensätze in den „special_stream“ gelangen. Daher müssen Sie den Auslöser auf die gleiche Weise konfigurieren, wie wir es für das Collector-Lambda getan haben.

Um die Konfiguration dieses Lambda zu vereinfachen, führen wir eine neue Umgebungsvariable ein – TOPIC_ARN, in der wir die ANR (Amazon Recourse Names) des Airlines-Themas platzieren:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Und den Lambda-Code einfügen, es ist überhaupt nicht kompliziert:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

Es scheint, dass hier die manuelle Systemkonfiguration abgeschlossen ist. Jetzt müssen wir nur noch testen und sicherstellen, dass wir alles richtig konfiguriert haben.

Bereitstellung aus Terraform-Code

Notwendige Vorbereitung

Terraform ist ein sehr praktisches Open-Source-Tool zum Bereitstellen von Infrastruktur aus Code. Es verfügt über eine eigene Syntax, die leicht zu erlernen ist und viele Beispiele dafür enthält, wie und was bereitgestellt werden soll. Der Atom-Editor oder Visual Studio Code verfügt über viele praktische Plugins, die die Arbeit mit Terraform erleichtern.

Sie können die Distribution herunterladen daher. Eine detaillierte Analyse aller Terraform-Fähigkeiten würde den Rahmen dieses Artikels sprengen, daher beschränken wir uns auf die wesentlichen Punkte.

Wie man rennt

Der vollständige Code des Projekts ist in meinem Repository. Wir klonen das Repository für uns selbst. Bevor Sie beginnen, müssen Sie sicherstellen, dass AWS CLI installiert und konfiguriert ist, denn... Terraform sucht in der Datei ~/.aws/credentials nach Anmeldeinformationen.

Eine gute Vorgehensweise besteht darin, den Befehl „plan“ auszuführen, bevor die gesamte Infrastruktur bereitgestellt wird, um zu sehen, was Terraform derzeit für uns in der Cloud erstellt:

terraform.exe plan

Sie werden aufgefordert, eine Telefonnummer einzugeben, an die Benachrichtigungen gesendet werden sollen. Eine Eingabe ist zu diesem Zeitpunkt nicht erforderlich.

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Nachdem wir den Betriebsplan des Programms analysiert haben, können wir mit der Erstellung von Ressourcen beginnen:

terraform.exe apply

Nach dem Senden dieses Befehls werden Sie erneut aufgefordert, eine Telefonnummer einzugeben; wählen Sie „Ja“, wenn eine Frage zur tatsächlichen Durchführung der Aktionen angezeigt wird. Auf diese Weise können Sie die gesamte Infrastruktur einrichten, alle erforderlichen EC2-Konfigurationen durchführen, Lambda-Funktionen bereitstellen usw.

Nachdem alle Ressourcen erfolgreich über den Terraform-Code erstellt wurden, müssen Sie auf die Details der Kinesis Analytics-Anwendung eingehen (leider habe ich nicht herausgefunden, wie das direkt aus dem Code geht).

Anwendung starten:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Danach müssen Sie den In-Application-Stream-Namen explizit festlegen, indem Sie Folgendes aus der Dropdown-Liste auswählen:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Jetzt ist alles startklar.

Testen der Anwendung

Unabhängig davon, wie Sie das System manuell oder über Terraform-Code bereitgestellt haben, funktioniert es immer gleich.

Wir melden uns über SSH bei der virtuellen EC2-Maschine an, auf der Kinesis Agent installiert ist, und führen das Skript api_caller.py aus

sudo ./api_caller.py TOKEN

Sie müssen lediglich auf eine SMS an Ihre Nummer warten:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
SMS – in fast 1 Minute kommt eine Nachricht auf dem Telefon an:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Es bleibt abzuwarten, ob die Datensätze für eine spätere, detailliertere Analyse in der DynamoDB-Datenbank gespeichert wurden. Die Tabelle „airline_tickets“ enthält ungefähr die folgenden Daten:

Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit

Abschluss

Im Zuge der durchgeführten Arbeiten wurde ein Online-Datenverarbeitungssystem auf Basis von Amazon Kinesis aufgebaut. Berücksichtigt wurden Optionen für den Einsatz des Kinesis-Agenten in Verbindung mit Kinesis Data Streams und Echtzeitanalysen Kinesis Analytics mithilfe von SQL-Befehlen sowie die Interaktion von Amazon Kinesis mit anderen AWS-Diensten.

Wir haben das obige System auf zwei Arten bereitgestellt: eine ziemlich lange manuelle und eine schnelle über den Terraform-Code.

Der gesamte Quellcode des Projekts ist verfügbar in meinem GitHub-RepositoryIch schlage vor, dass Sie sich damit vertraut machen.

Ich freue mich über die Diskussion des Artikels und freue mich auf Ihre Kommentare. Ich hoffe auf konstruktive Kritik.

Ich wünsche Ihnen viel Erfolg!

Source: habr.com

Kommentar hinzufügen