Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

اي حبر!

ڇا توهان پرواز ڪرڻ وارا هوائي جهاز پسند ڪندا آهيو؟ مون کي اهو پسند آهي، پر خود اڪيلائي دوران مون کي هڪ مشهور وسيلن کان هوائي ٽڪيٽن تي ڊيٽا جو تجزيو ڪرڻ سان پڻ پيار ٿي ويو - Aviasales.

اڄ اسان Amazon Kinesis جي ڪم جو تجزيو ڪنداسين، ريئل ٽائيم اينالائيٽڪس سان اسٽريمنگ سسٽم ٺاھينداسين، Amazon DynamoDB NoSQL ڊيٽابيس کي بنيادي ڊيٽا اسٽوريج طور انسٽال ڪنداسين، ۽ دلچسپ ٽڪيٽن لاءِ ايس ايم ايس نوٽيفڪيشن قائم ڪنداسين.

سڀ تفصيل کٽ هيٺ آهن! وڃ!

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

تعارف

مثال طور، اسان کي رسائي جي ضرورت آهي Aviasales API. ان تائين رسائي مفت ۽ بغير ڪنهن پابندي جي فراهم ڪئي وئي آهي؛ توهان کي صرف "ڊولپرز" سيڪشن ۾ رجسٽر ڪرڻ جي ضرورت آهي ڊيٽا تائين رسائي حاصل ڪرڻ لاءِ توهان جي API ٽوڪن حاصل ڪرڻ لاءِ.

هن آرٽيڪل جو بنيادي مقصد AWS ۾ معلومات جي اسٽريمنگ جي استعمال جي عام سمجهاڻي ڏيڻ آهي؛ اسان انهي ڳالهه کي ڌيان ۾ رکون ٿا ته استعمال ٿيل API طرفان واپس ڪيل ڊيٽا سختي سان تازه ڪاري نه آهي ۽ ڪيش مان منتقل ٿيل آهي، جيڪا گذريل 48 ڪلاڪن تائين Aviasales.ru ۽ Jetradar.com سائيٽن جي استعمال ڪندڙن جي ڳولا جي بنياد تي ٺاهي وئي.

Kinesis-agent، پيداوار واري مشين تي نصب ٿيل، API ذريعي وصول ڪيو ويندو خودڪار طريقي سان پارس ڪندو ۽ ڊيٽا کي منتقل ڪندو مطلوب اسٽريم تي Kinesis Data Analytics ذريعي. هن وهڪري جو خام نسخو سڌو اسٽور ڏانهن لکيو ويندو. DynamoDB ۾ رکيل خام ڊيٽا اسٽوريج BI ٽولز، جهڙوڪ AWS Quick Sight ذريعي گہرا ٽڪيٽ تجزيو ڪرڻ جي اجازت ڏيندو.

اسان سڄي زيربنا کي ترتيب ڏيڻ لاء ٻن اختيارن تي غور ڪنداسين:

  • دستياب - AWS مئنيجمينٽ ڪنسول ذريعي؛
  • Terraform ڪوڊ مان انفراسٽرڪچر سست آٽوميٽرز لاءِ آهي.

ترقي يافته نظام جو فن تعمير

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
استعمال ٿيل اجزاء:

  • Aviasales API - هن API پاران واپس ڪيل ڊيٽا سڀني ايندڙ ڪم لاءِ استعمال ڪئي ويندي؛
  • EC2 پروسيسر مثال - بادل ۾ هڪ باقاعده ورچوئل مشين جنهن تي ان پٽ ڊيٽا اسٽريم ٺاهي ويندي:
    • Kinesis ايجنٽ هڪ جاوا ايپليڪيشن آهي جيڪا مقامي طور تي مشين تي نصب ڪئي وئي آهي جيڪا ڊيٽا گڏ ڪرڻ ۽ موڪلڻ جو آسان طريقو مهيا ڪري ٿي Kinesis (Kinesis Data Streams يا Kinesis Firehose). ايجنٽ مسلسل مخصوص ڊائريڪٽرن ۾ فائلن جي هڪ سيٽ جي نگراني ڪري ٿو ۽ Kinesis ڏانهن نئين ڊيٽا موڪلي ٿو؛
    • API ڪالر اسڪرپٽ - هڪ پٿون اسڪرپٽ جيڪو API کي درخواستون ڏئي ٿو ۽ جواب کي فولڊر ۾ رکي ٿو جيڪو Kinesis ايجنٽ جي نگراني ڪري ٿو.
  • Kinesis ڊيٽا اسٽريمز - حقيقي وقت ڊيٽا اسٽريمنگ سروس وسيع اسڪيلنگ صلاحيتن سان؛
  • Kinesis تجزياتي هڪ بي سرور سروس آهي جيڪا حقيقي وقت ۾ اسٽريمنگ ڊيٽا جي تجزيو کي آسان بڻائي ٿي. Amazon Kinesis Data Analytics ايپليڪيشن وسيلن کي ترتيب ڏئي ٿو ۽ ايندڙ ڊيٽا جي ڪنهن به مقدار کي سنڀالڻ لاءِ خودڪار طريقي سان ماپ ڪري ٿو.
  • AWS Lambda - هڪ خدمت جيڪا توهان کي اجازت ڏئي ٿي ڪوڊ هلائڻ جي بغير بيڪ اپ ڪرڻ يا سرور قائم ڪرڻ جي. سڀ ڪمپيوٽنگ پاور هر ڪال لاء خودڪار طريقي سان ماپ ڪئي وئي آهي.
  • Amazon DynamoDB - اهم-قدر جوڑوں ۽ دستاويزن جو هڪ ڊيٽابيس جيڪو 10 مليسيڪنڊن کان گهٽ جي دير فراهم ڪري ٿو جڏهن ڪنهن به پيماني تي هلندي. جڏهن DynamoDB استعمال ڪندي، توهان کي ڪنهن به سرور جي روزي، پيچ، يا انتظام ڪرڻ جي ضرورت ناهي. DynamoDB دستياب وسيلن جي مقدار کي ترتيب ڏيڻ ۽ اعلي ڪارڪردگي کي برقرار رکڻ لاء خودڪار طور تي ٽيبل اسڪيل ڪري ٿو. ڪو به سسٽم انتظاميه جي ضرورت ناهي؛
  • ايم ڊي ايس ايس ايس ايس - پبلشر-سبسڪرائبر (Pub/Sub) ماڊل استعمال ڪندي پيغام موڪلڻ لاءِ مڪمل طور تي منظم ڪيل سروس، جنهن سان توهان مائڪرو سروسز، ورهايل سسٽم ۽ سرور کان سواءِ ايپليڪيشنن کي الڳ ڪري سگهو ٿا. SNS استعمال ڪري سگھجي ٿو معلومات موڪلڻ لاءِ آخري صارفين کي موبائل پُش اطلاعن، ايس ايم ايس پيغامن ۽ اي ميلن ذريعي.

