Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

හෙලෝ, හබ්ර්!

ඔබ පියාසර කරන ගුවන් යානා වලට කැමතිද? මම එයට කැමතියි, නමුත් ස්වයං හුදකලාව අතරතුර මම එක් ප්‍රසිද්ධ සම්පතක් වන Aviasales වෙතින් ගුවන් ටිකට්පත් පිළිබඳ දත්ත විශ්ලේෂණය කිරීමට ද ආදරය කළෙමි.

අද අපි Amazon Kinesis හි වැඩ විශ්ලේෂණය කරන්නෙමු, තත්‍ය කාලීන විශ්ලේෂණ සමඟ ප්‍රවාහ පද්ධතියක් ගොඩනඟමු, ප්‍රධාන දත්ත ගබඩාව ලෙස Amazon DynamoDB NoSQL දත්ත සමුදාය ස්ථාපනය කරන්න, සහ රසවත් ටිකට්පත් සඳහා SMS දැනුම්දීම් සකසන්නෙමු.

සියලුම විස්තර කපා ඇත! යන්න!

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

හැඳින්වීම

උදාහරණයක් ලෙස, අපට ප්රවේශය අවශ්ය වේ Aviasales API. එයට ප්‍රවේශය නොමිලේ සහ සීමාවකින් තොරව සපයනු ලැබේ; දත්ත වෙත ප්‍රවේශ වීමට ඔබේ API ටෝකනය ලබා ගැනීමට ඔබට “සංවර්ධකයින්” කොටසේ ලියාපදිංචි විය යුතුය.

මෙම ලිපියේ ප්‍රධාන අරමුණ AWS හි තොරතුරු ප්‍රවාහය භාවිතය පිළිබඳ සාමාන්‍ය අවබෝධයක් ලබා දීමයි; භාවිතා කරන ලද API මඟින් ආපසු ලබා දෙන දත්ත දැඩි ලෙස යාවත්කාලීන නොවන අතර එය හැඹිලියෙන් සම්ප්‍රේෂණය වන බව අපි සැලකිල්ලට ගනිමු. පසුගිය පැය 48 තුළ Aviasales.ru සහ Jetradar.com අඩවි භාවිතා කරන්නන් විසින් කරන ලද සෙවීම් මත පදනම්ව සාදන ලදී.

නිෂ්පාදන යන්ත්‍රයේ ස්ථාපනය කර ඇති, API හරහා ලැබෙන Kinesis-agent, ස්වයංක්‍රීයව දත්ත විග්‍රහ කර Kinesis Data Analytics හරහා අවශ්‍ය ප්‍රවාහයට දත්ත සම්ප්‍රේෂණය කරයි. මෙම ප්‍රවාහයේ අමු අනුවාදය කෙලින්ම ගබඩාවට ලියා ඇත. DynamoDB හි යොදවා ඇති අමු දත්ත ගබඩාව AWS Quick Sight වැනි BI මෙවලම් හරහා ගැඹුරු ප්‍රවේශපත්‍ර විශ්ලේෂණයට ඉඩ සලසයි.

සම්පූර්ණ යටිතල පහසුකම් යෙදවීම සඳහා අපි විකල්ප දෙකක් සලකා බලමු:

  • අත්පොත - AWS කළමනාකරණ කොන්සෝලය හරහා;
  • ටෙරාෆෝම් කේතයෙන් යටිතල පහසුකම් කම්මැලි ස්වයංක්රීයකරුවන් සඳහා වේ;

