Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

ارے حبر!

کیا آپ کو اڑنے والے ہوائی جہاز پسند ہیں؟ مجھے یہ پسند ہے، لیکن خود کو الگ تھلگ کرنے کے دوران مجھے ایک معروف وسیلہ - Aviasales سے ہوائی ٹکٹوں کے ڈیٹا کا تجزیہ کرنے سے بھی پیار ہو گیا۔

آج ہم Amazon Kinesis کے کام کا تجزیہ کریں گے، ریئل ٹائم اینالیٹکس کے ساتھ ایک اسٹریمنگ سسٹم بنائیں گے، Amazon DynamoDB NoSQL ڈیٹا بیس کو مرکزی ڈیٹا اسٹوریج کے طور پر انسٹال کریں گے، اور دلچسپ ٹکٹوں کے لیے SMS اطلاعات مرتب کریں گے۔

تمام تفصیلات کٹ کے نیچے ہیں! جاؤ!

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

تعارف

مثال کے طور پر، ہمیں رسائی کی ضرورت ہے۔ Aviasales API. اس تک رسائی مفت اور بغیر کسی پابندی کے فراہم کی جاتی ہے؛ ڈیٹا تک رسائی حاصل کرنے کے لیے آپ کو اپنا API ٹوکن حاصل کرنے کے لیے صرف "Developers" سیکشن میں رجسٹر کرنے کی ضرورت ہے۔

اس مضمون کا بنیادی مقصد AWS میں انفارمیشن اسٹریمنگ کے استعمال کے بارے میں ایک عام فہم دینا ہے؛ ہم اس بات کو مدنظر رکھتے ہیں کہ استعمال شدہ API کے ذریعے واپس کیا گیا ڈیٹا سختی سے اپ ٹو ڈیٹ نہیں ہے اور اسے کیشے سے منتقل کیا جاتا ہے، جو کہ گزشتہ 48 گھنٹوں کے دوران Aviasales.ru اور Jetradar.com سائٹس کے صارفین کی تلاش کی بنیاد پر تشکیل دیا گیا ہے۔

API کے ذریعے موصول ہونے والی پروڈیوسنگ مشین پر انسٹال ہونے والا Kinesis-Agent خود بخود تجزیہ کرے گا اور Kinesis Data Analytics کے ذریعے مطلوبہ سٹریم میں ڈیٹا منتقل کرے گا۔ اس سلسلے کا خام ورژن براہ راست اسٹور پر لکھا جائے گا۔ DynamoDB میں تعینات خام ڈیٹا اسٹوریج BI ٹولز، جیسے AWS Quick Sight کے ذریعے ٹکٹ کے گہرے تجزیہ کی اجازت دے گا۔

ہم پورے انفراسٹرکچر کی تعیناتی کے لیے دو اختیارات پر غور کریں گے:

  • دستی - AWS مینجمنٹ کنسول کے ذریعے؛
  • Terraform کوڈ سے انفراسٹرکچر سست آٹومیٹرز کے لیے ہے۔

