ప్రోహోస్టర్ > బ్లాగ్ > పరిపాలన > Amazon Kinesis మరియు సర్వర్లెస్ సింప్లిసిటీతో Aviasales API ఇంటిగ్రేషన్
Amazon Kinesis మరియు సర్వర్లెస్ సింప్లిసిటీతో Aviasales API ఇంటిగ్రేషన్
హే హబ్ర్!
మీకు ఎగిరే విమానాలు ఇష్టమా? నేను దీన్ని ఇష్టపడుతున్నాను, కానీ స్వీయ-ఒంటరిగా ఉన్న సమయంలో నేను కూడా ఒక ప్రసిద్ధ వనరు - Aviasales నుండి విమాన టిక్కెట్ల డేటాను విశ్లేషించడంలో ప్రేమలో పడ్డాను.
ఈ రోజు మనం Amazon Kinesis యొక్క పనిని విశ్లేషిస్తాము, నిజ-సమయ విశ్లేషణలతో స్ట్రీమింగ్ సిస్టమ్ను రూపొందిస్తాము, Amazon DynamoDB NoSQL డేటాబేస్ను ప్రధాన డేటా నిల్వగా ఇన్స్టాల్ చేస్తాము మరియు ఆసక్తికరమైన టిక్కెట్ల కోసం SMS నోటిఫికేషన్లను సెటప్ చేస్తాము.
అన్ని వివరాలు కట్ కింద ఉన్నాయి! వెళ్ళండి!
పరిచయం
ఉదాహరణకు, మనకు యాక్సెస్ అవసరం Aviasales API. దీనికి యాక్సెస్ ఉచితంగా మరియు పరిమితులు లేకుండా అందించబడుతుంది; డేటాను యాక్సెస్ చేయడానికి మీ API టోకెన్ను స్వీకరించడానికి మీరు “డెవలపర్లు” విభాగంలో నమోదు చేసుకోవాలి.
ఈ కథనం యొక్క ముఖ్య ఉద్దేశ్యం AWSలో సమాచార ప్రసారం యొక్క ఉపయోగం గురించి సాధారణ అవగాహన కల్పించడం; ఉపయోగించిన API ద్వారా అందించబడిన డేటా ఖచ్చితంగా తాజాగా ఉండదని మరియు కాష్ నుండి ప్రసారం చేయబడుతుందని మేము పరిగణనలోకి తీసుకుంటాము. Aviasales.ru మరియు Jetradar.com సైట్ల వినియోగదారులు గత 48 గంటలపాటు చేసిన శోధనల ఆధారంగా రూపొందించబడింది.
ఉత్పత్తి చేసే మెషీన్లో ఇన్స్టాల్ చేయబడిన కైనెసిస్-ఏజెంట్, API ద్వారా స్వీకరించబడినది స్వయంచాలకంగా అన్వయిస్తుంది మరియు కైనెసిస్ డేటా అనలిటిక్స్ ద్వారా కావలసిన స్ట్రీమ్కు డేటాను ప్రసారం చేస్తుంది. ఈ స్ట్రీమ్ యొక్క ముడి వెర్షన్ నేరుగా స్టోర్కు వ్రాయబడుతుంది. DynamoDBలో అమలు చేయబడిన ముడి డేటా నిల్వ AWS క్విక్ సైట్ వంటి BI సాధనాల ద్వారా లోతైన టిక్కెట్ విశ్లేషణను అనుమతిస్తుంది.
మొత్తం మౌలిక సదుపాయాలను అమలు చేయడానికి మేము రెండు ఎంపికలను పరిశీలిస్తాము:
మాన్యువల్ - AWS మేనేజ్మెంట్ కన్సోల్ ద్వారా;
టెర్రాఫార్మ్ కోడ్ నుండి మౌలిక సదుపాయాలు సోమరి ఆటోమేటర్ల కోసం;
అభివృద్ధి చెందిన వ్యవస్థ యొక్క ఆర్కిటెక్చర్
ఉపయోగించిన భాగాలు:
Aviasales API — ఈ API ద్వారా అందించబడిన డేటా అన్ని తదుపరి పని కోసం ఉపయోగించబడుతుంది;
EC2 ప్రొడ్యూసర్ ఉదాహరణ — ఇన్పుట్ డేటా స్ట్రీమ్ రూపొందించబడే క్లౌడ్లోని సాధారణ వర్చువల్ మెషీన్:
కినిసిస్ ఏజెంట్ మెషీన్లో స్థానికంగా ఇన్స్టాల్ చేయబడిన జావా అప్లికేషన్, ఇది కైనెసిస్ (కైనెసిస్ డేటా స్ట్రీమ్లు లేదా కైనెసిస్ ఫైర్హోస్)కి డేటాను సేకరించడానికి మరియు పంపడానికి సులభమైన మార్గాన్ని అందిస్తుంది. ఏజెంట్ పేర్కొన్న డైరెక్టరీలలోని ఫైల్ల సమితిని నిరంతరం పర్యవేక్షిస్తుంది మరియు కినిసిస్కి కొత్త డేటాను పంపుతుంది;
API కాలర్ స్క్రిప్ట్ — APIకి అభ్యర్థనలు చేసే పైథాన్ స్క్రిప్ట్ మరియు ప్రతిస్పందనను కైనెసిస్ ఏజెంట్ పర్యవేక్షించే ఫోల్డర్లో ఉంచుతుంది;
కైనెసిస్ అనలిటిక్స్ నిజ సమయంలో స్ట్రీమింగ్ డేటా విశ్లేషణను సులభతరం చేసే సర్వర్లెస్ సేవ. Amazon Kinesis డేటా అనలిటిక్స్ అప్లికేషన్ వనరులను కాన్ఫిగర్ చేస్తుంది మరియు ఇన్కమింగ్ డేటా యొక్క ఏదైనా వాల్యూమ్ను నిర్వహించడానికి స్వయంచాలకంగా స్కేల్ చేస్తుంది;
AWS లాంబ్డా — బ్యాకప్ చేయకుండా లేదా సర్వర్లను సెటప్ చేయకుండా కోడ్ని అమలు చేయడానికి మిమ్మల్ని అనుమతించే సేవ. ప్రతి కాల్కు కంప్యూటింగ్ శక్తి మొత్తం స్వయంచాలకంగా స్కేల్ చేయబడుతుంది;
అమెజాన్ డైనమోడిబి - ఏదైనా స్కేల్లో నడుస్తున్నప్పుడు 10 మిల్లీసెకన్ల కంటే తక్కువ జాప్యాన్ని అందించే కీలక-విలువ జతల మరియు పత్రాల డేటాబేస్. DynamoDBని ఉపయోగిస్తున్నప్పుడు, మీరు ఏదైనా సర్వర్లను అందించడం, ప్యాచ్ చేయడం లేదా నిర్వహించాల్సిన అవసరం లేదు. అందుబాటులో ఉన్న వనరుల మొత్తాన్ని సర్దుబాటు చేయడానికి మరియు అధిక పనితీరును నిర్వహించడానికి DynamoDB స్వయంచాలకంగా పట్టికలను స్కేల్ చేస్తుంది. సిస్టమ్ నిర్వహణ అవసరం లేదు;
అమెజాన్ SNS - పబ్లిషర్-సబ్స్క్రైబర్ (పబ్/సబ్) మోడల్ని ఉపయోగించి సందేశాలను పంపడానికి పూర్తిగా నిర్వహించబడే సేవ, దీనితో మీరు మైక్రోసర్వీస్లు, డిస్ట్రిబ్యూట్ సిస్టమ్లు మరియు సర్వర్లెస్ అప్లికేషన్లను వేరు చేయవచ్చు. మొబైల్ పుష్ నోటిఫికేషన్లు, SMS సందేశాలు మరియు ఇమెయిల్ల ద్వారా తుది వినియోగదారులకు సమాచారాన్ని పంపడానికి SNS ఉపయోగించవచ్చు.
ప్రారంభ శిక్షణ
డేటా ఫ్లోను అనుకరించడానికి, Aviasales API ద్వారా అందించబడిన ఎయిర్లైన్ టిక్కెట్ సమాచారాన్ని ఉపయోగించాలని నేను నిర్ణయించుకున్నాను. IN డాక్యుమెంటేషన్ విభిన్న పద్ధతుల యొక్క చాలా విస్తృతమైన జాబితా, వాటిలో ఒకదానిని తీసుకుందాం - "నెలవారీ ధర క్యాలెండర్", ఇది బదిలీల సంఖ్య ద్వారా సమూహం చేయబడిన నెలలోని ప్రతి రోజు ధరలను అందిస్తుంది. మీరు అభ్యర్థనలో శోధన నెలను పేర్కొనకపోతే, ప్రస్తుత నెల తర్వాతి నెలకు సమాచారం అందించబడుతుంది.
అభ్యర్థనలో టోకెన్ను పేర్కొనడం ద్వారా API నుండి డేటాను స్వీకరించే ఎగువ పద్ధతి పని చేస్తుంది, కానీ నేను యాక్సెస్ టోకెన్ను హెడర్ ద్వారా పాస్ చేయాలనుకుంటున్నాను, కాబట్టి మేము ఈ పద్ధతిని api_caller.py స్క్రిప్ట్లో ఉపయోగిస్తాము.
ఎగువ ఉదాహరణ API ప్రతిస్పందన సెయింట్ పీటర్స్బర్గ్ నుండి ఫుక్కి టిక్కెట్ని చూపుతుంది... ఓహ్, వాట్ ఎ డ్రీమ్...
నేను కజాన్ నుండి వచ్చాను మరియు ఫుకెట్ ఇప్పుడు "ఒక కల మాత్రమే" కాబట్టి, సెయింట్ పీటర్స్బర్గ్ నుండి కజాన్కు టిక్కెట్ల కోసం చూద్దాం.
మీకు ఇప్పటికే AWS ఖాతా ఉందని ఇది ఊహిస్తుంది. కైనెసిస్ మరియు SMS ద్వారా నోటిఫికేషన్లు పంపడం వార్షికంలో చేర్చబడలేదని నేను వెంటనే ప్రత్యేక దృష్టిని ఆకర్షించాలనుకుంటున్నాను ఉచిత టైర్ (ఉచిత ఉపయోగం). అయితే ఇది ఉన్నప్పటికీ, కొన్ని డాలర్లను దృష్టిలో ఉంచుకుని, ప్రతిపాదిత వ్యవస్థను నిర్మించడం మరియు దానితో ఆడుకోవడం చాలా సాధ్యమే. మరియు, వాస్తవానికి, అన్ని వనరులు అవసరం లేని తర్వాత వాటిని తొలగించడం మర్చిపోవద్దు.
అదృష్టవశాత్తూ, మేము మా నెలవారీ ఉచిత పరిమితులకు అనుగుణంగా ఉంటే, DynamoDb మరియు లాంబ్డా ఫంక్షన్లు మాకు ఉచితం. ఉదాహరణకు, DynamoDB కోసం: 25 GB నిల్వ, 25 WCU/RCU మరియు 100 మిలియన్ ప్రశ్నలు. మరియు నెలకు ఒక మిలియన్ లాంబ్డా ఫంక్షన్ కాల్లు.
మాన్యువల్ సిస్టమ్ విస్తరణ
కైనెసిస్ డేటా స్ట్రీమ్లను సెటప్ చేస్తోంది
కైనెసిస్ డేటా స్ట్రీమ్స్ సర్వీస్కి వెళ్లి, రెండు కొత్త స్ట్రీమ్లను క్రియేట్ చేద్దాం, ఒక్కో దానికి ఒక షార్డ్.
ఒక ముక్క ఏమిటి?
షార్డ్ అనేది అమెజాన్ కినిసిస్ స్ట్రీమ్ యొక్క ప్రాథమిక డేటా బదిలీ యూనిట్. ఒక సెగ్మెంట్ 1 MB/s వేగంతో ఇన్పుట్ డేటా బదిలీని మరియు 2 MB/s వేగంతో అవుట్పుట్ డేటా బదిలీని అందిస్తుంది. ఒక సెగ్మెంట్ సెకనుకు 1000 PUT ఎంట్రీలకు మద్దతు ఇస్తుంది. డేటా స్ట్రీమ్ను సృష్టించేటప్పుడు, మీరు అవసరమైన విభాగాల సంఖ్యను పేర్కొనాలి. ఉదాహరణకు, మీరు రెండు విభాగాలతో డేటా స్ట్రీమ్ను సృష్టించవచ్చు. ఈ డేటా స్ట్రీమ్ 2 MB/s వద్ద ఇన్పుట్ డేటా బదిలీని మరియు 4 MB/s వద్ద అవుట్పుట్ డేటా బదిలీని అందిస్తుంది, సెకనుకు 2000 PUT రికార్డ్లకు మద్దతు ఇస్తుంది.
మీ స్ట్రీమ్లో ఎక్కువ ముక్కలు, దాని నిర్గమాంశం ఎక్కువ. సూత్రప్రాయంగా, ఈ విధంగా ప్రవాహాలు స్కేల్ చేయబడతాయి - ముక్కలు జోడించడం ద్వారా. కానీ మీ వద్ద ఎక్కువ ముక్కలు ఉంటే, ధర ఎక్కువ. ప్రతి షార్డ్ గంటకు 1,5 సెంట్లు మరియు ప్రతి మిలియన్ PUT పేలోడ్ యూనిట్లకు అదనంగా 1.4 సెంట్లు ఖర్చవుతుంది.
పేరుతో కొత్త స్ట్రీమ్ని క్రియేట్ చేద్దాం ఎయిర్లైన్_టికెట్లు, అతనికి 1 ముక్క సరిపోతుంది:
ఇప్పుడు పేరుతో మరో థ్రెడ్ని క్రియేట్ చేద్దాం ప్రత్యేక_స్ట్రీమ్:
నిర్మాత సెటప్
టాస్క్ను విశ్లేషించడానికి, డేటా ప్రొడ్యూసర్గా సాధారణ EC2 ఉదాహరణను ఉపయోగించడం సరిపోతుంది. ఇది శక్తివంతమైన, ఖరీదైన వర్చువల్ మెషీన్ కానవసరం లేదు; స్పాట్ t2.micro బాగా పని చేస్తుంది.
ముఖ్యమైన గమనిక: ఉదాహరణకు, మీరు ఇమేజ్ని ఉపయోగించాలి - Amazon Linux AMI 2018.03.0, Kinesis ఏజెంట్ను త్వరగా ప్రారంభించడం కోసం ఇది తక్కువ సెట్టింగ్లను కలిగి ఉంది.
EC2 సేవకు వెళ్లండి, కొత్త వర్చువల్ మిషన్ను సృష్టించండి, ఉచిత టైర్లో చేర్చబడిన t2.micro రకంతో కావలసిన AMIని ఎంచుకోండి:
కొత్తగా సృష్టించబడిన వర్చువల్ మెషీన్ కైనెసిస్ సేవతో పరస్పర చర్య చేయగలగడానికి, దానికి హక్కులు ఇవ్వాలి. దీన్ని చేయడానికి ఉత్తమ మార్గం IAM పాత్రను కేటాయించడం. కాబట్టి, దశ 3: కాన్ఫిగర్ ఇన్స్టాన్స్ వివరాల స్క్రీన్లో, మీరు ఎంచుకోవాలి కొత్త IAM పాత్రను సృష్టించండి:
EC2 కోసం IAM పాత్రను సృష్టిస్తోంది
తెరుచుకునే విండోలో, మేము EC2 కోసం కొత్త పాత్రను సృష్టిస్తున్నామని ఎంచుకుని, అనుమతుల విభాగానికి వెళ్లండి:
శిక్షణ ఉదాహరణను ఉపయోగించి, మేము వనరుల హక్కుల యొక్క గ్రాన్యులర్ కాన్ఫిగరేషన్ యొక్క అన్ని చిక్కులలోకి వెళ్లవలసిన అవసరం లేదు, కాబట్టి మేము Amazon ద్వారా ముందే కాన్ఫిగర్ చేసిన విధానాలను ఎంచుకుంటాము: AmazonKinesisFullAccess మరియు CloudWatchFullAccess.
ఈ పాత్రకు కొంత అర్థవంతమైన పేరును ఇద్దాం, ఉదాహరణకు: EC2-KinesisStreams-FullAccess. ఫలితం క్రింది చిత్రంలో చూపిన విధంగానే ఉండాలి:
ఈ కొత్త పాత్రను సృష్టించిన తర్వాత, సృష్టించిన వర్చువల్ మెషీన్ ఉదాహరణకి దానిని జోడించడం మర్చిపోవద్దు:
మేము ఈ స్క్రీన్పై వేటినీ మార్చము మరియు తదుపరి విండోస్కి వెళ్లము.
హార్డ్ డ్రైవ్ సెట్టింగ్లు అలాగే ట్యాగ్లను డిఫాల్ట్గా వదిలివేయవచ్చు (ట్యాగ్లను ఉపయోగించడం మంచి పద్ధతి అయినప్పటికీ, కనీసం ఉదాహరణకి పేరు పెట్టండి మరియు పర్యావరణాన్ని సూచించండి).
ఇప్పుడు మేము దశ 6లో ఉన్నాము: భద్రతా సమూహాన్ని కాన్ఫిగర్ చేయండి, ఇక్కడ మీరు క్రొత్తదాన్ని సృష్టించాలి లేదా ఇప్పటికే ఉన్న మీ భద్రతా సమూహాన్ని పేర్కొనాలి, ఇది ssh (పోర్ట్ 22) ద్వారా ఉదాహరణకి కనెక్ట్ చేయడానికి మిమ్మల్ని అనుమతిస్తుంది. అక్కడ మూలం -> నా IP ఎంచుకోండి మరియు మీరు ఉదాహరణను ప్రారంభించవచ్చు.
ఇది నడుస్తున్న స్థితికి మారిన వెంటనే, మీరు ssh ద్వారా దానికి కనెక్ట్ చేయడానికి ప్రయత్నించవచ్చు.
కైనెసిస్ ఏజెంట్తో పని చేయడానికి, మెషీన్కు విజయవంతంగా కనెక్ట్ అయిన తర్వాత, మీరు తప్పనిసరిగా టెర్మినల్లో కింది ఆదేశాలను నమోదు చేయాలి:
కాన్ఫిగరేషన్ ఫైల్ నుండి చూడగలిగినట్లుగా, ఏజెంట్ /var/log/airline_tickets/ డైరెక్టరీలో .log పొడిగింపుతో ఫైల్లను పర్యవేక్షిస్తుంది, వాటిని అన్వయించి మరియు airline_tickets స్ట్రీమ్కు బదిలీ చేస్తుంది.
మేము సేవను పునఃప్రారంభించి, అది అప్ మరియు రన్ అవుతుందని నిర్ధారించుకోండి:
sudo service aws-kinesis-agent restart
ఇప్పుడు API నుండి డేటాను అభ్యర్థించే పైథాన్ స్క్రిప్ట్ని డౌన్లోడ్ చేద్దాం:
Api_caller.py స్క్రిప్ట్ Aviasales నుండి డేటాను అభ్యర్థిస్తుంది మరియు Kinesis ఏజెంట్ స్కాన్ చేసే డైరెక్టరీలో అందుకున్న ప్రతిస్పందనను సేవ్ చేస్తుంది. ఈ స్క్రిప్ట్ యొక్క అమలు చాలా ప్రామాణికమైనది, TicketsApi తరగతి ఉంది, ఇది APIని అసమకాలికంగా లాగడానికి మిమ్మల్ని అనుమతిస్తుంది. మేము ఈ తరగతికి టోకెన్ మరియు అభ్యర్థన పారామితులతో హెడర్ను పంపుతాము:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
ఏజెంట్ యొక్క సరైన సెట్టింగ్లు మరియు కార్యాచరణను పరీక్షించడానికి, api_caller.py స్క్రిప్ట్ని పరీక్షిద్దాం:
sudo ./api_caller.py TOKEN
మరియు మేము ఏజెంట్ లాగ్లలో మరియు airline_tickets డేటా స్ట్రీమ్లోని మానిటరింగ్ ట్యాబ్లో పని ఫలితాన్ని పరిశీలిస్తాము:
మీరు చూడగలిగినట్లుగా, ప్రతిదీ పని చేస్తుంది మరియు కైనెసిస్ ఏజెంట్ విజయవంతంగా డేటాను స్ట్రీమ్కు పంపుతుంది. ఇప్పుడు వినియోగదారుని కాన్ఫిగర్ చేద్దాం.
కైనెసిస్ డేటా అనలిటిక్స్ని సెటప్ చేస్తోంది
మొత్తం సిస్టమ్ యొక్క కేంద్ర భాగానికి వెళ్దాం - Kinesis Data Analyticsలో kinesis_analytics_airlines_app పేరుతో కొత్త అప్లికేషన్ను సృష్టించండి:
కైనెసిస్ డేటా అనలిటిక్స్ SQL భాషను ఉపయోగించి కైనెసిస్ స్ట్రీమ్ల నుండి నిజ-సమయ డేటా విశ్లేషణలను నిర్వహించడానికి మిమ్మల్ని అనుమతిస్తుంది. ఇది పూర్తిగా ఆటోస్కేలింగ్ సేవ (కైనెసిస్ స్ట్రీమ్ల వలె కాకుండా):
సోర్స్ డేటాకు అభ్యర్థనల ఆధారంగా కొత్త స్ట్రీమ్లను (అవుట్పుట్ స్ట్రీమ్) సృష్టించడానికి మిమ్మల్ని అనుమతిస్తుంది;
అప్లికేషన్లు నడుస్తున్నప్పుడు సంభవించిన లోపాలతో స్ట్రీమ్ను అందిస్తుంది (ఎర్రర్ స్ట్రీమ్);
ఇన్పుట్ డేటా స్కీమ్ను స్వయంచాలకంగా నిర్ణయించవచ్చు (అవసరమైతే అది మానవీయంగా పునర్నిర్వచించబడుతుంది).
ఇది చౌకైన సేవ కాదు - గంటకు 0.11 USD, కాబట్టి మీరు దీన్ని జాగ్రత్తగా ఉపయోగించాలి మరియు మీరు పూర్తి చేసినప్పుడు దాన్ని తొలగించాలి.
అప్లికేషన్ని డేటా సోర్స్కి కనెక్ట్ చేద్దాం:
మేము కనెక్ట్ చేయబోయే స్ట్రీమ్ను ఎంచుకోండి (airline_tickets):
తర్వాత, మీరు కొత్త IAM పాత్రను జోడించాలి, తద్వారా అప్లికేషన్ స్ట్రీమ్ నుండి చదవగలదు మరియు స్ట్రీమ్కు వ్రాయగలదు. దీన్ని చేయడానికి, యాక్సెస్ అనుమతుల బ్లాక్లో దేనినీ మార్చకుండా ఉంటే సరిపోతుంది:
ఇప్పుడు స్ట్రీమ్లోని డేటా స్కీమాను కనుగొనమని అభ్యర్థిద్దాము; దీన్ని చేయడానికి, "డిస్కవర్ స్కీమా" బటన్పై క్లిక్ చేయండి. ఫలితంగా, IAM పాత్ర నవీకరించబడుతుంది (కొత్తది సృష్టించబడుతుంది) మరియు స్ట్రీమ్లో ఇప్పటికే వచ్చిన డేటా నుండి స్కీమా డిటెక్షన్ ప్రారంభించబడుతుంది:
ఇప్పుడు మీరు SQL ఎడిటర్కి వెళ్లాలి. మీరు ఈ బటన్పై క్లిక్ చేసినప్పుడు, అప్లికేషన్ను ప్రారంభించమని అడుగుతున్న విండో కనిపిస్తుంది - మీరు ప్రారంభించాలనుకుంటున్న దాన్ని ఎంచుకోండి:
SQL ఎడిటర్ విండోలో క్రింది సాధారణ ప్రశ్నను చొప్పించండి మరియు SQLని సేవ్ చేసి రన్ చేయి క్లిక్ చేయండి:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
రిలేషనల్ డేటాబేస్లలో, మీరు రికార్డ్లను జోడించడానికి ఇన్సర్ట్ స్టేట్మెంట్లను మరియు డేటాను ప్రశ్నించడానికి SELECT స్టేట్మెంట్ను ఉపయోగించి టేబుల్లతో పని చేస్తారు. Amazon Kinesis డేటా అనలిటిక్స్లో, మీరు స్ట్రీమ్లు (స్ట్రీమ్లు) మరియు పంప్లు (PUMPలు)తో పని చేస్తారు-ఒక అప్లికేషన్లోని ఒక స్ట్రీమ్ నుండి డేటాను మరొక స్ట్రీమ్లోకి చొప్పించే నిరంతర ఇన్సర్ట్ అభ్యర్థనలు.
పైన అందించిన SQL ప్రశ్న ఏరోఫ్లాట్ టిక్కెట్ల కోసం ఐదు వేల రూబిళ్లు కంటే తక్కువ ధరతో శోధనలు చేస్తుంది. ఈ షరతులకు అనుగుణంగా ఉన్న అన్ని రికార్డులు DESTINATION_SQL_STREAM స్ట్రీమ్లో ఉంచబడతాయి.
డెస్టినేషన్ బ్లాక్లో, ప్రత్యేక_స్ట్రీమ్ స్ట్రీమ్ను ఎంచుకోండి మరియు అప్లికేషన్లో స్ట్రీమ్ పేరు DESTINATION_SQL_STREAM డ్రాప్-డౌన్ జాబితాలో:
అన్ని అవకతవకల ఫలితం క్రింది చిత్రాన్ని పోలి ఉండాలి:
SNS టాపిక్ని సృష్టించడం మరియు చందా చేయడం
సింపుల్ నోటిఫికేషన్ సర్వీస్కి వెళ్లి, ఎయిర్లైన్స్ పేరుతో కొత్త టాపిక్ని సృష్టించండి:
ఈ అంశానికి సభ్యత్వాన్ని పొందండి మరియు SMS నోటిఫికేషన్లు పంపబడే మొబైల్ ఫోన్ నంబర్ను సూచించండి:
DynamoDBలో పట్టికను సృష్టించండి
వారి airline_tickets స్ట్రీమ్ నుండి ముడి డేటాను నిల్వ చేయడానికి, DynamoDBలో అదే పేరుతో పట్టికను క్రియేట్ చేద్దాం. మేము record_idని ప్రాథమిక కీగా ఉపయోగిస్తాము:
లాంబ్డా ఫంక్షన్ కలెక్టర్ని సృష్టిస్తోంది
కలెక్టర్ అనే లాంబ్డా ఫంక్షన్ని క్రియేట్ చేద్దాం, దీని పని ఎయిర్లైన్_టికెట్ల స్ట్రీమ్ను పోల్ చేయడం మరియు అక్కడ కొత్త రికార్డులు కనుగొనబడితే, ఈ రికార్డ్లను DynamoDB టేబుల్లోకి చొప్పించండి. సహజంగానే, డిఫాల్ట్ హక్కులతో పాటు, ఈ లాంబ్డా కినిసిస్ డేటా స్ట్రీమ్కి రీడ్ యాక్సెస్ మరియు డైనమోడిబికి రైట్ యాక్సెస్ కలిగి ఉండాలి.
కలెక్టర్ లాంబ్డా ఫంక్షన్ కోసం IAM పాత్రను సృష్టిస్తోంది
ముందుగా, Lambda-TicketsProcessingRole పేరుతో లాంబ్డా కోసం కొత్త IAM పాత్రను సృష్టిద్దాం:
పరీక్ష ఉదాహరణ కోసం, దిగువ చిత్రంలో చూపిన విధంగా ముందే కాన్ఫిగర్ చేయబడిన AmazonKinesisReadOnlyAccess మరియు AmazonDynamoDBFullAccess విధానాలు చాలా అనుకూలంగా ఉంటాయి:
ఎయిర్లైన్_స్ట్రీమ్లోకి కొత్త ఎంట్రీలు ప్రవేశించినప్పుడు ఈ లాంబ్డా కైనెసిస్ నుండి ట్రిగ్గర్ ద్వారా ప్రారంభించబడాలి, కాబట్టి మనం కొత్త ట్రిగ్గర్ను జోడించాలి:
కోడ్ను చొప్పించడం మరియు లాంబ్డాను సేవ్ చేయడం మాత్రమే మిగిలి ఉంది.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
లాంబ్డా ఫంక్షన్ నోటిఫైయర్ని సృష్టిస్తోంది
రెండవ స్ట్రీమ్ (special_stream)ని పర్యవేక్షిస్తుంది మరియు SNSకి నోటిఫికేషన్ పంపే రెండవ లాంబ్డా ఫంక్షన్ ఇదే విధంగా సృష్టించబడుతుంది. అందువల్ల, ఈ లాంబ్డా తప్పనిసరిగా కినిసిస్ నుండి చదవడానికి మరియు ఇచ్చిన SNS టాపిక్కి సందేశాలను పంపడానికి ప్రాప్యతను కలిగి ఉండాలి, ఇది SNS సేవ ద్వారా ఈ అంశం యొక్క చందాదారులందరికీ (ఇమెయిల్, SMS, మొదలైనవి) పంపబడుతుంది.
IAM పాత్రను సృష్టిస్తోంది
ముందుగా, మేము ఈ లాంబ్డా కోసం IAM రోల్ Lambda-KinesisAlarmని సృష్టిస్తాము, ఆపై ఈ పాత్రను సృష్టించబడుతున్న alarm_notifier lambdaకి కేటాయిస్తాము:
ఈ లాంబ్డా ప్రత్యేక_స్ట్రీమ్లోకి ప్రవేశించడానికి కొత్త రికార్డ్ల కోసం ట్రిగ్గర్పై పని చేయాలి, కాబట్టి మీరు కలెక్టర్ లాంబ్డా కోసం మేము చేసిన విధంగానే ట్రిగ్గర్ను కాన్ఫిగర్ చేయాలి.
ఈ లాంబ్డాను కాన్ఫిగర్ చేయడాన్ని సులభతరం చేయడానికి, కొత్త ఎన్విరాన్మెంట్ వేరియబుల్ని పరిచయం చేద్దాం - TOPIC_ARN, ఇక్కడ మేము ఎయిర్లైన్స్ టాపిక్ యొక్క ANR (అమెజాన్ రికోర్స్ పేర్లు) ఉంచుతాము:
మరియు లాంబ్డా కోడ్ను చొప్పించండి, ఇది సంక్లిష్టంగా లేదు:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
ఇక్కడే మాన్యువల్ సిస్టమ్ కాన్ఫిగరేషన్ పూర్తయినట్లు తెలుస్తోంది. మేము ప్రతిదీ సరిగ్గా కాన్ఫిగర్ చేసామని పరీక్షించడం మరియు నిర్ధారించుకోవడం మాత్రమే మిగిలి ఉంది.
టెర్రాఫార్మ్ కోడ్ నుండి అమలు చేయండి
అవసరమైన తయారీ
Terraform కోడ్ నుండి ఇన్ఫ్రాస్ట్రక్చర్ని అమలు చేయడానికి చాలా అనుకూలమైన ఓపెన్ సోర్స్ సాధనం. ఇది దాని స్వంత వాక్యనిర్మాణాన్ని కలిగి ఉంది, ఇది నేర్చుకోవడం సులభం మరియు ఎలా మరియు ఏమి అమలు చేయాలి అనేదానికి అనేక ఉదాహరణలు ఉన్నాయి. Atom ఎడిటర్ లేదా విజువల్ స్టూడియో కోడ్ టెర్రాఫార్మ్తో పని చేయడాన్ని సులభతరం చేసే అనేక సులభ ప్లగిన్లను కలిగి ఉంది.
మీరు పంపిణీని డౌన్లోడ్ చేసుకోవచ్చు ఇక్కడ నుండి. అన్ని టెర్రాఫార్మ్ సామర్థ్యాల యొక్క వివరణాత్మక విశ్లేషణ ఈ కథనం యొక్క పరిధికి మించినది, కాబట్టి మేము ప్రధాన అంశాలకు మమ్మల్ని పరిమితం చేస్తాము.
ఎలా ప్రారంభించాలి
ప్రాజెక్ట్ యొక్క పూర్తి కోడ్ నా రిపోజిటరీలో. మేము రిపోజిటరీని మనమే క్లోన్ చేస్తాము. ప్రారంభించడానికి ముందు, మీరు AWS CLIని ఇన్స్టాల్ చేసి కాన్ఫిగర్ చేశారని నిర్ధారించుకోవాలి, ఎందుకంటే... Terraform ~/.aws/credentials ఫైల్లో ఆధారాల కోసం చూస్తుంది.
క్లౌడ్లో టెర్రాఫార్మ్ ప్రస్తుతం మన కోసం ఏమి సృష్టిస్తోందో చూడటానికి మొత్తం అవస్థాపనను అమలు చేయడానికి ముందు ప్లాన్ కమాండ్ను అమలు చేయడం మంచి అభ్యాసం:
terraform.exe plan
నోటిఫికేషన్లను పంపడానికి మీరు ఫోన్ నంబర్ను నమోదు చేయమని ప్రాంప్ట్ చేయబడతారు. ఈ దశలో దానిని నమోదు చేయవలసిన అవసరం లేదు.
ప్రోగ్రామ్ యొక్క కార్యాచరణ ప్రణాళికను విశ్లేషించిన తర్వాత, మేము వనరులను సృష్టించడం ప్రారంభించవచ్చు:
terraform.exe apply
ఈ ఆదేశాన్ని పంపిన తర్వాత, మీరు మళ్లీ ఫోన్ నంబర్ను నమోదు చేయమని అడగబడతారు; వాస్తవానికి చర్యలను చేయడం గురించిన ప్రశ్న చూపబడినప్పుడు "అవును" డయల్ చేయండి. ఇది మొత్తం అవస్థాపనను సెటప్ చేయడానికి, EC2 యొక్క అన్ని అవసరమైన కాన్ఫిగరేషన్ను నిర్వహించడానికి, లాంబ్డా ఫంక్షన్లను అమలు చేయడానికి, మొదలైనవాటిని అనుమతిస్తుంది.
Terraform కోడ్ ద్వారా అన్ని వనరులు విజయవంతంగా సృష్టించబడిన తర్వాత, మీరు Kinesis Analytics అప్లికేషన్ యొక్క వివరాలలోకి వెళ్లాలి (దురదృష్టవశాత్తూ, కోడ్ నుండి నేరుగా దీన్ని ఎలా చేయాలో నేను కనుగొనలేదు).
అప్లికేషన్ను ప్రారంభించండి:
దీని తర్వాత, డ్రాప్-డౌన్ జాబితా నుండి ఎంచుకోవడం ద్వారా మీరు అప్లికేషన్లోని స్ట్రీమ్ పేరును స్పష్టంగా సెట్ చేయాలి:
ఇప్పుడు అంతా వెళ్ళడానికి సిద్ధంగా ఉంది.
అప్లికేషన్ని పరీక్షిస్తోంది
మీరు సిస్టమ్ను మాన్యువల్గా లేదా టెర్రాఫార్మ్ కోడ్ ద్వారా ఎలా అమలు చేసినప్పటికీ, అది అలాగే పని చేస్తుంది.
మేము Kinesis ఏజెంట్ ఇన్స్టాల్ చేయబడిన EC2 వర్చువల్ మెషీన్కు SSH ద్వారా లాగిన్ చేస్తాము మరియు api_caller.py స్క్రిప్ట్ను అమలు చేస్తాము
sudo ./api_caller.py TOKEN
మీరు చేయాల్సిందల్లా మీ నంబర్కు SMS కోసం వేచి ఉండండి:
SMS - దాదాపు 1 నిమిషంలో మీ ఫోన్కి సందేశం వస్తుంది:
తదుపరి, మరింత వివరణాత్మక విశ్లేషణ కోసం DynamoDB డేటాబేస్లో రికార్డ్లు సేవ్ చేయబడాయో లేదో చూడాల్సి ఉంది. Airline_tickets పట్టికలో సుమారుగా కింది డేటా ఉంటుంది:
తీర్మానం
చేసిన పనిలో, Amazon Kinesis ఆధారంగా ఆన్లైన్ డేటా ప్రాసెసింగ్ సిస్టమ్ నిర్మించబడింది. కైనెసిస్ డేటా స్ట్రీమ్లతో కలిపి కైనెసిస్ ఏజెంట్ను ఉపయోగించడం మరియు SQL కమాండ్లను ఉపయోగించి రియల్ టైమ్ అనలిటిక్స్ కైనెసిస్ అనలిటిక్స్, అలాగే ఇతర AWS సేవలతో Amazon Kinesis పరస్పర చర్య వంటి ఎంపికలు పరిగణించబడ్డాయి.
మేము పై సిస్టమ్ను రెండు విధాలుగా అమలు చేసాము: పొడవైన మాన్యువల్ మరియు టెర్రాఫార్మ్ కోడ్ నుండి శీఘ్రమైనది.
అన్ని ప్రాజెక్ట్ సోర్స్ కోడ్ అందుబాటులో ఉంది నా GitHub రిపోజిటరీలో, దానితో మిమ్మల్ని మీరు పరిచయం చేసుకోవాలని నేను సూచిస్తున్నాను.
వ్యాసం గురించి చర్చించడం నాకు సంతోషంగా ఉంది, మీ వ్యాఖ్యల కోసం నేను ఎదురు చూస్తున్నాను. నేను నిర్మాణాత్మక విమర్శలను ఆశిస్తున్నాను.