සංවර්ධිත පද්ධතියේ ගෘහ නිර්මාණ ශිල්පය

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
භාවිතා කරන සංරචක:

  • Aviasales API — මෙම API මඟින් ආපසු ලබා දෙන දත්ත සියලු පසුකාලීන වැඩ සඳහා භාවිතා කරනු ඇත;
  • EC2 නිෂ්පාදක අවස්ථාව — ආදාන දත්ත ප්‍රවාහය ජනනය කරනු ලබන වලාකුළෙහි සාමාන්‍ය අතථ්‍ය යන්ත්‍රයක්:
    • Kinesis නියෝජිතයා Kinesis (Kinesis Data Streams හෝ Kinesis Firehose) වෙත දත්ත රැස් කිරීමට සහ යැවීමට පහසු ක්‍රමයක් සපයන යන්ත්‍රයේ දේශීයව ස්ථාපනය කර ඇති ජාවා යෙදුමකි. නියෝජිතයා විසින් නිශ්චිත නාමාවලිවල ගොනු කට්ටලයක් නිරන්තරයෙන් නිරීක්ෂණය කරන අතර Kinesis වෙත නව දත්ත යවයි;
    • API ඇමතුම් ස්ක්‍රිප්ට් — API වෙත ඉල්ලීම් කරන පයිතන් ස්ක්‍රිප්ට් එකක් සහ ප්‍රතිචාරය Kinesis නියෝජිතයා විසින් නිරීක්ෂණය කරනු ලබන ෆෝල්ඩරයකට දමන්න;
  • Kinesis දත්ත ප්‍රවාහ - පුළුල් පරිමාණ හැකියාවන් සහිත තත්‍ය කාලීන දත්ත ප්‍රවාහ සේවාව;
  • Kinesis Analytics තත්‍ය කාලීන ප්‍රවාහ දත්ත විශ්ලේෂණය සරල කරන සේවාදායක රහිත සේවාවකි. Amazon Kinesis Data Analytics යෙදුම් සම්පත් වින්‍යාස කරන අතර ලැබෙන ඕනෑම දත්ත පරිමාවක් හැසිරවීමට ස්වයංක්‍රීයව පරිමාණය කරයි;
  • ඒඩබ්ලිව්එස් ලැම්බඩා — උපස්ථ කිරීම හෝ සේවාදායකයන් පිහිටුවීමකින් තොරව කේතය ධාවනය කිරීමට ඔබට ඉඩ සලසන සේවාවකි. සෑම ඇමතුමක් සඳහාම සියලුම පරිගණක බලය ස්වයංක්‍රීයව පරිමාණය කෙරේ;
  • Amazon DynamoDB - ඕනෑම පරිමාණයකින් ක්‍රියාත්මක වන විට මිලි තත්පර 10කට වඩා අඩු ප්‍රමාදයක් ලබා දෙන යතුරු-අගය යුගල සහ ලේඛනවල දත්ත සමුදායක්. DynamoDB භාවිතා කරන විට, ඔබට කිසිදු සේවාදායකයක් සැපයීම, පැච් කිරීම හෝ කළමනාකරණය කිරීම අවශ්‍ය නොවේ. DynamoDB පවතින සම්පත් ප්‍රමාණය වෙනස් කිරීමට සහ ඉහළ කාර්ය සාධනයක් පවත්වා ගැනීමට වගු ස්වයංක්‍රීයව පරිමාණය කරයි. පද්ධති පරිපාලනය අවශ්ය නොවේ;
  • ඇමේසන් එස්එන්එස් - ප්‍රකාශක-ග්‍රාහක (Pub/Sub) ආකෘතිය භාවිතයෙන් පණිවිඩ යැවීම සඳහා පූර්ණ කළමනාකරණය කළ සේවාවක්, ඔබට ක්ෂුද්‍ර සේවා, බෙදා හරින ලද පද්ධති සහ සේවාදායක රහිත යෙදුම් හුදකලා කළ හැකිය. ජංගම තල්ලු දැනුම්දීම්, SMS පණිවිඩ සහ ඊමේල් හරහා අවසාන පරිශීලකයින්ට තොරතුරු යැවීමට SNS භාවිතා කළ හැක.

මූලික පුහුණුව

දත්ත ප්‍රවාහය අනුකරණය කිරීම සඳහා, Aviasales API විසින් ආපසු ලබා දුන් ගුවන් ටිකට්පත් තොරතුරු භාවිතා කිරීමට මම තීරණය කළෙමි. තුල ලියකියවිලි විවිධ ක්‍රමවල ඉතා පුළුල් ලැයිස්තුවක්, අපි ඒවායින් එකක් ගනිමු - “මාසික මිල දින දර්ශනය”, එය මාරුවීම් ගණන අනුව කාණ්ඩගත කර මාසයේ සෑම දිනකම මිල ගණන් ලබා දෙයි. ඔබ ඉල්ලීමෙහි සෙවුම් මාසය සඳහන් නොකරන්නේ නම්, වත්මන් මාසයට පසුව මාසය සඳහා තොරතුරු ආපසු ලබා දෙනු ඇත.

ඉතින්, අපි ලියාපදිංචි වී අපගේ ටෝකනය ලබා ගනිමු.

උදාහරණ ඉල්ලීමක් පහත දැක්වේ:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

ඉල්ලීමෙහි ටෝකනයක් සඳහන් කිරීමෙන් API වෙතින් දත්ත ලබා ගැනීමේ ඉහත ක්‍රමය ක්‍රියාත්මක වනු ඇත, නමුත් මම ප්‍රවේශ ටෝකනය ශීර්ෂකය හරහා යැවීමට කැමැත්තෙමි, එබැවින් අපි මෙම ක්‍රමය api_caller.py ස්ක්‍රිප්ට් තුළ භාවිතා කරමු.

පිළිතුර උදාහරණය:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