ترقی یافتہ نظام کا فن تعمیر

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
استعمال شدہ اجزاء:

  • Aviasales API - اس API کے ذریعہ واپس کردہ ڈیٹا کو بعد کے تمام کاموں کے لیے استعمال کیا جائے گا۔
  • EC2 پروڈیوسر مثال - کلاؤڈ میں ایک باقاعدہ ورچوئل مشین جس پر ان پٹ ڈیٹا سٹریم تیار کیا جائے گا:
    • Kinesis ایجنٹ مشین پر مقامی طور پر نصب ایک جاوا ایپلی کیشن ہے جو Kinesis (Kinesis Data Streams یا Kinesis Firehose) کو ڈیٹا اکٹھا کرنے اور بھیجنے کا ایک آسان طریقہ فراہم کرتی ہے۔ ایجنٹ مخصوص ڈائریکٹریز میں فائلوں کے سیٹ کی مسلسل نگرانی کرتا ہے اور Kinesis کو نیا ڈیٹا بھیجتا ہے۔
    • API کالر اسکرپٹ - ایک ازگر کا اسکرپٹ جو API سے درخواست کرتا ہے اور جواب کو ایک فولڈر میں رکھتا ہے جس کی نگرانی Kinesis ایجنٹ کرتا ہے۔
  • Kinesis ڈیٹا اسٹریمز - وسیع پیمانے کی صلاحیتوں کے ساتھ ریئل ٹائم ڈیٹا سٹریمنگ سروس؛
  • Kinesis تجزیات ایک سرور لیس سروس ہے جو حقیقی وقت میں سٹریمنگ ڈیٹا کے تجزیہ کو آسان بناتی ہے۔ Amazon Kinesis Data Analytics ایپلیکیشن کے وسائل کو ترتیب دیتا ہے اور آنے والے ڈیٹا کے کسی بھی حجم کو سنبھالنے کے لیے خود بخود اسکیل کرتا ہے۔
  • او ڈبلیو ایس لامبڈا۔ — ایک ایسی خدمت جو آپ کو سرورز کو بیک اپ یا سیٹ اپ کیے بغیر کوڈ چلانے کی اجازت دیتی ہے۔ تمام کمپیوٹنگ پاور ہر کال کے لیے خود بخود سکیل ہو جاتی ہے۔
  • ایمیزون ڈائنومو ڈی بی - کلیدی قدر کے جوڑوں اور دستاویزات کا ایک ڈیٹا بیس جو کسی بھی پیمانے پر چلنے پر 10 ملی سیکنڈ سے کم تاخیر فراہم کرتا ہے۔ DynamoDB استعمال کرتے وقت، آپ کو کسی سرور کی فراہمی، پیچ یا انتظام کرنے کی ضرورت نہیں ہے۔ DynamoDB دستیاب وسائل کی مقدار کو ایڈجسٹ کرنے اور اعلی کارکردگی کو برقرار رکھنے کے لیے میزیں خود بخود ترازو کرتا ہے۔ سسٹم ایڈمنسٹریشن کی ضرورت نہیں ہے۔
  • ایمیزون ایس این ایس - پبلشر-سبسکرائبر (Pub/Sub) ماڈل کا استعمال کرتے ہوئے پیغامات بھیجنے کے لیے ایک مکمل طور پر منظم سروس، جس کے ساتھ آپ مائیکرو سروسز، تقسیم شدہ نظام اور سرور لیس ایپلی کیشنز کو الگ کر سکتے ہیں۔ موبائل پش نوٹیفیکیشنز، ایس ایم ایس پیغامات اور ای میلز کے ذریعے اختتامی صارفین کو معلومات بھیجنے کے لیے SNS کا استعمال کیا جا سکتا ہے۔

ابتدائی تربیت

ڈیٹا فلو کی تقلید کرنے کے لیے، میں نے Aviasales API کے ذریعے واپس کی گئی ایئر لائن ٹکٹ کی معلومات استعمال کرنے کا فیصلہ کیا۔ میں دستاویزات مختلف طریقوں کی کافی وسیع فہرست، آئیے ان میں سے ایک کو لیتے ہیں - "ماہانہ قیمت کیلنڈر"، جو مہینے کے ہر دن کی قیمتیں واپس کرتا ہے، منتقلیوں کی تعداد کے لحاظ سے گروپ کیا جاتا ہے۔ اگر آپ درخواست میں تلاش کے مہینے کی وضاحت نہیں کرتے ہیں، تو موجودہ مہینے کے بعد کے مہینے کے لیے معلومات واپس کر دی جائیں گی۔

تو آئیے رجسٹر کریں اور اپنا ٹوکن حاصل کریں۔

ایک مثال کی درخواست ذیل میں ہے:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

درخواست میں ٹوکن بتا کر API سے ڈیٹا حاصل کرنے کا مذکورہ بالا طریقہ کام کرے گا، لیکن میں ایکسیس ٹوکن کو ہیڈر کے ذریعے منتقل کرنے کو ترجیح دیتا ہوں، اس لیے ہم اس طریقہ کو api_caller.py اسکرپٹ میں استعمال کریں گے۔

نمونہ جواب:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

مندرجہ بالا API کے جواب کی مثال سینٹ پیٹرزبرگ سے فوک تک کا ٹکٹ دکھاتی ہے... اوہ، کیا خواب ہے...
چونکہ میں کازان سے ہوں، اور فوکٹ اب "صرف ایک خواب" ہے، آئیے سینٹ پیٹرزبرگ سے کازان کے ٹکٹ تلاش کرتے ہیں۔

