හෙලෝ, හබ්ර්!
ඔබ පියාසර කරන ගුවන් යානා වලට කැමතිද? මම එයට කැමතියි, නමුත් ස්වයං හුදකලාව අතරතුර මම එක් ප්රසිද්ධ සම්පතක් වන Aviasales වෙතින් ගුවන් ටිකට්පත් පිළිබඳ දත්ත විශ්ලේෂණය කිරීමට ද ආදරය කළෙමි.
අද අපි Amazon Kinesis හි වැඩ විශ්ලේෂණය කරන්නෙමු, තත්ය කාලීන විශ්ලේෂණ සමඟ ප්රවාහ පද්ධතියක් ගොඩනඟමු, ප්රධාන දත්ත ගබඩාව ලෙස Amazon DynamoDB NoSQL දත්ත සමුදාය ස්ථාපනය කරන්න, සහ රසවත් ටිකට්පත් සඳහා SMS දැනුම්දීම් සකසන්නෙමු.
සියලුම විස්තර කපා ඇත! යන්න!
හැඳින්වීම
උදාහරණයක් ලෙස, අපට ප්රවේශය අවශ්ය වේ
මෙම ලිපියේ ප්රධාන අරමුණ AWS හි තොරතුරු ප්රවාහය භාවිතය පිළිබඳ සාමාන්ය අවබෝධයක් ලබා දීමයි; භාවිතා කරන ලද API මඟින් ආපසු ලබා දෙන දත්ත දැඩි ලෙස යාවත්කාලීන නොවන අතර එය හැඹිලියෙන් සම්ප්රේෂණය වන බව අපි සැලකිල්ලට ගනිමු. පසුගිය පැය 48 තුළ Aviasales.ru සහ Jetradar.com අඩවි භාවිතා කරන්නන් විසින් කරන ලද සෙවීම් මත පදනම්ව සාදන ලදී.
නිෂ්පාදන යන්ත්රයේ ස්ථාපනය කර ඇති, API හරහා ලැබෙන Kinesis-agent, ස්වයංක්රීයව දත්ත විග්රහ කර Kinesis Data Analytics හරහා අවශ්ය ප්රවාහයට දත්ත සම්ප්රේෂණය කරයි. මෙම ප්රවාහයේ අමු අනුවාදය කෙලින්ම ගබඩාවට ලියා ඇත. DynamoDB හි යොදවා ඇති අමු දත්ත ගබඩාව AWS Quick Sight වැනි BI මෙවලම් හරහා ගැඹුරු ප්රවේශපත්ර විශ්ලේෂණයට ඉඩ සලසයි.
සම්පූර්ණ යටිතල පහසුකම් යෙදවීම සඳහා අපි විකල්ප දෙකක් සලකා බලමු:
- අත්පොත - AWS කළමනාකරණ කොන්සෝලය හරහා;
- ටෙරාෆෝම් කේතයෙන් යටිතල පහසුකම් කම්මැලි ස්වයංක්රීයකරුවන් සඳහා වේ;
සංවර්ධිත පද්ධතියේ ගෘහ නිර්මාණ ශිල්පය
භාවිතා කරන සංරචක:
Aviasales API — මෙම API මඟින් ආපසු ලබා දෙන දත්ත සියලු පසුකාලීන වැඩ සඳහා භාවිතා කරනු ඇත;EC2 නිෂ්පාදක අවස්ථාව — ආදාන දත්ත ප්රවාහය ජනනය කරනු ලබන වලාකුළෙහි සාමාන්ය අතථ්ය යන්ත්රයක්:Kinesis නියෝජිතයා Kinesis (Kinesis Data Streams හෝ Kinesis Firehose) වෙත දත්ත රැස් කිරීමට සහ යැවීමට පහසු ක්රමයක් සපයන යන්ත්රයේ දේශීයව ස්ථාපනය කර ඇති ජාවා යෙදුමකි. නියෝජිතයා විසින් නිශ්චිත නාමාවලිවල ගොනු කට්ටලයක් නිරන්තරයෙන් නිරීක්ෂණය කරන අතර Kinesis වෙත නව දත්ත යවයි;API ඇමතුම් ස්ක්රිප්ට් — API වෙත ඉල්ලීම් කරන පයිතන් ස්ක්රිප්ට් එකක් සහ ප්රතිචාරය Kinesis නියෝජිතයා විසින් නිරීක්ෂණය කරනු ලබන ෆෝල්ඩරයකට දමන්න;
Kinesis දත්ත ප්රවාහ - පුළුල් පරිමාණ හැකියාවන් සහිත තත්ය කාලීන දත්ත ප්රවාහ සේවාව;Kinesis Analytics තත්ය කාලීන ප්රවාහ දත්ත විශ්ලේෂණය සරල කරන සේවාදායක රහිත සේවාවකි. Amazon Kinesis Data Analytics යෙදුම් සම්පත් වින්යාස කරන අතර ලැබෙන ඕනෑම දත්ත පරිමාවක් හැසිරවීමට ස්වයංක්රීයව පරිමාණය කරයි;ඒඩබ්ලිව්එස් ලැම්බඩා — උපස්ථ කිරීම හෝ සේවාදායකයන් පිහිටුවීමකින් තොරව කේතය ධාවනය කිරීමට ඔබට ඉඩ සලසන සේවාවකි. සෑම ඇමතුමක් සඳහාම සියලුම පරිගණක බලය ස්වයංක්රීයව පරිමාණය කෙරේ;Amazon DynamoDB - ඕනෑම පරිමාණයකින් ක්රියාත්මක වන විට මිලි තත්පර 10කට වඩා අඩු ප්රමාදයක් ලබා දෙන යතුරු-අගය යුගල සහ ලේඛනවල දත්ත සමුදායක්. DynamoDB භාවිතා කරන විට, ඔබට කිසිදු සේවාදායකයක් සැපයීම, පැච් කිරීම හෝ කළමනාකරණය කිරීම අවශ්ය නොවේ. DynamoDB පවතින සම්පත් ප්රමාණය වෙනස් කිරීමට සහ ඉහළ කාර්ය සාධනයක් පවත්වා ගැනීමට වගු ස්වයංක්රීයව පරිමාණය කරයි. පද්ධති පරිපාලනය අවශ්ය නොවේ;ඇමේසන් එස්එන්එස් - ප්රකාශක-ග්රාහක (Pub/Sub) ආකෘතිය භාවිතයෙන් පණිවිඩ යැවීම සඳහා පූර්ණ කළමනාකරණය කළ සේවාවක්, ඔබට ක්ෂුද්ර සේවා, බෙදා හරින ලද පද්ධති සහ සේවාදායක රහිත යෙදුම් හුදකලා කළ හැකිය. ජංගම තල්ලු දැනුම්දීම්, SMS පණිවිඩ සහ ඊමේල් හරහා අවසාන පරිශීලකයින්ට තොරතුරු යැවීමට SNS භාවිතා කළ හැක.
මූලික පුහුණුව
දත්ත ප්රවාහය අනුකරණය කිරීම සඳහා, Aviasales API විසින් ආපසු ලබා දුන් ගුවන් ටිකට්පත් තොරතුරු භාවිතා කිරීමට මම තීරණය කළෙමි. තුල
ඉතින්, අපි ලියාපදිංචි වී අපගේ ටෝකනය ලබා ගනිමු.
උදාහරණ ඉල්ලීමක් පහත දැක්වේ:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API
ඉල්ලීමෙහි ටෝකනයක් සඳහන් කිරීමෙන් API වෙතින් දත්ත ලබා ගැනීමේ ඉහත ක්රමය ක්රියාත්මක වනු ඇත, නමුත් මම ප්රවේශ ටෝකනය ශීර්ෂකය හරහා යැවීමට කැමැත්තෙමි, එබැවින් අපි මෙම ක්රමය api_caller.py ස්ක්රිප්ට් තුළ භාවිතා කරමු.
පිළිතුර උදාහරණය:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
ඉහත උදාහරණ API ප්රතිචාරය ශාන්ත පීටර්ස්බර්ග් සිට ෆුක් දක්වා ටිකට් පතක් පෙන්වයි... ඔහ්, මොනතරම් සිහිනයක්ද...
මම කසාන් සිට, සහ ෆුකෙට් දැන් "සිහිනයක් පමණක්" බැවින්, අපි ශාන්ත පීටර්ස්බර්ග් සිට කසාන් දක්වා ටිකට්පත් සොයා බලමු.
ඔබට දැනටමත් AWS ගිණුමක් ඇති බව උපකල්පනය කරයි. කයිනෙසිස් සහ කෙටි පණිවුඩ හරහා දැනුම්දීම් යැවීම වාර්ෂිකව ඇතුළත් කර නොමැති බව වහාම විශේෂ අවධානයට යොමු කිරීමට කැමැත්තෙමි.
නිදහස් ස්ථරය (නොමිලේ භාවිතය) . නමුත් එසේ තිබියදීත්, ඩොලර් කිහිපයක් මනසේ තබාගෙන, යෝජිත පද්ධතිය ගොඩනඟා එය සමඟ සෙල්ලම් කිරීම තරමක් කළ හැකිය. තවද, ඇත්ත වශයෙන්ම, ඒවා තවදුරටත් අවශ්ය නොවන පසු සියලු සම්පත් මකා දැමීමට අමතක නොකරන්න.
වාසනාවකට මෙන්, අපි අපගේ මාසික නොමිලේ සීමාවන් සපුරාලන්නේ නම්, DynamoDb සහ lambda කාර්යයන් අපට නොමිලේ වනු ඇත. උදාහරණයක් ලෙස, DynamoDB සඳහා: 25 GB ආචයනය, 25 WCU/RCU සහ මිලියන 100 විමසුම්. සහ මසකට ලැම්ඩා ක්රියාකාරී ඇමතුම් මිලියනයක්.
අතින් පද්ධති යෙදවීම
Kinesis දත්ත ප්රවාහයන් පිහිටුවීම
අපි Kinesis Data Streams සේවාව වෙත ගොස් නව ප්රවාහ දෙකක්, එක් කැබැල්ලක් බැගින් නිර්මාණය කරමු.
ෂර්ඩ් යනු කුමක්ද?
shard යනු Amazon Kinesis ප්රවාහයක මූලික දත්ත හුවමාරු ඒකකයයි. එක් අංශයක් 1 MB/s වේගයකින් ආදාන දත්ත හුවමාරුව සහ 2 MB/s වේගයකින් ප්රතිදාන දත්ත හුවමාරුව සපයයි. එක් අංශයක් තත්පරයකට PUT ඇතුළත් කිරීම් 1000ක් දක්වා සහය දක්වයි. දත්ත ප්රවාහයක් නිර්මාණය කිරීමේදී, ඔබට අවශ්ය කොටස් ගණන සඳහන් කළ යුතුය. උදාහරණයක් ලෙස, ඔබට කොටස් දෙකක් සහිත දත්ත ප්රවාහයක් සෑදිය හැක. මෙම දත්ත ප්රවාහය 2 MB/s හි ආදාන දත්ත හුවමාරුවක් සහ 4 MB/s හි ප්රතිදාන දත්ත හුවමාරුවක් සපයනු ඇත, තත්පරයකට PUT වාර්තා 2000 දක්වා සහය දක්වයි.
ඔබේ ප්රවාහයේ කැබලි වැඩි වන තරමට එහි ප්රවාහය වැඩි වේ. ප්රතිපත්තිමය වශයෙන්, ප්රවාහයන් පරිමාණය කරන්නේ එලෙසයි - කැබලි එකතු කිරීමෙන්. නමුත් ඔබ සතුව ඇති කැබලි වැඩි වන තරමට මිල වැඩි වේ. සෑම කැබැල්ලක්ම පැයකට ශත 1,5 ක් සහ සෑම PUT ගෙවුම් ඒකක මිලියනයක් සඳහා අමතර ශත 1.4 ක් වැය වේ.
නමින් අලුත් ප්රවාහයක් නිර්මාණය කරමු ගුවන්_ප්රවේශපත්, 1 කෑල්ලක් ඔහුට ප්රමාණවත් වනු ඇත:
දැන් අපි නමත් එක්ක තව ත්රෙඩ් එකක් හදමු විශේෂ_ප්රවාහය:
නිෂ්පාදක සැකසුම
කාර්යයක් විශ්ලේෂණය කිරීමට, දත්ත නිෂ්පාදකයෙකු ලෙස සාමාන්ය EC2 අවස්ථාවක් භාවිතා කිරීම ප්රමාණවත් වේ. එය බලවත්, මිල අධික අතථ්ය යන්ත්රයක් විය යුතු නැත; ස්පෝට් t2.මයික්රෝ ඉතා හොඳින් ක්රියා කරයි.
වැදගත් සටහන: උදාහරණයක් ලෙස, ඔබ රූපය භාවිතා කළ යුතුය - Amazon Linux AMI 2018.03.0, එය Kinesis නියෝජිතයා ඉක්මනින් දියත් කිරීම සඳහා අඩු සැකසුම් ඇත.
EC2 සේවාව වෙත යන්න, නව අථත්ය යන්ත්රයක් සාදන්න, නිදහස් ස්ථරයට ඇතුළත් කර ඇති t2.micro වර්ගය සමඟ කැමති AMI තෝරන්න:
අලුතින් නිර්මාණය කරන ලද අතථ්ය යන්ත්රයට Kinesis සේවාව සමඟ අන්තර් ක්රියා කිරීමට හැකි වීම සඳහා, එය කිරීමට අයිතිය ලබා දිය යුතුය. මෙය කිරීමට හොඳම ක්රමය වන්නේ IAM භූමිකාවක් පැවරීමයි. එබැවින්, පියවර 3: වින්යාස කිරීමේ අවස්ථා විස්තර තිරයේ, ඔබ තෝරාගත යුතුය නව IAM භූමිකාව සාදන්න:
EC2 සඳහා IAM භූමිකාවක් නිර්මාණය කිරීම
විවෘත වන කවුළුව තුළ, අපි EC2 සඳහා නව භූමිකාවක් නිර්මාණය කරන බව තෝරන්න සහ අවසර අංශය වෙත යන්න:
පුහුණු උදාහරණය භාවිතා කරමින්, සම්පත් හිමිකම් පිළිබඳ කැටිති වින්යාස කිරීමේ සියලු සංකීර්ණතා වෙත යාමට අපට අවශ්ය නැත, එබැවින් අපි Amazon විසින් පෙර-වින්යාස කළ ප්රතිපත්ති තෝරා ගනිමු: AmazonKinesisFullAccess සහ CloudWatchFullAccess.
අපි මෙම භූමිකාව සඳහා අර්ථවත් නමක් ලබා දෙමු, උදාහරණයක් ලෙස: EC2-KinesisStreams-FullAccess. පහත පින්තූරයේ පෙන්වා ඇති පරිදි ප්රතිඵලය සමාන විය යුතුය:
මෙම නව භූමිකාව නිර්මාණය කිරීමෙන් පසු, එය නිර්මාණය කරන ලද අථත්ය යන්ත්ර අවස්ථාවට එය අමුණන්න අමතක කරන්න එපා:
අපි මෙම තිරයේ වෙනත් කිසිවක් වෙනස් නොකරන අතර ඊළඟ කවුළු වෙත යන්න.
දෘඪ තැටි සැකසුම් පෙරනිමි ලෙස තැබිය හැකිය, ටැග් (ටැග් භාවිතා කිරීම හොඳ පුරුද්දක් වුවද, අවම වශයෙන් අවස්ථාවට නමක් ලබා දී පරිසරය සඳහන් කරන්න).
දැන් අපි සිටින්නේ පියවර 6: ආරක්ෂක කණ්ඩායම් වින්යාස කරන්න පටිත්තෙහි, ඔබට නව එකක් තැනීමට හෝ ඔබගේ පවතින ආරක්ෂක කණ්ඩායම සඳහන් කිරීමට අවශ්ය වේ, එමඟින් ඔබට ssh (වරාය 22) හරහා එම අවස්ථාවට සම්බන්ධ වීමට ඉඩ සලසයි. එහි Source -> My IP තෝරන්න එවිට ඔබට එම අවස්ථාව දියත් කළ හැක.
එය ධාවන තත්ත්වයට මාරු වූ වහාම, ඔබට ssh හරහා එයට සම්බන්ධ වීමට උත්සාහ කළ හැකිය.
Kinesis Agent සමඟ වැඩ කිරීමට හැකිවීම සඳහා, යන්ත්රයට සාර්ථකව සම්බන්ධ වූ පසු, ඔබ ටර්මිනලයට පහත විධාන ඇතුළත් කළ යුතුය:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
API ප්රතිචාර සුරැකීමට ෆෝල්ඩරයක් සාදා ගනිමු:
sudo mkdir /var/log/airline_tickets
නියෝජිතයා ආරම්භ කිරීමට පෙර, ඔබ එහි වින්යාසය වින්යාසගත කළ යුතුය:
sudo vim /etc/aws-kinesis/agent.json
agent.json ගොනුවේ අන්තර්ගතය මෙසේ විය යුතුය:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
වින්යාස ගොනුවෙන් දැකිය හැකි පරිදි, නියෝජිතයා විසින් /var/log/airline_tickets/ බහලුම තුළ .log දිගුව සහිත ගොනු නිරීක්ෂණය කර, ඒවා විග්රහ කර airline_tickets ප්රවාහයට මාරු කරනු ඇත.
අපි සේවාව නැවත ආරම්භ කර එය ක්රියාත්මක වන බවට වග බලා ගන්න:
sudo service aws-kinesis-agent restart
දැන් අපි API වෙතින් දත්ත ඉල්ලා සිටින පයිතන් ස්ක්රිප්ට් බාගත කරමු:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
api_caller.py ස්ක්රිප්ටය Aviasales වෙතින් දත්ත ඉල්ලා සිටින අතර ලැබුණු ප්රතිචාරය Kinesis නියෝජිතයා ස්කෑන් කරන නාමාවලියෙහි සුරකියි. මෙම ස්ක්රිප්ට් ක්රියාත්මක කිරීම තරමක් සම්මතයි, TicketsApi පන්තියක් ඇත, එය ඔබට API අසමමුහුර්තව ඇද ගැනීමට ඉඩ සලසයි. අපි ටෝකනයක් සහිත ශීර්ෂයක් ලබා දී මෙම පන්තියට පරාමිති ඉල්ලමු:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
නියෝජිතයාගේ නිවැරදි සැකසුම් සහ ක්රියාකාරීත්වය පරීක්ෂා කිරීමට, අපි api_caller.py ස්ක්රිප්ට් ධාවනය පරීක්ෂා කරමු:
sudo ./api_caller.py TOKEN
තවද අපි නියෝජිත ලොගවල සහ එයාර්ලයින්_ටිකට් දත්ත ප්රවාහයේ අධීක්ෂණ පටිත්තෙහි කාර්යයේ ප්රතිඵලය දෙස බලමු:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log
ඔබට පෙනෙන පරිදි, සෑම දෙයක්ම ක්රියාත්මක වන අතර Kinesis නියෝජිතයා සාර්ථකව ප්රවාහයට දත්ත යවයි. දැන් අපි පාරිභෝගිකයා වින්යාස කරමු.
Kinesis දත්ත විශ්ලේෂණ සැකසීම
අපි සම්පූර්ණ පද්ධතියේ කේන්ද්රීය අංගය වෙත යමු - Kinesis Data Analytics හි kinesis_analytics_airlines_app නමින් නව යෙදුමක් සාදන්න:
Kinesis Data Analytics ඔබට SQL භාෂාව භාවිතයෙන් Kinesis Streams වෙතින් තත්ය කාලීන දත්ත විශ්ලේෂණ සිදු කිරීමට ඉඩ සලසයි. එය සම්පුර්ණ ස්වයං පරිමාණ සේවාවකි (Kinesis Streams මෙන් නොව):
- මූලාශ්ර දත්ත සඳහා වන ඉල්ලීම් මත පදනම්ව නව ප්රවාහ (ප්රතිදාන ප්රවාහ) නිර්මාණය කිරීමට ඔබට ඉඩ සලසයි;
- යෙදුම් ක්රියාත්මක වන විට සිදු වූ දෝෂ සහිත ප්රවාහයක් සපයයි (දෝෂ ප්රවාහය);
- ආදාන දත්ත ක්රමය ස්වයංක්රීයව තීරණය කළ හැක (අවශ්ය නම් එය අතින් නැවත අර්ථ දැක්විය හැක).
මෙය ලාභදායී සේවාවක් නොවේ - වැඩ කරන පැයකට USD 0.11, එබැවින් ඔබ එය ප්රවේශමෙන් භාවිතා කළ යුතු අතර ඔබ අවසන් වූ පසු එය මකා දැමිය යුතුය.
අපි යෙදුම දත්ත මූලාශ්රයට සම්බන්ධ කරමු:
අපි සම්බන්ධ කිරීමට යන ප්රවාහය තෝරන්න (airline_tickets):
ඊළඟට, ඔබ යෙදුමට ප්රවාහයෙන් කියවා ප්රවාහයට ලිවීමට හැකි වන පරිදි නව IAM භූමිකාවක් ඇමිණිය යුතුය. මෙය සිදු කිරීම සඳහා, ප්රවේශ අවසර කොටසේ කිසිවක් වෙනස් නොකිරීමට ප්රමාණවත් වේ:
දැන් අපි ප්රවාහයේ දත්ත යෝජනා ක්රමය සොයා ගැනීමට ඉල්ලා සිටිමු; මෙය කිරීමට, "Discover schema" බොත්තම ක්ලික් කරන්න. එහි ප්රතිඵලයක් ලෙස, IAM භූමිකාව යාවත්කාලීන වනු ඇත (නව එකක් සාදනු ඇත) සහ දැනටමත් ප්රවාහයට පැමිණ ඇති දත්ත වලින් ක්රම හඳුනාගැනීම දියත් කෙරේ:
දැන් ඔබ SQL සංස්කාරකය වෙත යා යුතුය. ඔබ මෙම බොත්තම මත ක්ලික් කළ විට, යෙදුම දියත් කරන ලෙස ඉල්ලා සිටින කවුළුවක් දිස්වනු ඇත - ඔබට දියත් කිරීමට අවශ්ය දේ තෝරන්න:
පහත සරල විමසුම SQL සංස්කාරක කවුළුවට ඇතුළු කර SQL සුරකින්න සහ ධාවනය කරන්න ක්ලික් කරන්න:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
සම්බන්ධක දත්ත සමුදායන්හිදී, ඔබ වාර්තා එක් කිරීමට INSERT ප්රකාශ සහ දත්ත විමසීමට SELECT ප්රකාශයක් භාවිතා කරමින් වගු සමඟ වැඩ කරයි. Amazon Kinesis Data Analytics හි, ඔබ ප්රවාහ (STREAMs) සහ පොම්ප (PUMPs) සමඟ වැඩ කරයි - යෙදුමක එක් ප්රවාහයකින් දත්ත වෙනත් ප්රවාහයකට ඇතුළු කරන අඛණ්ඩ ඇතුළු කිරීමේ ඉල්ලීම්.
ඉහත ඉදිරිපත් කර ඇති SQL විමසුම රූබල් පන්දහසකට වඩා අඩු වියදමකින් Aeroflot ටිකට් සඳහා සෙවුම් කරයි. මෙම කොන්දේසි සපුරාලන සියලුම වාර්තා DESTINATION_SQL_STREAM ප්රවාහයේ තබනු ඇත.
ගමනාන්ත කොටසෙහි, විශේෂ_ප්රවාහ ප්රවාහය තෝරන්න, සහ යෙදුම තුළ ප්රවාහ නාමයෙන් DESTINATION_SQL_STREAM පතන ලැයිස්තුව:
සියලුම උපාමාරු වල ප්රතිඵලය පහත පින්තූරයට සමාන දෙයක් විය යුතුය:
SNS මාතෘකාවක් නිර්මාණය කිරීම සහ දායක වීම
සරල දැනුම්දීම් සේවාව වෙත ගොස් එහි ගුවන් සේවය නමින් නව මාතෘකාවක් සාදන්න:
මෙම මාතෘකාවට දායක වී SMS දැනුම්දීම් යවනු ලබන ජංගම දුරකථන අංකය සඳහන් කරන්න:
DynamoDB හි වගුවක් සාදන්න
ඔවුන්ගේ එයාර්ලයින්_ටිකට් ප්රවාහයෙන් අමු දත්ත ගබඩා කිරීමට, අපි DynamoDB හි එම නමින්ම වගුවක් නිර්මාණය කරමු. අපි Record_id මූලික යතුර ලෙස භාවිතා කරන්නෙමු:
ලැම්ඩා ශ්රිත එකතුකරන්නෙකු නිර්මාණය කිරීම
අපි කලෙක්ටර් නමින් ලැම්ඩා ශ්රිතයක් නිර්මාණය කරමු, එහි කාර්යය වන්නේ එයාර්ලයින්_ටිකට් ප්රවාහය ඡන්ද විමසීම සහ එහි නව වාර්තා හමු වුවහොත්, මෙම වාර්තා DynamoDB වගුවට ඇතුළු කිරීමයි. පැහැදිලිවම, පෙරනිමි හිමිකම් වලට අමතරව, මෙම lambda හට Kinesis දත්ත ප්රවාහයට කියවීමට සහ DynamoDB වෙත ලිවීමට ප්රවේශය තිබිය යුතුය.
එකතුකරන්නා lambda කාර්යය සඳහා IAM භූමිකාවක් නිර්මාණය කිරීම
පළමුව, Lambda-TicketsProcessingRole නමින් lambda සඳහා නව IAM භූමිකාවක් නිර්මාණය කරමු:
පරීක්ෂණ උදාහරණය සඳහා, පහත පින්තූරයේ පෙන්වා ඇති පරිදි, පෙර-වින්යාසගත AmazonKinesisReadOnlyAccess සහ AmazonDynamoDBFullAccess ප්රතිපත්ති බෙහෙවින් සුදුසු ය:
මෙම lambda නව ප්රවේශයන් airline_stream වෙත ඇතුළු වන විට Kinesis වෙතින් ප්රේරකයක් මඟින් දියත් කළ යුතුය, එබැවින් අපට නව ප්රේරකයක් එක් කිරීමට අවශ්ය වේ:
ඉතිරිව ඇත්තේ කේතය ඇතුළු කිරීම සහ ලැම්ඩා සුරැකීම පමණි.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
lambda function notifier එකක් නිර්මාණය කිරීම
දෙවන ප්රවාහය (විශේෂ_ප්රවාහය) නිරීක්ෂණය කර SNS වෙත දැනුම්දීමක් යවන දෙවන lambda ශ්රිතය ද ඒ හා සමාන ආකාරයකින් නිර්මාණය කර ඇත. එමනිසා, මෙම ලැම්ඩාට Kinesis වෙතින් කියවීමට සහ ලබා දී ඇති SNS මාතෘකාවකට පණිවිඩ යැවීමට ප්රවේශය තිබිය යුතු අතර, එය SNS සේවාව මගින් මෙම මාතෘකාවේ සියලුම ග්රාහකයින් වෙත යවනු ලැබේ (ඊමේල්, SMS, ආදිය).
IAM භූමිකාවක් නිර්මාණය කිරීම
පළමුව, අපි මෙම lambda සඳහා IAM භූමිකාව Lambda-KinesisAlarm නිර්මාණය කර, පසුව මෙම භූමිකාව නිර්මාණය කරන alarm_notifier lambda වෙත පවරමු:
මෙම ලැම්ඩා විශේෂ_ප්රවාහයට ඇතුළු වීමට නව වාර්තා සඳහා ප්රේරකයක් මත ක්රියා කළ යුතුය, එබැවින් අපි එකතු කරන්නා ලැම්ඩා සඳහා කළ ආකාරයටම ඔබට ප්රේරකය වින්යාස කිරීමට අවශ්ය වේ.
මෙම lambda වින්යාස කිරීම පහසු කිරීම සඳහා, අපි නව පරිසර විචල්යයක් හඳුන්වා දෙමු - TOPIC_ARN, එහිදී අපි එයාර්ලයින්ස් මාතෘකාවේ ANR (Amazon Recourse Names) ස්ථානගත කරමු:
ලැම්ඩා කේතය ඇතුළු කරන්න, එය කිසිසේත් සංකීර්ණ නොවේ:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
මෙය අතින් පද්ධති වින්යාසය සම්පූර්ණ කර ඇති බව පෙනේ. ඉතිරිව ඇත්තේ අප සියල්ල නිවැරදිව වින්යාස කර ඇති බව පරීක්ෂා කර සහතික කර ගැනීමයි.
Terraform කේතයෙන් යෙදවීම
අවශ්ය සූදානම
ඔබට බෙදා හැරීම බාගත කළ හැකිය
ආරම්භ කරන්නේ කෙසේද?
ව්යාපෘතියේ සම්පූර්ණ කේතය වේ
හොඳ පරිචයක් නම්, ටෙරාෆෝම් දැනට වලාකුළ තුළ අප වෙනුවෙන් නිර්මාණය කරන්නේ කුමක් දැයි බැලීමට සම්පූර්ණ යටිතල පහසුකම් යෙදවීමට පෙර සැලසුම් විධානය ක්රියාත්මක කිරීමයි:
terraform.exe plan
දැනුම්දීම් යැවීමට දුරකථන අංකයක් ඇතුළත් කිරීමට ඔබෙන් විමසනු ඇත. මෙම අදියරේදී එය ඇතුල් කිරීම අවශ්ය නොවේ.
වැඩසටහනේ මෙහෙයුම් සැලැස්ම විශ්ලේෂණය කිරීමෙන් පසු, අපට සම්පත් නිර්මාණය කිරීම ආරම්භ කළ හැකිය:
terraform.exe apply
මෙම විධානය යැවීමෙන් පසු, ඔබට නැවත දුරකථන අංකයක් ඇතුළත් කිරීමට අසනු ඇත; ඇත්ත වශයෙන්ම ක්රියාවන් සිදු කිරීම පිළිබඳ ප්රශ්නයක් පෙන්වන විට "ඔව්" අමතන්න. මෙය ඔබට සම්පූර්ණ යටිතල පහසුකම් සැකසීමට, EC2 හි අවශ්ය සියලුම වින්යාසයන් සිදු කිරීමට, ලැම්ඩා කාර්යයන් යෙදවීමට යනාදියට ඉඩ සලසයි.
Terraform කේතය හරහා සියලුම සම්පත් සාර්ථකව නිර්මාණය කළ පසු, ඔබ Kinesis Analytics යෙදුමේ විස්තර වෙත යා යුතුය (අවාසනාවකට, කේතයෙන් මෙය කෙළින්ම කරන්නේ කෙසේදැයි මම සොයා ගත්තේ නැත).
යෙදුම දියත් කරන්න:
මෙයින් පසු, ඔබ පතන ලැයිස්තුවෙන් තේරීමෙන් යෙදුම තුළ ප්රවාහ නාමය පැහැදිලිව සැකසිය යුතුය:
දැන් යන්න සියල්ල සූදානම්.
යෙදුම පරීක්ෂා කිරීම
ඔබ අතින් හෝ Terraform කේතය හරහා පද්ධතිය යෙදවූ ආකාරය කුමක් වුවත්, එය එලෙසම ක්රියා කරයි.
අපි SSH හරහා Kinesis Agent ස්ථාපනය කර ඇති EC2 අතථ්ය යන්ත්රයට ලොග් වී api_caller.py ස්ක්රිප්ට් ධාවනය කරන්නෙමු.
sudo ./api_caller.py TOKEN
ඔබ කළ යුත්තේ ඔබගේ අංකයට කෙටි පණිවිඩයක් එනතෙක් බලා සිටීමයි.
SMS - විනාඩි 1කින් පමණ ඔබගේ දුරකථනයට පණිවිඩයක් පැමිණේ:
පසුව වඩාත් සවිස්තරාත්මක විශ්ලේෂණයක් සඳහා වාර්තා DynamoDB දත්ත ගබඩාවේ සුරකින ලද්දේ දැයි බැලීමට ඉතිරිව ඇත. Airline_tickets වගුවේ ආසන්න වශයෙන් පහත දත්ත අඩංගු වේ:
නිගමනය
සිදු කරන ලද කාර්යය අතරතුර, Amazon Kinesis මත පදනම්ව මාර්ගගත දත්ත සැකසුම් පද්ධතියක් ගොඩනගා ඇත. Kinesis දත්ත ප්රවාහ සහ SQL විධාන භාවිතා කරන තත්ය කාලීන විශ්ලේෂණ Kinesis Analytics සමඟ ඒකාබද්ධව Kinesis නියෝජිතයා භාවිතා කිරීමේ විකල්ප මෙන්ම අනෙකුත් AWS සේවාවන් සමඟ Amazon Kinesis අන්තර්ක්රියා සලකා බලන ලදී.
අපි ඉහත පද්ධතිය ක්රම දෙකකින් යෙදුවෙමු: තරමක් දිගු අත්පොත සහ ටෙරාෆෝම් කේතයෙන් ඉක්මන් එකක්.
සියලුම ව්යාපෘති මූල කේතය තිබේ
ලිපිය ගැන සාකච්ඡා කිරීමට ලැබීම ගැන මම සතුටු වෙමි, ඔබගේ අදහස් බලාපොරොත්තු වෙමි. නිර්මාණාත්මක විවේචනයක් බලාපොරොත්තු වෙමි.
ඔබට සාර්ථක වේවා!
මූලාශ්රය: www.habr.com