ඉහත උදාහරණ API ප්‍රතිචාරය ශාන්ත පීටර්ස්බර්ග් සිට ෆුක් දක්වා ටිකට් පතක් පෙන්වයි... ඔහ්, මොනතරම් සිහිනයක්ද...
මම කසාන් සිට, සහ ෆුකෙට් දැන් "සිහිනයක් පමණක්" බැවින්, අපි ශාන්ත පීටර්ස්බර්ග් සිට කසාන් දක්වා ටිකට්පත් සොයා බලමු.

ඔබට දැනටමත් AWS ගිණුමක් ඇති බව උපකල්පනය කරයි. කයිනෙසිස් සහ කෙටි පණිවුඩ හරහා දැනුම්දීම් යැවීම වාර්ෂිකව ඇතුළත් කර නොමැති බව වහාම විශේෂ අවධානයට යොමු කිරීමට කැමැත්තෙමි. නිදහස් ස්ථරය (නොමිලේ භාවිතය). නමුත් එසේ තිබියදීත්, ඩොලර් කිහිපයක් මනසේ තබාගෙන, යෝජිත පද්ධතිය ගොඩනඟා එය සමඟ සෙල්ලම් කිරීම තරමක් කළ හැකිය. තවද, ඇත්ත වශයෙන්ම, ඒවා තවදුරටත් අවශ්ය නොවන පසු සියලු සම්පත් මකා දැමීමට අමතක නොකරන්න.

වාසනාවකට මෙන්, අපි අපගේ මාසික නොමිලේ සීමාවන් සපුරාලන්නේ නම්, DynamoDb සහ lambda කාර්යයන් අපට නොමිලේ වනු ඇත. උදාහරණයක් ලෙස, DynamoDB සඳහා: 25 GB ආචයනය, 25 WCU/RCU සහ මිලියන 100 විමසුම්. සහ මසකට ලැම්ඩා ක්‍රියාකාරී ඇමතුම් මිලියනයක්.

අතින් පද්ධති යෙදවීම

Kinesis දත්ත ප්‍රවාහයන් පිහිටුවීම

අපි Kinesis Data Streams සේවාව වෙත ගොස් නව ප්‍රවාහ දෙකක්, එක් කැබැල්ලක් බැගින් නිර්මාණය කරමු.

ෂර්ඩ් යනු කුමක්ද?
shard යනු Amazon Kinesis ප්‍රවාහයක මූලික දත්ත හුවමාරු ඒකකයයි. එක් අංශයක් 1 MB/s වේගයකින් ආදාන දත්ත හුවමාරුව සහ 2 MB/s වේගයකින් ප්‍රතිදාන දත්ත හුවමාරුව සපයයි. එක් අංශයක් තත්පරයකට PUT ඇතුළත් කිරීම් 1000ක් දක්වා සහය දක්වයි. දත්ත ප්‍රවාහයක් නිර්මාණය කිරීමේදී, ඔබට අවශ්‍ය කොටස් ගණන සඳහන් කළ යුතුය. උදාහරණයක් ලෙස, ඔබට කොටස් දෙකක් සහිත දත්ත ප්‍රවාහයක් සෑදිය හැක. මෙම දත්ත ප්‍රවාහය 2 MB/s හි ආදාන දත්ත හුවමාරුවක් සහ 4 MB/s හි ප්‍රතිදාන දත්ත හුවමාරුවක් සපයනු ඇත, තත්පරයකට PUT වාර්තා 2000 දක්වා සහය දක්වයි.

ඔබේ ප්‍රවාහයේ කැබලි වැඩි වන තරමට එහි ප්‍රවාහය වැඩි වේ. ප්‍රතිපත්තිමය වශයෙන්, ප්‍රවාහයන් පරිමාණය කරන්නේ එලෙසයි - කැබලි එකතු කිරීමෙන්. නමුත් ඔබ සතුව ඇති කැබලි වැඩි වන තරමට මිල වැඩි වේ. සෑම කැබැල්ලක්ම පැයකට ශත 1,5 ක් සහ සෑම PUT ගෙවුම් ඒකක මිලියනයක් සඳහා අමතර ශත 1.4 ක් වැය වේ.

නමින් අලුත් ප්‍රවාහයක් නිර්මාණය කරමු ගුවන්_ප්‍රවේශපත්, 1 කෑල්ලක් ඔහුට ප්රමාණවත් වනු ඇත:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
දැන් අපි නමත් එක්ක තව ත්‍රෙඩ් එකක් හදමු විශේෂ_ප්‍රවාහය:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

නිෂ්පාදක සැකසුම

කාර්යයක් විශ්ලේෂණය කිරීමට, දත්ත නිෂ්පාදකයෙකු ලෙස සාමාන්‍ය EC2 අවස්ථාවක් භාවිතා කිරීම ප්‍රමාණවත් වේ. එය බලවත්, මිල අධික අතථ්‍ය යන්ත්‍රයක් විය යුතු නැත; ස්පෝට් t2.මයික්‍රෝ ඉතා හොඳින් ක්‍රියා කරයි.

