ProHoster > Blog > Uprava > Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Integracija API-ja Aviasales z Amazon Kinesis in preprostost brez strežnika
Pozdravljeni, Habr!
Ali radi letite z letali? Všeč mi je, a med samoizolacijo sem se zaljubil tudi v analiziranje podatkov o letalskih vozovnicah enega znanega vira - Aviasales.
Danes bomo analizirali delo Amazon Kinesis, zgradili pretočni sistem z analitiko v realnem času, namestili bazo podatkov Amazon DynamoDB NoSQL kot glavno shrambo podatkov in nastavili SMS obvestila za zanimive vstopnice.
Vsi detajli so pod krojem! Pojdi!
Predstavitev
Za primer potrebujemo dostop do Aviasales API. Dostop do njega je na voljo brezplačno in brez omejitev; samo registrirati se morate v razdelku »Razvijalci«, da prejmete žeton API za dostop do podatkov.
Glavni namen tega članka je podati splošno razumevanje uporabe pretakanja informacij v AWS; upoštevamo, da podatki, ki jih vrne uporabljeni API, niso strogo posodobljeni in se prenašajo iz predpomnilnika, ki je oblikovan na podlagi iskanj uporabnikov spletnih mest Aviasales.ru in Jetradar.com v zadnjih 48 urah.
Kinesis-agent, nameščen na proizvodnem stroju, prejet prek API-ja, bo samodejno razčlenil in posredoval podatke v želeni tok prek Kinesis Data Analytics. Neobdelana različica tega toka bo zapisana neposredno v trgovino. Shranjevanje neobdelanih podatkov, uvedeno v DynamoDB, bo omogočilo globljo analizo vstopnic prek orodij BI, kot je AWS Quick Sight.
Upoštevali bomo dve možnosti za postavitev celotne infrastrukture:
Ročno - preko AWS Management Console;
Infrastruktura iz kode Terraform je za lene avtomatizatorje;
Arhitektura razvitega sistema
Uporabljene komponente:
Aviasales API — podatki, ki jih vrne ta API, bodo uporabljeni za vse nadaljnje delo;
Primerek proizvajalca EC2 — običajni virtualni stroj v oblaku, na katerem se bo generiral vhodni podatkovni tok:
Agent Kinesis je aplikacija Java, nameščena lokalno na napravi, ki omogoča preprost način zbiranja in pošiljanja podatkov v Kinesis (Kinesis Data Streams ali Kinesis Firehose). Agent nenehno spremlja nabor datotek v določenih imenikih in pošilja nove podatke v Kinesis;
API klicateljski skript — Skript Python, ki pošlje zahteve API-ju in postavi odgovor v mapo, ki jo nadzoruje Kinesis Agent;
Podatkovni tokovi Kinesis — storitev pretakanja podatkov v realnem času s širokimi zmožnostmi skaliranja;
Analitika kineze je storitev brez strežnika, ki poenostavi analizo pretočnih podatkov v realnem času. Amazon Kinesis Data Analytics konfigurira vire aplikacije in se samodejno prilagodi za obdelavo katere koli količine dohodnih podatkov;
AWS Lambda — storitev, ki vam omogoča zagon kode brez varnostnega kopiranja ali nastavitve strežnikov. Vsa računalniška moč se samodejno poveča za vsak klic;
Amazon DynamoDB - Podatkovna baza parov ključ-vrednost in dokumentov, ki zagotavlja zakasnitev manj kot 10 milisekund pri izvajanju v katerem koli obsegu. Ko uporabljate DynamoDB, vam ni treba zagotoviti, popraviti ali upravljati nobenih strežnikov. DynamoDB samodejno skalira tabele, da prilagodi količino razpoložljivih virov in ohrani visoko zmogljivost. Sistemska administracija ni potrebna;
Amazonska SNS - popolnoma upravljana storitev za pošiljanje sporočil po modelu izdajatelj-naročnik (Pub/Sub), s katero lahko izolirate mikrostoritve, porazdeljene sisteme in brezstrežniške aplikacije. SNS se lahko uporablja za pošiljanje informacij končnim uporabnikom prek mobilnih potisnih obvestil, sporočil SMS in e-pošte.
Začetno usposabljanje
Za posnemanje pretoka podatkov sem se odločil uporabiti informacije o letalskih vozovnicah, ki jih vrne API Aviasales. IN dokumentacijo precej obsežen seznam različnih metod, vzemimo eno od njih - "Koledar mesečnih cen", ki vrne cene za vsak dan v mesecu, razvrščene po številu prenosov. Če v zahtevku ne navedete meseca iskanja, se vrnejo podatki za mesec, ki sledi tekočemu.
Zgornji način prejemanja podatkov iz API-ja z navedbo žetona v zahtevi bo deloval, vendar raje posredujem žeton dostopa skozi glavo, zato bomo to metodo uporabili v skriptu api_caller.py.
Zgornji primer odgovora API-ja prikazuje vozovnico iz St. Petersburga v Phuk ... Oh, kakšne sanje ...
Ker sem iz Kazana in je Phuket zdaj "samo sanje", poiščimo karte od Sankt Peterburga do Kazana.
Predpostavlja, da že imate račun AWS. Takoj želim posebej opozoriti, da Kinesis in pošiljanje obvestil preko SMS nista vključena v letno Brezplačna stopnja (brezplačna uporaba). Toda tudi kljub temu je z nekaj dolarji v mislih povsem mogoče sestaviti predlagani sistem in se z njim igrati. In seveda ne pozabite izbrisati vseh virov, ko jih ne potrebujete več.
Na srečo bosta funkciji DynamoDb in lambda za nas brezplačni, če dosežemo svoje mesečne brezplačne omejitve. Na primer za DynamoDB: 25 GB prostora za shranjevanje, 25 WCU/RCU in 100 milijonov poizvedb. In milijon klicev lambda funkcije na mesec.
Ročna namestitev sistema
Nastavitev podatkovnih tokov Kinesis
Pojdimo v storitev Kinesis Data Streams in ustvarimo dva nova toka, en shard za vsakega.
Kaj je shard?
Shard je osnovna enota za prenos podatkov toka Amazon Kinesis. En segment zagotavlja vhodni prenos podatkov s hitrostjo 1 MB/s in izhodni prenos podatkov s hitrostjo 2 MB/s. En segment podpira do 1000 vnosov PUT na sekundo. Ko ustvarjate podatkovni tok, morate določiti potrebno število segmentov. Ustvarite lahko na primer tok podatkov z dvema segmentoma. Ta tok podatkov bo zagotavljal prenos vhodnih podatkov pri 2 MB/s in prenos izhodnih podatkov pri 4 MB/s ter podpira do 2000 zapisov PUT na sekundo.
Več kot je drobcev v vašem toku, večja je njegova prepustnost. Načeloma se tokovi skalirajo tako - z dodajanjem shardov. Toda več kot imate drobcev, višja je cena. Vsak delček stane 1,5 centa na uro in dodatnih 1.4 centa za vsak milijon enot tovora PUT.
Ustvarimo nov tok z imenom letalske_vozovnice, 1 shard mu bo dovolj:
Zdaj pa ustvarimo še eno nit z imenom poseben_tok:
Nastavitev proizvajalca
Za analizo naloge je dovolj, da kot proizvajalec podatkov uporabite običajni primerek EC2. Ni nujno, da gre za zmogljiv in drag virtualni stroj; točkovni t2.micro bo povsem v redu.
Pomembna opomba: na primer, uporabite sliko - Amazon Linux AMI 2018.03.0, ima manj nastavitev za hiter zagon Kinesis Agenta.
Pojdite na storitev EC2, ustvarite nov virtualni stroj, izberite želeni AMI z vrsto t2.micro, ki je vključen v Free Tier:
Da bi novo ustvarjeni virtualni stroj lahko komuniciral s storitvijo Kinesis, mora imeti za to podeljene pravice. Najboljši način za to je dodelitev vloge IAM. Zato morate na zaslonu 3. korak: Konfiguracija podrobnosti primerka izbrati Ustvari novo vlogo IAM:
Ustvarjanje vloge IAM za EC2
V oknu, ki se odpre, izberite, da ustvarjamo novo vlogo za EC2, in pojdite na razdelek Dovoljenja:
Z uporabo primera usposabljanja se nam ni treba spuščati v vse zapletenosti natančne konfiguracije pravic do virov, zato bomo izbrali pravilnike, ki jih je vnaprej konfiguriral Amazon: AmazonKinesisFullAccess in CloudWatchFullAccess.
Dajmo tej vlogi kakšno smiselno ime, na primer: EC2-KinesisStreams-FullAccess. Rezultat mora biti enak, kot je prikazano na spodnji sliki:
Ko ustvarite to novo vlogo, je ne pozabite priložiti ustvarjenemu primerku navideznega stroja:
Na tem zaslonu ne spremenimo ničesar drugega in se premaknemo na naslednja okna.
Nastavitve trdega diska lahko pustite kot privzete, prav tako oznake (čeprav je uporaba oznak dobra praksa, dajte primerku vsaj ime in navedite okolje).
Zdaj smo na zavihku 6. korak: Konfigurirajte varnostno skupino, kjer morate ustvariti novo ali določiti obstoječo varnostno skupino, ki vam omogoča povezavo prek ssh (vrata 22) s primerkom. Tam izberite Vir -> Moj IP in lahko zaženete primerek.
Takoj, ko preklopi v stanje delovanja, se lahko poskusite z njim povezati prek ssh.
Za delo s Kinesis Agentom morate po uspešni povezavi z napravo v terminal vnesti naslednje ukaze:
Kot je razvidno iz konfiguracijske datoteke, bo agent spremljal datoteke s končnico .log v imeniku /var/log/airline_tickets/, jih razčlenil in prenesel v tok airline_tickets.
Ponovno zaženemo storitev in se prepričamo, da deluje in deluje:
sudo service aws-kinesis-agent restart
Zdaj pa prenesimo skript Python, ki bo zahteval podatke iz API-ja:
Skript api_caller.py zahteva podatke od Aviasales in shrani prejeti odgovor v imenik, ki ga skenira agent Kinesis. Izvedba tega skripta je precej standardna, obstaja razred TicketsApi, ki vam omogoča asinhrono vlečenje API-ja. Temu razredu posredujemo glavo z žetonom in zahtevamo parametre:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Za preizkus pravilnih nastavitev in delovanja agenta poskusno zaženimo skript api_caller.py:
sudo ./api_caller.py TOKEN
In pogledamo rezultat dela v dnevnikih agenta in na zavihku Monitoring v podatkovnem toku airline_tickets:
Kot lahko vidite, vse deluje in Kinesis Agent uspešno pošilja podatke v tok. Zdaj pa konfigurirajmo potrošnika.
Nastavitev Kinesis Data Analytics
Preidimo na osrednjo komponento celotnega sistema – ustvarite novo aplikacijo v Kinesis Data Analytics z imenom kinesis_analytics_airlines_app:
Kinesis Data Analytics vam omogoča izvajanje analize podatkov v realnem času iz Kinesis Streams z uporabo jezika SQL. Je popolnoma samodejna storitev (za razliko od Kinesis Streams), ki:
omogoča ustvarjanje novih tokov (Output Stream) na podlagi zahtev izvornih podatkov;
zagotavlja tok z napakami, ki so se pojavile med delovanjem aplikacij (Error Stream);
lahko samodejno določi shemo vhodnih podatkov (po potrebi jo lahko ročno redefiniramo).
To ni poceni storitev - 0.11 USD na uro dela, zato jo uporabljajte previdno in jo izbrišite, ko končate.
Povežimo aplikacijo z virom podatkov:
Izberite tok, s katerim se bomo povezali (letalske_vozovnice):
Nato morate priložiti novo vlogo IAM, da lahko aplikacija bere iz toka in piše v tok. Če želite to narediti, je dovolj, da v bloku dovoljenj za dostop ne spremenite ničesar:
Zdaj pa zahtevajmo odkritje podatkovne sheme v toku; za to kliknite gumb »Odkrij shemo«. Posledično bo vloga IAM posodobljena (ustvarjena bo nova) in zaznavanje sheme se bo zagnalo iz podatkov, ki so že prispeli v tok:
Zdaj morate iti v urejevalnik SQL. Ko kliknete na ta gumb, se prikaže okno, v katerem morate zagnati aplikacijo – izberite, kaj želite zagnati:
V okno urejevalnika SQL vstavite naslednjo preprosto poizvedbo in kliknite Shrani in zaženi SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
V relacijskih bazah podatkov delate s tabelami z uporabo stavkov INSERT za dodajanje zapisov in stavkov SELECT za poizvedovanje podatkov. V storitvi Amazon Kinesis Data Analytics delate s tokovi (STREAMs) in črpalkami (PUMPs) – neprekinjene zahteve za vstavljanje, ki vstavljajo podatke iz enega toka v aplikaciji v drug tok.
Zgoraj predstavljena poizvedba SQL išče vozovnice Aeroflota po ceni pod pet tisoč rubljev. Vsi zapisi, ki izpolnjujejo te pogoje, bodo postavljeni v tok DESTINATION_SQL_STREAM.
V bloku Destination izberite tok special_stream in na spustnem seznamu In-application stream name DESTINATION_SQL_STREAM:
Rezultat vseh manipulacij bi moral biti podoben spodnji sliki:
Ustvarjanje in naročanje na temo SNS
Pojdite na Simple Notification Service in tam ustvarite novo temo z imenom Airlines:
Naročite se na to temo in navedite številko mobilnega telefona, na katero bodo poslana SMS obvestila:
Ustvarite tabelo v DynamoDB
Za shranjevanje neobdelanih podatkov iz njihovega toka airline_tickets ustvarimo tabelo v DynamoDB z istim imenom. Kot primarni ključ bomo uporabili record_id:
Ustvarjanje zbiralnika funkcij lambda
Ustvarimo lambda funkcijo, imenovano Collector, katere naloga bo anketirati tok airline_tickets in, če so tam najdeni novi zapisi, vstaviti te zapise v tabelo DynamoDB. Očitno mora imeti ta lambda poleg privzetih pravic dostop za branje do toka podatkov Kinesis in dostop za pisanje do DynamoDB.
Ustvarjanje vloge IAM za funkcijo zbiralnika lambda
Najprej ustvarimo novo vlogo IAM za lambdo z imenom Lambda-TicketsProcessingRole:
Za testni primer sta prednastavljena pravilnika AmazonKinesisReadOnlyAccess in AmazonDynamoDBFullAccess precej primerna, kot je prikazano na spodnji sliki:
To lambdo bi moral zagnati sprožilec Kinesis, ko novi vnosi vstopijo v airline_stream, zato moramo dodati nov sprožilec:
Ostane le še vstavljanje kode in shranjevanje lambde.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Ustvarjanje obvestila o funkciji lambda
Druga lambda funkcija, ki bo spremljala drugi tok (special_stream) in poslala obvestilo SNS, je ustvarjena na podoben način. Zato mora ta lambda imeti dostop do branja iz Kinesisa in pošiljanja sporočil v določeno temo SNS, ki jih bo nato storitev SNS poslala vsem naročnikom te teme (e-pošta, SMS itd.).
Ustvarjanje vloge IAM
Najprej ustvarimo vlogo IAM Lambda-KinesisAlarm za to lambda, nato pa to vlogo dodelimo lambda alarm_notifier, ki se ustvarja:
Ta lambda bi morala delovati na sprožilec za vstop novih zapisov v special_stream, zato morate konfigurirati sprožilec na enak način, kot smo naredili za lambdo Collector.
Da bi olajšali konfiguracijo te lambde, uvedimo novo spremenljivko okolja - TOPIC_ARN, kamor postavimo ANR (imena virov Amazon) teme Airlines:
In vstavite lambda kodo, sploh ni zapleteno:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Zdi se, da je tukaj ročna konfiguracija sistema končana. Vse kar ostane je, da preizkusimo in se prepričamo, da smo vse pravilno konfigurirali.
Razmestite iz kode Terraform
Potrebna priprava
Terraform je zelo priročno odprtokodno orodje za uvajanje infrastrukture iz kode. Ima lastno sintakso, ki se je enostavno naučiti, in ima veliko primerov, kako in kaj uvesti. Urejevalnik Atom ali Visual Studio Code ima veliko priročnih vtičnikov, ki olajšajo delo s Terraformom.
Distribucijo lahko prenesete zato. Podrobna analiza vseh zmogljivosti Terraform presega obseg tega članka, zato se bomo omejili na glavne točke.
Kako začeti
Celotna koda projekta je v mojem skladišču. Repozitorij kloniramo sebi. Preden začnete, se morate prepričati, da imate nameščen in konfiguriran AWS CLI, ker ... Terraform bo iskal poverilnice v datoteki ~/.aws/credentials.
Dobra praksa je, da pred uvedbo celotne infrastrukture zaženete ukaz plan, da vidite, kaj Terraform trenutno ustvarja za nas v oblaku:
terraform.exe plan
Pozvani boste, da vnesete telefonsko številko, na katero želite pošiljati obvestila. Na tej stopnji ga ni treba vnesti.
Ko analiziramo načrt delovanja programa, lahko začnemo ustvarjati vire:
terraform.exe apply
Po pošiljanju tega ukaza boste znova pozvani, da vnesete telefonsko številko; izberite »da«, ko se prikaže vprašanje o dejanskem izvajanju dejanj. To vam bo omogočilo nastavitev celotne infrastrukture, izvedbo vseh potrebnih konfiguracij EC2, uvedbo lambda funkcij itd.
Ko so vsi viri uspešno ustvarjeni s kodo Terraform, se morate poglobiti v podrobnosti aplikacije Kinesis Analytics (žal nisem našel, kako to narediti neposredno iz kode).
Zaženite aplikacijo:
Po tem morate izrecno nastaviti ime toka v aplikaciji tako, da izberete na spustnem seznamu:
Zdaj je vse pripravljeno za odhod.
Testiranje aplikacije
Ne glede na to, kako ste uvedli sistem, ročno ali prek kode Terraform, bo deloval enako.
Preko SSH se prijavimo na virtualni stroj EC2, kjer je nameščen Kinesis Agent in zaženemo skripto api_caller.py
sudo ./api_caller.py TOKEN
Vse kar morate storiti je, da počakate na SMS na vašo številko:
SMS - sporočilo prispe na vaš telefon v skoraj 1 minuti:
Videti je treba, ali so bili zapisi shranjeni v bazi podatkov DynamoDB za kasnejšo, podrobnejšo analizo. Tabela airline_tickets vsebuje približno naslednje podatke:
Zaključek
V okviru opravljenega dela je bil zgrajen sistem za spletno obdelavo podatkov, ki temelji na Amazon Kinesis. Upoštevane so bile možnosti za uporabo Kinesis Agenta v povezavi s Kinesis Data Streams in analitiko Kinesis Analytics v realnem času z uporabo ukazov SQL ter interakcija Amazon Kinesis z drugimi storitvami AWS.
Zgornji sistem smo uvedli na dva načina: precej dolgega ročnega in hitrega iz kode Terraform.