ابتدائي تربيت

ڊيٽا جي وهڪري کي نقل ڪرڻ لاءِ، مون Aviasales API پاران واپس ڪيل ايئر لائن ٽڪيٽ جي معلومات کي استعمال ڪرڻ جو فيصلو ڪيو. IN دستاويز مختلف طريقن جي ڪافي وسيع فهرست، اچو ته انھن مان ھڪڙو وٺون - "مهيني قيمت ڪئلينڊر"، جيڪو مھيني جي ھر ڏينھن لاء قيمتون واپس ڪري ٿو، منتقلي جي تعداد جي لحاظ سان گروپ ڪيو ويو آھي. جيڪڏهن توهان درخواست ۾ ڳولا جو مهينو نه ٻڌايو، معلومات موجوده مهيني جي پٺيان ايندڙ مهيني لاءِ واپس ڪئي ويندي.

تنهن ڪري، اچو ته رجسٽر ٿيو ۽ اسان جي ٽوڪن حاصل ڪريو.

هڪ مثال هيٺ ڏنل درخواست آهي:

http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_API

درخواست ۾ ٽوڪن جي وضاحت ڪندي API کان ڊيٽا حاصل ڪرڻ جو مٿيون طريقو ڪم ڪندو، پر مان ترجيح ڏيان ٿو رسائي ٽوڪن کي هيڊر ذريعي، ان ڪري اسان هي طريقو استعمال ڪنداسين api_caller.py اسڪرپٽ ۾.

جواب مثال:

{{
   "success":true,
   "data":[{
      "show_to_affiliates":true,
      "trip_class":0,
      "origin":"LED",
      "destination":"HKT",
      "depart_date":"2015-10-01",
      "return_date":"",
      "number_of_changes":1,
      "value":29127,
      "found_at":"2015-09-24T00:06:12+04:00",
      "distance":8015,
      "actual":true
   }]
}

مثال API جواب مٿي ڏيکاري ٿو هڪ ٽڪيٽ سينٽ پيٽرسبرگ کان فوڪ تائين... اوه، ڪهڙو خواب آهي...
جيئن ته مان قازان کان آهيان، ۽ فوڪيٽ هاڻي ”صرف هڪ خواب“ آهي، اچو ته سينٽ پيٽرسبرگ کان ڪازان تائين ٽڪيٽون ڳوليون.

اهو فرض ڪري ٿو ته توهان وٽ اڳ ۾ ئي AWS اڪائونٽ آهي. مان فوري طور تي ان حقيقت تي خاص ڌيان ڏيڻ چاهيان ٿو ته Kinesis ۽ ايس ايم ايس ذريعي اطلاع موڪلڻ سالياني ۾ شامل نه آهن. مفت ٽائر (مفت استعمال). پر ان جي باوجود، ذهن ۾ ڪجهه ڊالرن سان، اهو ممڪن آهي ته تجويز ڪيل نظام کي تعمير ڪرڻ ۽ ان سان راند ڪرڻ. ۽، يقينا، سڀني وسيلن کي ختم ڪرڻ نه وساريو جڏهن اهي وڌيڪ گهربل نه آهن.

خوشقسمتيءَ سان، DynamoDb ۽ lambda فنڪشن اسان لاءِ مفت هوندا جيڪڏهن اسان پنهنجي مهيني جي مفت حدن کي پورا ڪريون. مثال طور، DynamoDB لاءِ: 25 GB اسٽوريج، 25 WCU/RCU ۽ 100 ملين سوال. ۽ هر مهيني هڪ ملين ليمبڊا فنڪشن ڪالون.

دستي نظام لڳائڻ

ڪينيسس ڊيٽا اسٽريمز کي ترتيب ڏيڻ

اچو ته ڪينيسس ڊيٽا اسٽريمز سروس تي وڃو ۽ ٻه نوان اسٽريم ٺاهي، هر هڪ لاءِ هڪ شارڊ.

شارڊ ڇا آهي؟
هڪ شارڊ هڪ Amazon Kinesis وهڪرو جو بنيادي ڊيٽا جي منتقلي يونٽ آهي. ھڪڙو ڀاڱو 1 MB / s جي رفتار تي ان پٽ ڊيٽا جي منتقلي ۽ 2 MB / s جي رفتار تي ڊيٽا جي ٻاھرين منتقلي مهيا ڪري ٿو. ھڪڙو ڀاڱو 1000 PUT داخلا في سيڪنڊ تائين سپورٽ ڪري ٿو. جڏهن ڊيٽا اسٽريم ٺاهيندي، توهان کي گهربل تعداد جي حصن جي وضاحت ڪرڻ جي ضرورت آهي. مثال طور، توهان ٺاهي سگهو ٿا هڪ ڊيٽا وهڪرو ٻن حصن سان. هي ڊيٽا اسٽريم 2 MB/s تي ان پٽ ڊيٽا جي منتقلي ۽ 4 MB/s تي ڊيٽا جي ٻاھران منتقلي فراهم ڪندو، 2000 PUT ريڪارڊ في سيڪنڊ تائين سپورٽ ڪندو.

توهان جي وهڪري ۾ وڌيڪ شارڊز، ان جي ذريعي وڌو. اصولي طور تي، وهڪري کي اسڪيل ڪيو ويندو آهي - شارڊ شامل ڪندي. پر توهان وٽ وڌيڪ شارڊز آهن، قيمت وڌيڪ. هر شارڊ جي قيمت 1,5 سينٽ في ڪلاڪ ۽ اضافي 1.4 سينٽ هر ملين PUT پيل لوڊ يونٽن لاءِ.

اچو ته نالي سان هڪ نئون وهڪرو ٺاهيو ايئر لائن_ٽڪيٽونهن لاءِ 1 شرڊ ڪافي هوندو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
هاڻي اچو ته نالي سان هڪ ٻيو ٿريڊ ٺاهيو special_stream:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

ٺاهيندڙ سيٽ اپ

ھڪڙي ڪم جو تجزيو ڪرڻ لاء، اھو ڪافي آھي ھڪڙو باقاعده EC2 مثال استعمال ڪرڻ لاء ڊيٽا پروسيسر جي طور تي. اهو هڪ طاقتور، قيمتي مجازي مشين نه هجڻ ضروري آهي؛ هڪ جڳهه t2.micرو ٺيڪ ڪندو.

اھم نوٽ: مثال طور، توھان کي استعمال ڪرڻ گھرجي تصوير - Amazon Linux AMI 2018.03.0، ھن ۾ گھٽ سيٽنگون آھن جلدي لانچ ڪرڻ لاءِ ڪنيسس ايجنٽ.

EC2 سروس ڏانھن وڃو، ھڪڙي نئين ورچوئل مشين ٺاھيو، گھربل AMI چونڊيو t2.micro ٽائپ سان، جيڪو مفت ٽائر ۾ شامل آھي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
نئين ٺاهيل ورچوئل مشين لاءِ ڪنيسس سروس سان لهه وچڙ ڪرڻ جي قابل ٿي وڃڻ لاءِ، ان کي ائين ڪرڻ جا حق ڏنا وڃن. اهو ڪرڻ جو بهترين طريقو هڪ IAM ڪردار تفويض ڪرڻ آهي. تنهن ڪري، قدم 3 تي: مثال جي تفصيلات اسڪرين کي ترتيب ڏيو، توهان کي چونڊڻ گهرجي نئون IAM ڪردار ٺاهيو:

EC2 لاءِ IAM ڪردار ٺاهڻ
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ونڊو ۾ جيڪو کلي ٿو، چونڊيو ته اسان EC2 لاءِ نئون ڪردار ٺاهي رهيا آهيون ۽ وڃو اجازتون سيڪشن:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
تربيتي مثال کي استعمال ڪندي، اسان کي وسيلن جي حقن جي گرينولر ترتيب جي سڀني پيچيدگين ۾ وڃڻ جي ضرورت ناهي، تنهنڪري اسان Amazon پاران اڳ ۾ ترتيب ڏنل پاليسين کي چونڊينداسين: AmazonKinesisFullAccess ۽ CloudWatchFullAccess.

اچو ته هن ڪردار کي ڪجهه معنيٰ وارو نالو ڏيون، مثال طور: EC2-KinesisStreams-FullAccess. نتيجو ساڳيو هجڻ گهرجي جيئن هيٺ ڏنل تصوير ۾ ڏيکاريل آهي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ھن نئين ڪردار کي ٺاھڻ کان پوء، ان کي ٺاھيل ورچوئل مشين مثال سان ڳنڍڻ نه وساريو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
اسان هن اسڪرين تي ٻيو ڪجهه به تبديل نه ڪندا آهيون ۽ ايندڙ ونڊوز ڏانهن وڃو.

هارڊ ڊرائيو سيٽنگن کي ڊفالٽ طور ڇڏي سگھجي ٿو، گڏوگڏ ٽيگ (جيتوڻيڪ اهو ٽيگ استعمال ڪرڻ سٺو عمل آهي، گهٽ ۾ گهٽ مثال کي نالو ڏيو ۽ ماحول جي نشاندهي ڪريو).

ھاڻي اسان آھيون مرحلا 6: ترتيب ڏيو سيڪيورٽي گروپ ٽئب، جتي توھان کي ھڪڙو نئون ٺاھڻ جي ضرورت آھي يا پنھنجي موجوده سيڪيورٽي گروپ کي بيان ڪرڻ جي ضرورت آھي، جيڪا توھان کي اجازت ڏئي ٿي ssh (port 22) ذريعي ڳنڍڻ جي مثال سان. ماخذ چونڊيو -> منهنجو IP اتي ۽ توهان مثال لانچ ڪري سگهو ٿا.

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
جيترو جلدي اهو هلندڙ حالت ڏانهن سوئچ ڪري ٿو، توهان ان سان ڳنڍڻ جي ڪوشش ڪري سگهو ٿا ssh ذريعي.

Kinesis ايجنٽ سان ڪم ڪرڻ جي قابل ٿيڻ لاء، ڪاميابيء سان مشين سان ڳنڍڻ کان پوء، توھان کي ھيٺ ڏنل حڪمن کي ٽرمينل ۾ داخل ڪرڻ گھرجي:

sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent

اچو ته API جوابن کي محفوظ ڪرڻ لاءِ فولڊر ٺاھيون:

sudo mkdir /var/log/airline_tickets

ايجنٽ کي شروع ڪرڻ کان پهريان، توهان کي ان جي ترتيب کي ترتيب ڏيڻ جي ضرورت آهي:

sudo vim /etc/aws-kinesis/agent.json

agent.json فائل جو مواد هن طرح ڏسڻ گهرجي:

{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",

  "flows": [
    {
      "filePattern": "/var/log/airline_tickets/*log",
      "kinesisStream": "airline_tickets",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
         {
            "optionName": "CSVTOJSON",
            "customFieldNames": ["cost","trip_class","show_to_affiliates",
                "return_date","origin","number_of_changes","gate","found_at",
                "duration","distance","destination","depart_date","actual","record_id"]
         }
      ]
    }
  ]
}