یہ فرض کرتا ہے کہ آپ کے پاس پہلے سے ہی AWS اکاؤنٹ ہے۔ میں فوری طور پر اس حقیقت کی طرف خصوصی توجہ مبذول کرانا چاہتا ہوں کہ Kinesis اور SMS کے ذریعے اطلاعات بھیجنا سالانہ میں شامل نہیں ہیں۔ مفت درجے (مفت استعمال). لیکن اس کے باوجود، چند ڈالر کو ذہن میں رکھتے ہوئے، مجوزہ نظام کی تعمیر اور اس کے ساتھ کھیلنا بالکل ممکن ہے۔ اور، یقینا، تمام وسائل کو حذف کرنا نہ بھولیں جب ان کی مزید ضرورت نہ رہے۔

خوش قسمتی سے، DynamoDb اور lambda فنکشنز ہمارے لیے مفت ہوں گے اگر ہم اپنی ماہانہ مفت حدود کو پورا کرتے ہیں۔ مثال کے طور پر، DynamoDB کے لیے: 25 GB اسٹوریج، 25 WCU/RCU اور 100 ملین سوالات۔ اور ہر ماہ ایک ملین لیمبڈا فنکشن کالز۔

دستی نظام کی تعیناتی۔

Kinesis ڈیٹا اسٹریمز کو ترتیب دینا

آئیے Kinesis Data Streams سروس پر جائیں اور دو نئے سلسلے بنائیں، ہر ایک کے لیے ایک شارڈ۔

شارڈ کیا ہے؟
ایک شارڈ ایمیزون کینیسیس اسٹریم کا بنیادی ڈیٹا ٹرانسفر یونٹ ہے۔ ایک سیگمنٹ 1 MB/s کی رفتار سے ان پٹ ڈیٹا کی منتقلی اور 2 MB/s کی رفتار سے آؤٹ پٹ ڈیٹا کی منتقلی فراہم کرتا ہے۔ ایک سیگمنٹ فی سیکنڈ 1000 PUT اندراجات کو سپورٹ کرتا ہے۔ ڈیٹا اسٹریم بناتے وقت، آپ کو سیگمنٹس کی مطلوبہ تعداد بتانے کی ضرورت ہوتی ہے۔ مثال کے طور پر، آپ دو حصوں کے ساتھ ڈیٹا سٹریم بنا سکتے ہیں۔ یہ ڈیٹا سٹریم 2 MB/s پر ان پٹ ڈیٹا کی منتقلی اور 4 MB/s پر آؤٹ پٹ ڈیٹا ٹرانسفر فراہم کرے گا، جو فی سیکنڈ 2000 PUT ریکارڈز کو سپورٹ کرے گا۔

آپ کے سلسلے میں جتنے زیادہ شارڈز، اس کا تھرو پٹ اتنا ہی زیادہ ہوگا۔ اصولی طور پر، اس طرح بہاؤ کو پیمانہ بنایا جاتا ہے - شارڈز جوڑ کر۔ لیکن آپ کے پاس جتنے زیادہ شارڈز ہوں گے، قیمت اتنی ہی زیادہ ہوگی۔ ہر شارڈ کی قیمت 1,5 سینٹ فی گھنٹہ اور ہر ملین PUT پے لوڈ یونٹ کے لیے اضافی 1.4 سینٹ ہے۔

آئیے نام کے ساتھ ایک نیا سلسلہ بنائیں ایئر لائن_ٹکٹساس کے لیے 1 شارڈ کافی ہوگا:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اب نام کے ساتھ ایک اور تھریڈ بناتے ہیں۔ خصوصی_سٹریم:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

پروڈیوسر سیٹ اپ

کسی کام کا تجزیہ کرنے کے لیے، ڈیٹا پروڈیوسر کے طور پر باقاعدہ EC2 مثال استعمال کرنا کافی ہے۔ یہ ایک طاقتور، مہنگی ورچوئل مشین ہونا ضروری نہیں ہے؛ ایک سپاٹ t2.micro بالکل ٹھیک کرے گا۔

