Aviasales API integration with Amazon Kinesis and serverless simplicity
Hi Habr!
Do you like flying on planes? I love it, but during self-isolation I also fell in love with analyzing airline ticket data from one well-known resource: Aviasales.
Today we will look at how Amazon Kinesis works, build a streaming system with real-time analytics, set up the Amazon DynamoDB NoSQL database as the main data store, and configure SMS notifications for interesting tickets.
All the details are under the cut! Let's go!
Introduction
For this example we need access to the Aviasales API. Access is free and unrestricted; you only need to register in the "Developers" section to receive the API token that gives you access to the data.
The main purpose of this article is to give a general understanding of how streaming data is used in AWS. We keep in mind that the data returned by this API is not strictly up to date: it is served from a cache built from searches made by users of the Aviasales.ru and Jetradar.com sites over the last 48 hours.
The Kinesis Agent, installed on the producer machine, will automatically parse the data received through the API and pass it to the required stream via Kinesis Data Analytics. The raw version of this stream will be written directly to the store. The raw data store deployed in DynamoDB will allow deeper ticket analysis through BI tools such as AWS QuickSight.
Besides the manual walkthrough, the infrastructure can also be deployed from Terraform code, an option for lazy automation fans.
Architecture of the system being built
Components used:
Aviasales API: the data returned by this API is used for all subsequent work;
EC2 Producer Instance: an ordinary virtual machine in the cloud on which the input data stream is generated:
Kinesis Agent is a Java application installed locally on the machine that provides an easy way to collect and send data to Kinesis (Kinesis Data Streams or Kinesis Firehose). The agent continuously monitors a set of files in the specified directories and sends new data to Kinesis;
API Caller Script: a Python script that makes requests to the API and puts the response into a folder monitored by the Kinesis Agent;
Kinesis Data Streams: a real-time data streaming service with wide scaling capabilities;
Kinesis Analytics is a serverless service that simplifies the analysis of streaming data in real time. Amazon Kinesis Data Analytics configures the application resources and scales automatically to handle the volume of incoming data;
AWS Lambda: a service that lets you run code without provisioning or managing servers. All computing power is scaled automatically for each call;
Amazon SNS is a fully managed messaging service based on the publisher-subscriber (Pub/Sub) model, which can be used to decouple microservices, distributed systems, and serverless applications. SNS can be used to send information to end users through mobile push notifications, SMS messages, and emails.
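To make the Kinesis Agent item above more concrete, here is a minimal sketch of the JSON configuration the agent reads (usually from /etc/aws-kinesis/agent.json). The log directory is an assumption made for illustration; the stream name is the one created later in this article.

```python
import json

# Assumption: the directory below is illustrative, not taken from the article.
LOG_PATTERN = "/var/log/airline_tickets/*log"
STREAM_NAME = "airline_tickets"


def build_agent_config(file_pattern, stream_name):
    """Return a Kinesis Agent configuration with a single flow."""
    return {
        # Publish agent metrics to CloudWatch.
        "cloudwatch.emitMetrics": True,
        "flows": [
            {
                # Files the agent tails for new lines.
                "filePattern": file_pattern,
                # Kinesis data stream that receives those lines.
                "kinesisStream": stream_name,
            }
        ],
    }


config = build_agent_config(LOG_PATTERN, STREAM_NAME)
print(json.dumps(config, indent=2))
```

Each flow maps one file pattern to one stream, so adding a second monitored folder would simply mean a second entry in "flows".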
Initial preparation
To emulate a data stream, I decided to use the airline ticket information returned by the Aviasales API. The documentation lists quite a lot of different methods; let's take one of them, the "Monthly Price Calendar", which returns prices for each day of the month, grouped by the number of transfers. If you do not specify the search month in the request, information is returned for the month following the current one.
The above way of getting data from the API by specifying the token in the request will work, but I prefer to pass the access token in a header, so that is the approach we will use in the api_caller.py script.
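The difference between the two approaches can be sketched with the standard library alone. This is only an illustration: the endpoint URL is a placeholder (an assumption, not the real API address), while the X-Access-Token header and the query parameter names are the ones used in api_caller.py. LED and KZN are the IATA codes for St. Petersburg and Kazan.

```python
from urllib.parse import urlencode
from urllib.request import Request

# Assumption: placeholder endpoint, replace with the URL from the API docs.
BASE_URL = "https://api.example.com/v2/prices/month-matrix"


def build_request(token, origin, destination, currency="rub"):
    """Build a GET request that carries the token in a header, not in the URL."""
    query = urlencode({
        "currency": currency,
        "origin": origin,
        "destination": destination,
    })
    # The token stays out of the query string, so it does not end up in URLs
    # that get logged or cached along the way.
    return Request(f"{BASE_URL}?{query}", headers={"X-Access-Token": token})


request = build_request("MY_TOKEN", "LED", "KZN")
print(request.full_url)
```

The request object is only built here, not sent, so the sketch can be run without a real token.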
The example API response above shows a ticket from St. Petersburg to Phuk... Oh, what a dream...
Since I am from Kazan, and Phuket is now "only a dream", let's look for tickets from St. Petersburg to Kazan.
It is assumed that you already have an AWS account. I want to point out right away that Kinesis and SMS notifications are not included in the annual Free Tier (free usage). Even so, with a couple of dollars in mind, it is quite possible to build the proposed system and play with it. And, of course, do not forget to delete all resources once they are no longer needed.
Fortunately, DynamoDB and Lambda functions will be free for us as long as we stay within the monthly free limits. For example, for DynamoDB: 25 GB of storage, 25 WCU/RCU, and 100 million requests. And a million Lambda function calls per month.
Manual system deployment
Setting up Kinesis Data Streams
Let's go to the Kinesis Data Streams service and create two new streams, with one shard each.
What is a shard?
A shard is the basic data transfer unit of an Amazon Kinesis stream. One shard provides input data transfer at 1 MB/s and output data transfer at 2 MB/s. One shard supports up to 1000 PUT records per second. When creating a data stream, you need to specify the required number of shards. For example, you can create a data stream with two shards. This stream will provide input data transfer at 2 MB/s and output data transfer at 4 MB/s, supporting up to 2000 PUT records per second.
The more shards a stream has, the greater its throughput. In principle, this is how streams are scaled: by adding shards. But the more shards you have, the higher the price. Each shard costs 1.5 cents per hour, plus an additional 1.4 cents for every million PUT payload units.
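The shard arithmetic above is easy to put into a few lines of code. The sketch below uses only the per-shard limits and the hourly price quoted in the text; the function names are made up for this illustration.

```python
import math

# Per-shard limits and price as quoted in the text above.
INGEST_MB_PER_S = 1
EGRESS_MB_PER_S = 2
PUT_RECORDS_PER_S = 1000
SHARD_CENTS_PER_HOUR = 1.5


def stream_capacity(shards):
    """Return the total capacity of a stream with the given shard count."""
    return {
        "ingest_mb_s": shards * INGEST_MB_PER_S,
        "egress_mb_s": shards * EGRESS_MB_PER_S,
        "put_records_s": shards * PUT_RECORDS_PER_S,
    }


def shards_needed(ingest_mb_s, egress_mb_s, put_records_s):
    """Return the smallest shard count that satisfies all three limits."""
    return max(
        1,
        math.ceil(ingest_mb_s / INGEST_MB_PER_S),
        math.ceil(egress_mb_s / EGRESS_MB_PER_S),
        math.ceil(put_records_s / PUT_RECORDS_PER_S),
    )


def hourly_shard_cost_cents(shards):
    """Return the hourly shard cost in cents, excluding PUT payload units."""
    return shards * SHARD_CENTS_PER_HOUR


print(stream_capacity(2))
print(shards_needed(ingest_mb_s=1.5, egress_mb_s=1, put_records_s=500))
print(hourly_shard_cost_cents(2))
```

For the small test load in this article every limit is far from being reached, which is why one shard per stream is enough.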
Let's create a new stream named airline_tickets; 1 shard will be enough for it:
Now let's create another stream named special_stream:
Producer setup
To work through the example, it is enough to use an ordinary EC2 instance as the data producer. It does not have to be a powerful, expensive virtual machine; a t2.micro will do just fine.
Important note: for this example you should use the Amazon Linux AMI 2018.03.0 image; it needs fewer settings to get the Kinesis Agent running quickly.
Go to the EC2 service, create a new virtual machine, and select the required AMI with the t2.micro type, which is included in the Free Tier:
For the newly created virtual machine to be able to interact with the Kinesis service, it must be granted the rights to do so. The best way to do this is to assign an IAM Role. Therefore, on the Step 3: Configure Instance Details screen, select Create new IAM role:
Creating an IAM role for EC2
In the window that opens, select that we are creating a new role for EC2 and go to the Permissions section:
For this training example we don't have to go into all the intricacies of granular resource permissions, so we will select the policies preconfigured by Amazon: AmazonKinesisFullAccess and CloudWatchFullAccess.
Let's give this role a meaningful name, for example: EC2-KinesisStreams-FullAccess. The result should be the same as shown in the picture below:
After creating this new role, don't forget to attach it to the created virtual machine instance:
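For readers who prefer scripts to console clicks, the same role can be sketched with boto3. This is not the article's code, just an illustration under assumptions: the function takes an IAM client as a parameter (for example boto3.client("iam")), and the instance profile reuses the role name, which is a choice made here for simplicity.

```python
import json

ROLE_NAME = "EC2-KinesisStreams-FullAccess"
# AWS-managed policies named in the text above.
MANAGED_POLICY_ARNS = [
    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
    "arn:aws:iam::aws:policy/CloudWatchFullAccess",
]


def ec2_trust_policy():
    """Return a trust policy that lets EC2 instances assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "ec2.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_producer_role(iam_client, role_name=ROLE_NAME):
    """Create the role, attach the managed policies, wrap it in a profile."""
    iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(ec2_trust_policy()),
    )
    for policy_arn in MANAGED_POLICY_ARNS:
        iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    # EC2 receives a role through an instance profile.
    # Assumption: the profile is given the same name as the role.
    iam_client.create_instance_profile(InstanceProfileName=role_name)
    iam_client.add_role_to_instance_profile(
        InstanceProfileName=role_name, RoleName=role_name
    )

# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   create_producer_role(boto3.client("iam"))
```

Passing the client in keeps the function easy to try against a stub before touching a real account.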
We don't change anything else on this screen and move on to the next windows.
The hard drive settings can be left at their defaults, as can the tags (although it is good practice to use tags: at least give the instance a name and indicate the environment).
Now we are on the Step 6: Configure Security Group tab, where you need to create a new security group or specify an existing one that allows you to connect to the instance via ssh (port 22). Select Source -> My IP there, and you can launch the instance.
As soon as it switches to the running state, you can try to connect to it via ssh.
To be able to work with the Kinesis Agent, after successfully connecting to the machine, you need to enter the following commands in the terminal:
The api_caller.py script requests data from Aviasales and saves the received response in the directory that the Kinesis Agent scans. The implementation of this script is quite standard: there is a TicketsApi class that lets you call the API asynchronously. We pass a header with the token and the request parameters to this class:
import sys

from aiohttp import ClientResponseError, ClientSession, FormData

# BASE_URL, CURRENCY, ORIGIN, DESTINATION, SHOW_TO_AFFILIATES, TRIP_DURATION,
# TARGET_FILE, LOGGER and log_maker are defined elsewhere in api_caller.py.


class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except ClientResponseError as http_err:
                # raise_for_status() raises aiohttp.ClientResponseError.
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error occurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query for the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)
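The listing above calls log_maker, which is not shown in this excerpt. Purely as an illustration of what such a helper might do, here is a hypothetical stand-in named save_records: it assumes that response['data'] is a list of dictionaries, writes one JSON object per line to the target file, and adds a generated record_id to each row. The real helper in the project may well use a different file format.

```python
import json
import os
import tempfile
import uuid


def save_records(response, target_file):
    """Append each item of response['data'] as one JSON line; return the count.

    Hypothetical stand-in for log_maker, not the article's implementation.
    """
    rows = response.get("data", [])
    with open(target_file, "a", encoding="utf-8") as handle:
        for row in rows:
            # Assumption: every row gets a unique id to serve as a table key.
            record = dict(row, record_id=str(uuid.uuid4()))
            handle.write(json.dumps(record) + "\n")
    return len(rows)


# Made-up sample response used only to exercise the function.
sample = {"success": True, "data": [{"cost": 4820, "gate": "Aeroflot"}]}
path = os.path.join(tempfile.mkdtemp(), "airline_tickets.log")
saved = save_records(sample, path)
print(saved, "rows saved to", path)
```

Writing one record per line suits a tailing agent, because every new line can be shipped as a separate stream record.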
To check that the agent is configured correctly and works, let's do a test run of the api_caller.py script:
sudo ./api_caller.py TOKEN
Then we look at the result in the Agent logs and on the Monitoring tab of the airline_tickets data stream:
Let's move on to the central component of the whole system: create a new application in Kinesis Data Analytics named kinesis_analytics_airlines_app:
Kinesis Data Analytics lets you run real-time analytics on data from Kinesis Streams using SQL. Unlike Kinesis Streams, it is a fully auto-scaling service.
Next, you need to attach a new IAM role so that the application can read from the stream and write to the stream. To do this, it is enough not to change anything in the Access permissions block:
Now let's request discovery of the data schema in the stream; to do this, click the "Discover schema" button. As a result, the IAM role will be updated (a new one will be created) and schema detection will be run on the data that has already arrived in the stream:
Enter the following simple query in the SQL editor window and click Save and Run SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
In relational databases, you work with tables, using INSERT statements to add records and SELECT statements to query data. In Amazon Kinesis Data Analytics, you work with streams (STREAMs) and pumps (PUMPs): continuous insert queries that move data from one in-application stream into another.
The SQL query above searches for Aeroflot tickets that cost less than five thousand rubles. All records that match these conditions are placed in the DESTINATION_SQL_STREAM stream.
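If the stream-and-pump vocabulary feels unfamiliar, the same logic can be pictured as a plain Python generator. This is only an analogy for the query above, not how Kinesis Data Analytics is implemented; the sample tickets are made up.

```python
def matches_alert(record, max_cost=5000, gate="Aeroflot"):
    """Mirror the WHERE clause: cost below the limit and the given gate."""
    return record["cost"] < max_cost and record["gate"] == gate


def pump(source_records):
    """Continuously forward matching records, keeping only the two columns."""
    for record in source_records:
        if matches_alert(record):
            yield {"cost": record["cost"], "gate": record["gate"]}


# Made-up records standing in for SOURCE_SQL_STREAM_001.
tickets = [
    {"cost": 4820, "gate": "Aeroflot", "origin": "LED"},
    {"cost": 6100, "gate": "Aeroflot", "origin": "LED"},
    {"cost": 3900, "gate": "Other", "origin": "LED"},
]
selected = list(pump(tickets))
print(selected)
```

Like a pump, the generator does nothing until records flow through it, and it emits each match as soon as it is seen.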
In the Destination block, select the special_stream stream, and in the In-application stream name drop-down list select DESTINATION_SQL_STREAM:
The result of all these manipulations should look something like the picture below:
Creating and subscribing to an SNS topic
Go to the Simple Notification Service and create a new topic there named Airlines:
Subscribe to this topic and specify the mobile phone number to which SMS notifications will be sent:
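The two console steps above map onto two SNS API calls. The sketch below assumes an SNS client is passed in (for example boto3.client("sns")); the helper name and the phone number in the usage comment are placeholders.

```python
def subscribe_phone(sns_client, topic_name, phone_number):
    """Create the topic (or reuse an existing one) and add an SMS subscriber."""
    # create_topic returns the ARN of the topic with this name.
    topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]
    sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol="sms",
        Endpoint=phone_number,
    )
    return topic_arn

# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   arn = subscribe_phone(boto3.client("sns"), "Airlines", "+10000000000")
```

The returned ARN is the value the notifier function later needs in order to publish to the topic.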
Creating a table in DynamoDB
To store the raw data from the airline_tickets stream, let's create a table in DynamoDB with the same name. We will use record_id as the primary key:
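The table described above could also be created from code. The sketch below only builds the parameters for a create_table call; it assumes that record_id is a string and picks a small provisioned capacity, both of which are choices made for this illustration.

```python
def table_definition(table_name="airline_tickets", key_name="record_id"):
    """Return create_table parameters for a table with a single hash key."""
    return {
        "TableName": table_name,
        # record_id is the partition (HASH) key, as in the text above.
        "KeySchema": [{"AttributeName": key_name, "KeyType": "HASH"}],
        # Assumption: the key is stored as a string ("S").
        "AttributeDefinitions": [
            {"AttributeName": key_name, "AttributeType": "S"}
        ],
        # Assumption: a small capacity that fits within the free limits.
        "ProvisionedThroughput": {
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5,
        },
    }


params = table_definition()
print(params)

# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   boto3.client("dynamodb").create_table(**params)
```

Only the key attribute is declared up front; the remaining ticket fields need no schema because DynamoDB items are schemaless beyond the key.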
Creating the collector Lambda function
Let's create a Lambda function called Collector, whose task will be to poll the airline_tickets stream and, if new records are found there, insert these records into the DynamoDB table. Obviously, in addition to the default rights, this Lambda must have read access to the Kinesis data stream and write access to DynamoDB.
Creating an IAM role for the collector Lambda function
First, let's create a new IAM role for the Lambda named Lambda-TicketsProcessingRole:
For the test example, the preconfigured AmazonKinesisReadOnlyAccess and AmazonDynamoDBFullAccess policies are quite suitable, as shown in the picture below:
This Lambda should be launched by a trigger from Kinesis when new records arrive in the airline_tickets stream, so we need to add a new trigger:
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'


class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        # Kinesis delivers each record payload base64-encoded.
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            # Convert through str so a float never becomes an inexact Decimal.
            'cost': Decimal(str(json_item.get('cost'))),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')


def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()
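The handler above relies on the shape of the event that Lambda receives from Kinesis: a list of records whose payload sits base64-encoded under record['kinesis']['data']. A small local round trip makes this easy to see without deploying anything. The helper names and the sample ticket are made up for the illustration, and a real event carries more fields than shown here.

```python
import base64
import json


def make_kinesis_event(items):
    """Build a minimal fake event with the fields the collector reads."""
    return {
        "Records": [
            {
                "kinesis": {
                    "data": base64.b64encode(
                        json.dumps(item).encode("utf-8")
                    ).decode("ascii")
                }
            }
            for item in items
        ]
    }


def decode_kinesis_event(event):
    """Decode every record payload back into a Python dictionary."""
    return [
        json.loads(base64.b64decode(record["kinesis"]["data"]))
        for record in event["Records"]
    ]


event = make_kinesis_event([{"record_id": "abc", "cost": 4820}])
decoded = decode_kinesis_event(event)
print(decoded)
```

An event built this way can be fed to the parsing part of the collector to check it before wiring up the real trigger.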
Creating the notifier Lambda function
The second Lambda function, which will monitor the second stream (special_stream) and send a notification to SNS, is created in a similar way. Therefore, this Lambda must have the rights to read from Kinesis and to send messages to the given SNS topic, which the SNS service will then deliver to all subscribers of the topic (email, SMS, and so on).
Creating an IAM role
First, we create the IAM role Lambda-KinesisAlarm for this Lambda, and then assign this role to the alarm_notifier Lambda being created:
This Lambda should be triggered when new records arrive in special_stream, so you need to configure the trigger in the same way as we did for the Collector Lambda.
import boto3
import os

SNS_CLIENT = boto3.client('sns')
# The topic ARN is read from the function's environment variables.
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    """Publish an alarm to the SNS topic when the trigger fires."""
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))
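The notifier above sends a fixed text. One possible extension, shown here only as a sketch, is to include the ticket details in the message. It assumes that the records in special_stream arrive as JSON objects with the "cost" and "gate" fields selected by the SQL query; the function name is made up.

```python
import base64
import json


def build_alarm_message(event):
    """Return a message that lists the gate and cost of every record."""
    lines = []
    for record in event["Records"]:
        # Assumption: the payload is JSON with "gate" and "cost" keys.
        ticket = json.loads(base64.b64decode(record["kinesis"]["data"]))
        lines.append("{}: {}".format(ticket.get("gate"), ticket.get("cost")))
    return "Interesting tickets found:\n" + "\n".join(lines)


# Made-up event used only to exercise the function.
payload = base64.b64encode(
    json.dumps({"cost": 4820, "gate": "Aeroflot"}).encode("utf-8")
).decode("ascii")
message = build_alarm_message({"Records": [{"kinesis": {"data": payload}}]})
print(message)
```

The resulting string could be passed as the Message argument of the publish call, keeping in mind that SMS messages are short.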
It seems that this completes the manual configuration of the system. All that remains is to test it and make sure that we have configured everything correctly.
Deploying from Terraform code
Required preparation
Terraform is a very convenient open-source tool for deploying infrastructure from code. It has its own syntax that is easy to learn, and there are many examples of how and what to deploy. The Atom editor and Visual Studio Code have many handy plugins that make working with Terraform easier.
You can download the distribution from here. A detailed analysis of all Terraform's capabilities is beyond the scope of this article, so we will limit ourselves to the main points.
How to get started
The full code of the project is in my repository. We clone the repository. Before starting, you need to make sure that the AWS CLI is installed and configured, because Terraform will look for credentials in the ~/.aws/credentials file.
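Before running Terraform, it can be handy to check that the credentials file really contains what is expected. The sketch below is a plain-Python check, not part of the project: it assumes the usual INI layout with a [default] profile, and the key values in the demo file are dummies.

```python
import configparser
import os
import tempfile


def has_aws_credentials(path, profile="default"):
    """Return True if the file has both access keys for the given profile."""
    parser = configparser.ConfigParser()
    parser.read(path)  # a missing file is silently skipped
    return (
        parser.has_section(profile)
        and parser.has_option(profile, "aws_access_key_id")
        and parser.has_option(profile, "aws_secret_access_key")
    )


# Demo on a throwaway file with dummy values.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "credentials")
with open(demo_path, "w", encoding="utf-8") as handle:
    handle.write(
        "[default]\n"
        "aws_access_key_id = EXAMPLE\n"
        "aws_secret_access_key = EXAMPLE\n"
    )
print(has_aws_credentials(demo_path))

# The real location mentioned in the text above.
print(has_aws_credentials(os.path.expanduser("~/.aws/credentials")))
```

The check only confirms that the keys are present; it does not verify that they are valid.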
It remains to check whether the records were saved in the DynamoDB database for subsequent, more detailed analysis. The airline_tickets table contains approximately the following data:
Conclusion
In the course of this work, an online data processing system based on Amazon Kinesis was built. We looked at options for using the Kinesis Agent together with Kinesis Data Streams and the real-time analytics of Kinesis Analytics using SQL commands, as well as the interaction of Amazon Kinesis with other AWS services.
We deployed the system described above in two ways: a rather long manual one and a quick one from Terraform code.