ProHoster > Блог > Administracija > Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Aviasales API integracija sa Amazon Kinesisom i jednostavnost bez servera
Hej Habr!
Volite li letenje avionima? Volim to, ali sam se tokom samoizolacije zaljubio i u analizu podataka o avionskim kartama sa jednog poznatog resursa - Aviasales.
Danas ćemo analizirati rad Amazon Kinesis-a, izgraditi sistem za striming sa analitikom u realnom vremenu, instalirati Amazon DynamoDB NoSQL bazu podataka kao glavno skladište podataka i postaviti SMS obaveštenja za zanimljive karte.
Svi detalji su ispod reza! Idi!
Uvod
Za primjer, potreban nam je pristup Aviasales API. Pristup mu je besplatan i bez ograničenja; potrebno je samo da se registrujete u odeljku „Programeri“ da biste dobili svoj API token za pristup podacima.
Glavna svrha ovog članka je dati opće razumijevanje upotrebe strujanja informacija u AWS-u; uzimamo u obzir da podaci koje vraća korišteni API nisu striktno ažurirani i prenose se iz predmemorije, koja je formirana na osnovu pretraga korisnika sajtova Aviasales.ru i Jetradar.com u poslednjih 48 sati.
Kinesis-agent, instaliran na mašini za proizvodnju, primljen preko API-ja, automatski će analizirati i prenijeti podatke u željeni stream putem Kinesis Data Analytics. Sirova verzija ovog streama će biti napisana direktno u prodavnicu. Skladište sirovih podataka raspoređeno u DynamoDB omogućit će dublju analizu ulaznica putem BI alata, kao što je AWS Quick Sight.
Razmotrit ćemo dvije opcije za postavljanje cjelokupne infrastrukture:
Ručno - preko AWS upravljačke konzole;
Infrastruktura iz Terraform koda je za lijene automatore;
Arhitektura razvijenog sistema
Korištene komponente:
Aviasales API — podaci koje vrati ovaj API će se koristiti za sav naredni rad;
EC2 Instanca proizvođača — obična virtuelna mašina u oblaku na kojoj će se generisati ulazni tok podataka:
Kinesis Agent je Java aplikacija instalirana lokalno na mašini koja pruža jednostavan način za prikupljanje i slanje podataka u Kinesis (Kinesis Data Streams ili Kinesis Firehose). Agent stalno prati skup datoteka u navedenim direktorijima i šalje nove podatke u Kinesis;
API Caller Script — Python skripta koja postavlja zahtjeve API-ju i stavlja odgovor u mapu koju nadgleda Kinesis Agent;
Kinesis tokovi podataka — usluga strimovanja podataka u realnom vremenu sa širokim mogućnostima skaliranja;
Kinesis Analytics je usluga bez servera koja pojednostavljuje analizu striming podataka u realnom vremenu. Amazon Kinesis Data Analytics konfigurira resurse aplikacije i automatski skalira za rukovanje bilo kojom količinom dolaznih podataka;
AWS Lambda — usluga koja vam omogućava da pokrenete kod bez pravljenja rezervnih kopija ili podešavanja servera. Sva računarska snaga se automatski skalira za svaki poziv;
Amazon DynamoDB - Baza podataka parova ključ/vrijednost i dokumenata koja pruža kašnjenje manje od 10 milisekundi kada se radi na bilo kojoj skali. Kada koristite DynamoDB, ne morate obezbjeđivati, krpiti ili upravljati serverima. DynamoDB automatski skalira tabele kako bi prilagodio količinu dostupnih resursa i održao visoke performanse. Nije potrebna sistemska administracija;
Amazon SNS - potpuno upravljana usluga za slanje poruka po modelu izdavač-pretplatnik (Pub/Sub), sa kojom možete izolovati mikroservise, distribuirane sisteme i aplikacije bez servera. SNS se može koristiti za slanje informacija krajnjim korisnicima putem mobilnih push obavijesti, SMS poruka i e-pošte.
Inicijalna obuka
Da bih emulirao protok podataka, odlučio sam da koristim informacije o avionskim kartama koje je vratio Aviasales API. IN dokumentaciju prilično opsežna lista različitih metoda, uzmimo jednu od njih - "Mjesečni kalendar cijena", koji vraća cijene za svaki dan u mjesecu, grupirane po broju transfera. Ako u zahtjevu ne navedete mjesec traženja, podaci će biti vraćeni za mjesec koji slijedi nakon tekućeg.
Dakle, hajde da se registrujemo i dobijemo naš token.
Gornji način primanja podataka od API-ja navođenjem tokena u zahtjevu će funkcionirati, ali ja radije propuštam pristupni token kroz zaglavlje, pa ćemo ovaj metod koristiti u skripti api_caller.py.
Primjer API odgovora iznad pokazuje kartu od Sankt Peterburga do Phuk... Oh, kakav san...
Pošto sam ja iz Kazanja, a Phuket je sada „samo san“, hajde da potražimo karte od Sankt Peterburga do Kazanja.
Pretpostavlja se da već imate AWS nalog. Posebno bih odmah skrenuo pažnju na činjenicu da Kinesis i slanje obavještenja putem SMS-a nisu uključeni u godišnji Besplatan nivo (besplatno korištenje). Ali čak i uprkos tome, sa par dolara na umu, sasvim je moguće izgraditi predloženi sistem i igrati se s njim. I, naravno, ne zaboravite izbrisati sve resurse nakon što više nisu potrebni.
Na sreću, DynamoDb i lambda funkcije će biti besplatne za nas ako ispunimo naše mjesečne besplatne limite. Na primjer, za DynamoDB: 25 GB prostora za pohranu, 25 WCU/RCU i 100 miliona upita. I milion poziva lambda funkcija mjesečno.
Ručna implementacija sistema
Postavljanje Kinesis Data Streams
Idemo na uslugu Kinesis Data Streams i kreiramo dva nova toka, po jedan dio za svaki.
Šta je krhotina?
Shard je osnovna jedinica za prijenos podataka Amazon Kinesis toka. Jedan segment omogućava prijenos ulaznih podataka brzinom od 1 MB/s i prijenos izlaznih podataka brzinom od 2 MB/s. Jedan segment podržava do 1000 PUT unosa u sekundi. Prilikom kreiranja toka podataka potrebno je navesti potreban broj segmenata. Na primjer, možete kreirati tok podataka sa dva segmenta. Ovaj tok podataka će omogućiti prijenos ulaznih podataka brzinom od 2 MB/s i prijenos izlaznih podataka brzinom od 4 MB/s, podržavajući do 2000 PUT zapisa u sekundi.
Što je više fragmenata u vašem streamu, veća je njegova propusnost. U principu, ovako se skaliraju tokovi - dodavanjem krhotina. Ali što više komadića imate, to je veća cijena. Svaki dio košta 1,5 centi po satu i dodatnih 1.4 centa za svaki milion PUT jedinica korisnog tereta.
Kreirajmo novi stream sa imenom airline_tickets, 1 krhotina će mu biti dovoljna:
Sada napravimo drugu nit sa imenom special_stream:
Postavljanje proizvođača
Za analizu zadatka dovoljno je koristiti običnu EC2 instancu kao proizvođač podataka. To ne mora biti moćna, skupa virtuelna mašina; spot t2.micro će biti u redu.
Važna napomena: na primjer, trebali biste koristiti image - Amazon Linux AMI 2018.03.0, ima manje postavki za brzo pokretanje Kinesis Agenta.
Idite na uslugu EC2, kreirajte novu virtuelnu mašinu, izaberite željeni AMI sa tipom t2.micro, koji je uključen u besplatni nivo:
Da bi novostvorena virtuelna mašina mogla da komunicira sa uslugom Kinesis, mora da ima prava za to. Najbolji način da to učinite je da dodijelite IAM ulogu. Stoga, na ekranu Korak 3: Konfiguracija detalja instance, trebate odabrati Kreirajte novu IAM ulogu:
Kreiranje IAM uloge za EC2
U prozoru koji se otvori odaberite da kreiramo novu ulogu za EC2 i idite na odjeljak Dozvole:
Koristeći primjer obuke, ne moramo ulaziti u sve zamršenosti granularne konfiguracije prava na resurse, pa ćemo odabrati politike koje je Amazon unaprijed konfigurirao: AmazonKinesisFullAccess i CloudWatchFullAccess.
Dajmo neko smisleno ime za ovu ulogu, na primjer: EC2-KinesisStreams-FullAccess. Rezultat bi trebao biti isti kao što je prikazano na slici ispod:
Nakon kreiranja ove nove uloge, ne zaboravite je priložiti kreiranoj instanci virtuelne mašine:
Ne mijenjamo ništa drugo na ovom ekranu i prelazimo na sljedeće prozore.
Postavke tvrdog diska se mogu ostaviti kao zadane, kao i oznake (iako je dobra praksa koristiti oznake, barem dajte instanci ime i označite okruženje).
Sada smo na kartici Korak 6: Konfigurišite bezbednosnu grupu, gde morate da kreirate novu ili navedete postojeću bezbednosnu grupu, koja vam omogućava da se povežete preko ssh-a (port 22) na instancu. Odaberite Izvor -> Moj IP tamo i možete pokrenuti instancu.
Čim se prebaci u status pokretanja, možete pokušati da se povežete na njega putem ssh-a.
Da biste mogli raditi sa Kinesis Agentom, nakon uspješnog povezivanja na mašinu, morate unijeti sljedeće naredbe u terminal:
Kao što se može vidjeti iz konfiguracijske datoteke, agent će pratiti datoteke sa ekstenzijom .log u direktoriju /var/log/airline_tickets/, analizirati ih i prenijeti u tok airline_tickets.
Ponovo pokrećemo servis i uvjeravamo se da je pokrenut i radi:
sudo service aws-kinesis-agent restart
Sada preuzmimo Python skriptu koja će tražiti podatke od API-ja:
Skripta api_caller.py traži podatke od Aviasales-a i sprema primljeni odgovor u direktorij koji Kinesis agent skenira. Implementacija ove skripte je sasvim standardna, postoji klasa TicketsApi, koja vam omogućava da asinhrono povučete API. Ovoj klasi prosljeđujemo zaglavlje sa tokenom i tražimo parametre:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Da testiramo ispravne postavke i funkcionalnost agenta, testirajmo skriptu api_caller.py:
sudo ./api_caller.py TOKEN
I gledamo rezultat rada u zapisnicima agenta i na kartici Monitoring u toku podataka airline_tickets:
Kao što vidite, sve radi i Kinesis Agent uspješno šalje podatke u stream. Sada konfigurirajmo potrošača.
Postavljanje Kinesis Data Analytics
Prijeđimo na središnju komponentu cijelog sistema - kreirajte novu aplikaciju u Kinesis Data Analytics pod nazivom kinesis_analytics_airlines_app:
Kinesis Data Analytics omogućava vam da izvršite analizu podataka u realnom vremenu iz Kinesis Streams koristeći SQL jezik. To je usluga potpuno automatskog skaliranja (za razliku od Kinesis Streams) koja:
omogućava vam da kreirate nove tokove (Output Stream) na osnovu zahteva za izvornim podacima;
pruža stream s greškama koje su se dogodile dok su aplikacije bile pokrenute (Error Stream);
može automatski odrediti šemu ulaznih podataka (može se ručno redefinirati ako je potrebno).
Ovo nije jeftina usluga - 0.11 USD po satu rada, pa je pažljivo koristite i obrišite kada završite.
Povežimo aplikaciju sa izvorom podataka:
Odaberite stream na koji ćemo se povezati (airline_tickets):
Zatim morate priložiti novu IAM ulogu tako da aplikacija može čitati iz streama i pisati u stream. Da biste to učinili, dovoljno je ne mijenjati ništa u bloku Dozvole za pristup:
Sada zatražimo otkrivanje šeme podataka u toku; da biste to učinili, kliknite na dugme „Otkrij šemu“. Kao rezultat toga, uloga IAM-a će biti ažurirana (kreirana) i otkrivanje sheme će se pokrenuti iz podataka koji su već stigli u stream:
Sada morate otići u SQL editor. Kada kliknete na ovo dugme, pojaviće se prozor u kojem se traži da pokrenete aplikaciju - izaberite šta želite da pokrenete:
Umetnite sljedeći jednostavan upit u prozor SQL editora i kliknite na Spremi i pokreni SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
U relacionim bazama podataka radite sa tabelama koristeći INSERT izraze za dodavanje zapisa i SELECT naredbu za upit podataka. U Amazon Kinesis Data Analytics radite sa tokovima (STREAM) i pumpama (PUMP)—zahtjevi za kontinuirano umetanje koji ubacuju podatke iz jednog toka u aplikaciji u drugi tok.
Gore prikazan SQL upit traži karte Aeroflota po cijeni ispod pet hiljada rubalja. Svi zapisi koji ispunjavaju ove uslove bit će smješteni u DESTINATION_SQL_STREAM tok.
U bloku Destination izaberite stream special_stream, a na padajućoj listi naziv toka u aplikaciji DESTINATION_SQL_STREAM:
Rezultat svih manipulacija trebao bi biti nešto slično slici ispod:
Kreiranje i pretplata na SNS temu
Idite na Simple Notification Service i tamo kreirajte novu temu pod nazivom Airlines:
Pretplatite se na ovu temu i naznačite broj mobilnog telefona na koji će se slati SMS obavještenja:
Kreirajte tabelu u DynamoDB
Da pohranimo neobrađene podatke iz njihovog toka airline_tickets, napravimo tabelu u DynamoDB sa istim imenom. Koristit ćemo record_id kao primarni ključ:
Kreiranje kolektora lambda funkcija
Kreirajmo lambda funkciju nazvanu Collector, čiji će zadatak biti da ispita tok airline_tickets i, ako se tamo pronađu novi zapisi, ubaci ove zapise u DynamoDB tabelu. Očigledno, pored zadanih prava, ova lambda mora imati pristup za čitanje Kinesis toka podataka i pristup za pisanje u DynamoDB.
Kreiranje IAM uloge za kolektorsku lambda funkciju
Prvo, kreirajmo novu IAM ulogu za lambda pod nazivom Lambda-TicketsProcessingRole:
Za primjer testa, unaprijed konfigurirane politike AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess su sasvim prikladne, kao što je prikazano na slici ispod:
Ova lambda bi trebala biti pokrenuta okidačem iz Kinesisa kada novi unosi uđu u airline_stream, tako da moramo dodati novi okidač:
Ostaje samo da ubacite kod i sačuvate lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Kreiranje notifiera lambda funkcije
Druga lambda funkcija, koja će pratiti drugi stream (special_stream) i slati obavještenje SNS-u, kreirana je na sličan način. Dakle, ova lambda mora imati pristup za čitanje iz Kinesisa i slanje poruka na datu SNS temu, koje će zatim SNS servis poslati svim pretplatnicima ove teme (e-mail, SMS, itd.).
Kreiranje IAM uloge
Prvo kreiramo IAM ulogu Lambda-KinesisAlarm za ovu lambdu, a zatim dodjeljujemo ovu ulogu lambdi alarm_notifier koja se kreira:
Ova lambda bi trebala raditi na okidaču za nove zapise za ulazak u special_stream, tako da trebate konfigurirati okidač na isti način kao što smo radili za Collector lambda.
Da bismo olakšali konfiguraciju ove lambda, uvedimo novu varijablu okruženja - TOPIC_ARN, gdje postavljamo ANR (Amazon Recourse Names) teme Airlines:
I ubacite lambda kod, nije nimalo komplikovano:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Čini se da je tu završena ručna konfiguracija sistema. Ostaje samo da testiramo i uvjerimo se da smo sve ispravno konfigurirali.
Postavite iz Terraform koda
Potrebna priprema
Terraform je vrlo zgodan alat otvorenog koda za postavljanje infrastrukture iz koda. Ima vlastitu sintaksu koju je lako naučiti i ima mnogo primjera kako i šta implementirati. Atom editor ili Visual Studio Code ima mnogo praktičnih dodataka koji olakšavaju rad sa Terraformom.
Možete preuzeti distribuciju odavde. Detaljna analiza svih mogućnosti Terraforma je izvan okvira ovog članka, pa ćemo se ograničiti na glavne tačke.
Kako započeti
Potpuni kod projekta je u mom spremištu. Mi kloniramo spremište za sebe. Prije nego što počnete, morate biti sigurni da imate instaliran i konfiguriran AWS CLI, jer... Terraform će tražiti vjerodajnice u ~/.aws/credentials datoteci.
Dobra praksa je da pokrenete naredbu plana prije postavljanja cijele infrastrukture kako biste vidjeli šta Terraform trenutno stvara za nas u oblaku:
terraform.exe plan
Od vas će se tražiti da unesete broj telefona na koji ćete slati obavještenja. U ovoj fazi nije potrebno unositi ga.
Nakon analize plana rada programa, možemo započeti kreiranje resursa:
terraform.exe apply
Nakon slanja ove komande, od vas će se ponovo tražiti da unesete broj telefona; birajte „da“ kada se prikaže pitanje o stvarnom izvođenju radnji. To će vam omogućiti da postavite cjelokupnu infrastrukturu, izvršite svu potrebnu konfiguraciju EC2, implementirate lambda funkcije, itd.
Nakon što su svi resursi uspješno kreirani putem Terraform koda, morate ući u detalje aplikacije Kinesis Analytics (nažalost, nisam pronašao kako to učiniti direktno iz koda).
Pokrenite aplikaciju:
Nakon ovoga, morate eksplicitno postaviti naziv toka u aplikaciji odabirom sa padajuće liste:
Sada je sve spremno za rad.
Testiranje aplikacije
Bez obzira na to kako ste implementirali sistem, ručno ili putem Terraform koda, on će raditi isto.
Prijavljujemo se preko SSH-a na EC2 virtuelnu mašinu na kojoj je instaliran Kinesis Agent i pokrećemo skriptu api_caller.py
sudo ./api_caller.py TOKEN
Sve što treba da uradite je da sačekate SMS na vaš broj:
SMS - poruka stiže na telefon za skoro 1 minut:
Ostaje da se vidi da li su zapisi sačuvani u DynamoDB bazi podataka radi naknadne, detaljnije analize. Tabela airline_tickets sadrži otprilike sljedeće podatke:
zaključak
U toku obavljenog posla izgrađen je onlajn sistem za obradu podataka baziran na Amazon Kinesis. Razmotrene su opcije za korištenje Kinesis Agenta u kombinaciji s Kinesis Data Streams i analitikom u realnom vremenu Kinesis Analytics korištenjem SQL komandi, kao i interakcija Amazon Kinesisa s drugim AWS servisima.
Gornji sistem smo implementirali na dva načina: prilično dugačak ručni i brzi iz Terraform koda.
Svi izvorni kodovi projekta su dostupni u mom GitHub spremištu, predlažem da se upoznate s tim.
Drago mi je da raspravljam o članku, radujem se vašim komentarima. Nadam se konstruktivnoj kritici.