اہم نوٹ: مثال کے طور پر، آپ کو امیج استعمال کرنا چاہیے - Amazon Linux AMI 2018.03.0، اس میں Kinesis ایجنٹ کو تیزی سے لانچ کرنے کے لیے کم سیٹنگز ہیں۔

EC2 سروس پر جائیں، ایک نئی ورچوئل مشین بنائیں، t2.micro ٹائپ کے ساتھ مطلوبہ AMI منتخب کریں، جو کہ فری ٹائر میں شامل ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
نئی تخلیق شدہ ورچوئل مشین کے لیے Kinesis سروس کے ساتھ تعامل کرنے کے قابل ہونے کے لیے، اسے ایسا کرنے کے حقوق دیے جائیں۔ ایسا کرنے کا بہترین طریقہ IAM رول تفویض کرنا ہے۔ لہذا، مرحلہ 3: مثال کی تفصیلات کی اسکرین کو ترتیب دیں، آپ کو منتخب کرنا چاہئے۔ نیا IAM کردار بنائیں:

EC2 کے لیے IAM رول بنانا
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
کھلنے والی ونڈو میں، منتخب کریں کہ ہم EC2 کے لیے ایک نیا رول بنا رہے ہیں اور اجازت کے سیکشن میں جائیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
تربیتی مثال کا استعمال کرتے ہوئے، ہمیں وسائل کے حقوق کی دانے دار ترتیب کی تمام پیچیدگیوں میں جانے کی ضرورت نہیں ہے، اس لیے ہم Amazon کی طرف سے پہلے سے تشکیل شدہ پالیسیوں کا انتخاب کریں گے: AmazonKinesisFullAccess اور CloudWatchFullAccess۔

آئیے اس کردار کے لیے کچھ معنی خیز نام دیتے ہیں، مثال کے طور پر: EC2-KinesisStreams-FullAccess۔ نتیجہ وہی ہونا چاہئے جیسا کہ ذیل کی تصویر میں دکھایا گیا ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اس نئے کردار کو بنانے کے بعد، اسے تخلیق شدہ ورچوئل مشین کے ساتھ منسلک کرنا نہ بھولیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
ہم اس اسکرین پر کوئی اور چیز تبدیل نہیں کرتے ہیں اور اگلی ونڈوز پر چلے جاتے ہیں۔

ہارڈ ڈرائیو کی ترتیبات کو بطور ڈیفالٹ چھوڑا جا سکتا ہے، ساتھ ہی ٹیگز (اگرچہ ٹیگز استعمال کرنا اچھا عمل ہے، کم از کم مثال کو ایک نام دیں اور ماحول کی نشاندہی کریں)۔

اب ہم مرحلہ 6 پر ہیں: سیکیورٹی گروپ کو ترتیب دیں، جہاں آپ کو ایک نیا بنانا ہوگا یا اپنا موجودہ سیکیورٹی گروپ بتانا ہوگا، جو آپ کو ssh (پورٹ 22) کے ذریعے مثال سے جڑنے کی اجازت دیتا ہے۔ وہاں ماخذ -> میرا آئی پی منتخب کریں اور آپ مثال کو لانچ کرسکتے ہیں۔

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
جیسے ہی یہ رننگ اسٹیٹس میں بدل جاتا ہے، آپ ssh کے ذریعے اس سے جڑنے کی کوشش کر سکتے ہیں۔

Kinesis ایجنٹ کے ساتھ کام کرنے کے قابل ہونے کے لیے، مشین سے کامیابی کے ساتھ جڑنے کے بعد، آپ کو ٹرمینل میں درج ذیل کمانڈز داخل کرنا ہوں گی۔

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

آئیے API کے جوابات کو محفوظ کرنے کے لیے ایک فولڈر بنائیں:

sudo mkdir /var/log/airline_tickets

ایجنٹ کو شروع کرنے سے پہلے، آپ کو اس کی تشکیل کو ترتیب دینے کی ضرورت ہے:

sudo vim /etc/aws-kinesis/agent.json

agent.json فائل کے مواد کو اس طرح نظر آنا چاہئے:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

جیسا کہ کنفیگریشن فائل سے دیکھا جا سکتا ہے، ایجنٹ /var/log/airline_tickets/ ڈائرکٹری میں .log ایکسٹینشن کے ساتھ فائلوں کی نگرانی کرے گا، انہیں پارس کرے گا اور انہیں airline_tickets سٹریم میں منتقل کرے گا۔