වැදගත් සටහන: උදාහරණයක් ලෙස, ඔබ රූපය භාවිතා කළ යුතුය - Amazon Linux AMI 2018.03.0, එය Kinesis නියෝජිතයා ඉක්මනින් දියත් කිරීම සඳහා අඩු සැකසුම් ඇත.

EC2 සේවාව වෙත යන්න, නව අථත්‍ය යන්ත්‍රයක් සාදන්න, නිදහස් ස්ථරයට ඇතුළත් කර ඇති t2.micro වර්ගය සමඟ කැමති AMI තෝරන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
අලුතින් නිර්මාණය කරන ලද අතථ්‍ය යන්ත්‍රයට Kinesis සේවාව සමඟ අන්තර් ක්‍රියා කිරීමට හැකි වීම සඳහා, එය කිරීමට අයිතිය ලබා දිය යුතුය. මෙය කිරීමට හොඳම ක්රමය වන්නේ IAM භූමිකාවක් පැවරීමයි. එබැවින්, පියවර 3: වින්‍යාස කිරීමේ අවස්ථා විස්තර තිරයේ, ඔබ තෝරාගත යුතුය නව IAM භූමිකාව සාදන්න:

EC2 සඳහා IAM භූමිකාවක් නිර්මාණය කිරීම
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
විවෘත වන කවුළුව තුළ, අපි EC2 සඳහා නව භූමිකාවක් නිර්මාණය කරන බව තෝරන්න සහ අවසර අංශය වෙත යන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
පුහුණු උදාහරණය භාවිතා කරමින්, සම්පත් හිමිකම් පිළිබඳ කැටිති වින්‍යාස කිරීමේ සියලු සංකීර්ණතා වෙත යාමට අපට අවශ්‍ය නැත, එබැවින් අපි Amazon විසින් පෙර-වින්‍යාස කළ ප්‍රතිපත්ති තෝරා ගනිමු: AmazonKinesisFullAccess සහ CloudWatchFullAccess.

අපි මෙම භූමිකාව සඳහා අර්ථවත් නමක් ලබා දෙමු, උදාහරණයක් ලෙස: EC2-KinesisStreams-FullAccess. පහත පින්තූරයේ පෙන්වා ඇති පරිදි ප්රතිඵලය සමාන විය යුතුය:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
මෙම නව භූමිකාව නිර්මාණය කිරීමෙන් පසු, එය නිර්මාණය කරන ලද අථත්‍ය යන්ත්‍ර අවස්ථාවට එය අමුණන්න අමතක කරන්න එපා:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
අපි මෙම තිරයේ වෙනත් කිසිවක් වෙනස් නොකරන අතර ඊළඟ කවුළු වෙත යන්න.

දෘඪ තැටි සැකසුම් පෙරනිමි ලෙස තැබිය හැකිය, ටැග් (ටැග් භාවිතා කිරීම හොඳ පුරුද්දක් වුවද, අවම වශයෙන් අවස්ථාවට නමක් ලබා දී පරිසරය සඳහන් කරන්න).

දැන් අපි සිටින්නේ පියවර 6: ආරක්ෂක කණ්ඩායම් වින්‍යාස කරන්න පටිත්තෙහි, ඔබට නව එකක් තැනීමට හෝ ඔබගේ පවතින ආරක්ෂක කණ්ඩායම සඳහන් කිරීමට අවශ්‍ය වේ, එමඟින් ඔබට ssh (වරාය 22) හරහා එම අවස්ථාවට සම්බන්ධ වීමට ඉඩ සලසයි. එහි Source -> My IP තෝරන්න එවිට ඔබට එම අවස්ථාව දියත් කළ හැක.

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
එය ධාවන තත්ත්වයට මාරු වූ වහාම, ඔබට ssh හරහා එයට සම්බන්ධ වීමට උත්සාහ කළ හැකිය.

Kinesis Agent සමඟ වැඩ කිරීමට හැකිවීම සඳහා, යන්ත්‍රයට සාර්ථකව සම්බන්ධ වූ පසු, ඔබ ටර්මිනලයට පහත විධාන ඇතුළත් කළ යුතුය:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

API ප්‍රතිචාර සුරැකීමට ෆෝල්ඩරයක් සාදා ගනිමු:

sudo mkdir /var/log/airline_tickets

නියෝජිතයා ආරම්භ කිරීමට පෙර, ඔබ එහි වින්‍යාසය වින්‍යාසගත කළ යුතුය:

sudo vim /etc/aws-kinesis/agent.json

agent.json ගොනුවේ අන්තර්ගතය මෙසේ විය යුතුය:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