جيئن ترتيب واري فائل مان ڏسي سگھجي ٿو، ايجنٽ /var/log/airline_tickets/ ڊاريڪٽري ۾ .log ايڪسٽينشن سان فائلن جي نگراني ڪندو، انھن کي پارس ڪندو ۽ انھن کي airline_tickets اسٽريم ڏانھن منتقل ڪندو.

اسان سروس ٻيهر شروع ڪريون ٿا ۽ پڪ ڪريو ته اهو مٿي ۽ هلندڙ آهي:

sudo service aws-kinesis-agent restart

ھاڻي اچو ته پائٿون اسڪرپٽ ڊائون لوڊ ڪريو جيڪو API کان ڊيٽا جي درخواست ڪندو:

REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer

wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt

api_caller.py اسڪرپٽ Aviasales کان ڊيٽا جي درخواست ڪري ٿو ۽ حاصل ڪيل جواب کي ڊاريڪٽري ۾ محفوظ ڪري ٿو جيڪا Kinesis ايجنٽ اسڪين ڪري ٿي. هن اسڪرپٽ تي عمل درآمد ڪافي معياري آهي، اتي هڪ TicketsApi ڪلاس آهي، اهو توهان کي اجازت ڏئي ٿو ته غير مطابقت سان API کي ڇڪي. اسان هڪ ٽوڪن سان هيڊر پاس ڪريون ٿا ۽ هن ڪلاس لاءِ پيٽرول جي درخواست ڪريون ٿا:

class TicketsApi:
    """Api caller class."""

    def __init__(self, headers):
        """Init method."""
        self.base_url = BASE_URL
        self.headers = headers

    async def get_data(self, data):
        """Get the data from API query."""
        response_json = {}
        async with ClientSession(headers=self.headers) as session:
            try:
                response = await session.get(self.base_url, data=data)
                response.raise_for_status()
                LOGGER.info('Response status %s: %s',
                            self.base_url, response.status)
                response_json = await response.json()
            except HTTPError as http_err:
                LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
            except Exception as err:
                LOGGER.error('Oops! An error ocurred: %s', str(err))
            return response_json