ہم سروس کو دوبارہ شروع کرتے ہیں اور یقینی بناتے ہیں کہ یہ چل رہی ہے:

sudo service aws-kinesis-agent restart

اب آئیے Python اسکرپٹ ڈاؤن لوڈ کریں جو API سے ڈیٹا کی درخواست کرے گا:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py اسکرپٹ Aviasales سے ڈیٹا کی درخواست کرتی ہے اور موصول ہونے والے جواب کو ڈائرکٹری میں محفوظ کرتی ہے جسے Kinesis ایجنٹ اسکین کرتا ہے۔ اس اسکرپٹ کا نفاذ کافی معیاری ہے، یہاں ایک TicketsApi کلاس ہے، یہ آپ کو API کو متضاد طور پر کھینچنے کی اجازت دیتا ہے۔ ہم ٹوکن کے ساتھ ایک ہیڈر پاس کرتے ہیں اور اس کلاس سے پیرامیٹرز کی درخواست کرتے ہیں:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

ایجنٹ کی درست ترتیبات اور فعالیت کو جانچنے کے لیے، آئیے api_caller.py اسکرپٹ کو چلائیں:

sudo ./api_caller.py TOKEN

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اور ہم ایجنٹ لاگ اور airline_tickets ڈیٹا سٹریم میں مانیٹرنگ ٹیب میں کام کے نتیجے کو دیکھتے ہیں:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
جیسا کہ آپ دیکھ سکتے ہیں، سب کچھ کام کرتا ہے اور Kinesis ایجنٹ کامیابی کے ساتھ ڈیٹا کو سٹریم میں بھیجتا ہے۔ اب صارفین کو کنفیگر کرتے ہیں۔

Kinesis ڈیٹا تجزیات کو ترتیب دینا

آئیے پورے سسٹم کے مرکزی جز کی طرف چلتے ہیں - Kinesis Data Analytics میں kinesis_analytics_airlines_app کے نام سے ایک نئی ایپلیکیشن بنائیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Kinesis Data Analytics آپ کو SQL زبان کا استعمال کرتے ہوئے Kinesis Strems سے حقیقی وقت میں ڈیٹا کے تجزیات کرنے کی اجازت دیتا ہے۔ یہ مکمل طور پر آٹو اسکیلنگ سروس ہے (کائنیس اسٹریمز کے برعکس) جو کہ:

  1. ماخذ ڈیٹا کی درخواستوں کی بنیاد پر آپ کو نئے سلسلے (آؤٹ پٹ اسٹریم) بنانے کی اجازت دیتا ہے۔
  2. ایپلی کیشنز کے چلنے کے دوران پیش آنے والی غلطیوں کے ساتھ ایک سلسلہ فراہم کرتا ہے (Error Stream)؛
  3. ان پٹ ڈیٹا سکیم کا خود بخود تعین کر سکتا ہے (اگر ضروری ہو تو اسے دستی طور پر دوبارہ بیان کیا جا سکتا ہے)۔

یہ کوئی سستی سروس نہیں ہے - 0.11 USD فی گھنٹہ کام، لہذا آپ کو اسے احتیاط سے استعمال کرنا چاہیے اور جب آپ کام ختم کر لیں تو اسے حذف کر دیں۔

آئیے ایپلیکیشن کو ڈیٹا سورس سے جوڑتے ہیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
وہ سلسلہ منتخب کریں جس سے ہم رابطہ کرنے جا رہے ہیں (ایئر لائن_ٹکٹس):

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اگلا، آپ کو ایک نیا IAM رول منسلک کرنے کی ضرورت ہے تاکہ ایپلیکیشن سٹریم سے پڑھ سکے اور سٹریم پر لکھ سکے۔ ایسا کرنے کے لیے، رسائی کی اجازت کے بلاک میں کچھ بھی تبدیل نہ کرنا کافی ہے۔

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
آئیے اب اسٹریم میں ڈیٹا اسکیما کی دریافت کی درخواست کرتے ہیں؛ ایسا کرنے کے لیے، "Discover schema" بٹن پر کلک کریں۔ نتیجے کے طور پر، IAM رول کو اپ ڈیٹ کر دیا جائے گا (ایک نیا بنایا جائے گا) اور اسکیما کا پتہ لگانے کو اس ڈیٹا سے شروع کیا جائے گا جو پہلے ہی سٹریم میں آ چکا ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اب آپ کو ایس کیو ایل ایڈیٹر پر جانے کی ضرورت ہے۔ جب آپ اس بٹن پر کلک کریں گے تو ایک ونڈو نمودار ہوگی جو آپ سے ایپلیکیشن لانچ کرنے کے لیے کہے گی - آپ جو لانچ کرنا چاہتے ہیں اسے منتخب کریں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
ایس کیو ایل ایڈیٹر ونڈو میں درج ذیل آسان سوال داخل کریں اور ایس کیو ایل کو محفوظ کریں اور چلائیں پر کلک کریں:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