වින්‍යාස ගොනුවෙන් දැකිය හැකි පරිදි, නියෝජිතයා විසින් /var/log/airline_tickets/ බහලුම තුළ .log දිගුව සහිත ගොනු නිරීක්ෂණය කර, ඒවා විග්‍රහ කර airline_tickets ප්‍රවාහයට මාරු කරනු ඇත.

අපි සේවාව නැවත ආරම්භ කර එය ක්‍රියාත්මක වන බවට වග බලා ගන්න:

sudo service aws-kinesis-agent restart

දැන් අපි API වෙතින් දත්ත ඉල්ලා සිටින පයිතන් ස්ක්‍රිප්ට් බාගත කරමු:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py ස්ක්‍රිප්ටය Aviasales වෙතින් දත්ත ඉල්ලා සිටින අතර ලැබුණු ප්‍රතිචාරය Kinesis නියෝජිතයා ස්කෑන් කරන නාමාවලියෙහි සුරකියි. මෙම ස්ක්‍රිප්ට් ක්‍රියාත්මක කිරීම තරමක් සම්මතයි, TicketsApi පන්තියක් ඇත, එය ඔබට API අසමමුහුර්තව ඇද ගැනීමට ඉඩ සලසයි. අපි ටෝකනයක් සහිත ශීර්ෂයක් ලබා දී මෙම පන්තියට පරාමිති ඉල්ලමු:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

නියෝජිතයාගේ නිවැරදි සැකසුම් සහ ක්‍රියාකාරීත්වය පරීක්ෂා කිරීමට, අපි api_caller.py ස්ක්‍රිප්ට් ධාවනය පරීක්ෂා කරමු:

sudo ./api_caller.py TOKEN

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
තවද අපි නියෝජිත ලොගවල සහ එයාර්ලයින්_ටිකට් දත්ත ප්‍රවාහයේ අධීක්ෂණ පටිත්තෙහි කාර්යයේ ප්‍රතිඵලය දෙස බලමු:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
ඔබට පෙනෙන පරිදි, සෑම දෙයක්ම ක්රියාත්මක වන අතර Kinesis නියෝජිතයා සාර්ථකව ප්රවාහයට දත්ත යවයි. දැන් අපි පාරිභෝගිකයා වින්‍යාස කරමු.

Kinesis දත්ත විශ්ලේෂණ සැකසීම

අපි සම්පූර්ණ පද්ධතියේ කේන්ද්‍රීය අංගය වෙත යමු - Kinesis Data Analytics හි kinesis_analytics_airlines_app නමින් නව යෙදුමක් සාදන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Kinesis Data Analytics ඔබට SQL භාෂාව භාවිතයෙන් Kinesis Streams වෙතින් තත්‍ය කාලීන දත්ත විශ්ලේෂණ සිදු කිරීමට ඉඩ සලසයි. එය සම්පුර්ණ ස්වයං පරිමාණ සේවාවකි (Kinesis Streams මෙන් නොව):

  1. මූලාශ්‍ර දත්ත සඳහා වන ඉල්ලීම් මත පදනම්ව නව ප්‍රවාහ (ප්‍රතිදාන ප්‍රවාහ) නිර්මාණය කිරීමට ඔබට ඉඩ සලසයි;
  2. යෙදුම් ක්‍රියාත්මක වන විට සිදු වූ දෝෂ සහිත ප්‍රවාහයක් සපයයි (දෝෂ ප්‍රවාහය);
  3. ආදාන දත්ත ක්‍රමය ස්වයංක්‍රීයව තීරණය කළ හැක (අවශ්‍ය නම් එය අතින් නැවත අර්ථ දැක්විය හැක).

මෙය ලාභදායී සේවාවක් නොවේ - වැඩ කරන පැයකට USD 0.11, එබැවින් ඔබ එය ප්රවේශමෙන් භාවිතා කළ යුතු අතර ඔබ අවසන් වූ පසු එය මකා දැමිය යුතුය.

අපි යෙදුම දත්ත මූලාශ්‍රයට සම්බන්ධ කරමු:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
අපි සම්බන්ධ කිරීමට යන ප්‍රවාහය තෝරන්න (airline_tickets):

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
ඊළඟට, ඔබ යෙදුමට ප්‍රවාහයෙන් කියවා ප්‍රවාහයට ලිවීමට හැකි වන පරිදි නව IAM භූමිකාවක් ඇමිණිය යුතුය. මෙය සිදු කිරීම සඳහා, ප්‍රවේශ අවසර කොටසේ කිසිවක් වෙනස් නොකිරීමට ප්‍රමාණවත් වේ:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
දැන් අපි ප්‍රවාහයේ දත්ත යෝජනා ක්‍රමය සොයා ගැනීමට ඉල්ලා සිටිමු; මෙය කිරීමට, "Discover schema" බොත්තම ක්ලික් කරන්න. එහි ප්‍රතිඵලයක් ලෙස, IAM භූමිකාව යාවත්කාලීන වනු ඇත (නව එකක් සාදනු ඇත) සහ දැනටමත් ප්‍රවාහයට පැමිණ ඇති දත්ත වලින් ක්‍රම හඳුනාගැනීම දියත් කෙරේ:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
දැන් ඔබ SQL සංස්කාරකය වෙත යා යුතුය. ඔබ මෙම බොත්තම මත ක්ලික් කළ විට, යෙදුම දියත් කරන ලෙස ඉල්ලා සිටින කවුළුවක් දිස්වනු ඇත - ඔබට දියත් කිරීමට අවශ්‍ය දේ තෝරන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
පහත සරල විමසුම SQL සංස්කාරක කවුළුවට ඇතුළු කර SQL සුරකින්න සහ ධාවනය කරන්න ක්ලික් කරන්න:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