def prepare_request(api_token):
    """Return the headers and query fot the API request."""
    headers = {'X-Access-Token': api_token,
               'Accept-Encoding': 'gzip'}

    data = FormData()
    data.add_field('currency', CURRENCY)
    data.add_field('origin', ORIGIN)
    data.add_field('destination', DESTINATION)
    data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
    data.add_field('trip_duration', TRIP_DURATION)
    return headers, data


async def main():
    """Get run the code."""
    if len(sys.argv) != 2:
        print('Usage: api_caller.py <your_api_token>')
        sys.exit(1)
        return
    api_token = sys.argv[1]
    headers, data = prepare_request(api_token)

    api = TicketsApi(headers)
    response = await api.get_data(data)
    if response.get('success', None):
        LOGGER.info('API has returned %s items', len(response['data']))
        try:
            count_rows = log_maker(response)
            LOGGER.info('%s rows have been saved into %s',
                        count_rows,
                        TARGET_FILE)
        except Exception as e:
            LOGGER.error('Oops! Request result was not saved to file. %s',
                         str(e))
    else:
        LOGGER.error('Oops! API request was unsuccessful %s!', response)

ايجنٽ جي صحيح سيٽنگون ۽ ڪارڪردگي کي جانچڻ لاءِ، اچو ته آزمايون api_caller.py اسڪرپٽ کي:

sudo ./api_caller.py TOKEN

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
۽ اسان ڏسون ٿا ڪم جا نتيجا ايجنٽ لاگز ۾ ۽ مانيٽرنگ ٽيب تي ايئر لائن_ٽڪٽس ڊيٽا اسٽريم ۾:

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
جئين توهان ڏسي سگهو ٿا، هر شي ڪم ڪري ٿي ۽ ڪنيسس ايجنٽ ڪاميابيء سان ڊيٽا کي وهڪرو ڏانهن موڪلي ٿو. هاڻي اچو ته صارف کي ترتيب ڏيو.

Kinesis ڊيٽا اينالائيٽڪس کي ترتيب ڏيڻ

اچو ته سڄي سسٽم جي مرڪزي حصي ڏانهن وڃو - Kinesis_analytics_airlines_app نالي Kinesis Data Analytics ۾ هڪ نئين ايپليڪيشن ٺاهيو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Kinesis ڊيٽا اينالائيٽڪس توهان کي اجازت ڏئي ٿو حقيقي وقت ڊيٽا اينالائيٽڪس کي انجام ڏيڻ جي Kinesis اسٽريمز مان SQL ٻولي استعمال ڪندي. اها هڪ مڪمل طور تي خودڪار اسڪيلنگ سروس آهي (ڪنيسس اسٽريمز جي برعڪس) جيڪا:

  1. توهان کي اجازت ڏئي ٿو نئين اسٽريمز (آئوٽ پٽ اسٽريم) ٺاهڻ جي بنياد تي درخواستن جي بنياد تي ماخذ ڊيٽا؛
  2. غلطين سان گڏ ھڪڙو وهڪرو مهيا ڪري ٿو جيڪو واقع ٿيو جڏھن ايپليڪيشنون ھلنديون ھيون (Error Stream)؛
  3. خودڪار طور تي ان پٽ ڊيٽا اسڪيم جو اندازو لڳائي سگھي ٿو (جيڪڏھن ضروري هجي ته دستي طور تي ٻيهر بيان ڪري سگھجي ٿو).

هي ڪا سستي خدمت ناهي - 0.11 USD في ڪلاڪ ڪم، تنهنڪري توهان کي ان کي احتياط سان استعمال ڪرڻ گهرجي ۽ ختم ٿيڻ کان پوءِ ختم ڪرڻ گهرجي.

اچو ته ايپليڪيشن کي ڊيٽا جي ماخذ سان ڳنڍيون:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
اهو وهڪرو چونڊيو جنهن سان اسان ڳنڍڻ وارا آهيون (airline_tickets):

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
اڳيون، توهان کي هڪ نئون IAM ڪردار ڳنڍڻ جي ضرورت آهي ته جيئن ايپليڪيشن اسٽريم مان پڙهي ۽ اسٽريم ڏانهن لکي سگهي. هن کي ڪرڻ لاء، اهو ڪافي آهي ته رسائي جي اجازت واري بلاڪ ۾ ڪجھ به تبديل نه ڪريو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
هاڻي اچو ته اسٽريم ۾ ڊيٽا اسڪيما جي دريافت جي درخواست ڪريون؛ هن کي ڪرڻ لاءِ، "اسڪيما دريافت ڪريو" بٽڻ تي ڪلڪ ڪريو. نتيجي طور، IAM ڪردار کي اپڊيٽ ڪيو ويندو (هڪ نئون ٺاهيو ويندو) ۽ اسڪيما ڳولڻ شروع ڪيو ويندو ڊيٽا مان جيڪو اڳ ۾ ئي وهڪرو ۾ اچي چڪو آهي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
هاڻي توهان کي SQL ايڊيٽر ڏانهن وڃڻ جي ضرورت آهي. جڏهن توهان هن بٽڻ تي ڪلڪ ڪندا، هڪ ونڊو ظاهر ٿيندي جيڪا توهان کي ايپليڪيشن لانچ ڪرڻ لاءِ پڇندي - چونڊيو جيڪو توهان لانچ ڪرڻ چاهيو ٿا:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
SQL ايڊيٽر ونڊو ۾ ھيٺ ڏنل سادو سوال داخل ڪريو ۽ ڪلڪ ڪريو محفوظ ڪريو ۽ SQL هلائڻ:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
    and "gate" = 'Aeroflot';