متعلقہ ڈیٹا بیس میں، آپ ریکارڈز کو شامل کرنے کے لیے INSERT سٹیٹمنٹس کا استعمال کرتے ہوئے ٹیبلز کے ساتھ کام کرتے ہیں اور ڈیٹا کو استفسار کرنے کے لیے SELECT سٹیٹمنٹ کا استعمال کرتے ہیں۔ Amazon Kinesis Data Analytics میں، آپ اسٹریمز (STREAMs) اور پمپس (PUMPs) کے ساتھ کام کرتے ہیں—مسلسل داخل کرنے کی درخواستیں جو کسی ایپلیکیشن میں ایک اسٹریم سے ڈیٹا کو دوسرے اسٹریم میں داخل کرتی ہیں۔

اوپر پیش کردہ SQL استفسار پانچ ہزار روبل سے کم قیمت پر ایروفلوٹ ٹکٹوں کی تلاش کرتا ہے۔ ان شرائط کو پورا کرنے والے تمام ریکارڈز DESTINATION_SQL_STREAM سلسلہ میں رکھے جائیں گے۔

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
ڈیسٹینیشن بلاک میں اسپیشل_سٹریم اسٹریم کو منتخب کریں، اور ان ایپلیکیشن اسٹریم نام DESTINATION_SQL_STREAM ڈراپ ڈاؤن فہرست میں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
تمام ہیرا پھیری کا نتیجہ ذیل کی تصویر کی طرح کچھ ہونا چاہئے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

SNS موضوع بنانا اور سبسکرائب کرنا

سادہ نوٹیفکیشن سروس پر جائیں اور وہاں ایئر لائنز کے نام کے ساتھ ایک نیا موضوع بنائیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اس موضوع کو سبسکرائب کریں اور اس موبائل فون نمبر کی نشاندہی کریں جس پر SMS اطلاعات بھیجی جائیں گی:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

DynamoDB میں ایک ٹیبل بنائیں

ان کی airline_tickets سٹریم سے خام ڈیٹا ذخیرہ کرنے کے لیے، آئیے اسی نام کے ساتھ DynamoDB میں ایک ٹیبل بنائیں۔ ہم بنیادی کلید کے طور پر record_id استعمال کریں گے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

لیمبڈا فنکشن کلیکٹر بنانا

آئیے کلکٹر کے نام سے ایک لیمبڈا فنکشن بناتے ہیں، جس کا کام airline_tickets سٹریم کو پول کرنا ہوگا اور، اگر وہاں نئے ریکارڈز پائے جاتے ہیں، تو ان ریکارڈز کو DynamoDB ٹیبل میں داخل کریں۔ ظاہر ہے، پہلے سے طے شدہ حقوق کے علاوہ، اس لیمبڈا کے پاس Kinesis ڈیٹا سٹریم تک پڑھنے اور DynamoDB تک لکھنے تک رسائی ہونی چاہیے۔

کلکٹر لیمبڈا فنکشن کے لیے ایک IAM رول بنانا
پہلے، آئیے لیمبڈا کے لیے ایک نیا IAM رول بنائیں جس کا نام Lambda-TicketsProcessingRole ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
ٹیسٹ کی مثال کے لیے، پہلے سے تشکیل شدہ AmazonKinesisReadOnlyAccess اور AmazonDynamoDBFullAccess پالیسیاں کافی موزوں ہیں، جیسا کہ ذیل کی تصویر میں دکھایا گیا ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

