ProHoster > Блог > administratio > Aviasales API integratio cum Amazonum Kinesis et simplicitate inserviens
Aviasales API integratio cum Amazonum Kinesis et simplicitate inserviens
Heus Habr!
Placetne tibi aëronavibus volans? Ego eam amo, sed per se-solitudinem etiam in amorem incidi analysin datas super tesseras aereas ab uno notissimo subsidio - Aviasales.
Hodie opus Kinesis Amazonum resolvemus, systema profusum cum analyticis realibus aedificabimus, Amazonum DynamoDB NoSQL datorum instituemus ut principales notitias repositionis, et notificationes SMS pro tesseras interesting erexerit.
Singula omnia sub cut! Perge!
introduction
Exempli gratia, aditus ad Aviasales API. Accessus ad id sine pretio et sine restrictionibus providetur: vos iustus postulo ut subcriptio in sectione "Developers" tuum API signum recipiendi ad notitias accedendi.
Praecipuum huius articuli propositum est ad generalem intelligentiam de usu informationum in AWS profluentium, consideremus notitias ab API adhibitas redditas non esse stricte sursum ut- obsoletas et e conditorio transmissas, quod est. formatae ex quaesitis ab usoribus Aviasales.ru et Jetradar.com situs pro ultimis 48 horis.
Kinesis-agens, in machina producenda inauguratus, per API acceptam sponte parse faciet et data ad desideratum rivum per Kinesis Data Analytics tradet. Rudis huius rivi versio inscripta erit in copia. Rudis notitia repositionis in DynamoDB explicata per instrumenta tesserae altioris BI permittet, qualia sunt AWS Velox Visus.
Duo optiones infrastructurae totam explicandam considerabimus:
Manual - per AWS Management Console;
Infrastructura e Terraform codice pro pigris automatoribus est;
Architectura systematis enucleatae
Components usus est:
Aviasales API - notitia hac API reddita omnibus subsequentibus operi adhibebitur;
EC2 Producentis instantiam - machina virtualis regularis in nube in qua generabitur initus notitia rivus;
Kinesis Agent applicatio Javae localiter in machina inaugurata est, quae viam facilem praebet ad colligendas et datas Kinesis mittendas (Kinesis Data Streams vel Kinesis Firehose). Agens constanter monitores tabulariorum in directoriis determinatis et novas notitias Kinesis mittit;
API RECENS Script — Scriptum Python quod API petit et responsionem ponit in folder quod ab agente Kinesis monitore est;
Kinesis Data rivi - real-time data officia effusis latissime scandendis facultatibus;
Kinesis Analytics serviens servitus est quae simpliciorem efficit analysin profluentem datam in tempore reali. Amazon Kinesis Data Analytics applicandi facultates configurat et automatice squamas ad quodlibet volumen notitiarum advenientis tractandum;
Lambda AWS - officium quod permittit ut codicem currere sine tergum sursum vel in servientibus constituere. Omnis potestas computandi ad singulas vocationes automatice scandet;
Amazon DynamoDB - Datorum clavium valoris paria et documenta quae latentiam praebet minus quam 10 millisecondorum cum quavis scala currentem. Cum DynamoDB utens, provisione, commissura, nec ullis servientibus administrare non debes. DynamoDB automatice mensas squamas ad quantitatem facultatum promptarum accommodandas ac altam observantiam obtinent. Nulla ratio administrationis exigitur;
Amazon SNS - munus plene administratum ad mittendas epistulas utens exemplar publisher-sub/sub/scribentis, quocum microservias segregare potes, systemata et applicationes ministrativas distribui. SNS informationes mittere possunt ad finem users per mobiles notificationes, nuntios SMS et electronicas mittere.
Coepi disciplina
Ad aemulandum notitiarum fluxus, tesseram airline nuntiorum per Aviasales API redditos uti decrevi. IN' documentum satis amplum diversorum rationum indicem, unum ex iis ducamus - "Censarium menstruale", quod reddit pretium pro quolibet die mensis, a numero translationum. Si mensem inquisitionis in instantia non indicas, informationes reddentur pro mense currenti sequenti.
Itaque inscriptae sint et nostrum signum capiamus.
Superior methodus recipiendi notitias ex API denotando signum in petitione operabitur, sed accessum per caput transire malo, sic hac methodo in api_caller.py scripto utemur.
Exemplum API responsio supra ostendit tesseram a St. Petersburg ad Phuk... O quam somnium...
Cum de Kazan sum, et Phuket nunc "solum somnium est", tesseras St. Petersburg in Kazan exspecto.
Ponit te iam rationem habere AWS. Velimus statim specialem attentionem habere in Kinesis et notificationes per SMS mittere in annuo non comprehendi. Free Tier (liberum usum). Sed hoc tamen, duobus dollariis mente proposita ratio proposita aedificare potest et ludere cum eo. Et sane noli oblivisci omnes facultates delere postquam iam non sunt necessariae.
Fortunate, DynamoDb et lambda munera nobis libera erunt si fines nostros menstruos liberos occurremus. Exempli gratia, pro DynamoDB: 25 GB repositionis, 25 WCU/RCU et 100 decies centena millia quaesitorum. Et decies centena millia lambda munus vocat invidunt.
Manual systema instruere
Kinesis data rivi erigerent
Eamus ad Kinesis Data Fluminum ministerium et duos novos rivos creandos, unum per se squamulum.
quid testa?
Pervalida est prima notitia unitatis translationis amnis Amazonis Kinesis. Segmentum unum praebet input notitia translationis ad celeritatem 1 MB/s et notitia translationis output ad celeritatem 2 MB/s. Segmentum unum sustinet usque ad 1000 PUT entries per alterum. Cum datam rivum creando, numerum segmentorum denotare debes. Exempli gratia, cum duobus segmentis rivum notitia creare potes. Haec notitia stream providebit input data translationis ad 2 MB/s et output data translationis ad 4 MB/s, sustinens usque ad 2000 PUT records secundo per.
Quo plus shards in tuo flumine, eo plus habet throughput. Principio, haec quomodo profluvia sunt, squamae, addendo lithargyri. quanto autem plus shards habes, tanto pluris pretium. Quaelibet testa constat 1,5 cents per horam et additis 1.4 cents pro singulis miliones PONO payload unitatibus.
Novo nomine amnis faciamus airline_tickets, I testae satis erit ei;
Nunc aliud linum nomine faciamus special_stream:
Producentis setup
Ad analysim molis, satis est iustam EC2 instantiam uti notitia producentis. Machinam virtualem esse potentem non habet, t2.micro macula multa denique faciet.
Nota magna: exempli gratia, imagine uti debes - Linux Amazon AMI 2018.03.0, pauciores habet occasus ut Kinesis Agent cito deducendo.
Vade ad ministerium EC2, novam machinam virtualem crea, desideratum AMI cum typo t2.micro elige, quod in libero Tier comprehenditur:
Ut machinae virtualis nuper creatae cum servitio Kinesis correspondere possint, iura praestanda sunt. Optime hoc facere est munus IAM assignare. Ideo in Gradu III: Configure Instantia Details screen, eligere debes IAM Partes novas crea:
Creando IAM munus pro EC2
In fenestra aperiente, elige nos novum munus creare pro EC2 et ad sectionem Permissionum ire:
Exercitatio exemplo utens, omnes ambages granularis configurationis iurium subsidiorum ire non debemus, sic rationes eligemus prae-figuratas ab Amazon: AmazonKinesisFull Access et CloudWatchFullAccess.
Nomen aliquod significantius huic muneri demus, exempli gratia: EC2-KinesisStreams-FullAccess. Idem evenit ut in tabula infra ostendetur.
Post hanc novam condicionem creando, noli oblivisci ut exemplum machinae virtualis creato adiungatur:
Nihil aliud in hoc screen mutamus et in proximas fenestras transigimus.
Occasus ferreus coegi potest pro defectu relinqui, ac textilia (quamvis usu tags uti bona est, saltem instantia nomen dat et ambitum indicabit).
Nunc sumus in Gradu 6: Configurare Securitatis Tab Group, ubi debes novam creare vel denotare coetus Securitatis existentis, quod permittit te ad instanciam per ssh (portum 22) coniungere. Lego Source -> IP My ibi et exempli gratia deduci potes.
Cum primum virgas ad statum currit, illi per ssh coniungere potes.
Posse cum Kinesis Agent elaborare, postquam machinam feliciter connectens, sequentia mandata in termino inire debes:
Ut ex file configurationis videri potest, agens lima monitorem cum .loga extensionis in /var/log/airline_tickets/ indicis reddet, parse eas et eas in porttitor_ticet amnis transferet.
Nos servitium sileo et fac quod sursum est et currit;
sudo service aws-kinesis-agent restart
Nunc de scriptore Pythone quod notitias ex API petet:
Litterae api_caller.py postulationes datae ab Aviasales et receptam responsionem servat in indicem quod agens Kinesis lustrat. Exsecutio huius scripti satis vexillum est, genus est TicketsApi, sinit te asynchronously API trahere. Transimus caput cum signo et rogatu parametri huius ordinis:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Ut probetur occasus et functiones agentis, test run scriptor api_caller.py:
sudo ./api_caller.py TOKEN
Et spectamus eventum operis in lignis agentis et in vigilantia tab in in porttitor_tickets data fluminis:
Ut videre potes, omnia opera feliciter et Kinesis Agent data ad rivum mittit. Nunc gravida lobortis dolor.
Kinesis Data Analytics
Transeamus ad systema centrale totius systematis - novam applicationem in Kinesis Data Analytics nomine kinesis_analytics_airlines_app:
Kinesis Data Analytica permittit ut notitias reales temporis analyticas praestare ex Kinesis Fluviis lingua SQL utens. Est officium plene autoscaling (dissimilis rivi Kinesis) illud:
sino vos novos rivos creare (Output Stream) in petitionibus ad fontem data;
rivulum errorum praebet, qui applicationes accurrebant (Error Stream);
automatice potest determinare propositum initus notitiae (si opus est manually redintegrari potest).
Hoc munus non vile - 0.11 USD per horam laboris est, ut ea diligenter utaris et cum finieris deleas.
Sit scriptor notitia ad applicationem in fonte coniungere:
Elige flumen quod imus ad coniungere ad (airline_tickets):
Deinde novam IAM Partem apponere debes ut applicatio ab rivo legere et ad rivum scribere possit. Ad hoc faciendum, satis est ne quid in accessu permissionum intercluderetur mutare;
Nunc petat de inventione schematis in flumine: hoc facere, deprime in bulla "Schema Inventionis". Quam ob rem, munus IAM renovabitur (nova creabitur) et detectio schematis e notitia quae iam in flumine pervenerat mittetur;
Nunc debes ire ad editorem SQL. Cum hanc conjunctionem deprimes, fenestra apparebit rogans te ut applicationem - selige quod vis mittere:
Sequentem simplicem interrogationem in SQL editorem fenestra inserere et deprimere, Servare et SQL Curre:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
In databases relativis, cum tabulis operaris utentes INDO constitutiones addere monumentis et propositionem selectam interrogationi datae. In Amazonibus Kinesis Data Analyticis, rivis (fluviis) et soleatus (PUMPs) laboratis — petitiones continuae insertas quae ex uno rivo in applicatione in alium rivum datae inseruntur.
SQL Quaestio supra allata quaesita est tesserarum Aeroflotensium pretio infra quinque milia rublorum. Omnes monumenta quae his conditionibus conveniunt, in DESTINATION_SQL_STREAM amnis collocabuntur.
In Destination scandalum, elige speciale_stremum amnis, et in In-applicationibus amnis nomen DESTINATION_SQL_STREAM deprimit album:
Ex quibus omnibus artificiis conficiendis aliquid simile esse debet imaginis inferius:
Creando et scribendo ad SNS topic
Vade ad Simplex Notification Service et novum locum crea in nomine Airlines:
Adice huic argumento et indica numerum telephonicum mobilem ad quem notificationes SMS mittentur;
Creare mensam in DynamoDB
Ut notitias vivas ex airline_singulis suis congregem, mensam in DynamoDB eodem nomine efficiamus. Utemur record_id sicut clavis primaria:
Munus collector creando lambda
Munus Lambda nomine Collector creemus, cuius munus erit tondendas airline_tesseras amnis et, si novae ibi inventae fuerint, has tabulas in tabulam DynamoDB inseremus. Patet, praeter iura defaltam, haec lambda legere debet accessum ad datam Kinesis rivum et accessum ad DynamoDB scribere.
Creando IAM munus pro munere collectoris lambda
Primum, novum munus IAM creare pro Lambda-TicketsProcessingRole nomine Lambda:
Ad exemplum experimentum, prae-configuratae AmazonKinesisReadOnlyAccess et AmazonDynamoDBFullAccess rationes satis idoneae sunt, ut in tabula infra ostendetur:
Haec lambda a felis a Kinesis immissa debet cum novae viscus in airline_stream intrant, ergo opus est novam felis addere:
Reliquum est ut codicem insereret et labda servaret.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Munus creando notifier lambda
Munus alterum lambda, quod alterum rivum (special_stream) monitor erit et notificationem ad SNS mittet, simili modo creatur. Haec igitur lambda accessum habere debet ut ex Kinesis legatur et nuntios mittat ad locum quemdam SNS, qui tunc per ministerium SNS mittetur omnibus huius argumenti subscribentibus (email, SMS, etc.).
Partum an IAM partes
Primum munus Lambda-KinesisAlarm pro hoc lambda creamus, et deinde hoc munus cum terror_notificatorio Lambda creato assignamus:
Haec lambda in felis pro novis monumentis operari debet ad special_stremam ingrediendi, sic felis oportet conformare eodem modo ac pro Collectore lambda.
Quo facilius hanc lambda configurare, novam environment variabilis - TOPIC_ARN introducemus, ubi ponimus ANR (Amazon Recursus Nomina) topicorum Airlines:
Et in lambda codicem inseres, omnino non complicata est:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Hoc esse videtur ubi conformatio systematis manualis perficitur. Reliquum est ut omnia probe configurata probemus et efficiamus.
Deploy ex codice Terraform
Praeparatio requiratur
Terraform commodissimum est instrumentum fontis aperti ad infrastructuram e codice explicandam. Habet suam syntaxin facilem ad discendum et multa exempla quomodo et quid explicandum. Atom editor vel Visual Studio Codicis multa plugina habilis habet quae operando cum Terraform faciliora faciunt.
Distributionem potes download hic. Accurate analysis omnium facultatum Terraformium est extra ambitum huius articuli, sic nos ad praecipua capita limitamus.
Quam incipere
Plenum codicem consilii in reposito. Repositorium nobis clone. Priusquam incipias, debes efficere te habere institutum et configuratum AWS CLI, quia... Terraform documentorum in tabella ~/.aws/credentialium exspectabit.
Bona praxis est consilium praecipere priusquam totam infrastructuram explices videre quid Terraform in nube nobis creet:
terraform.exe plan
Promptus eris ad numerum telephonicum ut notificationes mittas. Hac in scena necesse non est intrare.
Cum enucleata operatione programma, creandi facultates incipere possumus;
terraform.exe apply
Hoc mandato misso, iterum rogaberis ut numerum telephonicum ingrediaris, horologium "sic" cum quaestio de actionibus actu faciendo ostenditur. Hoc licebit tibi totam infrastructuram erigere, omnem conformationem necessariam EC2 exseque, functiones lambda explicas, etc.
Postquam facultates omnes per Terraform codicem feliciter creatae sunt, debes ire in singula applicationis analyticorum Kinesis (proh dolor, non inveni quomodo hoc facere directe e codice).
Launch applicationem;
Post hoc, expresse debes imponere nomen rivi applicationis eligendo e indice gutta-down:
Iam parata sunt omnia.
Testis application
Nihilominus quomodo systema manually vel per Terraform codicem explicuisti, eadem operabitur.
Log in via SSH ad EC2 machinam virtualem ubi Kinesis Agens inauguratur et currit scriptura api_caller.py
sudo ./api_caller.py TOKEN
Omnia tibi facienda exspectamus SMS numero tuo:
SMS - nuntius advenit in phone in 1 fere momento:
Restat videre num monumenta in DynamoDB datorum servata sint pro analysi subsequenti accuratiore. Tabulae airline_ tesseras proxime sequentes notitias continent:
conclusio,
In cursu operis facto, systema notularum online fundatur in Kinesis Amazonibus aedificata. Optiones Kinesis agentis in conjunctione cum Kinesis Data Fluviorum et reali-temporis analyticorum Kinesis Analyticorum SQL mandatorum utentes, necnon commercium Kinesis Amazonum cum aliis AWS servitiis habiti sunt.
Systema praedicta dupliciter explicavimus: unum manuale longiorem et unum velox e codice Terraformi.
Omnes project source code is available in GitHub repositioId tibi admoneo perdiscere.