تعلقي ڊيٽابيس ۾، توهان جدولن سان ڪم ڪريو ٿا INSERT بيان استعمال ڪندي رڪارڊ شامل ڪرڻ لاءِ ۽ هڪ SELECT بيان ڊيٽا سوال ڪرڻ لاءِ. Amazon Kinesis ڊيٽا اينالائيٽڪس ۾، توهان اسٽريمز (اسٽريم) ۽ پمپس (PUMPs) سان ڪم ڪريو ٿا - مسلسل داخل ڪرڻ جون درخواستون جيڪي هڪ اسٽريم مان ڊيٽا کي ايپليڪيشن ۾ ٻئي اسٽريم ۾ داخل ڪن ٿيون.

مٿي پيش ڪيل SQL سوال Aeroflot ٽڪيٽن لاءِ ڳولي ٿو پنج هزار روبل کان گھٽ قيمت تي. سڀئي رڪارڊ جيڪي انهن شرطن کي پورا ڪن ٿا، DESTINATION_SQL_STREAM اسٽريم ۾ رکيا ويندا.

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
منزل بلاڪ ۾، خاص_اسٽريم اسٽريم کي چونڊيو، ۽ ان-ايپليڪيشن اسٽريم جو نالو DESTINATION_SQL_STREAM ڊراپ-ڊائون لسٽ ۾:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
سڀني manipulations جو نتيجو هيٺ ڏنل تصوير وانگر ڪجهه هجڻ گهرجي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

SNS موضوع ٺاهڻ ۽ رڪنيت حاصل ڪرڻ

سادو نوٽيفڪيشن سروس ڏانهن وڃو ۽ اتي ايئر لائنز جي نالي سان ھڪڙو نئون موضوع ٺاھيو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ھن موضوع تي رڪنيت حاصل ڪريو ۽ موبائل فون نمبر ڏيکاريو جنھن تي ايس ايم ايس نوٽيفڪيشن موڪليا ويندا:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

DynamoDB ۾ ٽيبل ٺاهيو

انهن جي airline_tickets اسٽريم مان خام ڊيٽا محفوظ ڪرڻ لاءِ، اسان ساڳي نالي سان DynamoDB ۾ هڪ ٽيبل ٺاهينداسين. اسان رڪارڊ_id کي بنيادي ڪيئي طور استعمال ڪنداسين:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

Lambda فنڪشن ڪليڪٽر ٺاهڻ

اچو ته ڪليڪٽر نالي هڪ ليمبڊا فنڪشن ٺاهيو، جنهن جو ڪم هوندو airline_tickets اسٽريم کي پول ڪرڻ ۽، جيڪڏهن نوان رڪارڊ مليا آهن، انهن رڪارڊ کي DynamoDB ٽيبل ۾ داخل ڪريو. ظاهر آهي، ڊفالٽ حقن کان علاوه، هي ليمبڊا ضرور پڙهڻ جي رسائي هوندي Kinesis ڊيٽا اسٽريم تائين ۽ لکڻ جي رسائي DynamoDB تائين.

ڪليڪٽر ليمبڊا فنڪشن لاءِ IAM ڪردار ٺاهڻ
پهرين، اچو ته ليمبڊا لاءِ هڪ نئون IAM ڪردار ٺاهيو جنهن جو نالو Lambda-TicketsProcessingRole:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ٽيسٽ جي مثال لاءِ، اڳ ۾ ترتيب ڏنل AmazonKinesisReadOnlyAccess ۽ AmazonDynamoDBFullAccess پاليسيون بلڪل موزون آهن، جيئن هيٺ ڏنل تصوير ۾ ڏيکاريل آهي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

هي ليمبڊا ڪائنيسس کان ٽرگر ذريعي شروع ڪيو وڃي جڏهن نيون داخلائون ايئر لائن_ اسٽريم ۾ داخل ٿين، تنهنڪري اسان کي هڪ نئون ٽرگر شامل ڪرڻ جي ضرورت آهي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
اهو سڀ ڪجهه رهي ٿو ڪوڊ کي پيسٽ ڪرڻ ۽ لامبڊا کي محفوظ ڪرڻ.

"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal

DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'