اس لیمبڈا کو Kinesis کے ٹرگر کے ذریعے لانچ کیا جانا چاہیے جب نئی اندراجات airline_stream میں داخل ہوں، اس لیے ہمیں ایک نیا ٹرگر شامل کرنے کی ضرورت ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
جو کچھ باقی ہے وہ کوڈ داخل کرنا اور لیمبڈا کو محفوظ کرنا ہے۔

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

لیمبڈا فنکشن نوٹیفائر بنانا

دوسرا لیمبڈا فنکشن، جو دوسرے سلسلے (خصوصی_سٹریم) کی نگرانی کرے گا اور SNS کو اطلاع بھیجے گا، اسی طرح بنایا گیا ہے۔ اس لیے، اس لیمبڈا کو Kinesis سے پڑھنے اور ایک دیئے گئے SNS موضوع پر پیغامات بھیجنے کے لیے رسائی ہونی چاہیے، جو SNS سروس کے ذریعے اس موضوع کے تمام صارفین (ای میل، ایس ایم ایس، وغیرہ) کو بھیجی جائے گی۔

ایک IAM کردار بنانا
سب سے پہلے، ہم اس لیمبڈا کے لیے IAM رول Lambda-KinesisAlarm بناتے ہیں، اور پھر اس کردار کو الارم_notifier lambda کے لیے تفویض کرتے ہیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

اس لیمبڈا کو اسپیشل_سٹریم میں داخل ہونے کے لیے نئے ریکارڈز کے لیے ٹرگر پر کام کرنا چاہیے، اس لیے آپ کو ٹرگر کو اسی طرح کنفیگر کرنا ہوگا جیسا کہ ہم نے کلکٹر لیمبڈا کے لیے کیا تھا۔

اس لیمبڈا کو کنفیگر کرنا آسان بنانے کے لیے، آئیے ایک نیا ماحولیاتی متغیر متعارف کراتے ہیں - TOPIC_ARN، جہاں ہم ANR (Amazon Recourse Names) کو ایئر لائنز کے موضوع پر رکھتے ہیں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اور لیمبڈا کوڈ داخل کریں، یہ بالکل بھی پیچیدہ نہیں ہے:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

ایسا لگتا ہے کہ یہ وہ جگہ ہے جہاں دستی نظام کی ترتیب مکمل ہوئی ہے۔ جو کچھ باقی ہے وہ جانچنا اور یقینی بنانا ہے کہ ہم نے ہر چیز کو صحیح طریقے سے ترتیب دیا ہے۔

Terraform کوڈ سے تعینات کریں۔

مطلوبہ تیاری

ٹرافیفار کوڈ سے انفراسٹرکچر کی تعیناتی کے لیے ایک بہت ہی آسان اوپن سورس ٹول ہے۔ اس کا اپنا نحو ہے جو سیکھنا آسان ہے اور اس میں بہت سی مثالیں ہیں کہ کس طرح اور کس چیز کو تعینات کرنا ہے۔ ایٹم ایڈیٹر یا ویژول اسٹوڈیو کوڈ میں بہت سے آسان پلگ ان ہیں جو Terraform کے ساتھ کام کرنا آسان بناتے ہیں۔

آپ ڈسٹری بیوشن ڈاؤن لوڈ کر سکتے ہیں۔ اس وجہ سے. تمام Terraform صلاحیتوں کا تفصیلی تجزیہ اس مضمون کے دائرہ کار سے باہر ہے، اس لیے ہم خود کو اہم نکات تک محدود رکھیں گے۔

کیسے شروع کریں

پروجیکٹ کا مکمل کوڈ یہ ہے۔ میرے ذخیرے میں. ہم اپنے پاس ذخیرہ کلون کرتے ہیں۔ شروع کرنے سے پہلے، آپ کو یہ یقینی بنانا ہوگا کہ آپ کے پاس AWS CLI انسٹال اور کنفیگر ہے، کیونکہ... Terraform ~/.aws/credentials فائل میں اسناد تلاش کرے گا۔

ایک اچھا عمل یہ ہے کہ پورے انفراسٹرکچر کو تعینات کرنے سے پہلے پلان کمانڈ کو چلائیں تاکہ یہ دیکھا جا سکے کہ Terraform اس وقت ہمارے لیے کلاؤڈ میں کیا بنا رہا ہے:

