புரோஹோஸ்டர் > Блог > நிர்வாகம் > அமேசான் கினிசிஸ் மற்றும் சர்வர்லெஸ் எளிமையுடன் Aviasales API ஒருங்கிணைப்பு
அமேசான் கினிசிஸ் மற்றும் சர்வர்லெஸ் எளிமையுடன் Aviasales API ஒருங்கிணைப்பு
ஹே ஹப்ர்!
நீங்கள் பறக்கும் விமானங்களை விரும்புகிறீர்களா? நான் அதை விரும்புகிறேன், ஆனால் சுய-தனிமைப்படுத்தலின் போது, நன்கு அறியப்பட்ட ஒரு ஆதாரமான அவியாசலேஸ்-ல் இருந்து விமான டிக்கெட்டுகள் பற்றிய தரவை பகுப்பாய்வு செய்வதிலும் எனக்கு காதல் ஏற்பட்டது.
இன்று நாம் Amazon Kinesis இன் வேலையை பகுப்பாய்வு செய்வோம், நிகழ்நேர பகுப்பாய்வுகளுடன் ஸ்ட்ரீமிங் அமைப்பை உருவாக்குவோம், Amazon DynamoDB NoSQL தரவுத்தளத்தை முக்கிய தரவு சேமிப்பகமாக நிறுவுவோம், மேலும் சுவாரஸ்யமான டிக்கெட்டுகளுக்கு SMS அறிவிப்புகளை அமைப்போம்.
அனைத்து விவரங்களும் வெட்டப்படுகின்றன! போ!
அறிமுகம்
உதாரணமாக, எங்களுக்கு அணுகல் தேவை Aviasales API. அதற்கான அணுகல் இலவசம் மற்றும் கட்டுப்பாடுகள் இல்லாமல் வழங்கப்படுகிறது; தரவை அணுக உங்கள் API டோக்கனைப் பெற, "டெவலப்பர்கள்" பிரிவில் பதிவு செய்ய வேண்டும்.
இந்த கட்டுரையின் முக்கிய நோக்கம் AWS இல் தகவல் ஸ்ட்ரீமிங்கின் பயன்பாட்டைப் பற்றிய பொதுவான புரிதலை வழங்குவதாகும்; பயன்படுத்தப்பட்ட API மூலம் வழங்கப்பட்ட தரவு கண்டிப்பாக புதுப்பித்த நிலையில் இல்லை மற்றும் தற்காலிக சேமிப்பிலிருந்து அனுப்பப்படுகிறது என்பதை நாங்கள் கணக்கில் எடுத்துக்கொள்கிறோம். கடந்த 48 மணிநேரமாக Aviasales.ru மற்றும் Jetradar.com தளங்களின் பயனர்களின் தேடல்களின் அடிப்படையில் உருவாக்கப்பட்டது.
உற்பத்தி செய்யும் இயந்திரத்தில் நிறுவப்பட்ட கினிசிஸ்-ஏஜென்ட், ஏபிஐ மூலம் பெறப்பட்ட தரவை தானாகவே பாகுபடுத்தி, கினசிஸ் டேட்டா அனலிட்டிக்ஸ் மூலம் விரும்பிய ஸ்ட்ரீமிற்கு தரவை அனுப்பும். இந்த ஸ்ட்ரீமின் மூல பதிப்பு நேரடியாக கடையில் எழுதப்படும். DynamoDB இல் பயன்படுத்தப்பட்ட மூல தரவு சேமிப்பகம் AWS விரைவு பார்வை போன்ற BI கருவிகள் மூலம் ஆழமான டிக்கெட் பகுப்பாய்வுக்கு அனுமதிக்கும்.
முழு உள்கட்டமைப்பையும் வரிசைப்படுத்த இரண்டு விருப்பங்களைக் கருத்தில் கொள்வோம்:
Aviasales API — இந்த API மூலம் வழங்கப்படும் தரவு அனைத்து அடுத்தடுத்த வேலைகளுக்கும் பயன்படுத்தப்படும்;
EC2 தயாரிப்பாளர் நிகழ்வு — உள்ளீட்டு தரவு ஸ்ட்ரீம் உருவாக்கப்படும் கிளவுட்டில் ஒரு வழக்கமான மெய்நிகர் இயந்திரம்:
கினேசிஸ் முகவர் கணினியில் உள்நாட்டில் நிறுவப்பட்ட ஜாவா பயன்பாடாகும், இது கினிசிஸ் (கினேசிஸ் டேட்டா ஸ்ட்ரீம்ஸ் அல்லது கினெசிஸ் ஃபயர்ஹோஸ்) தரவைச் சேகரித்து அனுப்புவதற்கு எளிதான வழியை வழங்குகிறது. முகவர் குறிப்பிட்ட கோப்பகங்களில் உள்ள கோப்புகளின் தொகுப்பை தொடர்ந்து கண்காணித்து புதிய தரவை Kinesis க்கு அனுப்புகிறார்;
API அழைப்பாளர் ஸ்கிரிப்ட் — பைதான் ஸ்கிரிப்ட், இது API க்கு கோரிக்கைகளை வைக்கிறது மற்றும் Kinesis முகவரால் கண்காணிக்கப்படும் ஒரு கோப்புறையில் பதிலை வைக்கிறது;
கினெசிஸ் அனலிட்டிக்ஸ் நிகழ்நேரத்தில் ஸ்ட்ரீமிங் தரவின் பகுப்பாய்வை எளிதாக்கும் சர்வர்லெஸ் சேவையாகும். Amazon Kinesis டேட்டா அனலிட்டிக்ஸ் பயன்பாட்டு ஆதாரங்களை உள்ளமைக்கிறது மற்றும் உள்வரும் தரவின் எந்த அளவையும் கையாள தானாக அளவிடுகிறது;
AWS லாம்ப்டா — காப்புப் பிரதி எடுக்காமல் அல்லது சேவையகங்களை அமைக்காமல் குறியீட்டை இயக்க உங்களை அனுமதிக்கும் சேவை. ஒவ்வொரு அழைப்புக்கும் அனைத்து கணினி சக்தியும் தானாகவே அளவிடப்படுகிறது;
அமேசான் டைனமோடிபி - எந்த அளவிலும் இயங்கும் போது 10 மில்லி விநாடிகளுக்கும் குறைவான தாமதத்தை வழங்கும் முக்கிய மதிப்பு ஜோடிகள் மற்றும் ஆவணங்களின் தரவுத்தளம். DynamoDB ஐப் பயன்படுத்தும் போது, நீங்கள் எந்த சேவையகத்தையும் வழங்கவோ, இணைக்கவோ அல்லது நிர்வகிக்கவோ தேவையில்லை. DynamoDB, கிடைக்கக்கூடிய ஆதாரங்களின் அளவைச் சரிசெய்வதற்கும், உயர் செயல்திறனைப் பராமரிப்பதற்கும் தானாகவே அட்டவணைகளை அளவிடுகிறது. கணினி நிர்வாகம் தேவையில்லை;
அமேசான் எஸ்என்எஸ் - வெளியீட்டாளர்-சந்தாதாரர் (பப்/சப்) மாதிரியைப் பயன்படுத்தி செய்திகளை அனுப்புவதற்கான முழுமையாக நிர்வகிக்கப்படும் சேவையாகும், இதன் மூலம் நீங்கள் மைக்ரோ சர்வீஸ்கள், விநியோகிக்கப்பட்ட அமைப்புகள் மற்றும் சர்வர்லெஸ் பயன்பாடுகளை தனிமைப்படுத்தலாம். மொபைல் புஷ் அறிவிப்புகள், SMS செய்திகள் மற்றும் மின்னஞ்சல்கள் மூலம் இறுதிப் பயனர்களுக்கு தகவலை அனுப்ப SNS ஐப் பயன்படுத்தலாம்.
ஆரம்ப பயிற்சி
தரவு ஓட்டத்தை பின்பற்ற, Aviasales API வழங்கிய விமான டிக்கெட் தகவலைப் பயன்படுத்த முடிவு செய்தேன். IN ஆவணங்கள் வெவ்வேறு முறைகளின் மிகவும் விரிவான பட்டியல், அவற்றில் ஒன்றை எடுத்துக் கொள்வோம் - "மாதாந்திர விலை நாட்காட்டி", இது மாதத்தின் ஒவ்வொரு நாளுக்கான விலைகளையும், இடமாற்றங்களின் எண்ணிக்கையால் தொகுக்கப்பட்டுள்ளது. கோரிக்கையில் தேடல் மாதத்தை நீங்கள் குறிப்பிடவில்லை என்றால், தற்போதைய மாதத்திற்கு அடுத்த மாதத்திற்கான தகவல் வழங்கப்படும்.
கோரிக்கையில் டோக்கனைக் குறிப்பிடுவதன் மூலம் API இலிருந்து தரவைப் பெறும் மேலே உள்ள முறை வேலை செய்யும், ஆனால் அணுகல் டோக்கனை தலைப்பு வழியாக அனுப்ப விரும்புகிறேன், எனவே இந்த முறையை api_caller.py ஸ்கிரிப்ட்டில் பயன்படுத்துவோம்.
மேலே உள்ள எடுத்துக்காட்டு API பதில், செயின்ட் பீட்டர்ஸ்பர்க்கிலிருந்து ஃபூக்கிற்கு ஒரு டிக்கெட்டைக் காட்டுகிறது... ஓ, என்ன கனவு...
நான் கசானில் இருந்து வருவதால், ஃபூகெட் இப்போது "கனவு மட்டுமே" என்பதால், செயின்ட் பீட்டர்ஸ்பர்க்கிலிருந்து கசானுக்கு டிக்கெட்டுகளைத் தேடுவோம்.
உங்களிடம் ஏற்கனவே AWS கணக்கு இருப்பதாக இது கருதுகிறது. Kinesis மற்றும் SMS மூலம் அறிவிப்புகளை அனுப்புதல் ஆகியவை வருடாந்தரத்தில் சேர்க்கப்படவில்லை என்பதில் உடனடியாக சிறப்பு கவனம் செலுத்த விரும்புகிறேன். இலவச அடுக்கு (இலவச பயன்பாடு). ஆனால் இது இருந்தபோதிலும், இரண்டு டாலர்களை மனதில் கொண்டு, முன்மொழியப்பட்ட அமைப்பை உருவாக்கி அதனுடன் விளையாடுவது மிகவும் சாத்தியமாகும். மேலும், நிச்சயமாக, எல்லா ஆதாரங்களும் தேவைப்படாத பிறகு அவற்றை நீக்க மறக்காதீர்கள்.
அதிர்ஷ்டவசமாக, எங்கள் மாதாந்திர இலவச வரம்புகளை நாங்கள் பூர்த்தி செய்தால், DynamoDb மற்றும் lambda செயல்பாடுகள் எங்களுக்கு இலவசமாக இருக்கும். எடுத்துக்காட்டாக, DynamoDBக்கு: 25 GB சேமிப்பு, 25 WCU/RCU மற்றும் 100 மில்லியன் வினவல்கள். மாதத்திற்கு ஒரு மில்லியன் லாம்ப்டா செயல்பாடு அழைப்புகள்.
கைமுறை அமைப்பு வரிசைப்படுத்தல்
கினிசிஸ் டேட்டா ஸ்ட்ரீம்களை அமைத்தல்
கினேசிஸ் டேட்டா ஸ்ட்ரீம்ஸ் சேவைக்குச் சென்று, ஒவ்வொன்றிற்கும் ஒரு ஷார்ட் என்ற இரண்டு புதிய ஸ்ட்ரீம்களை உருவாக்குவோம்.
ஷார்ட் என்றால் என்ன?
ஒரு ஷார்ட் என்பது Amazon Kinesis ஸ்ட்ரீமின் அடிப்படை தரவு பரிமாற்ற அலகு ஆகும். ஒரு பிரிவு 1 MB/s வேகத்தில் உள்ளீட்டு தரவு பரிமாற்றத்தையும் 2 MB/s வேகத்தில் வெளியீட்டு தரவு பரிமாற்றத்தையும் வழங்குகிறது. ஒரு பிரிவு வினாடிக்கு 1000 PUT உள்ளீடுகளை ஆதரிக்கிறது. தரவு ஸ்ட்ரீமை உருவாக்கும் போது, தேவையான எண்ணிக்கையிலான பிரிவுகளை நீங்கள் குறிப்பிட வேண்டும். எடுத்துக்காட்டாக, நீங்கள் இரண்டு பிரிவுகளுடன் தரவு ஸ்ட்ரீமை உருவாக்கலாம். இந்தத் தரவு ஸ்ட்ரீம் 2 MB/s இல் உள்ளீட்டு தரவு பரிமாற்றத்தையும், 4 MB/s இல் வெளியீட்டு தரவு பரிமாற்றத்தையும் வழங்கும், இது வினாடிக்கு 2000 PUT பதிவுகளை ஆதரிக்கும்.
உங்கள் ஸ்ட்ரீமில் அதிகமான துண்டுகள், அதன் செயல்திறன் அதிகமாகும். கொள்கையளவில், இந்த ஓட்டங்கள் அளவிடப்படுகின்றன - துண்டுகளைச் சேர்ப்பதன் மூலம். ஆனால் உங்களிடம் அதிகமான துண்டுகள், அதிக விலை. ஒவ்வொரு ஷார்ட் ஒரு மணி நேரத்திற்கு 1,5 சென்ட்கள் மற்றும் ஒவ்வொரு மில்லியன் PUT பேலோட் யூனிட்டுகளுக்கு கூடுதலாக 1.4 சென்ட்கள் செலவாகும்.
என்ற பெயரில் புதிய ஸ்ட்ரீமை உருவாக்குவோம் விமான_டிக்கெட்டுகள், 1 துண்டு அவருக்கு போதுமானதாக இருக்கும்:
இப்போது பெயருடன் மற்றொரு நூலை உருவாக்குவோம் சிறப்பு_ஸ்ட்ரீம்:
தயாரிப்பாளர் அமைப்பு
ஒரு பணியை பகுப்பாய்வு செய்ய, தரவு தயாரிப்பாளராக வழக்கமான EC2 நிகழ்வைப் பயன்படுத்தினால் போதும். இது ஒரு சக்திவாய்ந்த, விலையுயர்ந்த மெய்நிகர் இயந்திரமாக இருக்க வேண்டியதில்லை; ஒரு ஸ்பாட் t2.மைக்ரோ நன்றாகச் செய்யும்.
முக்கிய குறிப்பு: எடுத்துக்காட்டாக, நீங்கள் படத்தைப் பயன்படுத்த வேண்டும் - Amazon Linux AMI 2018.03.0, இது Kinesis முகவரை விரைவாகத் தொடங்குவதற்கு குறைவான அமைப்புகளைக் கொண்டுள்ளது.
EC2 சேவைக்குச் சென்று, ஒரு புதிய மெய்நிகர் இயந்திரத்தை உருவாக்கவும், t2.micro வகையுடன் விரும்பிய AMI ஐத் தேர்ந்தெடுக்கவும், இது இலவச அடுக்கில் சேர்க்கப்பட்டுள்ளது:
புதிதாக உருவாக்கப்பட்ட மெய்நிகர் இயந்திரம் Kinesis சேவையுடன் தொடர்பு கொள்ள, அதற்கான உரிமைகள் வழங்கப்பட வேண்டும். இதைச் செய்வதற்கான சிறந்த வழி, ஒரு IAM பாத்திரத்தை ஒதுக்குவதாகும். எனவே, படி 3: உள்ளமைவு நிகழ்வு விவரங்கள் திரையில், நீங்கள் தேர்ந்தெடுக்க வேண்டும் புதிய IAM பாத்திரத்தை உருவாக்கவும்:
EC2க்கான IAM பாத்திரத்தை உருவாக்குதல்
திறக்கும் சாளரத்தில், நாங்கள் EC2 க்கு ஒரு புதிய பாத்திரத்தை உருவாக்குகிறோம் என்பதைத் தேர்ந்தெடுத்து அனுமதிகள் பிரிவுக்குச் செல்லவும்:
பயிற்சி எடுத்துக்காட்டைப் பயன்படுத்தி, வள உரிமைகளின் சிறுமணி உள்ளமைவின் அனைத்து நுணுக்கங்களுக்கும் நாங்கள் செல்ல வேண்டியதில்லை, எனவே AmazonKinesisFullAccess மற்றும் CloudWatchFullAccess ஆகியவற்றால் முன்பே கட்டமைக்கப்பட்ட கொள்கைகளைத் தேர்ந்தெடுப்போம்.
இந்தப் பாத்திரத்திற்கு சில அர்த்தமுள்ள பெயரைக் கொடுப்போம், எடுத்துக்காட்டாக: EC2-KinesisStreams-FullAccess. கீழே உள்ள படத்தில் காட்டப்பட்டுள்ளதைப் போலவே முடிவும் இருக்க வேண்டும்:
இந்தப் புதிய பாத்திரத்தை உருவாக்கிய பிறகு, உருவாக்கப்பட்ட மெய்நிகர் இயந்திர நிகழ்வில் அதை இணைக்க மறக்காதீர்கள்:
இந்தத் திரையில் வேறு எதையும் நாங்கள் மாற்ற மாட்டோம் மற்றும் அடுத்த சாளரங்களுக்குச் செல்கிறோம்.
ஹார்ட் டிரைவ் அமைப்புகளையும், குறிச்சொற்களையும் இயல்புநிலையாக விடலாம் (குறிச்சொற்களைப் பயன்படுத்துவது நல்ல நடைமுறை என்றாலும், குறைந்தபட்சம் நிகழ்விற்கு ஒரு பெயரைக் கொடுத்து சூழலைக் குறிக்கவும்).
இப்போது நாங்கள் படி 6 இல் இருக்கிறோம்: பாதுகாப்பு குழுவை உள்ளமைக்கவும், அங்கு நீங்கள் புதிய ஒன்றை உருவாக்க வேண்டும் அல்லது ஏற்கனவே உள்ள உங்கள் பாதுகாப்பு குழுவை குறிப்பிட வேண்டும், இது ssh (போர்ட் 22) வழியாக இணைக்க அனுமதிக்கிறது. அங்கு Source -> My IP என்பதைத் தேர்ந்தெடுத்து, நீங்கள் நிகழ்வைத் தொடங்கலாம்.
அது இயங்கும் நிலைக்கு மாறியவுடன், நீங்கள் அதை ssh வழியாக இணைக்க முயற்சி செய்யலாம்.
Kinesis முகவருடன் பணிபுரிய, இயந்திரத்துடன் வெற்றிகரமாக இணைத்த பிறகு, முனையத்தில் பின்வரும் கட்டளைகளை உள்ளிட வேண்டும்:
உள்ளமைவு கோப்பிலிருந்து பார்க்க முடிந்தால், முகவர் /var/log/airline_tickets/ கோப்பகத்தில் உள்ள .log நீட்டிப்புடன் கோப்புகளைக் கண்காணித்து, அவற்றை அலசி, airline_tickets ஸ்ட்ரீமிற்கு மாற்றுவார்.
நாங்கள் சேவையை மறுதொடக்கம் செய்து, அது இயங்குவதை உறுதிசெய்கிறோம்:
sudo service aws-kinesis-agent restart
இப்போது API இலிருந்து தரவைக் கோரும் பைதான் ஸ்கிரிப்டைப் பதிவிறக்குவோம்:
Api_caller.py ஸ்கிரிப்ட் Aviasales இலிருந்து தரவைக் கோருகிறது மற்றும் Kinesis முகவர் ஸ்கேன் செய்யும் கோப்பகத்தில் பெறப்பட்ட பதிலைச் சேமிக்கிறது. இந்த ஸ்கிரிப்டை செயல்படுத்துவது மிகவும் நிலையானது, ஒரு TicketsApi வகுப்பு உள்ளது, இது API ஐ ஒத்திசைவின்றி இழுக்க உங்களை அனுமதிக்கிறது. இந்த வகுப்பிற்கு டோக்கன் மற்றும் கோரிக்கை அளவுருக்கள் கொண்ட தலைப்பை அனுப்புகிறோம்:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
ஏஜென்ட்டின் சரியான அமைப்புகளையும் செயல்பாட்டையும் சோதிக்க, api_caller.py ஸ்கிரிப்டை இயக்குவதைச் சோதிப்போம்:
sudo ./api_caller.py TOKEN
ஏஜென்ட் பதிவுகள் மற்றும் ஏர்லைன்_டிக்கெட்ஸ் டேட்டா ஸ்ட்ரீமில் உள்ள கண்காணிப்பு தாவலில் வேலையின் முடிவைப் பார்க்கிறோம்:
நீங்கள் பார்க்க முடியும் என, எல்லாம் வேலை செய்கிறது மற்றும் கினெசிஸ் ஏஜென்ட் வெற்றிகரமாக ஸ்ட்ரீமிற்கு தரவை அனுப்புகிறது. இப்போது நுகர்வோரை உள்ளமைப்போம்.
கினெசிஸ் டேட்டா அனலிட்டிக்ஸ் அமைத்தல்
முழு அமைப்பின் மையக் கூறுகளுக்குச் செல்வோம் - Kinesis Data Analytics இல் kinesis_analytics_airlines_app என்ற புதிய பயன்பாட்டை உருவாக்கவும்:
SQL மொழியைப் பயன்படுத்தி கினசிஸ் ஸ்ட்ரீம்களில் இருந்து நிகழ்நேர தரவு பகுப்பாய்வு செய்ய கினசிஸ் டேட்டா அனலிட்டிக்ஸ் உங்களை அனுமதிக்கிறது. இது ஒரு முழுமையான ஆட்டோஸ்கேலிங் சேவையாகும் (கினெசிஸ் ஸ்ட்ரீம்களைப் போலல்லாமல்):
மூல தரவுக்கான கோரிக்கைகளின் அடிப்படையில் புதிய ஸ்ட்ரீம்களை (வெளியீட்டு ஸ்ட்ரீம்) உருவாக்க உங்களை அனுமதிக்கிறது;
பயன்பாடுகள் இயங்கும் போது ஏற்பட்ட பிழைகள் கொண்ட ஸ்ட்ரீமை வழங்குகிறது (பிழை ஸ்ட்ரீம்);
உள்ளீட்டு தரவுத் திட்டத்தைத் தானாகத் தீர்மானிக்க முடியும் (தேவைப்பட்டால் அதை கைமுறையாக மறுவரையறை செய்யலாம்).
இது மலிவான சேவை அல்ல - ஒரு மணிநேர வேலைக்கு 0.11 USD, எனவே நீங்கள் அதை கவனமாகப் பயன்படுத்த வேண்டும் மற்றும் நீங்கள் முடித்தவுடன் அதை நீக்க வேண்டும்.
பயன்பாட்டை தரவு மூலத்துடன் இணைப்போம்:
நாங்கள் இணைக்கப் போகும் ஸ்ட்ரீமைத் தேர்ந்தெடுக்கவும் (airline_tickets):
அடுத்து, நீங்கள் ஒரு புதிய IAM பாத்திரத்தை இணைக்க வேண்டும், இதனால் பயன்பாடு ஸ்ட்ரீமில் இருந்து படித்து ஸ்ட்ரீமில் எழுத முடியும். இதைச் செய்ய, அணுகல் அனுமதிகள் தொகுதியில் எதையும் மாற்றாமல் இருப்பது போதுமானது:
இப்போது ஸ்ட்ரீமில் உள்ள டேட்டா ஸ்கீமாவைக் கண்டறியக் கோருவோம்; இதைச் செய்ய, "டிஸ்கவர் ஸ்கீமா" பொத்தானைக் கிளிக் செய்யவும். இதன் விளைவாக, IAM பங்கு புதுப்பிக்கப்படும் (புதிய ஒன்று உருவாக்கப்படும்) மேலும் ஸ்ட்ரீமில் ஏற்கனவே வந்துள்ள தரவிலிருந்து ஸ்கீமா கண்டறிதல் தொடங்கப்படும்:
இப்போது நீங்கள் SQL எடிட்டருக்கு செல்ல வேண்டும். இந்த பொத்தானைக் கிளிக் செய்தால், பயன்பாட்டைத் தொடங்கும்படி கேட்கும் சாளரம் தோன்றும் - நீங்கள் தொடங்க விரும்புவதைத் தேர்ந்தெடுக்கவும்:
SQL எடிட்டர் சாளரத்தில் பின்வரும் எளிய வினவலைச் செருகவும் மற்றும் SQL ஐச் சேமித்து இயக்கவும் என்பதைக் கிளிக் செய்யவும்:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
தொடர்புடைய தரவுத்தளங்களில், பதிவுகளைச் சேர்க்க INSERT அறிக்கைகளையும் தரவை வினவ SELECT அறிக்கையையும் பயன்படுத்தி அட்டவணைகளுடன் நீங்கள் வேலை செய்கிறீர்கள். Amazon Kinesis Data Analytics இல், நீங்கள் ஸ்ட்ரீம்கள் (ஸ்ட்ரீம்கள்) மற்றும் பம்ப்கள் (பம்ப்கள்) உடன் பணிபுரிகிறீர்கள் - ஒரு பயன்பாட்டில் உள்ள ஒரு ஸ்ட்ரீமில் இருந்து தரவை மற்றொரு ஸ்ட்ரீமில் செருகும் தொடர்ச்சியான செருகல் கோரிக்கைகள்.
மேலே வழங்கப்பட்ட SQL வினவல் ஐயாயிரம் ரூபிள்களுக்குக் குறைவான விலையில் ஏரோஃப்ளோட் டிக்கெட்டுகளைத் தேடுகிறது. இந்த நிபந்தனைகளை பூர்த்தி செய்யும் அனைத்து பதிவுகளும் DESTINATION_SQL_STREAM ஸ்ட்ரீமில் வைக்கப்படும்.
டெஸ்டினேஷன் பிளாக்கில், சிறப்பு_ஸ்ட்ரீம் ஸ்ட்ரீமைத் தேர்ந்தெடுக்கவும், மேலும் இன்-அப்ளிகேஷன் ஸ்ட்ரீம் பெயரில் DESTINATION_SQL_STREAM கீழ்தோன்றும் பட்டியலில்:
அனைத்து கையாளுதல்களின் முடிவும் கீழே உள்ள படத்தைப் போலவே இருக்க வேண்டும்:
SNS தலைப்பை உருவாக்குதல் மற்றும் குழுசேர்தல்
எளிய அறிவிப்பு சேவைக்குச் சென்று, ஏர்லைன்ஸ் என்ற பெயரில் புதிய தலைப்பை உருவாக்கவும்:
இந்த தலைப்புக்கு குழுசேர்ந்து, SMS அறிவிப்புகள் அனுப்பப்படும் மொபைல் எண்ணைக் குறிப்பிடவும்:
DynamoDB இல் அட்டவணையை உருவாக்கவும்
அவர்களின் airline_tickets ஸ்ட்ரீமில் இருந்து மூலத் தரவைச் சேமிக்க, DynamoDB இல் அதே பெயரில் ஒரு அட்டவணையை உருவாக்குவோம். பதிவு_ஐடியை முதன்மை விசையாகப் பயன்படுத்துவோம்:
லாம்ப்டா செயல்பாடு சேகரிப்பாளரை உருவாக்குதல்
கலெக்டர் என்றழைக்கப்படும் ஒரு லாம்ப்டா செயல்பாட்டை உருவாக்குவோம், அதன் பணி விமான_டிக்கெட் ஸ்ட்ரீமில் வாக்களிக்க வேண்டும், மேலும் புதிய பதிவுகள் அங்கு கண்டறியப்பட்டால், இந்த பதிவுகளை DynamoDB அட்டவணையில் செருகவும். வெளிப்படையாக, இயல்புநிலை உரிமைகளுக்கு கூடுதலாக, இந்த லாம்ப்டா Kinesis தரவு ஸ்ட்ரீமைப் படிக்கவும் மற்றும் DynamoDBக்கான அணுகலை எழுதவும் வேண்டும்.
சேகரிப்பான் லாம்ப்டா செயல்பாட்டிற்கான IAM பங்கை உருவாக்குதல்
முதலில், Lambda-TicketsProcessingRole என பெயரிடப்பட்ட லாம்ப்டாவிற்கு புதிய IAM பாத்திரத்தை உருவாக்குவோம்:
சோதனை உதாரணத்திற்கு, கீழே உள்ள படத்தில் காட்டப்பட்டுள்ளபடி, முன்பே கட்டமைக்கப்பட்ட AmazonKinesisReadOnlyAccess மற்றும் AmazonDynamoDBFullAccess கொள்கைகள் மிகவும் பொருத்தமானவை:
புதிய உள்ளீடுகள் Airline_stream இல் நுழையும் போது, Kinesis இலிருந்து தூண்டுதலால் இந்த லாம்ப்டா தொடங்கப்பட வேண்டும், எனவே நாம் ஒரு புதிய தூண்டுதலைச் சேர்க்க வேண்டும்:
குறியீட்டைச் செருகவும், லாம்ப்டாவைச் சேமிக்கவும் மட்டுமே மீதமுள்ளது.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
லாம்ப்டா செயல்பாடு அறிவிப்பை உருவாக்குகிறது
இரண்டாவது லாம்ப்டா செயல்பாடு, இது இரண்டாவது ஸ்ட்ரீமை (ஸ்பெஷல்_ஸ்ட்ரீம்) கண்காணித்து, SNS க்கு அறிவிப்பை அனுப்பும், அதே வழியில் உருவாக்கப்பட்டது. எனவே, இந்த லாம்ப்டாவுக்கு கினேசிஸிலிருந்து படிக்க மற்றும் கொடுக்கப்பட்ட SNS தலைப்புக்கு செய்திகளை அனுப்புவதற்கான அணுகல் இருக்க வேண்டும், இது SNS சேவையால் இந்த தலைப்பின் அனைத்து சந்தாதாரர்களுக்கும் (மின்னஞ்சல், SMS போன்றவை) அனுப்பப்படும்.
IAM பாத்திரத்தை உருவாக்குதல்
முதலில், இந்த லாம்ப்டாவிற்காக IAM ரோல் Lambda-KinesisAlarm ஐ உருவாக்குகிறோம், பின்னர் இந்த பாத்திரத்தை உருவாக்கப்படும் alarm_notifier lambda க்கு ஒதுக்குகிறோம்:
ஸ்பெஷல்_ஸ்ட்ரீமில் புதிய பதிவுகள் நுழைவதற்கான தூண்டுதலில் இந்த லாம்ப்டா செயல்பட வேண்டும், எனவே கலெக்டர் லாம்ப்டாவிற்கு நாங்கள் செய்ததைப் போலவே தூண்டுதலையும் உள்ளமைக்க வேண்டும்.
இந்த லாம்ப்டாவை உள்ளமைப்பதை எளிதாக்க, ஒரு புதிய சூழல் மாறியை அறிமுகப்படுத்துவோம் - TOPIC_ARN, இதில் ஏர்லைன்ஸ் தலைப்பின் ANR (Amazon Recourse Names) ஐ வைப்போம்:
லாம்ப்டா குறியீட்டைச் செருகவும், இது சிக்கலானது அல்ல:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
இங்குதான் கையேடு அமைப்பு உள்ளமைவு முடிந்தது என்று தெரிகிறது. எல்லாவற்றையும் சரியாக உள்ளமைத்துள்ளோம் என்பதை சோதித்து உறுதிசெய்வதே எஞ்சியுள்ளது.
டெர்ராஃபார்ம் குறியீட்டிலிருந்து பயன்படுத்தவும்
தேவையான தயாரிப்பு
Terraform குறியீட்டிலிருந்து உள்கட்டமைப்பைப் பயன்படுத்துவதற்கு மிகவும் வசதியான திறந்த மூலக் கருவியாகும். இது அதன் சொந்த தொடரியல் உள்ளது, இது கற்றுக்கொள்வதற்கு எளிதானது மற்றும் எப்படி, எதைப் பயன்படுத்த வேண்டும் என்பதற்கான பல எடுத்துக்காட்டுகளைக் கொண்டுள்ளது. ஆட்டம் எடிட்டர் அல்லது விஷுவல் ஸ்டுடியோ குறியீடு டெர்ராஃபார்முடன் வேலை செய்வதை எளிதாக்கும் பல எளிமையான செருகுநிரல்களைக் கொண்டுள்ளது.
நீங்கள் விநியோகத்தை பதிவிறக்கம் செய்யலாம் இங்கிருந்து. அனைத்து டெர்ராஃபார்ம் திறன்களின் விரிவான பகுப்பாய்வு இந்த கட்டுரையின் எல்லைக்கு அப்பாற்பட்டது, எனவே முக்கிய புள்ளிகளுக்கு நம்மை கட்டுப்படுத்துவோம்.
எப்படி ஓடுவது
திட்டத்தின் முழு குறியீடு என் களஞ்சியத்தில். களஞ்சியத்தை நமக்கு நாமே குளோன் செய்கிறோம். தொடங்குவதற்கு முன், நீங்கள் AWS CLI ஐ நிறுவி உள்ளமைத்துள்ளீர்கள் என்பதை உறுதிப்படுத்திக் கொள்ள வேண்டும், ஏனெனில்... Terraform ஆனது ~/.aws/credentials கோப்பில் நற்சான்றிதழ்களைத் தேடும்.
டெர்ராஃபார்ம் தற்போது கிளவுட்டில் நமக்காக என்ன உருவாக்குகிறது என்பதைப் பார்க்க, முழு உள்கட்டமைப்பையும் பயன்படுத்துவதற்கு முன் திட்ட கட்டளையை இயக்குவது ஒரு நல்ல நடைமுறை:
terraform.exe plan
அறிவிப்புகளை அனுப்ப ஃபோன் எண்ணை உள்ளிடும்படி கேட்கப்படுவீர்கள். இந்த கட்டத்தில் அதை உள்ளிட வேண்டிய அவசியமில்லை.
நிரலின் செயல்பாட்டுத் திட்டத்தைப் பகுப்பாய்வு செய்த பிறகு, வளங்களை உருவாக்கத் தொடங்கலாம்:
terraform.exe apply
இந்த கட்டளையை அனுப்பிய பிறகு, நீங்கள் மீண்டும் ஒரு தொலைபேசி எண்ணை உள்ளிடுமாறு கேட்கப்படுவீர்கள்; உண்மையில் செயல்களைச் செய்வது பற்றிய கேள்வி காட்டப்படும்போது "ஆம்" என்று டயல் செய்யவும். இது முழு உள்கட்டமைப்பையும் அமைக்கவும், EC2 இன் தேவையான அனைத்து உள்ளமைவுகளையும் மேற்கொள்ளவும், லாம்ப்டா செயல்பாடுகளை வரிசைப்படுத்தவும் உங்களை அனுமதிக்கும்.
டெர்ராஃபார்ம் குறியீட்டின் மூலம் அனைத்து ஆதாரங்களும் வெற்றிகரமாக உருவாக்கப்பட்ட பிறகு, நீங்கள் Kinesis Analytics பயன்பாட்டின் விவரங்களுக்குச் செல்ல வேண்டும் (துரதிர்ஷ்டவசமாக, குறியீட்டிலிருந்து நேரடியாக இதை எப்படி செய்வது என்று நான் கண்டுபிடிக்கவில்லை).
பயன்பாட்டைத் தொடங்கவும்:
இதற்குப் பிறகு, கீழ்தோன்றும் பட்டியலில் இருந்து தேர்ந்தெடுத்து பயன்பாட்டில் உள்ள ஸ்ட்ரீம் பெயரை நீங்கள் வெளிப்படையாக அமைக்க வேண்டும்:
இப்போது எல்லாம் செல்ல தயாராக உள்ளது.
விண்ணப்பத்தை சோதித்தல்
கணினியை கைமுறையாக அல்லது டெர்ராஃபார்ம் குறியீட்டின் மூலம் நீங்கள் எவ்வாறு பயன்படுத்தினாலும், அது அப்படியே செயல்படும்.
Kinesis Agent நிறுவப்பட்டிருக்கும் EC2 மெய்நிகர் கணினியில் SSH வழியாக உள்நுழைந்து api_caller.py ஸ்கிரிப்டை இயக்குவோம்.
sudo ./api_caller.py TOKEN
நீங்கள் செய்ய வேண்டியது எல்லாம் உங்கள் எண்ணுக்கு SMS வரும் வரை காத்திருக்க வேண்டும்:
SMS - கிட்டத்தட்ட 1 நிமிடத்தில் தொலைபேசியில் ஒரு செய்தி வரும்:
அடுத்தடுத்த விரிவான பகுப்பாய்விற்காக டைனமோடிபி தரவுத்தளத்தில் பதிவுகள் சேமிக்கப்பட்டதா என்பதைப் பார்க்க வேண்டும். Airline_tickets அட்டவணையில் தோராயமாக பின்வரும் தரவு உள்ளது:
முடிவுக்கு
வேலையின் போது, அமேசான் கினிசிஸ் அடிப்படையில் ஆன்லைன் தரவு செயலாக்க அமைப்பு உருவாக்கப்பட்டது. Kinesis டேட்டா ஸ்ட்ரீம்கள் மற்றும் SQL கட்டளைகளைப் பயன்படுத்தி நிகழ்நேர பகுப்பாய்வு Kinesis Analytics ஆகியவற்றுடன் இணைந்து Kinesis முகவரைப் பயன்படுத்துவதற்கான விருப்பங்கள் மற்றும் பிற AWS சேவைகளுடன் Amazon Kinesis இன் தொடர்பு ஆகியவை பரிசீலிக்கப்பட்டன.
மேலே உள்ள அமைப்பை இரண்டு வழிகளில் பயன்படுத்தியுள்ளோம்: ஒரு நீண்ட கையேடு மற்றும் டெர்ராஃபார்ம் குறியீட்டிலிருந்து விரைவானது.
அனைத்து திட்ட மூல குறியீடும் உள்ளது எனது கிட்ஹப் களஞ்சியத்தில், அதை நீங்கள் நன்கு அறிந்திருக்குமாறு நான் பரிந்துரைக்கிறேன்.
கட்டுரையைப் பற்றி விவாதிப்பதில் மகிழ்ச்சி அடைகிறேன், உங்கள் கருத்துகளை எதிர்பார்க்கிறேன். ஆக்கபூர்வமான விமர்சனத்தை எதிர்பார்க்கிறேன்.