class TicketsParser:
    """Parsing info from the Stream."""

    def __init__(self, table_name, records):
        """Init method."""
        self.table = DYNAMO_DB.Table(table_name)
        self.json_data = TicketsParser.get_json_data(records)

    @staticmethod
    def get_json_data(records):
        """Return deserialized data from the stream."""
        decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
                                for record in records])
        json_data = ([json.loads(decoded_record)
                      for decoded_record in decoded_record_data])
        return json_data

    @staticmethod
    def get_item_from_json(json_item):
        """Pre-process the json data."""
        new_item = {
            'record_id': json_item.get('record_id'),
            'cost': Decimal(json_item.get('cost')),
            'trip_class': json_item.get('trip_class'),
            'show_to_affiliates': json_item.get('show_to_affiliates'),
            'origin': json_item.get('origin'),
            'number_of_changes': int(json_item.get('number_of_changes')),
            'gate': json_item.get('gate'),
            'found_at': json_item.get('found_at'),
            'duration': int(json_item.get('duration')),
            'distance': int(json_item.get('distance')),
            'destination': json_item.get('destination'),
            'depart_date': json_item.get('depart_date'),
            'actual': json_item.get('actual')
        }
        return new_item

    def run(self):
        """Batch insert into the table."""
        with self.table.batch_writer() as batch_writer:
            for item in self.json_data:
                dynamodb_item = TicketsParser.get_item_from_json(item)
                batch_writer.put_item(dynamodb_item)

        print('Has been added ', len(self.json_data), 'items')

def lambda_handler(event, context):
    """Parse the stream and insert into the DynamoDB table."""
    print('Got event:', event)
    parser = TicketsParser(TABLE_NAME, event['Records'])
    parser.run()

هڪ lambda فنڪشن نوٽيفڪيشن ٺاهڻ

ٻيو لامبڊا فنڪشن، جيڪو سيڪنڊ اسٽريم (special_stream) جي نگراني ڪندو ۽ SNS ڏانهن نوٽيفڪيشن موڪليندو، ساڳئي طرح ٺهيل آهي. تنهن ڪري، هن lambda کي Kinesis کان پڙهڻ لاءِ رسائي حاصل ڪرڻ گهرجي ۽ ڏنل SNS موضوع تي پيغام موڪلڻ لاءِ، جيڪو پوءِ SNS سروس طرفان موڪليو ويندو هن موضوع جي سڀني رڪنن کي (اي ميل، ايس ايم ايس، وغيره).

IAM ڪردار ٺاهڻ
پهرين، اسان هن لامبڊا لاءِ IAM ڪردار Lambda-KinesisAlarm ٺاهيندا آهيون، ۽ پوءِ هي ڪردار تفويض ڪريون ٿا الارم_notifier lambda لاءِ:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

ھن ليمبڊا کي خاص_اسٽريم ۾ داخل ٿيڻ لاءِ نئين رڪارڊ لاءِ ٽرگر تي ڪم ڪرڻ گھرجي، تنھنڪري توھان کي ٽريگر کي ساڳيءَ طرح ترتيب ڏيڻو پوندو جيئن اسان ڪليڪٽر ليمبڊا لاءِ ڪيو آھي.

هن ليمبڊا کي ترتيب ڏيڻ کي آسان بڻائڻ لاءِ، اچو ته هڪ نئون ماحوليات وارو متغير متعارف ڪرايو - TOPIC_ARN، جتي اسان ANR (Amazon Recourse Names) کي ايئر لائنز جي موضوع تي رکون ٿا:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
۽ lambda ڪوڊ داخل ڪريو، اهو سڀ ڪجهه پيچيده ناهي:

import boto3
import base64
import os

SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']


def lambda_handler(event, context):
    try:
        SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
                           Message='Hi! I have found an interesting stuff!',
                           Subject='Airline tickets alarm')
        print('Alarm message has been successfully delivered')
    except Exception as err:
        print('Delivery failure', str(err))

اهو لڳي ٿو ته هي آهي جتي دستياب سسٽم جي ترتيب مڪمل ڪئي وئي آهي. اهو سڀ ڪجهه رهي ٿو جانچڻ ۽ پڪ ڪرڻ لاءِ ته اسان هر شي کي صحيح ترتيب ڏني آهي.

Terraform ڪوڊ مان ترتيب ڏيو

گهربل تياري

ٽرافيف ڪوڊ مان انفراسٽرڪچر کي ترتيب ڏيڻ لاءِ هڪ تمام آسان اوپن سورس اوزار آهي. ان جو پنهنجو نحو آهي جيڪو سکڻ آسان آهي ۽ ان جا ڪيترائي مثال آهن ته ڪيئن ۽ ڪهڙي ترتيب ڏيڻ. Atom ايڊيٽر يا Visual Studio Code وٽ ڪيترائي مددگار پلگ ان آھن جيڪي Terraform سان ڪم ڪرڻ آسان بڻائين ٿا.

توھان ڊائون لوڊ ڪري سگھو ٿا تقسيم هتي کان. سڀني Terraform صلاحيتن جو تفصيلي تجزيو هن مضمون جي دائري کان ٻاهر آهي، تنهنڪري اسان پاڻ کي بنيادي نقطي تائين محدود ڪنداسين.

ڪيئن شروع ڪجي