terraform.exe plan

آپ کو اطلاعات بھیجنے کے لیے ایک فون نمبر درج کرنے کے لیے کہا جائے گا۔ اس مرحلے میں داخل ہونا ضروری نہیں ہے۔

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
پروگرام کے آپریشن پلان کا تجزیہ کرنے کے بعد، ہم وسائل بنانا شروع کر سکتے ہیں:

terraform.exe apply

اس کمانڈ کو بھیجنے کے بعد، آپ سے دوبارہ فون نمبر درج کرنے کے لیے کہا جائے گا؛ جب اصل میں اعمال انجام دینے کے بارے میں کوئی سوال ظاہر ہوتا ہے تو "ہاں" ڈائل کریں۔ یہ آپ کو پورے انفراسٹرکچر کو ترتیب دینے، EC2 کے تمام ضروری کنفیگریشن کو انجام دینے، لیمبڈا فنکشنز کو تعینات کرنے کی اجازت دے گا۔

ٹیرافارم کوڈ کے ذریعے تمام وسائل کو کامیابی کے ساتھ تخلیق کرنے کے بعد، آپ کو Kinesis Analytics ایپلیکیشن کی تفصیلات میں جانے کی ضرورت ہے (بدقسمتی سے، مجھے یہ نہیں ملا کہ یہ براہ راست کوڈ سے کیسے کریں)۔

ایپلیکیشن لانچ کریں:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اس کے بعد، آپ کو ڈراپ ڈاؤن فہرست میں سے منتخب کرکے واضح طور پر ان ایپلیکیشن اسٹریم کا نام سیٹ کرنا ہوگا:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
اب سب کچھ جانے کے لیے تیار ہے۔

درخواست کی جانچ کر رہا ہے۔

اس سے قطع نظر کہ آپ نے سسٹم کو دستی طور پر یا Terraform کوڈ کے ذریعے کیسے تعینات کیا، یہ وہی کام کرے گا۔

ہم SSH کے ذریعے EC2 ورچوئل مشین میں لاگ ان ہوتے ہیں جہاں Kinesis ایجنٹ انسٹال ہوتا ہے اور api_caller.py اسکرپٹ چلاتے ہیں۔

sudo ./api_caller.py TOKEN

آپ کو بس اپنے نمبر پر ایک SMS کا انتظار کرنا ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
ایس ایم ایس - فون پر پیغام تقریباً 1 منٹ میں آتا ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام
یہ دیکھنا باقی ہے کہ آیا ریکارڈز DynamoDB ڈیٹا بیس میں بعد کے، مزید تفصیلی تجزیے کے لیے محفوظ کیے گئے تھے۔ airline_tickets ٹیبل تقریباً درج ذیل ڈیٹا پر مشتمل ہے:

Amazon Kinesis اور سرور کے بغیر سادگی کے ساتھ Aviasales API کا انضمام

حاصل يہ ہوا

کئے گئے کام کے دوران، ایک آن لائن ڈیٹا پروسیسنگ سسٹم ایمیزون کنیسس پر مبنی بنایا گیا تھا۔ ایس کیو ایل کمانڈز کا استعمال کرتے ہوئے Kinesis ڈیٹا اسٹریمز اور ریئل ٹائم اینالیٹکس Kinesis Analytics کے ساتھ مل کر Kinesis ایجنٹ استعمال کرنے کے اختیارات کے ساتھ ساتھ دیگر AWS سروسز کے ساتھ Amazon Kinesis کے تعامل پر غور کیا گیا۔

ہم نے مندرجہ بالا نظام کو دو طریقوں سے متعین کیا: ایک طویل دستی والا اور ٹیرافارم کوڈ سے فوری۔

تمام پروجیکٹ سورس کوڈ دستیاب ہے۔ میرے GitHub ذخیرے میں، میرا مشورہ ہے کہ آپ خود کو اس سے واقف کر لیں۔

میں مضمون پر بحث کرنے میں خوش ہوں، میں آپ کے تبصروں کا منتظر ہوں۔ مجھے تعمیری تنقید کی امید ہے۔

میں تمہارے لیے کامیابی چاہتا ہوں!

ماخذ: www.habr.com

نیا تبصرہ شامل کریں