Aviasales API-integrasie met Amazon Kinesis en bedienerlose eenvoud
Haai Habr!
Hou jy daarvan om op vliegtuie te vlieg? Ek is mal daaroor, maar tydens selfisolasie het ek ook verlief geraak op die ontleding van data oor vliegkaartjies van een bekende hulpbron – Aviasales.
Vandag sal ons die werk van Amazon Kinesis ontleed, 'n stroomstelsel bou met intydse analise, die Amazon DynamoDB NoSQL-databasis installeer as die hoofdatawinkel, en kennisgewing per SMS opstel vir interessante kaartjies.
Al die besonderhede onder die snit! Gaan!
Inleiding
Ons het byvoorbeeld toegang nodig tot Aviasales API. Toegang tot dit word gratis verskaf en sonder beperkings, jy hoef net in die "Ontwikkelaars"-afdeling te registreer om jou API-token te kry om toegang tot data te verkry.
Die hoofdoel van hierdie artikel is om 'n algemene begrip te gee van die gebruik van stroominligting in AWS, ons haal dit uit die hakies dat die data wat deur die gebruikte API teruggestuur word nie streng op datum is nie en vanaf die kas versend word , wat gevorm word op grond van soektogte deur gebruikers van die Aviasales.ru en Jetradar.com webwerwe vir die afgelope 48 uur.
Die Kinesis-agent wat op die vervaardigermasjien geïnstalleer is, ontvang via die API, sal outomaties die data op die kaartjies ontleed en oordra na die vereiste stroom via Kinesis Data Analytics. Die rou weergawe van hierdie stroom sal direk na die bewaarplek geskryf word. Die stoor van rou data wat in DynamoDB ontplooi word, sal dieper ontleding van kaartjies moontlik maak deur BI-nutsgoed, soos AWS Quick Sight.
Ons sal twee opsies oorweeg om die hele infrastruktuur te ontplooi:
Handleiding - via AWS Management Console;
Infrastruktuur van die Terraform-kode - vir lui outomateerders;
Die argitektuur van die ontwikkelde stelsel
Gebruikte komponente:
Aviasales API - die data wat deur hierdie API teruggestuur word, sal vir alle daaropvolgende werk gebruik word;
EC2 Producer Instance - 'n gewone virtuele masjien in die wolk, waarop die insetdatastroom gegenereer sal word:
Kinesis Agent is 'n Java-toepassing wat plaaslik op 'n masjien geïnstalleer is wat 'n maklike manier bied om data te versamel en na Kinesis (Kinesis Data Streams of Kinesis Firehose) te stuur. Die agent monitor voortdurend 'n stel lêers in die gespesifiseerde dopgehou en stuur nuwe data na Kinesis;
API-oproeperskrif - 'n Python-skrip wat versoeke aan die API rig en die reaksie by 'n gids voeg wat deur die Kinesis Agent gemonitor word;
Kinesis Analytics is 'n bedienerlose diens wat die ontleding van stroomdata in reële tyd vergemaklik. Amazon Kinesis Data Analytics konfigureer toepassingshulpbronne en skaal outomaties om enige volume inkomende data te hanteer;
AWS Lambda - 'n diens waarmee u kode kan laat loop sonder oortolligheid en bedienerkonfigurasie. Alle rekenaarkrag word outomaties vir elke oproep afgeskaal;
Amazon DynamoDB - 'n databasis van sleutel-waarde-pare en dokumente wat 'n vertraging van minder as 10 millisekondes bied wanneer daar op enige skaal gewerk word. Met DynamoDB hoef jy nie enige bedieners te voorsien, reg te maak of te bestuur nie. DynamoDB skaal outomaties tabelle om aan te pas vir beskikbare hulpbronne terwyl hoë werkverrigting gehandhaaf word. Geen stelseladministrasie word vereis nie;
Amazon SNS is 'n volledig bestuurde publiseer-inteken (Pub/Sub) boodskapdiens wat mikrodienste, verspreide stelsels en bedienerlose toepassings kan isoleer. SNS kan gebruik word om inligting aan eindgebruikers te versprei deur mobiele stootkennisgewings, SMS-boodskappe en e-posse.
Aanvanklike opleiding
Om die datavloei na te boots, het ek besluit om die vliegkaartjieinligting te gebruik wat deur die Aviasales API teruggestuur word. IN dokumentasie nogal 'n uitgebreide lys van verskillende metodes, kom ons neem een van hulle - "Maandelikse Pryskalender", wat pryse vir elke dag van die maand gee, gegroepeer volgens die aantal oordragte. As jy nie die maand van die soektog in die navraag stuur nie, sal die inligting vir die maand wat volg op die huidige een teruggestuur word.
Die bogenoemde metode om data van die API af te kry met die teken in die versoek sal werk, maar ek verkies om die toegangstoken deur die kop te stuur, so ons sal hierdie metode in die api_caller.py-skrip gebruik.
Die voorbeeld API-reaksie hierbo wys 'n kaartjie van St. Petersburg na Phuk ... O, wat 'n droom ...
Aangesien ek van Kazan is, en Phuket nou "ons droom net" is, sal ons kaartjies van St. Petersburg na Kazan soek.
Dit neem aan dat u reeds 'n AWS-rekening het. Ek wil dadelik spesiale aandag gee aan die feit dat Kinesis en die stuur van kennisgewings per SMS nie by die jaarlikse ingesluit is nie Gratis vlak (gratis om te gebruik). Maar tog, met 'n paar dollar in gedagte, is dit heel moontlik om die voorgestelde stelsel te bou en daarmee te speel. En, natuurlik, moenie vergeet om alle hulpbronne uit te vee nadat dit nie meer nodig is nie.
Gelukkig sal DynamoDb- en lambda-funksies vir ons deelware wees, solank ons aan die maandelikse gratis limiete voldoen. Byvoorbeeld, vir DynamoDB: 25 GB berging, 25 WCU/RCU en 100 miljoen navrae. En 'n miljoen lambda-funksie-oproepe per maand.
Handmatige stelselontplooiing
Die opstel van Kinesis-datastrome
Kom ons gaan na die Kinesis Data Streams-diens en skep twee nuwe strome, een skerf elk.
Wat is 'n skerf?
'n Skerf is die basiese data-oordrageenheid van 'n Amazon Kinesis-stroom. Een segment verskaf 1 MB/s invoer en 2 MB/s uitvoer. Een segment ondersteun tot 1000 PUT-rekords per sekonde. Wanneer u 'n datavloei skep, moet u die verlangde aantal segmente spesifiseer. Byvoorbeeld, jy kan 'n datastroom met twee segmente skep. Hierdie datastroom sal 2 MB/s invoer en 4 MB/s uitvoer verskaf, wat tot 2000 PUTs per sekonde ondersteun.
Hoe meer skerwe in jou stroom, hoe groter is die deurset daarvan. In beginsel is dit hoe vloeie afgeskaal word – deur skerwe by te voeg. Maar hoe meer skerwe jy het, hoe hoër is die prys. Elke skerf kos 1,5 sent per uur en 'n bykomende 1.4 sent vir elke miljoen PUT-loonvrag-eenhede.
Kom ons skep 'n nuwe draad met die naam vliegtuigkaartjies, 1 skerf sal vir hom genoeg wees:
Kom ons skep nou 'n ander draad met die naam spesiale_stroom:
Vervaardiger opstelling
Dit is genoeg om 'n gewone EC2-instansie as 'n datavervaardiger te gebruik om die taak te ontleed. Dit hoef nie 'n kragtige, duur VM te wees nie, 'n spot t2.micro is goed.
Belangrike nota: jy moet byvoorbeeld beeld gebruik - Amazon Linux AMI 2018.03.0, met minder instellings om Kinesis Agent vinnig te begin.
Ons gaan na die EC2-diens, skep 'n nuwe virtuele masjien, kies die gewenste AMI met die t2.micro-tipe, wat ingesluit is in die Free Tier:
Om die nuutgeskepte virtuele masjien met die Kinesis-diens te kan kommunikeer, moet dit toestemming gegee word om dit te doen. Die beste manier om dit te doen is om 'n IAM-rol toe te wys. Daarom, op die Stap 3: Stel instansiebesonderhede skerm, kies Skep nuwe IAM-rol:
Skep 'n IAM-rol vir EC2
Kies in die venster wat oopmaak dat ons 'n nuwe rol vir EC2 skep en gaan na die Toestemmings-afdeling:
Deur die opleidingsvoorbeeld te gebruik, kan u nie ingaan op al die ingewikkeldhede van korrelkonfigurasie van hulpbronregte nie, daarom sal ons die beleide kies wat vooraf deur Amazon gekonfigureer is: AmazonKinesisFullAccess en CloudWatchFullAccess.
Kom ons gee 'n betekenisvolle naam vir hierdie rol, byvoorbeeld: EC2-KinesisStreams-FullAccess. As gevolg hiervan, moet jy dieselfde kry as in die prentjie hieronder:
Nadat u hierdie nuwe rol geskep het, moenie vergeet om dit aan die geskepde virtuele masjien-instansie te heg nie:
Ons verander niks anders op hierdie skerm nie en gaan aan na die volgende vensters.
Jy kan die hardeskyfparameters as verstek laat, etikette ook (alhoewel dit goeie praktyk is om etikette te gebruik, noem ten minste die instansie en spesifiseer die omgewing).
Nou is ons op die Stap 6: Stel Sekuriteitsgroep-oortjie in, waar jy 'n nuwe een moet skep of die Sekuriteitsgroep moet spesifiseer wat jy het, wat jou toelaat om via ssh (poort 22) aan die instansie te koppel. Kies Bron -> My IP daar en jy kan die instansie begin.
Sodra dit verander na die lopende status, kan jy probeer om dit via ssh te koppel.
Om met Kinesis Agent te kan werk, nadat u suksesvol aan die masjien gekoppel het, moet u die volgende opdragte in die terminaal invoer:
Soos gesien kan word uit die konfigurasielêer, sal die agent lêers met die .log-uitbreiding in die /var/log/airline_tickets/-gids monitor, dit ontleed en na die airline_tickets-stroom deurgee.
Herbegin die diens en maak seker dat dit aan die gang is:
sudo service aws-kinesis-agent restart
Kom ons laai nou 'n Python-skrip af wat data van die API sal aanvra:
Die api_caller.py-skrip versoek data van Aviasales en stoor die ontvangde antwoord in die gids wat die Kinesis-agent skandeer. Die implementering van hierdie skrif is redelik standaard, daar is 'n TicketsApi-klas, dit laat jou toe om die API asynchronies te trek. Ons gee die kopskrif met die teken en versoek parameters na hierdie klas:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Om die korrektheid van die instellings en die werkbaarheid van die agent te toets, sal ons die api_caller.py-skrip toets:
sudo ./api_caller.py TOKEN
En ons kyk na die resultaat van werk in die Agent-logboeke en op die Monitering-oortjie in die airline_tickets-datastroom:
Soos u kan sien, werk alles en Kinesis Agent stuur data suksesvol na die stroom. Kom ons stel nou die verbruiker op.
Die opstel van Kinesis Data Analytics
Kom ons gaan aan na die sentrale komponent van die hele stelsel - kom ons skep 'n nuwe toepassing in Kinesis Data Analytics genaamd kinesis_analytics_airlines_app:
Kinesis Data Analytics laat jou toe om intydse data-analise uit Kinesis Streams uit te voer met behulp van die SQL-taal. Dit is 'n volledig outoskaaldiens (anders as Kinesis Streams) wat:
laat jou toe om nuwe strome (Uitvoerstroom) te skep gebaseer op navrae na die brondata;
verskaf 'n stroom met foute wat tydens die werking van toepassings plaasgevind het (Foutstroom);
kan outomaties die skema van die invoerdata bepaal (dit kan met die hand oorskryf word indien nodig).
Dit is 'n duur diens - 0.11 USD per uur se werk, so jy moet dit versigtig gebruik en dit uitvee wanneer jy klaar is met werk.
Kom ons koppel die toepassing aan die databron:
Kies die stroom waaraan ons gaan koppel (airline_tickets):
Vervolgens moet jy 'n nuwe IAM-rol aanheg sodat die toepassing uit die stroom kan lees en na die stroom kan skryf. Om dit te doen, is dit genoeg om niks in die Toegangstoestemmingsblok te verander nie:
Nou sal ons die ontdekking van die dataskema in die stroom versoek, hiervoor klik ons op die "Ontdek skema" -knoppie. As gevolg hiervan sal die IAM-rol opgedateer (geskep) word en skema-ontdekking sal begin word vanaf die data wat reeds in die stroom aangekom het:
Nou moet jy na die SQL-redigeerder gaan. Wanneer jy op hierdie knoppie klik, sal 'n venster verskyn met 'n vraag oor die bekendstelling van die toepassing - kies wat ons wil begin:
Voeg die volgende eenvoudige navraag in die SQL-redigeerdervenster in en klik Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
In relasionele databasisse werk jy met tabelle deur INSERT-stellings te gebruik om rekords by te voeg en SELECT-stellings om data te bevraagteken. In Amazon Kinesis Data Analytics werk jy met strome (STREAM) en "pompe" (PUMP's), deurlopende invoegnavrae wat data van een stroom in jou toepassing in 'n ander stroom invoeg.
Die bogenoemde SQL-navraag soek na Aeroflot-kaartjies teen 'n koste van minder as vyfduisend roebels. Alle rekords wat by hierdie voorwaardes pas, sal op die DESTINATION_SQL_STREAM-stroom geplaas word.
In die Bestemming-blok, kies die spesiale_stroomstroom, en in die In-toepassing stroomnaam aftreklys DESTINATION_SQL_STREAM:
As gevolg van al die manipulasies, moet jy iets kry soortgelyk aan die prentjie hieronder:
Skep en teken in op 'n SNS-onderwerp
Gaan na die Simple Notification Service en skep 'n nuwe onderwerp daar met die naam Airlines:
Ons teken in op hierdie onderwerp, daarin dui ons die selfoonnommer aan waarna SMS-kennisgewings gestuur sal word:
Skep 'n tabel in DynamoDB
Om die rou data van die airline_tickets-stroom te stoor, kom ons skep 'n tabel in DynamoDB met dieselfde naam. Ons sal record_id as die primêre sleutel gebruik:
Die skep van 'n lambda funksie versamelaar
Kom ons skep 'n lambda-funksie genaamd Collector, wie se taak is om die airline_tickets-stroom te poll en, indien nuwe rekords daar gevind word, hierdie rekords in die DynamoDB-tabel in te voeg. Uiteraard moet hierdie lambda, benewens die verstektoestemmings, toegang hê om die Kinesis-datastroom te lees en na DynamoDB te skryf.
Skep 'n IAM-rol vir die versamelaar lambda-funksie
Kom ons skep eers 'n nuwe IAM lambda-rol genaamd Lambda-TicketsProcessingRole:
Vir 'n toetsvoorbeeld is die vooraf gekonfigureerde AmazonKinesisReadOnlyAccess- en AmazonDynamoDBFullAccess-beleide redelik geskik, soos in die prentjie hieronder getoon:
Hierdie lambda behoort deur Kinesis geaktiveer te word wanneer nuwe inskrywings die airline_stream-stroom binnegaan, so ons moet 'n nuwe sneller byvoeg:
Dit bly om die kode te plak en die lambda te stoor.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Skep 'n lambda-funksie-kennisgewer
Die tweede lambda-funksie, wat die tweede stroom (spesiale_stroom) sal monitor en 'n kennisgewing aan die SNS sal stuur, word op dieselfde manier geskep. Daarom moet hierdie lambda toegang hê om vanaf Kinesis te lees en boodskappe na 'n gegewe SNS-onderwerp te stuur, wat dan deur die SNS-diens aan alle intekenare van hierdie onderwerp (e-pos, SMS, ens.) gestuur sal word.
Skep 'n IAM-rol
Eerstens skep ons die Lambda-KinesisAlarm IAM-rol vir hierdie lambda, en dan ken ons hierdie rol toe aan die geskepte alarm_notifier lambda:
Hierdie lambda moet op 'n sneller werk vir nuwe rekords om die special_stream stroom te betree, so jy moet die sneller op dieselfde manier konfigureer as wat ons vir die Collector lambda gedoen het.
Vir die gerief om hierdie lambda op te stel, kom ons stel 'n nuwe omgewingsveranderlike bekend - TOPIC_ARN, waar ons die ANR (Amazon Recourse Names) van die Lugdiens-onderwerp plaas:
En ons voeg die lambda-kode in, dit is redelik eenvoudig:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Dit blyk dat hierdie handmatige konfigurasie van die stelsel voltooi is. Dit bly net om te toets en seker te maak dat ons alles korrek opstel.
Ontplooi vanaf Terraform-kode
Vereiste voorbereiding
terraform - 'n baie gerieflike oopbron-instrument vir die implementering van infrastruktuur vanaf kode. Dit het sy eie sintaksis, wat maklik is om te leer en baie voorbeelde van hoe en wat om te ontplooi. Daar is baie handige inproppe in die Atom-redigeerder of Visual Studio Code wat dit makliker maak om met Terraform te werk.
Verspreider kan afgelaai word vandaar. 'N Gedetailleerde ontleding van al die kenmerke van Terraform is buite die bestek van hierdie artikel, so ons sal ons beperk tot die hoofpunte.
Hoe om te begin
Die volledige projekkode is in my bewaarplek. Ons kloon ons bewaarplek. Voordat jy begin, moet jy seker maak dat jy AWS CLI geïnstalleer en gekonfigureer het, want. Terraform sal na geloofsbriewe in die ~/.aws/credentials-lêer soek.
Dit is goeie praktyk om die planopdrag uit te voer voordat die hele infrastruktuur ontplooi word om te sien wat Terraform tans vir ons in die wolk skep:
terraform.exe plan
Jy sal gevra word om 'n telefoonnommer in te voer om kennisgewings na te stuur. Op hierdie stadium is dit nie nodig om dit in te voer nie.
Nadat ons die programwerkplan ontleed het, kan ons begin om hulpbronne te skep:
terraform.exe apply
Nadat u hierdie opdrag gestuur het, sal u weer gevra word om 'n telefoonnommer in te voer, tik "ja" wanneer die vraag oor die werklike uitvoering van aksies vertoon word. Dit sal jou toelaat om die hele infrastruktuur te verhoog, al die nodige konfigurasie van EC2 uit te voer, lambda-funksies te ontplooi, ens.
Nadat alle hulpbronne suksesvol deur die Terraform-kode geskep is, moet u die besonderhede van die Kinesis Analytics-toepassing ingaan (ongelukkig het ek nie gevind hoe om dit direk vanaf die kode te doen nie).
Ons begin die toepassing:
Daarna moet u die naam in die toepassingstroom uitdruklik stel deur uit die aftreklys te kies:
Nou is alles gereed om te gaan.
Toepassingstoetsing
Ongeag hoe jy die stelsel ontplooi het, met die hand of deur die Terraform-kode, sal dit op dieselfde manier werk.
Ons gaan via SSH na die EC2 virtuele masjien waar Kinesis Agent geïnstalleer is en hardloop die script api_caller.py
sudo ./api_caller.py TOKEN
Dit bly om te wag vir 'n SMS na jou nommer:
SMS - 'n boodskap kom binne byna 1 minuut op die telefoon:
Dit moet nog gesien word of die rekords in die DynamoDB-databasis bewaar word vir daaropvolgende, meer gedetailleerde ontleding. Die vliegtuigkaartjies-tabel bevat iets soos hierdie:
Gevolgtrekking
In die loop van die werk wat gedoen is, is 'n aanlyn dataverwerkingstelsel gebaseer op Amazon Kinesis gebou. Die opsies vir die gebruik van Kinesis Agent in samewerking met Kinesis Datastrome en intydse analise van Kinesis Analytics deur gebruik te maak van SQL-opdragte, sowel as die interaksie van Amazon Kinesis met ander AWS-dienste, is oorweeg.
Ons het die bogenoemde stelsel op twee maniere ontplooi: 'n taamlik lang handleiding en 'n vinnige een vanaf die Terraform-kode.