සම්බන්ධක දත්ත සමුදායන්හිදී, ඔබ වාර්තා එක් කිරීමට INSERT ප්‍රකාශ සහ දත්ත විමසීමට SELECT ප්‍රකාශයක් භාවිතා කරමින් වගු සමඟ වැඩ කරයි. Amazon Kinesis Data Analytics හි, ඔබ ප්‍රවාහ (STREAMs) සහ පොම්ප (PUMPs) සමඟ වැඩ කරයි - යෙදුමක එක් ප්‍රවාහයකින් දත්ත වෙනත් ප්‍රවාහයකට ඇතුළු කරන අඛණ්ඩ ඇතුළු කිරීමේ ඉල්ලීම්.

ඉහත ඉදිරිපත් කර ඇති SQL විමසුම රූබල් පන්දහසකට වඩා අඩු වියදමකින් Aeroflot ටිකට් සඳහා සෙවුම් කරයි. මෙම කොන්දේසි සපුරාලන සියලුම වාර්තා DESTINATION_SQL_STREAM ප්‍රවාහයේ තබනු ඇත.

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
ගමනාන්ත කොටසෙහි, විශේෂ_ප්‍රවාහ ප්‍රවාහය තෝරන්න, සහ යෙදුම තුළ ප්‍රවාහ නාමයෙන් DESTINATION_SQL_STREAM පතන ලැයිස්තුව:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
සියලුම උපාමාරු වල ප්‍රතිඵලය පහත පින්තූරයට සමාන දෙයක් විය යුතුය:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

SNS මාතෘකාවක් නිර්මාණය කිරීම සහ දායක වීම

සරල දැනුම්දීම් සේවාව වෙත ගොස් එහි ගුවන් සේවය නමින් නව මාතෘකාවක් සාදන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
මෙම මාතෘකාවට දායක වී SMS දැනුම්දීම් යවනු ලබන ජංගම දුරකථන අංකය සඳහන් කරන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

DynamoDB හි වගුවක් සාදන්න

ඔවුන්ගේ එයාර්ලයින්_ටිකට් ප්‍රවාහයෙන් අමු දත්ත ගබඩා කිරීමට, අපි DynamoDB හි එම නමින්ම වගුවක් නිර්මාණය කරමු. අපි Record_id මූලික යතුර ලෙස භාවිතා කරන්නෙමු:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

ලැම්ඩා ශ්‍රිත එකතුකරන්නෙකු නිර්මාණය කිරීම

අපි කලෙක්ටර් නමින් ලැම්ඩා ශ්‍රිතයක් නිර්මාණය කරමු, එහි කාර්යය වන්නේ එයාර්ලයින්_ටිකට් ප්‍රවාහය ඡන්ද විමසීම සහ එහි නව වාර්තා හමු වුවහොත්, මෙම වාර්තා DynamoDB වගුවට ඇතුළු කිරීමයි. පැහැදිලිවම, පෙරනිමි හිමිකම් වලට අමතරව, මෙම lambda හට Kinesis දත්ත ප්‍රවාහයට කියවීමට සහ DynamoDB වෙත ලිවීමට ප්‍රවේශය තිබිය යුතුය.

එකතුකරන්නා lambda කාර්යය සඳහා IAM භූමිකාවක් නිර්මාණය කිරීම
පළමුව, Lambda-TicketsProcessingRole නමින් lambda සඳහා නව IAM භූමිකාවක් නිර්මාණය කරමු:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
පරීක්ෂණ උදාහරණය සඳහා, පහත පින්තූරයේ පෙන්වා ඇති පරිදි, පෙර-වින්‍යාසගත AmazonKinesisReadOnlyAccess සහ AmazonDynamoDBFullAccess ප්‍රතිපත්ති බෙහෙවින් සුදුසු ය:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

