Aviasales API-integration med Amazon Kinesis og serverløs enkelhed
Hej Habr!
Kan du lide at flyve med fly? Jeg elsker det, men under selvisolering blev jeg også forelsket i at analysere data om flybilletter fra én velkendt ressource - Aviasales.
I dag vil vi analysere Amazon Kinesis' arbejde, bygge et streamingsystem med realtidsanalyse, installere Amazon DynamoDB NoSQL-databasen som hoveddatalageret og opsætte SMS-beskeder for interessante billetter.
Alle detaljer er under snittet! Gå!
Indledning
For eksempel skal vi have adgang til Aviasales API. Adgang til det er gratis og uden begrænsninger; du skal blot registrere dig i afsnittet "Udviklere" for at modtage dit API-token for at få adgang til dataene.
Hovedformålet med denne artikel er at give en generel forståelse af brugen af informationsstreaming i AWS; vi tager højde for, at de data, der returneres af den anvendte API, ikke er strengt opdaterede og transmitteres fra cachen, som er dannet baseret på søgninger foretaget af brugere af Aviasales.ru og Jetradar.com webstederne i de sidste 48 timer.
Kinesis-agent, installeret på den producerende maskine, modtaget via API'et vil automatisk parse og transmittere data til den ønskede strøm via Kinesis Data Analytics. Råversionen af denne stream vil blive skrevet direkte til butikken. Den rå datalagring, der er implementeret i DynamoDB, giver mulighed for dybere billetanalyse gennem BI-værktøjer, såsom AWS Quick Sight.
Vi vil overveje to muligheder for at implementere hele infrastrukturen:
Manual - via AWS Management Console;
Infrastruktur fra Terraform-kode er til dovne automatister;
Arkitekturen af det udviklede system
Brugte komponenter:
Aviasales API — de data, der returneres af denne API, vil blive brugt til alt efterfølgende arbejde;
EC2 Producer Forekomst — en almindelig virtuel maskine i skyen, hvorpå inputdatastrømmen vil blive genereret:
Kinesis Agent er en Java-applikation installeret lokalt på maskinen, som giver en nem måde at indsamle og sende data til Kinesis (Kinesis Data Streams eller Kinesis Firehose). Agenten overvåger konstant et sæt filer i de angivne mapper og sender nye data til Kinesis;
API Caller Script — Et Python-script, der sender anmodninger til API'et og lægger svaret i en mappe, der overvåges af Kinesis Agent;
Kinesis datastrømme — datastreaming i realtid med brede skaleringsmuligheder;
Kinesis Analytics er en serverløs tjeneste, der forenkler analysen af streamingdata i realtid. Amazon Kinesis Data Analytics konfigurerer applikationsressourcer og skalerer automatisk til at håndtere enhver mængde af indgående data;
AWS Lambda — en tjeneste, der giver dig mulighed for at køre kode uden at sikkerhedskopiere eller opsætte servere. Al computerkraft skaleres automatisk for hvert opkald;
Amazon DynamoDB - En database med nøgleværdi-par og dokumenter, der giver en ventetid på mindre end 10 millisekunder, når den kører i enhver skala. Når du bruger DynamoDB, behøver du ikke at klargøre, patche eller administrere nogen servere. DynamoDB skalerer automatisk tabeller for at justere mængden af tilgængelige ressourcer og opretholde høj ydeevne. Ingen systemadministration er påkrævet;
Amazon SNS - en fuldt administreret tjeneste til afsendelse af beskeder ved hjælp af udgiver-abonnent-modellen (Pub/Sub), hvormed du kan isolere mikrotjenester, distribuerede systemer og serverløse applikationer. SNS kan bruges til at sende information til slutbrugere gennem mobile push-beskeder, SMS-beskeder og e-mails.
Indledende træning
For at efterligne datastrømmen besluttede jeg at bruge flybilletoplysningerne returneret af Aviasales API. I dokumentation en ret omfattende liste over forskellige metoder, lad os tage en af dem - "Månedlig priskalender", som returnerer priser for hver dag i måneden, grupperet efter antallet af overførsler. Hvis du ikke angiver søgemåneden i anmodningen, returneres oplysninger for måneden efter den aktuelle.
Ovenstående metode til at modtage data fra API'et ved at angive et token i anmodningen vil fungere, men jeg foretrækker at sende adgangstokenet gennem headeren, så vi vil bruge denne metode i api_caller.py scriptet.
Eksempel-API-svaret ovenfor viser en billet fra St. Petersborg til Phuk... Åh, hvilken drøm...
Da jeg er fra Kazan, og Phuket nu "kun er en drøm", lad os kigge efter billetter fra St. Petersborg til Kazan.
Det forudsætter, at du allerede har en AWS-konto. Jeg vil straks gøre særligt opmærksom på, at Kinesis og afsendelse af notifikationer via SMS ikke er inkluderet i den årlige Gratis niveau (gratis brug). Men selv på trods af dette, med et par dollars i tankerne, er det ganske muligt at bygge det foreslåede system og lege med det. Og, selvfølgelig, glem ikke at slette alle ressourcer, efter at de ikke længere er nødvendige.
Heldigvis vil DynamoDb og lambda funktioner være gratis for os, hvis vi overholder vores månedlige gratis grænser. For eksempel til DynamoDB: 25 GB lagerplads, 25 WCU/RCU og 100 millioner forespørgsler. Og en million lambdafunktionsopkald om måneden.
Manuel systemimplementering
Opsætning af Kinesis Data Streams
Lad os gå til Kinesis Data Streams-tjenesten og oprette to nye streams, et shard til hver.
Hvad er en skærv?
Et shard er den grundlæggende dataoverførselsenhed i en Amazon Kinesis-strøm. Et segment giver input dataoverførsel med en hastighed på 1 MB/s og output dataoverførsel med en hastighed på 2 MB/s. Et segment understøtter op til 1000 PUT-indgange i sekundet. Når du opretter en datastrøm, skal du angive det nødvendige antal segmenter. For eksempel kan du oprette en datastrøm med to segmenter. Denne datastrøm vil give input-dataoverførsel ved 2 MB/s og output-dataoverførsel ved 4 MB/s, hvilket understøtter op til 2000 PUT-poster pr. sekund.
Jo flere skår i din strøm, jo større er dens gennemstrømning. I princippet er det sådan, strømme skaleres - ved at tilføje skår. Men jo flere skår du har, jo højere er prisen. Hvert skår koster 1,5 cent i timen og yderligere 1.4 cent for hver million PUT nyttelastenheder.
Lad os oprette en ny strøm med navnet flybilletter, 1 skår vil være nok til ham:
Lad os nu oprette endnu en tråd med navnet special_stream:
Opsætning af producent
For at analysere en opgave er det nok at bruge en almindelig EC2-instans som dataproducent. Det behøver ikke at være en kraftfuld, dyr virtuel maskine; en spot t2.micro vil klare sig fint.
Vigtig bemærkning: for eksempel skal du bruge billede - Amazon Linux AMI 2018.03.0, det har færre indstillinger til hurtigt at starte Kinesis Agent.
Gå til EC2-tjenesten, opret en ny virtuel maskine, vælg den ønskede AMI med typen t2.micro, som er inkluderet i Free Tier:
For at den nyoprettede virtuelle maskine skal kunne interagere med Kinesis-tjenesten, skal den have rettigheder til det. Den bedste måde at gøre dette på er at tildele en IAM-rolle. Derfor skal du på skærmen Trin 3: Konfigurer instansdetaljer vælge Opret ny IAM-rolle:
Oprettelse af en IAM-rolle for EC2
I vinduet, der åbnes, skal du vælge, at vi opretter en ny rolle for EC2 og gå til sektionen Tilladelser:
Ved at bruge træningseksemplet behøver vi ikke at gå ind i alle forviklingerne ved granulær konfiguration af ressourcerettigheder, så vi vælger de politikker, der er forudkonfigureret af Amazon: AmazonKinesisFullAccess og CloudWatchFullAccess.
Lad os give denne rolle et meningsfuldt navn, for eksempel: EC2-KinesisStreams-FullAccess. Resultatet skal være det samme som vist på billedet nedenfor:
Efter at have oprettet denne nye rolle, glem ikke at vedhæfte den til den oprettede virtuelle maskine-instans:
Vi ændrer ikke andet på denne skærm og går videre til de næste vinduer.
Harddiskindstillingerne kan stå som standard, såvel som tags (selvom det er god praksis at bruge tags, giv i det mindste forekomsten et navn og angiv miljøet).
Nu er vi på fanen Trin 6: Konfigurer sikkerhedsgruppe, hvor du skal oprette en ny eller angive din eksisterende sikkerhedsgruppe, som giver dig mulighed for at oprette forbindelse via ssh (port 22) til instansen. Vælg Kilde -> Min IP der, og du kan starte forekomsten.
Så snart den skifter til kørestatus, kan du prøve at oprette forbindelse til den via ssh.
For at kunne arbejde med Kinesis Agent skal du, efter at have oprettet forbindelse til maskinen, indtaste følgende kommandoer i terminalen:
Som det kan ses af konfigurationsfilen, vil agenten overvåge filer med .log-udvidelsen i mappen /var/log/airline_tickets/, parse dem og overføre dem til airline_tickets-strømmen.
Vi genstarter tjenesten og sikrer, at den er oppe og køre:
sudo service aws-kinesis-agent restart
Lad os nu downloade Python-scriptet, der vil anmode om data fra API'en:
Scriptet api_caller.py anmoder om data fra Aviasales og gemmer det modtagne svar i den mappe, som Kinesis-agenten scanner. Implementeringen af dette script er ret standard, der er en TicketsApi-klasse, den giver dig mulighed for asynkront at trække API'en. Vi sender en header med et token og anmoder om parametre til denne klasse:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
For at teste agentens korrekte indstillinger og funktionalitet, lad os prøvekøre api_caller.py-scriptet:
sudo ./api_caller.py TOKEN
Og vi ser på resultatet af arbejdet i agentlogfilerne og på fanen Overvågning i airline_tickets-datastrømmen:
Som du kan se, fungerer alt, og Kinesis Agent sender med succes data til strømmen. Lad os nu konfigurere forbrugeren.
Opsætning af Kinesis Data Analytics
Lad os gå videre til den centrale komponent i hele systemet - opret en ny applikation i Kinesis Data Analytics ved navn kinesis_analytics_airlines_app:
Kinesis Data Analytics giver dig mulighed for at udføre dataanalyse i realtid fra Kinesis Streams ved hjælp af SQL-sproget. Det er en fuld autoskaleringstjeneste (i modsætning til Kinesis Streams), der:
giver dig mulighed for at oprette nye streams (Output Stream) baseret på anmodninger om kildedata;
giver en strøm med fejl, der opstod, mens programmer kørte (fejlstrøm);
kan automatisk bestemme inputdataskemaet (det kan omdefineres manuelt om nødvendigt).
Dette er ikke en billig service - 0.11 USD per time arbejde, så du bør bruge den forsigtigt og slette den, når du er færdig.
Lad os forbinde applikationen til datakilden:
Vælg den strøm, vi vil oprette forbindelse til (flybilletter):
Dernæst skal du vedhæfte en ny IAM-rolle, så applikationen kan læse fra streamen og skrive til streamen. For at gøre dette er det nok ikke at ændre noget i adgangstilladelsesblokken:
Lad os nu anmode om opdagelse af dataskemaet i strømmen; for at gøre dette skal du klikke på knappen "Opdag skema". Som et resultat heraf vil IAM-rollen blive opdateret (en ny oprettes), og skemadetektion vil blive lanceret fra de data, der allerede er ankommet i strømmen:
Nu skal du gå til SQL-editoren. Når du klikker på denne knap, vises et vindue, hvor du bliver bedt om at starte programmet - vælg hvad du vil starte:
Indsæt følgende enkle forespørgsel i SQL-editorvinduet, og klik på Gem og kør SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
I relationelle databaser arbejder du med tabeller ved hjælp af INSERT-sætninger til at tilføje poster og en SELECT-sætning til at forespørge data. I Amazon Kinesis Data Analytics arbejder du med strømme (STREAMs) og pumper (PUMP'er) – kontinuerlige indsættelsesanmodninger, der indsætter data fra én strøm i en applikation i en anden strøm.
SQL-forespørgslen præsenteret ovenfor søger efter Aeroflot-billetter til en pris på under fem tusind rubler. Alle poster, der opfylder disse betingelser, vil blive placeret i DESTINATION_SQL_STREAM-strømmen.
I Destinationsblokken skal du vælge special_stream-strømmen og på rullelisten DESTINATION_SQL_STREAM i rullelisten In-application stream name:
Resultatet af alle manipulationer skulle være noget, der ligner billedet nedenfor:
Oprettelse og abonnement på et SNS-emne
Gå til Simple Notification Service og opret et nyt emne der med navnet Airlines:
Abonner på dette emne og angiv det mobiltelefonnummer, som SMS-beskeder vil blive sendt til:
Opret en tabel i DynamoDB
Lad os oprette en tabel i DynamoDB med samme navn for at gemme rådataene fra deres flyrejsestrøm. Vi vil bruge record_id som den primære nøgle:
Oprettelse af en lambdafunktionsopsamler
Lad os oprette en lambda-funktion kaldet Collector, hvis opgave vil være at polle airline_tickets-strømmen og, hvis nye poster findes der, indsætte disse poster i DynamoDB-tabellen. Ud over standardrettighederne skal denne lambda naturligvis have læseadgang til Kinesis-datastrømmen og skriveadgang til DynamoDB.
Oprettelse af en IAM-rolle for collector lambda-funktionen
Lad os først oprette en ny IAM-rolle for lambdaen ved navn Lambda-TicketsProcessingRole:
Til testeksemplet er de forudkonfigurerede AmazonKinesisReadOnlyAccess- og AmazonDynamoDBFullAccess-politikker ganske velegnede, som vist på billedet nedenfor:
Denne lambda bør lanceres af en trigger fra Kinesis, når nye poster kommer ind i airline_stream, så vi skal tilføje en ny trigger:
Tilbage er blot at indsætte koden og gemme lambdaen.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Oprettelse af en lambdafunktionsmeddelelse
Den anden lambda-funktion, som vil overvåge den anden strøm (special_stream) og sende en notifikation til SNS, er oprettet på lignende måde. Derfor skal denne lambda have adgang til at læse fra Kinesis og sende beskeder til et givet SNS-emne, som så sendes af SNS-tjenesten til alle abonnenter af dette emne (e-mail, SMS, etc.).
Oprettelse af en IAM-rolle
Først opretter vi IAM-rollen Lambda-KinesisAlarm for denne lambda, og tildeler derefter denne rolle til den alarm_notifier lambda, der oprettes:
Denne lambda skulle fungere på en trigger for nye poster for at komme ind i special_stream, så du skal konfigurere triggeren på samme måde som vi gjorde for Collector lambdaen.
For at gøre det nemmere at konfigurere denne lambda, lad os introducere en ny miljøvariabel - TOPIC_ARN, hvor vi placerer ANR (Amazon Recourse Names) for Airlines-emnet:
Og indsæt lambdakoden, det er slet ikke kompliceret:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Det ser ud til, at det er her den manuelle systemkonfiguration er afsluttet. Det eneste, der er tilbage, er at teste og sikre, at vi har konfigureret alt korrekt.
Implementer fra Terraform-kode
Nødvendig forberedelse
terraform er et meget praktisk open source-værktøj til at implementere infrastruktur fra kode. Det har sin egen syntaks, der er let at lære og har mange eksempler på, hvordan og hvad der skal implementeres. Atom-editoren eller Visual Studio Code har mange praktiske plugins, der gør arbejdet med Terraform lettere.
Du kan downloade distributionen dermed. En detaljeret analyse af alle Terraform-kapaciteter ligger uden for rammerne af denne artikel, så vi vil begrænse os til hovedpunkterne.
Sådan starter du
Den fulde kode for projektet er i mit depot. Vi kloner depotet til os selv. Før du starter, skal du sikre dig, at du har AWS CLI installeret og konfigureret, fordi... Terraform vil lede efter legitimationsoplysninger i filen ~/.aws/credentials.
En god praksis er at køre plankommandoen, før du implementerer hele infrastrukturen for at se, hvad Terraform i øjeblikket skaber for os i skyen:
terraform.exe plan
Du bliver bedt om at indtaste et telefonnummer, du vil sende meddelelser til. Det er ikke nødvendigt at indtaste det på dette tidspunkt.
Efter at have analyseret programmets driftsplan kan vi begynde at oprette ressourcer:
terraform.exe apply
Efter at have sendt denne kommando, bliver du igen bedt om at indtaste et telefonnummer; tast "ja", når et spørgsmål om faktisk udførelse af handlingerne vises. Dette giver dig mulighed for at opsætte hele infrastrukturen, udføre al den nødvendige konfiguration af EC2, implementere lambda-funktioner osv.
Efter at alle ressourcer er blevet oprettet gennem Terraform-koden, skal du gå ind i detaljerne i Kinesis Analytics-applikationen (desværre fandt jeg ikke, hvordan man gør dette direkte fra koden).
Start applikationen:
Herefter skal du udtrykkeligt angive navnet på streaming i applikationen ved at vælge fra rullelisten:
Nu er alt klar til at gå.
Test af applikationen
Uanset hvordan du implementerede systemet, manuelt eller gennem Terraform-kode, vil det fungere på samme måde.
Vi logger ind via SSH på den virtuelle EC2-maskine, hvor Kinesis Agent er installeret og kører scriptet api_caller.py
sudo ./api_caller.py TOKEN
Alt du skal gøre er at vente på en SMS til dit nummer:
SMS - beskeden kommer på telefonen om næsten 1 minut:
Det er tilbage at se, om posterne blev gemt i DynamoDB-databasen til efterfølgende, mere detaljeret analyse. Flybilletter-tabellen indeholder omtrent følgende data:
Konklusion
I løbet af det udførte arbejde blev der bygget et online databehandlingssystem baseret på Amazon Kinesis. Muligheder for at bruge Kinesis Agent i forbindelse med Kinesis Data Streams og realtidsanalyse Kinesis Analytics ved hjælp af SQL-kommandoer samt interaktionen af Amazon Kinesis med andre AWS-tjenester blev overvejet.
Vi implementerede ovenstående system på to måder: en ret lang manuel og en hurtig fra Terraform-koden.
Al projektkildekode er tilgængelig i mit GitHub-lager, jeg foreslår, at du gør dig bekendt med det.
Jeg er glad for at diskutere artiklen, jeg ser frem til dine kommentarer. Jeg håber på konstruktiv kritik.