پروجيڪٽ جو مڪمل ڪوڊ آهي منهنجي مخزن ۾. اسان پاڻ وٽ مخزن کي ڪلون ڪريون ٿا. شروع ڪرڻ کان پهريان، توهان کي پڪ ڪرڻ جي ضرورت آهي ته توهان وٽ AWS CLI نصب ۽ ترتيب ڏنل آهي، ڇاڪاڻ ته ... Terraform ~/.aws/credentials فائل ۾ سندون ڳوليندو.

هڪ سٺو عمل اهو آهي ته پلان ڪمانڊ کي هلائڻ کان اڳ سڄي انفراسٽرڪچر کي ترتيب ڏيڻ لاءِ ڏسو ته ڇا Terraform هن وقت اسان لاءِ ڪلائوڊ ۾ ٺاهي رهيو آهي:

terraform.exe plan

توهان کي اطلاع موڪلڻ لاءِ فون نمبر داخل ڪرڻ لاءِ چيو ويندو. ان کي هن مرحلي ۾ داخل ڪرڻ ضروري نه آهي.

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
پروگرام جي آپريشن پلان جو تجزيو ڪرڻ بعد، اسان وسيلا ٺاهڻ شروع ڪري سگھون ٿا:

terraform.exe apply

هن حڪم موڪلڻ کان پوء، توهان کي ٻيهر چيو ويندو هڪ فون نمبر داخل ڪرڻ لاء؛ ڊائل "ها" جڏهن هڪ سوال اصل ۾ عمل ڪرڻ بابت ڏيکاريل آهي. اهو توهان کي مڪمل انفراسٽرڪچر کي ترتيب ڏيڻ جي اجازت ڏيندو، EC2 جي سڀني ضروري ترتيبن کي کڻڻ، لامبڊا افعال کي ترتيب ڏيڻ، وغيره.

سڀ وسيلا ڪاميابيءَ سان Terraform ڪوڊ ذريعي ٺاهيا ويا، توهان کي Kinesis Analytics ايپليڪيشن جي تفصيل ۾ وڃڻ جي ضرورت آهي (بدقسمتي سان، مون کي اهو نه مليو ته اهو سڌو سنئون ڪوڊ مان ڪيئن ڪجي).

ايپليڪيشن لانچ ڪريو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ان کان پوء، توهان کي واضح طور تي ان-ايپليڪيشن اسٽريم جو نالو مقرر ڪرڻ گهرجي ڊراپ-ڊائون لسٽ مان چونڊيو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
هاڻي سڀڪنھن شيء کي وڃڻ لاء تيار آهي.

ايپليڪيشن جي جانچ ڪندي

قطع نظر ته توهان سسٽم کي ڪيئن لڳايو، دستي طور تي يا Terraform ڪوڊ ذريعي، اهو ساڳيو ڪم ڪندو.

اسان لاگ ان ٿيو SSH ذريعي EC2 ورچوئل مشين ۾ جتي Kinesis ايجنٽ انسٽال ٿيل آهي ۽ api_caller.py اسڪرپٽ هلائيندا آهيون

sudo ./api_caller.py TOKEN

توهان کي صرف اهو ڪرڻو آهي ته توهان جي نمبر تي هڪ ايس ايم ايس جو انتظار ڪريو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
ايس ايم ايس - هڪ پيغام توهان جي فون تي تقريبا 1 منٽ ۾ اچي ٿو:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام
اهو ڏسڻ ۾ اچي ٿو ته ڇا رڪارڊ محفوظ ڪيا ويا هئا DynamoDB ڊيٽابيس ۾ ايندڙ، وڌيڪ تفصيلي تجزيو لاء. airline_tickets جدول لڳ ڀڳ هيٺين ڊيٽا تي مشتمل آهي:

Aviasales API Amazon Kinesis ۽ بي سرور سادگي سان انضمام

ٿڪل

ڪم جي دوران، هڪ آن لائن ڊيٽا پروسيسنگ سسٽم ٺاهي وئي هئي Amazon Kinesis جي بنياد تي. Kinesis ايجنٽ استعمال ڪرڻ جا اختيار ڪائنسس ڊيٽا اسٽريمز ۽ حقيقي وقت اينالائيٽڪس Kinesis تجزياتي سان گڏ SQL حڪمن کي استعمال ڪندي، گڏوگڏ ٻين AWS خدمتن سان Amazon Kinesis جي رابطي تي غور ڪيو ويو.

اسان مٿي ڏنل سسٽم کي ٻن طريقن سان ترتيب ڏنو: هڪ بلڪه ڊگھو دستي هڪ ۽ هڪ تڪڙو ٽيرافارم ڪوڊ مان.

سڀ پروجيڪٽ سورس ڪوڊ موجود آهي منهنجي GitHub مخزن ۾، مان صلاح ڏيان ٿو ته توهان پاڻ کي ان سان واقف ڪيو.

مان مضمون تي بحث ڪرڻ لاء خوش آهيان، مان توهان جي تبصرن جو منتظر آهيان. مون کي تعميري تنقيد جي اميد آهي.

مان توهان کي ڪاميابي چاهيان ٿو!

جو ذريعو: www.habr.com

تبصرو شامل ڪريو