මෙම lambda නව ප්‍රවේශයන් airline_stream වෙත ඇතුළු වන විට Kinesis වෙතින් ප්‍රේරකයක් මඟින් දියත් කළ යුතුය, එබැවින් අපට නව ප්‍රේරකයක් එක් කිරීමට අවශ්‍ය වේ:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
ඉතිරිව ඇත්තේ කේතය ඇතුළු කිරීම සහ ලැම්ඩා සුරැකීම පමණි.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

lambda function notifier එකක් නිර්මාණය කිරීම

දෙවන ප්‍රවාහය (විශේෂ_ප්‍රවාහය) නිරීක්ෂණය කර SNS වෙත දැනුම්දීමක් යවන දෙවන lambda ශ්‍රිතය ද ඒ හා සමාන ආකාරයකින් නිර්මාණය කර ඇත. එමනිසා, මෙම ලැම්ඩාට Kinesis වෙතින් කියවීමට සහ ලබා දී ඇති SNS මාතෘකාවකට පණිවිඩ යැවීමට ප්‍රවේශය තිබිය යුතු අතර, එය SNS සේවාව මගින් මෙම මාතෘකාවේ සියලුම ග්‍රාහකයින් වෙත යවනු ලැබේ (ඊමේල්, SMS, ආදිය).

IAM භූමිකාවක් නිර්මාණය කිරීම
පළමුව, අපි මෙම lambda සඳහා IAM භූමිකාව Lambda-KinesisAlarm නිර්මාණය කර, පසුව මෙම භූමිකාව නිර්මාණය කරන alarm_notifier lambda වෙත පවරමු:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

මෙම ලැම්ඩා විශේෂ_ප්‍රවාහයට ඇතුළු වීමට නව වාර්තා සඳහා ප්‍රේරකයක් මත ක්‍රියා කළ යුතුය, එබැවින් අපි එකතු කරන්නා ලැම්ඩා සඳහා කළ ආකාරයටම ඔබට ප්‍රේරකය වින්‍යාස කිරීමට අවශ්‍ය වේ.

මෙම lambda වින්‍යාස කිරීම පහසු කිරීම සඳහා, අපි නව පරිසර විචල්‍යයක් හඳුන්වා දෙමු - TOPIC_ARN, එහිදී අපි එයාර්ලයින්ස් මාතෘකාවේ ANR (Amazon Recourse Names) ස්ථානගත කරමු:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
ලැම්ඩා කේතය ඇතුළු කරන්න, එය කිසිසේත් සංකීර්ණ නොවේ:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

මෙය අතින් පද්ධති වින්‍යාසය සම්පූර්ණ කර ඇති බව පෙනේ. ඉතිරිව ඇත්තේ අප සියල්ල නිවැරදිව වින්‍යාස කර ඇති බව පරීක්ෂා කර සහතික කර ගැනීමයි.

Terraform කේතයෙන් යෙදවීම

අවශ්ය සූදානම

භූමිෂ් .ය කේතයෙන් යටිතල පහසුකම් යෙදවීම සඳහා ඉතා පහසු විවෘත මූලාශ්‍ර මෙවලමකි. එය ඉගෙන ගැනීමට පහසු වන තමන්ගේම වාක්‍ය ඛණ්ඩයක් ඇති අතර යෙදවිය යුතු ආකාරය සහ කුමක් ද යන්න පිළිබඳ බොහෝ උදාහරණ ඇත. Atom සංස්කාරකය හෝ විෂුවල් ස්ටුඩියෝ කේතය ටෙරාෆෝම් සමඟ වැඩ කිරීම පහසු කරන බොහෝ පහසු ප්ලගීන ඇත.

ඔබට බෙදා හැරීම බාගත කළ හැකිය මෙතනින්. සියලුම ටෙරාෆෝම් හැකියාවන් පිළිබඳ සවිස්තරාත්මක විශ්ලේෂණයක් මෙම ලිපියේ විෂය පථයෙන් ඔබ්බට ය, එබැවින් අපි ප්‍රධාන කරුණු වලට පමණක් සීමා වෙමු.

ආරම්භ කරන්නේ කෙසේද?

ව්යාපෘතියේ සම්පූර්ණ කේතය වේ මගේ ගබඩාවේ. අපි ගබඩාව අපටම ක්ලෝන කරමු. ආරම්භ කිරීමට පෙර, ඔබ AWS CLI ස්ථාපනය කර වින්‍යාස කර ඇති බව සහතික කර ගත යුතුය, මන්ද... Terraform ~/.aws/credentials ගොනුව තුළ අක්තපත්‍ර සොයනු ඇත.

හොඳ පරිචයක් නම්, ටෙරාෆෝම් දැනට වලාකුළ තුළ අප වෙනුවෙන් නිර්මාණය කරන්නේ කුමක් දැයි බැලීමට සම්පූර්ණ යටිතල පහසුකම් යෙදවීමට පෙර සැලසුම් විධානය ක්‍රියාත්මක කිරීමයි:

terraform.exe plan

