ProHoster > Blog > uprava > Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Integracija Aviasales API-ja s Amazon Kinesisom i jednostavnost bez poslužitelja
Hej Habr!
Volite li letjeti avionima? Volim to, ali tijekom samoizolacije sam se zaljubio i u analiziranje podataka o avio kartama jednog poznatog izvora - Aviasales.
Danas ćemo analizirati rad Amazon Kinesisa, izgraditi streaming sustav s analitikom u stvarnom vremenu, instalirati Amazon DynamoDB NoSQL bazu podataka kao glavnu pohranu podataka te postaviti SMS obavijesti za zanimljive tikete.
Svi detalji su ispod kroja! Ići!
Uvod
Za primjer, trebamo pristup Aviasales API. Pristup mu je besplatan i bez ograničenja; samo se trebate registrirati u odjeljku "Programeri" da biste dobili svoj API token za pristup podacima.
Glavna svrha ovog članka je pružiti opće razumijevanje upotrebe strujanja informacija u AWS-u; uzimamo u obzir da podaci koje vraća korišteni API nisu strogo ažurni i prenose se iz predmemorije, što je formirana na temelju pretraživanja korisnika stranica Aviasales.ru i Jetradar.com u zadnjih 48 sati.
Kinesis-agent, instaliran na stroju za proizvodnju, primljen putem API-ja automatski će analizirati i prenijeti podatke u željeni tok putem Kinesis Data Analytics. Neobrađena verzija ovog streama bit će zapisana izravno u trgovinu. Pohrana neobrađenih podataka postavljena u DynamoDB omogućit će dublju analizu ulaznica putem BI alata, kao što je AWS Quick Sight.
Razmotrit ćemo dvije mogućnosti za implementaciju cijele infrastrukture:
Ručno - preko AWS Management Console;
Infrastruktura iz Terraform koda je za lijene automatiste;
Arhitektura razvijenog sustava
Korištene komponente:
Aviasales API — podaci koje vraća ovaj API koristit će se za sav daljnji rad;
Instanca proizvođača EC2 — obični virtualni stroj u oblaku na kojem će se generirati ulazni tok podataka:
Kinesis agent je Java aplikacija instalirana lokalno na stroju koja omogućuje jednostavan način prikupljanja i slanja podataka u Kinesis (Kinesis Data Streams ili Kinesis Firehose). Agent stalno prati skup datoteka u navedenim direktorijima i šalje nove podatke Kinesisu;
API pozivatelj skripte — Python skripta koja šalje zahtjeve API-ju i stavlja odgovor u mapu koju nadzire Kinesis Agent;
Kinesis tokovi podataka — usluga strujanja podataka u stvarnom vremenu sa širokim mogućnostima skaliranja;
Analitika kineze je usluga bez poslužitelja koja pojednostavljuje analizu strujanja podataka u stvarnom vremenu. Amazon Kinesis Data Analytics konfigurira resurse aplikacije i automatski se skalira za obradu bilo koje količine dolaznih podataka;
AWS Lambda — usluga koja vam omogućuje pokretanje koda bez sigurnosne kopije ili postavljanja poslužitelja. Sva računalna snaga automatski se skalira za svaki poziv;
Amazon DynamoDB - Baza podataka parova ključ-vrijednost i dokumenata koja omogućuje kašnjenje manje od 10 milisekundi pri izvođenju na bilo kojoj razini. Kada koristite DynamoDB, ne trebate osiguravati, krpati niti upravljati bilo kojim poslužiteljem. DynamoDB automatski skalira tablice kako bi prilagodio količinu dostupnih resursa i održao visoku izvedbu. Nije potrebna administracija sustava;
Amazon SNS - potpuno upravljana usluga za slanje poruka po modelu izdavač-pretplatnik (Pub/Sub), s kojom možete izolirati mikroservise, distribuirane sustave i aplikacije bez poslužitelja. SNS se može koristiti za slanje informacija krajnjim korisnicima putem mobilnih push obavijesti, SMS poruka i e-pošte.
Početna obuka
Kako bih emulirao tijek podataka, odlučio sam upotrijebiti informacije o zrakoplovnim kartama koje je vratio Aviasales API. U dokumentacija prilično opsežan popis različitih metoda, uzmimo jednu od njih - "Mjesečni kalendar cijena", koji vraća cijene za svaki dan u mjesecu, grupirane prema broju prijenosa. Ako u zahtjevu ne navedete mjesec pretraživanja, podaci će biti vraćeni za mjesec koji slijedi nakon tekućeg.
Gornja metoda primanja podataka iz API-ja navođenjem tokena u zahtjevu će funkcionirati, ali ja radije prosljeđujem pristupni token kroz zaglavlje, pa ćemo ovu metodu koristiti u skripti api_caller.py.
Primjer gore navedenog API odgovora prikazuje kartu od St. Petersburga do Phuka... Oh, kakav san...
Budući da sam iz Kazana, a Phuket je sada "samo san", potražimo karte od St. Petersburga do Kazana.
Pretpostavlja se da već imate AWS račun. Posebno bih odmah skrenuo pozornost da Kinesis i slanje obavijesti putem SMS-a nisu uključeni u godišnji Besplatna razina (besplatna upotreba). Ali čak i unatoč tome, s nekoliko dolara na umu, sasvim je moguće izgraditi predloženi sustav i igrati se s njim. I, naravno, ne zaboravite izbrisati sve resurse nakon što više nisu potrebni.
Srećom, funkcije DynamoDb i lambda bit će besplatne za nas ako ispunimo svoja mjesečna besplatna ograničenja. Na primjer, za DynamoDB: 25 GB prostora za pohranu, 25 WCU/RCU i 100 milijuna upita. I milijun poziva lambda funkcija mjesečno.
Ručno postavljanje sustava
Postavljanje protoka Kinesis podataka
Idemo na uslugu Kinesis Data Streams i stvorimo dva nova toka, po jedan shard za svaki.
Što je shard?
Shard je osnovna jedinica prijenosa podataka Amazon Kinesis streama. Jedan segment osigurava ulazni prijenos podataka brzinom od 1 MB/s i izlazni prijenos podataka brzinom od 2 MB/s. Jedan segment podržava do 1000 PUT unosa u sekundi. Prilikom izrade podatkovnog toka potrebno je navesti potreban broj segmenata. Na primjer, možete stvoriti tok podataka s dva segmenta. Ovaj tok podataka omogućit će ulazni prijenos podataka pri 2 MB/s i izlazni prijenos podataka pri 4 MB/s, podržavajući do 2000 PUT zapisa u sekundi.
Što je više fragmenata u vašem streamu, veća je njegova propusnost. U principu se tokovi skaliraju - dodavanjem fragmenata. Ali što više fragmenata imate, to je viša cijena. Svaki shard košta 1,5 centi po satu i dodatnih 1.4 centa za svakih milijun PUT jedinica korisnog opterećenja.
Kreirajmo novi stream s imenom avio karte, 1 shard će mu biti dovoljan:
Kreirajmo sada drugu nit s tim imenom posebni_tok:
Postavljanje proizvođača
Za analizu zadatka dovoljno je koristiti običnu EC2 instancu kao proizvođača podataka. To ne mora biti snažan, skup virtualni stroj; točkasti t2.micro će biti sasvim u redu.
Važna napomena: na primjer, trebali biste koristiti sliku - Amazon Linux AMI 2018.03.0, ima manje postavki za brzo pokretanje Kinesis Agenta.
Idite na EC2 servis, kreirajte novi virtualni stroj, odaberite željeni AMI s tipom t2.micro koji je uključen u Free Tier:
Kako bi novostvoreni virtualni stroj mogao komunicirati s uslugom Kinesis, potrebno mu je dati prava za to. Najbolji način da to učinite je dodijeliti IAM ulogu. Stoga biste na zaslonu Korak 3: Konfigurirajte pojedinosti instance trebali odabrati Stvorite novu IAM ulogu:
Stvaranje IAM uloge za EC2
U prozoru koji se otvori odaberite da stvaramo novu ulogu za EC2 i idite na odjeljak Dozvole:
Koristeći primjer obuke, ne moramo ulaziti u sve zamršenosti detaljne konfiguracije prava na resurse, pa ćemo odabrati pravila koja je unaprijed konfigurirao Amazon: AmazonKinesisFullAccess i CloudWatchFullAccess.
Dajmo neko smisleno ime za ovu ulogu, na primjer: EC2-KinesisStreams-FullAccess. Rezultat bi trebao biti isti kao što je prikazano na slici ispod:
Nakon što stvorite ovu novu ulogu, nemojte je zaboraviti priložiti stvorenoj instanci virtualnog stroja:
Ne mijenjamo ništa više na ovom zaslonu i prelazimo na sljedeće prozore.
Postavke tvrdog diska mogu se ostaviti kao zadane, kao i oznake (iako je dobra praksa koristiti oznake, barem dodijelite naziv instanci i označite okruženje).
Sada smo na kartici Korak 6: Konfigurirajte sigurnosnu grupu, gdje trebate stvoriti novu ili navesti postojeću sigurnosnu grupu, koja vam omogućuje povezivanje putem ssh-a (port 22) na instancu. Tamo odaberite Izvor -> Moj IP i možete pokrenuti instancu.
Čim se prebaci u status rada, možete se pokušati spojiti na njega putem ssh-a.
Kako biste mogli raditi s Kinesis Agentom, nakon uspješnog spajanja na uređaj morate unijeti sljedeće naredbe u terminal:
Kao što se može vidjeti iz konfiguracijske datoteke, agent će nadzirati datoteke s ekstenzijom .log u direktoriju /var/log/airline_tickets/, analizirati ih i prenijeti u tok airline_tickets.
Ponovno pokrećemo uslugu i uvjeravamo se da radi i radi:
sudo service aws-kinesis-agent restart
Sada preuzmimo Python skriptu koja će tražiti podatke iz API-ja:
Skripta api_caller.py traži podatke od Aviasalesa i sprema primljeni odgovor u direktorij koji Kinesis agent skenira. Implementacija ove skripte prilično je standardna, postoji klasa TicketsApi, omogućuje vam asinkrono povlačenje API-ja. Ovoj klasi prosljeđujemo zaglavlje s tokenom i zahtijevamo parametre:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Kako bismo testirali ispravne postavke i funkcionalnost agenta, testirajmo skriptu api_caller.py:
sudo ./api_caller.py TOKEN
I gledamo rezultat rada u zapisnicima agenta i na kartici Praćenje u toku podataka airline_tickets:
Kao što vidite, sve radi i Kinesis Agent uspješno šalje podatke u stream. Sada konfigurirajmo potrošača.
Postavljanje Kinesis Data Analytics
Prijeđimo na središnju komponentu cijelog sustava - kreirajte novu aplikaciju u Kinesis Data Analytics pod nazivom kinesis_analytics_airlines_app:
Kinesis Data Analytics omogućuje vam izvođenje analitike podataka u stvarnom vremenu iz Kinesis Streamsa pomoću SQL jezika. To je usluga potpunog automatskog skaliranja (za razliku od Kinesis Streamsa) koja:
omogućuje stvaranje novih tokova (Output Stream) na temelju zahtjeva za izvor podataka;
pruža stream s pogreškama koje su se dogodile tijekom rada aplikacija (Error Stream);
može automatski odrediti shemu ulaznih podataka (po potrebi se može ručno redefinirati).
Ovo nije jeftina usluga - 0.11 USD po satu rada, pa je koristite pažljivo i obrišite kada završite.
Povežimo aplikaciju s izvorom podataka:
Odaberite stream na koji ćemo se spojiti (avionske_karte):
Zatim trebate priložiti novu IAM ulogu kako bi aplikacija mogla čitati iz toka i pisati u tok. Da biste to učinili, dovoljno je ne mijenjati ništa u bloku Dopuštenja pristupa:
Zatražimo sada otkrivanje sheme podataka u streamu; da biste to učinili, kliknite na gumb "Otkrij shemu". Kao rezultat toga, IAM uloga će se ažurirati (stvorit će se nova) i pokrenut će se otkrivanje sheme iz podataka koji su već stigli u stream:
Sada morate otići u SQL editor. Kada kliknete na ovaj gumb, pojavit će se prozor s upitom da pokrenete aplikaciju - odaberite što želite pokrenuti:
Umetnite sljedeći jednostavni upit u prozor SQL uređivača i kliknite Spremi i pokreni SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
U relacijskim bazama podataka radite s tablicama pomoću naredbi INSERT za dodavanje zapisa i naredbe SELECT za upit podataka. U Amazon Kinesis Data Analyticsu radite s tokovima (STREAMs) i pumpama (PUMPs)—kontinuiranim zahtjevima za umetanje koji umeću podatke iz jednog toka u aplikaciji u drugi tok.
Gornji SQL upit traži karte za Aeroflot po cijeni ispod pet tisuća rubalja. Svi zapisi koji zadovoljavaju ove uvjete bit će smješteni u tok DESTINATION_SQL_STREAM.
U bloku Destination odaberite tok special_stream, a na padajućem popisu In-application stream name DESTINATION_SQL_STREAM:
Rezultat svih manipulacija trebao bi biti nešto slično donjoj slici:
Stvaranje i pretplata na SNS temu
Idite na Simple Notification Service i tamo stvorite novu temu pod nazivom Airlines:
Pretplatite se na ovu temu i navedite broj mobitela na koji će se slati SMS obavijesti:
Napravite tablicu u DynamoDB-u
Kako bismo pohranili neobrađene podatke iz njihovog toka airline_tickets, stvorimo tablicu u DynamoDB-u s istim imenom. Koristit ćemo record_id kao primarni ključ:
Stvaranje kolektora lambda funkcija
Kreirajmo lambda funkciju pod nazivom Collector, čija će zadaća biti propitivanje streama airline_tickets i, ako se tamo pronađu novi zapisi, umetanje tih zapisa u DynamoDB tablicu. Očito, osim zadanih prava, ova lambda mora imati pristup čitanju protoka podataka Kinesis i pristup pisanju DynamoDB-u.
Stvaranje IAM uloge za kolektorsku lambda funkciju
Prvo, stvorimo novu IAM ulogu za lambda pod nazivom Lambda-TicketsProcessingRole:
Za testni primjer, unaprijed konfigurirana pravila AmazonKinesisReadOnlyAccess i AmazonDynamoDBFullAccess sasvim su prikladna, kao što je prikazano na slici ispod:
Ovu lambdu trebao bi pokrenuti okidač iz Kinesisa kada novi unosi uđu u airline_stream, pa moramo dodati novi okidač:
Ostaje samo ubaciti kod i spremiti lambdu.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Izrada obavijesti o lambda funkciji
Druga lambda funkcija, koja će nadzirati drugi tok (special_stream) i poslati obavijest SNS-u, kreira se na sličan način. Dakle, ova lambda mora imati pristup čitanju iz Kinesisa i slanju poruka na zadanu SNS temu, koje će onda SNS usluga slati svim pretplatnicima ove teme (e-mail, SMS, itd.).
Stvaranje IAM uloge
Najprije stvaramo IAM ulogu Lambda-KinesisAlarm za ovu lambdu, a zatim tu ulogu dodjeljujemo lambda alarm_notifieru koji se stvara:
Ova lambda bi trebala raditi na okidaču za nove zapise da uđu u special_stream, tako da morate konfigurirati okidač na isti način kao što smo mi učinili za Collector lambda.
Kako bismo olakšali konfiguriranje ove lambda, uvedimo novu varijablu okruženja - TOPIC_ARN, gdje postavljamo ANR (Amazon Recourse Names) teme Airlines:
I umetnite lambda kod, uopće nije komplicirano:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Čini se da je tu ručna konfiguracija sustava završena. Ostaje samo testirati i uvjeriti se da smo sve ispravno konfigurirali.
Implementiraj iz koda Terraform
Obavezna priprema
Terraform je vrlo zgodan alat otvorenog koda za postavljanje infrastrukture iz koda. Ima vlastitu sintaksu koju je lako naučiti i ima mnogo primjera kako i što implementirati. Atom editor ili Visual Studio Code ima mnogo praktičnih dodataka koji olakšavaju rad s Terraformom.
Distribuciju možete preuzeti stoga. Detaljna analiza svih Terraform mogućnosti je izvan opsega ovog članka, pa ćemo se ograničiti na glavne točke.
Kako započeti
Puni kod projekta je u mom spremištu. Kloniramo spremište prema sebi. Prije početka, morate biti sigurni da imate instaliran i konfiguriran AWS CLI, jer... Terraform će tražiti vjerodajnice u datoteci ~/.aws/credentials.
Dobra praksa je pokrenuti naredbu plana prije postavljanja cijele infrastrukture da vidimo što Terraform trenutno stvara za nas u oblaku:
terraform.exe plan
Od vas će se tražiti da unesete telefonski broj na koji ćete slati obavijesti. Nije ga potrebno unijeti u ovoj fazi.
Nakon što smo analizirali plan rada programa, možemo započeti sa stvaranjem resursa:
terraform.exe apply
Nakon slanja ove naredbe, od vas će se ponovno tražiti da unesete telefonski broj; birajte "da" kada se prikaže pitanje o stvarnom izvođenju radnji. To će vam omogućiti da postavite cjelokupnu infrastrukturu, provedete svu potrebnu konfiguraciju EC2, postavite lambda funkcije itd.
Nakon što su svi resursi uspješno kreirani kroz Terraform kod, potrebno je ući u detalje aplikacije Kinesis Analytics (nažalost, nisam našao kako to učiniti izravno iz koda).
Pokrenite aplikaciju:
Nakon toga morate izričito postaviti naziv toka unutar aplikacije odabirom s padajućeg popisa:
Sada je sve spremno za polazak.
Testiranje aplikacije
Bez obzira na to kako ste postavili sustav, ručno ili putem Terraform koda, radit će isto.
Logiramo se putem SSH-a na EC2 virtualni stroj na kojem je instaliran Kinesis Agent i pokrećemo skriptu api_caller.py
sudo ./api_caller.py TOKEN
Sve što trebate učiniti je pričekati SMS na vaš broj:
SMS - poruka stiže na vaš telefon za skoro 1 minutu:
Ostaje vidjeti jesu li zapisi spremljeni u DynamoDB bazu podataka za naknadnu, detaljniju analizu. Tablica airline_tickets sadrži otprilike sljedeće podatke:
Zaključak
Tijekom obavljenog posla izgrađen je online sustav za obradu podataka baziran na Amazon Kinesis. Razmotrene su mogućnosti korištenja Kinesis Agenta u kombinaciji s Kinesis Data Streams i analitikom Kinesis Analytics u stvarnom vremenu pomoću SQL naredbi, kao i interakcija Amazon Kinesis s drugim AWS servisima.
Gornji sustav implementirali smo na dva načina: prilično dugačak ručni i brzi iz Terraform koda.