ProHoster > Blog > Verwaltung > Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Aviasales-API-Integration mit Amazon Kinesis und serverlose Einfachheit
Hey Habr!
Magst du fliegende Flugzeuge? Ich liebe es, aber während der Selbstisolation habe ich mich auch in die Analyse von Flugticketdaten einer bekannten Quelle verliebt – Aviasales.
Heute analysieren wir die Arbeit von Amazon Kinesis, bauen ein Streaming-System mit Echtzeitanalysen auf, installieren die Amazon DynamoDB NoSQL-Datenbank als Hauptdatenspeicher und richten SMS-Benachrichtigungen für interessante Tickets ein.
Alle Details sind unter dem Schnitt! Gehen!
Einführung
Für das Beispiel benötigen wir Zugriff auf Aviasales-API. Der Zugriff darauf ist kostenlos und ohne Einschränkungen möglich; Sie müssen sich lediglich im Bereich „Entwickler“ registrieren, um Ihr API-Token für den Zugriff auf die Daten zu erhalten.
Der Hauptzweck dieses Artikels besteht darin, ein allgemeines Verständnis der Verwendung von Informations-Streaming in AWS zu vermitteln. Wir berücksichtigen, dass die von der verwendeten API zurückgegebenen Daten nicht unbedingt aktuell sind und aus dem Cache übertragen werden Wird auf der Grundlage von Suchanfragen von Benutzern der Websites Aviasales.ru und Jetradar.com in den letzten 48 Stunden erstellt.
Der Kinesis-Agent, der auf der produzierenden Maschine installiert ist und über die API empfangen wird, analysiert die Daten automatisch und überträgt sie über Kinesis Data Analytics an den gewünschten Stream. Die Rohversion dieses Streams wird direkt in den Store geschrieben. Der in DynamoDB bereitgestellte Rohdatenspeicher ermöglicht eine tiefergehende Ticketanalyse durch BI-Tools wie AWS Quick Sight.
Wir werden zwei Optionen für die Bereitstellung der gesamten Infrastruktur in Betracht ziehen:
Manuell – über die AWS-Managementkonsole;
Infrastruktur aus Terraform-Code ist für faule Automatisierer;
Die Architektur des entwickelten Systems
Verwendete Komponenten:
Aviasales-API — Die von dieser API zurückgegebenen Daten werden für alle nachfolgenden Arbeiten verwendet.
EC2-Produzenteninstanz — eine reguläre virtuelle Maschine in der Cloud, auf der der Eingabedatenstrom generiert wird:
Kinesis-Agent ist eine lokal auf dem Computer installierte Java-Anwendung, die eine einfache Möglichkeit bietet, Daten zu sammeln und an Kinesis (Kinesis Data Streams oder Kinesis Firehose) zu senden. Der Agent überwacht ständig eine Reihe von Dateien in den angegebenen Verzeichnissen und sendet neue Daten an Kinesis;
API-Aufruferskript – Ein Python-Skript, das Anfragen an die API stellt und die Antwort in einen Ordner legt, der vom Kinesis-Agenten überwacht wird;
Kinesis-Datenströme — Echtzeit-Daten-Streaming-Dienst mit breiten Skalierungsfunktionen;
Kinesis Analytics ist ein serverloser Dienst, der die Analyse von Streaming-Daten in Echtzeit vereinfacht. Amazon Kinesis Data Analytics konfiguriert Anwendungsressourcen und skaliert automatisch, um jede Menge eingehender Daten zu verarbeiten.
AWS Lambda – ein Dienst, der es Ihnen ermöglicht, Code auszuführen, ohne Server zu sichern oder einzurichten. Die gesamte Rechenleistung wird automatisch für jeden Anruf skaliert;
Amazon DynamoDB – Eine Datenbank mit Schlüssel-Wert-Paaren und Dokumenten, die bei Ausführung in jedem Maßstab eine Latenz von weniger als 10 Millisekunden bietet. Wenn Sie DynamoDB verwenden, müssen Sie keine Server bereitstellen, patchen oder verwalten. DynamoDB skaliert Tabellen automatisch, um die Menge der verfügbaren Ressourcen anzupassen und eine hohe Leistung aufrechtzuerhalten. Es ist keine Systemadministration erforderlich;
AmazonSNS - ein vollständig verwalteter Dienst zum Senden von Nachrichten mithilfe des Publisher-Subscriber-Modells (Pub/Sub), mit dem Sie Microservices, verteilte Systeme und serverlose Anwendungen isolieren können. SNS kann verwendet werden, um Informationen über mobile Push-Benachrichtigungen, SMS-Nachrichten und E-Mails an Endbenutzer zu senden.
Erstausbildung
Um den Datenfluss zu emulieren, habe ich mich entschieden, die von der Aviasales-API zurückgegebenen Flugticketinformationen zu verwenden. IN Dokumentation eine ziemlich umfangreiche Liste verschiedener Methoden, nehmen wir eine davon – den „Monatlichen Preiskalender“, der die Preise für jeden Tag des Monats zurückgibt, gruppiert nach der Anzahl der Überweisungen. Wenn Sie den Suchmonat in der Anfrage nicht angeben, werden Informationen für den Monat zurückgegeben, der auf den aktuellen Monat folgt.
Also, lasst uns registrieren und unseren Token erhalten.
Die obige Methode zum Empfangen von Daten von der API durch Angabe eines Tokens in der Anfrage funktioniert, aber ich bevorzuge es, das Zugriffstoken über den Header zu übergeben, daher werden wir diese Methode im Skript api_caller.py verwenden.
Die obige Beispiel-API-Antwort zeigt ein Ticket von St. Petersburg nach Phuk ... Oh, was für ein Traum ...
Da ich aus Kasan komme und Phuket jetzt „nur noch ein Traum“ ist, suchen wir nach Tickets von St. Petersburg nach Kasan.
Es wird davon ausgegangen, dass Sie bereits über ein AWS-Konto verfügen. Ich möchte sofort darauf hinweisen, dass Kinesis und der Versand von Benachrichtigungen per SMS nicht im Jahresumfang enthalten sind Kostenloses Kontingent (kostenlose Nutzung). Aber trotzdem ist es mit ein paar Dollar im Hinterkopf durchaus möglich, das vorgeschlagene System zu bauen und damit zu spielen. Und vergessen Sie natürlich nicht, alle Ressourcen zu löschen, wenn sie nicht mehr benötigt werden.
Glücklicherweise sind die DynamoDb- und Lambda-Funktionen für uns kostenlos, wenn wir unsere monatlichen kostenlosen Limits einhalten. Beispiel für DynamoDB: 25 GB Speicher, 25 WCU/RCU und 100 Millionen Abfragen. Und eine Million Lambda-Funktionsaufrufe pro Monat.
Manuelle Systembereitstellung
Einrichten von Kinesis-Datenströmen
Gehen wir zum Kinesis Data Streams-Dienst und erstellen zwei neue Streams, jeweils einen Shard.
Was ist eine Scherbe?
Ein Shard ist die grundlegende Datenübertragungseinheit eines Amazon Kinesis-Streams. Ein Segment ermöglicht die Eingabedatenübertragung mit einer Geschwindigkeit von 1 MB/s und die Ausgabedatenübertragung mit einer Geschwindigkeit von 2 MB/s. Ein Segment unterstützt bis zu 1000 PUT-Einträge pro Sekunde. Beim Erstellen eines Datenstroms müssen Sie die erforderliche Anzahl von Segmenten angeben. Sie können beispielsweise einen Datenstrom mit zwei Segmenten erstellen. Dieser Datenstrom ermöglicht eine Eingabedatenübertragung mit 2 MB/s und eine Ausgabedatenübertragung mit 4 MB/s und unterstützt bis zu 2000 PUT-Datensätze pro Sekunde.
Je mehr Shards Ihr Stream enthält, desto größer ist sein Durchsatz. Im Prinzip werden Flüsse auf diese Weise skaliert – durch das Hinzufügen von Shards. Aber je mehr Scherben Sie haben, desto höher ist der Preis. Jeder Shard kostet 1,5 Cent pro Stunde und zusätzlich 1.4 Cent pro Million PUT-Nutzlasteinheiten.
Erstellen wir einen neuen Stream mit dem Namen Flugtickets, 1 Splitter reicht ihm:
Jetzt erstellen wir einen weiteren Thread mit dem Namen spezieller_stream:
Produzenten-Setup
Um eine Aufgabe zu analysieren, reicht es aus, eine reguläre EC2-Instanz als Datenproduzent zu verwenden. Es muss keine leistungsstarke, teure virtuelle Maschine sein; ein Spot t2.micro reicht vollkommen aus.
Wichtiger Hinweis: Sie sollten beispielsweise das Image Amazon Linux AMI 2018.03.0 verwenden, es verfügt über weniger Einstellungen zum schnellen Starten des Kinesis-Agenten.
Gehen Sie zum EC2-Dienst, erstellen Sie eine neue virtuelle Maschine, wählen Sie das gewünschte AMI mit dem Typ t2.micro aus, das im kostenlosen Kontingent enthalten ist:
Damit die neu erstellte virtuelle Maschine mit dem Kinesis-Dienst interagieren kann, müssen ihr entsprechende Rechte erteilt werden. Der beste Weg, dies zu tun, ist die Zuweisung einer IAM-Rolle. Daher sollten Sie im Bildschirm Schritt 3: Instanzdetails konfigurieren die Option auswählen Erstellen Sie eine neue IAM-Rolle:
Erstellen einer IAM-Rolle für EC2
Wählen Sie im sich öffnenden Fenster aus, dass wir eine neue Rolle für EC2 erstellen, und gehen Sie zum Abschnitt „Berechtigungen“:
Anhand des Trainingsbeispiels müssen wir nicht auf alle Feinheiten der granularen Konfiguration von Ressourcenrechten eingehen, daher wählen wir die von Amazon vorkonfigurierten Richtlinien aus: AmazonKinesisFullAccess und CloudWatchFullAccess.
Geben wir dieser Rolle einen aussagekräftigen Namen, zum Beispiel: EC2-KinesisStreams-FullAccess. Das Ergebnis sollte das gleiche sein wie im Bild unten:
Vergessen Sie nach dem Erstellen dieser neuen Rolle nicht, sie der erstellten Instanz der virtuellen Maschine anzuhängen:
Wir ändern an diesem Bildschirm nichts weiter und fahren mit den nächsten Fenstern fort.
Die Festplatteneinstellungen können als Standard beibehalten werden, ebenso wie die Tags (obwohl es empfehlenswert ist, Tags zu verwenden, geben Sie der Instanz zumindest einen Namen und geben Sie die Umgebung an).
Jetzt befinden wir uns auf der Registerkarte Schritt 6: Sicherheitsgruppe konfigurieren, wo Sie eine neue erstellen oder Ihre vorhandene Sicherheitsgruppe angeben müssen, die Ihnen die Verbindung über SSH (Port 22) mit der Instanz ermöglicht. Wählen Sie dort Quelle -> Meine IP und Sie können die Instanz starten.
Sobald es in den Ausführungsstatus wechselt, können Sie versuchen, eine Verbindung per SSH herzustellen.
Um mit Kinesis Agent arbeiten zu können, müssen Sie nach erfolgreicher Verbindung mit der Maschine die folgenden Befehle im Terminal eingeben:
Wie aus der Konfigurationsdatei hervorgeht, überwacht der Agent Dateien mit der Erweiterung .log im Verzeichnis /var/log/airline_tickets/, analysiert sie und überträgt sie an den Airline_tickets-Stream.
Wir starten den Dienst neu und stellen sicher, dass er betriebsbereit ist:
sudo service aws-kinesis-agent restart
Laden wir nun das Python-Skript herunter, das Daten von der API anfordert:
Das Skript api_caller.py fordert Daten von Aviasales an und speichert die empfangene Antwort in dem Verzeichnis, das der Kinesis-Agent scannt. Die Implementierung dieses Skripts ist recht standardmäßig, es gibt eine TicketsApi-Klasse, mit der Sie die API asynchron abrufen können. Wir übergeben einen Header mit einem Token und Anforderungsparametern an diese Klasse:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Um die korrekten Einstellungen und Funktionen des Agenten zu testen, führen wir das Skript api_caller.py testweise aus:
sudo ./api_caller.py TOKEN
Und wir sehen uns das Ergebnis der Arbeit in den Agentenprotokollen und auf der Registerkarte „Überwachung“ im Datenstrom „airline_tickets“ an:
Wie Sie sehen, funktioniert alles und der Kinesis Agent sendet erfolgreich Daten an den Stream. Lassen Sie uns nun den Verbraucher konfigurieren.
Einrichten von Kinesis Data Analytics
Kommen wir zur zentralen Komponente des gesamten Systems – erstellen Sie eine neue Anwendung in Kinesis Data Analytics mit dem Namen kinesis_analytics_airlines_app:
Mit Kinesis Data Analytics können Sie Echtzeit-Datenanalysen von Kinesis Streams mithilfe der SQL-Sprache durchführen. Es handelt sich um einen vollständig automatisch skalierenden Dienst (im Gegensatz zu Kinesis Streams), der:
ermöglicht die Erstellung neuer Streams (Output Stream) basierend auf Anfragen zur Datenquelle;
stellt einen Stream mit Fehlern bereit, die während der Ausführung von Anwendungen aufgetreten sind (Error Stream);
kann das Eingabedatenschema automatisch bestimmen (es kann bei Bedarf manuell neu definiert werden).
Dies ist kein billiger Service – 0.11 USD pro Arbeitsstunde, daher sollten Sie ihn vorsichtig verwenden und ihn löschen, wenn Sie fertig sind.
Verbinden wir die Anwendung mit der Datenquelle:
Wählen Sie den Stream aus, mit dem wir eine Verbindung herstellen möchten (airline_tickets):
Als Nächstes müssen Sie eine neue IAM-Rolle anhängen, damit die Anwendung aus dem Stream lesen und in den Stream schreiben kann. Dazu reicht es aus, im Block Zugriffsberechtigungen nichts zu ändern:
Lassen Sie uns nun die Entdeckung des Datenschemas im Stream anfordern; klicken Sie dazu auf die Schaltfläche „Schema entdecken“. Infolgedessen wird die IAM-Rolle aktualisiert (eine neue wird erstellt) und die Schemaerkennung wird anhand der Daten gestartet, die bereits im Stream angekommen sind:
Jetzt müssen Sie zum SQL-Editor gehen. Wenn Sie auf diese Schaltfläche klicken, erscheint ein Fenster, in dem Sie aufgefordert werden, die Anwendung zu starten. Wählen Sie aus, was Sie starten möchten:
Fügen Sie die folgende einfache Abfrage in das SQL-Editor-Fenster ein und klicken Sie auf „SQL speichern und ausführen“:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
In relationalen Datenbanken arbeiten Sie mit Tabellen, indem Sie INSERT-Anweisungen zum Hinzufügen von Datensätzen und eine SELECT-Anweisung zum Abfragen von Daten verwenden. In Amazon Kinesis Data Analytics arbeiten Sie mit Streams (STREAMs) und Pumpen (PUMPs) – kontinuierlichen Einfügeanforderungen, die Daten aus einem Stream in einer Anwendung in einen anderen Stream einfügen.
Die oben dargestellte SQL-Abfrage sucht nach Aeroflot-Tickets mit einem Preis unter fünftausend Rubel. Alle Datensätze, die diese Bedingungen erfüllen, werden im Stream DESTINATION_SQL_STREAM platziert.
Wählen Sie im Zielblock den Stream „special_stream“ und in der Dropdown-Liste „Name des anwendungsinternen Streams“ DESTINATION_SQL_STREAM aus:
Das Ergebnis aller Manipulationen sollte in etwa wie im Bild unten aussehen:
Erstellen und Abonnieren eines SNS-Themas
Gehen Sie zum Simple Notification Service und erstellen Sie dort ein neues Thema mit dem Namen Airlines:
Abonnieren Sie dieses Thema und geben Sie die Mobiltelefonnummer an, an die SMS-Benachrichtigungen gesendet werden:
Erstellen Sie eine Tabelle in DynamoDB
Um die Rohdaten aus ihrem Airline_tickets-Stream zu speichern, erstellen wir in DynamoDB eine Tabelle mit demselben Namen. Wir werden record_id als Primärschlüssel verwenden:
Erstellen eines Lambda-Funktionskollektors
Erstellen wir eine Lambda-Funktion namens Collector, deren Aufgabe darin besteht, den Airline_tickets-Stream abzufragen und, wenn dort neue Datensätze gefunden werden, diese Datensätze in die DynamoDB-Tabelle einzufügen. Zusätzlich zu den Standardrechten muss dieses Lambda natürlich Lesezugriff auf den Kinesis-Datenstrom und Schreibzugriff auf DynamoDB haben.
Erstellen einer IAM-Rolle für die Collector-Lambda-Funktion
Erstellen wir zunächst eine neue IAM-Rolle für das Lambda mit dem Namen Lambda-TicketsProcessingRole:
Für das Testbeispiel sind die vorkonfigurierten Richtlinien AmazonKinesisReadOnlyAccess und AmazonDynamoDBFullAccess durchaus geeignet, wie im Bild unten dargestellt:
Dieses Lambda sollte durch einen Trigger von Kinesis gestartet werden, wenn neue Einträge in den Airline_stream eingehen, daher müssen wir einen neuen Trigger hinzufügen:
Es bleibt nur noch, den Code einzufügen und das Lambda zu speichern.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Erstellen eines Lambda-Funktions-Notifiers
Die zweite Lambda-Funktion, die den zweiten Stream (special_stream) überwacht und eine Benachrichtigung an SNS sendet, wird auf ähnliche Weise erstellt. Daher muss dieses Lambda Zugriff haben, um von Kinesis zu lesen und Nachrichten an ein bestimmtes SNS-Thema zu senden, die dann vom SNS-Dienst an alle Abonnenten dieses Themas gesendet werden (E-Mail, SMS usw.).
Erstellen einer IAM-Rolle
Zuerst erstellen wir die IAM-Rolle Lambda-KinesisAlarm für dieses Lambda und weisen diese Rolle dann dem zu erstellenden alarm_notifier-Lambda zu:
Dieses Lambda sollte an einem Auslöser arbeiten, damit neue Datensätze in den „special_stream“ gelangen. Daher müssen Sie den Auslöser auf die gleiche Weise konfigurieren, wie wir es für das Collector-Lambda getan haben.
Um die Konfiguration dieses Lambda zu vereinfachen, führen wir eine neue Umgebungsvariable ein – TOPIC_ARN, in der wir die ANR (Amazon Recourse Names) des Airlines-Themas platzieren:
Und den Lambda-Code einfügen, es ist überhaupt nicht kompliziert:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Es scheint, dass hier die manuelle Systemkonfiguration abgeschlossen ist. Jetzt müssen wir nur noch testen und sicherstellen, dass wir alles richtig konfiguriert haben.
Bereitstellung aus Terraform-Code
Notwendige Vorbereitung
Terraform ist ein sehr praktisches Open-Source-Tool zum Bereitstellen von Infrastruktur aus Code. Es verfügt über eine eigene Syntax, die leicht zu erlernen ist und viele Beispiele dafür enthält, wie und was bereitgestellt werden soll. Der Atom-Editor oder Visual Studio Code verfügt über viele praktische Plugins, die die Arbeit mit Terraform erleichtern.
Sie können die Distribution herunterladen daher. Eine detaillierte Analyse aller Terraform-Fähigkeiten würde den Rahmen dieses Artikels sprengen, daher beschränken wir uns auf die wesentlichen Punkte.
Wie man rennt
Der vollständige Code des Projekts ist in meinem Repository. Wir klonen das Repository für uns selbst. Bevor Sie beginnen, müssen Sie sicherstellen, dass AWS CLI installiert und konfiguriert ist, denn... Terraform sucht in der Datei ~/.aws/credentials nach Anmeldeinformationen.
Eine gute Vorgehensweise besteht darin, den Befehl „plan“ auszuführen, bevor die gesamte Infrastruktur bereitgestellt wird, um zu sehen, was Terraform derzeit für uns in der Cloud erstellt:
terraform.exe plan
Sie werden aufgefordert, eine Telefonnummer einzugeben, an die Benachrichtigungen gesendet werden sollen. Eine Eingabe ist zu diesem Zeitpunkt nicht erforderlich.
Nachdem wir den Betriebsplan des Programms analysiert haben, können wir mit der Erstellung von Ressourcen beginnen:
terraform.exe apply
Nach dem Senden dieses Befehls werden Sie erneut aufgefordert, eine Telefonnummer einzugeben; wählen Sie „Ja“, wenn eine Frage zur tatsächlichen Durchführung der Aktionen angezeigt wird. Auf diese Weise können Sie die gesamte Infrastruktur einrichten, alle erforderlichen EC2-Konfigurationen durchführen, Lambda-Funktionen bereitstellen usw.
Nachdem alle Ressourcen erfolgreich über den Terraform-Code erstellt wurden, müssen Sie auf die Details der Kinesis Analytics-Anwendung eingehen (leider habe ich nicht herausgefunden, wie das direkt aus dem Code geht).
Anwendung starten:
Danach müssen Sie den In-Application-Stream-Namen explizit festlegen, indem Sie Folgendes aus der Dropdown-Liste auswählen:
Jetzt ist alles startklar.
Testen der Anwendung
Unabhängig davon, wie Sie das System manuell oder über Terraform-Code bereitgestellt haben, funktioniert es immer gleich.
Wir melden uns über SSH bei der virtuellen EC2-Maschine an, auf der Kinesis Agent installiert ist, und führen das Skript api_caller.py aus
sudo ./api_caller.py TOKEN
Sie müssen lediglich auf eine SMS an Ihre Nummer warten:
SMS – in fast 1 Minute kommt eine Nachricht auf dem Telefon an:
Es bleibt abzuwarten, ob die Datensätze für eine spätere, detailliertere Analyse in der DynamoDB-Datenbank gespeichert wurden. Die Tabelle „airline_tickets“ enthält ungefähr die folgenden Daten:
Abschluss
Im Zuge der durchgeführten Arbeiten wurde ein Online-Datenverarbeitungssystem auf Basis von Amazon Kinesis aufgebaut. Berücksichtigt wurden Optionen für den Einsatz des Kinesis-Agenten in Verbindung mit Kinesis Data Streams und Echtzeitanalysen Kinesis Analytics mithilfe von SQL-Befehlen sowie die Interaktion von Amazon Kinesis mit anderen AWS-Diensten.
Wir haben das obige System auf zwei Arten bereitgestellt: eine ziemlich lange manuelle und eine schnelle über den Terraform-Code.
Der gesamte Quellcode des Projekts ist verfügbar in meinem GitHub-RepositoryIch schlage vor, dass Sie sich damit vertraut machen.
Ich freue mich über die Diskussion des Artikels und freue mich auf Ihre Kommentare. Ich hoffe auf konstruktive Kritik.