දැනුම්දීම් යැවීමට දුරකථන අංකයක් ඇතුළත් කිරීමට ඔබෙන් විමසනු ඇත. මෙම අදියරේදී එය ඇතුල් කිරීම අවශ්ය නොවේ.

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
වැඩසටහනේ මෙහෙයුම් සැලැස්ම විශ්ලේෂණය කිරීමෙන් පසු, අපට සම්පත් නිර්මාණය කිරීම ආරම්භ කළ හැකිය:

terraform.exe apply

මෙම විධානය යැවීමෙන් පසු, ඔබට නැවත දුරකථන අංකයක් ඇතුළත් කිරීමට අසනු ඇත; ඇත්ත වශයෙන්ම ක්‍රියාවන් සිදු කිරීම පිළිබඳ ප්‍රශ්නයක් පෙන්වන විට "ඔව්" අමතන්න. මෙය ඔබට සම්පූර්ණ යටිතල පහසුකම් සැකසීමට, EC2 හි අවශ්‍ය සියලුම වින්‍යාසයන් සිදු කිරීමට, ලැම්ඩා කාර්යයන් යෙදවීමට යනාදියට ඉඩ සලසයි.

Terraform කේතය හරහා සියලුම සම්පත් සාර්ථකව නිර්මාණය කළ පසු, ඔබ Kinesis Analytics යෙදුමේ විස්තර වෙත යා යුතුය (අවාසනාවකට, කේතයෙන් මෙය කෙළින්ම කරන්නේ කෙසේදැයි මම සොයා ගත්තේ නැත).

යෙදුම දියත් කරන්න:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
මෙයින් පසු, ඔබ පතන ලැයිස්තුවෙන් තේරීමෙන් යෙදුම තුළ ප්‍රවාහ නාමය පැහැදිලිව සැකසිය යුතුය:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
දැන් යන්න සියල්ල සූදානම්.

යෙදුම පරීක්ෂා කිරීම

ඔබ අතින් හෝ Terraform කේතය හරහා පද්ධතිය යෙදවූ ආකාරය කුමක් වුවත්, එය එලෙසම ක්‍රියා කරයි.

අපි SSH හරහා Kinesis Agent ස්ථාපනය කර ඇති EC2 අතථ්‍ය යන්ත්‍රයට ලොග් වී api_caller.py ස්ක්‍රිප්ට් ධාවනය කරන්නෙමු.

sudo ./api_caller.py TOKEN

ඔබ කළ යුත්තේ ඔබගේ අංකයට කෙටි පණිවිඩයක් එනතෙක් බලා සිටීමයි.

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
SMS - විනාඩි 1කින් පමණ ඔබගේ දුරකථනයට පණිවිඩයක් පැමිණේ:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව
පසුව වඩාත් සවිස්තරාත්මක විශ්ලේෂණයක් සඳහා වාර්තා DynamoDB දත්ත ගබඩාවේ සුරකින ලද්දේ දැයි බැලීමට ඉතිරිව ඇත. Airline_tickets වගුවේ ආසන්න වශයෙන් පහත දත්ත අඩංගු වේ:

Aviasales API Amazon Kinesis සමඟ ඒකාබද්ධ කිරීම සහ සේවාදායක රහිත සරල බව

නිගමනය

සිදු කරන ලද කාර්යය අතරතුර, Amazon Kinesis මත පදනම්ව මාර්ගගත දත්ත සැකසුම් පද්ධතියක් ගොඩනගා ඇත. Kinesis දත්ත ප්‍රවාහ සහ SQL විධාන භාවිතා කරන තත්‍ය කාලීන විශ්ලේෂණ Kinesis Analytics සමඟ ඒකාබද්ධව Kinesis නියෝජිතයා භාවිතා කිරීමේ විකල්ප මෙන්ම අනෙකුත් AWS සේවාවන් සමඟ Amazon Kinesis අන්තර්ක්‍රියා සලකා බලන ලදී.

අපි ඉහත පද්ධතිය ක්‍රම දෙකකින් යෙදුවෙමු: තරමක් දිගු අත්පොත සහ ටෙරාෆෝම් කේතයෙන් ඉක්මන් එකක්.

සියලුම ව්‍යාපෘති මූල කේතය තිබේ මගේ GitHub ගබඩාවේ, මම ඔබට එය හුරු පුරුදු කිරීමට යෝජනා කරනවා.

ලිපිය ගැන සාකච්ඡා කිරීමට ලැබීම ගැන මම සතුටු වෙමි, ඔබගේ අදහස් බලාපොරොත්තු වෙමි. නිර්මාණාත්මක විවේචනයක් බලාපොරොත්තු වෙමි.

ඔබට සාර්ථක වේවා!

මූලාශ්රය: www.habr.com

අදහස් එක් කරන්න